add proc_on_bind in master_udp class

This commit is contained in:
zhengshuxin 2017-07-05 14:31:18 +08:00
parent 98d11361bb
commit 99660d0734
8 changed files with 115 additions and 117 deletions

View File

@ -47,6 +47,8 @@ service server {
# master_env = logme:FALSE, priority:E_LOG_INFO, action:E_LOG_PER_DAY, flush:sync_flush, imit_size:512,\
# sync_action:E_LOG_SEM, sem_name:/tmp/udp_echo.sem
# 启动时创建的固定线程数
udp_threads = 1
# 是否允许产生 core 文件
# udp_enable_core = 1
# 每个进程实例处理连接数的最大次数超过此值后进程实例主动退出

View File

@ -34,11 +34,11 @@ acl::master_int64_tbl var_conf_int64_tab[] = {
//////////////////////////////////////////////////////////////////////////////
master_service::master_service()
master_service::master_service(void)
{
}
master_service::~master_service()
master_service::~master_service(void)
{
}
@ -54,17 +54,27 @@ void master_service::on_read(acl::socket_stream* stream)
stream->write(buf, n);
}
void master_service::proc_on_init()
void master_service::thread_on_init(void)
{
logger(">>thread_on_init<<<");
}
void master_service::proc_on_bind(acl::socket_stream&)
{
logger(">>>proc_on_bind<<<");
}
void master_service::proc_on_init(void)
{
logger(">>>proc_on_init<<<");
}
void master_service::proc_on_exit()
void master_service::proc_on_exit(void)
{
logger(">>>proc_on_exit<<<");
}
void master_service::proc_on_sighup()
void master_service::proc_on_sighup(void)
{
logger(">>>proc_on_sighup<<<");
}

View File

@ -22,33 +22,25 @@ extern acl::master_int64_tbl var_conf_int64_tab[];
class master_service : public acl::master_udp
{
public:
master_service();
~master_service();
master_service(void);
~master_service(void);
protected:
/**
* @override
*
* @param stream {aio_socket_stream*} UDP
*/
// @override
void on_read(acl::socket_stream* stream);
/**
* @override
*
*
*/
void proc_on_init();
// @override
void thread_on_init(void);
/**
* @override
* 退
*/
void proc_on_exit();
// @override
void proc_on_bind(acl::socket_stream& stream);
/**
* @override
* SIGHUP
*/
void proc_on_sighup();
// @override
void proc_on_init(void);
// @override
void proc_on_exit(void);
// @override
void proc_on_sighup(void);
};

View File

@ -44,6 +44,7 @@ extern "C" {
#define ACL_MASTER_SERVER_EXIT_TIMER 26
#define ACL_MASTER_SERVER_ON_LISTEN 27
#define ACL_MASTER_SERVER_ON_BIND ACL_MASTER_SERVER_ON_LISTEN
#define ACL_MASTER_SERVER_SIGHUP 28
@ -69,6 +70,7 @@ typedef void (*ACL_MASTER_SERVER_INIT_FN) (void *);
typedef int (*ACL_MASTER_SERVER_LOOP_FN) (void *);
typedef void (*ACL_MASTER_SERVER_EXIT_FN) (void *);
typedef void (*ACL_MASTER_SERVER_ON_LISTEN_FN) (void *, ACL_VSTREAM *);
typedef void (*ACL_MASTER_SERVER_ON_BIND_FN) (void *, ACL_VSTREAM *);
typedef int (*ACL_MASTER_SERVER_ON_ACCEPT_FN) (void *, ACL_VSTREAM *);
typedef int (*ACL_MASTER_SERVER_HANDSHAKE_FN) (void *, ACL_VSTREAM *);
typedef void (*ACL_MASTER_SERVER_DISCONN_FN) (void *, ACL_VSTREAM *);

View File

@ -127,12 +127,12 @@ static ACL_CONFIG_STR_TABLE __conf_str_tab[] = {
{ 0, 0, 0 },
};
typedef struct SERVER {
typedef struct UDP_SERVER {
acl_pthread_t tid;
ACL_EVENT *event;
ACL_VSTREAM **streams;
int socket_count;
} SERVER;
} UDP_SERVER;
/*
* Global state.
@ -141,21 +141,22 @@ static ACL_UDP_SERVER_FN __service_main;
static ACL_MASTER_SERVER_EXIT_FN __service_exit;
static ACL_MASTER_SERVER_THREAD_INIT_FN __thread_init;
static ACL_MASTER_SERVER_SIGHUP_FN __sighup_handler;
static ACL_MASTER_SERVER_ON_BIND_FN __server_on_bind;
static void *__thread_init_ctx = NULL;
static void *__thread_init_ctx = NULL;
static int __event_mode;
static int __socket_count = 1;
static SERVER *__servers = NULL;
static __thread SERVER *__server = NULL;
static ACL_EVENT *__main_event = NULL;
static __thread UDP_SERVER *__server = NULL;
static int __event_mode;
static int __socket_count = 1;
static UDP_SERVER *__servers = NULL;
static ACL_EVENT *__main_event = NULL;
static const char *__service_name;
static char **__service_argv;
static void *__service_ctx;
static int __daemon_mode = 1;
static const char *__service_name;
static char **__service_argv;
static void *__service_ctx;
static int __daemon_mode = 1;
static ACL_ATOMIC_CLOCK *__clock = NULL;
static ACL_ATOMIC_CLOCK *__clock = NULL;
ACL_EVENT *acl_udp_server_event(void)
{
@ -185,7 +186,7 @@ void acl_udp_server_cancel_timer(ACL_EVENT_NOTIFY_TIME timer_fn, void *arg)
acl_event_cancel_timer(acl_udp_server_event(), timer_fn, arg);
}
static void server_stop(SERVER *server)
static void server_stop(UDP_SERVER *server)
{
int i;
@ -367,9 +368,10 @@ static void log_event_mode(int event_mode)
}
}
static SERVER *servers_alloc(int event_mode, int nthreads, int sock_count)
static UDP_SERVER *servers_alloc(int event_mode, int nthreads, int sock_count)
{
SERVER *servers = (SERVER *) acl_mycalloc(nthreads, sizeof(SERVER));
UDP_SERVER *servers = (UDP_SERVER *)
acl_mycalloc(nthreads, sizeof(UDP_SERVER));
int i;
for (i = 0; i < nthreads; i++) {
@ -386,7 +388,7 @@ static SERVER *servers_alloc(int event_mode, int nthreads, int sock_count)
static int __fdtype = ACL_VSTREAM_TYPE_LISTEN | ACL_VSTREAM_TYPE_LISTEN_INET;
static void server_binding(SERVER *server, ACL_ARGV *addrs)
static void server_binding(UDP_SERVER *server, ACL_ARGV *addrs)
{
ACL_ITER iter;
int i = 0;
@ -420,12 +422,12 @@ static void server_binding(SERVER *server, ACL_ARGV *addrs)
__FILE__, __LINE__, __FUNCTION__);
}
static SERVER *servers_binding(const char *service,
static UDP_SERVER *servers_binding(const char *service,
int event_mode, int nthreads)
{
ACL_ARGV *addrs = acl_ifconf_search(service);
SERVER *servers;
int i = 0;
UDP_SERVER *servers;
int i = 0;
if (addrs == NULL)
acl_msg_fatal("%s(%d), %s: no addrs available for %s",
@ -443,7 +445,7 @@ static SERVER *servers_binding(const char *service,
#ifdef ACL_UNIX
static void server_open(SERVER *server, int sock_count)
static void server_open(UDP_SERVER *server, int sock_count)
{
ACL_SOCKET fd = ACL_MASTER_LISTEN_FD;
int i = 0;
@ -468,9 +470,9 @@ static void server_open(SERVER *server, int sock_count)
}
}
static SERVER *servers_open(int event_mode, int nthreads, int sock_count)
static UDP_SERVER *servers_open(int event_mode, int nthreads, int sock_count)
{
SERVER *servers;
UDP_SERVER *servers;
int i;
servers = servers_alloc(event_mode, nthreads, sock_count);
@ -483,7 +485,7 @@ static SERVER *servers_open(int event_mode, int nthreads, int sock_count)
#endif /* ACL_UNIX */
static SERVER *servers_create(const char *service, int nthreads)
static UDP_SERVER *servers_create(const char *service, int nthreads)
{
if (strcasecmp(acl_var_udp_event_mode, "poll") == 0)
__event_mode = ACL_EVENT_POLL;
@ -515,7 +517,7 @@ static SERVER *servers_create(const char *service, int nthreads)
static void *thread_main(void *ctx)
{
/* set thread local storage */
__server = (SERVER *) ctx;
__server = (UDP_SERVER *) ctx;
if (__thread_init)
__thread_init(__thread_init_ctx);
@ -585,11 +587,26 @@ static void main_thread_loop(void)
}
}
static void servers_start(SERVER *servers, int nthreads)
static void servers_start(UDP_SERVER *servers, int nthreads)
{
acl_pthread_attr_t attr;
int i;
if (nthreads <= 0)
acl_msg_fatal("%s(%d), %s: invalid nthreads %d",
__FILE__, __LINE__, __FUNCTION__, nthreads);
if (__server_on_bind) {
for (i = 0; i < nthreads; i++) {
UDP_SERVER *server = &servers[i];
int j;
for (j = 0; j > server->socket_count; j++)
__server_on_bind(__service_ctx,
server->streams[j]);
}
}
__clock = acl_atomic_clock_alloc();
acl_pthread_attr_init(&attr);
acl_pthread_attr_setdetachstate(&attr, ACL_PTHREAD_CREATE_DETACHED);
@ -728,6 +745,10 @@ void acl_udp_server_main(int argc, char **argv, ACL_UDP_SERVER_FN service, ...)
__sighup_handler =
va_arg(ap, ACL_MASTER_SERVER_SIGHUP_FN);
break;
case ACL_MASTER_SERVER_ON_BIND:
__server_on_bind =
va_arg(ap, ACL_MASTER_SERVER_ON_BIND_FN);
break;
default:
acl_msg_panic("%s: unknown type: %d", myname, key);
}

View File

@ -1,6 +1,5 @@
#pragma once
#include "master_base.hpp"
#include "../stdlib/locker.hpp"
namespace acl {
@ -32,11 +31,21 @@ protected:
virtual ~master_udp();
/**
* UDP
* UDP 线
* @param stream {socket_stream*}
*/
virtual void on_read(socket_stream* stream) = 0;
/**
* UDP 线
*/
virtual void proc_on_bind(socket_stream&) {}
/**
* 线
*/
virtual void thread_on_init(void) {}
/**
*
* @return {const std::vector<socket_stream*>&}
@ -46,22 +55,18 @@ protected:
return sstreams_;
}
/**
* 线
*/
virtual void thread_on_init(void) {}
private:
std::vector<socket_stream*> sstreams_;
locker locker_;
void close_sstreams(void);
void run(int argc, char** argv);
private:
// 当接收到一个客户端连接时回调此函数
static void service_main(void*, ACL_VSTREAM*);
// 当绑定地址成功后的回调函数
static void service_on_bind(void*, ACL_VSTREAM*);
// 当进程切换用户身份后调用的回调函数
static void service_pre_jail(void*);

View File

@ -13,24 +13,12 @@ master_udp::master_udp(void) {}
master_udp::~master_udp(void)
{
close_sstreams();
}
void master_udp::close_sstreams(void)
{
std::vector<socket_stream*>::iterator it = sstreams_.begin();
for (; it != sstreams_.end(); ++it)
for (std::vector<socket_stream*>::iterator it = sstreams_.begin();
it != sstreams_.end(); ++it)
{
// 当在 daemon 运行模式socket_stream 中的 ACL_VSTREAM
// 对象将由 lib_acl 库中的 acl_udp_server 内部关闭,所以
// 此处需要将 ACL_VSTREAM 对象与 socket_stream 解耦
if (daemon_mode_)
(*it)->unbind();
(*it)->unbind();
delete *it;
}
// 必须清空流集合,因为该函数在析构时还将被调用一次
sstreams_.clear();
}
static bool __has_called = false;
@ -41,6 +29,7 @@ void master_udp::run(int argc, char** argv)
acl_udp_server_main(argc, argv, service_main,
ACL_MASTER_SERVER_CTX, this,
ACL_APP_CTL_THREAD_INIT_CTX, this,
ACL_MASTER_SERVER_ON_BIND, service_on_bind,
ACL_MASTER_SERVER_PRE_INIT, service_pre_jail,
ACL_MASTER_SERVER_POST_INIT, service_init,
ACL_MASTER_SERVER_EXIT, service_exit,
@ -102,31 +91,13 @@ bool master_udp::run_alone(const char* addrs, const char* path /* = NULL */,
//////////////////////////////////////////////////////////////////////////
static void on_close(ACL_VSTREAM* stream, void* ctx)
{
if (ctx && stream->context == ctx)
{
socket_stream* ss = (socket_stream*) ctx;
ss->unbind();
delete ss;
}
}
void master_udp::service_main(void* ctx, ACL_VSTREAM *stream)
{
master_udp* mu = (master_udp *) ctx;
acl_assert(mu != NULL);
socket_stream* ss = (socket_stream*) stream->context;
if (ss == NULL)
{
// 当本函数第一次被调用时,需要打开 socket_stream 流
ss = NEW socket_stream();
if (ss->open(stream) == false)
logger_fatal("open stream error!");
stream->context = ss;
acl_vstream_add_close_handle(stream, on_close, ss);
}
acl_assert(ss);
/*
#ifndef ACL_WINDOWS
@ -170,27 +141,23 @@ void master_udp::thread_init(void* ctx)
{
master_udp* mu = (master_udp *) ctx;
acl_assert(mu != NULL);
#ifndef ACL_WINDOWS
if (mu->daemon_mode_)
{
ACL_VSTREAM** streams = acl_udp_server_streams();
mu->locker_.lock();
if (streams != NULL)
{
for (int i = 0; streams[i] != NULL; i++)
{
socket_stream* ss = NEW socket_stream();
if (ss->open(streams[i]) == false)
logger_fatal("open stream error!");
mu->sstreams_.push_back(ss);
}
}
mu->locker_.unlock();
}
#endif
mu->thread_on_init();
}
void master_udp::service_on_bind(void* ctx, ACL_VSTREAM* stream)
{
master_udp* mu = (master_udp *) ctx;
acl_assert(mu);
socket_stream* ss = NEW socket_stream();
if (ss->open(stream) == false)
logger_fatal("open stream error!");
stream->context = ss;
mu->sstreams_.push_back(ss);
mu->proc_on_bind(*ss);
}
void master_udp::service_on_sighup(void* ctx)
{
master_udp* mu = (master_udp *) ctx;

View File

@ -530,10 +530,9 @@ static void servers_start(FIBER_SERVER *servers, int nthreads)
/* this can only be called in the main thread */
if (__server_on_listen) {
int j;
for (i = 0; i < nthreads; i++) {
FIBER_SERVER *server = &servers[i];
int j;
for (j = 0; j < server->socket_count; j++)
__server_on_listen(__service_ctx,