!10 fix:udp多发

Merge pull request !10 from Davis/meng
This commit is contained in:
tianyaleixiaowu 2022-11-17 12:24:46 +00:00 committed by Gitee
commit b82bbd3bf3
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
3 changed files with 31 additions and 12 deletions

View File

@ -1,5 +1,6 @@
package com.jd.platform.jlog.client.udp;
import com.jd.platform.jlog.client.modeholder.ModeHolder;
import com.jd.platform.jlog.client.worker.WorkerInfoHolder;
import com.jd.platform.jlog.common.utils.AsyncPool;
import okhttp3.*;
@ -7,6 +8,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
@ -42,10 +44,15 @@ public class HttpSender {
*/
private static LinkedBlockingQueue<OneTracer> tracerDataQueue = new LinkedBlockingQueue<>(500);
//地址仅多播时候使用
private static InetSocketAddress address;
/**
* 写入队列
*/
public static void offerBean(byte[] compressBytes) {
public static void offerBean(byte[] compressBytes, InetSocketAddress address) {
if(address!=null){
HttpSender.address=address;
}
OneTracer oneTracer = new OneTracer();
oneTracer.setBytes(compressBytes);
//容量是否已满
@ -64,7 +71,7 @@ public class HttpSender {
}
/**
* 定时往worker发
* 定时往worker发
*/
public static void uploadToWorker() {
//filter拦截到的出入参
@ -112,9 +119,18 @@ public class HttpSender {
.addFormDataPart("data", "data", requestBody)
.build();
//挑一个worker
String rawIpPort = WorkerInfoHolder.chooseWorker();
String ipPort = rawIpPort.substring(0, rawIpPort.lastIndexOf(":")) + ":8080";
String rawIpPort;
String ipPort;
if(ModeHolder.getSendMode().getUnicast()){
//挑一个worker
rawIpPort = WorkerInfoHolder.chooseWorker();
ipPort = rawIpPort.substring(0, rawIpPort.lastIndexOf(":")) + ":8080";
}else{
//直接从TracerData中获取地址
rawIpPort=HttpSender.address.getHostName();
ipPort=rawIpPort+":8080";
}
String url = "http://" + ipPort + "/big/receive";
Request request = new Request.Builder()

View File

@ -86,7 +86,7 @@ public class UdpClient {
//判断压缩完是否过大过大走http接口请求worker
if (compressBytes.length >= COMPRESS_BYTES_LEN) {
//放入发okhttp的队列
HttpSender.offerBean(compressBytes);
HttpSender.offerBean(compressBytes,tracerData.getAddress());
return;
}

View File

@ -16,13 +16,8 @@ public class TracerData implements Serializable {
*/
private List<TracerBean> tracerBeanList;
//发送地址仅多播时候使用
private transient InetSocketAddress address;
/*@Override
public String toString() {
return "TracerData{" +
"tracerBeanList=" + tracerBeanList +
'}';
}*/
public InetSocketAddress getAddress() { return address; }
@ -35,4 +30,12 @@ public class TracerData implements Serializable {
public void setTracerBeanList(List<TracerBean> tracerBeanList) {
this.tracerBeanList = tracerBeanList;
}
@Override
public String toString() {
return "TracerData{" +
"tracerBeanList=" + tracerBeanList +
", address=" + address +
'}';
}
}