Refactor HttpServer codes. (#1508)

This commit is contained in:
Nitromelon 2023-02-28 10:19:26 +08:00 committed by GitHub
parent d321bd4fc1
commit bc028776f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1010 additions and 948 deletions

View File

@ -13,52 +13,44 @@
*/
#include "HttpAppFrameworkImpl.h"
#include "HttpRequestImpl.h"
#include "HttpClientImpl.h"
#include "HttpResponseImpl.h"
#include "HttpUtils.h"
#include "WebSocketConnectionImpl.h"
#include "StaticFileRouter.h"
#include "HttpSimpleControllersRouter.h"
#include "HttpControllersRouter.h"
#include "WebsocketControllersRouter.h"
#include "AOPAdvice.h"
#include "ConfigLoader.h"
#include "HttpServer.h"
#include "PluginsManager.h"
#include "ListenerManager.h"
#include "SharedLibManager.h"
#include "SessionManager.h"
#include "DbClientManager.h"
#include "RedisClientManager.h"
#include <drogon/config.h>
#include <algorithm>
#include <drogon/version.h>
#include <drogon/CacheMap.h>
#include <drogon/DrClassMap.h>
#include <drogon/HttpRequest.h>
#include <drogon/HttpResponse.h>
#include <drogon/HttpTypes.h>
#include <drogon/Session.h>
#include <drogon/utils/Utilities.h>
#include "filesystem.h"
#include <trantor/utils/AsyncFileLogger.h>
#include <drogon/version.h>
#include <json/json.h>
#include <trantor/utils/AsyncFileLogger.h>
#include <algorithm>
#include "AOPAdvice.h"
#include "ConfigLoader.h"
#include "DbClientManager.h"
#include "HttpClientImpl.h"
#include "HttpControllersRouter.h"
#include "HttpRequestImpl.h"
#include "HttpResponseImpl.h"
#include "HttpServer.h"
#include "HttpSimpleControllersRouter.h"
#include "HttpUtils.h"
#include "ListenerManager.h"
#include "PluginsManager.h"
#include "RedisClientManager.h"
#include "SessionManager.h"
#include "SharedLibManager.h"
#include "StaticFileRouter.h"
#include "WebSocketConnectionImpl.h"
#include "WebsocketControllersRouter.h"
#include "filesystem.h"
#include <fstream>
#include <iostream>
#include <memory>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <tuple>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#ifndef _WIN32
#include <sys/wait.h>
#include <sys/file.h>
#include <uuid.h>
#include <unistd.h>
#define os_access access
#elif !defined(_WIN32) || defined(__MINGW32__)
@ -543,8 +535,20 @@ void HttpAppFrameworkImpl::run()
libFileOutputPath_);
}
#endif
// Create IO threads
ioLoopThreadPool_ =
std::make_unique<trantor::EventLoopThreadPool>(threadNum_,
"DrogonIoLoop");
std::vector<trantor::EventLoop *> ioLoops = ioLoopThreadPool_->getLoops();
for (size_t i = 0; i < threadNum_; ++i)
{
ioLoops[i]->setIndex(i);
}
getLoop()->setIndex(threadNum_);
// Create all listeners.
auto ioLoops = listenerManagerPtr_->createListeners(
listenerManagerPtr_->createListeners(
[this](const HttpRequestImplPtr &req,
std::function<void(const HttpResponsePtr &)> &&callback) {
onAsyncRequest(req, std::move(callback));
@ -559,15 +563,10 @@ void HttpAppFrameworkImpl::run()
sslCertPath_,
sslKeyPath_,
sslConfCmds_,
threadNum_,
ioLoops,
syncAdvices_,
preSendingAdvices_);
assert(ioLoops.size() == threadNum_);
for (size_t i = 0; i < threadNum_; ++i)
{
ioLoops[i]->setIndex(i);
}
getLoop()->setIndex(threadNum_);
// A fast database client instance should be created in the main event
// loop, so put the main loop into ioLoops.
ioLoops.push_back(getLoop());
@ -609,6 +608,12 @@ void HttpAppFrameworkImpl::run()
// Let listener event loops run when everything is ready.
listenerManagerPtr_->startListening();
});
// start all loops
// TODO: when should IOLoops start?
// In before, IOLoops are started in `listenerManagerPtr_->startListening()`
// It should be fine for them to start anywhere before `startListening()`.
// However, we should consider other components.
ioLoopThreadPool_->start();
getLoop()->loop();
}
@ -883,8 +888,19 @@ trantor::EventLoop *HttpAppFrameworkImpl::getLoop() const
trantor::EventLoop *HttpAppFrameworkImpl::getIOLoop(size_t id) const
{
assert(listenerManagerPtr_);
return listenerManagerPtr_->getIOLoop(id);
if (!ioLoopThreadPool_)
{
LOG_WARN << "Please call getIOLoop() after drogon::app().run()";
return nullptr;
}
auto n = ioLoopThreadPool_->size();
if (id >= n)
{
LOG_TRACE << "Loop id (" << id << ") out of range [0-" << n << ").";
id %= n;
LOG_TRACE << "Rounded to : " << id;
}
return ioLoopThreadPool_->getLoop(id);
}
HttpAppFramework &HttpAppFramework::instance()
@ -1028,6 +1044,7 @@ void HttpAppFrameworkImpl::quit()
getLoop()->queueInLoop([this]() {
// Release members in the reverse order of initialization
listenerManagerPtr_->stopListening();
listenerManagerPtr_.reset();
websockCtrlsRouterPtr_.reset();
staticFileRouterPtr_.reset();
httpSimpleCtrlsRouterPtr_.reset();
@ -1035,12 +1052,13 @@ void HttpAppFrameworkImpl::quit()
pluginsManagerPtr_.reset();
redisClientManagerPtr_.reset();
dbClientManagerPtr_.reset();
// TODO: let HttpAppFrameworkImpl manage IO loops
// and reset listenerManagerPtr_ before IO loops quit.
listenerManagerPtr_->stopIoLoops();
listenerManagerPtr_.reset();
running_ = false;
getLoop()->quit();
for (trantor::EventLoop *loop : ioLoopThreadPool_->getLoops())
{
loop->quit();
}
ioLoopThreadPool_->wait();
});
}
}

View File

@ -14,17 +14,22 @@
#pragma once
#include "impl_forwards.h"
#include <drogon/HttpAppFramework.h>
#include <drogon/config.h>
#include <json/json.h>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <regex>
#include <string>
#include <vector>
#include <functional>
#include <limits>
#include "impl_forwards.h"
namespace trantor
{
class EventLoopThreadPool;
}
namespace drogon
{
@ -635,6 +640,8 @@ class HttpAppFrameworkImpl final : public HttpAppFramework
std::atomic_bool routersInit_{false};
size_t threadNum_{1};
std::unique_ptr<trantor::EventLoopThreadPool> ioLoopThreadPool_;
#ifndef _WIN32
std::vector<std::string> libFilePaths_;
std::string libFileOutputPath_;

View File

@ -13,18 +13,22 @@
*/
#include "HttpRequestParser.h"
#include "HttpAppFrameworkImpl.h"
#include "HttpResponseImpl.h"
#include "HttpRequestImpl.h"
#include "HttpUtils.h"
#include <drogon/HttpTypes.h>
#include <iostream>
#include <trantor/utils/Logger.h>
#include <trantor/utils/MsgBuffer.h>
#include <iostream>
#include "HttpAppFrameworkImpl.h"
#include "HttpRequestImpl.h"
#include "HttpResponseImpl.h"
#include "HttpUtils.h"
using namespace trantor;
using namespace drogon;
static constexpr size_t CRLF_LEN = 2; // strlen("crlf")
static constexpr size_t METHOD_MAX_LEN = 7; // strlen("OPTIONS")
static constexpr size_t TRUNK_LEN_MAX_LEN = 16; // 0xFFFFFFFF,FFFFFFFF
HttpRequestParser::HttpRequestParser(const trantor::TcpConnectionPtr &connPtr)
: status_(HttpRequestParseStatus::kExpectMethod),
loop_(connPtr->getLoop()),
@ -127,251 +131,248 @@ void HttpRequestParser::reset()
request_->setCreationDate(trantor::Date::now());
}
}
// Return false if any error
bool HttpRequestParser::parseRequest(MsgBuffer *buf)
/**
* @return return -1 if encounters any error in request
* @return return 0 if request is not ready
* @return return 1 if request is ready
*/
int HttpRequestParser::parseRequest(MsgBuffer *buf)
{
bool ok = true;
bool hasMore = true;
// std::cout<<std::string(buf->peek(),buf->readableBytes())<<std::endl;
while (hasMore)
while (true)
{
if (status_ == HttpRequestParseStatus::kExpectMethod)
switch (status_)
{
auto *space =
std::find(buf->peek(), (const char *)buf->beginWrite(), ' ');
if (space != buf->beginWrite())
case (HttpRequestParseStatus::kExpectMethod):
{
if (request_->setMethod(buf->peek(), space))
auto *space = std::find(buf->peek(),
(const char *)buf->beginWrite(),
' ');
// no space in buffer
if (space == buf->beginWrite())
{
status_ = HttpRequestParseStatus::kExpectRequestLine;
buf->retrieveUntil(space + 1);
continue;
if (buf->readableBytes() > METHOD_MAX_LEN)
{
buf->retrieveAll();
shutdownConnection(k400BadRequest);
return -1;
}
return 0;
}
else
// try read method
if (!request_->setMethod(buf->peek(), space))
{
buf->retrieveAll();
shutdownConnection(k405MethodNotAllowed);
return false;
return -1;
}
status_ = HttpRequestParseStatus::kExpectRequestLine;
buf->retrieveUntil(space + 1);
continue;
}
else
case HttpRequestParseStatus::kExpectRequestLine:
{
if (buf->readableBytes() >= 7)
const char *crlf = buf->findCRLF();
if (!crlf)
{
if (buf->readableBytes() >= 64 * 1024)
{
/// The limit for request line is 64K bytes. response
/// k414RequestURITooLarge
/// TODO: Make this configurable?
buf->retrieveAll();
shutdownConnection(k414RequestURITooLarge);
return -1;
}
return 0;
}
if (!processRequestLine(buf->peek(), crlf))
{
// error
buf->retrieveAll();
shutdownConnection(k400BadRequest);
return false;
return -1;
}
hasMore = false;
buf->retrieveUntil(crlf + CRLF_LEN);
status_ = HttpRequestParseStatus::kExpectHeaders;
continue;
}
}
else if (status_ == HttpRequestParseStatus::kExpectRequestLine)
{
const char *crlf = buf->findCRLF();
if (crlf)
case HttpRequestParseStatus::kExpectHeaders:
{
ok = processRequestLine(buf->peek(), crlf);
if (ok)
const char *crlf = buf->findCRLF();
if (!crlf)
{
buf->retrieveUntil(crlf + 2);
status_ = HttpRequestParseStatus::kExpectHeaders;
if (buf->readableBytes() >= 64 * 1024)
{
/// The limit for every request header is 64K bytes;
/// TODO: Make this configurable?
buf->retrieveAll();
shutdownConnection(k400BadRequest);
return -1;
}
return 0;
}
else
{
buf->retrieveAll();
shutdownConnection(k400BadRequest);
return false;
}
}
else
{
if (buf->readableBytes() >= 64 * 1024)
{
/// The limit for request line is 64K bytes. respone
/// k414RequestURITooLarge
/// TODO: Make this configurable?
buf->retrieveAll();
shutdownConnection(k414RequestURITooLarge);
return false;
}
hasMore = false;
}
}
else if (status_ == HttpRequestParseStatus::kExpectHeaders)
{
const char *crlf = buf->findCRLF();
if (crlf)
{
const char *colon = std::find(buf->peek(), crlf, ':');
// found colon
if (colon != crlf)
{
request_->addHeader(buf->peek(), colon, crlf);
buf->retrieveUntil(crlf + CRLF_LEN);
continue;
}
else
buf->retrieveUntil(crlf + CRLF_LEN);
// end of headers
// We might want a kProcessHeaders status for code readability
// and maintainability.
// process header information
auto &len = request_->getHeaderBy("content-length");
if (!len.empty())
{
// empty line, end of header
const std::string &len =
request_->getHeaderBy("content-length");
if (!len.empty())
try
{
try
{
currentContentLength_ =
static_cast<size_t>(std::stoull(len));
}
catch (...)
{
buf->retrieveAll();
shutdownConnection(k400BadRequest);
return false;
}
if (currentContentLength_ == 0)
{
status_ = HttpRequestParseStatus::kGotAll;
++requestsCounter_;
hasMore = false;
}
else
{
status_ = HttpRequestParseStatus::kExpectBody;
}
currentContentLength_ =
static_cast<size_t>(std::stoull(len));
}
catch (...)
{
buf->retrieveAll();
shutdownConnection(k400BadRequest);
return -1;
}
if (currentContentLength_ == 0)
{
// content-length = 0, request is over.
status_ = HttpRequestParseStatus::kGotAll;
++requestsCounter_;
return 1;
}
else
{
const std::string &encode =
request_->getHeaderBy("transfer-encoding");
if (encode.empty())
{
status_ = HttpRequestParseStatus::kGotAll;
++requestsCounter_;
hasMore = false;
}
else if (encode == "chunked")
{
status_ = HttpRequestParseStatus::kExpectChunkLen;
}
else
{
buf->retrieveAll();
shutdownConnection(k501NotImplemented);
return false;
}
status_ = HttpRequestParseStatus::kExpectBody;
}
auto &expect = request_->expect();
if (expect == "100-continue" &&
request_->getVersion() >= Version::kHttp11)
}
else
{
const std::string &encode =
request_->getHeaderBy("transfer-encoding");
if (encode.empty())
{
if (currentContentLength_ == 0)
{
buf->retrieveAll();
shutdownConnection(k400BadRequest);
return false;
}
// rfc2616-8.2.3
auto connPtr = conn_.lock();
if (connPtr)
{
auto resp = HttpResponse::newHttpResponse();
if (currentContentLength_ >
HttpAppFrameworkImpl::instance()
.getClientMaxBodySize())
{
resp->setStatusCode(k413RequestEntityTooLarge);
auto httpString =
static_cast<HttpResponseImpl *>(resp.get())
->renderToBuffer();
reset();
connPtr->send(std::move(*httpString));
}
else
{
resp->setStatusCode(k100Continue);
auto httpString =
static_cast<HttpResponseImpl *>(resp.get())
->renderToBuffer();
connPtr->send(std::move(*httpString));
}
}
// no content-length and no transfer-encoding,
// request is over.
status_ = HttpRequestParseStatus::kGotAll;
++requestsCounter_;
return 1;
}
else if (!expect.empty())
else if (encode == "chunked")
{
LOG_WARN << "417ExpectationFailed for \"" << expect
<< "\"";
auto connPtr = conn_.lock();
if (connPtr)
{
buf->retrieveAll();
shutdownConnection(k417ExpectationFailed);
return false;
}
status_ = HttpRequestParseStatus::kExpectChunkLen;
}
else if (currentContentLength_ >
HttpAppFrameworkImpl::instance()
.getClientMaxBodySize())
else
{
buf->retrieveAll();
shutdownConnection(k413RequestEntityTooLarge);
return false;
shutdownConnection(k501NotImplemented);
return -1;
}
request_->reserveBodySize(currentContentLength_);
}
buf->retrieveUntil(crlf + 2);
}
else
{
if (buf->readableBytes() >= 64 * 1024)
auto &expect = request_->expect();
if (expect == "100-continue" &&
request_->getVersion() >= Version::kHttp11)
{
/// The limit for every request header is 64K bytes;
/// TODO: Make this configurable?
buf->retrieveAll();
shutdownConnection(k400BadRequest);
return false;
if (currentContentLength_ == 0)
{
// error
buf->retrieveAll();
shutdownConnection(k400BadRequest);
return -1;
}
// rfc2616-8.2.3
auto connPtr = conn_.lock();
if (!connPtr)
{
return -1;
}
auto resp = HttpResponse::newHttpResponse();
if (currentContentLength_ >
HttpAppFrameworkImpl::instance().getClientMaxBodySize())
{
resp->setStatusCode(k413RequestEntityTooLarge);
auto httpString =
static_cast<HttpResponseImpl *>(resp.get())
->renderToBuffer();
reset();
connPtr->send(std::move(*httpString));
// TODO: missing logic here
}
else
{
resp->setStatusCode(k100Continue);
auto httpString =
static_cast<HttpResponseImpl *>(resp.get())
->renderToBuffer();
connPtr->send(std::move(*httpString));
}
}
hasMore = false;
else if (!expect.empty())
{
LOG_WARN << "417ExpectationFailed for \"" << expect << "\"";
buf->retrieveAll();
shutdownConnection(k417ExpectationFailed);
return -1;
}
else if (currentContentLength_ >
HttpAppFrameworkImpl::instance()
.getClientMaxBodySize())
{
buf->retrieveAll();
shutdownConnection(k413RequestEntityTooLarge);
return -1;
}
request_->reserveBodySize(currentContentLength_);
continue;
}
}
else if (status_ == HttpRequestParseStatus::kExpectBody)
{
if (buf->readableBytes() == 0)
case HttpRequestParseStatus::kExpectBody:
{
size_t bytesToConsume =
currentContentLength_ <= buf->readableBytes()
? currentContentLength_
: buf->readableBytes();
if (bytesToConsume)
{
request_->appendToBody(buf->peek(), bytesToConsume);
buf->retrieve(bytesToConsume);
currentContentLength_ -= bytesToConsume;
}
if (currentContentLength_ == 0)
{
status_ = HttpRequestParseStatus::kGotAll;
++requestsCounter_;
return 1;
}
break;
// readableBytes() == 0, function should return.
return 0;
}
if (currentContentLength_ >= buf->readableBytes())
{
currentContentLength_ -= buf->readableBytes();
request_->appendToBody(buf->peek(), buf->readableBytes());
buf->retrieveAll();
}
else
{
request_->appendToBody(buf->peek(), currentContentLength_);
buf->retrieve(currentContentLength_);
currentContentLength_ = 0;
}
if (currentContentLength_ == 0)
{
status_ = HttpRequestParseStatus::kGotAll;
++requestsCounter_;
hasMore = false;
}
}
else if (status_ == HttpRequestParseStatus::kExpectChunkLen)
{
const char *crlf = buf->findCRLF();
if (crlf)
case HttpRequestParseStatus::kExpectChunkLen:
{
const char *crlf = buf->findCRLF();
if (!crlf)
{
if (buf->readableBytes() > TRUNK_LEN_MAX_LEN + CRLF_LEN)
{
buf->retrieveAll();
shutdownConnection(k400BadRequest);
return -1;
}
return 0;
}
// chunk length line
std::string len(buf->peek(), crlf - buf->peek());
char *end;
currentChunkLength_ = strtol(len.c_str(), &end, 16);
// LOG_TRACE << "chun length : " <<
// responsePtr_->currentChunkLength_;
if (currentChunkLength_ != 0)
{
if (currentChunkLength_ + currentContentLength_ >
@ -379,7 +380,7 @@ bool HttpRequestParser::parseRequest(MsgBuffer *buf)
{
buf->retrieveAll();
shutdownConnection(k413RequestEntityTooLarge);
return false;
return -1;
}
status_ = HttpRequestParseStatus::kExpectChunkBody;
}
@ -387,137 +388,94 @@ bool HttpRequestParser::parseRequest(MsgBuffer *buf)
{
status_ = HttpRequestParseStatus::kExpectLastEmptyChunk;
}
buf->retrieveUntil(crlf + 2);
buf->retrieveUntil(crlf + CRLF_LEN);
continue;
}
else
case HttpRequestParseStatus::kExpectChunkBody:
{
hasMore = false;
}
}
else if (status_ == HttpRequestParseStatus::kExpectChunkBody)
{
// LOG_TRACE<<"expect chunk
// len="<<responsePtr_->currentChunkLength_;
if (buf->readableBytes() >= (currentChunkLength_ + 2))
{
if (*(buf->peek() + currentChunkLength_) == '\r' &&
*(buf->peek() + currentChunkLength_ + 1) == '\n')
if (buf->readableBytes() < (currentChunkLength_ + CRLF_LEN))
{
request_->appendToBody(buf->peek(), currentChunkLength_);
buf->retrieve(currentChunkLength_ + 2);
currentContentLength_ += currentChunkLength_;
currentChunkLength_ = 0;
status_ = HttpRequestParseStatus::kExpectChunkLen;
return 0;
}
else
if (*(buf->peek() + currentChunkLength_) != '\r' ||
*(buf->peek() + currentChunkLength_ + 1) != '\n')
{
// error!
buf->retrieveAll();
return false;
shutdownConnection(k400BadRequest);
return -1;
}
request_->appendToBody(buf->peek(), currentChunkLength_);
buf->retrieve(currentChunkLength_ + CRLF_LEN);
currentContentLength_ += currentChunkLength_;
currentChunkLength_ = 0;
status_ = HttpRequestParseStatus::kExpectChunkLen;
continue;
}
else
case HttpRequestParseStatus::kExpectLastEmptyChunk:
{
hasMore = false;
}
}
else if (status_ == HttpRequestParseStatus::kExpectLastEmptyChunk)
{
// last empty chunk
const char *crlf = buf->findCRLF();
if (crlf)
{
buf->retrieveUntil(crlf + 2);
// last empty chunk
if (buf->readableBytes() < CRLF_LEN)
{
return 0;
}
if (*(buf->peek()) != '\r' || *(buf->peek() + 1) != '\n')
{
// error!
buf->retrieveAll();
shutdownConnection(k400BadRequest);
return -1;
}
buf->retrieve(CRLF_LEN);
status_ = HttpRequestParseStatus::kGotAll;
request_->addHeader("content-length",
std::to_string(
request_->getBody().length()));
std::to_string(request_->bodyLength()));
request_->removeHeaderBy("transfer-encoding");
++requestsCounter_;
break;
return 1;
}
else
case HttpRequestParseStatus::kGotAll:
{
hasMore = false;
return 1;
}
}
}
return ok;
return -1;
}
void HttpRequestParser::pushRequestToPipelining(const HttpRequestPtr &req)
void HttpRequestParser::pushRequestToPipelining(const HttpRequestPtr &req,
bool isHeadMethod)
{
#ifndef NDEBUG
auto conn = conn_.lock();
if (conn)
{
conn->getLoop()->assertInLoopThread();
}
#endif
requestPipelining_.push_back({req, {nullptr, false}});
assert(loop_->isInLoopThread());
requestPipelining_.push_back({req, {nullptr, isHeadMethod}});
}
HttpRequestPtr HttpRequestParser::getFirstRequest() const
/**
* @return returns true if the the response is the first in pipeline
*/
bool HttpRequestParser::pushResponseToPipelining(const HttpRequestPtr &req,
HttpResponsePtr resp)
{
#ifndef NDEBUG
auto conn = conn_.lock();
if (conn)
assert(loop_->isInLoopThread());
for (size_t i = 0; i != requestPipelining_.size(); ++i)
{
conn->getLoop()->assertInLoopThread();
}
#endif
if (!requestPipelining_.empty())
{
return requestPipelining_.front().first;
}
return nullptr;
}
std::pair<HttpResponsePtr, bool> HttpRequestParser::getFirstResponse() const
{
#ifndef NDEBUG
auto conn = conn_.lock();
if (conn)
{
conn->getLoop()->assertInLoopThread();
}
#endif
if (!requestPipelining_.empty())
{
return requestPipelining_.front().second;
}
return {nullptr, false};
}
void HttpRequestParser::popFirstRequest()
{
#ifndef NDEBUG
auto conn = conn_.lock();
if (conn)
{
conn->getLoop()->assertInLoopThread();
}
#endif
requestPipelining_.pop_front();
}
void HttpRequestParser::pushResponseToPipelining(const HttpRequestPtr &req,
const HttpResponsePtr &resp,
bool isHeadMethod)
{
#ifndef NDEBUG
auto conn = conn_.lock();
if (conn)
{
conn->getLoop()->assertInLoopThread();
}
#endif
for (auto &iter : requestPipelining_)
{
if (iter.first == req)
if (requestPipelining_[i].first == req)
{
iter.second = {resp, isHeadMethod};
return;
requestPipelining_[i].second.first = std::move(resp);
return i == 0;
}
}
}
assert(false); // Should always find a match
return false;
}
void HttpRequestParser::popReadyResponses(
std::vector<std::pair<HttpResponsePtr, bool>> &buffer)
{
while (!requestPipelining_.empty() &&
requestPipelining_.front().second.first)
{
buffer.push_back(std::move(requestPipelining_.front().second));
requestPipelining_.pop_front();
}
}

View File

@ -14,13 +14,14 @@
#pragma once
#include "impl_forwards.h"
#include <drogon/HttpTypes.h>
#include <trantor/utils/NonCopyable.h>
#include <trantor/net/TcpConnection.h>
#include <trantor/utils/MsgBuffer.h>
#include <mutex>
#include <trantor/utils/NonCopyable.h>
#include <deque>
#include <memory>
#include <mutex>
#include "impl_forwards.h"
namespace drogon
{
@ -43,7 +44,7 @@ class HttpRequestParser : public trantor::NonCopyable,
explicit HttpRequestParser(const trantor::TcpConnectionPtr &connPtr);
// return false if any error
bool parseRequest(trantor::MsgBuffer *buf);
int parseRequest(trantor::MsgBuffer *buf);
bool gotAll() const
{
@ -75,13 +76,9 @@ class HttpRequestParser : public trantor::NonCopyable,
websockConnPtr_ = conn;
}
// to support request pipelining(rfc2616-8.1.2.2)
void pushRequestToPipelining(const HttpRequestPtr &req);
HttpRequestPtr getFirstRequest() const;
std::pair<HttpResponsePtr, bool> getFirstResponse() const;
void popFirstRequest();
void pushResponseToPipelining(const HttpRequestPtr &req,
const HttpResponsePtr &resp,
bool isHeadMethod);
void pushRequestToPipelining(const HttpRequestPtr &, bool isHeadMethod);
bool pushResponseToPipelining(const HttpRequestPtr &, HttpResponsePtr);
void popReadyResponses(std::vector<std::pair<HttpResponsePtr, bool>> &);
size_t numberOfRequestsInPipelining() const
{
return requestPipelining_.size();
@ -147,7 +144,7 @@ class HttpRequestParser : public trantor::NonCopyable,
responseBuffer_;
std::unique_ptr<std::vector<HttpRequestImplPtr>> requestBuffer_;
std::vector<HttpRequestImplPtr> requestsPool_;
size_t currentChunkLength_;
size_t currentChunkLength_{0};
size_t currentContentLength_{0};
};

View File

@ -13,16 +13,16 @@
*/
#include "HttpServer.h"
#include "HttpRequestImpl.h"
#include "HttpRequestParser.h"
#include "HttpAppFrameworkImpl.h"
#include "HttpResponseImpl.h"
#include "WebSocketConnectionImpl.h"
#include <drogon/HttpRequest.h>
#include <drogon/HttpResponse.h>
#include <drogon/utils/Utilities.h>
#include <functional>
#include <trantor/utils/Logger.h>
#include <functional>
#include <utility>
#include "HttpAppFrameworkImpl.h"
#include "HttpRequestImpl.h"
#include "HttpRequestParser.h"
#include "HttpResponseImpl.h"
#include "WebSocketConnectionImpl.h"
#if COZ_PROFILING
#include <coz.h>
@ -36,94 +36,6 @@
using namespace std::placeholders;
using namespace drogon;
using namespace trantor;
namespace drogon
{
static HttpResponsePtr getCompressedResponse(const HttpRequestImplPtr &req,
const HttpResponsePtr &response,
bool isHeadMethod)
{
if (isHeadMethod ||
!static_cast<HttpResponseImpl *>(response.get())->shouldBeCompressed())
{
return response;
}
#ifdef USE_BROTLI
if (app().isBrotliEnabled() &&
req->getHeaderBy("accept-encoding").find("br") != std::string::npos)
{
auto newResp = response;
auto strCompress = utils::brotliCompress(response->getBody().data(),
response->getBody().length());
if (!strCompress.empty())
{
if (response->expiredTime() >= 0)
{
// cached response,we need to make a clone
newResp = std::make_shared<HttpResponseImpl>(
*static_cast<HttpResponseImpl *>(response.get()));
newResp->setExpiredTime(-1);
}
newResp->setBody(std::move(strCompress));
newResp->addHeader("Content-Encoding", "br");
}
else
{
LOG_ERROR << "brotli got 0 length result";
}
return newResp;
}
#endif
if (app().isGzipEnabled() &&
req->getHeaderBy("accept-encoding").find("gzip") != std::string::npos)
{
auto newResp = response;
auto strCompress = utils::gzipCompress(response->getBody().data(),
response->getBody().length());
if (!strCompress.empty())
{
if (response->expiredTime() >= 0)
{
// cached response,we need to make a clone
newResp = std::make_shared<HttpResponseImpl>(
*static_cast<HttpResponseImpl *>(response.get()));
newResp->setExpiredTime(-1);
}
newResp->setBody(std::move(strCompress));
newResp->addHeader("Content-Encoding", "gzip");
}
else
{
LOG_ERROR << "gzip got 0 length result";
}
return newResp;
}
return response;
}
static bool isWebSocket(const HttpRequestImplPtr &req)
{
auto &headers = req->headers();
if (headers.find("upgrade") == headers.end() ||
headers.find("connection") == headers.end())
return false;
auto connectionField = req->getHeaderBy("connection");
std::transform(connectionField.begin(),
connectionField.end(),
connectionField.begin(),
[](unsigned char c) { return tolower(c); });
auto upgradeField = req->getHeaderBy("upgrade");
std::transform(upgradeField.begin(),
upgradeField.end(),
upgradeField.begin(),
[](unsigned char c) { return tolower(c); });
if (connectionField.find("upgrade") != std::string::npos &&
upgradeField == "websocket")
{
LOG_TRACE << "new websocket request";
return true;
}
return false;
}
static void defaultHttpAsyncCallback(
const HttpRequestPtr &,
@ -146,27 +58,42 @@ static void defaultWebSockAsyncCallback(
static void defaultConnectionCallback(const trantor::TcpConnectionPtr &)
{
return;
}
} // namespace drogon
static inline bool isWebSocket(const HttpRequestImplPtr &req);
static inline HttpResponsePtr tryDecompressRequest(
const HttpRequestImplPtr &req);
static inline bool passSyncAdvices(
const HttpRequestImplPtr &req,
const std::shared_ptr<HttpRequestParser> &requestParser,
const std::vector<std::function<HttpResponsePtr(const HttpRequestPtr &)>>
&syncAdvices,
bool shouldBePipelined,
bool isHeadMethod);
static inline HttpResponsePtr getCompressedResponse(
const HttpRequestImplPtr &req,
const HttpResponsePtr &response,
bool isHeadMethod);
HttpServer::HttpServer(
EventLoop *loop,
const InetAddress &listenAddr,
const std::string &name,
std::string name,
const std::vector<std::function<HttpResponsePtr(const HttpRequestPtr &)>>
&syncAdvices,
const std::vector<
std::function<void(const HttpRequestPtr &, const HttpResponsePtr &)>>
&preSendingAdvices)
#ifdef __linux__
: server_(loop, listenAddr, name.c_str()),
: server_(loop, listenAddr, std::move(name)),
#else
: server_(loop, listenAddr, name.c_str(), true, app().reusePort()),
: server_(loop, listenAddr, std::move(name), true, app().reusePort()),
#endif
httpAsyncCallback_(defaultHttpAsyncCallback),
newWebsocketCallback_(defaultWebSockAsyncCallback),
connectionCallback_(defaultConnectionCallback),
syncAdvices_(syncAdvices),
hasSyncAdvices_(!syncAdvices.empty()),
preSendingAdvices_(preSendingAdvices)
{
server_.setConnectionCallback(
@ -175,13 +102,11 @@ HttpServer::HttpServer(
[this](const auto &conn, auto buff) { this->onMessage(conn, buff); });
}
HttpServer::~HttpServer()
{
}
HttpServer::~HttpServer() = default;
void HttpServer::start()
{
LOG_TRACE << "HttpServer[" << server_.name() << "] starts listenning on "
LOG_TRACE << "HttpServer[" << server_.name() << "] starts listening on "
<< server_.ipPort();
server_.start();
}
@ -221,112 +146,96 @@ void HttpServer::onMessage(const TcpConnectionPtr &conn, MsgBuffer *buf)
auto requestParser = conn->getContext<HttpRequestParser>();
if (!requestParser)
return;
// With the pipelining feature or web socket, it is possible to receice
// multiple messages at once, so
// the while loop is necessary
if (requestParser->webSocketConn())
{
// Websocket payload
requestParser->webSocketConn()->onNewMessage(conn, buf);
return;
}
else
auto &requests = requestParser->getRequestBuffer();
// With the pipelining feature or web socket, it is possible to receive
// multiple messages at once, so the while loop is necessary
while (buf->readableBytes() > 0)
{
auto &requests = requestParser->getRequestBuffer();
while (buf->readableBytes() > 0)
if (requestParser->isStop())
{
if (requestParser->isStop())
{
// The number of requests has reached the limit.
buf->retrieveAll();
return;
}
if (!requestParser->parseRequest(buf))
{
requestParser->reset();
conn->forceClose();
return;
}
if (requestParser->gotAll())
{
requestParser->requestImpl()->setPeerAddr(conn->peerAddr());
requestParser->requestImpl()->setLocalAddr(conn->localAddr());
requestParser->requestImpl()->setCreationDate(
trantor::Date::date());
requestParser->requestImpl()->setSecure(
conn->isSSLConnection());
if (requestParser->firstReq() &&
isWebSocket(requestParser->requestImpl()))
{
auto wsConn =
std::make_shared<WebSocketConnectionImpl>(conn);
wsConn->setPingMessage("", std::chrono::seconds{30});
auto req = requestParser->requestImpl();
newWebsocketCallback_(
req,
[conn, wsConn, requestParser, this, req](
const HttpResponsePtr &resp) mutable {
if (conn->connected())
{
for (auto &advice : preSendingAdvices_)
{
advice(req, resp);
}
if (resp->statusCode() ==
k101SwitchingProtocols)
{
requestParser->setWebsockConnection(wsConn);
}
auto httpString =
((HttpResponseImpl *)resp.get())
->renderToBuffer();
conn->send(httpString);
COZ_PROGRESS
}
},
wsConn);
}
else
requests.push_back(requestParser->requestImpl());
requestParser->reset();
}
else
{
break;
}
// The number of requests has reached the limit.
buf->retrieveAll();
return;
}
onRequests(conn, requests, requestParser);
requests.clear();
int parseRes = requestParser->parseRequest(buf);
if (parseRes < 0)
{
requestParser->reset();
conn->forceClose();
return;
}
if (parseRes == 0)
{
break;
}
auto &req = requestParser->requestImpl();
req->setPeerAddr(conn->peerAddr());
req->setLocalAddr(conn->localAddr());
req->setCreationDate(trantor::Date::date());
req->setSecure(conn->isSSLConnection());
if (requestParser->firstReq() && isWebSocket(req))
{
auto wsConn = std::make_shared<WebSocketConnectionImpl>(conn);
wsConn->setPingMessage("", std::chrono::seconds{30});
newWebsocketCallback_(
req,
[conn, wsConn, requestParser, this, req](
const HttpResponsePtr &resp) mutable {
if (conn->connected())
{
for (auto &advice : preSendingAdvices_)
{
advice(req, resp);
}
if (resp->statusCode() == k101SwitchingProtocols)
{
requestParser->setWebsockConnection(wsConn);
}
auto httpString =
((HttpResponseImpl *)resp.get())->renderToBuffer();
conn->send(httpString);
COZ_PROGRESS
}
},
wsConn);
}
else
{
requests.push_back(req);
}
requestParser->reset();
}
onRequests(conn, requests, requestParser);
requests.clear();
}
struct CallBackParamPack
struct CallbackParamPack
{
CallBackParamPack() = default;
CallBackParamPack(const trantor::TcpConnectionPtr &conn_,
const HttpRequestImplPtr &req_,
const std::shared_ptr<bool> &loopFlag_,
const std::shared_ptr<HttpRequestParser> &requestParser_,
bool *syncFlagPtr_,
bool close_,
bool isHeadMethod_)
: conn(conn_),
req(req_),
loopFlag(loopFlag_),
requestParser(requestParser_),
syncFlagPtr(syncFlagPtr_),
close(close_),
isHeadMethod(isHeadMethod_),
responseSent(false)
CallbackParamPack(trantor::TcpConnectionPtr conn,
HttpRequestImplPtr req,
std::shared_ptr<bool> loopFlag,
std::shared_ptr<HttpRequestParser> requestParser,
bool isHeadMethod)
: conn_(std::move(conn)),
req_(std::move(req)),
loopFlag_(std::move(loopFlag)),
requestParser_(std::move(requestParser)),
isHeadMethod_(isHeadMethod)
{
}
trantor::TcpConnectionPtr conn;
HttpRequestImplPtr req;
std::shared_ptr<bool> loopFlag;
std::shared_ptr<HttpRequestParser> requestParser;
bool *syncFlagPtr;
bool close;
bool isHeadMethod;
std::atomic<bool> responseSent;
trantor::TcpConnectionPtr conn_;
HttpRequestImplPtr req_;
std::shared_ptr<bool> loopFlag_;
std::shared_ptr<HttpRequestParser> requestParser_;
bool isHeadMethod_;
std::atomic<bool> responseSent_{false};
};
void HttpServer::onRequests(
@ -344,9 +253,9 @@ void HttpServer::onRequests(
conn->shutdown();
return;
}
else if (HttpAppFrameworkImpl::instance().pipeliningRequestsNumber() > 0 &&
requestParser->numberOfRequestsInPipelining() + requests.size() >=
HttpAppFrameworkImpl::instance().pipeliningRequestsNumber())
if (HttpAppFrameworkImpl::instance().pipeliningRequestsNumber() > 0 &&
requestParser->numberOfRequestsInPipelining() + requests.size() >=
HttpAppFrameworkImpl::instance().pipeliningRequestsNumber())
{
requestParser->stop();
conn->shutdown();
@ -360,212 +269,59 @@ void HttpServer::onRequests(
for (auto &req : requests)
{
bool close_ = (!req->keepAlive());
bool isHeadMethod = (req->method() == Head);
if (isHeadMethod)
{
req->setMethod(Get);
}
bool syncFlag = false;
bool reqPipelined = false;
if (!requestParser->emptyPipelining())
{
requestParser->pushRequestToPipelining(req);
syncFlag = true;
requestParser->pushRequestToPipelining(req, isHeadMethod);
reqPipelined = true;
}
if (!syncAdvices_.empty())
if (hasSyncAdvices_ &&
!passSyncAdvices(
req, requestParser, syncAdvices_, reqPipelined, isHeadMethod))
{
bool adviceFlag = false;
for (auto &advice : syncAdvices_)
{
auto resp = advice(req);
if (resp)
{
resp->setVersion(req->getVersion());
resp->setCloseConnection(close_);
if (!syncFlag)
{
requestParser->getResponseBuffer().emplace_back(
getCompressedResponse(req, resp, isHeadMethod),
isHeadMethod);
}
else
{
requestParser->pushResponseToPipelining(
req,
getCompressedResponse(req, resp, isHeadMethod),
isHeadMethod);
}
adviceFlag = true;
break;
}
}
if (adviceFlag)
continue;
continue;
}
// Optimization: Avoids dynamic allocation when copying the callback in
// handlers (ex: copying callback into lambda captures in DB calls)
auto paramPack = std::make_shared<CallBackParamPack>(conn,
req,
loopFlagPtr,
requestParser,
&syncFlag,
close_,
isHeadMethod);
bool respReady{false};
auto paramPack = std::make_shared<CallbackParamPack>(
conn, req, loopFlagPtr, requestParser, isHeadMethod);
auto handleResponse = [paramPack = std::move(paramPack),
this](const HttpResponsePtr &response) {
auto &conn = paramPack->conn;
auto &close_ = paramPack->close;
auto &req = paramPack->req;
auto &syncFlag = *paramPack->syncFlagPtr;
auto &isHeadMethod = paramPack->isHeadMethod;
auto &loopFlagPtr = paramPack->loopFlag;
auto &requestParser = paramPack->requestParser;
if (!response)
return;
if (!conn->connected())
return;
if (paramPack->responseSent.exchange(true,
std::memory_order_acq_rel) ==
true)
{
LOG_ERROR << "Sending more than 1 response for request. "
"Ignoring later response";
return;
}
response->setVersion(req->getVersion());
response->setCloseConnection(close_);
for (auto &advice : preSendingAdvices_)
{
advice(req, response);
}
auto newResp = getCompressedResponse(req, response, isHeadMethod);
if (conn->getLoop()->isInLoopThread())
{
/*
* A client that supports persistent connections MAY
* "pipeline" its requests (i.e., send multiple requests
* without waiting for each response). A server MUST send
* its responses to those requests in the same order that
* the requests were received. rfc2616-8.1.1.2
*/
if (!conn->connected())
return;
if (*loopFlagPtr)
{
syncFlag = true;
if (requestParser->emptyPipelining())
{
requestParser->getResponseBuffer().emplace_back(
newResp, isHeadMethod);
}
else
{
// some earlier requests are waiting for responses;
requestParser->pushResponseToPipelining(req,
newResp,
isHeadMethod);
}
}
else if (requestParser->getFirstRequest() == req)
{
requestParser->popFirstRequest();
std::vector<std::pair<HttpResponsePtr, bool>> resps;
resps.emplace_back(newResp, isHeadMethod);
while (!requestParser->emptyPipelining())
{
auto resp = requestParser->getFirstResponse();
if (resp.first)
{
requestParser->popFirstRequest();
resps.push_back(std::move(resp));
}
else
break;
}
sendResponses(conn, resps, requestParser->getBuffer());
}
else
{
// some earlier requests are waiting for responses;
requestParser->pushResponseToPipelining(req,
newResp,
isHeadMethod);
}
}
else
{
conn->getLoop()->queueInLoop([conn,
req,
newResp,
this,
isHeadMethod,
requestParser]() {
if (conn->connected())
{
if (requestParser->getFirstRequest() == req)
{
requestParser->popFirstRequest();
std::vector<std::pair<HttpResponsePtr, bool>> resps;
resps.emplace_back(newResp, isHeadMethod);
while (!requestParser->emptyPipelining())
{
auto resp = requestParser->getFirstResponse();
if (resp.first)
{
requestParser->popFirstRequest();
resps.push_back(std::move(resp));
}
else
break;
}
sendResponses(conn,
resps,
requestParser->getBuffer());
}
else
{
// some earlier requests are waiting for
// responses;
requestParser->pushResponseToPipelining(
req, newResp, isHeadMethod);
}
}
});
}
};
const bool enableDecompression = app().isCompressedRequestEnabled();
bool sendForProcessing = true;
if (enableDecompression)
auto errResp = tryDecompressRequest(req);
if (errResp)
{
auto status = req->decompressBody();
if (status != StreamDecompressStatus::Ok)
{
sendForProcessing = false;
auto resp = HttpResponse::newHttpResponse();
if (status == StreamDecompressStatus::DecompressError)
resp->setStatusCode(k422UnprocessableEntity);
else if (status == StreamDecompressStatus::NotSupported)
resp->setStatusCode(k415UnsupportedMediaType);
else if (status == StreamDecompressStatus::TooLarge)
resp->setStatusCode(k413RequestEntityTooLarge);
else // Should not happen
resp->setStatusCode(k422UnprocessableEntity);
handleResponse(resp);
}
handleResponse(errResp, paramPack, &respReady);
}
if (sendForProcessing)
httpAsyncCallback_(req, std::move(handleResponse));
if (syncFlag == false)
else
{
requestParser->pushRequestToPipelining(req);
// Although the function has 'async' in its name, the
// handleResponse() callback may be called synchronously. In this
// case, the generated response should not be sent right away, but
// be queued in buffer instead. Those ready responses will be sent
// together after the end of the for loop.
//
// By doing this, we could reduce some system calls when sending
// through socket. In order to achieve this, we create a
// `respReady` variable.
httpAsyncCallback_(req,
[this,
respReadyPtr = &respReady,
paramPack = std::move(paramPack)](
const HttpResponsePtr &response) {
handleResponse(response,
paramPack,
respReadyPtr);
});
}
if (!reqPipelined && !respReady)
{
requestParser->pushRequestToPipelining(req, isHeadMethod);
}
}
*loopFlagPtr = false;
@ -578,17 +334,108 @@ void HttpServer::onRequests(
}
}
using CallbackParams = struct
void HttpServer::handleResponse(
const HttpResponsePtr &response,
const std::shared_ptr<CallbackParamPack> &paramPack,
bool *respReadyPtr)
{
std::function<std::size_t(char *, std::size_t)> dataCallback;
bool bFinished;
auto &conn = paramPack->conn_;
auto &req = paramPack->req_;
auto &requestParser = paramPack->requestParser_;
auto &loopFlagPtr = paramPack->loopFlag_;
const bool isHeadMethod = paramPack->isHeadMethod_;
if (!response)
return;
if (!conn->connected())
return;
if (paramPack->responseSent_.exchange(true, std::memory_order_acq_rel))
{
LOG_ERROR << "Sending more than 1 response for request. "
"Ignoring later response";
return;
}
response->setVersion(req->getVersion());
response->setCloseConnection(!req->keepAlive());
for (auto &advice : preSendingAdvices_)
{
advice(req, response);
}
auto newResp = getCompressedResponse(req, response, isHeadMethod);
if (conn->getLoop()->isInLoopThread())
{
/*
* A client that supports persistent connections MAY
* "pipeline" its requests (i.e., send multiple requests
* without waiting for each response). A server MUST send
* its responses to those requests in the same order that
* the requests were received. rfc2616-8.1.1.2
*/
if (requestParser->emptyPipelining())
{
// response must have arrived synchronously
assert(*loopFlagPtr);
// TODO: change to weakPtr to be sure. But may drop performance.
*respReadyPtr = true;
requestParser->getResponseBuffer().emplace_back(std::move(newResp),
isHeadMethod);
return;
}
if (requestParser->pushResponseToPipelining(req, std::move(newResp)))
{
auto &responseBuffer = requestParser->getResponseBuffer();
requestParser->popReadyResponses(responseBuffer);
if (!*loopFlagPtr)
{
// We have passed the point where `onRequests()` sends
// responses. So, at here we should send ready responses from
// the beginning of pipeline queue.
sendResponses(conn, responseBuffer, requestParser->getBuffer());
responseBuffer.clear();
}
}
}
else
{
conn->getLoop()->queueInLoop([this,
conn,
req,
requestParser,
newResp = std::move(newResp)]() mutable {
if (!conn->connected())
{
return;
}
if (requestParser->pushResponseToPipelining(req,
std::move(newResp)))
{
std::vector<std::pair<HttpResponsePtr, bool>> responses;
requestParser->popReadyResponses(responses);
sendResponses(conn, responses, requestParser->getBuffer());
}
});
}
}
struct ChunkingParams
{
using DataCallback = std::function<std::size_t(char *, std::size_t)>;
explicit ChunkingParams(DataCallback cb) : dataCallback(std::move(cb))
{
}
DataCallback dataCallback;
bool bFinished{false};
#ifndef NDEBUG // defined by CMake for release build
std::size_t nDataReturned;
std::size_t nDataReturned{0};
#endif
};
static std::size_t chunkingCallback(std::shared_ptr<CallbackParams> cbParams,
char *pBuffer,
std::size_t nSize)
static std::size_t chunkingCallback(
const std::shared_ptr<ChunkingParams> &cbParams,
char *pBuffer,
std::size_t nSize)
{
if (!cbParams)
return 0;
@ -600,7 +447,6 @@ static std::size_t chunkingCallback(std::shared_ptr<CallbackParams> cbParams,
{
cbParams->dataCallback(pBuffer, nSize);
cbParams->dataCallback = {};
cbParams.reset();
}
return 0;
}
@ -711,20 +557,11 @@ void HttpServer::sendResponse(const TcpConnectionPtr &conn,
(headers.at("transfer-encoding") == "chunked");
if (bChunked)
{
auto chunkCallback =
std::bind(chunkingCallback,
std::shared_ptr<CallbackParams>(
new CallbackParams{streamCallback,
#ifndef NDEBUG // defined by CMake for release build
false,
0}),
#else
false}),
#endif
_1,
_2);
conn->sendStream(chunkCallback);
conn->sendStream(
[ctx = std::make_shared<ChunkingParams>(
streamCallback)](char *buffer, size_t len) {
return chunkingCallback(ctx, buffer, len);
});
}
else
conn->sendStream(streamCallback);
@ -788,19 +625,11 @@ void HttpServer::sendResponses(
(headers.at("transfer-encoding") == "chunked");
if (bChunked)
{
auto chunkCallback =
std::bind(chunkingCallback,
std::shared_ptr<CallbackParams>(
new CallbackParams{streamCallback,
#ifndef NDEBUG // defined by CMake for release build
false,
0}),
#else
false}),
#endif
_1,
_2);
conn->sendStream(chunkCallback);
conn->sendStream(
[ctx = std::make_shared<ChunkingParams>(
streamCallback)](char *buffer, size_t len) {
return chunkingCallback(ctx, buffer, len);
});
}
else
conn->sendStream(streamCallback);
@ -839,3 +668,168 @@ void HttpServer::sendResponses(
}
buffer.retrieveAll();
}
static inline bool isWebSocket(const HttpRequestImplPtr &req)
{
auto &headers = req->headers();
if (headers.find("upgrade") == headers.end() ||
headers.find("connection") == headers.end())
return false;
auto connectionField = req->getHeaderBy("connection");
std::transform(connectionField.begin(),
connectionField.end(),
connectionField.begin(),
[](unsigned char c) { return tolower(c); });
auto upgradeField = req->getHeaderBy("upgrade");
std::transform(upgradeField.begin(),
upgradeField.end(),
upgradeField.begin(),
[](unsigned char c) { return tolower(c); });
if (connectionField.find("upgrade") != std::string::npos &&
upgradeField == "websocket")
{
LOG_TRACE << "new websocket request";
return true;
}
return false;
}
/**
* @brief calling req->decompressBody(), if not success, generate corresponding
* error response
*/
static inline HttpResponsePtr tryDecompressRequest(
const HttpRequestImplPtr &req)
{
static const bool enableDecompression = app().isCompressedRequestEnabled();
if (!enableDecompression)
{
return nullptr;
}
auto status = req->decompressBody();
if (status == StreamDecompressStatus::Ok)
{
return nullptr;
}
auto resp = HttpResponse::newHttpResponse();
switch (status)
{
case StreamDecompressStatus::TooLarge:
resp->setStatusCode(k413RequestEntityTooLarge);
break;
case StreamDecompressStatus::DecompressError:
resp->setStatusCode(k422UnprocessableEntity);
break;
case StreamDecompressStatus::NotSupported:
resp->setStatusCode(k415UnsupportedMediaType);
break;
case StreamDecompressStatus::Ok:
return nullptr;
}
return resp;
}
/**
* @brief Check request against each sync advice, generate response if request
* is rejected by any one of them.
*
* @return true if all sync advices are passed.
* @return false if rejected by any sync advice.
*/
static inline bool passSyncAdvices(
const HttpRequestImplPtr &req,
const std::shared_ptr<HttpRequestParser> &requestParser,
const std::vector<std::function<HttpResponsePtr(const HttpRequestPtr &)>>
&syncAdvices,
bool shouldBePipelined,
bool isHeadMethod)
{
for (auto &advice : syncAdvices)
{
auto resp = advice(req);
if (resp)
{
// Rejected by sync advice
resp->setVersion(req->getVersion());
resp->setCloseConnection(!req->keepAlive());
if (!shouldBePipelined)
{
requestParser->getResponseBuffer().emplace_back(
getCompressedResponse(req, resp, isHeadMethod),
isHeadMethod);
}
else
{
requestParser->pushResponseToPipelining(
req, getCompressedResponse(req, resp, isHeadMethod));
}
return false;
}
}
return true;
}
static inline HttpResponsePtr getCompressedResponse(
const HttpRequestImplPtr &req,
const HttpResponsePtr &response,
bool isHeadMethod)
{
if (isHeadMethod ||
!static_cast<HttpResponseImpl *>(response.get())->shouldBeCompressed())
{
return response;
}
#ifdef USE_BROTLI
if (app().isBrotliEnabled() &&
req->getHeaderBy("accept-encoding").find("br") != std::string::npos)
{
auto newResp = response;
auto strCompress =
drogon::utils::brotliCompress(response->getBody().data(),
response->getBody().length());
if (!strCompress.empty())
{
if (response->expiredTime() >= 0)
{
// cached response,we need to make a clone
newResp = std::make_shared<HttpResponseImpl>(
*static_cast<HttpResponseImpl *>(response.get()));
newResp->setExpiredTime(-1);
}
newResp->setBody(std::move(strCompress));
newResp->addHeader("Content-Encoding", "br");
}
else
{
LOG_ERROR << "brotli got 0 length result";
}
return newResp;
}
#endif
if (app().isGzipEnabled() &&
req->getHeaderBy("accept-encoding").find("gzip") != std::string::npos)
{
auto newResp = response;
auto strCompress =
drogon::utils::gzipCompress(response->getBody().data(),
response->getBody().length());
if (!strCompress.empty())
{
if (response->expiredTime() >= 0)
{
// cached response,we need to make a clone
newResp = std::make_shared<HttpResponseImpl>(
*static_cast<HttpResponseImpl *>(response.get()));
newResp->setExpiredTime(-1);
}
newResp->setBody(std::move(strCompress));
newResp->addHeader("Content-Encoding", "gzip");
}
else
{
LOG_ERROR << "gzip got 0 length result";
}
return newResp;
}
return response;
}

View File

@ -14,14 +14,15 @@
#pragma once
#include "impl_forwards.h"
#include <trantor/net/TcpServer.h>
#include <trantor/net/callbacks.h>
#include <trantor/utils/NonCopyable.h>
#include <functional>
#include <string>
#include <vector>
#include "impl_forwards.h"
struct CallbackParamPack;
namespace drogon
{
class HttpServer : trantor::NonCopyable
@ -29,7 +30,7 @@ class HttpServer : trantor::NonCopyable
public:
HttpServer(trantor::EventLoop *loop,
const trantor::InetAddress &listenAddr,
const std::string &name,
std::string name,
const std::vector<
std::function<HttpResponsePtr(const HttpRequestPtr &)>>
&syncAdvices,
@ -48,7 +49,6 @@ class HttpServer : trantor::NonCopyable
{
httpAsyncCallback_ = cb;
}
void setNewWebsocketCallback(const WebSocketNewAsyncCallback &cb)
{
newWebsocketCallback_ = cb;
@ -66,6 +66,10 @@ class HttpServer : trantor::NonCopyable
{
server_.setIoLoopNum(numThreads);
}
void setIoLoops(const std::vector<trantor::EventLoop *> &ioLoops)
{
server_.setIoLoops(ioLoops);
}
void kickoffIdleConnections(size_t timeout)
{
server_.kickoffIdleConnections(timeout);
@ -101,6 +105,9 @@ class HttpServer : trantor::NonCopyable
void onRequests(const trantor::TcpConnectionPtr &,
const std::vector<HttpRequestImplPtr> &,
const std::shared_ptr<HttpRequestParser> &);
void handleResponse(const HttpResponsePtr &response,
const std::shared_ptr<CallbackParamPack> &paramPack,
bool *respReadyPtr);
void sendResponse(const trantor::TcpConnectionPtr &,
const HttpResponsePtr &,
bool isHeadMethod);
@ -114,6 +121,7 @@ class HttpServer : trantor::NonCopyable
trantor::ConnectionCallback connectionCallback_;
const std::vector<std::function<HttpResponsePtr(const HttpRequestPtr &)>>
&syncAdvices_;
const bool hasSyncAdvices_;
const std::vector<
std::function<void(const HttpRequestPtr &, const HttpResponsePtr &)>>
&preSendingAdvices_;

View File

@ -13,15 +13,12 @@
*/
#include "ListenerManager.h"
#include "HttpServer.h"
#include "HttpAppFrameworkImpl.h"
#include <drogon/config.h>
#include <trantor/utils/Logger.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <trantor/utils/Logger.h>
#include "HttpAppFrameworkImpl.h"
#include "HttpServer.h"
#ifndef _WIN32
#include <sys/wait.h>
#include <sys/file.h>
#include <unistd.h>
#endif
@ -71,7 +68,17 @@ void ListenerManager::addListener(
ip, port, useSSL, certFile, keyFile, useOldTLS, sslConfCmds);
}
std::vector<trantor::EventLoop *> ListenerManager::createListeners(
std::vector<trantor::InetAddress> ListenerManager::getListeners() const
{
std::vector<trantor::InetAddress> listeners;
for (auto &server : servers_)
{
listeners.emplace_back(server->address());
}
return listeners;
}
void ListenerManager::createListeners(
const HttpAsyncCallback &httpCallback,
const WebSocketNewAsyncCallback &webSocketCallback,
const ConnectionCallback &connectionCallback,
@ -85,25 +92,21 @@ std::vector<trantor::EventLoop *> ListenerManager::createListeners(
const std::string & /*globalKeyFile*/,
const std::vector<std::pair<std::string, std::string>> & /*sslConfCmds*/,
#endif
size_t threadNum,
const std::vector<trantor::EventLoop *> &ioLoops,
const std::vector<std::function<HttpResponsePtr(const HttpRequestPtr &)>>
&syncAdvices,
const std::vector<
std::function<void(const HttpRequestPtr &, const HttpResponsePtr &)>>
&preSendingAdvices)
{
LOG_TRACE << "thread num=" << ioLoops.size();
#ifdef __linux__
for (size_t i = 0; i < threadNum; ++i)
for (size_t i = 0; i < ioLoops.size(); ++i)
{
LOG_TRACE << "thread num=" << threadNum;
auto loopThreadPtr = std::make_shared<EventLoopThread>("DrogonIoLoop");
listeningloopThreads_.push_back(loopThreadPtr);
ioLoops_.push_back(loopThreadPtr->getLoop());
for (auto const &listener : listeners_)
{
auto const &ip = listener.ip_;
bool isIpv6 = ip.find(':') == std::string::npos ? false : true;
std::shared_ptr<HttpServer> serverPtr;
bool isIpv6 = (ip.find(':') != std::string::npos);
InetAddress listenAddress(ip, listener.port_, isIpv6);
if (listenAddress.isUnspecified())
{
@ -121,33 +124,24 @@ std::vector<trantor::EventLoop *> ListenerManager::createListeners(
"drogonPortTest",
true,
false);
serverPtr =
std::make_shared<HttpServer>(loopThreadPtr->getLoop(),
listenAddress,
"drogon",
syncAdvices,
preSendingAdvices);
}
else
{
serverPtr =
std::make_shared<HttpServer>(loopThreadPtr->getLoop(),
listenAddress,
"drogon",
syncAdvices,
preSendingAdvices);
}
std::shared_ptr<HttpServer> serverPtr =
std::make_shared<HttpServer>(ioLoops[i],
listenAddress,
"drogon",
syncAdvices,
preSendingAdvices);
if (listener.useSSL_)
{
#ifdef OpenSSL_FOUND
auto cert = listener.certFile_;
auto key = listener.keyFile_;
if (cert == "")
if (cert.empty())
cert = globalCertFile;
if (key == "")
if (key.empty())
key = globalKeyFile;
if (cert == "" || key == "")
if (cert.empty() || key.empty())
{
std::cerr
<< "You can't use https without cert file or key file"
@ -165,25 +159,22 @@ std::vector<trantor::EventLoop *> ListenerManager::createListeners(
serverPtr->setNewWebsocketCallback(webSocketCallback);
serverPtr->setConnectionCallback(connectionCallback);
serverPtr->kickoffIdleConnections(connectionTimeout);
serverPtr->start();
servers_.push_back(serverPtr);
}
}
#else
ioLoopThreadPoolPtr_ = std::make_shared<EventLoopThreadPool>(threadNum);
if (!listeners_.empty())
{
auto loopThreadPtr =
std::make_shared<EventLoopThread>("DrogonListeningLoop");
listeningloopThreads_.push_back(loopThreadPtr);
listeningThread_ =
std::make_unique<EventLoopThread>("DrogonListeningLoop");
listeningThread_->run();
for (auto const &listener : listeners_)
{
LOG_TRACE << "thread num=" << threadNum;
auto ip = listener.ip_;
bool isIpv6 = ip.find(':') == std::string::npos ? false : true;
bool isIpv6 = (ip.find(':') != std::string::npos);
auto serverPtr = std::make_shared<HttpServer>(
loopThreadPtr->getLoop(),
listeningThread_->getLoop(),
InetAddress(ip, listener.port_, isIpv6),
"drogon",
syncAdvices,
@ -211,106 +202,36 @@ std::vector<trantor::EventLoop *> ListenerManager::createListeners(
serverPtr->enableSSL(cert, key, listener.useOldTLS_, cmds);
#endif
}
serverPtr->setIoLoopThreadPool(ioLoopThreadPoolPtr_);
serverPtr->setIoLoops(ioLoops);
serverPtr->setHttpAsyncCallback(httpCallback);
serverPtr->setNewWebsocketCallback(webSocketCallback);
serverPtr->setConnectionCallback(connectionCallback);
serverPtr->kickoffIdleConnections(connectionTimeout);
serverPtr->start();
servers_.push_back(serverPtr);
}
}
else
{
ioLoopThreadPoolPtr_->start();
}
ioLoops_ = ioLoopThreadPoolPtr_->getLoops();
#endif
return ioLoops_;
}
void ListenerManager::startListening()
{
for (auto &loopThread : listeningloopThreads_)
for (auto &server : servers_)
{
loopThread->run();
server->start();
}
}
trantor::EventLoop *ListenerManager::getIOLoop(size_t id) const
{
#ifdef __linux__
auto const n = listeningloopThreads_.size();
#else
auto const n = ioLoopThreadPoolPtr_->size();
#endif
if (0 == n)
{
LOG_WARN << "Please call getIOLoop() after drogon::app().run()";
return nullptr;
}
if (id >= n)
{
LOG_TRACE << "Loop id (" << id << ") out of range [0-" << n << ").";
id %= n;
LOG_TRACE << "Rounded to : " << id;
}
#ifdef __linux__
assert(listeningloopThreads_[id]);
return listeningloopThreads_[id]->getLoop();
#else
return ioLoopThreadPoolPtr_->getLoop(id);
#endif
}
void ListenerManager::stopListening()
{
for (auto &serverPtr : servers_)
{
serverPtr->stop();
}
#ifndef __linux__
for (auto &listenerLoopPtr : listeningloopThreads_)
if (listeningThread_)
{
auto loop = listenerLoopPtr->getLoop();
auto loop = listeningThread_->getLoop();
assert(!loop->isInLoopThread());
if (loop->isRunning())
{
std::promise<int> pro;
auto f = pro.get_future();
loop->queueInLoop([loop, &pro]() {
loop->quit();
pro.set_value(1);
});
(void)f.get();
}
}
#endif
}
void ListenerManager::stopIoLoops()
{
for (auto loop : ioLoops_)
{
assert(!loop->isInLoopThread());
if (loop->isRunning())
{
std::promise<int> pro;
auto f = pro.get_future();
loop->queueInLoop([loop, &pro]() {
loop->quit();
pro.set_value(1);
});
(void)f.get();
}
loop->quit();
listeningThread_->wait();
}
}
std::vector<trantor::InetAddress> ListenerManager::getListeners() const
{
std::vector<trantor::InetAddress> listeners;
for (auto &server : servers_)
{
listeners.emplace_back(server->address());
}
return listeners;
}

View File

@ -14,13 +14,15 @@
#pragma once
#include "impl_forwards.h"
#include <trantor/utils/NonCopyable.h>
#include <trantor/net/EventLoopThreadPool.h>
#include <trantor/net/callbacks.h>
#include <string>
#include <vector>
#include <trantor/utils/NonCopyable.h>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "impl_forwards.h"
namespace trantor
{
class InetAddress;
@ -30,6 +32,7 @@ namespace drogon
class ListenerManager : public trantor::NonCopyable
{
public:
~ListenerManager() = default;
void addListener(const std::string &ip,
uint16_t port,
bool useSSL = false,
@ -38,7 +41,8 @@ class ListenerManager : public trantor::NonCopyable
bool useOldTLS = false,
const std::vector<std::pair<std::string, std::string>>
&sslConfCmds = {});
std::vector<trantor::EventLoop *> createListeners(
std::vector<trantor::InetAddress> getListeners() const;
void createListeners(
const HttpAsyncCallback &httpCallback,
const WebSocketNewAsyncCallback &webSocketCallback,
const trantor::ConnectionCallback &connectionCallback,
@ -46,7 +50,7 @@ class ListenerManager : public trantor::NonCopyable
const std::string &globalCertFile,
const std::string &globalKeyFile,
const std::vector<std::pair<std::string, std::string>> &sslConfCmds,
size_t threadNum,
const std::vector<trantor::EventLoop *> &ioLoops,
const std::vector<
std::function<HttpResponsePtr(const HttpRequestPtr &)>>
&syncAdvices,
@ -54,32 +58,26 @@ class ListenerManager : public trantor::NonCopyable
const HttpResponsePtr &)>>
&preSendingAdvices);
void startListening();
std::vector<trantor::InetAddress> getListeners() const;
~ListenerManager() = default;
trantor::EventLoop *getIOLoop(size_t id) const;
void stopListening();
void stopIoLoops();
std::vector<trantor::EventLoop *> ioLoops_;
private:
struct ListenerInfo
{
ListenerInfo(
const std::string &ip,
std::string ip,
uint16_t port,
bool useSSL,
const std::string &certFile,
const std::string &keyFile,
std::string certFile,
std::string keyFile,
bool useOldTLS,
const std::vector<std::pair<std::string, std::string>> &sslConfCmds)
: ip_(ip),
std::vector<std::pair<std::string, std::string>> sslConfCmds)
: ip_(std::move(ip)),
port_(port),
useSSL_(useSSL),
certFile_(certFile),
keyFile_(keyFile),
certFile_(std::move(certFile)),
keyFile_(std::move(keyFile)),
useOldTLS_(useOldTLS),
sslConfCmds_(sslConfCmds)
sslConfCmds_(std::move(sslConfCmds))
{
}
std::string ip_;
@ -92,9 +90,10 @@ class ListenerManager : public trantor::NonCopyable
};
std::vector<ListenerInfo> listeners_;
std::vector<std::shared_ptr<HttpServer>> servers_;
std::vector<std::shared_ptr<trantor::EventLoopThread>>
listeningloopThreads_;
std::shared_ptr<trantor::EventLoopThreadPool> ioLoopThreadPoolPtr_;
// should have value when and only when on OS that one port can only be
// listened by one thread
std::unique_ptr<trantor::EventLoopThread> listeningThread_;
};
} // namespace drogon

View File

@ -20,6 +20,7 @@
#include <fstream>
#include <iostream>
#include <algorithm>
#include <memory>
#include <fcntl.h>
#ifndef _WIN32
#include <sys/file.h>
@ -34,23 +35,25 @@
using namespace drogon;
void StaticFileRouter::init(const std::vector<trantor::EventLoop *> &ioloops)
void StaticFileRouter::init(const std::vector<trantor::EventLoop *> &ioLoops)
{
// Max timeout up to about 70 days;
staticFilesCacheMap_ = std::make_unique<
IOThreadStorage<std::unique_ptr<CacheMap<std::string, char>>>>();
staticFilesCacheMap_->init(
[&ioloops](std::unique_ptr<CacheMap<std::string, char>> &mapPtr,
[&ioLoops](std::unique_ptr<CacheMap<std::string, char>> &mapPtr,
size_t i) {
assert(i == ioloops[i]->index());
mapPtr = std::unique_ptr<CacheMap<std::string, char>>(
new CacheMap<std::string, char>(ioloops[i], 1.0, 4, 50));
assert(i == ioLoops[i]->index());
mapPtr = std::make_unique<CacheMap<std::string, char>>(ioLoops[i],
1.0,
4,
50);
});
staticFilesCache_ = std::make_unique<
IOThreadStorage<std::unordered_map<std::string, HttpResponsePtr>>>();
ioLocationsPtr_ =
std::make_shared<IOThreadStorage<std::vector<Location>>>();
for (auto *loop : ioloops)
for (auto *loop : ioLoops)
{
loop->queueInLoop(
[ioLocationsPtr = ioLocationsPtr_, locations = locations_] {

View File

@ -47,7 +47,7 @@ class StaticFileRouter
{
brStaticFlag_ = useBrStatic;
}
void init(const std::vector<trantor::EventLoop *> &ioloops);
void init(const std::vector<trantor::EventLoop *> &ioLoops);
void sendStaticFileResponse(
const std::string &filePath,

View File

@ -38,8 +38,6 @@ DROGON_TEST(HttpPipeliningTest)
REQUIRE(resp->getBody().length() == 44618UL);
});
auto request = HttpRequest::newHttpRequest();
request->setPath("/pipe");
for (int i = 0; i < 19; ++i)
{
client->sendRequest(
@ -56,3 +54,39 @@ DROGON_TEST(HttpPipeliningTest)
});
}
}
DROGON_TEST(HttpPipeliningStrangeTest1)
{
auto client = HttpClient::newHttpClient("127.0.0.1", 8848);
client->setPipeliningDepth(64);
for (int i = 0; i < 4; ++i)
{
auto request = HttpRequest::newHttpRequest();
request->setPath("/pipe/strange-1");
request->setBody(std::to_string(i));
client->sendRequest(request,
[TEST_CTX, i](ReqResult r,
const HttpResponsePtr &resp) {
REQUIRE(r == ReqResult::Ok);
REQUIRE(resp->body() == std::to_string(i));
});
}
}
DROGON_TEST(HttpPipeliningStrangeTest2)
{
auto client = HttpClient::newHttpClient("127.0.0.1", 8848);
client->setPipeliningDepth(64);
for (int i = 0; i < 6; ++i)
{
auto request = HttpRequest::newHttpRequest();
request->setPath("/pipe/strange-2");
request->setBody(std::to_string(i));
client->sendRequest(request,
[TEST_CTX, i](ReqResult r,
const HttpResponsePtr &resp) {
REQUIRE(r == ReqResult::Ok);
REQUIRE(resp->body() == std::to_string(i));
});
}
}

View File

@ -1,10 +1,11 @@
#include "PipeliningTest.h"
#include <trantor/net/EventLoop.h>
#include <atomic>
#include <mutex>
void PipeliningTest::asyncHandleHttpRequest(
void PipeliningTest::normalPipe(
const HttpRequestPtr &req,
std::function<void(const HttpResponsePtr &)> &&callback)
std::function<void(const HttpResponsePtr &)> &&callback) const
{
static std::atomic<int> counter{0};
int c = counter.fetch_add(1);
@ -39,3 +40,107 @@ void PipeliningTest::asyncHandleHttpRequest(
callback(resp);
});
}
// Receive 1, cache 1
// Receive 2, send 1 send 2
void PipeliningTest::strangePipe1(
const HttpRequestPtr &req,
std::function<void(const HttpResponsePtr &)> &&callback)
{
static std::mutex mtx;
static std::vector<
std::pair<std::function<void(const HttpResponsePtr &)>, std::string>>
callbacks;
LOG_INFO << "Receive request " << req->body();
std::function<void(const HttpResponsePtr &)> cb1;
std::string body1;
std::function<void(const HttpResponsePtr &)> cb2;
std::string body2;
{
std::lock_guard<std::mutex> lock(mtx);
if (callbacks.empty())
{
callbacks.emplace_back(std::move(callback), req->getBody());
return;
}
auto item = std::move(callbacks.back());
callbacks.pop_back();
cb1 = std::move(item.first);
body1 = std::move(item.second);
cb2 = std::move(callback);
body2 = std::string{req->body()};
}
if (cb1)
{
auto resp = HttpResponse::newHttpResponse();
resp->setBody(body1);
cb1(resp);
}
if (cb2)
{
auto resp = HttpResponse::newHttpResponse();
resp->setBody(body2);
cb2(resp);
}
}
// Receive 1, cache 1
// Receive 2, send 1 cache 2
// Receive 3, send 2 send 3
void PipeliningTest::strangePipe2(
const HttpRequestPtr &req,
std::function<void(const HttpResponsePtr &)> &&callback)
{
static std::mutex mtx;
static std::vector<
std::pair<std::function<void(const HttpResponsePtr &)>, std::string>>
callbacks;
static uint64_t idx{0};
LOG_INFO << "Receive request " << req->body();
std::function<void(const HttpResponsePtr &)> cb1;
std::string body1;
std::function<void(const HttpResponsePtr &)> cb2;
std::string body2;
{
std::lock_guard<std::mutex> lock(mtx);
++idx;
if (idx % 3 == 1)
{
assert(callbacks.empty());
callbacks.emplace_back(std::move(callback), req->getBody());
return;
}
assert(callbacks.size() == 1);
if (idx % 3 == 2)
{
auto item = std::move(callbacks.back());
cb1 = std::move(item.first);
body1 = std::move(item.second);
callbacks.pop_back();
callbacks.emplace_back(std::move(callback), req->getBody());
}
else
{
auto item = std::move(callbacks.back());
cb1 = std::move(item.first);
body1 = std::move(item.second);
callbacks.pop_back();
cb2 = std::move(callback);
body2 = std::string{req->body()};
}
}
if (cb1)
{
auto resp = HttpResponse::newHttpResponse();
resp->setBody(body1);
cb1(resp);
}
if (cb2)
{
auto resp = HttpResponse::newHttpResponse();
resp->setBody(body2);
cb2(resp);
}
}

View File

@ -1,14 +1,32 @@
#pragma once
#include <drogon/HttpSimpleController.h>
#include <drogon/HttpController.h>
using namespace drogon;
class PipeliningTest : public drogon::HttpSimpleController<PipeliningTest>
// class PipeliningTest : public drogon::HttpSimpleController<PipeliningTest>
//{
// public:
// virtual void asyncHandleHttpRequest(
// const HttpRequestPtr &req,
// std::function<void(const HttpResponsePtr &)> &&callback) override;
// PATH_LIST_BEGIN
// // list path definitions here;
// PATH_ADD("/pipe", Get);
// PATH_LIST_END
// };
class PipeliningTest : public drogon::HttpController<PipeliningTest>
{
public:
virtual void asyncHandleHttpRequest(
METHOD_LIST_BEGIN
ADD_METHOD_TO(PipeliningTest::normalPipe, "/pipe", Get);
ADD_METHOD_TO(PipeliningTest::strangePipe1, "/pipe/strange-1", Get);
ADD_METHOD_TO(PipeliningTest::strangePipe2, "/pipe/strange-2", Get);
METHOD_LIST_END
void normalPipe(
const HttpRequestPtr &req,
std::function<void(const HttpResponsePtr &)> &&callback) override;
PATH_LIST_BEGIN
// list path definitions here;
PATH_ADD("/pipe", Get);
PATH_LIST_END
std::function<void(const HttpResponsePtr &)> &&callback) const;
void strangePipe1(const HttpRequestPtr &req,
std::function<void(const HttpResponsePtr &)> &&callback);
void strangePipe2(const HttpRequestPtr &req,
std::function<void(const HttpResponsePtr &)> &&callback);
};