fixed one bug in fiber.c for memory leak

add some samples for lib_liber
This commit is contained in:
ubuntu14 2016-11-27 21:03:24 +08:00
parent 2c3b988fcf
commit bcb2f1b85c
57 changed files with 2735 additions and 124 deletions

View File

@ -30,6 +30,7 @@ CFLAGS = -c -g -W \
-D_POSIX_PTHREAD_SEMANTICS \
-DACL_PREPARE_COMPILE \
-Winvalid-pch
#-DDEBUG_MEM
#-DUSE_EPOLL \
#-Wno-tautological-compare \
#-Wno-invalid-source-encoding \

View File

@ -1,5 +1,8 @@
修改历史列表:
569) 2016.11.27
569.1) feature: 增加了内存调试函数 acl_default_meminfo 用来显示内存分配及释放情况
568) 2016.11.18
568.1) bugfix: acl_vstream.c 中的函数 acl_vstream_fdopen 设置 IO 句柄为非阻塞
模式是存在问题的,上层应用应该自己决定是否设为非阻塞模式

View File

@ -68,6 +68,8 @@ ACL_API int acl_mempool_total_allocated(void);
ACL_API void acl_default_memstat(const char *filename, int line,
void *ptr, size_t *len, size_t *real_len);
ACL_API void acl_default_meminfo(void);
/**
*
* 100000000

View File

@ -268,10 +268,12 @@ int acl_read_wait(ACL_SOCKET fd, int timeout)
acl_last_serror(), (int) fd);
return -1;
case 0:
/*
acl_msg_warn("%s(%d), %s: poll timeout: %s, fd: %d, "
"delay: %d, spent: %ld", __FILE__, __LINE__,
myname, acl_last_serror(), fd, delay,
(long) (time(NULL) - begin));
*/
acl_set_error(ACL_ETIMEDOUT);
return -1;
default:

View File

@ -126,6 +126,17 @@ static char empty_string[] = "";
} while (0)
#endif /* ACL_WINDOWS */
#ifdef DEBUG_MEM
static __thread int __nmalloc = 0;
static __thread int __ncalloc = 0;
static __thread int __nrealloc = 0;
static __thread int __nfree = 0;
static __thread int __nstrdup = 0;
static __thread int __nstrndup = 0;
static __thread int __nmemdup = 0;
static __thread ssize_t __nsize = 0;
#endif
void acl_default_memstat(const char *filename, int line,
void *ptr, size_t *len, size_t *real_len)
{
@ -145,6 +156,17 @@ void acl_default_memstat(const char *filename, int line,
*real_len = SPACE_FOR(*len);
}
void acl_default_meminfo(void)
{
#ifdef DEBUG_MEM
printf("%s(%d): __nmalloc: %d, __ncalloc: %d, __nrealloc: %d, "
"__nfree: %d, diff: %d, __nsize: %ld\r\n",
__FUNCTION__, __LINE__, __nmalloc, __ncalloc, __nrealloc,
__nfree, __nmalloc + __nrealloc - __nfree,
(unsigned long) __nsize);
#endif
}
void acl_default_set_memlimit(size_t len)
{
acl_assert(len > 0);
@ -163,6 +185,7 @@ void *acl_default_malloc(const char *filename, int line, size_t len)
char *ptr;
MBLOCK *real_ptr;
const char *pname = NULL;
#if 0
printf("%s:%d, len: %d\r\n", filename, line, (int) len);
acl_trace_info();
@ -190,6 +213,11 @@ void *acl_default_malloc(const char *filename, int line, size_t len)
acl_trace_info();
}
#ifdef DEBUG_MEM
__nmalloc++;
__nsize += new_len;
#endif
#ifdef _USE_GLIB
if ((real_ptr = (MBLOCK *) g_malloc(new_len)) == 0) {
acl_msg_error("%s(%d)->%s: new_len: %d, g_malloc error(%s)",
@ -204,11 +232,8 @@ void *acl_default_malloc(const char *filename, int line, size_t len)
return 0;
}
#endif
CHECK_OUT_PTR(ptr, real_ptr, len);
#if 0
memset(ptr, FILLER, len);
#endif
CHECK_OUT_PTR(ptr, real_ptr, len);
return ptr;
}
@ -218,6 +243,9 @@ void *acl_default_calloc(const char *filename, int line,
void *ptr;
int n;
#ifdef DEBUG_MEM
__ncalloc++;
#endif
n = (int) (nmemb * size);
ptr = acl_default_malloc(filename, line, n);
memset(ptr, FILLER, n);
@ -266,6 +294,11 @@ void *acl_default_realloc(const char *filename, int line,
acl_trace_info();
}
#ifdef DEBUG_MEM
__nrealloc++;
__nsize += new_len - old_len;
#endif
#ifdef _USE_GLIB
if ((real_ptr = (MBLOCK *) g_realloc((char *) real_ptr, new_len)) == 0)
acl_msg_fatal("%s(%d)->%s: realloc: insufficient memory: %s",
@ -310,6 +343,12 @@ void acl_default_free(const char *filename, int line, void *ptr)
/*
memset((char *) real_ptr, FILLER, SPACE_FOR(len));
*/
#ifdef DEBUG_MEM
__nfree++;
__nsize -= len;
#endif
#ifdef _USE_GLIB
g_free(real_ptr);
#else
@ -340,6 +379,11 @@ char *acl_default_strdup(const char *filename, int line, const char *str)
if (*str == 0)
return (char *) empty_string;
#endif
#ifdef DEBUG_MEM
__nstrdup++;
#endif
return strcpy(acl_default_malloc(pname, line, strlen(str) + 1), str);
}
@ -366,8 +410,14 @@ char *acl_default_strndup(const char *filename, int line,
if (*str == 0)
return (char *) empty_string;
#endif
if ((cp = memchr(str, 0, len)) != 0)
len = cp - str;
#ifdef DEBUG_MEM
__nstrndup++;
#endif
result = memcpy(acl_default_malloc(pname, line, len + 1), str, len);
result[len] = 0;
return result;
@ -389,5 +439,10 @@ void *acl_default_memdup(const char *filename, int line,
if (ptr == 0)
acl_msg_fatal("%s(%d)->%s: null pointer argument",
pname, line, myname);
#ifdef DEBUG_MEM
__nmemdup++;
#endif
return memcpy(acl_default_malloc(pname, line, len), ptr, len);
}

View File

@ -1,6 +1,9 @@
修改历史列表:
-----------------------------------------------------------------------
453) 2016.11.24
453.1) bugfix: istream.cpp 中的 gets 函数不应当读超时时设置 eof_ 标志位
452) 2016.11.18
452.1) bugfix: redis_list::bpop 及 redis_pubsub::get_message 的超时时间应
不受 socket_stream 里的超时时间影响

View File

@ -130,8 +130,10 @@ public:
* get the value of a key
* @param key {const char*} key
* the key of a string
* @param buf {string&}
* store the value of a key after GET executed correctly
* @param buf {string&} true
* key
* store the value of a key after GET executed correctly, key not
* exist if the buf is empty when return true
* @return {bool} false key
* if the GET was executed correctly, false if error happened or
* is is not a string of the key

View File

@ -7,6 +7,21 @@
namespace acl {
#if ACL_EWOULDBLOCK == ACL_EAGAIN
# define CHECK_ERROR(e) do { \
if (e != ACL_EINTR && e != ACL_ETIMEDOUT && e != ACL_EWOULDBLOCK) \
eof_ = true; \
} while (0)
#else
# define CHECK_ERROR(e) do { \
if (e != ACL_EINTR && e != ACL_ETIMEDOUT \
&& e != ACL_EWOULDBLOCK && e != ACL_EAGAIN) \
{ \
eof_ = true; \
} \
} while (0)
#endif
int istream::read(void* buf, size_t size, bool loop /* = true */)
{
int ret;
@ -16,7 +31,7 @@ int istream::read(void* buf, size_t size, bool loop /* = true */)
ret = acl_vstream_read(stream_, buf, size);
if (ret == ACL_VSTREAM_EOF)
{
eof_ = true;
CHECK_ERROR(errno);
return -1;
} else
return ret;
@ -28,7 +43,7 @@ bool istream::readtags(void *buf, size_t* size, const char *tag, size_t taglen)
if (ret == ACL_VSTREAM_EOF)
{
*size = 0;
eof_ = true;
CHECK_ERROR(errno);
return false;
}
@ -40,25 +55,6 @@ bool istream::readtags(void *buf, size_t* size, const char *tag, size_t taglen)
return false;
}
bool istream::gets(void* buf, size_t* size, bool nonl /* = true */)
{
int ret;
if (nonl)
ret = acl_vstream_gets_nonl(stream_, buf, *size);
else
ret = acl_vstream_gets(stream_, buf, *size);
if (ret == ACL_VSTREAM_EOF) {
eof_ = true;
*size = 0;
return false;
} else {
*size = ret;
if ((stream_->flag & ACL_VSTREAM_FLAG_TAGYES))
return true;
return false;
}
}
bool istream::read(acl_int64& n, bool loop /* = true */)
{
return read(&n, sizeof(n), loop) == (int) sizeof(n);
@ -113,6 +109,25 @@ bool istream::read(string* s, size_t max, bool loop /* = true */)
return read(*s, max, loop);
}
bool istream::gets(void* buf, size_t* size, bool nonl /* = true */)
{
int ret;
if (nonl)
ret = acl_vstream_gets_nonl(stream_, buf, *size);
else
ret = acl_vstream_gets(stream_, buf, *size);
if (ret == ACL_VSTREAM_EOF) {
CHECK_ERROR(errno);
*size = 0;
return false;
} else {
*size = ret;
if ((stream_->flag & ACL_VSTREAM_FLAG_TAGYES))
return true;
return false;
}
}
bool istream::gets(string& s, bool nonl /* = true */, size_t max /* = 0 */)
{
char buf[8192];
@ -132,8 +147,12 @@ bool istream::gets(string& s, bool nonl /* = true */, size_t max /* = 0 */)
return true;
}
if (size > 0)
s.append(buf, size);
if (size == 0)
break;
printf(">>>size: %d\r\n", (int) size);
s.append(buf, size);
}
return false;
@ -151,8 +170,10 @@ bool istream::gets(string& s, bool nonl /* = true */, size_t max /* = 0 */)
return true;
}
if (size > 0)
s.append(buf, size);
if (size == 0)
break;
s.append(buf, size);
max -= size;
if (max == 0)
@ -187,9 +208,13 @@ bool istream::readtags(string& s, const string& tag)
s.append(buf, size);
return true;
}
if (size > 0)
s.append(buf, size);
if (size == 0)
break;
s.append(buf, size);
}
return false;
}
@ -203,15 +228,15 @@ int istream::getch()
{
int ret = acl_vstream_getc(stream_);
if (ret == ACL_VSTREAM_EOF)
eof_ = true;
CHECK_ERROR(errno);
return ret;
}
int istream::ugetch(int ch)
{
int ret = acl_vstream_ungetc(stream_, ch);
int ret = acl_vstream_ungetc(stream_, ch);
if (ret == ACL_VSTREAM_EOF)
eof_ = true;
CHECK_ERROR(errno);
return ret;
}
@ -242,6 +267,7 @@ bool istream::gets_peek(string& buf, bool nonl /* = true */,
eof_ = true;
}
}
return ready ? true : false;
}

View File

@ -18,8 +18,8 @@ CFLAGS = -c -g -W \
-D_POSIX_PTHREAD_SEMANTICS \
-D_USE_FAST_MACRO \
-Wno-long-long \
-DUSE_JMP
#-DUSE_VALGRIND
-DUSE_JMP \
-DUSE_VALGRIND
#-Wno-clobbered
#-O3

View File

@ -26,6 +26,18 @@ void acl_fiber_hook_api(int onoff);
ACL_FIBER *acl_fiber_create(void (*fn)(ACL_FIBER *, void *),
void *arg, size_t size);
/**
* 线
* @retur {int}
*/
int acl_fiber_ndead(void);
/**
*
* @retur {ACL_FIBER*} NULL
*/
ACL_FIBER *acl_fiber_running(void);
/**
* ID
* @param fiber {const ACL_FIBER*} acl_fiber_create
@ -53,6 +65,14 @@ void acl_fiber_set_errno(ACL_FIBER *fiber, int errnum);
*/
int acl_fiber_errno(ACL_FIBER *fiber);
/**
*
* errno
* @param fiber {ACL_FIBER*}
* @param yesno {int}
*/
void acl_fiber_keep_errno(ACL_FIBER *fiber, int yesno);
/**
*
* @param fiber {const ACL_FIBER*}
@ -60,6 +80,18 @@ int acl_fiber_errno(ACL_FIBER *fiber);
*/
int acl_fiber_status(const ACL_FIBER *fiber);
/**
* 退
* @param fiber {const ACL_FIBER*}
*/
void acl_fiber_kill(ACL_FIBER *fiber);
/**
* 退
* @param fiber {const ACL_FIBER*}
*/
int acl_fiber_killed(ACL_FIBER *fiber);
/**
*
* @return {int}

View File

@ -229,7 +229,7 @@ static int channel_alt(FIBER_ALT a[])
n = i;
canblock = a[i].op == CHANEND;
t = fiber_running();
t = acl_fiber_running();
for (i = 0; i < n; i++) {
a[i].fiber = t;
@ -297,6 +297,9 @@ static int channel_alt(FIBER_ALT a[])
acl_fiber_switch();
if (acl_fiber_killed(t))
return -1;
/*
* the guy who ran the op took care of dequeueing us
* and then set a[0].alt to the one that was executed.
@ -345,9 +348,10 @@ int acl_channel_sendp(ACL_CHANNEL *c, void *v)
void *acl_channel_recvp(ACL_CHANNEL *c)
{
void *v;
void *v = NULL;
channel_op(c, CHANRCV, (void *) &v, 1);
if (channel_op(c, CHANRCV, (void *) &v, 1) < 0)
return NULL;
return v;
}
@ -358,9 +362,10 @@ int acl_channel_sendp_nb(ACL_CHANNEL *c, void *v)
void *acl_channel_recvp_nb(ACL_CHANNEL *c)
{
void *v;
void *v = NULL;
channel_op(c, CHANRCV, (void *) &v, 0);
if (channel_op(c, CHANRCV, (void *) &v, 0) < 0)
return NULL;
return v;
}
@ -371,9 +376,10 @@ int acl_channel_sendul(ACL_CHANNEL *c, unsigned long val)
unsigned long acl_channel_recvul(ACL_CHANNEL *c)
{
unsigned long val;
unsigned long val = (unsigned long) -1;
channel_op(c, CHANRCV, &val, 1);
if (channel_op(c, CHANRCV, &val, 1) < 0)
return (unsigned long) -1;
return val;
}
@ -386,6 +392,7 @@ unsigned long acl_channel_recvul_nb(ACL_CHANNEL *c)
{
unsigned long val;
channel_op(c, CHANRCV, &val, 0);
if (channel_op(c, CHANRCV, &val, 0) < 0)
return (unsigned long) -1;
return val;
}

View File

@ -71,7 +71,8 @@ static int check_fdtype(int fd)
struct stat s;
if (fstat(fd, &s) < 0) {
acl_msg_info("fd: %d fstat error", fd);
acl_msg_info("%s(%d), %s: fd: %d fstat error %s", __FILE__,
__LINE__, __FUNCTION__, fd, acl_last_serror());
return -1;
}

View File

@ -10,6 +10,8 @@
#include "event_epoll.h" /* just for hook_epoll */
#include "fiber.h"
#define MAX_CACHE 100
typedef int *(*errno_fn)(void);
typedef int (*fcntl_fn)(int, int, ...);
@ -21,6 +23,7 @@ typedef struct {
ACL_RING dead; /* dead fiber queue */
ACL_FIBER **fibers;
size_t size;
size_t slot;
int exitcode;
ACL_FIBER *running;
ACL_FIBER original;
@ -39,6 +42,7 @@ static acl_pthread_key_t __fiber_key;
/* forward declare */
static ACL_FIBER *fiber_alloc(void (*fn)(ACL_FIBER *, void *),
void *arg, size_t size);
static void fiber_free(ACL_FIBER *fiber);
void acl_fiber_hook_api(int onoff)
{
@ -99,6 +103,7 @@ static void fiber_check(void)
#endif
__thread_fiber->fibers = NULL;
__thread_fiber->size = 0;
__thread_fiber->slot = 0;
__thread_fiber->idgen = 0;
__thread_fiber->count = 0;
@ -174,10 +179,17 @@ void acl_fiber_set_errno(ACL_FIBER *fiber, int errnum)
int acl_fiber_errno(ACL_FIBER *fiber)
{
fiber->flag |= FIBER_F_SAVE_ERRNO;
return fiber->errnum;
}
void acl_fiber_keep_errno(ACL_FIBER *fiber, int yesno)
{
if (yesno)
fiber->flag |= FIBER_F_SAVE_ERRNO;
else
fiber->flag &= ~FIBER_F_SAVE_ERRNO;
}
void fiber_save_errno(void)
{
ACL_FIBER *curr;
@ -189,14 +201,14 @@ void fiber_save_errno(void)
curr = &__thread_fiber->original;
if (curr->flag & FIBER_F_SAVE_ERRNO) {
curr->flag &= ~FIBER_F_SAVE_ERRNO;
//curr->flag &= ~FIBER_F_SAVE_ERRNO;
return;
}
if (__sys_errno != NULL)
curr->errnum = *__sys_errno();
acl_fiber_set_errno(curr, *__sys_errno());
else
curr->errnum = errno;
acl_fiber_set_errno(curr, errno);
}
#if defined(__x86_64__)
@ -286,8 +298,45 @@ void fiber_save_errno(void)
siglongjmp(ctx, 1)
#endif
static void fiber_kick(int max)
{
ACL_RING *head;
ACL_FIBER *fiber;
while (max > 0) {
head = acl_ring_pop_head(&__thread_fiber->dead);
if (head == NULL)
break;
fiber = ACL_RING_TO_APPL(head, ACL_FIBER,me);
fiber_free(fiber);
max--;
}
}
static void fiber_swap(ACL_FIBER *from, ACL_FIBER *to)
{
if (from->status == FIBER_STATUS_EXITING) {
size_t slot = from->slot;
int n = acl_ring_size(&__thread_fiber->dead);
/* if the cached dead fibers reached the limit,
* some will be freed
*/
if (n > MAX_CACHE) {
n -= MAX_CACHE;
fiber_kick(n);
}
if (!from->sys)
__thread_fiber->count--;
__thread_fiber->fibers[slot] =
__thread_fiber->fibers[--__thread_fiber->slot];
__thread_fiber->fibers[slot]->slot = slot;
acl_ring_prepend(&__thread_fiber->dead, &from->me);
}
#ifdef USE_JMP
/* use setcontext() for the initial jump, as it allows us to set up
* a stack, but continue with longjmp() as it's much faster.
@ -309,12 +358,36 @@ static void fiber_swap(ACL_FIBER *from, ACL_FIBER *to)
#endif
}
ACL_FIBER *fiber_running(void)
ACL_FIBER *acl_fiber_running(void)
{
fiber_check();
return __thread_fiber->running;
}
void acl_fiber_kill(ACL_FIBER *fiber)
{
if (fiber == NULL) {
acl_msg_error("%s(%d), %s: fiber NULL",
__FILE__, __LINE__, __FUNCTION__);
return;
}
fiber->flag |= FIBER_F_KILLED;
if (fiber == acl_fiber_running()) {
acl_msg_error("%s(%d), %s: fiber-%d kill itself disable!",
__FILE__, __LINE__, __FUNCTION__, acl_fiber_id(fiber));
return;
}
acl_fiber_ready(fiber);
}
int acl_fiber_killed(ACL_FIBER *fiber)
{
return fiber->flag & FIBER_F_KILLED;
}
void fiber_exit(int exit_code)
{
fiber_check();
@ -386,7 +459,12 @@ static void fiber_start(unsigned int x, unsigned int y)
fiber_exit(0);
}
#define MAX_CACHE 100
int acl_fiber_ndead(void)
{
if (__thread_fiber == NULL)
return 0;
return acl_ring_size(&__thread_fiber->dead);
}
static void fiber_free(ACL_FIBER *fiber)
{
@ -406,24 +484,9 @@ static ACL_FIBER *fiber_alloc(void (*fn)(ACL_FIBER *, void *),
sigset_t zero;
union cc_arg carg;
ACL_RING *head;
size_t n;
fiber_check();
n = acl_ring_size(&__thread_fiber->dead);
/* if the cached dead fibers reached the limit, some will be freed */
if (n > MAX_CACHE) {
n -= MAX_CACHE;
while (n > 0) {
head = acl_ring_pop_head(&__thread_fiber->dead);
acl_assert(head != NULL);
fiber = ACL_RING_TO_APPL(head, ACL_FIBER,me);
fiber_free(fiber);
n--;
}
}
#define APPL ACL_RING_TO_APPL
/* try to reuse the fiber memory in dead queue */
@ -441,10 +504,12 @@ static ACL_FIBER *fiber_alloc(void (*fn)(ACL_FIBER *, void *),
fiber->arg = arg;
fiber->size = size;
fiber->id = ++__thread_fiber->idgen;
fiber->flag = 0;
carg.p = fiber;
fiber->context = (ucontext_t *) acl_mymalloc(sizeof(ucontext_t));
if (fiber->context == NULL)
fiber->context = (ucontext_t *) acl_mymalloc(sizeof(ucontext_t));
sigemptyset(&zero);
sigprocmask(SIG_BLOCK, &zero, &fiber->context->uc_sigmask);
@ -479,13 +544,17 @@ ACL_FIBER *acl_fiber_create(void (*fn)(ACL_FIBER *, void *),
ACL_FIBER *fiber = fiber_alloc(fn, arg, size);
__thread_fiber->count++;
if (__thread_fiber->size % 64 == 0)
if (__thread_fiber->slot >= __thread_fiber->size) {
__thread_fiber->size += 128;
__thread_fiber->fibers = (ACL_FIBER **) acl_myrealloc(
__thread_fiber->fibers,
(__thread_fiber->size + 64) * sizeof(ACL_FIBER *));
__thread_fiber->size * sizeof(ACL_FIBER *));
}
fiber->slot = __thread_fiber->slot;
__thread_fiber->fibers[__thread_fiber->slot++] = fiber;
fiber->slot = __thread_fiber->size;
__thread_fiber->fibers[__thread_fiber->size++] = fiber;
acl_fiber_ready(fiber);
return fiber;
@ -498,7 +567,7 @@ int acl_fiber_id(const ACL_FIBER *fiber)
int acl_fiber_self(void)
{
ACL_FIBER *curr = fiber_running();
ACL_FIBER *curr = acl_fiber_running();
return acl_fiber_id(curr);
}
@ -586,18 +655,6 @@ void acl_fiber_switch(void)
acl_assert(current);
#endif
if (current->status == FIBER_STATUS_EXITING) {
size_t slot = current->slot;
if (!current->sys)
__thread_fiber->count--;
__thread_fiber->fibers[slot] =
__thread_fiber->fibers[--__thread_fiber->size];
__thread_fiber->fibers[slot]->slot = slot;
acl_ring_append(&__thread_fiber->dead, &current->me);
}
head = acl_ring_pop_head(&__thread_fiber->ready);
if (head == NULL) {

View File

@ -29,6 +29,7 @@ struct ACL_FIBER {
int sys;
unsigned int flag;
#define FIBER_F_SAVE_ERRNO (unsigned) 1 << 0
#define FIBER_F_KILLED (unsigned) 1 << 1
FIBER_LOCAL **locals;
int nlocal;
@ -109,7 +110,6 @@ struct ACL_FIBER_SEM {
extern __thread int acl_var_hook_sys_api;
/* in fiber_schedule.c */
ACL_FIBER *fiber_running(void);
void fiber_save_errno(void);
void fiber_exit(int exit_code);
void fiber_system(void);

View File

@ -236,7 +236,7 @@ unsigned int acl_fiber_delay(unsigned int milliseconds)
ev->timeout = 1;
fiber = fiber_running();
fiber = acl_fiber_running();
fiber->when = when;
acl_ring_detach(&fiber->me);
@ -337,7 +337,7 @@ void fiber_wait_read(int fd)
return;
}
__thread_fiber->io_fibers[fd] = fiber_running();
__thread_fiber->io_fibers[fd] = acl_fiber_running();
__thread_fiber->io_count++;
acl_fiber_switch();
@ -362,7 +362,7 @@ void fiber_wait_write(int fd)
return;
}
__thread_fiber->io_fibers[fd] = fiber_running();
__thread_fiber->io_fibers[fd] = acl_fiber_running();
__thread_fiber->io_count++;
acl_fiber_switch();

View File

@ -22,14 +22,14 @@ static int __lock(ACL_FIBER_MUTEX *l, int block)
ACL_FIBER *curr;
if (l->owner == NULL){
l->owner = fiber_running();
l->owner = acl_fiber_running();
return 1;
}
if(!block)
return 0;
curr = fiber_running();
curr = acl_fiber_running();
acl_ring_prepend(&l->waiting, &curr->me);
acl_fiber_switch();
@ -105,7 +105,7 @@ static int __rlock(ACL_FIBER_RWLOCK *l, int block)
if (!block)
return 0;
curr = fiber_running();
curr = acl_fiber_running();
acl_ring_prepend(&l->rwaiting, &curr->me);
acl_fiber_switch();
@ -127,14 +127,14 @@ static int __wlock(ACL_FIBER_RWLOCK *l, int block)
ACL_FIBER *curr;
if (l->writer == NULL && l->readers == 0) {
l->writer = fiber_running();
l->writer = acl_fiber_running();
return 1;
}
if (!block)
return 0;
curr = fiber_running();
curr = acl_fiber_running();
acl_ring_prepend(&l->wwaiting, &curr->me);
acl_fiber_switch();

View File

@ -26,7 +26,7 @@ int acl_fiber_sem_wait(ACL_FIBER_SEM *sem)
return sem->num;
}
curr = fiber_running();
curr = acl_fiber_running();
acl_ring_prepend(&sem->waiting, &curr->me);
acl_fiber_switch();

View File

@ -209,6 +209,7 @@ inline ssize_t fiber_read(int fd, void *buf, size_t count)
{
ssize_t ret;
EVENT *ev;
ACL_FIBER *me;
if (fd < 0) {
acl_msg_error("%s: invalid fd: %d", __FUNCTION__, fd);
@ -233,8 +234,16 @@ inline ssize_t fiber_read(int fd, void *buf, size_t count)
event_clear_readable(ev, fd);
ret = __sys_read(fd, buf, count);
if (ret < 0)
fiber_save_errno();
if (ret >= 0)
return ret;
fiber_save_errno();
me = acl_fiber_running();
if ((me->flag & FIBER_F_KILLED) != 0)
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__, acl_fiber_id(me));
return ret;
}
@ -242,6 +251,7 @@ inline ssize_t fiber_readv(int fd, const struct iovec *iov, int iovcnt)
{
ssize_t ret;
EVENT *ev;
ACL_FIBER *me;
if (fd < 0) {
acl_msg_error("%s: invalid fd: %d", __FUNCTION__, fd);
@ -266,8 +276,16 @@ inline ssize_t fiber_readv(int fd, const struct iovec *iov, int iovcnt)
event_clear_readable(ev, fd);
ret = __sys_readv(fd, iov, iovcnt);
if (ret < 0)
fiber_save_errno();
if (ret >= 0)
return ret;
fiber_save_errno();
me = acl_fiber_running();
if ((me->flag & FIBER_F_KILLED) != 0)
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__, acl_fiber_id(me));
return ret;
}
@ -275,6 +293,7 @@ inline ssize_t fiber_recv(int sockfd, void *buf, size_t len, int flags)
{
ssize_t ret;
EVENT *ev;
ACL_FIBER *me;
if (sockfd < 0) {
acl_msg_error("%s: invalid sockfd: %d", __FUNCTION__, sockfd);
@ -298,9 +317,18 @@ inline ssize_t fiber_recv(int sockfd, void *buf, size_t len, int flags)
if (ev)
event_clear_readable(ev, sockfd);
ret = __sys_recv(sockfd, buf, len, flags);
if (ret < 0)
fiber_save_errno();
if (ret >= 0)
return ret;
fiber_save_errno();
me = acl_fiber_running();
if ((me->flag & FIBER_F_KILLED) != 0)
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__, acl_fiber_id(me));
return ret;
}
@ -309,6 +337,7 @@ inline ssize_t fiber_recvfrom(int sockfd, void *buf, size_t len, int flags,
{
ssize_t ret;
EVENT *ev;
ACL_FIBER *me;
if (sockfd < 0) {
acl_msg_error("%s: invalid sockfd: %d", __FUNCTION__, sockfd);
@ -335,8 +364,18 @@ inline ssize_t fiber_recvfrom(int sockfd, void *buf, size_t len, int flags,
event_clear_readable(ev, sockfd);
ret = __sys_recvfrom(sockfd, buf, len, flags, src_addr, addrlen);
if (ret < 0)
fiber_save_errno();
if (ret >= 0)
return ret;
fiber_save_errno();
me = acl_fiber_running();
if ((me->flag & FIBER_F_KILLED) != 0) {
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__, acl_fiber_id(me));
return -1;
}
return ret;
}
@ -344,6 +383,7 @@ inline ssize_t fiber_recvmsg(int sockfd, struct msghdr *msg, int flags)
{
ssize_t ret;
EVENT *ev;
ACL_FIBER *me;
if (sockfd < 0) {
acl_msg_error("%s: invalid sockfd: %d", __FUNCTION__, sockfd);
@ -368,8 +408,16 @@ inline ssize_t fiber_recvmsg(int sockfd, struct msghdr *msg, int flags)
event_clear_readable(ev, sockfd);
ret = __sys_recvmsg(sockfd, msg, flags);
if (ret < 0)
fiber_save_errno();
if (ret >= 0)
return ret;
fiber_save_errno();
me = acl_fiber_running();
if ((me->flag & FIBER_F_KILLED) != 0)
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__, acl_fiber_id(me));
return ret;
}
@ -377,6 +425,8 @@ inline ssize_t fiber_recvmsg(int sockfd, struct msghdr *msg, int flags)
inline ssize_t fiber_read(int fd, void *buf, size_t count)
{
ACL_FIBER *me;
while (1) {
ssize_t n = __sys_read(fd, buf, count);
@ -395,11 +445,19 @@ inline ssize_t fiber_read(int fd, void *buf, size_t count)
#endif
return -1;
fiber_wait_read(fd);
me = acl_fiber_running();
if ((me->flag & FIBER_F_KILLED) != 0)
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__,
acl_fiber_id(me));
}
}
inline ssize_t fiber_readv(int fd, const struct iovec *iov, int iovcnt)
{
ACL_FIBER *me;
while (1) {
ssize_t n = __sys_readv(fd, iov, iovcnt);
@ -419,11 +477,19 @@ inline ssize_t fiber_readv(int fd, const struct iovec *iov, int iovcnt)
return -1;
fiber_wait_read(fd);
me = acl_fiber_running();
if ((me->flag & FIBER_F_KILLED) != 0)
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__,
acl_fiber_id(me));
}
}
inline ssize_t fiber_recv(int sockfd, void *buf, size_t len, int flags)
{
ACL_FIBER *me;
while (1) {
ssize_t n = __sys_recv(sockfd, buf, len, flags);
@ -443,12 +509,20 @@ inline ssize_t fiber_recv(int sockfd, void *buf, size_t len, int flags)
return -1;
fiber_wait_read(sockfd);
me = acl_fiber_running();
if ((me->flag & FIBER_F_KILLED) != 0)
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__,
acl_fiber_id(me));
}
}
inline ssize_t fiber_recvfrom(int sockfd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen)
{
ACL_FIBER *me;
while (1) {
ssize_t n = __sys_recvfrom(sockfd, buf, len, flags,
src_addr, addrlen);
@ -469,11 +543,19 @@ inline ssize_t fiber_recvfrom(int sockfd, void *buf, size_t len, int flags,
return -1;
fiber_wait_read(sockfd);
me = acl_fiber_running();
if ((me->flag & FIBER_F_KILLED) != 0)
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__,
acl_fiber_id(me));
}
}
inline ssize_t fiber_recvmsg(int sockfd, struct msghdr *msg, int flags)
{
ACL_FIBER *me;
while (1) {
ssize_t n = __sys_recvmsg(sockfd, msg, flags);
@ -493,6 +575,12 @@ inline ssize_t fiber_recvmsg(int sockfd, struct msghdr *msg, int flags)
return -1;
fiber_wait_read(sockfd);
me = acl_fiber_running();
if ((me->flag & FIBER_F_KILLED) != 0)
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__,
acl_fiber_id(me));
}
}
@ -502,6 +590,8 @@ inline ssize_t fiber_recvmsg(int sockfd, struct msghdr *msg, int flags)
inline ssize_t fiber_write(int fd, const void *buf, size_t count)
{
ACL_FIBER *me;
while (1) {
ssize_t n = __sys_write(fd, buf, count);
@ -521,11 +611,19 @@ inline ssize_t fiber_write(int fd, const void *buf, size_t count)
return -1;
fiber_wait_write(fd);
me = acl_fiber_running();
if ((me->flag & FIBER_F_KILLED) != 0)
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__,
acl_fiber_id(me));
}
}
inline ssize_t fiber_writev(int fd, const struct iovec *iov, int iovcnt)
{
ACL_FIBER *me;
while (1) {
ssize_t n = __sys_writev(fd, iov, iovcnt);
@ -545,11 +643,19 @@ inline ssize_t fiber_writev(int fd, const struct iovec *iov, int iovcnt)
return -1;
fiber_wait_write(fd);
me = acl_fiber_running();
if ((me->flag & FIBER_F_KILLED) != 0)
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__,
acl_fiber_id(me));
}
}
inline ssize_t fiber_send(int sockfd, const void *buf, size_t len, int flags)
{
ACL_FIBER *me;
while (1) {
ssize_t n = __sys_send(sockfd, buf, len, flags);
@ -569,12 +675,20 @@ inline ssize_t fiber_send(int sockfd, const void *buf, size_t len, int flags)
return -1;
fiber_wait_write(sockfd);
me = acl_fiber_running();
if ((me->flag & FIBER_F_KILLED) != 0)
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__,
acl_fiber_id(me));
}
}
inline ssize_t fiber_sendto(int sockfd, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen)
{
ACL_FIBER *me;
while (1) {
ssize_t n = __sys_sendto(sockfd, buf, len, flags,
dest_addr, addrlen);
@ -595,11 +709,19 @@ inline ssize_t fiber_sendto(int sockfd, const void *buf, size_t len, int flags,
return -1;
fiber_wait_write(sockfd);
me = acl_fiber_running();
if ((me->flag & FIBER_F_KILLED) != 0)
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__,
acl_fiber_id(me));
}
}
inline ssize_t fiber_sendmsg(int sockfd, const struct msghdr *msg, int flags)
{
ACL_FIBER *me;
while (1) {
ssize_t n = __sys_sendmsg(sockfd, msg, flags);
@ -619,6 +741,12 @@ inline ssize_t fiber_sendmsg(int sockfd, const struct msghdr *msg, int flags)
return -1;
fiber_wait_write(sockfd);
me = acl_fiber_running();
if ((me->flag & FIBER_F_KILLED) != 0)
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__,
acl_fiber_id(me));
}
}

View File

@ -127,6 +127,7 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
{
int clifd;
EVENT *ev;
ACL_FIBER *me = acl_fiber_running();
if (sockfd < 0) {
acl_msg_error("%s: invalid sockfd %d", __FUNCTION__, sockfd);
@ -163,6 +164,12 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
if (ev)
event_clear_readable(ev, sockfd);
if ((me->flag & FIBER_F_KILLED) != 0) {
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__, acl_fiber_id(me));
return -1;
}
clifd = __sys_accept(sockfd, addr, addrlen);
if (clifd >= 0) {
@ -190,6 +197,12 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
if (ev)
event_clear_readable(ev, sockfd);
if ((me->flag & FIBER_F_KILLED) != 0) {
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__, acl_fiber_id(me));
return -1;
}
clifd = __sys_accept(sockfd, addr, addrlen);
if (clifd >= 0) {
@ -207,6 +220,7 @@ int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen)
{
int err;
socklen_t len = sizeof(err);
ACL_FIBER *me = acl_fiber_running();
if (!acl_var_hook_sys_api)
return __sys_connect(sockfd, addr, addrlen);
@ -258,6 +272,12 @@ int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen)
fiber_wait_write(sockfd);
if ((me->flag & FIBER_F_KILLED) != 0) {
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__, acl_fiber_id(me));
return -1;
}
ret = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, (char *) &err, &len);
if (ret == 0 && err == 0)
{
@ -365,7 +385,7 @@ int poll(struct pollfd *fds, nfds_t nfds, int timeout)
ev = fiber_io_event();
pe.fds = fds;
pe.nfds = nfds;
pe.fiber = fiber_running();
pe.fiber = acl_fiber_running();
pe.proc = poll_callback;
SET_TIME(begin);
@ -375,8 +395,17 @@ int poll(struct pollfd *fds, nfds_t nfds, int timeout)
fiber_io_inc();
acl_fiber_switch();
if ((pe.fiber->flag & FIBER_F_KILLED) != 0) {
acl_ring_detach(&pe.me);
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__,
acl_fiber_id(pe.fiber));
break;
}
if (acl_ring_size(&ev->poll_list) == 0)
ev->timeout = -1;
if (pe.nready != 0 || timeout == 0)
break;
@ -775,7 +804,7 @@ int epoll_wait(int epfd, struct epoll_event *events,
ee->events = events;
ee->maxevents = maxevents;
ee->fiber = fiber_running();
ee->fiber = acl_fiber_running();
ee->proc = epoll_callback;
SET_TIME(begin);
@ -786,6 +815,15 @@ int epoll_wait(int epfd, struct epoll_event *events,
acl_fiber_switch();
ev->timeout = -1;
if ((ee->fiber->flag & FIBER_F_KILLED) != 0) {
acl_ring_detach(&ee->me);
acl_msg_info("%s(%d), %s: fiber-%d is existing",
__FILE__, __LINE__, __FUNCTION__,
acl_fiber_id(ee->fiber));
break;
}
if (ee->nready != 0 || timeout == 0)
break;

View File

@ -327,7 +327,7 @@ static void fiber_accept_main(ACL_FIBER *fiber, void *ctx)
acl_msg_warn("accept connection: %s(%d, %d), stoping ...",
acl_last_serror(), errno, ACL_EAGAIN);
server_abort(fiber_running());
server_abort(acl_fiber_running());
}
acl_msg_info("%s(%d), %s: fiber-%d exit now", __FILE__, __LINE__,

View File

@ -1,4 +1,12 @@
30) 2016.11.27
30.1) bugfix: fiber.c 中函数 acl_fiber_create 中存在运行时内存增长问题,即
__thread_fiber->fibers 对象在 realloc 过程中内存不断增长,因为 __thread_fier->size
计算方法有问题
29) 2016.11.24
29.1) feature: 增加 acl_fiber_kill 接口用来通知指定协程退出
28) 2016.10.31
28.1) feature: fiber.c 中实现了 __i386__ 下 SETJMPLONGJMP 的汇编方式,从
libdill 借鉴而来

View File

@ -1,2 +1,19 @@
include ../Makefile_cpp.in
PROG = chat
.PHONEY = all clean cl rebuild rb
all:
@(cd server; make)
@(cd client; make)
@(cd reader; make)
@(cd writer; make)
@(cd connect; make)
@(cd demo; make)
clean cl:
@(cd server; make clean)
@(cd client; make clean)
@(cd reader; make clean)
@(cd writer; make clean)
@(cd connect; make clean)
@(cd demo; make clean)
rebuild rb: cl all

View File

@ -0,0 +1,126 @@
CC = g++
CFLAGS = -c -g -W -Wall -Wcast-qual -Wcast-align \
-Waggregate-return -Wno-long-long \
-Wpointer-arith -Werror -Wshadow -O3 \
-D_REENTRANT -D_POSIX_PTHREAD_SEMANTICS -D_USE_FAST_MACRO
###########################################################
#Check system:
# Linux, SunOS, Solaris, BSD variants, AIX, HP-UX
SYSLIB = -ldl -lpthread -lz
CHECKSYSRES = @echo "Unknow system type!";exit 1
UNIXNAME = $(shell uname -sm)
OSTYPE = $(shell uname -p)
RPATH = linux64
ifeq ($(CC),)
CC = gcc
endif
# For FreeBSD
ifeq ($(findstring FreeBSD, $(UNIXNAME)), FreeBSD)
ifeq ($(findstring gcc, $(CC)), gcc)
CFLAGS += -Wstrict-prototypes
endif
CFLAGS += -DFREEBSD -D_REENTRANT
SYSLIB = -lcrypt -lpthread -lz
RPATH = freebsd
endif
# For Darwin
ifeq ($(findstring Darwin, $(UNIXNAME)), Darwin)
CFLAGS += -DMACOSX -Wno-invalid-source-encoding \
-Wno-extended-offsetof
UNIXTYPE = MACOSX
SYSLIB += -liconv -rdynamic
RPATH = macos
endif
#Path for Linux
ifeq ($(findstring Linux, $(UNIXNAME)), Linux)
ifeq ($CC, "gcc")
CFLAGS += -Wstrict-prototypes
endif
ifeq ($(findstring i686, $(OSTYPE)), i686)
RPATH = linux32
endif
ifeq ($(findstring x86_64, $(OSTYPE)), x86_64)
RPATH = linux64
endif
CFLAGS += -DLINUX2 -D_REENTRANT
SYSLIB += -lcrypt
endif
#Path for SunOS
ifeq ($(findstring SunOS, $(UNIXNAME)), SunOS)
ifeq ($(findstring 86, $(UNIXNAME)), 86)
SYSLIB += -lsocket -lnsl -lrt
endif
ifeq ($(findstring sun4u, $(UNIXNAME)), sun4u)
SYSLIB += -lsocket -lnsl -lrt
endif
ifeq ($CC, "gcc")
CFLAGS += -Wstrict-prototypes
endif
CFLAGS += -DSUNOS5 -D_REENTRANT
RPATH = sunos_x86
endif
#Path for HP-UX
ifeq ($(findstring HP-UX, $(UNIXNAME)), HP-UX)
ifeq ($CC, "gcc")
CFLAGS += -Wstrict-prototypes
endif
CFLAGS += -DHP_UX -DHPUX11
PLAT_NAME=hp-ux
endif
#Find system type.
ifneq ($(SYSPATH),)
CHECKSYSRES = @echo "System is $(shell uname -sm)"
endif
###########################################################
CFLAGS += -I../.. -I../../../../lib_acl/include \
-I../../../../lib_protocol/include \
-I../../../../lib_acl_cpp/include \
-I../../../../lib_fiber/c/include \
-I../../../../lib_fiber/cpp/include
EXTLIBS =
LDFLAGS = -L../../../../lib_fiber/lib -l_fiber_cpp -l_fiber \
-L../../../../lib_acl_cpp/lib -l_acl_cpp \
-L../../../../lib_protocol/lib -l_protocol \
-L../../../../lib_acl/lib -l_acl \
$(EXTLIBS) $(SYSLIB)
COMPILE = $(CC) $(CFLAGS)
LINK = $(CC) $(OBJ) $(LDFLAGS)
###########################################################
OBJ_PATH = .
#Project's objs
SRC = $(wildcard *.cpp)
OBJ = $(patsubst %.cpp, $(OBJ_PATH)/%.o, $(notdir $(SRC)))
$(OBJ_PATH)/%.o: %.cpp
$(COMPILE) $< -o $@
.PHONY = all clean
all: RM $(OBJ)
$(LINK) -o $(PROG)
@echo ""
@echo "All ok! Output:$(PROG)"
@echo ""
RM:
rm -f $(PROG)
clean cl:
rm -f $(PROG)
rm -f $(OBJ)
rebuild rb: clean all
install:
cp $(PROG) ../../../dist/master/libexec/$(RPATH)/
cp $(PROG).cf ../../../dist/master/conf/service/
###########################################################

View File

@ -0,0 +1,2 @@
include ../Makefile.in
PROG = chat_client

View File

@ -0,0 +1,213 @@
#include "stdafx.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "fiber/lib_fiber.hpp"
#include "stamp.h"
#include "user_client.h"
#define STACK_SIZE 128000
static const char* __user_prefix = "user";
static int __rw_timeout = 0;
static int __max_client = 10;
static int __cur_client = 10;
static struct timeval __begin;
static int __total_read = 0;
static bool client_login(user_client* uc)
{
acl::string buf;
buf.format("login|%s\r\n", uc->get_name());
if (uc->get_stream().write(buf) == -1)
{
printf("login failed error, user: %s\r\n", uc->get_name());
return false;
}
if (uc->get_stream().gets(buf) == false)
{
printf("login failed, gets error %s\r\n", acl::last_serror());
return false;
}
return true;
}
static void client_logout(user_client* client)
{
client->notify_exit();
}
static void fiber_reader(user_client* client)
{
acl::socket_stream& conn = client->get_stream();
conn.set_rw_timeout(5);
if (client_login(client) == false)
{
client_logout(client);
return;
}
printf("fiber-%d: user: %s login OK\r\n", acl_fiber_self(),
client->get_name());
acl::string buf;
const char* to = client->get_to();
acl::string msg;
int max_loop = client->get_max_loop(), i = 0, n = 0;
for (;;)
{
msg.format("chat|%s|hello world\r\n", to);
if (conn.write(msg) != (int) msg.size())
{
printf("fiber-%d: msg(%s) write error %s\r\n",
acl_fiber_self(), msg.c_str(),
acl::last_serror());
}
if (conn.gets(buf))
{
if (++i <= 1)
printf("fiber-%d: gets->%s\r\n",
acl_fiber_self(), buf.c_str());
n++;
__total_read++;
if (n == max_loop)
break;
continue;
}
printf("%s(%d): user: %s, gets error %s, fiber: %d\r\n",
__FUNCTION__, __LINE__, client->get_name(),
acl::last_serror(), acl_fiber_self());
if (client->existing())
{
printf("----existing now----\r\n");
break;
}
if (errno == ETIMEDOUT)
printf("ETIMEDOUT\r\n");
else if (errno == EAGAIN)
printf("EAGAIN\r\n");
else {
printf("gets error: %d, %s\r\n",
errno, acl::last_serror());
break;
}
}
printf(">>%s(%d), user: %s, logout, loop: %d, ngets: %d\r\n",
__FUNCTION__, __LINE__, client->get_name(), i, n);
client_logout(client);
}
static void fiber_client(const char* addr, const char* user,
const char* to, int loop)
{
acl::socket_stream conn;
if (conn.open(addr, 0, 10) == false)
{
printf("connect %s error %s\r\n", addr, acl::last_serror());
}
else
{
printf("fiber-%d, connect %s ok\r\n", acl_fiber_self(), addr);
user_client* client = new user_client(conn, user, to, loop);
go[=] {
fiber_reader(client);
};
client->wait_exit();
printf("--- client %s exit now ----\r\n", client->get_name());
delete client;
}
__cur_client--;
if (__cur_client == 0)
{
printf("-----All fibers over!-----\r\n");
struct timeval end;
gettimeofday(&end, NULL);
double spent = stamp_sub(&end, &__begin);
printf("---Total read: %d, spent: %.2f, speed: %.2f---\r\n",
__total_read, spent,
(1000 * __total_read) / (spent < 1 ? 1 : spent));
acl_fiber_schedule_stop();
}
}
static void usage(const char *procname)
{
printf("usage: %s -h [help]\r\n"
" -s server_addr\r\n"
" -n max_loop\r\n"
" -c max_client\r\n"
" -r rw_timeout\r\n" , procname);
}
int main(int argc, char *argv[])
{
char addr[64];
int ch, max_loop = 100;
acl::log::stdout_open(true);
snprintf(addr, sizeof(addr), "%s", "127.0.0.1:9002");
while ((ch = getopt(argc, argv, "hs:r:c:n:")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
return 0;
case 's':
snprintf(addr, sizeof(addr), "%s", optarg);
break;
case 'r':
__rw_timeout = atoi(optarg);
break;
case 'c':
__max_client = atoi(optarg);
break;
case 'n':
max_loop = atoi(optarg);
break;
default:
break;
}
}
__cur_client = __max_client;
gettimeofday(&__begin, NULL);
for (int i = 0; i < __max_client; i++)
{
char user[128], to[128];
snprintf(user, sizeof(user), "%s-%d", __user_prefix, i);
if (i + 1 == __max_client)
snprintf(to, sizeof(to), "%s-0", __user_prefix);
else
snprintf(to, sizeof(to), "%s-%d", __user_prefix, i);
go[=] {
fiber_client(addr, user, to, max_loop);
};
}
acl::fiber::schedule();
return 0;
}

View File

@ -0,0 +1,8 @@
// stdafx.cpp : 只包括标准包含文件的源文件
// master_threads.pch 将成为预编译头
// stdafx.obj 将包含预编译类型信息
#include "stdafx.h"
// TODO: 在 STDAFX.H 中
//引用任何所需的附加头文件,而不是在此文件中引用

View File

@ -0,0 +1,20 @@
// stdafx.h : 标准系统包含文件的包含文件,
// 或是常用但不常更改的项目特定的包含文件
//
#pragma once
//#include <iostream>
//#include <tchar.h>
// TODO: 在此处引用程序要求的附加头文件
#include "lib_acl.h"
#include "fiber/lib_fiber.h"
#include "acl_cpp/lib_acl.hpp"
#ifdef WIN32
#define snprintf _snprintf
#endif

View File

@ -0,0 +1,63 @@
#pragma once
#include <list>
class user_client
{
public:
user_client(acl::socket_stream& conn, const char* user,
const char* to, int max_loop)
: conn_(conn), name_(user), to_(to), max_loop_(max_loop) {}
~user_client(void)
{
}
acl::socket_stream& get_stream(void) const
{
return conn_;
}
const char* get_name(void) const
{
return name_.c_str();
}
const char* get_to(void) const
{
return to_.c_str();
}
int get_max_loop(void) const
{
return max_loop_;
}
bool existing(void) const
{
return existing_;
}
void set_existing(void)
{
existing_ = true;
}
void wait_exit(void)
{
int mtype;
chan_exit_.pop(mtype);
}
void notify_exit(void)
{
int mtype = 1;
chan_exit_.put(mtype);
}
private:
acl::socket_stream& conn_;
acl::channel<int> chan_exit_;
acl::string name_;
acl::string to_;
int max_loop_;
bool existing_ = false;
};

View File

@ -0,0 +1,3 @@
#!/bin/sh
valgrind --tool=memcheck --leak-check=yes --leak-check=full --show-reachable=yes -v ./chat_client

View File

@ -0,0 +1,2 @@
include ../Makefile.in
PROG = client

View File

@ -0,0 +1,113 @@
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <sys/socket.h>
#include "lib_acl.h"
#include "acl_cpp/lib_acl.hpp"
static void usage(const char *procname)
{
printf("usage: %s -h [help]\r\n"
" -s server_ip\r\n"
" -n max_loop\r\n", procname);
}
struct A {
int m;
ACL_RING entry;
};
static void test(void)
{
ACL_RING ring;
acl_ring_init(&ring);
for (int i = 0; i < 10; i++)
{
A* a = new A;
a->m = i;
acl_ring_prepend(&ring, &a->entry);
printf("put %d\r\n", i);
}
printf("-------------------------------------------------------\r\n");
while (true)
{
ACL_RING* head = acl_ring_pop_head(&ring);
if (head == NULL)
break;
A* a = ACL_RING_TO_APPL(head, A, entry);
printf("pop %d\r\n", a->m);
delete a;
}
exit (0);
}
int main(int argc, char *argv[])
{
int ch, max_loop = 100, n = 100;
acl::string addr("127.0.0.1:9002");
if (0)
test();
signal(SIGPIPE, SIG_IGN);
while ((ch = getopt(argc, argv, "hn:s:l:")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
return 0;
case 'n':
max_loop = atoi(optarg);
break;
case 'l':
n = atoi(optarg);
break;
case 's':
addr = optarg;
break;
default:
break;
}
}
for (int j = 0; j < n; j++)
{
std::vector<acl::socket_stream*> conns;
int i = 0;
for (; i < max_loop; i++)
{
acl::socket_stream* conn = new acl::socket_stream;
if (conn->open(addr, 0, 0) == false)
{
printf("connect %s error %s\r\n",
addr.c_str(), acl::last_serror());
delete conn;
break;
}
conns.push_back(conn);
}
printf("i: %d, max_loop: %d\r\n", i, max_loop);
sleep(1);
for (std::vector<acl::socket_stream*>::iterator
it = conns.begin(); it != conns.end(); ++it)
{
//int sock = (*it)->sock_handle();
//shutdown(sock, SHUT_RDWR);
delete *it;
}
conns.clear();
}
return 0;
}

View File

@ -0,0 +1,2 @@
include ../Makefile.in
PROG = chat

View File

@ -0,0 +1,330 @@
#include "stdafx.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "fiber/lib_fiber.h"
#include "user_client.h"
#define STACK_SIZE 128000
static void client_read_callback(int type, ACL_EVENT *event,
ACL_VSTREAM *conn, void *ctx);
static int __rw_timeout = 0;
static int __stop = 0;
static int __use_kernel = 0;
static std::map<acl::string, user_client*> __users;
static void remove_user(user_client* uc)
{
const char* name = uc->get_name();
if (name == NULL || *name == 0)
{
printf("no name!\r\n");
return;
}
std::map<acl::string, user_client*>::iterator it = __users.find(name);
if (it == __users.end())
printf("not exist, name: %s\r\n", name);
else
{
__users.erase(it);
printf("delete user ok, name: %s\r\n", name);
}
}
static bool client_login(user_client* uc, ACL_ARGV* tokens)
{
if (tokens->argc < 2)
{
acl::string tmp;
tmp.format("invalid argc: %d < 2\r\n", tokens->argc);
printf("%s", tmp.c_str());
return acl_vstream_writen(uc->get_stream(), tmp.c_str(),
(int) tmp.size()) != ACL_VSTREAM_EOF;
}
acl::string msg;
const char* name = tokens->argv[1];
std::map<acl::string, user_client*>::iterator it = __users.find(name);
if (it == __users.end())
{
__users[name] = uc;
uc->set_name(name);
msg.format("user %s login ok\r\n", name);
}
else
msg.format("user %s already login\r\n", name);
printf("%s", msg.c_str());
return acl_vstream_writen(uc->get_stream(), msg.c_str(),
(int) msg.size()) != ACL_VSTREAM_EOF;
}
static bool client_chat(user_client* uc, ACL_ARGV* tokens)
{
if (tokens->argc < 3)
{
acl::string tmp;
tmp.format("invalid argc: %d < 3\r\n", tokens->argc);
printf("%s", tmp.c_str());
return acl_vstream_writen(uc->get_stream(), tmp.c_str(),
(int) tmp.size()) != ACL_VSTREAM_EOF;
}
const char* to = tokens->argv[1];
const char* msg = tokens->argv[2];
std::map<acl::string, user_client*>::iterator it = __users.find(to);
if (it == __users.end())
{
acl::string tmp;
tmp.format("from user: %s, to user: %s not exist\r\n",
uc->get_name(), to);
printf("%s", tmp.c_str());
return acl_vstream_writen(uc->get_stream(), tmp.c_str(),
(int) tmp.size()) != ACL_VSTREAM_EOF;
}
else
{
it->second->push(msg);
return true;
}
}
static void fiber_client_read(ACL_FIBER *fiber acl_unused, void *ctx)
{
user_client* uc = (user_client*) ctx;
ACL_VSTREAM *conn = uc->get_stream();
ACL_EVENT *event = uc->get_event();
char buf[8192];
int ret = acl_vstream_gets_nonl(conn, buf, sizeof(buf) - 1);
printf(">>>ret: %d\r\n", ret);
if (ret == ACL_VSTREAM_EOF)
{
if (uc->already_login())
remove_user(uc);
acl_vstream_close(conn);
delete uc;
return;
}
buf[ret] = 0;
printf("gets: %s\r\n", buf);
bool status;
ACL_ARGV* tokens = acl_argv_split(buf, "|");
if (strcasecmp(tokens->argv[0], "login") == 0)
{
if (uc->already_login())
{
status = true;
printf("user: %s already logined\r\n", uc->get_name());
}
else
status = client_login(uc, tokens);
}
else if (!uc->already_login())
{
acl::string tmp;
tmp.format("you're not login yet!\r\n");
printf("%s", tmp.c_str());
status = acl_vstream_writen(conn, tmp.c_str(), (int) tmp.size())
== ACL_VSTREAM_EOF ? false : true;
}
else if (strcasecmp(tokens->argv[0], "chat") == 0)
status = client_chat(uc, tokens);
else
{
acl::string tmp;
tmp.format("invalid data: %s\r\n", buf);
printf("%s", tmp.c_str());
status = acl_vstream_writen(conn, tmp.c_str(), (int) tmp.size())
== ACL_VSTREAM_EOF ? false : true;
}
acl_argv_free(tokens);
if (status == false)
{
if (uc->already_login())
remove_user(uc);
acl_vstream_close(conn);
delete uc;
return;
}
acl_event_enable_read(event, conn, 120, client_read_callback, uc);
}
static void client_read_callback(int type acl_unused, ACL_EVENT *event,
ACL_VSTREAM *conn, void *ctx)
{
user_client* uc = (user_client*) ctx;
acl_event_disable_readwrite(event, conn);
acl_fiber_create(fiber_client_read, uc, STACK_SIZE);
}
static void listen_callback(int type acl_unused, ACL_EVENT *event,
ACL_VSTREAM *sstream, void *ctx acl_unused)
{
char ip[64];
ACL_VSTREAM *conn = acl_vstream_accept(sstream, ip, sizeof(ip));
if (conn == NULL) {
printf("accept error %s\r\n", acl_last_serror());
return;
}
printf(">>>accept one, fd: %d\r\n", ACL_VSTREAM_SOCK(conn));
user_client* uc = new user_client(event, conn);
acl_event_enable_read(event, conn, 120, client_read_callback, uc);
}
static void fiber_flush(ACL_FIBER* fiber acl_unused, void* ctx)
{
user_client* uc = (user_client*) ctx;
ACL_EVENT* event = uc->get_event();
ACL_VSTREAM* conn = uc->get_stream();
acl::string* msg;
uc->set_busy(true);
while ((msg = uc->pop()) != NULL)
{
if (acl_vstream_writen(conn, msg->c_str(), (int) msg->size())
== ACL_VSTREAM_EOF)
{
printf("flush to user: %s error\r\n", uc->get_name());
delete msg;
break;
// don't delete uc here, which will be remove in
// read callback
//acl_vstream_close(conn);
//delete uc;
//return;
}
}
uc->set_busy(false);
acl_event_enable_read(event, conn, 120, client_read_callback, uc);
}
static void fflush_all(void)
{
std::map<acl::string, user_client*>::iterator it = __users.begin();
std::map<acl::string, user_client*>::iterator next = it;
for (; it != __users.end(); it = next)
{
++next;
if (it->second->empty() || it->second->is_busy())
continue;
ACL_EVENT* event = it->second->get_event();
ACL_VSTREAM* conn = it->second->get_stream();
acl_event_disable_readwrite(event, conn);
acl_fiber_create(fiber_flush, it->second, STACK_SIZE);
}
}
static void fiber_event(ACL_FIBER *fiber acl_unused, void *ctx)
{
ACL_VSTREAM *sstream = (ACL_VSTREAM *) ctx;
ACL_EVENT *event;
time_t last_ping = time(NULL);
if (__use_kernel)
event = acl_event_new(ACL_EVENT_KERNEL, 0, 0, 100000);
else
event = acl_event_new(ACL_EVENT_POLL, 0, 0, 100000);
printf(">>>enable read fd: %d\r\n", ACL_VSTREAM_SOCK(sstream));
acl_event_enable_listen(event, sstream, 0, listen_callback, NULL);
while (!__stop)
{
acl_event_loop(event);
fflush_all();
if (time(NULL) - last_ping >= 1)
{
//ping_all();
last_ping = time(NULL);
}
}
acl_vstream_close(sstream);
acl_event_free(event);
acl_fiber_schedule_stop();
}
static void usage(const char *procname)
{
printf("usage: %s -h [help] -s listen_addr\r\n"
" -r rw_timeout\r\n"
" -R [use real http]\r\n"
" -k [use kernel event]\r\n", procname);
}
int main(int argc, char *argv[])
{
char addr[64];
ACL_VSTREAM *sstream;
int ch;
acl::log::stdout_open(true);
snprintf(addr, sizeof(addr), "%s", "127.0.0.1:9002");
while ((ch = getopt(argc, argv, "hs:r:k")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
return 0;
case 's':
snprintf(addr, sizeof(addr), "%s", optarg);
break;
case 'r':
__rw_timeout = atoi(optarg);
break;
case 'k':
__use_kernel = 1;
break;
default:
break;
}
}
sstream = acl_vstream_listen(addr, 128);
if (sstream == NULL) {
printf("acl_vstream_listen error %s\r\n", acl_last_serror());
return 1;
}
printf("listen %s ok\r\n", addr);
printf("%s: call fiber_creater\r\n", __FUNCTION__);
acl_fiber_create(fiber_event, sstream, STACK_SIZE);
printf("call fiber_schedule\r\n");
acl_fiber_schedule();
return 0;
}

View File

@ -0,0 +1,8 @@
// stdafx.cpp : 只包括标准包含文件的源文件
// master_threads.pch 将成为预编译头
// stdafx.obj 将包含预编译类型信息
#include "stdafx.h"
// TODO: 在 STDAFX.H 中
//引用任何所需的附加头文件,而不是在此文件中引用

View File

@ -0,0 +1,20 @@
// stdafx.h : 标准系统包含文件的包含文件,
// 或是常用但不常更改的项目特定的包含文件
//
#pragma once
//#include <iostream>
//#include <tchar.h>
// TODO: 在此处引用程序要求的附加头文件
#include "lib_acl.h"
#include "fiber/lib_fiber.h"
#include "acl_cpp/lib_acl.hpp"
#ifdef WIN32
#define snprintf _snprintf
#endif

View File

@ -0,0 +1,80 @@
#pragma once
#include <list>
class user_client
{
public:
user_client(ACL_EVENT*event, ACL_VSTREAM* conn)
: event_(event), conn_(conn), busy_(false) {}
~user_client(void)
{
for (std::list<acl::string*>::iterator it = messages_.begin();
it != messages_.end(); ++it)
{
delete *it;
}
}
ACL_EVENT* get_event(void) const
{
return event_;
}
ACL_VSTREAM* get_stream(void) const
{
return conn_;
}
bool already_login(void) const
{
return !name_.empty();
}
bool empty(void) const
{
return messages_.empty();
}
acl::string* pop(void)
{
if (messages_.empty())
return NULL;
acl::string* msg = messages_.front();
messages_.pop_front();
return msg;
}
void push(const char* msg)
{
acl::string* buf = new acl::string(msg);
(*buf) << "\r\n";
messages_.push_back(buf);
}
void set_name(const char* name)
{
name_ = name;
}
const char* get_name(void) const
{
return name_.c_str();
}
bool is_busy(void) const
{
return busy_;
}
void set_busy(bool yes)
{
busy_ = yes;
}
private:
ACL_EVENT* event_;
ACL_VSTREAM* conn_;
acl::string name_;
std::list<acl::string*> messages_;
bool busy_;
};

View File

@ -0,0 +1,2 @@
include ../Makefile.in
PROG = reader

View File

@ -0,0 +1,180 @@
#include "stdafx.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "fiber/lib_fiber.hpp"
#include "stamp.h"
#include "user_client.h"
#define STACK_SIZE 128000
static const char* __user_prefix = "reader";
static int __rw_timeout = 0;
static int __max_client = 10;
static int __cur_client = 10;
static struct timeval __begin;
static int __total_read = 0;
static bool client_login(user_client* uc)
{
acl::string buf;
buf.format("login|%s\r\n", uc->get_name());
if (uc->get_stream().write(buf) == -1)
{
printf("login failed error, user: %s\r\n", uc->get_name());
return false;
}
if (uc->get_stream().gets(buf) == false)
{
printf("login failed, gets error %s\r\n", acl::last_serror());
return false;
}
return true;
}
static void client_logout(user_client* client)
{
client->notify_exit();
}
static void fiber_reader(user_client* client)
{
acl::socket_stream& conn = client->get_stream();
conn.set_rw_timeout(0);
if (client_login(client) == false)
{
client_logout(client);
return;
}
printf("fiber-%d: user: %s login OK\r\n", acl_fiber_self(),
client->get_name());
acl::string buf;
int max_loop = client->get_max_loop(), i;
for (i = 0; i < max_loop; ++i)
{
if (conn.gets(buf))
{
if (i <= 1)
printf("fiber-%d: gets->%s\r\n",
acl_fiber_self(), buf.c_str());
__total_read++;
continue;
}
printf("%s(%d): user: %s, gets error %s, fiber: %d\r\n",
__FUNCTION__, __LINE__, client->get_name(),
acl::last_serror(), acl_fiber_self());
break;
}
printf(">>%s(%d), user: %s, logout, loop: %d\r\n",
__FUNCTION__, __LINE__, client->get_name(), i);
client_logout(client);
}
static void fiber_client(const char* addr, const char* user, int max_loop)
{
acl::socket_stream conn;
if (conn.open(addr, 0, 0) == false)
{
printf("connect %s error %s\r\n", addr, acl::last_serror());
}
else
{
printf("fiber-%d, connect %s ok\r\n", acl_fiber_self(), addr);
user_client* client = new user_client(conn, user, max_loop);
go[=] {
fiber_reader(client);
};
client->wait_exit();
printf("--- client %s exit now ----\r\n", client->get_name());
delete client;
}
__cur_client--;
if (__cur_client == 0)
{
printf("-----All fibers over!-----\r\n");
struct timeval end;
gettimeofday(&end, NULL);
double spent = stamp_sub(&end, &__begin);
printf("---Total read: %d, spent: %.2f, speed: %.2f---\r\n",
__total_read, spent,
(1000 * __total_read) / (spent < 1 ? 1 : spent));
acl_fiber_schedule_stop();
}
}
static void usage(const char *procname)
{
printf("usage: %s -h [help]\r\n"
" -s server_addr\r\n"
" -n max_loop\r\n"
" -c max_client\r\n"
" -r rw_timeout\r\n" , procname);
}
int main(int argc, char *argv[])
{
char addr[64];
int ch, max_loop = 100;
acl::log::stdout_open(true);
snprintf(addr, sizeof(addr), "%s", "127.0.0.1:9002");
while ((ch = getopt(argc, argv, "hs:r:c:n:")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
return 0;
case 's':
snprintf(addr, sizeof(addr), "%s", optarg);
break;
case 'r':
__rw_timeout = atoi(optarg);
break;
case 'c':
__max_client = atoi(optarg);
break;
case 'n':
max_loop = atoi(optarg);
break;
default:
break;
}
}
__cur_client = __max_client;
gettimeofday(&__begin, NULL);
for (int i = 0; i < __max_client; i++)
{
char user[128];
snprintf(user, sizeof(user), "%s-%d", __user_prefix, i);
go[=] {
fiber_client(addr, user, max_loop);
};
}
acl::fiber::schedule();
return 0;
}

View File

@ -0,0 +1,8 @@
// stdafx.cpp : 只包括标准包含文件的源文件
// master_threads.pch 将成为预编译头
// stdafx.obj 将包含预编译类型信息
#include "stdafx.h"
// TODO: 在 STDAFX.H 中
//引用任何所需的附加头文件,而不是在此文件中引用

View File

@ -0,0 +1,20 @@
// stdafx.h : 标准系统包含文件的包含文件,
// 或是常用但不常更改的项目特定的包含文件
//
#pragma once
//#include <iostream>
//#include <tchar.h>
// TODO: 在此处引用程序要求的附加头文件
#include "lib_acl.h"
#include "fiber/lib_fiber.h"
#include "acl_cpp/lib_acl.hpp"
#ifdef WIN32
#define snprintf _snprintf
#endif

View File

@ -0,0 +1,56 @@
#pragma once
#include <list>
class user_client
{
public:
user_client(acl::socket_stream& conn, const char* user, int max_loop)
: conn_(conn), name_(user), max_loop_(max_loop) {}
~user_client(void)
{
}
acl::socket_stream& get_stream(void) const
{
return conn_;
}
const char* get_name(void) const
{
return name_.c_str();
}
int get_max_loop(void) const
{
return max_loop_;
}
bool existing(void) const
{
return existing_;
}
void set_existing(void)
{
existing_ = true;
}
void wait_exit(void)
{
int mtype;
chan_exit_.pop(mtype);
}
void notify_exit(void)
{
int mtype = 1;
chan_exit_.put(mtype);
}
private:
acl::socket_stream& conn_;
acl::channel<int> chan_exit_;
acl::string name_;
int max_loop_;
bool existing_ = false;
};

View File

@ -0,0 +1,3 @@
#!/bin/sh
valgrind --tool=memcheck --leak-check=yes --leak-check=full --show-reachable=yes -v ./chat_client

View File

@ -0,0 +1,2 @@
include ../Makefile.in
PROG = chat_server

View File

@ -0,0 +1,446 @@
#include "stdafx.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "fiber/lib_fiber.hpp"
#include "user_client.h"
#define STACK_SIZE 32000
static int __rw_timeout = 0;
static acl::channel<int> __chan_monitor;
static std::map<acl::string, user_client*> __users;
static void remove_user(user_client* uc)
{
const char* name = uc->get_name();
if (name == NULL || *name == 0)
{
printf("%s(%d): no name!\r\n", __FUNCTION__, __LINE__);
return;
}
std::map<acl::string, user_client*>::iterator it = __users.find(name);
if (it == __users.end())
printf("%s(%d): not exist, name: %s\r\n",
__FUNCTION__, __LINE__, name);
else
{
__users.erase(it);
printf("delete user ok, name: %s\r\n", name);
}
}
static void client_logout(user_client* client)
{
if (client->already_login())
remove_user(client);
if (client->is_reading())
{
printf("%s(%d): user: %s, kill_reader\r\n",
__FUNCTION__, __LINE__, client->get_name());
client->kill_reader();
}
if (client->is_waiting())
{
printf("fiber-%d: %s(%d): user: %s, notify logout\r\n",
acl_fiber_self(), __FUNCTION__, __LINE__,
client->get_name());
client->notify(MT_LOGOUT);
}
if (!client->is_reading() && !client->is_waiting())
client->notify_exit();
}
static bool client_flush(user_client* client)
{
acl::socket_stream& conn = client->get_stream();
acl::string* msg;
bool ret = true;
while ((msg = client->pop()) != NULL)
{
if (conn.write(*msg) == -1)
{
printf("flush to user: %s error %s\r\n",
client->get_name(), acl::last_serror());
delete msg;
ret = false;
break;
}
delete msg;
}
return ret;
}
static int __nwriter = 0;
static void fiber_writer(user_client* client)
{
client->set_waiter();
client->set_waiting(true);
while (true)
{
int mtype;
client->wait(mtype);
if (client_flush(client) == false)
{
printf("%s(%d), user: %s, flush error %s\r\n",
__FUNCTION__, __LINE__, client->get_name(),
acl::last_serror());
break;
}
#ifdef USE_CHAN
if (mtype == MT_LOGOUT)
{
printf("%s(%d), user: %s, MT_LOGOUT\r\n",
__FUNCTION__, __LINE__, client->get_name());
break;
}
if (mtype == MT_KICK)
{
printf("%s(%d), user: %s, MT_KICK\r\n",
__FUNCTION__, __LINE__, client->get_name());
client->get_stream().write("You're kicked\r\n");
break;
}
#endif
}
client->set_waiting(false);
printf(">>%s(%d), user: %s, logout\r\n", __FUNCTION__, __LINE__,
client->get_name());
client_logout(client);
printf("-------__nwriter: %d-----\r\n", --__nwriter);
}
static bool client_login(user_client* uc)
{
acl::string buf;
while (true)
{
if (uc->get_stream().gets(buf) == false)
{
printf("%s(%d): gets error %s\r\n",
__FUNCTION__, __LINE__, acl::last_serror());
if (errno == ETIMEDOUT)
{
printf("Login timeout\r\n");
uc->get_stream().write("Login timeout\r\n");
}
return false;
}
if (!buf.empty())
break;
}
std::vector<acl::string>& tokens = buf.split2("|");
if (tokens.size() < 2)
{
acl::string tmp;
tmp.format("invalid argc: %d < 2\r\n", (int) tokens.size());
printf("%s", tmp.c_str());
return uc->get_stream().write(tmp) != -1;
}
acl::string msg;
const acl::string& name = tokens[1];
std::map<acl::string, user_client*>::iterator it = __users.find(name);
if (it == __users.end())
{
__users[name] = uc;
uc->set_name(name);
msg.format("user %s login ok\r\n", name.c_str());
}
else
msg.format("user %s already login\r\n", name.c_str());
printf("%s", msg.c_str());
return uc->get_stream().write(msg) != -1;
}
static bool client_chat(user_client* uc, std::vector<acl::string>& tokens)
{
if (tokens.size() < 3)
{
printf("invalid argc: %d < 3\r\n", (int) tokens.size());
return true;
}
const acl::string& to = tokens[1];
const acl::string& msg = tokens[2];
std::map<acl::string, user_client*>::iterator it = __users.find(to);
if (it == __users.end())
{
acl::string tmp;
tmp.format("chat >> from user: %s, to user: %s not exist\r\n",
uc->get_name(), to.c_str());
printf("%s", tmp.c_str());
return uc->get_stream().write(tmp) != -1;
}
it->second->push(msg);
it->second->notify(MT_MSG);
return true;
}
static bool client_kick(user_client* uc, std::vector<acl::string>& tokens)
{
if (tokens.size() < 2)
{
printf("invalid argc: %d < 2\r\n", (int) tokens.size());
return true;
}
const acl::string& to = tokens[1];
std::map<acl::string, user_client*>::iterator it = __users.find(to);
if (it == __users.end())
{
acl::string tmp;
tmp.format("kick >> from user: %s, to user: %s not exist\r\n",
uc->get_name(), to.c_str());
printf("%s", tmp.c_str());
return uc->get_stream().write(tmp) != -1;
}
it->second->notify(MT_KICK);
return true;
}
static int __nreader = 0;
static void fiber_reader(user_client* client)
{
acl::socket_stream& conn = client->get_stream();
conn.set_rw_timeout(0);
client->set_reader();
client->set_reading(true);
if (client_login(client) == false)
{
client->set_reading(false);
printf("----------client_logout-------\r\n");
client_logout(client);
printf("----__nreader: %d-----\r\n", --__nreader);
return;
}
go_stack(STACK_SIZE) [&] {
__nwriter++;
fiber_writer(client);
};
conn.set_rw_timeout(0);
bool stop = false;
acl::string buf;
while (true)
{
bool ret = conn.gets(buf);
if (ret == false)
{
printf("%s(%d): user: %s, gets error %s, fiber: %d\r\n",
__FUNCTION__, __LINE__, client->get_name(),
acl::last_serror(), acl_fiber_self());
if (client->exiting())
{
printf("----exiting now----\r\n");
break;
}
if (errno == ETIMEDOUT)
{
if (conn.write("ping\r\n") == -1)
{
printf("ping error\r\n");
break;
}
}
else if (errno == EAGAIN)
printf("EAGAIN\r\n");
else {
printf("gets error: %d, %s\r\n",
errno, acl::last_serror());
break;
}
continue;
}
if (buf.empty())
continue;
std::vector<acl::string>& tokens = buf.split2("|");
if (tokens[0] == "quit" || tokens[0] == "exit")
{
conn.write("Bye!\r\n");
break;
}
else if (tokens[0] == "chat")
{
if (client_chat(client, tokens) == false)
break;
}
else if (tokens[0] == "kick")
{
if (client_kick(client, tokens) == false)
break;
}
else if (tokens[0] == "stop")
{
stop = true;
break;
}
else
printf("invalid data: %s, cmd: [%s]\r\n",
buf.c_str(), tokens[0].c_str());
}
printf(">>%s(%d), user: %s, logout\r\n", __FUNCTION__, __LINE__,
client->get_name());
client->set_reading(false);
client_logout(client);
printf("----__nreader: %d-----\r\n", --__nreader);
if (stop)
{
int dumy = 1;
__chan_monitor.put(dumy);
}
}
static int __nclients = 0;
static void fiber_client(acl::socket_stream* conn)
{
user_client* client = new user_client(*conn);
go_stack(STACK_SIZE) [=] {
__nreader++;
fiber_reader(client);
};
client->wait_exit();
printf("----- client (%s), exit now -----\r\n", client->get_name());
delete client;
delete conn;
printf("----__nclients: %d-----\r\n", --__nclients);
printf("----dead fibers: %d---\r\n", acl_fiber_ndead());
}
static ACL_FIBER *__fiber_accept = NULL;
static void fiber_accept(acl::server_socket& ss)
{
__fiber_accept = acl_fiber_running();
while (true)
{
acl::socket_stream* conn = ss.accept();
if (conn == NULL)
{
printf("accept error %s\r\n", acl::last_serror());
break;
}
go_stack(STACK_SIZE) [=] {
__nclients++;
fiber_client(conn);
};
}
}
static void fiber_monitor(void)
{
int n;
__chan_monitor.pop(n);
printf("--- kill fiber_accept ---\r\n");
acl_fiber_kill(__fiber_accept);
printf("--- stop fiber schedule ---\r\n");
acl_fiber_schedule_stop();
}
static void usage(const char *procname)
{
printf("usage: %s -h [help]\r\n"
" -s listen_addr\r\n"
" -r rw_timeout\r\n" , procname);
}
int main(int argc, char *argv[])
{
char addr[64];
int ch;
acl::log::stdout_open(true);
snprintf(addr, sizeof(addr), "%s", "127.0.0.1:9002");
while ((ch = getopt(argc, argv, "hs:r:")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
return 0;
case 's':
snprintf(addr, sizeof(addr), "%s", optarg);
break;
case 'r':
__rw_timeout = atoi(optarg);
break;
default:
break;
}
}
acl::server_socket ss;
if (ss.open(addr) == false)
{
printf("listen %s error %s\r\n", addr, acl::last_serror());
return 1;
}
printf("listen %s ok\r\n", addr);
go[&] {
fiber_accept(ss);
};
go[] {
fiber_monitor();
};
acl::fiber::schedule();
return 0;
}

View File

@ -0,0 +1,8 @@
// stdafx.cpp : 只包括标准包含文件的源文件
// master_threads.pch 将成为预编译头
// stdafx.obj 将包含预编译类型信息
#include "stdafx.h"
// TODO: 在 STDAFX.H 中
//引用任何所需的附加头文件,而不是在此文件中引用

View File

@ -0,0 +1,20 @@
// stdafx.h : 标准系统包含文件的包含文件,
// 或是常用但不常更改的项目特定的包含文件
//
#pragma once
//#include <iostream>
//#include <tchar.h>
// TODO: 在此处引用程序要求的附加头文件
#include "lib_acl.h"
#include "fiber/lib_fiber.h"
#include "acl_cpp/lib_acl.hpp"
#ifdef WIN32
#define snprintf _snprintf
#endif

View File

@ -0,0 +1,165 @@
#pragma once
#include <list>
enum
{
MT_MSG,
MT_LOGOUT,
MT_KICK,
};
class user_client
{
public:
#ifdef USE_CHAN
user_client(acl::socket_stream& conn) : conn_(conn) {}
#else
user_client(acl::socket_stream& conn) : conn_(conn), sem_msg_(1) {}
#endif
~user_client(void)
{
for (std::list<acl::string*>::iterator it = messages_.begin();
it != messages_.end(); ++it)
{
delete *it;
}
}
acl::socket_stream& get_stream(void) const
{
return conn_;
}
bool already_login(void) const
{
return !name_.empty();
}
bool empty(void) const
{
return messages_.empty();
}
acl::string* pop(void)
{
if (messages_.empty())
return NULL;
acl::string* msg = messages_.front();
messages_.pop_front();
return msg;
}
void push(const char* msg)
{
acl::string* buf = new acl::string(msg);
(*buf) << "\r\n";
messages_.push_back(buf);
}
void set_name(const char* name)
{
name_ = name;
}
const char* get_name(void) const
{
return name_.c_str();
}
void wait(int& mtype)
{
#ifdef USE_CHAN
chan_msg_.pop(mtype);
#else
(void) mtype;
sem_msg_.wait();
#endif
}
void notify(int mtype)
{
#ifdef USE_CHAN
chan_msg_.put(mtype);
#else
(void) mtype;
(void) sem_msg_.post();
#endif
}
void kill_reader(void)
{
if (fiber_reader_)
{
exiting_ = true;
acl_fiber_kill(fiber_reader_);
}
}
bool exiting(void) const
{
return exiting_;
}
void set_exiting(void)
{
exiting_ = true;
}
void set_reading(bool yes)
{
reading_ = yes;
}
bool is_reading(void) const
{
return reading_;
}
void set_waiting(bool yes)
{
waiting_ = yes;
}
bool is_waiting(void) const
{
return waiting_;
}
void wait_exit(void)
{
int mtype;
chan_exit_.pop(mtype);
}
void notify_exit(void)
{
int mtype = MT_LOGOUT;
chan_exit_.put(mtype);
}
void set_reader(void)
{
fiber_reader_ = acl_fiber_running();
}
void set_waiter(void)
{
fiber_waiter_ = acl_fiber_running();
}
private:
acl::socket_stream& conn_;
#ifdef USE_CHAN
acl::channel<int> chan_msg_;
#else
acl::fiber_sem sem_msg_;
#endif
acl::channel<int> chan_exit_;
acl::string name_;
std::list<acl::string*> messages_;
bool exiting_ = false;
bool reading_ = false;
bool waiting_ = false;
ACL_FIBER* fiber_reader_ = NULL;
ACL_FIBER* fiber_waiter_ = NULL;
};

View File

@ -0,0 +1,4 @@
#!/bin/sh
#valgrind --tool=memcheck --leak-check=yes --leak-check=full --show-reachable=yes --max-stackframe=3426305034400000 -v ./fiber -n 10 -m 20
valgrind --tool=memcheck --leak-check=yes --leak-check=full --show-reachable=yes -v ./chat_server

View File

@ -0,0 +1,2 @@
include ../Makefile.in
PROG = writer

View File

@ -0,0 +1,192 @@
#include "stdafx.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "fiber/lib_fiber.hpp"
#include "stamp.h"
#include "user_client.h"
#define STACK_SIZE 128000
static const char* __writer_prefix = "writer";
static const char* __reader_prefix = "reader";
static int __rw_timeout = 0;
static int __max_client = 10;
static int __cur_client = 10;
static struct timeval __begin;
static int __total_write = 0;
static bool client_login(user_client* uc)
{
acl::string buf;
buf.format("login|%s\r\n", uc->get_name());
if (uc->get_stream().write(buf) == -1)
{
printf("login failed error, user: %s\r\n", uc->get_name());
return false;
}
if (uc->get_stream().gets(buf) == false)
{
printf("login failed, gets error %s\r\n", acl::last_serror());
return false;
}
return true;
}
static void client_logout(user_client* client)
{
client->notify_exit();
}
static void fiber_writer(user_client* client)
{
acl::socket_stream& conn = client->get_stream();
conn.set_rw_timeout(0);
if (client_login(client) == false)
{
client_logout(client);
return;
}
printf("fiber-%d: user: %s login OK\r\n", acl_fiber_self(),
client->get_name());
acl::string buf;
const char* to = client->get_to();
acl::string msg;
int max_loop = client->get_max_loop(), i;
for (i = 0; i < max_loop; i++)
{
msg.format("chat|%s|hello world\r\n", to);
if (conn.write(msg) != (int) msg.size())
{
printf("fiber-%d: msg(%s) write error %s\r\n",
acl_fiber_self(), msg.c_str(),
acl::last_serror());
if (client->existing())
{
printf("----existing now----\r\n");
break;
}
}
__total_write++;
if (i < 1)
printf(">>>msg: %s", msg.c_str());
acl::fiber::yield();
}
printf(">>%s(%d), user: %s, logout, nwrite: %d\r\n",
__FUNCTION__, __LINE__, client->get_name(), i);
client_logout(client);
}
static void fiber_client(const char* addr, const char* user,
const char* to, int loop)
{
acl::socket_stream conn;
if (conn.open(addr, 0, 0) == false)
{
printf("connect %s error %s\r\n", addr, acl::last_serror());
}
else
{
printf("fiber-%d, connect %s ok\r\n", acl_fiber_self(), addr);
user_client* client = new user_client(conn, user, to, loop);
go[=] {
fiber_writer(client);
};
client->wait_exit();
printf("--- client %s exit now ----\r\n", client->get_name());
delete client;
}
__cur_client--;
if (__cur_client == 0)
{
printf("-----All fibers over!-----\r\n");
struct timeval end;
gettimeofday(&end, NULL);
double spent = stamp_sub(&end, &__begin);
printf("---Total write: %d, spent: %.2f, speed: %.2f---\r\n",
__total_write, spent,
(1000 * __total_write) / (spent < 1 ? 1 : spent));
acl_fiber_schedule_stop();
}
}
static void usage(const char *procname)
{
printf("usage: %s -h [help]\r\n"
" -s server_addr\r\n"
" -n max_loop\r\n"
" -c max_client\r\n"
" -r rw_timeout\r\n" , procname);
}
int main(int argc, char *argv[])
{
char addr[64];
int ch, max_loop = 100;
acl::log::stdout_open(true);
snprintf(addr, sizeof(addr), "%s", "127.0.0.1:9002");
while ((ch = getopt(argc, argv, "hs:r:c:n:")) > 0) {
switch (ch) {
case 'h':
usage(argv[0]);
return 0;
case 's':
snprintf(addr, sizeof(addr), "%s", optarg);
break;
case 'r':
__rw_timeout = atoi(optarg);
break;
case 'c':
__max_client = atoi(optarg);
break;
case 'n':
max_loop = atoi(optarg);
break;
default:
break;
}
}
__cur_client = __max_client;
gettimeofday(&__begin, NULL);
for (int i = 0; i < __max_client; i++)
{
char user[128], to[128];
snprintf(user, sizeof(user), "%s-%d", __writer_prefix, i);
snprintf(to, sizeof(to), "%s-%d", __reader_prefix, i);
go[=] {
fiber_client(addr, user, to, max_loop);
};
}
acl::fiber::schedule();
return 0;
}

View File

@ -0,0 +1,8 @@
// stdafx.cpp : 只包括标准包含文件的源文件
// master_threads.pch 将成为预编译头
// stdafx.obj 将包含预编译类型信息
#include "stdafx.h"
// TODO: 在 STDAFX.H 中
//引用任何所需的附加头文件,而不是在此文件中引用

View File

@ -0,0 +1,20 @@
// stdafx.h : 标准系统包含文件的包含文件,
// 或是常用但不常更改的项目特定的包含文件
//
#pragma once
//#include <iostream>
//#include <tchar.h>
// TODO: 在此处引用程序要求的附加头文件
#include "lib_acl.h"
#include "fiber/lib_fiber.h"
#include "acl_cpp/lib_acl.hpp"
#ifdef WIN32
#define snprintf _snprintf
#endif

View File

@ -0,0 +1,63 @@
#pragma once
#include <list>
class user_client
{
public:
user_client(acl::socket_stream& conn, const char* user,
const char* to, int max_loop)
: conn_(conn), name_(user), to_(to), max_loop_(max_loop) {}
~user_client(void)
{
}
acl::socket_stream& get_stream(void) const
{
return conn_;
}
const char* get_name(void) const
{
return name_.c_str();
}
const char* get_to(void) const
{
return to_.c_str();
}
int get_max_loop(void) const
{
return max_loop_;
}
bool existing(void) const
{
return existing_;
}
void set_existing(void)
{
existing_ = true;
}
void wait_exit(void)
{
int mtype;
chan_exit_.pop(mtype);
}
void notify_exit(void)
{
int mtype = 1;
chan_exit_.put(mtype);
}
private:
acl::socket_stream& conn_;
acl::channel<int> chan_exit_;
acl::string name_;
acl::string to_;
int max_loop_;
bool existing_ = false;
};

View File

@ -0,0 +1,3 @@
#!/bin/sh
valgrind --tool=memcheck --leak-check=yes --leak-check=full --show-reachable=yes -v ./chat_client

View File

@ -5,6 +5,8 @@ static int __fibers_count = 2;
static int __fibers_max = 2;
static int __oper_count = 100;
//static struct timeval __begin, __end;
static ACL_FIBER **__workers = NULL;
static ACL_CHANNEL *__chan_exit = NULL;
static acl::redis_client_cluster __redis_cluster;
@ -150,7 +152,7 @@ static bool redis_del(ACL_FIBER& fiber, ACL_CHANNEL &chan, PKT &pkt)
return true;
}
static void fiber_redis_worker(ACL_FIBER *fiber, void *ctx)
static void fiber_worker(ACL_FIBER *fiber, void *ctx)
{
ACL_CHANNEL *chan = ((MYCHAN *) ctx)->chan;
@ -199,11 +201,11 @@ static void fiber_redis_worker(ACL_FIBER *fiber, void *ctx)
static int __display = 0;
static void fiber_redis(ACL_FIBER *fiber, void *ctx)
static void fiber_result(ACL_FIBER *fiber, void *ctx)
{
MYCHANS *mychans = (MYCHANS *) ctx;
MYCHAN *mychan = &mychans->chans[mychans->off++];
ACL_CHANNEL *chan = mychan->chan;
MYCHANS *mychans = (MYCHANS *) ctx;
MYCHAN *mychan = &mychans->chans[mychans->off++];
ACL_CHANNEL *chan = mychan->chan;
PKT pkt;
if (mychans->off == mychans->size)
@ -227,7 +229,7 @@ static void fiber_redis(ACL_FIBER *fiber, void *ctx)
PKT* res = (PKT *) acl_channel_recvp(chan);
if (res == NULL)
{
printf("%s(%d): fiber-%d: acl_channel_recvp errork, key = %s\r\n",
printf("%s(%d): fiber-%d: acl_channel_recvp error, key = %s\r\n",
__FUNCTION__, __LINE__, acl_fiber_id(fiber),
pkt.key.c_str());
break;
@ -259,10 +261,28 @@ static void fiber_redis(ACL_FIBER *fiber, void *ctx)
if (--__fibers_count == 0)
{
printf("---All fibers are over!---\r\n");
acl_fiber_schedule_stop();
unsigned long n = 100;
acl_channel_sendul(__chan_exit, n);
}
}
static void fiber_wait(ACL_FIBER *, void *ctx)
{
ACL_CHANNEL *chan = (ACL_CHANNEL *) ctx;
unsigned long n = acl_channel_recvul(chan);
printf("----fiber-%d: get n: %lu---\r\n", acl_fiber_self(), n);
for (int i = 0; __workers[i] != NULL; i++)
{
printf("kill fiber-%d\r\n", acl_fiber_id(__workers[i]));
acl_fiber_kill(__workers[i]);
}
printf("---- fiber schedul stopping now ----\r\n");
acl_fiber_schedule_stop();
}
static void usage(const char *procname)
{
printf("usage: %s -h [help]\r\n"
@ -333,13 +353,20 @@ int main(int argc, char *argv[])
mychans.chans[i].cmd = cmd;
}
__workers = new ACL_FIBER*[nworkers + 1];
for (int i = 0; i < nworkers; i++)
acl_fiber_create(fiber_redis_worker, &mychans.chans[i], 32000);
__workers[i] =
acl_fiber_create(fiber_worker, &mychans.chans[i], 32000);
__fibers_count = __fibers_max;
__workers[nworkers] = NULL;
__fibers_count = nworkers;
for (int i = 0; i < __fibers_max; i++)
acl_fiber_create(fiber_redis, &mychans, 32000);
(void) acl_fiber_create(fiber_result, &mychans, 32000);
__chan_exit = acl_channel_create(sizeof(unsigned long), 1000);
acl_fiber_create(fiber_wait, __chan_exit, 32000);
acl_fiber_schedule();
@ -347,6 +374,9 @@ int main(int argc, char *argv[])
acl_channel_free(mychans.chans[i].chan);
delete [] mychans.chans;
acl_channel_free(__chan_exit);
delete [] __workers;
return 0;
}

View File

@ -0,0 +1,3 @@
#!/bin/sh
valgrind --tool=memcheck --leak-check=yes --leak-check=full --show-reachable=yes -v ./redis -c 100 -n 100 -w 10

View File

@ -20,10 +20,11 @@ static void io_timer(ACL_FIBER *fiber, void *ctx)
assert(fiber == ft->timer);
acl_fiber_set_errno(ft->fiber, ETIMEDOUT);
acl_fiber_keep_errno(ft->fiber, 1);
printf("timer-%d wakeup, set fiber-%d, errno: %d, %d\r\n",
acl_fiber_id(fiber), acl_fiber_id(ft->fiber),
ETIMEDOUT, acl_fiber_errno(ft->fiber));
// printf("timer-%d wakeup, set fiber-%d, errno: %d, %d\r\n",
// acl_fiber_id(fiber), acl_fiber_id(ft->fiber),
// ETIMEDOUT, acl_fiber_errno(ft->fiber));
acl_fiber_ready(ft->fiber);
}
@ -55,7 +56,10 @@ static void echo_client(ACL_FIBER *fiber, void *ctx)
break;
if (++ntimeout > 2)
{
printf("too many timeout: %d\r\n", ntimeout);
break;
}
printf("ntimeout: %d\r\n", ntimeout);
ft->timer = acl_fiber_create_timer(__rw_timeout * 1000,