From be2cfb7ec88b6b1064be60c900acbc9c26c2c222 Mon Sep 17 00:00:00 2001 From: "chenbing399@163.com" Date: Wed, 14 Nov 2018 18:47:29 +0800 Subject: [PATCH 1/2] 1.add the disruption tor order consumption. --- .../com/hmily/tcc/common/utils/LogUtil.java | 3 + .../concurrent/ConsistentHashSelector.java | 113 ++++++++++++++++++ .../core/concurrent/SingletonExecutor.java | 54 +++++++++ .../handler/HmilyConsumerDataHandler.java | 9 +- .../HmilyTransactionEventPublisher.java | 21 ++-- 5 files changed, 185 insertions(+), 15 deletions(-) create mode 100644 hmily-tcc-core/src/main/java/com/hmily/tcc/core/concurrent/ConsistentHashSelector.java create mode 100644 hmily-tcc-core/src/main/java/com/hmily/tcc/core/concurrent/SingletonExecutor.java diff --git a/hmily-tcc-common/src/main/java/com/hmily/tcc/common/utils/LogUtil.java b/hmily-tcc-common/src/main/java/com/hmily/tcc/common/utils/LogUtil.java index 32365ba8..7bdebf4c 100644 --- a/hmily-tcc-common/src/main/java/com/hmily/tcc/common/utils/LogUtil.java +++ b/hmily-tcc-common/src/main/java/com/hmily/tcc/common/utils/LogUtil.java @@ -24,6 +24,7 @@ import java.util.function.Supplier; /** * LogUtil. + * * @author xiaoyu */ public final class LogUtil { @@ -64,6 +65,8 @@ public final class LogUtil { } } + + public static void info(Logger logger, Supplier supplier) { if (logger.isInfoEnabled()) { logger.info(Objects.toString(supplier.get())); diff --git a/hmily-tcc-core/src/main/java/com/hmily/tcc/core/concurrent/ConsistentHashSelector.java b/hmily-tcc-core/src/main/java/com/hmily/tcc/core/concurrent/ConsistentHashSelector.java new file mode 100644 index 00000000..9fd1004b --- /dev/null +++ b/hmily-tcc-core/src/main/java/com/hmily/tcc/core/concurrent/ConsistentHashSelector.java @@ -0,0 +1,113 @@ +package com.hmily.tcc.core.concurrent; + +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; + +/** + * Thread routing selector. + * + * @author chenbin sixh + */ +public final class ConsistentHashSelector { + + /** + * The Virtual invokers. + */ + private final TreeMap virtualInvokers; + + /** + * The Replica number. + */ + private final int replicaNumber = 160; + + /** + * Instantiates a new Consistent hash selector. + * + * @param selects the selects + */ + public ConsistentHashSelector(List selects) { + this.virtualInvokers = new TreeMap<>(); + for (SingletonExecutor executor : selects) { + for (int i = 0; i < replicaNumber / 4; i++) { + byte[] digest = md5(executor.getName() + i); + for (int h = 0; h < 4; h++) { + long m = hash(digest, h); + virtualInvokers.put(m, executor); + } + } + } + } + + /** + * Select singleton executor. + * + * @param key the key + * @return the singleton executor + */ + public SingletonExecutor select(String key) { + byte[] digest = md5(key); + return selectForKey(hash(digest, 0)); + } + + + /** + * Select for key singleton executor. + * + * @param hash the hash + * @return the singleton executor + */ + private SingletonExecutor selectForKey(long hash) { + SingletonExecutor invoker; + Long key = hash; + if (!virtualInvokers.containsKey(key)) { + SortedMap tailMap = virtualInvokers.tailMap(key); + if (tailMap.isEmpty()) { + key = virtualInvokers.firstKey(); + } else { + key = tailMap.firstKey(); + } + } + invoker = virtualInvokers.get(key); + return invoker; + } + + /** + * Ketama is a hash algorithm. + * + * @param digest digest; + * @param number numerical; + * @return hash value ; + */ + private long hash(byte[] digest, int number) { + return (((long) (digest[3 + number * 4] & 0xFF) << 24) + | ((long) (digest[2 + number * 4] & 0xFF) << 16) + | ((long) (digest[1 + number * 4] & 0xFF) << 8) + | (digest[number * 4] & 0xFF)) + & 0xFFFFFFFFL; + } + + /** + * Md 5 byte [ ]. + * + * @param value the value + * @return the byte [ ] + */ + private byte[] md5(String value) { + MessageDigest md5; + try { + md5 = MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException(e.getMessage(), e); + } + md5.reset(); + byte[] bytes; + bytes = value.getBytes(StandardCharsets.UTF_8); + md5.update(bytes); + return md5.digest(); + } + +} \ No newline at end of file diff --git a/hmily-tcc-core/src/main/java/com/hmily/tcc/core/concurrent/SingletonExecutor.java b/hmily-tcc-core/src/main/java/com/hmily/tcc/core/concurrent/SingletonExecutor.java new file mode 100644 index 00000000..75c8f02c --- /dev/null +++ b/hmily-tcc-core/src/main/java/com/hmily/tcc/core/concurrent/SingletonExecutor.java @@ -0,0 +1,54 @@ +package com.hmily.tcc.core.concurrent; + + +import com.hmily.tcc.core.concurrent.threadpool.HmilyThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.*; + +/** + * The execution thread of a single task. + * + * @author chenbin + */ +public class SingletonExecutor extends ThreadPoolExecutor { + private static final Logger LOGGER = LoggerFactory.getLogger(SingletonExecutor.class); + private static final int QUEUE_SIZE = 5000; + private static final RejectedExecutionHandler HANDLER = (r, executor) -> { + BlockingQueue queue = executor.getQueue(); + while (queue.size() >= QUEUE_SIZE) { + if (executor.isShutdown()) { + throw new RejectedExecutionException("SingletonExecutor closed"); + } + try { + ((SingletonExecutor) executor).onRejected(); + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException ignored) { + } + } + executor.execute(r); + }; + + /** + * thread name. + */ + private String name; + + private void onRejected() { + LOGGER.info("...thread:{}, Saturation occurs, actuator:{}", Thread.currentThread().getName(), name); + } + + public SingletonExecutor(String poolName) { + super(1, 1, 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(QUEUE_SIZE), + HmilyThreadFactory.create(poolName, false), + HANDLER); + this.name = poolName; + } + + public String getName() { + return name; + } +} \ No newline at end of file diff --git a/hmily-tcc-core/src/main/java/com/hmily/tcc/core/disruptor/handler/HmilyConsumerDataHandler.java b/hmily-tcc-core/src/main/java/com/hmily/tcc/core/disruptor/handler/HmilyConsumerDataHandler.java index a80eb429..0ddd4764 100644 --- a/hmily-tcc-core/src/main/java/com/hmily/tcc/core/disruptor/handler/HmilyConsumerDataHandler.java +++ b/hmily-tcc-core/src/main/java/com/hmily/tcc/core/disruptor/handler/HmilyConsumerDataHandler.java @@ -2,6 +2,7 @@ package com.hmily.tcc.core.disruptor.handler; import com.hmily.tcc.common.bean.entity.TccTransaction; import com.hmily.tcc.common.enums.EventTypeEnum; +import com.hmily.tcc.core.concurrent.ConsistentHashSelector; import com.hmily.tcc.core.coordinator.CoordinatorService; import com.hmily.tcc.core.disruptor.event.HmilyTransactionEvent; import com.lmax.disruptor.WorkHandler; @@ -10,22 +11,24 @@ import java.util.concurrent.Executor; /** * this is disruptor consumer. + * * @author xiaoyu(Myth) */ public class HmilyConsumerDataHandler implements WorkHandler { - private Executor executor; + private ConsistentHashSelector executor; private final CoordinatorService coordinatorService; - public HmilyConsumerDataHandler(final Executor executor, final CoordinatorService coordinatorService) { + public HmilyConsumerDataHandler(final ConsistentHashSelector executor, final CoordinatorService coordinatorService) { this.executor = executor; this.coordinatorService = coordinatorService; } @Override public void onEvent(final HmilyTransactionEvent event) { - executor.execute(() -> { + String transId = event.getTccTransaction().getTransId(); + executor.select(transId).execute(() -> { if (event.getType() == EventTypeEnum.SAVE.getCode()) { coordinatorService.save(event.getTccTransaction()); } else if (event.getType() == EventTypeEnum.UPDATE_PARTICIPANT.getCode()) { diff --git a/hmily-tcc-core/src/main/java/com/hmily/tcc/core/disruptor/publisher/HmilyTransactionEventPublisher.java b/hmily-tcc-core/src/main/java/com/hmily/tcc/core/disruptor/publisher/HmilyTransactionEventPublisher.java index 57e3026e..5bfea597 100644 --- a/hmily-tcc-core/src/main/java/com/hmily/tcc/core/disruptor/publisher/HmilyTransactionEventPublisher.java +++ b/hmily-tcc-core/src/main/java/com/hmily/tcc/core/disruptor/publisher/HmilyTransactionEventPublisher.java @@ -21,7 +21,8 @@ package com.hmily.tcc.core.disruptor.publisher; import com.hmily.tcc.common.bean.entity.TccTransaction; import com.hmily.tcc.common.enums.EventTypeEnum; -import com.hmily.tcc.core.concurrent.threadpool.HmilyThreadFactory; +import com.hmily.tcc.core.concurrent.ConsistentHashSelector; +import com.hmily.tcc.core.concurrent.SingletonExecutor; import com.hmily.tcc.core.coordinator.CoordinatorService; import com.hmily.tcc.core.disruptor.event.HmilyTransactionEvent; import com.hmily.tcc.core.disruptor.factory.HmilyTransactionEventFactory; @@ -36,10 +37,8 @@ import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; /** @@ -71,15 +70,13 @@ public class HmilyTransactionEventPublisher implements DisposableBean { return new Thread(null, r, "disruptor-thread-" + index.getAndIncrement()); }, ProducerType.MULTI, new BlockingWaitStrategy()); - final Executor executor = new ThreadPoolExecutor(threadSize, threadSize, 0, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), - HmilyThreadFactory.create("hmily-log-disruptor", false), - new ThreadPoolExecutor.AbortPolicy()); - - HmilyConsumerDataHandler[] consumers = new HmilyConsumerDataHandler[threadSize]; + HmilyConsumerDataHandler[] consumers = new HmilyConsumerDataHandler[1]; + List selects = new ArrayList<>(); for (int i = 0; i < threadSize; i++) { - consumers[i] = new HmilyConsumerDataHandler(executor, coordinatorService); + selects.add(new SingletonExecutor("hmily-log-disruptor" + i)); } + ConsistentHashSelector selector = new ConsistentHashSelector(selects); + consumers[0] = new HmilyConsumerDataHandler(selector, coordinatorService); disruptor.handleEventsWithWorkerPool(consumers); disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler()); disruptor.start(); From 8882d0720fdef7b94a833a84d13debe76fdb9656 Mon Sep 17 00:00:00 2001 From: "chenbing399@163.com" Date: Wed, 14 Nov 2018 18:50:40 +0800 Subject: [PATCH 2/2] 1.Formatting coding --- .../com/hmily/tcc/common/utils/LogUtil.java | 67 +++++++++++++++++-- 1 file changed, 60 insertions(+), 7 deletions(-) diff --git a/hmily-tcc-common/src/main/java/com/hmily/tcc/common/utils/LogUtil.java b/hmily-tcc-common/src/main/java/com/hmily/tcc/common/utils/LogUtil.java index 7bdebf4c..3ee91200 100644 --- a/hmily-tcc-common/src/main/java/com/hmily/tcc/common/utils/LogUtil.java +++ b/hmily-tcc-common/src/main/java/com/hmily/tcc/common/utils/LogUtil.java @@ -29,23 +29,33 @@ import java.util.function.Supplier; */ public final class LogUtil { + /** + * The constant LOG_UTIL. + */ private static final LogUtil LOG_UTIL = new LogUtil(); + /** + * Instantiates a new Log util. + */ private LogUtil() { } + /** + * Gets instance. + * + * @return the instance + */ public static LogUtil getInstance() { return LOG_UTIL; } - /** - * debug 打印日志. + * debug print log. * - * @param logger 日志 - * @param format 日志信息 - * @param supplier supplier接口 + * @param logger log + * @param format log information + * @param supplier supplier */ public static void debug(Logger logger, String format, Supplier supplier) { if (logger.isDebugEnabled()) { @@ -53,44 +63,87 @@ public final class LogUtil { } } + /** + * Debug. + * + * @param logger the logger + * @param supplier the supplier + */ public static void debug(Logger logger, Supplier supplier) { if (logger.isDebugEnabled()) { logger.debug(Objects.toString(supplier.get())); } } + /** + * Info. + * + * @param logger the logger + * @param format the format + * @param supplier the supplier + */ public static void info(Logger logger, String format, Supplier supplier) { if (logger.isInfoEnabled()) { logger.info(format, supplier.get()); } } - - + /** + * Info. + * + * @param logger the logger + * @param supplier the supplier + */ public static void info(Logger logger, Supplier supplier) { if (logger.isInfoEnabled()) { logger.info(Objects.toString(supplier.get())); } } + /** + * Error. + * + * @param logger the logger + * @param format the format + * @param supplier the supplier + */ public static void error(Logger logger, String format, Supplier supplier) { if (logger.isErrorEnabled()) { logger.error(format, supplier.get()); } } + /** + * Error. + * + * @param logger the logger + * @param supplier the supplier + */ public static void error(Logger logger, Supplier supplier) { if (logger.isErrorEnabled()) { logger.error(Objects.toString(supplier.get())); } } + /** + * Warn. + * + * @param logger the logger + * @param format the format + * @param supplier the supplier + */ public static void warn(Logger logger, String format, Supplier supplier) { if (logger.isWarnEnabled()) { logger.warn(format, supplier.get()); } } + /** + * Warn. + * + * @param logger the logger + * @param supplier the supplier + */ public static void warn(Logger logger, Supplier supplier) { if (logger.isWarnEnabled()) { logger.warn(Objects.toString(supplier.get()));