acl stream and servers support UNIX domain socket in UDP packet mode

This commit is contained in:
zsx 2017-12-10 22:23:37 +08:00
parent e65bdd10bf
commit fa113a12e7
13 changed files with 228 additions and 67 deletions

View File

@ -358,7 +358,7 @@ static int service_udp(ACL_XINETD_CFG_PARSER *xcp, ACL_MASTER_SERV *serv)
acl_array_append(serv->addrs, addr);
serv->listen_fd_count++;
acl_msg_info("%s(%d), %s: add addr=%s", __FILE__,
__LINE__, __FUNCTION__, ptr);
__LINE__, __FUNCTION__, addr->addr);
}
acl_argv_free(addrs);
@ -494,6 +494,13 @@ static int service_args(ACL_XINETD_CFG_PARSER *xcp, ACL_MASTER_SERV *serv,
char unprivileged, chroot_var;
ACL_VSTRING *junk = acl_vstring_alloc(100);
/*
if (serv->max_proc == 1)
acl_argv_add(serv->args, "-l", (char *) 0);
if (serv->max_proc == 0)
acl_argv_add(serv->args, "-z", (char *) 0);
*/
/*
* Privilege level. Default is to restrict process privileges
* to those of the mail owner.
@ -559,27 +566,28 @@ static int service_args(ACL_XINETD_CFG_PARSER *xcp, ACL_MASTER_SERV *serv,
serv->args = acl_argv_alloc(0);
acl_argv_add(serv->args, command, (char *) 0);
name = get_str_ent(xcp, ACL_VAR_MASTER_SERV_SERVICE, (const char *) 0);
if (name == NULL || *name == 0) {
acl_msg_error("no %s found", ACL_VAR_MASTER_SERV_SERVICE);
return -1;
}
/* add "-f configure_file_path" flag */
acl_argv_add(serv->args, "-f", path, (char *) 0);
/* copy the configure filepath */
serv->conf = acl_mystrdup(path);
/*
if (serv->max_proc == 1)
acl_argv_add(serv->args, "-l", (char *) 0);
if (serv->max_proc == 0)
acl_argv_add(serv->args, "-z", (char *) 0);
*/
name = get_str_ent(xcp, ACL_VAR_MASTER_SERV_SERVICE, (const char *) 0);
if (name == NULL || *name == 0) {
acl_msg_error("no %s found", ACL_VAR_MASTER_SERV_SERVICE);
return -1;
}
if (strcmp(acl_safe_basename(command), name) != 0) {
char *tmp = acl_concatenate("\"", name, "\"", 0);
const char udp[] = "@udp";
char *tmp;
if (acl_strrncasecmp(name, udp, sizeof(udp) - 1) == 0)
tmp = acl_concatenate("\"", acl_var_master_queue_dir,
"/", ACL_MASTER_CLASS_PUBLIC, "/",
name, "\"", 0);
else
tmp = acl_concatenate("\"", name, "\"", 0);
acl_argv_add(serv->args, "-n", tmp, (char *) 0);
acl_myfree(tmp);
}
@ -909,7 +917,7 @@ ACL_MASTER_SERV *acl_master_ent_find(const char *path)
ACL_MASTER_SERV *serv;
if (acl_var_master_head == NULL) {
acl_msg_error("%s(%d), %s: acl_var_master_head null",
acl_msg_info("%s(%d), %s: acl_var_master_head null",
__FILE__, __LINE__, __FUNCTION__);
return NULL;
}

View File

@ -141,7 +141,7 @@ static int master_bind_udp(ACL_MASTER_SERV *serv)
switch (addr->type) {
case ACL_MASTER_SERV_TYPE_UDP:
serv->listen_streams[i] = acl_vstream_bind(addr->addr,
acl_var_master_rw_timeout);
acl_var_master_rw_timeout, serv->inet_flags);
break;
default:
acl_msg_panic("invalid type: %d, addr: %s",

View File

@ -1,48 +1,48 @@
#inet_interfaces = all
# 默认的每个服务的最大进程数
# 默认的每个服务的最大进程数
default_process_limit = 250
# 用户属主
# 用户属主
owner_user = root
master_owner = root
# 当停止 master 时是否等待所有子进程退出
# 当停止 master 时是否等待所有子进程退出
waiting_on_stop = true
# 组主
# 组主
owner_group = root
# 如果子进程服务异常退出重启该子进程服务的时间间隔()
service_throttle_time = 60
# 父进程进程名
# 如果子进程服务异常退出重启该子进程服务的时间间隔()
service_throttle_time = 10
# 父进程进程名
process_name = acl_master
#daemon_timeout = 18000
buf_size = 81920
# 父进程与子进程通信时的读写超时时间()
# 父进程与子进程通信时的读写超时时间()
rw_timeout = 60
# 父进程通知子进程重新加载配置时获得子进程反馈的超时时间(毫秒)
# 父进程通知子进程重新加载配置时获得子进程反馈的超时时间(毫秒)
reload_timeout = 5000
#max_use = 100
#max_idle = 100
#in_flow_delay = 1
# 调用 select 循环时的等待秒级值
# 调用 select 循环时的等待秒级值
event_delay_sec = 1
# 调用 select 循环时的等待微秒级值
# 调用 select 循环时的等待微秒级值
event_delay_usec = 5000
# 所有服务进程程序所在的默认目录
# 所有服务进程程序所在的默认目录
daemon_directory = {install_path}/libexec
# acl_master 进程运行时的日志记录文件
# acl_master 进程运行时的日志记录文件
log_file = {install_path}/var/log/acl_master
# 所有服务进程的服务配置文件所在的目录
# 所有服务进程的服务配置文件所在的目录
service_directory = {install_path}/conf/service
# 是否扫描并行 {install_path}/conf/service/ 目录下的子目录服务配置文件0 -- 1 --
# 是否扫描并行 {install_path}/conf/service/ 目录下的子目录服务配置文件0 -- 1 --
scan_subdir = 0
# 扫描 service_directory 下的配置文件时指定的文件扩展名
# 扫描 service_directory 下的配置文件时指定的文件扩展名
file_exts = .cf, .conf
# 指定了单独要启动的服务的配置文件列表
# 指定了单独要启动的服务的配置文件列表
service_file = {install_path}/conf/services.cf
# master 程序运行目录
# master 程序运行目录
queue_directory = {install_path}/var
# 记录 acl_master 运行时的进程号
# 记录 acl_master 运行时的进程号
pid_file = {install_path}/var/pid/acl_master.pid
# 是否允许自动切换用户运行身份
# 是否允许自动切换用户运行身份
# limit_privilege = 0
# acl_master 对外 WEB 管理接口地址
# acl_master 对外 WEB 管理接口地址
#manage_addr = 127.0.0.1:8190
manage_addr = {install_path}/var/public/master.sock

View File

@ -1,6 +1,10 @@
修改历史列表:
------------------------------------------------------------------------
615) 2017.12.10
615.1) feature: acl_udp_server.c 支持绑定 UNIX 域套接口
615.2) feature: net 中的 udp 模块支持绑定 UNIX 域套接口传输数据
614) 2017.11.21
614.1) feature: acl_argv.c 增加 acl_argv_set 用于替换指定下标位置的值

View File

@ -89,9 +89,10 @@ ACL_API ACL_VSTREAM *acl_vstream_connect(const char *addr, int block_mode,
* @param addr {const char*} UDP ip:port ip:0
*
* @param rw_timeout {int} ()
* @param flag {unsigned} ąę֞Ν
* @return {ACL_VSTREAM*} NULL
*/
ACL_API ACL_VSTREAM *acl_vstream_bind(const char *addr, int rw_timeout);
ACL_API ACL_VSTREAM *acl_vstream_bind(const char *addr, int rw_timeout, unsigned flag);
/**
* UDP IO

View File

@ -5,6 +5,8 @@ int main(void)
ACL_IFCONF *ifconf; /* 网卡查询结果对象 */
ACL_IFADDR *ifaddr; /* 每个网卡信息对象 */
ACL_ITER iter; /* 遍历对象 */
const char *pattern = "127.*.*.*:8290, 192.168.*.*:8290";
ACL_ARGV *addrs;
/* 查询本机所有网卡信息 */
ifconf = acl_get_ifaddrs();
@ -23,5 +25,19 @@ int main(void)
/* 释放查询结果 */
acl_free_ifaddrs(ifconf);
printf("\r\n----------------------------------------------\r\n");
addrs = acl_ifconf_search(pattern);
if (addrs == NULL) {
printf("acl_ifconf_search error\r\n");
return 1;
}
acl_foreach(iter, addrs) {
const char *addr = (const char *)iter.data;
printf(">>>ip: %s\r\n", addr);
}
acl_argv_free(addrs);
return 0;
}

View File

@ -191,10 +191,9 @@ ACL_VSTREAM *acl_vstream_connect_ex(const char *addr,
acl_msg_fatal("%s: addr null", myname);
ptr = strchr(addr, ':');
if (ptr != NULL) {
if (ptr != NULL)
connfd = acl_inet_connect_ex(addr, block_mode,
connect_timeout, he_errorp);
}
#ifdef ACL_WINDOWS
else {
acl_msg_error("%s(%d): addr(%s) invalid",
@ -202,13 +201,11 @@ ACL_VSTREAM *acl_vstream_connect_ex(const char *addr,
return NULL;
}
#elif defined(ACL_UNIX)
else {
else
connfd = acl_unix_connect(addr, block_mode, connect_timeout);
}
#else
else {
else
connfd = ACL_SOCKET_INVALID;
}
#endif
if (connfd == ACL_SOCKET_INVALID)
@ -235,6 +232,8 @@ ACL_VSTREAM *acl_vstream_connect(const char *addr, int block_mode,
rw_timeout, rw_bufsize, NULL);
}
/****************************************************************************/
static int udp_read(ACL_SOCKET fd, void *buf, size_t size,
int timeout acl_unused, ACL_VSTREAM *stream, void *arg acl_unused)
{
@ -283,6 +282,7 @@ static int udp_write(ACL_SOCKET fd, const void *buf, size_t size,
return ret;
}
#if 0
static ACL_SOCKET acl_bind(const char *addr, char *addr_buf, size_t size)
{
const char *myname = "acl_bind";
@ -401,12 +401,14 @@ static ACL_SOCKET acl_bind(const char *addr, char *addr_buf, size_t size)
return sock;
}
#endif
ACL_VSTREAM *acl_vstream_bind(const char *addr, int rw_timeout)
ACL_VSTREAM *acl_vstream_bind(const char *addr, int rw_timeout, unsigned flag)
{
ACL_VSTREAM *stream;
char addr_buf[256];
ACL_SOCKET sock = acl_bind(addr, addr_buf, sizeof(addr_buf));
// char addr_buf[256];
// ACL_SOCKET sock = acl_bind(addr, addr_buf, sizeof(addr_buf));
ACL_SOCKET sock = acl_udp_bind(addr, flag);
if (sock == ACL_SOCKET_INVALID) {
acl_msg_error("%s: bind addr %s error %s",
@ -418,7 +420,8 @@ ACL_VSTREAM *acl_vstream_bind(const char *addr, int rw_timeout)
stream->rw_timeout = rw_timeout;
/* 设置本地绑定地址 */
acl_vstream_set_local(stream, addr_buf);
// acl_vstream_set_local(stream, addr_buf);
acl_vstream_set_local(stream, addr);
/* 注册流读写回调函数 */
acl_vstream_ctl(stream,

View File

@ -66,11 +66,95 @@ ACL_SOCKET acl_inet_bind(const struct addrinfo *res, unsigned flag)
return fd;
}
#ifdef ACL_UNIX
static ACL_SOCKET acl_unix_bind(const char *addr, unsigned flag)
{
#undef sun
struct sockaddr_un sun;
int len = strlen(addr);
ACL_SOCKET sock;
/*
* Translate address information to internal form.
*/
if (len >= (int) sizeof(sun.sun_path)) {
acl_msg_error("%s: addr too long: %s", __FUNCTION__, addr);
return ACL_SOCKET_INVALID;
}
memset((char *) &sun, 0, sizeof(sun));
sun.sun_family = AF_UNIX;
#ifdef HAS_SUN_LEN
sun.sun_len = len + 1;
#endif
memcpy(sun.sun_path, addr, len + 1);
/*
* Create a listener socket. Do whatever we can so we don't run into
* trouble when this process is restarted after crash.
*/
if ((sock = socket(AF_UNIX, SOCK_DGRAM, 0)) == ACL_SOCKET_INVALID) {
acl_msg_error("%s: create socket error %s",
__FUNCTION__, acl_last_serror());
return ACL_SOCKET_INVALID;
}
(void) unlink(addr);
if (bind(sock, (struct sockaddr *) & sun, sizeof(sun)) < 0) {
acl_msg_error("%s: bind %s error %s",
__FUNCTION__, addr, acl_last_serror());
acl_socket_close(sock);
return ACL_SOCKET_INVALID;
}
#ifdef FCHMOD_UNIX_SOCKETS
if (fchmod(sock, 0666) < 0) {
acl_msg_fatal("%s: fchmod socket %s: %s",
__FUNCTION__, addr, acl_last_serror());
acl_socket_close(sock);
return ACL_SOCKET_INVALID;
}
#else
if (chmod(addr, 0666) < 0) {
acl_msg_error("%s: chmod socket error %s, addr=%s",
__FUNCTION__, acl_last_serror(), addr);
acl_socket_close(sock);
return ACL_SOCKET_INVALID;
}
#endif
acl_non_blocking(sock, flag & ACL_INET_FLAG_NBLOCK ?
ACL_NON_BLOCKING : ACL_BLOCKING);
return sock;
}
#endif
ACL_SOCKET acl_udp_bind(const char *addr, unsigned flag)
{
struct addrinfo *res0 = acl_host_addrinfo(addr, SOCK_DGRAM), *res;
struct addrinfo *res0, *res;
ACL_SOCKET fd;
#ifdef ACL_UNIX
const char udp_suffix[] = "@udp";
if (acl_strrncasecmp(addr, udp_suffix, sizeof(udp_suffix) - 1) == 0) {
char *buf = acl_mystrdup(addr), *at = strchr(buf, '@');
*at = 0;
if (*buf == 0) {
acl_msg_error("%s(%d): invalid addr=%s",
__FILE__, __LINE__, addr);
acl_myfree(buf);
return ACL_SOCKET_INVALID;
}
fd = acl_unix_bind(buf, flag);
printf("bind fd=%d, buf=%s\r\n", fd, buf);
acl_myfree(buf);
return fd;
}
#endif
res0 = acl_host_addrinfo(addr, SOCK_DGRAM);
if (res0 == NULL) {
acl_msg_error("%s(%d): host_addrinfo NULL, addr=%s",
__FILE__, __LINE__, addr);

View File

@ -2989,20 +2989,26 @@ int acl_vstream_close(ACL_VSTREAM *fp)
static struct sockaddr *set_sock_addr(const char *addr, size_t *sa_size)
{
char buf[256], *ptr;
char buf[1024], *ptr;
int port;
snprintf(buf, sizeof(buf), "%s", (addr));
ptr = strchr(buf, ':');
if (ptr == NULL) {
*sa_size = 0;
return NULL;
if (ptr == NULL)
port = -1;
else {
*ptr++ = 0;
port = atoi(ptr);
}
*ptr++ = 0;
port = atoi(ptr);
if (acl_is_ipv4(buf)) {
struct sockaddr_in *in = (struct sockaddr_in *)
struct sockaddr_in *in;
if (port < 0) {
*sa_size = 0;
return NULL;
}
in = (struct sockaddr_in *)
acl_mycalloc(1, sizeof(struct sockaddr_in));
in->sin_family = AF_INET;
in->sin_port = htons(port);
@ -3013,7 +3019,12 @@ static struct sockaddr *set_sock_addr(const char *addr, size_t *sa_size)
}
#ifdef AF_INET6
else if (acl_is_ipv6(buf)) {
struct sockaddr_in6 *in = (struct sockaddr_in6 *)
struct sockaddr_in6 *in;
if (port < 0) {
*sa_size = 0;
return NULL;
}
in = (struct sockaddr_in6 *)
acl_mycalloc(1, sizeof(struct sockaddr_in6));
in->sin6_family = AF_INET6;
in->sin6_port = htons(port);
@ -3022,10 +3033,37 @@ static struct sockaddr *set_sock_addr(const char *addr, size_t *sa_size)
return (struct sockaddr *) in;
}
#endif
else {
*sa_size = 0;
return NULL;
#ifdef ACL_UNIX
#define UDP_SUFFIX "@udp"
if (acl_strrncasecmp(buf, UDP_SUFFIX, sizeof(UDP_SUFFIX) - 1) == 0) {
struct sockaddr_un *un;
char *at = strrchr(buf, '@');
int len;
*at = 0;
len = (int) strlen(buf);
if (len == 0) {
*sa_size = 0;
return NULL;
}
un = (struct sockaddr_un *)
acl_mycalloc(1, sizeof(struct sockaddr_un));
un->sun_family = AF_UNIX;
#ifdef HAS_SUN_LEN
un->sun_len = len + 1;
#endif
memcpy(un->sun_path, buf, len + 1);
*sa_size = sizeof(struct sockaddr_un);
printf("set sock addr=%s\r\n", buf);
return (struct sockaddr *) un;
}
#endif
*sa_size = 0;
return NULL;
}
void acl_vstream_set_local(ACL_VSTREAM *fp, const char *addr)

View File

@ -55,9 +55,10 @@ public:
* UDP UDP
* @param addr {const char*} ip:port
* @param rw_timeout {int} ()
* @param flag {unsigned}
* @return {bool}
*/
bool bind_udp(const char* addr, int rw_timeout = 0);
bool bind_udp(const char* addr, int rw_timeout = 0, unsigned flag = 0);
/**
*

View File

@ -100,6 +100,7 @@ int main(int argc, char* argv[])
#ifdef WIN32
acl::acl_cpp_init();
#endif
acl::log::stdout_open(true);
snprintf(__server_addr, sizeof(__server_addr), "127.0.0.1:8888");
snprintf(__local_addr, sizeof(__local_addr), "127.0.0.1:18888");

View File

@ -0,0 +1,4 @@
#!/bin/sh
# udp transfer with UNIX domain socket
#./udp_client -s /opt/soft/acl-master/var/public/test.sock@udp -l /tmp/tt.sock@udp -n 100000
./udp_client -s 127.0.0.1:8888 -l 127.0.0.1:8889 -n 100000

View File

@ -51,11 +51,12 @@ bool socket_stream::open(ACL_VSTREAM* vstream, bool udp_mode /* = false */)
return true;
}
bool socket_stream::bind_udp(const char* addr, int rw_timeout /* = 0 */)
bool socket_stream::bind_udp(const char* addr, int rw_timeout /* = 0 */,
unsigned flag /* = 0 */)
{
if (stream_)
acl_vstream_close(stream_);
stream_ = acl_vstream_bind(addr, rw_timeout);
stream_ = acl_vstream_bind(addr, rw_timeout, flag);
if (stream_ == NULL)
return false;
eof_ = false;
@ -140,8 +141,8 @@ const char* socket_stream::get_peer_ip() const
if (stream_ == NULL)
return dummy_;
if (peer_ip_[0] != 0)
return peer_ip_;
//if (peer_ip_[0] != 0)
// return peer_ip_;
char* ptr = ACL_VSTREAM_PEER(stream_);
if (ptr == NULL || *ptr == 0)
@ -203,8 +204,8 @@ const char* socket_stream::get_local_ip() const
return dummy_;
// xxx: acl_vstream 中没有对此地址赋值
if (local_ip_[0] != 0)
return local_ip_;
//if (local_ip_[0] != 0)
// return local_ip_;
char* ptr = ACL_VSTREAM_LOCAL(stream_);
if (ptr == NULL || *ptr == 0)