acl/lib_acl_cpp/samples/aio/aio_client/main.cpp

335 lines
7.0 KiB
C++
Raw Normal View History

#include <iostream>
#include <assert.h>
#include "lib_acl.h"
#include "acl_cpp/acl_cpp_init.hpp"
#include "acl_cpp/stdlib/string.hpp"
#include "acl_cpp/stdlib/util.hpp"
#include "acl_cpp/stream/aio_handle.hpp"
#include "acl_cpp/stream/aio_socket_stream.hpp"
#ifdef WIN32
# ifndef snprintf
# define snprintf _snprintf
# endif
#endif
using namespace acl;
typedef struct
{
char addr[64];
aio_handle* handle;
int connect_timeout;
int read_timeout;
int nopen_limit;
int nopen_total;
int nwrite_limit;
int nwrite_total;
int nread_total;
int id_begin;
bool debug;
} IO_CTX;
static bool connect_server(IO_CTX* ctx, int id);
/**
* <EFBFBD>ͻ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ص<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
*/
class client_io_callback : public aio_open_callback
{
public:
/**
* <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
* @param ctx {IO_CTX*}
* @param client {aio_socket_stream*} <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
* @param id {int} <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ID<EFBFBD><EFBFBD>
*/
client_io_callback(IO_CTX* ctx, aio_socket_stream* client, int id)
: client_(client)
, ctx_(ctx)
, nwrite_(0)
, id_(id)
{
}
~client_io_callback()
{
std::cout << ">>>ID: " << id_ << ", io_callback deleted now!" << std::endl;
}
/**
* <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>, <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ҫ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʱ<EFBFBD><EFBFBD><EFBFBD>ô˻ص<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
* @param data {char*} <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ݵ<EFBFBD>ַ
* @param len {int<EFBFBD><EFBFBD> <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ݳ<EFBFBD><EFBFBD><EFBFBD>
* @return {bool} <EFBFBD><EFBFBD><EFBFBD>ظ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> true <EFBFBD><EFBFBD>ʾ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʾ<EFBFBD><EFBFBD>Ҫ<EFBFBD>ر<EFBFBD><EFBFBD><EFBFBD><EFBFBD>
*/
bool read_callback(char* data, int len)
{
(void) data;
(void) len;
ctx_->nread_total++;
if (ctx_->debug)
{
if (nwrite_ < 10)
std::cout << "gets(" << nwrite_ << "): " << data;
else if (nwrite_ % 2000 == 0)
std::cout << ">>ID: " << id_ << ", I: "
<< nwrite_ << "; "<< data;
}
// <20><><EFBFBD><EFBFBD><EFBFBD>յ<EFBFBD><D5B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>˳<EFBFBD><CBB3><EFBFBD>Ϣ<EFBFBD><CFA2><EFBFBD><EFBFBD>ҲӦ<D2B2>˳<EFBFBD>
if (acl::strncasecmp_(data, "quit", 4) == 0)
{
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
client_->format("Bye!\r\n");
// <20>ر<EFBFBD><D8B1><EFBFBD><ECB2BD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
client_->close();
return (true);
}
if (nwrite_ >= ctx_->nwrite_limit)
{
if (ctx_->debug)
std::cout << "ID: " << id_
<< ", nwrite: " << nwrite_
<< ", nwrite_limit: " << ctx_->nwrite_limit
<< ", quiting ..." << std::endl;
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>˳<EFBFBD><CBB3><EFBFBD>Ϣ
client_->format("quit\r\n");
client_->close();
}
else
{
char buf[256];
snprintf(buf, sizeof(buf), "hello world: %d\n", nwrite_);
client_->write(buf, (int) strlen(buf));
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
//client_->format("hello world: %d\n", nwrite_);
}
return (true);
}
/**
* <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>, <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>д<EFBFBD>ɹ<EFBFBD>ʱ<EFBFBD><EFBFBD><EFBFBD>ô˻ص<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
* @return {bool} <EFBFBD><EFBFBD><EFBFBD>ظ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> true <EFBFBD><EFBFBD>ʾ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʾ<EFBFBD><EFBFBD>Ҫ<EFBFBD>ر<EFBFBD><EFBFBD><EFBFBD><EFBFBD>
*/
bool write_callback()
{
ctx_->nwrite_total++;
nwrite_++;
// <20>ӷ<EFBFBD><D3B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>һ<EFBFBD><D2BB><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
client_->gets(ctx_->read_timeout, false);
return (true);
}
/**
* <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>, <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ر<EFBFBD>ʱ<EFBFBD><EFBFBD><EFBFBD>ô˻ص<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
*/
void close_callback()
{
if (client_->is_opened() == false)
{
std::cout << "Id: " << id_ << " connect "
<< ctx_->addr << " error: "
<< acl::last_serror();
// <20><><EFBFBD><EFBFBD><EFBFBD>ǵ<EFBFBD>һ<EFBFBD><D2BB><EFBFBD><EFBFBD><EFBFBD>Ӿ<EFBFBD>ʧ<EFBFBD>ܣ<EFBFBD><DCA3><EFBFBD><EFBFBD>˳<EFBFBD>
if (ctx_->nopen_total == 0)
{
std::cout << ", first connect error, quit";
/* <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><ECB2BD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϊ<EFBFBD>˳<EFBFBD>״̬ */
client_->get_handle().stop();
}
std::cout << std::endl;
delete this;
return;
}
/* <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><ECB2BD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ܼ<EFBFBD><DCBC>ص<EFBFBD><D8B5><EFBFBD><ECB2BD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> */
int nleft = client_->get_handle().length();
if (ctx_->nopen_total == ctx_->nopen_limit && nleft == 1)
{
std::cout << "Id: " << id_ << " stop now! nstream: "
<< nleft << std::endl;
/* <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><ECB2BD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϊ<EFBFBD>˳<EFBFBD>״̬ */
client_->get_handle().stop();
}
// <20><><EFBFBD><EFBFBD><EFBFBD>ڴ˴<DAB4>ɾ<EFBFBD><C9BE><EFBFBD>ö<EFBFBD>̬<EFBFBD><CCAC><EFBFBD><EFBFBD><EFBFBD>Ļص<C4BB><D8B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Է<EFBFBD>ֹ<EFBFBD>ڴ<EFBFBD>й¶
delete this;
}
/**
* <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʱʱ<EFBFBD><EFBFBD><EFBFBD>ô˺<EFBFBD><EFBFBD><EFBFBD>
* @return {bool} <EFBFBD><EFBFBD><EFBFBD>ظ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> true <EFBFBD><EFBFBD>ʾ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʾ<EFBFBD><EFBFBD>Ҫ<EFBFBD>ر<EFBFBD><EFBFBD><EFBFBD><EFBFBD>
*/
bool timeout_callback()
{
std::cout << "Connect " << ctx_->addr << " Timeout ..." << std::endl;
client_->close();
return (false);
}
/**
* <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>, <EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ӳɹ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ô˺<EFBFBD><EFBFBD><EFBFBD>
* @return {bool} <EFBFBD><EFBFBD><EFBFBD>ظ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> true <EFBFBD><EFBFBD>ʾ<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʾ<EFBFBD><EFBFBD>Ҫ<EFBFBD>ر<EFBFBD><EFBFBD><EFBFBD><EFBFBD>
*/
bool open_callback()
{
// <20><><EFBFBD>ӳɹ<D3B3><C9B9><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>IO<49><4F>д<EFBFBD>ص<EFBFBD><D8B5><EFBFBD><EFBFBD><EFBFBD>
client_->add_read_callback(this);
client_->add_write_callback(this);
ctx_->nopen_total++;
acl::assert_(id_ > 0);
if (ctx_->nopen_total < ctx_->nopen_limit)
{
// <20><>ʼ<EFBFBD><CABC><EFBFBD><EFBFBD><EFBFBD><EFBFBD>һ<EFBFBD><D2BB><EFBFBD><EFBFBD><EFBFBD>ӹ<EFBFBD><D3B9><EFBFBD>
if (connect_server(ctx_, id_ + 1) == false)
std::cout << "connect error!" << std::endl;
}
// <20><EFBFBD><ECB2BD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
//client_->format("hello world: %d\n", nwrite_);
char buf[256];
snprintf(buf, sizeof(buf), "hello world: %d\n", nwrite_);
client_->write(buf, (int) strlen(buf));
// <20><EFBFBD>ӷ<EFBFBD><D3B7><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ȡһ<C8A1><D2BB><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
client_->gets(ctx_->read_timeout, false);
// <20><>ʾ<EFBFBD><CABE><EFBFBD><EFBFBD><EFBFBD><EFBFBD><ECB2BD><EFBFBD><EFBFBD>
return (true);
}
protected:
private:
aio_socket_stream* client_;
IO_CTX* ctx_;
int nwrite_;
int id_;
};
static bool connect_server(IO_CTX* ctx, int id)
{
// <20><>ʼ<EFBFBD><EFBFBD><ECB2BD><EFBFBD><EFBFBD>Զ<EFBFBD>̷<EFBFBD><CCB7><EFBFBD><EFBFBD><EFBFBD>
aio_socket_stream* stream = aio_socket_stream::open(ctx->handle,
ctx->addr, ctx->connect_timeout);
if (stream == NULL)
{
std::cout << "connect " << ctx->addr << " error!" << std::endl;
std::cout << "stoping ..." << std::endl;
if (id == 0)
ctx->handle->stop();
return (false);
}
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ӻ<EFBFBD><D3BA>Ļص<C4BB><D8B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
client_io_callback* callback = new client_io_callback(ctx, stream, id);
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ӳɹ<D3B3><C9B9>Ļص<C4BB><D8B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
stream->add_open_callback(callback);
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ʧ<EFBFBD>ܺ<EFBFBD><DCBA>ص<EFBFBD><D8B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
stream->add_close_callback(callback);
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ӳ<EFBFBD>ʱ<EFBFBD>Ļص<C4BB><D8B5><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>
stream->add_timeout_callback(callback);
return (true);
}
static void usage(const char* procname)
{
printf("usage: %s -h[help] -l server_addr -c nconnect"
" -n io_max -k[use kernel event: epoll/kqueue/devpoll"
" -t connect_timeout -d[debug]\n", procname);
}
int main(int argc, char* argv[])
{
bool use_kernel = false;
int ch;
IO_CTX ctx;
memset(&ctx, 0, sizeof(ctx));
ctx.connect_timeout = 5;
ctx.nopen_limit = 10;
ctx.id_begin = 1;
ctx.nwrite_limit = 10;
ctx.debug = false;
snprintf(ctx.addr, sizeof(ctx.addr), "127.0.0.1:9001");
while ((ch = getopt(argc, argv, "hc:n:kl:dt:")) > 0)
{
switch (ch)
{
case 'c':
ctx.nopen_limit = atoi(optarg);
if (ctx.nopen_limit <= 0)
ctx.nopen_limit = 10;
break;
case 'n':
ctx.nwrite_limit = atoi(optarg);
if (ctx.nwrite_limit <= 0)
ctx.nwrite_limit = 10;
break;
case 'h':
usage(argv[0]);
return (0);
case 'k':
use_kernel = true;
break;
case 'l':
snprintf(ctx.addr, sizeof(ctx.addr), "%s", optarg);
break;
case 'd':
ctx.debug = true;
break;
case 't':
ctx.connect_timeout = atoi(optarg);
break;
default:
break;
}
}
acl::meter_time(__FUNCTION__, __LINE__, "-----BEGIN-----");
acl::acl_cpp_init();
aio_handle handle(use_kernel ? ENGINE_KERNEL : ENGINE_SELECT);
ctx.handle = &handle;
if (connect_server(&ctx, ctx.id_begin) == false)
{
std::cout << "enter any key to exit." << std::endl;
getchar();
return (1);
}
std::cout << "Connect " << ctx.addr << " ..." << std::endl;
while (true)
{
// <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> false <20><><EFBFBD><EFBFBD>ʾ<EFBFBD><CABE><EFBFBD>ټ<EFBFBD><D9BC><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ҫ<EFBFBD>˳<EFBFBD>
if (handle.check() == false)
break;
}
acl::string buf;
buf << "total open: " << ctx.nopen_total
<< ", total write: " << ctx.nwrite_total
<< ", total read: " << ctx.nread_total;
acl::meter_time(__FUNCTION__, __LINE__, buf.c_str());
return (0);
}