From c7eb23166e75321dd8684f7186cffd66c13e1231 Mon Sep 17 00:00:00 2001 From: sixh Date: Fri, 12 Apr 2019 16:19:41 +0800 Subject: [PATCH] =?UTF-8?q?1.=20=20=E6=9B=B4=E6=96=B0disruptor.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DelegationThreadPoolExecutor.java | 124 ++++++++++++++++++ .../threadpool/HmilyThreadPool.java | 67 ++++++++++ .../hmily/core/disruptor/DataEvent.java | 14 ++ .../core/disruptor/DisruptorConsumer.java | 24 ++++ .../disruptor/DisruptorConsumerExecutor.java | 57 ++++++++ .../disruptor/DisruptorConsumerFactory.java | 25 ++++ .../core/disruptor/DisruptorEventFactory.java | 15 +++ .../core/disruptor/DisruptorProvider.java | 47 +++++++ .../disruptor/DisruptorProviderManage.java | 97 ++++++++++++++ .../core/disruptor/ExecutorSubscriber.java | 19 +++ .../factory/HmilyTransactionEventFactory.java | 35 ----- .../handler/HmilyConsumerDataHandler.java | 17 ++- .../HmilyConsumerTransactionDataHandler.java | 31 +++++ .../HmilyTransactionEventPublisher.java | 29 ++-- .../HmilyTransactionEventTranslator.java | 43 ------ .../StarterHmilyTransactionHandler.java | 30 ++--- .../handler/TransactionHandlerAlbum.java | 15 +++ 17 files changed, 570 insertions(+), 119 deletions(-) create mode 100644 hmily-core/src/main/java/org/dromara/hmily/core/concurrent/threadpool/DelegationThreadPoolExecutor.java create mode 100644 hmily-core/src/main/java/org/dromara/hmily/core/concurrent/threadpool/HmilyThreadPool.java create mode 100644 hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DataEvent.java create mode 100644 hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorConsumer.java create mode 100644 hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorConsumerExecutor.java create mode 100644 hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorConsumerFactory.java create mode 100644 hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorEventFactory.java create mode 100644 hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorProvider.java create mode 100644 hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorProviderManage.java create mode 100644 hmily-core/src/main/java/org/dromara/hmily/core/disruptor/ExecutorSubscriber.java delete mode 100644 hmily-core/src/main/java/org/dromara/hmily/core/disruptor/factory/HmilyTransactionEventFactory.java create mode 100644 hmily-core/src/main/java/org/dromara/hmily/core/disruptor/handler/HmilyConsumerTransactionDataHandler.java delete mode 100644 hmily-core/src/main/java/org/dromara/hmily/core/disruptor/translator/HmilyTransactionEventTranslator.java create mode 100644 hmily-core/src/main/java/org/dromara/hmily/core/service/handler/TransactionHandlerAlbum.java diff --git a/hmily-core/src/main/java/org/dromara/hmily/core/concurrent/threadpool/DelegationThreadPoolExecutor.java b/hmily-core/src/main/java/org/dromara/hmily/core/concurrent/threadpool/DelegationThreadPoolExecutor.java new file mode 100644 index 00000000..062e0855 --- /dev/null +++ b/hmily-core/src/main/java/org/dromara/hmily/core/concurrent/threadpool/DelegationThreadPoolExecutor.java @@ -0,0 +1,124 @@ +package org.dromara.hmily.core.concurrent.threadpool; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.*; + +/** + * DelegationThreadPoolExecutor. + * Customize thread Pool . + * + * @author chenbin sixh + */ +@SuppressWarnings("unused") +public class DelegationThreadPoolExecutor extends ThreadPoolExecutor { + + private final Logger logger = LoggerFactory.getLogger(DelegationThreadPoolExecutor.class); + + /** + * Define a thread saturation strategy. + */ + static final RejectedExecutionHandler HANDLER = (r, executor) -> { + ((DelegationThreadPoolExecutor) executor).onInitialRejection(r); + BlockingQueue queue = executor.getQueue(); + while (true) { + if (executor.isShutdown()) { + throw new RejectedExecutionException("DelegationThreadPoolExecutor Closed"); + } + try { + if (queue.offer(r, 1000, TimeUnit.MILLISECONDS)) { + break; + } + } catch (InterruptedException e) { + throw new AssertionError(e); + } + + } + }; + + /** + * Instantiates a new Delegation thread pool executor. + * + * @param corePoolSize the core pool size + * @param maximumPoolSize the maximum pool size + * @param keepAliveTime the keep alive time + * @param unit the unit + * @param workQueue the work queue + */ + public DelegationThreadPoolExecutor(final int corePoolSize, + final int maximumPoolSize, + final long keepAliveTime, + final TimeUnit unit, + final BlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + /** + * Instantiates a new Delegation thread pool executor. + * + * @param corePoolSize the core pool size + * @param maximumPoolSize the maximum pool size + * @param keepAliveTime the keep alive time + * @param unit the unit + * @param workQueue the work queue + * @param threadFactory the thread factory + */ + public DelegationThreadPoolExecutor(final int corePoolSize, + final int maximumPoolSize, + final long keepAliveTime, + final TimeUnit unit, + final BlockingQueue workQueue, + final ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + /** + * Instantiates a new Delegation thread pool executor. + * + * @param corePoolSize the core pool size + * @param maximumPoolSize the maximum pool size + * @param keepAliveTime the keep alive time + * @param unit the unit + * @param workQueue the work queue + * @param handler the handler + */ + public DelegationThreadPoolExecutor(final int corePoolSize, + final int maximumPoolSize, + final long keepAliveTime, + final TimeUnit unit, + final BlockingQueue workQueue, + final RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); + } + + /** + * Instantiates a new Delegation thread pool executor. + * + * @param corePoolSize the core pool size + * @param maximumPoolSize the maximum pool size + * @param keepAliveTime the keep alive time + * @param unit the unit + * @param workQueue the work queue + * @param threadFactory the thread factory + * @param handler the handler + */ + DelegationThreadPoolExecutor(final int corePoolSize, + final int maximumPoolSize, + final long keepAliveTime, + final TimeUnit unit, + final BlockingQueue workQueue, + final ThreadFactory threadFactory, + final RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + + /** + * On initial rejection. + * + * @param runnable the runnable + */ + void onInitialRejection(final Runnable runnable) { + logger.info("DelegationThreadPoolExecutor:thread {} rejection", runnable); + } +} diff --git a/hmily-core/src/main/java/org/dromara/hmily/core/concurrent/threadpool/HmilyThreadPool.java b/hmily-core/src/main/java/org/dromara/hmily/core/concurrent/threadpool/HmilyThreadPool.java new file mode 100644 index 00000000..6d78b4ec --- /dev/null +++ b/hmily-core/src/main/java/org/dromara/hmily/core/concurrent/threadpool/HmilyThreadPool.java @@ -0,0 +1,67 @@ +package org.dromara.hmily.core.concurrent.threadpool; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * HmilyThreadPool. + * Customize thread Pool . + * + * @author chenbin sixh + */ +public final class HmilyThreadPool extends DelegationThreadPoolExecutor { + + /** + * Initialize the multi-end thread pool. + * + * @param coreSize the core size + * @param maxSize the max size + * @param poolName the pool name + */ + public HmilyThreadPool(final int coreSize, final int maxSize, final String poolName) { + this(coreSize, maxSize, 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + HmilyThreadFactory.create(poolName, false)); + } + + /** + * Initialize a thread pool. + * + * @param poolName name; + */ + public HmilyThreadPool(final String poolName) { + this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), HmilyThreadFactory.create(poolName, false)); + } + + /** + * Initialize a thread pool with the same pool size as the maximum pool size. + * + * @param corePoolSize corePoolSize + * @param keepAliveTime keepAliveTime + * @param unit unit + * @param workQueue workQueue + * @param threadFactory threadFactory + * @see java.util.concurrent.ThreadPoolExecutor + */ + public HmilyThreadPool(final int corePoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue workQueue, final ThreadFactory threadFactory) { + this(corePoolSize, corePoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + /** + * Initialize a thread pool. + * + * @param corePoolSize corePoolSize + * @param maximumPoolSize maximumPoolSize + * @param keepAliveTime keepAliveTime + * @param unit unit + * @param workQueue workQueue + * @param threadFactory workQueue + * @see java.util.concurrent.ThreadPoolExecutor + */ + public HmilyThreadPool(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue workQueue, final ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, HANDLER); + } +} diff --git a/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DataEvent.java b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DataEvent.java new file mode 100644 index 00000000..a617a19c --- /dev/null +++ b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DataEvent.java @@ -0,0 +1,14 @@ +package org.dromara.hmily.core.disruptor; + +import lombok.Data; + +/** + * DataEvent. + * disruptor data carrier . + * @author chenbin sixh + */ +@Data +public class DataEvent { + + private T t; +} diff --git a/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorConsumer.java b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorConsumer.java new file mode 100644 index 00000000..a7d7467e --- /dev/null +++ b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorConsumer.java @@ -0,0 +1,24 @@ +package org.dromara.hmily.core.disruptor; + +import com.lmax.disruptor.WorkHandler; + +/** + * DisruptorConsumer. + * disruptor consumer work handler. + * @author chenbin sixh + */ +public class DisruptorConsumer implements WorkHandler> { + + private DisruptorConsumerFactory factory; + + DisruptorConsumer(final DisruptorConsumerFactory factory) { + this.factory = factory; + } + + @Override + public void onEvent(final DataEvent t) { + if (t != null) { + factory.create().executor(t.getT()); + } + } +} diff --git a/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorConsumerExecutor.java b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorConsumerExecutor.java new file mode 100644 index 00000000..cc942912 --- /dev/null +++ b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorConsumerExecutor.java @@ -0,0 +1,57 @@ +package org.dromara.hmily.core.disruptor; + +import java.util.HashSet; +import java.util.Set; + +/** + * DisruptorConsumerExecutor. + * disruptor consumer executor. + * + * @param the type parameter + * @author chenbin sixh + */ +public abstract class DisruptorConsumerExecutor { + + /** + * Recorded the subscription processing after the user needs to subscribe to the calculation result. + */ + private Set subscribers = new HashSet<>(); + + /** + * Add subscribers disruptor consumer executor. + * + * @param subscriber subscriber; + * @return the disruptor consumer executor + */ + public DisruptorConsumerExecutor addSubscribers(final ExecutorSubscriber subscriber) { + subscribers.add(subscriber); + return this; + } + + /** + * Add subscribers disruptor consumer executor. + * + * @param subscribers the subscribers + * @return the disruptor consumer executor + */ + public DisruptorConsumerExecutor addSubscribers(final Set subscribers) { + subscribers.forEach(this::addSubscribers); + return this; + } + + /** + * Gets subscribers. + * + * @return the subscribers + */ + public Set getSubscribers() { + return subscribers; + } + + /** + * Perform the processing of the current event. + * + * @param data the data + */ + public abstract void executor(T data); +} diff --git a/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorConsumerFactory.java b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorConsumerFactory.java new file mode 100644 index 00000000..35a29285 --- /dev/null +++ b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorConsumerFactory.java @@ -0,0 +1,25 @@ +package org.dromara.hmily.core.disruptor; + +/** + * DisruptorConsumerFactory. + * Create a subclass implementation object via the {@link #create()} method, + * which is called in {@link DisruptorConsumer#onEvent(DataEvent)}. + * + * @author chenbin sixh + */ +public interface DisruptorConsumerFactory { + + /** + * Fix name string. + * + * @return the string + */ + String fixName(); + + /** + * Create disruptor consumer executor. + * + * @return the disruptor consumer executor + */ + DisruptorConsumerExecutor create(); +} diff --git a/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorEventFactory.java b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorEventFactory.java new file mode 100644 index 00000000..5a2a516d --- /dev/null +++ b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorEventFactory.java @@ -0,0 +1,15 @@ +package org.dromara.hmily.core.disruptor; + +import com.lmax.disruptor.EventFactory; + +/** + * DisruptorEventFactory. + * disruptor Create a factory implementation of the object. + * @author chenbin sixh + */ +public class DisruptorEventFactory implements EventFactory> { + @Override + public DataEvent newInstance() { + return new DataEvent<>(); + } +} diff --git a/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorProvider.java b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorProvider.java new file mode 100644 index 00000000..e4e1e683 --- /dev/null +++ b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorProvider.java @@ -0,0 +1,47 @@ +package org.dromara.hmily.core.disruptor; + +import com.lmax.disruptor.RingBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * DisruptorProvider. + * disruptor provider definition. + * + * @param the type parameter + * @author chenbin sixh + */ +public class DisruptorProvider { + + private final RingBuffer> ringBuffer; + + /** + * The Logger. + */ + private Logger logger = LoggerFactory.getLogger(DisruptorProvider.class); + + /** + * Instantiates a new Disruptor provider. + * + * @param ringBuffer the ring buffer + */ + DisruptorProvider(final RingBuffer> ringBuffer) { + this.ringBuffer = ringBuffer; + } + + /** + * push data to disruptor queue. + * + * @param t the t + */ + public void onData(final T t) { + long position = ringBuffer.next(); + try { + DataEvent de = ringBuffer.get(position); + de.setT(t); + ringBuffer.publish(position); + } catch (Exception ex) { + logger.error("push data error:", ex); + } + } +} diff --git a/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorProviderManage.java b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorProviderManage.java new file mode 100644 index 00000000..eb87b4f0 --- /dev/null +++ b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/DisruptorProviderManage.java @@ -0,0 +1,97 @@ +package org.dromara.hmily.core.disruptor; + +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.IgnoreExceptionHandler; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import org.dromara.hmily.core.concurrent.threadpool.HmilyThreadFactory; + +/** + * DisruptorProviderManage. + * disruptor provider manager. + * + * @param the type parameter + * @author chenbin sixh + */ +public class DisruptorProviderManage { + + public static final Integer DEFAULT_SIZE = 4096 << 1 << 1; + + private static final Integer DEFAULT_CONSUMER_SIZE = Runtime.getRuntime().availableProcessors() << 1; + + private final Integer size; + + private DisruptorProvider provider; + + private Integer consumerSize; + + private DisruptorConsumerFactory consumerFactory; + + /** + * Instantiates a new Disruptor provider manage. + * + * @param consumerFactory the consumer factory + * @param ringBufferSize the size + */ + public DisruptorProviderManage(final DisruptorConsumerFactory consumerFactory, final Integer ringBufferSize) { + this(consumerFactory, + DEFAULT_CONSUMER_SIZE, + ringBufferSize); + } + + /** + * Instantiates a new Disruptor provider manage. + * + * @param consumerFactory the consumer factory + */ + public DisruptorProviderManage(final DisruptorConsumerFactory consumerFactory) { + this(consumerFactory, DEFAULT_CONSUMER_SIZE, DEFAULT_SIZE); + } + + /** + * Instantiates a new Disruptor provider manage. + * + * @param consumerFactory the consumer factory + * @param consumerSize the consumer size + * @param ringBufferSize the ringBuffer size + */ + public DisruptorProviderManage(final DisruptorConsumerFactory consumerFactory, + final int consumerSize, + final int ringBufferSize) { + this.consumerFactory = consumerFactory; + this.size = ringBufferSize; + this.consumerSize = consumerSize; + + } + + /** + * start disruptor. + */ + @SuppressWarnings("unchecked") + public void startup() { + Disruptor> disruptor = new Disruptor<>(new DisruptorEventFactory<>(), + size, + HmilyThreadFactory.create("disruptor_consumer_" + consumerFactory.fixName(), false), + ProducerType.MULTI, + new BlockingWaitStrategy()); + DisruptorConsumer[] consumers = new DisruptorConsumer[consumerSize]; + for (int i = 0; i < consumerSize; i++) { + consumers[i] = new DisruptorConsumer<>(consumerFactory); + } + disruptor.handleEventsWithWorkerPool(consumers); + disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler()); + disruptor.start(); + RingBuffer> ringBuffer = disruptor.getRingBuffer(); + provider = new DisruptorProvider<>(ringBuffer); + } + + /** + * Gets provider. + * + * @return the provider + */ + public DisruptorProvider getProvider() { + return provider; + } +} diff --git a/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/ExecutorSubscriber.java b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/ExecutorSubscriber.java new file mode 100644 index 00000000..16dba648 --- /dev/null +++ b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/ExecutorSubscriber.java @@ -0,0 +1,19 @@ +package org.dromara.hmily.core.disruptor; + +import java.util.Collection; + +/** + * The interface Executor subscriber. + * + * @param the type parameter + * @author chenbin sixh + */ +public interface ExecutorSubscriber { + + /** + * Executor. + * + * @param collections the collections + */ + void executor(Collection collections); +} diff --git a/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/factory/HmilyTransactionEventFactory.java b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/factory/HmilyTransactionEventFactory.java deleted file mode 100644 index d07274f3..00000000 --- a/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/factory/HmilyTransactionEventFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.dromara.hmily.core.disruptor.factory; - -import com.lmax.disruptor.EventFactory; -import org.dromara.hmily.core.disruptor.event.HmilyTransactionEvent; - -/** - * TccTransactionEventFactory. - * @author xiaoyu(Myth) - */ -public class HmilyTransactionEventFactory implements EventFactory { - - @Override - public HmilyTransactionEvent newInstance() { - return new HmilyTransactionEvent(); - } -} diff --git a/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/handler/HmilyConsumerDataHandler.java b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/handler/HmilyConsumerDataHandler.java index 94f700b1..8137f26e 100644 --- a/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/handler/HmilyConsumerDataHandler.java +++ b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/handler/HmilyConsumerDataHandler.java @@ -1,10 +1,11 @@ package org.dromara.hmily.core.disruptor.handler; -import com.lmax.disruptor.WorkHandler; import org.dromara.hmily.common.bean.entity.HmilyTransaction; import org.dromara.hmily.common.enums.EventTypeEnum; import org.dromara.hmily.core.concurrent.ConsistentHashSelector; import org.dromara.hmily.core.coordinator.HmilyCoordinatorService; +import org.dromara.hmily.core.disruptor.DisruptorConsumerExecutor; +import org.dromara.hmily.core.disruptor.DisruptorConsumerFactory; import org.dromara.hmily.core.disruptor.event.HmilyTransactionEvent; /** @@ -12,7 +13,7 @@ import org.dromara.hmily.core.disruptor.event.HmilyTransactionEvent; * * @author xiaoyu(Myth) */ -public class HmilyConsumerDataHandler implements WorkHandler { +public class HmilyConsumerDataHandler extends DisruptorConsumerExecutor implements DisruptorConsumerFactory { private ConsistentHashSelector executor; @@ -24,7 +25,17 @@ public class HmilyConsumerDataHandler implements WorkHandler { if (event.getType() == EventTypeEnum.SAVE.getCode()) { diff --git a/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/handler/HmilyConsumerTransactionDataHandler.java b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/handler/HmilyConsumerTransactionDataHandler.java new file mode 100644 index 00000000..76647cc8 --- /dev/null +++ b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/handler/HmilyConsumerTransactionDataHandler.java @@ -0,0 +1,31 @@ +package org.dromara.hmily.core.disruptor.handler; + +import org.dromara.hmily.core.disruptor.DisruptorConsumerExecutor; +import org.dromara.hmily.core.disruptor.DisruptorConsumerFactory; +import org.dromara.hmily.core.service.handler.TransactionHandlerAlbum; + +/** + * HmilyTransactionHandler. + * About the processing of a rotation function. + * + * @author chenbin sixh + */ +public class HmilyConsumerTransactionDataHandler extends DisruptorConsumerExecutor implements DisruptorConsumerFactory { + + + @Override + public String fixName() { + return "HmilyConsumerTransactionDataHandler"; + } + + @Override + public DisruptorConsumerExecutor create() { + return this; + } + + @Override + public void executor(final TransactionHandlerAlbum data) { + data.run(); + } +} + diff --git a/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/publisher/HmilyTransactionEventPublisher.java b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/publisher/HmilyTransactionEventPublisher.java index fb0c60cc..f58f8bf5 100644 --- a/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/publisher/HmilyTransactionEventPublisher.java +++ b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/publisher/HmilyTransactionEventPublisher.java @@ -19,21 +19,15 @@ package org.dromara.hmily.core.disruptor.publisher; -import com.lmax.disruptor.BlockingWaitStrategy; -import com.lmax.disruptor.IgnoreExceptionHandler; -import com.lmax.disruptor.RingBuffer; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; import org.dromara.hmily.common.bean.entity.HmilyTransaction; import org.dromara.hmily.common.config.HmilyConfig; import org.dromara.hmily.common.enums.EventTypeEnum; import org.dromara.hmily.core.concurrent.ConsistentHashSelector; import org.dromara.hmily.core.concurrent.SingletonExecutor; import org.dromara.hmily.core.coordinator.HmilyCoordinatorService; +import org.dromara.hmily.core.disruptor.DisruptorProviderManage; import org.dromara.hmily.core.disruptor.event.HmilyTransactionEvent; -import org.dromara.hmily.core.disruptor.factory.HmilyTransactionEventFactory; import org.dromara.hmily.core.disruptor.handler.HmilyConsumerDataHandler; -import org.dromara.hmily.core.disruptor.translator.HmilyTransactionEventTranslator; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; @@ -54,7 +48,7 @@ public class HmilyTransactionEventPublisher implements DisposableBean, Applicati private static final AtomicLong INDEX = new AtomicLong(1); - private Disruptor disruptor; + private DisruptorProviderManage disruptorProviderManage; private final HmilyCoordinatorService coordinatorService; @@ -74,20 +68,13 @@ public class HmilyTransactionEventPublisher implements DisposableBean, Applicati * @param threadSize this is disruptor consumer thread size. */ private void start(final int bufferSize, final int threadSize) { - disruptor = new Disruptor<>(new HmilyTransactionEventFactory(), bufferSize, runnable -> { - return new Thread(new ThreadGroup("hmily-disruptor"), runnable, - "disruptor-thread-" + INDEX.getAndIncrement()); - }, ProducerType.MULTI, new BlockingWaitStrategy()); - HmilyConsumerDataHandler[] consumers = new HmilyConsumerDataHandler[1]; List selects = new ArrayList<>(); for (int i = 0; i < threadSize; i++) { selects.add(new SingletonExecutor("hmily-log-disruptor" + i)); } ConsistentHashSelector selector = new ConsistentHashSelector(selects); - consumers[0] = new HmilyConsumerDataHandler(selector, coordinatorService); - disruptor.handleEventsWithWorkerPool(consumers); - disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler()); - disruptor.start(); + disruptorProviderManage = new DisruptorProviderManage<>(new HmilyConsumerDataHandler(selector, coordinatorService), 1, bufferSize); + disruptorProviderManage.startup(); } /** @@ -97,13 +84,15 @@ public class HmilyTransactionEventPublisher implements DisposableBean, Applicati * @param type {@linkplain EventTypeEnum} */ public void publishEvent(final HmilyTransaction hmilyTransaction, final int type) { - final RingBuffer ringBuffer = disruptor.getRingBuffer(); - ringBuffer.publishEvent(new HmilyTransactionEventTranslator(type), hmilyTransaction); + HmilyTransactionEvent event = new HmilyTransactionEvent(); + event.setType(type); + event.setHmilyTransaction(hmilyTransaction); + disruptorProviderManage.getProvider().onData(event); } @Override public void destroy() { - disruptor.shutdown(); + } @Override diff --git a/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/translator/HmilyTransactionEventTranslator.java b/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/translator/HmilyTransactionEventTranslator.java deleted file mode 100644 index 86c82e36..00000000 --- a/hmily-core/src/main/java/org/dromara/hmily/core/disruptor/translator/HmilyTransactionEventTranslator.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.dromara.hmily.core.disruptor.translator; - -import com.lmax.disruptor.EventTranslatorOneArg; -import org.dromara.hmily.common.bean.entity.HmilyTransaction; -import org.dromara.hmily.core.disruptor.event.HmilyTransactionEvent; - -/** - * EventTranslator. - * @author xiaoyu(Myth) - */ -public class HmilyTransactionEventTranslator implements EventTranslatorOneArg { - - private int type; - - public HmilyTransactionEventTranslator(final int type) { - this.type = type; - } - - @Override - public void translateTo(final HmilyTransactionEvent hmilyTransactionEvent, final long l, final HmilyTransaction hmilyTransaction) { - hmilyTransactionEvent.setHmilyTransaction(hmilyTransaction); - hmilyTransactionEvent.setType(type); - } -} diff --git a/hmily-core/src/main/java/org/dromara/hmily/core/service/handler/StarterHmilyTransactionHandler.java b/hmily-core/src/main/java/org/dromara/hmily/core/service/handler/StarterHmilyTransactionHandler.java index 806cf96e..72298624 100644 --- a/hmily-core/src/main/java/org/dromara/hmily/core/service/handler/StarterHmilyTransactionHandler.java +++ b/hmily-core/src/main/java/org/dromara/hmily/core/service/handler/StarterHmilyTransactionHandler.java @@ -23,7 +23,8 @@ import org.dromara.hmily.common.bean.entity.HmilyTransaction; import org.dromara.hmily.common.config.HmilyConfig; import org.dromara.hmily.common.enums.HmilyActionEnum; import org.dromara.hmily.core.concurrent.threadlocal.HmilyTransactionContextLocal; -import org.dromara.hmily.core.concurrent.threadpool.HmilyThreadFactory; +import org.dromara.hmily.core.disruptor.DisruptorProviderManage; +import org.dromara.hmily.core.disruptor.handler.HmilyConsumerTransactionDataHandler; import org.dromara.hmily.core.service.HmilyTransactionHandler; import org.dromara.hmily.core.service.executor.HmilyTransactionExecutor; import org.springframework.beans.factory.annotation.Autowired; @@ -31,11 +32,6 @@ import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.stereotype.Component; -import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - /** * this is hmily transaction starter. * @@ -46,10 +42,10 @@ public class StarterHmilyTransactionHandler implements HmilyTransactionHandler, private final HmilyTransactionExecutor hmilyTransactionExecutor; - private Executor executor; - private final HmilyConfig hmilyConfig; + private DisruptorProviderManage disruptorProviderManage; + /** * Instantiates a new Starter hmily transaction handler. * @@ -57,7 +53,7 @@ public class StarterHmilyTransactionHandler implements HmilyTransactionHandler, * @param hmilyConfig the hmily config */ @Autowired - public StarterHmilyTransactionHandler(final HmilyTransactionExecutor hmilyTransactionExecutor, HmilyConfig hmilyConfig) { + public StarterHmilyTransactionHandler(final HmilyTransactionExecutor hmilyTransactionExecutor, final HmilyConfig hmilyConfig) { this.hmilyTransactionExecutor = hmilyTransactionExecutor; this.hmilyConfig = hmilyConfig; } @@ -76,13 +72,12 @@ public class StarterHmilyTransactionHandler implements HmilyTransactionHandler, } catch (Throwable throwable) { //if exception ,execute cancel final HmilyTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction(); - executor.execute(() -> hmilyTransactionExecutor - .cancel(currentTransaction)); + disruptorProviderManage.getProvider().onData(() -> hmilyTransactionExecutor.cancel(currentTransaction)); throw throwable; } //execute confirm final HmilyTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction(); - executor.execute(() -> hmilyTransactionExecutor.confirm(currentTransaction)); + disruptorProviderManage.getProvider().onData(() -> hmilyTransactionExecutor.confirm(currentTransaction)); } finally { HmilyTransactionContextLocal.getInstance().remove(); hmilyTransactionExecutor.remove(); @@ -91,13 +86,12 @@ public class StarterHmilyTransactionHandler implements HmilyTransactionHandler, } @Override - public void onApplicationEvent(ContextRefreshedEvent event) { + public void onApplicationEvent(final ContextRefreshedEvent event) { if (hmilyConfig.getStarted()) { - executor = new ThreadPoolExecutor(hmilyConfig.getAsyncThreads(), - hmilyConfig.getAsyncThreads(), 0, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), - HmilyThreadFactory.create("hmily-execute", false), - new ThreadPoolExecutor.AbortPolicy()); + disruptorProviderManage = new DisruptorProviderManage<>(new HmilyConsumerTransactionDataHandler(), + hmilyConfig.getAsyncThreads(), + DisruptorProviderManage.DEFAULT_SIZE); + disruptorProviderManage.startup(); } } diff --git a/hmily-core/src/main/java/org/dromara/hmily/core/service/handler/TransactionHandlerAlbum.java b/hmily-core/src/main/java/org/dromara/hmily/core/service/handler/TransactionHandlerAlbum.java new file mode 100644 index 00000000..c251c73d --- /dev/null +++ b/hmily-core/src/main/java/org/dromara/hmily/core/service/handler/TransactionHandlerAlbum.java @@ -0,0 +1,15 @@ +package org.dromara.hmily.core.service.handler; + +/** + * TransactionHandlerAlbum. + * 2019/4/12 15:57 + * + * @author chenbin sixh + */ +@FunctionalInterface +public interface TransactionHandlerAlbum { + /** + * Run. + */ + void run(); +}