improve streams

This commit is contained in:
lixianjing 2019-10-16 22:43:13 -07:00
parent 9f4923e6f2
commit d9cee378a9
13 changed files with 138 additions and 36 deletions

View File

@ -40,6 +40,18 @@ static int32_t tk_istream_buffered_read(tk_istream_t* stream, uint8_t* buff, uin
return ring_buffer_read(rb, buff, max_size);
}
static ret_t tk_istream_buffered_wait_for_data(tk_istream_t* stream, uint32_t timeout_ms) {
tk_istream_buffered_t* istream_buffered = TK_ISTREAM_BUFFERED(stream);
tk_istream_t* real_istream = istream_buffered->real_istream;
ring_buffer_t* rb = istream_buffered->rb;
if (!ring_buffer_is_empty(rb)) {
return RET_OK;
} else {
return tk_istream_wait_for_data(real_istream, timeout_ms);
}
}
static ret_t tk_istream_buffered_set_prop(object_t* obj, const char* name, const value_t* v) {
tk_istream_buffered_t* istream_buffered = TK_ISTREAM_BUFFERED(obj);
tk_istream_t* real_istream = istream_buffered->real_istream;
@ -95,6 +107,7 @@ tk_istream_t* tk_istream_buffered_create(tk_istream_t* real_istream, uint32_t bu
istream_buffered->rb = rb;
istream_buffered->real_istream = real_istream;
TK_ISTREAM(obj)->read = tk_istream_buffered_read;
TK_ISTREAM(obj)->wait_for_data = tk_istream_buffered_wait_for_data;
return TK_ISTREAM(obj);
}

View File

@ -28,6 +28,15 @@ static int32_t tk_istream_file_read(tk_istream_t* stream, uint8_t* buff, uint32_
return fs_file_read(istream_file->file, buff, max_size);
}
static ret_t tk_istream_file_wait_for_data(tk_istream_t* stream, uint32_t timeout_ms) {
tk_istream_file_t* istream_file = TK_ISTREAM_FILE(stream);
if(!fs_file_eof(istream_file->file)) {
return RET_OK;
} else {
return RET_EOS;
}
}
static ret_t tk_istream_file_seek(tk_istream_t* stream, uint32_t offset) {
tk_istream_file_t* istream_file = TK_ISTREAM_FILE(stream);
@ -85,6 +94,7 @@ tk_istream_t* tk_istream_file_create(const char* filename) {
istream_file->file = file;
TK_ISTREAM(obj)->read = tk_istream_file_read;
TK_ISTREAM(obj)->seek = tk_istream_file_seek;
TK_ISTREAM(obj)->wait_for_data = tk_istream_file_wait_for_data;
return TK_ISTREAM(obj);
}

View File

@ -31,9 +31,9 @@ static int32_t tk_istream_tcp_read(tk_istream_t* stream, uint8_t* buff, uint32_t
int32_t ret = 0;
tk_istream_tcp_t* istream_tcp = TK_ISTREAM_TCP(stream);
if (socket_wait_for_data(istream_tcp->sock, 100) == RET_OK) {
ret = recv(istream_tcp->sock, buff, max_size, 0);
if (ret <= 0) {
ret = recv(istream_tcp->sock, buff, max_size, 0);
if(ret <= 0) {
if (errno != EAGAIN) {
perror("recv");
istream_tcp->is_broken = TRUE;
}
@ -42,6 +42,12 @@ static int32_t tk_istream_tcp_read(tk_istream_t* stream, uint8_t* buff, uint32_t
return ret;
}
static ret_t tk_istream_tcp_wait_for_data(tk_istream_t* stream, uint32_t timeout_ms) {
tk_istream_tcp_t* istream_tcp = TK_ISTREAM_TCP(stream);
return socket_wait_for_data(istream_tcp->sock, timeout_ms);
}
static ret_t tk_istream_tcp_get_prop(object_t* obj, const char* name, value_t* v) {
tk_istream_tcp_t* istream_tcp = TK_ISTREAM_TCP(obj);
if (tk_str_eq(name, TK_STREAM_PROP_FD)) {
@ -72,6 +78,7 @@ tk_istream_t* tk_istream_tcp_create(int sock) {
istream_tcp->sock = sock;
TK_ISTREAM(obj)->read = tk_istream_tcp_read;
TK_ISTREAM(obj)->wait_for_data = tk_istream_tcp_wait_for_data;
return TK_ISTREAM(obj);
}

View File

@ -45,6 +45,12 @@ static int32_t tk_istream_udp_read(tk_istream_t* stream, uint8_t* buff, uint32_t
return ret;
}
static ret_t tk_istream_udp_wait_for_data(tk_istream_t* stream, uint32_t timeout_ms) {
tk_istream_udp_t* istream_udp = TK_ISTREAM_UDP(stream);
return socket_wait_for_data(istream_udp->sock, timeout_ms);
}
static ret_t tk_istream_udp_get_prop(object_t* obj, const char* name, value_t* v) {
tk_istream_udp_t* istream_udp = TK_ISTREAM_UDP(obj);
if (tk_str_eq(name, TK_STREAM_PROP_FD)) {
@ -75,6 +81,7 @@ tk_istream_t* tk_istream_udp_create(int sock) {
istream_udp->sock = sock;
TK_ISTREAM(obj)->read = tk_istream_udp_read;
TK_ISTREAM(obj)->wait_for_data = tk_istream_udp_wait_for_data;
return TK_ISTREAM(obj);
}

View File

@ -44,6 +44,15 @@ static int32_t tk_istream_mem_read(tk_istream_t* stream, uint8_t* buff, uint32_t
return size;
}
static ret_t tk_istream_mem_wait_for_data(tk_istream_t* stream, uint32_t timeout_ms) {
tk_istream_mem_t* istream_mem = TK_ISTREAM_MEM(stream);
if(istream_mem->cursor < istream_mem->size) {
return RET_OK;
} else {
return RET_EOS;
}
}
static ret_t tk_istream_mem_seek(tk_istream_t* stream, uint32_t offset) {
tk_istream_mem_t* istream_mem = TK_ISTREAM_MEM(stream);
return_value_if_fail(offset <= istream_mem->size, RET_BAD_PARAMS);
@ -102,6 +111,7 @@ tk_istream_t* tk_istream_mem_create(uint8_t* buff, uint32_t size, uint32_t packe
istream_mem->packet_size = packet_size;
TK_ISTREAM(obj)->read = tk_istream_mem_read;
TK_ISTREAM(obj)->seek = tk_istream_mem_seek;
TK_ISTREAM(obj)->wait_for_data = tk_istream_mem_wait_for_data;
return TK_ISTREAM(obj);
}

View File

@ -43,7 +43,7 @@ static ret_t tk_istream_shdlc_send_ack(tk_istream_t* stream, bool_t ok, uint8_t
: RET_IO;
}
ret_t tk_istream_shdlc_read_frame(tk_istream_t* stream, wbuffer_t* wb) {
ret_t tk_istream_shdlc_read_frame(tk_istream_t* stream, wbuffer_t* wb, bool_t expect_data) {
ret_t ret = RET_OK;
shdlc_header_t header = {0};
tk_istream_shdlc_t* istream_shdlc = TK_ISTREAM_SHDLC(stream);
@ -53,16 +53,26 @@ ret_t tk_istream_shdlc_read_frame(tk_istream_t* stream, wbuffer_t* wb) {
uint32_t timeout = istream_shdlc->timeout;
tk_istream_t* real_istream = istream_shdlc->iostream->real_istream;
if(!object_get_prop_bool(OBJECT(real_istream), TK_STREAM_PROP_IS_OK, TRUE)) {
return RET_IO;
}
for (retry_times = 0; retry_times < istream_shdlc->retry_times; retry_times++) {
if(!object_get_prop_bool(OBJECT(real_istream), TK_STREAM_PROP_IS_OK, TRUE)) {
return RET_IO;
}
ret = shdlc_read_data(real_istream, wb, timeout);
return_value_if_fail(ret != RET_IO, RET_IO);
header.data = wb->data[0];
header.data = wb->data != NULL ? wb->data[0] : 0;
seqno = header.s.seqno;
if (ret == RET_CRC) {
log_debug("retry_times=%u\n", retry_times);
return_value_if_fail(tk_istream_shdlc_send_ack(stream, FALSE, seqno) == RET_OK, RET_IO);
if (ret == RET_CRC || ret == RET_TIMEOUT) {
log_debug("retry_times=%u\n", (retry_times+1));
if(expect_data) {
return_value_if_fail(tk_istream_shdlc_send_ack(stream, FALSE, seqno) == RET_OK, RET_IO);
}
continue;
} else if (ret == RET_OK) {
if (header.s.type == SHDLC_DATA) {
@ -91,6 +101,8 @@ static ret_t tk_istream_shdlc_save_data_frame(tk_istream_t* stream, wbuffer_t* w
compressor_t* c = istream_shdlc->compressor;
wbuffer_t* wb_c = &(istream_shdlc->wb_compress);
return_value_if_fail(compressor_uncompress(c, data, size, wb_c) == RET_OK, 0);
log_debug("compressed data: %u => %u\n", size, wb_c->cursor);
data = wb_c->data;
size = wb_c->cursor;
}
@ -111,7 +123,7 @@ ret_t tk_istream_shdlc_read_ack(tk_istream_t* stream, uint8_t seqno) {
wbuffer_t* wb = &(istream_shdlc->wb);
do {
ret = tk_istream_shdlc_read_frame(stream, wb);
ret = tk_istream_shdlc_read_frame(stream, wb, FALSE);
return_value_if_fail(ret == RET_OK, ret);
header.data = wb->data[0];
@ -134,12 +146,24 @@ static int32_t tk_istream_shdlc_read(tk_istream_t* stream, uint8_t* buff, uint32
return ring_buffer_read(rb, buff, max_size);
}
return_value_if_fail(tk_istream_shdlc_read_frame(stream, wb) == RET_OK, 0);
return_value_if_fail(tk_istream_shdlc_read_frame(stream, wb, TRUE) == RET_OK, 0);
return_value_if_fail(tk_istream_shdlc_save_data_frame(stream, wb) == RET_OK, 0);
return ring_buffer_read(rb, buff, max_size);
}
static ret_t tk_istream_shdlc_wait_for_data(tk_istream_t* stream, uint32_t timeout_ms) {
tk_istream_shdlc_t* istream_shdlc = TK_ISTREAM_SHDLC(stream);
tk_istream_t* real_istream = tk_iostream_get_istream(istream_shdlc->iostream->real_iostream);
ring_buffer_t* rb = istream_shdlc->rb;
if (!ring_buffer_is_empty(rb)) {
return RET_OK;
} else {
return tk_istream_wait_for_data(real_istream, timeout_ms);
}
}
static ret_t tk_istream_shdlc_set_prop(object_t* obj, const char* name, const value_t* v) {
tk_istream_shdlc_t* istream_shdlc = TK_ISTREAM_SHDLC(obj);
tk_istream_t* real_istream = tk_iostream_get_istream(istream_shdlc->iostream->real_iostream);
@ -208,6 +232,7 @@ tk_istream_t* tk_istream_shdlc_create(tk_iostream_shdlc_t* iostream) {
istream_shdlc->iostream = iostream;
istream_shdlc->compressor = compressor_miniz_create(COMPRESSOR_RATIO_FIRST);
TK_ISTREAM(obj)->read = tk_istream_shdlc_read;
TK_ISTREAM(obj)->wait_for_data = tk_istream_shdlc_wait_for_data;
return TK_ISTREAM(obj);
}

View File

@ -22,6 +22,7 @@
#include "tkc/mem.h"
#include "compressors/compressor_miniz.h"
#include "streams/shdlc/shdlc_helper.h"
#include "streams/shdlc/istream_shdlc.h"
#include "streams/shdlc/ostream_shdlc.h"
static int32_t tk_ostream_shdlc_write(tk_ostream_t* stream, const uint8_t* buff, uint32_t size) {
@ -44,6 +45,10 @@ static int32_t tk_ostream_shdlc_write(tk_ostream_t* stream, const uint8_t* buff,
}
while (retry_times < ostream_shdlc->retry_times) {
if(!object_get_prop_bool(OBJECT(real_ostream), TK_STREAM_PROP_IS_OK, TRUE)) {
return RET_IO;
}
return_value_if_fail(
tk_ostream_write_len(real_ostream, wb->data, wb->cursor, timeout) == wb->cursor, RET_IO);

View File

@ -1,7 +1,7 @@
/**
* File: shdlc_helper.h
* File: shdlc_helper.c
* Author: AWTK Develop Team
* Brief: iostream for shdlc
* Brief: shdlc helper functions.
*
* Copyright (c) 2019 - 2019 Guangzhou ZHIYUAN Electronics Co.,Ltd.
*
@ -33,6 +33,7 @@ static inline uint8_t shdlc_unescape(uint8_t c) {
static ret_t shdlc_write_uint8(wbuffer_t* wb, uint8_t c) {
if (c == SHDLC_FLAG || c == SHDLC_ESCAPE) {
return_value_if_fail(wbuffer_write_uint8(wb, SHDLC_ESCAPE) == RET_OK, RET_FAIL);
c = shdlc_escape(c);
}
@ -112,7 +113,7 @@ ret_t shdlc_read_data(tk_istream_t* istream, wbuffer_t* wb, uint32_t timeout) {
return_value_if_fail(istream != NULL && wb != NULL, RET_BAD_PARAMS);
do {
return_value_if_fail(tk_istream_read_len(istream, &c, 1, timeout) == 1, RET_IO);
return_value_if_fail(tk_istream_read_len(istream, &c, 1, timeout) == 1, RET_TIMEOUT);
if (c == SHDLC_FLAG) {
break;
}
@ -121,13 +122,13 @@ ret_t shdlc_read_data(tk_istream_t* istream, wbuffer_t* wb, uint32_t timeout) {
} while (1);
if (is_broken_frame) {
wbuffer_write_uint8(wb, 0x00);
log_debug("meet broken frame\n");
return RET_CRC;
}
/*now the data is in buffered stream in normal case*/
timeout = tk_min(100, timeout);
do {
return_value_if_fail(tk_istream_read_len(istream, &c, 1, timeout) == 1, RET_IO);
return_value_if_fail(tk_istream_read_len(istream, &c, 1, timeout) == 1, RET_TIMEOUT);
if (c != SHDLC_FLAG) {
break;
}
@ -139,7 +140,7 @@ ret_t shdlc_read_data(tk_istream_t* istream, wbuffer_t* wb, uint32_t timeout) {
return_value_if_fail(wbuffer_write_uint8(wb, c) == RET_OK, RET_OOM);
do {
return_value_if_fail(tk_istream_read_len(istream, &c, 1, timeout) == 1, RET_IO);
return_value_if_fail(tk_istream_read_len(istream, &c, 1, timeout) == 1, RET_TIMEOUT);
if (c == SHDLC_FLAG) {
uint8_t fcs_low = 0;
@ -172,3 +173,4 @@ ret_t shdlc_read_data(tk_istream_t* istream, wbuffer_t* wb, uint32_t timeout) {
uint8_t shdlc_seqno_inc(uint8_t seqno) {
return (seqno + 1) & 0x07;
}

View File

@ -12,6 +12,8 @@ void do_recv(int port) {
uint8_t buff[1024];
int slisten = tcp_listen(port);
log_debug("listen: %d\n", port);
return_if_fail(slisten > 0);
do {
int ret = 0;
int sock = tcp_accept(slisten);
@ -30,6 +32,11 @@ void do_recv(int port) {
} else {
break;
}
if(!object_get_prop_bool(OBJECT(is), TK_STREAM_PROP_IS_OK, FALSE)) {
log_debug("client disconnected\n");
break;
}
} while (TRUE);
OBJECT_UNREF(tcp);

View File

@ -2,6 +2,7 @@
#define WIN32_LEAN_AND_MEAN 1
#endif /*WIN32_LEAN_AND_MEAN*/
#include "tkc/mem.h"
#include "tkc/utils.h"
#include "tkc/platform.h"
#include "streams/inet/iostream_tcp.h"
@ -38,6 +39,7 @@ int main(int argc, char* argv[]) {
int port = 0;
int sock = 0;
uint32_t times = 10;
char* content = NULL;
const char* msg = NULL;
const char* host = NULL;
@ -55,9 +57,19 @@ int main(int argc, char* argv[]) {
msg = argv[3];
times = tk_atoi(argv[4]);
if(msg[0] == '@') {
uint32_t size = 0;
const char* filename = msg + 1;
content = file_read(filename, &size);
msg = content;
}
sock = tcp_connect(host, port);
socket_set_blocking(sock, FALSE);
do_send(tk_iostream_tcp_create(sock), msg, times);
TKMEM_FREE(content);
socket_deinit();
return 0;

View File

@ -59,6 +59,7 @@ int32_t tk_istream_read_len(tk_istream_t* stream, uint8_t* buff, uint32_t max_si
uint32_t now = 0;
uint32_t end = 0;
int32_t offset = 0;
ret_t ret = RET_OK;
int32_t read_bytes = 0;
int32_t remain_bytes = max_size;
return_value_if_fail(stream != NULL && stream->read != NULL, -1);
@ -69,26 +70,24 @@ int32_t tk_istream_read_len(tk_istream_t* stream, uint8_t* buff, uint32_t max_si
do {
errno = 0;
read_bytes = tk_istream_read(stream, buff + offset, remain_bytes);
ret = tk_istream_wait_for_data(stream, 20);
if (read_bytes <= 0) {
if (!object_get_prop_bool(OBJECT(stream), TK_STREAM_PROP_IS_OK, TRUE)) {
log_debug("stream is broken\n");
if (ret == RET_TIMEOUT) {
now = time_now_ms();
if (now > end) {
log_debug("read timeout.\n");
break;
}
if (object_get_prop_bool(OBJECT(stream), TK_STREAM_PROP_IS_EOS, FALSE)) {
log_debug("stream is end\n");
break;
}
if (errno == EAGAIN || errno == 0) {
sleep_ms(10);
continue;
} else {
log_debug("errno=%d\n", errno);
break;
continue;
}
} else if(ret != RET_OK) {
break;
}
read_bytes = tk_istream_read(stream, buff + offset, remain_bytes);
if (read_bytes <= 0) {
log_debug("errno=%d\n", errno);
break;
}
offset += read_bytes;

View File

@ -175,7 +175,12 @@ typedef enum _ret_t {
* @const RET_IO
* IO错误
*/
RET_IO
RET_IO,
/**
* @const RET_EOS
* End of Stream
*/
RET_EOS
} ret_t;
#ifdef ANDROID

View File

@ -293,7 +293,7 @@ uint16_t* tk_memset16(uint16_t* buff, uint16_t val, uint32_t size) {
return_value_if_fail(buff != NULL, NULL);
while ((uint32_t)pb % 4 != 0 && size > 0) {
while ((size_t)pb % 4 != 0 && size > 0) {
*p = val;
p++;
@ -338,7 +338,7 @@ uint32_t* tk_memset24(uint32_t* buff, void* val, uint32_t size) {
uint8_t* pb = (uint8_t*)buff;
uint8_t* src = (uint8_t*)val;
while ((uint32_t)pb % 4 != 0 && size > 0) {
while ((size_t)pb % 4 != 0 && size > 0) {
pb[0] = src[0];
pb[1] = src[1];
pb[2] = src[2];