complete pubsub command for redis

This commit is contained in:
zsx 2015-01-23 17:23:04 +08:00
parent 78a639a4c2
commit 96abd859ac
18 changed files with 556 additions and 33 deletions

View File

@ -4,6 +4,9 @@
277) 2015.1.23
277.1) bugfix: master_threads2.cpp 中 run_once() 函数在 service_on_accept 调用
直接返回时没有释放流对象,导致描述字和内存泄漏
277.2) feature: redis 客户端库的 redis_key, redis_hash, redis_set, redis_zset
类方法增加了 scan 遍历方法
277.3) feature: 完成 了 redis 客户端命令中 pubsub 的所有功能
276) 2015.1.21
276.1) feature: 实现 了redis_zset 全部接口

View File

@ -140,7 +140,6 @@ private:
unsigned long long used_;
dbuf_pool* pool_;
socket_stream conn_;
socket_stream* p;
char* addr_;
int conn_timeout_;
int rw_timeout_;

View File

@ -24,6 +24,10 @@ public:
protected:
redis_client* conn_;
const redis_result** scan_keys(const char* cmd, const char* key,
int& cursor, size_t& size, const char* pattern,
const size_t* count);
};
} // namespace acl

View File

@ -114,6 +114,21 @@ public:
bool hexists(const char* key, const char* name, size_t name_len);
int hlen(const char* key);
/**
*
* @param key {const char*}
* @param cursor {int} 0
* @param out {std::map<string>&}
* @param pattern {const char*} glob
* @param count {const size_t*}
* @return {int}
* 0
* -1:
* >0:
*/
int hscan(const char* key, int cursor, std::map<string, string>& out,
const char* pattern = NULL, const size_t* count = NULL);
};
} // namespace acl

View File

@ -90,6 +90,27 @@ public:
* KEYS h[ae]llo hello hallo hillo
*/
int keys_pattern(const char* pattern, std::vector<string>& out);
/**
* redis-server redis-server
* @param key {const char*}
* @param addr {const char*} redis-server ip:port
* @param dest_db {unsigned} redis-server ID
* @param timeout {unsigned} ()
* @param option {const char*} COPY REPLACE
* @return {bool}
*/
bool migrate(const char* key, const char* addr, unsigned dest_db,
unsigned timeout, const char* option = NULL);
/**
* redis-server
* @param key {const char*}
* @param dest_db {unsigned} ID
* @return {int} -1: 0
* 1
*/
int move(const char* key, unsigned dest_db);
/**
* key key "易失的"( key )
@ -174,7 +195,7 @@ public:
* -1 key
* redis-server 2.8 key -1
*/
int get_ttl(const char* key);
int ttl(const char* key);
/**
* KEY
@ -184,25 +205,18 @@ public:
redis_key_t type(const char* key);
/**
* redis-server redis-server
* @param key {const char*}
* @param addr {const char*} redis-server ip:port
* @param dest_db {unsigned} redis-server ID
* @param timeout {unsigned} ()
* @param option {const char*} COPY REPLACE
* @return {bool}
*
* @param cursor {int} 0
* @param out {std::vector<string>&}
* @param pattern {const char*} glob
* @param count {const size_t*}
* @return {int}
* 0
* -1:
* >0:
*/
bool migrate(const char* key, const char* addr, unsigned dest_db,
unsigned timeout, const char* option = NULL);
/**
* redis-server
* @param key {const char*}
* @param dest_db {unsigned} ID
* @return {int} -1: 0
* 1
*/
int move(const char* key, unsigned dest_db);
int scan(int cursor, std::vector<string>& out,
const char* pattern = NULL, const size_t* count = NULL);
};
} // namespace acl

View File

@ -1,5 +1,7 @@
#pragma once
#include "acl_cpp/acl_cpp_define.hpp"
#include <vector>
#include <map>
#include "acl_cpp/redis/redis_command.hpp"
namespace acl
@ -25,16 +27,47 @@ public:
* > 0
*/
int publish(const char* channel, const char* msg, size_t len);
int subscribe(const char* first_channel, ...);
int subscribe(const std::vector<const char*>& channels);
int subscribe(const std::vector<string>& channels);
int unsubscribe(const char* first_channel, ...);
int unsubscribe(const std::vector<const char*>& channels);
int unsubscribe(const std::vector<string>& channels);
int psubscribe(const char* first_pattern, ...);
int psubscribe(const std::vector<const char*>& patterns);
int psubscribe(const std::vector<string>& patterns);
int punsubscribe(const char* first_pattern, ...);
int punsubscribe(const std::vector<const char*>& patterns);
int punsubscribe(const std::vector<string>& patterns);
bool get_message(string& channel, string& msg);
int pubsub_channels(std::vector<string>& channels,
const char* first_pattern, ...);
int pubsub_channels(const std::vector<const char*>& patterns,
std::vector<string>& channels);
int pubsub_channels(const std::vector<string>& patterns,
std::vector<string>& channels);
int pubsub_numsub(std::map<string, int>& out,
const char* first_channel, ...);
int pubsub_numsub(const std::vector<const char*>& channels,
std::map<string, int>& out);
int pubsub_numsub(const std::vector<string>& channels,
std::map<string, int>& out);
int pubsub_numpat();
private:
int subop(const char* cmd, const std::vector<const char*>& channels);
int subop(const char* cmd, const std::vector<string>& channels);
int check_channel(const redis_result* obj, const char* cmd,
const string& channel);
int pubsub_numsub(const string& req, std::map<string, int>& out);
};
} // namespace acl

View File

@ -76,6 +76,9 @@ public:
int srem(const char* key, const std::vector<const char*>& members);
int srem(const char* key, const char* members[],
size_t lens[], size_t argc);
int sscan(const char* key, int cursor, std::vector<string>& out,
const char* pattern = NULL, const size_t* count = NULL);
};
} // namespace acl

View File

@ -382,6 +382,20 @@ public:
*/
int zremrangebylex(const char* key, const char* min, const char* max);
/**
*
* @param cursor {int} 0
* @param out {std::map<string, double>&}
* @param pattern {const char*} glob
* @param count {const size_t*}
* @return {int}
* 0
* -1:
* >0:
*/
int zscan(const char* key, int cursor, std::map<string, double>& out,
const char* pattern = NULL, const size_t* count = NULL);
private:
int zrange_get(const char* cmd, const char* key, int start,
int stop, std::vector<string>& result);

View File

@ -48,7 +48,7 @@ static void test_ttl(acl::redis_key& option, int n)
{
key.format("%s_%d", __keypre.c_str(), i);
option.reset();
if ((ttl = option.get_ttl(key.c_str())) < 0)
if ((ttl = option.ttl(key.c_str())) < 0)
{
printf("get ttl key: %s error\r\n", key.c_str());
break;

View File

@ -42,7 +42,7 @@ static bool test_ttl(acl::redis_key& option, int i)
key.format("%s_%d", __keypre.c_str(), i);
option.reset();
if ((ttl = option.get_ttl(key.c_str())) < 0)
if ((ttl = option.ttl(key.c_str())) < 0)
{
printf("get ttl key: %s error\r\n", key.c_str());
return false;

View File

@ -42,7 +42,7 @@ static bool test_ttl(acl::redis_key& option, int i)
key.format("%s_%d", __keypre.c_str(), i);
option.reset();
if ((ttl = option.get_ttl(key.c_str())) < 0)
if ((ttl = option.ttl(key.c_str())) < 0)
{
printf("get ttl key: %s error\r\n", key.c_str());
return false;

View File

@ -19,7 +19,7 @@
</ProjectConfiguration>
</ItemGroup>
<PropertyGroup Label="Globals">
<ProjectGuid>{81D44C60-6BC1-47AA-92DA-52FE7F84C822}</ProjectGuid>
<ProjectGuid>{9942942B-9DBA-4E56-897A-2746DB8C9E78}</ProjectGuid>
<Keyword>Win32Proj</Keyword>
<ProjectName>redis_pubsub</ProjectName>
</PropertyGroup>
@ -199,4 +199,4 @@
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>
</Project>

View File

@ -1,10 +1,15 @@
#include "acl_stdafx.hpp"
#include "acl_cpp/redis/redis_client.hpp"
#include "acl_cpp/stdlib/snprintf.hpp"
#include "acl_cpp/redis/redis_result.hpp"
#include "acl_cpp/redis/redis_command.hpp"
namespace acl
{
#define INT_LEN 11
#define LONG_LEN 21
redis_command::redis_command(redis_client* conn /* = NULL */)
: conn_(conn)
{
@ -32,4 +37,101 @@ const redis_result* redis_command::get_result() const
return conn_ ? conn_->get_result() : NULL;
}
const redis_result** redis_command::scan_keys(const char* cmd, const char* key,
int& cursor, size_t& size, const char* pattern, const size_t* count)
{
size = 0;
if (cursor < 0)
return NULL;
const char* argv[7];
size_t lens[7];
size_t argc = 0;
argv[argc] = cmd;
lens[argc] = strlen(cmd);
argc++;
if (key && *key)
{
argv[argc] = key;
lens[argc] = strlen(key);
argc++;
}
char cursor_s[INT_LEN];
safe_snprintf(cursor_s, sizeof(cursor_s), "%d", cursor);
argv[argc] = cursor_s;
lens[argc] = strlen(cursor_s);
argc++;
if (pattern && *pattern)
{
argv[argc] = "MATCH";
lens[argc] = sizeof("MATCH") - 1;
argc++;
argv[argc] = pattern;
lens[argc] = strlen(pattern);
argc++;
}
if (count && *count > 0)
{
argv[argc] = "COUNT";
lens[argc] = sizeof("COUNT") - 1;
argc++;
char count_s[LONG_LEN];
safe_snprintf(count_s, sizeof(count_s), "%lu",
(unsigned long) count);
argv[argc] = count_s;
lens[argc] = strlen(count_s);
argc++;
}
const string& req = conn_->build_request(argc, argv, lens);
const redis_result* result = conn_->run(req, 2);
if (result == NULL)
{
cursor = -1;
return NULL;
}
if (result->get_size() != 2)
{
cursor = -1;
return NULL;
}
const redis_result* rr = result->get_child(0);
if (rr == NULL)
{
cursor = -1;
return NULL;
}
cursor = rr->get_integer();
if (cursor < 0)
{
cursor = -1;
return NULL;
}
rr = result->get_child(1);
if (rr == NULL)
{
cursor = -1;
return NULL;
}
const redis_result** children = rr->get_children(&size);
if (children == NULL)
{
cursor = 0;
size = 0;
}
return children;
}
} // namespace acl

View File

@ -355,6 +355,41 @@ int redis_hash::hlen(const char* key)
return conn_->get_number(req);
}
int redis_hash::hscan(const char* key, int cursor,
std::map<string, string>& out,
const char* pattern /* = NULL */, const size_t* count /* = NULL */)
{
if (key == NULL || *key == 0 || cursor < 0)
return -1;
size_t size;
const redis_result** children = scan_keys("HSCAN", key, cursor,
size, pattern, count);
if (children == NULL)
return cursor;
if (size % 2 != 0)
return -1;
const redis_result* rr;
string name(128), value(128);
for (size_t i = 0; i < size;)
{
rr = children[i];
rr->argv_to_string(name);
i++;
rr = children[i];
rr->argv_to_string(value);
i++;
out[name] = value;
}
return cursor;
}
/////////////////////////////////////////////////////////////////////////////
} // namespace acl

View File

@ -1,6 +1,7 @@
#include "acl_stdafx.hpp"
#include "acl_cpp/stdlib/snprintf.hpp"
#include "acl_cpp/stdlib/log.hpp"
#include "acl_cpp/redis/redis_result.hpp"
#include "acl_cpp/redis/redis_client.hpp"
#include "acl_cpp/redis/redis_key.hpp"
@ -291,7 +292,7 @@ bool redis_key::renamenx(const char* key, const char* newkey)
}
bool redis_key::restore(const char* key, const char* value, size_t len,
int ttl, bool replace /* = false */)
int nttl, bool replace /* = false */)
{
const char* argv[5];
size_t lens[5];
@ -303,7 +304,7 @@ bool redis_key::restore(const char* key, const char* value, size_t len,
lens[1] = strlen(key);
char ttl_s[INT_LEN];
safe_snprintf(ttl_s, sizeof(ttl_s), "%d", ttl);
safe_snprintf(ttl_s, sizeof(ttl_s), "%d", nttl);
argv[2] = ttl_s;
lens[2] = strlen(ttl_s);
@ -322,7 +323,7 @@ bool redis_key::restore(const char* key, const char* value, size_t len,
return conn_->get_status(req);
}
int redis_key::get_ttl(const char* key)
int redis_key::ttl(const char* key)
{
const char* argv[2];
size_t lens[2];
@ -443,6 +444,33 @@ int redis_key::move(const char* key, unsigned dest_db)
return conn_->get_number(req);
}
int redis_key::scan(int cursor, std::vector<string>& out,
const char* pattern /* = NULL */, const size_t* count /* = NULL */)
{
if (cursor < 0)
return -1;
size_t size;
const redis_result** children = scan_keys("SCAN", NULL, cursor,
size, pattern, count);
if (children == NULL)
return cursor;
const redis_result* rr;
string key_buf(128);
out.reserve(size);
for (size_t i = 0; i < size; i++)
{
rr = children[i];
rr->argv_to_string(key_buf);
out.push_back(key_buf);
key_buf.clear();
}
return cursor;
}
/////////////////////////////////////////////////////////////////////////////
} // namespace acl

View File

@ -36,7 +36,7 @@ int redis_pubsub::publish(const char* channel, const char* msg, size_t len)
int redis_pubsub::subscribe(const char* first_channel, ...)
{
std::vector<string> channels;
std::vector<const char*> channels;
channels.push_back(first_channel);
va_list ap;
va_start(ap, first_channel);
@ -48,6 +48,11 @@ int redis_pubsub::subscribe(const char* first_channel, ...)
return subscribe(channels);
}
int redis_pubsub::subscribe(const std::vector<const char*>& channels)
{
return subop("SUBSCRIBE", channels);
}
int redis_pubsub::subscribe(const std::vector<string>& channels)
{
return subop("SUBSCRIBE", channels);
@ -55,7 +60,7 @@ int redis_pubsub::subscribe(const std::vector<string>& channels)
int redis_pubsub::unsubscribe(const char* first_channel, ...)
{
std::vector<string> channels;
std::vector<const char*> channels;
channels.push_back(first_channel);
va_list ap;
va_start(ap, first_channel);
@ -67,11 +72,103 @@ int redis_pubsub::unsubscribe(const char* first_channel, ...)
return unsubscribe(channels);
}
int redis_pubsub::unsubscribe(const std::vector<const char*>& channels)
{
return subop("UNSUBSCRIBE", channels);
}
int redis_pubsub::unsubscribe(const std::vector<string>& channels)
{
return subop("UNSUBSCRIBE", channels);
}
int redis_pubsub::psubscribe(const char* first_pattern, ...)
{
std::vector<const char*> patterns;
patterns.push_back(first_pattern);
va_list ap;
va_start(ap, first_pattern);
const char* pattern;
while ((pattern = va_arg(ap, const char*)) != NULL)
patterns.push_back(pattern);
va_end(ap);
return psubscribe(patterns);
}
int redis_pubsub::psubscribe(const std::vector<const char*>& patterns)
{
return subop("PSUBSCRIBE", patterns);
}
int redis_pubsub::psubscribe(const std::vector<string>& patterns)
{
return subop("PSUBSCRIBE", patterns);
}
int redis_pubsub::punsubscribe(const char* first_pattern, ...)
{
std::vector<const char*> patterns;
patterns.push_back(first_pattern);
va_list ap;
va_start(ap, first_pattern);
const char* pattern;
while ((pattern = va_arg(ap, const char*)) != NULL)
patterns.push_back(pattern);
va_end(ap);
return punsubscribe(patterns);
}
int redis_pubsub::punsubscribe(const std::vector<const char*>& patterns)
{
return subop("PUNSUBSCRIBE", patterns);
}
int redis_pubsub::punsubscribe(const std::vector<string>& patterns)
{
return subop("PUNSUBSCRIBE", patterns);
}
int redis_pubsub::subop(const char* cmd, const std::vector<const char*>& channels)
{
size_t argc = 1 + channels.size();
dbuf_pool* pool = conn_->get_pool();
const char** argv = (const char**)
pool->dbuf_alloc(argc * sizeof(char*));
size_t* lens = (size_t *) pool->dbuf_alloc(argc * sizeof(size_t));
argv[0] = cmd;
lens[0] = strlen(cmd);
std::vector<const char*>::const_iterator cit = channels.begin();
for (size_t i = 1; cit != channels.end(); ++cit, ++i)
{
argv[i] = *cit;
lens[i] = strlen(argv[i]);
}
const string& req = conn_->build_request(argc, argv, lens);
const redis_result* result = conn_->run(req, channels.size());
if (result == NULL || result->get_type() != REDIS_RESULT_ARRAY)
return -1;
size_t size = channels.size();
int nchannels = 0, ret;
for (size_t i = 0; i < size; i++)
{
const redis_result* obj = result->get_child(i);
if (obj == NULL)
return -1;
if (( ret = check_channel(obj, argv[0], channels[i])) < 0)
return -1;
if (ret > nchannels)
nchannels = ret;
}
return nchannels;
}
int redis_pubsub::subop(const char* cmd, const std::vector<string>& channels)
{
size_t argc = 1 + channels.size();
@ -175,4 +272,113 @@ bool redis_pubsub::get_message(string& channel, string& msg)
return true;
}
int redis_pubsub::pubsub_channels(std::vector<string>& channels,
const char* first_pattern, ...)
{
std::vector<const char*> patterns;
if (first_pattern)
{
patterns.push_back(NULL);
va_list ap;
va_start(ap, first_pattern);
const char* pattern;
while ((pattern = va_arg(ap, const char*)) != NULL)
patterns.push_back(pattern);
va_end(ap);
}
return pubsub_channels(patterns, channels);
}
int redis_pubsub::pubsub_channels(const std::vector<const char*>& patterns,
std::vector<string>& channels)
{
const string& req = conn_->build("PUBSUB", "CHANNELS", patterns);
return conn_->get_strings(req, channels);
}
int redis_pubsub::pubsub_channels(const std::vector<string>& patterns,
std::vector<string>& channels)
{
const string& req = conn_->build("PUBSUB", "CHANNELS", patterns);
return conn_->get_strings(req, channels);
}
int redis_pubsub::pubsub_numsub(std::map<string, int>& out,
const char* first_channel, ...)
{
std::vector<const char*> channels;
if (first_channel != NULL)
{
channels.push_back(first_channel);
const char* channel;
va_list ap;
va_start(ap, first_channel);
while ((channel = va_arg(ap, const char*)) != NULL)
channels.push_back(channel);
}
return pubsub_numsub(channels, out);
}
int redis_pubsub::pubsub_numsub(const std::vector<const char*>& channels,
std::map<string, int>& out)
{
const string& req = conn_->build("PUBSUB", "NUMSUB", channels);
return pubsub_numsub(req, out);
}
int redis_pubsub::pubsub_numsub(const std::vector<string>& channels,
std::map<string, int>& out)
{
const string& req = conn_->build("PUBSUB", "NUMSUB", channels);
return pubsub_numsub(req, out);
}
int redis_pubsub::pubsub_numsub(const string& req, std::map<string, int>& out)
{
const redis_result* result = conn_->run(req);
if (result == NULL)
return -1;
size_t size;
const redis_result** children = result->get_children(&size);
if (children == NULL || size == 0)
return 0;
if (size % 2 != 0)
return -1;
string buf(128);
const redis_result* rr;
for (size_t i = 0; i < size;)
{
rr = children[i];
rr->argv_to_string(buf);
i++;
rr = children[i];
out[buf] = rr->get_integer();
buf.clear();
}
return size / 2;
}
int redis_pubsub::pubsub_numpat()
{
const char* argv[2];
size_t lens[2];
argv[0] = "PUBSUB";
lens[0] = sizeof("PUBSUB") - 1;
argv[1] = "NUMPAT";
lens[1] = sizeof("NUMPAT") - 1;
const string& req = conn_->build_request(2, argv, lens);
return conn_->get_number(req);
}
} // namespace acl

View File

@ -1,5 +1,7 @@
#include "acl_stdafx.hpp"
#include "acl_cpp/stdlib/snprintf.hpp"
#include "acl_cpp/stdlib/string.hpp"
#include "acl_cpp/redis/redis_result.hpp"
#include "acl_cpp/redis/redis_client.hpp"
#include "acl_cpp/redis/redis_set.hpp"
@ -405,4 +407,31 @@ int redis_set::srem(const char* key, const char* members[],
return conn_->get_number(req);
}
int redis_set::sscan(const char* key, int cursor, std::vector<string>& out,
const char* pattern /* = NULL */, const size_t* count /* = NULL */)
{
if (key == NULL || *key == 0 || cursor < 0)
return -1;
size_t size;
const redis_result** children = scan_keys("SSCAN", key, cursor,
size, pattern, count);
if (children == NULL)
return cursor;
const redis_result* rr;
string key_buf(128);
out.reserve(size);
for (size_t i = 0; i < size; i++)
{
rr = children[i];
rr->argv_to_string(key_buf);
out.push_back(key_buf);
key_buf.clear();
}
return cursor;
}
} // namespace acl

View File

@ -431,16 +431,18 @@ int redis_zset::zrange_get_with_scores(const char* cmd, const char* key,
child = children[i * 2];
if (child == NULL)
continue;
child->argv_to_string(buf);
score = atof(buf.c_str());
buf.clear();
child = children[(i + 1) * 2];
if (child == NULL)
continue;
buf.clear();
child->argv_to_string(buf);
child->argv_to_string(buf);
out.push_back(std::make_pair(buf, score));
buf.clear();
}
return size;
@ -578,16 +580,18 @@ int redis_zset::zrangebyscore_get_with_scores(const char* cmd,
child = children[i * 2];
if (child == NULL)
continue;
child->argv_to_string(buf);
score = atof(buf.c_str());
buf.clear();
child = children[(i + 1) * 2];
if (child == NULL)
continue;
buf.clear();
child->argv_to_string(buf);
out.push_back(std::make_pair(buf, score));
buf.clear();
}
return size;
@ -1050,4 +1054,38 @@ int redis_zset::zremrangebylex(const char* key, const char* min, const char* max
return conn_->get_number(req);
}
int redis_zset::zscan(const char* key, int cursor, std::map<string, double>& out,
const char* pattern /* = NULL */, const size_t* count /* = NULL */)
{
if (key == NULL || *key == 0 || cursor < 0)
return -1;
size_t size;
const redis_result** children = scan_keys("ZSCAN", key, cursor,
size, pattern, count);
if (children == NULL)
return cursor;
if (size % 2 != 0)
return -1;
const redis_result* rr;
string name(128), value(128);
for (size_t i = 0; i < size;)
{
rr = children[i];
rr->argv_to_string(name);
i++;
rr = children[i];
rr->argv_to_string(value);
i++;
out[name] = atof(value.c_str());
}
return cursor;
}
} // namespace acl