From 554939d7eea7f10751aab927f052dd8c37e7692c Mon Sep 17 00:00:00 2001 From: Nitromelon Date: Sat, 14 Jan 2023 10:16:50 +0800 Subject: [PATCH] Fix error when PgListener task queue not empty (#1478) * Fix error when task queue not empty. * Update test, listen multiple channels. --- orm_lib/src/postgresql_impl/PgListener.cc | 138 +++++++++++----------- orm_lib/tests/db_listener_test.cc | 65 ++++++---- 2 files changed, 105 insertions(+), 98 deletions(-) diff --git a/orm_lib/src/postgresql_impl/PgListener.cc b/orm_lib/src/postgresql_impl/PgListener.cc index bfb0c909..bc2c4da9 100644 --- a/orm_lib/src/postgresql_impl/PgListener.cc +++ b/orm_lib/src/postgresql_impl/PgListener.cc @@ -150,91 +150,84 @@ void PgListener::listenInLoop(const std::string& channel, loop_->assertInLoopThread(); if (!retryCnt) retryCnt = std::make_shared(0); - if (conn_ && listenTasks_.empty()) + if (conn_ && !conn_->isWorking()) { - if (!conn_->isWorking()) + auto pgConn = std::dynamic_pointer_cast(conn_); + std::string escapedChannel = + escapeIdentifier(pgConn, channel.c_str(), channel.size()); + if (escapedChannel.empty()) { - auto pgConn = std::dynamic_pointer_cast(conn_); - std::string escapedChannel = - escapeIdentifier(pgConn, channel.c_str(), channel.size()); - if (escapedChannel.empty()) - { - LOG_ERROR << "Failed to escape pg identifier, stop listen"; - // TODO: report - return; - } + LOG_ERROR << "Failed to escape pg identifier, stop listen"; + // TODO: report + return; + } - // Because DbConnection::execSql() takes string_view as parameter, - // sql must be hold until query finish. - auto sql = std::make_shared( - (listen ? "LISTEN " : "UNLISTEN ") + escapedChannel); - std::weak_ptr weakThis = shared_from_this(); - conn_->execSql( - *sql, - 0, - {}, - {}, - {}, - [listen, channel, sql](const Result& r) { + // Because DbConnection::execSql() takes string_view as parameter, + // sql must be hold until query finish. + auto sql = std::make_shared( + (listen ? "LISTEN " : "UNLISTEN ") + escapedChannel); + std::weak_ptr weakThis = shared_from_this(); + conn_->execSql( + *sql, + 0, + {}, + {}, + {}, + [listen, channel, sql](const Result& r) { + if (listen) + { + LOG_TRACE << "Listen channel " << channel; + } + else + { + LOG_TRACE << "Unlisten channel " << channel; + } + }, + [listen, channel, weakThis, sql, retryCnt, loop = loop_]( + const std::exception_ptr& exception) { + try + { + std::rethrow_exception(exception); + } + catch (const DrogonDbException& ex) + { + ++(*retryCnt); if (listen) { - LOG_DEBUG << "Listen channel " << channel; + LOG_ERROR << "Failed to listen channel " << channel + << ", error: " << ex.base().what(); + if (*retryCnt > MAX_LISTEN_RETRY) + { + LOG_ERROR << "Failed to listen channel " << channel + << " after max attempt. Stop trying."; + // TODO: report + return; + } } else { - LOG_DEBUG << "Unlisten channel " << channel; - } - }, - [listen, channel, weakThis, sql, retryCnt, loop = loop_]( - const std::exception_ptr& exception) { - try - { - std::rethrow_exception(exception); - } - catch (const DrogonDbException& ex) - { - ++(*retryCnt); - if (listen) - { - LOG_ERROR << "Failed to listen channel " << channel - << ", error: " << ex.base().what(); - if (*retryCnt > MAX_LISTEN_RETRY) - { - LOG_ERROR << "Failed to listen channel " - << channel - << " after max attempt. Stop trying."; - // TODO: report - return; - } - } - else + LOG_ERROR << "Failed to unlisten channel " << channel + << ", error: " << ex.base().what(); + if (*retryCnt > MAX_UNLISTEN_RETRY) { LOG_ERROR << "Failed to unlisten channel " << channel - << ", error: " << ex.base().what(); - if (*retryCnt > MAX_UNLISTEN_RETRY) - { - LOG_ERROR << "Failed to unlisten channel " - << channel - << " after max attempt. Stop trying."; - // TODO: report? - return; - } + << " after max attempt. Stop trying."; + // TODO: report? + return; } - auto delay = (*retryCnt) < 5 ? (*retryCnt * 2) : 10; - loop->runAfter(delay, [=]() { - auto thisPtr = weakThis.lock(); - if (thisPtr) - { - thisPtr->listenInLoop(channel, - listen, - retryCnt); - } - }); } - }); - return; - } + auto delay = (*retryCnt) < 5 ? (*retryCnt * 2) : 10; + loop->runAfter(delay, [=]() { + auto thisPtr = weakThis.lock(); + if (thisPtr) + { + thisPtr->listenInLoop(channel, listen, retryCnt); + } + }); + } + }); + return; } if (listenTasks_.size() > 20000) @@ -245,6 +238,7 @@ void PgListener::listenInLoop(const std::string& channel, return; } + LOG_TRACE << "Add to task queue, channel " << channel; listenTasks_.emplace_back(listen, channel); } diff --git a/orm_lib/tests/db_listener_test.cc b/orm_lib/tests/db_listener_test.cc index bfa4c575..ce3e67f8 100644 --- a/orm_lib/tests/db_listener_test.cc +++ b/orm_lib/tests/db_listener_test.cc @@ -25,8 +25,6 @@ using namespace drogon::orm; using namespace trantor; using namespace std::chrono_literals; -static const std::string LISTEN_CHANNEL = "listen_test"; - #if USE_POSTGRESQL orm::DbClientPtr postgreClient; DROGON_TEST(ListenNotifyTest) @@ -35,36 +33,51 @@ DROGON_TEST(ListenNotifyTest) auto dbListener = DbListener::newPgListener(clientPtr->connectionInfo()); MANDATE(dbListener); + std::vector channels{"listen_test_0", + "listen_test_1", + "listen_test_2"}; + static int numNotifications = 0; LOG_INFO << "Start listen."; - dbListener->listen(LISTEN_CHANNEL, - [TEST_CTX](const std::string &channel, - const std::string &message) { - MANDATE(channel == LISTEN_CHANNEL); - LOG_INFO << "Message from " << LISTEN_CHANNEL << ": " - << message; - ++numNotifications; - }); - - std::this_thread::sleep_for(1s); - LOG_INFO << "Start sending notifications."; - for (int i = 0; i < 10; ++i) + for (auto &chan : channels) { - // Can not use placeholders in LISTEN or NOTIFY command!!! - std::string cmd = - "NOTIFY " + LISTEN_CHANNEL + ", '" + std::to_string(i) + "'"; - clientPtr->execSqlAsync( - cmd, - [i](const orm::Result &result) { LOG_INFO << "Notified " << i; }, - [](const orm::DrogonDbException &ex) { - LOG_ERROR << "Failed to notify " << ex.base().what(); - }); + dbListener->listen(chan, + [TEST_CTX, chan](const std::string &channel, + const std::string &message) { + MANDATE(channel == chan); + LOG_INFO << "Message from " << channel << ": " + << message; + ++numNotifications; + }); } + + std::this_thread::sleep_for(1s); // ensure listen success + LOG_INFO << "Start sending notifications."; + for (int i = 0; i < 5; ++i) + { + for (auto &chan : channels) + { + // Can not use placeholders in LISTEN or NOTIFY command!!! + std::string cmd = + "NOTIFY " + chan + ", '" + std::to_string(i) + "'"; + clientPtr->execSqlAsync( + cmd, + [i, chan](const orm::Result &result) { + LOG_INFO << chan << " notified " << i; + }, + [](const orm::DrogonDbException &ex) { + LOG_ERROR << "Failed to notify " << ex.base().what(); + }); + } + } + std::this_thread::sleep_for(5s); LOG_INFO << "Unlisten."; - dbListener->unlisten("listen_test"); - - CHECK(numNotifications == 10); + for (auto &chan : channels) + { + dbListener->unlisten(chan); + } + CHECK(numNotifications == 15); } #endif