add mqtt_client for transfering mqtt message between localhost and network.

This commit is contained in:
shuxin   zheng 2021-03-05 14:30:21 +08:00
parent 7474b95a83
commit c2594aa5fe
18 changed files with 399 additions and 47 deletions

View File

@ -15,10 +15,18 @@ public:
return pkt_id_; return pkt_id_;
} }
protected:
// @override
bool to_string(string& out); bool to_string(string& out);
// @override
int update(const char* data, int dlen); int update(const char* data, int dlen);
// @override
bool is_finished(void) const {
return finished_;
}
public: public:
int update_header_var(const char* data, int dlen); int update_header_var(const char* data, int dlen);

View File

@ -0,0 +1,37 @@
#pragma once
#include "../stdlib/string.hpp"
#include "../connpool/connect_client.hpp"
#include "../stream/socket_stream.hpp"
namespace acl {
class mqtt_message;
class mqtt_client : public connect_client {
public:
mqtt_client(const char* addr, int conn_timeout = 10, int rw_timeout = 10);
~mqtt_client(void);
bool send(mqtt_message& message);
mqtt_message* get_message(void);
public:
bool read_header(mqtt_message& header);
bool read_body(const mqtt_message& header, mqtt_message& body);
mqtt_message* create_body(const mqtt_message& header);
protected:
// @override
bool open(void);
private:
string addr_;
int conn_timeout_;
int rw_timeout_;
socket_stream conn_;
};
} // namespace acl

View File

@ -29,10 +29,18 @@ public:
return connack_code_; return connack_code_;
} }
protected:
// @override
bool to_string(string& out); bool to_string(string& out);
// @override
int update(const char* data, int dlen); int update(const char* data, int dlen);
// @override
bool is_finished(void) const {
return finished_;
}
public: public:
int update_header_var(const char* data, int dlen); int update_header_var(const char* data, int dlen);

View File

@ -25,14 +25,19 @@ public:
void set_will_topic(const char* topic); void set_will_topic(const char* topic);
void set_will_msg(const char* msg); void set_will_msg(const char* msg);
protected:
// @override
bool to_string(string& out); bool to_string(string& out);
// @override
int update(const char* data, int dlen); int update(const char* data, int dlen);
// @override
bool is_finished(void) const { bool is_finished(void) const {
return finished_; return finished_;
} }
public:
unsigned short get_keep_alive(void) const { unsigned short get_keep_alive(void) const {
return keep_alive_; return keep_alive_;
} }

View File

@ -9,7 +9,20 @@ public:
mqtt_disconnect(void); mqtt_disconnect(void);
~mqtt_disconnect(void); ~mqtt_disconnect(void);
protected:
// @override
bool to_string(string& out); bool to_string(string& out);
// @override
int update(const char*, int dlen) {
return dlen;
}
// @override
bool is_finished(void) const {
return true;
}
}; };
} // namespace acl } // namespace acl

View File

@ -70,6 +70,21 @@ public:
return dlen_; return dlen_;
} }
public:
virtual bool to_string(string& out) {
(void) out;
return false;
}
virtual int update(const char* data, int dlen) {
(void) data;
return dlen;
}
virtual bool is_finished(void) const {
return false;
}
protected: protected:
unsigned status_; unsigned status_;

View File

@ -9,7 +9,19 @@ public:
mqtt_pingreq(void); mqtt_pingreq(void);
~mqtt_pingreq(void); ~mqtt_pingreq(void);
protected:
// @override
bool to_string(string& out); bool to_string(string& out);
// @override
int update(const char*, int dlen) {
return dlen;
}
// @override
bool is_finished(void) const {
return true;
}
}; };
} // namespace acl } // namespace acl

View File

@ -9,7 +9,17 @@ public:
mqtt_pingresp(void); mqtt_pingresp(void);
~mqtt_pingresp(void); ~mqtt_pingresp(void);
protected:
// @override
bool to_string(string& out); bool to_string(string& out);
// @override
int update(const char*, int dlen);
// @override
bool is_finished(void) const {
return true;
}
}; };
} // namespace acl } // namespace acl

View File

@ -6,7 +6,7 @@ namespace acl {
class mqtt_publish : public mqtt_message { class mqtt_publish : public mqtt_message {
public: public:
mqtt_publish(unsigned payload_len); mqtt_publish(unsigned body_len = 0);
~mqtt_publish(void); ~mqtt_publish(void);
void set_dup(bool yes); void set_dup(bool yes);
@ -41,23 +41,27 @@ public:
return payload_len_; return payload_len_;
} }
bool to_string(string& out);
int update(const char* data, int dlen);
bool is_finished(void) const {
return finished_;
}
const string& get_payload(void) const { const string& get_payload(void) const {
return payload_; return payload_;
} }
protected:
// @override
bool to_string(string& out);
// @override
int update(const char* data, int dlen);
// @override
bool is_finished(void) const {
return finished_;
}
public: public:
int update_header_var(const char* data, int dlen); int update_header_var(const char* data, int dlen);
int update_topic_len(const char* data, int dlen); int update_topic_len(const char* data, int dlen);
int update_topic_val(const char* data, int dlen); int update_topic_val(const char* data, int dlen);
int update_header_pktid(const char* data, int dlen); int update_pktid(const char* data, int dlen);
int update_payload(const char* data, int dlen); int update_payload(const char* data, int dlen);
protected: protected:

View File

@ -12,10 +12,18 @@ public:
void set_pkt_id(unsigned short id); void set_pkt_id(unsigned short id);
void add_topic_qos(mqtt_qos_t qos); void add_topic_qos(mqtt_qos_t qos);
protected:
// @override
bool to_string(string& out); bool to_string(string& out);
// @override
int update(const char* data, int dlen); int update(const char* data, int dlen);
// @override
bool is_finished(void) const {
return finished_;
}
public: public:
int update_header_var(const char* data, int dlen); int update_header_var(const char* data, int dlen);
int update_topic_qos(const char* data, int dlen); int update_topic_qos(const char* data, int dlen);
@ -28,7 +36,7 @@ private:
unsigned short pkt_id_; unsigned short pkt_id_;
std::vector<mqtt_qos_t> qoses_; std::vector<mqtt_qos_t> qoses_;
unsigned payload_len_; unsigned body_len_;
unsigned nread_; unsigned nread_;
}; };

View File

@ -6,7 +6,7 @@ namespace acl {
class mqtt_subscribe : public mqtt_message { class mqtt_subscribe : public mqtt_message {
public: public:
mqtt_subscribe(unsigned payload_len = 0); mqtt_subscribe(unsigned body_dlen = 0);
~mqtt_subscribe(void); ~mqtt_subscribe(void);
void set_pkt_id(unsigned short id); void set_pkt_id(unsigned short id);
@ -16,10 +16,18 @@ public:
return pkt_id_; return pkt_id_;
} }
protected:
// @override
bool to_string(string& out); bool to_string(string& out);
// @override
int update(const char* data, int dlen); int update(const char* data, int dlen);
// @override
bool is_finished(void) const {
return finished_;
}
public: public:
int update_header_var(const char* data, int dlen); int update_header_var(const char* data, int dlen);
int update_topic_len(const char* data, int dlen); int update_topic_len(const char* data, int dlen);
@ -41,7 +49,7 @@ private:
std::vector<string> topics_; std::vector<string> topics_;
std::vector<mqtt_qos_t> qoses_; std::vector<mqtt_qos_t> qoses_;
unsigned payload_len_; unsigned body_len_;
unsigned nread_; unsigned nread_;
string topic_; string topic_;

View File

@ -6,16 +6,24 @@ namespace acl {
class mqtt_unsubscribe : public mqtt_message { class mqtt_unsubscribe : public mqtt_message {
public: public:
mqtt_unsubscribe(unsigned payload_len = 0); mqtt_unsubscribe(unsigned body_len = 0);
~mqtt_unsubscribe(void); ~mqtt_unsubscribe(void);
void set_pkt_id(unsigned short id); void set_pkt_id(unsigned short id);
void add_topic(const char* topic); void add_topic(const char* topic);
protected:
// @override
bool to_string(string& out); bool to_string(string& out);
// @override
int update(const char* data, int dlen); int update(const char* data, int dlen);
// @override
bool is_finished(void) const {
return finished_;
}
public: public:
int update_header_var(const char* data, int dlen); int update_header_var(const char* data, int dlen);
int update_topic_len(const char* data, int dlen); int update_topic_len(const char* data, int dlen);
@ -29,7 +37,7 @@ private:
unsigned short pkt_id_; unsigned short pkt_id_;
std::vector<string> topics_; std::vector<string> topics_;
unsigned payload_len_; unsigned body_len_;
unsigned nread_; unsigned nread_;
string topic_; string topic_;

View File

@ -87,6 +87,7 @@ int mqtt_ack::update_header_var(const char* data, int dlen) {
logger_error("unpack pkt id error"); logger_error("unpack pkt id error");
return -1; return -1;
} }
finished_ = true; finished_ = true;
return dlen; return dlen;
} }

View File

@ -0,0 +1,215 @@
#include "acl_stdafx.hpp"
#include "acl_cpp/stdlib/util.hpp"
#include "acl_cpp/mqtt/mqtt_message.hpp"
#include "acl_cpp/mqtt/mqtt_connect.hpp"
#include "acl_cpp/mqtt/mqtt_connack.hpp"
#include "acl_cpp/mqtt/mqtt_publish.hpp"
#include "acl_cpp/mqtt/mqtt_puback.hpp"
#include "acl_cpp/mqtt/mqtt_pubrec.hpp"
#include "acl_cpp/mqtt/mqtt_pubrel.hpp"
#include "acl_cpp/mqtt/mqtt_pubcomp.hpp"
#include "acl_cpp/mqtt/mqtt_subscribe.hpp"
#include "acl_cpp/mqtt/mqtt_suback.hpp"
#include "acl_cpp/mqtt/mqtt_unsubscribe.hpp"
#include "acl_cpp/mqtt/mqtt_unsuback.hpp"
#include "acl_cpp/mqtt/mqtt_pingreq.hpp"
#include "acl_cpp/mqtt/mqtt_pingresp.hpp"
#include "acl_cpp/mqtt/mqtt_disconnect.hpp"
#include "acl_cpp/mqtt/mqtt_client.hpp"
namespace acl {
mqtt_client::mqtt_client(const char* addr, int conn_timeout, int rw_timeout)
: addr_(addr)
, conn_timeout_(conn_timeout)
, rw_timeout_(rw_timeout)
{
}
mqtt_client::~mqtt_client(void) {}
bool mqtt_client::open(void) {
if (conn_.opened()) {
return true;
}
if (!conn_.open(addr_, conn_timeout_, rw_timeout_)) {
logger_error("connect redis %s error: %s",
addr_.c_str(), last_serror());
return false;
}
return true;
}
bool mqtt_client::send(mqtt_message& message) {
string buff;
if (!message.to_string(buff)) {
logger_error("build mqtt message error");
return false;
}
if (buff.empty()) {
logger_error("mqtt message empty");
return false;
}
if (!open()) {
logger_error("open error");
return false;
}
if (conn_.write(buff) == -1) {
logger_error("send message error=%s", last_serror());
conn_.close();
return false;
}
return true;
}
mqtt_message* mqtt_client::get_message(void) {
mqtt_message header(MQTT_RESERVED_MIN);
if (!read_header(header)) {
conn_.close();
logger_error("get header error");
return NULL;
}
mqtt_message* body = create_body(header);
if (body == NULL) {
logger_error("create_message error");
return NULL;
}
if (!read_body(header, *body)) {
delete body;
return NULL;
}
return body;
}
bool mqtt_client::read_header(mqtt_message& header) {
char ch;
if (!conn_.read(ch)) {
logger_error("read header type error: %s", last_serror());
return false;
}
if (header.header_update(&ch, 1) != 0) {
logger_error("invalid header type=%d", (int) ch);
return false;
}
for (int i = 0; i < 4; i++) {
if (!conn_.read(ch)) {
logger_error("read one char error: %s, i=%d",
last_serror(), i);
return false;
}
if (header.header_update(&ch, 1) != 0) {
logger_error("header_update error, ch=%d", (int) ch);
return false;
}
if (header.header_finish()) {
break;
}
}
if (!header.header_finish()) {
logger_error("get mqtt header error");
return false;
}
return true;
}
bool mqtt_client::read_body(const mqtt_message& header, mqtt_message& body) {
unsigned len = header.get_data_length();
if (len == 0) {
return true;
}
char buf[8192];
while (len > 0) {
size_t size = sizeof(buf) > len ? len : sizeof(buf);
int n = conn_.read(buf, size);
if (n == -1) {
logger_error("read body error: %s", last_serror());
return false;
}
len -= n;
n = body.update(buf, (int) size);
if (n == -1) {
logger_error("update body error");
return false;
} else if (n != 0) {
logger_error("invalid body data");
return false;
}
}
if (!body.is_finished()) {
logger_error("body not finished!");
return false;
}
return true;
}
mqtt_message* mqtt_client::create_body(const mqtt_message& header) {
mqtt_type_t type = header.get_type();
mqtt_message* message;
unsigned dlen = header.get_data_length();
switch (type) {
case MQTT_CONNECT:
message = NEW mqtt_connect();
break;
case MQTT_CONNACK:
message = NEW mqtt_connack();
break;
case MQTT_PUBLISH:
message = NEW mqtt_publish(dlen);
break;
case MQTT_PUBACK:
message = NEW mqtt_puback();
break;
case MQTT_PUBREC:
message = NEW mqtt_pubrec();
break;
case MQTT_PUBREL:
message = NEW mqtt_pubrel();
break;
case MQTT_PUBCOMP:
message = NEW mqtt_pubcomp();
break;
case MQTT_SUBSCRIBE:
message = NEW mqtt_subscribe(dlen);
break;
case MQTT_SUBACK:
message = NEW mqtt_suback(dlen);
break;
case MQTT_UNSUBSCRIBE:
message = NEW mqtt_unsubscribe(dlen);
break;
case MQTT_UNSUBACK:
message = NEW mqtt_unsuback();
break;
case MQTT_PINGREQ:
message = NEW mqtt_pingreq();
break;
case MQTT_PINGRESP:
message = NEW mqtt_pingresp();
break;
case MQTT_DISCONNECT:
message = NEW mqtt_disconnect();
break;
default:
logger_error("unknown mqtt type=%d", (int) type);
message = NULL;
break;
}
return message;
}
} // namespace acl

View File

@ -7,11 +7,11 @@ enum {
MQTT_STAT_HDR_VAR, MQTT_STAT_HDR_VAR,
MQTT_STAT_TOPIC_LEN, MQTT_STAT_TOPIC_LEN,
MQTT_STAT_TOPIC_VAL, MQTT_STAT_TOPIC_VAL,
MQTT_STAT_HDR_PKTID, MQTT_STAT_PKTID,
MQTT_STAT_PAYLOAD, MQTT_STAT_PAYLOAD,
}; };
mqtt_publish::mqtt_publish(unsigned payload_len /* 0 */) mqtt_publish::mqtt_publish(unsigned body_len /* 0 */)
: mqtt_message(MQTT_PUBLISH) : mqtt_message(MQTT_PUBLISH)
, finished_(false) , finished_(false)
, dlen_(0) , dlen_(0)
@ -20,7 +20,7 @@ mqtt_publish::mqtt_publish(unsigned payload_len /* 0 */)
, qos_(MQTT_QOS0) , qos_(MQTT_QOS0)
, retain_(false) , retain_(false)
, pkt_id_(0) , pkt_id_(0)
, payload_len_(payload_len) , payload_len_(body_len)
{ {
status_ = MQTT_STAT_HDR_VAR; // just for update() status_ = MQTT_STAT_HDR_VAR; // just for update()
} }
@ -93,7 +93,7 @@ static struct {
{ MQTT_STAT_TOPIC_LEN, &mqtt_publish::update_topic_len }, { MQTT_STAT_TOPIC_LEN, &mqtt_publish::update_topic_len },
{ MQTT_STAT_TOPIC_VAL, &mqtt_publish::update_topic_val }, { MQTT_STAT_TOPIC_VAL, &mqtt_publish::update_topic_val },
{ MQTT_STAT_HDR_PKTID, &mqtt_publish::update_header_pktid }, { MQTT_STAT_PKTID, &mqtt_publish::update_pktid },
{ MQTT_STAT_PAYLOAD, &mqtt_publish::update_payload }, { MQTT_STAT_PAYLOAD, &mqtt_publish::update_payload },
}; };
@ -167,14 +167,14 @@ int mqtt_publish::update_topic_val(const char* data, int dlen) {
} }
dlen_ = 0; dlen_ = 0;
status_ = MQTT_STAT_HDR_PKTID; status_ = MQTT_STAT_PKTID;
return dlen; return dlen;
} }
#define HDR_PKTID_LEN 2 #define HDR_PKTID_LEN 2
int mqtt_publish::update_header_pktid(const char* data, int dlen) { int mqtt_publish::update_pktid(const char* data, int dlen) {
assert(data && dlen > 0); assert(data && dlen > 0);
assert(sizeof(buff_) >= HDR_PKTID_LEN); assert(sizeof(buff_) >= HDR_PKTID_LEN);

View File

@ -8,12 +8,12 @@ enum {
MQTT_STAT_TOPIC_QOS, MQTT_STAT_TOPIC_QOS,
}; };
mqtt_suback::mqtt_suback(unsigned payload_len /* 0 */) mqtt_suback::mqtt_suback(unsigned body_len /* 0 */)
: mqtt_message(MQTT_SUBACK) : mqtt_message(MQTT_SUBACK)
, finished_(false) , finished_(false)
, dlen_(0) , dlen_(0)
, pkt_id_(0) , pkt_id_(0)
, payload_len_(payload_len) , body_len_(body_len)
, nread_(0) , nread_(0)
{ {
status_ = MQTT_STAT_HDR_VAR; /* just for update */ status_ = MQTT_STAT_HDR_VAR; /* just for update */
@ -27,7 +27,7 @@ void mqtt_suback::set_pkt_id(unsigned short id) {
void mqtt_suback::add_topic_qos(mqtt_qos_t qos) { void mqtt_suback::add_topic_qos(mqtt_qos_t qos) {
qoses_.push_back(qos); qoses_.push_back(qos);
payload_len_ += 1; body_len_ += 1;
} }
bool mqtt_suback::to_string(string& out) { bool mqtt_suback::to_string(string& out) {
@ -38,8 +38,8 @@ bool mqtt_suback::to_string(string& out) {
bool old_mode = out.get_bin(); bool old_mode = out.get_bin();
payload_len_ += sizeof(pkt_id_); body_len_ += sizeof(pkt_id_);
this->set_data_length(payload_len_); this->set_data_length(body_len_);
if (!this->pack_header(out)) { if (!this->pack_header(out)) {
out.set_bin(old_mode); out.set_bin(old_mode);
@ -110,7 +110,7 @@ int mqtt_suback::update_header_var(const char* data, int dlen) {
return -1; return -1;
} }
if (nread_ >= payload_len_) { if (nread_ >= body_len_) {
logger_warn("no payload!"); logger_warn("no payload!");
finished_ = true; finished_ = true;
return dlen; return dlen;
@ -135,7 +135,7 @@ int mqtt_suback::update_topic_qos(const char* data, int dlen) {
qoses_.push_back((mqtt_qos_t) qos); qoses_.push_back((mqtt_qos_t) qos);
if (nread_ >= payload_len_) { if (nread_ >= body_len_) {
finished_ = true; finished_ = true;
return dlen; return dlen;
} }

View File

@ -10,12 +10,12 @@ enum {
MQTT_STAT_TOPIC_QOS, MQTT_STAT_TOPIC_QOS,
}; };
mqtt_subscribe::mqtt_subscribe(unsigned payload_len /* 0 */) mqtt_subscribe::mqtt_subscribe(unsigned body_len /* 0 */)
: mqtt_message(MQTT_SUBSCRIBE) : mqtt_message(MQTT_SUBSCRIBE)
, finished_(false) , finished_(false)
, dlen_(0) , dlen_(0)
, pkt_id_(0) , pkt_id_(0)
, payload_len_(payload_len) , body_len_(body_len)
, nread_(0) , nread_(0)
{ {
status_ = MQTT_STAT_HDR_VAR; /* just for update */ status_ = MQTT_STAT_HDR_VAR; /* just for update */
@ -30,7 +30,7 @@ void mqtt_subscribe::set_pkt_id(unsigned short id) {
void mqtt_subscribe::add_topic(const char* topic, mqtt_qos_t qos) { void mqtt_subscribe::add_topic(const char* topic, mqtt_qos_t qos) {
topics_.push_back(topic); topics_.push_back(topic);
qoses_.push_back(qos); qoses_.push_back(qos);
payload_len_ += strlen(topic) + 1; body_len_ += strlen(topic) + 1;
} }
bool mqtt_subscribe::to_string(string& out) { bool mqtt_subscribe::to_string(string& out) {
@ -41,8 +41,8 @@ bool mqtt_subscribe::to_string(string& out) {
bool old_mode = out.get_bin(); bool old_mode = out.get_bin();
payload_len_ += sizeof(pkt_id_); body_len_ += sizeof(pkt_id_);
this->set_data_length(payload_len_); this->set_data_length(body_len_);
if (!this->pack_header(out)) { if (!this->pack_header(out)) {
out.set_bin(old_mode); out.set_bin(old_mode);
@ -115,7 +115,7 @@ int mqtt_subscribe::update_header_var(const char* data, int dlen) {
return -1; return -1;
} }
if (nread_ >= payload_len_) { if (nread_ >= body_len_) {
logger_error("no payload!"); logger_error("no payload!");
return -1; return -1;
} }
@ -154,9 +154,9 @@ int mqtt_subscribe::update_topic_len(const char* data, int dlen) {
return -1; return -1;
} }
if (nread_ >= payload_len_) { if (nread_ >= body_len_) {
logger_error("overflow, nread=%u, payload_len=%u", logger_error("overflow, nread=%u, payload_len=%u",
nread_, payload_len_); nread_, body_len_);
return -1; return -1;
} }
@ -209,7 +209,7 @@ int mqtt_subscribe::update_topic_qos(const char* data, int dlen) {
qoses_.push_back((mqtt_qos_t) qos); qoses_.push_back((mqtt_qos_t) qos);
if (nread_ >= payload_len_) { if (nread_ >= body_len_) {
finished_ = true; finished_ = true;
return dlen; return dlen;
} }

View File

@ -9,12 +9,12 @@ enum {
MQTT_STAT_TOPIC_VAL, MQTT_STAT_TOPIC_VAL,
}; };
mqtt_unsubscribe::mqtt_unsubscribe(unsigned payload_len /* 0 */) mqtt_unsubscribe::mqtt_unsubscribe(unsigned body_len /* 0 */)
: mqtt_message(MQTT_UNSUBSCRIBE) : mqtt_message(MQTT_UNSUBSCRIBE)
, finished_(false) , finished_(false)
, dlen_(0) , dlen_(0)
, pkt_id_(0) , pkt_id_(0)
, payload_len_(payload_len) , body_len_(body_len)
, nread_(0) , nread_(0)
{ {
status_ = MQTT_STAT_HDR_VAR; /* just for update */ status_ = MQTT_STAT_HDR_VAR; /* just for update */
@ -28,7 +28,7 @@ void mqtt_unsubscribe::set_pkt_id(unsigned short id) {
void mqtt_unsubscribe::add_topic(const char* topic) { void mqtt_unsubscribe::add_topic(const char* topic) {
topics_.push_back(topic); topics_.push_back(topic);
payload_len_ += strlen(topic); body_len_ += strlen(topic);
} }
bool mqtt_unsubscribe::to_string(string& out) { bool mqtt_unsubscribe::to_string(string& out) {
@ -39,8 +39,8 @@ bool mqtt_unsubscribe::to_string(string& out) {
bool old_mode = out.get_bin(); bool old_mode = out.get_bin();
payload_len_ += sizeof(pkt_id_); body_len_ += sizeof(pkt_id_);
this->set_data_length(payload_len_); this->set_data_length(body_len_);
if (!this->pack_header(out)) { if (!this->pack_header(out)) {
out.set_bin(old_mode); out.set_bin(old_mode);
@ -111,7 +111,7 @@ int mqtt_unsubscribe::update_header_var(const char* data, int dlen) {
return -1; return -1;
} }
if (nread_ >= payload_len_) { if (nread_ >= body_len_) {
logger_error("no payload!"); logger_error("no payload!");
return -1; return -1;
} }
@ -150,9 +150,9 @@ int mqtt_unsubscribe::update_topic_len(const char* data, int dlen) {
return -1; return -1;
} }
if (nread_ >= payload_len_) { if (nread_ >= body_len_) {
logger_error("overflow, nread=%u, payload_len=%u", logger_error("overflow, nread=%u, body_len=%u",
nread_, payload_len_); nread_, body_len_);
return -1; return -1;
} }
@ -178,7 +178,7 @@ int mqtt_unsubscribe::update_topic_val(const char* data, int dlen) {
nread_ += (unsigned) topic_.size(); nread_ += (unsigned) topic_.size();
if (nread_ >= payload_len_) { if (nread_ >= body_len_) {
finished_ = true; finished_ = true;
return dlen; return dlen;
} }