optimize tcp_keeper

This commit is contained in:
zsx 2019-01-14 14:34:00 +08:00
parent 7b0ba08240
commit 1cd42a9213
8 changed files with 949 additions and 455 deletions

View File

@ -0,0 +1,123 @@
#pragma once
#include "fiber/fiber_tbox.hpp"
namespace acl {
class keeper_conn;
struct keeper_link {
ACL_RING me;
keeper_conn* fb;
};
typedef enum {
ASK_T_NULL,
ASK_T_IDLE,
ASK_T_STOP,
ASK_T_CONN,
} ask_type_t;
typedef enum {
KEEPER_T_READY,
KEEPER_T_BUSY,
KEEPER_T_IDLE,
} keeper_status_t;
class keeper_config
{
public:
keeper_config(void)
: conn_timeo(10)
, rw_timeo(10)
, conn_max(10)
, conn_ttl(10)
, pool_ttl(20) {}
~keeper_config(void) {}
int conn_timeo;
int rw_timeo;
int conn_max;
int conn_ttl;
int pool_ttl;
};
class task_req
{
public:
task_req(void)
: hit_(false)
, conn_cost_(1000)
{
gettimeofday(&stamp_, NULL);
}
~task_req(void) {}
void set_addr(const char* addr)
{
addr_ = addr;
}
const char* get_addr(void) const
{
return addr_.c_str();
}
void set_hit(bool yes)
{
hit_ = yes;
}
bool is_hit(void) const
{
return hit_;
}
void set_stamp(void)
{
gettimeofday(&stamp_, NULL);
}
const struct timeval& get_stamp(void) const
{
return stamp_;
}
double get_cost(void) const
{
struct timeval curr;
gettimeofday(&curr, NULL);
return stamp_sub(curr, stamp_);
}
void set_conn_cost(double cost)
{
conn_cost_ = cost;
}
double get_conn_cost(void) const
{
return conn_cost_;
}
public:
socket_stream* pop(void)
{
return tbox_.pop();
}
void put(socket_stream* conn)
{
tbox_.push(conn);
}
private:
bool hit_;
string addr_;
struct timeval stamp_;
fiber_tbox<socket_stream> tbox_;
double conn_cost_;
};
} // namespace acl

View File

@ -0,0 +1,209 @@
#include "stdafx.hpp"
#include "keeper_conns.hpp"
#include "keeper_conn.hpp"
namespace acl {
keeper_conn::keeper_conn(const keeper_config& config, const char* addr,
keeper_link* lk, keeper_conns& pool)
: ask_(ASK_T_NULL)
, conn_(NULL)
, status_(KEEPER_T_IDLE)
, last_ctime_(0)
, config_(config)
, addr_(addr)
, lk_(lk)
, pool_(pool)
, task_(NULL)
, conn_cost_(-1.0)
{
}
keeper_conn::~keeper_conn(void)
{
delete conn_;
}
void keeper_conn::set_task(task_req& task)
{
task_ = &task;
}
socket_stream* keeper_conn::peek(double& cost)
{
cost = conn_cost_;
if (is_ready()) {
assert(conn_);
socket_stream* conn = new socket_stream;
conn->open(dup(conn_->sock_handle()));
delete conn_;
conn_ = NULL;
status_ = KEEPER_T_IDLE;
return conn;
}
return NULL;
}
void keeper_conn::stop(void)
{
if (is_busy()) {
logger_warn("fiber is busy, kill it");
this->kill();
}
ask_ = ASK_T_STOP;
box_.push(NULL);
}
void keeper_conn::ask_connect(void)
{
ask_ = ASK_T_CONN;
box_.push(NULL);
}
void keeper_conn::join(void)
{
(void) tbox_ctl_.pop();
}
void keeper_conn::print_status(void) const
{
switch (status_) {
case KEEPER_T_BUSY:
printf("fiber status: KEEPER_T_BUSY\r\n");
break;
case KEEPER_T_IDLE:
printf("fiber status: KEEPER_T_IDLE\r\n");
break;
case KEEPER_T_READY:
printf("fiber status: KEEPER_T_READY\r\n");
break;
default:
printf("fiber status: unknown(%d)\r\n", status_);
break;
}
}
void keeper_conn::run(void)
{
if (task_) {
double n = task_->get_cost();
if (n >= 50) {
const struct timeval& last = task_->get_stamp();
printf(">>>cost %.2f, %p\n", n, task_);
printf("last=%p, sec=%ld, usec=%ld\r\n",
&last, last.tv_sec, last.tv_usec);
}
handle_task(*task_);
delete this;
return;
}
connect_one();
#ifndef USE_SBOX
int timeo = config_.conn_ttl > 0 ? config_.conn_ttl * 1000 : -1;
#endif
while (true) {
bool found;
#ifdef USE_SBOX
task_req* task = box_.pop(&found);
#else
task_req* task = box_.pop(timeo, &found);
#endif
assert(task == NULL);
if (ask_ == ASK_T_STOP) {
break;
} else if (ask_ == ASK_T_CONN) {
if (conn_ == NULL) {
connect_one();
} else {
printf("connecting ...\n");
}
} else if (!found) {
check_idle();
} else {
printf("invalid ask=%d, task=%p\n", ask_, task);
abort();
}
}
done();
}
void keeper_conn::handle_task(task_req& task)
{
if (conn_ == NULL) {
connect_one();
}
socket_stream* conn;
if (conn_) {
conn = new socket_stream;
conn->open(dup(conn_->sock_handle()));
delete conn_;
conn_ = NULL;
status_ = KEEPER_T_IDLE;
} else {
conn = NULL;
status_ = KEEPER_T_IDLE;
}
if (conn_cost_ >= 1000)
printf("too long const=%.2f\r\n", conn_cost_);
task.set_conn_cost(conn_cost_);
task.put(conn);
}
void keeper_conn::connect_one(void)
{
assert(conn_ == NULL);
status_ = KEEPER_T_BUSY;
conn_ = new socket_stream;
struct timeval begin;
gettimeofday(&begin, NULL);
bool ret = conn_->open(addr_, config_.conn_timeo, config_.rw_timeo);
struct timeval end;
gettimeofday(&end, NULL);
conn_cost_ = acl::stamp_sub(end, begin);
// only for test
if (conn_cost_ >= 1000) {
printf("%s(%d): spent: %.2f ms, addr=%s\n",
__FUNCTION__, __LINE__, conn_cost_, addr_.c_str());
}
if (ret) {
status_ = KEEPER_T_READY;
last_ctime_ = time(NULL);
pool_.on_connect(*conn_, lk_);
} else {
if (this->killed()) {
logger_warn("I've been killed");
}
delete conn_;
conn_ = NULL;
status_ = KEEPER_T_IDLE;
}
}
void keeper_conn::check_idle(void)
{
if (config_.conn_ttl <= 0) {
return;
}
time_t now = time(NULL);
if ((now - last_ctime_) >= config_.conn_ttl) {
delete conn_;
conn_ = NULL;
status_ = KEEPER_T_IDLE;
}
}
void keeper_conn::done(void)
{
tbox_ctl_.push(NULL);
}
} // namespace acl

View File

@ -0,0 +1,73 @@
#pragma once
#include "fiber/fiber.hpp"
#include "fiber/fiber_sem.hpp"
#include "fiber/fiber_tbox.hpp"
#include "keeper.hpp"
namespace acl {
class keeper_conns;
class keeper_conn : public fiber
{
public:
keeper_conn(const keeper_config& config, const char* addr,
keeper_link* lk, keeper_conns& pool);
~keeper_conn(void);
void set_task(task_req& task);
socket_stream* peek(double& cost);
public:
void stop(void);
void ask_connect(void);
void join(void);
bool is_ready(void) const
{
return status_ == KEEPER_T_READY;
}
bool is_busy(void) const
{
return status_ == KEEPER_T_BUSY;
}
bool is_idle(void) const
{
return status_ == KEEPER_T_IDLE;
}
void print_status(void) const;
private:
// @override
void run(void);
void handle_task(task_req& task);
void connect_one(void);
void check_idle(void);
void done(void);
private:
#ifdef USE_SBOX
fiber_sbox<task_req> box_;
#else
fiber_tbox<task_req> box_;
#endif
fiber_tbox<keeper_conn> tbox_ctl_;
ask_type_t ask_;
socket_stream* conn_;
keeper_status_t status_;
time_t last_ctime_;
const keeper_config& config_;
string addr_;
keeper_link* lk_;
keeper_conns& pool_;
task_req* task_;
double conn_cost_;
};
} // namespace acl

View File

@ -0,0 +1,192 @@
#include "stdafx.hpp"
#include "keeper.hpp"
#include "keeper_conn.hpp"
#include "keeper_conns.hpp"
namespace acl {
keeper_conns::keeper_conns(const keeper_config& config, const char* addr)
: last_peek_(0)
, config_(config)
, addr_(addr)
{
acl_ring_init(&linker_);
for (int i = 0; i < config_.conn_max; i++) {
keeper_link* lk = new keeper_link;
lk->fb = new keeper_conn(config_, addr_, lk, *this);
ACL_RING_APPEND(&linker_, &lk->me);
lk->fb->start();
}
}
keeper_conns::~keeper_conns(void)
{
}
void keeper_conns::on_connect(socket_stream& conn, keeper_link* lk)
{
if (lk) {
ACL_RING_DETACH(&lk->me);
ACL_RING_APPEND(&linker_, &lk->me);
} else {
conn.set_tcp_solinger(true, 0);
}
}
void keeper_conns::add_task(task_req& task)
{
keeper_conn* fb = peek_ready();
if (fb) {
task.set_hit(true);
double cost;
socket_stream* conn = fb->peek(cost);
assert(conn);
task.set_conn_cost(cost);
task.put(conn);
fb->ask_connect();
if (0)
printf(">>>ready hit\r\n");
} else {
task.set_hit(false);
fb = new keeper_conn(config_, addr_, NULL, *this);
task.set_stamp();
fb->set_task(task);
fb->start();
if (0)
printf(">>>>trigger connect one, hit=%d\r\n",
this->debug_check());
}
}
void keeper_conns::stop(void)
{
tbox_.push(NULL);
}
void keeper_conns::join(void)
{
(void) tbox_ctl_.pop();
}
bool keeper_conns::empty(void) const
{
ACL_RING_ITER iter;
keeper_link* lk;
acl_ring_foreach(iter, &linker_) {
lk = acl_ring_to_appl(iter.ptr, keeper_link, me);
if (lk->fb->is_ready()) {
return false;
}
}
return true;
}
void keeper_conns::run(void)
{
int timeo = config_.conn_ttl > 0 ? config_.conn_ttl * 1000 : -1;
while (true) {
bool found;
task_req* task = tbox_.pop(timeo, &found);
if (task == NULL) {
if (found) {
break;
}
continue;
}
}
stop_all();
done();
}
int keeper_conns::debug_check(void)
{
ACL_RING_ITER iter;
keeper_link* lk;
int i = 0;
acl_ring_foreach(iter, &linker_) {
lk = acl_ring_to_appl(iter.ptr, keeper_link, me);
i++;
if (lk->fb->is_ready()) {
return i;
}
}
return -1;
}
keeper_conn* keeper_conns::peek_ready(void)
{
#if 0
keeper_link* lk = NULL;
ACL_RING_ITER iter;
acl_ring_foreach(iter, &linker_) {
lk = acl_ring_to_appl(iter.ptr, keeper_link, me);
if (lk->fb->is_ready()) {
break;
}
}
#else
keeper_link* lk = ACL_RING_FIRST_APPL(&linker_, keeper_link, me);
#endif
if (lk == NULL) {
logger_error("first keeper_conn null");
return NULL;
}
if (!lk->fb->is_ready()) {
//lk->fb->print_status();
return NULL;
}
ACL_RING_DETACH(&lk->me);
ACL_RING_PREPEND(&linker_, &lk->me);
return lk->fb;
}
void keeper_conns::stop_all(void)
{
while (true) {
ACL_RING* hdr = acl_ring_pop_head(&linker_);
if (hdr == NULL) {
break;
}
keeper_link* lk = acl_ring_to_appl(hdr, keeper_link, me);
lk->fb->stop();
lk->fb->join();
delete lk->fb;
delete lk;
}
acl_ring_init(&linker_);
}
void keeper_conns::done(void)
{
tbox_ctl_.push(NULL);
}
//////////////////////////////////////////////////////////////////////////////
keeper_killer::keeper_killer(keeper_conns* pool)
: pool_(pool)
{
}
keeper_killer::~keeper_killer(void) {}
void keeper_killer::run(void)
{
pool_->stop();
pool_->join();
delete pool_;
delete this;
}
} // namespace acl

View File

@ -0,0 +1,66 @@
#pragma once
#include "fiber/fiber.hpp"
#include "fiber/fiber_tbox.hpp"
namespace acl {
class keeper_config;
class task_req;
class keeper_conn;
class keeper_link;
class keeper_conns : public fiber
{
public:
keeper_conns(const keeper_config& config, const char* addr);
~keeper_conns(void);
void add_task(task_req& task);
public:
void stop(void);
void join(void);
bool empty(void) const;
time_t last_peek(void) const
{
return last_peek_;
}
void on_connect(socket_stream& conn, keeper_link* lk);
private:
// @override
void run(void);
int debug_check(void);
keeper_conn* peek_ready(void);
void stop_all(void);
void done(void);
private:
fiber_tbox<task_req> tbox_;
fiber_tbox<keeper_conns> tbox_ctl_;
ACL_RING linker_;
time_t last_peek_;
const keeper_config& config_;
string addr_;
};
//////////////////////////////////////////////////////////////////////////////
class keeper_killer : public fiber
{
public:
keeper_killer(keeper_conns* pool);
private:
~keeper_killer(void);
// @override
void run(void);
private:
keeper_conns* pool_;
};
} // namespace acl

View File

@ -0,0 +1,147 @@
#include "stdafx.hpp"
#include "keeper.hpp"
#include "keeper_conns.hpp"
#include "keeper_waiter.hpp"
namespace acl {
keeper_waiter::keeper_waiter(void)
: last_check_(0)
{
config_ = new keeper_config;
}
keeper_waiter::~keeper_waiter(void)
{
delete config_;
}
keeper_waiter& keeper_waiter::set_conn_timeout(int n)
{
config_->conn_timeo = n;
return *this;
}
keeper_waiter& keeper_waiter::set_rw_timeout(int n)
{
config_->rw_timeo = n;
return *this;
}
keeper_waiter& keeper_waiter::set_conn_max(int n)
{
config_->conn_max = n;
return *this;
}
keeper_waiter& keeper_waiter::set_conn_ttl(int ttl)
{
config_->conn_ttl = ttl;
return *this;
}
keeper_waiter& keeper_waiter::set_pool_ttl(int ttl)
{
config_->pool_ttl = ttl;
return *this;
}
const keeper_config& keeper_waiter::get_config(void) const
{
return *config_;
}
void keeper_waiter::stop(void)
{
add_task(NULL);
}
void keeper_waiter::join(void)
{
(void) tbox_ctl_.pop();
}
void keeper_waiter::add_task(task_req* task)
{
tbox_.push(task);
}
void keeper_waiter::run(void)
{
int timeo = config_->pool_ttl > 0 ? config_->pool_ttl * 1000 : -1;
while (true) {
bool found;
task_req* task = tbox_.pop(timeo, &found);
check_idle();
if (task == NULL) {
if (found) {
break;
}
continue;
}
keeper_conns* pool;
const char* addr = task->get_addr();
std::map<string, keeper_conns*>::iterator it =
manager_.find(addr);
if (it == manager_.end()) {
pool = new keeper_conns(*config_, addr);
manager_[addr] = pool;
pool->start();
} else {
pool = it->second;
}
task->set_stamp();
pool->add_task(*task);
}
for (std::map<string, keeper_conns*>::iterator it =
manager_.begin(); it != manager_.end(); ++it) {
it->second->stop();
it->second->join();
delete it->second;
}
done();
}
void keeper_waiter::done(void)
{
tbox_ctl_.push(this);
}
void keeper_waiter::check_idle(void)
{
if (config_->pool_ttl <= 0) {
return;
}
time_t now = time(NULL);
if (now - last_check_ <= 10) {
return;
}
std::map<string, keeper_conns*>::iterator it, next;
it = manager_.begin();
for (next = it; it != manager_.end(); it = next) {
++next;
time_t n = (now - it->second->last_peek());
if (n >= config_->pool_ttl && it->second->empty()) {
fiber* fb = new keeper_killer(it->second);
fb->start();
manager_.erase(it);
}
}
last_check_ = time(NULL);
}
} // namespace acl

View File

@ -0,0 +1,49 @@
#pragma once
#include <map>
#include "fiber/fiber.hpp"
#include "fiber/fiber_tbox.hpp"
namespace acl {
class keeper_config;
class task_req;
class keeper_conns;
class keeper_waiter : public fiber
{
public:
keeper_waiter(void);
~keeper_waiter(void);
void add_task(task_req* task);
public:
const keeper_config& get_config(void) const;
keeper_waiter& set_conn_timeout(int n);
keeper_waiter& set_rw_timeout(int n);
keeper_waiter& set_conn_max(int n);
keeper_waiter& set_conn_ttl(int ttl);
keeper_waiter& set_pool_ttl(int ttl);
public:
void stop(void);
void join(void);
protected:
// @override
void run(void);
void done(void);
private:
void check_idle(void);
private:
fiber_tbox<task_req> tbox_;
fiber_tbox<keeper_waiter> tbox_ctl_;
std::map<string, keeper_conns*> manager_;
keeper_config* config_;
time_t last_check_;
};
} // namespace acl

View File

@ -1,467 +1,21 @@
#include "stdafx.hpp"
#include <vector>
#include <map>
#include "acl_cpp/stdlib/trigger.hpp"
#include "acl_cpp/stdlib/string.hpp"
#include "acl_cpp/stream/socket_stream.hpp"
#include "acl_cpp/stdlib/trigger.hpp"
#include "fiber/fiber.hpp"
#include "fiber/fiber_tbox.hpp"
#include "fiber/tcp_keeper.hpp"
//////////////////////////////////////////////////////////////////////////////
#include "keeper/keeper.hpp"
#include "keeper/keeper_waiter.hpp"
namespace acl {
class tcp_keeper_config
{
public:
tcp_keeper_config(void)
: conn_timeo(10)
, rw_timeo(10)
, conn_max(10)
, conn_ttl(10)
, pool_ttl(20) {}
~tcp_keeper_config(void) {}
int conn_timeo;
int rw_timeo;
int conn_max;
int conn_ttl;
int pool_ttl;
};
class task_req
{
public:
task_req(void) {}
~task_req(void) {}
void set_addr(const char* addr)
{
addr_ = addr;
}
const char* get_addr(void) const
{
return addr_.c_str();
}
public:
socket_stream* pop(void)
{
return tbox_.pop();
}
void put(socket_stream* conn)
{
tbox_.push(conn);
}
private:
string addr_;
fiber_tbox<socket_stream> tbox_;
};
//////////////////////////////////////////////////////////////////////////////
class fiber_conn : public fiber
{
public:
fiber_conn(const tcp_keeper_config& config, const char* addr)
: conn_(NULL)
, last_ctime_(0)
, config_(config)
, addr_(addr) {}
~fiber_conn(void) { delete conn_; }
void add_task(task_req* task)
{
tbox_.push(task);
}
public:
void stop(void)
{
add_task(NULL);
}
void join(void)
{
(void) tbox_ctl_.pop();
}
bool connected(void) const
{
return conn_ ? true : false;
}
private:
// @override
void run(void)
{
int timeo = config_.conn_ttl > 0 ? config_.conn_ttl * 1000 : -1;
while (true) {
bool found;
task_req* req = tbox_.pop(timeo, &found);
if (req == NULL) {
if (found) {
break;
}
check_idle();
continue;
}
if (conn_ == NULL) {
connect_one();
}
socket_stream* conn;
if (conn_) {
conn = new socket_stream;
conn->open(dup(conn_->sock_handle()));
delete conn_;
} else {
conn = NULL;
}
req->put(conn);
connect_one();
}
done();
}
void connect_one(void)
{
conn_ = new socket_stream;
if (conn_->open(addr_, config_.conn_timeo, config_.rw_timeo)) {
last_ctime_ = time(NULL);
} else {
delete conn_;
conn_ = NULL;
}
}
void check_idle(void)
{
if (config_.conn_ttl <= 0) {
return;
}
time_t now = time(NULL);
if ((now - last_ctime_) >= config_.conn_ttl) {
delete conn_;
conn_ = NULL;
}
}
void done(void)
{
tbox_ctl_.push(NULL);
}
private:
fiber_tbox<task_req> tbox_;
fiber_tbox<fiber_conn> tbox_ctl_;
socket_stream* conn_;
time_t last_ctime_;
const tcp_keeper_config& config_;
string addr_;
};
//////////////////////////////////////////////////////////////////////////////
class fiber_pool : public fiber
{
public:
fiber_pool(const tcp_keeper_config& config, const char* addr)
: last_peek_(0)
, config_(config)
, addr_(addr)
{}
~fiber_pool(void) {}
void add_task(task_req* task)
{
tbox_.push(task);
}
public:
void stop(void)
{
add_task(NULL);
}
void join(void)
{
(void) tbox_ctl_.pop();
}
bool empty(void) const
{
for (std::vector<fiber_conn*>::const_iterator cit =
fibers_.begin(); cit != fibers_.end(); ++cit) {
if ((*cit)->connected()) {
return false;
}
}
return true;
}
time_t last_peek(void) const
{
return last_peek_;
}
private:
// @override
void run(void)
{
for (int i = 0; i < config_.conn_max; i++) {
fiber_conn* fb = new fiber_conn(config_, addr_);
fibers_.push_back(fb);
fb->start();
}
std::vector<fiber_conn*>::iterator it = fibers_.begin();
int timeo = config_.conn_ttl > 0 ? config_.conn_ttl * 1000 : -1;
while (true) {
bool found;
task_req* task = tbox_.pop(timeo, &found);
if (task == NULL) {
if (found) {
break;
}
continue;
}
last_peek_ = time(NULL);
(*it)->add_task(task);
if (++it == fibers_.end()) {
it = fibers_.begin();
}
}
for (it = fibers_.begin(); it != fibers_.end(); ++it) {
(*it)->stop();
(*it)->join();
delete *it;
}
done();
}
void done(void)
{
tbox_ctl_.push(NULL);
}
private:
fiber_tbox<task_req> tbox_;
fiber_tbox<fiber_pool> tbox_ctl_;
std::vector<fiber_conn*> fibers_;
time_t last_peek_;
const tcp_keeper_config& config_;
string addr_;
};
//////////////////////////////////////////////////////////////////////////////
class fiber_pool_killer : public fiber
{
public:
fiber_pool_killer(fiber_pool* pool)
: pool_(pool)
{
}
private:
~fiber_pool_killer(void) {}
// @override
void run(void)
{
pool_->stop();
pool_->join();
delete pool_;
delete this;
}
private:
fiber_pool* pool_;
};
class fiber_waiter : public fiber
{
public:
fiber_waiter(void)
: last_check_(0)
{
config_ = new tcp_keeper_config;
}
~fiber_waiter(void)
{
delete config_;
}
void add_task(task_req* task)
{
tbox_.push(task);
}
public:
fiber_waiter& set_conn_timeout(int n)
{
config_->conn_timeo = n;
return *this;
}
fiber_waiter& set_rw_timeout(int n)
{
config_->rw_timeo = n;
return *this;
}
fiber_waiter& set_conn_max(int n)
{
config_->conn_max = n;
return *this;
}
fiber_waiter& set_conn_ttl(int ttl)
{
config_->conn_ttl = ttl;
return *this;
}
fiber_waiter& set_pool_ttl(int ttl)
{
config_->pool_ttl = ttl;
return *this;
}
public:
void stop(void)
{
add_task(NULL);
}
void join(void)
{
(void) tbox_ctl_.pop();
}
protected:
// @override
void run(void)
{
int timeo = config_->pool_ttl > 0 ? config_->pool_ttl * 1000 : -1;
while (true) {
bool found;
task_req* task = tbox_.pop(timeo, &found);
check_idle();
if (task == NULL) {
if (found) {
break;
}
continue;
}
fiber_pool* pool;
const char* addr = task->get_addr();
std::map<string, fiber_pool*>::iterator
it = manager_.find(addr);
if (it == manager_.end()) {
pool = new fiber_pool(*config_, addr);
manager_[addr] = pool;
pool->start();
} else {
pool = it->second;
}
pool->add_task(task);
}
for (std::map<string, fiber_pool*>::iterator it =
manager_.begin(); it != manager_.end(); ++it) {
it->second->stop();
it->second->join();
delete it->second;
}
done();
}
void done(void)
{
tbox_ctl_.push(this);
}
private:
void check_idle(void)
{
if (config_->pool_ttl <= 0) {
return;
}
time_t now = time(NULL);
if (now - last_check_ <= 10) {
return;
}
std::map<string, fiber_pool*>::iterator it, next;
it = manager_.begin();
for (next = it; it != manager_.end(); it = next) {
++next;
time_t n = (now - it->second->last_peek());
if (n >= config_->pool_ttl && it->second->empty()) {
fiber* fb = new fiber_pool_killer(it->second);
fb->start();
manager_.erase(it);
}
}
last_check_ = time(NULL);
}
private:
fiber_tbox<task_req> tbox_;
fiber_tbox<fiber_waiter> tbox_ctl_;
std::map<string, fiber_pool*> manager_;
tcp_keeper_config* config_;
time_t last_check_;
};
//////////////////////////////////////////////////////////////////////////////
tcp_keeper::tcp_keeper(void)
: rtt_min_(5.00)
{
waiter_ = new fiber_waiter;
waiter_ = new keeper_waiter;
lock_ = new thread_mutex;
}
tcp_keeper::~tcp_keeper(void)
{
delete waiter_;
delete lock_;
}
tcp_keeper& tcp_keeper::set_conn_timeout(int n)
@ -495,6 +49,12 @@ tcp_keeper& tcp_keeper::set_pool_ttl(int ttl_ms)
return *this;
}
tcp_keeper& tcp_keeper::set_rtt_min(double rtt)
{
rtt_min_ = rtt;
return *this;
}
void* tcp_keeper::run(void)
{
waiter_->start();
@ -506,17 +66,92 @@ void tcp_keeper::stop(void)
{
waiter_->stop();
waiter_->join();
//fiber::schedule_stop();
this->wait();
}
socket_stream* tcp_keeper::peek(const char* addr)
bool tcp_keeper::direct(const char* addr, bool& found)
{
std::map<std::string, double>::iterator it;
thread_mutex_guard guard(*lock_);
it = addrs_.find(addr);
if (it == addrs_.end()) {
found = false;
return false;
}
found = true;
return it->second <= rtt_min_;
}
void tcp_keeper::remove(const char* addr)
{
std::map<std::string, double>::iterator it;
thread_mutex_guard guard(*lock_);
it = addrs_.find(addr);
if (it != addrs_.end()) {
addrs_.erase(it);
}
}
void tcp_keeper::update(const char* addr, double cost)
{
std::map<std::string, double>::iterator it;
thread_mutex_guard guard(*lock_);
addrs_[addr] = cost;
}
socket_stream* tcp_keeper::peek(const char* addr, bool* hit /* = NULL */)
{
bool found;
if (direct(addr, found)) {
if (hit) {
*hit = false;
}
const keeper_config& config = waiter_->get_config();
struct timeval begin;
gettimeofday(&begin, NULL);
socket_stream* conn = new socket_stream;
if (!conn->open(addr, config.conn_timeo, config.rw_timeo)) {
delete conn;
remove(addr);
return NULL;
}
struct timeval end;
gettimeofday(&end, NULL);
double cost = stamp_sub(end, begin);
if (cost > rtt_min_) {
if (0)
printf("remove %s, cost=%.2f > %.2f\r\n", addr,
cost, rtt_min_);
remove(addr);
}
return conn;
}
task_req task;
task.set_addr(addr);
task.set_stamp();
waiter_->add_task(&task);
return task.pop();
socket_stream* conn = task.pop();
if (hit) {
*hit = task.is_hit();
}
double cost = task.get_conn_cost();
if (cost < rtt_min_) {
if (0)
printf("update cost =%.2f < rtt=%.2f, addr=%s\r\n",
cost, rtt_min_, addr);
update(addr, cost);
}
return conn;
}
} // namespace acl