add the files been lost for lib_fiber

This commit is contained in:
ubuntu14 2016-06-24 10:49:06 +08:00
parent 1d6b02b579
commit fd96629741
3 changed files with 925 additions and 0 deletions

387
lib_fiber/c/src/channel.c Normal file
View File

@ -0,0 +1,387 @@
/* Copyright (c) 2005 Russ Cox, MIT; see COPYRIGHT */
#include "stdafx.h"
#include "fiber.h"
CHANNEL* channel_create(int elemsize, int bufsize)
{
CHANNEL *c;
c = (CHANNEL *) acl_mycalloc(1, sizeof(*c) + bufsize * elemsize);
c->elemsize = elemsize;
c->bufsize = bufsize;
c->nbuf = 0;
c->buf = (unsigned char *)(c + 1);
return c;
}
/* bug - work out races */
void channel_free(CHANNEL *c)
{
if(c != NULL) {
acl_myfree(c->name);
acl_myfree(c->arecv.a);
acl_myfree(c->asend.a);
acl_myfree(c);
}
}
static void array_add(FIBER_ALT_ARRAY *a, FIBER_ALT *alt)
{
if (a->n == a->m) {
a->m += 16;
a->a = acl_myrealloc(a->a, a->m * sizeof(a->a[0]));
}
a->a[a->n++] = alt;
}
static void array_del(FIBER_ALT_ARRAY *a, int i)
{
--a->n;
a->a[i] = a->a[a->n];
}
/*
* doesn't really work for things other than CHANSND and CHANRCV
* but is only used as arg to channel_array, which can handle it
*/
#define otherop(op) (CHANSND + CHANRCV - (op))
static FIBER_ALT_ARRAY* channel_array(CHANNEL *c, unsigned int op)
{
switch (op) {
case CHANSND:
return &c->asend;
case CHANRCV:
return &c->arecv;
default:
return NULL;
}
}
static int alt_can_exec(FIBER_ALT *a)
{
FIBER_ALT_ARRAY *ar;
CHANNEL *c;
if (a->op == CHANNOP)
return 0;
c = a->c;
if (c->bufsize == 0) {
ar = channel_array(c, otherop(a->op));
return ar && ar->n;
}
switch (a->op) {
default:
return 0;
case CHANSND:
return c->nbuf < c->bufsize;
case CHANRCV:
return c->nbuf > 0;
}
}
static void alt_queue(FIBER_ALT *a)
{
FIBER_ALT_ARRAY *ar;
ar = channel_array(a->c, a->op);
array_add(ar, a);
}
static void alt_dequeue(FIBER_ALT *a)
{
FIBER_ALT_ARRAY *ar;
unsigned int i;
ar = channel_array(a->c, a->op);
if (ar == NULL){
fprintf(stderr, "bad use of altdequeue op=%d\n", a->op);
abort();
}
for (i = 0; i < ar->n; i++) {
if (ar->a[i] == a) {
array_del(ar, i);
return;
}
}
fprintf(stderr, "cannot find self in altdq\n");
abort();
}
static void alt_all_dequeue(FIBER_ALT *a)
{
int i;
for ( i = 0; a[i].op != CHANEND && a[i].op != CHANNOBLK; i++) {
if (a[i].op != CHANNOP)
alt_dequeue(&a[i]);
}
}
static void amove(void *dst, void *src, unsigned int n)
{
if (dst) {
if (src == NULL)
memset(dst, 0, n);
else
memmove(dst, src, n);
}
}
/*
* Actually move the data around. There are up to three
* players: the sender, the receiver, and the channel itself.
* If the channel is unbuffered or the buffer is empty,
* data goes from sender to receiver. If the channel is full,
* the receiver removes some from the channel and the sender
* gets to put some in.
*/
static void alt_copy(FIBER_ALT *s, FIBER_ALT *r)
{
FIBER_ALT *t;
CHANNEL *c;
unsigned char *cp;
/*
* Work out who is sender and who is receiver
*/
if (s == NULL && r == NULL)
return;
assert(s != NULL);
c = s->c;
if (s->op == CHANRCV) {
t = s;
s = r;
r = t;
}
assert(s==NULL || s->op == CHANSND);
assert(r==NULL || r->op == CHANRCV);
/*
* CHANNEL is empty (or unbuffered) - copy directly.
*/
if (s && r && c->nbuf == 0) {
amove(r->v, s->v, c->elemsize);
return;
}
/*
* Otherwise it's always okay to receive and then send.
*/
if (r) {
cp = c->buf + c->off*c->elemsize;
amove(r->v, cp, c->elemsize);
--c->nbuf;
if (++c->off == c->bufsize)
c->off = 0;
}
if (s) {
cp = c->buf + (c->off + c->nbuf) % c->bufsize * c->elemsize;
amove(cp, s->v, c->elemsize);
++c->nbuf;
}
}
static void alt_exec(FIBER_ALT *a)
{
FIBER_ALT_ARRAY *ar;
FIBER_ALT *other;
CHANNEL *c;
int i;
c = a->c;
ar = channel_array(c, otherop(a->op));
if (ar && ar->n) {
i = rand() % ar->n;
printf("...i: %d, n: %d\r\n", i, ar->n);
other = ar->a[i];
alt_copy(a, other);
alt_all_dequeue(other->xalt);
other->xalt[0].xalt = other;
fiber_ready(other->fiber);
} else
alt_copy(a, NULL);
}
#define dbgalt 0
static int channel_alt(FIBER_ALT *a)
{
int i, j, ncan, n, canblock;
CHANNEL *c;
FIBER *t;
for (i = 0; a[i].op != CHANEND && a[i].op != CHANNOBLK; i++) {}
n = i;
canblock = a[i].op == CHANEND;
t = fiber_running();
for (i = 0; i < n; i++) {
a[i].fiber = t;
a[i].xalt = a;
}
if (dbgalt)
printf("alt ");
ncan = 0;
for (i = 0; i < n; i++) {
c = a[i].c;
if (dbgalt)
printf(" %c:", "esrnb"[a[i].op]);
if (dbgalt) {
if (c->name)
printf("%s", c->name);
else
printf("%p", c);
}
if (alt_can_exec(&a[i])) {
if (dbgalt)
printf("*");
ncan++;
}
}
if (ncan) {
j = rand() % ncan;
for (i = 0; i < n; i++) {
if (!alt_can_exec(&a[i]))
continue;
if (j-- > 0)
continue;
if (dbgalt) {
c = a[i].c;
printf(" => %c:", "esrnb"[a[i].op]);
if(c->name)
printf("%s", c->name);
else
printf("%p", c);
printf("\n");
}
alt_exec(&a[i]);
return i;
}
}
if (dbgalt)
printf("\n");
if (!canblock)
return -1;
for (i = 0; i < n; i++) {
if (a[i].op != CHANNOP)
alt_queue(&a[i]);
}
fiber_switch();
/*
* the guy who ran the op took care of dequeueing us
* and then set a[0].alt to the one that was executed.
*/
return a[0].xalt - a;
}
static int channel_op(CHANNEL *c, int op, void *p, int canblock)
{
FIBER_ALT a[2];
a[0].c = c;
a[0].op = op;
a[0].v = p;
a[1].op = canblock ? CHANEND : CHANNOBLK;
if (channel_alt(a) < 0)
return -1;
return 1;
}
int channel_send(CHANNEL *c, void *v)
{
return channel_op(c, CHANSND, v, 1);
}
int channel_send_nb(CHANNEL *c, void *v)
{
return channel_op(c, CHANSND, v, 0);
}
int channel_recv(CHANNEL *c, void *v)
{
return channel_op(c, CHANRCV, v, 1);
}
int channel_recv_nb(CHANNEL *c, void *v)
{
return channel_op(c, CHANRCV, v, 0);
}
int channel_sendp(CHANNEL *c, void *v)
{
return channel_op(c, CHANSND, (void *) &v, 1);
}
void *channel_recvp(CHANNEL *c)
{
void *v;
channel_op(c, CHANRCV, (void *) &v, 1);
return v;
}
int channel_sendp_nb(CHANNEL *c, void *v)
{
return channel_op(c, CHANSND, (void *) &v, 0);
}
void *channel_recvp_nb(CHANNEL *c)
{
void *v;
channel_op(c, CHANRCV, (void *) &v, 0);
return v;
}
int channel_sendul(CHANNEL *c, ulong val)
{
return channel_op(c, CHANSND, &val, 1);
}
unsigned long channel_recvul(CHANNEL *c)
{
unsigned long val;
channel_op(c, CHANRCV, &val, 1);
return val;
}
int channel_sendul_nb(CHANNEL *c, ulong val)
{
return channel_op(c, CHANSND, &val, 0);
}
unsigned long channel_recvul_nb(CHANNEL *c)
{
unsigned long val;
channel_op(c, CHANRCV, &val, 0);
return val;
}

383
lib_fiber/c/src/event.c Normal file
View File

@ -0,0 +1,383 @@
#include "stdafx.h"
#include <stdio.h>
#include <sys/time.h>
#include <sys/types.h>
#include <stdlib.h>
#include <string.h>
#include <poll.h>
#include <errno.h>
#include "event_epoll.h"
#include "event.h"
EVENT *event_create(int size)
{
int i;
EVENT *ev = event_epoll_create(size);
ev->events = (FILE_EVENT *) acl_mycalloc(size, sizeof(FILE_EVENT));
ev->defers = (DEFER_DELETE *) acl_mycalloc(size, sizeof(FILE_EVENT));
ev->fired = (FIRED_EVENT *) acl_mycalloc(size, sizeof(FIRED_EVENT));
ev->setsize = size;
ev->maxfd = -1;
ev->ndefer = 0;
ev->timeout = -1;
acl_ring_init(&ev->pevents_list);
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it.
*/
for (i = 0; i < size; i++) {
ev->events[i].mask = EVENT_NONE;
ev->events[i].defer = NULL;
}
return ev;
}
/* Return the current set size. */
int event_size(EVENT *ev)
{
return ev->setsize;
}
void event_free(EVENT *ev)
{
FILE_EVENT *events = ev->events;
DEFER_DELETE *defers = ev->defers;
FIRED_EVENT *fired = ev->fired;
ev->free(ev);
acl_myfree(events);
acl_myfree(defers);
acl_myfree(fired);
}
void event_poll(EVENT *ev, POLL_EVENTS *pe, int timeout)
{
int i;
acl_ring_prepend(&ev->pevents_list, &pe->me);
pe->nready = 0;
for (i = 0; i < pe->nfds; i++) {
if (pe->fds[i].events & POLLIN) {
event_add(ev, pe->fds[i].fd, EVENT_READABLE, NULL, pe);
ev->events[pe->fds[i].fd].pevents = pe;
ev->events[pe->fds[i].fd].pfd = &pe->fds[i];
}
if (pe->fds[i].events & POLLOUT) {
event_add(ev, pe->fds[i].fd, EVENT_WRITABLE, NULL, pe);
ev->events[pe->fds[i].fd].pevents = pe;
ev->events[pe->fds[i].fd].pfd = &pe->fds[i];
}
pe->fds[i].revents = 0;
}
if (timeout > 0) {
if (ev->timeout < 0 || timeout < ev->timeout)
ev->timeout = timeout;
}
}
static int check_fdtype(int fd)
{
struct stat buf;
if (fstat(fd, &buf) < 0)
{
acl_msg_info("fd: %d fstat error", fd);
return -1;
}
/*
acl_msg_info("fd: %d, S_ISSOCK: %s, S_ISFIFO: %s, S_ISCHR: %s, "
"S_ISBLK: %s, S_ISREG: %s", fd,
S_ISSOCK(buf.st_mode) ? "yes" : "no",
S_ISFIFO(buf.st_mode) ? "yes" : "no",
S_ISCHR(buf.st_mode) ? "yes" : "no",
S_ISBLK(buf.st_mode) ? "yes" : "no",
S_ISREG(buf.st_mode) ? "yes" : "no");
*/
if (S_ISSOCK(buf.st_mode)
|| S_ISFIFO(buf.st_mode)
|| S_ISCHR(buf.st_mode))
{
return 0;
}
return -1;
}
int event_add(EVENT *ev, int fd, int mask, event_proc *proc, void *ctx)
{
FILE_EVENT *fe;
if (fd >= ev->setsize) {
acl_msg_error("fd: %d >= setsize: %d", fd, ev->setsize);
errno = ERANGE;
return -1;
}
fe = &ev->events[fd];
if (fe->defer != NULL) {
int fd2, pos = fe->defer->pos;
int to_mask = mask | (fe->mask & ~(ev->defers[pos].mask));
assert(to_mask != 0);
ev->ndefer--;
fd2 = ev->defers[ev->ndefer].fd;
if (ev->ndefer > 0) {
ev->defers[pos].mask = ev->defers[ev->ndefer].mask;
ev->defers[pos].pos = pos;
ev->defers[pos].fd = fd2;
ev->events[fd2].defer = &ev->defers[pos];
} else {
if (fd2 >= 0)
ev->events[fd2].defer = NULL;
ev->defers[0].mask = EVENT_NONE;
ev->defers[0].pos = 0;
}
if (ev->add(ev, fd, to_mask) == -1) {
acl_msg_error("mod fd(%d) error: %s",
fd, acl_last_serror());
return -1;
}
ev->defers[ev->ndefer].fd = -1;
fe->defer = NULL;
fe->mask = to_mask;
} else {
if (fe->type == TYPE_NONE) {
if (check_fdtype(fd) < 0) {
fe->type = TYPE_NOSOCK;
return 0;
}
fe->type = TYPE_SOCK;
} else if (fe->type == TYPE_NOSOCK)
return 0;
if (ev->add(ev, fd, mask) == -1) {
acl_msg_error("add fd(%d) error: %s",
fd, acl_last_serror());
return -1;
}
fe->mask |= mask;
}
if (mask & EVENT_READABLE)
fe->r_proc = proc;
if (mask & EVENT_WRITABLE)
fe->w_proc = proc;
fe->ctx = ctx;
fe->pevents = NULL;
fe->pfd = NULL;
if (fd > ev->maxfd)
ev->maxfd = fd;
return 1;
}
static void __event_del(EVENT *ev, int fd, int mask)
{
FILE_EVENT *fe;
if (fd >= ev->setsize) {
acl_msg_error("fd: %d >= setsize: %d", fd, ev->setsize);
errno = ERANGE;
return;
}
fe = &ev->events[fd];
fe->type = TYPE_NONE;
fe->defer = NULL;
fe->pevents = NULL;
fe->pfd = NULL;
if (fe->mask == EVENT_NONE)
{
acl_msg_info("----mask NONE, fd: %d----", fd);
return;
}
ev->del(ev, fd, mask);
fe->mask = fe->mask & (~mask);
if (fd == ev->maxfd && fe->mask == EVENT_NONE) {
/* Update the max fd */
int j;
for (j = ev->maxfd - 1; j >= 0; j--)
if (ev->events[j].mask != EVENT_NONE)
break;
ev->maxfd = j;
}
}
#define DEL_DELAY
void event_del(EVENT *ev, int fd, int mask)
{
FILE_EVENT *fe;
fe = &ev->events[fd];
if (fe->type == TYPE_NOSOCK) {
fe->type = TYPE_NONE;
return;
}
#ifdef DEL_DELAY
if ((mask & EVENT_ERROR) == 0) {
ev->defers[ev->ndefer].fd = fd;
ev->defers[ev->ndefer].mask = mask;
ev->defers[ev->ndefer].pos = ev->ndefer;
ev->events[fd].defer = &ev->defers[ev->ndefer];
ev->ndefer++;
return;
}
#endif
if (fe->defer != NULL) {
int fd2;
ev->ndefer--;
fd2 = ev->defers[ev->ndefer].fd;
if (ev->ndefer > 0) {
int pos = fe->defer->pos;
ev->defers[pos].mask = ev->defers[ev->ndefer].mask;
ev->defers[pos].pos = fe->defer->pos;
ev->defers[pos].fd = fd2;
ev->events[fd2].defer = &ev->defers[pos];
} else {
if (fd2 >= 0)
ev->events[fd2].defer = NULL;
ev->defers[0].mask = EVENT_NONE;
ev->defers[0].pos = 0;
}
ev->defers[ev->ndefer].fd = -1;
fe->defer = NULL;
}
#ifdef DEL_DELAY
__event_del(ev, fd, fe->mask);
#else
__event_del(ev, fd, mask);
#endif
}
int event_mask(EVENT *ev, int fd)
{
if (fd >= ev->setsize)
return 0;
return ev->events[fd].mask;
}
int event_process(EVENT *ev, int left)
{
int processed = 0, numevents, j;
struct timeval tv, *tvp;
int mask, fd, rfired, ndefer;
FILE_EVENT *fe;
if (ev->timeout < 0) {
if (left < 0) {
tv.tv_sec = 1;
tv.tv_usec = 0;
} else {
tv.tv_sec = left / 1000;
tv.tv_usec = (left - tv.tv_sec * 1000) * 1000;
}
} else if (left < 0) {
tv.tv_sec = ev->timeout / 1000;
tv.tv_usec = (ev->timeout - tv.tv_sec * 1000) * 1000;
} else if (left < ev->timeout) {
tv.tv_sec = left / 1000;
tv.tv_usec = (left - tv.tv_sec * 1000) * 1000;
} else {
tv.tv_sec = ev->timeout / 1000;
tv.tv_usec = (ev->timeout - tv.tv_sec * 1000) * 1000;
}
tvp = &tv;
ndefer = ev->ndefer;
for (j = 0; j < ndefer; j++) {
__event_del(ev, ev->defers[j].fd, ev->defers[j].mask);
ev->events[ev->defers[j].fd].defer = NULL;
ev->defers[j].fd = -1;
ev->ndefer--;
}
numevents = ev->loop(ev, tvp);
for (j = 0; j < numevents; j++) {
fd = ev->fired[j].fd;
mask = ev->fired[j].mask;
fe = &ev->events[fd];
if (fe->pevents != NULL) {
if (fe->mask & mask & EVENT_READABLE) {
fe->pfd->revents |= POLLIN;
fe->pevents->nready++;
}
if (fe->mask & mask & EVENT_WRITABLE) {
fe->pfd->revents |= POLLOUT;
fe->pevents->nready++;
}
continue;
}
/* note the fe->mask & mask & ... code: maybe an already
* processed event removed an element that fired and we
* still didn't processed, so we check if the event is
* still valid.
*/
if (fe->mask & mask & EVENT_READABLE) {
rfired = 1;
fe->r_proc(ev, fd, fe->ctx, mask);
} else
rfired = 0;
if (fe->mask & mask & EVENT_WRITABLE) {
if (!rfired || fe->w_proc != fe->r_proc)
fe->w_proc(ev, fd, fe->ctx, mask);
}
processed++;
}
acl_ring_foreach(ev->iter, &ev->pevents_list) {
POLL_EVENTS *pe = acl_ring_to_appl(ev->iter.ptr,
POLL_EVENTS, me);
pe->proc(ev, pe);
processed++;
}
acl_ring_init(&ev->pevents_list);
/* return the number of processed file/time events */
return processed;
}

View File

@ -0,0 +1,155 @@
#include "stdafx.h"
#include <sys/epoll.h>
#include "fiber.h"
#include "event.h"
#include "event_epoll.h"
typedef struct EVENT_EPOLL {
EVENT event;
int epfd;
struct epoll_event *epoll_events;
} EVENT_EPOLL;
static void epoll_event_free(EVENT *ev)
{
EVENT_EPOLL *ep = (EVENT_EPOLL *) ev;
close(ep->epfd);
acl_myfree(ep->epoll_events);
acl_myfree(ep);
}
static int epoll_event_add(EVENT *ev, int fd, int mask)
{
EVENT_EPOLL *ep = (EVENT_EPOLL *) ev;
struct epoll_event ee;
int op;
if ((ev->events[fd].mask & mask) == mask)
return 0;
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
op = ev->events[fd].mask == EVENT_NONE
? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
ee.data.u64 = 0;
ee.data.ptr = NULL;
ee.data.fd = fd;
mask |= ev->events[fd].mask; /* Merge old events */
if (mask & EVENT_READABLE)
ee.events |= EPOLLIN;
if (mask & EVENT_WRITABLE)
ee.events |= EPOLLOUT;
#if 0
#ifdef EPOLLRDHUP
ee.events |= EPOLLERR | EPOLLRDHUP | EPOLLHUP;
#else
ee.events |= EPOLLERR | EPOLLHUP;
#endif
#endif
if (epoll_ctl(ep->epfd, op, fd, &ee) == -1) {
fiber_save_errno();
acl_msg_error("%s, %s(%d): epoll_ctl error %s",
__FILE__, __FUNCTION__, __LINE__, acl_last_serror());
return -1;
}
return 0;
}
static void epoll_event_del(EVENT *ev, int fd, int delmask)
{
EVENT_EPOLL *ep = (EVENT_EPOLL *) ev;
struct epoll_event ee;
int mask = ev->events[fd].mask & (~delmask);
ee.events = 0;
ee.data.u64 = 0;
ee.data.ptr = NULL;
ee.data.fd = fd;
if (mask & EVENT_READABLE)
ee.events |= EPOLLIN;
if (mask & EVENT_WRITABLE)
ee.events |= EPOLLOUT;
if (mask != EVENT_NONE) {
if (epoll_ctl(ep->epfd, EPOLL_CTL_MOD, fd, &ee) < 0) {
fiber_save_errno();
acl_msg_error("%s(%d), epoll_ctl error: %s, fd: %d",
__FUNCTION__, __LINE__, acl_last_serror(), fd);
}
} else {
/* Note, Kernel < 2.6.9 requires a non null event pointer
* even for EPOLL_CTL_DEL.
*/
if (epoll_ctl(ep->epfd, EPOLL_CTL_DEL, fd, &ee) < 0) {
fiber_save_errno();
acl_msg_error("%s(%d), epoll_ctl error: %s, fd: %d",
__FUNCTION__, __LINE__, acl_last_serror(), fd);
}
}
}
static int epoll_event_loop(EVENT *ev, struct timeval *tv)
{
EVENT_EPOLL *ep = (EVENT_EPOLL *) ev;
int retval, j, mask;
struct epoll_event *e;
retval = epoll_wait(ep->epfd, ep->epoll_events, ev->setsize,
tv ? (tv->tv_sec * 1000 + tv->tv_usec / 1000) : -1);
if (retval <= 0)
return retval;
for (j = 0; j < retval; j++) {
mask = 0;
e = ep->epoll_events + j;
if (e->events & EPOLLIN)
mask |= EVENT_READABLE;
if (e->events & EPOLLOUT)
mask |= EVENT_WRITABLE;
if (e->events & EPOLLERR || e->events & EPOLLHUP ) {
if (ev->events[e->data.fd].mask & EVENT_READABLE)
mask |= EVENT_READABLE;
if (ev->events[e->data.fd].mask & EVENT_WRITABLE)
mask |= EVENT_WRITABLE;
}
ev->fired[j].fd = e->data.fd;
ev->fired[j].mask = mask;
}
return retval;
}
static const char *epoll_event_name(void)
{
return "epoll";
}
EVENT *event_epoll_create(int setsize)
{
EVENT_EPOLL *ep = (EVENT_EPOLL *) acl_mymalloc(sizeof(EVENT_EPOLL));
ep->epoll_events = (struct epoll_event *)
acl_mymalloc(sizeof(struct epoll_event) * setsize);
ep->epfd = epoll_create(1024);
acl_assert(ep->epfd >= 0);
ep->event.name = epoll_event_name;
ep->event.loop = epoll_event_loop;
ep->event.add = epoll_event_add;
ep->event.del = epoll_event_del;
ep->event.free = epoll_event_free;
return (EVENT*) ep;
}