complete the redis cluster lib

This commit is contained in:
ubuntu14 2015-03-01 07:50:53 -08:00
parent dfe63fc844
commit 9b50d1dc50
9 changed files with 359 additions and 71 deletions

View File

@ -547,8 +547,9 @@ static void server_wakeup(ACL_AIO *aio, int fd)
increase_client_counter();
if (acl_getpeername(fd, addr, sizeof(addr)) < 0) {
acl_msg_warn("%s, %s(%d): get socket's addr error: %s",
__FILE__, myname, __LINE__, acl_last_serror());
if (acl_msg_verbose)
acl_msg_warn("%s, %s(%d): get socket's addr error: %s",
__FILE__, myname, __LINE__, acl_last_serror());
acl_socket_close(fd);
return;
}
@ -558,8 +559,9 @@ static void server_wakeup(ACL_AIO *aio, int fd)
*ptr = 0;
if (!acl_access_permit(addr)) {
acl_msg_warn("%s, %s(%d): addr(%s) be denied",
__FILE__, myname, __LINE__, addr);
if (acl_msg_verbose)
acl_msg_warn("%s, %s(%d): addr(%s) be denied",
__FILE__, myname, __LINE__, addr);
if (__deny_info && *__deny_info) {
if (write(fd, __deny_info, strlen(__deny_info)) > 0

View File

@ -1,6 +1,16 @@
修改历史列表:
------------------------------------------------------------------------
287) 2015.3.1
287.1) feature: redis 客户端集群版增强了针对服务器掉线的容错功能
287.2) feature: 丰富了连接池公共类对象功能
286) 2015.2.28
286.1) feature: redis 客户端库针对集群方式增加了损坏结点自动删除功能
286.2) bugfix: connect_pool 类中的函数 put 当参数 delay_destroy_ 为 true
时,若 count_ > 0 返回时没有对互斥锁 lock_ 解锁,从而会造成其它使用该
连接池对象的线程调用 put 函数加锁时永远等待
285) 2015.2.26
285.1) feature: redis 客户端库完善了针对集群版本 redis3.0 的支持

View File

@ -35,6 +35,12 @@ public:
*/
void set_slot(int slot, const char* addr);
/**
 * Drop the cached server address recorded for one hash slot, so that the
 * slot no longer maps to any redis node until it is set again
 * @param slot {int} hash slot index; values outside [0, max_slot) are ignored
 */
void clear_slot(int slot);
/**
*
* @return {int}
@ -44,6 +50,40 @@ public:
return max_slot_;
}
//////////////////////////////////////////////////////////////////////
/**
 * Set the maximum number of redirects allowed for one command;
 * the default value is 15
 * @param max {int} new limit, applied only when max > 0
 */
void set_redirect_max(int max);
/**
 * Get the maximum number of redirects currently configured
 * @return {int} the stored redirect limit
 */
int get_redirect_max() const
{
return redirect_max_;
}
/**
 * Set how long (in seconds) to pause before redirecting again once the
 * redirect count has reached 2; the default value is 1 second.
 * NOTE(review): part of the original comment text was lost; it appears to
 * explain that the pause gives the cluster time to recover when a redis
 * node goes offline (see cluster-node-timeout in redis.conf) -- confirm.
 *
 * @param n {int} pause in seconds, default 1; callers only sleep when the
 *  stored value is > 0
 */
void set_redirect_sleep(int n);
/**
 * Get the pause value previously stored by set_redirect_sleep
 * @return {int} the stored pause, in seconds
 */
int get_redirect_sleep() const
{
return redirect_sleep_;
}
protected:
/**
*
@ -59,6 +99,8 @@ private:
int max_slot_;
const char** slot_addrs_;
std::vector<char*> addrs_;
int redirect_max_;
int redirect_sleep_;
};
} // namespace acl

View File

@ -173,8 +173,8 @@ public:
void set_slice_respond(bool on);
protected:
const redis_result* run(size_t nchildren = 0);
const redis_result* run(redis_cluster* cluster, size_t nchildren);
const redis_result* run(size_t nchild = 0);
const redis_result* run(redis_cluster* cluster, size_t nchild);
void build_request(size_t argc, const char* argv[], size_t lens[]);
void reset_request();
@ -266,8 +266,12 @@ private:
size_t max_conns_;
unsigned long long used_;
int slot_;
int redirect_max_;
int redirect_sleep_;
redis_pool* get_conns(redis_cluster* cluster, const char* info);
redis_client* peek_conn(redis_cluster* cluster, int slot);
redis_client* redirect(redis_cluster* cluster, const char* addr);
const char* get_addr(const char* info);
private:
/************************** request ********************************/

View File

@ -99,6 +99,7 @@ static bool test_set(acl::redis_string& option, int i)
option.reset();
bool ret = option.set(key.c_str(), value.c_str());
return ret;
if (i < 10)
printf("set key: %s, value: %s %s\r\n", key.c_str(),
value.c_str(), ret ? "ok" : "error");
@ -121,12 +122,19 @@ static bool test_get(acl::redis_string& option, int i)
return ret;
}
static int __threads_exit = 0;
class test_thread : public acl::thread
{
public:
test_thread(acl::redis_cluster& cluster, int max_conns,
const char* cmd, int n)
: cluster_(cluster), max_conns_(max_conns), cmd_(cmd), n_(n) {}
test_thread(acl::locker& locker, acl::redis_cluster& cluster,
int max_conns, const char* cmd, int n)
: locker_(locker)
, cluster_(cluster)
, max_conns_(max_conns)
, cmd_(cmd)
, n_(n)
{}
~test_thread() {}
@ -179,10 +187,13 @@ protected:
if (ret == false)
{
printf("cmd: %s error\r\n", cmd_.c_str());
printf("cmd: %s error, tid: %lu\r\n",
cmd_.c_str(), thread_self());
break;
}
continue;
if (i > 0 && i % 1000 == 0)
{
char tmp[128];
@ -191,10 +202,15 @@ protected:
}
}
locker_.lock();
__threads_exit++;
locker_.unlock();
return NULL;
}
private:
acl::locker& locker_;
acl::redis_cluster& cluster_;
int max_conns_;
acl::string cmd_;
@ -250,28 +266,57 @@ int main(int argc, char* argv[])
}
acl::acl_cpp_init();
//acl::log::stdout_open(true);
acl::log::stdout_open(true);
acl::redis_cluster cluster(conn_timeout, rw_timeout);
// 当连接池不可用时,设置重新恢复该连接池对象的等待时间(秒)
// 当设置的值 <= 0 时表示不再恢复
cluster.set_retry_inter(0);
// 设置重定向的最大阀值,若重定向次数超过此阀值则报错
cluster.set_redirect_max(20);
// 当重定向次数 >= 2 时每次再重定向此函数设置休息的时间(秒)
cluster.set_redirect_sleep(1);
cluster.init(NULL, addrs.c_str(), max_threads);
struct timeval begin;
gettimeofday(&begin, NULL);
acl::locker locker;
std::vector<test_thread*> threads;
for (int i = 0; i < max_threads; i++)
{
test_thread* thread = new test_thread(cluster, max_threads,
cmd.c_str(), n);
test_thread* thread = new test_thread(locker, cluster,
max_threads, cmd.c_str(), n);
threads.push_back(thread);
thread->set_detachable(false);
thread->set_detachable(true);
thread->start();
}
while (true)
{
locker.lock();
if (__threads_exit == max_threads)
{
locker.unlock();
printf("All threads over now!\r\n");
break;
}
locker.unlock();
//printf("max_threads: %d, threads_exit: %d\r\n",
// max_threads, __threads_exit);
sleep(1);
}
std::vector<test_thread*>::iterator it = threads.begin();
for (; it != threads.end(); ++it)
{
(*it)->wait();
//(*it)->wait();
delete (*it);
}

View File

@ -20,11 +20,15 @@ connect_manager::connect_manager()
connect_manager::~connect_manager()
{
	// Keep the manager lock held for the whole teardown.
	lock_.lock();

	// default_pool_ is already one of the entries in pools_, so deleting
	// every entry of pools_ releases it as well.
	std::vector<connect_pool*>::iterator cur = pools_.begin();
	while (cur != pools_.end())
	{
		delete *cur;
		++cur;
	}

	lock_.unlock();
}
// 分析一个服务器地址格式IP:PORT[:MAX_CONN]
@ -146,8 +150,7 @@ connect_pool& connect_manager::set(const char* addr, int count)
}
connect_pool* pool = create_pool(key, count, pools_.size() - 1);
if (retry_inter_ > 0)
pool->set_retry_inter(retry_inter_);
pool->set_retry_inter(retry_inter_);
pools_.push_back(pool);
lock_.unlock();
@ -181,7 +184,6 @@ void connect_manager::remove(const char* addr)
lock_.unlock();
}
connect_pool* connect_manager::get(const char* addr,
bool exclusive /* = true */)
{

View File

@ -61,10 +61,13 @@ bool connect_pool::aliving()
time_t now = time(NULL);
lock_.lock();
if (now - last_dead_ >= retry_inter_)
if (retry_inter_ > 0 && now - last_dead_ >= retry_inter_)
{
alive_ = true;
lock_.unlock();
// 重置服务端连接状态,以便重试
logger("reset server: %s", get_addr());
return true;
}
@ -78,12 +81,15 @@ connect_client* connect_pool::peek()
if (alive_ == false)
{
time_t now = time(NULL);
if (now - last_dead_ < retry_inter_)
if (retry_inter_ <= 0 || now - last_dead_ < retry_inter_)
{
lock_.unlock();
return NULL;
}
alive_ = true;
// 重置服务端连接状态,以便重试
logger("reset server: %s", get_addr());
}
connect_client* conn;
@ -140,13 +146,15 @@ void connect_pool::put(connect_client* conn, bool keep /* = true */)
{
delete conn;
count_--;
acl_assert(count_ >= 0);
if (count_ == 0)
if (count_ <= 0)
{
// 如果引用计数为 0 则自销毁
lock_.unlock();
delete this;
}
else
lock_.unlock();
return;
}

View File

@ -10,6 +10,8 @@ redis_cluster::redis_cluster(int conn_timeout, int rw_timeout,
: conn_timeout_(conn_timeout)
, rw_timeout_(rw_timeout)
, max_slot_(max_slot)
, redirect_max_(15)
, redirect_sleep_(1)
{
slot_addrs_ = (const char**) acl_mycalloc(max_slot_, sizeof(char*));
}
@ -22,6 +24,17 @@ redis_cluster::~redis_cluster()
acl_myfree(*it);
}
// Update the redirect limit; a non-positive value leaves the current
// limit untouched.
void redis_cluster::set_redirect_max(int max)
{
	if (max <= 0)
		return;

	redirect_max_ = max;
}
// Store the pause, in seconds, used between repeated redirects.
// The value is stored as given, with no range check.
void redis_cluster::set_redirect_sleep(int n)
{
redirect_sleep_ = n;
}
connect_pool* redis_cluster::create_pool(const char* addr,
int count, size_t idx)
{
@ -46,11 +59,21 @@ redis_pool* redis_cluster::peek_slot(int slot)
}
// 因为已经进行了加锁保护,所以在调用 get 方法时的第二个锁保护参数设为 false
redis_pool* conn = (redis_pool*) get(slot_addrs_[slot], false);
redis_pool* conns = (redis_pool*) get(slot_addrs_[slot], false);
unlock();
return conn;
return conns;
}
// Forget the server address cached for a hash slot.
// A slot index outside [0, max_slot_) is ignored.
void redis_cluster::clear_slot(int slot)
{
	if (slot < 0 || slot >= max_slot_)
		return;

	// The slot table is modified under the object's lock.
	lock();
	slot_addrs_[slot] = NULL;
	unlock();
}
void redis_cluster::set_slot(int slot, const char* addr)

View File

@ -21,6 +21,8 @@ redis_command::redis_command()
, max_conns_(0)
, used_(0)
, slot_(-1)
, redirect_max_(15)
, redirect_sleep_(1)
, slice_req_(false)
, request_buf_(NULL)
, request_obj_(NULL)
@ -40,6 +42,8 @@ redis_command::redis_command(redis_client* conn)
, max_conns_(0)
, used_(0)
, slot_(-1)
, redirect_max_(15)
, redirect_sleep_(1)
, slice_req_(false)
, request_buf_(NULL)
, request_obj_(NULL)
@ -68,6 +72,18 @@ redis_command::redis_command(redis_cluster* cluster, size_t max_conns)
, result_(NULL)
{
pool_ = NEW dbuf_pool(128000);
if (cluster != NULL)
{
redirect_max_ = cluster->get_redirect_max();
if (redirect_max_ <= 0)
redirect_max_ = 15;
redirect_sleep_ = cluster->get_redirect_sleep();
}
else
{
redirect_max_ = 15;
redirect_sleep_ = 1;
}
}
redis_command::~redis_command()
@ -114,6 +130,10 @@ void redis_command::set_cluster(redis_cluster* cluster, size_t max_conns)
max_conns_ = max_conns;
if (max_conns_ == 0)
max_conns_ = 100;
redirect_max_ = cluster->get_redirect_max();
if (redirect_max_ <= 0)
redirect_max_ = 15;
redirect_sleep_ = cluster->get_redirect_sleep();
}
bool redis_command::eof() const
@ -232,7 +252,8 @@ const redis_result* redis_command::get_result() const
return result_;
}
redis_pool* redis_command::get_conns(redis_cluster* cluster, const char* info)
// 分析重定向信息,获得重定向的服务器地址
const char* redis_command::get_addr(const char* info)
{
char* cmd = pool_->dbuf_strdup(info);
char* slot = strchr(cmd, ' ');
@ -246,130 +267,261 @@ redis_pool* redis_command::get_conns(redis_cluster* cluster, const char* info)
if (*addr == 0)
return NULL;
// printf(">>>addr: %s, slot: %s\r\n", addr, slot);
redis_pool* conns = (redis_pool*) cluster->get(addr);
// 如果服务器地址不存在,则根据服务器地址动态创建连接池对象
if (conns == NULL)
conns = (redis_pool*) &cluster->set(addr, max_conns_);
return conns;
return addr;
}
const redis_result* redis_command::run(redis_cluster* cluster, size_t nchildren)
// 根据输入的目标地址进行重定向:打开与该地址的连接,如果连接失败,则随机
// 选取一个服务器地址进行连接
redis_client* redis_command::redirect(redis_cluster* cluster, const char* addr)
{
redis_pool* conns;
// 如果已经计算了哈希槽值,则优先从本地缓存中查找对应的连接池
// 如果未找到,则从所有集群结点中随便找一个可用的连接池对象
// 如果服务器地址不存在,则根据服务器地址动态创建连接池对象
if ((conns = (redis_pool*) cluster->get(addr)) == NULL)
conns = (redis_pool*) &cluster->set(addr, max_conns_);
if (slot_ >= 0)
{
conns = cluster->peek_slot(slot_);
if (conns == NULL)
conns = (redis_pool*) cluster->peek();
}
else
conns = (redis_pool*) cluster->peek();
// 如果没有找到可用的连接池对象,则直接返回 NULL 表示出错
if (conns == NULL)
return NULL;
// 从连接池对象中获取一个连接对象
redis_client* conn = (redis_client*) conns->peek();
redis_client* conn;
int i = 0;
while (i++ < 5)
{
conn = (redis_client*) conns->peek();
if (conn != NULL)
return conn;
conns = (redis_pool*) cluster->peek();
}
logger_warn("too many retry: %d, addr: %s", i, addr);
return NULL;
}
// Obtain a connection for the given hash slot, trying at most five times.
// When a slot value is known (>= 0), the pool cached for that slot is
// preferred; otherwise, or when no pool is cached, a pool is taken from
// the cluster via peek(). Returns NULL when every attempt fails.
redis_client* redis_command::peek_conn(redis_cluster* cluster, int slot)
{
	// Attempts are counted from 1, so the counter reads 6 once all five
	// attempts have failed; that is the value reported in the warning.
	int tries = 1;

	for (; tries <= 5; ++tries)
	{
		redis_pool* pool = NULL;

		if (slot >= 0)
			pool = cluster->peek_slot(slot);
		if (pool == NULL)
			pool = (redis_pool*) cluster->peek();

		if (pool == NULL)
		{
			// No pool found: stop using the slot for later attempts.
			slot = -1;
			continue;
		}

		redis_client* client = (redis_client*) pool->peek();
		if (client != NULL)
			return client;

		// Remove the slot -> address mapping
		cluster->clear_slot(slot);
		// Mark this connection pool as unavailable
		pool->set_alive(false);
	}

	logger_warn("too many retry: %d, slot: %d", tries, slot);
	return NULL;
}
const redis_result* redis_command::run(redis_cluster* cluster, size_t nchild)
{
redis_client* conn = peek_conn(cluster, slot_);
// 如果没有找到可用的连接对象,则直接返回 NULL 表示出错
if (conn == NULL)
{
logger_error("peek_conn NULL, slot_: %d", slot_);
return NULL;
}
redis_result_t type;
bool last_moved;
int n = 0;
bool last_moved = false;
while (n++ <= 10)
while (n++ < redirect_max_)
{
// 根据请求过程是否采用内存分片方式调用不同的请求过程
if (slice_req_)
result_ = conn->run(pool_, *request_obj_, nchildren);
result_ = conn->run(pool_, *request_obj_, nchild);
else
result_ = conn->run(pool_, *request_buf_, nchildren);
result_ = conn->run(pool_, *request_buf_, nchild);
// 将连接对象归还给连接池对象
conns->put(conn, !conn->eof());
// 如果连接异常断开,则需要进行重试
if (conn->eof())
{
// 删除哈希槽中的地址映射关系以便下次操作时重新获取
cluster->clear_slot(slot_);
// 将连接池对象置为不可用状态
conn->get_pool()->set_alive(false);
// 将连接对象归还给连接池对象
conn->get_pool()->put(conn, false);
conn = peek_conn(cluster, slot_);
if (conn != NULL)
{
last_moved = true;
reset(true);
continue;
}
last_moved = false;
}
else
{
last_moved = false;
// 将连接对象归还给连接池对象
conn->get_pool()->put(conn, true);
}
if (result_ == NULL)
{
logger_error("result NULL");
return NULL;
}
// 取得服务器的响应结果的类型,并进行分别处理
type = result_->get_type();
if (type == REDIS_RESULT_UNKOWN)
{
logger_error("unknown result type: %d", type);
return NULL;
}
if (type != REDIS_RESULT_ERROR)
{
// 如果发生重定向过程,则设置哈希槽的对应 redis 服务地址
// 如果发生重定向过程,则设置哈希槽对应 redis 服务地址
if (slot_ < 0 || !last_moved)
return result_;
const char* addr = conns->get_addr();
const char* addr = conn->get_pool()->get_addr();
cluster->set_slot(slot_, addr);
return result_;
}
#define EQ(x, y) !strncasecmp((x), (y), sizeof(y) -1)
// 对于结果类型为错误类型,则需要进一步判断是否是重定向指令
const char* ptr = result_->get_error();
if (ptr == NULL || *ptr == 0)
{
logger_error("result error: null");
return result_;
}
// 如果出错信息为重定向指令,则执行重定向过程
if (strncasecmp(ptr, "MOVED", 5) == 0)
if (EQ(ptr, "MOVED"))
{
conns = get_conns(cluster_, ptr);
if (conns == NULL)
const char* addr = get_addr(ptr);
if (addr == NULL)
{
logger_warn("MOVED invalid, ptr: %s", ptr);
return result_;
conn = (redis_client*) conns->peek();
}
conn = redirect(cluster, addr);
if (conn == NULL)
{
logger_error("redirect NULL, addr: %s", addr);
return result_;
}
ptr = conn->get_pool()->get_addr();
if (n >= 2 && redirect_sleep_ > 0
&& strcmp(ptr, addr) != 0)
{
logger("redirect %d, curr %s, waiting %s ...",
n, ptr, addr);
sleep(redirect_sleep_);
}
last_moved = true;
// 需要保存哈希槽值
reset(true);
}
else if (strncasecmp(ptr, "ASK", 3) == 0)
else if (EQ(ptr, "ASK"))
{
conns = get_conns(cluster_, ptr);
if (conns == NULL)
const char* addr = get_addr(ptr);
if (addr == NULL)
{
logger_warn("ASK invalid, ptr: %s", ptr);
return result_;
conn = (redis_client*) conns->peek();
}
conn = redirect(cluster, addr);
if (conn == NULL)
{
logger_error("redirect NULL, addr: %s", addr);
return result_;
}
last_moved = false;
reset(true);
}
// 处理一个主结点失效的情形
else if (EQ(ptr, "CLUSTERDOWN"))
{
cluster->clear_slot(slot_);
if (redirect_sleep_ > 0)
{
logger("redirect %d, slot %d, waiting %s ...",
n, slot_, ptr);
sleep(redirect_sleep_);
}
conn = peek_conn(cluster, -1);
if (conn == NULL)
{
logger_error("peek_conn NULL");
return result_;
}
reset(true);
}
// 对于其它错误类型,则直接返回本次得到的响应结果对象
else
{
logger_error("server error: %s", ptr);
return result_;
}
}
logger_warn("too many cluster redirect: %d", n);
logger_warn("too many redirect: %d, max: %d", n, redirect_max_);
return NULL;
}
const redis_result* redis_command::run(size_t nchildren /* = 0 */)
const redis_result* redis_command::run(size_t nchild /* = 0 */)
{
used_++;
if (cluster_ != NULL)
return run(cluster_, nchildren);
return run(cluster_, nchild);
else if (conn_ != NULL)
{
if (slice_req_)
result_ = conn_->run(pool_, *request_obj_, nchildren);
result_ = conn_->run(pool_, *request_obj_, nchild);
else
result_ = conn_->run(pool_, *request_buf_, nchildren);
result_ = conn_->run(pool_, *request_buf_, nchild);
return result_;
}
else