service_guard for tcp_service

This commit is contained in:
zsx 2018-01-29 16:51:24 +08:00
parent b5720deac5
commit 8dff32f40f
14 changed files with 310 additions and 76 deletions

View File

@ -15,17 +15,21 @@
class guard_report
{
public:
guard_report(const char* guard_manager, int conn_timeout = 10,
int rw_timeout = 10);
guard_report(const char* guard_manager, acl::tcp_ipc& ipc,
int conn_timeout = 10, int rw_timeout = 10);
~guard_report(void) {}
bool report(const acl::string& body);
private:
acl::string guard_manager_;
acl::tcp_ipc& ipc_;
int conn_timeout_;
int rw_timeout_;
bool tcp_report(const acl::string& body);
bool udp_report(const acl::string& body);
bool get_one_addr(acl::string& addr);
bool resolve_domain(std::vector<acl::string>& ips, int& port);
};

View File

@ -1,9 +1,16 @@
#include "stdafx.h"
#include "guard_report.h"
guard_report::guard_report(const char* guard_manager,
#define SET_TIME(x) do { \
struct timeval _tv; \
gettimeofday(&_tv, NULL); \
(x) = ((long long) _tv.tv_sec) * 1000 + ((long long) _tv.tv_usec)/ 1000; \
} while (0)
guard_report::guard_report(const char* guard_manager, acl::tcp_ipc& ipc,
int conn_timeout /* = 10 */, int rw_timeout /* = 10 */)
: guard_manager_(guard_manager)
, ipc_(ipc)
, conn_timeout_(conn_timeout)
, rw_timeout_(rw_timeout)
{
@ -17,34 +24,16 @@ bool guard_report::report(const acl::string& body)
return udp_report(body);
}
bool guard_report::tcp_report(const acl::string& body)
{
acl::socket_stream conn;
if (conn.open(guard_manager_, conn_timeout_, rw_timeout_) == false) {
logger_error("connect %s error %s", guard_manager_.c_str(),
acl::last_serror());
return false;
}
if (conn.write(body) == -1) {
logger_error("write to %s error %s", guard_manager_.c_str(),
acl::last_serror());
return false;
}
return true;
}
bool guard_report::udp_report(const acl::string& body)
bool guard_report::resolve_domain(std::vector<acl::string>& ips, int& port)
{
acl::string domain(guard_manager_);
char* ptr = domain.c_str();
char* port = strchr(ptr, ':');
if (port == NULL || port == ptr || *(port + 1) == 0) {
char* port_s = strchr(ptr, ':');
if (port_s == NULL || port_s == ptr || *(port_s + 1) == 0) {
logger_error("invalid guard_manager=%s", guard_manager_.c_str());
return false;
}
*port++ = 0;
*port_s++ = 0;
int h_error;
ACL_DNS_DB *db = acl_gethostbyname(domain.c_str(), &h_error);
@ -54,21 +43,59 @@ bool guard_report::udp_report(const acl::string& body)
return false;
}
acl::string ip;
ACL_ITER iter;
acl_foreach(iter, db) {
ACL_HOSTNAME* host = (ACL_HOSTNAME *) iter.data;
ip = host->ip;
ips.push_back(host->ip);
}
acl_netdb_free(db);
if (ip.empty()) {
if (ips.empty()) {
logger_error("no ip found, domain=%s", domain.c_str());
return false;
}
ip << ":" << port;
port = atoi(port_s);
return true;
}
// printf("addr=%s, %s\n", guard_manager_.c_str(), ip.c_str());
bool guard_report::get_one_addr(acl::string& addr)
{
std::vector<acl::string> ips;
int port;
if (resolve_domain(ips, port) == false)
return false;
long long n;
SET_TIME(n);
n = n % ips.size();
addr.format("%s:%d", ips[n].c_str(), port);
return true;
}
bool guard_report::tcp_report(const acl::string& body)
{
acl::string addr;
if (get_one_addr(addr) == false)
return false;
if (ipc_.send(addr, body, (unsigned) body.size()) == false) {
logger_error("send to %s error %s",
addr.c_str(), acl::last_serror());
return false;
}
return true;
}
bool guard_report::udp_report(const acl::string& body)
{
acl::string addr;
if (get_one_addr(addr) == false)
return false;
// printf("addr=%s, %s\n", guard_manager_.c_str(), addr.c_str());
const char* local_addr = "0.0.0.0:0";
acl::socket_stream stream;
if (stream.bind_udp(local_addr) == false) {
@ -76,7 +103,7 @@ bool guard_report::udp_report(const acl::string& body)
local_addr, acl::last_serror());
return false;
}
stream.set_peer(ip);
stream.set_peer(addr);
if (stream.write(body) == false) {
logger_error("write %s error %s",
body.c_str(), acl::last_serror());

View File

@ -36,9 +36,10 @@ static long get_mem(const std::list<proc_info_t>& procs)
}
service_list::service_list(const char* master_ctld, const char* guard_manager,
int conn_timeout, int rw_timeout)
acl::tcp_ipc& ipc, int conn_timeout, int rw_timeout)
: master_ctld_(master_ctld)
, guard_manager_(guard_manager)
, ipc_(ipc)
, conn_timeout_(conn_timeout)
, rw_timeout_(rw_timeout)
{
@ -85,6 +86,6 @@ bool service_list::run(void)
serialize<service_list_res_t>(list_res, body);
//logger("|%s|, len=%d", body.c_str(), (int) body.size());
guard_report report(guard_manager_, conn_timeout_, rw_timeout_);
guard_report report(guard_manager_, ipc_, conn_timeout_, rw_timeout_);
return report.report(body);
}

View File

@ -16,7 +16,7 @@ class service_list
{
public:
service_list(const char* master_ctld, const char* guard_manager,
int conn_timeout = 10, int rw_timeout = 10);
acl::tcp_ipc& ipc, int conn_timeout = 10, int rw_timeout = 10);
~service_list(void) {}
bool run(void);
@ -24,6 +24,7 @@ public:
private:
acl::string master_ctld_;
acl::string guard_manager_;
acl::tcp_ipc& ipc_;
int conn_timeout_;
int rw_timeout_;
};

View File

@ -1,4 +1,5 @@
#include "stdafx.h"
#include <signal.h>
#include "action/service_list.h"
#include "master_service.h"
@ -21,9 +22,11 @@ acl::master_bool_tbl var_conf_bool_tab[] = {
int var_cfg_conn_timeout;
int var_cfg_rw_timeout;
int var_cfg_connection_idle;
acl::master_int_tbl var_conf_int_tab[] = {
{ "conn_timeout", 30, &var_cfg_conn_timeout, 0, 0 },
{ "rw_timeout", 30, &var_cfg_rw_timeout, 0, 0 },
{ "connection_idle", 30, &var_cfg_connection_idle, 0, 0 },
{ 0, 0, 0, 0, 0 }
};
@ -35,8 +38,54 @@ acl::master_int64_tbl var_conf_int64_tab[] = {
//////////////////////////////////////////////////////////////////////////////
master_service::master_service()
class ipc_monitor : public acl::thread
{
public:
ipc_monitor(acl::tcp_ipc& ipc, int ttl, bool service_exit)
: ipc_(ipc)
, ttl_(ttl)
, service_exit_(service_exit)
{
}
~ipc_monitor(void) {}
private:
void* run(void)
{
while (!service_exit_)
{
sleep(1);
check_idle();
}
return NULL;
}
void check_idle(void)
{
acl::tcp_manager& manager = ipc_.get_manager();
std::vector<acl::connect_pool*>& pools = manager.get_pools();
for (std::vector<acl::connect_pool*>::iterator it = pools.begin();
it != pools.end(); ++it)
{
(*it)->check_idle(ttl_);
}
}
private:
acl::tcp_ipc& ipc_;
int ttl_;
bool service_exit_;
};
master_service::master_service()
: service_exit_(false)
, monitor_(NULL)
{
ipc_.set_limit(0)
.set_idle(30)
.set_conn_timeout(var_cfg_conn_timeout)
.set_rw_timeout(var_cfg_rw_timeout);
}
master_service::~master_service()
@ -45,18 +94,33 @@ master_service::~master_service()
void master_service::on_trigger()
{
service_list action(var_cfg_master_ctld, var_cfg_guard_manager);
service_list action(var_cfg_master_ctld, var_cfg_guard_manager, ipc_);
action.run();
}
void master_service::proc_on_init()
{
logger(">>>proc_on_init<<<");
monitor_ = new ipc_monitor(ipc_, var_cfg_connection_idle,
service_exit_);
monitor_->set_detachable(false);
monitor_->start();
}
static void wait_timeout(int)
{
exit (1);
}
void master_service::proc_on_exit()
{
logger(">>>proc_on_exit<<<");
service_exit_ = true;
signal(SIGALRM, wait_timeout);
alarm(10);
monitor_->wait();
delete monitor_;
}
bool master_service::proc_on_sighup(acl::string&)

View File

@ -48,4 +48,9 @@ protected:
* SIGHUP
*/
bool proc_on_sighup(acl::string&);
private:
acl::tcp_ipc ipc_;
bool service_exit_;
acl::thread* monitor_;
};

View File

@ -17,12 +17,12 @@ class guard_action : public acl::thread_job
public:
guard_action(const char* ip, const char* data);
private:
~guard_action(void);
// @override
void* run(void);
private:
~guard_action(void);
private:
acl::string ip_;
acl::string data_;

View File

@ -11,7 +11,20 @@
*/
#pragma once
extern char *var_cfg_redis_addrs;
extern char *var_cfg_redis_passwd;
extern char *var_cfg_main_service_list;
extern acl::master_str_tbl var_conf_str_tab[];
extern acl::master_bool_tbl var_conf_bool_tab[];
extern int var_cfg_threads_max;
extern int var_cfg_threads_idle;
extern int var_cfg_redis_conn_timeout;
extern int var_cfg_redis_rw_timeout;
extern acl::master_int_tbl var_conf_int_tab[];
extern acl::master_int64_tbl var_conf_int64_tab[];
extern acl::redis_client_cluster var_redis;
extern std::map<acl::string, bool> var_main_service_list;

View File

@ -1,20 +1,84 @@
#include "stdafx.h"
#include "master_service.h"
#include "configure.h"
#include "tcp_service.h"
#include "udp_service.h"
int main(int argc, char* argv[])
static void init_configure(acl::master_base& base)
{
// ³õʼ»¯ acl ¿â
acl::acl_cpp_init();
master_service& ms = acl::singleton2<master_service>::get_instance();
// 设置配置参数表
ms.set_cfg_int(var_conf_int_tab);
ms.set_cfg_int64(var_conf_int64_tab);
ms.set_cfg_str(var_conf_str_tab);
ms.set_cfg_bool(var_conf_bool_tab);
base.set_cfg_int(var_conf_int_tab);
base.set_cfg_int64(var_conf_int64_tab);
base.set_cfg_str(var_conf_str_tab);
base.set_cfg_bool(var_conf_bool_tab);
}
// ¿ªÊ¼ÔËÐÐ
static void run_tcp_service(int argc, char* argv[])
{
tcp_service& ts = acl::singleton2<tcp_service>::get_instance();
init_configure(ts);
if (argc >= 2 && strcmp(argv[1], "alone") == 0)
{
// 日志输出至标准输出
acl::log::stdout_open(true);
// 监听的地址列表格式ip:port1,ip:port2,...
const char* addrs = ":8888";
printf("listen on: %s\r\n", addrs);
// 测试时设置该值 > 0 则指定服务器处理客户端连接过程的
// 会话总数(一个连接从接收到关闭称之为一个会话),当
// 处理的连接会话数超过此值,测试过程结束;如果该值设
// 为 0则测试过程永远不结束
unsigned int count = 0;
// 测试过程中指定线程池最大线程个数
unsigned int max_threads = 100;
// 单独运行方式
if (argc >= 3)
ts.run_alone(addrs, argv[2], count, max_threads);
else
ts.run_alone(addrs, NULL, count, max_threads);
printf("Enter any key to exit now\r\n");
getchar();
}
else
{
#ifdef WIN32
// 日志输出至标准输出
acl::log::stdout_open(true);
// 监听的地址列表格式ip:port1,ip:port2,...
const char* addrs = "127.0.0.1:8888";
printf("listen on: %s\r\n", addrs);
// 测试时设置该值 > 0 则指定服务器处理客户端连接过程的
// 会话总数(一个连接从接收到关闭称之为一个会话),当
// 处理的连接会话数超过此值,测试过程结束;如果该值设
// 为 0则测试过程永远不结束
unsigned int count = 0;
// 测试过程中指定线程池最大线程个数
unsigned int max_threads = 100;
// 单独运行方式
ts.run_alone(addrs, NULL, count, max_threads);
printf("Enter any key to exit now\r\n");
getchar();
#else
// acl_master 控制模式运行
ts.run_daemon(argc, argv);
#endif
}
}
static void run_udp_service(int argc, char* argv[])
{
udp_service& us = acl::singleton2<udp_service>::get_instance();
init_configure(us);
if (argc >= 2 && strcmp(argv[1], "alone") == 0)
{
@ -33,9 +97,9 @@ int main(int argc, char* argv[])
// 单独运行方式
if (argc >= 3)
ms.run_alone(addrs, argv[2], count);
us.run_alone(addrs, argv[2], count);
else
ms.run_alone(addrs, NULL, count);
us.run_alone(addrs, NULL, count);
printf("Enter any key to exit now\r\n");
getchar();
@ -47,7 +111,7 @@ int main(int argc, char* argv[])
acl::log::stdout_open(true);
// 监听的地址列表格式ip:port1,ip:port2,...
const char* addrs = "127.0.0.1:8888";
const char* addrs = "0.0.0.0:8390";
printf("bind on: %s\r\n", addrs);
// 测试时设置该值 > 0 则指定服务器处理客户端连接过程的
@ -57,14 +121,50 @@ int main(int argc, char* argv[])
unsigned int count = 0;
// 单独运行方式
ms.run_alone(addrs, NULL, count);
us.run_alone(addrs, NULL, count);
printf("Enter any key to exit now\r\n");
getchar();
#else
// acl_master 控制模式运行
ms.run_daemon(argc, argv);
us.run_daemon(argc, argv);
#endif
}
}
static bool check_service(int argc, char* argv[])
{
bool is_tcp_service = false;
for (int i = 0; i < argc; i++)
{
if (argv[i][0] != '-')
continue;
switch (argv[i][1])
{
case 'M':
if (i + 1 < argc && !strcasecmp(argv[i + 1], "tcp"))
{
is_tcp_service = true;
break;
}
default:
break;
}
}
return is_tcp_service;
}
int main(int argc, char* argv[])
{
// 初始化 acl 库
acl::acl_cpp_init();
bool tcp_mode = check_service(argc, argv);
if (tcp_mode)
run_tcp_service(argc, argv);
else
run_udp_service(argc, argv);
return 0;
}

View File

@ -63,3 +63,6 @@
extern acl::redis_client_cluster var_redis;
extern std::map<acl::string, bool> var_main_service_list;
extern char* var_cfg_main_service_list;
#define DBG_BASE 100
#define DBG_NET (DBG_BASE + 1)

View File

@ -11,28 +11,47 @@ tcp_service::~tcp_service(void)
{
}
bool tcp_service::thread_on_read(acl::socket_stream* stream)
bool tcp_service::thread_on_read(acl::socket_stream* conn)
{
int n;
char buf[1500];
acl::tcp_reader reader(*conn);
if ((n = stream->read(buf, sizeof(buf) - 1, false)) == -1)
acl::string buf;
if (reader.read(buf) == false) {
logger("read over from %s", conn->get_peer());
return false;
}
logger("read from %s, %d bytes, buf=|%s|",
conn->get_peer(), (int) buf.size(), buf.c_str());
buf[n] = 0;
logger("read from %s, %d bytes, buf=|%s|", stream->get_peer(), n, buf);
const char* peer_ip = stream->get_peer();
const char* peer_ip = conn->get_peer();
if (peer_ip == NULL || *peer_ip == 0) {
logger_error("can't get peer ip");
return false;
}
guard_action* job = new guard_action(peer_ip, buf);
threads_->execute(job);
job->run();
return true;
}
bool tcp_service::thread_on_accept(acl::socket_stream*)
{
return true;
}
bool tcp_service::thread_on_timeout(acl::socket_stream* conn)
{
logger_debug(DBG_NET, 2, "read timeout from %s, fd: %d",
conn->get_peer(), conn->sock_handle());
return false;
}
void tcp_service::thread_on_close(acl::socket_stream* conn)
{
logger_debug(DBG_NET, 2, "disconnect from %s, fd: %d",
conn->get_peer(), conn->sock_handle());
}
void tcp_service::thread_on_init(void)
{
}
@ -43,11 +62,6 @@ void tcp_service::thread_on_exit(void)
void tcp_service::proc_on_init(void)
{
threads_ = new acl::thread_pool;
threads_->set_limit(var_cfg_threads_max);
threads_->set_idle(var_cfg_threads_idle);
threads_->start();
var_redis.init(NULL, var_cfg_redis_addrs, var_cfg_threads_max,
var_cfg_redis_conn_timeout, var_cfg_redis_rw_timeout);
var_redis.set_password("default", var_cfg_redis_passwd);
@ -70,7 +84,6 @@ void tcp_service::proc_on_init(void)
void tcp_service::proc_on_exit(void)
{
delete threads_;
logger(">>>proc_on_exit<<<");
}

View File

@ -3,7 +3,7 @@
//////////////////////////////////////////////////////////////////////////////
// ÅäÖÃÄÚÈÝÏî
class tcp_service : public acl::master_thread
class tcp_service : public acl::master_threads
{
public:
tcp_service(void);
@ -11,13 +11,16 @@ public:
protected:
// @override
bool thread_on_read(acl::socket_stream* stream);
bool thread_on_read(acl::socket_stream* conn);
// @override
bool thread_on_accept(acl::socket_stream* stream);
bool thread_on_accept(acl::socket_stream* conn);
// @override
void thread_on_close(acl::socket_stream* stream);
void thread_on_close(acl::socket_stream* conn);
// @override
bool thread_on_timeout(acl::socket_stream* conn);
// @override
void thread_on_init(void);
@ -35,5 +38,4 @@ protected:
bool proc_on_sighup(acl::string&);
private:
acl::thread_pool* threads_;
};

View File

@ -1,5 +1,6 @@
#include "stdafx.h"
#include "action/guard_action.h"
#include "configure.h"
#include "udp_service.h"
udp_service::udp_service(void)