Merge remote-tracking branch 'origin/master'

This commit is contained in:
qiwei 2023-08-29 13:41:56 +08:00
commit 4b88b9e607
39 changed files with 848 additions and 226 deletions

2
.gitignore vendored
View File

@ -27,3 +27,5 @@ HELP.md
### VS Code ###
.vscode/
/log/

29
Dashboard/.gitignore vendored Normal file
View File

@ -0,0 +1,29 @@
HELP.md
/target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
/build/
### VS Code ###
.vscode/

View File

@ -1,6 +1,5 @@
package com.jd.platform.jlog.dashboard.entity;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;

View File

@ -1,6 +1,5 @@
package com.jd.platform.jlog.dashboard.entity;
import lombok.Data;
/**
* @author shenkaiwen5

View File

@ -1,6 +1,5 @@
package com.jd.platform.jlog.dashboard.model;
import lombok.Data;
/**
* 查询条件对象

View File

@ -1,6 +1,5 @@
package com.jd.platform.jlog.dashboard.model;
import lombok.Data;
/**
* 单个查询条件

View File

@ -193,7 +193,10 @@
},
{
field: 'errmsg',
title: '错误信息'
title: '错误信息',
formatter: function(value, row, index) {
return $.table.tooltip(value, 100, "open");
}
},
{
field: 'createTime',

29
client/.gitignore vendored Normal file
View File

@ -0,0 +1,29 @@
HELP.md
/target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
/build/
### VS Code ###
.vscode/

View File

@ -3,10 +3,12 @@ package com.jd.platform.jlog.client;
import com.alibaba.fastjson.JSON;
import com.jd.platform.jlog.client.mdc.Mdc;
import com.jd.platform.jlog.client.modeholder.ModeHolder;
import com.jd.platform.jlog.client.task.Monitor;
import com.jd.platform.jlog.client.udp.HttpSender;
import com.jd.platform.jlog.client.udp.UdpClient;
import com.jd.platform.jlog.client.udp.UdpSender;
import com.jd.platform.jlog.common.constant.SendMode;
import com.jd.platform.jlog.common.handler.TagConfig;
import com.jd.platform.jlog.core.ClientHandlerBuilder;
import com.jd.platform.jlog.core.Configurator;
@ -37,7 +39,7 @@ public class TracerClientStarter {
*/
private TagConfig tagConfig;
private SendMode sendMode;
/**
* TracerClientStarter
*/
@ -52,6 +54,7 @@ public class TracerClientStarter {
private String appName;
private Mdc mdc;
private TagConfig tagConfig;
private SendMode sendMode;
public Builder() {
}
@ -66,6 +69,11 @@ public class TracerClientStarter {
return this;
}
public Builder setSendMode(SendMode sendMode) {
this.sendMode = sendMode;
return this;
}
public Builder setTagConfig(TagConfig tagConfig) {
this.tagConfig = tagConfig;
return this;
@ -75,6 +83,7 @@ public class TracerClientStarter {
TracerClientStarter tracerClientStarter = new TracerClientStarter(appName);
tracerClientStarter.tagConfig = tagConfig;
tracerClientStarter.mdc = mdc;
tracerClientStarter.sendMode=sendMode;
return tracerClientStarter;
}
}
@ -88,6 +97,8 @@ public class TracerClientStarter {
Context.MDC = mdc;
ModeHolder.setSendMode(this.sendMode);
Monitor starter = new Monitor();
starter.start();

View File

@ -2,13 +2,13 @@ package com.jd.platform.jlog.client.filter;
import com.jd.platform.jlog.client.Context;
import com.jd.platform.jlog.client.cache.ExtParamFactory;
import com.jd.platform.jlog.client.log.LogExceptionStackTrace;
import com.jd.platform.jlog.client.percent.DefaultTracerPercentImpl;
import com.jd.platform.jlog.client.percent.ITracerPercent;
import com.jd.platform.jlog.client.tracerholder.TracerHolder;
import com.jd.platform.jlog.client.udp.UdpSender;
import com.jd.platform.jlog.common.handler.CompressHandler.Outcome;
import com.jd.platform.jlog.common.model.TracerBean;
import com.jd.platform.jlog.common.utils.CollectionUtil;
import com.jd.platform.jlog.common.handler.CompressHandler.Outcome;
import com.jd.platform.jlog.common.utils.IdWorker;
import com.jd.platform.jlog.common.utils.IpUtils;
import com.jd.platform.jlog.core.ClientHandler;
@ -18,13 +18,11 @@ import org.slf4j.LoggerFactory;
import javax.servlet.*;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.*;
import static com.jd.platform.jlog.common.constant.Constant.REQ;
import static com.jd.platform.jlog.common.constant.Constant.RESP;
/**
* HttpFilter
@ -64,74 +62,64 @@ public class HttpFilter implements Filter {
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletResponse resp = (HttpServletResponse) servletResponse;
RequestWrapper requestWrapper = new RequestWrapper((HttpServletRequest) servletRequest);
long currentTImeMills = System.currentTimeMillis();
String uri = requestWrapper.getRequestURI().replace("/", "");
//设置随机数
Random random = new Random(currentTImeMills);
//1-100之间
int number = random.nextInt(100) + 1;
//此处要有个开关控制百分比
if (iTracerPercent.percent() < number) {
filterChain.doFilter(requestWrapper, servletResponse);
return;
}
//如果是要忽略的接口就继续执行不搜集信息
if (iTracerPercent.ignoreUriSet() != null && iTracerPercent.ignoreUriSet().contains(uri)) {
filterChain.doFilter(requestWrapper, servletResponse);
return;
}
//链路唯一Id
long tracerId = IdWorker.nextId();
TracerHolder.setTracerId(tracerId);
TracerBean tracerBean = new TracerBean();
tracerBean.setTracerId(tracerId);
tracerBean.setCreateTimeLong(System.currentTimeMillis());
tracerBean.setUri(uri);
tracerBean.setApp(Context.APP_NAME);
//处理request的各个入参
parseRequestMap(requestWrapper, tracerBean);
try {
HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
HttpServletResponse resp = (HttpServletResponse) servletResponse;
String uri = httpRequest.getRequestURI().replace("/", "");
long currentTImeMills = System.currentTimeMillis();
//设置随机数
Random random = new Random(currentTImeMills);
//1-100之间
int number = random.nextInt(100) + 1;
//此处要有个开关控制百分比
if (iTracerPercent.percent() < number) {
filterChain.doFilter(servletRequest, servletResponse);
return;
}
//如果是要忽略的接口就继续执行不搜集信息
if (iTracerPercent.ignoreUriSet() != null && iTracerPercent.ignoreUriSet().contains(uri)) {
filterChain.doFilter(servletRequest, servletResponse);
return;
}
//链路唯一Id
long tracerId = IdWorker.nextId();
TracerHolder.setTracerId(tracerId);
//传输对象基础属性设置
TracerBean tracerBean = new TracerBean();
tracerBean.setCreateTime(System.currentTimeMillis());
List<Map<String, Object>> tracerObject = new ArrayList<>();
tracerBean.setTracerObject(tracerObject);
tracerBean.setTracerId(tracerId + "");
//处理request的各个入参
dealRequestMap(servletRequest, tracerObject, tracerId, uri);
//处理response
dealResponseMap(servletRequest, servletResponse, resp, tracerObject, filterChain);
tracerBean.setResponseContent(dealResponseMap(requestWrapper, servletResponse,
resp, filterChain));
} catch (Exception e) {
//异常信息
tracerBean.setErrmsg(LogExceptionStackTrace.erroStackTrace(e).toString());
filterChain.doFilter(requestWrapper, servletResponse);
}finally {
//设置耗时
tracerBean.setCostTime((int) (System.currentTimeMillis() - tracerBean.getCreateTime()));
tracerBean.setCostTime((System.currentTimeMillis() - tracerBean.getCreateTimeLong()));
//udp发送
UdpSender.offerBean(tracerBean);
} catch (Exception e) {
filterChain.doFilter(servletRequest, servletResponse);
}
}
/**
* 处理出参相关信息
*/
private void dealResponseMap(ServletRequest servletRequest, ServletResponse servletResponse, HttpServletResponse resp,
List<Map<String, Object>> tracerObject, FilterChain filterChain) throws IOException, ServletException {
private byte[] dealResponseMap(ServletRequest servletRequest, ServletResponse servletResponse, HttpServletResponse resp,
FilterChain filterChain) throws IOException, ServletException {
// 包装响应对象 resp 并缓存响应数据
ResponseWrapper mResp = new ResponseWrapper(resp);
filterChain.doFilter(servletRequest, mResp);
byte[] contentBytes = mResp.getContent();
String content = new String(contentBytes);
Map<String, Object> responseMap = new HashMap<>(8);
Map<String, Object> map = ExtParamFactory.getRespMap(content);
Outcome outcome = ClientHandler.processResp(contentBytes, map);
responseMap.put(RESP, outcome.getContent());
if(CollectionUtil.isNotEmpty(outcome.getTagMap())){
responseMap.putAll(outcome.getTagMap());
}
tracerObject.add(responseMap);
//此处可以对content做处理,然后再把content写回到输出流中
servletResponse.setContentLength(-1);
@ -139,29 +127,25 @@ public class HttpFilter implements Filter {
out.write(content);
out.flush();
out.close();
return (byte[]) outcome.getContent();
}
/**
* 处理入参相关信息
*/
private void dealRequestMap(ServletRequest servletRequest, List<Map<String, Object>> tracerObject,
long tracerId, String uri) throws IllegalAccessException, InstantiationException {
private void parseRequestMap(RequestWrapper requestWrapper, TracerBean tracerBean) {
//request的各个入参
Map<String, String[]> params = servletRequest.getParameterMap();
Map<String, String[]> params = requestWrapper.getParameterMap();
Map<String, Object> requestMap = new HashMap<>(params.size());
for (String key : params.keySet()) {
requestMap.put(key, params.get(key)[0]);
}
requestMap.put("appName", Context.APP_NAME);
requestMap.put("serverIp", IpUtils.getIp());
requestMap.put("tracerId", tracerId);
requestMap.put("uri", uri);
tracerBean.setUid((String) requestMap.get("uid"));
// 自定义的其他的参数对
requestMap.putAll(ExtParamFactory.getReqMap(servletRequest));
requestMap.putAll(ExtParamFactory.getReqMap(requestWrapper));
Outcome out = ClientHandler.processReq(requestMap);
requestMap.put(REQ, out.getContent());
tracerObject.add(out.getTagMap());
tracerBean.setRequestContent((byte[]) out.getContent());
}
@Override

View File

@ -0,0 +1,32 @@
package com.jd.platform.jlog.client.modeholder;
import com.alibaba.ttl.TransmittableThreadLocal;
import com.jd.platform.jlog.common.constant.SendMode;
/**
* 线程间传递通讯模式单播多播
*/
public class ModeHolder {
/**
* 用于在线程池间也能透传SendMode
*/
private static TransmittableThreadLocal<SendMode> context = new TransmittableThreadLocal<>();
/**
* 设置SendMode到线程里
*/
public static void setSendMode(SendMode mode) {
context.set(mode);
}
/**
* 如果没有SendMode说明没设置上则返回一个默认值默认是单播模式
*/
public static SendMode getSendMode() {
try {
return context.get();
} catch (Exception e) {
return new SendMode();
}
}
}

View File

@ -1,5 +1,6 @@
package com.jd.platform.jlog.client.udp;
import com.jd.platform.jlog.client.modeholder.ModeHolder;
import com.jd.platform.jlog.client.worker.WorkerInfoHolder;
import com.jd.platform.jlog.common.utils.AsyncPool;
import okhttp3.*;
@ -7,6 +8,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
@ -42,10 +44,15 @@ public class HttpSender {
*/
private static LinkedBlockingQueue<OneTracer> tracerDataQueue = new LinkedBlockingQueue<>(500);
//地址仅多播时候使用
private static InetSocketAddress address;
/**
* 写入队列
*/
public static void offerBean(byte[] compressBytes) {
public static void offerBean(byte[] compressBytes, InetSocketAddress address) {
if(address!=null){
HttpSender.address=address;
}
OneTracer oneTracer = new OneTracer();
oneTracer.setBytes(compressBytes);
//容量是否已满
@ -64,7 +71,7 @@ public class HttpSender {
}
/**
* 定时往worker发
* 定时往worker发
*/
public static void uploadToWorker() {
//filter拦截到的出入参
@ -112,9 +119,18 @@ public class HttpSender {
.addFormDataPart("data", "data", requestBody)
.build();
//挑一个worker
String rawIpPort = WorkerInfoHolder.chooseWorker();
String ipPort = rawIpPort.substring(0, rawIpPort.lastIndexOf(":")) + ":8080";
String rawIpPort;
String ipPort;
if(ModeHolder.getSendMode().getUnicast()){
//挑一个worker
rawIpPort = WorkerInfoHolder.chooseWorker();
ipPort = rawIpPort.substring(0, rawIpPort.lastIndexOf(":")) + ":8080";
}else{
//直接从TracerData中获取地址
rawIpPort=HttpSender.address.getHostName();
ipPort=rawIpPort+":8080";
}
String url = "http://" + ipPort + "/big/receive";
Request request = new Request.Builder()

View File

@ -1,6 +1,7 @@
package com.jd.platform.jlog.client.udp;
import com.jd.platform.jlog.client.Context;
import com.jd.platform.jlog.client.modeholder.ModeHolder;
import com.jd.platform.jlog.client.worker.WorkerInfoHolder;
import com.jd.platform.jlog.common.constant.Constant;
import com.jd.platform.jlog.common.model.TracerData;
@ -85,21 +86,25 @@ public class UdpClient {
//判断压缩完是否过大过大走http接口请求worker
if (compressBytes.length >= COMPRESS_BYTES_LEN) {
//放入发okhttp的队列
HttpSender.offerBean(compressBytes);
HttpSender.offerBean(compressBytes,tracerData.getAddress());
return;
}
ByteBuf buf = channelHandlerContext.alloc().buffer(compressBytes.length);
buf.writeBytes(compressBytes);
//挑一个worker
String workerIpPort = WorkerInfoHolder.chooseWorker();
if (workerIpPort == null) {
return;
InetSocketAddress remoteAddress=null;
if(ModeHolder.getSendMode().getUnicast()){
//挑选worker
String workerIpPort = WorkerInfoHolder.chooseWorker();
if (workerIpPort == null) {
return;
}
String[] ipPort = workerIpPort.split(Constant.SPLITER);
//发往worker的ip
remoteAddress= new InetSocketAddress(ipPort[0], Integer.valueOf(ipPort[1]));
}else{
remoteAddress=tracerData.getAddress();
}
String[] ipPort = workerIpPort.split(Constant.SPLITER);
//发往worker的ip
InetSocketAddress remoteAddress = new InetSocketAddress(ipPort[0], Integer.valueOf(ipPort[1]));
DatagramPacket packet = new DatagramPacket(buf, remoteAddress);
list.add(packet);
}

View File

@ -1,14 +1,20 @@
package com.jd.platform.jlog.client.udp;
import com.jd.platform.jlog.client.Context;
import com.jd.platform.jlog.common.model.RunLogMessage;
import com.jd.platform.jlog.client.modeholder.ModeHolder;
import com.jd.platform.jlog.client.worker.WorkerInfoHolder;
import com.jd.platform.jlog.common.constant.Constant;
import com.jd.platform.jlog.common.constant.LogTypeEnum;
import com.jd.platform.jlog.common.model.TracerBean;
import com.jd.platform.jlog.common.model.RunLogMessage;
import com.jd.platform.jlog.common.model.TracerData;
import com.jd.platform.jlog.common.utils.AsyncPool;
import com.jd.platform.jlog.common.utils.AsyncWorker;
import io.netty.channel.ChannelFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@ -39,7 +45,7 @@ public class UdpSender {
*/
private static AtomicLong SUCCESS_LOGGER_OFFER_COUNT = new AtomicLong();
/**
* 出入参集中营最多积压5万
* 出入参集中营最多积压5万
*/
private static LinkedBlockingQueue<TracerBean> tracerBeanQueue = new LinkedBlockingQueue<>(50000);
/**
@ -51,9 +57,9 @@ public class UdpSender {
/**
* 写入队列
*/
public static void offerBean(TracerBean tracerBean) {
public static void offerBean(TracerBean tracerModel) {
//容量是否已满
boolean success = tracerBeanQueue.offer(tracerBean);
boolean success = tracerBeanQueue.offer(tracerModel);
if (!success) {
long failCount = FAIL_OFFER_COUNT.incrementAndGet();
if (failCount % 10 == 0) {
@ -88,7 +94,7 @@ public class UdpSender {
/**
* 定时往worker发烧
* 定时向worker发送
*/
public static void uploadToWorker() {
//filter拦截到的出入参
@ -99,7 +105,10 @@ public class UdpSender {
TracerBean tracerBean = tracerBeanQueue.take();
tempTracers.add(tracerBean);
send(tempTracers);
TracerData tracerData = new TracerData();
tracerData.setTracerBeanList(tempTracers);
tracerData.setType(LogTypeEnum.SPAN);
send(tracerData);
} catch (Exception e) {
e.printStackTrace();
}
@ -115,21 +124,10 @@ public class UdpSender {
if (tempLogs.size() == 0) {
continue;
}
List<TracerBean> tempTracers = new ArrayList<>();
TracerBean tracerBean = new TracerBean();
tracerBean.setTracerId("-1");
List<Map<String, Object>> tracerObject = new ArrayList<>();
Map<String, Object> map = new HashMap<>();
for (RunLogMessage runLogMessage : tempLogs) {
map.put(UUID.randomUUID().toString(), runLogMessage);
}
tracerObject.add(map);
tracerBean.setTracerObject(tracerObject);
tempTracers.add(tracerBean);
send(tempTracers);
TracerData tracerData = new TracerData();
tracerData.setTempLogs(tempLogs);
tracerData.setType(LogTypeEnum.TRADE);
send(tracerData);
} catch (Exception e) {
e.printStackTrace();
}
@ -140,9 +138,21 @@ public class UdpSender {
/**
* 往worker发traceBean
*/
private static void send(List<TracerBean> tracerBeans) {
TracerData tracerData = new TracerData();
tracerData.setTracerBeanList(tracerBeans);
Context.CHANNEL.writeAndFlush(tracerData);
private static void send(TracerData tracerData) throws InterruptedException {
if(!ModeHolder.getSendMode().getUnicast()){
List<String>ips= WorkerInfoHolder.selectWorkers();
for(String ip:ips){
String[] ipPort = ip.split(Constant.SPLITER);
//发往worker的ip
InetSocketAddress remoteAddress = new InetSocketAddress(ipPort[0], Integer.valueOf(ipPort[1]));
tracerData.setAddress(remoteAddress);
ChannelFuture future = Context.CHANNEL.writeAndFlush(tracerData);
//同步操作否则会出现bug
future.sync();
}
return;
}else {
Context.CHANNEL.writeAndFlush(tracerData);
}
}
}

View File

@ -1,6 +1,6 @@
package com.jd.platform.jlog.client.worker;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@ -33,12 +33,12 @@ public class WorkerInfoHolder {
if (size == 0) {
return workerIp;
}
//按本机ip对worker数量进行hash
// int index = Math.abs(IpUtils.getIp().hashCode() % size);
if (index >= WORKER_HOLDER.size()) {
index = 0;
}
try {
workerIp = WORKER_HOLDER.get(index);
} catch (Exception e) {
@ -49,7 +49,15 @@ public class WorkerInfoHolder {
return workerIp;
}
//多播模式 返回所有注册在注册中心为Work的地址
public static List<String> selectWorkers(){
List<String>defaultIps=new ArrayList<>();
defaultIps.add("127.0.0.1:9999");
if(WORKER_HOLDER.size()==0){
return defaultIps;
}
return WORKER_HOLDER;
}
/**
* 监听到worker信息变化后
* 将新的worker信息和当前的进行合并并且连接新的address

29
clientlog4j/.gitignore vendored Normal file
View File

@ -0,0 +1,29 @@
HELP.md
/target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
/build/
### VS Code ###
.vscode/

29
clientlog4j2/.gitignore vendored Normal file
View File

@ -0,0 +1,29 @@
HELP.md
/target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
/build/
### VS Code ###
.vscode/

29
clientlogback/.gitignore vendored Normal file
View File

@ -0,0 +1,29 @@
HELP.md
/target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
/build/
### VS Code ###
.vscode/

29
common/.gitignore vendored Normal file
View File

@ -0,0 +1,29 @@
HELP.md
/target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
/build/
### VS Code ###
.vscode/

View File

@ -15,7 +15,7 @@
<netty4.version>4.1.42.Final</netty4.version>
<snappy.version>1.1.8.4</snappy.version>
<zstd.version>1.5.0-4</zstd.version>
<fastjson.version>1.2.70</fastjson.version>
<fastjson.version>1.2.83</fastjson.version>
<protostuff.version>1.7.2</protostuff.version>
</properties>

View File

@ -0,0 +1,15 @@
package com.jd.platform.jlog.common.constant;
import java.io.Serializable;
/**
* @author xiaochangbai
* @date 2023-07-15 11:56
*/
public enum LogTypeEnum implements Serializable {
TRADE,
SPAN;
}

View File

@ -0,0 +1,19 @@
package com.jd.platform.jlog.common.constant;
import java.io.Serializable;
//通讯方式实体类
public class SendMode implements Serializable {
//true为单播false为多播
private Boolean unicast=true;
public SendMode() {
}
public boolean getUnicast() {
return unicast;
}
public void setUnicast(boolean unicast) {
this.unicast = unicast;
}
}

View File

@ -107,4 +107,19 @@ public class RunLogMessage {
public void setTagMap(Map<String, Object> tagMap) {
this.tagMap = tagMap;
}
@Override
public String toString() {
return "RunLogMessage{" +
"tracerId=" + tracerId +
", createTime=" + createTime +
", content=" + content +
", logLevel='" + logLevel + '\'' +
", className='" + className + '\'' +
", methodName='" + methodName + '\'' +
", threadName='" + threadName + '\'' +
", tagMap=" + tagMap +
'}';
}
}

View File

@ -1,75 +1,130 @@
package com.jd.platform.jlog.common.model;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Arrays;
/**
* 承载传输信息的对象
* @author wuweifeng
* @version 1.0
* @date 2021-08-12
* @author xiaochangbai
* @date 2023-07-15 11:25
*/
public class TracerBean implements Serializable {
/**
* 时间戳
*/
private long createTime;
/**
* 耗时毫秒
*/
private int costTime;
/**
* 唯一id代表一条链路
*/
private String tracerId;
/**
* tracer对象里面放的是List<Map<String, Object>>
* 第一个元素是request对象key为requestvalue为完整request入参从中可找到uuid
* 最后一个元素是response响应key为responsevalue为响应值byte[]可转为String入库
*/
private List<Map<String, Object>> tracerObject;
private Long tracerId;
private byte[] requestContent;
private byte[] responseContent;
private Long costTime;
private String uid;
private String errno;
private String errmsg;
private String app;
private String uri;
private String createTime;
private Long createTimeLong;
@Override
public String toString() {
return "TracerBean{" +
"createTime=" + createTime +
", costTime=" + costTime +
", tracerId='" + tracerId + '\'' +
", tracerObject=" + tracerObject +
'}';
}
public String getTracerId() {
public Long getTracerId() {
return tracerId;
}
public void setTracerId(String tracerId) {
public void setTracerId(Long tracerId) {
this.tracerId = tracerId;
}
public int getCostTime() {
public byte[] getRequestContent() {
return requestContent;
}
public void setRequestContent(byte[] requestContent) {
this.requestContent = requestContent;
}
public byte[] getResponseContent() {
return responseContent;
}
public void setResponseContent(byte[] responseContent) {
this.responseContent = responseContent;
}
public Long getCostTime() {
return costTime;
}
public void setCostTime(int costTime) {
public void setCostTime(Long costTime) {
this.costTime = costTime;
}
public long getCreateTime() {
public String getUid() {
return uid;
}
public void setUid(String uid) {
this.uid = uid;
}
public String getErrno() {
return errno;
}
public void setErrno(String errno) {
this.errno = errno;
}
public String getErrmsg() {
return errmsg;
}
public void setErrmsg(String errmsg) {
this.errmsg = errmsg;
}
public String getApp() {
return app;
}
public void setApp(String app) {
this.app = app;
}
public String getUri() {
return uri;
}
public void setUri(String uri) {
this.uri = uri;
}
public String getCreateTime() {
return createTime;
}
public void setCreateTime(long createTime) {
public void setCreateTime(String createTime) {
this.createTime = createTime;
}
public List<Map<String, Object>> getTracerObject() {
return tracerObject;
public Long getCreateTimeLong() {
return createTimeLong;
}
public void setTracerObject(List<Map<String, Object>> tracerObject) {
this.tracerObject = tracerObject;
public void setCreateTimeLong(Long createTimeLong) {
this.createTimeLong = createTimeLong;
}
@Override
public String toString() {
return "TracerModel{" +
"tracerId=" + tracerId +
", requestContent=" + Arrays.toString(requestContent) +
", responseContent=" + Arrays.toString(responseContent) +
", costTime=" + costTime +
", uid='" + uid + '\'' +
", errno='" + errno + '\'' +
", errmsg='" + errmsg + '\'' +
", app='" + app + '\'' +
", uri='" + uri + '\'' +
", createTime=" + createTime +
", createTimeLong=" + createTimeLong +
'}';
}
}

View File

@ -1,6 +1,9 @@
package com.jd.platform.jlog.common.model;
import com.jd.platform.jlog.common.constant.LogTypeEnum;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.List;
/**
@ -10,17 +13,28 @@ import java.util.List;
* @date 2021-08-17
*/
public class TracerData implements Serializable {
/**
* type
*/
private LogTypeEnum type;
/**
* 多个tracer批量打包后
*/
private List<TracerBean> tracerBeanList;
@Override
public String toString() {
return "TracerData{" +
"tracerBeanList=" + tracerBeanList +
'}';
}
/**
* span日志
*/
List<RunLogMessage> tempLogs;
//发送地址仅多播时候使用
private transient InetSocketAddress address;
public InetSocketAddress getAddress() { return address; }
public void setAddress(InetSocketAddress address) { this.address = address; }
public List<TracerBean> getTracerBeanList() {
return tracerBeanList;
@ -29,4 +43,30 @@ public class TracerData implements Serializable {
public void setTracerBeanList(List<TracerBean> tracerBeanList) {
this.tracerBeanList = tracerBeanList;
}
public LogTypeEnum getType() {
return type;
}
public void setType(LogTypeEnum type) {
this.type = type;
}
public List<RunLogMessage> getTempLogs() {
return tempLogs;
}
public void setTempLogs(List<RunLogMessage> tempLogs) {
this.tempLogs = tempLogs;
}
@Override
public String toString() {
return "TracerData{" +
"type=" + type +
", tracerBeanList=" + tracerBeanList +
", tempLogs=" + tempLogs +
", address=" + address +
'}';
}
}

29
config/.gitignore vendored Normal file
View File

@ -0,0 +1,29 @@
HELP.md
/target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
/build/
### VS Code ###
.vscode/

29
config/config-apollo/.gitignore vendored Normal file
View File

@ -0,0 +1,29 @@
HELP.md
/target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
/build/
### VS Code ###
.vscode/

29
config/config-core/.gitignore vendored Normal file
View File

@ -0,0 +1,29 @@
HELP.md
/target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
/build/
### VS Code ###
.vscode/

View File

@ -78,7 +78,7 @@ public class ConfiguratorFactory {
* 自定义配置器覆盖文件配置器
* @param configurator 配置器
*/
public static synchronized void cover(FileConfigurator configurator){
public static synchronized void cover(Configurator configurator){
instance = configurator;
LOGGER.info("自定义配置器类型:{}", instance.getType());
}

29
config/config-etcd/.gitignore vendored Normal file
View File

@ -0,0 +1,29 @@
HELP.md
/target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
/build/
### VS Code ###
.vscode/

29
config/config-nacos/.gitignore vendored Normal file
View File

@ -0,0 +1,29 @@
HELP.md
/target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
/build/
### VS Code ###
.vscode/

29
config/config-zk/.gitignore vendored Normal file
View File

@ -0,0 +1,29 @@
HELP.md
/target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
/build/
### VS Code ###
.vscode/

29
example/.gitignore vendored Normal file
View File

@ -0,0 +1,29 @@
HELP.md
/target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
/build/
### VS Code ###
.vscode/

View File

@ -2,6 +2,7 @@ package com.jd.platform.jlog.clientdemo.custom;
import com.jd.platform.jlog.client.TracerClientStarter;
import com.jd.platform.jlog.client.filter.HttpFilter;
import com.jd.platform.jlog.common.constant.SendMode;
import com.jd.platform.jlog.common.handler.TagConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -24,6 +25,16 @@ public class Starter {
private Logger logger = LoggerFactory.getLogger(getClass());
private TagConfig tagConfig ;
//通讯方式单播多播默认是单播
private static SendMode sendMode=new SendMode();
public void setSendMode(SendMode sendMode) {
this.sendMode = sendMode;
}
public SendMode getSendMode() {
return sendMode;
}
public TagConfig getTagConfig() {
return tagConfig;
@ -38,18 +49,16 @@ public class Starter {
TracerClientStarter tracerClientStarter = new TracerClientStarter.Builder()
.setAppName("demo")
.setTagConfig(tagConfig)
.setSendMode(sendMode)
.build();
logger.info("初始化tagConfig: {}",tagConfig);
tracerClientStarter.startPipeline();
}
@Bean
public FilterRegistrationBean urlFilter() {
FilterRegistrationBean registration = new FilterRegistrationBean();
HttpFilter userFilter = new HttpFilter();
registration.setFilter(userFilter);
registration.addUrlPatterns("/*");
registration.setName("UserTraceFilter");

View File

@ -41,7 +41,7 @@ public class TestController {
@RequestMapping("/index")
public Object index() {
TracerBean tracerBean = new TracerBean();
tracerBean.setTracerId("11111");
tracerBean.setTracerId(11111L);
Configurator configurator = ConfiguratorFactory.getInstance();
try{
@ -70,6 +70,9 @@ public class TestController {
if(newKey == 1){
return 1;
}
if(uid!=null && 3==uid){
throw new RuntimeException("发生异常了");
}
return new Response("滴滴员工tangbohu的终身代号是什么是9527");
}

View File

@ -9,6 +9,10 @@ apollo.meta=http://127.0.0.1:8080
apollo.config-service=http://127.0.0.1:8080
app.id=order
#设置通讯方式(单播或者多播),不设置则为单播(默认)
#单播为true多播为false
#send-mode.unicast=true
tag-config.reqTags[0]=uid
tag-config.reqTags[1]=url
@ -25,3 +29,6 @@ tag-config.extract=41
compress=68
threshold=10
#worker的地址
#workers=['127.0.0.1:9999','127.0.0.1:10000']

29
worker/.gitignore vendored Normal file
View File

@ -0,0 +1,29 @@
HELP.md
/target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
/build/
### VS Code ###
.vscode/

View File

@ -25,6 +25,8 @@ import java.util.concurrent.TimeUnit;
@Component
public class CenterStarter {
private final static String configKeyName = "workers";
/**
* 上报自己的ip到配置中心
*/
@ -35,10 +37,11 @@ public class CenterStarter {
scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
List<String> list = config.getList("workers");
if(!list.contains(buildKey())){
list.add(buildValue());
config.putConfig("workers", JSON.toJSONString(list));
List<String> list = config.getList(configKeyName);
String value = buildValue();
if(!list.contains(value)){
list.add(value);
config.putConfig(configKeyName, JSON.toJSONString(list));
}
} catch (Exception e) {
//do nothing
@ -46,6 +49,15 @@ public class CenterStarter {
}
}, 0, 5, TimeUnit.SECONDS);
//注册注销事件
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
List<String> configList = config.getList(configKeyName);
if(configList.remove(buildValue())){
config.putConfig(configKeyName, JSON.toJSONString(configList));
}
}));
}
/**

View File

@ -1,18 +1,17 @@
package com.jd.platform.jlog.worker.disruptor;
import com.alibaba.fastjson.JSON;
import com.jd.platform.jlog.common.model.RunLogMessage;
import com.jd.platform.jlog.common.constant.LogTypeEnum;
import com.jd.platform.jlog.common.model.TracerBean;
import com.jd.platform.jlog.common.model.RunLogMessage;
import com.jd.platform.jlog.common.model.TracerData;
import com.jd.platform.jlog.common.utils.FastJsonUtils;
import com.jd.platform.jlog.common.utils.ProtostuffUtils;
import com.jd.platform.jlog.common.utils.ZstdUtils;
import com.jd.platform.jlog.worker.store.TracerLogToDbStore;
import com.jd.platform.jlog.worker.store.TracerModelToDbStore;
import com.lmax.disruptor.WorkHandler;
import io.netty.util.internal.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cglib.beans.BeanMap;
import java.time.Instant;
import java.time.LocalDateTime;
@ -22,8 +21,6 @@ import java.util.*;
import java.util.concurrent.atomic.LongAdder;
import static com.jd.platform.jlog.common.constant.Constant.DEFAULT_BYTE;
import static com.jd.platform.jlog.common.constant.Constant.REQ;
import static com.jd.platform.jlog.common.constant.Constant.RESP;
/**
* TracerConsumer
@ -69,9 +66,8 @@ public class TracerConsumer implements WorkHandler<OneTracer> {
TracerData tracerData = ProtostuffUtils.deserialize(decompressBytes, TracerData.class);
//包含了多个tracer对象
List<TracerBean> tracerBeanList = tracerData.getTracerBeanList();
buildTracerModel(tracerBeanList);
//消费处理
buildTracerModel(tracerData);
//处理完毕将数量加1
totalDealCount.increment();
@ -84,30 +80,24 @@ public class TracerConsumer implements WorkHandler<OneTracer> {
/**
* 构建要入库的对象
*/
private void buildTracerModel(List<TracerBean> tracerBeanList) {
//遍历传过来的
for (TracerBean tracerBean : tracerBeanList) {
//普通日志
if ("-1".equals(tracerBean.getTracerId())) {
dealTracerLog(tracerBean);
} else {
dealFilterModel(tracerBean);
}
private void buildTracerModel(TracerData tracerData) {
//普通日志
if (LogTypeEnum.TRADE.equals(tracerData.getType())) {
dealTracerLog(tracerData.getTempLogs());
} else {
dealFilterModel(tracerData.getTracerBeanList());
}
}
/**
* 处理中途日志
*/
private void dealTracerLog(TracerBean tracerBean) {
List<Map<String, Object>> mapList = tracerBean.getTracerObject();
Map<String, Object> objectMap = mapList.get(0);
//遍历value集合里面每个都是一个RunLogMessage对象
for (Object object :objectMap.values()) {
private void dealTracerLog(List<RunLogMessage> tempLogs) {
if(tempLogs==null){
return;
}
for (RunLogMessage runLogMessage :tempLogs) {
Map<String, Object> map = new HashMap<>(12);
RunLogMessage runLogMessage = (RunLogMessage) object;
map.put("tracerId", runLogMessage.getTracerId());
map.put("className", runLogMessage.getClassName());
map.put("threadName", runLogMessage.getThreadName());
@ -118,41 +108,25 @@ public class TracerConsumer implements WorkHandler<OneTracer> {
map.putAll(runLogMessage.getTagMap());
tracerLogToDbStore.offer(map);
}
}
/**
* 处理filter里处理的出入参
*/
private void dealFilterModel(TracerBean tracerBean) {
List<Map<String, Object>> mapList = tracerBean.getTracerObject();
Map<String, Object> requestMap = mapList.get(0);
Object req = requestMap.get(REQ);
if(req == null){
req = DEFAULT_BYTE;
private void dealFilterModel(List<TracerBean> tracerList) {
if(tracerList==null){
return;
}
requestMap.remove(REQ);
Map<String, Object> map = new HashMap<>(requestMap);
long tracerId = Long.parseLong(tracerBean.getTracerId());
//filter的出入参
Map<String, Object> responseMap = mapList.get(mapList.size() - 1);
Object resp = responseMap.get(RESP);
if(resp == null){
resp = DEFAULT_BYTE;
for(TracerBean tracerModel:tracerList){
if(tracerModel.getResponseContent()==null){
tracerModel.setResponseContent(DEFAULT_BYTE);
}
tracerModel.setCreateTime(formatLongTime(tracerModel.getCreateTimeLong()));
Map map = new HashMap(BeanMap.create(tracerModel));
map.remove("createTimeLong");
tracerModelToDbStore.offer(map);
}
map.put("requestContent", req);
map.put("responseContent", resp);
map.put("costTime", tracerBean.getCostTime());
map.put("tracerId", tracerId);
map.put("createTime", formatLongTime(tracerBean.getCreateTime()));
responseMap.remove("response");
map.putAll(responseMap);
tracerModelToDbStore.offer(map);
}
private static String formatLongTime(long time) {