improve buffered stream

This commit is contained in:
lixianjing 2019-10-16 09:57:53 +08:00
parent fa54adb6e9
commit e55f4548bc
5 changed files with 26 additions and 5 deletions

View File

@ -40,13 +40,23 @@ static int32_t tk_istream_buffered_read(tk_istream_t* stream, uint8_t* buff, uin
}
static ret_t tk_istream_buffered_set_prop(object_t* obj, const char* name, const value_t* v) {
return RET_NOT_FOUND;
tk_istream_buffered_t* istream_buffered = TK_ISTREAM_BUFFERED(obj);
tk_istream_t* real_istream = istream_buffered->real_istream;
return object_set_prop(OBJECT(real_istream), name, v);
}
static ret_t tk_istream_buffered_get_prop(object_t* obj, const char* name, value_t* v) {
tk_istream_buffered_t* istream_buffered = TK_ISTREAM_BUFFERED(obj);
tk_istream_t* real_istream = istream_buffered->real_istream;
if(tk_str_eq(name, TK_STREAM_PROP_HAS_BUFFERED_DATA)) {
ring_buffer_t* rb = istream_buffered->rb;
value_set_bool(v, !ring_buffer_is_empty(rb));
return RET_OK;
}
return object_get_prop(OBJECT(real_istream), name, v);
}
@ -54,7 +64,7 @@ static ret_t tk_istream_buffered_on_destroy(object_t* obj) {
tk_istream_buffered_t* istream_buffered = TK_ISTREAM_BUFFERED(obj);
ring_buffer_destroy(istream_buffered->rb);
object_unref(OBJECT(istream_buffered->real_istream));
OBJECT_UNREF(istream_buffered->real_istream);
return RET_OK;
}
@ -80,6 +90,7 @@ tk_istream_t* tk_istream_buffered_create(tk_istream_t* real_istream, uint32_t bu
istream_buffered = TK_ISTREAM_BUFFERED(obj);
return_value_if_fail(istream_buffered != NULL, NULL);
OBJECT_REF(real_istream);
istream_buffered->rb = rb;
istream_buffered->real_istream = real_istream;
TK_ISTREAM(obj)->read = tk_istream_buffered_read;

View File

@ -58,7 +58,10 @@ static ret_t tk_ostream_buffered_flush(tk_ostream_t* stream) {
}
static ret_t tk_ostream_buffered_set_prop(object_t* obj, const char* name, const value_t* v) {
return RET_NOT_FOUND;
tk_ostream_buffered_t* ostream_buffered = TK_OSTREAM_BUFFERED(obj);
tk_ostream_t* real_ostream = ostream_buffered->real_ostream;
return object_set_prop(OBJECT(real_ostream), name, v);
}
static ret_t tk_ostream_buffered_get_prop(object_t* obj, const char* name, value_t* v) {
@ -72,7 +75,7 @@ static ret_t tk_ostream_buffered_on_destroy(object_t* obj) {
tk_ostream_buffered_t* ostream_buffered = TK_OSTREAM_BUFFERED(obj);
wbuffer_deinit(&(ostream_buffered->wb));
object_unref(OBJECT(ostream_buffered->real_ostream));
OBJECT_UNREF(ostream_buffered->real_ostream);
return RET_OK;
}
@ -94,9 +97,9 @@ tk_ostream_t* tk_ostream_buffered_create(tk_ostream_t* real_ostream) {
ostream_buffered = TK_OSTREAM_BUFFERED(obj);
return_value_if_fail(ostream_buffered != NULL, NULL);
OBJECT_REF(ostream_buffered->real_ostream);
ostream_buffered->real_ostream = real_ostream;
wbuffer_init_extendable(&(ostream_buffered->wb));
object_ref(OBJECT(ostream_buffered->real_ostream));
TK_OSTREAM(obj)->write = tk_ostream_buffered_write;
TK_OSTREAM(obj)->flush = tk_ostream_buffered_flush;

View File

@ -554,6 +554,8 @@ float_t object_get_prop_float_by_path(object_t* obj, const char* path, float_t d
#define OBJECT(obj) ((object_t*)(obj))
#define OBJECT_REF(obj) object_ref((object_t*)(obj))
#define OBJECT_UNREF(obj) \
if ((obj) != NULL) { \
object_unref((object_t*)(obj)); \

View File

@ -29,6 +29,7 @@ BEGIN_C_DECLS
#define TK_STREAM_PROP_IS_EOS "is_eos"
#define TK_STREAM_PROP_TIMEOUT "timeout"
#define TK_STREAM_PROP_RETRY_TIMES "retry_times"
#define TK_STREAM_PROP_HAS_BUFFERED_DATA "has_buffered_data"
#define TK_STREAM_PROP_COMPRESS_THRESHOLD "compress_threshold"
#define TK_STREAM_CMD_IFLUSH "iflush"

View File

@ -11,6 +11,10 @@ TEST(IStreamBuffered, basic) {
tk_istream_t* is = tk_istream_buffered_create(mem, 32);
for (i = 0; i < sizeof(str); i++) {
if(i != 0) {
ASSERT_EQ(object_get_prop_bool(OBJECT(is), TK_STREAM_PROP_HAS_BUFFERED_DATA, FALSE), TRUE);
}
ASSERT_EQ(tk_istream_read(is, (uint8_t*)&c, 1), 1);
ASSERT_EQ(c == str[i], true);
}