Merge branch 'gitee-master' into gitlab-upstream

This commit is contained in:
zhengshuxin 2023-09-20 18:42:49 +08:00
commit d9d7166c72
26 changed files with 779 additions and 44 deletions

View File

@ -62,6 +62,10 @@ service server
# master_env = logme:FALSE, priority:E_LOG_INFO, action:E_LOG_PER_DAY, flush:sync_flush, imit_size:512,\
# sync_action:E_LOG_SEM, sem_name:/tmp/udp_echo.sem
# 设置组播地址当该地址非空时则自动切换成组播模式
# master_multicast_addr =
# 是否允许组播发送数据时可以回送即打开IP_MULTICAST_LOOP
# master_multicase_loopback = 0
# 在启动时如果绑定某个地址失败是否让进程崩溃
# udp_fatal_on_bind_error = 0
# 启动时创建的固定线程数

View File

@ -42,16 +42,30 @@ master_service::~master_service(void)
{
}
static acl::atomic_long __counter;
void master_service::on_read(acl::socket_stream* stream)
{
int n;
char buf[4096];
char buf[512];
if ((n = stream->read(buf, sizeof(buf), false)) == -1) {
return;
}
logger("read from %s, %d bytes", stream->get_peer(), n);
long long cnt = ++__counter;
if (cnt < 100) {
buf[n] = 0;
logger("read from %s, %d bytes: %s", stream->get_peer(), n, buf);
} else if (cnt % 10000 == 0) {
buf[n] = 0;
char tmp[1024];
snprintf(tmp, sizeof(tmp), "count=%lld, %s", cnt, buf);
acl::meter_time("Read udp", __LINE__, tmp);
}
stream->write(buf, n);
}

View File

@ -38,6 +38,8 @@ extern int acl_var_udp_monitor_netlink;
extern int acl_var_udp_non_block;
extern char *acl_var_udp_reuse_port;
extern char *acl_var_udp_private;
extern char *acl_var_udp_multicast_addr;
extern int acl_var_udp_multicast_loopback;
#ifdef __cplusplus
}

View File

@ -16,6 +16,7 @@ extern "C" {
#define ACL_INET_FLAG_REUSEPORT (1 << 1)
#define ACL_INET_FLAG_FASTOPEN (1 << 2)
#define ACL_INET_FLAG_EXCLUSIVE (1 << 3)
#define ACL_INET_FLAG_MULTILOOP_ON (1 << 4)
/**
*

View File

@ -117,6 +117,43 @@ ACL_API ACL_VSTREAM *acl_vstream_bind(const char *addr, int rw_timeout, unsigned
*/
ACL_API void acl_vstream_set_udp_io(ACL_VSTREAM *stream);
/**
*
* @param addr {const char*} IP地址
* @param iface {const char*} IP
* @param port {int} Port
* @param timeout {int} IO
* @param flag {unsigned}
* @return {ACL_VSTREAM*} NULL
*/
ACL_API ACL_VSTREAM *acl_vstream_bind_multicast(const char *addr,
const char *iface, int port, int timeout, unsigned flag);
/**
* TTL (1--255)
* @param sock {ACL_SOCKET}
* @param ttl {int}
* @return {int} -1 0
*/
ACL_API int acl_multicast_set_ttl(ACL_SOCKET sock, int ttl);
/**
*
* @param sock {ACL_SOCKET}
* @param addr {const char*}
* @return {int} 0 -1
*/
ACL_API int acl_multicast_set_if(ACL_SOCKET sock, const char *addr);
/**
* API
* @param sock {ACL_SOCKET}
* @param addr {const char*} IP
* @param iface {const char*} IP
* @return {int} 0 -1
*/
ACL_API int acl_multicast_drop(ACL_SOCKET sock, const char *addr, const char *iface);
#ifdef __cplusplus
}
#endif

View File

@ -106,12 +106,14 @@ int acl_var_udp_threads_detached;
int acl_var_udp_non_block;
int acl_var_udp_fatal_on_bind_error;
int acl_var_udp_monitor_netlink;
int acl_var_udp_multicast_loopback;
static ACL_CONFIG_BOOL_TABLE __conf_bool_tab[] = {
{ "udp_threads_detached", 1, &acl_var_udp_threads_detached },
{ "master_nonblock", 1, &acl_var_udp_non_block },
{ "udp_fatal_on_bind_error", 0, &acl_var_udp_fatal_on_bind_error },
{ "udp_monitor_netlink", 1, &acl_var_udp_monitor_netlink },
{ "master_multicase_loopback", 0, &acl_var_udp_multicast_loopback },
{ 0, 0, 0 },
};
@ -124,6 +126,7 @@ char *acl_var_udp_log_debug;
char *acl_var_udp_private;
char *acl_var_udp_reuse_port;
static int var_udp_reuse_port = 0;
char *acl_var_udp_multicast_addr;
static ACL_CONFIG_STR_TABLE __conf_str_tab[] = {
{ "udp_queue_dir", "/opt/acl_master/var/queue", &acl_var_udp_queue_dir },
@ -132,7 +135,8 @@ static ACL_CONFIG_STR_TABLE __conf_str_tab[] = {
{ "udp_event_mode", "select", &acl_var_udp_event_mode },
{ "master_debug", "", &acl_var_udp_log_debug },
{ "master_private", "n", &acl_var_udp_private },
{ "master_reuseport", "yes", &acl_var_udp_reuse_port},
{ "master_reuseport", "yes", &acl_var_udp_reuse_port },
{ "master_multicast_addr", "", &acl_var_udp_multicast_addr },
{ 0, 0, 0 },
};
@ -677,9 +681,7 @@ static int __fdtype = ACL_VSTREAM_TYPE_LISTEN | ACL_VSTREAM_TYPE_LISTEN_INET;
static ACL_VSTREAM *server_bind_one(const char *addr)
{
ACL_VSTREAM *stream;
ACL_SOCKET fd;
unsigned flag = 0;
char local[MAX];
if (acl_var_udp_non_block) {
flag |= ACL_INET_FLAG_NBLOCK;
@ -689,24 +691,57 @@ static ACL_VSTREAM *server_bind_one(const char *addr)
flag |= ACL_INET_FLAG_REUSEPORT;
}
fd = acl_udp_bind(addr, flag);
if (fd == ACL_SOCKET_INVALID) {
acl_msg_warn("%s(%d), %s: bind %s error %s", __FILE__,
if (acl_var_udp_multicast_loopback) {
flag |= ACL_INET_FLAG_MULTILOOP_ON;
}
if (*acl_var_udp_multicast_addr != 0 && acl_valid_hostaddr(addr, 0)
&& !acl_valid_unix(addr)) {
char buf[MAX], *sep, iface[64];
int port = 0;
acl_snprintf(buf, sizeof(buf), "%s", addr);
if ((sep = strchr(buf, '|')) || (sep = strchr(buf, ':'))) {
const char *ptr;
*sep++ = 0;
port = atoi(sep);
if (buf[0] == 0) {
ptr = "0.0.0.0";
} else {
ptr = buf;
}
acl_snprintf(iface, sizeof(iface), "%s", ptr);
} else if (acl_alldig(buf)) {
port = atoi(buf);
acl_snprintf(iface, sizeof(iface), "0.0.0.0");
} else {
acl_msg_error("%s(%d), %s: invalid udp addr=%s",
__FILE__, __LINE__, __FUNCTION__, addr);
return NULL;
}
acl_msg_info("Begin bind multicast addr=%s, iface=%s, port=%d",
acl_var_udp_multicast_addr, iface, port);
stream = acl_vstream_bind_multicast(acl_var_udp_multicast_addr,
iface, port, acl_var_udp_rw_timeout, flag);
} else {
stream = acl_vstream_bind(addr, acl_var_udp_rw_timeout, flag);
}
if (stream == NULL) {
acl_msg_error("%s(%d), %s: bind %s error %s", __FILE__,
__LINE__, __FUNCTION__, addr, acl_last_serror());
return NULL;
}
#ifdef ACL_UNIX
acl_close_on_exec(fd, ACL_CLOSE_ON_EXEC);
acl_close_on_exec(ACL_VSTREAM_SOCK(stream), ACL_CLOSE_ON_EXEC);
#endif
stream = acl_vstream_fdopen(fd, O_RDWR, acl_var_udp_buf_size,
acl_var_udp_rw_timeout, __fdtype);
acl_getsockname(fd, local, sizeof(local));
acl_vstream_set_local(stream, local);
acl_vstream_set_udp_io(stream);
return stream;
}

View File

@ -360,3 +360,169 @@ void acl_vstream_set_udp_io(ACL_VSTREAM *stream)
ACL_VSTREAM_CTL_CONTEXT, stream->context,
ACL_VSTREAM_CTL_END);
}
/****************************************************************************/
static int join_multicast(ACL_SOCKET sock, const char *addr,
const char *iface, int enable_loop)
{
int on;
struct ip_mreq mreq;
memset(&mreq, 0, sizeof(mreq));
mreq.imr_multiaddr.s_addr = inet_addr(addr);
mreq.imr_interface.s_addr = strcmp(iface, "0.0.0.0") == 0 ?
htonl(INADDR_ANY) : inet_addr(iface);
if (setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
(const char*) &mreq, sizeof(mreq)) == -1) {
acl_msg_error("%s(%d), %s: setsockopt %d error %s", __FILE__,
__LINE__, __FUNCTION__, (int) sock, acl_last_serror());
return -1;
}
on = enable_loop;
if (setsockopt(sock,IPPROTO_IP, IP_MULTICAST_LOOP,
(const char*) &on, sizeof(on)) == -1) {
acl_msg_error("%s(%d), %s: setsockopt IP_MULTICAST_LOOP error %s",
__FILE__, __LINE__, __FUNCTION__, acl_last_serror());
return -1;
}
return 0;
}
static int multicast_read(ACL_SOCKET fd, void *buf, size_t size,
int timeout acl_unused, ACL_VSTREAM *stream, void *arg acl_unused)
{
return udp_read(fd, buf, size, timeout, stream, arg);
}
static int multicast_write(ACL_SOCKET fd, const void *buf, size_t size,
int timeout acl_unused, ACL_VSTREAM *stream, void *arg acl_unused)
{
int ret;
if (stream->sa_local_len == 0) {
acl_msg_error("%s, %s(%d): peer addr null",
__FUNCTION__, __FILE__, __LINE__);
return -1;
}
/* 与 UDP 单发不同之处使用了 local 保存的地址做为目标地址,因为其存放着组播地址 */
ret = (int) sendto(fd, buf, (int) size, 0,
(struct sockaddr*) stream->sa_local,
(int) stream->sa_local_len);
return ret;
}
ACL_VSTREAM *acl_vstream_bind_multicast(const char *addr, const char *iface,
int port, int timeout, unsigned flag)
{
ACL_SOCKET sock;
ACL_VSTREAM *stream;
char addrbuf[256];
int enable_loop = (flag & ACL_INET_FLAG_MULTILOOP_ON) ? 1 : 0;
#if defined(_WIN32) || defined(_WIN64)
acl_snprintf(addrbuf, sizeof(addrbuf), "%s|%d", iface, port);
#else
acl_snprintf(addrbuf, sizeof(addrbuf), "%s|%d", addr, port);
#endif
sock = acl_udp_bind(addrbuf, flag);
if (sock == ACL_SOCKET_INVALID) {
acl_msg_error("%s(%d), %s: bind addr %s error %s", __FILE__,
__LINE__, __FUNCTION__, addrbuf, acl_last_serror());
return NULL;
}
if (join_multicast(sock, addr, iface, enable_loop) == -1) {
acl_socket_close(sock);
return NULL;
}
stream = acl_vstream_fdopen(sock, O_RDWR, 4096, -1, ACL_VSTREAM_TYPE_SOCK);
stream->rw_timeout = timeout;
/* 设置本地绑定地址,该地址同时用做外发地址 */
acl_snprintf(addrbuf, sizeof(addrbuf), "%s|%d", addr, port);
acl_vstream_set_local(stream, addrbuf);
acl_vstream_ctl(stream,
ACL_VSTREAM_CTL_READ_FN, multicast_read,
ACL_VSTREAM_CTL_WRITE_FN, multicast_write,
ACL_VSTREAM_CTL_CONTEXT, stream->context,
ACL_VSTREAM_CTL_END);
return stream;
}
int acl_multicast_set_ttl(ACL_SOCKET sock, int ttl)
{
if (ttl < 0) {
acl_msg_error("%s: invalid ttl=%d", __FUNCTION__, ttl);
return -1;
}
if (sock == ACL_SOCKET_INVALID) {
acl_msg_error("%s: invalid socket", __FUNCTION__ );
return -1;
}
if (setsockopt(sock, IPPROTO_IP,IP_TTL,
(const char*) &ttl, sizeof(ttl)) == -1) {
acl_msg_error("%s: setsockopt IP_TTL error %s, sock=%d",
__FUNCTION__, acl_last_serror(), sock);
return -1;
}
return 0;
}
int acl_multicast_set_if(ACL_SOCKET sock, const char *addr)
{
struct in_addr in;
if (addr == NULL || *addr == 0) {
acl_msg_error("%s: addr null", __FUNCTION__);
return -1;
}
if (sock == ACL_SOCKET_INVALID) {
acl_msg_error("%s: invalid socket", __FUNCTION__);
return -1;
}
in.s_addr = inet_addr(addr);
if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
(const char*) &in, sizeof(in)) < 0) {
acl_msg_error("%s: set IP_MULTICAST_IF error=%s, addr=%s",
__FUNCTION__, acl_last_serror(), addr);
return -1;
}
return 0;
}
int acl_multicast_drop(ACL_SOCKET sock, const char *addr, const char *iface)
{
if (sock == ACL_SOCKET_INVALID) {
acl_msg_error("%s: invalid socket", __FUNCTION__);
return -1;
}
struct ip_mreq ipmr;
memset(&ipmr, 0, sizeof(ipmr));
ipmr.imr_interface.s_addr = strcmp(iface, "0.0.0.0") == 0 ?
htonl(INADDR_ANY) : inet_addr(iface);
ipmr.imr_multiaddr.s_addr = inet_addr(addr);
if (setsockopt(sock, IPPROTO_IP, IP_DROP_MEMBERSHIP,
(const char*)&ipmr, sizeof(ipmr)) == -1) {
acl_msg_error("%s: set IP_DROP_MEMBERSHIP error=%s",
__FUNCTION__, acl_last_serror());
return -1;
}
return 0;
}

View File

@ -3240,7 +3240,7 @@ void acl_vstream_set_local(ACL_VSTREAM *fp, const char *addr)
acl_myfree(fp->sa_local);
}
fp->sa_local = set_sock_addr(addr, &fp->sa_local_size);
fp->sa_local = set_sock_addr(addr, &fp->sa_local_size);
if (fp->sa_local) {
if (fp->sa_local->sa_family == AF_INET) {
fp->type |= ACL_VSTREAM_TYPE_INET4;

View File

@ -20,6 +20,7 @@ enum {
OPEN_FLAG_REUSEPORT = (1 << 1), // 端口复用,要求 Linux3.0 以上
OPEN_FLAG_FASTOPEN = (1 << 2), // 是否启用 Fast open实验阶段
OPEN_FLAG_EXCLUSIVE = (1 << 3), // 是否禁止复用地址
OPEN_FLAG_MULTICAST_LOOP = (1 << 4), // 是否允许组播时接收回路包
};
/**

View File

@ -60,14 +60,48 @@ public:
time_unit_t unit = time_unit_s);
/**
* UDP UDP
* UDP UDP
* @param addr {const char*} ip:port
* UNIX Linux Linux abstract unix socket
* @param rw_timeout {int} ()
* @param flag {unsigned} server_socket.hpp
* @param flags {unsigned} server_socket.hpp
* @return {bool}
*/
bool bind_udp(const char* addr, int rw_timeout = -1, unsigned flag = 0);
bool bind_udp(const char* addr, int rw_timeout = -1, unsigned flags = 0);
/**
*
* @param addr {const char*} IP
* @param iface {const char*} IP
* @param port {int}
* @param rw_timeout {int} IO
* @param flags {unsigned} server_socket.hpp
* @return {bool}
*/
bool bind_multicast(const char* addr, const char* iface, int port,
int rw_timeout = -1, unsigned flags = 0);
/**
* bind_multicast 广 TTL
* @param ttl {int} 1--255
* @return {bool}
*/
bool multicast_set_ttl(int ttl);
/**
* bind_multicast 广 IP
* @param iface {const char*}
* @return {bool}
*/
bool multicast_set_if(const char* iface);
/**
* bind_multicast
* @param addr {const char*} 广 IP
* @param iface {const char*} IP
* @return {bool}
*/
bool multicast_drop(const char *addr, const char *iface);
/**
*

View File

@ -0,0 +1,4 @@
base_path = ../../..
include ./Makefile.in
CFLAGS += -Wno-write-strings
PROG = multicast

View File

@ -0,0 +1,96 @@
CC = g++
CFLAGS = -c -g -W -Wall -Wcast-qual -Wcast-align \
-Wno-long-long \
-Wpointer-arith -Werror -Wshadow -O3 \
-D_REENTRANT -D_POSIX_PTHREAD_SEMANTICS -D_USE_FAST_MACRO
#-Waggregate-return
###########################################################
#Check system:
# Linux, SunOS, Solaris, BSD variants, AIX, HP-UX
SYSLIB = -lpthread
CHECKSYSRES = @echo "Unknow system type!";exit 1
UNIXNAME = $(shell uname -sm)
ifeq ($(CC),)
CC = gcc
endif
# For FreeBSD
ifeq ($(findstring FreeBSD, $(UNIXNAME)), FreeBSD)
ifeq ($(findstring gcc, $(CC)), gcc)
CFLAGS += -Wstrict-prototypes
endif
CFLAGS += -DFREEBSD -D_REENTRANT
SYSLIB = -lpthread
endif
#Path for Linux
ifeq ($(findstring Linux, $(UNIXNAME)), Linux)
ifeq ($CC, "gcc")
CFLAGS += -Wstrict-prototypes
endif
CFLAGS += -DLINUX2 -D_REENTRANT
endif
#Path for SunOS
ifeq ($(findstring SunOS, $(UNIXNAME)), SunOS)
ifeq ($(findstring 86, $(UNIXNAME)), 86)
SYSLIB += -lsocket -lnsl -lrt
endif
ifeq ($(findstring sun4u, $(UNIXNAME)), sun4u)
SYSLIB += -lsocket -lnsl -lrt
endif
ifeq ($CC, "gcc")
CFLAGS += -Wstrict-prototypes
endif
CFLAGS += -DSUNOS5 -D_REENTRANT
endif
#Path for HP-UX
ifeq ($(findstring HP-UX, $(UNIXNAME)), HP-UX)
ifeq ($CC, "gcc")
CFLAGS += -Wstrict-prototypes
endif
CFLAGS += -DHP_UX -DHPUX11
PLAT_NAME=hp-ux
endif
#Find system type.
ifneq ($(SYSPATH),)
CHECKSYSRES = @echo "System is $(shell uname -sm)"
endif
###########################################################
CFLAGS += -I. -I.. -I../.. -I../../include -I../../../lib_acl/include -I../../../lib_protocol/include
EXTLIBS =
LDFLAGS = -L../../lib -l_acl_cpp -L../../../lib_protocol/lib -l_protocol -L../../../lib_acl/lib -l_acl \
$(EXTLIBS) $(SYSLIB)
COMPILE = $(CC) $(CFLAGS)
LINK = $(CC) $(OBJ) $(LDFLAGS)
###########################################################
OBJ_PATH = .
#Project's objs
SRC = $(wildcard *.cpp) $(wildcard ../../*.cpp)
OBJ = $(patsubst %.cpp, $(OBJ_PATH)/%.o, $(notdir $(SRC)))
$(OBJ_PATH)/%.o: %.cpp
$(COMPILE) $< -o $@
$(OBJ_PATH)/%.o: ../%.cpp
$(COMPILE) $< -o $@
.PHONY = all clean
all: RM $(OBJ)
$(LINK) -o $(PROG)
@echo ""
@echo "All ok! Output:$(PROG)"
@echo ""
RM:
rm -f $(PROG)
clean:
rm -f $(PROG)
rm -f $(OBJ)
###########################################################

View File

@ -0,0 +1,263 @@
#include "stdafx.h"
class multicast_thread : public acl::thread {
public:
multicast_thread() : loopback_(false) {}
virtual ~multicast_thread() {}
void set_multicast_loopback(bool on) {
loopback_ = on;
}
bool open(const char* addr, const char* iface, int port) {
unsigned oflags = acl::OPEN_FLAG_REUSEPORT;
if (loopback_) {
oflags |= acl::OPEN_FLAG_MULTICAST_LOOP;
}
if (ss_.bind_multicast(addr, iface, port, -1, oflags)) {
printf("Open ok, addr=%s, iface=%s, port=%d\r\n",
addr, iface, port);
return true;
}
printf("Bind error=%s, addr=%s, iface=%s, port=%d\r\n",
acl::last_serror(), addr, iface, port);
return false;
}
protected:
acl::socket_stream ss_;
bool loopback_;
};
class sender_thread : public multicast_thread {
public:
sender_thread(int count, acl::atomic_long& counter)
: count_(count), counter_(counter)
{
}
~sender_thread() {}
protected:
// @override
void* run() {
const char* data = "hello world!\r\n";
size_t dlen = strlen(data);
for (int i = 0; i < count_; i++) {
if (ss_.write(data, dlen, false) == -1) {
printf("send error %s\r\n", acl::last_serror());
break;
}
++counter_;
}
return NULL;
}
private:
int count_;
acl::atomic_long& counter_;
};
class reader_thread : public multicast_thread {
public:
reader_thread(acl::atomic_long& counter, bool echo)
: counter_(counter), echo_(echo)
{}
~reader_thread() {}
protected:
// @override
void* run() {
while (true) {
char buf[512];
int ret = ss_.read(buf, sizeof(buf) - 1, false);
if (ret == -1) {
printf("Read error=%s\r\n", acl::last_serror());
break;
}
if (echo_ && ss_.write(buf, ret) == -1) {
printf("Send error=%s\r\n", acl::last_serror());
break;
}
long long n = ++counter_;
if (n < 50) {
buf[ret] = 0;
printf("Thread-%lu read(count=%lld): %s",
acl::thread::self(), n, buf);
fflush(stdout);
} else if (n % 10000 == 0) {
char tmp[128];
snprintf(tmp, sizeof(tmp), "read count=%lld", n);
acl::meter_time("reader", __LINE__, tmp);
}
}
return NULL;
}
private:
acl::atomic_long& counter_;
bool echo_;
};
class monitor_thread : public acl::thread {
public:
monitor_thread(acl::atomic_long& counter) : counter_(counter) {}
~monitor_thread() {}
protected:
// @override
void* run() {
while (true) {
sleep(1);
printf(">>>read count=%lld<<<\r\n", counter_.value());
}
return NULL;
}
private:
acl::atomic_long& counter_;
};
static void usage(const char* proc) {
printf("usage: %s -h [help]\r\n"
" -m multicast_addr\r\n"
" -l local_iface[default: 0.0.0.0]\r\n"
" -p port[default: 8089]\r\n"
" -n count_for_sender[default: 10000]\r\n"
" -w senders_thread_count[default: 1]\r\n"
" -r readers_thread_count[default: 0]\r\n"
" -E [if echo the read data, default: false]\r\n"
" -O [if enable IP_MULTICAST_LOOP, default: false]\r\n"
, proc);
}
int main(int argc, char* argv[]) {
acl::string multicast_addr, local_iface = "0.0.0.0";
int ch, nsenders = 1, nreaders = 0, port = 8089, count = 10000;
acl::atomic_long write_counter, read_counter;
bool echo = false, loopback = false;
while ((ch = getopt(argc, argv, "hm:l:p:n:w:r:EO")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
return 0;
case 'w':
nsenders= atoi(optarg);
break;
case 'r':
nreaders = atoi(optarg);
break;
case 'm':
multicast_addr = optarg;
break;
case 'l':
local_iface = optarg;
break;
case 'p':
port = atoi(optarg);
break;
case 'n':
count = atoi(optarg);
break;
case 'E':
echo = true;
break;
case 'O':
loopback = true;
break;
default:
break;
}
}
if (multicast_addr.empty()) {
printf("multicast_addr empty!\r\n");
usage(argv[0]);
return 0;
}
acl::log::stdout_open(true);
std::vector<acl::thread*> senders, readers;
struct timeval begin;
gettimeofday(&begin, NULL);
for (int i = 0; i < nsenders; i++) {
sender_thread* thread = new sender_thread(count, write_counter);
thread->set_multicast_loopback(loopback);
if (!thread->open(multicast_addr, local_iface, port)) {
delete thread;
continue;
}
thread->set_detachable(false);
thread->start();
senders.push_back(thread);
}
for (int i = 0; i < nreaders; i++) {
reader_thread* thread = new reader_thread(read_counter, echo);
thread->set_multicast_loopback(loopback);
if (!thread->open(multicast_addr, local_iface, port)) {
delete thread;
continue;
}
thread->set_detachable(false);
thread->start();
readers.push_back(thread);
}
for (std::vector<acl::thread*>::iterator it = senders.begin();
it != senders.end(); ++it) {
(*it)->wait();
delete *it;
}
if (nsenders > 0) {
struct timeval end;
gettimeofday(&end, NULL);
double cost = acl::stamp_sub(end, begin);
double speed = (write_counter.value() * 1000)
/ (cost > 0 ? cost : 0.0001);
printf("All over, count=%lld, cost=%.2f seconds, speed=%.2f\r\n",
write_counter.value(), cost, speed);
}
acl::thread* monitor;
if (nreaders > 0) {
monitor = new monitor_thread(read_counter);
monitor->set_detachable(false);
monitor->start();
} else {
monitor = NULL;
}
for (std::vector<acl::thread*>::iterator it = readers.begin();
it != readers.end(); ++it) {
(*it)->wait();
delete *it;
}
if (monitor) {
monitor->wait();
delete monitor;
}
return 0;
}

View File

@ -0,0 +1,10 @@
// stdafx.h : 标准系统包含文件的包含文件,
// 或是常用但不常更改的项目特定的包含文件
//
#pragma once
// TODO: 在此处引用程序要求的附加头文件
#include "lib_acl.h"
#include "acl_cpp/lib_acl.hpp"

View File

@ -1,4 +1,4 @@
base_path = ../../..
include ../Makefile.in
include ./Makefile.in
CFLAGS += -Wno-write-strings
PROG = server

View File

@ -74,15 +74,13 @@ LINK = $(CC) $(OBJ) $(LDFLAGS)
OBJ_PATH = .
#Project's objs
SRC = $(wildcard *.cpp) $(wildcard ../*.cpp) $(wildcard ../../*.cpp)
SRC = $(wildcard *.cpp) $(wildcard ../../*.cpp)
OBJ = $(patsubst %.cpp, $(OBJ_PATH)/%.o, $(notdir $(SRC)))
$(OBJ_PATH)/%.o: %.cpp
$(COMPILE) $< -o $@
$(OBJ_PATH)/%.o: ../../%.cpp
$(COMPILE) $< -o $@
$(OBJ_PATH)/%.o: ../%.cpp
$(COMPILE) $< -o $@
.PHONY = all clean
all: RM $(OBJ)

View File

@ -0,0 +1,8 @@
// stdafx.cpp : 只包括标准包含文件的源文件
// xml.pch 将成为预编译头
// stdafx.obj 将包含预编译类型信息
#include "stdafx.h"
// TODO: 在 STDAFX.H 中
//引用任何所需的附加头文件,而不是在此文件中引用

View File

@ -59,32 +59,46 @@ bool socket_stream::open(ACL_VSTREAM* vstream, bool udp_mode /* = false */)
return true;
}
// ½« C++ API µÄ flags ת±äΪ C API µÄ flags.
static unsigned to_oflags(unsigned flags)
{
unsigned oflags = 0;
if (flags & OPEN_FLAG_NONBLOCK) {
oflags |= ACL_NON_BLOCKING;
} else {
oflags |= ACL_BLOCKING;
}
if (flags & OPEN_FLAG_REUSEPORT) {
oflags |= ACL_INET_FLAG_REUSEPORT;
}
// It's only useful for TCP.
if (flags & OPEN_FLAG_FASTOPEN) {
oflags |= ACL_INET_FLAG_FASTOPEN;
}
if (flags & OPEN_FLAG_EXCLUSIVE) {
oflags |= ACL_INET_FLAG_EXCLUSIVE;
}
if (flags & OPEN_FLAG_MULTICAST_LOOP) {
oflags |= ACL_INET_FLAG_MULTILOOP_ON;
}
return oflags;
}
bool socket_stream::bind_udp(const char* addr, int rw_timeout /* = 0 */,
unsigned flag /* = 0 */)
unsigned flags /* = 0 */)
{
if (stream_) {
acl_vstream_close(stream_);
}
unsigned oflag = 0;
if (flag & OPEN_FLAG_NONBLOCK) {
oflag |= ACL_NON_BLOCKING;
} else {
oflag |= ACL_BLOCKING;
}
if (flag & OPEN_FLAG_REUSEPORT) {
oflag |= ACL_INET_FLAG_REUSEPORT;
}
if (flag & OPEN_FLAG_FASTOPEN) {
oflag |= ACL_INET_FLAG_FASTOPEN;
}
if (flag & OPEN_FLAG_EXCLUSIVE) {
oflag |= ACL_INET_FLAG_EXCLUSIVE;
}
stream_ = acl_vstream_bind(addr, rw_timeout, oflag);
unsigned oflags = to_oflags(flags);
stream_ = acl_vstream_bind(addr, rw_timeout, oflags);
if (stream_ == NULL) {
return false;
}
@ -93,6 +107,54 @@ bool socket_stream::bind_udp(const char* addr, int rw_timeout /* = 0 */,
return true;
}
bool socket_stream::bind_multicast(const char *addr, const char *iface,
int port, int rw_timeout, unsigned int flags)
{
if (stream_) {
acl_vstream_close(stream_);
}
unsigned oflags = to_oflags(flags);
stream_ = acl_vstream_bind_multicast(addr, iface, port,
rw_timeout, oflags);
if (stream_ == NULL) {
return false;
}
eof_ = false;
opened_ = true;
return true;
}
bool socket_stream::multicast_set_ttl(int ttl)
{
if (eof_ || stream_ == NULL) {
logger_error("Socket not opened yet");
return false;
}
return acl_multicast_set_ttl(ACL_VSTREAM_SOCK(stream_), ttl) == 0;
}
bool socket_stream::multicast_set_if(const char *iface)
{
if (eof_ || stream_ == NULL) {
logger_error("Socket not opened yet");
return false;
}
return acl_multicast_set_if(ACL_VSTREAM_SOCK(stream_), iface) == 0;
}
bool socket_stream::multicast_drop(const char *addr, const char *iface)
{
if (eof_ || stream_ == NULL) {
logger_error("Socket not opened yet");
return false;
}
return acl_multicast_drop(ACL_VSTREAM_SOCK(stream_), addr, iface) == 0;
}
bool socket_stream::shutdown_read(void)
{
if (stream_ == NULL) {