/*
 * Multi-threaded stress driver for a "dict" backed by a DB_ENV.
 *
 * Depending on the -o option it fills the dictionary from a pool of writer
 * threads ("add"), reads the same keys back from a pool of reader threads
 * ("read"), does both in sequence ("rw"), or hands control to
 * dict_test_main() ("test").
 */
#include "dict_db.h"
#include "lib_acl.h"
/* NOTE(review): the header name of this directive was missing in the source;
 * <assert.h> is assumed because assert() is used below -- confirm. */
#include <assert.h>
#include "dict.h"

/* Size of the heap buffer holding a worker's textual id. */
enum { WORKER_ID_SIZE = 10 };

static int __add = 0, __search = 0;	/* which phases run() executes */
static acl_pthread_pool_t *__thrpool = NULL;
static acl_pthread_mutex_t __lock;	/* guards __nwrite and __nread */
static int __thrcnt = 32, __begin = 0, __max = 10000, __quit = 0, __write_ok = 0;
static int __nwrite = 0, __nread = 0;
static int __report_base = 10000;	/* progress is printed every N keys */
static char __dict_name[256], __key_pre[] = "key5";
static DB_ENV *__db_env;

#define LOCK	acl_pthread_mutex_lock(&__lock)
#define UNLOCK	acl_pthread_mutex_unlock(&__lock)

/*
 * Thread body: print the environment's max-lockers setting once, then call
 * lock_detect() once per second until __quit is set.
 */
static void *deadlock_check_thread(void* arg acl_unused)
{
	int ret;

	if (__db_env->get_lk_max_lockers(__db_env, &ret) == 0)
		printf(">>>max lockers=%d\r\n", ret);
	else
		printf(">>>can't get max lockers\r\n");

	sleep(1);
	while (!__quit) {
		(void) __db_env->lock_detect(__db_env, 0, DB_LOCK_YOUNGEST, NULL);
		sleep(1);
	}
	return (NULL);
}

/*
 * Thread body: call memp_trickle() once per second and report how many
 * pages it wrote, until __quit is set or a writer has set __write_ok.
 */
static void *trickle_check_thread(void *arg acl_unused)
{
	/* Initialized so that a failed memp_trickle() never prints garbage. */
	int wrote = 0;

	sleep(1);
	while (!__quit) {
		if (__write_ok)
			break;
		(void) __db_env->memp_trickle(__db_env, 100, &wrote);
		printf("trickle: wrote %d\n", wrote);
		sleep(1);
	}
	return (NULL);
}

/*
 * Start the background checker threads.
 *
 * NOTE: the unconditional return below is kept from the original; it makes
 * the rest of the function unreachable, so no checker thread is started.
 */
static void create_check_thread(void)
{
	acl_pthread_attr_t attr;
	acl_pthread_t tid;

	return;

	acl_pthread_attr_init(&attr);
	acl_pthread_attr_setdetachstate(&attr, 1);
	acl_pthread_create(&tid, &attr, deadlock_check_thread, NULL);
	if (__add)
		acl_pthread_create(&tid, &attr, trickle_check_thread, NULL);
}

/*
 * Initialize the acl library, the counter mutex, the thread pool and the
 * dict layer, then open and register the dictionary named dict_name and
 * remember its DB_ENV in __db_env.
 */
static void init(const char *dict_name)
{
	int open_flags = O_CREAT | O_RDWR;
	int dict_flags = /* DICT_FLAG_LOCK | */ DICT_FLAG_DUP_REPLACE;
	DICT *dict;

	acl_init();
	acl_pthread_mutex_init(&__lock, NULL);

	__thrpool = acl_thread_pool_create(__thrcnt, 5);
	assert(__thrpool);

	dict_init();
	dict_open_init();

	snprintf(__dict_name, sizeof(__dict_name), "%s", dict_name);
	dict = dict_open(dict_name, open_flags, dict_flags);
	assert(dict);
	dict_register(dict_name, dict);

	__db_env = dict_db_env(dict);
	create_check_thread();
}

/* Unregister the dictionary that init() registered. */
static void end(const char *dict_name)
{
	dict_unregister(dict_name);
}

/*
 * Writer job: store __max keys "<prefix>:<id>:<i>" starting at __begin.
 * arg is the heap-allocated id string; this function frees it.
 */
static void thread_write_fn(void *arg)
{
	char *id = (char*) arg;
	char *dict_name = __dict_name;
	char key[256], value[256];
	time_t begin, last, now;
	int i;

	time(&begin);
	last = begin;

	for (i = __begin; i < __begin + __max; i++) {
		snprintf(key, sizeof(key), "%s:%s:%d", __key_pre, id, i);
		/* %u matches the unsigned thread id (was %d). */
		snprintf(value, sizeof(value),
			"value2value2value2value2value2value2value2value2:%u:%d",
			(unsigned) acl_pthread_self(), i);
		dict_update(dict_name, key, value);

		if (i > 0 && i % __report_base == 0) {
			time(&now);
			/* time_t is not guaranteed to be long; cast for %ld. */
			printf("thread %u add one, i=%d, time=%ld\r\n",
				(unsigned) acl_pthread_self(), i,
				(long) (now - last));
			last = now;
		}

		LOCK;
		__nwrite++;
		UNLOCK;
	}

	acl_myfree(id);
	/* Set by whichever writer finishes first; read by the trickle thread. */
	__write_ok = 1;
	printf("thread %u add over, i=%d, time=%ld\r\n",
		(unsigned) acl_pthread_self(), i, (long) (time(NULL) - begin));
}

/*
 * Reader job: look up the same key range the writer with this id produced.
 * arg is the heap-allocated id string; this function frees it.
 */
static void thread_read_fn(void *arg)
{
	char *id = (char*) arg;
	char *dict_name = __dict_name;
	char key[256], *value;
	size_t value_size;
	time_t begin, last, now;
	int i;

	time(&begin);
	last = begin;

	for (i = __begin; i < __begin + __max; i++) {
		snprintf(key, sizeof(key), "%s:%s:%d", __key_pre, id, i);

		if (dict_lookup(dict_name, key, &value, &value_size) == NULL) {
			acl_vstream_printf("%s: %s\n", key,
				dict_errno == DICT_ERR_RETRY ?
				"soft error" : "not found");
			continue;
		}

		if (i > 0 && i % __report_base == 0) {
			time(&now);
			printf(">>%s=%s, time=%ld\r\n", key, value,
				(long) (now - last));
			last = now;
		}

		/* NOTE(review): assumes dict_lookup() hands back a buffer that
		 * must be released with free() -- confirm against dict.h. */
		free(value);

		LOCK;
		__nread++;
		UNLOCK;
	}

	printf("thread %u read over, i=%d, time=%ld\r\n",
		(unsigned) acl_pthread_self(), i, (long) (time(NULL) - begin));
	acl_myfree(id);
}

/*
 * Queue one job per configured thread; each job receives its own
 * heap-allocated decimal id string and is responsible for freeing it.
 */
static void spawn_workers(void (*job)(void *))
{
	int i;
	char *id;

	for (i = 0; i < __thrcnt; i++) {
		id = acl_mymalloc(WORKER_ID_SIZE);
		/* Bounded write instead of sprintf(). */
		snprintf(id, WORKER_ID_SIZE, "%d", i);
		acl_pthread_pool_add(__thrpool, job, id);
	}
}

/*
 * Run the write phase and/or the read phase, polling the pool size once
 * per second until it drops to zero. After the write phase the memory pool
 * is flushed with memp_sync().
 */
static void run(void)
{
	int nthreads, count;

	if (__add) {
		acl_vstream_printf("start write thread...\r\n");
		spawn_workers(thread_write_fn);

		while (1) {
			nthreads = acl_pthread_pool_size(__thrpool);
			if (nthreads == 0)
				break;
			/* Read the counter under the lock its writers use. */
			LOCK;
			count = __nwrite;
			UNLOCK;
			printf("> current threads in thread pool is: %d, nwrite=%d\r\n",
				nthreads, count);
			sleep(1);
		}

		acl_vstream_printf("write threads exit now\r\n");
		sleep(2);
		printf("begin to sync all\r\n");
		__db_env->memp_sync(__db_env, NULL);
	}

	if (__search) {
		acl_vstream_printf("start read thread...\r\n");
		spawn_workers(thread_read_fn);

		while (1) {
			nthreads = acl_pthread_pool_size(__thrpool);
			if (nthreads == 0)
				break;
			LOCK;
			count = __nread;
			UNLOCK;
			printf("> current threads in thread pool is: %d, nread=%d\r\n",
				nthreads, count);
			sleep(1);
		}

		acl_vstream_printf("read threads exit now\r\n");
	}
}

/* Print the command-line synopsis. */
static void usage(const char *procname)
{
	printf("usage: %s -h [help] -o oper [add|read|rw|test] -f from -n count\r\n",
		procname);
}

/*
 * Entry point.
 *   -o oper   one of add, read, rw, test (required)
 *   -n count  number of keys per thread (negative values fall back to 10000)
 *   -f from   first key index (negative values fall back to 0)
 */
int main(int argc, char *argv[])
{
	const char *dict_name = "btree:test";
	char oper[32];
	/* getopt() returns int; storing it in a char breaks the end-of-options
	 * test on platforms where char is unsigned. */
	int ch;

	oper[0] = 0;

	while ((ch = getopt(argc, argv, "ho:n:f:")) != -1) {
		switch (ch) {
		case 'h':
			usage(argv[0]);
			exit (0);
		case 'o':
			ACL_SAFE_STRNCPY(oper, optarg, sizeof(oper));
			break;
		case 'n':
			__max = atoi(optarg);
			if (__max < 0)
				__max = 10000;
			break;
		case 'f':
			__begin = atoi(optarg);
			if (__begin < 0)
				__begin = 0;
			break;
		default:
			usage(argv[0]);
			exit (0);
		}
	}

	if (strcasecmp(oper, "test") == 0) {
		dict_test_main(argc, argv);
		return (0);
	} else if (strcasecmp(oper, "add") == 0) {
		__add = 1;
	} else if (strcasecmp(oper, "read") == 0) {
		__search = 1;
	} else if (strcasecmp(oper, "rw") == 0) {
		__add = 1;
		__search = 1;
	} else {
		usage(argv[0]);
		exit (0);
	}

	init(dict_name);
	run();
	__quit = 1;	/* tells the checker threads, if any, to stop */
	end(dict_name);

	printf("input any key to exit now\r\n");
	getchar();
	return (0);
}