Adding multicast ...

This commit is contained in:
shuxin   zheng 2023-09-20 15:15:11 +08:00
parent e5938c9f87
commit fff2bed41b
7 changed files with 263 additions and 21 deletions

View File

@ -16,6 +16,7 @@ extern "C" {
#define ACL_INET_FLAG_REUSEPORT (1 << 1)
#define ACL_INET_FLAG_FASTOPEN (1 << 2)
#define ACL_INET_FLAG_EXCLUSIVE (1 << 3)
#define ACL_INET_FLAG_MULTILOOP_ON (1 << 4)
/**
*

View File

@ -117,6 +117,43 @@ ACL_API ACL_VSTREAM *acl_vstream_bind(const char *addr, int rw_timeout, unsigned
*/
ACL_API void acl_vstream_set_udp_io(ACL_VSTREAM *stream);
/**
*
* @param addr {const char*} IP地址
* @param iface {const char*} IP
* @param port {int} Port
* @param timeout {int} IO
* @param flag {unsigned}
* @return {ACL_VSTREAM*} NULL
*/
ACL_API ACL_VSTREAM *acl_vstream_bind_multicast(const char *addr,
const char *iface, int port, int timeout, unsigned flag);
/**
* TTL (1--255)
* @param sock {ACL_SOCKET}
* @param ttl {int}
* @return {int} -1 0
*/
ACL_API int acl_multicast_set_ttl(ACL_SOCKET sock, int ttl);
/**
*
* @param sock {ACL_SOCKET}
* @param addr {const char*}
* @return {int} 0 -1
*/
ACL_API int acl_multicast_set_if(ACL_SOCKET sock, const char *addr);
/**
* API
* @param sock {ACL_SOCKET}
* @param addr {const char*} IP
* @param iface {const char*} IP
* @return {int} 0 -1
*/
ACL_API int acl_multicast_drop(ACL_SOCKET sock, const char *addr, const char *iface);
#ifdef __cplusplus
}
#endif

View File

@ -360,3 +360,167 @@ void acl_vstream_set_udp_io(ACL_VSTREAM *stream)
ACL_VSTREAM_CTL_CONTEXT, stream->context,
ACL_VSTREAM_CTL_END);
}
/****************************************************************************/
static int join_multicast(ACL_SOCKET sock, const char *addr,
const char *iface, int enable_loop)
{
struct ip_mreq mreq;
memset(&mreq, 0, sizeof(mreq));
mreq.imr_multiaddr.s_addr = inet_addr(addr);
mreq.imr_interface.s_addr = strcmp(iface, "0.0.0.0") == 0 ?
htonl(INADDR_ANY) : inet_addr(iface);
if (setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
(const char*) &mreq, sizeof(mreq)) == -1) {
acl_msg_error("%s(%d), %s: setsockopt %d error %s", __FILE__,
__LINE__, __FUNCTION__, (int) sock, acl_last_serror());
return -1;
}
int on = enable_loop;
if (setsockopt(sock,IPPROTO_IP, IP_MULTICAST_LOOP,
(const char*) &on, sizeof(on)) == -1) {
acl_msg_error("%s(%d), %s: setsockopt IP_MULTICAST_LOOP error %s",
__FILE__, __LINE__, __FUNCTION__, acl_last_serror());
return -1;
}
return 0;
}
static int multicast_read(ACL_SOCKET fd, void *buf, size_t size,
int timeout acl_unused, ACL_VSTREAM *stream, void *arg acl_unused)
{
return udp_read(fd, buf, size, timeout, stream, arg);
}
static int multicast_write(ACL_SOCKET fd, const void *buf, size_t size,
int timeout acl_unused, ACL_VSTREAM *stream, void *arg acl_unused)
{
int ret;
if (stream->sa_peer_len == 0) {
acl_msg_error("%s, %s(%d): peer addr null",
__FUNCTION__, __FILE__, __LINE__);
return -1;
}
/* 与 UDP 单发不同之处使用了 local 保存的地址做为目标地址,因为其存放着组播地址 */
ret = (int) sendto(fd, buf, (int) size, 0,
(struct sockaddr*) stream->sa_local,
(int) stream->sa_local_len);
return ret;
}
ACL_VSTREAM *acl_vstream_bind_multicast(const char *addr, const char *iface,
int port, int timeout, unsigned flag)
{
ACL_SOCKET sock;
ACL_VSTREAM *stream;
char addrbuf[256];
int enable_loop = (flag & ACL_INET_FLAG_MULTILOOP_ON) ? 1 : 0;
#if defined(_WIN32) || defined(_WIN64)
acl_snprintf(addrbuf, sizeof(addrbuf), "%s|%d", iface, port);
#else
acl_snprintf(addrbuf, sizeof(addrbuf), "%s|%d", addr, port);
#endif
sock = acl_udp_bind(addrbuf, flag);
if (sock == ACL_SOCKET_INVALID) {
acl_msg_error("%s(%d), %s: bind addr %s error %s", __FILE__,
__LINE__, __FUNCTION__, addrbuf, acl_last_serror());
return NULL;
}
if (join_multicast(sock, addr, iface, enable_loop) == -1) {
acl_socket_close(sock);
return NULL;
}
stream = acl_vstream_fdopen(sock, O_RDWR, 4096, -1, ACL_VSTREAM_TYPE_SOCK);
stream->rw_timeout = timeout;
/* 设置本地绑定地址,该地址同时用做外发地址 */
acl_snprintf(addrbuf, sizeof(addrbuf), "%s|%d", addr, port);
acl_vstream_set_local(stream, addrbuf);
acl_vstream_ctl(stream,
ACL_VSTREAM_CTL_READ_FN, multicast_read,
ACL_VSTREAM_CTL_WRITE_FN, multicast_write,
ACL_VSTREAM_CTL_CONTEXT, stream->context,
ACL_VSTREAM_CTL_END);
return stream;
}
int acl_multicast_set_ttl(ACL_SOCKET sock, int ttl)
{
if (ttl < 0) {
acl_msg_error("%s: invalid ttl=%d", __FUNCTION__, ttl);
return -1;
}
if (sock == ACL_SOCKET_INVALID) {
acl_msg_error("%s: invalid socket", __FUNCTION__ );
return -1;
}
if (setsockopt(sock, IPPROTO_IP,IP_TTL,
(const char*) &ttl, sizeof(ttl)) == -1) {
acl_msg_error("%s: setsockopt IP_TTL error %s, sock=%d",
__FUNCTION__, acl_last_serror(), sock);
return -1;
}
return 0;
}
int acl_multicast_set_if(ACL_SOCKET sock, const char *addr)
{
struct in_addr in;
if (addr == NULL || *addr == 0) {
acl_msg_error("%s: addr null", __FUNCTION__);
return -1;
}
if (sock == ACL_SOCKET_INVALID) {
acl_msg_error("%s: invalid socket", __FUNCTION__);
return -1;
}
in.s_addr = inet_addr(addr);
if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
(const char*) &in, sizeof(in)) < 0) {
acl_msg_error("%s: set IP_MULTICAST_IF error=%s, addr=%s",
__FUNCTION__, acl_last_serror(), addr);
return -1;
}
return 0;
}
int acl_multicast_drop(ACL_SOCKET sock, const char *addr, const char *iface)
{
if (sock == ACL_SOCKET_INVALID) {
acl_msg_error("%s: invalid socket", __FUNCTION__);
return -1;
}
struct ip_mreq ipmr;
memset(&ipmr, 0, sizeof(ipmr));
ipmr.imr_interface.s_addr = strcmp(iface, "0.0.0.0") == 0 ?
htonl(INADDR_ANY) : inet_addr(iface);
ipmr.imr_multiaddr.s_addr = inet_addr(addr);
if (setsockopt(sock, IPPROTO_IP, IP_DROP_MEMBERSHIP,
(const char*)&ipmr, sizeof(ipmr)) == -1) {
acl_msg_error("%s: set IP_DROP_MEMBERSHIP error=%s",
__FUNCTION__, acl_last_serror());
return -1;
}
return 0;
}

View File

@ -3240,7 +3240,7 @@ void acl_vstream_set_local(ACL_VSTREAM *fp, const char *addr)
acl_myfree(fp->sa_local);
}
fp->sa_local = set_sock_addr(addr, &fp->sa_local_size);
fp->sa_local = set_sock_addr(addr, &fp->sa_local_size);
if (fp->sa_local) {
if (fp->sa_local->sa_family == AF_INET) {
fp->type |= ACL_VSTREAM_TYPE_INET4;

View File

@ -20,6 +20,7 @@ enum {
OPEN_FLAG_REUSEPORT = (1 << 1), // 端口复用,要求 Linux3.0 以上
OPEN_FLAG_FASTOPEN = (1 << 2), // 是否启用 Fast open实验阶段
OPEN_FLAG_EXCLUSIVE = (1 << 3), // 是否禁止复用地址
OPEN_FLAG_MULTICAST_LOOP = (1 << 4), // 是否允许组播时接收回路包
};
/**

View File

@ -60,14 +60,26 @@ public:
time_unit_t unit = time_unit_s);
/**
* UDP UDP
* UDP UDP
* @param addr {const char*} ip:port
* UNIX Linux Linux abstract unix socket
* @param rw_timeout {int} ()
* @param flag {unsigned} server_socket.hpp
* @param flags {unsigned} server_socket.hpp
* @return {bool}
*/
bool bind_udp(const char* addr, int rw_timeout = -1, unsigned flag = 0);
bool bind_udp(const char* addr, int rw_timeout = -1, unsigned flags = 0);
/**
*
* @param addr {const char*} IP
* @param iface {const char*} IP
* @param port {int}
* @param rw_timeout {int} IO
* @param flags {unsigned} server_socket.hpp
* @return {bool}
*/
bool bind_multicast(const char* addr, const char* iface, int port,
int rw_timeout = -1, unsigned flags = 0);
/**
*

View File

@ -59,32 +59,59 @@ bool socket_stream::open(ACL_VSTREAM* vstream, bool udp_mode /* = false */)
return true;
}
// ½« C++ API µÄ flags ת±äΪ C API µÄ flags.
static unsigned to_oflags(unsigned flags)
{
unsigned oflags = 0;
if (flags & OPEN_FLAG_NONBLOCK) {
oflags |= ACL_NON_BLOCKING;
} else {
oflags |= ACL_BLOCKING;
}
if (flags & OPEN_FLAG_REUSEPORT) {
oflags |= ACL_INET_FLAG_REUSEPORT;
}
// It's only useful for TCP.
if (flags & OPEN_FLAG_FASTOPEN) {
oflags |= ACL_INET_FLAG_FASTOPEN;
}
if (flags & OPEN_FLAG_EXCLUSIVE) {
oflags |= ACL_INET_FLAG_EXCLUSIVE;
}
return oflags;
}
bool socket_stream::bind_udp(const char* addr, int rw_timeout /* = 0 */,
unsigned flag /* = 0 */)
unsigned flags /* = 0 */)
{
if (stream_) {
acl_vstream_close(stream_);
}
unsigned oflag = 0;
unsigned oflags = to_oflags(flags);
stream_ = acl_vstream_bind(addr, rw_timeout, oflags);
if (stream_ == NULL) {
return false;
}
eof_ = false;
opened_ = true;
return true;
}
if (flag & OPEN_FLAG_NONBLOCK) {
oflag |= ACL_NON_BLOCKING;
} else {
oflag |= ACL_BLOCKING;
bool socket_stream::bind_multicast(const char *addr, const char *iface,
int port, int rw_timeout, unsigned int flags)
{
if (stream_) {
acl_vstream_close(stream_);
}
if (flag & OPEN_FLAG_REUSEPORT) {
oflag |= ACL_INET_FLAG_REUSEPORT;
}
if (flag & OPEN_FLAG_FASTOPEN) {
oflag |= ACL_INET_FLAG_FASTOPEN;
}
if (flag & OPEN_FLAG_EXCLUSIVE) {
oflag |= ACL_INET_FLAG_EXCLUSIVE;
}
stream_ = acl_vstream_bind(addr, rw_timeout, oflag);
unsigned oflags = to_oflags(flags);
stream_ = acl_vstream_bind_multicast(addr, iface, port,
rw_timeout, oflags);
if (stream_ == NULL) {
return false;
}