drogon/nosql_lib/redis/tests/redis_subscriber_test.cc
Nikolay Mitev 17c80508c0 Drop cpp14 build (#1740)
* Drop C++14 Support

* Update README.md

* Remove drogon::optional in favor of std::optional

* Remove drogon::filesystem in favor of std::filesystem

* Remove boost::string_view and pre-c++17 hacks

* Remove boost::any

* Remove the string_view.h and the any.h

* Remove boost lib

* Update .clang-format and ci

Co-authored-by: Omar Mohamed <omar@omar-laptop.lan>
Co-authored-by: Ken Matsui <26405363+ken-matsui@users.noreply.github.com>
Co-authored-by: An Tao <antao2002@gmail.com>
Co-authored-by: albaropereyra22 <141711575+albaropereyra22@users.noreply.github.com>
Co-authored-by: Yoshihiro Hokazono <47231909+hokacci@users.noreply.github.com>
Co-authored-by: Omar Mohamed Khallaf <51155980+omarmohamedkh@users.noreply.github.com>
2023-08-23 11:49:55 +08:00

119 lines
3.3 KiB
C++

#define DROGON_TEST_MAIN
#include <drogon/nosql/RedisClient.h>
#include <drogon/drogon_test.h>
#include <drogon/drogon.h>
#include <atomic>
#include <chrono>
#include <future>
#include <iostream>
#include <thread>
using namespace std::chrono_literals;
using namespace drogon::nosql;
// Incremented by the callback registered for the exact channel "test_sub".
static std::atomic<int> nMsgRecv{0};
// Incremented by the callback registered for the pattern "test_*".
static std::atomic<int> nPmsgRecv{0};
// Incremented once per publish command that has completed, whether its
// result callback or its exception callback was invoked.
static std::atomic<int> nMsgSent{0};

// Client shared by the test body and the publish helper; assigned inside the
// test case.
RedisClientPtr redisClient;
/**
 * Exercises channel and pattern subscriptions against a Redis server at
 * 127.0.0.1:6379.
 *
 * Scenario:
 *  1. Subscribe to the channel "test_sub" and to the pattern "test_*".
 *  2. Publish 5 messages to "test_sub" (matched by both subscriptions) and 5
 *     to "test_test" (matched by the pattern only): expect 5 channel
 *     messages and 10 pattern messages.
 *  3. Unsubscribe from "test_sub" and publish to it again: only the pattern
 *     counter may advance (to 11).
 *  4. Unsubscribe from the pattern and publish once more: neither counter
 *     may advance.
 */
DROGON_TEST(RedisSubscriberTest)
{
    redisClient = drogon::nosql::RedisClient::newRedisClient(
        trantor::InetAddress("127.0.0.1", 6379), 1);
    REQUIRE(redisClient != nullptr);

    auto subscriber = redisClient->newSubscriber();
    subscriber->subscribe("test_sub",
                          [](const std::string & /*channel*/,
                             const std::string &message) {
                              // Log the value produced by this increment
                              // instead of re-reading the atomic, which
                              // another callback may already have bumped.
                              const int received = ++nMsgRecv;
                              LOG_INFO << "Channel test_sub receive "
                                       << received
                                       << " messages: " << message;
                          });
    subscriber->psubscribe("test_*",
                           [](const std::string &channel,
                              const std::string &message) {
                               const int received = ++nPmsgRecv;
                               LOG_INFO << "Channel " << channel
                                        << " receive " << received
                                        << " pmessages: " << message;
                           });
    // Presumably gives the (asynchronous) subscriptions time to be registered
    // on the server before anything is published.
    std::this_thread::sleep_for(1s);

    // Publishes "drogon<i>" to `channel`. nMsgSent is bumped when the command
    // completes, successfully or not, so waiters are always released.
    auto fnPublish = [TEST_CTX](const char *channel, int i) {
        redisClient->execCommandAsync(
            [TEST_CTX](const drogon::nosql::RedisResult & /*result*/) {
                SUCCESS();
                ++nMsgSent;
            },
            [TEST_CTX](const std::exception &err) {
                // A failed publish must fail the test. The previous
                // MANDATE(err.what()) only checked a non-null pointer and
                // therefore always passed.
                FAIL(err.what());
                LOG_ERROR << err.what();
                ++nMsgSent;
            },
            "publish %s %s%d",
            channel,
            "drogon",
            i);
    };

    // Waits until `expected` publish commands have completed, giving up after
    // a deadline so that a lost callback fails the test instead of hanging
    // it. Then sleeps one more second so the subscriber callbacks can run.
    // Returns whether all expected publishes completed.
    const auto waitForPublished = [](int expected) {
        const auto deadline = std::chrono::steady_clock::now() + 10s;
        while (nMsgSent < expected &&
               std::chrono::steady_clock::now() < deadline)
        {
            std::this_thread::sleep_for(100ms);
        }
        const bool allCompleted = nMsgSent >= expected;
        std::this_thread::sleep_for(1s);
        return allCompleted;
    };

    for (int i = 0; i < 5; ++i)
    {
        fnPublish("test_sub", i);
    }
    for (int i = 5; i < 10; ++i)
    {
        fnPublish("test_test", i);
    }
    const bool firstBatchPublished = waitForPublished(10);
    MANDATE(firstBatchPublished);
    MANDATE(nMsgRecv == 5);
    MANDATE(nPmsgRecv == 10);

    // Unsub from channel
    subscriber->unsubscribe("test_sub");
    fnPublish("test_sub", 11);
    const bool afterUnsubscribePublished = waitForPublished(11);
    MANDATE(afterUnsubscribePublished);
    MANDATE(nMsgRecv == 5);
    MANDATE(nPmsgRecv == 11);

    // Unsub from pattern
    subscriber->punsubscribe("test_*");
    fnPublish("test_sub", 12);
    const bool afterPunsubscribePublished = waitForPublished(12);
    MANDATE(afterPunsubscribePublished);
    MANDATE(nMsgRecv == 5);
    MANDATE(nPmsgRecv == 11);
}
/**
 * Test entry point.
 *
 * Without Redis support (USE_REDIS undefined) no tests are executed and the
 * program reports success. Otherwise the drogon application loop is run on a
 * background thread, the tests run on the calling thread, and the loop is
 * shut down before the test status is returned.
 */
int main(int argc, char **argv)
{
#ifndef USE_REDIS
    (void)argc;
    (void)argv;
    LOG_DEBUG << "Drogon is built without "
                 "Redis. No tests executed.";
    return 0;
#else
    std::promise<void> p1;
    std::future<void> f1 = p1.get_future();

    std::thread thr([&p1]() {
        // Fulfil the promise from inside the event loop so that the main
        // thread is released only once the loop is actually processing
        // tasks. Setting the promise before run() made the handshake
        // meaningless.
        drogon::app().getLoop()->queueInLoop([&p1]() { p1.set_value(); });
        drogon::app().run();
    });

    f1.get();
    const int testStatus = drogon::test::run(argc, argv);

    // Ask the loop to stop from within its own thread, then wait for it.
    drogon::app().getLoop()->queueInLoop([]() { drogon::app().quit(); });
    thr.join();
    return testStatus;
#endif
}