mirror of
https://gitee.com/dromara/hmily.git
synced 2024-12-02 03:08:12 +08:00
mod tars structure.add ConsistentHashLoadBalance. (#204)
* add tars springboot demo * add tars springboot ide config file * add hmily-tars module * add hmily tars demo * support spring and springboot * mod tars demo * mod tars structure.add ConsistentHashLoadBalance. Co-authored-by: duojiao <yudong.tang@perfma.com>
This commit is contained in:
parent
478d23d283
commit
d9257c057b
@ -0,0 +1,144 @@
|
||||
package org.dromara.hmily.tars.loadbalance;
|
||||
|
||||
import com.qq.tars.client.ServantProxyConfig;
|
||||
import com.qq.tars.client.cluster.ServantInvokerAliveChecker;
|
||||
import com.qq.tars.client.cluster.ServantInvokerAliveStat;
|
||||
import com.qq.tars.client.rpc.InvokerComparator;
|
||||
import com.qq.tars.client.rpc.loadbalance.LoadBalanceHelper;
|
||||
import com.qq.tars.common.util.CollectionUtils;
|
||||
import com.qq.tars.common.util.Constants;
|
||||
import com.qq.tars.common.util.StringUtils;
|
||||
import com.qq.tars.rpc.common.InvokeContext;
|
||||
import com.qq.tars.rpc.common.Invoker;
|
||||
import com.qq.tars.rpc.common.LoadBalance;
|
||||
import com.qq.tars.rpc.common.exc.NoInvokerException;
|
||||
import com.qq.tars.support.log.LoggerFactory;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.SortedMap;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
|
||||
/**
|
||||
* add HmilyConsistentHashLoadBalance.
|
||||
*
|
||||
* @author tydhot
|
||||
*/
|
||||
public class HmilyConsistentHashLoadBalance<T> implements LoadBalance<T> {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getClientLogger();
|
||||
|
||||
private final ServantProxyConfig config;
|
||||
|
||||
private final InvokerComparator comparator = new InvokerComparator();
|
||||
|
||||
private volatile ConcurrentSkipListMap<Long, Invoker<T>> conHashInvokersCache;
|
||||
|
||||
private volatile List<Invoker<T>> sortedInvokersCache;
|
||||
|
||||
public HmilyConsistentHashLoadBalance(final ServantProxyConfig config) {
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
/**
|
||||
* Use load balancing to select invoker.
|
||||
*
|
||||
* @param invocation invocation
|
||||
* @return Invoker
|
||||
* @throws NoInvokerException NoInvokerException
|
||||
*/
|
||||
@Override
|
||||
public Invoker<T> select(final InvokeContext invocation) throws NoInvokerException {
|
||||
long consistentHash = Math.abs(StringUtils.convertLong(invocation.getAttachment(Constants.TARS_CONSISTENT_HASH), 0));
|
||||
consistentHash = consistentHash & 0xFFFFFFFFL;
|
||||
|
||||
ConcurrentSkipListMap<Long, Invoker<T>> conHashInvokers = conHashInvokersCache;
|
||||
if (conHashInvokers != null && !conHashInvokers.isEmpty()) {
|
||||
if (!conHashInvokers.containsKey(consistentHash)) {
|
||||
SortedMap<Long, Invoker<T>> tailMap = conHashInvokers.tailMap(consistentHash);
|
||||
if (tailMap.isEmpty()) {
|
||||
consistentHash = conHashInvokers.firstKey();
|
||||
} else {
|
||||
consistentHash = tailMap.firstKey();
|
||||
}
|
||||
}
|
||||
|
||||
Invoker<T> invoker = conHashInvokers.get(consistentHash);
|
||||
if (invoker.isAvailable()) {
|
||||
return invoker;
|
||||
}
|
||||
|
||||
ServantInvokerAliveStat stat = ServantInvokerAliveChecker.get(invoker.getUrl());
|
||||
if (stat.isAlive() || (stat.getLastRetryTime() + (config.getTryTimeInterval() * 1000)) < System.currentTimeMillis()) {
|
||||
LOGGER.info("try to use inactive invoker|" + invoker.getUrl().toIdentityString());
|
||||
stat.setLastRetryTime(System.currentTimeMillis());
|
||||
return invoker;
|
||||
}
|
||||
}
|
||||
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug(config.getSimpleObjectName() + " can't find active invoker using consistent hash loadbalance. try to use normal hash");
|
||||
}
|
||||
|
||||
List<Invoker<T>> sortedInvokers = sortedInvokersCache;
|
||||
if (sortedInvokers == null || sortedInvokers.isEmpty()) {
|
||||
throw new NoInvokerException("no such active connection invoker");
|
||||
}
|
||||
|
||||
List<Invoker<T>> list = new ArrayList<Invoker<T>>();
|
||||
for (Invoker<T> invoker : sortedInvokers) {
|
||||
if (!invoker.isAvailable()) {
|
||||
//Shield then call
|
||||
ServantInvokerAliveStat stat = ServantInvokerAliveChecker.get(invoker.getUrl());
|
||||
if (stat.isAlive() || (stat.getLastRetryTime() + (config.getTryTimeInterval() * 1000)) < System.currentTimeMillis()) {
|
||||
list.add(invoker);
|
||||
}
|
||||
} else {
|
||||
list.add(invoker);
|
||||
}
|
||||
}
|
||||
//TODO When all is not available. Whether to randomly extract one
|
||||
if (list.isEmpty()) {
|
||||
throw new NoInvokerException(config.getSimpleObjectName() + " try to select active invoker, size=" + sortedInvokers.size() + ", no such active connection invoker");
|
||||
}
|
||||
|
||||
Invoker<T> invoker = list.get((int) (consistentHash % list.size()));
|
||||
|
||||
if (!invoker.isAvailable()) {
|
||||
LOGGER.info("try to use inactive invoker|" + invoker.getUrl().toIdentityString());
|
||||
ServantInvokerAliveChecker.get(invoker.getUrl()).setLastRetryTime(System.currentTimeMillis());
|
||||
}
|
||||
|
||||
return HmilyLoadBalanceUtils.doSelect(invoker, sortedInvokersCache);
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh local invoker.
|
||||
*
|
||||
* @param invokers invokers
|
||||
*/
|
||||
@Override
|
||||
public void refresh(final Collection<Invoker<T>> invokers) {
|
||||
LOGGER.info(config.getSimpleObjectName() + " try to refresh ConsistentHashLoadBalance's invoker cache, size=" + (invokers == null || invokers.isEmpty() ? 0 : invokers.size()));
|
||||
if (CollectionUtils.isEmpty(invokers)) {
|
||||
sortedInvokersCache = null;
|
||||
conHashInvokersCache = null;
|
||||
return;
|
||||
}
|
||||
|
||||
List<Invoker<T>> sortedInvokersTmp = new ArrayList<>(invokers);
|
||||
sortedInvokersTmp.sort(comparator);
|
||||
|
||||
sortedInvokersCache = sortedInvokersTmp;
|
||||
|
||||
ConcurrentSkipListMap<Long, Invoker<T>> concurrentSkipListMap = new ConcurrentSkipListMap<Long, Invoker<T>>();
|
||||
LoadBalanceHelper.buildConsistentHashCircle(sortedInvokersTmp, config).forEach(concurrentSkipListMap::put);
|
||||
conHashInvokersCache = concurrentSkipListMap;
|
||||
|
||||
LOGGER.info(config.getSimpleObjectName() + " refresh ConsistentHashLoadBalance's invoker cache done, conHashInvokersCache size="
|
||||
+ (conHashInvokersCache == null || conHashInvokersCache.isEmpty() ? 0 : conHashInvokersCache.size())
|
||||
+ ", sortedInvokersCache size=" + (sortedInvokersCache == null || sortedInvokersCache.isEmpty() ? 0 : sortedInvokersCache.size()));
|
||||
}
|
||||
}
|
@ -27,6 +27,10 @@ public class HmilyLoadBalance<T> implements LoadBalance<T> {
|
||||
|
||||
private final Object hashLoadBalanceLock = new Object();
|
||||
|
||||
private volatile HmilyConsistentHashLoadBalance<T> consistentHashLoadBalance;
|
||||
|
||||
private final Object consistentHashLoadBalanceLock = new Object();
|
||||
|
||||
public HmilyLoadBalance(final HmilyRoundRobinLoadBalance hmilyRoundRobinLoadBalance, final ServantProxyConfig config) {
|
||||
this.hmilyRoundRobinLoadBalance = hmilyRoundRobinLoadBalance;
|
||||
this.config = config;
|
||||
@ -35,6 +39,20 @@ public class HmilyLoadBalance<T> implements LoadBalance<T> {
|
||||
@Override
|
||||
public Invoker<T> select(final InvokeContext invocation) throws NoInvokerException {
|
||||
long hash = Math.abs(StringUtils.convertLong(invocation.getAttachment(Constants.TARS_HASH), 0));
|
||||
long consistentHash = Math.abs(StringUtils.convertLong(invocation.getAttachment(Constants.TARS_CONSISTENT_HASH), 0));
|
||||
|
||||
if (consistentHash > 0) {
|
||||
if (consistentHashLoadBalance == null) {
|
||||
synchronized (consistentHashLoadBalanceLock) {
|
||||
if (consistentHashLoadBalance == null) {
|
||||
HmilyConsistentHashLoadBalance<T> tmp = new HmilyConsistentHashLoadBalance<T>(config);
|
||||
tmp.refresh(lastRefreshInvokers);
|
||||
consistentHashLoadBalance = tmp;
|
||||
}
|
||||
}
|
||||
}
|
||||
return consistentHashLoadBalance.select(invocation);
|
||||
}
|
||||
|
||||
if (hash > 0) {
|
||||
if (hashLoadBalance == null) {
|
||||
@ -62,6 +80,12 @@ public class HmilyLoadBalance<T> implements LoadBalance<T> {
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (consistentHashLoadBalanceLock) {
|
||||
if (consistentHashLoadBalance != null) {
|
||||
consistentHashLoadBalance.refresh(invokers);
|
||||
}
|
||||
}
|
||||
|
||||
hmilyRoundRobinLoadBalance.refresh(invokers);
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
package org.dromara.hmily.tars.spring;
|
||||
package org.dromara.hmily.tars.startup;
|
||||
|
||||
import com.qq.tars.client.Communicator;
|
||||
import org.dromara.hmily.tars.spring.TarsHmilyCommunicatorBeanPostProcessor;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
@ -1,4 +1,4 @@
|
||||
package org.dromara.hmily.tars.spring;
|
||||
package org.dromara.hmily.tars.startup;
|
||||
|
||||
import com.qq.tars.common.FilterKind;
|
||||
import com.qq.tars.server.core.AppContextManager;
|
@ -1,2 +1,2 @@
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
org.dromara.hmily.tars.spring.TarsHmilyConfiguration
|
||||
org.dromara.hmily.tars.startup.TarsHmilyConfiguration
|
Loading…
Reference in New Issue
Block a user