From 183c0a6a3a6fe1c303ae2e9b0b9cc364a6366f0c Mon Sep 17 00:00:00 2001 From: zhengshuxin Date: Mon, 29 May 2017 11:47:52 +0800 Subject: [PATCH] add samples for recvmmsg/sendmmsg --- lib_acl/samples/udp/Makefile.in | 156 +++++++++++ lib_acl/samples/udp/client/Makefile | 4 + lib_acl/samples/udp/client/main.cpp | 272 +++++++++++++++++++ lib_acl/samples/udp/client/mt.sh | 2 + lib_acl/samples/udp/client/stdafx.cpp | 1 + lib_acl/samples/udp/client/stdafx.h | 12 + lib_acl/samples/udp/client/t.sh | 2 + lib_acl/samples/udp/server/Makefile | 4 + lib_acl/samples/udp/server/main.cpp | 204 ++++++++++++++ lib_acl/samples/udp/server/mt.sh | 2 + lib_acl/samples/udp/server/stdafx.cpp | 1 + lib_acl/samples/udp/server/stdafx.h | 12 + lib_acl/samples/udp/server/t.sh | 2 + lib_acl/samples/udp/udp.cpp | 377 ++++++++++++++++++++++++++ lib_acl/samples/udp/udp.h | 73 +++++ lib_acl/src/stdlib/acl_vstream.c | 2 +- lib_acl_cpp/changes.txt | 1 + 17 files changed, 1126 insertions(+), 1 deletion(-) create mode 100644 lib_acl/samples/udp/Makefile.in create mode 100644 lib_acl/samples/udp/client/Makefile create mode 100644 lib_acl/samples/udp/client/main.cpp create mode 100644 lib_acl/samples/udp/client/mt.sh create mode 100644 lib_acl/samples/udp/client/stdafx.cpp create mode 100644 lib_acl/samples/udp/client/stdafx.h create mode 100644 lib_acl/samples/udp/client/t.sh create mode 100644 lib_acl/samples/udp/server/Makefile create mode 100644 lib_acl/samples/udp/server/main.cpp create mode 100644 lib_acl/samples/udp/server/mt.sh create mode 100644 lib_acl/samples/udp/server/stdafx.cpp create mode 100644 lib_acl/samples/udp/server/stdafx.h create mode 100644 lib_acl/samples/udp/server/t.sh create mode 100644 lib_acl/samples/udp/udp.cpp create mode 100644 lib_acl/samples/udp/udp.h diff --git a/lib_acl/samples/udp/Makefile.in b/lib_acl/samples/udp/Makefile.in new file mode 100644 index 000000000..ecd655b9e --- /dev/null +++ b/lib_acl/samples/udp/Makefile.in @@ -0,0 +1,156 @@ +CC = $(ENV_CPP) + +CFLAGS = -c -g -W \ +-Wall \ +-Wcast-qual \ +-Waggregate-return \ +-Wno-long-long \ +-Wpointer-arith \ +-Werror \ +-Wshadow \ +-O2 \ +-D_REENTRANT \ +-D_POSIX_PTHREAD_SEMANTICS \ +-D_USE_FAST_MACRO + +#-Wcast-align + +########################################################### +#Check system: +# Linux, SunOS, Solaris, BSD variants, AIX, HP-UX +SYSLIB = -lpthread +CHECKSYSRES = @echo "Unknow system type!";exit 1 +UNIXNAME = $(shell uname -sm) + +ifeq ($(CC),) + CC = g++ +endif + +ifeq ($(findstring gcc, $(CC)), gcc) + CFLAGS += -Wstrict-prototypes +endif + +ifeq ($(findstring clang, $(CC)), clang) + CFLAGS += -Wstrict-prototypes \ + -Wno-invalid-source-encoding \ + -Wno-extended-offsetof +endif + +ifeq ($(findstring clang++, $(CC)), clang++) + CFLAGS += -Wno-invalid-source-encoding \ + -Wno-extended-offsetof +endif + +# For FreeBSD +ifeq ($(findstring FreeBSD, $(UNIXNAME)), FreeBSD) + CFLAGS += -DFREEBSD -pedantic + SYSLIB += -lcrypt +endif + +# For Darwin +ifeq ($(findstring Darwin, $(UNIXNAME)), Darwin) + CFLAGS += -DMACOSX -Wno-invalid-source-encoding \ + -Wno-extended-offsetof + UNIXTYPE = MACOSX +endif + +#Path for Linux +ifeq ($(findstring Linux, $(UNIXNAME)), Linux) +# CFLAGS += -DLINUX2 -pedantic + CFLAGS += -pedantic + SYSLIB += -lcrypt +endif + +# For MINGW +ifeq ($(findstring MINGW, $(UNIXNAME)), MINGW) + SYSLIB = -lpthread-2 + CFLAGS += -DLINUX2 -DMINGW + UNIXTYPE = LINUX +endif + +# For MSYS +ifeq ($(findstring MSYS, $(UNIXNAME)), MSYS) + SYSLIB = -lpthread-2 + CFLAGS += -DLINUX2 -DMINGW + UNIXTYPE = LINUX +endif + +#Path for SunOS +ifeq ($(findstring SunOS, $(UNIXNAME)), SunOS) + ifeq ($(findstring 86, $(UNIXNAME)), 86) + SYSLIB += -lsocket -lnsl -lrt + endif + ifeq ($(findstring sun4u, $(UNIXNAME)), sun4u) + SYSLIB += -lsocket -lnsl -lrt + endif + CFLAGS += -DSUNOS5 -pedantic + SYSLIB += -lcrypt +endif + +#Path for HP-UX +ifeq ($(findstring HP-UX, $(UNIXNAME)), HP-UX) + CFLAGS += -DHP_UX -DHPUX11 + PLAT_NAME=hp-ux + SYSLIB += -lcrypt +endif + +# For Darwin +ifeq ($(findstring Darwin, $(UNIXNAME)), Darwin) + CFLAGS += -DMACOSX +endif + +#Find system type. +ifneq ($(SYSPATH),) + CHECKSYSRES = @echo "System is $(shell uname -sm)" +endif +########################################################### + +BASE_PATH = +ifneq ($(base_path),) + BASE_PATH = $(base_path) +else + BASE_PATH = ../.. +endif + +ACL_PATH = $(BASE_PATH) +ACL_INC = $(ACL_PATH)/include +ACL_LIB = $(ACL_PATH)/lib + +EXTLIBS = +#CFLAGS += -I$(ACL_INC) -I$(PROTO_INC) +#LDFLAGS = -L$(ACL_LIB) -L$(PROTO_LIB) -l_protocol -lacl $(EXTLIBS) $(SYSLIB) +CFLAGS += -I$(ACL_INC) -I.. -I. +LDFLAGS = -L$(ACL_LIB) -lacl $(EXTLIBS) $(SYSLIB) + + +########################################################### + +OUT_PATH = . +OBJ_PATH = $(OUT_PATH) + +#Project's objs +SRC = $(wildcard *.cpp) $(wildcard ../*.cpp) +OBJ = $(patsubst %.cpp, $(OBJ_PATH)/%.o, $(notdir $(SRC))) +########################################################### + +.PHONY = all clean +PROG = + +COMPILE = $(CC) $(CFLAGS) + +# -Wl,-rpath,$(ACL_LIB) -Wl,-rpath,$(PROTO_LIB) -o $(OBJ_PATH)/$(PROG) +all: RM $(OBJ) + $(CC) $(OBJ) $(LDFLAGS) -o $(OBJ_PATH)/$(PROG) + @echo "" + @echo "All ok! Output:$(PROG)" + @echo "" +$(OBJ_PATH)/%.o: %.cpp + $(COMPILE) $< -o $@ +$(OBJ_PATH)/%.o: ../%.cpp + $(COMPILE) $< -o $@ +RM: + rm -f $(PROG) +clean: + rm -f $(PROG) + rm -f $(OBJ) +########################################################### diff --git a/lib_acl/samples/udp/client/Makefile b/lib_acl/samples/udp/client/Makefile new file mode 100644 index 000000000..46e8b35a0 --- /dev/null +++ b/lib_acl/samples/udp/client/Makefile @@ -0,0 +1,4 @@ +base_path = ../../.. +include ../Makefile.in +CFLAGS += -Wno-write-strings -Wno-pedantic +PROG = client diff --git a/lib_acl/samples/udp/client/main.cpp b/lib_acl/samples/udp/client/main.cpp new file mode 100644 index 000000000..02fad236b --- /dev/null +++ b/lib_acl/samples/udp/client/main.cpp @@ -0,0 +1,272 @@ +#include "stdafx.h" +#include "udp.h" + +static double stamp_sub(const struct timeval *from, const struct timeval *sub_by) +{ + struct timeval res; + + memcpy(&res, from, sizeof(struct timeval)); + + res.tv_usec -= sub_by->tv_usec; + if (res.tv_usec < 0) { + --res.tv_sec; + res.tv_usec += 1000000; + } + res.tv_sec -= sub_by->tv_sec; + + return res.tv_sec * 1000.0 + res.tv_usec/1000.0; +} + +static int sio(SOCK_UDP *sock, const char *data, int dlen, int count, + int inter, bool need_read) +{ + int i, ret; + char buf[4096]; + + for (i = 0; i < count; i++) { + ret = udp_send(sock, data, dlen); + if (ret == -1) { + printf("sock_write error %s\r\n", acl_last_serror()); + break; + } + + if (need_read) { + ret = udp_read(sock, buf, sizeof(buf) - 1); + if (ret == -1) { + printf("sock_read error %s\r\n", + acl_last_serror()); + break; + } else + buf[ret] = 0; + if (i % inter == 0) + printf("result: %s\r\n", buf); + } + + if (i % inter == 0) { + snprintf(buf, sizeof(buf), + "total: %d, curr: %d, dlen=%d", + count, i, dlen); + ACL_METER_TIME(buf); + } + } + + return i; +} + +static int mio(SOCK_UDP *sock, const char *peer_addr, int dlen, + int count, int inter, bool echo) +{ +#define PKT_CNT 2 +#define DAT_LEN 100 + + char bufs[PKT_CNT][DAT_LEN]; + SOCK_PKT pkts[PKT_CNT]; + + int n = 0, total_write = 0; + + if (dlen > DAT_LEN) + dlen = DAT_LEN; + + for (int j = 0; j < PKT_CNT; j++) { + memset(&bufs[j], 'X', dlen - 2); + bufs[j][dlen - 1] = 0; + udp_pkt_set_buf(&pkts[j], bufs[j], DAT_LEN); + udp_pkt_set_peer(&pkts[j], peer_addr); + } + + for (int i = 0; i < count; i++) { + for (int j = 0; j < PKT_CNT; j++) { + PKT_IOV_LEN(&pkts[j]) = DAT_LEN; + } + + int nout = udp_msend(sock, pkts, PKT_CNT); + if (nout < 0) { + printf("sock_mwrite error %s\r\n", acl_last_serror()); + break; + } + + total_write += nout; + + int nin = 0; + if (echo) { + nin = udp_mread(sock, pkts, PKT_CNT); + if (nin < 0) { + printf("sock_mread error %s\r\n", acl_last_serror()); + break; + } + } + + if (++n % inter == 0) { + char buf[256], ip[64]; + snprintf(buf, sizeof(buf), "total: %d, curr: %d, " + "nout: %d, nin: %d, dlen: %d, ip: %s, port: %d", + total_write, n, nout, nin, dlen, + udp_ip(sock, nin > 0 ? n % nin : 0, + ip, sizeof(ip)), + udp_port(sock, nin > 0 ? n % nin : 0)); + ACL_METER_TIME(buf); + } + } + + return n; +} + +static void run(const char *local_addr, const char *peer_addr, + int count, int dlen, int inter, int need_read, int mio_on, int quit) +{ + int i; + double spent; + char data[4096]; + struct timeval begin, end; + SOCK_UDP *sock = udp_client_open(local_addr, peer_addr); + + (void) quit; + + if (sock == NULL) { + printf("open_sock error!\r\n"); + return; + } + + if (dlen > (int) sizeof(data) - 1) + dlen = (int) sizeof(data) - 1; + for (i = 0; i < dlen; i++) + data[i] = 'X'; + data[dlen] = 0; + + gettimeofday(&begin, NULL); + + if (mio_on) + i = mio(sock, peer_addr, dlen, count, inter, need_read); + else + i = sio(sock, data, dlen, count, inter, need_read); + + gettimeofday(&end, NULL); + spent = stamp_sub(&end, &begin); + + printf("thread: %lu, total: %d, curr: %d, spent: %.2f, speed: %.2f\r\n", + (unsigned long) acl_pthread_self(), count, i, spent, + (i * 1000) / (spent > 1 ? spent : 1)); + + printf("thread: %lu, peer addr: %s\r\n", + (unsigned long) acl_pthread_self(), peer_addr); + + udp_close(sock); +} + +typedef struct { + char local[64]; + char peer[64]; + int count; + int dlen; + int inter; + int need_read; + int mio_on; + int quit; +} THREAD_CTX; + +static void thread_run(void *ctx) +{ + THREAD_CTX *tc = (THREAD_CTX*) ctx; + + run(tc->local, tc->peer, tc->count, tc->dlen, + tc->inter, tc->need_read, tc->mio_on, tc->quit); + acl_myfree(ctx); +} + +static void usage(const char *procname) +{ + printf("usage: %s -h [help]\r\n" + " -s server_addr [default: 127.0.0.1:8888]\r\n" + " -l local_addr [default: 127.0.0.1:0]\r\n" + " -t thread_count [default: 1]\r\n" + " -i print_per_loop [default: 1000]\r\n" + " -N data_len [default: 100]\r\n" + " -r if_need_read [default: false]\r\n" + " -m if_mio_on[default: false]\r\n" + " -q if_send_quit when over [default: false]\r\n" + " -n loop_count [default: 1]\r\n", procname); +} + +int main(int argc, char *argv[]) +{ + char peer[64], local[64]; + int ch, count = 1, dlen = 100, inter = 1000, nthreads = 1, quit = 0; + int need_read = 0, mio_on = 0; + + acl_lib_init(); + acl_msg_stdout_enable(1); + + snprintf(peer, sizeof(peer), "127.0.0.1:8888"); + snprintf(local, sizeof(local), "127.0.0.1:0"); + + while ((ch = getopt(argc, argv, "hl:s:n:N:i:t:rqm")) > 0) { + switch (ch) { + case 'h': + usage(argv[0]); + return 0; + case 'l': + snprintf(local, sizeof(local), "%s", optarg); + break; + case 's': + snprintf(peer, sizeof(peer), "%s", optarg); + break; + case 'n': + count = atoi(optarg); + break; + case 'N': + dlen = atoi(optarg); + break; + case 'i': + inter = atoi(optarg); + break; + case 't': + nthreads = atoi(optarg); + break; + case 'r': + need_read = 1; + break; + case 'q': + quit = 1; + break; + case 'm': + mio_on = 1; + break; + default: + break; + } + } + + if (peer[0] == 0 || local[0] == 0) { + usage(argv[0]); + return 1; + } + + if (nthreads > 1) { + int i; + acl_pthread_pool_t *threads = + acl_thread_pool_create(nthreads, 120); + for (i = 0; i < nthreads; i++) { + THREAD_CTX *ctx = (THREAD_CTX*) + acl_mymalloc(sizeof(THREAD_CTX)); + snprintf(ctx->local, sizeof(ctx->local), "%s", local); + snprintf(ctx->peer, sizeof(ctx->peer), "%s", peer); + ctx->count = count; + ctx->dlen = dlen; + ctx->inter = inter; + ctx->need_read = need_read; + ctx->mio_on = mio_on; + ctx->quit = quit; + acl_pthread_pool_add(threads, thread_run, ctx); + } + + acl_pthread_pool_destroy(threads); + } else + run(local, peer, count, dlen, inter, need_read, mio_on, quit); + + printf("\r\nlocal: %s, peer: %s, count: %d, dlen: %d, inter: %d\r\n", + local, peer, count, dlen, inter); + + acl_lib_end(); + + return 0; +} diff --git a/lib_acl/samples/udp/client/mt.sh b/lib_acl/samples/udp/client/mt.sh new file mode 100644 index 000000000..5796360b5 --- /dev/null +++ b/lib_acl/samples/udp/client/mt.sh @@ -0,0 +1,2 @@ +#!/bin/sh +./client -i 50000 -n 10000000 -N 100 -m -r diff --git a/lib_acl/samples/udp/client/stdafx.cpp b/lib_acl/samples/udp/client/stdafx.cpp new file mode 100644 index 000000000..a27b824da --- /dev/null +++ b/lib_acl/samples/udp/client/stdafx.cpp @@ -0,0 +1 @@ +#include "stdafx.h" diff --git a/lib_acl/samples/udp/client/stdafx.h b/lib_acl/samples/udp/client/stdafx.h new file mode 100644 index 000000000..71ab9da2e --- /dev/null +++ b/lib_acl/samples/udp/client/stdafx.h @@ -0,0 +1,12 @@ +#ifndef __STDAFX_INCLUDE_H__ +#define __STDAFX_INCLUDE_H__ + +#include "lib_acl.h" + +#ifdef WIN32 +# ifndef snprintf +# define snprintf _snprintf +# endif +#endif + +#endif diff --git a/lib_acl/samples/udp/client/t.sh b/lib_acl/samples/udp/client/t.sh new file mode 100644 index 000000000..4829f00da --- /dev/null +++ b/lib_acl/samples/udp/client/t.sh @@ -0,0 +1,2 @@ +#!/bin/sh +./client -i 100000 -n 10000000 -N 100 -r diff --git a/lib_acl/samples/udp/server/Makefile b/lib_acl/samples/udp/server/Makefile new file mode 100644 index 000000000..d01166c4c --- /dev/null +++ b/lib_acl/samples/udp/server/Makefile @@ -0,0 +1,4 @@ +base_path = ../../.. +include ../Makefile.in +CFLAGS += -Wno-write-strings -Wno-pedantic +PROG = server diff --git a/lib_acl/samples/udp/server/main.cpp b/lib_acl/samples/udp/server/main.cpp new file mode 100644 index 000000000..1a0af88ac --- /dev/null +++ b/lib_acl/samples/udp/server/main.cpp @@ -0,0 +1,204 @@ +#include "stdafx.h" +#include "udp.h" + +static void sio(SOCK_UDP *sock, int inter, bool echo) +{ + int i = 0, ret; + char buf[4096]; + + while (true) { + ret = udp_read(sock, buf, sizeof(buf) - 1); + if (ret == -1) { + printf("sock_read error %s\r\n", acl_last_serror()); + break; + } else + buf[ret] = 0; + + if (++i % inter == 0) + printf("result: %s\r\n", buf); + + if (echo && (ret = udp_send(sock, buf, ret)) == -1) { + printf("sock_write error %s\r\n", acl_last_serror()); + break; + } + + if (i % inter == 0) { + snprintf(buf, sizeof(buf), "curr: %d, dlen=%d", i, ret); + ACL_METER_TIME(buf); + } + } +} + +static void mio(SOCK_UDP *sock, int inter, bool echo) +{ +#define PKT_CNT 20 +#define BUF_LEN 1024 + + char bufs[PKT_CNT][BUF_LEN]; + SOCK_PKT pkts[PKT_CNT]; + + int i, nread = 0, nwrite = 0, total_read = 0, n = 0; + + while (true) { + for (i = 0; i < PKT_CNT; i++) { + bufs[i][0] = 0; + udp_pkt_set_buf(&pkts[i], bufs[i], BUF_LEN); + } + + int ret = udp_mread(sock, pkts, PKT_CNT); + printf(">>.ret: %d\r\n", ret); + if (ret <= 0) { + printf(">>read error ret: %d, errno: %d, %d\r\n", + ret, errno, EMSGSIZE); + break; + } + + nread++; + total_read += ret; + + for (i = 0; i < ret; i++) { + PKT_IOV_LEN(&pkts[i]) = SOCK_PKT_LEN(sock, i); +#if 0 + printf("[%s], addr_len: %d, %lu\r\n", + (char *) PKT_IOV_DAT(&pkts[i]), + (int) pkts[i].addr_len, sizeof(pkts[i].addr)); + struct sockaddr_in *in = (struct sockaddr_in *) + sock->msgvec[i].msg_hdr.msg_name; + printf("ip: %s, port: %d\r\n", + inet_ntoa(in->sin_addr), ntohs(in->sin_port)); +#endif + } + + if (echo) { + ret = udp_msend(sock, pkts, ret); + if (ret <= 0) { + printf("sock_mwrite error %s, ret: %d\r\n", + acl_last_serror(), ret); + printf("len: %d\r\n", SOCK_PKT_LEN(sock, 0)); + break; + } + nwrite++; + } + + if (++n % inter == 0) { + char buf[256], ip[64], k; + + k = ret > 0 ? time(NULL) % ret : 0; + + snprintf(buf, sizeof(buf), + "curr: %d, total: %d, ret: %d, dlen: %ld, " + "nread: %d, nwrite: %d, ip: %s, port-%d: %d", + i, total_read, ret, PKT_IOV_LEN(&pkts[0]), + nread, nwrite, + udp_ip(sock, k, ip, sizeof(ip)), + k, udp_port(sock, k)); + ACL_METER_TIME(buf); + } + } +} + +static void run(const char *local_addr, int inter, int mio_on, int need_echo) +{ + SOCK_UDP *sock = udp_server_open(local_addr); + + if (sock == NULL) { + printf("open_sock error!\r\n"); + return; + } + + if (mio_on) + mio(sock, inter, need_echo); + else + sio(sock, inter, need_echo); + + udp_close(sock); +} + +typedef struct { + char local[64]; + int inter; + int mio_on; + int need_echo; +} THREAD_CTX; + +static void thread_run(void *ctx) +{ + THREAD_CTX *tc = (THREAD_CTX*) ctx; + + run(tc->local, tc->inter, tc->mio_on, tc->need_echo); + acl_myfree(ctx); +} + +static void usage(const char *procname) +{ + printf("usage: %s -h [help]\r\n" + " -s server_addr [default: 127.0.0.1:8888]\r\n" + " -t thread_count [default: 1]\r\n" + " -i print_per_loop [default: 1000]\r\n" + " -m if_mio_on[default: false]\r\n" + " -e if_echo [default: false]\r\n", procname); +} + +int main(int argc, char *argv[]) +{ + char local[64]; + int ch, inter = 1000, nthreads = 1; + int need_echo = 0, mio_on = 0; + + acl_lib_init(); + acl_msg_stdout_enable(1); + + snprintf(local, sizeof(local), "127.0.0.1:8888"); + + while ((ch = getopt(argc, argv, "hs:i:t:em")) > 0) { + switch (ch) { + case 'h': + usage(argv[0]); + return 0; + case 's': + snprintf(local, sizeof(local), "%s", optarg); + break; + case 'i': + inter = atoi(optarg); + break; + case 't': + nthreads = atoi(optarg); + break; + case 'e': + need_echo = 1; + break; + case 'm': + mio_on = 1; + break; + default: + break; + } + } + + if (local[0] == 0) { + usage(argv[0]); + return 1; + } + + if (nthreads > 1) { + int i; + acl_pthread_pool_t *threads = + acl_thread_pool_create(nthreads, 120); + for (i = 0; i < nthreads; i++) { + THREAD_CTX *ctx = (THREAD_CTX*) + acl_mymalloc(sizeof(THREAD_CTX)); + snprintf(ctx->local, sizeof(ctx->local), "%s", local); + ctx->inter = inter; + ctx->need_echo = need_echo; + ctx->mio_on = mio_on; + acl_pthread_pool_add(threads, thread_run, ctx); + } + + acl_pthread_pool_destroy(threads); + } else + run(local, inter, mio_on, need_echo); + + acl_lib_end(); + + return 0; +} diff --git a/lib_acl/samples/udp/server/mt.sh b/lib_acl/samples/udp/server/mt.sh new file mode 100644 index 000000000..218889d13 --- /dev/null +++ b/lib_acl/samples/udp/server/mt.sh @@ -0,0 +1,2 @@ +#!/bin/sh +./server -i 50000 -m -e diff --git a/lib_acl/samples/udp/server/stdafx.cpp b/lib_acl/samples/udp/server/stdafx.cpp new file mode 100644 index 000000000..a27b824da --- /dev/null +++ b/lib_acl/samples/udp/server/stdafx.cpp @@ -0,0 +1 @@ +#include "stdafx.h" diff --git a/lib_acl/samples/udp/server/stdafx.h b/lib_acl/samples/udp/server/stdafx.h new file mode 100644 index 000000000..71ab9da2e --- /dev/null +++ b/lib_acl/samples/udp/server/stdafx.h @@ -0,0 +1,12 @@ +#ifndef __STDAFX_INCLUDE_H__ +#define __STDAFX_INCLUDE_H__ + +#include "lib_acl.h" + +#ifdef WIN32 +# ifndef snprintf +# define snprintf _snprintf +# endif +#endif + +#endif diff --git a/lib_acl/samples/udp/server/t.sh b/lib_acl/samples/udp/server/t.sh new file mode 100644 index 000000000..d4675110c --- /dev/null +++ b/lib_acl/samples/udp/server/t.sh @@ -0,0 +1,2 @@ +#!/bin/sh +./server -i 100000 -e diff --git a/lib_acl/samples/udp/udp.cpp b/lib_acl/samples/udp/udp.cpp new file mode 100644 index 000000000..7d37739f0 --- /dev/null +++ b/lib_acl/samples/udp/udp.cpp @@ -0,0 +1,377 @@ +/** + * Copyright (C) 2015-2018 + * All rights reserved. + * + * AUTHOR(S) + * Zheng Shuxin + * E-mail: zhengshuxin@qiyi.com + * + * VERSION + * Thu 25 May 2017 05:25:43 PM CST + */ +#include "stdafx.h" +#include "udp.h" + +static bool host_port(char *buf, char **host, char **port) +{ + const char *ptr = acl_host_port(buf, host, "", port, (char*) NULL); + + if (ptr != NULL) { + acl_msg_error("%s(%d): invalid addr %s, %s", + __FILE__, __LINE__, buf, ptr); + return false; + } + + if (*port == NULL || atoi(*port) < 0) { + acl_msg_error("%s(%d): invalid port: %s, addr: %s", + __FILE__, __LINE__, *port ? *port : "null", buf); + return false; + } + + if (*host && **host == 0) + *host = 0; + if (*host == NULL) + *host = "0"; + + return true; +} + +static struct addrinfo *host_addrinfo(const char *addr) +{ + int err; + struct addrinfo hints, *res0; + char *buf = acl_mystrdup(addr), *host = NULL, *port = NULL; + + if (host_port(buf, &host, &port) == false) { + acl_myfree(buf); + return NULL; + } + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = PF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; +#ifdef ACL_MACOSX + hints.ai_flags = AI_DEFAULT; +#elif defined(ACL_ANDROID) + hints.ai_flags = AI_ADDRCONFIG; +#elif defined(ACL_WINDOWS) + hints.ai_protocol = IPPROTO_UDP; +# if _MSC_VER >= 1500 + hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG; +# endif +#else + hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG; +#endif + if ((err = getaddrinfo(host, port, &hints, &res0))) { + acl_msg_error("%s(%d): getaddrinfo error %s, peer=%s", + __FILE__, __LINE__, gai_strerror(err), host); + acl_myfree(buf); + return NULL; + } + + acl_myfree(buf); + return res0; +} + +static int bind_addr(struct addrinfo *res0, struct addrinfo **res) +{ + struct addrinfo *it; + int on, fd; + + for (it = res0; it != NULL ; it = it->ai_next) { + fd = socket(it->ai_family, it->ai_socktype, it->ai_protocol); + if (fd == ACL_SOCKET_INVALID) { + acl_msg_error("%s(%d): create socket %s", + __FILE__, __LINE__, acl_last_serror()); + return ACL_SOCKET_INVALID; + } + + on = 1; + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, + (const void *) &on, sizeof(on)) < 0) + { + acl_msg_warn("%s(%d): setsockopt(SO_REUSEADDR): %s", + __FILE__, __LINE__, acl_last_serror()); + } + +#if defined(SO_REUSEPORT) + on = 1; + if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, + (const void *) &on, sizeof(on)) < 0) + { + acl_msg_warn("%s(%d): setsocket(SO_REUSEPORT): %s", + __FILE__, __LINE__, acl_last_serror()); + } +#endif + +#ifdef ACL_WINDOWS + if (bind(fd, it->ai_addr, (int) it->ai_addrlen) == 0) +#else + if (bind(fd, it->ai_addr, it->ai_addrlen) == 0) +#endif + { + *res = it; + return fd; + } + + acl_msg_error("%s(%d): bind error %s", + __FILE__, __LINE__, acl_last_serror()); + acl_socket_close(fd); + } + + return ACL_SOCKET_INVALID; +} + +static SOCK_UDP *sock_open(const char *addr) +{ + SOCK_UDP *sock; + struct addrinfo *res0, *res; + int fd; + + res0 = host_addrinfo(addr); + if (res0 == NULL) + return NULL; + + fd = bind_addr(res0, &res); + if (fd == ACL_SOCKET_INVALID) { + acl_msg_error("%s(%d): invalid socket", __FILE__, __LINE__); + freeaddrinfo(res0); + return NULL; + } + + sock = (SOCK_UDP *) acl_mycalloc(1, sizeof(SOCK_UDP)); + sock->fd = fd; + memcpy(&sock->sa_local, res->ai_addr, res->ai_addrlen); + sock->sa_local_len = res->ai_addrlen; + + freeaddrinfo(res0); + return sock; +} + +SOCK_UDP *udp_client_open(const char *local, const char *peer) +{ + struct addrinfo *peer_res0 = host_addrinfo(peer); + SOCK_UDP *sock; + + if (peer_res0 == NULL) + return NULL; + + sock = sock_open(local); + if (sock == NULL) { + freeaddrinfo(peer_res0); + return NULL; + } + + memcpy(&sock->sa_peer, peer_res0->ai_addr, peer_res0->ai_addrlen); + sock->sa_peer_len = peer_res0->ai_addrlen; + freeaddrinfo(peer_res0); + return sock; +} + +SOCK_UDP *udp_server_open(const char *local) +{ + return sock_open(local); +} + +void udp_close(SOCK_UDP *sock) +{ + acl_socket_close(sock->fd); + acl_myfree(sock); +} + +int udp_read(SOCK_UDP *sock, void *buf, size_t size) +{ + ssize_t ret; + + sock->sa_peer_len = sizeof(sock->sa_peer); + ret = recvfrom(sock->fd, buf, size, 0, + (struct sockaddr *) &sock->sa_peer, &sock->sa_peer_len); + return ret; +} + +int udp_send(SOCK_UDP *sock, const void *data, size_t len) +{ + ssize_t ret = sendto(sock->fd, data, len, 0, + (struct sockaddr *) &sock->sa_peer, sock->sa_peer_len); + return ret; +} + +void udp_pkt_set_buf(SOCK_PKT *pkt, char *buf, size_t len) +{ + pkt->iov.iov_base = buf; + pkt->iov.iov_len = len; + pkt->addr_len = (socklen_t) sizeof(SOCK_ADDR); +} + +int udp_pkt_set_peer(SOCK_PKT *pkt, const char *addr) +{ + struct addrinfo *peer_res0 = host_addrinfo(addr); + + if (peer_res0 == NULL) + return -1; + memcpy(&pkt->addr, peer_res0->ai_addr, peer_res0->ai_addrlen); + pkt->addr_len = peer_res0->ai_addrlen; + freeaddrinfo(peer_res0); + return 0; +} + +int udp_mread(SOCK_UDP *sock, SOCK_PKT pkts[], size_t pkts_cnt) +{ + unsigned int flags = MSG_WAITFORONE /* | MSG_DONTWAIT */; + size_t i; + + if (sock->msgvec == NULL) { + sock->vlen = pkts_cnt; + sock->msgvec = (struct mmsghdr *) + acl_mycalloc(pkts_cnt, sizeof(struct mmsghdr)); + } else if (sock->vlen < pkts_cnt) { + acl_myfree(sock->msgvec); + sock->vlen = pkts_cnt; + sock->msgvec = (struct mmsghdr *) + acl_mycalloc(pkts_cnt, sizeof(struct mmsghdr)); + } + + sock->pkts = pkts; + sock->pkts_cnt = pkts_cnt; + + memset(sock->msgvec, 0, sizeof(struct mmsghdr) * pkts_cnt); + + for (i = 0; i < pkts_cnt; i++) { + sock->msgvec[i].msg_hdr.msg_iov = &pkts[i].iov; + sock->msgvec[i].msg_hdr.msg_iovlen = 1; + sock->msgvec[i].msg_hdr.msg_name = &pkts[i].addr; + sock->msgvec[i].msg_hdr.msg_namelen = sizeof(pkts[i].addr); + sock->msgvec[i].msg_len = 0; + } + + return recvmmsg(sock->fd, sock->msgvec, pkts_cnt, flags, NULL); +} + +int udp_msend(SOCK_UDP *sock, SOCK_PKT pkts[], size_t pkts_cnt) +{ + size_t i; + unsigned int flags = 0; +#ifndef __linux3__ + int n = 0; +#endif + + if (sock->msgvec == NULL) { + sock->vlen = pkts_cnt; + sock->msgvec = (struct mmsghdr *) + acl_mycalloc(pkts_cnt, sizeof(struct mmsghdr)); + } else if (sock->vlen < pkts_cnt) { + acl_myfree(sock->msgvec); + sock->vlen = pkts_cnt; + sock->msgvec = (struct mmsghdr *) + acl_mycalloc(pkts_cnt, sizeof(struct mmsghdr)); + } + + sock->pkts = pkts; + sock->pkts_cnt = pkts_cnt; + + memset(sock->msgvec, 0, sizeof(struct mmsghdr) * pkts_cnt); + + for (i = 0; i < pkts_cnt; i++) { + sock->msgvec[i].msg_hdr.msg_iov = &pkts[i].iov; + sock->msgvec[i].msg_hdr.msg_iovlen = 1; + sock->msgvec[i].msg_hdr.msg_name = &pkts[i].addr; + sock->msgvec[i].msg_hdr.msg_namelen = sizeof(pkts[i].addr); + sock->msgvec[i].msg_len = 0; +#ifndef __linux3__ + if (sendmsg(sock->fd, &sock->msgvec[i].msg_hdr, flags) < 0) + return -1; + n++; +#endif + } + +#ifdef __linux3__ + return sendmmsg(sock->fd, sock->msgvec, pkts_cnt, flags); +#else + return n; +#endif +} + +int pkt_port(SOCK_PKT *pkt) +{ + if (pkt->addr.sa.sa.sa_family == AF_INET) { + struct sockaddr_in *in = &pkt->addr.sa.in; + return ntohs(in->sin_port); + } +#ifdef AF_INET6 + else if (pkt->addr.sa.sa.sa_family == AF_INET6) { + struct sockaddr_in6 *in = &pkt->addr.sa.in6; + return ntohl(in->sin6_port); + } +#endif + else { + acl_msg_error("%s(%d): unkown sa_family=%d", + __FUNCTION__, __LINE__, pkt->addr.sa.sa.sa_family); + return -1; + } +} + +int udp_port(SOCK_UDP *sock, size_t i) +{ + if (sock->msgvec == NULL) { + acl_msg_error("%s(%d): msgvec NULL", __FUNCTION__, __LINE__); + return -1; + } + if (sock->pkts == NULL) { + acl_msg_error("%s(%d): pkts NULL", __FUNCTION__, __LINE__); + return -1; + } + if (sock->pkts_cnt == 0) { + acl_msg_error("%s(%d): pkts_cnt 0", __FUNCTION__, __LINE__); + return -1; + } + if (i >= sock->pkts_cnt) { + acl_msg_error("%s(%d): invalid i=%d >= %d", + __FUNCTION__, __LINE__, (int) i, (int) sock->pkts_cnt); + return -1; + } + + return pkt_port(&sock->pkts[i]); +} + +const char *pkt_ip(SOCK_PKT *pkt, char *buf, size_t size) +{ + if (pkt->addr.sa.sa.sa_family == AF_INET) { + struct sockaddr_in *in = &pkt->addr.sa.in; + return inet_ntop(in->sin_family, &in->sin_addr, buf, size); + } +#ifdef AF_INET6 + else if (pkt->addr.sa.sa.sa_family == AF_INET6) { + struct sockaddr_in6 *in = &pkt->addr.sa.in6; + return inet_ntop(in->sin6_family, &in->sin6_addr, buf, size); + } +#endif + else { + acl_msg_error("%s(%d): unkown sa_family=%d", + __FUNCTION__, __LINE__, pkt->addr.sa.sa.sa_family); + return NULL; + } +} + +const char *udp_ip(SOCK_UDP *sock, size_t i, char *buf, size_t size) +{ + if (sock->msgvec == NULL) { + acl_msg_error("%s(%d): msgvec NULL", __FUNCTION__, __LINE__); + return NULL; + } + if (sock->pkts == NULL) { + acl_msg_error("%s(%d): pkts NULL", __FUNCTION__, __LINE__); + return NULL; + } + if (sock->pkts_cnt == 0) { + acl_msg_error("%s(%d): pkts_cnt 0", __FUNCTION__, __LINE__); + return NULL; + } + if (i >= sock->pkts_cnt) { + acl_msg_error("%s(%d): invalid i=%d >= %d", + __FUNCTION__, __LINE__, (int) i, (int) sock->pkts_cnt); + return NULL; + } + + return pkt_ip(&sock->pkts[i], buf, size); +} + diff --git a/lib_acl/samples/udp/udp.h b/lib_acl/samples/udp/udp.h new file mode 100644 index 000000000..d96a8eddf --- /dev/null +++ b/lib_acl/samples/udp/udp.h @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2015-2018 + * All rights reserved. + * + * AUTHOR(S) + * Zheng Shuxin + * E-mail: zhengshuxin@qiyi.com + * + * VERSION + * Thu 25 May 2017 05:26:42 PM CST + */ + +#pragma once +#include +#include +#include +#include + +typedef struct SOCK_ADDR { + union { + struct sockaddr_storage ss; +#ifdef AF_INET6 + struct sockaddr_in6 in6; +#endif + struct sockaddr_in in; +#ifdef ACL_UNIX + struct sockaddr_un un; +#endif + struct sockaddr sa; + } sa; +} SOCK_ADDR; + +typedef struct SOCK_PKT { + struct iovec iov; + SOCK_ADDR addr; + socklen_t addr_len; +} SOCK_PKT; + +typedef struct SOCK_UDP { + int fd; + SOCK_ADDR sa_local; + socklen_t sa_local_len; + + SOCK_ADDR sa_peer; + socklen_t sa_peer_len; + + SOCK_PKT *pkts; + size_t pkts_cnt; + + struct mmsghdr *msgvec; + size_t vlen; +} SOCK_UDP; + +SOCK_UDP *udp_client_open(const char* local, const char *peer); +SOCK_UDP *udp_server_open(const char* local); + +void udp_close(SOCK_UDP *sock); +int udp_send(SOCK_UDP *sock, const void *data, size_t len); +int udp_read(SOCK_UDP *sock, void *buf, size_t size); + +void udp_pkt_set_buf(SOCK_PKT *pkt, char *buf, size_t len); +int udp_pkt_set_peer(SOCK_PKT *pkt, const char *addr); +int udp_mread(SOCK_UDP *sock, SOCK_PKT pkts[], size_t pkts_cnt); +int udp_msend(SOCK_UDP *sock, SOCK_PKT pkts[], size_t pkts_cnt); + +int pkt_port(SOCK_PKT *pkt); +int udp_port(SOCK_UDP *sock, size_t i); +const char *pkt_ip(SOCK_PKT *pkt, char *buf, size_t size); +const char *udp_ip(SOCK_UDP *sock, size_t i, char *buf, size_t size); + +#define PKT_IOV_DAT(pp) ((pp)->iov.iov_base) +#define PKT_IOV_LEN(pp) ((pp)->iov.iov_len) +#define SOCK_PKT_LEN(ss, ii) ((ss)->msgvec[(ii)].msg_len) diff --git a/lib_acl/src/stdlib/acl_vstream.c b/lib_acl/src/stdlib/acl_vstream.c index 913383e8b..0191bcc8e 100644 --- a/lib_acl/src/stdlib/acl_vstream.c +++ b/lib_acl/src/stdlib/acl_vstream.c @@ -3077,7 +3077,7 @@ void acl_vstream_set_local_addr(ACL_VSTREAM *fp, const struct sockaddr *sa) int port; struct sockaddr_in6 *in = (struct sockaddr_in6 *) sa; - if (!inet_ntop(AF_INET, &in->sin6_addr, ip, sizeof(ip))) + if (!inet_ntop(AF_INET6, &in->sin6_addr, ip, sizeof(ip))) ip[0] = 0; port = ntohs(in->sin6_port); snprintf(addr, sizeof(addr), "%s:%d", ip, port); diff --git a/lib_acl_cpp/changes.txt b/lib_acl_cpp/changes.txt index ff6bdc649..2522f03eb 100644 --- a/lib_acl_cpp/changes.txt +++ b/lib_acl_cpp/changes.txt @@ -7,6 +7,7 @@ 477) 2017.5.25 477.1) bugfix: connect_manager::remove 中存在一处删除问题 +--- "fuwangqin" 476) 2017.5.22 476.1) performance: http_client.cpp 支持缓冲方式写数据,从而提升了写的性能