add sample tcp_server & tcp_client in lib_fiber.

This commit is contained in:
zhengshuxin 2017-08-09 16:04:54 +08:00
parent fd56ed2b85
commit dcaec58d47
14 changed files with 511 additions and 1 deletions

View File

@ -16,6 +16,15 @@ public:
tcp_client(const char* addr, int conn_timeout = 10, int rw_timeout = 10);
virtual ~tcp_client(void);
/**
*
* @param {const void*}
* @param len {unsigned int}
* @param out {string*} NULL
* NULL
*
* @return {bool}
*/
bool send(const void* data, unsigned int len, string* out = NULL);
protected:

View File

@ -19,20 +19,106 @@ class tcp_manager;
class tcp_pool;
class string;
class tcp_ipc
/**
* tcp_manager
*
*/
class ACL_CPP_API tcp_ipc
{
public:
tcp_ipc(void);
~tcp_ipc(void);
/**
*
* @param max {int} <= 0
* @return {tcp_ipc&}
*/
tcp_ipc& set_limit(int max);
/**
*
* @param ttl {int}
* @return {tcp_ipc&}
*/
tcp_ipc& set_idle(int ttl);
/**
*
* @param conn_timeout {int}
* @return {tcp_ipc&}
*/
tcp_ipc& set_conn_timeout(int conn_timeout);
/**
*
* @param timeout {int}
* @return {tcp_ipc&}
*/
tcp_ipc& set_rw_timeout(int timeout);
/**
* TCP
* @return {tcp_manager&}
*/
tcp_manager& get_manager(void) const;
/**
*
* @param addr {const char*} IP:PORT
* @return {tcp_ipc&}
*/
tcp_ipc& add_addr(const char* addr);
/**
*
*
*
* @param addr {const char*} IP:PORT
* @return {tcp_ipc&}
*/
tcp_ipc& del_addr(const char* addr);
/**
*
* @param addr {const char*} IP:PORT
* @return {bool}
*/
bool addr_exist(const char* addr);
/**
*
* @param addrs {std::vector<string>&}
*/
void get_addrs(std::vector<string>& addrs);
/**
*
* @param addr {const char*}
* @param {const void*}
* @param len {unsigned int}
* @param out {string*} NULL
* NULL
*
* @return {bool}
*/
bool send(const char* addr, const void* data, unsigned int len,
string* out = NULL);
/**
*
* @param {const void*}
* @param len {unsigned int}
* @param exclusive {bool} 广线线
*
* @param check_result {bool}
* @param nerr {unsigned *} NULL
* @return {size_t}
*/
size_t broadcast(const void* data, unsigned int len,
bool exclusive = true, bool check_result = false,
unsigned* nerr = NULL);
private:
tcp_manager* manager_;
int max_;

View File

@ -14,6 +14,15 @@ public:
tcp_pool(const char* addr, size_t count, size_t idx = 0);
virtual ~tcp_pool(void);
/**
*
* @param {const void*}
* @param len {unsigned int}
* @param out {string*} NULL
* NULL
*
* @return {bool}
*/
bool send(const void* data, unsigned int len, string* out = NULL);
protected:

View File

@ -18,12 +18,19 @@ namespace acl
class socket_stream;
class string;
/**
* tcp ipc
*/
class ACL_CPP_API tcp_reader
{
public:
tcp_reader(socket_stream& conn);
~tcp_reader(void) {}
/**
*
* @param out {string&} out
*/
bool read(string& out);
private:

View File

@ -20,12 +20,21 @@ namespace acl
class socket_stream;
/**
* tcp ipc
*/
class ACL_CPP_API tcp_sender
{
public:
tcp_sender(socket_stream& conn);
~tcp_sender(void);
/**
*
* @param data {const void*}
* @param len {unsigned int}
* @return {bool}
*/
bool send(const void* data, unsigned int len);
private:

View File

@ -72,6 +72,8 @@ int main(int argc, char* argv[])
printf("listen on %s error %s\r\n", addr.c_str(), acl::last_serror());
return 1;
}
else
printf("listen on %s ok\r\n", addr.c_str());
__atomic = acl_atomic_new();
acl_atomic_set(__atomic, &__count);

View File

@ -61,6 +61,42 @@ tcp_ipc& tcp_ipc::set_rw_timeout(int timeout)
return *this;
}
tcp_manager& tcp_ipc::get_manager(void) const
{
acl_assert(manager_);
return *manager_;
}
tcp_ipc& tcp_ipc::add_addr(const char* addr)
{
manager_->set(addr, max_, conn_timeout_, rw_timeout_);
return *this;
}
tcp_ipc& tcp_ipc::del_addr(const char* addr)
{
manager_->remove(addr);
return *this;
}
bool tcp_ipc::addr_exist(const char* addr)
{
return manager_->get(addr) != NULL;
}
void tcp_ipc::get_addrs(std::vector<string>& addrs)
{
manager_->lock();
std::vector<connect_pool*>& pools = manager_->get_pools();
for (std::vector<connect_pool*>::const_iterator cit = pools.begin();
cit != pools.end(); ++cit)
{
addrs.push_back((*cit)->get_addr());
}
manager_->unlock();
}
bool tcp_ipc::send(const char* addr, const void* data, unsigned int len,
string* out /* = NULL */)
{
@ -92,4 +128,32 @@ bool tcp_ipc::send(tcp_pool& pool, const void* data, unsigned int len,
return true;
}
size_t tcp_ipc::broadcast(const void* data, unsigned int len,
bool exclusive /* = true */, bool check_result /* = false */,
unsigned* nerr /* = NULL */)
{
size_t n = 0;
if (exclusive)
manager_->lock();
string dummy;
std::vector<connect_pool*>& pools = manager_->get_pools();
for (std::vector<connect_pool*>::iterator it = pools.begin();
it != pools.end(); ++it)
{
tcp_pool* pool = (tcp_pool*) (*it);
if (send(*pool, data, len, check_result ? &dummy : NULL))
n++;
else if (nerr)
(*nerr)++;
dummy.clear();
}
if (exclusive)
manager_->unlock();
return n;
}
} // namespace acl

View File

@ -33,6 +33,8 @@ all:
# @(cd https_server; make)
@(cd mysql; make)
@(cd fiber_local; make)
@(cd tcp_server; make)
@(cd tcp_client; make)
cl clean:
@(cd dns; make clean)
@ -68,5 +70,7 @@ cl clean:
@(cd https_server; make clean)
@(cd mysql; make clean)
@(cd fiber_local; make clean)
@(cd tcp_server; make clean)
@(cd tcp_client; make clean)
rebuild rb: clean all

View File

@ -0,0 +1,2 @@
include ../Makefile_cpp.in
PROG = tcp_client

View File

@ -0,0 +1,158 @@
#include "stdafx.h"
#include <string.h>
class thread_fiber : public acl::thread
{
public:
thread_fiber(acl::tcp_ipc& ipc, const char* addr,
int count, int n, bool ipc_mode)
: ipc_(ipc)
, addr_(addr)
, count_(count)
, n_(n)
, ipc_mode_(ipc_mode)
{
}
~thread_fiber(void) {}
protected:
void* run(void)
{
if (ipc_mode_)
run_ipc();
else
run_pool();
return NULL;
}
void run_pool(void)
{
acl::tcp_pool pool(addr_, 0);
char* s = (char*) malloc(n_ + 1);
memset(s, 'x', n_);
s[n_] = 0;
for (int i = 0; i < count_; i++)
{
#if 1
if (pool.send(s, n_) == false)
{
printf("send error to %s\r\n", addr_.c_str());
break;
}
#else
acl::tcp_client* conn = (acl::tcp_client*) pool.peek();
if (conn == NULL)
{
printf("peek error from %s\r\n", addr_.c_str());
break;
}
if (conn->send(s, n_) == false)
{
pool.put(conn, false);
printf("send error to %s\r\n", addr_.c_str());
break;
}
pool.put(conn);
#endif
}
}
void run_ipc(void)
{
char* s = (char*) malloc(n_ + 1);
memset(s, 'x', n_);
s[n_] = 0;
for (int i = 0; i < count_; i++)
{
if (ipc_.send(addr_, s, (unsigned) n_) == false)
{
printf("send error\r\n");
break;
}
if (i > 0 && i % 100000 == 0)
{
char info[128];
snprintf(info, sizeof(info), "thread=%lu, i=%d",
acl::thread::thread_self(), i);
acl::meter_time(__FILE__, __LINE__, info);
}
}
free(s);
}
private:
acl::tcp_ipc& ipc_;
acl::string addr_;
int count_;
int n_;
bool ipc_mode_;
};
static void usage(const char* procname)
{
printf("usage: %s -s server\r\n"
" -n count\r\n"
" -l data_size\r\n"
" -i [if ipc_mode]\r\n", procname);
}
int main(int argc, char* argv[])
{
int ch, count = 10, nthreads = 4, n = 10;
bool ipc_mode = false;
acl::string addr("127.0.0.1:8887");
while ((ch = getopt(argc, argv, "hs:n:t:l:i")) > 0)
{
switch (ch)
{
case 'h':
usage(argv[0]);
return 0;
case 's':
addr = optarg;
break;
case 'n':
count = atoi(optarg);
break;
case 't':
nthreads = atoi(optarg);
break;
case 'l':
n = atoi(optarg);
break;
case 'i':
ipc_mode = true;
break;
default:
usage(argv[0]);
return 0;
}
}
acl::tcp_ipc ipc;
std::vector<acl::thread*> threads;
for (int k = 0; k < nthreads; k++)
{
thread_fiber* thread =
new thread_fiber(ipc, addr, count, n, ipc_mode);
thread->set_detachable(false);
threads.push_back(thread);
thread->start();
}
for (std::vector<acl::thread*>::iterator it = threads.begin();
it != threads.end(); ++it)
{
(*it)->wait();
delete *it;
}
return 0;
}

View File

@ -0,0 +1,20 @@
// stdafx.h : 标准系统包含文件的包含文件,
// 或是常用但不常更改的项目特定的包含文件
//
#pragma once
//#include <iostream>
//#include <tchar.h>
// TODO: 在此处引用程序要求的附加头文件
#include "lib_acl.h"
#include "acl_cpp/lib_acl.hpp"
#include "fiber/fiber.hpp"
#ifdef WIN32
#define snprintf _snprintf
#endif

View File

@ -0,0 +1,2 @@
include ../Makefile_cpp.in
PROG = tcp_server

View File

@ -0,0 +1,118 @@
#include "stdafx.h"
#include <stdlib.h>
#include <stdio.h>
#define STACK_SIZE 32000
static int __rw_timeout = 0;
class fiber_client : public acl::fiber
{
public:
fiber_client(acl::socket_stream* conn) : conn_(conn) {}
protected:
// @override
void run(void)
{
acl::string buf;
acl::tcp_reader reader(*conn_);
while (true)
{
if (reader.read(buf) == false)
{
printf("read over %s\r\n", acl::last_serror());
break;
}
buf.clear();
}
delete conn_;
delete this;
}
private:
acl::socket_stream* conn_;
~fiber_client(void) {}
};
class fiber_server : public acl::fiber
{
public:
fiber_server(const char* addr) : addr_(addr) {}
protected:
void run(void)
{
acl::server_socket server;
if (server.open(addr_) == false)
{
printf("open %s error\r\n", addr_.c_str());
exit (1);
}
else
printf("open %s ok\r\n", addr_.c_str());
while (true)
{
acl::socket_stream* conn = server.accept();
if (conn == NULL)
{
printf("accept failed: %s\r\n", acl::last_serror());
break;
}
conn->set_rw_timeout(__rw_timeout);
printf("accept one: %d\r\n", conn->sock_handle());
acl::fiber* fb = new fiber_client(conn);
fb->start();
}
delete this;
}
private:
acl::string addr_;
~fiber_server(void) {}
};
static void usage(const char* procname)
{
printf("usage: %s -h [help] -s listen_addr -r rw_timeout\r\n", procname);
}
int main(int argc, char *argv[])
{
acl::string addr("127.0.0.1:8887");
int ch;
while ((ch = getopt(argc, argv, "hs:r:")) > 0)
{
switch (ch)
{
case 'h':
usage(argv[0]);
return 0;
case 's':
addr = optarg;
break;
case 'r':
__rw_timeout = atoi(optarg);
break;
default:
break;
}
}
acl::acl_cpp_init();
acl::log::stdout_open(true);
acl::fiber* fiber = new fiber_server(addr);
fiber->start(STACK_SIZE);
acl::fiber::schedule();
return 0;
}

View File

@ -0,0 +1,20 @@
// stdafx.h : 标准系统包含文件的包含文件,
// 或是常用但不常更改的项目特定的包含文件
//
#pragma once
//#include <iostream>
//#include <tchar.h>
// TODO: 在此处引用程序要求的附加头文件
#include "lib_acl.h"
#include "acl_cpp/lib_acl.hpp"
#include "fiber/fiber.hpp"
#ifdef WIN32
#define snprintf _snprintf
#endif