diff --git a/app/wizard_demo/pkv/CMakeLists.txt b/app/wizard_demo/pkv/CMakeLists.txt index 4dfe4655b..3c16ef279 100644 --- a/app/wizard_demo/pkv/CMakeLists.txt +++ b/app/wizard_demo/pkv/CMakeLists.txt @@ -88,11 +88,19 @@ add_definitions( #find_library(fiber_cpp_lib fiber_cpp PATHS /usr/lib /usr/local/lib) #find_library(fiber_lib fiber PATHS /usr/lib /usr/local/lib) -#find_library(rocksdb_lib rocksdb PATHS /usr/lib /usr/local/lib) - set(acl_all ${home_path}/lib_fiber/lib/libfiber_cpp.a ${home_path}/libacl_all.a) set(fiber ${home_path}/lib_fiber/lib/libfiber.a) -set(rocksdb /usr/local/lib/librocksdb.a) + +find_library(rocksdb_lib rocksdb PATHS /usr/lib /usr/local/lib) +#set(rocksdb /usr/local/lib/librocksdb.a) +set(rocksdb -lrocksdb) + +find_library(jemalloc jemalloc PATHS /usr/lib /usr/local/lib) +set(jemalloc -ljemalloc) + +#set(wt -lwiredtiger) +#set(uring -luring) +#set(iconv -liconv) if(CMAKE_SYSTEM_NAME MATCHES "Darwin") # set(lib_all ${lib_global} ${lib_rpc} ${fiber_cpp_lib} ${acl_lib} ${fiber_lib} @@ -101,7 +109,7 @@ if(CMAKE_SYSTEM_NAME MATCHES "Darwin") elseif(CMAKE_SYSTEM_NAME MATCHES "Linux") # set(lib_all ${lib_global} ${lib_rpc} ${fiber_cpp_lib} ${acl_lib} ${fiber_lib} set(lib_all ${acl_all} - ${rocksdb} -lwiredtiger ${fiber} -luring -liconv -lz -lpthread -ldl) + ${rocksdb} ${wt} ${fiber} ${uring} ${iconv} ${jemalloc} -lz -lpthread -ldl) endif() set(output_path ${CMAKE_CURRENT_SOURCE_DIR}) diff --git a/app/wizard_demo/pkv/action/redis_handler.cpp b/app/wizard_demo/pkv/action/redis_handler.cpp index 38453a192..1cdb7437a 100644 --- a/app/wizard_demo/pkv/action/redis_handler.cpp +++ b/app/wizard_demo/pkv/action/redis_handler.cpp @@ -24,6 +24,19 @@ bool redis_handler::handle() { if (objs.empty()) { return true; } + +#if 0 + { + acl::string tmp; + for (size_t i = 0; i < objs.size(); i++) { + tmp += "+OK\r\n"; + } + return conn_.write(tmp.c_str(), tmp.size()) == (int) tmp.size(); + } +#endif + + //if (objs.size() >= 20) { printf(">>>objs=%zd\r\n", objs.size()); } + for (const auto& obj : objs) { if (!handle_one(*obj)) { return false; @@ -32,8 +45,15 @@ bool redis_handler::handle() { acl::string buf; if (!builder_.to_string(buf)) { + builder_.clear(); return false; } + + builder_.clear(); + + //if (objs.size() >= 20) { printf("reply len=%zd\r\n", buf.size()); } + + //printf(">>>buf=[%s]\r\n", buf.c_str()); return conn_.write(buf) != -1; } @@ -45,8 +65,17 @@ bool redis_handler::handle_one(const redis_object &obj) { } //printf(">>>cmd=%s\r\n", cmd); - - if (EQ(cmd, "HSET")) { + // + // + if (EQ(cmd, "SET")) { + return set(obj); + } else if (EQ(cmd, "GET")) { + return get(obj); + } else if (EQ(cmd, "DEL")) { + return del(obj); + } else if (EQ(cmd, "TYPE")) { + return type(obj); + } else if (EQ(cmd, "HSET")) { return hset(obj); } else if (EQ(cmd, "HGET")) { return hget(obj); @@ -58,14 +87,6 @@ bool redis_handler::handle_one(const redis_object &obj) { return hmget(obj); } else if (EQ(cmd, "HGETALL")) { return hgetall(obj); - } else if (EQ(cmd, "SET")) { - return set(obj); - } else if (EQ(cmd, "GET")) { - return get(obj); - } else if (EQ(cmd, "DEL")) { - return del(obj); - } else if (EQ(cmd, "TYPE")) { - return type(obj); } acl::string err; @@ -75,6 +96,118 @@ bool redis_handler::handle_one(const redis_object &obj) { return true; } +bool redis_handler::set(const redis_object &obj) { + auto& objs = obj.get_objects(); + if (objs.size() < 3) { + logger_error("invalid SET params' size=%zd", objs.size()); + return false; + } + + auto key = objs[1]->get_str(); + if (key == nullptr || *key == 0) { + logger_error("key null"); + return false; + } + + auto value = objs[2]->get_str(); + if (value == nullptr || *value == 0) { + logger_error("value null"); + return false; + } + +#if 1 + acl::string buff; + coder_.create_object().set_string(value); + coder_.to_string(buff); + coder_.clear(); + +# if 1 + if (!db_->set(key, buff.c_str())) { + logger_error("db set error, key=%s", key); + return false; + } +# endif +#endif + + builder_.create_object().set_status("OK"); + return true; +} + +bool redis_handler::get(const redis_object &obj) { + auto& objs = obj.get_objects(); + if (objs.size() < 2) { + logger_error("invalid GET params' size=%zd", objs.size()); + return false; + } + + auto key = objs[1]->get_str(); + if (key == nullptr || *key == 0) { + logger_error("key null"); + return false; + } + + std::string buff; + if (!db_->get(key, buff) || buff.empty()) { + logger_error("db get error, key=%s", key); + return false; + } + + redis_coder builder; + size_t len = buff.size(); + (void) builder.update(buff.c_str(), len); + if (len > 0) { + logger_error("invalid buff in db, key=%s", key); + return false; + } + + auto& objs2 = builder.get_objects(); + if (objs2.size() != 1) { + logger_error("invalid object in db, key=%s, size=%zd", key, objs2.size()); + return false; + } + + auto o = objs2[0]; + if (o->get_type() != REDIS_OBJ_STRING) { + logger_error("invalid object type=%d, key=%s", (int) o->get_type(), key); + return false; + } + + auto v = o->get_str(); + if (v == nullptr || *v == 0) { + logger_error("value null, key=%s", key); + return false; + } + builder_.create_object().set_string(v); + return true; +} + +bool redis_handler::del(const redis_object &obj) { + auto& objs = obj.get_objects(); + if (objs.size() < 2) { + logger_error("invalid SET params' size=%zd", objs.size()); + return false; + } + + auto key = objs[1]->get_str(); + if (key == nullptr || *key == 0) { + logger_error("key null"); + return false; + } + + if (!db_->del(key)) { + logger_error("db del error, key=%s", key); + return false; + } + + builder_.create_object().set_number(1); + return true; +} + +bool redis_handler::type(const redis_object &obj) { + builder_.create_object().set_status("string"); + return true; +} + bool redis_handler::hset(const redis_object &obj) { auto& objs = obj.get_objects(); if (objs.size() < 4) { @@ -163,7 +296,7 @@ bool redis_handler::hget(const redis_object &obj) { } auto array = objs2[0]; - if (array->get_type() != acl::REDIS_RESULT_ARRAY) { + if (array->get_type() != REDIS_OBJ_ARRAY) { logger_error("invalid array object, key=%s", key); return false; } @@ -209,113 +342,4 @@ bool redis_handler::hgetall(const redis_object &obj) { return true; } -bool redis_handler::set(const redis_object &obj) { - auto& objs = obj.get_objects(); - if (objs.size() < 3) { - logger_error("invalid SET params' size=%zd", objs.size()); - return false; - } - - auto key = objs[1]->get_str(); - if (key == nullptr || *key == 0) { - logger_error("key null"); - return false; - } - - auto value = objs[2]->get_str(); - if (value == nullptr || *value == 0) { - logger_error("value null"); - return false; - } - - redis_coder builder; - builder.create_object().set_string(value); - - acl::string buff; - builder.to_string(buff); - - if (!db_->set(key, buff.c_str())) { - logger_error("db set error, key=%s", key); - return false; - } - - builder_.create_object().set_status("OK"); - return true; -} - -bool redis_handler::get(const redis_object &obj) { - auto& objs = obj.get_objects(); - if (objs.size() < 2) { - logger_error("invalid GET params' size=%zd", objs.size()); - return false; - } - - auto key = objs[1]->get_str(); - if (key == nullptr || *key == 0) { - logger_error("key null"); - return false; - } - - std::string buff; - if (!db_->get(key, buff) || buff.empty()) { - logger_error("db get error, key=%s", key); - return false; - } - - redis_coder builder; - size_t len = buff.size(); - (void) builder.update(buff.c_str(), len); - if (len > 0) { - logger_error("invalid buff in db, key=%s", key); - return false; - } - - auto& objs2 = builder.get_objects(); - if (objs2.size() != 1) { - logger_error("invalid object in db, key=%s, size=%zd", key, objs2.size()); - return false; - } - - auto o = objs2[0]; - if (o->get_type() != acl::REDIS_RESULT_STRING) { - logger_error("invalid object type=%d, key=%s", (int) o->get_type(), key); - return false; - } - - auto v = o->get_str(); - if (v == nullptr || *v == 0) { - logger_error("value null, key=%s", key); - return false; - } - builder_.create_object().set_string(v); - return true; -} - -bool redis_handler::del(const redis_object &obj) { - auto& objs = obj.get_objects(); - if (objs.size() < 2) { - logger_error("invalid SET params' size=%zd", objs.size()); - return false; - } - - auto key = objs[1]->get_str(); - if (key == nullptr || *key == 0) { - logger_error("key null"); - return false; - } - - if (!db_->del(key)) { - logger_error("db del error, key=%s", key); - return false; - } - - builder_.create_object().set_number(1); - return true; -} - -bool redis_handler::type(const redis_object &obj) { - builder_.create_object().set_status("string"); - return true; -} - } // namespace pkv diff --git a/app/wizard_demo/pkv/action/redis_handler.h b/app/wizard_demo/pkv/action/redis_handler.h index 1a911817a..083f678aa 100644 --- a/app/wizard_demo/pkv/action/redis_handler.h +++ b/app/wizard_demo/pkv/action/redis_handler.h @@ -23,19 +23,21 @@ private: shared_db& db_; const redis_coder& parser_; redis_coder builder_; + redis_coder coder_; acl::socket_stream& conn_; bool handle_one(const redis_object& obj); + + bool set(const redis_object& obj); + bool get(const redis_object& obj); + bool del(const redis_object& obj); + bool type(const redis_object& obj); bool hset(const redis_object& obj); bool hget(const redis_object& obj); bool hdel(const redis_object& obj); bool hmset(const redis_object& obj); bool hmget(const redis_object& obj); bool hgetall(const redis_object& obj); - bool set(const redis_object& obj); - bool get(const redis_object& obj); - bool del(const redis_object& obj); - bool type(const redis_object& obj); }; } diff --git a/app/wizard_demo/pkv/master_service.cpp b/app/wizard_demo/pkv/master_service.cpp index c73514f55..f50f93a7c 100644 --- a/app/wizard_demo/pkv/master_service.cpp +++ b/app/wizard_demo/pkv/master_service.cpp @@ -6,25 +6,27 @@ static char *var_cfg_dbpath; acl::master_str_tbl var_conf_str_tab[] = { - { "dbpath", "./dbpath", &var_cfg_dbpath }, + { "dbpath", "./dbpath", &var_cfg_dbpath }, - { 0, 0, 0 } + { 0, 0, 0 } }; static int var_cfg_debug_enable; acl::master_bool_tbl var_conf_bool_tab[] = { - { "debug_enable", 1, &var_cfg_debug_enable }, + { "debug_enable", 1, &var_cfg_debug_enable }, - { 0, 0, 0 } + { 0, 0, 0 } }; static int var_cfg_io_timeout; +static int var_cfg_buf_size; acl::master_int_tbl var_conf_int_tab[] = { - { "io_timeout", 120, &var_cfg_io_timeout, 0, 0 }, + { "io_timeout", 120, &var_cfg_io_timeout, 0, 0 }, + { "buf_size", 8192, &var_cfg_buf_size, 0, 0 }, - { 0, 0 , 0 , 0, 0 } + { 0, 0 , 0 , 0, 0 } }; acl::master_int64_tbl var_conf_int64_tab[] = { @@ -37,20 +39,26 @@ acl::master_int64_tbl var_conf_int64_tab[] = { using namespace pkv; void master_service::on_accept(acl::socket_stream& conn) { - //logger(">>>accept connection: %d", conn.sock_handle()); //conn.set_rw_timeout(var_cfg_io_timeout); + logger(">>>accept connection: %d", conn.sock_handle()); + run(conn, var_cfg_buf_size); + + logger("Disconnect from peer, fd=%d", conn.sock_handle()); +} + +void master_service::run(acl::socket_stream& conn, size_t size) { pkv::redis_coder parser; - char buf[8192]; - size_t n = sizeof(buf) - 1; + pkv::redis_handler handler(db_, parser, conn); + char buf[size]; while(true) { - int ret = conn.read(buf, sizeof(n) - 1, false); + int ret = conn.read(buf, sizeof(buf) - 1, false); if (ret <= 0) { break; } + buf[ret] = 0; - //printf("[%s]\r\n", buf); size_t len = (size_t) ret; const char* data = parser.update(buf, len); @@ -60,10 +68,8 @@ void master_service::on_accept(acl::socket_stream& conn) { break; } - //printf("len=%zd, data=%s\r\n", len, data); assert(*data == '\0' && len == 0); - pkv::redis_handler handler(db_, parser, conn); if (!handler.handle()) { break; } diff --git a/app/wizard_demo/pkv/master_service.h b/app/wizard_demo/pkv/master_service.h index 8e802feb9..bad34a986 100644 --- a/app/wizard_demo/pkv/master_service.h +++ b/app/wizard_demo/pkv/master_service.h @@ -4,28 +4,30 @@ class master_service : public acl::master_fiber { public: - master_service() = default; - ~master_service() override = default; + master_service() = default; + ~master_service() override = default; protected: - // @override - void on_accept(acl::socket_stream& conn) override; + // @override + void on_accept(acl::socket_stream& conn) override; - // @override - void proc_on_listen(acl::server_socket& ss) override; + // @override + void proc_on_listen(acl::server_socket& ss) override; - // @override - void proc_pre_jail() override; + // @override + void proc_pre_jail() override; - // @override - void proc_on_init() override; + // @override + void proc_on_init() override; - // @override - void proc_on_exit() override; + // @override + void proc_on_exit() override; - // @override - bool proc_on_sighup(acl::string&) override; + // @override + bool proc_on_sighup(acl::string&) override; private: - pkv::shared_db db_; + pkv::shared_db db_; + + void run(acl::socket_stream& conn, size_t size); }; diff --git a/app/wizard_demo/pkv/pkv.cf b/app/wizard_demo/pkv/pkv.cf index 544e8ee57..2eb0d4a1a 100644 --- a/app/wizard_demo/pkv/pkv.cf +++ b/app/wizard_demo/pkv/pkv.cf @@ -88,7 +88,7 @@ service pkv # 每个进程实例的空闲超时时间,超过此值后进程实例主动退出 fiber_idle_limit = 0 # 每个进程启动的线程数 - fiber_threads = 8 + fiber_threads = 1 # 进程运行时所在的路径 fiber_queue_dir = {install_path}/var # 读写超时时间, 单位为秒 @@ -105,7 +105,7 @@ service pkv # fiber_dispatch_type = default # 线程的堆栈空间大小,单位为字节 - fiber_stack_size = 64000 + fiber_stack_size = 256000 # 允许访问 udserver 的客户端IP地址范围 # fiber_access_allow = 127.0.0.1:255.255.255.255, 127.0.0.1:127.0.0.1 fiber_access_allow = all @@ -144,4 +144,5 @@ service pkv # debug_mem = 1 # 是否在一个线程中连接读 # loop_read = 1 + buf_size = 8192 } diff --git a/app/wizard_demo/pkv/proto/redis_coder.cpp b/app/wizard_demo/pkv/proto/redis_coder.cpp index b90e1ca74..0dedfc7c2 100644 --- a/app/wizard_demo/pkv/proto/redis_coder.cpp +++ b/app/wizard_demo/pkv/proto/redis_coder.cpp @@ -7,11 +7,33 @@ namespace pkv { -redis_coder::redis_coder() { - curr_ = std::make_shared(nullptr); +redis_coder::redis_coder(size_t cache_max) { + cache_max_ = cache_max; + curr_ = new redis_object(cache_, cache_max_); +} + +redis_coder::~redis_coder() { + for (auto obj : objs_) { + delete obj; + } + + for (auto obj : cache_) { + delete obj; + } + + delete curr_; } void redis_coder::clear() { + for (auto obj : objs_) { + if (cache_.size() < cache_max_) { + obj->reset(); + cache_.emplace_back(obj); + } else { + delete obj; + } + } + objs_.clear(); } @@ -19,8 +41,14 @@ const char* redis_coder::update(const char* data, size_t& len) { while (len > 0) { data = curr_->update(data, len); if (curr_->finish()) { - objs_.push_back(curr_); - curr_ = std::make_shared(nullptr); + objs_.emplace_back(curr_); + + if (cache_.empty()) { + curr_ = new redis_object(cache_, cache_max_); + } else { + curr_ = cache_.back(); + cache_.pop_back(); + } } else if (curr_->failed()) { break; } @@ -30,7 +58,15 @@ const char* redis_coder::update(const char* data, size_t& len) { } redis_object& redis_coder::create_object() { - auto obj = std::make_shared(nullptr); + redis_object* obj; + + if (cache_.empty()) { + obj = new redis_object(cache_, cache_max_); + } else { + obj = cache_.back(); + cache_.pop_back(); + } + objs_.emplace_back(obj); return *obj; } @@ -195,4 +231,4 @@ bool test_redis_build() { ////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////// -} // namespace pkv \ No newline at end of file +} // namespace pkv diff --git a/app/wizard_demo/pkv/proto/redis_coder.h b/app/wizard_demo/pkv/proto/redis_coder.h index 290ae4c57..946af19cd 100644 --- a/app/wizard_demo/pkv/proto/redis_coder.h +++ b/app/wizard_demo/pkv/proto/redis_coder.h @@ -4,22 +4,23 @@ #pragma once +#include #include "redis_object.h" namespace pkv { class redis_coder { public: - redis_coder(); - ~redis_coder() = default; + redis_coder(size_t cache_max = 10000); + ~redis_coder(); const char* update(const char* data, size_t& len); - [[nodiscard]] const std::vector& get_objects() const { + [[nodiscard]] const std::vector& get_objects() const { return objs_; } - [[nodiscard]] shared_redis get_curr() const { + [[nodiscard]] redis_object* get_curr() const { return curr_; } @@ -31,8 +32,10 @@ public: bool to_string(acl::string& out) const; private: - std::vector objs_; - shared_redis curr_; + std::vector objs_; + std::vector cache_; + size_t cache_max_; + redis_object* curr_; }; bool test_redis_parse(const char* filepath); diff --git a/app/wizard_demo/pkv/proto/redis_object.cpp b/app/wizard_demo/pkv/proto/redis_object.cpp index 9b7a9deb7..bc9b184d3 100644 --- a/app/wizard_demo/pkv/proto/redis_object.cpp +++ b/app/wizard_demo/pkv/proto/redis_object.cpp @@ -7,25 +7,51 @@ namespace pkv { -redis_object::redis_object(redis_object* parent) { - dbuf_ = new (1) acl::dbuf_pool(); - parent_ = parent ? parent : this; +redis_object::redis_object(std::vector& cache, size_t cache_max) +: parent_(this) +, cache_max_(cache_max) +, cache_(cache) +{ } redis_object::~redis_object() { - dbuf_->destroy(); + for (auto obj : objs_) { + delete obj; + } +} + +void redis_object::set_parent(redis_object* parent) { + if (parent) { + parent_ = parent; + } +} + +void redis_object::reset() { + for (auto obj : objs_) { + if (cache_.size() < cache_max_) { + obj->reset(); + cache_.emplace_back(obj); + } else { + delete obj; + } + } + + status_ = redis_s_begin; + type_ = REDIS_OBJ_UNKOWN; + parent_ = this; + obj_ = nullptr; + cnt_ = 0; + + objs_.clear(); + buf_.clear(); } const char* redis_object::get_cmd() const { - if (me_ == nullptr) { - return nullptr; + if (type_ == REDIS_OBJ_STRING) { + return buf_.c_str(); } - if (me_->get_type() == acl::REDIS_RESULT_STRING) { - return me_->get(0); - } - - if (objs_.empty() || me_->get_type() != acl::REDIS_RESULT_ARRAY) { + if (objs_.empty() || type_ != REDIS_OBJ_ARRAY) { return nullptr; } @@ -33,8 +59,8 @@ const char* redis_object::get_cmd() const { } const char* redis_object::get_str() const { - if (me_->get_type() == acl::REDIS_RESULT_STRING) { - return me_->get(0); + if (type_ == REDIS_OBJ_STRING) { + return buf_.c_str(); } return nullptr; @@ -97,11 +123,17 @@ const char* redis_object::parse_object(const char* data, size_t& len) { if (objs_.size() == (size_t) cnt_) { obj_ = nullptr; - cnt_ = 0; + //cnt_ = 0; status_ = redis_s_finish; + } else if (cache_.empty()) { + obj_ = new redis_object(cache_, cache_max_); + obj_->set_parent(this); } else { - obj_ = std::make_shared(this); + obj_ = cache_.back(); + obj_->set_parent(this); + cache_.pop_back(); } + return data; } @@ -138,7 +170,7 @@ const char* redis_object::parse_begin(const char* data, size_t& len) { const char* redis_object::parse_status(const char* data, size_t& len) { bool found = false; - data = get_line(data, len, found); + data = get_line(data, len, buf_, found); if (!found) { assert(len == 0); return data; @@ -149,19 +181,14 @@ const char* redis_object::parse_status(const char* data, size_t& len) { return data; } - me_ = new(dbuf_) acl::redis_result(dbuf_); - me_->set_type(acl::REDIS_RESULT_STATUS); - me_->set_size(1); - put_data(dbuf_, me_, buf_.c_str(), buf_.length()); - buf_.clear(); - + type_ = REDIS_OBJ_STATUS; status_ = redis_s_finish; return data; } const char* redis_object::parse_error(const char* data, size_t& len) { bool found = false; - data = get_line(data, len, found); + data = get_line(data, len, buf_, found); if (!found) { assert(len == 0); return data; @@ -172,19 +199,14 @@ const char* redis_object::parse_error(const char* data, size_t& len) { return data; } - me_ = new(dbuf_) acl::redis_result(dbuf_); - me_->set_type(acl::REDIS_RESULT_ERROR); - me_->set_size(1); - put_data(dbuf_, me_, buf_.c_str(), buf_.length()); - buf_.clear(); - + type_ = REDIS_OBJ_ERROR; status_ = redis_s_finish; return data; } const char* redis_object::parse_number(const char* data, size_t& len) { bool found = false; - data = get_line(data, len, found); + data = get_line(data, len, buf_, found); if (!found) { assert(len == 0); return data; @@ -195,12 +217,7 @@ const char* redis_object::parse_number(const char* data, size_t& len) { return data; } - me_ = new(dbuf_) acl::redis_result(dbuf_); - me_->set_type(acl::REDIS_RESULT_INTEGER); - me_->set_size(1); - put_data(dbuf_, me_, buf_.c_str(), buf_.length()); - buf_.clear(); - + type_ = REDIS_OBJ_INTEGER; status_ = redis_s_finish; return data; } @@ -213,43 +230,33 @@ const char* redis_object::parse_strlen(const char* data, size_t& len) { return data; } - me_ = new(dbuf_) acl::redis_result(dbuf_); - me_->set_type(acl::REDIS_RESULT_STRING); - if (cnt_ <= 0) { status_ = redis_s_finish; return data; } - me_->set_size((size_t) cnt_); - buf_.clear(); - + type_ = REDIS_OBJ_STRING; status_ = redis_s_string; return data; } const char* redis_object::parse_string(const char* data, size_t& len) { - assert (me_ != nullptr); - + buf_.reserve((size_t) cnt_); data = get_data(data, len, (size_t) cnt_); if (buf_.size() == (size_t) cnt_) { status_ = redis_s_strend; - put_data(dbuf_, me_, buf_.c_str(), buf_.length()); - buf_.clear(); - cnt_ = 0; } return data; } const char* redis_object::parse_strend(const char* data, size_t& len) { - assert (me_ != nullptr); - bool found = false; - data = get_line(data, len, found); + std::string buf; + data = get_line(data, len, buf, found); // If the buf_ not empty, some data other '\r\n' got. - if (!buf_.empty()) { + if (!buf.empty()) { status_ = redis_s_null; return data; } @@ -271,24 +278,27 @@ const char* redis_object::parse_arlen(const char* data, size_t& len) { return data; } - me_ = new(dbuf_) acl::redis_result(dbuf_); - me_->set_type(acl::REDIS_RESULT_ARRAY); - if (cnt_ <= 0) { status_ = redis_s_finish; return data; } - me_->set_size((size_t) cnt_); - - buf_.clear(); + type_ = REDIS_OBJ_ARRAY; status_ = redis_s_array; - obj_ = std::make_shared(this); + + if (cache_.empty()) { + obj_ = new redis_object(cache_, cache_max_); + obj_->set_parent(this); + } else { + obj_ = cache_.back(); + obj_->set_parent(this); + cache_.pop_back(); + } + return data; } const char* redis_object::parse_array(const char* data, size_t& len) { - assert(me_ != nullptr); assert(obj_ != nullptr); return parse_object(data, len); @@ -299,16 +309,30 @@ const char* redis_object::get_data(const char* data, size_t& len, size_t want) { assert(n < want); want -= n; + +#if 1 + if (want > len) { + want = len; + len = 0; + } else { + len -= want; + } + + buf_.append(data, want); + data += want; +#else for (size_t i = 0; i < want && len > 0; i++) { buf_.push_back(*data++); len--; } +#endif + return data; } const char* redis_object::get_length(const char* data, size_t& len, int& res, bool& found) { - data = get_line(data, len, found); + data = get_line(data, len, buf_, found); if (!found) { assert(len == 0); return data; @@ -331,7 +355,8 @@ const char* redis_object::get_length(const char* data, size_t& len, return data; } -const char* redis_object::get_line(const char* data, size_t& len, bool& found) { +const char* redis_object::get_line(const char* data, size_t& len, + std::string& buf, bool& found) { while (len > 0) { switch (*data) { case '\r': @@ -344,7 +369,7 @@ const char* redis_object::get_line(const char* data, size_t& len, bool& found) { found = true; return data; default: - buf_.push_back(*data++); + buf.push_back(*data++); len--; break; } @@ -352,17 +377,6 @@ const char* redis_object::get_line(const char* data, size_t& len, bool& found) { return data; } -void redis_object::put_data(acl::dbuf_pool* dbuf, acl::redis_result* rr, - const char* data, size_t len) { - char* buf = (char*) dbuf->dbuf_alloc(len + 1); - if (len > 0) { - memcpy(buf, data, len); - } - - buf[len] = 0; - rr->put(buf, len); -} - bool redis_object::to_string(acl::string& out) const { #define USE_UNIX_CRLF #ifdef USE_UNIX_CRLF @@ -381,83 +395,80 @@ bool redis_object::to_string(acl::string& out) const { } } - if (me_ == nullptr) { - return false; - } + //assert(!buf_.empty()); - switch (me_->get_type()) { - case acl::REDIS_RESULT_STATUS: - out.format_append("+%s%s", me_->get_status(), CRLF); + switch (type_) { + case REDIS_OBJ_STATUS: + out.format_append("+%s%s", buf_.c_str(), CRLF); break; - case acl::REDIS_RESULT_ERROR: - out.format_append("-%s%s", me_->get_error(), CRLF); + case REDIS_OBJ_ERROR: + out.format_append("-%s%s", buf_.c_str(), CRLF); break; - case acl::REDIS_RESULT_INTEGER: - out.format_append(":%lld%s", me_->get_integer64(), CRLF); + case REDIS_OBJ_INTEGER: + out.format_append(":%s%s", buf_.c_str(), CRLF); break; - case acl::REDIS_RESULT_STRING: - out.format_append("$%zd%s", me_->get_length(), CRLF); - me_->argv_to_string(out, false); - out += CRLF; + case REDIS_OBJ_STRING: + out.format_append("$%zd%s%s%s", buf_.size(), CRLF, buf_.c_str(), CRLF); break; //case acl::REDIS_RESULT_ARRAY: // break; default: break; } + return true; } redis_object& redis_object::set_status(const std::string& data, bool return_parent) { - me_ = new(dbuf_) acl::redis_result(dbuf_); - me_->set_type(acl::REDIS_RESULT_STATUS); - me_->set_size(1); - put_data(dbuf_, me_, data.c_str(), data.length()); + type_ = REDIS_OBJ_STATUS; + buf_ = data; return return_parent ? *parent_ : *this; } redis_object& redis_object::set_error(const std::string& data, bool return_parent) { - me_ = new(dbuf_) acl::redis_result(dbuf_); - me_->set_type(acl::REDIS_RESULT_ERROR); - me_->set_size(1); - put_data(dbuf_, me_, data.c_str(), data.length()); + type_ = REDIS_OBJ_ERROR; + buf_ = data; return return_parent ? *parent_ : *this; } redis_object& redis_object::set_number(int n, bool return_parent) { - me_ = new(dbuf_) acl::redis_result(dbuf_); - me_->set_type(acl::REDIS_RESULT_INTEGER); - me_->set_size(1); + type_ = REDIS_OBJ_INTEGER; std::string buf = std::to_string(n); - put_data(dbuf_, me_, buf.c_str(), buf.length()); + buf_ = buf; return return_parent ? *parent_ : *this; } redis_object& redis_object::set_string(const std::string &data, bool return_parent) { - me_ = new(dbuf_) acl::redis_result(dbuf_); - me_->set_type(acl::REDIS_RESULT_STRING); - me_->set_size(data.size()); + type_ = REDIS_OBJ_STRING; if (!data.empty()) { - put_data(dbuf_, me_, data.c_str(), data.length()); + buf_ = data; } return return_parent ? *parent_ : *this; } redis_object& redis_object::create_child() { - auto obj = std::make_shared(this); - objs_.emplace_back(obj); - - if (me_ == nullptr) { - // The last one is NULL. - me_ = new(dbuf_) acl::redis_result(dbuf_); - me_->set_type(acl::REDIS_RESULT_ARRAY); + redis_object* obj; + if (cache_.empty()) { + obj = new redis_object(cache_, cache_max_); + obj->set_parent(this); + objs_.emplace_back(obj); + } else { + obj = cache_.back(); + obj->set_parent(this); + cache_.pop_back(); + objs_.emplace_back(obj); } - me_->set_size(objs_.size()); + if (obj_ == nullptr) { + // The last one is NULL. + type_ = REDIS_OBJ_ARRAY; + } + + cnt_ = objs_.size(); return *obj; } diff --git a/app/wizard_demo/pkv/proto/redis_object.h b/app/wizard_demo/pkv/proto/redis_object.h index 894db1432..50f83f61b 100644 --- a/app/wizard_demo/pkv/proto/redis_object.h +++ b/app/wizard_demo/pkv/proto/redis_object.h @@ -4,6 +4,8 @@ #pragma once +#include +#include #include "redis_type.h" namespace pkv { @@ -11,14 +13,27 @@ namespace pkv { class redis_object; using shared_redis = std::shared_ptr; +typedef enum { + REDIS_OBJ_UNKOWN, + REDIS_OBJ_NIL, + REDIS_OBJ_ERROR, + REDIS_OBJ_STATUS, + REDIS_OBJ_INTEGER, + REDIS_OBJ_STRING, + REDIS_OBJ_ARRAY, +} redis_obj_t; + class redis_object { public: - explicit redis_object(redis_object* parent); + explicit redis_object(std::vector& cache, size_t cache_max); ~redis_object(); redis_object(const redis_object&) = delete; void operator=(const redis_object&) = delete; + void set_parent(redis_object* parent); + void reset(); + public: const char* update(const char* data, size_t& len); @@ -34,15 +49,15 @@ public: return status_; } - [[nodiscard]] acl::redis_result_t get_type() const { - return me_ ? me_->get_type() : acl::REDIS_RESULT_UNKOWN; + [[nodiscard]] redis_obj_t get_type() const { + return type_; } [[nodiscard]] const char* get_cmd() const; [[nodiscard]] const char* get_str() const; - [[nodiscard]] const std::vector& get_objects() const { + [[nodiscard]] const std::vector& get_objects() const { return objs_; } @@ -56,19 +71,21 @@ public: bool to_string(acl::string& out) const; private: - acl::dbuf_pool* dbuf_; - redis_object* parent_; int status_ = redis_s_begin; - acl::redis_result* me_ = nullptr; - std::vector objs_; + redis_obj_t type_ = REDIS_OBJ_UNKOWN; + + redis_object* parent_ = nullptr; + redis_object* obj_ = nullptr; std::string buf_; int cnt_ = 0; - shared_redis obj_ = nullptr; + + size_t cache_max_; + std::vector& cache_; + std::vector objs_; private: - static void put_data(acl::dbuf_pool*, acl::redis_result*, const char*, size_t); - const char* get_line(const char*, size_t&, bool&); + const char* get_line(const char*, size_t&, std::string&, bool&); const char* get_length(const char*, size_t&, int&, bool&); const char* get_data(const char*, size_t&, size_t); diff --git a/lib_acl_cpp/src/redis/redis_client_pipeline.cpp b/lib_acl_cpp/src/redis/redis_client_pipeline.cpp index 58361ecb2..30229f4f6 100644 --- a/lib_acl_cpp/src/redis/redis_client_pipeline.cpp +++ b/lib_acl_cpp/src/redis/redis_client_pipeline.cpp @@ -73,7 +73,8 @@ bool redis_pipeline_channel::flush_all(void) #if 0 if (msgs_.size() > 10) { - logger(">>>messages size is %zd<<<<", msgs_.size()); + logger(">>>messages size is %zd, buf size=%zd<<<<", + msgs_.size(), buf_.size()); } #endif diff --git a/lib_fiber/samples/redis_threads/main.cpp b/lib_fiber/samples/redis_threads/main.cpp index 19ce4d253..62475061d 100644 --- a/lib_fiber/samples/redis_threads/main.cpp +++ b/lib_fiber/samples/redis_threads/main.cpp @@ -65,7 +65,7 @@ int main(int argc, char *argv[]) acl::acl_cpp_init(); acl::log::stdout_open(true); - acl_fiber_msg_stdout_enable(1); + acl::fiber::stdout_open(true); std::vector threads; int stack_size = STACK_SIZE;