!9 udp多播v1.0

Merge pull request !9 from Davis/meng
This commit is contained in:
tianyaleixiaowu 2022-11-15 11:36:27 +00:00 committed by Gitee
commit a8109f4b75
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
9 changed files with 136 additions and 19 deletions

View File

@ -3,10 +3,12 @@ package com.jd.platform.jlog.client;
import com.alibaba.fastjson.JSON;
import com.jd.platform.jlog.client.mdc.Mdc;
import com.jd.platform.jlog.client.modeholder.ModeHolder;
import com.jd.platform.jlog.client.task.Monitor;
import com.jd.platform.jlog.client.udp.HttpSender;
import com.jd.platform.jlog.client.udp.UdpClient;
import com.jd.platform.jlog.client.udp.UdpSender;
import com.jd.platform.jlog.common.constant.SendMode;
import com.jd.platform.jlog.common.handler.TagConfig;
import com.jd.platform.jlog.core.ClientHandlerBuilder;
import com.jd.platform.jlog.core.Configurator;
@ -37,7 +39,7 @@ public class TracerClientStarter {
*/
private TagConfig tagConfig;
private SendMode sendMode;
/**
* TracerClientStarter
*/
@ -52,6 +54,7 @@ public class TracerClientStarter {
private String appName;
private Mdc mdc;
private TagConfig tagConfig;
private SendMode sendMode;
public Builder() {
}
@ -66,6 +69,11 @@ public class TracerClientStarter {
return this;
}
public Builder setSendMode(SendMode sendMode) {
this.sendMode = sendMode;
return this;
}
public Builder setTagConfig(TagConfig tagConfig) {
this.tagConfig = tagConfig;
return this;
@ -75,6 +83,7 @@ public class TracerClientStarter {
TracerClientStarter tracerClientStarter = new TracerClientStarter(appName);
tracerClientStarter.tagConfig = tagConfig;
tracerClientStarter.mdc = mdc;
tracerClientStarter.sendMode=sendMode;
return tracerClientStarter;
}
}
@ -88,6 +97,8 @@ public class TracerClientStarter {
Context.MDC = mdc;
ModeHolder.setSendMode(this.sendMode);
Monitor starter = new Monitor();
starter.start();

View File

@ -0,0 +1,32 @@
package com.jd.platform.jlog.client.modeholder;
import com.alibaba.ttl.TransmittableThreadLocal;
import com.jd.platform.jlog.common.constant.SendMode;
/**
* 线程间传递通讯模式单播多播
*/
public class ModeHolder {
/**
* 用于在线程池间也能透传SendMode
*/
private static TransmittableThreadLocal<SendMode> context = new TransmittableThreadLocal<>();
/**
* 设置SendMode到线程里
*/
public static void setSendMode(SendMode mode) {
context.set(mode);
}
/**
* 如果没有SendMode说明没设置上则返回一个默认值默认是单播模式
*/
public static SendMode getSendMode() {
try {
return context.get();
} catch (Exception e) {
return new SendMode();
}
}
}

View File

@ -1,6 +1,7 @@
package com.jd.platform.jlog.client.udp;
import com.jd.platform.jlog.client.Context;
import com.jd.platform.jlog.client.modeholder.ModeHolder;
import com.jd.platform.jlog.client.worker.WorkerInfoHolder;
import com.jd.platform.jlog.common.constant.Constant;
import com.jd.platform.jlog.common.model.TracerData;
@ -91,15 +92,19 @@ public class UdpClient {
ByteBuf buf = channelHandlerContext.alloc().buffer(compressBytes.length);
buf.writeBytes(compressBytes);
//挑一个worker
String workerIpPort = WorkerInfoHolder.chooseWorker();
if (workerIpPort == null) {
return;
InetSocketAddress remoteAddress=null;
if(ModeHolder.getSendMode().getUnicast()){
//挑选worker
String workerIpPort = WorkerInfoHolder.chooseWorker();
if (workerIpPort == null) {
return;
}
String[] ipPort = workerIpPort.split(Constant.SPLITER);
//发往worker的ip
remoteAddress= new InetSocketAddress(ipPort[0], Integer.valueOf(ipPort[1]));
}else{
remoteAddress=tracerData.getAddress();
}
String[] ipPort = workerIpPort.split(Constant.SPLITER);
//发往worker的ip
InetSocketAddress remoteAddress = new InetSocketAddress(ipPort[0], Integer.valueOf(ipPort[1]));
DatagramPacket packet = new DatagramPacket(buf, remoteAddress);
list.add(packet);
}

View File

@ -1,14 +1,20 @@
package com.jd.platform.jlog.client.udp;
import com.jd.platform.jlog.client.Context;
import com.jd.platform.jlog.client.modeholder.ModeHolder;
import com.jd.platform.jlog.client.worker.WorkerInfoHolder;
import com.jd.platform.jlog.common.constant.Constant;
import com.jd.platform.jlog.common.model.RunLogMessage;
import com.jd.platform.jlog.common.model.TracerBean;
import com.jd.platform.jlog.common.model.TracerData;
import com.jd.platform.jlog.common.utils.AsyncPool;
import com.jd.platform.jlog.common.utils.AsyncWorker;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.DatagramPacket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@ -140,9 +146,23 @@ public class UdpSender {
/**
* 往worker发traceBean
*/
private static void send(List<TracerBean> tracerBeans) {
private static void send(List<TracerBean> tracerBeans) throws InterruptedException {
TracerData tracerData = new TracerData();
tracerData.setTracerBeanList(tracerBeans);
Context.CHANNEL.writeAndFlush(tracerData);
if(!ModeHolder.getSendMode().getUnicast()){
List<String>ips= WorkerInfoHolder.selectWorkers();
for(String ip:ips){
String[] ipPort = ip.split(Constant.SPLITER);
//发往worker的ip
InetSocketAddress remoteAddress = new InetSocketAddress(ipPort[0], Integer.valueOf(ipPort[1]));
tracerData.setAddress(remoteAddress);
ChannelFuture future = Context.CHANNEL.writeAndFlush(tracerData);
//同步操作否则会出现bug
future.sync();
}
return;
}else {
Context.CHANNEL.writeAndFlush(tracerData);
}
}
}

View File

@ -1,6 +1,6 @@
package com.jd.platform.jlog.client.worker;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@ -33,12 +33,12 @@ public class WorkerInfoHolder {
if (size == 0) {
return workerIp;
}
//按本机ip对worker数量进行hash
// int index = Math.abs(IpUtils.getIp().hashCode() % size);
if (index >= WORKER_HOLDER.size()) {
index = 0;
}
try {
workerIp = WORKER_HOLDER.get(index);
} catch (Exception e) {
@ -49,7 +49,15 @@ public class WorkerInfoHolder {
return workerIp;
}
//多播模式 返回所有注册在注册中心为Work的地址
public static List<String> selectWorkers(){
List<String>defaultIps=new ArrayList<>();
defaultIps.add("127.0.0.1:9999");
if(WORKER_HOLDER.size()==0){
return defaultIps;
}
return WORKER_HOLDER;
}
/**
* 监听到worker信息变化后
* 将新的worker信息和当前的进行合并并且连接新的address

View File

@ -0,0 +1,19 @@
package com.jd.platform.jlog.common.constant;
import java.io.Serializable;
//通讯方式实体类
public class SendMode implements Serializable {
//true为单播false为多播
private Boolean unicast=true;
public SendMode() {
}
public boolean getUnicast() {
return unicast;
}
public void setUnicast(boolean unicast) {
this.unicast = unicast;
}
}

View File

@ -1,6 +1,7 @@
package com.jd.platform.jlog.common.model;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.List;
/**
@ -15,12 +16,17 @@ public class TracerData implements Serializable {
*/
private List<TracerBean> tracerBeanList;
@Override
private transient InetSocketAddress address;
/*@Override
public String toString() {
return "TracerData{" +
"tracerBeanList=" + tracerBeanList +
'}';
}
}*/
public InetSocketAddress getAddress() { return address; }
public void setAddress(InetSocketAddress address) { this.address = address; }
public List<TracerBean> getTracerBeanList() {
return tracerBeanList;

View File

@ -2,6 +2,7 @@ package com.jd.platform.jlog.clientdemo.custom;
import com.jd.platform.jlog.client.TracerClientStarter;
import com.jd.platform.jlog.client.filter.HttpFilter;
import com.jd.platform.jlog.common.constant.SendMode;
import com.jd.platform.jlog.common.handler.TagConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -24,6 +25,16 @@ public class Starter {
private Logger logger = LoggerFactory.getLogger(getClass());
private TagConfig tagConfig ;
//通讯方式单播多播默认是单播
private static SendMode sendMode=new SendMode();
public void setSendMode(SendMode sendMode) {
this.sendMode = sendMode;
}
public SendMode getSendMode() {
return sendMode;
}
public TagConfig getTagConfig() {
return tagConfig;
@ -38,18 +49,16 @@ public class Starter {
TracerClientStarter tracerClientStarter = new TracerClientStarter.Builder()
.setAppName("demo")
.setTagConfig(tagConfig)
.setSendMode(sendMode)
.build();
logger.info("初始化tagConfig: {}",tagConfig);
tracerClientStarter.startPipeline();
}
@Bean
public FilterRegistrationBean urlFilter() {
FilterRegistrationBean registration = new FilterRegistrationBean();
HttpFilter userFilter = new HttpFilter();
registration.setFilter(userFilter);
registration.addUrlPatterns("/*");
registration.setName("UserTraceFilter");

View File

@ -9,6 +9,10 @@ apollo.meta=http://127.0.0.1:8080
apollo.config-service=http://127.0.0.1:8080
app.id=order
#设置通讯方式(单播或者多播),不设置则为单播(默认)
#单播为true多播为false
#send-mode.unicast=true
tag-config.reqTags[0]=uid
tag-config.reqTags[1]=url
@ -25,3 +29,6 @@ tag-config.extract=41
compress=68
threshold=10
#worker的地址
#workers=['127.0.0.1:9999','127.0.0.1:10000']