add action thread

This commit is contained in:
xianjimli 2020-02-09 09:11:07 +08:00
parent 52d38ec7fc
commit 976105de99
8 changed files with 603 additions and 51 deletions

View File

@ -22,24 +22,44 @@
#include "tkc/mem.h"
#include "tkc/action_queue.h"
ret_t qaction_exec(qaction_t* action) {
return_value_if_fail(action != NULL && action->vt != NULL, RET_BAD_PARAMS);
qaction_t* qaction_init(qaction_t* action, qaction_exec_t exec, void* args, uint32_t args_size) {
return_value_if_fail(action != NULL & exec != NULL, NULL);
return_value_if_fail(args_size <= sizeof(action->args), NULL);
if (action->vt->exec != NULL) {
return action->vt->exec(action);
memset(action, 0x00, sizeof(qaction_t));
action->exec = exec;
if(args != NULL) {
memcpy(action->args, args, args_size);
}
return RET_NOT_IMPL;
return action;
}
ret_t qaction_destroy(qaction_t* action) {
return_value_if_fail(action != NULL && action->vt != NULL, RET_BAD_PARAMS);
ret_t qaction_set_on_event(qaction_t* action, qaction_on_event_t on_event, void* on_event_ctx) {
return_value_if_fail(action != NULL, RET_BAD_PARAMS);
if (action->vt->destroy != NULL) {
return action->vt->destroy(action);
action->on_event = on_event;
action->on_event_ctx = on_event_ctx;
return RET_OK;
}
ret_t qaction_notify(qaction_t* action, event_t* event) {
return_value_if_fail(action != NULL && event != NULL, RET_BAD_PARAMS);
if(action->on_event != NULL) {
action->on_event(action->on_event_ctx, event);
}
return RET_NOT_IMPL;
return RET_OK;
}
ret_t qaction_exec(qaction_t* action) {
return_value_if_fail(action != NULL && action->exec != NULL, RET_BAD_PARAMS);
return action->exec(action);
}
action_queue_t* action_queue_create(uint16_t capacity) {

View File

@ -22,7 +22,7 @@
#ifndef TK_ACTION_QUEUE_H
#define TK_ACTION_QUEUE_H
#include "tkc/types_def.h"
#include "tkc/event.h"
BEGIN_C_DECLS
@ -30,12 +30,7 @@ struct _qaction_t;
typedef struct _qaction_t qaction_t;
typedef ret_t (*qaction_exec_t)(qaction_t* action);
typedef ret_t (*qaction_destroy_t)(qaction_t* action);
typedef struct _qaction_vtable_t {
qaction_exec_t exec;
qaction_destroy_t destroy;
} qaction_vtable_t;
typedef ret_t (*qaction_on_event_t)(qaction_t* action, event_t* event);
/**
* @class qaction_t
@ -43,16 +38,70 @@ typedef struct _qaction_vtable_t {
*/
struct _qaction_t {
/**
* @property {uint32_t*} extra
* @property {void*} on_event_ctx
* @annotation ["readable"]
* (action而不同)
* on_event_ctx
*/
uint32_t extra[8];
void* on_event_ctx;
/**
* @property {qaction_on_event_t} on_event
* @annotation ["readable"]
*
*/
qaction_on_event_t on_event;
/**
* @property {qaction_exec_t} exec
* @annotation ["readable"]
*
*/
qaction_exec_t exec;
/*private*/
qaction_vtable_t* vt;
/**
* @property {uint32_t*} args
* @annotation ["readable"]
* exec的参数(action而不同)
*/
uint32_t args[8];
};
/**
* @method qaction_init
*
*
* @param {qaction_t*} action action对象
* @param {qaction_exec_t} exec
* @param {void*} args
* @param {uint32_t} args_size (sizeof(action->args))
*
* @return {qaction_t*} action对象
*/
qaction_t* qaction_init(qaction_t* action, qaction_exec_t exec, void* args, uint32_t args_size);
/**
* @method qaction_set_on_event
* (线)
*
* @param {qaction_t*} action action对象
* @param {qaction_on_event_t} on_event
* @param {void*} on_event_ctx
*
* @return {ret_t} RET_OK表示成功
*/
ret_t qaction_set_on_event(qaction_t* action, qaction_on_event_t on_event, void* on_event_ctx);
/**
* @method qaction_notify
*
*
* @param {qaction_t*} action action对象
* @param {event_t**} event event对象
*
* @return {ret_t} RET_OK表示成功
*/
ret_t qaction_notify(qaction_t* action, event_t* event);
/**
* @method qaction_exec
*
@ -63,16 +112,6 @@ struct _qaction_t {
*/
ret_t qaction_exec(qaction_t* action);
/**
* @method qaction_destroy
*
*
* @param {qaction_t*} action action对象
*
* @return {ret_t} RET_OK表示成功
*/
ret_t qaction_destroy(qaction_t* action);
typedef struct _action_queue_t {
uint16_t r;
uint16_t w;

112
src/tkc/action_thread.c Normal file
View File

@ -0,0 +1,112 @@
/**
* File: action_thread.c
* Author: AWTK Develop Team
* Brief: action_thread
*
* Copyright (c) 2020 - 2020 Guangzhou ZHIYUAN Electronics Co.,Ltd.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* License file for more details.
*
*/
/**
* History:
* ================================================================
* 2020-02-08 Li XianJing <xianjimli@hotmail.com> created
*
*/
#include "tkc/mem.h"
#include "tkc/action_thread.h"
#include "tkc/waitable_action_queue.h"
static void* action_thred_entry(void* args) {
qaction_t action;
action_thread_t* thread = (action_thread_t*)args;
memset(&action, 0x00, sizeof(action));
while(!(thread->quit)) {
while(waitable_action_queue_recv(thread->queue, &action, 10000) == RET_OK) {
if(qaction_exec(&action) == RET_QUIT) {
thread->quit = TRUE;
}
thread->executed_actions_nr++;
}
}
return NULL;
}
action_thread_t* action_thread_create(action_thread_pool_t* thread_pool) {
action_thread_t* thread = NULL;
return_value_if_fail(thread_pool != NULL, NULL);
thread = TKMEM_ZALLOC(action_thread_t);
return_value_if_fail(thread != NULL, NULL);
thread->thread_pool = thread_pool;
thread->thread = tk_thread_create(action_thred_entry, thread);
goto_error_if_fail(thread->thread != NULL);
thread->queue = waitable_action_queue_create(10);
goto_error_if_fail(thread->queue != NULL);
tk_thread_start(thread->thread);
return thread;
error:
if(thread->thread != NULL) {
tk_thread_destroy(thread->thread);
}
if(thread->queue != NULL) {
waitable_action_queue_destroy(thread->queue);
}
TKMEM_FREE(thread);
return NULL;
}
ret_t action_thread_exec(action_thread_t* thread, qaction_t* action) {
return_value_if_fail(thread != NULL && thread->queue != NULL, RET_BAD_PARAMS);
return_value_if_fail(action != NULL && action->exec != NULL, RET_BAD_PARAMS);
return waitable_action_queue_send(thread->queue, action, 1000);
}
ret_t action_thread_set_max_actions_nr(action_thread_t* thread, uint32_t max_actions_nr) {
return_value_if_fail(thread != NULL, RET_BAD_PARAMS);
thread->max_actions_nr = max_actions_nr;
return RET_OK;
}
static ret_t qaction_quit_exec(qaction_t* action) {
return_value_if_fail(action != NULL, RET_BAD_PARAMS);
return RET_QUIT;
}
static ret_t action_thread_quit(action_thread_t* thread) {
return_value_if_fail(thread != NULL, RET_BAD_PARAMS);
thread->quit = TRUE;
return RET_OK;
}
ret_t action_thread_destroy(action_thread_t* thread) {
return_value_if_fail(thread != NULL, RET_BAD_PARAMS);
action_thread_quit(thread);
tk_thread_join(thread->thread);
tk_thread_destroy(thread->thread);
waitable_action_queue_destroy(thread->queue);
return RET_OK;
}

130
src/tkc/action_thread.h Normal file
View File

@ -0,0 +1,130 @@
/**
* File: action_thread.h
* Author: AWTK Develop Team
* Brief: action_thread
*
* Copyright (c) 2020 - 2020 Guangzhou ZHIYUAN Electronics Co.,Ltd.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* License file for more details.
*
*/
/**
* History:
* ================================================================
* 2020-02-08 Li XianJing <xianjimli@hotmail.com> created
*
*/
#ifndef TK_ACTION_THREAD_H
#define TK_ACTION_THREAD_H
#include "tkc/thread.h"
#include "tkc/waitable_action_queue.h"
BEGIN_C_DECLS
struct _action_thread_pool_t;
typedef struct _action_thread_pool_t action_thread_pool_t;
/**
* @class action_thread_t
* action的线程
*
*> 线action queue
*/
typedef struct _action_thread_t {
/**
* @property {bool_t} running
* @annotation ["readable"]
*
*/
bool_t running;
/**
* @property {tk_thread_t*} thread
* @annotation ["readable"]
* 线
*/
tk_thread_t* thread;
/**
* @property {action_thread_pool_t*} thread_pool
* @annotation ["readable"]
* 线
*/
action_thread_pool_t* thread_pool;
/**
* @property {waitable_action_queue_t*} queue
* @annotation ["readable"]
* action queue
*/
waitable_action_queue_t* queue;
/**
* @property {uint32_t} max_actions_nr
* @annotation ["readable"]
* max_actions_nr个action后自动回收
*/
uint32_t max_actions_nr;
/**
* @property {uint32_t} executed_actions_nr
* @annotation ["readable"]
* action的个数
*/
uint32_t executed_actions_nr;
/*private*/
bool_t quit;
} action_thread_t;
/**
* @method action_thread_create
* @annotation ["constructor"]
* @param {action_thread_pool_t*} thread_pool thread_pool对象
* action_thread对象
*
* @return {action_thread_t*} action_thread对象
*/
action_thread_t* action_thread_create(action_thread_pool_t* thread_pool);
/**
* @method action_thread_exec
* 线action
*
* @param {action_thread_t*} thread action_thread对象
* @param {qaction_t*} action action对象
*
* @return {ret_t} RET_OK表示成功
*/
ret_t action_thread_exec(action_thread_t* thread, qaction_t* action);
/**
* @method action_thread_set_max_actions_nr
* max_actions_nr线max_actions_nr个action后
*
* @param {action_thread_t*} thread action_thread对象
* @param {uint32_t} max_actions_nr action个数
*
* @return {ret_t} RET_OK表示成功
*/
ret_t action_thread_set_max_actions_nr(action_thread_t* thread, uint32_t max_actions_nr);
/**
* @method action_thread_destroy
*
*
* @param {action_thread_t*} thread action_thread对象
*
* @return {ret_t} RET_OK表示成功
*/
ret_t action_thread_destroy(action_thread_t* thread);
END_C_DECLS
#endif /*TK_ACTION_THREAD_H*/

View File

@ -0,0 +1,147 @@
/**
* File: action_thread_pool.c
* Author: AWTK Develop Team
* Brief: action_thread_pool
*
* Copyright (c) 2020 - 2020 Guangzhou ZHIYUAN Electronics Co.,Ltd.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* License file for more details.
*
*/
/**
* History:
* ================================================================
* 2020-02-08 Li XianJing <xianjimli@hotmail.com> created
*
*/
#include "tkc/mem.h"
#include "tkc/action_thread_pool.h"
action_thread_pool_t* action_thread_pool_create(uint16_t capacity, uint16_t max_free_nr) {
action_thread_pool_t* thread_pool = NULL;
uint32_t size = sizeof(action_thread_pool_t) + sizeof(action_thread_t*) * capacity;
return_value_if_fail(capacity > 0 && max_free_nr > 0, NULL);
thread_pool = (action_thread_pool_t*)TKMEM_ALLOC(size);
return_value_if_fail(thread_pool != NULL, NULL);
thread_pool->capacity = capacity;
thread_pool->max_free_nr = max_free_nr;
thread_pool->mutex = tk_mutex_create();
if(thread_pool->mutex == NULL) {
TKMEM_FREE(thread_pool);
thread_pool = NULL;
}
return thread_pool;
}
ret_t action_thread_pool_exec(action_thread_pool_t* thread_pool, qaction_t* action) {
action_thread_t* thread = NULL;
return_value_if_fail(thread_pool != NULL && action != NULL, RET_BAD_PARAMS);
thread = action_thread_pool_get(thread_pool);
return_value_if_fail(thread != NULL, RET_BAD_PARAMS);
action_thread_set_max_actions_nr(thread, 1);
return action_thread_exec(thread, action);
}
ret_t action_thread_pool_put(action_thread_pool_t* thread_pool, action_thread_t* thread) {
uint32_t i = 0;
uint32_t free_nr = action_thread_pool_get_free_nr(thread_pool);
return_value_if_fail(thread_pool != NULL && thread != NULL, RET_BAD_PARAMS);
return_value_if_fail(thread->thread_pool == thread_pool, RET_BAD_PARAMS);
return_value_if_fail(tk_mutex_lock(thread_pool->mutex) == RET_OK, RET_BAD_PARAMS);
if(free_nr < thread_pool->max_free_nr) {
thread->running = FALSE;
thread->max_actions_nr = 0;
thread->executed_actions_nr = 0;
} else {
for(i = 0; i < thread_pool->capacity; i++) {
if(thread == thread_pool->threads[i]) {
thread_pool->threads[i] = NULL;
break;
}
}
action_thread_destroy(thread);
}
return tk_mutex_unlock(thread_pool->mutex);
}
uint32_t action_thread_pool_get_free_nr(action_thread_pool_t* thread_pool) {
uint32_t i = 0;
uint32_t n = 0;
action_thread_t* thread = NULL;
return_value_if_fail(thread_pool != NULL, 0);
return_value_if_fail(tk_mutex_lock(thread_pool->mutex) == RET_OK, 0);
for(i = 0; i < thread_pool->capacity; i++) {
thread = thread_pool->threads[i];
if(thread != NULL && !(thread->running)) {
n++;
}
}
tk_mutex_unlock(thread_pool->mutex);
return n;
}
action_thread_t* action_thread_pool_get(action_thread_pool_t* thread_pool) {
uint32_t i = 0;
action_thread_t* thread = NULL;
return_value_if_fail(thread_pool != NULL && thread != NULL, NULL);
return_value_if_fail(thread->thread_pool == thread_pool, NULL);
return_value_if_fail(tk_mutex_lock(thread_pool->mutex) == RET_OK, NULL);
for(i = 0; i < thread_pool->capacity; i++) {
thread = thread_pool->threads[i];
if(thread != NULL && !(thread->running)) {
thread->running = TRUE;
break;
}
}
if(thread == NULL) {
for(i = 0; i < thread_pool->capacity; i++) {
thread = thread_pool->threads[i];
if(thread == NULL) {
thread = action_thread_create(thread_pool);
thread->running = TRUE;
thread_pool->threads[i] = thread;
break;
}
}
}
tk_mutex_unlock(thread_pool->mutex);
return thread;
}
ret_t action_thread_pool_destroy(action_thread_pool_t* thread_pool) {
uint32_t i = 0;
action_thread_t* thread = NULL;
return_value_if_fail(thread_pool != NULL, RET_BAD_PARAMS);
for(i = 0; i < thread_pool->capacity; i++) {
thread = thread_pool->threads[i];
if(thread != NULL) {
action_thread_destroy(thread);
thread_pool->threads[i] = NULL;
}
}
tk_mutex_destroy(thread_pool->mutex);
memset(thread_pool, 0x00, sizeof(action_thread_pool_t));
TKMEM_FREE(thread_pool);
return RET_OK;
}

View File

@ -0,0 +1,119 @@
/**
* File: action_thread_pool.h
* Author: AWTK Develop Team
* Brief: action_thread_pool
*
* Copyright (c) 2020 - 2020 Guangzhou ZHIYUAN Electronics Co.,Ltd.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* License file for more details.
*
*/
/**
* History:
* ================================================================
* 2020-02-08 Li XianJing <xianjimli@hotmail.com> created
*
*/
#ifndef TK_ACTION_THREAD_POOL_H
#define TK_ACTION_THREAD_POOL_H
#include "tkc/action_thread.h"
BEGIN_C_DECLS
/**
* @class action_thread_pool_t
* 线
*/
typedef struct _action_thread_pool_t {
/**
* @property {uint32_t} capacity
* @annotation ["readable"]
*
*/
uint32_t capacity;
/**
* @property {uint32_t} max_free_nr
* @annotation ["readable"]
*
*/
uint32_t max_free_nr;;
/*private*/
tk_mutex_t* mutex;
action_thread_t* threads[1];
} action_thread_pool_t;
/**
* @method action_thread_pool_create
* @annotation ["constructor"]
* action_thread_pool对象
*
* @param {uint16_t} capacity 线
* @param {uint16_t} max_free_nr 线
*
* @return {action_thread_pool_t*} action_thread_pool对象
*/
action_thread_pool_t* action_thread_pool_create(uint16_t capacity, uint16_t max_free_nr);
/**
* @method action_thread_pool_exec
* 线线线action
*
* @param {action_thread_pool_t*} thread_pool action_thread_pool对象
* @param {qaction_t*} action action对象
*
* @return {ret_t} RET_OK表示成功
*/
ret_t action_thread_pool_exec(action_thread_pool_t* thread_pool, qaction_t* action);
/**
* @method action_thread_pool_put
* 线线
*
* @param {action_thread_pool_t*} thread_pool action_thread_pool对象
* @param {qthread_t*} thread thread对象
*
* @return {ret_t} RET_OK表示成功
*/
ret_t action_thread_pool_put(action_thread_pool_t* thread_pool, action_thread_t* thread);
/**
* @method action_thread_pool_get
* 线线
*
* @param {action_thread_pool_t*} thread_pool action_thread_pool对象
*
* @return {action_thread_t*} 线
*/
action_thread_t* action_thread_pool_get(action_thread_pool_t* thread_pool);
/**
* @method action_thread_pool_get_free_nr
* 线
*
* @param {action_thread_pool_t*} thread_pool action_thread_pool对象
*
* @return {uint32_t} 线
*/
uint32_t action_thread_pool_get_free_nr(action_thread_pool_t* thread_pool);
/**
* @method action_thread_pool_destroy
*
*
* @param {action_thread_pool_t*} thread_pool action_thread_pool对象
*
* @return {ret_t} RET_OK表示成功
*/
ret_t action_thread_pool_destroy(action_thread_pool_t* thread_pool);
END_C_DECLS
#endif /*TK_ACTION_THREAD_POOL_H*/

View File

@ -18,7 +18,7 @@ TEST(ActionQueue, basic) {
ASSERT_EQ(q->full, FALSE);
ASSERT_EQ(q->capacity, 10);
w.extra[0] = 1234;
w.args[0] = 1234;
ASSERT_EQ(action_queue_recv(q, &r), RET_FAIL);
ASSERT_EQ(action_queue_send(q, &w), RET_OK);
ASSERT_EQ(action_queue_recv(q, &r), RET_OK);
@ -26,7 +26,7 @@ TEST(ActionQueue, basic) {
ASSERT_EQ(action_queue_recv(q, &r), RET_FAIL);
for (i = 0; i < NR; i++) {
w.extra[0] = i;
w.args[0] = i;
ASSERT_EQ(action_queue_send(q, &w), RET_OK);
}
@ -35,7 +35,7 @@ TEST(ActionQueue, basic) {
for (i = 0; i < NR; i++) {
ASSERT_EQ(action_queue_recv(q, &r), RET_OK);
ASSERT_EQ(r.extra[0], i);
ASSERT_EQ(r.args[0], i);
}
ASSERT_EQ(action_queue_recv(q, &r), RET_FAIL);
ASSERT_EQ(q->full, FALSE);

View File

@ -13,20 +13,6 @@ static ret_t qaction_exec_dummy(qaction_t* req) {
return RET_OK;
}
static ret_t qaction_destroy_dummy(qaction_t* req) {
destroy_times++;
return RET_OK;
}
static qaction_vtable_t qvt = {qaction_exec_dummy, qaction_destroy_dummy};
static qaction_t* qaction_init(qaction_t* a) {
memset(a, 0x00, sizeof(qaction_t));
a->vt = &qvt;
return a;
}
static void* consumer(void* args) {
uint32_t n = 0;
qaction_t action;
@ -35,7 +21,6 @@ static void* consumer(void* args) {
while (waitable_action_queue_recv(q, &action, 3000) == RET_OK) {
n++;
qaction_exec(&action);
qaction_destroy(&action);
}
log_debug("consumer done\n");
@ -45,7 +30,7 @@ static void* consumer(void* args) {
static void* producer(void* args) {
uint32_t i = 0;
qaction_t action;
qaction_t* a = qaction_init(&action);
qaction_t* a = qaction_init(&action, qaction_exec_dummy, NULL, 0);
uint32_t id = tk_pointer_to_int(args);
log_debug("p=%u start\n", id);