Merge remote-tracking branch 'origin/master-zk' into multi_configuration

This commit is contained in:
liyunfeng 2022-02-09 19:17:15 +08:00
commit 7e73c054e1
9 changed files with 208 additions and 17 deletions

View File

@ -3,6 +3,7 @@ package com.jd.platform.jlog.client.etcd;
import com.jd.platform.jlog.common.config.IConfigCenter;
import com.jd.platform.jlog.common.config.etcd.JdEtcdBuilder;
import com.jd.platform.jlog.common.config.zookeeper.ZkBuilder;
/**
* @author wuweifeng wrote on 2020-01-07
@ -19,6 +20,7 @@ public class EtcdConfigFactory {
public static void buildConfigCenter(String etcdServer) {
//连接多个时逗号分隔
configCenter = JdEtcdBuilder.build(etcdServer);
//configCenter = JdEtcdBuilder.build(etcdServer);
configCenter = ZkBuilder.build(etcdServer);
}
}

View File

@ -57,27 +57,27 @@ public class EtcdStarter {
private void fetch() {
IConfigCenter configCenter = EtcdConfigFactory.configCenter();
//获取所有worker的ip
List<KeyValue> keyValues = null;
List<String> keys = null;
try {
//如果设置了机房属性则拉取同机房的worker如果同机房没worker则拉取所有
if (Context.MDC != null) {
String mdc = parseMdc(Context.MDC);
keyValues = configCenter.getPrefix(Constant.WORKER_PATH + Context.APP_NAME + "/" + mdc);
keys = configCenter.getPrefixKey(Constant.WORKER_PATH + Context.APP_NAME + "/" + mdc);
}
if (CollectionUtil.isEmpty(keyValues)) {
keyValues = configCenter.getPrefix(Constant.WORKER_PATH + Context.APP_NAME);
if (CollectionUtil.isEmpty(keys)) {
keys = configCenter.getPrefixKey(Constant.WORKER_PATH + Context.APP_NAME);
}
//全是空给个警告
if (CollectionUtil.isEmpty(keyValues)) {
if (CollectionUtil.isEmpty(keys)) {
logger.warn("very important warn !!! workers ip info is null!!!");
}
List<String> addresses = new ArrayList<>();
if (keyValues != null) {
for (KeyValue keyValue : keyValues) {
if (keys != null) {
for (String key : keys) {
//value里放的是ip地址
String ipPort = keyValue.getValue().toStringUtf8();
String ipPort = configCenter.get(key);
addresses.add(ipPort);
}
}

View File

@ -21,9 +21,38 @@
<hutool.version>5.1.0</hutool.version>
<hp-etcd.version>0.0.16</hp-etcd.version>
<protostuff.version>1.7.2</protostuff.version>
<zookeeper.version>3.4.14</zookeeper.version>
</properties>
<dependencies>
<!-- zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- curator-framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>

View File

@ -45,10 +45,9 @@ public interface IConfigCenter {
String get(String key);
/**
* 获取指定前缀的所有key-value
* 获取指定前缀的所有key
*/
List<KeyValue> getPrefix(String key);
List<String> getPrefixKey(String key);
/**
* 监听key
*/

View File

@ -12,8 +12,10 @@ import com.ibm.etcd.client.lease.PersistentLease;
import com.ibm.etcd.client.lock.LockClient;
import com.jd.platform.jlog.common.config.IConfigCenter;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static java.util.concurrent.TimeUnit.SECONDS;
@ -115,10 +117,18 @@ public class JdEtcdClient implements IConfigCenter {
return keyValues.get(0);
}
// @Override
// public List<KeyValue> getPrefix(String key) {
// RangeResponse rangeResponse = kvClient.get(ByteString.copyFromUtf8(key)).asPrefix().sync();
// return rangeResponse.getKvsList();
// }
@Override
public List<KeyValue> getPrefix(String key) {
public List<String> getPrefixKey(String key) {
RangeResponse rangeResponse = kvClient.get(ByteString.copyFromUtf8(key)).asPrefix().sync();
return rangeResponse.getKvsList();
return rangeResponse.getKvsList().stream().map(
kv -> kv.getKey().toStringUtf8()
).collect(Collectors.toList());
}
@Override

View File

@ -0,0 +1,118 @@
package com.jd.platform.jlog.common.config.zookeeper;
import com.ibm.etcd.api.KeyValue;
import com.ibm.etcd.client.kv.KvClient;
import com.jd.platform.jlog.common.config.IConfigCenter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import java.util.List;
/**
* @author shenkaiwen5
* @version 1.0
* @date 2022-01-06
*/
public class ZKClient implements IConfigCenter {
private CuratorFramework curator;
public ZKClient(CuratorFramework curatorFramework) {
this.curator = curatorFramework;
}
@Override
public void put(String key, String value) {
try {
curator.create().creatingParentsIfNeeded() // 若创建节点的父节点不存在则先创建父节点再创建子节点
.withMode(CreateMode.EPHEMERAL) // 创建的是临时节点
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) // 默认匿名权限,权限scheme id:'world,'anyone,:cdrwa
.forPath(key, value.getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void put(String key, String value, long leaseId) {
}
@Override
public void revoke(long leaseId) {
}
@Override
public long putAndGrant(String key, String value, long ttl) {
return 0;
}
@Override
public long setLease(String key, long leaseId) {
return 0;
}
@Override
public void delete(String key) {
}
@Override
public String get(String key) {
try {
return curator.getData().forPath(key).toString();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
@Override
public List<String> getPrefixKey(String key) {
try {
return curator.getChildren().forPath(key);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
@Override
public KvClient.WatchIterator watch(String key) {
return null;
}
@Override
public KvClient.WatchIterator watchPrefix(String key) {
return null;
}
@Override
public long keepAlive(String key, String value, int frequencySecs, int minTtl) throws Exception {
return 0;
}
@Override
public long buildAliveLease(int frequencySecs, int minTtl) throws Exception {
return 0;
}
@Override
public long buildNormalLease(long ttl) {
return 0;
}
@Override
public long timeToLive(long leaseId) {
return 0;
}
@Override
public KeyValue getKv(String key) {
return null;
}
}

View File

@ -0,0 +1,29 @@
package com.jd.platform.jlog.common.config.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
* @author shenkaiwen5
* @version 1.0
* @date 2022-01-06
*/
public class ZkBuilder {
/**
* 构建ZKClient
*/
public static ZKClient build(String endPoints) {
CuratorFramework client = CuratorFrameworkFactory.builder().connectString(endPoints)
// 连接超时时间
.sessionTimeoutMs(1000)
// 会话超时时间
.connectionTimeoutMs(1000)
// 刚开始重试间隔为1秒之后重试间隔逐渐增加最多重试不超过三次
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
return new ZKClient(client);
}
}

View File

@ -2,6 +2,7 @@ package com.jd.platform.jlog.worker.config;
import com.jd.platform.jlog.common.config.IConfigCenter;
import com.jd.platform.jlog.common.config.etcd.JdEtcdBuilder;
import com.jd.platform.jlog.common.config.zookeeper.ZkBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
@ -25,7 +26,8 @@ public class EtcdConfig {
public IConfigCenter client() {
logger.info("etcd address : " + etcdServer);
//连接多个时逗号分隔
return JdEtcdBuilder.build(etcdServer);
//return JdEtcdBuilder.build(etcdServer);
return ZkBuilder.build(etcdServer);
}
}

View File

@ -50,8 +50,10 @@ public class EtcdStarter {
scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
configCenter.putAndGrant(buildKey(), buildValue(), 8);
configCenter.putAndGrant(buildSecondKey(), buildValue(), 8);
//configCenter.putAndGrant(buildKey(), buildValue(), 8);
//configCenter.putAndGrant(buildSecondKey(), buildValue(), 8);
configCenter.put(buildKey(), buildValue());
configCenter.put(buildSecondKey(), buildValue());
} catch (Exception e) {
//do nothing
e.printStackTrace();