Use prepared statement in postgresql connections

This commit is contained in:
antao 2019-02-19 15:06:16 +08:00
parent ca98eccdfe
commit 6c4ad73f7a
2 changed files with 140 additions and 65 deletions

View File

@ -16,6 +16,7 @@
#include "PostgreSQLResultImpl.h" #include "PostgreSQLResultImpl.h"
#include <trantor/utils/Logger.h> #include <trantor/utils/Logger.h>
#include <drogon/orm/Exception.h> #include <drogon/orm/Exception.h>
#include <drogon/utils/Utilities.h>
#include <memory> #include <memory>
#include <stdio.h> #include <stdio.h>
@ -148,16 +149,17 @@ void PgConnection::pgPoll()
} }
} }
void PgConnection::execSql(std::string &&sql, void PgConnection::execSqlInLoop(std::string &&sql,
size_t paraNum, size_t paraNum,
std::vector<const char *> &&parameters, std::vector<const char *> &&parameters,
std::vector<int> &&length, std::vector<int> &&length,
std::vector<int> &&format, std::vector<int> &&format,
ResultCallback &&rcb, ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback, std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb) std::function<void()> &&idleCb)
{ {
LOG_TRACE << sql; LOG_TRACE << sql;
_loop->assertInLoopThread();
assert(paraNum == parameters.size()); assert(paraNum == parameters.size());
assert(paraNum == length.size()); assert(paraNum == length.size());
assert(paraNum == format.size()); assert(paraNum == format.size());
@ -170,60 +172,19 @@ void PgConnection::execSql(std::string &&sql,
_idleCbPtr = std::make_shared<std::function<void()>>(std::move(idleCb)); _idleCbPtr = std::make_shared<std::function<void()>>(std::move(idleCb));
_isWorking = true; _isWorking = true;
_exceptCb = std::move(exceptCallback); _exceptCb = std::move(exceptCallback);
auto thisPtr = shared_from_this(); auto iter = _preparedStatementMap.find(_sql);
if (!_loop->isInLoopThread()) if (iter != _preparedStatementMap.end())
{ {
_loop->queueInLoop([thisPtr, paraNum = std::move(paraNum), parameters = std::move(parameters), length = std::move(length), format = std::move(format)]() { if (PQsendQueryPrepared(
if (PQsendQueryParams(
thisPtr->_connPtr.get(),
thisPtr->_sql.c_str(),
paraNum,
NULL,
parameters.data(),
length.data(),
format.data(),
0) == 0)
{
LOG_ERROR << "send query error: " << PQerrorMessage(thisPtr->_connPtr.get());
if (thisPtr->_isWorking)
{
thisPtr->_isWorking = false;
try
{
throw Failure(PQerrorMessage(thisPtr->_connPtr.get()));
}
catch (...)
{
auto exceptPtr = std::current_exception();
thisPtr->_exceptCb(exceptPtr);
thisPtr->_exceptCb = decltype(_exceptCb)();
}
thisPtr->_cb = decltype(_cb)();
if (thisPtr->_idleCbPtr)
{
auto idle = std::move(thisPtr->_idleCbPtr);
thisPtr->_idleCbPtr.reset();
(*idle)();
}
}
return;
}
thisPtr->pgPoll();
});
}
else
{
if (PQsendQueryParams(
_connPtr.get(), _connPtr.get(),
_sql.c_str(), iter->second.c_str(),
paraNum, paraNum,
NULL,
parameters.data(), parameters.data(),
length.data(), length.data(),
format.data(), format.data(),
0) == 0) 0) == 0)
{ {
LOG_ERROR << "send query error: " << PQerrorMessage(thisPtr->_connPtr.get()); LOG_ERROR << "send query error: " << PQerrorMessage(_connPtr.get());
if (_isWorking) if (_isWorking)
{ {
_isWorking = false; _isWorking = false;
@ -247,8 +208,80 @@ void PgConnection::execSql(std::string &&sql,
} }
return; return;
} }
thisPtr->pgPoll();
} }
else
{
_isRreparingStatement = true;
auto statementName = getuuid();
if (PQsendPrepare(_connPtr.get(), statementName.c_str(), _sql.c_str(), paraNum, NULL) == 0)
{
LOG_ERROR << "send query error: " << PQerrorMessage(_connPtr.get());
if (_isWorking)
{
_isWorking = false;
try
{
throw Failure(PQerrorMessage(_connPtr.get()));
}
catch (...)
{
auto exceptPtr = std::current_exception();
_exceptCb(exceptPtr);
_exceptCb = decltype(_exceptCb)();
}
_cb = decltype(_cb)();
if (_idleCbPtr)
{
auto idle = std::move(_idleCbPtr);
_idleCbPtr.reset();
(*idle)();
}
}
return;
}
std::weak_ptr<PgConnection> weakPtr = shared_from_this();
_preparingCallback = [weakPtr, statementName, paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format)]() {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
thisPtr->_isRreparingStatement = false;
thisPtr->_preparedStatementMap[thisPtr->_sql] = statementName;
if (PQsendQueryPrepared(
thisPtr->_connPtr.get(),
statementName.c_str(),
paraNum,
parameters.data(),
length.data(),
format.data(),
0) == 0)
{
LOG_ERROR << "send query error: " << PQerrorMessage(thisPtr->_connPtr.get());
if (thisPtr->_isWorking)
{
thisPtr->_isWorking = false;
try
{
throw Failure(PQerrorMessage(thisPtr->_connPtr.get()));
}
catch (...)
{
auto exceptPtr = std::current_exception();
thisPtr->_exceptCb(exceptPtr);
thisPtr->_exceptCb = decltype(thisPtr->_exceptCb)();
}
thisPtr->_cb = decltype(thisPtr->_cb)();
if (thisPtr->_idleCbPtr)
{
auto idle = std::move(thisPtr->_idleCbPtr);
thisPtr->_idleCbPtr.reset();
(*idle)();
}
}
return;
}
};
}
pgPoll();
} }
void PgConnection::handleRead() void PgConnection::handleRead()
@ -290,6 +323,7 @@ void PgConnection::handleRead()
} }
if (_channel.isWriting()) if (_channel.isWriting())
_channel.disableWriting(); _channel.disableWriting();
bool isPreparing = false;
while ((res = std::shared_ptr<PGresult>(PQgetResult(_connPtr.get()), [](PGresult *p) { while ((res = std::shared_ptr<PGresult>(PQgetResult(_connPtr.get()), [](PGresult *p) {
PQclear(p); PQclear(p);
}))) })))
@ -317,21 +351,36 @@ void PgConnection::handleRead()
{ {
if (_isWorking) if (_isWorking)
{ {
auto r = makeResult(res, _sql); if (_isRreparingStatement)
_cb(r); {
_cb = decltype(_cb)(); isPreparing = true;
_exceptCb = decltype(_exceptCb)(); }
else
{
auto r = makeResult(res, _sql);
_cb(r);
_cb = decltype(_cb)();
_exceptCb = decltype(_exceptCb)();
}
} }
} }
} }
if (_isWorking) if (_isWorking)
{ {
_isWorking = false; if(isPreparing)
if (_idleCbPtr)
{ {
auto idle = std::move(_idleCbPtr); _preparingCallback();
_idleCbPtr.reset(); _preparingCallback = std::function<void()>();
(*idle)();
} }
else
{
_isWorking = false;
if (_idleCbPtr)
{
auto idle = std::move(_idleCbPtr);
_idleCbPtr.reset();
(*idle)();
}
}
} }
} }

View File

@ -24,6 +24,7 @@
#include <string> #include <string>
#include <functional> #include <functional>
#include <iostream> #include <iostream>
#include <unordered_map>
namespace drogon namespace drogon
{ {
@ -44,15 +45,40 @@ class PgConnection : public DbConnection, public std::enable_shared_from_this<Pg
std::vector<int> &&format, std::vector<int> &&format,
ResultCallback &&rcb, ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback, std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb) override; std::function<void()> &&idleCb) override
{
if (_loop->isInLoopThread())
{
execSqlInLoop(std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format), std::move(rcb), std::move(exceptCallback), std::move(idleCb));
}
else
{
auto thisPtr = shared_from_this();
_loop->queueInLoop([thisPtr, sql = std::move(sql), paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format), rcb = std::move(rcb), exceptCallback = std::move(exceptCallback), idleCb = std::move(idleCb)]() mutable {
thisPtr->execSqlInLoop(std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format), std::move(rcb), std::move(exceptCallback), std::move(idleCb));
});
}
}
virtual void disconnect() override; virtual void disconnect() override;
private: private:
std::shared_ptr<PGconn> _connPtr; std::shared_ptr<PGconn> _connPtr;
trantor::Channel _channel; trantor::Channel _channel;
std::unordered_map<std::string, std::string> _preparedStatementMap;
bool _isRreparingStatement = false;
void handleRead(); void handleRead();
void pgPoll(); void pgPoll();
void handleClosed(); void handleClosed();
void execSqlInLoop(std::string &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb);
std::function<void()> _preparingCallback;
}; };
} // namespace orm } // namespace orm