add websocket supporting in http_aclient

This commit is contained in:
zhengshuxin 2019-06-09 22:46:07 +08:00
parent 7d252c27ad
commit 16f37938b0
5 changed files with 239 additions and 59 deletions

View File

@ -1,5 +1,8 @@
修改历史列表:
573) 2019.6.9
573.1) feature: http_aclient 类增加针对 websocket 的支持
572) 2019.6.8
572.1) feature: redis_client/redis_client_pool/redis_client_cluster 支持 SSL 通信方式

View File

@ -121,19 +121,42 @@ protected:
return true;
}
/**
* text
* @return {bool} true
*/
virtual bool on_ws_frame_text(void) { return true; }
/**
* binary
* @return {bool} true
*/
virtual bool on_ws_frame_binary(void) { return true; }
/**
*
*/
virtual void on_ws_frame_closed(void) {}
/**
* websocket
* @param data {char*}
* @param dlen {size_t}
* @return {bool} true
*/
virtual bool on_ws_read_body(char* data, size_t dlen)
virtual bool on_ws_frame_data(char* data, size_t dlen)
{
(void) data;
(void) dlen;
return true;
}
/**
*
* @return {bool} true
*/
virtual bool on_ws_frame_finish(void) { return true; }
protected:
/**
* WEB HTTP HTTP
@ -176,8 +199,13 @@ protected:
bool keep_alive_;
websocket* ws_in_;
websocket* ws_out_;
string* buff_;
bool handle_websocket(void);
bool handle_ws_data(void);
bool handle_ws_ping(void);
bool handle_ws_pong(void);
bool handle_ws_other(void);
private:
static int connect_callback(ACL_ASTREAM* stream, void* ctx);

View File

@ -201,12 +201,13 @@ public:
* >0:
*/
int peek_frame_data(char* buf, size_t size);
int peek_frame_data(string& buf, size_t size);
/**
* websocket
* websocket
* @return {bool}
*/
bool is_read_head(void) const;
bool is_head_finish(void) const;
/**
*
@ -315,7 +316,7 @@ private:
bool header_sent_;
unsigned status_;
string* header_read_;
string* peek_buf_;
void make_frame_header(void);

View File

@ -23,6 +23,9 @@ http_aclient::http_aclient(aio_handle& handle, polarssl_conf* ssl_conf /* NULL *
, hdr_res_(NULL)
, http_res_(NULL)
, keep_alive_(false)
, ws_in_(NULL)
, ws_out_(NULL)
, buff_(NULL)
{
header_ = NEW http_header;
}
@ -40,6 +43,10 @@ http_aclient::~http_aclient(void)
stream_->unbind();
delete stream_;
}
delete ws_in_;
delete ws_out_;
delete buff_;
}
http_header& http_aclient::request_header(void)
@ -118,34 +125,83 @@ void http_aclient::close_callback(void)
this->destroy();
}
bool http_aclient::handle_websocket(void)
bool http_aclient::handle_ws_ping(void)
{
acl_assert(ws_in_);
if (buff_ == NULL) {
buff_ = NEW string(1024);
}
if (ws_in_->is_read_head()) {
if (!ws_in_->peek_frame_head()) {
while (true) {
int ret = ws_in_->peek_frame_data(*buff_, 1024);
bool res;
switch (ret) {
case -1:
if (ws_in_->eof()) {
buff_->clear();
return false;
}
return true;
case 0:
res = ws_out_->send_frame_pong((void*) buff_->c_str(),
buff_->size());
buff_->clear();
return res;
default:
break;
}
}
}
unsigned char opcode = ws_in_->get_frame_opcode();
switch (opcode) {
case FRAME_TEXT:
case FRAME_BINARY:
break;
case FRAME_CLOSE:
return false;
case FRAME_PING:
return true;
case FRAME_PONG:
return true;
default:
return true;
bool http_aclient::handle_ws_pong(void)
{
if (buff_ == NULL) {
buff_ = NEW string(1024);
}
while (true) {
int ret = ws_in_->peek_frame_data(*buff_, 1024);
switch (ret) {
case -1:
if (ws_in_->eof()) {
buff_->clear();
return false;
}
return true;
case 0:
buff_->clear();
return true;
default:
break;
}
}
}
bool http_aclient::handle_ws_other(void)
{
if (buff_ == NULL) {
buff_ = NEW string(1024);
}
while (true) {
int ret = ws_in_->peek_frame_data(*buff_, 1024);
switch (ret) {
case -1:
if (ws_in_->eof()) {
buff_->clear();
return false;
}
return true;
case 0:
buff_->clear();
return true;
default:
break;
}
}
}
bool http_aclient::handle_ws_data(void)
{
char buf[8192];
size_t size = sizeof(buf) - 1;
@ -158,9 +214,9 @@ bool http_aclient::handle_websocket(void)
}
return true;
case 0:
return true;
return this->on_ws_frame_finish();
default:
if (!this->on_ws_read_body(buf, ret)) {
if (!this->on_ws_frame_data(buf, ret)) {
return false;
}
break;
@ -168,6 +224,57 @@ bool http_aclient::handle_websocket(void)
}
}
bool http_aclient::handle_websocket(void)
{
acl_assert(ws_in_);
if (!ws_in_->is_head_finish()) {
if (!ws_in_->peek_frame_head()) {
if (ws_in_->eof()) {
return false;
}
return true;
}
// 当读完数据帧头时,根据不同帧类型回调不同方法
unsigned char opcode = ws_in_->get_frame_opcode();
switch (opcode) {
case FRAME_TEXT:
if (!this->on_ws_frame_text()) {
return false;
}
break;
case FRAME_BINARY:
if (!this->on_ws_frame_binary()) {
return false;
}
break;
case FRAME_CLOSE:
this->on_ws_frame_closed();
return false;
case FRAME_PING:
return true;
case FRAME_PONG:
return true;
default:
return true;
}
}
unsigned char opcode = ws_in_->get_frame_opcode();
switch (opcode) {
case FRAME_TEXT:
case FRAME_BINARY:
return handle_ws_data();
case FRAME_PING:
return handle_ws_ping();
case FRAME_PONG:
return handle_ws_pong();
default:
return handle_ws_other();
}
}
// 在 SSL 握手阶段,该方法会多次调用,直至 SSL 握手成功或失败
bool http_aclient::read_wakeup(void)
{

View File

@ -29,7 +29,7 @@ websocket::websocket(socket_stream& client)
, payload_nsent_(0)
, header_sent_(false)
, status_(WS_HEAD_2BYTES)
, header_read_(NULL)
, peek_buf_(NULL)
{
reset();
}
@ -39,7 +39,7 @@ websocket::~websocket(void)
if (header_buf_) {
acl_myfree(header_buf_);
}
delete header_read_;
delete peek_buf_;
}
websocket& websocket::reset(void)
@ -53,13 +53,13 @@ websocket& websocket::reset(void)
header_.payload_len = 0;
header_.masking_key = 0;
payload_nread_ = 0;
payload_nsent_ = 0;
header_sent_ = false;
payload_nread_ = 0;
payload_nsent_ = 0;
header_sent_ = false;
status_ = WS_HEAD_2BYTES;
status_ = WS_HEAD_2BYTES;
if (header_read_) {
header_read_->clear();
if (peek_buf_) {
peek_buf_->clear();
}
return *this;
@ -432,19 +432,19 @@ void websocket::update_head_2bytes(unsigned char ch1, unsigned ch2)
bool websocket::peek_head_2bytes(void)
{
size_t len = header_read_->size();
size_t len = peek_buf_->size();
if (len >= 2) {
logger_fatal("overflow, len=%ld", (long) len);
}
if (!client_.readn_peek(header_read_, 2 - len, false)) {
if (!client_.readn_peek(peek_buf_, 2 - len, false)) {
return false;
}
assert(header_read_->size() == 2);
assert(peek_buf_->size() == 2);
unsigned char* s = (unsigned char*) header_read_->c_str();
unsigned char* s = (unsigned char*) peek_buf_->c_str();
update_head_2bytes(s[0], s[1]);
if (header_.payload_len == 126) {
@ -459,79 +459,79 @@ bool websocket::peek_head_2bytes(void)
status_ = WS_HEAD_FINISH;
}
header_read_->clear();
peek_buf_->clear();
return true;
}
bool websocket::peek_head_len_2bytes(void)
{
size_t len = header_read_->size();
size_t len = peek_buf_->size();
if (len >= 2) {
logger_fatal("overflow, len=%ld", (long) len);
}
if (!client_.readn_peek(header_read_, 2 - len, false)) {
if (!client_.readn_peek(peek_buf_, 2 - len, false)) {
return false;
}
assert(header_read_->size() == 2);
assert(peek_buf_->size() == 2);
unsigned short n;
memcpy(&n, header_read_->c_str(), 2);
memcpy(&n, peek_buf_->c_str(), 2);
header_.payload_len = ntohs(n);
status_ = header_.mask ? WS_HEAD_MASKING_KEY : WS_HEAD_FINISH;
header_read_->clear();
peek_buf_->clear();
return true;
}
bool websocket::peek_head_len_8bytes(void)
{
size_t len = header_read_->size();
size_t len = peek_buf_->size();
if (len >= 8) {
logger_fatal("overflow, len=%ld", (long) len);
}
if (!client_.readn_peek(header_read_, 8 - len, false)) {
if (!client_.readn_peek(peek_buf_, 8 - len, false)) {
return false;
}
assert(header_read_->size() == 8);
assert(peek_buf_->size() == 8);
memcpy(&header_.payload_len, header_read_->c_str(), 8);
memcpy(&header_.payload_len, peek_buf_->c_str(), 8);
header_.payload_len = ntoh64(header_.payload_len);
status_ = header_.mask ? WS_HEAD_MASKING_KEY : WS_HEAD_FINISH;
header_read_->clear();
peek_buf_->clear();
return true;
}
bool websocket::peek_head_masking_key(void)
{
size_t len = header_read_->size();
size_t len = peek_buf_->size();
if (len >= sizeof(unsigned)) {
logger_fatal("overflow, len=%ld", (long) len);
}
if (!client_.readn_peek(header_read_, sizeof(unsigned) - len, false)) {
if (!client_.readn_peek(peek_buf_, sizeof(unsigned) - len, false)) {
return false;
}
assert(header_read_->size() == sizeof(unsigned));
assert(peek_buf_->size() == sizeof(unsigned));
memcpy(&header_.masking_key, header_read_->c_str(), sizeof(unsigned));
memcpy(&header_.masking_key, peek_buf_->c_str(), sizeof(unsigned));
status_ = WS_HEAD_FINISH;
header_read_->clear();
peek_buf_->clear();
return true;
}
bool websocket::peek_frame_head(void)
{
if (header_read_ == NULL) {
header_read_ = NEW string(8);
if (peek_buf_ == NULL) {
peek_buf_ = NEW string(8);
}
while (true) {
@ -565,9 +565,9 @@ bool websocket::peek_frame_head(void)
}
}
bool websocket::is_read_head(void) const
bool websocket::is_head_finish(void) const
{
return status_ != WS_HEAD_FINISH;
return status_ == WS_HEAD_FINISH;
}
int websocket::peek_frame_data(char* buf, size_t size)
@ -581,17 +581,21 @@ int websocket::peek_frame_data(char* buf, size_t size)
size = (size_t) (header_.payload_len - payload_nread_);
}
if (!client_.read_peek(header_read_, false)) {
// 如果未读满所要求的数据且读到的数据为空,则返回-1
// readn_peek 第三个参数为 true 要求内部自动清空缓冲区
if (!client_.readn_peek(peek_buf_, size, true) && peek_buf_->empty()) {
return -1;
}
size_t len = header_read_->size();
memcpy(buf, header_read_->c_str(), len);
acl_assert(!peek_buf_->empty());
size_t len = peek_buf_->size();
memcpy(buf, peek_buf_->c_str(), len);
if (header_.mask) {
unsigned char* mask = (unsigned char*) &header_.masking_key;
for (size_t i = 0; i < len; i++) {
((char*) buf)[i] ^= mask[(payload_nread_ + i) % 4];
buf[i] ^= mask[(payload_nread_ + i) % 4];
}
}
@ -599,6 +603,43 @@ int websocket::peek_frame_data(char* buf, size_t size)
return len;
}
int websocket::peek_frame_data(string& buf, size_t size)
{
if (payload_nread_ >= header_.payload_len) {
reset();
return 0;
}
if (header_.payload_len < payload_nread_ + size) {
size = (size_t) (header_.payload_len - payload_nread_);
}
size_t nbefore = buf.size();
if (!client_.readn_peek(buf, size, false)) {
if (buf.size() == nbefore) {
return -1;
}
}
size_t nafter = buf.size();
acl_assert(nafter > nbefore);
size_t len = nafter - nbefore;
if (header_.mask) {
unsigned char* mask = (unsigned char*) &header_.masking_key;
char* ptr = buf.c_str() + nbefore;
for (size_t i = 0; i < len; i++) {
ptr[i] ^= mask[(payload_nread_ + i) % 4];
}
}
payload_nread_ += len;
return len;
}
bool websocket::eof(void)
{
return client_.eof();