test iocp for fiber module

This commit is contained in:
郑树新 2021-10-07 23:13:21 +08:00
parent 667c2f27f0
commit fca21901ac
6 changed files with 36 additions and 6 deletions

View File

@ -26,7 +26,7 @@ class aio_istream;
class ACL_CPP_API aio_timer_reader : public aio_timer_callback
{
public:
aio_timer_reader(void) {}
aio_timer_reader(void) : in_(NULL) {}
/**
* aio_istream

View File

@ -88,10 +88,13 @@ static int iocp_close_sock(EVENT_IOCP *ev, FILE_EVENT *fe)
* the GetQueuedCompletionStatus process.
*/
if (ok) {
printf(">>>0-has over, fe=%p, e=%p, %p, reader=%p\r\n",
fe,&ev->event, ev, fe->reader);
mem_free(fe->reader);
} else {
fe->reader->type = IOCP_EVENT_DEAD;
fe->reader->fe = NULL;
printf(">>>0-has not over\r\n");
}
fe->reader = NULL;
}
@ -103,9 +106,11 @@ static int iocp_close_sock(EVENT_IOCP *ev, FILE_EVENT *fe)
*/
if (HasOverlappedIoCompleted(&fe->writer->overlapped)) {
mem_free(fe->writer);
printf(">>>has over\r\n");
} else {
fe->writer->type = IOCP_EVENT_DEAD;
fe->writer->fe = NULL;
printf(">>>not over\r\n");
}
fe->writer = NULL;
@ -182,10 +187,10 @@ static int iocp_add_read(EVENT_IOCP *ev, FILE_EVENT *fe)
if (fe->reader == NULL) {
fe->reader = (IOCP_EVENT*) mem_malloc(sizeof(IOCP_EVENT));
fe->reader->fe = fe;
memset(&fe->reader->overlapped, 0, sizeof(fe->reader->overlapped));
}
fe->reader->type = IOCP_EVENT_READ;
memset(&fe->reader->overlapped, 0, sizeof(fe->reader->overlapped));
if (is_listen_socket(fe->fd)) {
return iocp_add_listen(ev, fe);
@ -196,8 +201,14 @@ static int iocp_add_read(EVENT_IOCP *ev, FILE_EVENT *fe)
ret = WSARecv(fe->fd, &wsaData, 1, &len, &flags,
(OVERLAPPED*) &fe->reader->overlapped, NULL);
fe->len = (int) len;
if (!HasOverlappedIoCompleted(&fe->reader->overlapped)) {
printf("reader padding, fe->buf=%p, size=%d\r\n", fe->buf, fe->size);
//return 0;
}
if (ret != SOCKET_ERROR) {
fe->mask |= EVENT_READ;
return 1;
@ -207,6 +218,7 @@ static int iocp_add_read(EVENT_IOCP *ev, FILE_EVENT *fe)
} else {
msg_warn("%s(%d): ReadFile error(%s), fd=%d",
__FUNCTION__, __LINE__, last_serror(), fe->fd);
printf(">>>read error=%s\r\n", last_serror());
fe->mask |= EVENT_ERROR;
array_append(ev->readers, fe);
return -1;
@ -387,7 +399,11 @@ static int iocp_wait(EVENT *ev, int timeout)
continue;
}
assert(fe == event->fe);
if (fe != event->fe) {
printf("fe=%p, reader=%p, null=%s, e=%p, e->fe=%p\n",
fe, fe->reader, fe->reader ? "no":"yes", event, event->fe);
assert(fe == event->fe);
}
if (fe->mask & EVENT_ERROR) {
continue;

View File

@ -535,6 +535,7 @@ int fiber_file_close(socket_t fd, int *closed)
}
if (fe->fiber == acl_fiber_running()) {
printf(">>>>>free fe=%p<<<\r\n", fe);
file_event_free(fe);
} else {
fe->fiber->errnum = ECANCELED;

View File

@ -227,8 +227,11 @@ static int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
while (1) {
int err;
fe->mask &= ~EVENT_READ;
fiber_wait_read(fe);
if (fe->mask & EVENT_ERROR) {
err = acl_fiber_last_error();
fiber_save_errno(err);
return -1;
}
@ -237,10 +240,16 @@ static int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len)
return -1;
}
if (fe->len >= 0) {
return fe->len;
}
err = acl_fiber_last_error();
fiber_save_errno(err);
return fe->len;
if (!error_again(err)) {
return -1;
}
}
}

View File

@ -108,6 +108,8 @@ static POLLFD *pollfd_alloc(POLL_EVENT *pe, struct pollfd *fds, nfds_t nfds)
for (i = 0; i < nfds; i++) {
pfds[i].fe = fiber_file_open(fds[i].fd);
pfds[i].fe->buf = NULL;
pfds[i].fe->size = 0;
pfds[i].pe = pe;
pfds[i].pfd = &fds[i];
}

View File

@ -7,6 +7,7 @@
static void client_echo(acl::socket_stream* conn) {
acl::string buf;
while (true) {
#if 1
struct timeval begin, end;
gettimeofday(&begin, NULL);
int ret = acl_readable(conn->sock_handle());
@ -14,12 +15,13 @@ static void client_echo(acl::socket_stream* conn) {
double cost = acl::stamp_sub(end, begin);
if (ret == 0) {
printf("not readable, cost=%.2f\r\n", cost);
//printf("not readable, cost=%.2f\r\n", cost);
} else if (ret == 1) {
printf("readable, cost=%.2f\r\n", cost);
} else {
printf("readable error\r\n");
}
#endif
if (!conn->gets(buf, false)) {
printf("client read error %s\r\n", acl::last_serror());
@ -57,7 +59,7 @@ static void usage(const char* procname) {
int main(int argc, char *argv[]) {
int ch;
acl::string addr = "127.0.0.1:9000";
acl::string addr = "0.0.0.0:9000";
acl::acl_cpp_init();
acl::log::stdout_open(true);