diff --git a/client/src/main/java/com/jd/platform/jlog/client/udp/HttpSender.java b/client/src/main/java/com/jd/platform/jlog/client/udp/HttpSender.java index bbad057..1b467be 100644 --- a/client/src/main/java/com/jd/platform/jlog/client/udp/HttpSender.java +++ b/client/src/main/java/com/jd/platform/jlog/client/udp/HttpSender.java @@ -1,5 +1,6 @@ package com.jd.platform.jlog.client.udp; +import com.jd.platform.jlog.client.modeholder.ModeHolder; import com.jd.platform.jlog.client.worker.WorkerInfoHolder; import com.jd.platform.jlog.common.utils.AsyncPool; import okhttp3.*; @@ -7,6 +8,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; @@ -42,10 +44,15 @@ public class HttpSender { */ private static LinkedBlockingQueue tracerDataQueue = new LinkedBlockingQueue<>(500); + //地址(仅多播时候使用) + private static InetSocketAddress address; /** * 写入队列 */ - public static void offerBean(byte[] compressBytes) { + public static void offerBean(byte[] compressBytes, InetSocketAddress address) { + if(address!=null){ + HttpSender.address=address; + } OneTracer oneTracer = new OneTracer(); oneTracer.setBytes(compressBytes); //容量是否已满 @@ -64,7 +71,7 @@ public class HttpSender { } /** - * 定时往worker发烧 + * 定时往worker发送 */ public static void uploadToWorker() { //filter拦截到的出入参 @@ -112,9 +119,18 @@ public class HttpSender { .addFormDataPart("data", "data", requestBody) .build(); - //挑一个worker - String rawIpPort = WorkerInfoHolder.chooseWorker(); - String ipPort = rawIpPort.substring(0, rawIpPort.lastIndexOf(":")) + ":8080"; + String rawIpPort; + String ipPort; + if(ModeHolder.getSendMode().getUnicast()){ + //挑一个worker + rawIpPort = WorkerInfoHolder.chooseWorker(); + ipPort = rawIpPort.substring(0, rawIpPort.lastIndexOf(":")) + ":8080"; + }else{ + //直接从TracerData中获取地址 + rawIpPort=HttpSender.address.getHostName(); + ipPort=rawIpPort+":8080"; + } + String url = "http://" + ipPort + "/big/receive"; Request request = new Request.Builder() diff --git a/client/src/main/java/com/jd/platform/jlog/client/udp/UdpClient.java b/client/src/main/java/com/jd/platform/jlog/client/udp/UdpClient.java index 7bf45a4..15881a7 100644 --- a/client/src/main/java/com/jd/platform/jlog/client/udp/UdpClient.java +++ b/client/src/main/java/com/jd/platform/jlog/client/udp/UdpClient.java @@ -86,7 +86,7 @@ public class UdpClient { //判断压缩完是否过大,过大走http接口请求worker if (compressBytes.length >= COMPRESS_BYTES_LEN) { //放入发okhttp的队列 - HttpSender.offerBean(compressBytes); + HttpSender.offerBean(compressBytes,tracerData.getAddress()); return; } diff --git a/common/src/main/java/com/jd/platform/jlog/common/model/TracerData.java b/common/src/main/java/com/jd/platform/jlog/common/model/TracerData.java index 9354891..a52d211 100644 --- a/common/src/main/java/com/jd/platform/jlog/common/model/TracerData.java +++ b/common/src/main/java/com/jd/platform/jlog/common/model/TracerData.java @@ -16,13 +16,8 @@ public class TracerData implements Serializable { */ private List tracerBeanList; + //发送地址(仅多播时候使用) private transient InetSocketAddress address; - /*@Override - public String toString() { - return "TracerData{" + - "tracerBeanList=" + tracerBeanList + - '}'; - }*/ public InetSocketAddress getAddress() { return address; } @@ -35,4 +30,12 @@ public class TracerData implements Serializable { public void setTracerBeanList(List tracerBeanList) { this.tracerBeanList = tracerBeanList; } + + @Override + public String toString() { + return "TracerData{" + + "tracerBeanList=" + tracerBeanList + + ", address=" + address + + '}'; + } }