acl/lib_acl_cpp/samples/http_server/http_server.cpp

290 lines
5.5 KiB
C++
Raw Normal View History

// http_server.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <iostream>
#include "rpc_manager.h"
#include "rpc_stats.h"
#include "http_rpc.h"
// Size argument handed to every http_rpc created by handle_io; main() overrides
// it from the -n option and falls back to 1024 for non-positive values.
static int var_data_size = 1024;
//////////////////////////////////////////////////////////////////////////
/**
 * Callback object attached to one accepted client stream.
 *
 * It owns an http_rpc bound to that stream and hands it to the rpc_manager
 * when the stream becomes readable. The object is heap-allocated by
 * handle_accept and destroys itself in close_callback().
 */
class handle_io : public aio_callback
{
public:
	/**
	 * @param client {aio_socket_stream*} the client stream this object
	 *  serves; it is stored but not owned by this class
	 */
	handle_io(aio_socket_stream* client)
	: client_(client)
	, http_(new http_rpc(client, (unsigned) var_data_size))
	{
	}

	~handle_io()
	{
		delete http_;
		std::cout << "delete io_callback now ..." << std::endl;
	}

	/**
	 * Write-completion callback; nothing to do here.
	 * @return {bool} always true
	 */
	bool write_callback()
	{
		return true;
	}

	/**
	 * Close callback for the client stream.
	 */
	void close_callback()
	{
		std::cout << "Closed now." << std::endl;

		// This object was created with new in handle_accept, so it must
		// be released here to avoid a memory leak. No member may be
		// touched after this line.
		delete this;
	}

	/**
	 * Timeout callback for the client stream.
	 * @return {bool} always false
	 *  NOTE(review): presumably false asks the framework to close the
	 *  stream instead of waiting again -- confirm against aio_callback.
	 */
	bool timeout_callback()
	{
		std::cout << "Timeout ..." << std::endl;
		return false;
	}

	/**
	 * Called when the stream registered through read_wait() is readable.
	 * @return {bool} always true
	 */
	virtual bool read_wakeup()
	{
		// Update the statistics: one stream fewer waiting for data,
		// one more rpc task.
		rpc_read_wait_del();
		rpc_add();

		// Stop read monitoring on the stream before handing it over.
		client_->disable_read();

		// Start the http session by forking the rpc task.
		rpc_manager::get_instance().fork(http_);

		return true;
	}

private:
	// The class owns http_ through a raw pointer and deletes it in the
	// destructor, so a copy would lead to a double delete. Copying is
	// disabled: declared private and intentionally left undefined.
	handle_io(const handle_io&);
	handle_io& operator=(const handle_io&);

	aio_socket_stream* client_;
	http_rpc* http_;
};
//////////////////////////////////////////////////////////////////////////
/**
* <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ļص<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
*/
class handle_accept : public aio_accept_callback
{
public:
handle_accept(bool preread)
: preread_(preread)
{
}
~handle_accept()
{
printf(">>io_accept_callback over!\n");
}
/**
* <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ӵ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ô˻ص<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
* @param client {aio_socket_stream*} <EFBFBD><EFBFBD>ͻ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
* @return {bool} <EFBFBD><EFBFBD><EFBFBD><EFBFBD> true <EFBFBD><EFBFBD>֪ͨ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
*/
bool accept_callback(acl::aio_socket_stream* client)
{
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>߳<EFBFBD><DFB3><EFBFBD>Ԥ<EFBFBD><D4A4><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ԥ<EFBFBD><D4A4><EFBFBD><EFBFBD>־λ
if (preread_)
{
ACL_VSTREAM* vstream = client->get_vstream();
vstream->flag |= ACL_VSTREAM_FLAG_PREREAD;
}
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ͻ<EFBFBD><CDBB><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ļص<C4BB><D8B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><ECB2BD><EFBFBD><EFBFBD><EFBFBD>а<EFBFBD><D0B0><EFBFBD>
handle_io* callback = new handle_io(client);
// ע<><D7A2><EFBFBD><EFBFBD><ECB2BD><EFBFBD>Ķ<EFBFBD><C4B6>ص<EFBFBD><D8B5><EFBFBD><EFBFBD><EFBFBD>
client->add_read_callback(callback);
// ע<><D7A2><EFBFBD><EFBFBD><ECB2BD><EFBFBD><EFBFBD>д<EFBFBD>ص<EFBFBD><D8B5><EFBFBD><EFBFBD><EFBFBD>
client->add_write_callback(callback);
// ע<><D7A2><EFBFBD><EFBFBD><ECB2BD><EFBFBD>Ĺرջص<D5BB><D8B5><EFBFBD><EFBFBD><EFBFBD>
client->add_close_callback(callback);
// ע<><D7A2><EFBFBD><EFBFBD><ECB2BD><EFBFBD>ij<EFBFBD>ʱ<EFBFBD>ص<EFBFBD><D8B5><EFBFBD><EFBFBD><EFBFBD>
client->add_timeout_callback(callback);
rpc_read_wait_add();
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><ECB2BD><EFBFBD>Ƿ<EFBFBD><C7B7>ɶ<EFBFBD>
client->read_wait(10);
return (true);
}
private:
bool preread_;
};
/**
 * Print the command-line help text to stdout.
 * @param procname {const char*} program name shown on the usage line
 */
static void usage(const char* procname)
{
	// The option letters must match the getopt() string in main();
	// the listen address is selected with -s.
	printf("usage: %s \r\n"
		" -h[help]\r\n"
		" -p[preread in main thread]\r\n"
		" -s listen_addr[127.0.0.1:9001]\r\n"
		" -m[use mempool]\r\n"
		" -k[use kernel engine]\r\n"
		" -n data size response\r\n"
		" -N thread pool limit\r\n"
		" -r rpc_addr\r\n"
		" -v[enable stdout]\r\n", procname);
}
/**
 * Entry point of the sample HTTP server.
 *
 * Parses the command line, opens the listening stream, initialises the
 * rpc_manager, then runs the event loop, printing the rpc counters about
 * once per second until aio_handle::check() returns false.
 *
 * @return 0 on normal shutdown or after -h, 1 if the listen address
 *  could not be opened
 */
int main(int argc, char* argv[])
{
#ifdef WIN32
	acl_cpp_init();
#endif

	bool preread = false;
	char addr[32], rpc_addr[32];
	// getopt() returns an int. Storing it in a plain char breaks the loop
	// condition where char is unsigned: -1 becomes 255 and never
	// compares <= 0, so the loop would never terminate.
	int  ch;
	bool use_mempool = false;
	bool use_kernel = false;
	bool enable_stdout = false;
	int  nthreads = 20;

	snprintf(addr, sizeof(addr), "127.0.0.1:9001");
	snprintf(rpc_addr, sizeof(rpc_addr), "127.0.0.1:0");

	// Both -l and -s select the listen address, so either spelling of
	// the option works.
	while ((ch = getopt(argc, argv, "vkhpml:s:n:N:r:")) > 0)
	{
		switch (ch)
		{
		case 'h':
			usage(argv[0]);
			return 0;
		case 'l':
		case 's':
			snprintf(addr, sizeof(addr), "%s", optarg);
			break;
		case 'p':
			preread = true;
			break;
		case 'm':
			use_mempool = true;
			break;
		case 'k':
			use_kernel = true;
			break;
		case 'n':
			var_data_size = atoi(optarg);
			if (var_data_size <= 0)
				var_data_size = 1024;
			break;
		case 'N':
			// NOTE(review): the fallback (10) differs from the
			// default above (20) -- confirm which is intended.
			nthreads = atoi(optarg);
			if (nthreads <= 0)
				nthreads = 10;
			break;
		case 'v':
			enable_stdout = true;
			break;
		case 'r':
			snprintf(rpc_addr, sizeof(rpc_addr), "%s", optarg);
			break;
		default:
			break;
		}
	}

	// Optionally switch on the memory slice pool.
	if (use_mempool)
		acl_mem_slice_init(8, 1024, 100000,
			ACL_SLICE_FLAG_GC2 |
			ACL_SLICE_FLAG_RTGC_OFF |
			ACL_SLICE_FLAG_LP64_ALIGN);

	rpc_stats_init();

	// Optionally send log output to stdout.
	if (enable_stdout)
		log::stdout_open(true);

	// Async event handle: kernel engine when -k was given, select otherwise.
	aio_handle* handle = new aio_handle(use_kernel ? ENGINE_KERNEL : ENGINE_SELECT);

	// Create the async listening stream.
	aio_listen_stream* sstream = new aio_listen_stream(handle);

	// Listen on the requested address.
	if (sstream->open(addr) == false)
	{
		std::cout << "open " << addr << " error!" << std::endl;
		sstream->close();
		// XXX: run check() once more so that the listening stream
		// really gets closed.
		handle->check();
#ifdef WIN32
		getchar();
#endif
		return 1;
	}

	// Initialise the async RPC service.
	rpc_manager::get_instance().init(handle, nthreads, rpc_addr);

	// Accept callback object; its accept_callback() handles each new
	// client connection.
	handle_accept callback(preread);

	// Register the callback object with the listening stream.
	sstream->add_accept_callback(&callback);
	std::cout << "Listen: " << addr << " ok!" << std::endl;

	time_t last = time(NULL), now;

	while (true)
	{
		// check() returning false means the loop should stop.
		if (handle->check() == false)
		{
			std::cout << "aio_server stop now ..." << std::endl;
			break;
		}

		// Print the rpc counters at most once per second.
		time(&now);
		if (now - last >= 1)
		{
			printf("\r\n------------------------------\r\n");
			rpc_out();
			rpc_req_out();
			rpc_read_wait_out();
			last = now;
		}
	}

	// Close the listening stream.
	sstream->close();

	// Shut down the RPC service.
	rpc_manager::get_instance().finish();

	// XXX: run check() once more so that the listening stream really
	// gets closed.
	handle->check();
	delete handle;

	rpc_stats_finish();

	if (use_mempool)
	{
		acl_mem_slice_gc();
		acl_mem_slice_destroy();
	}

	return 0;
}