test poll.c

This commit is contained in:
zhengshuxin 2023-06-26 16:26:02 +08:00
parent 9df7c8d7b1
commit c5b6dab9aa
5 changed files with 40 additions and 9 deletions

View File

@ -81,6 +81,7 @@ static void read_callback(EVENT *ev, FILE_EVENT *fe)
pfd = ring_to_appl(iter, POLLFD, me); pfd = ring_to_appl(iter, POLLFD, me);
if (pfd->pfd->events & POLLIN) { if (pfd->pfd->events & POLLIN) {
handle_poll_read(ev, fe, pfd); handle_poll_read(ev, fe, pfd);
break;
} }
} }
} }
@ -133,6 +134,7 @@ static void write_callback(EVENT *ev, FILE_EVENT *fe)
pfd = ring_to_appl(iter, POLLFD, me); pfd = ring_to_appl(iter, POLLFD, me);
if (pfd->pfd->events & POLLOUT) { if (pfd->pfd->events & POLLOUT) {
handle_poll_write(ev, fe, pfd); handle_poll_write(ev, fe, pfd);
break;
} }
} }
} }
@ -215,8 +217,6 @@ static void poll_event_clean(EVENT *ev, POLL_EVENT *pe)
pfd->fe->fiber_w = NULL; pfd->fe->fiber_w = NULL;
} }
ring_init(&pfd->fe->pfds);
// Unrefer the fe because we don't need it again. // Unrefer the fe because we don't need it again.
//file_event_unrefer(pfd->fe); //file_event_unrefer(pfd->fe);
pfd->fe = NULL; pfd->fe = NULL;

View File

@ -1,3 +1,3 @@
include ../Makefile_cpp.in include ../Makefile_cpp.in
CFLAGS += -std=c++14 CFLAGS += -std=c++11
PROG = fiber_pool PROG = fiber_pool

View File

@ -5,7 +5,8 @@ static void usage(const char* procname) {
printf("usage: %s -h[help]\r\n" printf("usage: %s -h[help]\r\n"
" -s ip:port, default: 127.0.0.1:9001\r\n" " -s ip:port, default: 127.0.0.1:9001\r\n"
" -c fiber_pool_count [default: 100] \r\n" " -c fiber_pool_count [default: 100] \r\n"
" -r timeout\r\n" " -r read_timeout\r\n"
" -w write_timeout\r\n"
" -S [if in sync mode, default: false]\r\n" " -S [if in sync mode, default: false]\r\n"
, procname); , procname);
} }
@ -13,9 +14,9 @@ static void usage(const char* procname) {
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
acl::string addr("127.0.0.1:9001"); acl::string addr("127.0.0.1:9001");
bool sync = false, use_unique = false; bool sync = false, use_unique = false;
int ch, nfibers = 100; int ch, nfibers = 100, rtimeo = -1, wtimeo = -1;
while ((ch = getopt(argc, argv, "hs:Sc:r:U")) > 0) { while ((ch = getopt(argc, argv, "hs:Sc:r:w:U")) > 0) {
switch (ch) { switch (ch) {
case 'h': case 'h':
usage(argv[0]); usage(argv[0]);
@ -32,6 +33,12 @@ int main(int argc, char* argv[]) {
case 'U': case 'U':
use_unique = true; use_unique = true;
break; break;
case 'r':
rtimeo = atoi(optarg);
break;
case 'w':
wtimeo = atoi(optarg);
break;
default: default:
break; break;
} }
@ -47,7 +54,7 @@ int main(int argc, char* argv[]) {
std::cout << "unique_ptr should be used for c++14" << std::endl; std::cout << "unique_ptr should be used for c++14" << std::endl;
#endif #endif
} else { } else {
server_pool_run(addr, sync, nfibers); server_pool_run(addr, sync, nfibers, rtimeo, wtimeo);
} }
return 0; return 0;

View File

@ -3,6 +3,7 @@
#include <atomic> #include <atomic>
#include "server_pool.h" #include "server_pool.h"
static int __rtimeo = 0, __wtimeo = 0;
using shared_stream = std::shared_ptr<acl::socket_stream>; using shared_stream = std::shared_ptr<acl::socket_stream>;
class socket_client { class socket_client {
@ -49,13 +50,17 @@ private:
using shared_message = std::shared_ptr<message>; using shared_message = std::shared_ptr<message>;
void server_pool_run(const char* addr, bool sync, int nfibers) { void server_pool_run(const char* addr, bool sync, int nfibers,
int rtimeo, int wtimeo) {
acl::server_socket ss; acl::server_socket ss;
if (!ss.open(addr)) { if (!ss.open(addr)) {
printf("listen %s error %s\r\n", addr, acl::last_serror()); printf("listen %s error %s\r\n", addr, acl::last_serror());
return; return;
} }
__rtimeo = rtimeo;
__wtimeo = wtimeo;
printf("listen on %s, fiber pool: %d\r\n", addr, nfibers); printf("listen on %s, fiber pool: %d\r\n", addr, nfibers);
acl::fiber_sbox2<shared_message> box; acl::fiber_sbox2<shared_message> box;
@ -70,6 +75,15 @@ void server_pool_run(const char* addr, bool sync, int nfibers) {
} }
auto client = msg->get_client(); auto client = msg->get_client();
auto data = msg->get_data(); auto data = msg->get_data();
if (__wtimeo > 0) {
int fd = client->get_conn().sock_handle();
if (acl_write_wait(fd , __wtimeo) < 0) {
printf("write wait error\r\n");
break;
}
}
if (client->get_conn().write(data.c_str(), data.size()) == -1) { if (client->get_conn().write(data.c_str(), data.size()) == -1) {
printf("write error: %s\r\n", acl::last_serror()); printf("write error: %s\r\n", acl::last_serror());
break; break;
@ -80,6 +94,7 @@ void server_pool_run(const char* addr, bool sync, int nfibers) {
std::atomic<long> nusers(0), nmsgs(0); std::atomic<long> nusers(0), nmsgs(0);
#if 0
go[&nusers, &nmsgs] { go[&nusers, &nmsgs] {
while (true) { while (true) {
std::cout << "client count: " << nusers std::cout << "client count: " << nusers
@ -87,6 +102,7 @@ void server_pool_run(const char* addr, bool sync, int nfibers) {
::sleep(1); ::sleep(1);
} }
}; };
#endif
go[&ss, &box, &nusers, &nmsgs, sync] { go[&ss, &box, &nusers, &nmsgs, sync] {
while (true) { while (true) {
@ -103,6 +119,14 @@ void server_pool_run(const char* addr, bool sync, int nfibers) {
go[&box, &nmsgs, client, sync] { go[&box, &nmsgs, client, sync] {
char buf[4096]; char buf[4096];
while (true) { while (true) {
if (__rtimeo > 0) {
int fd = client->get_conn().sock_handle();
if (acl_read_wait(fd, __rtimeo) < 0) {
printf("read wait error\r\n");
break;
}
}
int ret = client->get_conn().read(buf, sizeof(buf), false); int ret = client->get_conn().read(buf, sizeof(buf), false);
if (ret <= 0) { if (ret <= 0) {
break; break;

View File

@ -1,4 +1,4 @@
#pragma once #pragma once
void server_pool_run(const char* addr, bool sync, int nfibers); void server_pool_run(const char* addr, bool sync, int nfibers, int rtimeo, int wtimeo);
void server_pool2_run(const char* addr, bool sync, int nfibers); void server_pool2_run(const char* addr, bool sync, int nfibers);