#include "stdafx.h" #include #include static bool __show_results = false; class pgsql_oper { public: pgsql_oper(acl::db_pool& dbp, unsigned long id) : dbp_(dbp), id_(id) { } ~pgsql_oper(void) { } void pgsql_add(int count) { for (int i = 0; i < count; i++) { acl::db_handle* db = dbp_.peek_open(); if (db == NULL) { printf("peek db connection error i: %d\r\n", i); break; } add(*db, i); dbp_.put(db); } } void pgsql_get(int count) { for (int i = 0; i < count; i++) { acl::db_handle* db = dbp_.peek_open(); if (db == NULL) { printf("peek db connection error i: %d\r\n", i); break; } get(*db, i); dbp_.put(db); } } private: acl::db_pool& dbp_; unsigned long id_; bool add(acl::db_handle& db, int n) { acl::query query; query.create_sql("insert into group_tbl(group_name, uvip_tbl," " update_date) values(:group, :test, :date)") .set_format("group", "group:%lu:%d", id_, n) .set_parameter("test", "test") .set_date("date", time(NULL), "%Y-%m-%d"); if (db.exec_update(query) == false) { printf("exec_update error: %s\r\n", db.get_error()); return false; } return true; } bool get(acl::db_handle& db, int n) { acl::query query; query.create_sql("select * from group_tbl" " where group_name=:group" " and uvip_tbl=:test") .set_format("group", "group:%lu:%d", id_, n) .set_format("test", "test"); if (db.exec_select(query) == false) { printf("exec_select error: %s\r\n", db.get_error()); return false; } const acl::db_rows* result = db.get_result(); if (__show_results && result) { const std::vector& rows = result->get_rows(); for (size_t i = 0; i < rows.size(); i++) { if (n > 10) continue; const acl::db_row* row = rows[i]; for (size_t j = 0; j < row->length(); j++) printf("%s, ", (*row)[j]); printf("\r\n"); } } db.free_result(); return true; } }; ////////////////////////////////////////////////////////////////////////////// // pgsql thread class class pgsql_thread : public acl::thread { public: pgsql_thread(int id, acl::db_pool& dbp, const char* oper, int count) : id_(id), dbp_(dbp), oper_(oper), count_(count) {} ~pgsql_thread(void) {} protected: void* run(void) { pgsql_oper dboper(dbp_, id_); if (oper_.equal("add", false)) dboper.pgsql_add(count_); else if (oper_.equal("get", false)) dboper.pgsql_get(count_); else printf("unknown command: %s\r\n", oper_.c_str()); return NULL; } private: int id_; acl::db_pool& dbp_; acl::string oper_; int count_; }; ////////////////////////////////////////////////////////////////////////////// // pgsql fiber class static int __max_fibers = 2; static int __cur_fibers = 2; class pgsql_fiber : public acl::fiber { public: pgsql_fiber(int id, acl::db_pool& dbp, const char* oper, int count) : id_(id), dbp_(dbp), oper_(oper), count_(count) {} ~pgsql_fiber(void) {} protected: // @override void run(void) { // printf("fiber-%d-%d running\r\n", get_id(), acl::fiber::self()); acl_fiber_delay(10); pgsql_oper dboper(dbp_, id_); if (oper_.equal("add", false)) dboper.pgsql_add(count_); else if (oper_.equal("get", false)) dboper.pgsql_get(count_); else printf("unknown command: %s\r\n", oper_.c_str()); delete this; printf("----__cur_fibers: %d----\r\n", __cur_fibers); if (--__cur_fibers == 0) { printf("All fibers Over\r\n"); acl::fiber::schedule_stop(); } } private: int id_; acl::db_pool& dbp_; acl::string oper_; int count_; }; ////////////////////////////////////////////////////////////////////////////// static void usage(const char* procname) { printf("usage: %s\r\n" " -h [help]\r\n" " -c cocurrent\r\n" " -t [use threads mode]\r\n" " -n oper_count\r\n" " -f pgsqlclient_path\r\n" " -s pgsql_addr\r\n" " -o db_oper[add|get]\r\n" " -d [show results of get]\r\n" " -C conn_timeout\r\n" " -R rw_timeout\r\n" " -u dbuser\r\n" " -p dbpass\r\n", procname); } int main(int argc, char *argv[]) { int ch, count = 10, conn_timeout = 10, rw_timeout = 10, cocurrent = 2; acl::string pgsql_path("../../lib/libpg.so"); acl::string dbaddr("127.0.0.1:5432"), dbname("acl_db"); acl::string dbuser("root"), dbpass(""), oper("get"); bool use_threads = false; acl::acl_cpp_init(); acl::log::stdout_open(true); while ((ch = getopt(argc, argv, "hc:tn:f:s:u:o:p:C:R:d")) > 0) { switch (ch) { case 'h': usage(argv[0]); return 0; case 'c': cocurrent = atoi(optarg); break; case 't': use_threads = true; break; case 'n': count = atoi(optarg); break; case 'f': pgsql_path = optarg; break; case 's': dbaddr = optarg; break; case 'u': dbuser = optarg; break; case 'p': dbpass = optarg; break; case 'o': oper = optarg; break; case 'C': conn_timeout = atoi(optarg); break; case 'R': rw_timeout = atoi(optarg); break; case 'd': __show_results = true; break; default: break; } } // setup libpgsqlclient_r.so path acl::db_handle::set_loadpath(pgsql_path); // init pgsql connection configure acl::pgsql_conf dbconf(dbaddr, dbname); dbconf.set_dbuser(dbuser) .set_dbpass(dbpass) .set_dblimit(cocurrent) .set_conn_timeout(conn_timeout) .set_rw_timeout(rw_timeout); // init pgsql connections pool acl::pgsql_pool dbpool(dbconf); if (use_threads) { std::vector threads; for (int i = 0; i < cocurrent; i++) { acl::thread* thread = new pgsql_thread(i, dbpool, oper, count); thread->set_detachable(false); threads.push_back(thread); thread->start(); } for (std::vector::iterator it = threads.begin(); it != threads.end(); ++it) { (*it)->wait(NULL); delete (*it); } } else { __max_fibers = cocurrent; __cur_fibers = __max_fibers; for (int i = 0; i < __max_fibers; i++) { acl::fiber* f = new pgsql_fiber(i, dbpool, oper, count); f->start(); } acl::fiber::schedule(); } printf("---- exit now ----\r\n"); return 0; }