add samples for recvmmsg/sendmmsg

This commit is contained in:
zhengshuxin 2017-05-29 11:47:52 +08:00
parent 67e2a99eee
commit 183c0a6a3a
17 changed files with 1126 additions and 1 deletions

View File

@ -0,0 +1,156 @@
CC = $(ENV_CPP)
CFLAGS = -c -g -W \
-Wall \
-Wcast-qual \
-Waggregate-return \
-Wno-long-long \
-Wpointer-arith \
-Werror \
-Wshadow \
-O2 \
-D_REENTRANT \
-D_POSIX_PTHREAD_SEMANTICS \
-D_USE_FAST_MACRO
#-Wcast-align
###########################################################
#Check system:
# Linux, SunOS, Solaris, BSD variants, AIX, HP-UX
SYSLIB = -lpthread
CHECKSYSRES = @echo "Unknow system type!";exit 1
UNIXNAME = $(shell uname -sm)
ifeq ($(CC),)
CC = g++
endif
ifeq ($(findstring gcc, $(CC)), gcc)
CFLAGS += -Wstrict-prototypes
endif
ifeq ($(findstring clang, $(CC)), clang)
CFLAGS += -Wstrict-prototypes \
-Wno-invalid-source-encoding \
-Wno-extended-offsetof
endif
ifeq ($(findstring clang++, $(CC)), clang++)
CFLAGS += -Wno-invalid-source-encoding \
-Wno-extended-offsetof
endif
# For FreeBSD
ifeq ($(findstring FreeBSD, $(UNIXNAME)), FreeBSD)
CFLAGS += -DFREEBSD -pedantic
SYSLIB += -lcrypt
endif
# For Darwin
ifeq ($(findstring Darwin, $(UNIXNAME)), Darwin)
CFLAGS += -DMACOSX -Wno-invalid-source-encoding \
-Wno-extended-offsetof
UNIXTYPE = MACOSX
endif
#Path for Linux
ifeq ($(findstring Linux, $(UNIXNAME)), Linux)
# CFLAGS += -DLINUX2 -pedantic
CFLAGS += -pedantic
SYSLIB += -lcrypt
endif
# For MINGW
ifeq ($(findstring MINGW, $(UNIXNAME)), MINGW)
SYSLIB = -lpthread-2
CFLAGS += -DLINUX2 -DMINGW
UNIXTYPE = LINUX
endif
# For MSYS
ifeq ($(findstring MSYS, $(UNIXNAME)), MSYS)
SYSLIB = -lpthread-2
CFLAGS += -DLINUX2 -DMINGW
UNIXTYPE = LINUX
endif
#Path for SunOS
ifeq ($(findstring SunOS, $(UNIXNAME)), SunOS)
ifeq ($(findstring 86, $(UNIXNAME)), 86)
SYSLIB += -lsocket -lnsl -lrt
endif
ifeq ($(findstring sun4u, $(UNIXNAME)), sun4u)
SYSLIB += -lsocket -lnsl -lrt
endif
CFLAGS += -DSUNOS5 -pedantic
SYSLIB += -lcrypt
endif
#Path for HP-UX
ifeq ($(findstring HP-UX, $(UNIXNAME)), HP-UX)
CFLAGS += -DHP_UX -DHPUX11
PLAT_NAME=hp-ux
SYSLIB += -lcrypt
endif
# For Darwin
ifeq ($(findstring Darwin, $(UNIXNAME)), Darwin)
CFLAGS += -DMACOSX
endif
#Find system type.
ifneq ($(SYSPATH),)
CHECKSYSRES = @echo "System is $(shell uname -sm)"
endif
###########################################################
BASE_PATH =
ifneq ($(base_path),)
BASE_PATH = $(base_path)
else
BASE_PATH = ../..
endif
ACL_PATH = $(BASE_PATH)
ACL_INC = $(ACL_PATH)/include
ACL_LIB = $(ACL_PATH)/lib
EXTLIBS =
#CFLAGS += -I$(ACL_INC) -I$(PROTO_INC)
#LDFLAGS = -L$(ACL_LIB) -L$(PROTO_LIB) -l_protocol -lacl $(EXTLIBS) $(SYSLIB)
CFLAGS += -I$(ACL_INC) -I.. -I.
LDFLAGS = -L$(ACL_LIB) -lacl $(EXTLIBS) $(SYSLIB)
###########################################################
OUT_PATH = .
OBJ_PATH = $(OUT_PATH)
#Project's objs
SRC = $(wildcard *.cpp) $(wildcard ../*.cpp)
OBJ = $(patsubst %.cpp, $(OBJ_PATH)/%.o, $(notdir $(SRC)))
###########################################################
.PHONY = all clean
PROG =
COMPILE = $(CC) $(CFLAGS)
# -Wl,-rpath,$(ACL_LIB) -Wl,-rpath,$(PROTO_LIB) -o $(OBJ_PATH)/$(PROG)
all: RM $(OBJ)
$(CC) $(OBJ) $(LDFLAGS) -o $(OBJ_PATH)/$(PROG)
@echo ""
@echo "All ok! Output:$(PROG)"
@echo ""
$(OBJ_PATH)/%.o: %.cpp
$(COMPILE) $< -o $@
$(OBJ_PATH)/%.o: ../%.cpp
$(COMPILE) $< -o $@
RM:
rm -f $(PROG)
clean:
rm -f $(PROG)
rm -f $(OBJ)
###########################################################

View File

@ -0,0 +1,4 @@
base_path = ../../..
include ../Makefile.in
CFLAGS += -Wno-write-strings -Wno-pedantic
PROG = client

View File

@ -0,0 +1,272 @@
#include "stdafx.h"
#include "udp.h"
static double stamp_sub(const struct timeval *from, const struct timeval *sub_by)
{
struct timeval res;
memcpy(&res, from, sizeof(struct timeval));
res.tv_usec -= sub_by->tv_usec;
if (res.tv_usec < 0) {
--res.tv_sec;
res.tv_usec += 1000000;
}
res.tv_sec -= sub_by->tv_sec;
return res.tv_sec * 1000.0 + res.tv_usec/1000.0;
}
static int sio(SOCK_UDP *sock, const char *data, int dlen, int count,
int inter, bool need_read)
{
int i, ret;
char buf[4096];
for (i = 0; i < count; i++) {
ret = udp_send(sock, data, dlen);
if (ret == -1) {
printf("sock_write error %s\r\n", acl_last_serror());
break;
}
if (need_read) {
ret = udp_read(sock, buf, sizeof(buf) - 1);
if (ret == -1) {
printf("sock_read error %s\r\n",
acl_last_serror());
break;
} else
buf[ret] = 0;
if (i % inter == 0)
printf("result: %s\r\n", buf);
}
if (i % inter == 0) {
snprintf(buf, sizeof(buf),
"total: %d, curr: %d, dlen=%d",
count, i, dlen);
ACL_METER_TIME(buf);
}
}
return i;
}
static int mio(SOCK_UDP *sock, const char *peer_addr, int dlen,
int count, int inter, bool echo)
{
#define PKT_CNT 2
#define DAT_LEN 100
char bufs[PKT_CNT][DAT_LEN];
SOCK_PKT pkts[PKT_CNT];
int n = 0, total_write = 0;
if (dlen > DAT_LEN)
dlen = DAT_LEN;
for (int j = 0; j < PKT_CNT; j++) {
memset(&bufs[j], 'X', dlen - 2);
bufs[j][dlen - 1] = 0;
udp_pkt_set_buf(&pkts[j], bufs[j], DAT_LEN);
udp_pkt_set_peer(&pkts[j], peer_addr);
}
for (int i = 0; i < count; i++) {
for (int j = 0; j < PKT_CNT; j++) {
PKT_IOV_LEN(&pkts[j]) = DAT_LEN;
}
int nout = udp_msend(sock, pkts, PKT_CNT);
if (nout < 0) {
printf("sock_mwrite error %s\r\n", acl_last_serror());
break;
}
total_write += nout;
int nin = 0;
if (echo) {
nin = udp_mread(sock, pkts, PKT_CNT);
if (nin < 0) {
printf("sock_mread error %s\r\n", acl_last_serror());
break;
}
}
if (++n % inter == 0) {
char buf[256], ip[64];
snprintf(buf, sizeof(buf), "total: %d, curr: %d, "
"nout: %d, nin: %d, dlen: %d, ip: %s, port: %d",
total_write, n, nout, nin, dlen,
udp_ip(sock, nin > 0 ? n % nin : 0,
ip, sizeof(ip)),
udp_port(sock, nin > 0 ? n % nin : 0));
ACL_METER_TIME(buf);
}
}
return n;
}
static void run(const char *local_addr, const char *peer_addr,
int count, int dlen, int inter, int need_read, int mio_on, int quit)
{
int i;
double spent;
char data[4096];
struct timeval begin, end;
SOCK_UDP *sock = udp_client_open(local_addr, peer_addr);
(void) quit;
if (sock == NULL) {
printf("open_sock error!\r\n");
return;
}
if (dlen > (int) sizeof(data) - 1)
dlen = (int) sizeof(data) - 1;
for (i = 0; i < dlen; i++)
data[i] = 'X';
data[dlen] = 0;
gettimeofday(&begin, NULL);
if (mio_on)
i = mio(sock, peer_addr, dlen, count, inter, need_read);
else
i = sio(sock, data, dlen, count, inter, need_read);
gettimeofday(&end, NULL);
spent = stamp_sub(&end, &begin);
printf("thread: %lu, total: %d, curr: %d, spent: %.2f, speed: %.2f\r\n",
(unsigned long) acl_pthread_self(), count, i, spent,
(i * 1000) / (spent > 1 ? spent : 1));
printf("thread: %lu, peer addr: %s\r\n",
(unsigned long) acl_pthread_self(), peer_addr);
udp_close(sock);
}
typedef struct {
char local[64];
char peer[64];
int count;
int dlen;
int inter;
int need_read;
int mio_on;
int quit;
} THREAD_CTX;
static void thread_run(void *ctx)
{
THREAD_CTX *tc = (THREAD_CTX*) ctx;
run(tc->local, tc->peer, tc->count, tc->dlen,
tc->inter, tc->need_read, tc->mio_on, tc->quit);
acl_myfree(ctx);
}
static void usage(const char *procname)
{
printf("usage: %s -h [help]\r\n"
" -s server_addr [default: 127.0.0.1:8888]\r\n"
" -l local_addr [default: 127.0.0.1:0]\r\n"
" -t thread_count [default: 1]\r\n"
" -i print_per_loop [default: 1000]\r\n"
" -N data_len [default: 100]\r\n"
" -r if_need_read [default: false]\r\n"
" -m if_mio_on[default: false]\r\n"
" -q if_send_quit when over [default: false]\r\n"
" -n loop_count [default: 1]\r\n", procname);
}
int main(int argc, char *argv[])
{
char peer[64], local[64];
int ch, count = 1, dlen = 100, inter = 1000, nthreads = 1, quit = 0;
int need_read = 0, mio_on = 0;
acl_lib_init();
acl_msg_stdout_enable(1);
snprintf(peer, sizeof(peer), "127.0.0.1:8888");
snprintf(local, sizeof(local), "127.0.0.1:0");
while ((ch = getopt(argc, argv, "hl:s:n:N:i:t:rqm")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
return 0;
case 'l':
snprintf(local, sizeof(local), "%s", optarg);
break;
case 's':
snprintf(peer, sizeof(peer), "%s", optarg);
break;
case 'n':
count = atoi(optarg);
break;
case 'N':
dlen = atoi(optarg);
break;
case 'i':
inter = atoi(optarg);
break;
case 't':
nthreads = atoi(optarg);
break;
case 'r':
need_read = 1;
break;
case 'q':
quit = 1;
break;
case 'm':
mio_on = 1;
break;
default:
break;
}
}
if (peer[0] == 0 || local[0] == 0) {
usage(argv[0]);
return 1;
}
if (nthreads > 1) {
int i;
acl_pthread_pool_t *threads =
acl_thread_pool_create(nthreads, 120);
for (i = 0; i < nthreads; i++) {
THREAD_CTX *ctx = (THREAD_CTX*)
acl_mymalloc(sizeof(THREAD_CTX));
snprintf(ctx->local, sizeof(ctx->local), "%s", local);
snprintf(ctx->peer, sizeof(ctx->peer), "%s", peer);
ctx->count = count;
ctx->dlen = dlen;
ctx->inter = inter;
ctx->need_read = need_read;
ctx->mio_on = mio_on;
ctx->quit = quit;
acl_pthread_pool_add(threads, thread_run, ctx);
}
acl_pthread_pool_destroy(threads);
} else
run(local, peer, count, dlen, inter, need_read, mio_on, quit);
printf("\r\nlocal: %s, peer: %s, count: %d, dlen: %d, inter: %d\r\n",
local, peer, count, dlen, inter);
acl_lib_end();
return 0;
}

View File

@ -0,0 +1,2 @@
#!/bin/sh
./client -i 50000 -n 10000000 -N 100 -m -r

View File

@ -0,0 +1 @@
#include "stdafx.h"

View File

@ -0,0 +1,12 @@
#ifndef __STDAFX_INCLUDE_H__
#define __STDAFX_INCLUDE_H__
#include "lib_acl.h"
#ifdef WIN32
# ifndef snprintf
# define snprintf _snprintf
# endif
#endif
#endif

View File

@ -0,0 +1,2 @@
#!/bin/sh
./client -i 100000 -n 10000000 -N 100 -r

View File

@ -0,0 +1,4 @@
base_path = ../../..
include ../Makefile.in
CFLAGS += -Wno-write-strings -Wno-pedantic
PROG = server

View File

@ -0,0 +1,204 @@
#include "stdafx.h"
#include "udp.h"
static void sio(SOCK_UDP *sock, int inter, bool echo)
{
int i = 0, ret;
char buf[4096];
while (true) {
ret = udp_read(sock, buf, sizeof(buf) - 1);
if (ret == -1) {
printf("sock_read error %s\r\n", acl_last_serror());
break;
} else
buf[ret] = 0;
if (++i % inter == 0)
printf("result: %s\r\n", buf);
if (echo && (ret = udp_send(sock, buf, ret)) == -1) {
printf("sock_write error %s\r\n", acl_last_serror());
break;
}
if (i % inter == 0) {
snprintf(buf, sizeof(buf), "curr: %d, dlen=%d", i, ret);
ACL_METER_TIME(buf);
}
}
}
static void mio(SOCK_UDP *sock, int inter, bool echo)
{
#define PKT_CNT 20
#define BUF_LEN 1024
char bufs[PKT_CNT][BUF_LEN];
SOCK_PKT pkts[PKT_CNT];
int i, nread = 0, nwrite = 0, total_read = 0, n = 0;
while (true) {
for (i = 0; i < PKT_CNT; i++) {
bufs[i][0] = 0;
udp_pkt_set_buf(&pkts[i], bufs[i], BUF_LEN);
}
int ret = udp_mread(sock, pkts, PKT_CNT);
printf(">>.ret: %d\r\n", ret);
if (ret <= 0) {
printf(">>read error ret: %d, errno: %d, %d\r\n",
ret, errno, EMSGSIZE);
break;
}
nread++;
total_read += ret;
for (i = 0; i < ret; i++) {
PKT_IOV_LEN(&pkts[i]) = SOCK_PKT_LEN(sock, i);
#if 0
printf("[%s], addr_len: %d, %lu\r\n",
(char *) PKT_IOV_DAT(&pkts[i]),
(int) pkts[i].addr_len, sizeof(pkts[i].addr));
struct sockaddr_in *in = (struct sockaddr_in *)
sock->msgvec[i].msg_hdr.msg_name;
printf("ip: %s, port: %d\r\n",
inet_ntoa(in->sin_addr), ntohs(in->sin_port));
#endif
}
if (echo) {
ret = udp_msend(sock, pkts, ret);
if (ret <= 0) {
printf("sock_mwrite error %s, ret: %d\r\n",
acl_last_serror(), ret);
printf("len: %d\r\n", SOCK_PKT_LEN(sock, 0));
break;
}
nwrite++;
}
if (++n % inter == 0) {
char buf[256], ip[64], k;
k = ret > 0 ? time(NULL) % ret : 0;
snprintf(buf, sizeof(buf),
"curr: %d, total: %d, ret: %d, dlen: %ld, "
"nread: %d, nwrite: %d, ip: %s, port-%d: %d",
i, total_read, ret, PKT_IOV_LEN(&pkts[0]),
nread, nwrite,
udp_ip(sock, k, ip, sizeof(ip)),
k, udp_port(sock, k));
ACL_METER_TIME(buf);
}
}
}
static void run(const char *local_addr, int inter, int mio_on, int need_echo)
{
SOCK_UDP *sock = udp_server_open(local_addr);
if (sock == NULL) {
printf("open_sock error!\r\n");
return;
}
if (mio_on)
mio(sock, inter, need_echo);
else
sio(sock, inter, need_echo);
udp_close(sock);
}
typedef struct {
char local[64];
int inter;
int mio_on;
int need_echo;
} THREAD_CTX;
static void thread_run(void *ctx)
{
THREAD_CTX *tc = (THREAD_CTX*) ctx;
run(tc->local, tc->inter, tc->mio_on, tc->need_echo);
acl_myfree(ctx);
}
static void usage(const char *procname)
{
printf("usage: %s -h [help]\r\n"
" -s server_addr [default: 127.0.0.1:8888]\r\n"
" -t thread_count [default: 1]\r\n"
" -i print_per_loop [default: 1000]\r\n"
" -m if_mio_on[default: false]\r\n"
" -e if_echo [default: false]\r\n", procname);
}
int main(int argc, char *argv[])
{
char local[64];
int ch, inter = 1000, nthreads = 1;
int need_echo = 0, mio_on = 0;
acl_lib_init();
acl_msg_stdout_enable(1);
snprintf(local, sizeof(local), "127.0.0.1:8888");
while ((ch = getopt(argc, argv, "hs:i:t:em")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
return 0;
case 's':
snprintf(local, sizeof(local), "%s", optarg);
break;
case 'i':
inter = atoi(optarg);
break;
case 't':
nthreads = atoi(optarg);
break;
case 'e':
need_echo = 1;
break;
case 'm':
mio_on = 1;
break;
default:
break;
}
}
if (local[0] == 0) {
usage(argv[0]);
return 1;
}
if (nthreads > 1) {
int i;
acl_pthread_pool_t *threads =
acl_thread_pool_create(nthreads, 120);
for (i = 0; i < nthreads; i++) {
THREAD_CTX *ctx = (THREAD_CTX*)
acl_mymalloc(sizeof(THREAD_CTX));
snprintf(ctx->local, sizeof(ctx->local), "%s", local);
ctx->inter = inter;
ctx->need_echo = need_echo;
ctx->mio_on = mio_on;
acl_pthread_pool_add(threads, thread_run, ctx);
}
acl_pthread_pool_destroy(threads);
} else
run(local, inter, mio_on, need_echo);
acl_lib_end();
return 0;
}

View File

@ -0,0 +1,2 @@
#!/bin/sh
./server -i 50000 -m -e

View File

@ -0,0 +1 @@
#include "stdafx.h"

View File

@ -0,0 +1,12 @@
#ifndef __STDAFX_INCLUDE_H__
#define __STDAFX_INCLUDE_H__
#include "lib_acl.h"
#ifdef WIN32
# ifndef snprintf
# define snprintf _snprintf
# endif
#endif
#endif

View File

@ -0,0 +1,2 @@
#!/bin/sh
./server -i 100000 -e

377
lib_acl/samples/udp/udp.cpp Normal file
View File

@ -0,0 +1,377 @@
/**
* Copyright (C) 2015-2018
* All rights reserved.
*
* AUTHOR(S)
* Zheng Shuxin
* E-mail: zhengshuxin@qiyi.com
*
* VERSION
* Thu 25 May 2017 05:25:43 PM CST
*/
#include "stdafx.h"
#include "udp.h"
static bool host_port(char *buf, char **host, char **port)
{
const char *ptr = acl_host_port(buf, host, "", port, (char*) NULL);
if (ptr != NULL) {
acl_msg_error("%s(%d): invalid addr %s, %s",
__FILE__, __LINE__, buf, ptr);
return false;
}
if (*port == NULL || atoi(*port) < 0) {
acl_msg_error("%s(%d): invalid port: %s, addr: %s",
__FILE__, __LINE__, *port ? *port : "null", buf);
return false;
}
if (*host && **host == 0)
*host = 0;
if (*host == NULL)
*host = "0";
return true;
}
static struct addrinfo *host_addrinfo(const char *addr)
{
int err;
struct addrinfo hints, *res0;
char *buf = acl_mystrdup(addr), *host = NULL, *port = NULL;
if (host_port(buf, &host, &port) == false) {
acl_myfree(buf);
return NULL;
}
memset(&hints, 0, sizeof(hints));
hints.ai_family = PF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
#ifdef ACL_MACOSX
hints.ai_flags = AI_DEFAULT;
#elif defined(ACL_ANDROID)
hints.ai_flags = AI_ADDRCONFIG;
#elif defined(ACL_WINDOWS)
hints.ai_protocol = IPPROTO_UDP;
# if _MSC_VER >= 1500
hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG;
# endif
#else
hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG;
#endif
if ((err = getaddrinfo(host, port, &hints, &res0))) {
acl_msg_error("%s(%d): getaddrinfo error %s, peer=%s",
__FILE__, __LINE__, gai_strerror(err), host);
acl_myfree(buf);
return NULL;
}
acl_myfree(buf);
return res0;
}
static int bind_addr(struct addrinfo *res0, struct addrinfo **res)
{
struct addrinfo *it;
int on, fd;
for (it = res0; it != NULL ; it = it->ai_next) {
fd = socket(it->ai_family, it->ai_socktype, it->ai_protocol);
if (fd == ACL_SOCKET_INVALID) {
acl_msg_error("%s(%d): create socket %s",
__FILE__, __LINE__, acl_last_serror());
return ACL_SOCKET_INVALID;
}
on = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
(const void *) &on, sizeof(on)) < 0)
{
acl_msg_warn("%s(%d): setsockopt(SO_REUSEADDR): %s",
__FILE__, __LINE__, acl_last_serror());
}
#if defined(SO_REUSEPORT)
on = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT,
(const void *) &on, sizeof(on)) < 0)
{
acl_msg_warn("%s(%d): setsocket(SO_REUSEPORT): %s",
__FILE__, __LINE__, acl_last_serror());
}
#endif
#ifdef ACL_WINDOWS
if (bind(fd, it->ai_addr, (int) it->ai_addrlen) == 0)
#else
if (bind(fd, it->ai_addr, it->ai_addrlen) == 0)
#endif
{
*res = it;
return fd;
}
acl_msg_error("%s(%d): bind error %s",
__FILE__, __LINE__, acl_last_serror());
acl_socket_close(fd);
}
return ACL_SOCKET_INVALID;
}
static SOCK_UDP *sock_open(const char *addr)
{
SOCK_UDP *sock;
struct addrinfo *res0, *res;
int fd;
res0 = host_addrinfo(addr);
if (res0 == NULL)
return NULL;
fd = bind_addr(res0, &res);
if (fd == ACL_SOCKET_INVALID) {
acl_msg_error("%s(%d): invalid socket", __FILE__, __LINE__);
freeaddrinfo(res0);
return NULL;
}
sock = (SOCK_UDP *) acl_mycalloc(1, sizeof(SOCK_UDP));
sock->fd = fd;
memcpy(&sock->sa_local, res->ai_addr, res->ai_addrlen);
sock->sa_local_len = res->ai_addrlen;
freeaddrinfo(res0);
return sock;
}
SOCK_UDP *udp_client_open(const char *local, const char *peer)
{
struct addrinfo *peer_res0 = host_addrinfo(peer);
SOCK_UDP *sock;
if (peer_res0 == NULL)
return NULL;
sock = sock_open(local);
if (sock == NULL) {
freeaddrinfo(peer_res0);
return NULL;
}
memcpy(&sock->sa_peer, peer_res0->ai_addr, peer_res0->ai_addrlen);
sock->sa_peer_len = peer_res0->ai_addrlen;
freeaddrinfo(peer_res0);
return sock;
}
SOCK_UDP *udp_server_open(const char *local)
{
return sock_open(local);
}
void udp_close(SOCK_UDP *sock)
{
acl_socket_close(sock->fd);
acl_myfree(sock);
}
int udp_read(SOCK_UDP *sock, void *buf, size_t size)
{
ssize_t ret;
sock->sa_peer_len = sizeof(sock->sa_peer);
ret = recvfrom(sock->fd, buf, size, 0,
(struct sockaddr *) &sock->sa_peer, &sock->sa_peer_len);
return ret;
}
int udp_send(SOCK_UDP *sock, const void *data, size_t len)
{
ssize_t ret = sendto(sock->fd, data, len, 0,
(struct sockaddr *) &sock->sa_peer, sock->sa_peer_len);
return ret;
}
void udp_pkt_set_buf(SOCK_PKT *pkt, char *buf, size_t len)
{
pkt->iov.iov_base = buf;
pkt->iov.iov_len = len;
pkt->addr_len = (socklen_t) sizeof(SOCK_ADDR);
}
int udp_pkt_set_peer(SOCK_PKT *pkt, const char *addr)
{
struct addrinfo *peer_res0 = host_addrinfo(addr);
if (peer_res0 == NULL)
return -1;
memcpy(&pkt->addr, peer_res0->ai_addr, peer_res0->ai_addrlen);
pkt->addr_len = peer_res0->ai_addrlen;
freeaddrinfo(peer_res0);
return 0;
}
int udp_mread(SOCK_UDP *sock, SOCK_PKT pkts[], size_t pkts_cnt)
{
unsigned int flags = MSG_WAITFORONE /* | MSG_DONTWAIT */;
size_t i;
if (sock->msgvec == NULL) {
sock->vlen = pkts_cnt;
sock->msgvec = (struct mmsghdr *)
acl_mycalloc(pkts_cnt, sizeof(struct mmsghdr));
} else if (sock->vlen < pkts_cnt) {
acl_myfree(sock->msgvec);
sock->vlen = pkts_cnt;
sock->msgvec = (struct mmsghdr *)
acl_mycalloc(pkts_cnt, sizeof(struct mmsghdr));
}
sock->pkts = pkts;
sock->pkts_cnt = pkts_cnt;
memset(sock->msgvec, 0, sizeof(struct mmsghdr) * pkts_cnt);
for (i = 0; i < pkts_cnt; i++) {
sock->msgvec[i].msg_hdr.msg_iov = &pkts[i].iov;
sock->msgvec[i].msg_hdr.msg_iovlen = 1;
sock->msgvec[i].msg_hdr.msg_name = &pkts[i].addr;
sock->msgvec[i].msg_hdr.msg_namelen = sizeof(pkts[i].addr);
sock->msgvec[i].msg_len = 0;
}
return recvmmsg(sock->fd, sock->msgvec, pkts_cnt, flags, NULL);
}
int udp_msend(SOCK_UDP *sock, SOCK_PKT pkts[], size_t pkts_cnt)
{
size_t i;
unsigned int flags = 0;
#ifndef __linux3__
int n = 0;
#endif
if (sock->msgvec == NULL) {
sock->vlen = pkts_cnt;
sock->msgvec = (struct mmsghdr *)
acl_mycalloc(pkts_cnt, sizeof(struct mmsghdr));
} else if (sock->vlen < pkts_cnt) {
acl_myfree(sock->msgvec);
sock->vlen = pkts_cnt;
sock->msgvec = (struct mmsghdr *)
acl_mycalloc(pkts_cnt, sizeof(struct mmsghdr));
}
sock->pkts = pkts;
sock->pkts_cnt = pkts_cnt;
memset(sock->msgvec, 0, sizeof(struct mmsghdr) * pkts_cnt);
for (i = 0; i < pkts_cnt; i++) {
sock->msgvec[i].msg_hdr.msg_iov = &pkts[i].iov;
sock->msgvec[i].msg_hdr.msg_iovlen = 1;
sock->msgvec[i].msg_hdr.msg_name = &pkts[i].addr;
sock->msgvec[i].msg_hdr.msg_namelen = sizeof(pkts[i].addr);
sock->msgvec[i].msg_len = 0;
#ifndef __linux3__
if (sendmsg(sock->fd, &sock->msgvec[i].msg_hdr, flags) < 0)
return -1;
n++;
#endif
}
#ifdef __linux3__
return sendmmsg(sock->fd, sock->msgvec, pkts_cnt, flags);
#else
return n;
#endif
}
int pkt_port(SOCK_PKT *pkt)
{
if (pkt->addr.sa.sa.sa_family == AF_INET) {
struct sockaddr_in *in = &pkt->addr.sa.in;
return ntohs(in->sin_port);
}
#ifdef AF_INET6
else if (pkt->addr.sa.sa.sa_family == AF_INET6) {
struct sockaddr_in6 *in = &pkt->addr.sa.in6;
return ntohl(in->sin6_port);
}
#endif
else {
acl_msg_error("%s(%d): unkown sa_family=%d",
__FUNCTION__, __LINE__, pkt->addr.sa.sa.sa_family);
return -1;
}
}
int udp_port(SOCK_UDP *sock, size_t i)
{
if (sock->msgvec == NULL) {
acl_msg_error("%s(%d): msgvec NULL", __FUNCTION__, __LINE__);
return -1;
}
if (sock->pkts == NULL) {
acl_msg_error("%s(%d): pkts NULL", __FUNCTION__, __LINE__);
return -1;
}
if (sock->pkts_cnt == 0) {
acl_msg_error("%s(%d): pkts_cnt 0", __FUNCTION__, __LINE__);
return -1;
}
if (i >= sock->pkts_cnt) {
acl_msg_error("%s(%d): invalid i=%d >= %d",
__FUNCTION__, __LINE__, (int) i, (int) sock->pkts_cnt);
return -1;
}
return pkt_port(&sock->pkts[i]);
}
const char *pkt_ip(SOCK_PKT *pkt, char *buf, size_t size)
{
if (pkt->addr.sa.sa.sa_family == AF_INET) {
struct sockaddr_in *in = &pkt->addr.sa.in;
return inet_ntop(in->sin_family, &in->sin_addr, buf, size);
}
#ifdef AF_INET6
else if (pkt->addr.sa.sa.sa_family == AF_INET6) {
struct sockaddr_in6 *in = &pkt->addr.sa.in6;
return inet_ntop(in->sin6_family, &in->sin6_addr, buf, size);
}
#endif
else {
acl_msg_error("%s(%d): unkown sa_family=%d",
__FUNCTION__, __LINE__, pkt->addr.sa.sa.sa_family);
return NULL;
}
}
const char *udp_ip(SOCK_UDP *sock, size_t i, char *buf, size_t size)
{
if (sock->msgvec == NULL) {
acl_msg_error("%s(%d): msgvec NULL", __FUNCTION__, __LINE__);
return NULL;
}
if (sock->pkts == NULL) {
acl_msg_error("%s(%d): pkts NULL", __FUNCTION__, __LINE__);
return NULL;
}
if (sock->pkts_cnt == 0) {
acl_msg_error("%s(%d): pkts_cnt 0", __FUNCTION__, __LINE__);
return NULL;
}
if (i >= sock->pkts_cnt) {
acl_msg_error("%s(%d): invalid i=%d >= %d",
__FUNCTION__, __LINE__, (int) i, (int) sock->pkts_cnt);
return NULL;
}
return pkt_ip(&sock->pkts[i], buf, size);
}

73
lib_acl/samples/udp/udp.h Normal file
View File

@ -0,0 +1,73 @@
/**
* Copyright (C) 2015-2018
* All rights reserved.
*
* AUTHOR(S)
* Zheng Shuxin
* E-mail: zhengshuxin@qiyi.com
*
* VERSION
* Thu 25 May 2017 05:26:42 PM CST
*/
#pragma once
#include <sys/socket.h>
#include <sys/types.h>
#include <netdb.h>
#include <sys/un.h>
typedef struct SOCK_ADDR {
union {
struct sockaddr_storage ss;
#ifdef AF_INET6
struct sockaddr_in6 in6;
#endif
struct sockaddr_in in;
#ifdef ACL_UNIX
struct sockaddr_un un;
#endif
struct sockaddr sa;
} sa;
} SOCK_ADDR;
typedef struct SOCK_PKT {
struct iovec iov;
SOCK_ADDR addr;
socklen_t addr_len;
} SOCK_PKT;
typedef struct SOCK_UDP {
int fd;
SOCK_ADDR sa_local;
socklen_t sa_local_len;
SOCK_ADDR sa_peer;
socklen_t sa_peer_len;
SOCK_PKT *pkts;
size_t pkts_cnt;
struct mmsghdr *msgvec;
size_t vlen;
} SOCK_UDP;
SOCK_UDP *udp_client_open(const char* local, const char *peer);
SOCK_UDP *udp_server_open(const char* local);
void udp_close(SOCK_UDP *sock);
int udp_send(SOCK_UDP *sock, const void *data, size_t len);
int udp_read(SOCK_UDP *sock, void *buf, size_t size);
void udp_pkt_set_buf(SOCK_PKT *pkt, char *buf, size_t len);
int udp_pkt_set_peer(SOCK_PKT *pkt, const char *addr);
int udp_mread(SOCK_UDP *sock, SOCK_PKT pkts[], size_t pkts_cnt);
int udp_msend(SOCK_UDP *sock, SOCK_PKT pkts[], size_t pkts_cnt);
int pkt_port(SOCK_PKT *pkt);
int udp_port(SOCK_UDP *sock, size_t i);
const char *pkt_ip(SOCK_PKT *pkt, char *buf, size_t size);
const char *udp_ip(SOCK_UDP *sock, size_t i, char *buf, size_t size);
#define PKT_IOV_DAT(pp) ((pp)->iov.iov_base)
#define PKT_IOV_LEN(pp) ((pp)->iov.iov_len)
#define SOCK_PKT_LEN(ss, ii) ((ss)->msgvec[(ii)].msg_len)

View File

@ -3077,7 +3077,7 @@ void acl_vstream_set_local_addr(ACL_VSTREAM *fp, const struct sockaddr *sa)
int port;
struct sockaddr_in6 *in = (struct sockaddr_in6 *) sa;
if (!inet_ntop(AF_INET, &in->sin6_addr, ip, sizeof(ip)))
if (!inet_ntop(AF_INET6, &in->sin6_addr, ip, sizeof(ip)))
ip[0] = 0;
port = ntohs(in->sin6_port);
snprintf(addr, sizeof(addr), "%s:%d", ip, port);

View File

@ -7,6 +7,7 @@
477) 2017.5.25
477.1) bugfix: connect_manager::remove 中存在一处删除问题
--- "fuwangqin" <niukey@qq.com>
476) 2017.5.22
476.1) performance: http_client.cpp 支持缓冲方式写数据,从而提升了写的性能