add tbox_array.hpp and tbox sample.

This commit is contained in:
zsx 2019-03-13 10:28:51 +08:00
parent a5fd4abdb9
commit 0dfd2d45c7
7 changed files with 332 additions and 6 deletions

View File

@ -29,6 +29,7 @@
#include "stdlib/thread_pool.hpp" #include "stdlib/thread_pool.hpp"
#include "stdlib/thread_queue.hpp" #include "stdlib/thread_queue.hpp"
#include "stdlib/tbox.hpp" #include "stdlib/tbox.hpp"
#include "stdlib/tbox_array.hpp"
#include "stdlib/thread_mutex.hpp" #include "stdlib/thread_mutex.hpp"
#include "stdlib/thread_cond.hpp" #include "stdlib/thread_cond.hpp"
#include "stdlib/scan_dir.hpp" #include "stdlib/scan_dir.hpp"

View File

@ -75,21 +75,34 @@ public:
/** /**
* *
* @param t {T*} * @param t {T*}
* @param notify_first {bool} true
*
* @return {bool} * @return {bool}
*/ */
bool push(T* t) bool push(T* t, bool notify_first = true)
{ {
if (lock_.lock() == false) { if (lock_.lock() == false) {
abort(); abort();
} }
tbox_.push_back(t); tbox_.push_back(t);
size_++; size_++;
if (lock_.unlock() == false) {
abort(); if (notify_first) {
} if (cond_.notify() == false) {
if (cond_.notify() == false) { abort();
abort(); }
if (lock_.unlock() == false) {
abort();
}
} else {
if (lock_.unlock() == false) {
abort();
}
if (cond_.notify() == false) {
abort();
}
} }
return true; return true;
} }

View File

@ -0,0 +1,234 @@
#pragma once
#include "../acl_cpp_define.hpp"
#include <list>
#include <stdlib.h>
#include <string.h>
#include "thread_mutex.hpp"
#include "thread_cond.hpp"
#include "noncopyable.hpp"
namespace acl
{
/**
* 线线线
*
*
*
* class myobj
* {
* public:
* myobj(void) {}
* ~myobj(void) {}
*
* void test(void) { printf("hello world\r\n"); }
* };
*
* acl::tbox_array<myobj> tbox;
*
* void thread_producer(void)
* {
* myobj* o = new myobj;
* tbox.push(o);
* }
*
* void thread_consumer(void)
* {
* myobj* o = tbox.pop();
* o->test();
* delete o;
* }
*/
template<typename T>
class tbox_array : public noncopyable
{
public:
/**
*
* @param free_obj {bool} tbox_array
*
*/
tbox_array(bool free_obj = true)
: capacity_(10000)
, off_curr_(0)
, off_next_(0)
, waiters_(0)
, free_obj_(free_obj)
, cond_(&lock_)
{
array_ = (T**) malloc(sizeof(T*) * capacity_);
}
~tbox_array(void)
{
clear(free_obj_);
free(array_);
}
/**
*
* @param free_obj {bool} delete
*/
void clear(bool free_obj = false)
{
if (free_obj) {
for (size_t i = off_curr_; i < off_next_; i++) {
delete array_[i];
}
}
}
/**
*
* @param t {T*}
* @param notify_first {bool} true
*
* @return {bool}
*/
bool push(T* t, bool notify_first = false)
{
if (lock_.lock() == false) {
abort();
}
if (off_next_ == capacity_) {
if (off_curr_ >= 10000) {
#if 1
size_t n = 0;
for (size_t i = off_curr_; i < off_next_; i++) {
array_[n++] = array_[i];
}
#else
memmove(array_, array_ + off_curr_,
(off_next_ - off_curr_) * sizeof(T*));
#endif
off_next_ -= off_curr_;
off_curr_ = 0;
} else {
capacity_ += 10000;
array_ = (T**) realloc(array_, sizeof(T*) * capacity_);
}
}
array_[off_next_++] = t;
if (notify_first) {
if (cond_.notify() == false) {
abort();
}
if (lock_.unlock() == false) {
abort();
}
} else {
if (lock_.unlock() == false) {
abort();
}
if (cond_.notify() == false) {
abort();
}
}
return true;
}
/**
*
* @param wait_ms {int} >= 0 ()
*
* @param found {bool*}
*
* @return {T*} NULL NULL
* push NULL NULL
* wait_ms
* -1 NULL wait_ms
* 0 NULL found true false
*
*/
T* pop(int wait_ms = -1, bool* found = NULL)
{
long long n = ((long long) wait_ms) * 1000;
bool found_flag;
if (lock_.lock() == false) {
abort();
}
while (true) {
T* t = peek(found_flag);
if (found_flag) {
if (lock_.unlock() == false) {
abort();
}
if (found) {
*found = found_flag;
}
return t;
}
// 注意调用顺序,必须先调用 wait 再判断 wait_ms
waiters_++;
if (!cond_.wait(n, true) && wait_ms >= 0) {
waiters_--;
if (lock_.unlock() == false) {
abort();
}
if (found) {
*found = false;
}
return NULL;
}
waiters_--;
}
}
/**
*
* @return {size_t}
*/
size_t size(void) const
{
return off_next_ - off_curr_;
}
public:
void lock(void)
{
if (lock_.lock() == false) {
abort();
}
}
void unlock(void)
{
if (lock_.unlock() == false) {
abort();
}
}
private:
T** array_;
size_t capacity_;
size_t off_curr_;
size_t off_next_;
size_t waiters_;
bool free_obj_;
thread_mutex lock_;
thread_cond cond_;
T* peek(bool& found_flag)
{
if (off_curr_ == off_next_) {
found_flag = false;
if (off_curr_ > 0) {
off_curr_ = off_next_ = 0;
}
return NULL;
}
found_flag = true;
T* t = array_[off_curr_++];
return t;
}
};
} // namespace acl

View File

@ -0,0 +1,2 @@
include ../Makefile.in
PROG = tbox

View File

@ -0,0 +1,8 @@
// stdafx.cpp : 只包括标准包含文件的源文件
// master_threads.pch 将成为预编译头
// stdafx.obj 将包含预编译类型信息
#include "stdafx.h"
// TODO: 在 STDAFX.H 中
//引用任何所需的附加头文件,而不是在此文件中引用

View File

@ -0,0 +1,19 @@
// stdafx.h : 标准系统包含文件的包含文件,
// 或是常用但不常更改的项目特定的包含文件
//
#pragma once
//#include <iostream>
//#include <tchar.h>
// TODO: 在此处引用程序要求的附加头文件
#include "acl_cpp/lib_acl.hpp"
#include "acl_cpp/stdlib/tbox_array.hpp"
#ifdef WIN32
#define snprintf _snprintf
#endif

View File

@ -0,0 +1,49 @@
#include "stdafx.h"
#if 1
#define TBOX acl::tbox_array
#else
#define TBOX acl::tbox
#endif
class producer : public acl::thread
{
public:
producer(TBOX<int>& box, int max) : box_(box), max_(max) {}
~producer(void) {}
protected:
void* run(void)
{
for (int i = 0; i < max_; i++) {
int* n = new int;
*n = i;
box_.push(n, true);
}
return NULL;
}
private:
TBOX<int>& box_;
int max_;
};
int main(void)
{
int max = 50000000;
TBOX<int> box;
producer thr(box, max);
thr.start();
for (int i = 0; i < max; i++) {
int* n = box.pop();
assert(*n == i);
delete n;
}
printf("All over, max=%d\r\n", max);
thr.wait();
return 0;
}