acl/lib_acl_cpp/include/acl_cpp/mqtt/mqtt_aclient.hpp
2021-03-11 19:03:22 +08:00

128 lines
3.1 KiB
C++

#pragma once
#include "../acl_cpp_define.hpp"
#include "../stream/aio_socket_stream.hpp"
namespace acl {
class sslbase_conf;
class mqtt_header;
class mqtt_message;
class ACL_CPP_API mqtt_aclient : public aio_open_callback {
public:
/**
* constructor
* @param handle {aio_handle&}
* @param ssl_conf {sslbase_conf*} if not NULL, ssl will be used
*/
mqtt_aclient(aio_handle& handle, sslbase_conf* ssl_conf = NULL);
/**
* because the subclass object was created dynamically, the method will
* be called when the subclass object is to be freed
*/
virtual void destroy(void) = 0;
sslbase_conf* get_ssl_conf(void) const {
return ssl_conf_;
}
/**
* connect the remote mqtt server, when connected with the server,
* the callback on_connect() will be called
* @param addr {const char*} the mqtt server's addr with the format
* ip|port, or domain|port
* @param conn_timeout {int} the timeout for connecting to the server
* @param rw_timeout {int} the timeout read/write with the server
* @return bool {bool} if return false, you should call destroy() to
* delete the subclass object
*/
bool open(const char* addr, int conn_timeout, int rw_timeout);
/**
* called when connect or accept one connection
* @param conn {aio_socket_stream*}
* @return bool {bool} if return false, you should call destroy() to
* delete the subclass object
*/
bool open(aio_socket_stream* conn);
/**
* close the connection with the mqtt server async
*/
void close(void);
/**
* get the connection with the mqtt server
* @return {aio_socket_stream*} return NULL if not connected
*/
aio_socket_stream* get_conn(void) const {
return conn_;
}
/**
* set the remote host name to specify the SSL SNI for SSL handshake
* @param host {const char*} the host name
*/
void set_host(const char* host);
public:
bool send(mqtt_message& message);
protected:
// the subclass should be created dynamically
virtual ~mqtt_aclient(void);
// @override dummy
bool open_callback(void) { return true; }
// @override
bool timeout_callback(void);
// @override
void close_callback(void);
// @override
bool read_wakeup(void);
// @override
bool read_callback(char* data, int len);
protected:
// wait for reading data from peer
bool message_await(void);
virtual void on_ns_failed(void) {}
virtual void on_connect_timeout(void) {}
virtual void on_connect_failed(void) {}
virtual bool on_read_timeout(void) { return false; }
virtual void on_disconnect(void) {};
virtual bool on_open(void) = 0;
virtual bool on_header(const mqtt_header&) { return true; };
virtual bool on_body(const mqtt_message&) = 0;
private:
aio_handle& handle_;
sslbase_conf* ssl_conf_;
aio_socket_stream* conn_;
int conn_timeout_;
int rw_timeout_;
string host_;
struct sockaddr_storage ns_addr_;
struct sockaddr_storage serv_addr_;
mqtt_header* header_;
mqtt_message* body_;
bool open_done(void);
bool handle_ssl_handshake(void);
int handle_data(char* data, int len);
bool handle_connect(const ACL_ASTREAM_CTX* ctx);
static int connect_callback(const ACL_ASTREAM_CTX* ctx);
};
} // namespace acl