Merge pull request #23 from an-tao/pg

Optimize PostgreSQL client
This commit is contained in:
An Tao 2019-01-09 14:57:45 +08:00 committed by GitHub
commit 3eff3853d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 209 additions and 196 deletions

View File

@ -100,6 +100,7 @@ class HttpAppFramework : public trantor::NonCopyable
virtual trantor::EventLoop *loop() = 0; virtual trantor::EventLoop *loop() = 0;
virtual void setThreadNum(size_t threadNum) = 0; virtual void setThreadNum(size_t threadNum) = 0;
virtual size_t getThreadNum() const = 0;
virtual void setSSLFiles(const std::string &certPath, virtual void setSSLFiles(const std::string &certPath,
const std::string &keyPath) = 0; const std::string &keyPath) = 0;
virtual void addListener(const std::string &ip, virtual void addListener(const std::string &ip,

View File

@ -45,6 +45,7 @@ class HttpAppFrameworkImpl : public HttpAppFramework
const std::string &certFile = "", const std::string &certFile = "",
const std::string &keyFile = "") override; const std::string &keyFile = "") override;
virtual void setThreadNum(size_t threadNum) override; virtual void setThreadNum(size_t threadNum) override;
virtual size_t getThreadNum() const override { return _threadNum; }
virtual void setSSLFiles(const std::string &certPath, virtual void setSSLFiles(const std::string &certPath,
const std::string &keyPath) override; const std::string &keyPath) override;
virtual void run() override; virtual void run() override;

View File

@ -152,13 +152,13 @@ class DbClient : public trantor::NonCopyable
private: private:
friend internal::SqlBinder; friend internal::SqlBinder;
virtual void execSql(const std::string &sql, virtual void execSql(std::string &&sql,
size_t paraNum, size_t paraNum,
const std::vector<const char *> &parameters, std::vector<const char *> &&parameters,
const std::vector<int> &length, std::vector<int> &&length,
const std::vector<int> &format, std::vector<int> &&format,
const ResultCallback &rcb, ResultCallback &&rcb,
const std::function<void(const std::exception_ptr &)> &exptCallback) = 0; std::function<void(const std::exception_ptr &)> &&exceptCallback) = 0;
protected: protected:
ClientType _type; ClientType _type;

View File

@ -84,9 +84,11 @@ class Result
/// For Mysql, Sqlite3 database /// For Mysql, Sqlite3 database
unsigned long long insertId() const noexcept; unsigned long long insertId() const noexcept;
const std::string &sql() const noexcept;
private: private:
ResultImplPtr _resultPtr; ResultImplPtr _resultPtr;
std::string _query;
std::string _errString; std::string _errString;
friend class Field; friend class Field;
friend class Row; friend class Row;

View File

@ -26,6 +26,7 @@
#include "TransactionImpl.h" #include "TransactionImpl.h"
#include <trantor/net/EventLoop.h> #include <trantor/net/EventLoop.h>
#include <trantor/net/inner/Channel.h> #include <trantor/net/inner/Channel.h>
#include <drogon/drogon.h>
#include <drogon/orm/Exception.h> #include <drogon/orm/Exception.h>
#include <drogon/orm/DbClient.h> #include <drogon/orm/DbClient.h>
#include <sys/select.h> #include <sys/select.h>
@ -42,44 +43,41 @@ using namespace drogon::orm;
DbClientImpl::DbClientImpl(const std::string &connInfo, const size_t connNum, ClientType type) DbClientImpl::DbClientImpl(const std::string &connInfo, const size_t connNum, ClientType type)
: _connInfo(connInfo), : _connInfo(connInfo),
_connectNum(connNum) _connectNum(connNum),
_loops(connNum / 100 > 0 ? connNum / 100 : 1)
{ {
_type = type; _type = type;
LOG_TRACE << "type=" << (int)type; LOG_TRACE << "type=" << (int)type;
//LOG_DEBUG << _loops.getLoopNum();
assert(connNum > 0); assert(connNum > 0);
_loopThread = std::thread([=]() { _loops.start();
_loopPtr = std::shared_ptr<trantor::EventLoop>(new trantor::EventLoop); std::thread([this]() {
ioLoop(); for (size_t i = 0; i < _connectNum; i++)
});
}
void DbClientImpl::ioLoop()
{
auto thisPtr = shared_from_this();
_loopPtr->runAfter(0, [thisPtr]() {
for (size_t i = 0; i < thisPtr->_connectNum; i++)
{ {
thisPtr->_connections.insert(thisPtr->newConnection()); auto loop = _loops.getNextLoop();
loop->runInLoop([this, loop]() {
_connections.insert(newConnection(loop));
});
} }
}); }).detach();
_loopPtr->loop();
} }
DbClientImpl::~DbClientImpl() noexcept DbClientImpl::~DbClientImpl() noexcept
{ {
_stop = true; std::lock_guard<std::mutex> lock(_connectionsMutex);
_loopPtr->quit(); _connections.clear();
if (_loopThread.joinable()) _readyConnections.clear();
_loopThread.join(); _busyConnections.clear();
} }
void DbClientImpl::execSql(const DbConnectionPtr &conn, void DbClientImpl::execSql(const DbConnectionPtr &conn,
const std::string &sql, std::string &&sql,
size_t paraNum, size_t paraNum,
const std::vector<const char *> &parameters, std::vector<const char *> &&parameters,
const std::vector<int> &length, std::vector<int> &&length,
const std::vector<int> &format, std::vector<int> &&format,
const ResultCallback &cb, ResultCallback &&rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback) std::function<void(const std::exception_ptr &)> &&exceptCallback)
{ {
if (!conn) if (!conn)
{ {
@ -94,8 +92,8 @@ void DbClientImpl::execSql(const DbConnectionPtr &conn,
return; return;
} }
std::weak_ptr<DbConnection> weakConn = conn; std::weak_ptr<DbConnection> weakConn = conn;
conn->execSql(sql, paraNum, parameters, length, format, conn->execSql(std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format),
cb, exceptCallback, std::move(rcb), std::move(exceptCallback),
[=]() -> void { [=]() -> void {
{ {
auto connPtr = weakConn.lock(); auto connPtr = weakConn.lock();
@ -105,18 +103,18 @@ void DbClientImpl::execSql(const DbConnectionPtr &conn,
} }
}); });
} }
void DbClientImpl::execSql(const std::string &sql, void DbClientImpl::execSql(std::string &&sql,
size_t paraNum, size_t paraNum,
const std::vector<const char *> &parameters, std::vector<const char *> &&parameters,
const std::vector<int> &length, std::vector<int> &&length,
const std::vector<int> &format, std::vector<int> &&format,
const QueryCallback &cb, ResultCallback &&rcb,
const ExceptPtrCallback &exceptCb) std::function<void(const std::exception_ptr &)> &&exceptCallback)
{ {
assert(paraNum == parameters.size()); assert(paraNum == parameters.size());
assert(paraNum == length.size()); assert(paraNum == length.size());
assert(paraNum == format.size()); assert(paraNum == format.size());
assert(cb); assert(rcb);
DbConnectionPtr conn; DbConnectionPtr conn;
{ {
std::lock_guard<std::mutex> guard(_connectionsMutex); std::lock_guard<std::mutex> guard(_connectionsMutex);
@ -131,7 +129,7 @@ void DbClientImpl::execSql(const std::string &sql,
} }
catch (...) catch (...)
{ {
exceptCb(std::current_exception()); exceptCallback(std::current_exception());
} }
return; return;
} }
@ -146,7 +144,7 @@ void DbClientImpl::execSql(const std::string &sql,
} }
if (conn) if (conn)
{ {
execSql(conn, sql, paraNum, parameters, length, format, cb, exceptCb); execSql(conn, std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format), std::move(rcb), std::move(exceptCallback));
return; return;
} }
bool busy = false; bool busy = false;
@ -166,19 +164,19 @@ void DbClientImpl::execSql(const std::string &sql,
} }
catch (...) catch (...)
{ {
exceptCb(std::current_exception()); exceptCallback(std::current_exception());
} }
return; return;
} }
//LOG_TRACE << "Push query to buffer"; //LOG_TRACE << "Push query to buffer";
SqlCmd cmd; std::shared_ptr<SqlCmd> cmd = std::make_shared<SqlCmd>();
cmd._sql = sql; cmd->_sql = std::move(sql);
cmd._paraNum = paraNum; cmd->_paraNum = paraNum;
cmd._parameters = parameters; cmd->_parameters = std::move(parameters);
cmd._length = length; cmd->_length = std::move(length);
cmd._format = format; cmd->_format = std::move(format);
cmd._cb = cb; cmd->_cb = std::move(rcb);
cmd._exceptCb = exceptCb; cmd->_exceptCb = std::move(exceptCallback);
{ {
std::lock_guard<std::mutex> guard(_bufferMutex); std::lock_guard<std::mutex> guard(_bufferMutex);
_sqlCmdBuffer.push_back(std::move(cmd)); _sqlCmdBuffer.push_back(std::move(cmd));
@ -239,28 +237,30 @@ void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr)
if (!_sqlCmdBuffer.empty()) if (!_sqlCmdBuffer.empty())
{ {
_busyConnections.insert(connPtr); //For new connections, this sentence is necessary _busyConnections.insert(connPtr); //For new connections, this sentence is necessary
auto cmd = _sqlCmdBuffer.front(); auto &cmd = _sqlCmdBuffer.front();
_sqlCmdBuffer.pop_front(); connPtr->loop()->queueInLoop([connPtr, cmd, this]() {
_loopPtr->queueInLoop([=]() { execSql(connPtr, std::move(cmd->_sql), cmd->_paraNum, std::move(cmd->_parameters), std::move(cmd->_length), std::move(cmd->_format), std::move(cmd->_cb), std::move(cmd->_exceptCb));
execSql(connPtr, cmd._sql, cmd._paraNum, cmd._parameters, cmd._length, cmd._format, cmd._cb, cmd._exceptCb);
}); });
_sqlCmdBuffer.pop_front();
return; return;
} }
} }
//Idle connection //Idle connection
_busyConnections.erase(connPtr); connPtr->loop()->queueInLoop([connPtr, this]() {
_readyConnections.insert(connPtr); std::lock_guard<std::mutex> guard(_connectionsMutex);
_busyConnections.erase(connPtr);
_readyConnections.insert(connPtr);
});
} }
} }
DbConnectionPtr DbClientImpl::newConnection() DbConnectionPtr DbClientImpl::newConnection(trantor::EventLoop *loop)
{ {
DbConnectionPtr connPtr; DbConnectionPtr connPtr;
if (_type == ClientType::PostgreSQL) if (_type == ClientType::PostgreSQL)
{ {
#if USE_POSTGRESQL #if USE_POSTGRESQL
connPtr = std::make_shared<PgConnection>(_loopPtr.get(), _connInfo); connPtr = std::make_shared<PgConnection>(loop, _connInfo);
#else #else
return nullptr; return nullptr;
#endif #endif
@ -268,7 +268,7 @@ DbConnectionPtr DbClientImpl::newConnection()
else if (_type == ClientType::Mysql) else if (_type == ClientType::Mysql)
{ {
#if USE_MYSQL #if USE_MYSQL
connPtr = std::make_shared<MysqlConnection>(_loopPtr.get(), _connInfo); connPtr = std::make_shared<MysqlConnection>(loop, _connInfo);
#else #else
return nullptr; return nullptr;
#endif #endif
@ -276,7 +276,7 @@ DbConnectionPtr DbClientImpl::newConnection()
else if (_type == ClientType::Sqlite3) else if (_type == ClientType::Sqlite3)
{ {
#if USE_SQLITE3 #if USE_SQLITE3
connPtr = std::make_shared<Sqlite3Connection>(_loopPtr.get(), _connInfo); connPtr = std::make_shared<Sqlite3Connection>(loop, _connInfo);
#else #else
return nullptr; return nullptr;
#endif #endif
@ -285,11 +285,11 @@ DbConnectionPtr DbClientImpl::newConnection()
{ {
return nullptr; return nullptr;
} }
auto loopPtr = _loopPtr;
std::weak_ptr<DbClientImpl> weakPtr = shared_from_this(); std::weak_ptr<DbClientImpl> weakPtr = shared_from_this();
connPtr->setCloseCallback([weakPtr, loopPtr](const DbConnectionPtr &closeConnPtr) { connPtr->setCloseCallback([weakPtr, loop](const DbConnectionPtr &closeConnPtr) {
//Reconnect after 1 second //Reconnect after 1 second
loopPtr->runAfter(1, [weakPtr, closeConnPtr] { loop->runAfter(1, [weakPtr, closeConnPtr, loop] {
auto thisPtr = weakPtr.lock(); auto thisPtr = weakPtr.lock();
if (!thisPtr) if (!thisPtr)
return; return;
@ -298,7 +298,7 @@ DbConnectionPtr DbClientImpl::newConnection()
thisPtr->_busyConnections.erase(closeConnPtr); thisPtr->_busyConnections.erase(closeConnPtr);
assert(thisPtr->_connections.find(closeConnPtr) != thisPtr->_connections.end()); assert(thisPtr->_connections.find(closeConnPtr) != thisPtr->_connections.end());
thisPtr->_connections.erase(closeConnPtr); thisPtr->_connections.erase(closeConnPtr);
thisPtr->_connections.insert(thisPtr->newConnection()); thisPtr->_connections.insert(thisPtr->newConnection(loop));
}); });
}); });
connPtr->setOkCallback([weakPtr](const DbConnectionPtr &okConnPtr) { connPtr->setOkCallback([weakPtr](const DbConnectionPtr &okConnPtr) {

View File

@ -16,7 +16,7 @@
#include "DbConnection.h" #include "DbConnection.h"
#include <drogon/orm/DbClient.h> #include <drogon/orm/DbClient.h>
#include <trantor/net/EventLoop.h> #include <trantor/net/EventLoopThreadPool.h>
#include <memory> #include <memory>
#include <thread> #include <thread>
#include <functional> #include <functional>
@ -34,40 +34,39 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this<DbClie
public: public:
DbClientImpl(const std::string &connInfo, const size_t connNum, ClientType type); DbClientImpl(const std::string &connInfo, const size_t connNum, ClientType type);
virtual ~DbClientImpl() noexcept; virtual ~DbClientImpl() noexcept;
virtual void execSql(const std::string &sql, virtual void execSql(std::string &&sql,
size_t paraNum, size_t paraNum,
const std::vector<const char *> &parameters, std::vector<const char *> &&parameters,
const std::vector<int> &length, std::vector<int> &&length,
const std::vector<int> &format, std::vector<int> &&format,
const ResultCallback &rcb, ResultCallback &&rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback) override; std::function<void(const std::exception_ptr &)> &&exceptCallback) override;
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = std::function<void(bool)>()) override; virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = std::function<void(bool)>()) override;
private: private:
void ioLoop(); std::string _connInfo;
std::shared_ptr<trantor::EventLoop> _loopPtr; size_t _connectNum;
void execSql(const DbConnectionPtr &conn, const std::string &sql, trantor::EventLoopThreadPool _loops;
void execSql(const DbConnectionPtr &conn,
std::string &&sql,
size_t paraNum, size_t paraNum,
const std::vector<const char *> &parameters, std::vector<const char *> &&parameters,
const std::vector<int> &length, std::vector<int> &&length,
const std::vector<int> &format, std::vector<int> &&format,
const ResultCallback &rcb, ResultCallback &&rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback); std::function<void(const std::exception_ptr &)> &&exceptCallback);
DbConnectionPtr newConnection(); DbConnectionPtr newConnection(trantor::EventLoop *loop);
std::mutex _connectionsMutex;
std::unordered_set<DbConnectionPtr> _connections; std::unordered_set<DbConnectionPtr> _connections;
std::unordered_set<DbConnectionPtr> _readyConnections; std::unordered_set<DbConnectionPtr> _readyConnections;
std::unordered_set<DbConnectionPtr> _busyConnections; std::unordered_set<DbConnectionPtr> _busyConnections;
std::string _connInfo;
std::thread _loopThread;
std::mutex _connectionsMutex;
std::condition_variable _condConnectionReady; std::condition_variable _condConnectionReady;
size_t _transWaitNum = 0; size_t _transWaitNum = 0;
size_t _connectNum;
bool _stop = false;
struct SqlCmd struct SqlCmd
{ {
std::string _sql; std::string _sql;
@ -78,7 +77,7 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this<DbClie
QueryCallback _cb; QueryCallback _cb;
ExceptPtrCallback _exceptCb; ExceptPtrCallback _exceptCb;
}; };
std::deque<SqlCmd> _sqlCmdBuffer; std::deque<std::shared_ptr<SqlCmd>> _sqlCmdBuffer;
std::mutex _bufferMutex; std::mutex _bufferMutex;
void handleNewTask(const DbConnectionPtr &conn); void handleNewTask(const DbConnectionPtr &conn);

View File

@ -51,14 +51,14 @@ class DbConnection : public trantor::NonCopyable
{ {
_closeCb = cb; _closeCb = cb;
} }
virtual void execSql(const std::string &sql, virtual void execSql(std::string &&sql,
size_t paraNum, size_t paraNum,
const std::vector<const char *> &parameters, std::vector<const char *> &&parameters,
const std::vector<int> &length, std::vector<int> &&length,
const std::vector<int> &format, std::vector<int> &&format,
const ResultCallback &rcb, ResultCallback &&rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback, std::function<void(const std::exception_ptr &)> &&exceptCallback,
const std::function<void()> &idleCb) = 0; std::function<void()> &&idleCb) = 0;
virtual ~DbConnection() virtual ~DbConnection()
{ {
LOG_TRACE << "Destruct DbConn" << this; LOG_TRACE << "Destruct DbConn" << this;

View File

@ -114,7 +114,6 @@ Result::size_type Result::size() const noexcept
void Result::swap(Result &other) noexcept void Result::swap(Result &other) noexcept
{ {
_resultPtr.swap(other._resultPtr); _resultPtr.swap(other._resultPtr);
_query.swap(other._query);
_errString.swap(other._errString); _errString.swap(other._errString);
} }
Result::row_size_type Result::columns() const noexcept Result::row_size_type Result::columns() const noexcept
@ -148,4 +147,8 @@ Result::field_size_type Result::getLength(Result::size_type row, Result::row_siz
unsigned long long Result::insertId() const noexcept unsigned long long Result::insertId() const noexcept
{ {
return _resultPtr->insertId(); return _resultPtr->insertId();
}
const std::string &Result::sql() const noexcept
{
return _resultPtr->sql();
} }

View File

@ -24,6 +24,7 @@ namespace orm
class ResultImpl : public trantor::NonCopyable, public Result class ResultImpl : public trantor::NonCopyable, public Result
{ {
public: public:
ResultImpl(const std::string &query) : _query(query) {}
virtual size_type size() const noexcept = 0; virtual size_type size() const noexcept = 0;
virtual row_size_type columns() const noexcept = 0; virtual row_size_type columns() const noexcept = 0;
virtual const char *columnName(row_size_type Number) const = 0; virtual const char *columnName(row_size_type Number) const = 0;
@ -32,8 +33,12 @@ class ResultImpl : public trantor::NonCopyable, public Result
virtual const char *getValue(size_type row, row_size_type column) const = 0; virtual const char *getValue(size_type row, row_size_type column) const = 0;
virtual bool isNull(size_type row, row_size_type column) const = 0; virtual bool isNull(size_type row, row_size_type column) const = 0;
virtual field_size_type getLength(size_type row, row_size_type column) const = 0; virtual field_size_type getLength(size_type row, row_size_type column) const = 0;
virtual const std::string &sql() const { return _query; }
virtual unsigned long long insertId() const noexcept { return 0; } virtual unsigned long long insertId() const noexcept { return 0; }
virtual ~ResultImpl() {} virtual ~ResultImpl() {}
private:
std::string _query;
}; };
} // namespace orm } // namespace orm

View File

@ -28,7 +28,7 @@ void SqlBinder::exec()
//nonblocking mode,default mode //nonblocking mode,default mode
//Retain shared_ptrs of parameters until we get the result; //Retain shared_ptrs of parameters until we get the result;
std::shared_ptr<decltype(_objs)> objs = std::make_shared<decltype(_objs)>(std::move(_objs)); std::shared_ptr<decltype(_objs)> objs = std::make_shared<decltype(_objs)>(std::move(_objs));
_client.execSql(_sql, _paraNum, _parameters, _length, _format, _client.execSql(std::move(_sql), _paraNum, std::move(_parameters), std::move(_length), std::move(_format),
[holder = std::move(_callbackHolder), objs](const Result &r) { [holder = std::move(_callbackHolder), objs](const Result &r) {
objs->clear(); objs->clear();
if (holder) if (holder)
@ -67,7 +67,7 @@ void SqlBinder::exec()
std::shared_ptr<std::promise<Result>> pro(new std::promise<Result>); std::shared_ptr<std::promise<Result>> pro(new std::promise<Result>);
auto f = pro->get_future(); auto f = pro->get_future();
_client.execSql(_sql, _paraNum, _parameters, _length, _format, _client.execSql(std::move(_sql), _paraNum, std::move(_parameters), std::move(_length), std::move(_format),
[pro](const Result &r) { [pro](const Result &r) {
pro->set_value(r); pro->set_value(r);
}, },

View File

@ -70,27 +70,27 @@ TransactionImpl::~TransactionImpl()
}); });
} }
} }
void TransactionImpl::execSql(const std::string &sql, void TransactionImpl::execSql(std::string &&sql,
size_t paraNum, size_t paraNum,
const std::vector<const char *> &parameters, std::vector<const char *> &&parameters,
const std::vector<int> &length, std::vector<int> &&length,
const std::vector<int> &format, std::vector<int> &&format,
const ResultCallback &rcb, ResultCallback &&rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback) std::function<void(const std::exception_ptr &)> &&exceptCallback)
{ {
auto thisPtr = shared_from_this(); auto thisPtr = shared_from_this();
_loop->queueInLoop([thisPtr, sql, paraNum, parameters, length, format, rcb, exceptCallback]() { _loop->queueInLoop([thisPtr, sql = std::move(sql), paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format), rcb = std::move(rcb), exceptCallback = std::move(exceptCallback)]() mutable {
if (!thisPtr->_isCommitedOrRolledback) if (!thisPtr->_isCommitedOrRolledback)
{ {
if (!thisPtr->_isWorking) if (!thisPtr->_isWorking)
{ {
thisPtr->_isWorking = true; thisPtr->_isWorking = true;
thisPtr->_connectionPtr->execSql(sql, thisPtr->_connectionPtr->execSql(std::move(sql),
paraNum, paraNum,
parameters, std::move(parameters),
length, std::move(length),
format, std::move(format),
rcb, std::move(rcb),
[exceptCallback, thisPtr](const std::exception_ptr &ePtr) { [exceptCallback, thisPtr](const std::exception_ptr &ePtr) {
thisPtr->rollback(); thisPtr->rollback();
if (exceptCallback) if (exceptCallback)
@ -104,13 +104,13 @@ void TransactionImpl::execSql(const std::string &sql,
{ {
//push sql cmd to buffer; //push sql cmd to buffer;
SqlCmd cmd; SqlCmd cmd;
cmd._sql = sql; cmd._sql = std::move(sql);
cmd._paraNum = paraNum; cmd._paraNum = paraNum;
cmd._parameters = parameters; cmd._parameters = std::move(parameters);
cmd._length = length; cmd._length = std::move(length);
cmd._format = format; cmd._format = std::move(format);
cmd._cb = rcb; cmd._cb = std::move(rcb);
cmd._exceptCb = exceptCallback; cmd._exceptCb = std::move(exceptCallback);
thisPtr->_sqlCmdBuffer.push_back(std::move(cmd)); thisPtr->_sqlCmdBuffer.push_back(std::move(cmd));
} }
} }
@ -196,13 +196,13 @@ void TransactionImpl::execNewTask()
auto conn = _connectionPtr; auto conn = _connectionPtr;
_loop->queueInLoop([=]() { _loop->queueInLoop([=]() mutable {
conn->execSql(cmd._sql, conn->execSql(std::move(cmd._sql),
cmd._paraNum, cmd._paraNum,
cmd._parameters, std::move(cmd._parameters),
cmd._length, std::move(cmd._length),
cmd._format, std::move(cmd._format),
cmd._cb, std::move(cmd._cb),
[cmd, thisPtr](const std::exception_ptr &ePtr) { [cmd, thisPtr](const std::exception_ptr &ePtr) {
thisPtr->rollback(); thisPtr->rollback();
if (cmd._exceptCb) if (cmd._exceptCb)

View File

@ -32,13 +32,13 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this<
private: private:
DbConnectionPtr _connectionPtr; DbConnectionPtr _connectionPtr;
virtual void execSql(const std::string &sql, virtual void execSql(std::string &&sql,
size_t paraNum, size_t paraNum,
const std::vector<const char *> &parameters, std::vector<const char *> &&parameters,
const std::vector<int> &length, std::vector<int> &&length,
const std::vector<int> &format, std::vector<int> &&format,
const ResultCallback &rcb, ResultCallback &&rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback) override; std::function<void(const std::exception_ptr &)> &&exceptCallback) override;
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)>&) override virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)>&) override
{ {
return shared_from_this(); return shared_from_this();

View File

@ -268,14 +268,14 @@ void MysqlConnection::handleEvent()
} }
} }
void MysqlConnection::execSql(const std::string &sql, void MysqlConnection::execSql(std::string &&sql,
size_t paraNum, size_t paraNum,
const std::vector<const char *> &parameters, std::vector<const char *> &&parameters,
const std::vector<int> &length, std::vector<int> &&length,
const std::vector<int> &format, std::vector<int> &&format,
const ResultCallback &rcb, ResultCallback &&rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback, std::function<void(const std::exception_ptr &)> &&exceptCallback,
const std::function<void()> &idleCb) std::function<void()> &&idleCb)
{ {
LOG_TRACE << sql; LOG_TRACE << sql;
assert(paraNum == parameters.size()); assert(paraNum == parameters.size());
@ -286,10 +286,10 @@ void MysqlConnection::execSql(const std::string &sql,
assert(!_isWorking); assert(!_isWorking);
assert(!sql.empty()); assert(!sql.empty());
_cb = rcb; _cb = std::move(rcb);
_idleCb = idleCb; _idleCb = std::move(idleCb);
_isWorking = true; _isWorking = true;
_exceptCb = exceptCallback; _exceptCb = std::move(exceptCallback);
_sql.clear(); _sql.clear();
if (paraNum > 0) if (paraNum > 0)
{ {

View File

@ -37,14 +37,14 @@ class MysqlConnection : public DbConnection, public std::enable_shared_from_this
public: public:
MysqlConnection(trantor::EventLoop *loop, const std::string &connInfo); MysqlConnection(trantor::EventLoop *loop, const std::string &connInfo);
~MysqlConnection() {} ~MysqlConnection() {}
virtual void execSql(const std::string &sql, virtual void execSql(std::string &&sql,
size_t paraNum, size_t paraNum,
const std::vector<const char *> &parameters, std::vector<const char *> &&parameters,
const std::vector<int> &length, std::vector<int> &&length,
const std::vector<int> &format, std::vector<int> &&format,
const ResultCallback &rcb, ResultCallback &&rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback, std::function<void(const std::exception_ptr &)> &&exceptCallback,
const std::function<void()> &idleCb) override; std::function<void()> &&idleCb) override;
private: private:
std::unique_ptr<trantor::Channel> _channelPtr; std::unique_ptr<trantor::Channel> _channelPtr;

View File

@ -34,8 +34,8 @@ class MysqlResultImpl : public ResultImpl
const std::string &query, const std::string &query,
size_type affectedRows, size_type affectedRows,
unsigned long long insertId) noexcept unsigned long long insertId) noexcept
: _result(r), : ResultImpl(query),
_query(query), _result(r),
_rowsNum(_result ? mysql_num_rows(_result.get()) : 0), _rowsNum(_result ? mysql_num_rows(_result.get()) : 0),
_fieldArray(r ? mysql_fetch_fields(r.get()) : nullptr), _fieldArray(r ? mysql_fetch_fields(r.get()) : nullptr),
_fieldNum(r ? mysql_num_fields(r.get()) : 0), _fieldNum(r ? mysql_num_fields(r.get()) : 0),
@ -78,7 +78,6 @@ class MysqlResultImpl : public ResultImpl
private: private:
const std::shared_ptr<MYSQL_RES> _result; const std::shared_ptr<MYSQL_RES> _result;
const std::string _query;
const Result::size_type _rowsNum; const Result::size_type _rowsNum;
const MYSQL_FIELD *_fieldArray; const MYSQL_FIELD *_fieldArray;
const Result::row_size_type _fieldNum; const Result::row_size_type _fieldNum;

View File

@ -97,12 +97,14 @@ void PgConnection::pgPoll()
LOG_ERROR << "!!!Pg connection failed: " << PQerrorMessage(_connPtr.get()); LOG_ERROR << "!!!Pg connection failed: " << PQerrorMessage(_connPtr.get());
break; break;
case PGRES_POLLING_WRITING: case PGRES_POLLING_WRITING:
_channel.enableWriting(); if (!_channel.isWriting())
_channel.disableReading(); _channel.enableWriting();
break; break;
case PGRES_POLLING_READING: case PGRES_POLLING_READING:
_channel.enableReading(); if (!_channel.isReading())
_channel.disableWriting(); _channel.enableReading();
if (_channel.isWriting())
_channel.disableWriting();
break; break;
case PGRES_POLLING_OK: case PGRES_POLLING_OK:
@ -112,8 +114,10 @@ void PgConnection::pgPoll()
assert(_okCb); assert(_okCb);
_okCb(shared_from_this()); _okCb(shared_from_this());
} }
_channel.enableReading(); if (!_channel.isReading())
_channel.disableWriting(); _channel.enableReading();
if (_channel.isWriting())
_channel.disableWriting();
break; break;
case PGRES_POLLING_ACTIVE: case PGRES_POLLING_ACTIVE:
//unused! //unused!
@ -122,14 +126,14 @@ void PgConnection::pgPoll()
break; break;
} }
} }
void PgConnection::execSql(const std::string &sql, void PgConnection::execSql(std::string &&sql,
size_t paraNum, size_t paraNum,
const std::vector<const char *> &parameters, std::vector<const char *> &&parameters,
const std::vector<int> &length, std::vector<int> &&length,
const std::vector<int> &format, std::vector<int> &&format,
const ResultCallback &rcb, ResultCallback &&rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback, std::function<void(const std::exception_ptr &)> &&exceptCallback,
const std::function<void()> &idleCb) std::function<void()> &&idleCb)
{ {
LOG_TRACE << sql; LOG_TRACE << sql;
assert(paraNum == parameters.size()); assert(paraNum == parameters.size());
@ -139,16 +143,16 @@ void PgConnection::execSql(const std::string &sql,
assert(idleCb); assert(idleCb);
assert(!_isWorking); assert(!_isWorking);
assert(!sql.empty()); assert(!sql.empty());
_sql = sql; _sql = std::move(sql);
_cb = rcb; _cb = std::move(rcb);
_idleCb = idleCb; _idleCb = std::move(idleCb);
_isWorking = true; _isWorking = true;
_exceptCb = exceptCallback; _exceptCb = std::move(exceptCallback);
auto thisPtr = shared_from_this(); auto thisPtr = shared_from_this();
_loop->runInLoop([thisPtr, sql, paraNum, parameters, length, format]() { _loop->runInLoop([thisPtr, paraNum=std::move(paraNum), parameters=std::move(parameters), length=std::move(length), format=std::move(format)]() {
if (PQsendQueryParams( if (PQsendQueryParams(
thisPtr->_connPtr.get(), thisPtr->_connPtr.get(),
sql.c_str(), thisPtr->_sql.c_str(),
paraNum, paraNum,
NULL, NULL,
parameters.data(), parameters.data(),
@ -198,8 +202,8 @@ void PgConnection::handleRead()
//need read more data from socket; //need read more data from socket;
return; return;
} }
if (_channel.isWriting())
_channel.disableWriting(); _channel.disableWriting();
// got query results? // got query results?
while ((res = std::shared_ptr<PGresult>(PQgetResult(_connPtr.get()), [](PGresult *p) { while ((res = std::shared_ptr<PGresult>(PQgetResult(_connPtr.get()), [](PGresult *p) {
PQclear(p); PQclear(p);

View File

@ -37,14 +37,14 @@ class PgConnection : public DbConnection, public std::enable_shared_from_this<Pg
public: public:
PgConnection(trantor::EventLoop *loop, const std::string &connInfo); PgConnection(trantor::EventLoop *loop, const std::string &connInfo);
virtual void execSql(const std::string &sql, virtual void execSql(std::string &&sql,
size_t paraNum, size_t paraNum,
const std::vector<const char *> &parameters, std::vector<const char *> &&parameters,
const std::vector<int> &length, std::vector<int> &&length,
const std::vector<int> &format, std::vector<int> &&format,
const ResultCallback &rcb, ResultCallback &&rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback, std::function<void(const std::exception_ptr &)> &&exceptCallback,
const std::function<void()> &idleCb) override; std::function<void()> &&idleCb) override;
private: private:
std::shared_ptr<PGconn> _connPtr; std::shared_ptr<PGconn> _connPtr;

View File

@ -29,8 +29,8 @@ class PostgreSQLResultImpl : public ResultImpl
{ {
public: public:
PostgreSQLResultImpl(const std::shared_ptr<PGresult> &r, const std::string &query) noexcept PostgreSQLResultImpl(const std::shared_ptr<PGresult> &r, const std::string &query) noexcept
: _result(r), :ResultImpl(query),
_query(query) _result(r)
{ {
} }
virtual size_type size() const noexcept override; virtual size_type size() const noexcept override;
@ -41,10 +41,8 @@ class PostgreSQLResultImpl : public ResultImpl
virtual const char *getValue(size_type row, row_size_type column) const override; virtual const char *getValue(size_type row, row_size_type column) const override;
virtual bool isNull(size_type row, row_size_type column) const override; virtual bool isNull(size_type row, row_size_type column) const override;
virtual field_size_type getLength(size_type row, row_size_type column) const override; virtual field_size_type getLength(size_type row, row_size_type column) const override;
private: private:
std::shared_ptr<PGresult> _result; std::shared_ptr<PGresult> _result;
std::string _query;
}; };
} // namespace orm } // namespace orm

View File

@ -86,17 +86,17 @@ Sqlite3Connection::Sqlite3Connection(trantor::EventLoop *loop, const std::string
}); });
} }
void Sqlite3Connection::execSql(const std::string &sql, void Sqlite3Connection::execSql(std::string &&sql,
size_t paraNum, size_t paraNum,
const std::vector<const char *> &parameters, std::vector<const char *> &&parameters,
const std::vector<int> &length, std::vector<int> &&length,
const std::vector<int> &format, std::vector<int> &&format,
const ResultCallback &rcb, ResultCallback &&rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback, std::function<void(const std::exception_ptr &)> &&exceptCallback,
const std::function<void()> &idleCb) std::function<void()> &&idleCb)
{ {
auto thisPtr = shared_from_this(); auto thisPtr = shared_from_this();
_loopThread.getLoop()->runInLoop([=]() { _loopThread.getLoop()->runInLoop([thisPtr, sql = std::move(sql), paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format), rcb = std::move(rcb), exceptCallback = std::move(exceptCallback), idleCb = std::move(idleCb)]() mutable {
thisPtr->execSqlInQueue(sql, paraNum, parameters, length, format, rcb, exceptCallback, idleCb); thisPtr->execSqlInQueue(sql, paraNum, parameters, length, format, rcb, exceptCallback, idleCb);
}); });
} }

View File

@ -38,14 +38,14 @@ class Sqlite3Connection : public DbConnection, public std::enable_shared_from_th
public: public:
Sqlite3Connection(trantor::EventLoop *loop, const std::string &connInfo); Sqlite3Connection(trantor::EventLoop *loop, const std::string &connInfo);
virtual void execSql(const std::string &sql, virtual void execSql(std::string &&sql,
size_t paraNum, size_t paraNum,
const std::vector<const char *> &parameters, std::vector<const char *> &&parameters,
const std::vector<int> &length, std::vector<int> &&length,
const std::vector<int> &format, std::vector<int> &&format,
const ResultCallback &rcb, ResultCallback &&rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback, std::function<void(const std::exception_ptr &)> &&exceptCallback,
const std::function<void()> &idleCb) override; std::function<void()> &&idleCb) override;
private: private:
static std::once_flag _once; static std::once_flag _once;

View File

@ -30,6 +30,7 @@ class Sqlite3ResultImpl : public ResultImpl
{ {
public: public:
Sqlite3ResultImpl(const std::string &query) noexcept Sqlite3ResultImpl(const std::string &query) noexcept
: ResultImpl(query)
{ {
} }
virtual size_type size() const noexcept override; virtual size_type size() const noexcept override;