事务补偿控制

This commit is contained in:
ChaosCoffee 2018-06-29 16:28:09 +08:00
parent c2f25c0a9b
commit cef9fd0fe3
6 changed files with 71 additions and 36 deletions

View File

@ -42,6 +42,8 @@ public class SqlHelper {
" `group_id` varchar(64) NOT NULL,\n" +
" `task_id` varchar(64) NOT NULL,\n" +
" `invocation` longblob NOT NULL,\n" +
" `complete_flag` varchar(2) NOT NULL,\n" +
" `operation` int(3),\n" +
" PRIMARY KEY (`id`)\n" +
")";
break;
@ -57,6 +59,8 @@ public class SqlHelper {
" `group_id` varchar2(64) NOT NULL,\n" +
" `task_id` varchar2(64) NOT NULL,\n" +
" `invocation` BLOB NOT NULL,\n" +
" `complete_flag` varchar(2) NOT NULL,\n" +
" `operation` int(3),\n" +
" PRIMARY KEY (`id`)\n" +
")";
break;
@ -72,6 +76,8 @@ public class SqlHelper {
" `group_id` nchar(64) NOT NULL,\n" +
" `task_id` nchar(64) NOT NULL,\n" +
" `invocation` varbinary NOT NULL,\n" +
" `complete_flag` varchar(2) NOT NULL,\n" +
" `operation` int(3),\n" +
" PRIMARY KEY (`id`)\n" +
")";
break;

View File

@ -19,13 +19,15 @@
package com.raincat.core.spi.repository;
import com.google.common.collect.Lists;
import com.raincat.common.bean.TransactionRecover;
import com.raincat.common.config.TxConfig;
import com.raincat.common.constant.CommonConstant;
import com.raincat.common.enums.CompensationCacheTypeEnum;
import com.raincat.common.enums.CompensationOperationTypeEnum;
import com.raincat.common.exception.TransactionRuntimeException;
import com.raincat.common.holder.RepositoryPathUtils;
import com.raincat.common.holder.TransactionRecoverUtils;
import com.raincat.common.serializer.ObjectSerializer;
import com.raincat.common.bean.TransactionRecover;
import com.raincat.common.config.TxConfig;
import com.raincat.core.spi.TransactionRecoverRepository;
import java.io.File;
@ -39,6 +41,7 @@ import java.util.stream.Collectors;
/**
* file impl.
*
* @author xiaoyu
*/
@SuppressWarnings("unchecked")
@ -73,6 +76,12 @@ public class FileTransactionRecoverRepository implements TransactionRecoverRepos
@Override
public int update(final TransactionRecover transactionRecover) throws TransactionRuntimeException {
if (CompensationOperationTypeEnum.TASK_EXECUTE.getCode() == transactionRecover.getOperation()) {//任务完成时更新操作
TransactionRecover recover = findById(transactionRecover.getId());
recover.setCompleteFlag(CommonConstant.TX_TRANSACTION_COMPLETE_FLAG_OK);
writeFile(recover);
return ROWS;
}
transactionRecover.setLastTime(new Date());
transactionRecover.setVersion(transactionRecover.getVersion() + 1);
transactionRecover.setRetriedCount(transactionRecover.getRetriedCount() + 1);

View File

@ -24,7 +24,9 @@ import com.raincat.common.bean.TransactionInvocation;
import com.raincat.common.bean.TransactionRecover;
import com.raincat.common.config.TxConfig;
import com.raincat.common.config.TxDbConfig;
import com.raincat.common.constant.CommonConstant;
import com.raincat.common.enums.CompensationCacheTypeEnum;
import com.raincat.common.enums.CompensationOperationTypeEnum;
import com.raincat.common.exception.TransactionException;
import com.raincat.common.exception.TransactionRuntimeException;
import com.raincat.common.holder.RepositoryPathUtils;
@ -35,20 +37,14 @@ import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.sql.*;
import java.util.*;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* jdbc impl.
*
* @author xiaoyu
*/
public class JdbcTransactionRecoverRepository implements TransactionRecoverRepository {
@ -69,8 +65,8 @@ public class JdbcTransactionRecoverRepository implements TransactionRecoverRepos
@Override
public int create(final TransactionRecover recover) {
String sql = "insert into " + tableName
+ "(id,target_class,target_method,retried_count,create_time,last_time,version,group_id,task_id,invocation)"
+ " values(?,?,?,?,?,?,?,?,?,?)";
+ "(id,target_class,target_method,retried_count,create_time,last_time,version,group_id,task_id,invocation,complete_flag,operation)"
+ " values(?,?,?,?,?,?,?,?,?,?,?,?)";
try {
final TransactionInvocation transactionInvocation = recover.getTransactionInvocation();
final String className = transactionInvocation.getTargetClazz().getName();
@ -80,7 +76,7 @@ public class JdbcTransactionRecoverRepository implements TransactionRecoverRepos
method, recover.getRetriedCount(),
recover.getCreateTime(), recover.getLastTime(),
recover.getVersion(), recover.getGroupId(),
recover.getTaskId(), serialize);
recover.getTaskId(), serialize, recover.getCompleteFlag(), recover.getOperation());
} catch (TransactionException e) {
e.printStackTrace();
return FAIL_ROWS;
@ -95,9 +91,14 @@ public class JdbcTransactionRecoverRepository implements TransactionRecoverRepos
@Override
public int update(final TransactionRecover transactionRecover) throws TransactionRuntimeException {
String sql = "update " + tableName
+ " set last_time = ?,version =version+ 1,retried_count =retried_count+1 where id = ? and version=? ";
if (CompensationOperationTypeEnum.TASK_EXECUTE.getCode() == transactionRecover.getOperation()) {//任务完成时更新操作
sql = "update " + tableName
+ " set last_time = ?,complete_flag = ? where id = ?";
executeUpdate(sql, new Date(), CommonConstant.TX_TRANSACTION_COMPLETE_FLAG_OK, transactionRecover.getId());
return ROWS;
}
int success = executeUpdate(sql, new Date(), transactionRecover.getId(), transactionRecover.getVersion());
if (success <= 0) {
throw new TransactionRuntimeException(UPDATE_EX);
@ -187,7 +188,7 @@ public class JdbcTransactionRecoverRepository implements TransactionRecoverRepos
private int executeUpdate(final String sql, final Object... params) {
try {
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {
PreparedStatement ps = connection.prepareStatement(sql)) {
if (params != null) {
for (int i = 0; i < params.length; i++) {
ps.setObject(i + 1, params[i]);
@ -206,7 +207,7 @@ public class JdbcTransactionRecoverRepository implements TransactionRecoverRepos
List<Map<String, Object>> list = null;
try {
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {
PreparedStatement ps = connection.prepareStatement(sql)) {
if (params != null) {
for (int i = 0; i < params.length; i++) {
ps.setObject(i + 1, params[i]);

View File

@ -19,23 +19,26 @@
package com.raincat.core.spi.repository;
import com.google.common.base.Splitter;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.WriteResult;
import com.raincat.common.bean.TransactionInvocation;
import com.raincat.common.bean.TransactionRecover;
import com.raincat.common.bean.adapter.MongoAdapter;
import com.raincat.common.config.TxConfig;
import com.raincat.common.config.TxMongoConfig;
import com.raincat.common.constant.CommonConstant;
import com.raincat.common.enums.CompensationCacheTypeEnum;
import com.raincat.common.enums.CompensationOperationTypeEnum;
import com.raincat.common.enums.TransactionStatusEnum;
import com.raincat.common.exception.TransactionException;
import com.raincat.common.exception.TransactionRuntimeException;
import com.raincat.common.holder.Assert;
import com.raincat.common.holder.LogUtil;
import com.raincat.common.holder.RepositoryPathUtils;
import com.raincat.common.holder.TransactionRecoverUtils;
import com.raincat.common.serializer.ObjectSerializer;
import com.raincat.common.bean.adapter.MongoAdapter;
import com.raincat.common.bean.TransactionInvocation;
import com.raincat.common.bean.TransactionRecover;
import com.raincat.common.config.TxConfig;
import com.raincat.common.config.TxMongoConfig;
import com.raincat.core.spi.TransactionRecoverRepository;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.WriteResult;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -80,6 +83,8 @@ public class MongoTransactionRecoverRepository implements TransactionRecoverRepo
mongoAdapter.setTargetClass(invocation.getTargetClazz().getName());
mongoAdapter.setTargetMethod(invocation.getMethod());
mongoAdapter.setContents(objectSerializer.serialize(invocation));
mongoAdapter.setCompleteFlag(transactionRecover.getCompleteFlag());
mongoAdapter.setOperation(transactionRecover.getOperation());
template.save(mongoAdapter, collectionName);
} catch (TransactionException e) {
e.printStackTrace();
@ -101,9 +106,13 @@ public class MongoTransactionRecoverRepository implements TransactionRecoverRepo
Query query = new Query();
query.addCriteria(new Criteria("transId").is(transactionRecover.getId()));
Update update = new Update();
update.set("lastTime", new Date());
update.set("retriedCount", transactionRecover.getRetriedCount() + 1);
update.set("version", transactionRecover.getVersion() + 1);
if (CompensationOperationTypeEnum.TASK_EXECUTE.getCode() == transactionRecover.getOperation()) {//任务完成时更新操作
update.set("completeFlag",CommonConstant.TX_TRANSACTION_COMPLETE_FLAG_OK);
} else if (CompensationOperationTypeEnum.COMPENSATION.getCode() == transactionRecover.getOperation()) {
update.set("lastTime", new Date());
update.set("retriedCount", transactionRecover.getRetriedCount() + 1);
update.set("version", transactionRecover.getVersion() + 1);
}
final WriteResult writeResult = template.updateFirst(query, update, MongoAdapter.class, collectionName);
if (writeResult.getN() <= 0) {
throw new TransactionRuntimeException(UPDATE_EX);
@ -129,6 +138,8 @@ public class MongoTransactionRecoverRepository implements TransactionRecoverRepo
recover.setRetriedCount(cache.getRetriedCount());
recover.setVersion(cache.getVersion());
recover.setStatus(cache.getStatus());
recover.setCompleteFlag(cache.getCompleteFlag());
recover.setOperation(cache.getOperation());
final TransactionInvocation transactionInvocation;
try {
transactionInvocation = objectSerializer.deSerialize(cache.getContents(), TransactionInvocation.class);

View File

@ -93,6 +93,7 @@ public class RedisTransactionRecoverRepository implements TransactionRecoverRepo
if (CompensationOperationTypeEnum.TASK_EXECUTE.getCode() == transactionRecover.getOperation()) {//任务完成时更新操作
TransactionRecover recover = findById(transactionRecover.getId());
recover.setCompleteFlag(CommonConstant.TX_TRANSACTION_COMPLETE_FLAG_OK);
jedisClient.set(redisKey, TransactionRecoverUtils.convert(recover, objectSerializer));
return ROWS;
}
transactionRecover.setVersion(transactionRecover.getVersion() + 1);

View File

@ -19,7 +19,12 @@
package com.raincat.core.spi.repository;
import com.google.common.collect.Lists;
import com.raincat.common.bean.TransactionRecover;
import com.raincat.common.config.TxConfig;
import com.raincat.common.config.TxZookeeperConfig;
import com.raincat.common.constant.CommonConstant;
import com.raincat.common.enums.CompensationCacheTypeEnum;
import com.raincat.common.enums.CompensationOperationTypeEnum;
import com.raincat.common.exception.TransactionException;
import com.raincat.common.exception.TransactionIoException;
import com.raincat.common.exception.TransactionRuntimeException;
@ -27,17 +32,10 @@ import com.raincat.common.holder.LogUtil;
import com.raincat.common.holder.RepositoryPathUtils;
import com.raincat.common.holder.TransactionRecoverUtils;
import com.raincat.common.serializer.ObjectSerializer;
import com.raincat.common.bean.TransactionRecover;
import com.raincat.common.config.TxConfig;
import com.raincat.common.config.TxZookeeperConfig;
import com.raincat.core.spi.TransactionRecoverRepository;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,6 +47,7 @@ import java.util.stream.Collectors;
/**
* zookeeper impl.
*
* @author xiaoyu
*/
public class ZookeeperTransactionRecoverRepository implements TransactionRecoverRepository {
@ -93,6 +92,14 @@ public class ZookeeperTransactionRecoverRepository implements TransactionRecover
@Override
public int update(final TransactionRecover transactionRecover) throws TransactionRuntimeException {
try {
if (CompensationOperationTypeEnum.TASK_EXECUTE.getCode() == transactionRecover.getOperation()) {//任务完成时更新操作
TransactionRecover recover = findById(transactionRecover.getId());
recover.setCompleteFlag(CommonConstant.TX_TRANSACTION_COMPLETE_FLAG_OK);
zooKeeper.setData(getRootPath(recover.getId()),
TransactionRecoverUtils.convert(recover, objectSerializer),
recover.getVersion() - 2);
return ROWS;
}
transactionRecover.setLastTime(new Date());
transactionRecover.setVersion(transactionRecover.getVersion() + 1);
transactionRecover.setRetriedCount(transactionRecover.getRetriedCount() + 1);