mirror of
https://gitee.com/acl-dev/acl.git
synced 2024-12-15 17:30:53 +08:00
260 lines
5.9 KiB
C++
260 lines
5.9 KiB
C++
#include "stdafx.h"
|
||
#include "rpc_manager.h"
|
||
#include "master_service.h"
|
||
|
||
////////////////////////////////////////////////////////////////////////////////
|
||
// 配置内容项
|
||
|
||
char *var_cfg_str;
|
||
acl::master_str_tbl var_conf_str_tab[] = {
|
||
{ "str", "test_msg", &var_cfg_str },
|
||
|
||
{ 0, 0, 0 }
|
||
};
|
||
|
||
int var_cfg_bool;
|
||
acl::master_bool_tbl var_conf_bool_tab[] = {
|
||
{ "bool", 1, &var_cfg_bool },
|
||
|
||
{ 0, 0, 0 }
|
||
};
|
||
|
||
int var_cfg_thread_pool_limit;
|
||
int var_cfg_read_timeout;
|
||
acl::master_int_tbl var_conf_int_tab[] = {
|
||
{ "thread_pool_limit", 100, &var_cfg_thread_pool_limit, 0, 0 },
|
||
{ "read_timeout", 120, &var_cfg_read_timeout, 0, 0 },
|
||
|
||
{ 0, 0 , 0 , 0, 0 }
|
||
};
|
||
|
||
long long int var_cfg_int64;
|
||
acl::master_int64_tbl var_conf_int64_tab[] = {
|
||
{ "int64", 120, &var_cfg_int64, 0, 0 },
|
||
|
||
{ 0, 0 , 0 , 0, 0 }
|
||
};
|
||
|
||
////////////////////////////////////////////////////////////////////////////////
|
||
|
||
class request_rpc : public acl::rpc_request
|
||
{
|
||
public:
|
||
request_rpc(acl::aio_socket_stream* client)
|
||
: io_error_(false)
|
||
, client_(client)
|
||
{
|
||
}
|
||
|
||
~request_rpc()
|
||
{
|
||
}
|
||
|
||
protected:
|
||
// 基类虚函数,由子线程处理
|
||
virtual void rpc_run()
|
||
{
|
||
// 将非阻塞流转为阻塞流
|
||
|
||
// 打开阻塞流对象
|
||
acl::socket_stream stream;
|
||
|
||
// 必须用 get_vstream() 获得的 ACL_VSTREAM 流对象做参数
|
||
// 来打开 stream 对象,因为在 acl_cpp 和 acl 中的阻塞流
|
||
// 和非阻塞流最终都是基于 ACL_VSTREAM,而 ACL_VSTREAM 流
|
||
// 内部维护着了一个读/写缓冲区,所以在长连接的数据处理中,
|
||
// 必须每次将 ACL_VSTREAM 做为内部流的缓冲流来对待
|
||
ACL_VSTREAM* vstream = client_->get_vstream();
|
||
ACL_VSTREAM_SET_RWTIMO(vstream, 10);
|
||
|
||
// 设置为阻塞模式以提高同步写的效率
|
||
acl_non_blocking(ACL_VSTREAM_SOCK(vstream), ACL_BLOCKING);
|
||
|
||
// 打开同步流读写对象
|
||
(void) stream.open(vstream);
|
||
|
||
// 开始处理该请求
|
||
handle_conn(stream);
|
||
|
||
// 将 ACL_VSTREAM 与阻塞流对象解绑定,这样才能保证当释放阻塞流对象时
|
||
// 不会关闭与请求者的连接,因为该连接本身是属于非阻塞流对象的,需要采
|
||
// 用异步流关闭方式进行关闭
|
||
stream.unbind();
|
||
}
|
||
|
||
// 基类虚函数,由主线程处理,收到子线程任务完成的消息
|
||
virtual void rpc_onover()
|
||
{
|
||
// 关闭异步流
|
||
client_->close();
|
||
|
||
// 自销毁
|
||
delete this;
|
||
}
|
||
|
||
/**
|
||
* 基类虚接口:当子线程调用本对象的 rpc_signal 时,在主线程中会
|
||
* 调用本接口,通知在任务未完成前(即调用 rpc_onover 前)收到
|
||
* 子线程运行的中间状态信息;内部自动支持套接口或 WIN32 窗口
|
||
* 消息;应用场景,例如,对于 HTTP 下载应用,在子线程中可以
|
||
* 一边下载,一边向主线程发送(调用 rpc_signal 方法)下载进程,
|
||
* 则主线程会调用本类实例的此方法来处理此消息
|
||
*/
|
||
virtual void rpc_wakeup(void*)
|
||
{
|
||
}
|
||
|
||
private:
|
||
bool io_error_;
|
||
acl::aio_socket_stream* client_;
|
||
|
||
void handle_conn(acl::socket_stream& stream)
|
||
{
|
||
acl::string buf;
|
||
|
||
// 将异步流转为同步流后,以阻塞方式对该连接池进行读写
|
||
|
||
if (stream.gets(buf, false) == false)
|
||
{
|
||
logger_warn("gets error!");
|
||
io_error_ = true;
|
||
}
|
||
else if (stream.write(buf) == -1)
|
||
{
|
||
logger_warn("write error!");
|
||
io_error_ = true;
|
||
}
|
||
}
|
||
};
|
||
|
||
////////////////////////////////////////////////////////////////////////////////
|
||
|
||
// acl::aio_callback 虚类的子类定义
|
||
class io_callback : public acl::aio_callback
|
||
{
|
||
public:
|
||
io_callback(acl::aio_socket_stream* client)
|
||
: client_(client)
|
||
{
|
||
}
|
||
|
||
~io_callback()
|
||
{
|
||
}
|
||
|
||
protected:
|
||
/**
|
||
* 实现父类中的虚函数,客户端流的读成功回调过程
|
||
* @param data {char*} 读到的数据地址
|
||
* @param len {int} 读到的数据长度
|
||
* @return {bool} 返回 true 表示继续,否则希望关闭该异步流
|
||
*/
|
||
virtual bool read_callback(char* data, int len)
|
||
{
|
||
if (strncmp(data, "quit", len) == 0)
|
||
{
|
||
// 可以显式地调用异步流的关闭过程,也可以直接返回 false,
|
||
// 通知异步框架自动关闭该异步流
|
||
// client_->close();
|
||
return false;
|
||
}
|
||
return true;
|
||
}
|
||
|
||
virtual bool read_wakeup()
|
||
{
|
||
// 为防止当子线程正处理客户端连接对象过程中连接被关闭,
|
||
// 所以必须从异步监听集合中去掉对该异步流的监控
|
||
// 该连接会在 request_rpc::rpc_onover 中被关闭
|
||
client_->disable_read();
|
||
|
||
// 创建同步流处理过程
|
||
request_rpc* req = new request_rpc(client_);
|
||
|
||
// 将该流的数据处理过程通过 rpc 通道交给子线程处理
|
||
rpc_manager::get_instance().fork(req);
|
||
|
||
return true;
|
||
}
|
||
|
||
/**
|
||
* 实现父类中的虚函数,客户端流的写成功回调过程
|
||
* @return {bool} 返回 true 表示继续,否则希望关闭该异步流
|
||
*/
|
||
virtual bool write_callback()
|
||
{
|
||
return true;
|
||
}
|
||
|
||
/**
|
||
* 实现父类中的虚函数,客户端流的关闭回调过程
|
||
*/
|
||
virtual void close_callback()
|
||
{
|
||
// 必须在此处删除该动态分配的回调类对象以防止内存泄露
|
||
delete this;
|
||
}
|
||
|
||
/**
|
||
* 实现父类中的虚函数,客户端流的超时回调过程
|
||
* @return {bool} 返回 true 表示继续,否则希望关闭该异步流
|
||
*/
|
||
virtual bool timeout_callback()
|
||
{
|
||
// 返回 false 通知异步框架关闭该异步流
|
||
return false;
|
||
}
|
||
|
||
private:
|
||
acl::aio_socket_stream* client_;
|
||
};
|
||
|
||
////////////////////////////////////////////////////////////////////////////////
|
||
|
||
master_service::master_service()
|
||
{
|
||
}
|
||
|
||
master_service::~master_service()
|
||
{
|
||
}
|
||
|
||
bool master_service::on_accept(acl::aio_socket_stream* client)
|
||
{
|
||
// 创建异步客户端流的回调对象并与该异步流进行绑定
|
||
io_callback* callback = new io_callback(client);
|
||
|
||
// 注册异步流的读回调过程
|
||
client->add_read_callback(callback);
|
||
|
||
// 注册异步流的写回调过程
|
||
client->add_write_callback(callback);
|
||
|
||
// 注册异步流的关闭回调过程
|
||
client->add_close_callback(callback);
|
||
|
||
// 注册异步流的超时回调过程
|
||
client->add_timeout_callback(callback);
|
||
|
||
// 写欢迎信息
|
||
// client->format("hello, you're welcome\r\n");
|
||
|
||
// 监控异步流的读状态,当有数据可读时,回调 acl::aio_callback
|
||
// 中的 read_wakeup 虚函数
|
||
client->read_wait(var_cfg_read_timeout);
|
||
return true;
|
||
}
|
||
|
||
void master_service::proc_on_init()
|
||
{
|
||
// 获得异步框架的事件引擎句柄
|
||
acl::aio_handle* handle = get_handle();
|
||
|
||
// 初始化 rpc 服务对象
|
||
rpc_manager::get_instance().init(handle, var_cfg_thread_pool_limit);
|
||
}
|
||
|
||
void master_service::proc_on_exit()
|
||
{
|
||
}
|