Fix error when PgListener task queue not empty (#1478)

* Fix error when task queue not empty.

* Update test, listen multiple channels.
This commit is contained in:
Nitromelon 2023-01-14 10:16:50 +08:00 committed by GitHub
parent 1618484d74
commit 554939d7ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 105 additions and 98 deletions

View File

@ -150,91 +150,84 @@ void PgListener::listenInLoop(const std::string& channel,
loop_->assertInLoopThread();
if (!retryCnt)
retryCnt = std::make_shared<unsigned int>(0);
if (conn_ && listenTasks_.empty())
if (conn_ && !conn_->isWorking())
{
if (!conn_->isWorking())
auto pgConn = std::dynamic_pointer_cast<PgConnection>(conn_);
std::string escapedChannel =
escapeIdentifier(pgConn, channel.c_str(), channel.size());
if (escapedChannel.empty())
{
auto pgConn = std::dynamic_pointer_cast<PgConnection>(conn_);
std::string escapedChannel =
escapeIdentifier(pgConn, channel.c_str(), channel.size());
if (escapedChannel.empty())
{
LOG_ERROR << "Failed to escape pg identifier, stop listen";
// TODO: report
return;
}
LOG_ERROR << "Failed to escape pg identifier, stop listen";
// TODO: report
return;
}
// Because DbConnection::execSql() takes string_view as parameter,
// sql must be hold until query finish.
auto sql = std::make_shared<std::string>(
(listen ? "LISTEN " : "UNLISTEN ") + escapedChannel);
std::weak_ptr<PgListener> weakThis = shared_from_this();
conn_->execSql(
*sql,
0,
{},
{},
{},
[listen, channel, sql](const Result& r) {
// Because DbConnection::execSql() takes string_view as parameter,
// sql must be hold until query finish.
auto sql = std::make_shared<std::string>(
(listen ? "LISTEN " : "UNLISTEN ") + escapedChannel);
std::weak_ptr<PgListener> weakThis = shared_from_this();
conn_->execSql(
*sql,
0,
{},
{},
{},
[listen, channel, sql](const Result& r) {
if (listen)
{
LOG_TRACE << "Listen channel " << channel;
}
else
{
LOG_TRACE << "Unlisten channel " << channel;
}
},
[listen, channel, weakThis, sql, retryCnt, loop = loop_](
const std::exception_ptr& exception) {
try
{
std::rethrow_exception(exception);
}
catch (const DrogonDbException& ex)
{
++(*retryCnt);
if (listen)
{
LOG_DEBUG << "Listen channel " << channel;
LOG_ERROR << "Failed to listen channel " << channel
<< ", error: " << ex.base().what();
if (*retryCnt > MAX_LISTEN_RETRY)
{
LOG_ERROR << "Failed to listen channel " << channel
<< " after max attempt. Stop trying.";
// TODO: report
return;
}
}
else
{
LOG_DEBUG << "Unlisten channel " << channel;
}
},
[listen, channel, weakThis, sql, retryCnt, loop = loop_](
const std::exception_ptr& exception) {
try
{
std::rethrow_exception(exception);
}
catch (const DrogonDbException& ex)
{
++(*retryCnt);
if (listen)
{
LOG_ERROR << "Failed to listen channel " << channel
<< ", error: " << ex.base().what();
if (*retryCnt > MAX_LISTEN_RETRY)
{
LOG_ERROR << "Failed to listen channel "
<< channel
<< " after max attempt. Stop trying.";
// TODO: report
return;
}
}
else
LOG_ERROR << "Failed to unlisten channel " << channel
<< ", error: " << ex.base().what();
if (*retryCnt > MAX_UNLISTEN_RETRY)
{
LOG_ERROR << "Failed to unlisten channel "
<< channel
<< ", error: " << ex.base().what();
if (*retryCnt > MAX_UNLISTEN_RETRY)
{
LOG_ERROR << "Failed to unlisten channel "
<< channel
<< " after max attempt. Stop trying.";
// TODO: report?
return;
}
<< " after max attempt. Stop trying.";
// TODO: report?
return;
}
auto delay = (*retryCnt) < 5 ? (*retryCnt * 2) : 10;
loop->runAfter(delay, [=]() {
auto thisPtr = weakThis.lock();
if (thisPtr)
{
thisPtr->listenInLoop(channel,
listen,
retryCnt);
}
});
}
});
return;
}
auto delay = (*retryCnt) < 5 ? (*retryCnt * 2) : 10;
loop->runAfter(delay, [=]() {
auto thisPtr = weakThis.lock();
if (thisPtr)
{
thisPtr->listenInLoop(channel, listen, retryCnt);
}
});
}
});
return;
}
if (listenTasks_.size() > 20000)
@ -245,6 +238,7 @@ void PgListener::listenInLoop(const std::string& channel,
return;
}
LOG_TRACE << "Add to task queue, channel " << channel;
listenTasks_.emplace_back(listen, channel);
}

View File

@ -25,8 +25,6 @@ using namespace drogon::orm;
using namespace trantor;
using namespace std::chrono_literals;
static const std::string LISTEN_CHANNEL = "listen_test";
#if USE_POSTGRESQL
orm::DbClientPtr postgreClient;
DROGON_TEST(ListenNotifyTest)
@ -35,36 +33,51 @@ DROGON_TEST(ListenNotifyTest)
auto dbListener = DbListener::newPgListener(clientPtr->connectionInfo());
MANDATE(dbListener);
std::vector<std::string> channels{"listen_test_0",
"listen_test_1",
"listen_test_2"};
static int numNotifications = 0;
LOG_INFO << "Start listen.";
dbListener->listen(LISTEN_CHANNEL,
[TEST_CTX](const std::string &channel,
const std::string &message) {
MANDATE(channel == LISTEN_CHANNEL);
LOG_INFO << "Message from " << LISTEN_CHANNEL << ": "
<< message;
++numNotifications;
});
std::this_thread::sleep_for(1s);
LOG_INFO << "Start sending notifications.";
for (int i = 0; i < 10; ++i)
for (auto &chan : channels)
{
// Can not use placeholders in LISTEN or NOTIFY command!!!
std::string cmd =
"NOTIFY " + LISTEN_CHANNEL + ", '" + std::to_string(i) + "'";
clientPtr->execSqlAsync(
cmd,
[i](const orm::Result &result) { LOG_INFO << "Notified " << i; },
[](const orm::DrogonDbException &ex) {
LOG_ERROR << "Failed to notify " << ex.base().what();
});
dbListener->listen(chan,
[TEST_CTX, chan](const std::string &channel,
const std::string &message) {
MANDATE(channel == chan);
LOG_INFO << "Message from " << channel << ": "
<< message;
++numNotifications;
});
}
std::this_thread::sleep_for(1s); // ensure listen success
LOG_INFO << "Start sending notifications.";
for (int i = 0; i < 5; ++i)
{
for (auto &chan : channels)
{
// Can not use placeholders in LISTEN or NOTIFY command!!!
std::string cmd =
"NOTIFY " + chan + ", '" + std::to_string(i) + "'";
clientPtr->execSqlAsync(
cmd,
[i, chan](const orm::Result &result) {
LOG_INFO << chan << " notified " << i;
},
[](const orm::DrogonDbException &ex) {
LOG_ERROR << "Failed to notify " << ex.base().what();
});
}
}
std::this_thread::sleep_for(5s);
LOG_INFO << "Unlisten.";
dbListener->unlisten("listen_test");
CHECK(numNotifications == 10);
for (auto &chan : channels)
{
dbListener->unlisten(chan);
}
CHECK(numNotifications == 15);
}
#endif