diff --git a/src/streams/SConscript b/src/streams/SConscript index ec2cf77bf..e426c1cf9 100644 --- a/src/streams/SConscript +++ b/src/streams/SConscript @@ -11,9 +11,11 @@ env=DefaultEnvironment().Clone() env.Library(os.path.join(LIB_DIR, 'streams'), sources, LIBS=[]) -LIBS=['streams'] + env['LIBS'] +LIBS=['streams', 'compressors', 'miniz'] + env['LIBS'] env.Program(os.path.join(BIN_DIR, 'udp_send'), ['tools/udp_send.c'], LIBS=LIBS); env.Program(os.path.join(BIN_DIR, 'udp_recv'), ['tools/udp_recv.c'], LIBS=LIBS); env.Program(os.path.join(BIN_DIR, 'serial_echo'), ['tools/serial_echo.c'], LIBS=LIBS); env.Program(os.path.join(BIN_DIR, 'serial_send'), ['tools/serial_send.c'], LIBS=LIBS); env.Program(os.path.join(BIN_DIR, 'serial_at'), ['tools/serial_at.c'], LIBS=LIBS); +env.Program(os.path.join(BIN_DIR, 'shdlc_send'), ['tools/shdlc_send.c'], LIBS=LIBS); +env.Program(os.path.join(BIN_DIR, 'shdlc_recv'), ['tools/shdlc_recv.c'], LIBS=LIBS); diff --git a/src/streams/noisy/ostream_noisy.c b/src/streams/noisy/ostream_noisy.c index 7737f0a5a..07034cbe9 100644 --- a/src/streams/noisy/ostream_noisy.c +++ b/src/streams/noisy/ostream_noisy.c @@ -103,7 +103,7 @@ tk_ostream_t* tk_ostream_noisy_create(tk_ostream_t* real_ostream) { ostream_noisy = TK_OSTREAM_NOISY(obj); return_value_if_fail(ostream_noisy != NULL, NULL); - ostream_noisy->error_level = 1; + ostream_noisy->error_level = 3; ostream_noisy->real_ostream = real_ostream; wbuffer_init_extendable(&(ostream_noisy->wb)); TK_OSTREAM(obj)->write = tk_ostream_noisy_write; diff --git a/src/streams/shdlc/istream_shdlc.c b/src/streams/shdlc/istream_shdlc.c index 8aef6fba6..74593f6e7 100644 --- a/src/streams/shdlc/istream_shdlc.c +++ b/src/streams/shdlc/istream_shdlc.c @@ -44,16 +44,33 @@ static ret_t tk_istream_shdlc_send_ack(tk_istream_t* stream, bool_t ok, uint8_t } ret_t tk_istream_shdlc_read_frame(tk_istream_t* stream, wbuffer_t* wb) { + ret_t ret = RET_OK; shdlc_header_t header = {0}; tk_istream_shdlc_t* istream_shdlc = TK_ISTREAM_SHDLC(stream); + + uint8_t seqno = 0; + uint32_t retry_times = 0; uint32_t timeout = istream_shdlc->timeout; tk_istream_t* real_istream = istream_shdlc->iostream->real_istream; - ret_t ret = shdlc_read_data(real_istream, wb, timeout); - return_value_if_fail(ret != RET_IO, RET_IO); - header.data = wb->data[0]; - if (header.s.type == SHDLC_DATA) { - tk_istream_shdlc_send_ack(stream, ret == RET_OK, header.s.seqno); + for (retry_times = 0; retry_times < istream_shdlc->retry_times; retry_times++) { + ret = shdlc_read_data(real_istream, wb, timeout); + return_value_if_fail(ret != RET_IO, RET_IO); + + header.data = wb->data[0]; + seqno = header.s.seqno; + + if (ret == RET_CRC) { + log_debug("retry_times=%u\n", retry_times); + return_value_if_fail(tk_istream_shdlc_send_ack(stream, FALSE, seqno) == RET_OK, RET_IO); + continue; + } else if (ret == RET_OK) { + if (header.s.type == SHDLC_DATA) { + return_value_if_fail(tk_istream_shdlc_send_ack(stream, TRUE, seqno) == RET_OK, RET_IO); + } + } + + break; } return ret; @@ -99,7 +116,6 @@ ret_t tk_istream_shdlc_read_ack(tk_istream_t* stream, uint8_t seqno) { header.data = wb->data[0]; if (header.s.type != SHDLC_DATA) { - ENSURE(header.s.seqno == seqno); break; } @@ -131,6 +147,9 @@ static ret_t tk_istream_shdlc_set_prop(object_t* obj, const char* name, const va if (tk_str_eq(name, TK_STREAM_PROP_TIMEOUT)) { istream_shdlc->timeout = value_uint32(v); return RET_OK; + } else if (tk_str_eq(name, TK_STREAM_PROP_RETRY_TIMES)) { + istream_shdlc->retry_times = value_uint32(v); + return RET_OK; } return object_set_prop(OBJECT(real_istream), name, v); @@ -143,6 +162,10 @@ static ret_t tk_istream_shdlc_get_prop(object_t* obj, const char* name, value_t* if (tk_str_eq(name, TK_STREAM_PROP_TIMEOUT)) { value_set_uint32(v, istream_shdlc->timeout); + return RET_OK; + } else if (tk_str_eq(name, TK_STREAM_PROP_RETRY_TIMES)) { + value_set_uint32(v, istream_shdlc->retry_times); + return RET_OK; } @@ -180,6 +203,7 @@ tk_istream_t* tk_istream_shdlc_create(tk_iostream_shdlc_t* iostream) { ENSURE(wbuffer_init_extendable(&(istream_shdlc->wb_compress)) != NULL); istream_shdlc->timeout = 3000; + istream_shdlc->retry_times = 10; istream_shdlc->last_seqno = 0xff; istream_shdlc->iostream = iostream; istream_shdlc->compressor = compressor_miniz_create(COMPRESSOR_RATIO_FIRST); diff --git a/src/streams/shdlc/istream_shdlc.h b/src/streams/shdlc/istream_shdlc.h index 39f1f3f4a..5630488fd 100644 --- a/src/streams/shdlc/istream_shdlc.h +++ b/src/streams/shdlc/istream_shdlc.h @@ -47,6 +47,11 @@ struct _tk_istream_shdlc_t { * 读写超时时间(ms) */ uint32_t timeout; + /** + * @property {uint32_t} retry_times + * 失败重传次数。 + */ + uint8_t retry_times; wbuffer_t wb; ring_buffer_t* rb; diff --git a/src/streams/shdlc/ostream_shdlc.c b/src/streams/shdlc/ostream_shdlc.c index 6d1f061d2..32dac4cf6 100644 --- a/src/streams/shdlc/ostream_shdlc.c +++ b/src/streams/shdlc/ostream_shdlc.c @@ -29,7 +29,7 @@ static int32_t tk_ostream_shdlc_write(tk_ostream_t* stream, const uint8_t* buff, tk_ostream_shdlc_t* ostream_shdlc = TK_OSTREAM_SHDLC(stream); uint8_t seqno = ostream_shdlc->seqno; uint32_t timeout = ostream_shdlc->timeout; - uint32_t retry_times = ostream_shdlc->retry_times; + uint32_t retry_times = 0; tk_istream_t* istream = ostream_shdlc->iostream->istream; tk_ostream_t* real_ostream = ostream_shdlc->iostream->real_ostream; @@ -43,7 +43,7 @@ static int32_t tk_ostream_shdlc_write(tk_ostream_t* stream, const uint8_t* buff, return_value_if_fail(shdlc_write_data(wb, seqno, FALSE, buff, size) == RET_OK, 0); } - while (retry_times) { + while (retry_times < ostream_shdlc->retry_times) { return_value_if_fail( tk_ostream_write_len(real_ostream, wb->data, wb->cursor, timeout) == wb->cursor, RET_IO); @@ -56,7 +56,8 @@ static int32_t tk_ostream_shdlc_write(tk_ostream_t* stream, const uint8_t* buff, return size; } - retry_times--; + retry_times++; + log_debug("retry_times=%u\n", retry_times); } log_debug("shdlc write failed\n"); diff --git a/src/streams/tools/shdlc_recv.c b/src/streams/tools/shdlc_recv.c new file mode 100644 index 000000000..6281bfa4b --- /dev/null +++ b/src/streams/tools/shdlc_recv.c @@ -0,0 +1,60 @@ +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN 1 +#endif /*WIN32_LEAN_AND_MEAN*/ + +#include "tkc/utils.h" +#include "tkc/platform.h" +#include "streams/inet/iostream_tcp.h" +#include "streams/shdlc/iostream_shdlc.h" +#include "streams/inet/socket_helper.h" + +void do_recv(int port) { + uint8_t buff[1024]; + int slisten = tcp_listen(port); + log_debug("listen: %d\n", port); + do { + int ret = 0; + int sock = tcp_accept(slisten); + tk_iostream_t* tcp = tk_iostream_tcp_create(sock); + tk_iostream_t* shdlc = tk_iostream_shdlc_create(tcp); + tk_istream_t* is = tk_iostream_get_istream(shdlc); + tk_ostream_t* os = tk_iostream_get_ostream(shdlc); + + memset(buff, 0x00, sizeof(buff)); + + do { + ret = tk_istream_read(is, buff, sizeof(buff)); + if (ret > 0) { + log_debug("read: %s\n", (char*)buff); + ret = tk_ostream_write(os, buff, ret); + } else { + break; + } + } while (TRUE); + + OBJECT_UNREF(tcp); + OBJECT_UNREF(shdlc); + } while (1); + + return; +} + +int main(int argc, char* argv[]) { + int port = 0; + + if (argc != 2) { + printf("Usage: %s port\n", argv[0]); + return 0; + } + + socket_init(); + platform_prepare(); + TK_ENABLE_CONSOLE(); + + port = tk_atoi(argv[1]); + do_recv(port); + + socket_deinit(); + + return 0; +} diff --git a/src/streams/tools/shdlc_send.c b/src/streams/tools/shdlc_send.c new file mode 100644 index 000000000..ff3bc9433 --- /dev/null +++ b/src/streams/tools/shdlc_send.c @@ -0,0 +1,59 @@ +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN 1 +#endif /*WIN32_LEAN_AND_MEAN*/ + +#include "tkc/utils.h" +#include "tkc/platform.h" +#include "streams/inet/iostream_tcp.h" +#include "streams/noisy/iostream_noisy.h" +#include "streams/shdlc/iostream_shdlc.h" +#include "streams/inet/socket_helper.h" + +void do_send(tk_iostream_t* tcp, const char* msg) { + char buff[1024]; + int32_t ret = 0; + uint32_t size = strlen(msg) + 1; + tk_iostream_t* noisy = tk_iostream_noisy_create(tcp); + tk_iostream_t* iostream = tk_iostream_shdlc_create(noisy); + tk_istream_t* istream = tk_iostream_get_istream(iostream); + tk_ostream_t* ostream = tk_iostream_get_ostream(iostream); + + memset(buff, 0x00, sizeof(buff)); + ret = tk_ostream_write(ostream, (uint8_t*)msg, size); + log_debug("send ret=%d %s\n", ret, msg); + + ret = tk_istream_read(istream, (uint8_t*)buff, sizeof(buff)); + log_debug("%d: %s\n", ret, buff); + + object_unref(OBJECT(iostream)); + + return; +} + +int main(int argc, char* argv[]) { + int port = 0; + int sock = 0; + const char* msg = NULL; + const char* host = NULL; + + if (argc != 4) { + printf("Usage: %s host port msg\n", argv[0]); + return 0; + } + + socket_init(); + platform_prepare(); + TK_ENABLE_CONSOLE(); + + host = argv[1]; + port = tk_atoi(argv[2]); + msg = argv[3]; + + sock = tcp_connect(host, port); + + do_send(tk_iostream_tcp_create(sock), msg); + + socket_deinit(); + + return 0; +} diff --git a/src/streams/tools/udp_recv.c b/src/streams/tools/udp_recv.c index fbcd50b2a..8ee79bf36 100644 --- a/src/streams/tools/udp_recv.c +++ b/src/streams/tools/udp_recv.c @@ -48,6 +48,7 @@ int main(int argc, char* argv[]) { } socket_init(); + platform_prepare(); TK_ENABLE_CONSOLE(); port = tk_atoi(argv[1]); diff --git a/src/streams/tools/udp_send.c b/src/streams/tools/udp_send.c index abbb59f9b..df3886ded 100644 --- a/src/streams/tools/udp_send.c +++ b/src/streams/tools/udp_send.c @@ -37,6 +37,7 @@ int main(int argc, char* argv[]) { } socket_init(); + platform_prepare(); TK_ENABLE_CONSOLE(); host = argv[1]; diff --git a/tests/iostream_shdlc_test.cc b/tests/iostream_shdlc_test.cc index 15b30c019..f79ed328a 100644 --- a/tests/iostream_shdlc_test.cc +++ b/tests/iostream_shdlc_test.cc @@ -4,6 +4,7 @@ #include "tkc/socket_pair.h" #include "streams/shdlc/iostream_shdlc.h" #include "streams/inet/iostream_tcp.h" +#include "streams/noisy/iostream_noisy.h" uint8_t rbuff[2048]; uint8_t sbuff[2048]; @@ -106,3 +107,42 @@ TEST(IOStreamSHDLC, large) { OBJECT_UNREF(b_tcp); OBJECT_UNREF(b_io); } + +static void* server_thread_entry_noisy(void* args) { + tk_iostream_t* b_io = TK_IOSTREAM(args); + tk_istream_t* is = tk_iostream_get_istream(b_io); + tk_ostream_t* os = tk_iostream_get_ostream(b_io); + + object_set_prop_int(OBJECT(os), TK_STREAM_PROP_COMPRESS_THRESHOLD, compress_threshold); + assert(tk_ostream_write(os, sbuff, data_size) == data_size); + + return NULL; +} + +TEST(IOStreamSHDLC, noisy) { + int socks[2]; + tk_socketpair(socks); + tk_iostream_t* a_tcp = tk_iostream_tcp_create(socks[0]); + tk_iostream_t* a_io = tk_iostream_shdlc_create(a_tcp); + + tk_iostream_t* b_tcp = tk_iostream_tcp_create(socks[1]); + tk_iostream_t* b_noisy = tk_iostream_noisy_create(b_tcp); + tk_iostream_t* b_io = tk_iostream_shdlc_create(b_noisy); + + tk_istream_t* is = tk_iostream_get_istream(a_io); + tk_ostream_t* os = tk_iostream_get_ostream(a_io); + tk_thread_t* t = tk_thread_create(server_thread_entry_noisy, b_io); + + data_size = 4; + gen_data(); + tk_thread_start(t); + ASSERT_EQ(tk_istream_read_len(is, rbuff, data_size, 30000), data_size); + ASSERT_EQ(memcmp(rbuff, sbuff, data_size), 0); + + tk_thread_join(t); + + OBJECT_UNREF(a_tcp); + OBJECT_UNREF(a_io); + OBJECT_UNREF(b_tcp); + OBJECT_UNREF(b_io); +}