Improve the performance of dbClient

This commit is contained in:
antao 2019-02-15 17:40:30 +08:00 committed by an-tao
parent 38163a26a6
commit 13417edf10
6 changed files with 118 additions and 49 deletions

View File

@ -274,19 +274,13 @@ void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr)
{ {
_busyConnections.insert(connPtr); //For new connections, this sentence is necessary _busyConnections.insert(connPtr); //For new connections, this sentence is necessary
auto &cmd = _sqlCmdBuffer.front(); auto &cmd = _sqlCmdBuffer.front();
connPtr->loop()->queueInLoop([connPtr, cmd, this]() { execSql(connPtr, std::move(cmd->_sql), cmd->_paraNum, std::move(cmd->_parameters), std::move(cmd->_length), std::move(cmd->_format), std::move(cmd->_cb), std::move(cmd->_exceptCb));
execSql(connPtr, std::move(cmd->_sql), cmd->_paraNum, std::move(cmd->_parameters), std::move(cmd->_length), std::move(cmd->_format), std::move(cmd->_cb), std::move(cmd->_exceptCb));
});
_sqlCmdBuffer.pop_front(); _sqlCmdBuffer.pop_front();
return; return;
} }
}
//Idle connection
connPtr->loop()->queueInLoop([connPtr, this]() {
std::lock_guard<std::mutex> guard(_connectionsMutex);
_busyConnections.erase(connPtr); _busyConnections.erase(connPtr);
_readyConnections.insert(connPtr); _readyConnections.insert(connPtr);
}); }
} }
} }

View File

@ -78,7 +78,7 @@ class DbConnection : public trantor::NonCopyable
protected: protected:
QueryCallback _cb; QueryCallback _cb;
trantor::EventLoop *_loop; trantor::EventLoop *_loop;
std::function<void()> _idleCb; std::shared_ptr<std::function<void()>> _idleCbPtr;
ConnectStatus _status = ConnectStatus_None; ConnectStatus _status = ConnectStatus_None;
DbConnectionCallback _closeCb = [](const DbConnectionPtr &) {}; DbConnectionCallback _closeCb = [](const DbConnectionPtr &) {};
DbConnectionCallback _okCb = [](const DbConnectionPtr &) {}; DbConnectionCallback _okCb = [](const DbConnectionPtr &) {};

View File

@ -150,13 +150,15 @@ void TransactionImpl::rollback()
SqlCmd cmd; SqlCmd cmd;
cmd._sql = "rollback"; cmd._sql = "rollback";
cmd._paraNum = 0; cmd._paraNum = 0;
cmd._cb = [clearupCb](const Result &r) { cmd._cb = [](const Result &r) {
LOG_TRACE << "Transaction roll back!"; LOG_TRACE << "Transaction roll back!";
clearupCb(); //clearupCb();
}; };
cmd._exceptCb = [clearupCb](const std::exception_ptr &ePtr) { cmd._exceptCb = [](const std::exception_ptr &ePtr) {
clearupCb(); //clearupCb();
LOG_ERROR << "Transaction rool back error";
}; };
cmd._idleCb = clearupCb;
//Rollback cmd should be executed firstly, so we push it in front of the list //Rollback cmd should be executed firstly, so we push it in front of the list
thisPtr->_sqlCmdBuffer.push_front(std::move(cmd)); thisPtr->_sqlCmdBuffer.push_front(std::move(cmd));
return; return;
@ -169,14 +171,16 @@ void TransactionImpl::rollback()
std::vector<const char *>(), std::vector<const char *>(),
std::vector<int>(), std::vector<int>(),
std::vector<int>(), std::vector<int>(),
[clearupCb](const Result &r) { [](const Result &r) {
LOG_TRACE << "Transaction roll back!"; LOG_TRACE << "Transaction roll back!";
clearupCb(); //clearupCb();
}, },
[clearupCb](const std::exception_ptr &ePtr) { [](const std::exception_ptr &ePtr) {
clearupCb(); //clearupCb();
LOG_ERROR << "Transaction rool back error";
}, },
[thisPtr]() { [thisPtr, clearupCb]() {
clearupCb();
thisPtr->execNewTask(); thisPtr->execNewTask();
}); });
}); });
@ -204,11 +208,14 @@ void TransactionImpl::execNewTask()
std::move(cmd._format), std::move(cmd._format),
std::move(cmd._cb), std::move(cmd._cb),
[cmd, thisPtr](const std::exception_ptr &ePtr) { [cmd, thisPtr](const std::exception_ptr &ePtr) {
thisPtr->rollback(); if (!cmd._idleCb)
thisPtr->rollback();
if (cmd._exceptCb) if (cmd._exceptCb)
cmd._exceptCb(ePtr); cmd._exceptCb(ePtr);
}, },
[thisPtr]() { [cmd,thisPtr]() {
if(cmd._idleCb)
cmd._idleCb();
thisPtr->execNewTask(); thisPtr->execNewTask();
}); });
}); });

View File

@ -57,6 +57,7 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this<
std::vector<int> _format; std::vector<int> _format;
QueryCallback _cb; QueryCallback _cb;
ExceptPtrCallback _exceptCb; ExceptPtrCallback _exceptCb;
std::function<void()> _idleCb;
}; };
std::list<SqlCmd> _sqlCmdBuffer; std::list<SqlCmd> _sqlCmdBuffer;
// std::mutex _bufferMutex; // std::mutex _bufferMutex;

View File

@ -311,7 +311,7 @@ void MysqlConnection::execSql(std::string &&sql,
assert(!sql.empty()); assert(!sql.empty());
_cb = std::move(rcb); _cb = std::move(rcb);
_idleCb = std::move(idleCb); _idleCbPtr = std::make_shared<std::function<void()>>(std ::move(idleCb));
_isWorking = true; _isWorking = true;
_exceptCb = std::move(exceptCallback); _exceptCb = std::move(exceptCallback);
_sql.clear(); _sql.clear();
@ -431,10 +431,11 @@ void MysqlConnection::outputError()
_cb = decltype(_cb)(); _cb = decltype(_cb)();
_isWorking = false; _isWorking = false;
if (_idleCb) if (_idleCbPtr)
{ {
_idleCb(); auto idle = std::move(_idleCbPtr);
_idleCb = decltype(_idleCb)(); _idleCbPtr.reset();
(*idle)();
} }
} }
} }
@ -451,7 +452,11 @@ void MysqlConnection::getResult(MYSQL_RES *res)
_cb = decltype(_cb)(); _cb = decltype(_cb)();
_exceptCb = decltype(_exceptCb)(); _exceptCb = decltype(_exceptCb)();
_isWorking = false; _isWorking = false;
_idleCb(); if (_idleCbPtr)
_idleCb = decltype(_idleCb)(); {
auto idle = std::move(_idleCbPtr);
_idleCbPtr.reset();
(*idle)();
}
} }
} }

View File

@ -16,6 +16,7 @@
#include "PostgreSQLResultImpl.h" #include "PostgreSQLResultImpl.h"
#include <trantor/utils/Logger.h> #include <trantor/utils/Logger.h>
#include <drogon/orm/Exception.h> #include <drogon/orm/Exception.h>
#include <memory>
#include <stdio.h> #include <stdio.h>
using namespace drogon::orm; using namespace drogon::orm;
@ -166,14 +167,55 @@ void PgConnection::execSql(std::string &&sql,
assert(!sql.empty()); assert(!sql.empty());
_sql = std::move(sql); _sql = std::move(sql);
_cb = std::move(rcb); _cb = std::move(rcb);
_idleCb = std::move(idleCb); _idleCbPtr = std::make_shared<std::function<void()>>(std::move(idleCb));
_isWorking = true; _isWorking = true;
_exceptCb = std::move(exceptCallback); _exceptCb = std::move(exceptCallback);
auto thisPtr = shared_from_this(); auto thisPtr = shared_from_this();
_loop->runInLoop([thisPtr, paraNum = std::move(paraNum), parameters = std::move(parameters), length = std::move(length), format = std::move(format)]() { if (!_loop->isInLoopThread())
{
_loop->queueInLoop([thisPtr, paraNum = std::move(paraNum), parameters = std::move(parameters), length = std::move(length), format = std::move(format)]() {
if (PQsendQueryParams(
thisPtr->_connPtr.get(),
thisPtr->_sql.c_str(),
paraNum,
NULL,
parameters.data(),
length.data(),
format.data(),
0) == 0)
{
LOG_ERROR << "send query error: " << PQerrorMessage(thisPtr->_connPtr.get());
if (thisPtr->_isWorking)
{
thisPtr->_isWorking = false;
try
{
throw Failure(PQerrorMessage(thisPtr->_connPtr.get()));
}
catch (...)
{
auto exceptPtr = std::current_exception();
thisPtr->_exceptCb(exceptPtr);
thisPtr->_exceptCb = decltype(_exceptCb)();
}
thisPtr->_cb = decltype(_cb)();
if (thisPtr->_idleCbPtr)
{
auto idle = std::move(thisPtr->_idleCbPtr);
thisPtr->_idleCbPtr.reset();
(*idle)();
}
}
return;
}
thisPtr->pgPoll();
});
}
else
{
if (PQsendQueryParams( if (PQsendQueryParams(
thisPtr->_connPtr.get(), _connPtr.get(),
thisPtr->_sql.c_str(), _sql.c_str(),
paraNum, paraNum,
NULL, NULL,
parameters.data(), parameters.data(),
@ -182,9 +224,31 @@ void PgConnection::execSql(std::string &&sql,
0) == 0) 0) == 0)
{ {
LOG_ERROR << "send query error: " << PQerrorMessage(thisPtr->_connPtr.get()); LOG_ERROR << "send query error: " << PQerrorMessage(thisPtr->_connPtr.get());
if (_isWorking)
{
_isWorking = false;
try
{
throw Failure(PQerrorMessage(_connPtr.get()));
}
catch (...)
{
auto exceptPtr = std::current_exception();
_exceptCb(exceptPtr);
_exceptCb = decltype(_exceptCb)();
}
_cb = decltype(_cb)();
if (_idleCbPtr)
{
auto idle = std::move(_idleCbPtr);
_idleCbPtr.reset();
(*idle)();
}
}
return;
} }
thisPtr->pgPoll(); thisPtr->pgPoll();
}); }
} }
void PgConnection::handleRead() void PgConnection::handleRead()
@ -209,10 +273,11 @@ void PgConnection::handleRead()
_exceptCb = decltype(_exceptCb)(); _exceptCb = decltype(_exceptCb)();
} }
_cb = decltype(_cb)(); _cb = decltype(_cb)();
if (_idleCb) if (_idleCbPtr)
{ {
_idleCb(); auto idle = std::move(_idleCbPtr);
_idleCb = decltype(_idleCb)(); _idleCbPtr.reset();
(*idle)();
} }
} }
handleClosed(); handleClosed();
@ -225,7 +290,6 @@ void PgConnection::handleRead()
} }
if (_channel.isWriting()) if (_channel.isWriting())
_channel.disableWriting(); _channel.disableWriting();
// got query results?
while ((res = std::shared_ptr<PGresult>(PQgetResult(_connPtr.get()), [](PGresult *p) { while ((res = std::shared_ptr<PGresult>(PQgetResult(_connPtr.get()), [](PGresult *p) {
PQclear(p); PQclear(p);
}))) })))
@ -236,18 +300,15 @@ void PgConnection::handleRead()
LOG_WARN << PQerrorMessage(_connPtr.get()); LOG_WARN << PQerrorMessage(_connPtr.get());
if (_isWorking) if (_isWorking)
{ {
try
{ {
try //TODO: exception type
{ throw SqlError(PQerrorMessage(_connPtr.get()), _sql);
//TODO: exception type }
throw SqlError(PQerrorMessage(_connPtr.get()), catch (...)
_sql); {
} _exceptCb(std::current_exception());
catch (...) _exceptCb = decltype(_exceptCb)();
{
_exceptCb(std::current_exception());
_exceptCb = decltype(_exceptCb)();
}
} }
_cb = decltype(_cb)(); _cb = decltype(_cb)();
} }
@ -266,10 +327,11 @@ void PgConnection::handleRead()
if (_isWorking) if (_isWorking)
{ {
_isWorking = false; _isWorking = false;
if (_idleCb) if (_idleCbPtr)
{ {
_idleCb(); auto idle = std::move(_idleCbPtr);
_idleCb = decltype(_idleCb)(); _idleCbPtr.reset();
(*idle)();
} }
} }
} }