diff --git a/raincat-common/src/main/java/com/raincat/common/netty/bean/TxTransactionItem.java b/raincat-common/src/main/java/com/raincat/common/netty/bean/TxTransactionItem.java index c4a5d01..a37d758 100644 --- a/raincat-common/src/main/java/com/raincat/common/netty/bean/TxTransactionItem.java +++ b/raincat-common/src/main/java/com/raincat/common/netty/bean/TxTransactionItem.java @@ -29,6 +29,7 @@ import java.io.Serializable; /** * TxTransactionItem. + * * @author xiaoyu */ @Data @@ -37,9 +38,9 @@ public class TxTransactionItem implements Serializable { private static final long serialVersionUID = -983809184773470584L; /** - *线程安全 + * 线程安全 */ - private static final ObjectMapper OBJECT_MAPPER=new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); /** * taskKey. @@ -108,24 +109,16 @@ public class TxTransactionItem implements Serializable { public void setMessage(Object message) { - if(message!=null && !( message instanceof String)) - { - try - { - this.message=OBJECT_MAPPER.writeValueAsString(message); + if (message != null && !(message instanceof String)) { + try { + this.message = OBJECT_MAPPER.writeValueAsString(message); + } catch (JsonProcessingException e) { + this.message = "internal server error,fail to parse object"; + log.error("设置操作结果信息时出错,message:{}", message, e); } - catch (JsonProcessingException e) - { - this.message="internal server error,fail to parse object"; - log.error("设置操作结果信息时出错,message:{}",message,e); - } - - } - else - { + } else { this.message = message; } - } diff --git a/raincat-core/src/main/java/com/raincat/core/disruptor/handler/TxTransactionEventHandler.java b/raincat-core/src/main/java/com/raincat/core/disruptor/handler/TxTransactionEventHandler.java index 73015e8..292a73f 100644 --- a/raincat-core/src/main/java/com/raincat/core/disruptor/handler/TxTransactionEventHandler.java +++ b/raincat-core/src/main/java/com/raincat/core/disruptor/handler/TxTransactionEventHandler.java @@ -19,35 +19,43 @@ package com.raincat.core.disruptor.handler; -import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.WorkHandler; import com.raincat.common.enums.CompensationActionEnum; import com.raincat.core.compensation.TxCompensationService; import com.raincat.core.disruptor.event.TxTransactionEvent; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; + +import java.util.concurrent.Executor; /** - * Disroptor handler. + * disruptor handler. * * @author xiaoyu(Myth) */ -@Component -public class TxTransactionEventHandler implements EventHandler { +public class TxTransactionEventHandler implements WorkHandler { - @Autowired - private TxCompensationService txCompensationService; + private final TxCompensationService txCompensationService; + + private final Executor executor; + + public TxTransactionEventHandler(final Executor executor, final TxCompensationService txCompensationService) { + this.executor = executor; + this.txCompensationService = txCompensationService; + } @Override - public void onEvent(final TxTransactionEvent txTransactionEvent, final long sequence, final boolean endOfBatch) { - if (txTransactionEvent.getType() == CompensationActionEnum.SAVE.getCode()) { - txCompensationService.save(txTransactionEvent.getTransactionRecover()); - } else if (txTransactionEvent.getType() == CompensationActionEnum.DELETE.getCode()) { - txCompensationService.remove(txTransactionEvent.getTransactionRecover().getId()); - } else if (txTransactionEvent.getType() == CompensationActionEnum.UPDATE.getCode()) { - txCompensationService.update(txTransactionEvent.getTransactionRecover()); - } else if (txTransactionEvent.getType() == CompensationActionEnum.COMPENSATE.getCode()) { - txCompensationService.compensation(txTransactionEvent.getTransactionRecover()); - } - txTransactionEvent.clear(); + public void onEvent(final TxTransactionEvent txTransactionEvent) { + executor.execute(() -> { + if (txTransactionEvent.getType() == CompensationActionEnum.SAVE.getCode()) { + txCompensationService.save(txTransactionEvent.getTransactionRecover()); + } else if (txTransactionEvent.getType() == CompensationActionEnum.DELETE.getCode()) { + txCompensationService.remove(txTransactionEvent.getTransactionRecover().getId()); + } else if (txTransactionEvent.getType() == CompensationActionEnum.UPDATE.getCode()) { + txCompensationService.update(txTransactionEvent.getTransactionRecover()); + } else if (txTransactionEvent.getType() == CompensationActionEnum.COMPENSATE.getCode()) { + txCompensationService.compensation(txTransactionEvent.getTransactionRecover()); + } + txTransactionEvent.clear(); + }); + } } diff --git a/raincat-core/src/main/java/com/raincat/core/disruptor/publisher/TxTransactionEventPublisher.java b/raincat-core/src/main/java/com/raincat/core/disruptor/publisher/TxTransactionEventPublisher.java index 58017cf..67bb3b0 100644 --- a/raincat-core/src/main/java/com/raincat/core/disruptor/publisher/TxTransactionEventPublisher.java +++ b/raincat-core/src/main/java/com/raincat/core/disruptor/publisher/TxTransactionEventPublisher.java @@ -19,21 +19,19 @@ package com.raincat.core.disruptor.publisher; -import com.lmax.disruptor.ExceptionHandler; +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.IgnoreExceptionHandler; import com.lmax.disruptor.RingBuffer; -import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import com.raincat.common.bean.TransactionRecover; import com.raincat.common.enums.CompensationActionEnum; -import com.raincat.common.holder.LogUtil; +import com.raincat.core.compensation.TxCompensationService; import com.raincat.core.concurrent.threadpool.TxTransactionThreadFactory; import com.raincat.core.disruptor.event.TxTransactionEvent; import com.raincat.core.disruptor.factory.TxTransactionEventFactory; import com.raincat.core.disruptor.handler.TxTransactionEventHandler; import com.raincat.core.disruptor.translator.TxTransactionEventTranslator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -52,19 +50,15 @@ import java.util.concurrent.atomic.AtomicInteger; @Component public class TxTransactionEventPublisher implements DisposableBean { - private static final Logger LOGGER = LoggerFactory.getLogger(TxTransactionEventPublisher.class); - private static final int MAX_THREAD = Runtime.getRuntime().availableProcessors() << 1; - private Executor executor; - private Disruptor disruptor; - private final TxTransactionEventHandler txTransactionEventHandler; + private final TxCompensationService txCompensationService; @Autowired - public TxTransactionEventPublisher(TxTransactionEventHandler txTransactionEventHandler) { - this.txTransactionEventHandler = txTransactionEventHandler; + public TxTransactionEventPublisher(final TxCompensationService txCompensationService) { + this.txCompensationService = txCompensationService; } /** @@ -76,29 +70,19 @@ public class TxTransactionEventPublisher implements DisposableBean { disruptor = new Disruptor<>(new TxTransactionEventFactory(), bufferSize, r -> { AtomicInteger index = new AtomicInteger(1); return new Thread(null, r, "disruptor-thread-" + index.getAndIncrement()); - }, ProducerType.MULTI, new YieldingWaitStrategy()); - disruptor.handleEventsWith(txTransactionEventHandler); - disruptor.setDefaultExceptionHandler(new ExceptionHandler() { - @Override - public void handleEventException(Throwable ex, long sequence, TxTransactionEvent event) { - LogUtil.error(LOGGER, () -> "Disruptor handleEventException:" - + event.getType() + event.getTransactionRecover().toString()); - } + }, ProducerType.MULTI, new BlockingWaitStrategy()); - @Override - public void handleOnStartException(Throwable ex) { - LogUtil.error(LOGGER, () -> "Disruptor start exception"); - } - - @Override - public void handleOnShutdownException(Throwable ex) { - LogUtil.error(LOGGER, () -> "Disruptor close Exception "); - } - }); - executor = new ThreadPoolExecutor(MAX_THREAD, MAX_THREAD, 0, TimeUnit.MILLISECONDS, + final Executor executor = new ThreadPoolExecutor(MAX_THREAD, MAX_THREAD, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), TxTransactionThreadFactory.create("raincat-log-disruptor", false), new ThreadPoolExecutor.AbortPolicy()); + + TxTransactionEventHandler[] consumers = new TxTransactionEventHandler[MAX_THREAD]; + for (int i = 0; i < MAX_THREAD; i++) { + consumers[i] = new TxTransactionEventHandler(executor, txCompensationService); + } + disruptor.handleEventsWithWorkerPool(consumers); + disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler()); disruptor.start(); } @@ -109,10 +93,8 @@ public class TxTransactionEventPublisher implements DisposableBean { * @param type {@linkplain CompensationActionEnum} */ public void publishEvent(final TransactionRecover transactionRecover, final int type) { - executor.execute(() -> { - final RingBuffer ringBuffer = disruptor.getRingBuffer(); - ringBuffer.publishEvent(new TxTransactionEventTranslator(type), transactionRecover); - }); + final RingBuffer ringBuffer = disruptor.getRingBuffer(); + ringBuffer.publishEvent(new TxTransactionEventTranslator(type), transactionRecover); } @Override