Merge pull request #59 from an-tao/dev

Improve database client performance and fix some bugs
This commit is contained in:
An Tao 2019-02-23 01:28:35 +08:00 committed by GitHub
commit 1b9e91361c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 1354 additions and 519 deletions

View File

@ -133,6 +133,8 @@
],
//idle_connection_timeout: Defaults to 60 seconds, the lifetime
//of the connection without read or write
"idle_connection_timeout": 60
"idle_connection_timeout": 60,
//enable_fast_db_client: Defaults to false
"enable_fast_db_client": false
}
}

View File

@ -133,6 +133,8 @@
],
//idle_connection_timeout: Defaults to 60 seconds, the lifetime
//of the connection without read or write
"idle_connection_timeout": 60
"idle_connection_timeout": 60,
//enable_fast_db_client: Defaults to false
"enable_fast_db_client": false
}
}

View File

@ -3,7 +3,7 @@
GIT_VER=$(git log|grep ^commit|wc -l|sed -e "s/^ *//")
MD5=$(git log|head -1|awk '{printf $2}')
TMP_FILE=/tmp/version
echo "#define VERSION \"0.9.24.$GIT_VER\"" > ${TMP_FILE}
echo "#define VERSION \"0.9.25.$GIT_VER\"" > ${TMP_FILE}
echo "#define VERSION_MD5 \"$MD5\"" >> ${TMP_FILE}
if [ ! -f $1 ];then
mv -f ${TMP_FILE} $1

View File

@ -17,6 +17,7 @@
#include <drogon/config.h>
#if USE_ORM
#include <drogon/orm/DbClient.h>
#define USE_FAST_CLIENT 1
#endif
#include <drogon/utils/Utilities.h>
#include <drogon/HttpBinder.h>
@ -185,6 +186,10 @@ class HttpAppFramework : public trantor::NonCopyable
virtual void setIdleConnectionTimeout(size_t timeout) = 0;
#if USE_ORM
virtual orm::DbClientPtr getDbClient(const std::string &name = "default") = 0;
#if USE_FAST_CLIENT
virtual void enableFastDbClient() = 0;
virtual orm::DbClientPtr getFastDbClient(const std::string &name = "default") = 0;
#endif
virtual void createDbClient(const std::string &dbType,
const std::string &host,
const u_short port,

View File

@ -107,8 +107,8 @@ class HttpRequest
virtual void setParameter(const std::string &key, const std::string &value) = 0;
/// Set or get the content type
virtual void setContentTypeCode(ContentType type) = 0;
virtual void setContentTypeCodeAndCharacterSet(ContentType type, const std::string &charSet = "utf-8") = 0;
virtual void setContentTypeCode(const ContentType type) = 0;
/// virtual void setContentTypeCodeAndCharacterSet(ContentType type, const std::string &charSet = "utf-8") = 0;
virtual ContentType getContentTypeCode() = 0;
/// The following methods are a series of factory methods that help users create request objects.

View File

@ -58,7 +58,7 @@ class HttpResponse
virtual void setContentTypeCode(ContentType type) = 0;
/// Set the reponse content type and the character set.
virtual void setContentTypeCodeAndCharacterSet(ContentType type, const std::string &charSet = "utf-8") = 0;
/// virtual void setContentTypeCodeAndCharacterSet(ContentType type, const std::string &charSet = "utf-8") = 0;
/// Get the response content type.
virtual ContentType getContentTypeCode() = 0;
@ -103,7 +103,7 @@ class HttpResponse
/// Get the expiration time of the response.
virtual ssize_t expiredTime() const = 0;
/// Get the json object from the server response. If the response is not in json format,
/// Get the json object from the server response. If the response is not in json format,
/// then a empty shared_ptr will be retured.
virtual const std::shared_ptr<Json::Value> getJsonObject() const = 0;

View File

@ -228,12 +228,20 @@ static void loadApp(const Json::Value &app)
//Kick off idle connections
auto kickOffTimeout = app.get("idle_connection_timeout", 60).asUInt64();
drogon::app().setIdleConnectionTimeout(kickOffTimeout);
#if USE_ORM
#if USE_FAST_CLIENT
//Fast db client
auto fastDbClient = app.get("enable_fast_db_client", false).asBool();
if (fastDbClient)
drogon::app().enableFastDbClient();
#endif
#endif
}
static void loadDbClients(const Json::Value &dbClients)
{
if (!dbClients)
return;
#if USE_ORM
#if USE_ORM
for (auto const &client : dbClients)
{
auto type = client.get("rdbms", "postgresql").asString();

View File

@ -15,6 +15,11 @@
#include "HttpAppFrameworkImpl.h"
#include "ConfigLoader.h"
#include "HttpServer.h"
#if USE_ORM
#if USE_FAST_CLIENT
#include "../../orm_lib/src/DbClientLockFree.h"
#endif
#endif
#include <drogon/HttpTypes.h>
#include <drogon/utils/Utilities.h>
#include <drogon/DrClassMap.h>
@ -278,15 +283,17 @@ void HttpAppFrameworkImpl::run()
std::vector<std::shared_ptr<EventLoopThread>> loopThreads;
std::vector<trantor::EventLoop *> ioLoops;
for (auto const &listener : _listeners)
#ifdef __linux__
for (size_t i = 0; i < _threadNum; i++)
{
LOG_TRACE << "thread num=" << _threadNum;
#ifdef __linux__
for (size_t i = 0; i < _threadNum; i++)
auto loopThreadPtr = std::make_shared<EventLoopThread>("DrogonIoLoop");
loopThreadPtr->run();
loopThreads.push_back(loopThreadPtr);
ioLoops.push_back(loopThreadPtr->getLoop());
for (auto const &listener : _listeners)
{
auto loopThreadPtr = std::make_shared<EventLoopThread>("DrogonIoLoop");
loopThreadPtr->run();
loopThreads.push_back(loopThreadPtr);
auto serverPtr = std::make_shared<HttpServer>(loopThreadPtr->getLoop(),
InetAddress(std::get<0>(listener), std::get<1>(listener)), "drogon");
if (std::get<2>(listener))
@ -312,9 +319,12 @@ void HttpAppFrameworkImpl::run()
serverPtr->kickoffIdleConnections(_idleConnectionTimeout);
serverPtr->start();
servers.push_back(serverPtr);
ioLoops.push_back(serverPtr->getLoop());
}
}
#else
for (auto const &listener : _listeners)
{
LOG_TRACE << "thread num=" << _threadNum;
auto loopThreadPtr = std::make_shared<EventLoopThread>("DrogonListeningLoop");
loopThreadPtr->run();
loopThreads.push_back(loopThreadPtr);
@ -359,8 +369,16 @@ void HttpAppFrameworkImpl::run()
ioLoops.push_back(serverIoLoop);
}
servers.push_back(serverPtr);
#endif
}
#endif
#if USE_ORM
#if USE_FAST_CLIENT
// Create fast db clients for every io loop
if (_enableFastDbClient)
createFastDbClient(ioLoops);
#endif
#endif
_httpCtrlsRouter.init(ioLoops);
_httpSimpleCtrlsRouter.init(ioLoops);
_websockCtrlsRouter.init();
@ -395,7 +413,27 @@ void HttpAppFrameworkImpl::run()
_responseCachingMap = std::unique_ptr<CacheMap<std::string, HttpResponsePtr>>(new CacheMap<std::string, HttpResponsePtr>(loop(), 1.0, 4, 50)); //Max timeout up to about 70 days;
loop()->loop();
}
#if USE_ORM
#if USE_FAST_CLIENT
void HttpAppFrameworkImpl::createFastDbClient(const std::vector<trantor::EventLoop *> &ioloops)
{
for (auto &iter : _dbClientsMap)
{
for (auto *loop : ioloops)
{
if (iter.second->type() == drogon::orm::ClientType::Sqlite3)
{
_dbFastClientsMap[iter.first][loop] = iter.second;
}
if (iter.second->type() == drogon::orm::ClientType::PostgreSQL || iter.second->type() == drogon::orm::ClientType::Mysql)
{
_dbFastClientsMap[iter.first][loop] = std::shared_ptr<drogon::orm::DbClient>(new drogon::orm::DbClientLockFree(iter.second->connectionInfo(), loop, iter.second->type()));
}
}
}
}
#endif
#endif
void HttpAppFrameworkImpl::onWebsockDisconnect(const WebSocketConnectionPtr &wsConnPtr)
{
auto wsConnImplPtr = std::dynamic_pointer_cast<WebSocketConnectionImpl>(wsConnPtr);
@ -438,7 +476,7 @@ void HttpAppFrameworkImpl::onConnection(const TcpConnectionPtr &conn)
}
else
{
#if (CXX_STD > 14)
#if CXX_STD >= 17
if (!conn->getContext().has_value())
#else
if (conn->getContext().empty())
@ -775,7 +813,12 @@ orm::DbClientPtr HttpAppFrameworkImpl::getDbClient(const std::string &name)
{
return _dbClientsMap[name];
}
#if USE_FAST_CLIENT
orm::DbClientPtr HttpAppFrameworkImpl::getFastDbClient(const std::string &name)
{
return _dbFastClientsMap[name][trantor::EventLoop::getEventLoopOfCurrentThread()];
}
#endif
void HttpAppFrameworkImpl::createDbClient(const std::string &dbType,
const std::string &host,
const u_short port,

View File

@ -108,6 +108,13 @@ class HttpAppFrameworkImpl : public HttpAppFramework
}
#if USE_ORM
virtual orm::DbClientPtr getDbClient(const std::string &name = "default") override;
#if USE_FAST_CLIENT
virtual void enableFastDbClient() override
{
_enableFastDbClient = true;
}
virtual orm::DbClientPtr getFastDbClient(const std::string &name = "default") override;
#endif
virtual void createDbClient(const std::string &dbType,
const std::string &host,
const u_short port,
@ -187,12 +194,17 @@ class HttpAppFrameworkImpl : public HttpAppFramework
size_t _logfileSize = 100000000;
bool _useSendfile = true;
bool _useGzip = true;
bool _enableFastDbClient = false;
int _staticFilesCacheTime = 5;
std::unordered_map<std::string, std::weak_ptr<HttpResponse>> _staticFilesCache;
std::mutex _staticFilesCacheMutex;
#if USE_ORM
std::map<std::string, orm::DbClientPtr> _dbClientsMap;
std::vector<std::function<void()>> _dbFuncs;
#if USE_FAST_CLIENT
std::map<std::string, std::map<trantor::EventLoop *, orm::DbClientPtr>> _dbFastClientsMap;
void createFastDbClient(const std::vector<trantor::EventLoop *> &ioloops);
#endif
#endif
};

View File

@ -24,6 +24,6 @@ HttpFileUploadRequest::HttpFileUploadRequest(const std::vector<UploadFile> &file
{
setMethod(drogon::Post);
setVersion(drogon::HttpRequest::kHttp11);
setContentType("multipart/form-data; boundary=" + _boundary);
setContentType("Content-Type: multipart/form-data; boundary=" + _boundary + "\r\n");
_contentType = CT_MULTIPART_FORM_DATA;
}

View File

@ -221,14 +221,16 @@ void HttpRequestImpl::appendToBuffer(MsgBuffer *output) const
char buf[64];
snprintf(buf, sizeof buf, "Content-Length: %lu\r\n", static_cast<long unsigned int>(content.length() + _content.length()));
output->append(buf);
if (_headers.find("Content-Type") == _headers.end())
if (_contentTypeString.empty())
{
output->append("Content-Type: ");
output->append(webContentTypeToString(_contentType));
output->append("\r\n");
auto &type = webContentTypeToString(_contentType);
output->append(type.data(), type.length());
}
}
if (!_contentTypeString.empty())
{
output->append(_contentTypeString);
}
for (auto it = _headers.begin(); it != _headers.end(); ++it)
{
output->append(it->first);
@ -311,7 +313,8 @@ void HttpRequestImpl::addHeader(const char *start, const char *colon, const char
}
else
{
_headers[std::move(field)] = std::move(value);
//_headers[std::move(field)] = std::move(value);
_headers.emplace(std::move(field), std::move(value));
}
}

View File

@ -332,17 +332,18 @@ class HttpRequestImpl : public HttpRequest
return _jsonPtr;
}
virtual void setContentTypeCode(ContentType type) override
virtual void setContentTypeCode(const ContentType type) override
{
_contentType = type;
setContentType(webContentTypeToString(type));
auto &typeStr = webContentTypeToString(type);
setContentType(std::string(typeStr.data(), typeStr.length()));
}
virtual void setContentTypeCodeAndCharacterSet(ContentType type, const std::string &charSet = "utf-8") override
{
_contentType = type;
setContentType(webContentTypeAndCharsetToString(type, charSet));
}
// virtual void setContentTypeCodeAndCharacterSet(ContentType type, const std::string &charSet = "utf-8") override
// {
// _contentType = type;
// setContentType(webContentTypeAndCharsetToString(type, charSet));
// }
virtual ContentType getContentTypeCode() override
{
@ -353,11 +354,11 @@ class HttpRequestImpl : public HttpRequest
friend class HttpRequest;
void setContentType(const std::string &contentType)
{
addHeader("Content-Type", contentType);
_contentTypeString = contentType;
}
void setContentType(std::string &&contentType)
{
addHeader("Content-Type", std::move(contentType));
_contentTypeString = std::move(contentType);
}
private:
@ -379,6 +380,7 @@ class HttpRequestImpl : public HttpRequest
size_t _contentLen;
trantor::EventLoop *_loop;
ContentType _contentType = CT_TEXT_PLAIN;
std::string _contentTypeString;
};
typedef std::shared_ptr<HttpRequestImpl> HttpRequestImplPtr;

View File

@ -26,11 +26,15 @@
using namespace trantor;
using namespace drogon;
static const std::string &getServerString()
{
static const std::string server = "Server: drogon/" + drogon::getVersion() + "\r\n";
return server;
}
HttpResponsePtr HttpResponse::newHttpResponse()
{
auto res = std::make_shared<HttpResponseImpl>();
res->setStatusCode(k200OK);
res->setContentTypeCode(CT_TEXT_HTML);
auto res = std::make_shared<HttpResponseImpl>(k200OK, CT_TEXT_HTML);
return res;
}
@ -42,12 +46,11 @@ HttpResponsePtr HttpResponse::newHttpJsonResponse(const Json::Value &data)
builder["commentStyle"] = "None";
builder["indentation"] = "";
});
auto res = std::make_shared<HttpResponseImpl>();
res->setStatusCode(k200OK);
res->setContentTypeCode(CT_APPLICATION_JSON);
auto res = std::make_shared<HttpResponseImpl>(k200OK, CT_APPLICATION_JSON);
res->setBody(writeString(builder, data));
return res;
}
HttpResponsePtr HttpResponse::newNotFoundResponse()
{
HttpViewData data;
@ -183,7 +186,7 @@ HttpResponsePtr HttpResponse::newFileResponse(const std::string &fullPath, const
return resp;
}
std::string HttpResponseImpl::web_response_code_to_string(int code)
const char *HttpResponseImpl::statusCodeToString(int code)
{
switch (code)
{
@ -292,7 +295,8 @@ void HttpResponseImpl::makeHeaderString(const std::shared_ptr<std::string> &head
assert(headerStringPtr);
snprintf(buf, sizeof buf, "HTTP/1.1 %d ", _statusCode);
headerStringPtr->append(buf);
headerStringPtr->append(_statusMessage);
if (_statusMessage)
headerStringPtr->append(_statusMessage);
headerStringPtr->append("\r\n");
if (_sendfileName.empty())
{
@ -322,7 +326,7 @@ void HttpResponseImpl::makeHeaderString(const std::shared_ptr<std::string> &head
//output->append("Connection: Keep-Alive\r\n");
}
}
headerStringPtr->append(_contentTypeString.data(), _contentTypeString.length());
for (auto it = _headers.begin(); it != _headers.end(); ++it)
{
headerStringPtr->append(it->first);
@ -330,10 +334,7 @@ void HttpResponseImpl::makeHeaderString(const std::shared_ptr<std::string> &head
headerStringPtr->append(it->second);
headerStringPtr->append("\r\n");
}
headerStringPtr->append("Server: drogon/");
headerStringPtr->append(drogon::getVersion());
headerStringPtr->append("\r\n");
headerStringPtr->append(getServerString());
}
std::shared_ptr<std::string> HttpResponseImpl::renderToString() const
@ -394,3 +395,34 @@ std::shared_ptr<std::string> HttpResponseImpl::renderToString() const
}
return httpString;
}
std::shared_ptr<std::string> HttpResponseImpl::renderHeaderForHeadMethod() const
{
auto httpString = std::make_shared<std::string>();
httpString->reserve(256);
if (!_fullHeaderString)
{
makeHeaderString(httpString);
}
else
{
httpString->append(*_fullHeaderString);
}
//output cookies
if (_cookies.size() > 0)
{
for (auto it = _cookies.begin(); it != _cookies.end(); ++it)
{
httpString->append(it->second.cookieString());
}
}
//output Date header
httpString->append("Date: ");
httpString->append(getHttpFullDate(trantor::Date::date()));
httpString->append("\r\n\r\n");
return httpString;
}

View File

@ -34,7 +34,7 @@ class HttpResponseImpl : public HttpResponse
friend class HttpResponseParser;
public:
explicit HttpResponseImpl()
HttpResponseImpl()
: _statusCode(kUnknown),
_creationDate(trantor::Date::now()),
_closeConnection(false),
@ -43,6 +43,18 @@ class HttpResponseImpl : public HttpResponse
_bodyPtr(new std::string())
{
}
HttpResponseImpl(HttpStatusCode code, ContentType type)
: _statusCode(code),
_statusMessage(statusCodeToString(code)),
_creationDate(trantor::Date::now()),
_closeConnection(false),
_leftBodyLength(0),
_currentChunkLength(0),
_bodyPtr(new std::string()),
_contentType(type),
_contentTypeString(webContentTypeToString(type))
{
}
virtual HttpStatusCode statusCode() override
{
return _statusCode;
@ -56,7 +68,7 @@ class HttpResponseImpl : public HttpResponse
virtual void setStatusCode(HttpStatusCode code) override
{
_statusCode = code;
setStatusMessage(web_response_code_to_string(code));
setStatusMessage(statusCodeToString(code));
}
virtual void setStatusCode(HttpStatusCode code, const std::string &status_message) override
@ -86,11 +98,11 @@ class HttpResponseImpl : public HttpResponse
setContentType(webContentTypeToString(type));
}
virtual void setContentTypeCodeAndCharacterSet(ContentType type, const std::string &charSet = "utf-8") override
{
_contentType = type;
setContentType(webContentTypeAndCharsetToString(type, charSet));
}
// virtual void setContentTypeCodeAndCharacterSet(ContentType type, const std::string &charSet = "utf-8") override
// {
// _contentType = type;
// setContentType(webContentTypeAndCharsetToString(type, charSet));
// }
virtual ContentType getContentTypeCode() override
{
@ -265,12 +277,12 @@ class HttpResponseImpl : public HttpResponse
_headers["location"] = url;
}
std::shared_ptr<std::string> renderToString() const;
std::shared_ptr<std::string> renderHeaderForHeadMethod() const;
virtual void clear() override
{
_statusCode = kUnknown;
_v = kHttp11;
_statusMessage.clear();
_statusMessage = nullptr;
_fullHeaderString.reset();
_headers.clear();
_cookies.clear();
@ -302,20 +314,21 @@ class HttpResponseImpl : public HttpResponse
}
void swap(HttpResponseImpl &that)
{
using std::swap;
_headers.swap(that._headers);
_cookies.swap(that._cookies);
std::swap(_statusCode, that._statusCode);
std::swap(_v, that._v);
_statusMessage.swap(that._statusMessage);
std::swap(_closeConnection, that._closeConnection);
swap(_statusCode, that._statusCode);
swap(_v, that._v);
swap(_statusMessage, that._statusMessage);
swap(_closeConnection, that._closeConnection);
_bodyPtr.swap(that._bodyPtr);
std::swap(_leftBodyLength, that._leftBodyLength);
std::swap(_currentChunkLength, that._currentChunkLength);
std::swap(_contentType, that._contentType);
swap(_leftBodyLength, that._leftBodyLength);
swap(_currentChunkLength, that._currentChunkLength);
swap(_contentType, that._contentType);
_jsonPtr.swap(that._jsonPtr);
_fullHeaderString.swap(that._fullHeaderString);
_httpString.swap(that._httpString);
std::swap(_datePos, that._datePos);
swap(_datePos, that._datePos);
}
void parseJson() const
{
@ -361,24 +374,25 @@ class HttpResponseImpl : public HttpResponse
}
protected:
static std::string web_response_code_to_string(int code);
static const char *statusCodeToString(int code);
void makeHeaderString(const std::shared_ptr<std::string> &headerStringPtr) const;
private:
std::unordered_map<std::string, std::string> _headers;
std::unordered_map<std::string, Cookie> _cookies;
HttpStatusCode _statusCode;
const char *_statusMessage = nullptr;
trantor::Date _creationDate;
Version _v;
std::string _statusMessage;
std::string _statusMessageString;
bool _closeConnection;
size_t _leftBodyLength;
size_t _currentChunkLength;
std::shared_ptr<std::string> _bodyPtr;
ContentType _contentType = CT_TEXT_HTML;
ssize_t _expriedTime = -1;
std::string _sendfileName;
mutable std::shared_ptr<Json::Value> _jsonPtr;
@ -389,23 +403,26 @@ class HttpResponseImpl : public HttpResponse
mutable std::string::size_type _datePos = std::string::npos;
mutable int64_t _httpStringDate = -1;
//trantor::Date receiveTime_;
ContentType _contentType = CT_TEXT_HTML;
string_view _contentTypeString = "Content-Type: text/html; charset=utf-8\r\n";
void setContentType(const string_view &contentType)
{
_contentTypeString = contentType;
}
void setContentType(const std::string &contentType)
{
addHeader("Content-Type", contentType);
}
void setContentType(std::string &&contentType)
{
addHeader("Content-Type", std::move(contentType));
}
void setStatusMessage(const std::string &message)
void setStatusMessage(const char *message)
{
_statusMessage = message;
}
void setStatusMessage(const std::string &message)
{
_statusMessageString = message;
_statusMessage = _statusMessageString.c_str();
}
void setStatusMessage(std::string &&message)
{
_statusMessage = std::move(message);
_statusMessageString = std::move(message);
_statusMessage = _statusMessageString.c_str();
}
};
typedef std::shared_ptr<HttpResponseImpl> HttpResponseImplPtr;

View File

@ -67,7 +67,7 @@ HttpServer::~HttpServer()
void HttpServer::start()
{
LOG_WARN << "HttpServer[" << _server.name()
LOG_TRACE << "HttpServer[" << _server.name()
<< "] starts listenning on " << _server.ipPort();
_server.start();
}
@ -159,33 +159,32 @@ void HttpServer::onRequest(const TcpConnectionPtr &conn, const HttpRequestImplPt
bool _close = connection == "close" ||
(req->getVersion() == HttpRequestImpl::kHttp10 && connection != "Keep-Alive");
bool _isHeadMethod = (req->method() == Head);
if (_isHeadMethod)
bool isHeadMethod = (req->method() == Head);
if (isHeadMethod)
{
req->setMethod(Get);
}
HttpRequestParser *requestParser = any_cast<HttpRequestParser>(conn->getMutableContext());
{
//std::lock_guard<std::mutex> guard(requestParser->getPipeLineMutex());
requestParser->pushRquestToPipeLine(req);
}
requestParser->pushRquestToPipeLine(req);
_httpAsyncCallback(req, [=](const HttpResponsePtr &response) {
if (!response)
return;
response->setCloseConnection(_close);
//if the request method is HEAD,remove the body of response(rfc2616-9.4)
auto newResp = response;
if (_isHeadMethod)
{
if (newResp->expiredTime() >= 0)
{
//Cached response,make a copy
newResp = std::make_shared<HttpResponseImpl>(*std::dynamic_pointer_cast<HttpResponseImpl>(response));
newResp->setExpiredTime(-1);
}
newResp->setBody(std::string());
}
//
// if (isHeadMethod)
// {
// if (newResp->expiredTime() >= 0)
// {
// //Cached response,make a copy
// newResp = std::make_shared<HttpResponseImpl>(*std::dynamic_pointer_cast<HttpResponseImpl>(response));
// newResp->setExpiredTime(-1);
// }
// newResp->setBody(std::string());
// }
auto newResp = response;
auto &sendfileName = std::dynamic_pointer_cast<HttpResponseImpl>(newResp)->sendfileName();
if (HttpAppFramework::instance().useGzip() &&
@ -231,17 +230,19 @@ void HttpServer::onRequest(const TcpConnectionPtr &conn, const HttpRequestImplPt
* rfc2616-8.1.1.2
*/
//std::lock_guard<std::mutex> guard(requestParser->getPipeLineMutex());
if (conn->disconnected())
return;
if (requestParser->getFirstRequest() == req)
{
requestParser->popFirstRequest();
sendResponse(conn, newResp);
sendResponse(conn, newResp, isHeadMethod);
while (1)
{
auto resp = requestParser->getFirstResponse();
if (resp)
{
requestParser->popFirstRequest();
sendResponse(conn, resp);
sendResponse(conn, resp, isHeadMethod);
}
else
return;
@ -255,21 +256,21 @@ void HttpServer::onRequest(const TcpConnectionPtr &conn, const HttpRequestImplPt
}
else
{
conn->getLoop()->queueInLoop([conn, req, newResp, this]() {
conn->getLoop()->queueInLoop([conn, req, newResp, this, isHeadMethod]() {
HttpRequestParser *requestParser = any_cast<HttpRequestParser>(conn->getMutableContext());
if (requestParser)
{
if (requestParser->getFirstRequest() == req)
{
requestParser->popFirstRequest();
sendResponse(conn, newResp);
sendResponse(conn, newResp, isHeadMethod);
while (1)
{
auto resp = requestParser->getFirstResponse();
if (resp)
{
requestParser->popFirstRequest();
sendResponse(conn, resp);
sendResponse(conn, resp, isHeadMethod);
}
else
return;
@ -286,16 +287,27 @@ void HttpServer::onRequest(const TcpConnectionPtr &conn, const HttpRequestImplPt
});
}
void HttpServer::sendResponse(const TcpConnectionPtr &conn,
const HttpResponsePtr &response)
const HttpResponsePtr &response,
bool isHeadMethod)
{
conn->getLoop()->assertInLoopThread();
auto httpString = std::dynamic_pointer_cast<HttpResponseImpl>(response)->renderToString();
conn->send(httpString);
auto &sendfileName = std::dynamic_pointer_cast<HttpResponseImpl>(response)->sendfileName();
if (!sendfileName.empty())
auto respImplPtr = std::dynamic_pointer_cast<HttpResponseImpl>(response);
if (!isHeadMethod)
{
conn->sendFile(sendfileName.c_str());
auto httpString = respImplPtr->renderToString();
conn->send(httpString);
auto &sendfileName = respImplPtr->sendfileName();
if (!sendfileName.empty())
{
conn->sendFile(sendfileName.c_str());
}
}
else
{
auto httpString = respImplPtr->renderHeaderForHeadMethod();
conn->send(httpString);
}
if (response->closeConnection())
{
conn->shutdown();

View File

@ -101,7 +101,7 @@ class HttpServer : trantor::NonCopyable
MsgBuffer *);
void onRequest(const TcpConnectionPtr &, const HttpRequestImplPtr &);
bool isWebSocket(const TcpConnectionPtr &conn, const HttpRequestImplPtr &req);
void sendResponse(const TcpConnectionPtr &, const HttpResponsePtr &);
void sendResponse(const TcpConnectionPtr &, const HttpResponsePtr &, bool isHeadMethod);
trantor::TcpServer _server;
HttpAsyncCallback _httpAsyncCallback;
WebSocketNewAsyncCallback _newWebsocketCallback;

View File

@ -17,150 +17,193 @@
namespace drogon
{
std::string webContentTypeAndCharsetToString(ContentType contenttype, const std::string &charSet)
// std::string webContentTypeAndCharsetToString(ContentType contenttype, const std::string &charSet)
// {
// switch (contenttype)
// {
// case CT_TEXT_HTML:
// return "text/html; charset=" + charSet;
// case CT_APPLICATION_X_FORM:
// return "application/x-www-form-urlencoded";
// case CT_APPLICATION_XML:
// return "application/xml; charset=" + charSet;
// case CT_APPLICATION_JSON:
// return "application/json; charset=" + charSet;
// case CT_APPLICATION_X_JAVASCRIPT:
// return "application/x-javascript; charset=" + charSet;
// case CT_TEXT_CSS:
// return "text/css; charset=" + charSet;
// case CT_TEXT_XML:
// return "text/xml; charset=" + charSet;
// case CT_TEXT_XSL:
// return "text/xsl; charset=" + charSet;
// case CT_APPLICATION_OCTET_STREAM:
// return "application/octet-stream";
// case CT_IMAGE_SVG_XML:
// return "image/svg+xml";
// case CT_APPLICATION_X_FONT_TRUETYPE:
// return "application/x-font-truetype";
// case CT_APPLICATION_X_FONT_OPENTYPE:
// return "application/x-font-opentype";
// case CT_APPLICATION_FONT_WOFF:
// return "application/font-woff";
// case CT_APPLICATION_FONT_WOFF2:
// return "application/font-woff2";
// case CT_APPLICATION_VND_MS_FONTOBJ:
// return "application/vnd.ms-fontobject";
// case CT_IMAGE_PNG:
// return "image/png";
// case CT_IMAGE_JPG:
// return "image/jpeg";
// case CT_IMAGE_GIF:
// return "image/gif";
// case CT_IMAGE_XICON:
// return "image/x-icon";
// case CT_IMAGE_BMP:
// return "image/bmp";
// case CT_IMAGE_ICNS:
// return "image/icns";
// default:
// case CT_TEXT_PLAIN:
// return "text/plain; charset=" + charSet;
// }
// }
const string_view &webContentTypeToString(ContentType contenttype)
{
switch (contenttype)
{
case CT_TEXT_HTML:
return "text/html; charset=" + charSet;
case CT_APPLICATION_X_FORM:
return "application/x-www-form-urlencoded";
case CT_APPLICATION_XML:
return "application/xml; charset=" + charSet;
case CT_APPLICATION_JSON:
return "application/json; charset=" + charSet;
case CT_APPLICATION_X_JAVASCRIPT:
return "application/x-javascript; charset=" + charSet;
case CT_TEXT_CSS:
return "text/css; charset=" + charSet;
case CT_TEXT_XML:
return "text/xml; charset=" + charSet;
case CT_TEXT_XSL:
return "text/xsl; charset=" + charSet;
case CT_APPLICATION_OCTET_STREAM:
return "application/octet-stream";
case CT_IMAGE_SVG_XML:
return "image/svg+xml";
case CT_APPLICATION_X_FONT_TRUETYPE:
return "application/x-font-truetype";
case CT_APPLICATION_X_FONT_OPENTYPE:
return "application/x-font-opentype";
case CT_APPLICATION_FONT_WOFF:
return "application/font-woff";
case CT_APPLICATION_FONT_WOFF2:
return "application/font-woff2";
case CT_APPLICATION_VND_MS_FONTOBJ:
return "application/vnd.ms-fontobject";
case CT_IMAGE_PNG:
return "image/png";
case CT_IMAGE_JPG:
return "image/jpeg";
case CT_IMAGE_GIF:
return "image/gif";
case CT_IMAGE_XICON:
return "image/x-icon";
case CT_IMAGE_BMP:
return "image/bmp";
case CT_IMAGE_ICNS:
return "image/icns";
default:
case CT_TEXT_PLAIN:
return "text/plain; charset=" + charSet;
}
}
std::string webContentTypeToString(ContentType contenttype)
{
switch (contenttype)
{
case CT_TEXT_HTML:
return "text/html; charset=utf-8";
static string_view sv = "Content-Type: text/html; charset=utf-8\r\n";
return sv;
}
case CT_APPLICATION_X_FORM:
return "application/x-www-form-urlencoded";
{
static string_view sv = "Content-Type: application/x-www-form-urlencoded\r\n";
return sv;
}
case CT_APPLICATION_XML:
return "application/xml; charset=utf-8";
{
static string_view sv = "Content-Type: application/xml; charset=utf-8\r\n";
return sv;
}
case CT_APPLICATION_JSON:
return "application/json; charset=utf-8";
{
static string_view sv = "Content-Type: application/json; charset=utf-8\r\n";
return sv;
}
case CT_APPLICATION_X_JAVASCRIPT:
return "application/x-javascript; charset=utf-8";
{
static string_view sv = "Content-Type: application/x-javascript; charset=utf-8\r\n";
return sv;
}
case CT_TEXT_CSS:
return "text/css; charset=utf-8";
{
static string_view sv = "Content-Type: text/css; charset=utf-8\r\n";
return sv;
}
case CT_TEXT_XML:
return "text/xml; charset=utf-8";
{
static string_view sv = "Content-Type: text/xml; charset=utf-8\r\n";
return sv;
}
case CT_TEXT_XSL:
return "text/xsl; charset=utf-8";
{
static string_view sv = "Content-Type: text/xsl; charset=utf-8\r\n";
return sv;
}
case CT_APPLICATION_OCTET_STREAM:
return "application/octet-stream";
{
static string_view sv = "Content-Type: application/octet-stream\r\n";
return sv;
}
case CT_IMAGE_SVG_XML:
return "image/svg+xml";
{
static string_view sv = "Content-Type: image/svg+xml\r\n";
return sv;
}
case CT_APPLICATION_X_FONT_TRUETYPE:
return "application/x-font-truetype";
{
static string_view sv = "Content-Type: application/x-font-truetype\r\n";
return sv;
}
case CT_APPLICATION_X_FONT_OPENTYPE:
return "application/x-font-opentype";
{
static string_view sv = "Content-Type: application/x-font-opentype\r\n";
return sv;
}
case CT_APPLICATION_FONT_WOFF:
return "application/font-woff";
{
static string_view sv = "Content-Type: application/font-woff\r\n";
return sv;
}
case CT_APPLICATION_FONT_WOFF2:
return "application/font-woff2";
{
static string_view sv = "Content-Type: application/font-woff2\r\n";
return sv;
}
case CT_APPLICATION_VND_MS_FONTOBJ:
return "application/vnd.ms-fontobject";
{
static string_view sv = "Content-Type: application/vnd.ms-fontobject\r\n";
return sv;
}
case CT_IMAGE_PNG:
return "image/png";
{
static string_view sv = "Content-Type: image/png\r\n";
return sv;
}
case CT_IMAGE_JPG:
return "image/jpeg";
{
static string_view sv = "Content-Type: image/jpeg\r\n";
return sv;
}
case CT_IMAGE_GIF:
return "image/gif";
{
static string_view sv = "Content-Type: image/gif\r\n";
return sv;
}
case CT_IMAGE_XICON:
return "image/x-icon";
{
static string_view sv = "Content-Type: image/x-icon\r\n";
return sv;
}
case CT_IMAGE_BMP:
return "image/bmp";
{
static string_view sv = "Content-Type: image/bmp\r\n";
return sv;
}
case CT_IMAGE_ICNS:
return "image/icns";
{
static string_view sv = "Content-Type: image/icns\r\n";
return sv;
}
default:
case CT_TEXT_PLAIN:
return "text/plain; charset=utf-8";
{
static string_view sv = "Content-Type: text/plain; charset=utf-8\r\n";
return sv;
}
}
}
} // namespace drogon

View File

@ -15,11 +15,18 @@
#pragma once
#include <string>
#include <drogon/HttpTypes.h>
#include <drogon/config.h>
#if CXX_STD >= 17
#include <string_view>
typedef std::string_view string_view;
#else
#include <experimental/string_view>
typedef std::experimental::basic_string_view<char> string_view;
#endif
namespace drogon
{
const string_view &webContentTypeToString(ContentType contenttype);
std::string webContentTypeToString(ContentType contenttype);
std::string webContentTypeAndCharsetToString(ContentType contenttype, const std::string &charSet);
//std::string webContentTypeAndCharsetToString(ContentType contenttype, const std::string &charSet);
} // namespace drogon

View File

@ -133,6 +133,7 @@ class DbClient : public trantor::NonCopyable
/// A stream-type method for sql execution
internal::SqlBinder operator<<(const std::string &sql);
internal::SqlBinder operator<<(std::string &&sql);
/// Create a transaction object.
/**
@ -147,6 +148,7 @@ class DbClient : public trantor::NonCopyable
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = std::function<void(bool)>()) = 0;
ClientType type() const { return _type; }
const std::string &connectionInfo() { return _connInfo; }
private:
friend internal::SqlBinder;
@ -160,6 +162,7 @@ class DbClient : public trantor::NonCopyable
protected:
ClientType _type;
std::string _connInfo;
};
typedef std::shared_ptr<DbClient> DbClientPtr;

View File

@ -80,12 +80,33 @@ namespace drogon
{
namespace orm
{
/// Reference to a field in a result set.
/**
* A field represents one entry in a row. It represents an actual value
* in the result set, and can be converted to various types.
*/
class Field
{
public:
using size_type = unsigned long;
/// Column name
const char *name() const;
/// Is this field's value null?
bool isNull() const;
/// Read as plain C string
/**
* Since the field's data is stored internally in the form of a
* zero-terminated C string, this is the fastest way to read it. Use the
* to() or as() functions to convert the string to other types such as
* @c int, or to C++ strings.
*/
const char *c_str() const;
/// Convert to a type T value
template <typename T>
T as() const
{
@ -115,6 +136,15 @@ class Field
}
return value;
}
/// Parse the field as an SQL array.
/**
* Call the parser to retrieve values (and structure) from the array.
*
* Make sure the @c result object stays alive until parsing is finished. If
* you keep the @c row of @c field object alive, it will keep the @c result
* object alive as well.
*/
ArrayParser getArrayParser() const
{
return ArrayParser(_result.getValue(_row, _column));

View File

@ -195,7 +195,7 @@ inline T Mapper<T>::findByPrimaryKey(const typename Mapper<T>::TraitsPKType &key
clear();
Result r(nullptr);
{
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
outputPrimeryKeyToBinder(key, binder);
binder << Mode::Blocking;
binder >> [&r](const Result &result) {
@ -231,7 +231,7 @@ inline void Mapper<T>::findByPrimaryKey(const typename Mapper<T>::TraitsPKType &
}
sql = replaceSqlPlaceHolder(sql, "$?");
clear();
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
outputPrimeryKeyToBinder(key, binder);
binder >> [=](const Result &r) {
if (r.size() == 0)
@ -265,7 +265,7 @@ Mapper<T>::findFutureByPrimaryKey(const typename Mapper<T>::TraitsPKType &key) n
}
sql = replaceSqlPlaceHolder(sql, "$?");
clear();
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
outputPrimeryKeyToBinder(key, binder);
std::shared_ptr<std::promise<T>> prom = std::make_shared<std::promise<T>>();
@ -323,7 +323,7 @@ inline T Mapper<T>::findOne(const Criteria &criteria) noexcept(false)
clear();
Result r(nullptr);
{
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
if (criteria)
criteria.outputArgs(binder);
binder << Mode::Blocking;
@ -363,7 +363,7 @@ inline void Mapper<T>::findOne(const Criteria &criteria,
sql += " for update";
}
clear();
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
if (criteria)
criteria.outputArgs(binder);
binder >> [=](const Result &r) {
@ -400,7 +400,7 @@ inline std::future<T> Mapper<T>::findFutureOne(const Criteria &criteria) noexcep
sql += " for update";
}
clear();
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
if (criteria)
criteria.outputArgs(binder);
@ -458,7 +458,7 @@ inline std::vector<T> Mapper<T>::findBy(const Criteria &criteria) noexcept(false
clear();
Result r(nullptr);
{
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
if (criteria)
criteria.outputArgs(binder);
binder << Mode::Blocking;
@ -493,7 +493,7 @@ inline void Mapper<T>::findBy(const Criteria &criteria,
sql += " for update";
}
clear();
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
if (criteria)
criteria.outputArgs(binder);
binder >> [=](const Result &r) {
@ -523,7 +523,7 @@ inline std::future<std::vector<T>> Mapper<T>::findFutureBy(const Criteria &crite
sql += " for update";
}
clear();
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
if (criteria)
criteria.outputArgs(binder);
@ -572,7 +572,7 @@ inline size_t Mapper<T>::count(const Criteria &criteria) noexcept(false)
clear();
Result r(nullptr);
{
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
if (criteria)
criteria.outputArgs(binder);
binder << Mode::Blocking;
@ -598,7 +598,7 @@ inline void Mapper<T>::count(const Criteria &criteria,
sql = replaceSqlPlaceHolder(sql, "$?");
}
clear();
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
if (criteria)
criteria.outputArgs(binder);
binder >> [=](const Result &r) {
@ -619,7 +619,7 @@ inline std::future<size_t> Mapper<T>::countFuture(const Criteria &criteria) noex
sql = replaceSqlPlaceHolder(sql, "$?");
}
clear();
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
if (criteria)
criteria.outputArgs(binder);
@ -660,7 +660,7 @@ inline void Mapper<T>::insert(T &obj) noexcept(false)
sql = replaceSqlPlaceHolder(sql, "$?");
Result r(nullptr);
{
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
obj.outputArgs(binder);
binder << Mode::Blocking;
binder >> [&r](const Result &result) {
@ -705,7 +705,7 @@ inline void Mapper<T>::insert(const T &obj,
sql += " returning *";
}
sql = replaceSqlPlaceHolder(sql, "$?");
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
obj.outputArgs(binder);
auto client = _client;
binder >> [client, rcb, obj](const Result &r) {
@ -748,7 +748,7 @@ inline std::future<T> Mapper<T>::insertFuture(const T &obj) noexcept
sql += " returning *";
}
sql = replaceSqlPlaceHolder(sql, "$?");
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
obj.outputArgs(binder);
std::shared_ptr<std::promise<T>> prom = std::make_shared<std::promise<T>>();
@ -793,7 +793,7 @@ inline size_t Mapper<T>::update(const T &obj) noexcept(false)
sql = replaceSqlPlaceHolder(sql, "$?");
Result r(nullptr);
{
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
obj.updateArgs(binder);
outputPrimeryKeyToBinder(obj.getPrimaryKey(), binder);
binder << Mode::Blocking;
@ -824,7 +824,7 @@ inline void Mapper<T>::update(const T &obj,
makePrimaryKeyCriteria(sql);
sql = replaceSqlPlaceHolder(sql, "$?");
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
obj.updateArgs(binder);
outputPrimeryKeyToBinder(obj.getPrimaryKey(), binder);
binder >> [=](const Result &r) {
@ -850,7 +850,7 @@ inline std::future<size_t> Mapper<T>::updateFuture(const T &obj) noexcept
makePrimaryKeyCriteria(sql);
sql = replaceSqlPlaceHolder(sql, "$?");
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
obj.updateArgs(binder);
outputPrimeryKeyToBinder(obj.getPrimaryKey(), binder);
@ -880,7 +880,7 @@ inline size_t Mapper<T>::deleteOne(const T &obj) noexcept(false)
sql = replaceSqlPlaceHolder(sql, "$?");
Result r(nullptr);
{
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
outputPrimeryKeyToBinder(obj.getPrimaryKey(), binder);
binder << Mode::Blocking;
binder >> [&r](const Result &result) {
@ -904,7 +904,7 @@ inline void Mapper<T>::deleteOne(const T &obj,
makePrimaryKeyCriteria(sql);
sql = replaceSqlPlaceHolder(sql, "$?");
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
outputPrimeryKeyToBinder(obj.getPrimaryKey(), binder);
binder >> [=](const Result &r) {
rcb(r.affectedRows());
@ -923,7 +923,7 @@ inline std::future<size_t> Mapper<T>::deleteFutureOne(const T &obj) noexcept
makePrimaryKeyCriteria(sql);
sql = replaceSqlPlaceHolder(sql, "$?");
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
outputPrimeryKeyToBinder(obj.getPrimaryKey(), binder);
std::shared_ptr<std::promise<size_t>> prom = std::make_shared<std::promise<size_t>>();
@ -954,7 +954,7 @@ inline size_t Mapper<T>::deleteBy(const Criteria &criteria) noexcept(false)
Result r(nullptr);
{
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
if (criteria)
{
criteria.outputArgs(binder);
@ -984,7 +984,7 @@ inline void Mapper<T>::deleteBy(const Criteria &criteria,
sql = replaceSqlPlaceHolder(sql, "$?");
}
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
if (criteria)
{
criteria.outputArgs(binder);
@ -1007,7 +1007,7 @@ inline std::future<size_t> Mapper<T>::deleteFutureBy(const Criteria &criteria) n
sql += criteria.criteriaString();
sql = replaceSqlPlaceHolder(sql, "$?");
}
auto binder = *_client << sql;
auto binder = *_client << std::move(sql);
if (criteria)
{
criteria.outputArgs(binder);

View File

@ -37,6 +37,21 @@ enum class SqlStatus
Ok,
End
};
/// Result set containing data returned by a query or command.
/** This behaves as a container (as defined by the C++ standard library) and
* provides random access const iterators to iterate over its rows. A row
* can also be accessed by indexing a result R by the row's zero-based
* number:
*
* @code
* for (result::size_type i=0; i < R.size(); ++i) Process(R[i]);
* @endcode
*
* Result sets in libpqxx are lightweight, reference-counted wrapper objects
* which are relatively small and cheap to copy. Think of a result object as
* a "smart pointer" to an underlying result set.
*/
class Result
{
public:
@ -73,15 +88,24 @@ class Result
reference operator[](size_type index) const;
reference at(size_type index) const;
void swap(Result &) noexcept;
/// Number of columns in result.
row_size_type columns() const noexcept;
/// Name of column with this number (throws exception if it doesn't exist)
const char *columnName(row_size_type number) const;
/// If command was @c INSERT, @c UPDATE, or @c DELETE: number of affected rows
/**
* @return Number of affected rows if last command was @c INSERT, @c UPDATE,
* or @c DELETE; zero for all other commands.
*/
size_type affectedRows() const noexcept;
/// For Mysql, Sqlite3 database
unsigned long long insertId() const noexcept;
/// Query that produced this result, if available (empty string otherwise)
const std::string &sql() const noexcept;
private:

View File

@ -28,6 +28,19 @@ class Field;
class ConstRowIterator;
class ConstReverseRowIterator;
/// Reference to one row in a result.
/**
* A row represents one row (also called a row) in a query result set.
* It also acts as a container mapping column numbers or names to field
* values (see below):
*
* @code
* cout << row["date"].as<std::string>() << ": " << row["name"].as<std::string>() << endl;
* @endcode
*
* The row itself acts like a (non-modifyable) container, complete with its
* own const_iterator and const_reverse_iterator.
*/
class Row
{
public:

View File

@ -227,6 +227,9 @@ class SqlBinder
SqlBinder(const std::string &sql, DbClient &client, ClientType type) : _sql(sql), _client(client), _type(type)
{
}
SqlBinder(std::string &&sql, DbClient &client, ClientType type) : _sql(std::move(sql)), _client(client), _type(type)
{
}
~SqlBinder();
template <typename CallbackType,
typename traits = FunctionTraits<CallbackType>>

View File

@ -22,6 +22,11 @@ internal::SqlBinder DbClient::operator<<(const std::string &sql)
return internal::SqlBinder(sql, *this, _type);
}
internal::SqlBinder DbClient::operator<<(std::string &&sql)
{
return internal::SqlBinder(std::move(sql), *this, _type);
}
#if USE_POSTGRESQL
std::shared_ptr<DbClient> DbClient::newPgClient(const std::string &connInfo, const size_t connNum)
{

View File

@ -42,11 +42,11 @@
using namespace drogon::orm;
DbClientImpl::DbClientImpl(const std::string &connInfo, const size_t connNum, ClientType type)
: _connInfo(connInfo),
_connectNum(connNum),
_loops(type == ClientType::Sqlite3 ? 1 : (connNum / 100 > 0 ? connNum / 100 : 1), "DbLoop")
: _connectNum(connNum),
_loops(type == ClientType::Sqlite3 ? 1 : (connNum < std::thread::hardware_concurrency() ? connNum : std::thread::hardware_concurrency()), "DbLoop")
{
_type = type;
_connInfo = connInfo;
LOG_TRACE << "type=" << (int)type;
//LOG_DEBUG << _loops.getLoopNum();
assert(connNum > 0);
@ -71,7 +71,7 @@ DbClientImpl::DbClientImpl(const std::string &connInfo, const size_t connNum, Cl
for (size_t i = 0; i < _connectNum; i++)
{
auto loop = _loops.getNextLoop();
loop->runAfter(0.01 * (i + 1), [this, loop]() {
loop->runAfter(0.1 * (i + 1), [this, loop]() {
std::lock_guard<std::mutex> lock(_connectionsMutex);
_connections.insert(newConnection(loop));
});
@ -129,15 +129,7 @@ void DbClientImpl::execSql(const DbConnectionPtr &conn,
}
std::weak_ptr<DbConnection> weakConn = conn;
conn->execSql(std::move(sql), paraNum, std::move(parameters), std::move(length), std::move(format),
std::move(rcb), std::move(exceptCallback),
[=]() -> void {
{
auto connPtr = weakConn.lock();
if (!connPtr)
return;
handleNewTask(connPtr);
}
});
std::move(rcb), std::move(exceptCallback));
}
void DbClientImpl::execSql(std::string &&sql,
size_t paraNum,
@ -205,14 +197,13 @@ void DbClientImpl::execSql(std::string &&sql,
return;
}
//LOG_TRACE << "Push query to buffer";
std::shared_ptr<SqlCmd> cmd = std::make_shared<SqlCmd>();
cmd->_sql = std::move(sql);
cmd->_paraNum = paraNum;
cmd->_parameters = std::move(parameters);
cmd->_length = std::move(length);
cmd->_format = std::move(format);
cmd->_cb = std::move(rcb);
cmd->_exceptCb = std::move(exceptCallback);
std::shared_ptr<SqlCmd> cmd = std::make_shared<SqlCmd>(std::move(sql),
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(rcb),
std::move(exceptCallback));
{
std::lock_guard<std::mutex> guard(_bufferMutex);
_sqlCmdBuffer.push_back(std::move(cmd));
@ -234,22 +225,40 @@ std::shared_ptr<Transaction> DbClientImpl::newTransaction(const std::function<vo
conn = *iter;
_readyConnections.erase(iter);
}
auto trans = std::shared_ptr<TransactionImpl>(new TransactionImpl(_type, conn, commitCallback, [=]() {
std::weak_ptr<DbClientImpl> weakThis = shared_from_this();
auto trans = std::shared_ptr<TransactionImpl>(new TransactionImpl(_type, conn, commitCallback, [weakThis, conn]() {
auto thisPtr = weakThis.lock();
if (!thisPtr)
return;
if (conn->status() == ConnectStatus_Bad)
{
return;
}
{
std::lock_guard<std::mutex> guard(_connectionsMutex);
if (_connections.find(conn) == _connections.end() &&
_busyConnections.find(conn) == _busyConnections.find(conn))
std::lock_guard<std::mutex> guard(thisPtr->_connectionsMutex);
if (thisPtr->_connections.find(conn) == thisPtr->_connections.end() &&
thisPtr->_busyConnections.find(conn) == thisPtr->_busyConnections.find(conn))
{
//connection is broken and removed
return;
}
}
handleNewTask(conn);
conn->loop()->queueInLoop([weakThis, conn]() {
auto thisPtr = weakThis.lock();
if(!thisPtr)
return;
std::weak_ptr<DbConnection> weakConn = conn;
conn->setIdleCallback([weakThis, weakConn]() {
auto thisPtr = weakThis.lock();
if (!thisPtr)
return;
auto connPtr = weakConn.lock();
if (!connPtr)
return;
thisPtr->handleNewTask(connPtr);
});
thisPtr->handleNewTask(conn);
});
}));
trans->doBegin();
return trans;
@ -257,36 +266,37 @@ std::shared_ptr<Transaction> DbClientImpl::newTransaction(const std::function<vo
void DbClientImpl::handleNewTask(const DbConnectionPtr &connPtr)
{
std::lock_guard<std::mutex> guard(_connectionsMutex);
if (_transWaitNum > 0)
{
//Prioritize the needs of the transaction
_busyConnections.erase(connPtr);
_readyConnections.insert(connPtr);
_condConnectionReady.notify_one();
}
else
{
//Then check if there are some sql queries in the buffer
std::lock_guard<std::mutex> guard(_connectionsMutex);
if (_transWaitNum > 0)
{
std::lock_guard<std::mutex> guard(_bufferMutex);
if (!_sqlCmdBuffer.empty())
{
_busyConnections.insert(connPtr); //For new connections, this sentence is necessary
auto &cmd = _sqlCmdBuffer.front();
connPtr->loop()->queueInLoop([connPtr, cmd, this]() {
execSql(connPtr, std::move(cmd->_sql), cmd->_paraNum, std::move(cmd->_parameters), std::move(cmd->_length), std::move(cmd->_format), std::move(cmd->_cb), std::move(cmd->_exceptCb));
});
_sqlCmdBuffer.pop_front();
return;
}
}
//Idle connection
connPtr->loop()->queueInLoop([connPtr, this]() {
std::lock_guard<std::mutex> guard(_connectionsMutex);
//Prioritize the needs of the transaction
_busyConnections.erase(connPtr);
_readyConnections.insert(connPtr);
});
_condConnectionReady.notify_one();
return;
}
}
//Then check if there are some sql queries in the buffer
std::shared_ptr<SqlCmd> cmd;
{
std::lock_guard<std::mutex> guard(_bufferMutex);
if (!_sqlCmdBuffer.empty())
{
cmd = std::move(_sqlCmdBuffer.front());
_sqlCmdBuffer.pop_front();
}
}
if (cmd)
{
execSql(connPtr, std::move(cmd->_sql), cmd->_paraNum, std::move(cmd->_parameters), std::move(cmd->_length), std::move(cmd->_format), std::move(cmd->_cb), std::move(cmd->_exceptCb));
return;
}
//Connection is idle, put it into the _readyConnections set;
{
std::lock_guard<std::mutex> guard(_connectionsMutex);
_busyConnections.erase(connPtr);
_readyConnections.insert(connPtr);
}
}
@ -350,8 +360,22 @@ DbConnectionPtr DbClientImpl::newConnection(trantor::EventLoop *loop)
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
{
std::lock_guard<std::mutex> guard(thisPtr->_connectionsMutex);
thisPtr->_busyConnections.insert(okConnPtr); //For new connections, this sentence is necessary
}
thisPtr->handleNewTask(okConnPtr);
});
std::weak_ptr<DbConnection> weakConn = connPtr;
connPtr->setIdleCallback([weakPtr, weakConn]() {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
auto connPtr = weakConn.lock();
if (!connPtr)
return;
thisPtr->handleNewTask(connPtr);
});
//std::cout<<"newConn end"<<connPtr<<std::endl;
return connPtr;
}

View File

@ -45,7 +45,6 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this<DbClie
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = std::function<void(bool)>()) override;
private:
std::string _connInfo;
size_t _connectNum;
trantor::EventLoopThreadPool _loops;
std::shared_ptr<SharedMutex> _sharedMutexPtr;
@ -78,6 +77,22 @@ class DbClientImpl : public DbClient, public std::enable_shared_from_this<DbClie
std::vector<int> _format;
QueryCallback _cb;
ExceptPtrCallback _exceptCb;
SqlCmd(std::string &&sql,
const size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> format,
QueryCallback &&cb,
ExceptPtrCallback &&exceptCb)
: _sql(std::move(sql)),
_paraNum(paraNum),
_parameters(std::move(parameters)),
_length(std::move(length)),
_format(std::move(format)),
_cb(std::move(cb)),
_exceptCb(std::move(exceptCb))
{
}
};
std::deque<std::shared_ptr<SqlCmd>> _sqlCmdBuffer;
std::mutex _bufferMutex;

View File

@ -0,0 +1,221 @@
/**
*
* DbClientLockFree.cc
* An Tao
*
* Copyright 2018, An Tao. All rights reserved.
* https://github.com/an-tao/drogon
* Use of this source code is governed by a MIT license
* that can be found in the License file.
*
* Drogon
*
*/
#include "DbClientLockFree.h"
#include "DbConnection.h"
#if USE_POSTGRESQL
#include "postgresql_impl/PgConnection.h"
#endif
#if USE_MYSQL
#include "mysql_impl/MysqlConnection.h"
#endif
#include "TransactionImpl.h"
#include <trantor/net/EventLoop.h>
#include <trantor/net/inner/Channel.h>
#include <drogon/drogon.h>
#include <drogon/orm/Exception.h>
#include <drogon/orm/DbClient.h>
#include <sys/select.h>
#include <iostream>
#include <thread>
#include <vector>
#include <unordered_set>
#include <memory>
#include <stdio.h>
#include <unistd.h>
#include <sstream>
using namespace drogon::orm;
DbClientLockFree::DbClientLockFree(const std::string &connInfo, trantor::EventLoop *loop, ClientType type)
: _connInfo(connInfo),
_loop(loop)
{
_type = type;
LOG_TRACE << "type=" << (int)type;
if (type == ClientType::PostgreSQL)
{
_loop->runInLoop([this]() {
_connectionHolder = newConnection();
});
}
else if (type == ClientType::Mysql)
{
_loop->runInLoop([this]() {
_connectionHolder = newConnection();
});
}
else
{
LOG_ERROR << "No supported database type!";
}
}
DbClientLockFree::~DbClientLockFree() noexcept
{
if (_connection)
{
_connection->disconnect();
}
}
void DbClientLockFree::execSql(std::string &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback)
{
assert(paraNum == parameters.size());
assert(paraNum == length.size());
assert(paraNum == format.size());
assert(rcb);
_loop->assertInLoopThread();
if (!_connection)
{
try
{
throw BrokenConnection("No connection to database server");
}
catch (...)
{
exceptCallback(std::current_exception());
}
return;
}
else
{
if (!_connection->isWorking())
{
_connection->execSql(std::move(sql),
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(rcb),
std::move(exceptCallback));
return;
}
}
if (_sqlCmdBuffer.size() > 20000)
{
//too many queries in buffer;
try
{
throw Failure("Too many queries in buffer");
}
catch (...)
{
exceptCallback(std::current_exception());
}
return;
}
//LOG_TRACE << "Push query to buffer";
_sqlCmdBuffer.emplace_back(std::move(sql),
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(rcb),
std::move(exceptCallback));
}
std::shared_ptr<Transaction> DbClientLockFree::newTransaction(const std::function<void(bool)> &commitCallback)
{
// Don't support transaction;
assert(0);
return nullptr;
}
void DbClientLockFree::handleNewTask()
{
assert(_connection);
assert(!_connection->isWorking());
if (!_sqlCmdBuffer.empty())
{
auto &cmd = _sqlCmdBuffer.front();
_connection->execSql(std::move(cmd._sql),
cmd._paraNum,
std::move(cmd._parameters),
std::move(cmd._length),
std::move(cmd._format),
std::move(cmd._cb),
std::move(cmd._exceptCb));
_sqlCmdBuffer.pop_front();
return;
}
}
DbConnectionPtr DbClientLockFree::newConnection()
{
DbConnectionPtr connPtr;
if (_type == ClientType::PostgreSQL)
{
#if USE_POSTGRESQL
connPtr = std::make_shared<PgConnection>(_loop, _connInfo);
#else
return nullptr;
#endif
}
else if (_type == ClientType::Mysql)
{
#if USE_MYSQL
connPtr = std::make_shared<MysqlConnection>(_loop, _connInfo);
#else
return nullptr;
#endif
}
else
{
return nullptr;
}
std::weak_ptr<DbClientLockFree> weakPtr = shared_from_this();
connPtr->setCloseCallback([weakPtr](const DbConnectionPtr &closeConnPtr) {
//Erase the connection
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
assert(thisPtr->_connection);
thisPtr->_connection.reset();
//Reconnect after 1 second
thisPtr->_loop->runAfter(1, [weakPtr] {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
thisPtr->_connectionHolder = thisPtr->newConnection();
});
});
connPtr->setOkCallback([weakPtr](const DbConnectionPtr &okConnPtr) {
LOG_TRACE << "connected!";
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
thisPtr->_connection = okConnPtr;
thisPtr->handleNewTask();
});
connPtr->setIdleCallback([weakPtr]() {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
thisPtr->handleNewTask();
});
//std::cout<<"newConn end"<<connPtr<<std::endl;
return connPtr;
}

View File

@ -0,0 +1,85 @@
/**
*
* DbClientLockFree.h
* An Tao
*
* Copyright 2018, An Tao. All rights reserved.
* https://github.com/an-tao/drogon
* Use of this source code is governed by a MIT license
* that can be found in the License file.
*
* Drogon
*
*/
#pragma once
#include "DbConnection.h"
#include <drogon/HttpTypes.h>
#include <drogon/orm/DbClient.h>
#include <trantor/net/EventLoopThreadPool.h>
#include <memory>
#include <thread>
#include <functional>
#include <string>
#include <unordered_set>
#include <list>
namespace drogon
{
namespace orm
{
class DbClientLockFree : public DbClient, public std::enable_shared_from_this<DbClientLockFree>
{
public:
DbClientLockFree(const std::string &connInfo, trantor::EventLoop *loop, ClientType type);
virtual ~DbClientLockFree() noexcept;
virtual void execSql(std::string &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback) override;
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &commitCallback = std::function<void(bool)>()) override;
private:
std::string _connInfo;
trantor::EventLoop *_loop;
DbConnectionPtr newConnection();
DbConnectionPtr _connection;
DbConnectionPtr _connectionHolder;
struct SqlCmd
{
std::string _sql;
size_t _paraNum;
std::vector<const char *> _parameters;
std::vector<int> _length;
std::vector<int> _format;
QueryCallback _cb;
ExceptPtrCallback _exceptCb;
SqlCmd(std::string &&sql,
const size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> format,
QueryCallback &&cb,
ExceptPtrCallback &&exceptCb)
: _sql(std::move(sql)),
_paraNum(paraNum),
_parameters(std::move(parameters)),
_length(std::move(length)),
_format(std::move(format)),
_cb(std::move(cb)),
_exceptCb(std::move(exceptCb))
{
}
};
std::deque<SqlCmd> _sqlCmdBuffer;
void handleNewTask();
};
} // namespace orm
} // namespace drogon

View File

@ -59,14 +59,17 @@ class DbConnection : public trantor::NonCopyable
{
_closeCb = cb;
}
void setIdleCallback(const std::function<void()> &cb)
{
_idleCb = cb;
}
virtual void execSql(std::string &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb) = 0;
std::function<void(const std::exception_ptr &)> &&exceptCallback) = 0;
virtual ~DbConnection()
{
LOG_TRACE << "Destruct DbConn" << this;
@ -74,6 +77,7 @@ class DbConnection : public trantor::NonCopyable
ConnectStatus status() const { return _status; }
trantor::EventLoop *loop() { return _loop; }
virtual void disconnect() = 0;
bool isWorking() { return _isWorking; }
protected:
QueryCallback _cb;

View File

@ -60,6 +60,11 @@ std::vector<char> Field::as<std::vector<char>>() const
char *last = first + _result.getLength(_row, _column);
return std::vector<char>(first, last);
}
const char *Field::c_str() const
{
return as<const char *>();
}
// template <>
// std::vector<short> Field::as<std::vector<short>>() const
// {

View File

@ -27,10 +27,9 @@ void SqlBinder::exec()
{
//nonblocking mode,default mode
//Retain shared_ptrs of parameters until we get the result;
std::shared_ptr<decltype(_objs)> objs = std::make_shared<decltype(_objs)>(std::move(_objs));
_client.execSql(std::move(_sql), _paraNum, std::move(_parameters), std::move(_length), std::move(_format),
[holder = std::move(_callbackHolder), objs](const Result &r) {
objs->clear();
[holder = std::move(_callbackHolder), objs = std::move(_objs)](const Result &r) mutable {
objs.clear();
if (holder)
{
holder->execCallback(r);

View File

@ -35,6 +35,10 @@ TransactionImpl::~TransactionImpl()
{
auto loop = _connectionPtr->loop();
loop->queueInLoop([conn = _connectionPtr, ucb = std::move(_usedUpCallback), commitCb = std::move(_commitCallback)]() {
conn->setIdleCallback([ucb = std::move(ucb)]() {
if (ucb)
ucb();
});
conn->execSql("commit",
0,
std::vector<const char *>(),
@ -60,73 +64,65 @@ TransactionImpl::~TransactionImpl()
commitCb(false);
}
}
},
[ucb]() {
if (ucb)
{
ucb();
}
});
});
}
}
void TransactionImpl::execSql(std::string &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback)
void TransactionImpl::execSqlInLoop(std::string &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback)
{
auto thisPtr = shared_from_this();
_loop->queueInLoop([thisPtr, sql = std::move(sql), paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format), rcb = std::move(rcb), exceptCallback = std::move(exceptCallback)]() mutable {
if (!thisPtr->_isCommitedOrRolledback)
_loop->assertInLoopThread();
if (!_isCommitedOrRolledback)
{
auto thisPtr = shared_from_this();
if (!_isWorking)
{
if (!thisPtr->_isWorking)
{
thisPtr->_isWorking = true;
thisPtr->_connectionPtr->execSql(std::move(sql),
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(rcb),
[exceptCallback, thisPtr](const std::exception_ptr &ePtr) {
thisPtr->rollback();
if (exceptCallback)
exceptCallback(ePtr);
},
[thisPtr]() {
thisPtr->execNewTask();
});
}
else
{
//push sql cmd to buffer;
SqlCmd cmd;
cmd._sql = std::move(sql);
cmd._paraNum = paraNum;
cmd._parameters = std::move(parameters);
cmd._length = std::move(length);
cmd._format = std::move(format);
cmd._cb = std::move(rcb);
cmd._exceptCb = std::move(exceptCallback);
thisPtr->_sqlCmdBuffer.push_back(std::move(cmd));
}
_isWorking = true;
_thisPtr = thisPtr;
_connectionPtr->execSql(std::move(sql),
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(rcb),
[exceptCallback, thisPtr](const std::exception_ptr &ePtr) {
thisPtr->rollback();
if (exceptCallback)
exceptCallback(ePtr);
});
}
else
{
//The transaction has been rolled back;
try
{
throw TransactionRollback("The transaction has been rolled back");
}
catch (...)
{
exceptCallback(std::current_exception());
}
//push sql cmd to buffer;
SqlCmd cmd;
cmd._sql = std::move(sql);
cmd._paraNum = paraNum;
cmd._parameters = std::move(parameters);
cmd._length = std::move(length);
cmd._format = std::move(format);
cmd._cb = std::move(rcb);
cmd._exceptCb = std::move(exceptCallback);
cmd._thisPtr = thisPtr;
thisPtr->_sqlCmdBuffer.push_back(std::move(cmd));
}
});
}
else
{
//The transaction has been rolled back;
try
{
throw TransactionRollback("The transaction has been rolled back");
}
catch (...)
{
exceptCallback(std::current_exception());
}
}
}
void TransactionImpl::rollback()
@ -136,83 +132,82 @@ void TransactionImpl::rollback()
_loop->runInLoop([thisPtr]() {
if (thisPtr->_isCommitedOrRolledback)
return;
auto clearupCb = [thisPtr]() {
thisPtr->_isCommitedOrRolledback = true;
if (thisPtr->_usedUpCallback)
{
thisPtr->_usedUpCallback();
thisPtr->_usedUpCallback = decltype(thisPtr->_usedUpCallback)();
}
};
if (thisPtr->_isWorking)
{
//push sql cmd to buffer;
SqlCmd cmd;
cmd._sql = "rollback";
cmd._paraNum = 0;
cmd._cb = [clearupCb](const Result &r) {
LOG_TRACE << "Transaction roll back!";
clearupCb();
cmd._cb = [thisPtr](const Result &r) {
LOG_DEBUG << "Transaction roll back!";
thisPtr->_isCommitedOrRolledback = true;
};
cmd._exceptCb = [clearupCb](const std::exception_ptr &ePtr) {
clearupCb();
cmd._exceptCb = [thisPtr](const std::exception_ptr &ePtr) {
//clearupCb();
thisPtr->_isCommitedOrRolledback = true;
LOG_ERROR << "Transaction rool back error";
};
cmd._isRollbackCmd = true;
//Rollback cmd should be executed firstly, so we push it in front of the list
thisPtr->_sqlCmdBuffer.push_front(std::move(cmd));
return;
}
thisPtr->_isWorking = true;
thisPtr
->_connectionPtr
->execSql("rollback",
0,
std::vector<const char *>(),
std::vector<int>(),
std::vector<int>(),
[clearupCb](const Result &r) {
LOG_TRACE << "Transaction roll back!";
clearupCb();
},
[clearupCb](const std::exception_ptr &ePtr) {
clearupCb();
},
[thisPtr]() {
thisPtr->execNewTask();
});
thisPtr->_thisPtr = thisPtr;
thisPtr->_connectionPtr->execSql("rollback",
0,
std::vector<const char *>(),
std::vector<int>(),
std::vector<int>(),
[thisPtr](const Result &r) {
LOG_TRACE << "Transaction roll back!";
thisPtr->_isCommitedOrRolledback = true;
//clearupCb();
},
[thisPtr](const std::exception_ptr &ePtr) {
//clearupCb();
LOG_ERROR << "Transaction rool back error";
thisPtr->_isCommitedOrRolledback = true;
});
});
}
void TransactionImpl::execNewTask()
{
_loop->assertInLoopThread();
_thisPtr.reset();
assert(_isWorking);
if (!_isCommitedOrRolledback)
{
auto thisPtr = shared_from_this();
if (!_sqlCmdBuffer.empty())
{
auto cmd = _sqlCmdBuffer.front();
auto cmd = std::move(_sqlCmdBuffer.front());
_sqlCmdBuffer.pop_front();
auto conn = _connectionPtr;
_loop->queueInLoop([=]() mutable {
conn->execSql(std::move(cmd._sql),
cmd._paraNum,
std::move(cmd._parameters),
std::move(cmd._length),
std::move(cmd._format),
std::move(cmd._cb),
[cmd, thisPtr](const std::exception_ptr &ePtr) {
conn->execSql(std::move(cmd._sql),
cmd._paraNum,
std::move(cmd._parameters),
std::move(cmd._length),
std::move(cmd._format),
[callback = std::move(cmd._cb), cmd, thisPtr](const Result &r) {
if (cmd._isRollbackCmd)
{
thisPtr->_isCommitedOrRolledback = true;
}
if (callback)
callback(r);
},
[cmd, thisPtr](const std::exception_ptr &ePtr) {
if (!cmd._isRollbackCmd)
thisPtr->rollback();
if (cmd._exceptCb)
cmd._exceptCb(ePtr);
},
[thisPtr]() {
thisPtr->execNewTask();
});
});
else
{
thisPtr->_isCommitedOrRolledback = true;
}
if (cmd._exceptCb)
cmd._exceptCb(ePtr);
});
return;
}
_isWorking = false;
@ -238,16 +233,27 @@ void TransactionImpl::execNewTask()
}
_sqlCmdBuffer.clear();
}
if (_usedUpCallback)
{
_usedUpCallback();
}
}
}
void TransactionImpl::doBegin()
{
auto thisPtr = shared_from_this();
_loop->queueInLoop([thisPtr]() {
_loop->queueInLoop([thisPtr = shared_from_this()]() {
std::weak_ptr<TransactionImpl> weakPtr = thisPtr;
thisPtr->_connectionPtr->setIdleCallback([weakPtr]() {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
thisPtr->execNewTask();
});
assert(!thisPtr->_isWorking);
assert(!thisPtr->_isCommitedOrRolledback);
thisPtr->_isWorking = true;
thisPtr->_thisPtr = thisPtr;
thisPtr->_connectionPtr->execSql("begin",
0,
std::vector<const char *>(),
@ -258,14 +264,6 @@ void TransactionImpl::doBegin()
},
[thisPtr](const std::exception_ptr &ePtr) {
thisPtr->_isCommitedOrRolledback = true;
if (thisPtr->_usedUpCallback)
{
thisPtr->_usedUpCallback();
}
},
[thisPtr]() {
thisPtr->execNewTask();
});
});
}

View File

@ -38,8 +38,47 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this<
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback) override;
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)>&) override
std::function<void(const std::exception_ptr &)> &&exceptCallback) override
{
if (_loop->isInLoopThread())
{
execSqlInLoop(std::move(sql),
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(rcb),
std::move(exceptCallback));
}
else
{
_loop->queueInLoop([thisPtr = shared_from_this(),
sql = std::move(sql),
paraNum,
parameters = std::move(parameters),
length = std::move(length),
format = std::move(format),
rcb = std::move(rcb),
exceptCallback = std::move(exceptCallback)]() mutable {
thisPtr->execSqlInLoop(std::move(sql),
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(rcb),
std::move(exceptCallback));
});
}
}
void execSqlInLoop(std::string &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback);
virtual std::shared_ptr<Transaction> newTransaction(const std::function<void(bool)> &) override
{
return shared_from_this();
}
@ -57,6 +96,8 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this<
std::vector<int> _format;
QueryCallback _cb;
ExceptPtrCallback _exceptCb;
bool _isRollbackCmd = false;
std::shared_ptr<TransactionImpl> _thisPtr;
};
std::list<SqlCmd> _sqlCmdBuffer;
// std::mutex _bufferMutex;
@ -64,6 +105,7 @@ class TransactionImpl : public Transaction, public std::enable_shared_from_this<
void doBegin();
trantor::EventLoop *_loop;
std::function<void(bool)> _commitCallback;
std::shared_ptr<TransactionImpl> _thisPtr;
};
} // namespace orm
} // namespace drogon

View File

@ -72,6 +72,7 @@ MysqlConnection::MysqlConnection(trantor::EventLoop *loop, const std::string &co
}
else if (key == "dbname")
{
//LOG_DEBUG << "database:[" << value << "]";
dbname = value;
}
else if (key == "port")
@ -95,7 +96,7 @@ MysqlConnection::MysqlConnection(trantor::EventLoop *loop, const std::string &co
port.empty() ? 3306 : atol(port.c_str()),
NULL,
0);
//LOG_DEBUG << ret;
auto fd = mysql_get_socket(_mysqlPtr.get());
_channelPtr = std::unique_ptr<trantor::Channel>(new trantor::Channel(loop, fd));
_channelPtr->setCloseCallback([=]() {
@ -204,6 +205,8 @@ void MysqlConnection::handleEvent()
if (revents & POLLPRI)
status |= MYSQL_WAIT_EXCEPT;
status = (status & _waitStatus);
if (status == 0)
return;
MYSQL *ret;
if (_status == ConnectStatus_Connecting)
{
@ -232,7 +235,7 @@ void MysqlConnection::handleEvent()
{
case ExecStatus_RealQuery:
{
int err;
int err = 0;
_waitStatus = mysql_real_query_cont(&err, _mysqlPtr.get(), status);
LOG_TRACE << "real_query:" << _waitStatus;
if (_waitStatus == 0)
@ -240,6 +243,7 @@ void MysqlConnection::handleEvent()
if (err)
{
_execStatus = ExecStatus_None;
LOG_ERROR << "error:" << err << " status:" << status;
outputError();
return;
}
@ -252,6 +256,7 @@ void MysqlConnection::handleEvent()
_execStatus = ExecStatus_None;
if (err)
{
LOG_ERROR << "error";
outputError();
return;
}
@ -271,6 +276,7 @@ void MysqlConnection::handleEvent()
if (!ret)
{
_execStatus = ExecStatus_None;
LOG_ERROR << "error";
outputError();
return;
}
@ -298,20 +304,17 @@ void MysqlConnection::execSql(std::string &&sql,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb)
std::function<void(const std::exception_ptr &)> &&exceptCallback)
{
LOG_TRACE << sql;
assert(paraNum == parameters.size());
assert(paraNum == length.size());
assert(paraNum == format.size());
assert(rcb);
assert(idleCb);
assert(!_isWorking);
assert(!sql.empty());
_cb = std::move(rcb);
_idleCb = std::move(idleCb);
_isWorking = true;
_exceptCb = std::move(exceptCallback);
_sql.clear();
@ -384,6 +387,7 @@ void MysqlConnection::execSql(std::string &&sql,
{
if (err)
{
LOG_ERROR << "error";
outputError();
return;
}
@ -397,6 +401,7 @@ void MysqlConnection::execSql(std::string &&sql,
_execStatus = ExecStatus_None;
if (!ret)
{
LOG_ERROR << "error";
outputError();
return;
}
@ -426,16 +431,12 @@ void MysqlConnection::outputError()
catch (...)
{
_exceptCb(std::current_exception());
_exceptCb = decltype(_exceptCb)();
_exceptCb = nullptr;
}
_cb = decltype(_cb)();
_cb = nullptr;
_isWorking = false;
if (_idleCb)
{
_idleCb();
_idleCb = decltype(_idleCb)();
}
_idleCb();
}
}
@ -448,10 +449,9 @@ void MysqlConnection::getResult(MYSQL_RES *res)
if (_isWorking)
{
_cb(Result);
_cb = decltype(_cb)();
_exceptCb = decltype(_exceptCb)();
_cb = nullptr;
_exceptCb = nullptr;
_isWorking = false;
_idleCb();
_idleCb = decltype(_idleCb)();
}
}

View File

@ -43,8 +43,7 @@ class MysqlConnection : public DbConnection, public std::enable_shared_from_this
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb) override;
std::function<void(const std::exception_ptr &)> &&exceptCallback) override;
virtual void disconnect() override;
private:

View File

@ -73,12 +73,19 @@ int main()
// str.resize(filesize);
// pbuf->sgetn(&str[0], filesize);
// *clientPtr << "update users set file=? where id=?" << str << 1000 << Mode::Blocking >> [](const Result &r) {
// std::cout << "update " << r.affectedRows() << " rows" << std::endl;
// } >> [](const DrogonDbException &e) {
// std::cerr << e.base().what() << std::endl;
// };
{
auto trans = clientPtr->newTransaction();
*trans << "update users set file=? where id != ?"
<< "hehaha" << 1000 >>
[](const Result &r) {
std::cout << "hahaha update " << r.affectedRows() << " rows" << std::endl;
//trans->rollback();
} >>
[](const DrogonDbException &e) {
std::cerr << e.base().what() << std::endl;
};
}
LOG_DEBUG << "out of transaction block";
*clientPtr << "select * from users where id=1000" >> [](const Result &r) {
std::cout << "file:" << r[0]["file"].as<std::string>() << std::endl;
} >> [](const DrogonDbException &e) {

View File

@ -16,6 +16,8 @@
#include "PostgreSQLResultImpl.h"
#include <trantor/utils/Logger.h>
#include <drogon/orm/Exception.h>
#include <drogon/utils/Utilities.h>
#include <memory>
#include <stdio.h>
using namespace drogon::orm;
@ -147,44 +149,162 @@ void PgConnection::pgPoll()
}
}
void PgConnection::execSql(std::string &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb)
void PgConnection::execSqlInLoop(std::string &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback)
{
LOG_TRACE << sql;
_loop->assertInLoopThread();
assert(paraNum == parameters.size());
assert(paraNum == length.size());
assert(paraNum == format.size());
assert(rcb);
assert(idleCb);
assert(!_isWorking);
assert(!sql.empty());
_sql = std::move(sql);
_cb = std::move(rcb);
_idleCb = std::move(idleCb);
_isWorking = true;
_exceptCb = std::move(exceptCallback);
auto thisPtr = shared_from_this();
_loop->runInLoop([thisPtr, paraNum = std::move(paraNum), parameters = std::move(parameters), length = std::move(length), format = std::move(format)]() {
if (PQsendQueryParams(
thisPtr->_connPtr.get(),
thisPtr->_sql.c_str(),
paraNum,
NULL,
parameters.data(),
length.data(),
format.data(),
0) == 0)
if (paraNum == 0)
{
_isRreparingStatement = false;
if (PQsendQueryParams(_connPtr.get(),
_sql.c_str(),
paraNum,
nullptr,
parameters.data(),
length.data(),
format.data(),
0) == 0)
{
LOG_ERROR << "send query error: " << PQerrorMessage(thisPtr->_connPtr.get());
LOG_ERROR << "send query error: " << PQerrorMessage(_connPtr.get());
if (_isWorking)
{
_isWorking = false;
_isRreparingStatement = false;
try
{
throw Failure(PQerrorMessage(_connPtr.get()));
}
catch (...)
{
auto exceptPtr = std::current_exception();
_exceptCb(exceptPtr);
_exceptCb = nullptr;
}
_cb = nullptr;
_idleCb();
}
return;
}
thisPtr->pgPoll();
});
}
else
{
auto iter = _preparedStatementMap.find(_sql);
if (iter != _preparedStatementMap.end())
{
_isRreparingStatement = false;
if (PQsendQueryPrepared(_connPtr.get(),
iter->second.c_str(),
paraNum,
parameters.data(),
length.data(),
format.data(),
0) == 0)
{
LOG_ERROR << "send query error: " << PQerrorMessage(_connPtr.get());
if (_isWorking)
{
_isWorking = false;
_isRreparingStatement = false;
try
{
throw Failure(PQerrorMessage(_connPtr.get()));
}
catch (...)
{
auto exceptPtr = std::current_exception();
_exceptCb(exceptPtr);
_exceptCb = nullptr;
}
_cb = nullptr;
_idleCb();
}
return;
}
}
else
{
_isRreparingStatement = true;
auto statementName = getuuid();
if (PQsendPrepare(_connPtr.get(), statementName.c_str(), _sql.c_str(), paraNum, NULL) == 0)
{
LOG_ERROR << "send query error: " << PQerrorMessage(_connPtr.get());
if (_isWorking)
{
_isWorking = false;
try
{
throw Failure(PQerrorMessage(_connPtr.get()));
}
catch (...)
{
auto exceptPtr = std::current_exception();
_exceptCb(exceptPtr);
_exceptCb = nullptr;
}
_cb = nullptr;
_idleCb();
}
return;
}
std::weak_ptr<PgConnection> weakPtr = shared_from_this();
_preparingCallback = [weakPtr,
statementName,
paraNum,
parameters = std::move(parameters),
length = std::move(length),
format = std::move(format)]() {
auto thisPtr = weakPtr.lock();
if (!thisPtr)
return;
thisPtr->_isRreparingStatement = false;
thisPtr->_preparedStatementMap[thisPtr->_sql] = statementName;
if (PQsendQueryPrepared(thisPtr->_connPtr.get(),
statementName.c_str(),
paraNum,
parameters.data(),
length.data(),
format.data(),
0) == 0)
{
LOG_ERROR << "send query error: " << PQerrorMessage(thisPtr->_connPtr.get());
if (thisPtr->_isWorking)
{
thisPtr->_isWorking = false;
try
{
throw Failure(PQerrorMessage(thisPtr->_connPtr.get()));
}
catch (...)
{
auto exceptPtr = std::current_exception();
thisPtr->_exceptCb(exceptPtr);
thisPtr->_exceptCb = nullptr;
}
thisPtr->_cb = nullptr;
thisPtr->_idleCb();
}
return;
}
};
}
}
pgPoll();
}
void PgConnection::handleRead()
@ -206,14 +326,9 @@ void PgConnection::handleRead()
{
auto exceptPtr = std::current_exception();
_exceptCb(exceptPtr);
_exceptCb = decltype(_exceptCb)();
}
_cb = decltype(_cb)();
if (_idleCb)
{
_idleCb();
_idleCb = decltype(_idleCb)();
_exceptCb = nullptr;
}
_cb = nullptr;
}
handleClosed();
return;
@ -225,7 +340,7 @@ void PgConnection::handleRead()
}
if (_channel.isWriting())
_channel.disableWriting();
// got query results?
bool isPreparing = false;
while ((res = std::shared_ptr<PGresult>(PQgetResult(_connPtr.get()), [](PGresult *p) {
PQclear(p);
})))
@ -236,40 +351,49 @@ void PgConnection::handleRead()
LOG_WARN << PQerrorMessage(_connPtr.get());
if (_isWorking)
{
try
{
try
{
//TODO: exception type
throw SqlError(PQerrorMessage(_connPtr.get()),
_sql);
}
catch (...)
{
_exceptCb(std::current_exception());
_exceptCb = decltype(_exceptCb)();
}
//TODO: exception type
throw SqlError(PQerrorMessage(_connPtr.get()), _sql);
}
_cb = decltype(_cb)();
catch (...)
{
_exceptCb(std::current_exception());
_exceptCb = nullptr;
}
_cb = nullptr;
}
}
else
{
if (_isWorking)
{
auto r = makeResult(res, _sql);
_cb(r);
_cb = decltype(_cb)();
_exceptCb = decltype(_exceptCb)();
if (_isRreparingStatement)
{
isPreparing = true;
}
else
{
auto r = makeResult(res, _sql);
_cb(r);
_cb = nullptr;
_exceptCb = nullptr;
}
}
}
}
if (_isWorking)
{
_isWorking = false;
if (_idleCb)
if (isPreparing)
{
_preparingCallback();
_preparingCallback = nullptr;
}
else
{
_isWorking = false;
_isRreparingStatement = false;
_idleCb();
_idleCb = decltype(_idleCb)();
}
}
}

View File

@ -24,6 +24,7 @@
#include <string>
#include <functional>
#include <iostream>
#include <unordered_map>
namespace drogon
{
@ -43,16 +44,58 @@ class PgConnection : public DbConnection, public std::enable_shared_from_this<Pg
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb) override;
std::function<void(const std::exception_ptr &)> &&exceptCallback) override
{
if (_loop->isInLoopThread())
{
execSqlInLoop(std::move(sql),
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(rcb),
std::move(exceptCallback));
}
else
{
auto thisPtr = shared_from_this();
_loop->queueInLoop([thisPtr,
sql = std::move(sql),
paraNum,
parameters = std::move(parameters),
length = std::move(length),
format = std::move(format),
rcb = std::move(rcb),
exceptCallback = std::move(exceptCallback)]() mutable {
thisPtr->execSqlInLoop(std::move(sql),
paraNum,
std::move(parameters),
std::move(length),
std::move(format),
std::move(rcb),
std::move(exceptCallback));
});
}
}
virtual void disconnect() override;
private:
std::shared_ptr<PGconn> _connPtr;
trantor::Channel _channel;
std::unordered_map<std::string, std::string> _preparedStatementMap;
bool _isRreparingStatement = false;
void handleRead();
void pgPoll();
void handleClosed();
void execSqlInLoop(std::string &&sql,
size_t paraNum,
std::vector<const char *> &&parameters,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback);
std::function<void()> _preparingCallback;
};
} // namespace orm

View File

@ -93,12 +93,18 @@ void Sqlite3Connection::execSql(std::string &&sql,
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb)
std::function<void(const std::exception_ptr &)> &&exceptCallback)
{
auto thisPtr = shared_from_this();
_loopThread.getLoop()->runInLoop([thisPtr, sql = std::move(sql), paraNum, parameters = std::move(parameters), length = std::move(length), format = std::move(format), rcb = std::move(rcb), exceptCallback = std::move(exceptCallback), idleCb = std::move(idleCb)]() mutable {
thisPtr->execSqlInQueue(sql, paraNum, parameters, length, format, rcb, exceptCallback, idleCb);
_loopThread.getLoop()->runInLoop([thisPtr,
sql = std::move(sql),
paraNum,
parameters = std::move(parameters),
length = std::move(length),
format = std::move(format),
rcb = std::move(rcb),
exceptCallback = std::move(exceptCallback)]() mutable {
thisPtr->execSqlInQueue(sql, paraNum, parameters, length, format, rcb, exceptCallback);
});
}
@ -108,8 +114,7 @@ void Sqlite3Connection::execSqlInQueue(const std::string &sql,
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb)
const std::function<void(const std::exception_ptr &)> &exceptCallback)
{
LOG_TRACE << "sql:" << sql;
sqlite3_stmt *stmt = nullptr;
@ -214,7 +219,7 @@ void Sqlite3Connection::execSqlInQueue(const std::string &sql,
}
rcb(Result(resultPtr));
idleCb();
_idleCb();
}
int Sqlite3Connection::stmtStep(sqlite3_stmt *stmt, const std::shared_ptr<Sqlite3ResultImpl> &resultPtr, int columnNum)

View File

@ -47,8 +47,7 @@ class Sqlite3Connection : public DbConnection, public std::enable_shared_from_th
std::vector<int> &&length,
std::vector<int> &&format,
ResultCallback &&rcb,
std::function<void(const std::exception_ptr &)> &&exceptCallback,
std::function<void()> &&idleCb) override;
std::function<void(const std::exception_ptr &)> &&exceptCallback) override;
virtual void disconnect() override;
private:
@ -59,8 +58,7 @@ class Sqlite3Connection : public DbConnection, public std::enable_shared_from_th
const std::vector<int> &length,
const std::vector<int> &format,
const ResultCallback &rcb,
const std::function<void(const std::exception_ptr &)> &exceptCallback,
const std::function<void()> &idleCb);
const std::function<void(const std::exception_ptr &)> &exceptCallback);
void onError(const std::string &sql, const std::function<void(const std::exception_ptr &)> &exceptCallback);
int stmtStep(sqlite3_stmt *stmt, const std::shared_ptr<Sqlite3ResultImpl> &resultPtr, int columnNum);
trantor::EventLoopThread _loopThread;