Merge branch 'gitee-master' into gitlab-upstream

This commit is contained in:
zhengshuxin 2023-07-26 15:51:43 +08:00
commit cefa09addb
12 changed files with 405 additions and 294 deletions

View File

@ -88,11 +88,19 @@ add_definitions(
#find_library(fiber_cpp_lib fiber_cpp PATHS /usr/lib /usr/local/lib)
#find_library(fiber_lib fiber PATHS /usr/lib /usr/local/lib)
#find_library(rocksdb_lib rocksdb PATHS /usr/lib /usr/local/lib)
set(acl_all ${home_path}/lib_fiber/lib/libfiber_cpp.a ${home_path}/libacl_all.a)
set(fiber ${home_path}/lib_fiber/lib/libfiber.a)
set(rocksdb /usr/local/lib/librocksdb.a)
find_library(rocksdb_lib rocksdb PATHS /usr/lib /usr/local/lib)
#set(rocksdb /usr/local/lib/librocksdb.a)
set(rocksdb -lrocksdb)
find_library(jemalloc jemalloc PATHS /usr/lib /usr/local/lib)
set(jemalloc -ljemalloc)
#set(wt -lwiredtiger)
#set(uring -luring)
#set(iconv -liconv)
if(CMAKE_SYSTEM_NAME MATCHES "Darwin")
# set(lib_all ${lib_global} ${lib_rpc} ${fiber_cpp_lib} ${acl_lib} ${fiber_lib}
@ -101,7 +109,7 @@ if(CMAKE_SYSTEM_NAME MATCHES "Darwin")
elseif(CMAKE_SYSTEM_NAME MATCHES "Linux")
# set(lib_all ${lib_global} ${lib_rpc} ${fiber_cpp_lib} ${acl_lib} ${fiber_lib}
set(lib_all ${acl_all}
${rocksdb} -lwiredtiger ${fiber} -luring -liconv -lz -lpthread -ldl)
${rocksdb} ${wt} ${fiber} ${uring} ${iconv} ${jemalloc} -lz -lpthread -ldl)
endif()
set(output_path ${CMAKE_CURRENT_SOURCE_DIR})

View File

@ -24,6 +24,19 @@ bool redis_handler::handle() {
if (objs.empty()) {
return true;
}
#if 0
{
acl::string tmp;
for (size_t i = 0; i < objs.size(); i++) {
tmp += "+OK\r\n";
}
return conn_.write(tmp.c_str(), tmp.size()) == (int) tmp.size();
}
#endif
//if (objs.size() >= 20) { printf(">>>objs=%zd\r\n", objs.size()); }
for (const auto& obj : objs) {
if (!handle_one(*obj)) {
return false;
@ -32,8 +45,15 @@ bool redis_handler::handle() {
acl::string buf;
if (!builder_.to_string(buf)) {
builder_.clear();
return false;
}
builder_.clear();
//if (objs.size() >= 20) { printf("reply len=%zd\r\n", buf.size()); }
//printf(">>>buf=[%s]\r\n", buf.c_str());
return conn_.write(buf) != -1;
}
@ -45,8 +65,17 @@ bool redis_handler::handle_one(const redis_object &obj) {
}
//printf(">>>cmd=%s\r\n", cmd);
if (EQ(cmd, "HSET")) {
//
//
if (EQ(cmd, "SET")) {
return set(obj);
} else if (EQ(cmd, "GET")) {
return get(obj);
} else if (EQ(cmd, "DEL")) {
return del(obj);
} else if (EQ(cmd, "TYPE")) {
return type(obj);
} else if (EQ(cmd, "HSET")) {
return hset(obj);
} else if (EQ(cmd, "HGET")) {
return hget(obj);
@ -58,14 +87,6 @@ bool redis_handler::handle_one(const redis_object &obj) {
return hmget(obj);
} else if (EQ(cmd, "HGETALL")) {
return hgetall(obj);
} else if (EQ(cmd, "SET")) {
return set(obj);
} else if (EQ(cmd, "GET")) {
return get(obj);
} else if (EQ(cmd, "DEL")) {
return del(obj);
} else if (EQ(cmd, "TYPE")) {
return type(obj);
}
acl::string err;
@ -75,6 +96,118 @@ bool redis_handler::handle_one(const redis_object &obj) {
return true;
}
bool redis_handler::set(const redis_object &obj) {
auto& objs = obj.get_objects();
if (objs.size() < 3) {
logger_error("invalid SET params' size=%zd", objs.size());
return false;
}
auto key = objs[1]->get_str();
if (key == nullptr || *key == 0) {
logger_error("key null");
return false;
}
auto value = objs[2]->get_str();
if (value == nullptr || *value == 0) {
logger_error("value null");
return false;
}
#if 1
acl::string buff;
coder_.create_object().set_string(value);
coder_.to_string(buff);
coder_.clear();
# if 1
if (!db_->set(key, buff.c_str())) {
logger_error("db set error, key=%s", key);
return false;
}
# endif
#endif
builder_.create_object().set_status("OK");
return true;
}
bool redis_handler::get(const redis_object &obj) {
auto& objs = obj.get_objects();
if (objs.size() < 2) {
logger_error("invalid GET params' size=%zd", objs.size());
return false;
}
auto key = objs[1]->get_str();
if (key == nullptr || *key == 0) {
logger_error("key null");
return false;
}
std::string buff;
if (!db_->get(key, buff) || buff.empty()) {
logger_error("db get error, key=%s", key);
return false;
}
redis_coder builder;
size_t len = buff.size();
(void) builder.update(buff.c_str(), len);
if (len > 0) {
logger_error("invalid buff in db, key=%s", key);
return false;
}
auto& objs2 = builder.get_objects();
if (objs2.size() != 1) {
logger_error("invalid object in db, key=%s, size=%zd", key, objs2.size());
return false;
}
auto o = objs2[0];
if (o->get_type() != REDIS_OBJ_STRING) {
logger_error("invalid object type=%d, key=%s", (int) o->get_type(), key);
return false;
}
auto v = o->get_str();
if (v == nullptr || *v == 0) {
logger_error("value null, key=%s", key);
return false;
}
builder_.create_object().set_string(v);
return true;
}
bool redis_handler::del(const redis_object &obj) {
auto& objs = obj.get_objects();
if (objs.size() < 2) {
logger_error("invalid SET params' size=%zd", objs.size());
return false;
}
auto key = objs[1]->get_str();
if (key == nullptr || *key == 0) {
logger_error("key null");
return false;
}
if (!db_->del(key)) {
logger_error("db del error, key=%s", key);
return false;
}
builder_.create_object().set_number(1);
return true;
}
bool redis_handler::type(const redis_object &obj) {
builder_.create_object().set_status("string");
return true;
}
bool redis_handler::hset(const redis_object &obj) {
auto& objs = obj.get_objects();
if (objs.size() < 4) {
@ -163,7 +296,7 @@ bool redis_handler::hget(const redis_object &obj) {
}
auto array = objs2[0];
if (array->get_type() != acl::REDIS_RESULT_ARRAY) {
if (array->get_type() != REDIS_OBJ_ARRAY) {
logger_error("invalid array object, key=%s", key);
return false;
}
@ -209,113 +342,4 @@ bool redis_handler::hgetall(const redis_object &obj) {
return true;
}
bool redis_handler::set(const redis_object &obj) {
auto& objs = obj.get_objects();
if (objs.size() < 3) {
logger_error("invalid SET params' size=%zd", objs.size());
return false;
}
auto key = objs[1]->get_str();
if (key == nullptr || *key == 0) {
logger_error("key null");
return false;
}
auto value = objs[2]->get_str();
if (value == nullptr || *value == 0) {
logger_error("value null");
return false;
}
redis_coder builder;
builder.create_object().set_string(value);
acl::string buff;
builder.to_string(buff);
if (!db_->set(key, buff.c_str())) {
logger_error("db set error, key=%s", key);
return false;
}
builder_.create_object().set_status("OK");
return true;
}
bool redis_handler::get(const redis_object &obj) {
auto& objs = obj.get_objects();
if (objs.size() < 2) {
logger_error("invalid GET params' size=%zd", objs.size());
return false;
}
auto key = objs[1]->get_str();
if (key == nullptr || *key == 0) {
logger_error("key null");
return false;
}
std::string buff;
if (!db_->get(key, buff) || buff.empty()) {
logger_error("db get error, key=%s", key);
return false;
}
redis_coder builder;
size_t len = buff.size();
(void) builder.update(buff.c_str(), len);
if (len > 0) {
logger_error("invalid buff in db, key=%s", key);
return false;
}
auto& objs2 = builder.get_objects();
if (objs2.size() != 1) {
logger_error("invalid object in db, key=%s, size=%zd", key, objs2.size());
return false;
}
auto o = objs2[0];
if (o->get_type() != acl::REDIS_RESULT_STRING) {
logger_error("invalid object type=%d, key=%s", (int) o->get_type(), key);
return false;
}
auto v = o->get_str();
if (v == nullptr || *v == 0) {
logger_error("value null, key=%s", key);
return false;
}
builder_.create_object().set_string(v);
return true;
}
bool redis_handler::del(const redis_object &obj) {
auto& objs = obj.get_objects();
if (objs.size() < 2) {
logger_error("invalid SET params' size=%zd", objs.size());
return false;
}
auto key = objs[1]->get_str();
if (key == nullptr || *key == 0) {
logger_error("key null");
return false;
}
if (!db_->del(key)) {
logger_error("db del error, key=%s", key);
return false;
}
builder_.create_object().set_number(1);
return true;
}
bool redis_handler::type(const redis_object &obj) {
builder_.create_object().set_status("string");
return true;
}
} // namespace pkv

View File

@ -23,19 +23,21 @@ private:
shared_db& db_;
const redis_coder& parser_;
redis_coder builder_;
redis_coder coder_;
acl::socket_stream& conn_;
bool handle_one(const redis_object& obj);
bool set(const redis_object& obj);
bool get(const redis_object& obj);
bool del(const redis_object& obj);
bool type(const redis_object& obj);
bool hset(const redis_object& obj);
bool hget(const redis_object& obj);
bool hdel(const redis_object& obj);
bool hmset(const redis_object& obj);
bool hmget(const redis_object& obj);
bool hgetall(const redis_object& obj);
bool set(const redis_object& obj);
bool get(const redis_object& obj);
bool del(const redis_object& obj);
bool type(const redis_object& obj);
};
}

View File

@ -6,25 +6,27 @@
static char *var_cfg_dbpath;
acl::master_str_tbl var_conf_str_tab[] = {
{ "dbpath", "./dbpath", &var_cfg_dbpath },
{ "dbpath", "./dbpath", &var_cfg_dbpath },
{ 0, 0, 0 }
{ 0, 0, 0 }
};
static int var_cfg_debug_enable;
acl::master_bool_tbl var_conf_bool_tab[] = {
{ "debug_enable", 1, &var_cfg_debug_enable },
{ "debug_enable", 1, &var_cfg_debug_enable },
{ 0, 0, 0 }
{ 0, 0, 0 }
};
static int var_cfg_io_timeout;
static int var_cfg_buf_size;
acl::master_int_tbl var_conf_int_tab[] = {
{ "io_timeout", 120, &var_cfg_io_timeout, 0, 0 },
{ "io_timeout", 120, &var_cfg_io_timeout, 0, 0 },
{ "buf_size", 8192, &var_cfg_buf_size, 0, 0 },
{ 0, 0 , 0 , 0, 0 }
{ 0, 0 , 0 , 0, 0 }
};
acl::master_int64_tbl var_conf_int64_tab[] = {
@ -37,20 +39,26 @@ acl::master_int64_tbl var_conf_int64_tab[] = {
using namespace pkv;
void master_service::on_accept(acl::socket_stream& conn) {
//logger(">>>accept connection: %d", conn.sock_handle());
//conn.set_rw_timeout(var_cfg_io_timeout);
logger(">>>accept connection: %d", conn.sock_handle());
run(conn, var_cfg_buf_size);
logger("Disconnect from peer, fd=%d", conn.sock_handle());
}
void master_service::run(acl::socket_stream& conn, size_t size) {
pkv::redis_coder parser;
char buf[8192];
size_t n = sizeof(buf) - 1;
pkv::redis_handler handler(db_, parser, conn);
char buf[size];
while(true) {
int ret = conn.read(buf, sizeof(n) - 1, false);
int ret = conn.read(buf, sizeof(buf) - 1, false);
if (ret <= 0) {
break;
}
buf[ret] = 0;
//printf("[%s]\r\n", buf);
size_t len = (size_t) ret;
const char* data = parser.update(buf, len);
@ -60,10 +68,8 @@ void master_service::on_accept(acl::socket_stream& conn) {
break;
}
//printf("len=%zd, data=%s\r\n", len, data);
assert(*data == '\0' && len == 0);
pkv::redis_handler handler(db_, parser, conn);
if (!handler.handle()) {
break;
}

View File

@ -4,28 +4,30 @@
class master_service : public acl::master_fiber {
public:
master_service() = default;
~master_service() override = default;
master_service() = default;
~master_service() override = default;
protected:
// @override
void on_accept(acl::socket_stream& conn) override;
// @override
void on_accept(acl::socket_stream& conn) override;
// @override
void proc_on_listen(acl::server_socket& ss) override;
// @override
void proc_on_listen(acl::server_socket& ss) override;
// @override
void proc_pre_jail() override;
// @override
void proc_pre_jail() override;
// @override
void proc_on_init() override;
// @override
void proc_on_init() override;
// @override
void proc_on_exit() override;
// @override
void proc_on_exit() override;
// @override
bool proc_on_sighup(acl::string&) override;
// @override
bool proc_on_sighup(acl::string&) override;
private:
pkv::shared_db db_;
pkv::shared_db db_;
void run(acl::socket_stream& conn, size_t size);
};

View File

@ -88,7 +88,7 @@ service pkv
# 每个进程实例的空闲超时时间超过此值后进程实例主动退出
fiber_idle_limit = 0
# 每个进程启动的线程数
fiber_threads = 8
fiber_threads = 1
# 进程运行时所在的路径
fiber_queue_dir = {install_path}/var
# 读写超时时间, 单位为秒
@ -105,7 +105,7 @@ service pkv
# fiber_dispatch_type = default
# 线程的堆栈空间大小单位为字节
fiber_stack_size = 64000
fiber_stack_size = 256000
# 允许访问 udserver 的客户端IP地址范围
# fiber_access_allow = 127.0.0.1:255.255.255.255, 127.0.0.1:127.0.0.1
fiber_access_allow = all
@ -144,4 +144,5 @@ service pkv
# debug_mem = 1
# 是否在一个线程中连接读
# loop_read = 1
buf_size = 8192
}

View File

@ -7,11 +7,33 @@
namespace pkv {
redis_coder::redis_coder() {
curr_ = std::make_shared<redis_object>(nullptr);
redis_coder::redis_coder(size_t cache_max) {
cache_max_ = cache_max;
curr_ = new redis_object(cache_, cache_max_);
}
redis_coder::~redis_coder() {
for (auto obj : objs_) {
delete obj;
}
for (auto obj : cache_) {
delete obj;
}
delete curr_;
}
void redis_coder::clear() {
for (auto obj : objs_) {
if (cache_.size() < cache_max_) {
obj->reset();
cache_.emplace_back(obj);
} else {
delete obj;
}
}
objs_.clear();
}
@ -19,8 +41,14 @@ const char* redis_coder::update(const char* data, size_t& len) {
while (len > 0) {
data = curr_->update(data, len);
if (curr_->finish()) {
objs_.push_back(curr_);
curr_ = std::make_shared<redis_object>(nullptr);
objs_.emplace_back(curr_);
if (cache_.empty()) {
curr_ = new redis_object(cache_, cache_max_);
} else {
curr_ = cache_.back();
cache_.pop_back();
}
} else if (curr_->failed()) {
break;
}
@ -30,7 +58,15 @@ const char* redis_coder::update(const char* data, size_t& len) {
}
redis_object& redis_coder::create_object() {
auto obj = std::make_shared<redis_object>(nullptr);
redis_object* obj;
if (cache_.empty()) {
obj = new redis_object(cache_, cache_max_);
} else {
obj = cache_.back();
cache_.pop_back();
}
objs_.emplace_back(obj);
return *obj;
}
@ -195,4 +231,4 @@ bool test_redis_build() {
//////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////
} // namespace pkv
} // namespace pkv

View File

@ -4,22 +4,23 @@
#pragma once
#include <vector>
#include "redis_object.h"
namespace pkv {
class redis_coder {
public:
redis_coder();
~redis_coder() = default;
redis_coder(size_t cache_max = 10000);
~redis_coder();
const char* update(const char* data, size_t& len);
[[nodiscard]] const std::vector<shared_redis>& get_objects() const {
[[nodiscard]] const std::vector<redis_object*>& get_objects() const {
return objs_;
}
[[nodiscard]] shared_redis get_curr() const {
[[nodiscard]] redis_object* get_curr() const {
return curr_;
}
@ -31,8 +32,10 @@ public:
bool to_string(acl::string& out) const;
private:
std::vector<shared_redis> objs_;
shared_redis curr_;
std::vector<redis_object*> objs_;
std::vector<redis_object*> cache_;
size_t cache_max_;
redis_object* curr_;
};
bool test_redis_parse(const char* filepath);

View File

@ -7,25 +7,51 @@
namespace pkv {
redis_object::redis_object(redis_object* parent) {
dbuf_ = new (1) acl::dbuf_pool();
parent_ = parent ? parent : this;
redis_object::redis_object(std::vector<redis_object*>& cache, size_t cache_max)
: parent_(this)
, cache_max_(cache_max)
, cache_(cache)
{
}
redis_object::~redis_object() {
dbuf_->destroy();
for (auto obj : objs_) {
delete obj;
}
}
void redis_object::set_parent(redis_object* parent) {
if (parent) {
parent_ = parent;
}
}
void redis_object::reset() {
for (auto obj : objs_) {
if (cache_.size() < cache_max_) {
obj->reset();
cache_.emplace_back(obj);
} else {
delete obj;
}
}
status_ = redis_s_begin;
type_ = REDIS_OBJ_UNKOWN;
parent_ = this;
obj_ = nullptr;
cnt_ = 0;
objs_.clear();
buf_.clear();
}
const char* redis_object::get_cmd() const {
if (me_ == nullptr) {
return nullptr;
if (type_ == REDIS_OBJ_STRING) {
return buf_.c_str();
}
if (me_->get_type() == acl::REDIS_RESULT_STRING) {
return me_->get(0);
}
if (objs_.empty() || me_->get_type() != acl::REDIS_RESULT_ARRAY) {
if (objs_.empty() || type_ != REDIS_OBJ_ARRAY) {
return nullptr;
}
@ -33,8 +59,8 @@ const char* redis_object::get_cmd() const {
}
const char* redis_object::get_str() const {
if (me_->get_type() == acl::REDIS_RESULT_STRING) {
return me_->get(0);
if (type_ == REDIS_OBJ_STRING) {
return buf_.c_str();
}
return nullptr;
@ -97,11 +123,17 @@ const char* redis_object::parse_object(const char* data, size_t& len) {
if (objs_.size() == (size_t) cnt_) {
obj_ = nullptr;
cnt_ = 0;
//cnt_ = 0;
status_ = redis_s_finish;
} else if (cache_.empty()) {
obj_ = new redis_object(cache_, cache_max_);
obj_->set_parent(this);
} else {
obj_ = std::make_shared<redis_object>(this);
obj_ = cache_.back();
obj_->set_parent(this);
cache_.pop_back();
}
return data;
}
@ -138,7 +170,7 @@ const char* redis_object::parse_begin(const char* data, size_t& len) {
const char* redis_object::parse_status(const char* data, size_t& len) {
bool found = false;
data = get_line(data, len, found);
data = get_line(data, len, buf_, found);
if (!found) {
assert(len == 0);
return data;
@ -149,19 +181,14 @@ const char* redis_object::parse_status(const char* data, size_t& len) {
return data;
}
me_ = new(dbuf_) acl::redis_result(dbuf_);
me_->set_type(acl::REDIS_RESULT_STATUS);
me_->set_size(1);
put_data(dbuf_, me_, buf_.c_str(), buf_.length());
buf_.clear();
type_ = REDIS_OBJ_STATUS;
status_ = redis_s_finish;
return data;
}
const char* redis_object::parse_error(const char* data, size_t& len) {
bool found = false;
data = get_line(data, len, found);
data = get_line(data, len, buf_, found);
if (!found) {
assert(len == 0);
return data;
@ -172,19 +199,14 @@ const char* redis_object::parse_error(const char* data, size_t& len) {
return data;
}
me_ = new(dbuf_) acl::redis_result(dbuf_);
me_->set_type(acl::REDIS_RESULT_ERROR);
me_->set_size(1);
put_data(dbuf_, me_, buf_.c_str(), buf_.length());
buf_.clear();
type_ = REDIS_OBJ_ERROR;
status_ = redis_s_finish;
return data;
}
const char* redis_object::parse_number(const char* data, size_t& len) {
bool found = false;
data = get_line(data, len, found);
data = get_line(data, len, buf_, found);
if (!found) {
assert(len == 0);
return data;
@ -195,12 +217,7 @@ const char* redis_object::parse_number(const char* data, size_t& len) {
return data;
}
me_ = new(dbuf_) acl::redis_result(dbuf_);
me_->set_type(acl::REDIS_RESULT_INTEGER);
me_->set_size(1);
put_data(dbuf_, me_, buf_.c_str(), buf_.length());
buf_.clear();
type_ = REDIS_OBJ_INTEGER;
status_ = redis_s_finish;
return data;
}
@ -213,43 +230,33 @@ const char* redis_object::parse_strlen(const char* data, size_t& len) {
return data;
}
me_ = new(dbuf_) acl::redis_result(dbuf_);
me_->set_type(acl::REDIS_RESULT_STRING);
if (cnt_ <= 0) {
status_ = redis_s_finish;
return data;
}
me_->set_size((size_t) cnt_);
buf_.clear();
type_ = REDIS_OBJ_STRING;
status_ = redis_s_string;
return data;
}
const char* redis_object::parse_string(const char* data, size_t& len) {
assert (me_ != nullptr);
buf_.reserve((size_t) cnt_);
data = get_data(data, len, (size_t) cnt_);
if (buf_.size() == (size_t) cnt_) {
status_ = redis_s_strend;
put_data(dbuf_, me_, buf_.c_str(), buf_.length());
buf_.clear();
cnt_ = 0;
}
return data;
}
const char* redis_object::parse_strend(const char* data, size_t& len) {
assert (me_ != nullptr);
bool found = false;
data = get_line(data, len, found);
std::string buf;
data = get_line(data, len, buf, found);
// If the buf_ not empty, some data other '\r\n' got.
if (!buf_.empty()) {
if (!buf.empty()) {
status_ = redis_s_null;
return data;
}
@ -271,24 +278,27 @@ const char* redis_object::parse_arlen(const char* data, size_t& len) {
return data;
}
me_ = new(dbuf_) acl::redis_result(dbuf_);
me_->set_type(acl::REDIS_RESULT_ARRAY);
if (cnt_ <= 0) {
status_ = redis_s_finish;
return data;
}
me_->set_size((size_t) cnt_);
buf_.clear();
type_ = REDIS_OBJ_ARRAY;
status_ = redis_s_array;
obj_ = std::make_shared<redis_object>(this);
if (cache_.empty()) {
obj_ = new redis_object(cache_, cache_max_);
obj_->set_parent(this);
} else {
obj_ = cache_.back();
obj_->set_parent(this);
cache_.pop_back();
}
return data;
}
const char* redis_object::parse_array(const char* data, size_t& len) {
assert(me_ != nullptr);
assert(obj_ != nullptr);
return parse_object(data, len);
@ -299,16 +309,30 @@ const char* redis_object::get_data(const char* data, size_t& len, size_t want) {
assert(n < want);
want -= n;
#if 1
if (want > len) {
want = len;
len = 0;
} else {
len -= want;
}
buf_.append(data, want);
data += want;
#else
for (size_t i = 0; i < want && len > 0; i++) {
buf_.push_back(*data++);
len--;
}
#endif
return data;
}
const char* redis_object::get_length(const char* data, size_t& len,
int& res, bool& found) {
data = get_line(data, len, found);
data = get_line(data, len, buf_, found);
if (!found) {
assert(len == 0);
return data;
@ -331,7 +355,8 @@ const char* redis_object::get_length(const char* data, size_t& len,
return data;
}
const char* redis_object::get_line(const char* data, size_t& len, bool& found) {
const char* redis_object::get_line(const char* data, size_t& len,
std::string& buf, bool& found) {
while (len > 0) {
switch (*data) {
case '\r':
@ -344,7 +369,7 @@ const char* redis_object::get_line(const char* data, size_t& len, bool& found) {
found = true;
return data;
default:
buf_.push_back(*data++);
buf.push_back(*data++);
len--;
break;
}
@ -352,17 +377,6 @@ const char* redis_object::get_line(const char* data, size_t& len, bool& found) {
return data;
}
void redis_object::put_data(acl::dbuf_pool* dbuf, acl::redis_result* rr,
const char* data, size_t len) {
char* buf = (char*) dbuf->dbuf_alloc(len + 1);
if (len > 0) {
memcpy(buf, data, len);
}
buf[len] = 0;
rr->put(buf, len);
}
bool redis_object::to_string(acl::string& out) const {
#define USE_UNIX_CRLF
#ifdef USE_UNIX_CRLF
@ -381,83 +395,80 @@ bool redis_object::to_string(acl::string& out) const {
}
}
if (me_ == nullptr) {
return false;
}
//assert(!buf_.empty());
switch (me_->get_type()) {
case acl::REDIS_RESULT_STATUS:
out.format_append("+%s%s", me_->get_status(), CRLF);
switch (type_) {
case REDIS_OBJ_STATUS:
out.format_append("+%s%s", buf_.c_str(), CRLF);
break;
case acl::REDIS_RESULT_ERROR:
out.format_append("-%s%s", me_->get_error(), CRLF);
case REDIS_OBJ_ERROR:
out.format_append("-%s%s", buf_.c_str(), CRLF);
break;
case acl::REDIS_RESULT_INTEGER:
out.format_append(":%lld%s", me_->get_integer64(), CRLF);
case REDIS_OBJ_INTEGER:
out.format_append(":%s%s", buf_.c_str(), CRLF);
break;
case acl::REDIS_RESULT_STRING:
out.format_append("$%zd%s", me_->get_length(), CRLF);
me_->argv_to_string(out, false);
out += CRLF;
case REDIS_OBJ_STRING:
out.format_append("$%zd%s%s%s", buf_.size(), CRLF, buf_.c_str(), CRLF);
break;
//case acl::REDIS_RESULT_ARRAY:
// break;
default:
break;
}
return true;
}
redis_object& redis_object::set_status(const std::string& data,
bool return_parent) {
me_ = new(dbuf_) acl::redis_result(dbuf_);
me_->set_type(acl::REDIS_RESULT_STATUS);
me_->set_size(1);
put_data(dbuf_, me_, data.c_str(), data.length());
type_ = REDIS_OBJ_STATUS;
buf_ = data;
return return_parent ? *parent_ : *this;
}
redis_object& redis_object::set_error(const std::string& data,
bool return_parent) {
me_ = new(dbuf_) acl::redis_result(dbuf_);
me_->set_type(acl::REDIS_RESULT_ERROR);
me_->set_size(1);
put_data(dbuf_, me_, data.c_str(), data.length());
type_ = REDIS_OBJ_ERROR;
buf_ = data;
return return_parent ? *parent_ : *this;
}
redis_object& redis_object::set_number(int n, bool return_parent) {
me_ = new(dbuf_) acl::redis_result(dbuf_);
me_->set_type(acl::REDIS_RESULT_INTEGER);
me_->set_size(1);
type_ = REDIS_OBJ_INTEGER;
std::string buf = std::to_string(n);
put_data(dbuf_, me_, buf.c_str(), buf.length());
buf_ = buf;
return return_parent ? *parent_ : *this;
}
redis_object& redis_object::set_string(const std::string &data,
bool return_parent) {
me_ = new(dbuf_) acl::redis_result(dbuf_);
me_->set_type(acl::REDIS_RESULT_STRING);
me_->set_size(data.size());
type_ = REDIS_OBJ_STRING;
if (!data.empty()) {
put_data(dbuf_, me_, data.c_str(), data.length());
buf_ = data;
}
return return_parent ? *parent_ : *this;
}
redis_object& redis_object::create_child() {
auto obj = std::make_shared<redis_object>(this);
objs_.emplace_back(obj);
if (me_ == nullptr) {
// The last one is NULL.
me_ = new(dbuf_) acl::redis_result(dbuf_);
me_->set_type(acl::REDIS_RESULT_ARRAY);
redis_object* obj;
if (cache_.empty()) {
obj = new redis_object(cache_, cache_max_);
obj->set_parent(this);
objs_.emplace_back(obj);
} else {
obj = cache_.back();
obj->set_parent(this);
cache_.pop_back();
objs_.emplace_back(obj);
}
me_->set_size(objs_.size());
if (obj_ == nullptr) {
// The last one is NULL.
type_ = REDIS_OBJ_ARRAY;
}
cnt_ = objs_.size();
return *obj;
}

View File

@ -4,6 +4,8 @@
#pragma once
#include <string>
#include <vector>
#include "redis_type.h"
namespace pkv {
@ -11,14 +13,27 @@ namespace pkv {
class redis_object;
using shared_redis = std::shared_ptr<redis_object>;
typedef enum {
REDIS_OBJ_UNKOWN,
REDIS_OBJ_NIL,
REDIS_OBJ_ERROR,
REDIS_OBJ_STATUS,
REDIS_OBJ_INTEGER,
REDIS_OBJ_STRING,
REDIS_OBJ_ARRAY,
} redis_obj_t;
class redis_object {
public:
explicit redis_object(redis_object* parent);
explicit redis_object(std::vector<redis_object*>& cache, size_t cache_max);
~redis_object();
redis_object(const redis_object&) = delete;
void operator=(const redis_object&) = delete;
void set_parent(redis_object* parent);
void reset();
public:
const char* update(const char* data, size_t& len);
@ -34,15 +49,15 @@ public:
return status_;
}
[[nodiscard]] acl::redis_result_t get_type() const {
return me_ ? me_->get_type() : acl::REDIS_RESULT_UNKOWN;
[[nodiscard]] redis_obj_t get_type() const {
return type_;
}
[[nodiscard]] const char* get_cmd() const;
[[nodiscard]] const char* get_str() const;
[[nodiscard]] const std::vector<shared_redis>& get_objects() const {
[[nodiscard]] const std::vector<redis_object*>& get_objects() const {
return objs_;
}
@ -56,19 +71,21 @@ public:
bool to_string(acl::string& out) const;
private:
acl::dbuf_pool* dbuf_;
redis_object* parent_;
int status_ = redis_s_begin;
acl::redis_result* me_ = nullptr;
std::vector<shared_redis> objs_;
redis_obj_t type_ = REDIS_OBJ_UNKOWN;
redis_object* parent_ = nullptr;
redis_object* obj_ = nullptr;
std::string buf_;
int cnt_ = 0;
shared_redis obj_ = nullptr;
size_t cache_max_;
std::vector<redis_object*>& cache_;
std::vector<redis_object*> objs_;
private:
static void put_data(acl::dbuf_pool*, acl::redis_result*, const char*, size_t);
const char* get_line(const char*, size_t&, bool&);
const char* get_line(const char*, size_t&, std::string&, bool&);
const char* get_length(const char*, size_t&, int&, bool&);
const char* get_data(const char*, size_t&, size_t);

View File

@ -73,7 +73,8 @@ bool redis_pipeline_channel::flush_all(void)
#if 0
if (msgs_.size() > 10) {
logger(">>>messages size is %zd<<<<", msgs_.size());
logger(">>>messages size is %zd, buf size=%zd<<<<",
msgs_.size(), buf_.size());
}
#endif

View File

@ -65,7 +65,7 @@ int main(int argc, char *argv[])
acl::acl_cpp_init();
acl::log::stdout_open(true);
acl_fiber_msg_stdout_enable(1);
acl::fiber::stdout_open(true);
std::vector<acl::thread*> threads;
int stack_size = STACK_SIZE;