disruptor commit.

This commit is contained in:
Administrator 2018-09-27 18:25:30 +08:00
parent 72c0f52905
commit dd9529eadc
3 changed files with 54 additions and 71 deletions

View File

@ -29,6 +29,7 @@ import java.io.Serializable;
/**
* TxTransactionItem.
*
* @author xiaoyu
*/
@Data
@ -37,9 +38,9 @@ public class TxTransactionItem implements Serializable {
private static final long serialVersionUID = -983809184773470584L;
/**
*线程安全
* 线程安全
*/
private static final ObjectMapper OBJECT_MAPPER=new ObjectMapper();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/**
* taskKey.
@ -108,24 +109,16 @@ public class TxTransactionItem implements Serializable {
public void setMessage(Object message) {
if(message!=null && !( message instanceof String))
{
try
{
this.message=OBJECT_MAPPER.writeValueAsString(message);
if (message != null && !(message instanceof String)) {
try {
this.message = OBJECT_MAPPER.writeValueAsString(message);
} catch (JsonProcessingException e) {
this.message = "internal server error,fail to parse object";
log.error("设置操作结果信息时出错message:{}", message, e);
}
catch (JsonProcessingException e)
{
this.message="internal server error,fail to parse object";
log.error("设置操作结果信息时出错message:{}",message,e);
}
}
else
{
} else {
this.message = message;
}
}

View File

@ -19,35 +19,43 @@
package com.raincat.core.disruptor.handler;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
import com.raincat.common.enums.CompensationActionEnum;
import com.raincat.core.compensation.TxCompensationService;
import com.raincat.core.disruptor.event.TxTransactionEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.Executor;
/**
* Disroptor handler.
* disruptor handler.
*
* @author xiaoyu(Myth)
*/
@Component
public class TxTransactionEventHandler implements EventHandler<TxTransactionEvent> {
public class TxTransactionEventHandler implements WorkHandler<TxTransactionEvent> {
@Autowired
private TxCompensationService txCompensationService;
private final TxCompensationService txCompensationService;
private final Executor executor;
public TxTransactionEventHandler(final Executor executor, final TxCompensationService txCompensationService) {
this.executor = executor;
this.txCompensationService = txCompensationService;
}
@Override
public void onEvent(final TxTransactionEvent txTransactionEvent, final long sequence, final boolean endOfBatch) {
if (txTransactionEvent.getType() == CompensationActionEnum.SAVE.getCode()) {
txCompensationService.save(txTransactionEvent.getTransactionRecover());
} else if (txTransactionEvent.getType() == CompensationActionEnum.DELETE.getCode()) {
txCompensationService.remove(txTransactionEvent.getTransactionRecover().getId());
} else if (txTransactionEvent.getType() == CompensationActionEnum.UPDATE.getCode()) {
txCompensationService.update(txTransactionEvent.getTransactionRecover());
} else if (txTransactionEvent.getType() == CompensationActionEnum.COMPENSATE.getCode()) {
txCompensationService.compensation(txTransactionEvent.getTransactionRecover());
}
txTransactionEvent.clear();
public void onEvent(final TxTransactionEvent txTransactionEvent) {
executor.execute(() -> {
if (txTransactionEvent.getType() == CompensationActionEnum.SAVE.getCode()) {
txCompensationService.save(txTransactionEvent.getTransactionRecover());
} else if (txTransactionEvent.getType() == CompensationActionEnum.DELETE.getCode()) {
txCompensationService.remove(txTransactionEvent.getTransactionRecover().getId());
} else if (txTransactionEvent.getType() == CompensationActionEnum.UPDATE.getCode()) {
txCompensationService.update(txTransactionEvent.getTransactionRecover());
} else if (txTransactionEvent.getType() == CompensationActionEnum.COMPENSATE.getCode()) {
txCompensationService.compensation(txTransactionEvent.getTransactionRecover());
}
txTransactionEvent.clear();
});
}
}

View File

@ -19,21 +19,19 @@
package com.raincat.core.disruptor.publisher;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.raincat.common.bean.TransactionRecover;
import com.raincat.common.enums.CompensationActionEnum;
import com.raincat.common.holder.LogUtil;
import com.raincat.core.compensation.TxCompensationService;
import com.raincat.core.concurrent.threadpool.TxTransactionThreadFactory;
import com.raincat.core.disruptor.event.TxTransactionEvent;
import com.raincat.core.disruptor.factory.TxTransactionEventFactory;
import com.raincat.core.disruptor.handler.TxTransactionEventHandler;
import com.raincat.core.disruptor.translator.TxTransactionEventTranslator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -52,19 +50,15 @@ import java.util.concurrent.atomic.AtomicInteger;
@Component
public class TxTransactionEventPublisher implements DisposableBean {
private static final Logger LOGGER = LoggerFactory.getLogger(TxTransactionEventPublisher.class);
private static final int MAX_THREAD = Runtime.getRuntime().availableProcessors() << 1;
private Executor executor;
private Disruptor<TxTransactionEvent> disruptor;
private final TxTransactionEventHandler txTransactionEventHandler;
private final TxCompensationService txCompensationService;
@Autowired
public TxTransactionEventPublisher(TxTransactionEventHandler txTransactionEventHandler) {
this.txTransactionEventHandler = txTransactionEventHandler;
public TxTransactionEventPublisher(final TxCompensationService txCompensationService) {
this.txCompensationService = txCompensationService;
}
/**
@ -76,29 +70,19 @@ public class TxTransactionEventPublisher implements DisposableBean {
disruptor = new Disruptor<>(new TxTransactionEventFactory(), bufferSize, r -> {
AtomicInteger index = new AtomicInteger(1);
return new Thread(null, r, "disruptor-thread-" + index.getAndIncrement());
}, ProducerType.MULTI, new YieldingWaitStrategy());
disruptor.handleEventsWith(txTransactionEventHandler);
disruptor.setDefaultExceptionHandler(new ExceptionHandler<TxTransactionEvent>() {
@Override
public void handleEventException(Throwable ex, long sequence, TxTransactionEvent event) {
LogUtil.error(LOGGER, () -> "Disruptor handleEventException:"
+ event.getType() + event.getTransactionRecover().toString());
}
}, ProducerType.MULTI, new BlockingWaitStrategy());
@Override
public void handleOnStartException(Throwable ex) {
LogUtil.error(LOGGER, () -> "Disruptor start exception");
}
@Override
public void handleOnShutdownException(Throwable ex) {
LogUtil.error(LOGGER, () -> "Disruptor close Exception ");
}
});
executor = new ThreadPoolExecutor(MAX_THREAD, MAX_THREAD, 0, TimeUnit.MILLISECONDS,
final Executor executor = new ThreadPoolExecutor(MAX_THREAD, MAX_THREAD, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
TxTransactionThreadFactory.create("raincat-log-disruptor", false),
new ThreadPoolExecutor.AbortPolicy());
TxTransactionEventHandler[] consumers = new TxTransactionEventHandler[MAX_THREAD];
for (int i = 0; i < MAX_THREAD; i++) {
consumers[i] = new TxTransactionEventHandler(executor, txCompensationService);
}
disruptor.handleEventsWithWorkerPool(consumers);
disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
disruptor.start();
}
@ -109,10 +93,8 @@ public class TxTransactionEventPublisher implements DisposableBean {
* @param type {@linkplain CompensationActionEnum}
*/
public void publishEvent(final TransactionRecover transactionRecover, final int type) {
executor.execute(() -> {
final RingBuffer<TxTransactionEvent> ringBuffer = disruptor.getRingBuffer();
ringBuffer.publishEvent(new TxTransactionEventTranslator(type), transactionRecover);
});
final RingBuffer<TxTransactionEvent> ringBuffer = disruptor.getRingBuffer();
ringBuffer.publishEvent(new TxTransactionEventTranslator(type), transactionRecover);
}
@Override