tcp_keeper is ok!

This commit is contained in:
zsx 2019-01-15 11:57:19 +08:00
parent 6e9691397a
commit db2d293a3e
16 changed files with 305 additions and 76 deletions

View File

@ -72,5 +72,8 @@ const char *inet_ntop(int af, const void *src, char *dst, socklen_t size);
#endif /* ACL_PREPARE_COMPILE */
//#define ACL_DEBUG_MIN 0
//#define ACL_DEBUG_MAX 30
#endif

View File

@ -56,3 +56,5 @@
//#if defined(_WIN32) || defined(_WIN64)
#include "acl_cpp/lib_acl.hpp"
//#endif
//#define ACL_CPP_DEBUG_MIN 40
//#define ACL_CPP_DEBUG_MAX 70

View File

@ -1,4 +1,7 @@
103) 2019.1.15
103.1) feature: tcp_keeper 及相关功能类测试 OK
102) 2019.1.2
102.1) workaround: fiber_tbox.hpp 将 free_obj 缺省值设为 true以便与 mbox.hpp
和 tbox.hpp 中的缺省参数保持一致

View File

@ -8,22 +8,79 @@ class keeper_waiter;
class socket_stream;
class thread_mutex;
/**
* 线
* ping rtt 10ms rtt
*
*/
class tcp_keeper : public thread
{
public:
tcp_keeper(void);
~tcp_keeper(void);
/**
*
* @param n {int}
* @return {tcp_keeper&}
*/
tcp_keeper& set_conn_timeout(int n);
/**
* IO
* @param n {int}
* @return {tcp_keeper&}
*/
tcp_keeper& set_rw_timeout(int n);
/**
*
* @param n {int}
* @return {tcp_keeper&}
*/
tcp_keeper& set_conn_min(int n);
/**
*
* @param n {int}
* @return {tcp_keeper&}
*/
tcp_keeper& set_conn_max(int n);
/**
*
* @param ttl {int}
* @return {tcp_keeper&}
*/
tcp_keeper& set_conn_ttl(int ttl);
/**
*
* 便
* @param ttl {int}
* @return {tcp_keeper&}
*/
tcp_keeper& set_pool_ttl(int ttl);
/**
* rtt
*
* @param rtt {double}
* @return {tcp_keeper&}
*/
tcp_keeper& set_rtt_min(double rtt);
/**
* tcp_keeper
* @param addr {const char*} ip:port
* @param hit {bool*}
* @return {socket_stream*} NULL
*/
socket_stream* peek(const char* addr, bool* hit = NULL);
/**
* tcp_keeper 线
*/
void stop(void);
protected:

View File

@ -15,6 +15,7 @@ typedef enum {
ASK_T_IDLE,
ASK_T_STOP,
ASK_T_CONN,
ASK_T_CLOSE,
} ask_type_t;
typedef enum {
@ -29,6 +30,7 @@ public:
keeper_config(void)
: conn_timeo(10)
, rw_timeo(10)
, conn_min(10)
, conn_max(10)
, conn_ttl(10)
, pool_ttl(20) {}
@ -37,11 +39,28 @@ public:
int conn_timeo;
int rw_timeo;
int conn_min;
int conn_max;
int conn_ttl;
int pool_ttl;
};
class ask_req
{
public:
ask_req(ask_type_t type) : type_(type) {}
ask_type_t get_type(void) const
{
return type_;
}
~ask_req(void) {}
private:
ask_type_t type_;
};
class task_req
{
public:

View File

@ -6,8 +6,7 @@ namespace acl {
keeper_conn::keeper_conn(const keeper_config& config, const char* addr,
keeper_link* lk, keeper_conns& pool)
: ask_(ASK_T_NULL)
, conn_(NULL)
: conn_(NULL)
, status_(KEEPER_T_IDLE)
, last_ctime_(0)
, config_(config)
@ -51,14 +50,20 @@ void keeper_conn::stop(void)
logger_warn("fiber is busy, kill it");
this->kill();
}
ask_ = ASK_T_STOP;
box_.push(NULL);
ask_req* ask = new ask_req(ASK_T_STOP);
box_.push(ask);
}
void keeper_conn::ask_connect(void)
void keeper_conn::ask_open(void)
{
ask_ = ASK_T_CONN;
box_.push(NULL);
ask_req* ask = new ask_req(ASK_T_CONN);
box_.push(ask);
}
void keeper_conn::ask_close(void)
{
ask_req* ask = new ask_req(ASK_T_CLOSE);
box_.push(ask);
}
void keeper_conn::join(void)
@ -89,10 +94,7 @@ void keeper_conn::run(void)
if (task_) {
double n = task_->get_cost();
if (n >= 50) {
const struct timeval& last = task_->get_stamp();
printf(">>>cost %.2f, %p\n", n, task_);
printf("last=%p, sec=%ld, usec=%ld\r\n",
&last, last.tv_sec, last.tv_usec);
logger_warn("task schedule cost %.2f", n);
}
handle_task(*task_);
delete this;
@ -101,31 +103,32 @@ void keeper_conn::run(void)
connect_one();
#ifndef USE_SBOX
int timeo = config_.conn_ttl > 0 ? config_.conn_ttl * 1000 : -1;
#endif
while (true) {
bool found;
#ifdef USE_SBOX
task_req* task = box_.pop(&found);
#else
task_req* task = box_.pop(timeo, &found);
#endif
assert(task == NULL);
ask_req* ask = box_.pop();
if (ask_ == ASK_T_STOP) {
assert(ask != NULL);
ask_type_t type = ask->get_type();
delete ask;
if (type == ASK_T_STOP) {
break;
} else if (ask_ == ASK_T_CONN) {
}
if (type == ASK_T_CONN) {
if (conn_ == NULL) {
connect_one();
} else {
printf("connecting ...\n");
logger_debug(FIBER_DEBUG_KEEPER, 1,
"[debug]: connecting ...");
}
} else if (type == ASK_T_CLOSE) {
if (conn_ != NULL) {
delete conn_;
conn_ = NULL;
status_ = KEEPER_T_IDLE;
}
} else if (!found) {
check_idle();
} else {
printf("invalid ask=%d, task=%p\n", ask_, task);
abort();
logger_fatal("invalid ask=%d", type);
}
}
@ -150,8 +153,6 @@ void keeper_conn::handle_task(task_req& task)
status_ = KEEPER_T_IDLE;
}
if (conn_cost_ >= 1000)
printf("too long const=%.2f\r\n", conn_cost_);
task.set_conn_cost(conn_cost_);
task.put(conn);
}
@ -160,6 +161,7 @@ void keeper_conn::connect_one(void)
{
assert(conn_ == NULL);
status_ = KEEPER_T_BUSY;
conn_ = new socket_stream;
struct timeval begin;
gettimeofday(&begin, NULL);
@ -168,10 +170,8 @@ void keeper_conn::connect_one(void)
gettimeofday(&end, NULL);
conn_cost_ = acl::stamp_sub(end, begin);
// only for test
if (conn_cost_ >= 1000) {
printf("%s(%d): spent: %.2f ms, addr=%s\n",
__FUNCTION__, __LINE__, conn_cost_, addr_.c_str());
logger_warn("cost: %.2f ms, addr=%s", conn_cost_, addr_.c_str());
}
if (ret) {
@ -188,19 +188,6 @@ void keeper_conn::connect_one(void)
}
}
void keeper_conn::check_idle(void)
{
if (config_.conn_ttl <= 0) {
return;
}
time_t now = time(NULL);
if ((now - last_ctime_) >= config_.conn_ttl) {
delete conn_;
conn_ = NULL;
status_ = KEEPER_T_IDLE;
}
}
void keeper_conn::done(void)
{
tbox_ctl_.push(NULL);

View File

@ -20,10 +20,17 @@ public:
socket_stream* peek(double& cost);
public:
void ask_open(void);
void ask_close(void);
void stop(void);
void ask_connect(void);
void join(void);
time_t get_last_ctime(void) const
{
return last_ctime_;
}
bool is_ready(void) const
{
return status_ == KEEPER_T_READY;
@ -46,18 +53,16 @@ private:
void run(void);
void handle_task(task_req& task);
void connect_one(void);
void check_idle(void);
void done(void);
private:
#ifdef USE_SBOX
fiber_sbox<task_req> box_;
fiber_sbox<ask_req> box_;
#else
fiber_tbox<task_req> box_;
fiber_tbox<ask_req> box_;
#endif
fiber_tbox<keeper_conn> tbox_ctl_;
ask_type_t ask_;
socket_stream* conn_;
keeper_status_t status_;
time_t last_ctime_;

View File

@ -7,6 +7,7 @@ namespace acl {
keeper_conns::keeper_conns(const keeper_config& config, const char* addr)
: last_peek_(0)
, last_trigger_(0)
, config_(config)
, addr_(addr)
{
@ -34,6 +35,27 @@ void keeper_conns::on_connect(socket_stream& conn, keeper_link* lk)
}
}
void keeper_conns::trigger_more(void)
{
time(&last_trigger_);
int n = 0;
ACL_RING_ITER iter;
acl_ring_foreach_reverse(iter, &linker_) {
keeper_link* lk = acl_ring_to_appl(iter.ptr, keeper_link, me);
if (!lk->fb->is_idle()) {
// lk->fb->print_status();
continue;
}
lk->fb->ask_open();
n++;
if (config_.conn_min > 0 && n >= config_.conn_min) {
break;
}
}
logger_debug(FIBER_DEBUG_KEEPER, 1, "[debug] trigger open count=%d", n);
}
void keeper_conns::add_task(task_req& task)
{
keeper_conn* fb = peek_ready();
@ -44,9 +66,9 @@ void keeper_conns::add_task(task_req& task)
assert(conn);
task.set_conn_cost(cost);
task.put(conn);
fb->ask_connect();
if (0)
printf(">>>ready hit\r\n");
fb->ask_open();
logger_debug(FIBER_DEBUG_KEEPER, 3, "[debug] addr=%s, hit",
addr_.c_str());
} else {
task.set_hit(false);
fb = new keeper_conn(config_, addr_, NULL, *this);
@ -54,9 +76,12 @@ void keeper_conns::add_task(task_req& task)
fb->set_task(task);
fb->start();
if (0)
printf(">>>>trigger connect one, hit=%d\r\n",
this->debug_check());
logger_debug(FIBER_DEBUG_KEEPER, 3, "[debug] addr=%s, miss",
addr_.c_str());
if (time(NULL) - last_trigger_ >= 1) {
trigger_more();
}
}
}
@ -73,9 +98,8 @@ void keeper_conns::join(void)
bool keeper_conns::empty(void) const
{
ACL_RING_ITER iter;
keeper_link* lk;
acl_ring_foreach(iter, &linker_) {
lk = acl_ring_to_appl(iter.ptr, keeper_link, me);
keeper_link* lk = acl_ring_to_appl(iter.ptr, keeper_link, me);
if (lk->fb->is_ready()) {
return false;
}
@ -86,7 +110,8 @@ bool keeper_conns::empty(void) const
void keeper_conns::run(void)
{
int timeo = config_.conn_ttl > 0 ? config_.conn_ttl * 1000 : -1;
//int timeo = config_.conn_ttl > 0 ? config_.conn_ttl * 1000 : -1;
int timeo = 1000;
while (true) {
bool found;
@ -95,14 +120,37 @@ void keeper_conns::run(void)
if (found) {
break;
}
continue;
}
check_idle();
}
stop_all();
done();
}
void keeper_conns::check_idle(void)
{
int n = 0;
ACL_RING_ITER iter;
acl_ring_foreach(iter, &linker_) {
keeper_link* lk = acl_ring_to_appl(iter.ptr, keeper_link, me);
if (!lk->fb->is_ready()) {
break;
}
time_t now = time(NULL);
time_t ctime = lk->fb->get_last_ctime();
if (ctime > 0 && (now - ctime) >= config_.conn_ttl) {
lk->fb->ask_close();
n++;
}
}
logger_debug(FIBER_DEBUG_KEEPER, 2, "[debug] addr=%s, closed count=%d",
addr_.c_str(), n);
}
int keeper_conns::debug_check(void)
{
ACL_RING_ITER iter;

View File

@ -35,12 +35,15 @@ private:
keeper_conn* peek_ready(void);
void stop_all(void);
void done(void);
void check_idle(void);
void trigger_more(void);
private:
fiber_tbox<task_req> tbox_;
fiber_tbox<keeper_conns> tbox_ctl_;
ACL_RING linker_;
time_t last_peek_;
time_t last_trigger_;
const keeper_config& config_;
string addr_;

View File

@ -28,6 +28,12 @@ keeper_waiter& keeper_waiter::set_rw_timeout(int n)
return *this;
}
keeper_waiter& keeper_waiter::set_conn_min(int n)
{
config_->conn_min = n;
return *this;
}
keeper_waiter& keeper_waiter::set_conn_max(int n)
{
config_->conn_max = n;

View File

@ -22,6 +22,7 @@ public:
keeper_waiter& set_conn_timeout(int n);
keeper_waiter& set_rw_timeout(int n);
keeper_waiter& set_conn_min(int n);
keeper_waiter& set_conn_max(int n);
keeper_waiter& set_conn_ttl(int ttl);
keeper_waiter& set_pool_ttl(int ttl);

View File

@ -3,3 +3,6 @@
#include "lib_acl.h"
#include "acl_cpp/lib_acl.hpp"
#include "fiber/libfiber.h"
#define FIBER_DEBUG_MIN 71
#define FIBER_DEBUG_KEEPER (FIBER_DEBUG_MIN + 1)

View File

@ -30,9 +30,16 @@ tcp_keeper& tcp_keeper::set_rw_timeout(int n)
return *this;
}
tcp_keeper& tcp_keeper::set_conn_min(int n)
{
assert(n >= 0);
waiter_->set_conn_min(n);
return *this;
}
tcp_keeper& tcp_keeper::set_conn_max(int n)
{
assert (n >= 0);
assert(n >= 0);
waiter_->set_conn_max(n);
return *this;
}
@ -128,7 +135,9 @@ socket_stream* tcp_keeper::peek(const char* addr, bool* hit /* = NULL */)
if (0)
printf("remove %s, cost=%.2f > %.2f\r\n", addr,
cost, rtt_min_);
remove(addr);
if (found) {
remove(addr);
}
}
return conn;
}

View File

@ -3,8 +3,10 @@
#include <stdlib.h>
acl::atomic_long __count;
acl::atomic_long __hit;
static int __step = 10000;
static int __loop = 1;
static struct timeval __begin;
class fiber_client : public acl::fiber
{
@ -19,13 +21,47 @@ public:
~fiber_client(void) {}
private:
acl::socket_stream* peek(void)
{
#if 1
bool reused;
//acl_doze(5);
acl::socket_stream* conn = keeper_.peek(addr_, &reused);
if (reused) {
++__hit;
}
return conn;
#else
acl::socket_stream* conn = new acl::socket_stream;
if (conn->open(addr_, 10, 10) == false) {
printf("connect %s error %s\r\n",
addr_.c_str(), acl::last_serror());
delete conn;
return NULL;
}
return conn;
#endif
}
// @override
void run(void)
{
printf("fiber-%d running\r\n", acl::fiber::self());
for (int i = 0; i < max_; i++) {
acl::socket_stream* conn = keeper_.peek(addr_);
struct timeval begin;
gettimeofday(&begin, NULL);
acl::socket_stream* conn = peek();
struct timeval end;
gettimeofday(&end, NULL);
double spent = acl::stamp_sub(end, begin);
if (spent >= 1000) {
printf("%s(%d): spent: %.2f ms\r\n",
__FILE__, __LINE__, spent);
}
if (conn == NULL) {
printf("peek connection error=%s\r\n",
acl::last_serror());
@ -38,8 +74,24 @@ private:
}
if (--counter_ == 0) {
printf("all fiber_client over!\r\n");
struct timeval end;
gettimeofday(&end, NULL);
double spent = acl::stamp_sub(end, __begin);
double speed = 1000 * __count / (spent >= 1 ? spent : 1);
double hit_ratio;
if (__count == 0) {
hit_ratio = 0.0;
} else {
hit_ratio = ((double) (__hit.value()) * 100)
/ (double) __count.value();
}
printf("hit=%lld, count=%lld\r\n", __hit.value(), __count.value());
printf("all fiber_client over! total count=%lld, "
"speed=%.2f, hit=%lld, hit_ratio=%.2f%%\r\n",
__count.value(), speed, __hit.value(), hit_ratio);
}
printf("counter=%d\r\n", counter_);
}
void doit(acl::socket_stream& conn)
@ -47,6 +99,7 @@ private:
const char s[] = "hello world!\r\n";
acl::string buf;
for (int i = 0; i < __loop; i++) {
#if 1
if (conn.write(s, sizeof(s) - 1) == -1) {
printf("write error %s\r\n", acl::last_serror());
break;
@ -56,8 +109,10 @@ private:
printf("gets error %s\r\n", acl::last_serror());
break;
}
#endif
if (++__count % __step == 0) {
++__count;
if (__count % __step == 0) {
char tmp[256];
snprintf(tmp, sizeof(tmp), "addr=%s, fiber-%d,"
" gets line=[%s], n=%lld",
@ -75,6 +130,22 @@ private:
int max_;
};
class fiber_sleep : public acl::fiber
{
public:
fiber_sleep(void) {}
~fiber_sleep(void) {}
private:
// @override
void run(void)
{
while (true) {
sleep(1);
}
}
};
class thread_client : public acl::thread
{
public:
@ -102,6 +173,11 @@ private:
fb->start();
}
#if 0
acl::fiber* fb = new fiber_sleep;
fb->start();
#endif
acl::fiber::schedule();
for (std::vector<acl::fiber*>::iterator it = fibers.begin();
@ -124,8 +200,8 @@ static void usage(const char* procname)
{
printf("usage: %s -h [help]\r\n"
" -s server_addrs\r\n"
" -n fibers_count[default: 10]\r\n"
" -m max_loop[default: 10]\r\n"
" -c fibers_count[default: 10]\r\n"
" -n max_loop[default: 10]\r\n"
" -i step[default: 10000]\r\n"
" -l loop for one connection[default: 1]\r\n"
, procname);
@ -149,8 +225,9 @@ int main(int argc, char *argv[])
acl::acl_cpp_init();
acl::log::stdout_open(true);
acl::fiber::stdout_open(true);
while ((ch = getopt(argc, argv, "hs:n:m:i:l:")) > 0) {
while ((ch = getopt(argc, argv, "hs:c:n:i:l:")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
@ -160,10 +237,10 @@ int main(int argc, char *argv[])
append_addrs(addrs_buf, addrs);
addrs_buf.clear();
break;
case 'n':
case 'c':
n = atoi(optarg);
break;
case 'm':
case 'n':
max = atoi(optarg);
break;
case 'i':
@ -181,15 +258,20 @@ int main(int argc, char *argv[])
append_addrs(addrs_buf, addrs);
}
acl::fiber::stdout_open(true);
acl::log::debug_init("all:2");
acl::tcp_keeper keeper;
keeper.set_rtt_min(0);
keeper.set_conn_timeout(10)
.set_rw_timeout(10)
.set_conn_max(10)
.set_conn_ttl(10)
.set_conn_min(10)
.set_conn_max(200)
.set_conn_ttl(5)
.set_pool_ttl(10);
keeper.start();
gettimeofday(&__begin, NULL);
std::vector<acl::thread*> threads;
for (std::vector<acl::string>::const_iterator cit = addrs.begin();

View File

@ -1,4 +1,3 @@
#!/bin/sh
#valgrind --tool=memcheck --leak-check=yes --leak-check=full --show-reachable=yes --max-stackframe=3426305034400000 -v ./fiber -n 10 -m 20
valgrind --tool=memcheck --leak-check=yes --leak-check=full --show-reachable=yes -v ./tcp_keeper -s 127.0.0.1:8001 -n 10 -m 10 -i 10000 -l 1
valgrind --tool=memcheck --leak-check=yes --leak-check=full --show-reachable=yes -v ./tcp_keeper -s 127.0.0.1:5300 -c 10 -n 1000

View File

@ -19,4 +19,6 @@ int proto_securev_snprintf(char *buf, size_t size, const char *fmt, va_list ap);
# endif
# endif /* WIN2/WIN64 */
#include "lib_acl.h"
//#define PRO_DEBUG_MIN 30
//#define PRO_DEBUG_MAX 40
#endif