事务补偿控制

This commit is contained in:
ChaosCoffee 2018-06-27 18:57:31 +08:00
parent 3eac489fa8
commit 06285060bc
5 changed files with 68 additions and 10 deletions

View File

@ -22,6 +22,7 @@ import com.raincat.common.bean.TransactionInvocation;
import com.raincat.common.bean.TransactionRecover;
import com.raincat.common.config.TxConfig;
import com.raincat.common.constant.CommonConstant;
import com.raincat.common.enums.CompensationOperationTypeEnum;
import com.raincat.common.enums.TransactionStatusEnum;
import com.raincat.common.holder.LogUtil;
import com.raincat.common.netty.bean.TxTransactionGroup;
@ -53,6 +54,7 @@ import java.util.concurrent.TimeUnit;
/**
* TxCompensationServiceImpl.
*
* @author xiaoyu
*/
@Service
@ -70,6 +72,7 @@ public class TxCompensationServiceImpl implements TxCompensationService {
private ScheduledExecutorService scheduledExecutorService;
@Autowired(required = false)
public TxCompensationServiceImpl(final RpcApplicationService rpcApplicationService,
final TxManagerMessageService txManagerMessageService) {
@ -157,11 +160,16 @@ public class TxCompensationServiceImpl implements TxCompensationService {
for (TransactionRecover transactionRecover : transactionRecovers) {
if (transactionRecover.getRetriedCount() > txConfig.getRetryMax()) {
LogUtil.error(LOGGER, "此事务超过了最大重试次数,不再进行重试:{}", () -> transactionRecover.getTransactionInvocation().getTargetClazz().getName()
+ ":" + transactionRecover.getTransactionInvocation().getMethod()
+ "事务组id" + transactionRecover.getGroupId());
+ ":" + transactionRecover.getTransactionInvocation().getMethod()
+ "事务组id" + transactionRecover.getGroupId());
continue;
}
try {
//判断任务是否执行完成
if (!CommonConstant.TX_TRANSACTION_COMPLETE_FLAG_OK.equals(transactionRecover.getCompleteFlag())) {
continue;
}
transactionRecover.setOperation(CompensationOperationTypeEnum.COMPENSATION.getCode());
final int update = transactionRecoverRepository.update(transactionRecover);
if (update > 0) {
final TxTransactionGroup byTxGroupId = txManagerMessageService

View File

@ -20,7 +20,9 @@ package com.raincat.core.compensation.manager;
import com.raincat.common.bean.TransactionInvocation;
import com.raincat.common.bean.TransactionRecover;
import com.raincat.common.constant.CommonConstant;
import com.raincat.common.enums.CompensationActionEnum;
import com.raincat.common.enums.CompensationOperationTypeEnum;
import com.raincat.common.enums.TransactionStatusEnum;
import com.raincat.core.disruptor.publisher.TxTransactionEventPublisher;
import org.springframework.beans.factory.annotation.Autowired;
@ -30,6 +32,7 @@ import java.util.Date;
/**
* TxCompensationManager.
*
* @author xiaoyu
*/
@Service
@ -44,9 +47,10 @@ public class TxCompensationManager {
/**
* save TransactionRecover data.
*
* @param invocation {@linkplain TransactionInvocation}
* @param groupId this is transaction groupId
* @param taskId taskId
* @param groupId this is transaction groupId
* @param taskId taskId
* @return groupId.
*/
public String saveTxCompensation(final TransactionInvocation invocation, final String groupId, final String taskId) {
@ -58,12 +62,14 @@ public class TxCompensationManager {
recover.setGroupId(groupId);
recover.setTaskId(taskId);
recover.setCreateTime(new Date());
recover.setCompleteFlag(CommonConstant.TX_TRANSACTION_COMPLETE_FLAG_BAD);
txTransactionEventPublisher.publishEvent(recover, CompensationActionEnum.SAVE.getCode());
return recover.getId();
}
/**
* delete TransactionRecover.
*
* @param id transaction groupId.
*/
public void removeTxCompensation(final String id) {
@ -72,4 +78,16 @@ public class TxCompensationManager {
txTransactionEventPublisher.publishEvent(recover, CompensationActionEnum.DELETE.getCode());
}
/**
* update TransactionRecover.
*
* @param id
*/
public void updateTxCompensation(final String id) {
TransactionRecover recover = new TransactionRecover();
recover.setId(id);
recover.setCompleteFlag(CommonConstant.TX_TRANSACTION_COMPLETE_FLAG_OK);
recover.setOperation(CompensationOperationTypeEnum.TASK_EXECUTE.getCode());
txTransactionEventPublisher.publishEvent(recover, CompensationActionEnum.UPDATE.getCode());
}
}

View File

@ -19,6 +19,7 @@
package com.raincat.core.service.handler;
import com.raincat.common.bean.TxTransactionInfo;
import com.raincat.common.constant.CommonConstant;
import com.raincat.common.enums.NettyResultEnum;
import com.raincat.common.enums.TransactionRoleEnum;
import com.raincat.common.enums.TransactionStatusEnum;
@ -48,6 +49,7 @@ import java.util.concurrent.ScheduledFuture;
/**
* this is tx transaction actor.
*
* @author xiaoyu
*/
@Component
@ -85,6 +87,9 @@ public class ActorTxTransactionHandler implements TxTransactionHandler {
.execute(() -> {
TxTransactionLocal.getInstance().setTxGroupId(info.getTxGroupId());
final String waitKey = IdWorkerUtils.getInstance().createTaskKey();
String commitStatus = CommonConstant.TX_TRANSACTION_COMMIT_STATUS_BAD;
final BlockTask waitTask = BlockTaskHelper.getInstance().getTask(waitKey);
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
@ -154,8 +159,12 @@ public class ActorTxTransactionHandler implements TxTransactionHandler {
if (TransactionStatusEnum.COMMIT.getCode() == status) {
//提交事务
platformTransactionManager.commit(transactionStatus);
commitStatus = CommonConstant.TX_TRANSACTION_COMMIT_STATUS_OK;
//通知tm 自身事务提交
asyncComplete(info.getTxGroupId(), waitKey, TransactionStatusEnum.COMMIT.getCode(), res);
//删除补偿信息
txCompensationManager.removeTxCompensation(compensateId);
} else {
//回滚当前事务
platformTransactionManager.rollback(transactionStatus);
@ -167,8 +176,10 @@ public class ActorTxTransactionHandler implements TxTransactionHandler {
throwable.printStackTrace();
} finally {
BlockTaskHelper.getInstance().removeByKey(waitKey);
//删除本地补偿信息
txCompensationManager.removeTxCompensation(compensateId);
// 更新补偿信息
if (CommonConstant.TX_TRANSACTION_COMMIT_STATUS_BAD.equals(commitStatus)) {
txCompensationManager.updateTxCompensation(compensateId);
}
}
} else {
platformTransactionManager.rollback(transactionStatus);
@ -186,9 +197,7 @@ public class ActorTxTransactionHandler implements TxTransactionHandler {
throw throwable;
});
task.signal();
}
});
task.await();

View File

@ -19,6 +19,7 @@
package com.raincat.core.service.handler;
import com.raincat.common.bean.TxTransactionInfo;
import com.raincat.common.constant.CommonConstant;
import com.raincat.common.enums.PropagationEnum;
import com.raincat.common.enums.TransactionRoleEnum;
import com.raincat.common.enums.TransactionStatusEnum;
@ -48,6 +49,7 @@ import java.util.concurrent.CompletableFuture;
/**
* this is tx transaction starter .
*
* @author xiaoyu
*/
@Component
@ -81,6 +83,8 @@ public class StartTxTransactionHandler implements TxTransactionHandler {
final String waitKey = IdWorkerUtils.getInstance().createTaskKey();
String commitStatus = CommonConstant.TX_TRANSACTION_COMMIT_STATUS_BAD;
//创建事务组信息
final Boolean success = txManagerMessageService.saveTxTransactionGroup(newTxTransactionGroup(groupId, waitKey, info));
if (success) {
@ -117,6 +121,10 @@ public class StartTxTransactionHandler implements TxTransactionHandler {
//我觉得到这一步了应该是切面走完然后需要提交了此时应该都是进行提交的
//提交事务
platformTransactionManager.commit(transactionStatus);
commitStatus = CommonConstant.TX_TRANSACTION_COMMIT_STATUS_OK;
LOGGER.info("发起者提交本地事务,补偿Id:[{}]", compensateId);
//删除补偿信息
txCompensationManager.removeTxCompensation(compensateId);
//通知tm完成事务
@ -127,8 +135,8 @@ public class StartTxTransactionHandler implements TxTransactionHandler {
} else {
LogUtil.error(LOGGER, () -> "预提交失败!");
//删除补偿信息
txCompensationManager.removeTxCompensation(compensateId);
//这里建议不直接删除补偿信息交由补偿任务控制当前任务无法判定提交超时还是返回失败
//txCompensationManager.removeTxCompensation(compensateId);
platformTransactionManager.rollback(transactionStatus);
}
LogUtil.info(LOGGER, "tx-transaction end, class{}", () -> point.getTarget().getClass());
@ -144,6 +152,13 @@ public class StartTxTransactionHandler implements TxTransactionHandler {
throw throwable;
} finally {
TxTransactionLocal.getInstance().removeTxGroupId();
/**
* 1. 若事务提交成功这里不进行处理此时completeFlag="0" ,则异常情况下进入补偿的任务认为当前任务还在处理中不对其进行补偿处理;
* 2. 若事务未提交,当前任务更新completeFlag="1" 补偿任务可以继续向下执行补偿
*/
if (CommonConstant.TX_TRANSACTION_COMMIT_STATUS_BAD.equals(commitStatus)) {
txCompensationManager.updateTxCompensation(groupId);
}
}
} else {
throw new TransactionRuntimeException("TxManager connection ex");

View File

@ -23,7 +23,9 @@ import com.google.common.collect.Lists;
import com.raincat.common.bean.TransactionRecover;
import com.raincat.common.config.TxConfig;
import com.raincat.common.config.TxRedisConfig;
import com.raincat.common.constant.CommonConstant;
import com.raincat.common.enums.CompensationCacheTypeEnum;
import com.raincat.common.enums.CompensationOperationTypeEnum;
import com.raincat.common.exception.TransactionIoException;
import com.raincat.common.exception.TransactionRuntimeException;
import com.raincat.common.holder.LogUtil;
@ -50,6 +52,7 @@ import java.util.stream.Collectors;
/**
* redis impl.
*
* @author xiaoyu
*/
public class RedisTransactionRecoverRepository implements TransactionRecoverRepository {
@ -87,6 +90,11 @@ public class RedisTransactionRecoverRepository implements TransactionRecoverRepo
public int update(final TransactionRecover transactionRecover) throws TransactionRuntimeException {
try {
final String redisKey = RedisHelper.buildRecoverKey(keyName, transactionRecover.getId());
if (CompensationOperationTypeEnum.TASK_EXECUTE.getCode() == transactionRecover.getOperation()) {//任务完成时更新操作
TransactionRecover recover = findById(transactionRecover.getId());
recover.setCompleteFlag(CommonConstant.TX_TRANSACTION_COMPLETE_FLAG_OK);
return ROWS;
}
transactionRecover.setVersion(transactionRecover.getVersion() + 1);
transactionRecover.setLastTime(new Date());
transactionRecover.setRetriedCount(transactionRecover.getRetriedCount() + 1);