mirror of
https://gitee.com/dromara/hmily.git
synced 2024-12-03 19:57:40 +08:00
1. 更新disruptor.
This commit is contained in:
parent
7f6425c846
commit
c7eb23166e
@ -0,0 +1,124 @@
|
||||
package org.dromara.hmily.core.concurrent.threadpool;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.*;
|
||||
|
||||
/**
|
||||
* DelegationThreadPoolExecutor.
|
||||
* Customize thread Pool .
|
||||
*
|
||||
* @author chenbin sixh
|
||||
*/
|
||||
@SuppressWarnings("unused")
|
||||
public class DelegationThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(DelegationThreadPoolExecutor.class);
|
||||
|
||||
/**
|
||||
* Define a thread saturation strategy.
|
||||
*/
|
||||
static final RejectedExecutionHandler HANDLER = (r, executor) -> {
|
||||
((DelegationThreadPoolExecutor) executor).onInitialRejection(r);
|
||||
BlockingQueue<Runnable> queue = executor.getQueue();
|
||||
while (true) {
|
||||
if (executor.isShutdown()) {
|
||||
throw new RejectedExecutionException("DelegationThreadPoolExecutor Closed");
|
||||
}
|
||||
try {
|
||||
if (queue.offer(r, 1000, TimeUnit.MILLISECONDS)) {
|
||||
break;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Instantiates a new Delegation thread pool executor.
|
||||
*
|
||||
* @param corePoolSize the core pool size
|
||||
* @param maximumPoolSize the maximum pool size
|
||||
* @param keepAliveTime the keep alive time
|
||||
* @param unit the unit
|
||||
* @param workQueue the work queue
|
||||
*/
|
||||
public DelegationThreadPoolExecutor(final int corePoolSize,
|
||||
final int maximumPoolSize,
|
||||
final long keepAliveTime,
|
||||
final TimeUnit unit,
|
||||
final BlockingQueue<Runnable> workQueue) {
|
||||
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a new Delegation thread pool executor.
|
||||
*
|
||||
* @param corePoolSize the core pool size
|
||||
* @param maximumPoolSize the maximum pool size
|
||||
* @param keepAliveTime the keep alive time
|
||||
* @param unit the unit
|
||||
* @param workQueue the work queue
|
||||
* @param threadFactory the thread factory
|
||||
*/
|
||||
public DelegationThreadPoolExecutor(final int corePoolSize,
|
||||
final int maximumPoolSize,
|
||||
final long keepAliveTime,
|
||||
final TimeUnit unit,
|
||||
final BlockingQueue<Runnable> workQueue,
|
||||
final ThreadFactory threadFactory) {
|
||||
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a new Delegation thread pool executor.
|
||||
*
|
||||
* @param corePoolSize the core pool size
|
||||
* @param maximumPoolSize the maximum pool size
|
||||
* @param keepAliveTime the keep alive time
|
||||
* @param unit the unit
|
||||
* @param workQueue the work queue
|
||||
* @param handler the handler
|
||||
*/
|
||||
public DelegationThreadPoolExecutor(final int corePoolSize,
|
||||
final int maximumPoolSize,
|
||||
final long keepAliveTime,
|
||||
final TimeUnit unit,
|
||||
final BlockingQueue<Runnable> workQueue,
|
||||
final RejectedExecutionHandler handler) {
|
||||
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a new Delegation thread pool executor.
|
||||
*
|
||||
* @param corePoolSize the core pool size
|
||||
* @param maximumPoolSize the maximum pool size
|
||||
* @param keepAliveTime the keep alive time
|
||||
* @param unit the unit
|
||||
* @param workQueue the work queue
|
||||
* @param threadFactory the thread factory
|
||||
* @param handler the handler
|
||||
*/
|
||||
DelegationThreadPoolExecutor(final int corePoolSize,
|
||||
final int maximumPoolSize,
|
||||
final long keepAliveTime,
|
||||
final TimeUnit unit,
|
||||
final BlockingQueue<Runnable> workQueue,
|
||||
final ThreadFactory threadFactory,
|
||||
final RejectedExecutionHandler handler) {
|
||||
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
|
||||
}
|
||||
|
||||
/**
|
||||
* On initial rejection.
|
||||
*
|
||||
* @param runnable the runnable
|
||||
*/
|
||||
void onInitialRejection(final Runnable runnable) {
|
||||
logger.info("DelegationThreadPoolExecutor:thread {} rejection", runnable);
|
||||
}
|
||||
}
|
@ -0,0 +1,67 @@
|
||||
package org.dromara.hmily.core.concurrent.threadpool;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* HmilyThreadPool.
|
||||
* Customize thread Pool .
|
||||
*
|
||||
* @author chenbin sixh
|
||||
*/
|
||||
public final class HmilyThreadPool extends DelegationThreadPoolExecutor {
|
||||
|
||||
/**
|
||||
* Initialize the multi-end thread pool.
|
||||
*
|
||||
* @param coreSize the core size
|
||||
* @param maxSize the max size
|
||||
* @param poolName the pool name
|
||||
*/
|
||||
public HmilyThreadPool(final int coreSize, final int maxSize, final String poolName) {
|
||||
this(coreSize, maxSize, 0L,
|
||||
TimeUnit.MILLISECONDS,
|
||||
new LinkedBlockingQueue<>(),
|
||||
HmilyThreadFactory.create(poolName, false));
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize a thread pool.
|
||||
*
|
||||
* @param poolName name;
|
||||
*/
|
||||
public HmilyThreadPool(final String poolName) {
|
||||
this(1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), HmilyThreadFactory.create(poolName, false));
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize a thread pool with the same pool size as the maximum pool size.
|
||||
*
|
||||
* @param corePoolSize corePoolSize
|
||||
* @param keepAliveTime keepAliveTime
|
||||
* @param unit unit
|
||||
* @param workQueue workQueue
|
||||
* @param threadFactory threadFactory
|
||||
* @see java.util.concurrent.ThreadPoolExecutor
|
||||
*/
|
||||
public HmilyThreadPool(final int corePoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) {
|
||||
this(corePoolSize, corePoolSize, keepAliveTime, unit, workQueue, threadFactory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize a thread pool.
|
||||
*
|
||||
* @param corePoolSize corePoolSize
|
||||
* @param maximumPoolSize maximumPoolSize
|
||||
* @param keepAliveTime keepAliveTime
|
||||
* @param unit unit
|
||||
* @param workQueue workQueue
|
||||
* @param threadFactory workQueue
|
||||
* @see java.util.concurrent.ThreadPoolExecutor
|
||||
*/
|
||||
public HmilyThreadPool(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) {
|
||||
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, HANDLER);
|
||||
}
|
||||
}
|
@ -0,0 +1,14 @@
|
||||
package org.dromara.hmily.core.disruptor;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* DataEvent.
|
||||
* disruptor data carrier .
|
||||
* @author chenbin sixh
|
||||
*/
|
||||
@Data
|
||||
public class DataEvent<T> {
|
||||
|
||||
private T t;
|
||||
}
|
@ -0,0 +1,24 @@
|
||||
package org.dromara.hmily.core.disruptor;
|
||||
|
||||
import com.lmax.disruptor.WorkHandler;
|
||||
|
||||
/**
|
||||
* DisruptorConsumer.
|
||||
* disruptor consumer work handler.
|
||||
* @author chenbin sixh
|
||||
*/
|
||||
public class DisruptorConsumer<T> implements WorkHandler<DataEvent<T>> {
|
||||
|
||||
private DisruptorConsumerFactory<T> factory;
|
||||
|
||||
DisruptorConsumer(final DisruptorConsumerFactory<T> factory) {
|
||||
this.factory = factory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(final DataEvent<T> t) {
|
||||
if (t != null) {
|
||||
factory.create().executor(t.getT());
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,57 @@
|
||||
package org.dromara.hmily.core.disruptor;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* DisruptorConsumerExecutor.
|
||||
* disruptor consumer executor.
|
||||
*
|
||||
* @param <T> the type parameter
|
||||
* @author chenbin sixh
|
||||
*/
|
||||
public abstract class DisruptorConsumerExecutor<T> {
|
||||
|
||||
/**
|
||||
* Recorded the subscription processing after the user needs to subscribe to the calculation result.
|
||||
*/
|
||||
private Set<ExecutorSubscriber> subscribers = new HashSet<>();
|
||||
|
||||
/**
|
||||
* Add subscribers disruptor consumer executor.
|
||||
*
|
||||
* @param subscriber subscriber;
|
||||
* @return the disruptor consumer executor
|
||||
*/
|
||||
public DisruptorConsumerExecutor addSubscribers(final ExecutorSubscriber subscriber) {
|
||||
subscribers.add(subscriber);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add subscribers disruptor consumer executor.
|
||||
*
|
||||
* @param subscribers the subscribers
|
||||
* @return the disruptor consumer executor
|
||||
*/
|
||||
public DisruptorConsumerExecutor addSubscribers(final Set<ExecutorSubscriber> subscribers) {
|
||||
subscribers.forEach(this::addSubscribers);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets subscribers.
|
||||
*
|
||||
* @return the subscribers
|
||||
*/
|
||||
public Set<ExecutorSubscriber> getSubscribers() {
|
||||
return subscribers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform the processing of the current event.
|
||||
*
|
||||
* @param data the data
|
||||
*/
|
||||
public abstract void executor(T data);
|
||||
}
|
@ -0,0 +1,25 @@
|
||||
package org.dromara.hmily.core.disruptor;
|
||||
|
||||
/**
|
||||
* DisruptorConsumerFactory.
|
||||
* Create a subclass implementation object via the {@link #create()} method,
|
||||
* which is called in {@link DisruptorConsumer#onEvent(DataEvent)}.
|
||||
*
|
||||
* @author chenbin sixh
|
||||
*/
|
||||
public interface DisruptorConsumerFactory<T> {
|
||||
|
||||
/**
|
||||
* Fix name string.
|
||||
*
|
||||
* @return the string
|
||||
*/
|
||||
String fixName();
|
||||
|
||||
/**
|
||||
* Create disruptor consumer executor.
|
||||
*
|
||||
* @return the disruptor consumer executor
|
||||
*/
|
||||
DisruptorConsumerExecutor<T> create();
|
||||
}
|
@ -0,0 +1,15 @@
|
||||
package org.dromara.hmily.core.disruptor;
|
||||
|
||||
import com.lmax.disruptor.EventFactory;
|
||||
|
||||
/**
|
||||
* DisruptorEventFactory.
|
||||
* disruptor Create a factory implementation of the object.
|
||||
* @author chenbin sixh
|
||||
*/
|
||||
public class DisruptorEventFactory<T> implements EventFactory<DataEvent<T>> {
|
||||
@Override
|
||||
public DataEvent<T> newInstance() {
|
||||
return new DataEvent<>();
|
||||
}
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
package org.dromara.hmily.core.disruptor;
|
||||
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* DisruptorProvider.
|
||||
* disruptor provider definition.
|
||||
*
|
||||
* @param <T> the type parameter
|
||||
* @author chenbin sixh
|
||||
*/
|
||||
public class DisruptorProvider<T> {
|
||||
|
||||
private final RingBuffer<DataEvent<T>> ringBuffer;
|
||||
|
||||
/**
|
||||
* The Logger.
|
||||
*/
|
||||
private Logger logger = LoggerFactory.getLogger(DisruptorProvider.class);
|
||||
|
||||
/**
|
||||
* Instantiates a new Disruptor provider.
|
||||
*
|
||||
* @param ringBuffer the ring buffer
|
||||
*/
|
||||
DisruptorProvider(final RingBuffer<DataEvent<T>> ringBuffer) {
|
||||
this.ringBuffer = ringBuffer;
|
||||
}
|
||||
|
||||
/**
|
||||
* push data to disruptor queue.
|
||||
*
|
||||
* @param t the t
|
||||
*/
|
||||
public void onData(final T t) {
|
||||
long position = ringBuffer.next();
|
||||
try {
|
||||
DataEvent<T> de = ringBuffer.get(position);
|
||||
de.setT(t);
|
||||
ringBuffer.publish(position);
|
||||
} catch (Exception ex) {
|
||||
logger.error("push data error:", ex);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,97 @@
|
||||
package org.dromara.hmily.core.disruptor;
|
||||
|
||||
import com.lmax.disruptor.BlockingWaitStrategy;
|
||||
import com.lmax.disruptor.IgnoreExceptionHandler;
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
import com.lmax.disruptor.dsl.Disruptor;
|
||||
import com.lmax.disruptor.dsl.ProducerType;
|
||||
import org.dromara.hmily.core.concurrent.threadpool.HmilyThreadFactory;
|
||||
|
||||
/**
|
||||
* DisruptorProviderManage.
|
||||
* disruptor provider manager.
|
||||
*
|
||||
* @param <T> the type parameter
|
||||
* @author chenbin sixh
|
||||
*/
|
||||
public class DisruptorProviderManage<T> {
|
||||
|
||||
public static final Integer DEFAULT_SIZE = 4096 << 1 << 1;
|
||||
|
||||
private static final Integer DEFAULT_CONSUMER_SIZE = Runtime.getRuntime().availableProcessors() << 1;
|
||||
|
||||
private final Integer size;
|
||||
|
||||
private DisruptorProvider<T> provider;
|
||||
|
||||
private Integer consumerSize;
|
||||
|
||||
private DisruptorConsumerFactory consumerFactory;
|
||||
|
||||
/**
|
||||
* Instantiates a new Disruptor provider manage.
|
||||
*
|
||||
* @param consumerFactory the consumer factory
|
||||
* @param ringBufferSize the size
|
||||
*/
|
||||
public DisruptorProviderManage(final DisruptorConsumerFactory consumerFactory, final Integer ringBufferSize) {
|
||||
this(consumerFactory,
|
||||
DEFAULT_CONSUMER_SIZE,
|
||||
ringBufferSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a new Disruptor provider manage.
|
||||
*
|
||||
* @param consumerFactory the consumer factory
|
||||
*/
|
||||
public DisruptorProviderManage(final DisruptorConsumerFactory consumerFactory) {
|
||||
this(consumerFactory, DEFAULT_CONSUMER_SIZE, DEFAULT_SIZE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a new Disruptor provider manage.
|
||||
*
|
||||
* @param consumerFactory the consumer factory
|
||||
* @param consumerSize the consumer size
|
||||
* @param ringBufferSize the ringBuffer size
|
||||
*/
|
||||
public DisruptorProviderManage(final DisruptorConsumerFactory consumerFactory,
|
||||
final int consumerSize,
|
||||
final int ringBufferSize) {
|
||||
this.consumerFactory = consumerFactory;
|
||||
this.size = ringBufferSize;
|
||||
this.consumerSize = consumerSize;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* start disruptor.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public void startup() {
|
||||
Disruptor<DataEvent<T>> disruptor = new Disruptor<>(new DisruptorEventFactory<>(),
|
||||
size,
|
||||
HmilyThreadFactory.create("disruptor_consumer_" + consumerFactory.fixName(), false),
|
||||
ProducerType.MULTI,
|
||||
new BlockingWaitStrategy());
|
||||
DisruptorConsumer<T>[] consumers = new DisruptorConsumer[consumerSize];
|
||||
for (int i = 0; i < consumerSize; i++) {
|
||||
consumers[i] = new DisruptorConsumer<>(consumerFactory);
|
||||
}
|
||||
disruptor.handleEventsWithWorkerPool(consumers);
|
||||
disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
|
||||
disruptor.start();
|
||||
RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
|
||||
provider = new DisruptorProvider<>(ringBuffer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets provider.
|
||||
*
|
||||
* @return the provider
|
||||
*/
|
||||
public DisruptorProvider<T> getProvider() {
|
||||
return provider;
|
||||
}
|
||||
}
|
@ -0,0 +1,19 @@
|
||||
package org.dromara.hmily.core.disruptor;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* The interface Executor subscriber.
|
||||
*
|
||||
* @param <T> the type parameter
|
||||
* @author chenbin sixh
|
||||
*/
|
||||
public interface ExecutorSubscriber<T> {
|
||||
|
||||
/**
|
||||
* Executor.
|
||||
*
|
||||
* @param collections the collections
|
||||
*/
|
||||
void executor(Collection<? extends T> collections);
|
||||
}
|
@ -1,35 +0,0 @@
|
||||
/*
|
||||
*
|
||||
* * Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* * contributor license agreements. See the NOTICE file distributed with
|
||||
* * this work for additional information regarding copyright ownership.
|
||||
* * The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* * (the "License"); you may not use this file except in compliance with
|
||||
* * the License. You may obtain a copy of the License at
|
||||
* *
|
||||
* * http://www.apache.org/licenses/LICENSE-2.0
|
||||
* *
|
||||
* * Unless required by applicable law or agreed to in writing, software
|
||||
* * distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* * See the License for the specific language governing permissions and
|
||||
* * limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package org.dromara.hmily.core.disruptor.factory;
|
||||
|
||||
import com.lmax.disruptor.EventFactory;
|
||||
import org.dromara.hmily.core.disruptor.event.HmilyTransactionEvent;
|
||||
|
||||
/**
|
||||
* TccTransactionEventFactory.
|
||||
* @author xiaoyu(Myth)
|
||||
*/
|
||||
public class HmilyTransactionEventFactory implements EventFactory<HmilyTransactionEvent> {
|
||||
|
||||
@Override
|
||||
public HmilyTransactionEvent newInstance() {
|
||||
return new HmilyTransactionEvent();
|
||||
}
|
||||
}
|
@ -1,10 +1,11 @@
|
||||
package org.dromara.hmily.core.disruptor.handler;
|
||||
|
||||
import com.lmax.disruptor.WorkHandler;
|
||||
import org.dromara.hmily.common.bean.entity.HmilyTransaction;
|
||||
import org.dromara.hmily.common.enums.EventTypeEnum;
|
||||
import org.dromara.hmily.core.concurrent.ConsistentHashSelector;
|
||||
import org.dromara.hmily.core.coordinator.HmilyCoordinatorService;
|
||||
import org.dromara.hmily.core.disruptor.DisruptorConsumerExecutor;
|
||||
import org.dromara.hmily.core.disruptor.DisruptorConsumerFactory;
|
||||
import org.dromara.hmily.core.disruptor.event.HmilyTransactionEvent;
|
||||
|
||||
/**
|
||||
@ -12,7 +13,7 @@ import org.dromara.hmily.core.disruptor.event.HmilyTransactionEvent;
|
||||
*
|
||||
* @author xiaoyu(Myth)
|
||||
*/
|
||||
public class HmilyConsumerDataHandler implements WorkHandler<HmilyTransactionEvent> {
|
||||
public class HmilyConsumerDataHandler extends DisruptorConsumerExecutor<HmilyTransactionEvent> implements DisruptorConsumerFactory {
|
||||
|
||||
private ConsistentHashSelector executor;
|
||||
|
||||
@ -24,7 +25,17 @@ public class HmilyConsumerDataHandler implements WorkHandler<HmilyTransactionEve
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(final HmilyTransactionEvent event) {
|
||||
public String fixName() {
|
||||
return "HmilyConsumerDataHandler";
|
||||
}
|
||||
|
||||
@Override
|
||||
public DisruptorConsumerExecutor create() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executor(final HmilyTransactionEvent event) {
|
||||
String transId = event.getHmilyTransaction().getTransId();
|
||||
executor.select(transId).execute(() -> {
|
||||
if (event.getType() == EventTypeEnum.SAVE.getCode()) {
|
||||
|
@ -0,0 +1,31 @@
|
||||
package org.dromara.hmily.core.disruptor.handler;
|
||||
|
||||
import org.dromara.hmily.core.disruptor.DisruptorConsumerExecutor;
|
||||
import org.dromara.hmily.core.disruptor.DisruptorConsumerFactory;
|
||||
import org.dromara.hmily.core.service.handler.TransactionHandlerAlbum;
|
||||
|
||||
/**
|
||||
* HmilyTransactionHandler.
|
||||
* About the processing of a rotation function.
|
||||
*
|
||||
* @author chenbin sixh
|
||||
*/
|
||||
public class HmilyConsumerTransactionDataHandler extends DisruptorConsumerExecutor<TransactionHandlerAlbum> implements DisruptorConsumerFactory {
|
||||
|
||||
|
||||
@Override
|
||||
public String fixName() {
|
||||
return "HmilyConsumerTransactionDataHandler";
|
||||
}
|
||||
|
||||
@Override
|
||||
public DisruptorConsumerExecutor create() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executor(final TransactionHandlerAlbum data) {
|
||||
data.run();
|
||||
}
|
||||
}
|
||||
|
@ -19,21 +19,15 @@
|
||||
|
||||
package org.dromara.hmily.core.disruptor.publisher;
|
||||
|
||||
import com.lmax.disruptor.BlockingWaitStrategy;
|
||||
import com.lmax.disruptor.IgnoreExceptionHandler;
|
||||
import com.lmax.disruptor.RingBuffer;
|
||||
import com.lmax.disruptor.dsl.Disruptor;
|
||||
import com.lmax.disruptor.dsl.ProducerType;
|
||||
import org.dromara.hmily.common.bean.entity.HmilyTransaction;
|
||||
import org.dromara.hmily.common.config.HmilyConfig;
|
||||
import org.dromara.hmily.common.enums.EventTypeEnum;
|
||||
import org.dromara.hmily.core.concurrent.ConsistentHashSelector;
|
||||
import org.dromara.hmily.core.concurrent.SingletonExecutor;
|
||||
import org.dromara.hmily.core.coordinator.HmilyCoordinatorService;
|
||||
import org.dromara.hmily.core.disruptor.DisruptorProviderManage;
|
||||
import org.dromara.hmily.core.disruptor.event.HmilyTransactionEvent;
|
||||
import org.dromara.hmily.core.disruptor.factory.HmilyTransactionEventFactory;
|
||||
import org.dromara.hmily.core.disruptor.handler.HmilyConsumerDataHandler;
|
||||
import org.dromara.hmily.core.disruptor.translator.HmilyTransactionEventTranslator;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
@ -54,7 +48,7 @@ public class HmilyTransactionEventPublisher implements DisposableBean, Applicati
|
||||
|
||||
private static final AtomicLong INDEX = new AtomicLong(1);
|
||||
|
||||
private Disruptor<HmilyTransactionEvent> disruptor;
|
||||
private DisruptorProviderManage<HmilyTransactionEvent> disruptorProviderManage;
|
||||
|
||||
private final HmilyCoordinatorService coordinatorService;
|
||||
|
||||
@ -74,20 +68,13 @@ public class HmilyTransactionEventPublisher implements DisposableBean, Applicati
|
||||
* @param threadSize this is disruptor consumer thread size.
|
||||
*/
|
||||
private void start(final int bufferSize, final int threadSize) {
|
||||
disruptor = new Disruptor<>(new HmilyTransactionEventFactory(), bufferSize, runnable -> {
|
||||
return new Thread(new ThreadGroup("hmily-disruptor"), runnable,
|
||||
"disruptor-thread-" + INDEX.getAndIncrement());
|
||||
}, ProducerType.MULTI, new BlockingWaitStrategy());
|
||||
HmilyConsumerDataHandler[] consumers = new HmilyConsumerDataHandler[1];
|
||||
List<SingletonExecutor> selects = new ArrayList<>();
|
||||
for (int i = 0; i < threadSize; i++) {
|
||||
selects.add(new SingletonExecutor("hmily-log-disruptor" + i));
|
||||
}
|
||||
ConsistentHashSelector selector = new ConsistentHashSelector(selects);
|
||||
consumers[0] = new HmilyConsumerDataHandler(selector, coordinatorService);
|
||||
disruptor.handleEventsWithWorkerPool(consumers);
|
||||
disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
|
||||
disruptor.start();
|
||||
disruptorProviderManage = new DisruptorProviderManage<>(new HmilyConsumerDataHandler(selector, coordinatorService), 1, bufferSize);
|
||||
disruptorProviderManage.startup();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -97,13 +84,15 @@ public class HmilyTransactionEventPublisher implements DisposableBean, Applicati
|
||||
* @param type {@linkplain EventTypeEnum}
|
||||
*/
|
||||
public void publishEvent(final HmilyTransaction hmilyTransaction, final int type) {
|
||||
final RingBuffer<HmilyTransactionEvent> ringBuffer = disruptor.getRingBuffer();
|
||||
ringBuffer.publishEvent(new HmilyTransactionEventTranslator(type), hmilyTransaction);
|
||||
HmilyTransactionEvent event = new HmilyTransactionEvent();
|
||||
event.setType(type);
|
||||
event.setHmilyTransaction(hmilyTransaction);
|
||||
disruptorProviderManage.getProvider().onData(event);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
disruptor.shutdown();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1,43 +0,0 @@
|
||||
/*
|
||||
*
|
||||
* * Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* * contributor license agreements. See the NOTICE file distributed with
|
||||
* * this work for additional information regarding copyright ownership.
|
||||
* * The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* * (the "License"); you may not use this file except in compliance with
|
||||
* * the License. You may obtain a copy of the License at
|
||||
* *
|
||||
* * http://www.apache.org/licenses/LICENSE-2.0
|
||||
* *
|
||||
* * Unless required by applicable law or agreed to in writing, software
|
||||
* * distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* * See the License for the specific language governing permissions and
|
||||
* * limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package org.dromara.hmily.core.disruptor.translator;
|
||||
|
||||
import com.lmax.disruptor.EventTranslatorOneArg;
|
||||
import org.dromara.hmily.common.bean.entity.HmilyTransaction;
|
||||
import org.dromara.hmily.core.disruptor.event.HmilyTransactionEvent;
|
||||
|
||||
/**
|
||||
* EventTranslator.
|
||||
* @author xiaoyu(Myth)
|
||||
*/
|
||||
public class HmilyTransactionEventTranslator implements EventTranslatorOneArg<HmilyTransactionEvent, HmilyTransaction> {
|
||||
|
||||
private int type;
|
||||
|
||||
public HmilyTransactionEventTranslator(final int type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void translateTo(final HmilyTransactionEvent hmilyTransactionEvent, final long l, final HmilyTransaction hmilyTransaction) {
|
||||
hmilyTransactionEvent.setHmilyTransaction(hmilyTransaction);
|
||||
hmilyTransactionEvent.setType(type);
|
||||
}
|
||||
}
|
@ -23,7 +23,8 @@ import org.dromara.hmily.common.bean.entity.HmilyTransaction;
|
||||
import org.dromara.hmily.common.config.HmilyConfig;
|
||||
import org.dromara.hmily.common.enums.HmilyActionEnum;
|
||||
import org.dromara.hmily.core.concurrent.threadlocal.HmilyTransactionContextLocal;
|
||||
import org.dromara.hmily.core.concurrent.threadpool.HmilyThreadFactory;
|
||||
import org.dromara.hmily.core.disruptor.DisruptorProviderManage;
|
||||
import org.dromara.hmily.core.disruptor.handler.HmilyConsumerTransactionDataHandler;
|
||||
import org.dromara.hmily.core.service.HmilyTransactionHandler;
|
||||
import org.dromara.hmily.core.service.executor.HmilyTransactionExecutor;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@ -31,11 +32,6 @@ import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.context.event.ContextRefreshedEvent;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* this is hmily transaction starter.
|
||||
*
|
||||
@ -46,10 +42,10 @@ public class StarterHmilyTransactionHandler implements HmilyTransactionHandler,
|
||||
|
||||
private final HmilyTransactionExecutor hmilyTransactionExecutor;
|
||||
|
||||
private Executor executor;
|
||||
|
||||
private final HmilyConfig hmilyConfig;
|
||||
|
||||
private DisruptorProviderManage<TransactionHandlerAlbum> disruptorProviderManage;
|
||||
|
||||
/**
|
||||
* Instantiates a new Starter hmily transaction handler.
|
||||
*
|
||||
@ -57,7 +53,7 @@ public class StarterHmilyTransactionHandler implements HmilyTransactionHandler,
|
||||
* @param hmilyConfig the hmily config
|
||||
*/
|
||||
@Autowired
|
||||
public StarterHmilyTransactionHandler(final HmilyTransactionExecutor hmilyTransactionExecutor, HmilyConfig hmilyConfig) {
|
||||
public StarterHmilyTransactionHandler(final HmilyTransactionExecutor hmilyTransactionExecutor, final HmilyConfig hmilyConfig) {
|
||||
this.hmilyTransactionExecutor = hmilyTransactionExecutor;
|
||||
this.hmilyConfig = hmilyConfig;
|
||||
}
|
||||
@ -76,13 +72,12 @@ public class StarterHmilyTransactionHandler implements HmilyTransactionHandler,
|
||||
} catch (Throwable throwable) {
|
||||
//if exception ,execute cancel
|
||||
final HmilyTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction();
|
||||
executor.execute(() -> hmilyTransactionExecutor
|
||||
.cancel(currentTransaction));
|
||||
disruptorProviderManage.getProvider().onData(() -> hmilyTransactionExecutor.cancel(currentTransaction));
|
||||
throw throwable;
|
||||
}
|
||||
//execute confirm
|
||||
final HmilyTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction();
|
||||
executor.execute(() -> hmilyTransactionExecutor.confirm(currentTransaction));
|
||||
disruptorProviderManage.getProvider().onData(() -> hmilyTransactionExecutor.confirm(currentTransaction));
|
||||
} finally {
|
||||
HmilyTransactionContextLocal.getInstance().remove();
|
||||
hmilyTransactionExecutor.remove();
|
||||
@ -91,13 +86,12 @@ public class StarterHmilyTransactionHandler implements HmilyTransactionHandler,
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(ContextRefreshedEvent event) {
|
||||
public void onApplicationEvent(final ContextRefreshedEvent event) {
|
||||
if (hmilyConfig.getStarted()) {
|
||||
executor = new ThreadPoolExecutor(hmilyConfig.getAsyncThreads(),
|
||||
hmilyConfig.getAsyncThreads(), 0, TimeUnit.MILLISECONDS,
|
||||
new LinkedBlockingQueue<>(),
|
||||
HmilyThreadFactory.create("hmily-execute", false),
|
||||
new ThreadPoolExecutor.AbortPolicy());
|
||||
disruptorProviderManage = new DisruptorProviderManage<>(new HmilyConsumerTransactionDataHandler(),
|
||||
hmilyConfig.getAsyncThreads(),
|
||||
DisruptorProviderManage.DEFAULT_SIZE);
|
||||
disruptorProviderManage.startup();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,15 @@
|
||||
package org.dromara.hmily.core.service.handler;
|
||||
|
||||
/**
|
||||
* TransactionHandlerAlbum.
|
||||
* 2019/4/12 15:57
|
||||
*
|
||||
* @author chenbin sixh
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface TransactionHandlerAlbum {
|
||||
/**
|
||||
* Run.
|
||||
*/
|
||||
void run();
|
||||
}
|
Loading…
Reference in New Issue
Block a user