fixed bugs in websocket::read_frame_data and WebSocketServlet.cpp.

This commit is contained in:
zhengshuxin 2018-10-20 00:30:02 +08:00
parent 6b3133e2ca
commit f4ed8742fb
8 changed files with 224 additions and 222 deletions

View File

@ -1,5 +1,9 @@
修改历史列表:
544) 2018.10.20
544.1) bugfix: websocket::read_frame_data 被循环调用时,掩码使用方式有误
544.2) bugfix: WebSocketServlet.cpp 中有多个成员变量未初始化
543) 2018.10.8
543.1) feature: server_socket 构造方法分开多个,避免使用时产生歧义
543.2) feature: master_udp 增加 proc_on_unbind 虚方法,用于当套接口关闭前的回调

View File

@ -100,7 +100,7 @@ public:
* @return {bool} false
*
*/
bool doRun(void);
virtual bool doRun(void);
/**
* HttpServlet HTTP doXXX
@ -111,7 +111,7 @@ public:
* 便 acl_master
* @return {bool}
*/
bool doRun(session& session, socket_stream* stream = NULL);
virtual bool doRun(session& session, socket_stream* stream = NULL);
/**
* HttpServlet HTTP doXXX
@ -120,7 +120,7 @@ public:
* @param stream {socket_stream*}
* @return {bool}
*/
bool doRun(const char* memcached_addr, socket_stream* stream);
virtual bool doRun(const char* memcached_addr, socket_stream* stream);
/**
* HTTP GET

View File

@ -7,6 +7,8 @@ namespace acl
class websocket;
class session;
class HttpServletRequest;
class HttpServletResponse;
class WebSocketServlet: public HttpServlet
{
@ -44,33 +46,14 @@ public:
virtual ~WebSocketServlet(void);
/**
* HttpServlet HTTP doXXX
* @param session {session&} session
* @param stream {socket_stream*} acl_master
* apache CGI
* NULL
* 便 acl_master
* @return {bool}
*/
virtual bool doRun(session& session, socket_stream* stream = NULL)
{
return HttpServlet::doRun(session, stream);
}
// @override
bool doRun(void);
/**
* HttpServlet HTTP doXXX
* memcached session
* @param memcached_addr {const char*} memcached IP:PORT
* @param stream {socket_stream*}
* @return {bool}
*/
virtual bool doRun(const char* memcached_addr, socket_stream* stream)
{
return HttpServlet::doRun(memcached_addr, stream);
}
// @override
bool doRun(session& session, socket_stream* stream = NULL);
virtual bool doRun(void);
// @override
bool doRun(const char* memcached_addr, socket_stream* stream);
/**
* .
@ -78,7 +61,7 @@ public:
* @return {bool} false. true
*/
bool send_binary(const char *buf, int len);
bool sendBinary(const char *buf, int len);
/**
* .
@ -86,23 +69,23 @@ public:
* @return {bool} false. true
*/
bool send_text(const char *text);
bool sendText(const char *text);
/**
* pong .
* @param rw_timeout {const char *}
* @return {bool} false. true
*/
bool send_pong(const char *buffer = NULL);
bool sendPong(const char *buffer = NULL);
/**
* pong .
* @param rw_timeout {const char *}
* @return {bool} false. true
*/
bool send_ping(const char *buffer = NULL);
bool sendPing(const char *buffer = NULL);
unsigned long long get_max_msg_len(void) const
unsigned long long getMaxMsgLen(void) const
{
return max_msg_len_;
}
@ -111,7 +94,7 @@ public:
* websocket websocket连接.
* @param unsigned long long{len}
*/
void set_max_msg_len(unsigned long long len)
void setMaxMsgLen(unsigned long long len)
{
max_msg_len_ = len;
}
@ -121,7 +104,7 @@ protected:
* websocket
* @return {void}
*/
virtual void on_close(void);
virtual void onClose(void);
/**
* websocket ping .
@ -129,7 +112,7 @@ protected:
* @param {int} len
* @return {bool} false
*/
virtual bool on_ping(const char *buf, unsigned long long len) = 0;
virtual bool onPing(const char *buf, unsigned long long len) = 0;
/**
* websocket pong .
@ -137,7 +120,7 @@ protected:
* @param {int} len
* @return {bool} false
*/
virtual bool on_pong(const char *buf, unsigned long long len) = 0;
virtual bool onPong(const char *buf, unsigned long long len) = 0;
/**
* websocket ping .
@ -146,7 +129,7 @@ protected:
* @param text{bool } true
* @return {bool} false
*/
virtual bool on_message(char *data, unsigned long long len, bool text) = 0;
virtual bool onMessage(char *data, unsigned long long len, bool text) = 0;
private:
// @override
@ -157,9 +140,9 @@ private:
unsigned long long max_msg_len_;
websocket *ws_;
char *recv_buffer_;
int write_pos_;
int opcode_;
char *buffer_;
int wpos_;
int opcode_;
};
} // namespace acl

View File

@ -21,21 +21,21 @@ class socket_stream;
enum
{
FRAME_CONTINUATION = 0x00,
FRAME_TEXT = 0x01,
FRAME_BINARY = 0x02,
FRAME_RSV3 = 0x03,
FRAME_RSV4 = 0x04,
FRAME_RSV5 = 0x05,
FRAME_RSV6 = 0x06,
FRAME_RSV7 = 0x07,
FRAME_CLOSE = 0x08,
FRAME_PING = 0x09,
FRAME_PONG = 0x0A,
FRAME_CTL_RSVB = 0x0B,
FRAME_CTL_RSVC = 0x0C,
FRAME_CTL_RSVD = 0x0D,
FRAME_CTL_RSVE = 0x0E,
FRAME_CTL_RSVF = 0x0F,
FRAME_TEXT = 0x01,
FRAME_BINARY = 0x02,
FRAME_RSV3 = 0x03,
FRAME_RSV4 = 0x04,
FRAME_RSV5 = 0x05,
FRAME_RSV6 = 0x06,
FRAME_RSV7 = 0x07,
FRAME_CLOSE = 0x08,
FRAME_PING = 0x09,
FRAME_PONG = 0x0A,
FRAME_CTL_RSVB = 0x0B,
FRAME_CTL_RSVC = 0x0C,
FRAME_CTL_RSVD = 0x0D,
FRAME_CTL_RSVE = 0x0E,
FRAME_CTL_RSVF = 0x0F,
};
struct frame_header

View File

@ -75,41 +75,40 @@ bool WebsocketServlet_impl::doPost(acl::HttpServletRequest& req,
return res.write(buf) && res.write(NULL, 0);
}
bool WebsocketServlet_impl::on_ping(const char *buf, unsigned long long len)
bool WebsocketServlet_impl::onPing(const char*, unsigned long long)
{
return send_pong();
return sendPong();
}
bool WebsocketServlet_impl::on_pong(const char *buf, unsigned long long len)
bool WebsocketServlet_impl::onPong(const char*, unsigned long long)
{
return send_ping();
return sendPing();
}
bool WebsocketServlet_impl::on_message(char *data, unsigned long long len, bool text)
bool WebsocketServlet_impl::onMessage(char *data, unsigned long long len, bool text)
{
(void) text;
switch (step_)
{
case 0:
{
printf("FileName:%s\n",data);
printf("FileName:%s\n", data);
filename_.append(data, len);
step_++;
}
break;
break;
case 1:
{
printf("FileSize:%s\n", data);
filesize_ = std::strtol(data, 0, 10);
filesize_ = strtol(data, 0, 10);
step_++;
}
break;
break;
case 2:
{
if (!file_)
{
file_ = new acl::ofstream();
file_->open_trunc(filename_);
}
file_->write(data, len);
current_filesize_ += len;
if (current_filesize_ == filesize_)
@ -124,8 +123,7 @@ bool WebsocketServlet_impl::on_message(char *data, unsigned long long len, bool
current_filesize_ = 0;
step_ = 0;
}
}
break;
break;
default:
break;
}

View File

@ -4,65 +4,62 @@ class WebsocketServlet_impl : public acl::WebSocketServlet
{
public:
WebsocketServlet_impl(acl::redis_client_cluster& cluster, size_t max_conns);
~WebsocketServlet_impl();
~WebsocketServlet_impl(void);
acl::session& get_session() const
acl::session& get_session(void) const
{
return *session_;
}
protected:
virtual bool doUnknown(acl::HttpServletRequest&,
acl::HttpServletResponse& res);
virtual bool doGet(acl::HttpServletRequest& req,
acl::HttpServletResponse& res);
virtual bool doPost(acl::HttpServletRequest& req,
acl::HttpServletResponse& res);
// @override
bool doUnknown(acl::HttpServletRequest&, acl::HttpServletResponse&);
// @override
bool doGet(acl::HttpServletRequest&, acl::HttpServletResponse&);
// @override
bool doPost(acl::HttpServletRequest&, acl::HttpServletResponse&);
//for websocket
/**
* websocket
* @return {void}
*/
virtual void on_close()
{
}
/**
* websocket ping .
* @param {const char *} buf
* @param {int} len
* @return {bool} false
*/
virtual bool on_ping(const char *buf, unsigned long long len);
/**
* websocket pong .
* @param {const char *} buf
* @param {int} len
* @return {bool} false
*/
virtual bool on_pong(const char *buf, unsigned long long len);
* @override
* websocket
* @return {void}
*/
void onClose(void) {}
/**
* websocket ping .
* @param data{char *}
* @param len{unsigned long long}
* @param text{bool } true
* @return {bool} false
*/
virtual bool on_message(char *data, unsigned long long len, bool text);
* @override
* websocket ping .
* @param {const char *} buf
* @param {int} len
* @return {bool} false
*/
bool onPing(const char *buf, unsigned long long len);
/**
* @override
* websocket pong .
* @param {const char *} buf
* @param {int} len
* @return {bool} false
*/
bool onPong(const char *buf, unsigned long long len);
/**
* @override
* websocket ping .
* @param data{char *}
* @param len{unsigned long long}
* @param text{bool } true
* @return {bool} false
*/
bool onMessage(char *data, unsigned long long len, bool text);
private:
acl::session* session_;
int step_;
acl::string filename_;
acl::ofstream *file_;
int filesize_;
int current_filesize_;
};

View File

@ -23,34 +23,50 @@ namespace acl
{
WebSocketServlet::WebSocketServlet(socket_stream* stream, session* session)
: HttpServlet(stream, session)
, max_msg_len_(100 * 1024*1024)
, ws_(NULL)
: HttpServlet(stream, session)
, max_msg_len_(100 * 1024*1024)
, ws_(NULL)
, buffer_(NULL)
, wpos_(0)
, opcode_(0)
{
}
WebSocketServlet::WebSocketServlet(socket_stream* stream,
const char* memcache_addr)
:HttpServlet(stream, memcache_addr)
, max_msg_len_(100 * 1024 * 1024)
, ws_(NULL)
: HttpServlet(stream, memcache_addr)
, max_msg_len_(100 * 1024 * 1024)
, ws_(NULL)
, buffer_(NULL)
, wpos_(0)
, opcode_(0)
{
}
WebSocketServlet::WebSocketServlet(void)
:max_msg_len_(100 * 1024 * 1024),
ws_(NULL)
: max_msg_len_(100 * 1024 * 1024)
, ws_(NULL)
, buffer_(NULL)
, wpos_(0)
, opcode_(0)
{
}
WebSocketServlet::~WebSocketServlet(void)
{
if (ws_)
delete ws_;
ws_ = NULL;
delete ws_;
if (buffer_)
acl_myfree(buffer_);
}
bool WebSocketServlet::doRun(session& session, socket_stream* stream /* = NULL */)
{
return HttpServlet::doRun(session, stream);
}
bool WebSocketServlet::doRun(const char* memcached_addr, socket_stream* stream)
{
return HttpServlet::doRun(memcached_addr, stream);
}
bool WebSocketServlet::doRun(void)
@ -58,9 +74,9 @@ bool WebSocketServlet::doRun(void)
if (!ws_)
{
bool ret = HttpServlet::doRun();
//websocket upgrade ok.
//maybe http without keepalive ,
//return false,framework will close this connection.
// websocket upgrade ok.
// maybe http without keepalive ,
// return false, framework will close this connection.
if (ws_)
return true;
return ret;
@ -78,26 +94,26 @@ bool WebSocketServlet::doRun(void)
if (len > 0)
{
if (!recv_buffer_)
recv_buffer_ = (char*) acl_mymalloc((size_t)len + 1);
if (!buffer_)
buffer_ = (char*) acl_mymalloc((size_t) len + 1);
else
//frame not finish.
recv_buffer_ = (char*) acl_myrealloc(recv_buffer_,
write_pos_ + (size_t)len + 1);
buffer_ = (char*) acl_myrealloc(buffer_,
wpos_ + (size_t) len + 1);
if (ws_->read_frame_data(recv_buffer_+ write_pos_, (size_t) len) < 0)
if (ws_->read_frame_data(buffer_ + wpos_, (size_t) len) < 0)
{
write_pos_ = 0;
acl_myfree(recv_buffer_);
recv_buffer_ = NULL;
acl_myfree(buffer_);
buffer_ = NULL;
wpos_ = 0;
return false;
}
write_pos_ += (int) len;
recv_buffer_[write_pos_] = '\0';
wpos_ += (int) len;
buffer_[wpos_] = '\0';
}
int opcode = ws_->get_frame_opcode();
bool ret = false;
int opcode = ws_->get_frame_opcode();
bool ret = false;
if (ws_->get_frame_fin() == false)
{
@ -114,17 +130,17 @@ bool WebSocketServlet::doRun(void)
switch (opcode)
{
case FRAME_PING:
ret = on_ping(recv_buffer_, write_pos_);
ret = onPing(buffer_, wpos_);
break;
case FRAME_PONG:
ret = on_pong(recv_buffer_, write_pos_);
ret = onPong(buffer_, wpos_);
break;
case FRAME_CLOSE:
on_close();
onClose();
break;
case FRAME_TEXT:
case FRAME_BINARY:
ret = on_message(recv_buffer_, write_pos_,
ret = onMessage(buffer_, wpos_,
ws_->get_frame_opcode() == FRAME_TEXT);
break;
default:
@ -133,17 +149,17 @@ bool WebSocketServlet::doRun(void)
break;
}
if (recv_buffer_)
if (buffer_)
{
acl_myfree(recv_buffer_);
recv_buffer_ = NULL;
write_pos_ = 0;
acl_myfree(buffer_);
buffer_ = NULL;
wpos_ = 0;
}
return ret;
}
bool WebSocketServlet::send_binary(const char *buffer, int len)
bool WebSocketServlet::sendBinary(const char *buffer, int len)
{
ws_->set_frame_opcode(FRAME_BINARY);
ws_->set_frame_fin(true);
@ -151,7 +167,7 @@ bool WebSocketServlet::send_binary(const char *buffer, int len)
return ws_->send_frame_data(buffer, len);
}
bool WebSocketServlet::send_text(const char *buffer)
bool WebSocketServlet::sendText(const char *buffer)
{
ws_->set_frame_opcode(FRAME_TEXT);
ws_->set_frame_fin(true);
@ -159,7 +175,7 @@ bool WebSocketServlet::send_text(const char *buffer)
return ws_->send_frame_data(buffer, strlen(buffer));
}
bool WebSocketServlet::send_pong(const char *text)
bool WebSocketServlet::sendPong(const char *text)
{
size_t len = !text ? 0 : strlen(text);
ws_->set_frame_opcode(FRAME_PONG);
@ -168,7 +184,7 @@ bool WebSocketServlet::send_pong(const char *text)
return ws_->send_frame_data(text, len);
}
bool WebSocketServlet::send_ping(const char *text)
bool WebSocketServlet::sendPing(const char *text)
{
size_t len = !text ? 0 : strlen(text);
ws_->set_frame_opcode(FRAME_PING);
@ -180,11 +196,11 @@ bool WebSocketServlet::send_ping(const char *text)
bool WebSocketServlet::doWebsocket(HttpServletRequest&, HttpServletResponse&)
{
acl_assert(!ws_);
ws_ = new websocket(*getStream());
ws_ = NEW websocket(*getStream());
return true;
}
void WebSocketServlet::on_close(void)
void WebSocketServlet::onClose(void)
{
}

View File

@ -25,8 +25,9 @@ websocket::websocket(socket_stream& client)
websocket::~websocket(void)
{
if (header_buf_)
if (header_buf_) {
acl_myfree(header_buf_);
}
}
websocket& websocket::reset(void)
@ -92,45 +93,46 @@ websocket& websocket::set_frame_masking_key(unsigned int mask)
void websocket::make_frame_header(void)
{
header_len_ = 2;
if (header_.payload_len > 65535)
if (header_.payload_len > 65535) {
header_len_ += 8;
else if (header_.payload_len >= 126)
} else if (header_.payload_len >= 126) {
header_len_ += 2;
if (header_.mask)
}
if (header_.mask) {
header_len_ += 4;
}
if (header_len_ > header_size_)
{
if (header_len_ > header_size_) {
header_buf_ = (char*) acl_myrealloc(header_buf_, header_len_);
header_size_ = header_len_;
}
unsigned char* ptr = (unsigned char*) header_buf_;
if (header_.fin)
if (header_.fin) {
ptr[0] = 0x80;
else
} else {
ptr[0] = 0x00;
}
ptr[0] |= header_.opcode;
if (header_.payload_len && header_.mask)
if (header_.payload_len && header_.mask) {
ptr[1] = 0x80;
else
} else {
ptr[1] = 0x00;
}
unsigned long long offset = 1;
unsigned long long payload_len = header_.payload_len;
if (payload_len <= 125)
if (payload_len <= 125) {
ptr[offset++] |= payload_len & 0xff;
else if (payload_len <= 65535)
{
} else if (payload_len <= 65535) {
ptr[offset++] |= 126;
ptr[offset++] = (unsigned char) (payload_len >> 8) & 0xff;
ptr[offset++] = (unsigned char) payload_len & 0xff;
}
else
{
} else {
ptr[offset++] |= 127;
ptr[offset++] = (unsigned char) ((payload_len >> 56) & 0xff);
ptr[offset++] = (unsigned char) ((payload_len >> 48) & 0xff);
@ -142,8 +144,7 @@ void websocket::make_frame_header(void)
ptr[offset++] = (unsigned char) (payload_len & 0xff);
}
if (payload_len > 0 && header_.mask)
{
if (payload_len > 0 && header_.mask) {
unsigned int masking_key = header_.masking_key;
ptr[offset++] = (unsigned char) ((masking_key >> 24) & 0xff);
ptr[offset++] = (unsigned char) ((masking_key >> 16) & 0xff);
@ -162,11 +163,12 @@ bool websocket::send_frame_data(const char* str)
bool websocket::send_frame_data(const void* data, size_t len)
{
if (data == NULL || len == 0)
if (data == NULL || len == 0) {
return send_frame_data((void*) data, len);
}
void* buf = acl_mymemdup(data, len);
bool ret = send_frame_data(buf, len);
bool ret = send_frame_data(buf, len);
acl_myfree(buf);
return ret;
}
@ -178,30 +180,28 @@ bool websocket::send_frame_data(char* str)
bool websocket::send_frame_data(void* data, size_t len)
{
if (!header_sent_)
{
if (!header_sent_) {
header_sent_ = true;
make_frame_header();
if (client_.write(header_buf_, header_len_) == -1)
{
if (client_.write(header_buf_, header_len_) == -1) {
logger_error("write header error %s, len: %d",
last_serror(), (int) header_len_);
return false;
}
}
if (data == NULL || len == 0)
if (data == NULL || len == 0) {
return true;
if (header_.mask)
{
unsigned char* mask = (unsigned char*) &header_.masking_key;
for (size_t i = 0; i < len; i++)
((char*) data)[i] ^= mask[i % 4];
}
if (client_.write(data, len) == -1)
{
if (header_.mask) {
unsigned char* mask = (unsigned char*) &header_.masking_key;
for (size_t i = 0; i < len; i++) {
((char*) data)[i] ^= mask[i % 4];
}
}
if (client_.write(data, len) == -1) {
logger_error("write frame data error %s", last_serror());
return false;
}
@ -216,11 +216,12 @@ bool websocket::send_frame_pong(const char* str)
bool websocket::send_frame_pong(const void* data, size_t len)
{
if (data == NULL || len == 0)
if (data == NULL || len == 0) {
return send_frame_pong((void*) NULL, 0);
}
void* buf = acl_mymemdup(data, len);
bool ret = send_frame_pong(buf, len);
bool ret = send_frame_pong(buf, len);
acl_myfree(buf);
return ret;
}
@ -247,8 +248,9 @@ bool websocket::send_frame_ping(const char* str)
bool websocket::send_frame_ping(const void* data, size_t len)
{
if (data == NULL || len == 0)
if (data == NULL || len == 0) {
return send_frame_ping((void*) NULL, 0);
}
void* buf = acl_mymemdup(data, len);
bool ret = send_frame_ping(buf, len);
@ -275,10 +277,11 @@ static bool is_big_endian(void)
{
const int n = 1;
if (*(char*) &n)
if (*(char*) &n) {
return false;
else
} else {
return true;
}
}
#ifndef swap64
@ -302,11 +305,11 @@ bool websocket::read_frame_head(void)
int ret;
unsigned char buf[8];
if (client_.read(buf, 2) == -1)
{
if (last_error() != ACL_ETIMEDOUT)
if (client_.read(buf, 2) == -1) {
if (last_error() != ACL_ETIMEDOUT) {
logger_error("read first two char error: %d, %s",
last_error(), last_serror());
}
return false;
}
@ -318,37 +321,37 @@ bool websocket::read_frame_head(void)
header_.mask = (buf[1] >> 7) & 0x01;
unsigned char payload_len = buf[1] & 0x7f;
if (payload_len <= 125)
if (payload_len <= 125) {
header_.payload_len = payload_len;
}
// payload_len == 126 | 127
else if ((ret = client_.read(buf, payload_len == 126 ? 2 : 8)) == -1)
{
if (last_error() != ACL_ETIMEDOUT)
else if ((ret = client_.read(buf, payload_len == 126 ? 2 : 8)) == -1) {
if (last_error() != ACL_ETIMEDOUT) {
logger_error("read ext_payload_len error: %d, %s",
last_error(), last_serror());
}
return false;
}
else if (ret == 2)
{
} else if (ret == 2) {
unsigned short n;
memcpy(&n, buf, ret);
header_.payload_len = ntohs(n);
}
else // ret == 8
{
} else {
// ret == 8
memcpy(&header_.payload_len, buf, ret);
header_.payload_len = ntoh64(header_.payload_len);
}
if (!header_.mask)
if (!header_.mask) {
return true;
}
if (client_.read(&header_.masking_key, sizeof(unsigned int)) == -1)
{
if (last_error() != ACL_ETIMEDOUT)
if (client_.read(&header_.masking_key, sizeof(unsigned int)) == -1) {
if (last_error() != ACL_ETIMEDOUT) {
logger_error("read masking_key error: %d, %s",
last_error(), last_serror());
}
return false;
}
@ -357,30 +360,31 @@ bool websocket::read_frame_head(void)
int websocket::read_frame_data(char* buf, size_t size)
{
if (payload_nread_ == header_.payload_len)
if (payload_nread_ >= header_.payload_len) {
return 0;
}
if (header_.payload_len - payload_nread_ < size)
if (header_.payload_len < payload_nread_ + size) {
size = (size_t) (header_.payload_len - payload_nread_);
}
int ret = client_.read(buf, size, false);
if (ret == -1)
{
if (last_error() != ACL_ETIMEDOUT)
if (ret == -1) {
if (last_error() != ACL_ETIMEDOUT) {
logger_error("read frame data error: %d, %s",
last_error(), last_serror());
}
return -1;
}
if (header_.mask)
{
if (header_.mask) {
unsigned char* mask = (unsigned char*) &header_.masking_key;
for (int i = 0; i < ret; i++)
buf[i] ^= mask[i % 4];
for (int i = 0; i < ret; i++) {
buf[i] ^= mask[(payload_nread_ + i) % 4];
}
}
payload_nread_ += ret;
return ret;
}