acl/lib_acl_cpp/samples/mqtt/mqtt_client/main.cpp

215 lines
5.2 KiB
C++
Raw Normal View History

2021-03-06 10:49:55 +08:00
#include <stdio.h>
#include <stdlib.h>
#include <getopt.h>
2022-07-20 16:25:21 +08:00
#include "lib_acl.h"
2021-03-06 10:49:55 +08:00
#include "acl_cpp/lib_acl.hpp"
2022-07-20 16:25:21 +08:00
// Set to true by the -D option in main(); when true, handle_publish() prints
// the payload of every received PUBLISH message.
static bool __show_messsage = false;
// Set to true by the -B option in main(); only consulted when __show_messsage
// is true, in which case handle_publish() prints the payload one byte at a
// time as integers instead of as a C string.
static bool __show_binary = false;
2021-03-08 19:57:24 +08:00
// Topics to subscribe to. Filled in main() from the non-option command-line
// arguments (or with "test/topic" when none are given) and read by
// handle_connack() when it builds the SUBSCRIBE message.
static std::vector<acl::string> __topics;
2021-03-08 15:03:32 +08:00
// Handle a CONNACK message: print the return code and, if the broker accepted
// the connection, send one SUBSCRIBE (packet id 100) that requests every topic
// in __topics at MQTT_QOS1.
//
// conn:    connection used to send the SUBSCRIBE message.
// message: the received message; handle_message() dispatches here only for
//          MQTT_CONNACK, so it is downcast to acl::mqtt_connack.
//
// Returns true when the SUBSCRIBE message was sent, false when the broker
// refused the connection or the send failed (the caller then stops reading).
static bool handle_connack(acl::mqtt_client& conn,
	const acl::mqtt_message& message) {
	const acl::mqtt_connack& connack =
		static_cast<const acl::mqtt_connack&>(message);

	const int code = connack.get_connack_code();
	printf("%s => connect code=%d\r\n", __FUNCTION__, code);

	// Per MQTT 3.1.1 a non-zero CONNACK return code means the broker refused
	// the connection, so subscribing afterwards is pointless.
	if (code != 0) {
		printf("%s => connection refused, code=%d\r\n", __FUNCTION__, code);
		return false;
	}

	acl::mqtt_subscribe subscribe;
	subscribe.set_pkt_id(100);

	for (std::vector<acl::string>::const_iterator cit = __topics.begin();
		cit != __topics.end(); ++cit) {
		subscribe.add_topic(*cit, acl::MQTT_QOS1);
		printf(" %s: add topic -> %s\r\n", __FUNCTION__, (*cit).c_str());
	}

	if (conn.send(subscribe)) {
		printf("send subscribe ok\r\n");
		return true;
	}

	printf("send subscribe error\r\n");
	return false;
}
2021-03-08 15:03:32 +08:00
// Handle a SUBACK message: print its packet id and each QoS value it carries,
// then send a PINGREQ on the same connection.
//
// conn:    connection used to send the PINGREQ message.
// message: the received message; handle_message() dispatches here only for
//          MQTT_SUBACK, so it is downcast to acl::mqtt_suback.
//
// Returns true when the PINGREQ was sent, false otherwise.
static bool handle_suback(acl::mqtt_client& conn,
	const acl::mqtt_message& message) {
	const acl::mqtt_suback& ack =
		static_cast<const acl::mqtt_suback&>(message);

	const unsigned short id = ack.get_pkt_id();
	printf("%s => pkt_id=%d\r\n", __FUNCTION__, id);

	const std::vector<acl::mqtt_qos_t>& granted = ack.get_qoses();
	for (size_t idx = 0; idx < granted.size(); ++idx) {
		printf(" qos=%d\r\n", static_cast<int>(granted[idx]));
	}

	acl::mqtt_pingreq ping;
	if (conn.send(ping)) {
		printf("%s => send pingreq ok\r\n", __FUNCTION__);
		return true;
	}

	printf("send pingreq error\r\n");
	return false;
}
2021-03-08 19:57:24 +08:00
// Handle a PUBLISH message: optionally dump the payload, print a one-line
// summary, and acknowledge the message with PUBACK unless it was sent at
// MQTT_QOS0.
//
// conn:    connection the message arrived on; also used to send the PUBACK.
// message: the received message; handle_message() dispatches here only for
//          MQTT_PUBLISH, so it is downcast to acl::mqtt_publish.
//
// Returns true when no acknowledgement is needed or the PUBACK was sent,
// false when sending the PUBACK failed.
static bool handle_publish(acl::mqtt_client& conn,
	const acl::mqtt_message& message) {
	const acl::mqtt_publish& publish =
		static_cast<const acl::mqtt_publish&>(message);
	const acl::string& payload = publish.get_payload();

	if (__show_messsage) {
		if (__show_binary) {
			// size_t must be printed with %zu; %zd is the signed variant.
			for (size_t i = 0; i < payload.size(); i++) {
				char ch = payload[i];
				printf("[%zu]->%d ", i, (int) ch);
			}
			printf("\r\n");
		} else {
			// Printed as a C string, so output stops at the first NUL byte.
			printf("payload: %s\r\n", payload.c_str());
		}
	}

	ACL_VSTREAM* vs = conn.sock_stream()->get_vstream();
	assert(vs);

	// The casts make each argument match its conversion specifier exactly,
	// whatever integer types the library uses for these values.
	printf("topic: %s, qos: %d, pkt_id: %d, payload len: %zu, total read=%lld\r\n",
		publish.get_topic(), (int) publish.get_header().get_qos(),
		(int) publish.get_pkt_id(), (size_t) payload.size(),
		(long long) vs->total_read_cnt);

	// QoS 0 messages are not acknowledged.
	if (publish.get_header().get_qos() == acl::MQTT_QOS0) {
		return true;
	}

	acl::mqtt_puback puback;
	puback.set_pkt_id(publish.get_pkt_id());
	if (!conn.send(puback)) {
		printf("%s => puback error, pkt_id=%d\r\n",
			__FUNCTION__, puback.get_pkt_id());
		return false;
	}

	printf("%s => puback ok, pkt_id=%d\r\n", __FUNCTION__, puback.get_pkt_id());
	return true;
}
// Handle a PINGREQ message by replying with a PINGRESP on the same connection.
//
// conn: connection used to send the PINGRESP message.
//
// Returns true when the PINGRESP was sent, false (after logging) otherwise.
static bool handle_pingreq(acl::mqtt_client& conn) {
	acl::mqtt_pingresp reply;

	if (!conn.send(reply)) {
		printf("%s => send pingresp error\r\n", __FUNCTION__);
		return false;
	}

	return true;
}
// Handle a PINGRESP message: only logs that it arrived.
//
// Always returns true so the caller keeps reading messages.
static bool handle_pingresp(void) {
	const char* self = __FUNCTION__;

	printf("%s => got pingresp\r\n", self);
	return true;
}
// Handle a DISCONNECT message: only logs it; the message itself is unused.
//
// Always returns false so the caller stops its read loop.
static bool handle_disconnect(acl::mqtt_message&) {
	const char* self = __FUNCTION__;

	printf("%s => disconnect\r\n", self);
	return false;
}
// Dispatch one received message to the handler for its MQTT type.
//
// conn:    connection the message arrived on; passed to handlers that reply.
// message: the received message.
//
// Returns the handler's result. Any type without a handler here is reported
// as unknown and yields false, which makes the caller stop reading.
static bool handle_message(acl::mqtt_client& conn, acl::mqtt_message& message) {
	const acl::mqtt_type_t kind = message.get_header().get_type();

	if (kind == acl::MQTT_CONNACK) {
		return handle_connack(conn, message);
	}
	if (kind == acl::MQTT_PINGREQ) {
		return handle_pingreq(conn);
	}
	if (kind == acl::MQTT_PINGRESP) {
		return handle_pingresp();
	}
	if (kind == acl::MQTT_DISCONNECT) {
		return handle_disconnect(message);
	}
	if (kind == acl::MQTT_SUBACK) {
		return handle_suback(conn, message);
	}
	if (kind == acl::MQTT_PUBLISH) {
		return handle_publish(conn, message);
	}

	printf("unknown type=%d\r\n", static_cast<int>(kind));
	return false;
}
// Print the command-line help text to stdout.
//
// procname: program name shown in the usage line (main() passes argv[0]).
//
// The text lists the positional topic arguments and the defaults that main()
// applies when -s or the topics are omitted.
static void usage(const char* procname) {
	printf("usage: %s [options] [topic ...]\r\n"
		" -h [help]\r\n"
		" -s addr [default: 127.0.0.1|1883]\r\n"
		" -D [if display messages]\r\n"
		" -B [if display as binary data format]\r\n"
		" topic ... [topics to subscribe, default: test/topic]\r\n"
		, procname);
}
int main(int argc, char* argv[]) {
char ch;
2021-03-11 19:03:22 +08:00
acl::string addr("127.0.0.1|1883");
2021-03-06 10:49:55 +08:00
2022-07-20 16:25:21 +08:00
while ((ch = getopt(argc, argv, "hs:DB")) > 0) {
2021-03-06 10:49:55 +08:00
switch (ch) {
case 'h':
usage(argv[0]);
return 0;
case 's':
addr = optarg;
break;
2022-07-20 16:25:21 +08:00
case 'D':
__show_messsage = true;
break;
case 'B':
__show_binary = true;
break;
2021-03-06 10:49:55 +08:00
default:
break;
}
}
acl::log::stdout_open(true);
2021-03-08 19:57:24 +08:00
printf("optind: %d, argc: %d\n", optind, argc);
for (int i = optind; i < argc; i++) {
printf("add one topic: %s\r\n", argv[i]);
__topics.push_back(argv[i]);
}
if (__topics.empty()) {
__topics.push_back("test/topic");
}
2021-03-06 10:49:55 +08:00
acl::mqtt_connect message;
message.set_cid("client-id-test-xxx");
message.set_username("user-zsx");
//message.set_passwd("pass");
#if 0
message.set_will_qos(acl::MQTT_QOS0);
message.set_will_topic("test/topic");
message.set_will_msg("msg-hello");
#endif
printf("-----------------------------------------------\r\n");
acl::mqtt_client conn(addr, 10, 0);
if (!conn.send(message)) {
printf("send message error\r\n");
return 1;
}
printf("send message ok\r\n");
while (true) {
acl::mqtt_message* res = conn.get_message();
if (res == NULL) {
printf("read message error\r\n");
break;
}
2021-03-07 10:34:59 +08:00
//printf("-----------------------------------------------\r\n");
//printf("read one message\r\n");
2021-03-06 10:49:55 +08:00
if (!handle_message(conn, *res)) {
delete res;
break;
}
delete res;
}
return 0;
}