Merge pull request #55 from sixh/master

顺序消费
This commit is contained in:
纳兰丶 2018-11-14 18:55:08 +08:00 committed by GitHub
commit 5b39fecdb2
5 changed files with 243 additions and 20 deletions

View File

@ -24,27 +24,38 @@ import java.util.function.Supplier;
/**
* LogUtil.
*
* @author xiaoyu
*/
public final class LogUtil {
/**
 * Eagerly initialized singleton instance; JVM class-loading rules make this
 * construction thread-safe without explicit synchronization.
 */
private static final LogUtil LOG_UTIL = new LogUtil();
/**
 * Private constructor — this is a utility class; the singleton exists only
 * for callers that prefer an instance handle over the static methods.
 */
private LogUtil() {
}
/**
 * Returns the shared {@link LogUtil} instance.
 *
 * @return the singleton instance
 */
public static LogUtil getInstance() {
return LOG_UTIL;
}
/**
* debug 打印日志.
* debug print log.
*
* @param logger 日志
* @param format 日志信息
* @param supplier supplier接口
* @param logger log
* @param format log information
* @param supplier supplier
*/
public static void debug(Logger logger, String format, Supplier<Object> supplier) {
if (logger.isDebugEnabled()) {
@ -52,42 +63,87 @@ public final class LogUtil {
}
}
/**
 * Logs a message at DEBUG level, building it lazily.
 *
 * @param logger   the target logger
 * @param supplier produces the message; only invoked when DEBUG is enabled
 */
public static void debug(Logger logger, Supplier<Object> supplier) {
    if (!logger.isDebugEnabled()) {
        return;
    }
    logger.debug(Objects.toString(supplier.get()));
}
/**
 * Logs a parameterized message at INFO level, resolving the argument lazily.
 *
 * @param logger   the target logger
 * @param format   the SLF4J message pattern (with {} placeholders)
 * @param supplier produces the pattern argument; only invoked when INFO is enabled
 */
public static void info(Logger logger, String format, Supplier<Object> supplier) {
    if (!logger.isInfoEnabled()) {
        return;
    }
    logger.info(format, supplier.get());
}
/**
 * Logs a message at INFO level, building it lazily.
 *
 * @param logger   the target logger
 * @param supplier produces the message; only invoked when INFO is enabled
 */
public static void info(Logger logger, Supplier<Object> supplier) {
    if (!logger.isInfoEnabled()) {
        return;
    }
    logger.info(Objects.toString(supplier.get()));
}
/**
 * Logs a parameterized message at ERROR level, resolving the argument lazily.
 *
 * @param logger   the target logger
 * @param format   the SLF4J message pattern (with {} placeholders)
 * @param supplier produces the pattern argument; only invoked when ERROR is enabled
 */
public static void error(Logger logger, String format, Supplier<Object> supplier) {
    if (!logger.isErrorEnabled()) {
        return;
    }
    logger.error(format, supplier.get());
}
/**
 * Logs a message at ERROR level, building it lazily.
 *
 * @param logger   the target logger
 * @param supplier produces the message; only invoked when ERROR is enabled
 */
public static void error(Logger logger, Supplier<Object> supplier) {
    if (!logger.isErrorEnabled()) {
        return;
    }
    logger.error(Objects.toString(supplier.get()));
}
/**
 * Logs a parameterized message at WARN level, resolving the argument lazily.
 *
 * @param logger   the target logger
 * @param format   the SLF4J message pattern (with {} placeholders)
 * @param supplier produces the pattern argument; only invoked when WARN is enabled
 */
public static void warn(Logger logger, String format, Supplier<Object> supplier) {
    if (!logger.isWarnEnabled()) {
        return;
    }
    logger.warn(format, supplier.get());
}
/**
* Warn.
*
* @param logger the logger
* @param supplier the supplier
*/
public static void warn(Logger logger, Supplier<Object> supplier) {
if (logger.isWarnEnabled()) {
logger.warn(Objects.toString(supplier.get()));

View File

@ -0,0 +1,113 @@
package com.hmily.tcc.core.concurrent;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
/**
* Thread routing selector.
*
* @author chenbin sixh
*/
public final class ConsistentHashSelector {

    /**
     * Virtual nodes created per executor. More replicas give a smoother key
     * distribution over the ring. 160 = 40 md5 digests x 4 hash slices each,
     * the classic Ketama configuration.
     */
    private static final int REPLICA_NUMBER = 160;

    /**
     * The hash ring: Ketama hash value -> executor owning that virtual node.
     */
    private final TreeMap<Long, SingletonExecutor> virtualInvokers;

    /**
     * Builds the hash ring by placing {@code REPLICA_NUMBER} virtual nodes
     * for every executor.
     *
     * @param selects the executors to distribute keys over; must not be empty,
     *                otherwise {@link #select(String)} will fail
     */
    public ConsistentHashSelector(final List<SingletonExecutor> selects) {
        this.virtualInvokers = new TreeMap<>();
        for (SingletonExecutor executor : selects) {
            // One md5 digest yields 16 bytes = four 32-bit hash slices, so
            // REPLICA_NUMBER / 4 digests produce REPLICA_NUMBER virtual nodes.
            for (int i = 0; i < REPLICA_NUMBER / 4; i++) {
                byte[] digest = md5(executor.getName() + i);
                for (int h = 0; h < 4; h++) {
                    virtualInvokers.put(hash(digest, h), executor);
                }
            }
        }
    }

    /**
     * Selects the executor responsible for the given key. Equal keys always
     * map to the same executor, which is what guarantees ordered execution
     * of tasks sharing a key.
     *
     * @param key the routing key
     * @return the executor owning the key's position on the ring
     */
    public SingletonExecutor select(String key) {
        byte[] digest = md5(key);
        return selectForKey(hash(digest, 0));
    }

    /**
     * Walks the ring clockwise from {@code hash}: the owner is the first
     * virtual node at or after the hash, wrapping to the ring's start.
     *
     * @param hash the Ketama hash of the key
     * @return the owning executor
     */
    private SingletonExecutor selectForKey(long hash) {
        // ceilingKey is the single-lookup equivalent of
        // containsKey(...) ? key : tailMap(key).firstKey().
        Long key = virtualInvokers.ceilingKey(hash);
        if (key == null) {
            // Wrapped past the largest node: the ring continues at its start.
            key = virtualInvokers.firstKey();
        }
        return virtualInvokers.get(key);
    }

    /**
     * Ketama hash: extracts the {@code number}-th little-endian 32-bit slice
     * of an md5 digest as an unsigned value.
     *
     * @param digest a 16-byte md5 digest
     * @param number which 4-byte slice to use (0..3)
     * @return the unsigned 32-bit hash as a long
     */
    private long hash(byte[] digest, int number) {
        return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                | (digest[number * 4] & 0xFF))
                & 0xFFFFFFFFL;
    }

    /**
     * Computes the md5 digest of the given string (UTF-8 bytes).
     * md5 is used here purely for key distribution, not for security.
     *
     * @param value the value to digest
     * @return the 16-byte digest
     */
    private byte[] md5(String value) {
        MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            // MD5 is a mandatory JDK algorithm; absence is a broken runtime.
            throw new IllegalStateException(e.getMessage(), e);
        }
        md5.reset();
        md5.update(value.getBytes(StandardCharsets.UTF_8));
        return md5.digest();
    }
}

View File

@ -0,0 +1,54 @@
package com.hmily.tcc.core.concurrent;
import com.hmily.tcc.core.concurrent.threadpool.HmilyThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;
/**
* The execution thread of a single task.
*
* @author chenbin
*/
/**
 * The execution thread of a single task: a one-thread pool so that tasks
 * submitted to the same executor run strictly in submission order.
 */
public class SingletonExecutor extends ThreadPoolExecutor {

    private static final Logger LOGGER = LoggerFactory.getLogger(SingletonExecutor.class);

    /**
     * Capacity of the work queue; submissions beyond this trigger HANDLER.
     */
    private static final int QUEUE_SIZE = 5000;

    /**
     * Saturation policy: instead of discarding the task, block the submitting
     * thread (polling once per second) until the queue drains below capacity,
     * then resubmit. Throws if the pool shuts down while waiting.
     */
    private static final RejectedExecutionHandler HANDLER = (r, executor) -> {
        BlockingQueue<Runnable> queue = executor.getQueue();
        while (queue.size() >= QUEUE_SIZE) {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("SingletonExecutor closed");
            }
            try {
                ((SingletonExecutor) executor).onRejected();
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                // Previously the interrupt was silently swallowed, leaving the
                // submitter uninterruptible. Restore the flag and abort the wait.
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting to enqueue task", e);
            }
        }
        executor.execute(r);
    };

    /**
     * Pool name; identifies this executor in saturation log messages.
     */
    private final String name;

    /**
     * Creates a single-threaded executor with a bounded queue and the
     * blocking saturation handler above.
     *
     * @param poolName name used for the worker thread and log messages
     */
    public SingletonExecutor(String poolName) {
        super(1, 1, 0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(QUEUE_SIZE),
                HmilyThreadFactory.create(poolName, false),
                HANDLER);
        this.name = poolName;
    }

    /**
     * Gets the pool name.
     *
     * @return the name
     */
    public String getName() {
        return name;
    }

    /**
     * Logs that the queue is saturated and the submitter is waiting.
     */
    private void onRejected() {
        LOGGER.info("...thread:{}, Saturation occurs, actuator:{}", Thread.currentThread().getName(), name);
    }
}

View File

@ -2,6 +2,7 @@ package com.hmily.tcc.core.disruptor.handler;
import com.hmily.tcc.common.bean.entity.TccTransaction;
import com.hmily.tcc.common.enums.EventTypeEnum;
import com.hmily.tcc.core.concurrent.ConsistentHashSelector;
import com.hmily.tcc.core.coordinator.CoordinatorService;
import com.hmily.tcc.core.disruptor.event.HmilyTransactionEvent;
import com.lmax.disruptor.WorkHandler;
@ -10,22 +11,24 @@ import java.util.concurrent.Executor;
/**
* this is disruptor consumer.
*
* @author xiaoyu(Myth)
*/
public class HmilyConsumerDataHandler implements WorkHandler<HmilyTransactionEvent> {
private Executor executor;
private ConsistentHashSelector executor;
private final CoordinatorService coordinatorService;
public HmilyConsumerDataHandler(final Executor executor, final CoordinatorService coordinatorService) {
public HmilyConsumerDataHandler(final ConsistentHashSelector executor, final CoordinatorService coordinatorService) {
this.executor = executor;
this.coordinatorService = coordinatorService;
}
@Override
public void onEvent(final HmilyTransactionEvent event) {
executor.execute(() -> {
String transId = event.getTccTransaction().getTransId();
executor.select(transId).execute(() -> {
if (event.getType() == EventTypeEnum.SAVE.getCode()) {
coordinatorService.save(event.getTccTransaction());
} else if (event.getType() == EventTypeEnum.UPDATE_PARTICIPANT.getCode()) {

View File

@ -21,7 +21,8 @@ package com.hmily.tcc.core.disruptor.publisher;
import com.hmily.tcc.common.bean.entity.TccTransaction;
import com.hmily.tcc.common.enums.EventTypeEnum;
import com.hmily.tcc.core.concurrent.threadpool.HmilyThreadFactory;
import com.hmily.tcc.core.concurrent.ConsistentHashSelector;
import com.hmily.tcc.core.concurrent.SingletonExecutor;
import com.hmily.tcc.core.coordinator.CoordinatorService;
import com.hmily.tcc.core.disruptor.event.HmilyTransactionEvent;
import com.hmily.tcc.core.disruptor.factory.HmilyTransactionEventFactory;
@ -36,10 +37,8 @@ import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -71,15 +70,13 @@ public class HmilyTransactionEventPublisher implements DisposableBean {
return new Thread(null, r, "disruptor-thread-" + index.getAndIncrement());
}, ProducerType.MULTI, new BlockingWaitStrategy());
final Executor executor = new ThreadPoolExecutor(threadSize, threadSize, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
HmilyThreadFactory.create("hmily-log-disruptor", false),
new ThreadPoolExecutor.AbortPolicy());
HmilyConsumerDataHandler[] consumers = new HmilyConsumerDataHandler[threadSize];
HmilyConsumerDataHandler[] consumers = new HmilyConsumerDataHandler[1];
List<SingletonExecutor> selects = new ArrayList<>();
for (int i = 0; i < threadSize; i++) {
consumers[i] = new HmilyConsumerDataHandler(executor, coordinatorService);
selects.add(new SingletonExecutor("hmily-log-disruptor" + i));
}
ConsistentHashSelector selector = new ConsistentHashSelector(selects);
consumers[0] = new HmilyConsumerDataHandler(selector, coordinatorService);
disruptor.handleEventsWithWorkerPool(consumers);
disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
disruptor.start();