From 1cd42a9213c397414d4ae4bc8dd48a0ce153c75f Mon Sep 17 00:00:00 2001 From: zsx Date: Mon, 14 Jan 2019 14:34:00 +0800 Subject: [PATCH] optimize tcp_keeper --- lib_fiber/cpp/src/keeper/keeper.hpp | 123 +++++ lib_fiber/cpp/src/keeper/keeper_conn.cpp | 209 ++++++++ lib_fiber/cpp/src/keeper/keeper_conn.hpp | 73 +++ lib_fiber/cpp/src/keeper/keeper_conns.cpp | 192 ++++++++ lib_fiber/cpp/src/keeper/keeper_conns.hpp | 66 +++ lib_fiber/cpp/src/keeper/keeper_waiter.cpp | 147 ++++++ lib_fiber/cpp/src/keeper/keeper_waiter.hpp | 49 ++ lib_fiber/cpp/src/tcp_keeper.cpp | 545 ++++----------------- 8 files changed, 949 insertions(+), 455 deletions(-) create mode 100644 lib_fiber/cpp/src/keeper/keeper.hpp create mode 100644 lib_fiber/cpp/src/keeper/keeper_conn.cpp create mode 100644 lib_fiber/cpp/src/keeper/keeper_conn.hpp create mode 100644 lib_fiber/cpp/src/keeper/keeper_conns.cpp create mode 100644 lib_fiber/cpp/src/keeper/keeper_conns.hpp create mode 100644 lib_fiber/cpp/src/keeper/keeper_waiter.cpp create mode 100644 lib_fiber/cpp/src/keeper/keeper_waiter.hpp diff --git a/lib_fiber/cpp/src/keeper/keeper.hpp b/lib_fiber/cpp/src/keeper/keeper.hpp new file mode 100644 index 000000000..437e94b28 --- /dev/null +++ b/lib_fiber/cpp/src/keeper/keeper.hpp @@ -0,0 +1,123 @@ +#pragma once +#include "fiber/fiber_tbox.hpp" + +namespace acl { + +class keeper_conn; + +struct keeper_link { + ACL_RING me; + keeper_conn* fb; +}; + +typedef enum { + ASK_T_NULL, + ASK_T_IDLE, + ASK_T_STOP, + ASK_T_CONN, +} ask_type_t; + +typedef enum { + KEEPER_T_READY, + KEEPER_T_BUSY, + KEEPER_T_IDLE, +} keeper_status_t; + +class keeper_config +{ +public: + keeper_config(void) + : conn_timeo(10) + , rw_timeo(10) + , conn_max(10) + , conn_ttl(10) + , pool_ttl(20) {} + + ~keeper_config(void) {} + + int conn_timeo; + int rw_timeo; + int conn_max; + int conn_ttl; + int pool_ttl; +}; + +class task_req +{ +public: + task_req(void) + : hit_(false) + , conn_cost_(1000) + { + gettimeofday(&stamp_, NULL); + } + + ~task_req(void) {} + + void set_addr(const char* addr) + { + addr_ = addr; + } + + const char* get_addr(void) const + { + return addr_.c_str(); + } + + void set_hit(bool yes) + { + hit_ = yes; + } + + bool is_hit(void) const + { + return hit_; + } + + void set_stamp(void) + { + gettimeofday(&stamp_, NULL); + } + + const struct timeval& get_stamp(void) const + { + return stamp_; + } + + double get_cost(void) const + { + struct timeval curr; + gettimeofday(&curr, NULL); + return stamp_sub(curr, stamp_); + } + + void set_conn_cost(double cost) + { + conn_cost_ = cost; + } + + double get_conn_cost(void) const + { + return conn_cost_; + } + +public: + socket_stream* pop(void) + { + return tbox_.pop(); + } + + void put(socket_stream* conn) + { + tbox_.push(conn); + } + +private: + bool hit_; + string addr_; + struct timeval stamp_; + fiber_tbox tbox_; + double conn_cost_; +}; + +} // namespace acl diff --git a/lib_fiber/cpp/src/keeper/keeper_conn.cpp b/lib_fiber/cpp/src/keeper/keeper_conn.cpp new file mode 100644 index 000000000..2ab05b098 --- /dev/null +++ b/lib_fiber/cpp/src/keeper/keeper_conn.cpp @@ -0,0 +1,209 @@ +#include "stdafx.hpp" +#include "keeper_conns.hpp" +#include "keeper_conn.hpp" + +namespace acl { + +keeper_conn::keeper_conn(const keeper_config& config, const char* addr, + keeper_link* lk, keeper_conns& pool) +: ask_(ASK_T_NULL) +, conn_(NULL) +, status_(KEEPER_T_IDLE) +, last_ctime_(0) +, config_(config) +, addr_(addr) +, lk_(lk) +, pool_(pool) +, task_(NULL) +, conn_cost_(-1.0) +{ +} + +keeper_conn::~keeper_conn(void) +{ + delete conn_; +} + +void keeper_conn::set_task(task_req& task) +{ + task_ = &task; +} + +socket_stream* keeper_conn::peek(double& cost) +{ + cost = conn_cost_; + if (is_ready()) { + assert(conn_); + socket_stream* conn = new socket_stream; + conn->open(dup(conn_->sock_handle())); + delete conn_; + conn_ = NULL; + status_ = KEEPER_T_IDLE; + return conn; + } + + return NULL; +} + +void keeper_conn::stop(void) +{ + if (is_busy()) { + logger_warn("fiber is busy, kill it"); + this->kill(); + } + ask_ = ASK_T_STOP; + box_.push(NULL); +} + +void keeper_conn::ask_connect(void) +{ + ask_ = ASK_T_CONN; + box_.push(NULL); +} + +void keeper_conn::join(void) +{ + (void) tbox_ctl_.pop(); +} + +void keeper_conn::print_status(void) const +{ + switch (status_) { + case KEEPER_T_BUSY: + printf("fiber status: KEEPER_T_BUSY\r\n"); + break; + case KEEPER_T_IDLE: + printf("fiber status: KEEPER_T_IDLE\r\n"); + break; + case KEEPER_T_READY: + printf("fiber status: KEEPER_T_READY\r\n"); + break; + default: + printf("fiber status: unknown(%d)\r\n", status_); + break; + } +} + +void keeper_conn::run(void) +{ + if (task_) { + double n = task_->get_cost(); + if (n >= 50) { + const struct timeval& last = task_->get_stamp(); + printf(">>>cost %.2f, %p\n", n, task_); + printf("last=%p, sec=%ld, usec=%ld\r\n", + &last, last.tv_sec, last.tv_usec); + } + handle_task(*task_); + delete this; + return; + } + + connect_one(); + +#ifndef USE_SBOX + int timeo = config_.conn_ttl > 0 ? config_.conn_ttl * 1000 : -1; +#endif + while (true) { + bool found; +#ifdef USE_SBOX + task_req* task = box_.pop(&found); +#else + task_req* task = box_.pop(timeo, &found); +#endif + assert(task == NULL); + + if (ask_ == ASK_T_STOP) { + break; + } else if (ask_ == ASK_T_CONN) { + if (conn_ == NULL) { + connect_one(); + } else { + printf("connecting ...\n"); + } + } else if (!found) { + check_idle(); + } else { + printf("invalid ask=%d, task=%p\n", ask_, task); + abort(); + } + } + + done(); +} + +void keeper_conn::handle_task(task_req& task) +{ + if (conn_ == NULL) { + connect_one(); + } + + socket_stream* conn; + if (conn_) { + conn = new socket_stream; + conn->open(dup(conn_->sock_handle())); + delete conn_; + conn_ = NULL; + status_ = KEEPER_T_IDLE; + } else { + conn = NULL; + status_ = KEEPER_T_IDLE; + } + + if (conn_cost_ >= 1000) + printf("too long const=%.2f\r\n", conn_cost_); + task.set_conn_cost(conn_cost_); + task.put(conn); +} + +void keeper_conn::connect_one(void) +{ + assert(conn_ == NULL); + status_ = KEEPER_T_BUSY; + conn_ = new socket_stream; + struct timeval begin; + gettimeofday(&begin, NULL); + bool ret = conn_->open(addr_, config_.conn_timeo, config_.rw_timeo); + struct timeval end; + gettimeofday(&end, NULL); + conn_cost_ = acl::stamp_sub(end, begin); + + // only for test + if (conn_cost_ >= 1000) { + printf("%s(%d): spent: %.2f ms, addr=%s\n", + __FUNCTION__, __LINE__, conn_cost_, addr_.c_str()); + } + + if (ret) { + status_ = KEEPER_T_READY; + last_ctime_ = time(NULL); + pool_.on_connect(*conn_, lk_); + } else { + if (this->killed()) { + logger_warn("I've been killed"); + } + delete conn_; + conn_ = NULL; + status_ = KEEPER_T_IDLE; + } +} + +void keeper_conn::check_idle(void) +{ + if (config_.conn_ttl <= 0) { + return; + } + time_t now = time(NULL); + if ((now - last_ctime_) >= config_.conn_ttl) { + delete conn_; + conn_ = NULL; + status_ = KEEPER_T_IDLE; + } +} + +void keeper_conn::done(void) +{ + tbox_ctl_.push(NULL); +} + +} // namespace acl diff --git a/lib_fiber/cpp/src/keeper/keeper_conn.hpp b/lib_fiber/cpp/src/keeper/keeper_conn.hpp new file mode 100644 index 000000000..080300b11 --- /dev/null +++ b/lib_fiber/cpp/src/keeper/keeper_conn.hpp @@ -0,0 +1,73 @@ +#pragma once +#include "fiber/fiber.hpp" +#include "fiber/fiber_sem.hpp" +#include "fiber/fiber_tbox.hpp" +#include "keeper.hpp" + +namespace acl { + +class keeper_conns; + +class keeper_conn : public fiber +{ +public: + keeper_conn(const keeper_config& config, const char* addr, + keeper_link* lk, keeper_conns& pool); + + ~keeper_conn(void); + + void set_task(task_req& task); + socket_stream* peek(double& cost); + +public: + void stop(void); + void ask_connect(void); + void join(void); + + bool is_ready(void) const + { + return status_ == KEEPER_T_READY; + } + + bool is_busy(void) const + { + return status_ == KEEPER_T_BUSY; + } + + bool is_idle(void) const + { + return status_ == KEEPER_T_IDLE; + } + + void print_status(void) const; + +private: + // @override + void run(void); + void handle_task(task_req& task); + void connect_one(void); + void check_idle(void); + void done(void); + +private: +#ifdef USE_SBOX + fiber_sbox box_; +#else + fiber_tbox box_; +#endif + fiber_tbox tbox_ctl_; + + ask_type_t ask_; + socket_stream* conn_; + keeper_status_t status_; + time_t last_ctime_; + + const keeper_config& config_; + string addr_; + keeper_link* lk_; + keeper_conns& pool_; + task_req* task_; + double conn_cost_; +}; + +} // namespace acl diff --git a/lib_fiber/cpp/src/keeper/keeper_conns.cpp b/lib_fiber/cpp/src/keeper/keeper_conns.cpp new file mode 100644 index 000000000..429605bf1 --- /dev/null +++ b/lib_fiber/cpp/src/keeper/keeper_conns.cpp @@ -0,0 +1,192 @@ +#include "stdafx.hpp" +#include "keeper.hpp" +#include "keeper_conn.hpp" +#include "keeper_conns.hpp" + +namespace acl { + +keeper_conns::keeper_conns(const keeper_config& config, const char* addr) +: last_peek_(0) +, config_(config) +, addr_(addr) +{ + acl_ring_init(&linker_); + + for (int i = 0; i < config_.conn_max; i++) { + keeper_link* lk = new keeper_link; + lk->fb = new keeper_conn(config_, addr_, lk, *this); + ACL_RING_APPEND(&linker_, &lk->me); + lk->fb->start(); + } +} + +keeper_conns::~keeper_conns(void) +{ +} + +void keeper_conns::on_connect(socket_stream& conn, keeper_link* lk) +{ + if (lk) { + ACL_RING_DETACH(&lk->me); + ACL_RING_APPEND(&linker_, &lk->me); + } else { + conn.set_tcp_solinger(true, 0); + } +} + +void keeper_conns::add_task(task_req& task) +{ + keeper_conn* fb = peek_ready(); + if (fb) { + task.set_hit(true); + double cost; + socket_stream* conn = fb->peek(cost); + assert(conn); + task.set_conn_cost(cost); + task.put(conn); + fb->ask_connect(); + if (0) + printf(">>>ready hit\r\n"); + } else { + task.set_hit(false); + fb = new keeper_conn(config_, addr_, NULL, *this); + task.set_stamp(); + fb->set_task(task); + fb->start(); + + if (0) + printf(">>>>trigger connect one, hit=%d\r\n", + this->debug_check()); + } +} + +void keeper_conns::stop(void) +{ + tbox_.push(NULL); +} + +void keeper_conns::join(void) +{ + (void) tbox_ctl_.pop(); +} + +bool keeper_conns::empty(void) const +{ + ACL_RING_ITER iter; + keeper_link* lk; + acl_ring_foreach(iter, &linker_) { + lk = acl_ring_to_appl(iter.ptr, keeper_link, me); + if (lk->fb->is_ready()) { + return false; + } + } + + return true; +} + +void keeper_conns::run(void) +{ + int timeo = config_.conn_ttl > 0 ? config_.conn_ttl * 1000 : -1; + + while (true) { + bool found; + task_req* task = tbox_.pop(timeo, &found); + if (task == NULL) { + if (found) { + break; + } + continue; + } + } + + stop_all(); + done(); +} + +int keeper_conns::debug_check(void) +{ + ACL_RING_ITER iter; + keeper_link* lk; + + int i = 0; + acl_ring_foreach(iter, &linker_) { + lk = acl_ring_to_appl(iter.ptr, keeper_link, me); + i++; + if (lk->fb->is_ready()) { + return i; + } + } + return -1; +} + +keeper_conn* keeper_conns::peek_ready(void) +{ +#if 0 + keeper_link* lk = NULL; + ACL_RING_ITER iter; + acl_ring_foreach(iter, &linker_) { + lk = acl_ring_to_appl(iter.ptr, keeper_link, me); + if (lk->fb->is_ready()) { + break; + } + } +#else + keeper_link* lk = ACL_RING_FIRST_APPL(&linker_, keeper_link, me); +#endif + + if (lk == NULL) { + logger_error("first keeper_conn null"); + return NULL; + } + + if (!lk->fb->is_ready()) { + //lk->fb->print_status(); + return NULL; + } + + ACL_RING_DETACH(&lk->me); + ACL_RING_PREPEND(&linker_, &lk->me); + + return lk->fb; +} + +void keeper_conns::stop_all(void) +{ + while (true) { + ACL_RING* hdr = acl_ring_pop_head(&linker_); + if (hdr == NULL) { + break; + } + keeper_link* lk = acl_ring_to_appl(hdr, keeper_link, me); + lk->fb->stop(); + lk->fb->join(); + delete lk->fb; + delete lk; + } + + acl_ring_init(&linker_); +} + +void keeper_conns::done(void) +{ + tbox_ctl_.push(NULL); +} + +////////////////////////////////////////////////////////////////////////////// + +keeper_killer::keeper_killer(keeper_conns* pool) +: pool_(pool) +{ +} + +keeper_killer::~keeper_killer(void) {} + +void keeper_killer::run(void) +{ + pool_->stop(); + pool_->join(); + delete pool_; + delete this; +} + +} // namespace acl diff --git a/lib_fiber/cpp/src/keeper/keeper_conns.hpp b/lib_fiber/cpp/src/keeper/keeper_conns.hpp new file mode 100644 index 000000000..28f13e959 --- /dev/null +++ b/lib_fiber/cpp/src/keeper/keeper_conns.hpp @@ -0,0 +1,66 @@ +#pragma once +#include "fiber/fiber.hpp" +#include "fiber/fiber_tbox.hpp" + +namespace acl { + +class keeper_config; +class task_req; +class keeper_conn; +class keeper_link; + +class keeper_conns : public fiber +{ +public: + keeper_conns(const keeper_config& config, const char* addr); + ~keeper_conns(void); + + void add_task(task_req& task); + +public: + void stop(void); + void join(void); + bool empty(void) const; + time_t last_peek(void) const + { + return last_peek_; + } + + void on_connect(socket_stream& conn, keeper_link* lk); + +private: + // @override + void run(void); + int debug_check(void); + keeper_conn* peek_ready(void); + void stop_all(void); + void done(void); + +private: + fiber_tbox tbox_; + fiber_tbox tbox_ctl_; + ACL_RING linker_; + time_t last_peek_; + + const keeper_config& config_; + string addr_; +}; + +////////////////////////////////////////////////////////////////////////////// + +class keeper_killer : public fiber +{ +public: + keeper_killer(keeper_conns* pool); + +private: + ~keeper_killer(void); + + // @override + void run(void); + +private: + keeper_conns* pool_; +}; + +} // namespace acl diff --git a/lib_fiber/cpp/src/keeper/keeper_waiter.cpp b/lib_fiber/cpp/src/keeper/keeper_waiter.cpp new file mode 100644 index 000000000..082d4c458 --- /dev/null +++ b/lib_fiber/cpp/src/keeper/keeper_waiter.cpp @@ -0,0 +1,147 @@ +#include "stdafx.hpp" +#include "keeper.hpp" +#include "keeper_conns.hpp" +#include "keeper_waiter.hpp" + +namespace acl { + +keeper_waiter::keeper_waiter(void) +: last_check_(0) +{ + config_ = new keeper_config; +} + +keeper_waiter::~keeper_waiter(void) +{ + delete config_; +} + +keeper_waiter& keeper_waiter::set_conn_timeout(int n) +{ + config_->conn_timeo = n; + return *this; +} + +keeper_waiter& keeper_waiter::set_rw_timeout(int n) +{ + config_->rw_timeo = n; + return *this; +} + +keeper_waiter& keeper_waiter::set_conn_max(int n) +{ + config_->conn_max = n; + return *this; +} + +keeper_waiter& keeper_waiter::set_conn_ttl(int ttl) +{ + config_->conn_ttl = ttl; + return *this; +} + +keeper_waiter& keeper_waiter::set_pool_ttl(int ttl) +{ + config_->pool_ttl = ttl; + return *this; +} + +const keeper_config& keeper_waiter::get_config(void) const +{ + return *config_; +} + +void keeper_waiter::stop(void) +{ + add_task(NULL); +} + +void keeper_waiter::join(void) +{ + (void) tbox_ctl_.pop(); +} + +void keeper_waiter::add_task(task_req* task) +{ + tbox_.push(task); +} + +void keeper_waiter::run(void) +{ + int timeo = config_->pool_ttl > 0 ? config_->pool_ttl * 1000 : -1; + + while (true) { + bool found; + task_req* task = tbox_.pop(timeo, &found); + + check_idle(); + + if (task == NULL) { + if (found) { + break; + } + + continue; + } + + keeper_conns* pool; + + const char* addr = task->get_addr(); + std::map::iterator it = + manager_.find(addr); + if (it == manager_.end()) { + pool = new keeper_conns(*config_, addr); + manager_[addr] = pool; + pool->start(); + } else { + pool = it->second; + } + + task->set_stamp(); + pool->add_task(*task); + } + + for (std::map::iterator it = + manager_.begin(); it != manager_.end(); ++it) { + + it->second->stop(); + it->second->join(); + delete it->second; + } + + done(); +} + +void keeper_waiter::done(void) +{ + tbox_ctl_.push(this); +} + +void keeper_waiter::check_idle(void) +{ + if (config_->pool_ttl <= 0) { + return; + } + + time_t now = time(NULL); + if (now - last_check_ <= 10) { + return; + } + + std::map::iterator it, next; + it = manager_.begin(); + for (next = it; it != manager_.end(); it = next) { + ++next; + time_t n = (now - it->second->last_peek()); + if (n >= config_->pool_ttl && it->second->empty()) { + fiber* fb = new keeper_killer(it->second); + fb->start(); + + manager_.erase(it); + } + } + + last_check_ = time(NULL); +} + +} // namespace acl diff --git a/lib_fiber/cpp/src/keeper/keeper_waiter.hpp b/lib_fiber/cpp/src/keeper/keeper_waiter.hpp new file mode 100644 index 000000000..6854207f0 --- /dev/null +++ b/lib_fiber/cpp/src/keeper/keeper_waiter.hpp @@ -0,0 +1,49 @@ +#pragma once +#include +#include "fiber/fiber.hpp" +#include "fiber/fiber_tbox.hpp" + +namespace acl { + +class keeper_config; +class task_req; +class keeper_conns; + +class keeper_waiter : public fiber +{ +public: + keeper_waiter(void); + ~keeper_waiter(void); + + void add_task(task_req* task); + +public: + const keeper_config& get_config(void) const; + + keeper_waiter& set_conn_timeout(int n); + keeper_waiter& set_rw_timeout(int n); + keeper_waiter& set_conn_max(int n); + keeper_waiter& set_conn_ttl(int ttl); + keeper_waiter& set_pool_ttl(int ttl); + +public: + void stop(void); + void join(void); + +protected: + // @override + void run(void); + void done(void); + +private: + void check_idle(void); + +private: + fiber_tbox tbox_; + fiber_tbox tbox_ctl_; + std::map manager_; + keeper_config* config_; + time_t last_check_; +}; + +} // namespace acl diff --git a/lib_fiber/cpp/src/tcp_keeper.cpp b/lib_fiber/cpp/src/tcp_keeper.cpp index 193dc0c13..6a12a95e7 100644 --- a/lib_fiber/cpp/src/tcp_keeper.cpp +++ b/lib_fiber/cpp/src/tcp_keeper.cpp @@ -1,467 +1,21 @@ #include "stdafx.hpp" -#include -#include -#include "acl_cpp/stdlib/trigger.hpp" -#include "acl_cpp/stdlib/string.hpp" -#include "acl_cpp/stream/socket_stream.hpp" -#include "acl_cpp/stdlib/trigger.hpp" -#include "fiber/fiber.hpp" -#include "fiber/fiber_tbox.hpp" #include "fiber/tcp_keeper.hpp" - -////////////////////////////////////////////////////////////////////////////// +#include "keeper/keeper.hpp" +#include "keeper/keeper_waiter.hpp" namespace acl { -class tcp_keeper_config -{ -public: - tcp_keeper_config(void) - : conn_timeo(10) - , rw_timeo(10) - , conn_max(10) - , conn_ttl(10) - , pool_ttl(20) {} - - ~tcp_keeper_config(void) {} - - int conn_timeo; - int rw_timeo; - int conn_max; - int conn_ttl; - int pool_ttl; -}; - -class task_req -{ -public: - task_req(void) {} - ~task_req(void) {} - - void set_addr(const char* addr) - { - addr_ = addr; - } - - const char* get_addr(void) const - { - return addr_.c_str(); - } - -public: - socket_stream* pop(void) - { - return tbox_.pop(); - } - - void put(socket_stream* conn) - { - tbox_.push(conn); - } - -private: - string addr_; - fiber_tbox tbox_; -}; - -////////////////////////////////////////////////////////////////////////////// - -class fiber_conn : public fiber -{ -public: - fiber_conn(const tcp_keeper_config& config, const char* addr) - : conn_(NULL) - , last_ctime_(0) - , config_(config) - , addr_(addr) {} - - ~fiber_conn(void) { delete conn_; } - - void add_task(task_req* task) - { - tbox_.push(task); - } - -public: - void stop(void) - { - add_task(NULL); - } - - void join(void) - { - (void) tbox_ctl_.pop(); - } - - bool connected(void) const - { - return conn_ ? true : false; - } - -private: - // @override - void run(void) - { - int timeo = config_.conn_ttl > 0 ? config_.conn_ttl * 1000 : -1; - - while (true) { - bool found; - task_req* req = tbox_.pop(timeo, &found); - if (req == NULL) { - if (found) { - break; - } - - check_idle(); - continue; - } - - if (conn_ == NULL) { - connect_one(); - } - - socket_stream* conn; - if (conn_) { - conn = new socket_stream; - conn->open(dup(conn_->sock_handle())); - delete conn_; - } else { - conn = NULL; - } - - req->put(conn); - connect_one(); - } - - done(); - } - - void connect_one(void) - { - conn_ = new socket_stream; - if (conn_->open(addr_, config_.conn_timeo, config_.rw_timeo)) { - last_ctime_ = time(NULL); - } else { - delete conn_; - conn_ = NULL; - } - } - - void check_idle(void) - { - if (config_.conn_ttl <= 0) { - return; - } - time_t now = time(NULL); - if ((now - last_ctime_) >= config_.conn_ttl) { - delete conn_; - conn_ = NULL; - } - } - - void done(void) - { - tbox_ctl_.push(NULL); - } - -private: - fiber_tbox tbox_; - fiber_tbox tbox_ctl_; - socket_stream* conn_; - time_t last_ctime_; - - const tcp_keeper_config& config_; - string addr_; -}; - -////////////////////////////////////////////////////////////////////////////// - -class fiber_pool : public fiber -{ -public: - fiber_pool(const tcp_keeper_config& config, const char* addr) - : last_peek_(0) - , config_(config) - , addr_(addr) - {} - - ~fiber_pool(void) {} - - void add_task(task_req* task) - { - tbox_.push(task); - } - -public: - void stop(void) - { - add_task(NULL); - } - - void join(void) - { - (void) tbox_ctl_.pop(); - } - - bool empty(void) const - { - for (std::vector::const_iterator cit = - fibers_.begin(); cit != fibers_.end(); ++cit) { - - if ((*cit)->connected()) { - return false; - } - } - - return true; - } - - time_t last_peek(void) const - { - return last_peek_; - } - -private: - // @override - void run(void) - { - for (int i = 0; i < config_.conn_max; i++) { - fiber_conn* fb = new fiber_conn(config_, addr_); - fibers_.push_back(fb); - fb->start(); - } - - std::vector::iterator it = fibers_.begin(); - - int timeo = config_.conn_ttl > 0 ? config_.conn_ttl * 1000 : -1; - - while (true) { - bool found; - task_req* task = tbox_.pop(timeo, &found); - if (task == NULL) { - if (found) { - break; - } - continue; - } - - last_peek_ = time(NULL); - - (*it)->add_task(task); - if (++it == fibers_.end()) { - it = fibers_.begin(); - } - } - - for (it = fibers_.begin(); it != fibers_.end(); ++it) { - (*it)->stop(); - (*it)->join(); - delete *it; - } - - done(); - } - - void done(void) - { - tbox_ctl_.push(NULL); - } - -private: - fiber_tbox tbox_; - fiber_tbox tbox_ctl_; - std::vector fibers_; - time_t last_peek_; - - const tcp_keeper_config& config_; - string addr_; -}; - -////////////////////////////////////////////////////////////////////////////// - -class fiber_pool_killer : public fiber -{ -public: - fiber_pool_killer(fiber_pool* pool) - : pool_(pool) - { - } - -private: - ~fiber_pool_killer(void) {} - - // @override - void run(void) - { - pool_->stop(); - pool_->join(); - delete pool_; - delete this; - } - -private: - fiber_pool* pool_; -}; - -class fiber_waiter : public fiber -{ -public: - fiber_waiter(void) - : last_check_(0) - { - config_ = new tcp_keeper_config; - } - - ~fiber_waiter(void) - { - delete config_; - } - - void add_task(task_req* task) - { - tbox_.push(task); - } - -public: - fiber_waiter& set_conn_timeout(int n) - { - config_->conn_timeo = n; - return *this; - } - - fiber_waiter& set_rw_timeout(int n) - { - config_->rw_timeo = n; - return *this; - } - - fiber_waiter& set_conn_max(int n) - { - config_->conn_max = n; - return *this; - } - - fiber_waiter& set_conn_ttl(int ttl) - { - config_->conn_ttl = ttl; - return *this; - } - - fiber_waiter& set_pool_ttl(int ttl) - { - config_->pool_ttl = ttl; - return *this; - } - -public: - void stop(void) - { - add_task(NULL); - } - - void join(void) - { - (void) tbox_ctl_.pop(); - } - -protected: - // @override - void run(void) - { - int timeo = config_->pool_ttl > 0 ? config_->pool_ttl * 1000 : -1; - - while (true) { - bool found; - task_req* task = tbox_.pop(timeo, &found); - - check_idle(); - - if (task == NULL) { - if (found) { - break; - } - - continue; - } - - fiber_pool* pool; - - const char* addr = task->get_addr(); - std::map::iterator - it = manager_.find(addr); - if (it == manager_.end()) { - pool = new fiber_pool(*config_, addr); - manager_[addr] = pool; - pool->start(); - } else { - pool = it->second; - } - - pool->add_task(task); - } - - for (std::map::iterator it = - manager_.begin(); it != manager_.end(); ++it) { - - it->second->stop(); - it->second->join(); - delete it->second; - } - - done(); - } - - void done(void) - { - tbox_ctl_.push(this); - } - -private: - void check_idle(void) - { - if (config_->pool_ttl <= 0) { - return; - } - - time_t now = time(NULL); - if (now - last_check_ <= 10) { - return; - } - - std::map::iterator it, next; - it = manager_.begin(); - for (next = it; it != manager_.end(); it = next) { - ++next; - time_t n = (now - it->second->last_peek()); - if (n >= config_->pool_ttl && it->second->empty()) { - fiber* fb = new fiber_pool_killer(it->second); - fb->start(); - - manager_.erase(it); - } - } - - last_check_ = time(NULL); - } - -private: - fiber_tbox tbox_; - fiber_tbox tbox_ctl_; - std::map manager_; - tcp_keeper_config* config_; - time_t last_check_; -}; - -////////////////////////////////////////////////////////////////////////////// - tcp_keeper::tcp_keeper(void) +: rtt_min_(5.00) { - waiter_ = new fiber_waiter; + waiter_ = new keeper_waiter; + lock_ = new thread_mutex; } tcp_keeper::~tcp_keeper(void) { delete waiter_; + delete lock_; } tcp_keeper& tcp_keeper::set_conn_timeout(int n) @@ -495,6 +49,12 @@ tcp_keeper& tcp_keeper::set_pool_ttl(int ttl_ms) return *this; } +tcp_keeper& tcp_keeper::set_rtt_min(double rtt) +{ + rtt_min_ = rtt; + return *this; +} + void* tcp_keeper::run(void) { waiter_->start(); @@ -506,17 +66,92 @@ void tcp_keeper::stop(void) { waiter_->stop(); waiter_->join(); - //fiber::schedule_stop(); this->wait(); } -socket_stream* tcp_keeper::peek(const char* addr) +bool tcp_keeper::direct(const char* addr, bool& found) { + std::map::iterator it; + thread_mutex_guard guard(*lock_); + it = addrs_.find(addr); + if (it == addrs_.end()) { + found = false; + return false; + } + + found = true; + return it->second <= rtt_min_; +} + +void tcp_keeper::remove(const char* addr) +{ + std::map::iterator it; + thread_mutex_guard guard(*lock_); + it = addrs_.find(addr); + if (it != addrs_.end()) { + addrs_.erase(it); + } +} + +void tcp_keeper::update(const char* addr, double cost) +{ + std::map::iterator it; + thread_mutex_guard guard(*lock_); + addrs_[addr] = cost; +} + +socket_stream* tcp_keeper::peek(const char* addr, bool* hit /* = NULL */) +{ + bool found; + + if (direct(addr, found)) { + if (hit) { + *hit = false; + } + + const keeper_config& config = waiter_->get_config(); + + struct timeval begin; + gettimeofday(&begin, NULL); + socket_stream* conn = new socket_stream; + + if (!conn->open(addr, config.conn_timeo, config.rw_timeo)) { + delete conn; + remove(addr); + return NULL; + } + + struct timeval end; + gettimeofday(&end, NULL); + double cost = stamp_sub(end, begin); + if (cost > rtt_min_) { + if (0) + printf("remove %s, cost=%.2f > %.2f\r\n", addr, + cost, rtt_min_); + remove(addr); + } + return conn; + } + task_req task; task.set_addr(addr); + task.set_stamp(); waiter_->add_task(&task); - return task.pop(); + socket_stream* conn = task.pop(); + if (hit) { + *hit = task.is_hit(); + } + + double cost = task.get_conn_cost(); + if (cost < rtt_min_) { + if (0) + printf("update cost =%.2f < rtt=%.2f, addr=%s\r\n", + cost, rtt_min_, addr); + update(addr, cost); + } + + return conn; } } // namespace acl