mirror of
https://gitee.com/acl-dev/acl.git
synced 2024-12-16 01:40:52 +08:00
155 lines
3.0 KiB
C++
155 lines
3.0 KiB
C++
// beanstalk.cpp : Defines the entry point for the console application.
//
|
|
|
|
#include "stdafx.h"
|
|
#include "util.h"
|
|
|
|
// Number of messages the producer sends and the consumer receives.
// Defaults to 100; main() overrides it from the -n option.
static int __max = 100;

// Name of the beanstalk tube shared by the producer and the consumer.
static const char *__tube = "zsxxsz";

// Address of the beanstalkd server as text. main() fills it with
// "127.0.0.1:11300" by default or with the value of the -s option.
static char __addr[64];
|
|
|
|
// 消息生产者线程
|
|
static void* producer(void* ctx)
|
|
{
|
|
(void) ctx;
|
|
|
|
// beanstalk 客户端连接对象
|
|
acl::beanstalk conn(__addr, 10);
|
|
|
|
// 指定消息目标队列
|
|
if (conn.use(__tube) == false)
|
|
{
|
|
printf("use %s error\r\n", __tube);
|
|
return NULL;
|
|
}
|
|
else
|
|
printf("use %s ok\r\n", __tube);
|
|
|
|
acl::string data;
|
|
unsigned long long id;
|
|
for (int i = 0; i < __max; i++)
|
|
{
|
|
data.format("hello-%d", i);
|
|
// 向 beanstalkd 消息服务器发送消息
|
|
if ((id = conn.put(data.c_str(), data.length())) == 0)
|
|
{
|
|
printf("put %s failed\r\n", data.c_str());
|
|
return NULL;
|
|
}
|
|
else if (i < 10)
|
|
printf("put %s ok, id: %llu, len: %d\r\n",
|
|
data.c_str(), id, (int) data.length());
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
// 接收队列消息的消费者线程
|
|
static void* consumer(void* ctx)
|
|
{
|
|
(void) ctx;
|
|
|
|
acl::beanstalk conn(__addr, 10);
|
|
// 从指定消息队列中接收消息
|
|
if (conn.watch(__tube) == false)
|
|
{
|
|
printf("watch %s faile\r\n", __tube);
|
|
return NULL;
|
|
}
|
|
|
|
acl::string buf;
|
|
unsigned long long id;
|
|
struct timeval begin, end;
|
|
gettimeofday(&begin, NULL);
|
|
ACL_METER_TIME("begin");
|
|
for (int i = 0; i < __max; i++)
|
|
{
|
|
// 接收一条消息
|
|
if ((id = conn.reserve(buf)) == 0)
|
|
{
|
|
printf("reserve failed\r\n");
|
|
return NULL;
|
|
}
|
|
else if (i < 10)
|
|
printf("reserved: %s\r\n", buf.c_str());
|
|
|
|
// 删除接收到的消息
|
|
if (conn.delete_id(id) == false)
|
|
{
|
|
printf("delete id %llu failed\r\n", id);
|
|
return NULL;
|
|
}
|
|
else if (i < 10)
|
|
printf("delete id %llu ok\r\n", id);
|
|
if (i % 1000 == 0)
|
|
{
|
|
buf.format_append("; total: %d, curr: %d, id: %lld",
|
|
__max, i, id);
|
|
ACL_METER_TIME(buf.c_str());
|
|
}
|
|
}
|
|
|
|
gettimeofday(&end, NULL);
|
|
double n = util::stamp_sub(&end, &begin);
|
|
printf("total get: %d, spent: %0.2f ms, speed: %0.2f\r\n",
|
|
__max, n, (__max * 1000) /(n > 0 ? n : 1));
|
|
return NULL;
|
|
}
|
|
|
|
static void test1()
|
|
{
|
|
acl_pthread_attr_t attr;
|
|
acl_pthread_t tid1, tid2;
|
|
|
|
acl_pthread_attr_init(&attr);
|
|
acl_pthread_attr_setdetachstate(&attr, 0);
|
|
|
|
acl_pthread_create(&tid1, &attr, consumer, NULL);
|
|
acl_pthread_create(&tid2, &attr, producer, NULL);
|
|
|
|
acl_pthread_join(tid1, NULL);
|
|
acl_pthread_join(tid2, NULL);
|
|
}
|
|
|
|
// Print a one-line summary of the command-line options to stdout.
// procname is the program name inserted into the message.
static void usage(const char* procname)
{
	printf("usage: %s -h [help] "
		"-s beanstalk_addr [127.0.0.1:11300] "
		"-n max_count\r\n",
		procname);
}
|
|
|
|
int main(int argc, char* argv[])
|
|
{
|
|
#if WIN32
|
|
acl::acl_cpp_init();
|
|
#endif
|
|
snprintf(__addr, sizeof(__addr), "127.0.0.1:11300");
|
|
int ch;
|
|
while ((ch = getopt(argc, argv, "hs:n:")) > 0)
|
|
{
|
|
switch (ch)
|
|
{
|
|
case 'h':
|
|
usage(argv[0]);
|
|
return 0;
|
|
case 's':
|
|
snprintf(__addr, sizeof(__addr), "%s", optarg);
|
|
break;
|
|
case 'n':
|
|
__max = atoi(optarg);
|
|
if (__max <= 0)
|
|
__max = 1;
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
|
|
test1();
|
|
|
|
#ifdef WIN32
|
|
printf("enter any key to exit\r\n");
|
|
getchar();
|
|
#endif
|
|
return 0;
|
|
}
|
|
|