aio connect support resolving domain name async.

This commit is contained in:
zhengshuxin 2019-05-24 16:59:47 +08:00
parent 56004197a3
commit ec6462f3e6
17 changed files with 532 additions and 396 deletions

View File

@ -1,5 +1,10 @@
修改历史列表:
651) 2019.5.24
651.1) feature: acl_aio.c/acl_aio_connect.c 支持异步解析远程服务器域名并进行
异步连接解析后的 IP 地址及端口
651.2) samples/aio/client2: 测试异步解析域名及异步连接服务器的例子
650) 2019.5.18
650.1) compile: 增加宏编译选项 ACL_CLIENT_ONLY当将 acl 库用于客户端模式时可以有效地
减少编译库的大小

View File

@ -206,6 +206,15 @@ ACL_API ACL_AIO *acl_aio_create2(int event_mode, unsigned int nMsg);
*/
ACL_API ACL_AIO *acl_aio_create3(ACL_EVENT *event);
/**
* DNS DNS
*
* @param aio {ACL_AIO*}
* @param dns_list {const char*} DNS ip1:port,ip2:port...
* @param timeout {int}
*/
ACL_API void acl_aio_set_dns(ACL_AIO *aio, const char *dns_list, int timeout);
/**
* aio->event
* @param aio {ACL_AIO*}
@ -756,12 +765,26 @@ ACL_API void acl_aio_listen(ACL_ASTREAM *astream);
/**
* , .
* @param aio {ACL_AIO*}
* @param saddr {const char*} , : ip:port, : 192.168.0.1:80
* @param addr {const char*} , : ip:port, : 192.168.0.1:80
* @param timeout {int}
* @return {ACL_ASTREAM*}
*/
ACL_API ACL_ASTREAM *acl_aio_connect(ACL_AIO *aio, const char *addr, int timeout);
/**
* acl_aio_connect
* 使 acl_aio_set_dns
* @param aio {ACL_AIO*}
* @param addr {const char*} domain:portwww.sina.com:80
* @param timeout {int}
* @param callback {ACL_AIO_CONNECT_FN}
* @param context {void*} callback
* @return {int} 0 < 0
* ACL_AIO acl_aio_set_dns
*/
ACL_API int acl_aio_connect_addr(ACL_AIO *aio, const char *addr, int timeout,
ACL_AIO_CONNECT_FN callback, void *context);
/*---------------------------- 其它通用异步操作接口 --------------------------*/
/**

View File

@ -1,170 +0,0 @@
#include "stdafx.h"
#include "service_main.h"
static ACL_AIO *__aio = NULL;
static ACL_HTABLE *__cache = NULL;
void ns_cache_init(ACL_AIO *aio)
{
const char *myname = "ns_cache_init";
if (__cache != NULL)
acl_msg_fatal("%s(%d): __cache != NULL", myname, __LINE__);
__aio = aio;
__cache = acl_htable_create(10000, 0);
}
static void ns_cache_timeout_fn(int event_type acl_unused,
ACL_EVENT *event acl_unused, void *context)
{
NS_CACHE_ENTRY *entry = (NS_CACHE_ENTRY*) context;
acl_htable_delete(__cache, entry->domain, NULL);
ns_cache_entry_free(entry);
}
void ns_cache_add(NS_CACHE_ENTRY *entry)
{
const char *myname = "ns_cache_add";
NS_CACHE_ENTRY *old_entry;
if (__cache == NULL)
acl_msg_fatal("%s(%d): ns_cache_init not called",
myname, __LINE__);
old_entry = (NS_CACHE_ENTRY*) acl_htable_find(__cache, entry->domain);
if (old_entry == NULL) {
acl_htable_enter(__cache, entry->domain, (char*) entry);
if (entry->ttl > 0)
acl_aio_request_timer(__aio, ns_cache_timeout_fn,
entry, entry->ttl, 0);
return;
}
if (entry->ttl > 0) {
acl_aio_cancel_timer(__aio, ns_cache_timeout_fn, old_entry);
acl_aio_request_timer(__aio, ns_cache_timeout_fn,
entry, entry->ttl, 0);
}
if (entry == old_entry)
return;
acl_htable_delete(__cache, old_entry->domain, NULL);
ns_cache_entry_free(old_entry);
acl_htable_enter(__cache, entry->domain, (char*) entry);
}
NS_CACHE_ENTRY *ns_cache_find(const char *domain)
{
const char *myname = "ns_cache_find";
NS_CACHE_ENTRY *entry;
char key[MAX_DOMAIN_LEN];
if (__cache == NULL)
acl_msg_fatal("%s(%d): ns_cache_init not called",
myname, __LINE__);
ACL_SAFE_STRNCPY(key, domain, sizeof(key));
acl_lowercase(key);
entry = (NS_CACHE_ENTRY*) acl_htable_find(__cache, key);
return (entry);
}
int ns_cache_delete(const char *domain)
{
const char *myname = "ns_cache_delete";
char key[MAX_DOMAIN_LEN];
if (__cache == NULL)
acl_msg_fatal("%s(%d): ns_cache_init not called",
myname, __LINE__);
ACL_SAFE_STRNCPY(key, domain, sizeof(key));
acl_lowercase(key);
return (acl_htable_delete(__cache, key,
(void (*)(void*))ns_cache_entry_free));
}
void ns_cache_entry_free(NS_CACHE_ENTRY *entry)
{
if (entry->ip_list)
acl_argv_free(entry->ip_list);
acl_myfree(entry);
}
NS_CACHE_ENTRY *ns_cache_entry_new(const char *domain, const ACL_ARGV *ip_list,
const char *cache_buf, int dlen, int ttl)
{
const char *myname = "ns_cache_entry_new";
NS_CACHE_ENTRY *entry;
int i;
if (ip_list == NULL && (cache_buf == NULL || dlen <= 0)) {
acl_msg_error("%s(%d): invalid input args", myname, __LINE__);
return (NULL);
}
if (ip_list != NULL && (cache_buf != NULL && dlen > 0)) {
acl_msg_error("%s(%d): ip_list != NULL and cache_buf != NULL",
myname, __LINE__);
return (NULL);
}
entry = (NS_CACHE_ENTRY*) acl_mycalloc(1, sizeof(NS_CACHE_ENTRY));
entry->idx = 0;
entry->ttl = ttl;
ACL_SAFE_STRNCPY(entry->domain, domain, sizeof(entry->domain));
acl_lowercase(entry->domain);
if (ip_list) {
entry->ip_list = acl_argv_alloc(10);
for (i = 0; i < ip_list->argc; i++) {
acl_argv_add(entry->ip_list, ip_list->argv[i], NULL);
}
entry->cache_dlen = 0;
} else if (cache_buf) {
memcpy(entry->cache_buf, cache_buf, dlen);
entry->cache_dlen = dlen;
}
return (entry);
}
static char __unknown_domain[MAX_DOMAIN_LEN];
static NS_CACHE_ENTRY *__unknown_cache_entry;
void ns_cache_add_unknown(const char *domain, const ACL_ARGV *ip_list)
{
__unknown_cache_entry = ns_cache_entry_new(domain, ip_list, NULL, 0, 0);
ACL_SAFE_STRNCPY(__unknown_domain, domain, sizeof(__unknown_domain));
}
NS_CACHE_ENTRY *ns_cache_unknown(void)
{
const char *myname = "ns_cache_unknown";
if (__unknown_cache_entry == NULL)
acl_msg_fatal("%s(%d): __unknown_cache_entry null",
myname, __LINE__);
return (__unknown_cache_entry);
}
ACL_ARGV *ns_cache_ip_list(NS_CACHE_ENTRY *cache_entry)
{
ACL_ARGV *argv = acl_argv_alloc(10);
ACL_ARGV *ip_list = cache_entry->ip_list;
int i, n;
n = ip_list->argc;
if (cache_entry->idx >= n)
cache_entry->idx = 0;
i = cache_entry->idx++;
while (n-- > 0) {
acl_argv_add(argv, ip_list->argv[i++], NULL);
if (i >= ip_list->argc)
i = 0;
}
return (argv);
}

View File

@ -20,14 +20,17 @@ static ACL_CFG_STR_TABLE __conf_str_tab[] = {
};
int var_cfg_hijack_unknown; // 是否需要对不存在域名进行劫持
int var_cfg_dns_neighbor_port;// 上游 DNS 服务器 PORT
static ACL_CFG_BOOL_TABLE __conf_bool_tab[] = {
{ "hijack_unknown", 0, &var_cfg_hijack_unknown },
{ 0, 0, 0 },
};
int var_cfg_server_port; // 本机绑定的端口号
int var_cfg_dns_neighbor_port;// 上游 DNS 服务器 PORT
static ACL_CFG_INT_TABLE __conf_int_tab[] = {
{ "server_port", 53, &var_cfg_server_port, 0, 0 },
{ "dns_neighbor_port", 53, &var_cfg_dns_neighbor_port, 0, 0 },
{ 0, 0, 0, 0, 0 }
};

View File

@ -17,6 +17,7 @@ extern char *var_cfg_domain_unknown;
extern char *var_cfg_dns_name;
extern char *var_cfg_dns_ip;
extern char *var_cfg_dns_neighbor_ip;
extern int var_cfg_server_port;
extern int var_cfg_hijack_unknown;
extern int var_cfg_dns_neighbor_port;

View File

@ -1,5 +1,6 @@
service server {
server_port = 53
# 本地域名映射关系表格式域名1:ip11:ip12..., 域名2:ip21:ip22...
# domain_map = mailz.hexun.com:127.0.0.1:211.157.110.179:211.157.110.185, \
# www1.sina.com:192.168.0.1:192.168.0.2:192.168.0.3, \
@ -11,6 +12,7 @@ service server {
263es.com:192.168.198.47, \
i.163.com:127.0.0.1, \
ex.qq.com:127.0.0.1, \
test.zsx.com:127.0.0.1, \
#all:192.168.198.47
# 未知域名返回的缺省地址列表
@ -25,7 +27,8 @@ service server {
# 上游 DNS 服务器的 IP
# dns_neighbor_ip = 211.157.131.7
dns_neighbor_ip = 192.168.198.47
# dns_neighbor_ip = 192.168.198.47
dns_neighbor_ip = 8.8.8.8
# 上游 DNS 服务器的 PORT
dns_neighbor_port = 53
}

View File

@ -7,16 +7,21 @@
static SERVICE *__service;
static char *__conf_file = "dgate.cf";
static const char *__conf_file = "dgate.cf";
static void init(void)
{
acl_socket_init();
conf_load(__conf_file);
printf("local port: %d\r\n", var_cfg_server_port);
//acl_msg_open("dgate.log", "dgate");
acl_msg_stdout_enable(1);
__service = service_create("0.0.0.0", 53, var_cfg_dns_neighbor_ip,
var_cfg_dns_neighbor_port);
__service = service_create("0.0.0.0", (short) var_cfg_server_port,
var_cfg_dns_neighbor_ip, var_cfg_dns_neighbor_port);
printf("neighbor dns_ip: %s, dns_port: %d\r\n",
var_cfg_dns_neighbor_ip, var_cfg_dns_neighbor_port);
}
@ -39,7 +44,7 @@ int main(int argc, char* argv[])
switch (ch) {
case 'h':
usage(argv[0]);
exit (0);
return 0;
case 'f':
__conf_file = acl_mystrdup(optarg);
break;

View File

@ -34,7 +34,7 @@ static int rfc1035HeaderPack(char *buf, size_t sz, rfc1035_message * hdr)
unsigned short t;
if (sz < 12)
acl_msg_fatal("%s: sz(%d) < 12", myname, sz);
acl_msg_fatal("%s: sz(%d) < 12", myname, (int) sz);
s = htons(hdr->id);
memcpy(buf + off, &s, sizeof(s));
@ -89,7 +89,7 @@ static int rfc1035LabelPack(char *buf, size_t sz, const char *label)
if (len > RFC1035_MAXLABELSZ)
len = RFC1035_MAXLABELSZ;
if (sz < len + 1)
acl_msg_fatal("%s: sz(%d) < len(%d) + 1", myname, sz, len);
acl_msg_fatal("%s: sz(%d) < len(%d) + 1", myname, (int) sz, (int) len);
*(buf + off) = (char) len;
off++;
memcpy(buf + off, label, len);
@ -122,7 +122,7 @@ static int rfc1035NamePack(char *buf, size_t sz, const char *name)
acl_myfree(copy);
off += rfc1035LabelPack(buf + off, sz - off, NULL);
if (off > (int) sz)
acl_msg_fatal("%s: off(%d) > sz(%d)", myname, off, sz);
acl_msg_fatal("%s: off(%d) > sz(%d)", myname, off, (int) sz);
return off;
}
@ -147,7 +147,7 @@ static int rfc1035QuestionPack(char *buf, size_t sz, const char *name,
memcpy(buf + off, &s, sizeof(s));
off += sizeof(s);
if (off > (int) sz)
acl_msg_fatal("%s: off(%d) > sz(%d)", myname, off, sz);
acl_msg_fatal("%s: off(%d) > sz(%d)", myname, off, (int) sz);
return off;
}
@ -239,10 +239,10 @@ static int rfc1035NameUnpack(const char *buf, size_t sz, int *off,
size_t len;
if (ns <= 0)
acl_msg_fatal("%s: ns(%d) <= 0", myname, ns);
acl_msg_fatal("%s: ns(%d) <= 0", myname, (int) ns);
do {
if (*off >= (int) sz)
acl_msg_fatal("%s: *off(%d) >= sz(%d)", myname, *off, sz);
acl_msg_fatal("%s: *off(%d) >= sz(%d)", myname, *off, (int) sz);
c = *(buf + (*off));
if (c > 191) {
/* blasted compression */
@ -290,7 +290,7 @@ static int rfc1035NameUnpack(const char *buf, size_t sz, int *off,
*name = '\0';
/* make sure we didn't allow someone to overflow the name buffer */
if (no > (int) ns)
acl_msg_fatal("%s: no(%d) > ns(%d)", myname, no, ns);
acl_msg_fatal("%s: no(%d) > ns(%d)", myname, no, (int) ns);
return 0;
}
@ -376,7 +376,7 @@ static int rfc1035RRUnpack(const char *buf, size_t sz, int *off, rfc1035_rr * RR
}
(*off) += rdlength;
if (*off > (int) sz)
acl_msg_fatal("%s: *off(%d) > sz(%d)", myname, *off, sz);
acl_msg_fatal("%s: *off(%d) > sz(%d)", myname, *off, (int) sz);
return 0;
}
@ -676,7 +676,7 @@ static int rfc1035RRPack(const rfc1035_rr *RR, char *buf, size_t sz)
}
if ((unsigned) off > sz)
acl_msg_fatal("%s: off(%d) > sz(%d)", myname, off, sz);
acl_msg_fatal("%s: off(%d) > sz(%d)", myname, (int) off, (int) sz);
return (off);
}
@ -781,7 +781,7 @@ ssize_t rfc1035BuildAQuery(const char *hostname, char *buf, size_t sz,
}
if (offset > sz)
acl_msg_fatal("%s: offset(%d) > sz(%d)", myname, offset, sz);
acl_msg_fatal("%s: offset(%d) > sz(%d)", myname, (int) offset, (int) sz);
return (ssize_t) offset;
}
@ -822,7 +822,7 @@ ssize_t rfc1035BuildPTRQuery(const struct in_addr addr, char *buf,
ACL_SAFE_STRNCPY(query->name, rev, sizeof(query->name));
}
if (offset > sz)
acl_msg_fatal("%s: offset(%d) > sz(%d)", myname, offset, sz);
acl_msg_fatal("%s: offset(%d) > sz(%d)", myname, (int) offset, (int) sz);
return (ssize_t) offset;
}

View File

@ -3,7 +3,7 @@
#include "configure.h"
#include "service_main.h"
#define UDP_READ_NONE 1
#define UDP_READ_NONE -2
typedef struct UDP_CTX
{
@ -37,7 +37,7 @@ static int udp_read(ACL_SOCKET fd, void *buf, size_t size,
(struct sockaddr*) from_addr, from_addr_len);
#endif
return (ret);
return ret;
}
static int udp_write(ACL_SOCKET fd, const void *buf, size_t size,
@ -53,7 +53,7 @@ static int udp_write(ACL_SOCKET fd, const void *buf, size_t size,
(struct sockaddr*) to_addr, to_addr_len);
#endif
return (ret);
return ret;
}
static int request_read_fn(ACL_SOCKET fd, void *buf, size_t size,
@ -63,15 +63,17 @@ static int request_read_fn(ACL_SOCKET fd, void *buf, size_t size,
UDP_CTX *ctx = (UDP_CTX*) arg;
int ret;
fp->read_ready = 0;
ctx->client_addr_len = sizeof(ctx->client_addr);
ret = udp_read(fd, buf, size, &ctx->client_addr,
&ctx->client_addr_len);
if (ret < 0) {
acl_msg_error("%s(%d): recvfrom error(%s)",
myname, __LINE__, acl_last_serror());
return (UDP_READ_NONE);
acl_msg_error("%s(%d): recvfrom error(%s), ret=%d",
myname, __LINE__, acl_last_serror(), ret);
return UDP_READ_NONE;
}
return (ret);
return ret;
}
static int respond_read_fn(ACL_SOCKET fd, void *buf, size_t size,
@ -82,6 +84,8 @@ static int respond_read_fn(ACL_SOCKET fd, void *buf, size_t size,
struct sockaddr_in server_addr;
int ret, addr_len = sizeof(server_addr);
fp->read_ready = 0;
ret = udp_read(fd, buf, size, &server_addr, &addr_len);
if (ret < 0) {
acl_msg_error("%s(%d): recvfrom error(%s)",
@ -93,9 +97,9 @@ static int respond_read_fn(ACL_SOCKET fd, void *buf, size_t size,
acl_inet_ntoa(server_addr.sin_addr, ip, sizeof(ip));
acl_msg_warn("%s(%d): invalid addr(%s) from server",
myname, __LINE__, ip);
return (UDP_READ_NONE);
return UDP_READ_NONE;
}
return (ret);
return ret;
}
static int udp_write_fn(ACL_SOCKET fd acl_unused, const void *buf acl_unused,
@ -105,7 +109,7 @@ static int udp_write_fn(ACL_SOCKET fd acl_unused, const void *buf acl_unused,
const char *myname = "udp_write_fn";
acl_msg_fatal("%s(%d): not support!", myname, __LINE__);
return (-1);
return -1;
}
#if 0
@ -133,7 +137,7 @@ static ACL_VSTREAM *stream_udp_open(void)
acl_msg_fatal("%s(%d): socket create error", myname, __LINE__);
stream = acl_vstream_fdopen(fd, O_RDWR, 1024, 0, ACL_VSTREAM_TYPE_SOCK);
return (stream);
return stream;
}
static ACL_VSTREAM *stream_udp_bind(struct sockaddr_in addr)
@ -147,7 +151,7 @@ static ACL_VSTREAM *stream_udp_bind(struct sockaddr_in addr)
if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) != 0)
acl_msg_fatal("%s(%d): can't bind", myname, __LINE__);
return (stream);
return stream;
}
static ACL_ARGV *build_ip_list(DOMAIN_MAP *domain_map)
@ -168,7 +172,7 @@ static ACL_ARGV *build_ip_list(DOMAIN_MAP *domain_map)
i = 0;
}
return (argv);
return argv;
}
static void reply_client_local(ACL_SOCKET fd, DOMAIN_MAP *domain_map,
@ -272,7 +276,7 @@ static void reply_client(ACL_SOCKET fd, char *buf, int dlen,
}
}
static int read_respond_callback(ACL_ASTREAM *astream, void *context,
static int read_respond_callback(ACL_ASTREAM *astream acl_unused, void *context,
char *data, int len)
{
const char *myname = "read_respond_callback";
@ -285,8 +289,7 @@ static int read_respond_callback(ACL_ASTREAM *astream, void *context,
char respond_buf[MAX_BUF];
if (len == UDP_READ_NONE) {
acl_aio_read(astream);
return (0);
return 0;
}
memcpy(&id, data, 2);
@ -299,8 +302,7 @@ static int read_respond_callback(ACL_ASTREAM *astream, void *context,
create_key(key, sizeof(key), SERVICE_CTX_UDP_REQUEST, id);
acl_msg_warn("%s(%d): not found id(%s)",
myname, __LINE__, key);
acl_aio_read(astream);
return (0);
return 0;
}
len = len > MAX_BUF ? MAX_BUF : len;
@ -310,8 +312,7 @@ static int read_respond_callback(ACL_ASTREAM *astream, void *context,
len, service_ctx);
service_ctx_free(service_ctx);
acl_aio_read(astream);
return (0);
return 0;
}
static const char* get_query_type(int n)
@ -382,8 +383,7 @@ static int read_request_callback(ACL_ASTREAM *astream, void *context,
int ret;
if (len == UDP_READ_NONE) {
acl_aio_read(astream);
return (0);
return 0;
}
service_ctx = service_ctx_new(ctx->service, astream,
@ -422,7 +422,7 @@ static int read_request_callback(ACL_ASTREAM *astream, void *context,
ACL_VSTREAM_SOCK(client_stream),
domain_map,
service_ctx);
return (0);
return 0;
}
}
@ -441,8 +441,7 @@ static int read_request_callback(ACL_ASTREAM *astream, void *context,
service_ctx->request_buf, service_ctx->request_len,
&ctx->remote_addr, sizeof(ctx->remote_addr));
acl_aio_read(astream);
return (0);
return 0;
}
void service_udp_init(SERVICE *service, const char *local_ip,

View File

@ -6,7 +6,7 @@
static int __nresult = 0;
static void callback(ACL_DNS_DB *dns_db, void *ctx, int errnum)
static void dns_lookup_callback(ACL_DNS_DB *dns_db, void *ctx, int errnum)
{
ACL_ITER iter;
char *domain = (char*) ctx;
@ -18,7 +18,7 @@ static void callback(ACL_DNS_DB *dns_db, void *ctx, int errnum)
return;
}
printf(">>>lookup result, domain(%s)\n", dns_db->name);
printf(">>>%s: lookup result, domain(%s)\n", __FUNCTION__, dns_db->name);
acl_foreach(iter, dns_db) {
const ACL_HOST_INFO *info;
@ -32,11 +32,12 @@ static void callback(ACL_DNS_DB *dns_db, void *ctx, int errnum)
static void dns_lookup(char *domains, const char *dns_ips, int dns_port)
{
ACL_AIO *aio = acl_aio_create(ACL_EVENT_SELECT);
ACL_DNS *dns = acl_dns_create(aio, 5);
ACL_ARGV *argv = acl_argv_split(domains, ",:;|");
ACL_AIO *aio = acl_aio_create(ACL_EVENT_SELECT);
int timeout = 5;
ACL_DNS *dns = acl_dns_create(aio, timeout);
ACL_ARGV *argv = acl_argv_split(domains, ",:;|");
ACL_ARGV *ip_argv = acl_argv_split(dns_ips, ",:;|");
ACL_ITER iter;
ACL_ITER iter;
/* 打开DNS缓存功能 */
acl_dns_open_cache(dns, 100);
@ -44,35 +45,37 @@ static void dns_lookup(char *domains, const char *dns_ips, int dns_port)
/* 添加DNS服务器地址 */
acl_foreach(iter, ip_argv) {
char *ip = (char*) iter.data;
printf("add one dns, ip=%s, port=%d\r\n", ip, dns_port);
acl_dns_add_dns(dns, ip, dns_port, 24);
}
acl_argv_free(ip_argv);
/* 校验DNS地址有效性 */
/* 设置校验DNS地址有效性标记位 */
acl_dns_check_dns_ip(dns);
/* 查询域名所对应的IP地址 */
acl_foreach(iter, argv) {
char *domain = (char*) iter.data;
printf(">>>call dns lookup for: %s\n", domain);
acl_dns_lookup(dns, domain, callback, domain);
acl_dns_lookup(dns, domain, dns_lookup_callback, domain);
}
/* 进入事件循环中 */
while (__nresult < iter.size) {
acl_aio_loop(aio);
}
printf("---------------------------------------------------\n");
printf(">>>Dns cache result search\n\n");
printf(">>>DNS cache result search\n\n");
/* 查询结果清零 */
__nresult = 0;
/* 通过缓存查询域名所对应的IP地址 */
acl_foreach(iter, argv) {
char *domain = (char*) iter.data;
printf(">>>call dns lookup for: %s\n", domain);
acl_dns_lookup(dns, domain, callback, domain);
printf(">>>call dns lookup from cache for: %s\n", domain);
acl_dns_lookup(dns, domain, dns_lookup_callback, domain);
}
while (__nresult < iter.size) {
@ -124,6 +127,7 @@ int main(int argc, char *argv[])
return (0);
}
acl_msg_stdout_enable(1);
dns_lookup(domains, dns_ips, dns_port);
#ifdef ACL_MS_WINDOWS
printf("Enter any key to exit\n");

View File

@ -32,8 +32,15 @@ void acl_aio_free(ACL_AIO *aio)
void acl_aio_free2(ACL_AIO *aio, int keep)
{
if (!keep && aio->event)
if (aio->dns) {
acl_dns_close(aio->dns);
acl_aio_check(aio);
}
if (!keep && aio->event) {
acl_event_free(aio->event);
}
acl_array_free(aio->dead_streams, NULL);
acl_myfree(aio);
}
@ -89,6 +96,44 @@ ACL_AIO *acl_aio_create3(ACL_EVENT *event)
return aio;
}
void acl_aio_set_dns(ACL_AIO *aio, const char *dns_list, int timeout)
{
ACL_ARGV *tokens;
ACL_ITER iter;
acl_assert(dns_list && *dns_list);
tokens = acl_argv_split(dns_list, ",; \t\r\n");
if (tokens == NULL) {
acl_msg_error("%s(%d), %s: invalid dns_list=%s",
__FILE__, __LINE__, __FUNCTION__, dns_list);
return;
}
if (aio->dns == NULL) {
aio->dns = acl_dns_create(aio, timeout);
acl_dns_check_dns_ip(aio->dns);
}
acl_foreach(iter, tokens) {
char *ip = (char *) iter.data;
char *ptr = strrchr(ip, '|');
int port;
if (ptr == NULL) {
ptr = strrchr(ip, ':');
}
if (ptr && *(ptr + 1) != 0) {
*ptr = 0;
port = atoi(++ptr);
} else {
port = 53;
}
acl_dns_add_dns(aio->dns, ip, port, 24);
}
acl_argv_free(tokens);
}
int acl_aio_event_mode(ACL_AIO *aio)
{
return aio->event_mode;
@ -96,28 +141,32 @@ int acl_aio_event_mode(ACL_AIO *aio)
int acl_aio_get_keep_read(ACL_AIO *aio)
{
if (aio)
if (aio) {
return aio->keep_read;
}
return 0;
}
void acl_aio_set_keep_read(ACL_AIO *aio, int onoff)
{
if (aio)
if (aio) {
aio->keep_read = onoff;
}
}
int acl_aio_get_delay_sec(ACL_AIO *aio)
{
if (aio)
if (aio) {
return (aio->delay_sec);
}
return -1;
}
int acl_aio_get_delay_usec(ACL_AIO *aio)
{
if (aio)
if (aio) {
return aio->delay_usec;
}
return -1;
}
@ -139,17 +188,17 @@ void acl_aio_set_delay_usec(ACL_AIO *aio, int delay_usec)
void acl_aio_set_check_inter(ACL_AIO *aio, int check_inter)
{
if (aio && check_inter >= 0)
if (aio && check_inter >= 0) {
acl_event_set_check_inter(aio->event, check_inter);
}
}
void acl_aio_loop(ACL_AIO *aio)
{
if (aio == NULL || aio->event == NULL)
return;
acl_event_loop(aio->event);
aio_delay_check(aio);
if (aio && aio->event) {
acl_event_loop(aio->event);
aio_delay_check(aio);
}
}
int acl_aio_last_nready(ACL_AIO *aio)
@ -163,8 +212,9 @@ int acl_aio_last_nready(ACL_AIO *aio)
ACL_EVENT *acl_aio_event(ACL_AIO *aio)
{
if (aio)
if (aio) {
return aio->event;
}
return NULL;
}
@ -180,8 +230,9 @@ acl_int64 acl_aio_request_timer(ACL_AIO *aio, ACL_EVENT_NOTIFY_TIME timer_fn,
{
const char *myname = "acl_aio_request_timer";
if (aio == NULL || aio->event == NULL || timer_fn == NULL)
if (aio == NULL || aio->event == NULL || timer_fn == NULL) {
acl_msg_fatal("%s: input invalid", myname);
}
return acl_event_request_timer(aio->event, timer_fn, context,
idle_limit, keep);
@ -191,8 +242,9 @@ acl_int64 acl_aio_cancel_timer(ACL_AIO *aio, ACL_EVENT_NOTIFY_TIME timer_fn, voi
{
const char *myname = "acl_aio_cancel_timer";
if (aio == NULL || aio->event == NULL || timer_fn == NULL)
if (aio == NULL || aio->event == NULL || timer_fn == NULL) {
acl_msg_fatal("%s: input invalid", myname);
}
return acl_event_cancel_timer(aio->event, timer_fn, context);
}
@ -202,8 +254,9 @@ void acl_aio_keep_timer(ACL_AIO *aio, ACL_EVENT_NOTIFY_TIME callback,
{
const char *myname = "acl_aio_keep_timer";
if (aio == NULL || aio->event == NULL)
if (aio == NULL || aio->event == NULL) {
acl_msg_fatal("%s: input invalid", myname);
}
acl_event_keep_timer(aio->event, callback, context, onoff);
}
@ -211,7 +264,8 @@ int acl_aio_timer_ifkeep(ACL_AIO *aio, ACL_EVENT_NOTIFY_TIME callback, void *con
{
const char *myname = "acl_aio_timer_ifkeep";
if (aio == NULL || aio->event == NULL)
if (aio == NULL || aio->event == NULL) {
acl_msg_fatal("%s: input invalid", myname);
}
return acl_event_timer_ifkeep(aio->event, callback, context);
}

View File

@ -12,6 +12,7 @@
#endif
#include "stdlib/acl_stdlib.h"
#include "net/acl_net.h"
#include "net/acl_sane_inet.h"
#include "aio/acl_aio.h"
#endif
@ -43,9 +44,10 @@ static void ConnectTimer(int event_type acl_unused,
{
ACL_ASTREAM *astream = (ACL_ASTREAM*) ctx;
if (astream->aio->event_mode != ACL_EVENT_WMSG)
if (astream->aio->event_mode != ACL_EVENT_WMSG) {
acl_msg_fatal("event_mode(%d) != ACL_EVENT_WMSG(%d)",
astream->aio->event_mode, ACL_EVENT_WMSG);
}
__connect_notify_callback(ACL_EVENT_RW_TIMEOUT, NULL, NULL, ctx);
}
#endif
@ -62,24 +64,26 @@ static void __connect_notify_callback(int event_type, ACL_EVENT *event,
/* 先判断是否是超时导致返回 */
if ((event_type & ACL_EVENT_RW_TIMEOUT) != 0) {
if (aio_timeout_callback(astream) < 0)
if (aio_timeout_callback(astream) < 0) {
acl_aio_iocp_close(astream);
else if (astream->flag & ACL_AIO_FLAG_IOCP_CLOSE)
} else if (astream->flag & ACL_AIO_FLAG_IOCP_CLOSE) {
/* 该流正处于IO延迟关闭状态因为本次写IO已经成功完成
* IO延迟关闭过程
*/
acl_aio_iocp_close(astream);
else
} else {
acl_event_enable_write(event, astream->stream,
astream->timeout, __connect_notify_callback,
astream);
}
return;
}
#ifdef ACL_WINDOWS
/* 如果是基于 win32 窗口消息的事件引擎则需要取消之前设置的超时定时器 */
if (astream->aio->event_mode == ACL_EVENT_WMSG)
if (astream->aio->event_mode == ACL_EVENT_WMSG) {
acl_aio_cancel_timer(astream->aio, ConnectTimer, astream);
}
#endif
if ((event_type & ACL_EVENT_XCPT) != 0) {
@ -90,8 +94,9 @@ static void __connect_notify_callback(int event_type, ACL_EVENT *event,
errlen = sizeof(err);
ret = getsockopt(ACL_VSTREAM_SOCK(acl_aio_vstream(astream)),
SOL_SOCKET, SO_ERROR, (char *) &err, &errlen);
if (ret >= 0)
if (ret >= 0) {
acl_set_error(err);
}
#if defined(ACL_SUNOS5)
/*
@ -100,14 +105,16 @@ static void __connect_notify_callback(int event_type, ACL_EVENT *event,
* connect and just returns EPIPE. Create a fake
* error message for connect. -- fenner@parc.xerox.com
*/
if (ret < 0 && errno == EPIPE)
if (ret < 0 && errno == EPIPE) {
acl_set_error(ACL_ENOTCONN);
}
#endif
if (errno == 0 || errno == ACL_EISCONN)
if (errno == 0 || errno == ACL_EISCONN) {
event_type = ACL_EVENT_CONNECT;
else if ((event_type & ACL_EVENT_CONNECT) == 0)
} else if ((event_type & ACL_EVENT_CONNECT) == 0) {
event_type |= ACL_EVENT_XCPT;
}
if ((event_type & ACL_EVENT_XCPT) != 0) {
astream->flag |= ACL_AIO_FLAG_DEAD;
@ -115,8 +122,9 @@ static void __connect_notify_callback(int event_type, ACL_EVENT *event,
return;
}
if ((event_type & ACL_EVENT_CONNECT) == 0)
if ((event_type & ACL_EVENT_CONNECT) == 0) {
acl_msg_fatal("%s: unknown event: %d", myname, event_type);
}
/* 将引用计数加1以防止在 connect_fn 内部调用了关闭过程connect_fn
* -1
@ -134,35 +142,40 @@ static void __connect_notify_callback(int event_type, ACL_EVENT *event,
acl_fifo_init(&connect_handles);
acl_foreach_reverse(iter, astream->connect_handles) {
AIO_CONNECT_HOOK *handle = (AIO_CONNECT_HOOK*) iter.data;
if (handle->disable)
if (handle->disable) {
continue;
}
acl_fifo_push(&connect_handles, handle);
}
while (1) {
AIO_CONNECT_HOOK *handle = acl_fifo_pop(&connect_handles);
if (handle == NULL)
if (handle == NULL) {
break;
}
ret = handle->callback(astream, handle->ctx);
if (ret == 0)
if (ret == 0) {
continue;
}
astream->nrefer--;
if (ret < 0 || astream->flag & ACL_AIO_FLAG_IOCP_CLOSE)
if (ret < 0 || astream->flag & ACL_AIO_FLAG_IOCP_CLOSE) {
acl_aio_iocp_close(astream);
}
return;
}
}
astream->nrefer--;
if (ret < 0)
if (ret < 0) {
acl_aio_iocp_close(astream);
else if ((astream->flag & ACL_AIO_FLAG_IOCP_CLOSE))
} else if ((astream->flag & ACL_AIO_FLAG_IOCP_CLOSE)) {
/* 之前该流已经被设置了IO完成延迟关闭标志位
* IO完成延迟关闭过程
*/
acl_aio_iocp_close(astream);
}
}
ACL_ASTREAM *acl_aio_connect(ACL_AIO *aio, const char *addr, int timeout)
@ -171,8 +184,9 @@ ACL_ASTREAM *acl_aio_connect(ACL_AIO *aio, const char *addr, int timeout)
ACL_ASTREAM *astream;
ACL_VSTREAM *cstream;
if (aio == NULL || addr == NULL || *addr == 0)
if (aio == NULL || addr == NULL || *addr == 0) {
acl_msg_fatal("%s: input invalid", myname);
}
#ifdef ACL_EVENTS_STYLE_IOCP
if (aio->event_mode == ACL_EVENT_KERNEL) {
@ -183,26 +197,32 @@ ACL_ASTREAM *acl_aio_connect(ACL_AIO *aio, const char *addr, int timeout)
aio->rbuf_size, timeout, ACL_VSTREAM_TYPE_SOCK);
acl_assert(cstream);
acl_vstream_set_peer(cstream, addr);
} else
#endif
} else {
cstream = acl_vstream_connect(addr, ACL_NON_BLOCKING,
0, 0, aio->rbuf_size);
}
#else
cstream = acl_vstream_connect(addr, ACL_NON_BLOCKING,
0, 0, aio->rbuf_size);
#endif
if (cstream == NULL) {
acl_msg_error("%s: connect addr(%s) error", myname, addr);
return (NULL);
return NULL;
}
cstream->flag |= ACL_VSTREAM_FLAG_CONNECTING;
astream = acl_aio_open(aio, cstream);
if (astream == NULL)
if (astream == NULL) {
acl_msg_fatal("%s: open astream error", myname);
}
#ifdef ACL_WINDOWS
if (timeout > 0 && aio->event_mode == ACL_EVENT_WMSG)
if (timeout > 0 && aio->event_mode == ACL_EVENT_WMSG) {
acl_aio_request_timer(aio, ConnectTimer, astream,
timeout * 1000000, 0);
}
#endif
astream->error = acl_last_error();
acl_aio_ctl(astream, ACL_AIO_CTL_TIMEOUT, timeout, ACL_AIO_CTL_END);
@ -210,3 +230,170 @@ ACL_ASTREAM *acl_aio_connect(ACL_AIO *aio, const char *addr, int timeout)
WRITE_SAFE_ENABLE(astream, __connect_notify_callback);
return astream;
}
/*--------------------------------------------------------------------------*/
typedef struct {
ACL_AIO *aio;
ACL_AIO_CONNECT_FN callback;
void *context;
int port;
int timeout;
ACL_ARGV *ip_list;
} RESOLVE_CTX;
static void resolve_ctx_free(RESOLVE_CTX *ctx)
{
acl_argv_free(ctx->ip_list);
acl_myfree(ctx);
}
static int connect_callback(ACL_ASTREAM *astream, void *context);
static int connect_timeout(ACL_ASTREAM *astream acl_unused, void *context);
static int connect_failed(ACL_ASTREAM *astream acl_unused, void *context)
{
RESOLVE_CTX *ctx = (RESOLVE_CTX *) context;
acl_aio_del_connect_hook(astream, connect_callback, context);
acl_aio_del_timeo_hook(astream, connect_timeout, context);
acl_aio_del_close_hook(astream, connect_failed, context);
acl_set_error(ACL_ECONNREFUSED);
ctx->callback(NULL, ctx->context);
resolve_ctx_free(ctx);
return -1;
}
static int connect_timeout(ACL_ASTREAM *astream acl_unused, void *context)
{
RESOLVE_CTX *ctx = (RESOLVE_CTX *) context;
acl_aio_del_connect_hook(astream, connect_callback, context);
acl_aio_del_timeo_hook(astream, connect_timeout, context);
acl_aio_del_close_hook(astream, connect_failed, context);
acl_set_error(ACL_ETIMEDOUT);
ctx->callback(NULL, ctx->context);
resolve_ctx_free(ctx);
return -1;
}
static int connect_callback(ACL_ASTREAM *astream, void *context)
{
RESOLVE_CTX *ctx = (RESOLVE_CTX *) context;
acl_aio_del_connect_hook(astream, connect_callback, context);
acl_aio_del_timeo_hook(astream, connect_timeout, context);
acl_aio_del_close_hook(astream, connect_failed, context);
acl_set_error(0);
ctx->callback(astream, ctx->context);
resolve_ctx_free(ctx);
return 0;
}
static void dns_lookup_callback(ACL_DNS_DB *db, void *context,
int errnum acl_unused)
{
RESOLVE_CTX *ctx = (RESOLVE_CTX *) context;
ACL_ITER iter;
ACL_ASTREAM *astream = NULL;
if (db == NULL) {
acl_set_error(-1);
ctx->callback(NULL, ctx->context);
resolve_ctx_free(ctx);
return;
}
acl_foreach(iter, db) {
const ACL_HOST_INFO *info = (const ACL_HOST_INFO *) iter.data;
acl_argv_add(ctx->ip_list, info->ip, NULL);
}
if (acl_argv_size(ctx->ip_list) <= 0) {
acl_set_error(-2);
ctx->callback(NULL, ctx->context);
resolve_ctx_free(ctx);
return;
}
acl_foreach(iter, ctx->ip_list) {
char addr[128];
const char *ip = (const char *) iter.data;
snprintf(addr, sizeof(addr), "%s|%d", ip, ctx->port);
astream = acl_aio_connect(ctx->aio, addr, ctx->timeout);
if (astream != NULL) {
break;
}
}
if (astream != NULL) {
acl_aio_add_connect_hook(astream, connect_callback, ctx);
acl_aio_add_timeo_hook(astream, connect_timeout, ctx);
acl_aio_add_close_hook(astream, connect_failed, ctx);
} else {
acl_set_error(ACL_ECONNREFUSED);
ctx->callback(NULL, ctx->context);
resolve_ctx_free(ctx);
}
}
int acl_aio_connect_addr(ACL_AIO *aio, const char *addr, int timeout,
ACL_AIO_CONNECT_FN callback, void *context)
{
char buf[128], *ptr;
int port;
RESOLVE_CTX *ctx;
if (aio->dns == NULL) {
acl_msg_error("%s(%d), %s: call acl_aio_set_dns first",
__FILE__, __LINE__, __FUNCTION__);
return -1;
}
ACL_SAFE_STRNCPY(buf, addr, sizeof(buf));
ptr = strrchr(buf, '|');
if (ptr == NULL) {
ptr = strrchr(buf, ':');
}
if (ptr == NULL || *(ptr + 1) == 0) {
acl_msg_error("%s(%d), %s: invalid addr=%s",
__FILE__, __LINE__, __FUNCTION__, addr);
return -1;
}
*ptr++ = 0;
port = atoi(ptr);
if (port <= 0 || port > 65535) {
acl_msg_error("%s(%d), %s: invalid port=%d, addr=%s",
__FILE__, __LINE__, __FUNCTION__, port, addr);
return -1;
}
ctx = (RESOLVE_CTX *) acl_mycalloc(1, sizeof(RESOLVE_CTX));
ctx->aio = aio;
ctx->callback = callback;
ctx->context = context;
ctx->port = port;
ctx->timeout = timeout;
ctx->ip_list = acl_argv_alloc(5);
if (acl_is_ip(buf) || acl_valid_unix(buf)) {
ACL_ASTREAM *astream = acl_aio_connect(aio, addr, timeout);
if (astream == NULL) {
resolve_ctx_free(ctx);
return -1;
}
acl_aio_add_connect_hook(astream, connect_callback, ctx);
acl_aio_add_timeo_hook(astream, connect_timeout, ctx);
acl_aio_add_close_hook(astream, connect_failed, ctx);
} else {
acl_dns_lookup(aio->dns, buf, dns_lookup_callback, ctx);
}
return 0;
}

View File

@ -9,6 +9,7 @@ extern "C" {
#include "stdlib/acl_vstream.h"
#include "stdlib/acl_vstring.h"
#include "stdlib/acl_array.h"
#include "net/acl_dns.h"
#include "aio/acl_aio.h"
struct ACL_AIO {
@ -23,6 +24,7 @@ struct ACL_AIO {
int timer_active;
unsigned int tid;
#endif
ACL_DNS *dns;
};
typedef struct AIO_READ_HOOK {

View File

@ -23,20 +23,22 @@ void acl_mask_addr(unsigned char *addr_bytes,
{
unsigned char *p;
if (network_bits > addr_byte_count * CHAR_BIT)
acl_msg_panic("mask_addr: address byte count %d"
" too small for bit count %d",
addr_byte_count, network_bits);
if (network_bits > addr_byte_count * CHAR_BIT) {
acl_msg_panic("mask_addr: address byte count %d too small for"
" bit count %d", addr_byte_count, network_bits);
}
p = addr_bytes + network_bits / CHAR_BIT;
network_bits %= CHAR_BIT;
/* "using unsigned" is just avoiding gcc6.1 warning */
if (network_bits != 0)
if (network_bits != 0) {
*p++ &= (unsigned char) ~0 << (unsigned)
(CHAR_BIT - network_bits);
}
while (p < addr_bytes + addr_byte_count)
while (p < addr_bytes + addr_byte_count) {
*p++ = 0;
}
}

View File

@ -41,6 +41,8 @@ struct ACL_DNS_REQ{
ACL_DNS *dns;
};
#define SAFE_COPY ACL_SAFE_STRNCPY
static void dns_stream_open(ACL_DNS *dns);
/* ACL_VSTREAM: 从数据流读取数据的回调函数 */
@ -52,6 +54,9 @@ static int dns_read(ACL_SOCKET fd, void *buf, size_t size,
ACL_DNS *dns = (ACL_DNS*) arg;
int ret;
/* xxx: 必须先将系统可读标志位置0以免引起事件引擎的重复触发 */
stream->read_ready = 0;
dns->addr_from.addr_len = sizeof(dns->addr_from.addr);
#ifdef ACL_UNIX
ret = (int) recvfrom(fd, buf, size, 0,
@ -61,12 +66,13 @@ static int dns_read(ACL_SOCKET fd, void *buf, size_t size,
ret = recvfrom(fd, (char*) buf, (int) size, 0,
(struct sockaddr*) &dns->addr_from.addr,
&dns->addr_from.addr_len);
#else
#else
#error "unknown OS"
#endif
if (ret < 0)
acl_msg_error("%s, %s(%d): recvfrom error(%s)",
__FILE__, myname, __LINE__, acl_last_serror());
#endif
if (ret < 0) {
acl_msg_error("%s, %s(%d): recvfrom error(%s), ret=%d",
__FILE__, myname, __LINE__, acl_last_serror(), ret);
}
return ret;
}
@ -75,22 +81,23 @@ static int dns_read(ACL_SOCKET fd, void *buf, size_t size,
static int dns_write(ACL_SOCKET fd, const void *buf, size_t size,
int timeout acl_unused, ACL_VSTREAM *stream acl_unused, void *arg)
{
const char *myname = "dns_write";
ACL_DNS *dns = (ACL_DNS*) arg;
int ret;
unsigned short i;
ACL_DNS_ADDR *addr;
if (dns->dns_list->count <= 0)
if (dns->dns_list->count <= 0) {
acl_msg_fatal("%s(%d): dns_list's size(%d) invalid",
myname, __LINE__, dns->dns_list->count);
__FUNCTION__, __LINE__, dns->dns_list->count);
}
/* 根据当前ID号取模获得目标DNS地址 */
i = dns->qid % dns->dns_list->count;
addr = acl_array_index(dns->dns_list, i);
if (addr == NULL)
if (addr == NULL) {
acl_msg_fatal("%s(%d): addr null for %d",
myname, __LINE__, i);
__FUNCTION__, __LINE__, i);
}
#ifdef ACL_UNIX
ret = (int) sendto(fd, buf, size, 0,
@ -114,8 +121,9 @@ static ACL_DNS_DB *build_dns_db(const rfc1035_message *res, int count,
ACL_SOCKADDR *saddr;
int i;
if (ttl_min)
if (ttl_min) {
*ttl_min = 100000000;
}
for (i = 0; i < count; i++) {
if (res->answer[i].type == RFC1035_TYPE_A) {
@ -138,7 +146,7 @@ static ACL_DNS_DB *build_dns_db(const rfc1035_message *res, int count,
/* 目前该模块仅支持 IPV4 */
saddr->sa.sa_family = AF_INET;
if (inet_ntop(AF_INET, &saddr->in.sin_addr,
if (!inet_ntop(AF_INET, &saddr->in.sin_addr,
phost->ip, sizeof(phost->ip))) {
continue;
@ -161,32 +169,32 @@ static ACL_DNS_DB *build_dns_db(const rfc1035_message *res, int count,
static int dns_lookup_callback(ACL_ASTREAM *astream acl_unused, void *ctx,
char *data, int dlen)
{
const char *myname = "dns_lookup_callback";
ACL_DNS *dns = (ACL_DNS*) ctx;
int ret;
ACL_DNS_REQ *handle;
ACL_DNS_REQ *req;
rfc1035_message *res;
char key[RFC1035_MAXHOSTNAMESZ + 16];
/* 解析DNS响应数据包 */
ret = rfc1035MessageUnpack(data, dlen, &res);
if (ret < 0) {
if (res == NULL)
return (0);
if (res == NULL) {
return 0;
}
snprintf(key, sizeof(key), "%s:%d", res->query->name, res->id);
acl_lowercase(key);
handle = acl_htable_find(dns->lookup_table, key);
req = acl_htable_find(dns->lookup_table, key);
if (handle) {
void (*callback)(ACL_DNS_DB*, void*, int) = handle->callback;
void *arg = handle->ctx;
if (req) {
void (*callback)(ACL_DNS_DB*, void*, int) = req->callback;
void *arg = req->ctx;
/* 取消定时器 */
acl_aio_cancel_timer(dns->aio, dns->lookup_timeout, handle);
acl_aio_cancel_timer(dns->aio, dns->lookup_timeout, req);
/* 释放该查询对象 */
acl_htable_delete(dns->lookup_table, handle->key, NULL);
acl_myfree(handle);
acl_htable_delete(dns->lookup_table, req->key, NULL);
acl_myfree(req);
/* 通知应用查询失败 */
callback(NULL, arg, res->rcode);
}
@ -209,7 +217,7 @@ static int dns_lookup_callback(ACL_ASTREAM *astream acl_unused, void *ctx,
addr = acl_array_index(dns->dns_list, i);
if (addr == NULL)
acl_msg_fatal("%s(%d): addr null for %d",
myname, __LINE__, i);
__FUNCTION__, __LINE__, i);
if (dns->addr_from.addr.in.sin_addr.s_addr
!= addr->addr.in.sin_addr.s_addr) {
@ -221,7 +229,7 @@ static int dns_lookup_callback(ACL_ASTREAM *astream acl_unused, void *ctx,
inet_ntop(AF_INET, &addr->addr.in.sin_addr,
to, sizeof(to));
acl_msg_warn("%s(%d): from(%s) != to(%s)",
myname, __LINE__, from, to);
__FUNCTION__, __LINE__, from, to);
rfc1035MessageDestroy(res);
return 0;
}
@ -239,7 +247,7 @@ static int dns_lookup_callback(ACL_ASTREAM *astream acl_unused, void *ctx,
addr = acl_array_index(dns->dns_list, i);
if (addr == NULL)
acl_msg_fatal("%s(%d): addr null for %d",
myname, __LINE__, i);
__FUNCTION__, __LINE__, i);
in.s_addr = dns->addr_from.addr.in.sin_addr.s_addr;
acl_mask_addr((unsigned char*) &in.s_addr,
@ -252,7 +260,7 @@ static int dns_lookup_callback(ACL_ASTREAM *astream acl_unused, void *ctx,
inet_ntop(AF_INET, &addr->addr.in.sin_addr,
to, sizeof(to));
acl_msg_warn("%s(%d): from(%s) != to(%s)",
myname, __LINE__, from, to);
__FUNCTION__, __LINE__, from, to);
rfc1035MessageDestroy(res);
return 0;
}
@ -260,19 +268,19 @@ static int dns_lookup_callback(ACL_ASTREAM *astream acl_unused, void *ctx,
acl_lowercase(res->query->name);
snprintf(key, sizeof(key), "%s:%d", res->query->name, res->id);
handle = acl_htable_find(dns->lookup_table, key);
if (handle != NULL) {
req = acl_htable_find(dns->lookup_table, key);
if (req != NULL) {
int ttl_min;
void (*callback)(ACL_DNS_DB*, void*, int) = handle->callback;
void *arg = handle->ctx;
void (*callback)(ACL_DNS_DB*, void*, int) = req->callback;
void *arg = req->ctx;
ACL_DNS_DB *dns_db = build_dns_db(res, ret,
(unsigned int*) &ttl_min);
/* 取消定时器 */
acl_aio_cancel_timer(dns->aio, dns->lookup_timeout, handle);
acl_aio_cancel_timer(dns->aio, dns->lookup_timeout, req);
/* 释放该查询对象 */
acl_htable_delete(dns->lookup_table, handle->key, NULL);
acl_myfree(handle);
acl_htable_delete(dns->lookup_table, req->key, NULL);
acl_myfree(req);
/* 回调函数用户的回调函数 */
callback(dns_db, arg, res->rcode);
@ -312,52 +320,45 @@ static int dns_lookup_error(ACL_ASTREAM *server acl_unused, void *ctx acl_unused
acl_msg_warn("%s(%d): dns_lookup error %s",
myname, __LINE__, acl_last_serror());
#endif
return (-1);
return -1;
}
/* 创建DNS查询的异步流 */
static void dns_stream_open(ACL_DNS *dns)
{
const char *myname = "dns_stream_open";
ACL_SOCKET fd = socket(PF_INET, SOCK_DGRAM, 0);
ACL_VSTREAM *vstream;
ACL_VSTREAM *stream = acl_vstream_bind("0.0.0.0:0", 0, 0);
assert(stream);
if (fd == ACL_SOCKET_INVALID)
acl_msg_fatal("%s: socket create error", myname);
/* 创建 ACL_VSTREAM 流并设置读写接口 */
vstream = acl_vstream_fdopen(fd, O_RDWR, 1024, 0, ACL_VSTREAM_TYPE_SOCK);
acl_vstream_ctl(vstream,
/* 创建异步流 */
dns->astream = acl_aio_open(dns->aio, stream);
acl_vstream_ctl(stream,
ACL_VSTREAM_CTL_READ_FN, dns_read,
ACL_VSTREAM_CTL_WRITE_FN, dns_write,
ACL_VSTREAM_CTL_CONTEXT, dns,
ACL_VSTREAM_CTL_END);
/* 创建异步流 */
dns->astream = acl_aio_open(dns->aio, vstream);
/* 设置查询套接口可读时的回调函数 */
/* 设置查询套接口可读、关闭时的回调函数 */
acl_aio_add_read_hook(dns->astream, dns_lookup_callback, dns);
acl_aio_add_close_hook(dns->astream, dns_lookup_error, dns);
/* 设置该异步流为持续读状态 */
dns->astream->keep_read = 1;
}
static int dns_lookup_send(ACL_DNS *dns, ACL_DNS_REQ *handle, const char *domain)
static int dns_lookup_send(ACL_DNS *dns, ACL_DNS_REQ *req, const char *domain)
{
const char *myname = "dns_lookup_send";
char buf[1024];
int ret;
char buf[1024];
int ret;
memset(buf, 0, sizeof(buf));
/* 创建DNS查询数据包 */
ret = (int) rfc1035BuildAQuery(domain, buf, sizeof(buf), dns->qid, NULL);
if (ret < 0) {
acl_msg_error("%s(%d): rfc1035BuildAQuery error for(%s)",
myname, __LINE__, domain);
return (-1);
__FUNCTION__, __LINE__, domain);
return -1;
}
/* 增加ID号 */
@ -368,8 +369,8 @@ static int dns_lookup_send(ACL_DNS *dns, ACL_DNS_REQ *handle, const char *domain
/* 设置定时器 */
acl_aio_request_timer(dns->aio, dns->lookup_timeout,
handle, dns->timeout * 1000000, 0);
return (0);
req, dns->timeout * 1000000, 0);
return 0;
}
/* 查询超时的回调函数 */
@ -377,31 +378,32 @@ static int dns_lookup_send(ACL_DNS *dns, ACL_DNS_REQ *handle, const char *domain
static void dns_lookup_timeout(int event_type, ACL_EVENT *event acl_unused,
void *context)
{
const char *myname = "dns_lookup_timeout";
ACL_DNS_REQ *handle = (ACL_DNS_REQ*) context;
ACL_DNS *dns = handle->dns;
void (*callback)(ACL_DNS_DB*, void*, int) = handle->callback;
void *arg = handle->ctx;
ACL_DNS_REQ *req = (ACL_DNS_REQ*) context;
ACL_DNS *dns = req->dns;
void (*callback)(ACL_DNS_DB*, void*, int) = req->callback;
void *arg = req->ctx;
if (event_type != ACL_EVENT_TIME) {
acl_msg_warn("%s(%d): invalid event_type(%d)",
myname, __LINE__, event_type);
__FUNCTION__, __LINE__, event_type);
}
if (++handle->nretry <= dns->retry_limit) {
if (++req->nretry <= dns->retry_limit) {
char domain[RFC1035_MAXHOSTNAMESZ + 16], *ptr;
ACL_SAFE_STRNCPY(domain, handle->key, sizeof(domain));
SAFE_COPY(domain, req->key, sizeof(domain));
ptr = strchr(domain, ':');
if (ptr)
if (ptr) {
*ptr = 0;
if (dns_lookup_send(dns, handle, domain) == 0)
}
if (dns_lookup_send(dns, req, domain) == 0) {
return;
}
}
/* 释放该查询对象 */
acl_htable_delete(handle->dns->lookup_table, handle->key, NULL);
acl_myfree(handle);
acl_htable_delete(req->dns->lookup_table, req->key, NULL);
acl_myfree(req);
/* 回调函数用户的回调函数 */
callback(NULL, arg, ACL_DNS_ERR_TIMEOUT);
@ -409,18 +411,20 @@ static void dns_lookup_timeout(int event_type, ACL_EVENT *event acl_unused,
void acl_dns_init(ACL_DNS *dns, ACL_AIO *aio, int timeout)
{
dns->flag &= ~ACL_DNS_FLAG_ALLOC; /* 默认为栈空间 */
dns->aio = aio;
dns->timeout = timeout > 0 ? timeout : 5;
dns->qid = 0;
dns->dns_idx = 0;
dns->flag &= ~ACL_DNS_FLAG_ALLOC; /* 默认为栈空间 */
dns->aio = aio;
dns->timeout = timeout > 0 ? timeout : 5;
dns->qid = 0;
dns->dns_idx = 0;
dns->retry_limit = 0;
/* 创建DNS服务器地址数组 */
dns->dns_list = acl_array_create(10);
/* 创建查询对象表 */
dns->lookup_table = acl_htable_create(1024, 0);
/* 设置 DNS 查询超时的回调函数*/
dns->lookup_timeout = dns_lookup_timeout;
/* 打开异步读取DNS服务器响应的数据流 */
@ -436,7 +440,7 @@ ACL_DNS *acl_dns_create(ACL_AIO *aio, int timeout)
acl_dns_init(dns, aio, timeout);
dns->flag |= ACL_DNS_FLAG_ALLOC; /* 设置为堆分配的变量 */
return (dns);
return dns;
}
void acl_dns_close(ACL_DNS *dns)
@ -444,8 +448,8 @@ void acl_dns_close(ACL_DNS *dns)
ACL_ITER iter;
acl_foreach(iter, dns->lookup_table) {
ACL_DNS_REQ *handle = (ACL_DNS_REQ*) iter.data;
acl_myfree(handle);
ACL_DNS_REQ *req = (ACL_DNS_REQ*) iter.data;
acl_myfree(req);
}
acl_htable_free(dns->lookup_table, NULL);
@ -455,24 +459,26 @@ void acl_dns_close(ACL_DNS *dns)
dns->dns_cache = NULL;
}
acl_aio_iocp_close(dns->astream);
dns->aio = NULL;
dns->aio = NULL;
dns->astream = NULL;
acl_array_destroy(dns->dns_list, acl_myfree_fn);
if (dns->groups) {
acl_foreach(iter, dns->groups) {
ACL_DOMAIN_GROUP *tmp = (ACL_DOMAIN_GROUP*) iter.data;
if (tmp->excepts)
if (tmp->excepts) {
acl_argv_free(tmp->excepts);
}
acl_myfree(tmp);
}
acl_array_destroy(dns->groups, NULL);
}
if ((dns->flag & ACL_DNS_FLAG_ALLOC))
if ((dns->flag & ACL_DNS_FLAG_ALLOC)) {
acl_myfree(dns);
else
} else {
dns->flag = 0;
}
}
void acl_dns_check_dns_ip(ACL_DNS *dns)
@ -500,11 +506,9 @@ static void cache_free_fn(const ACL_CACHE2_INFO *info acl_unused, void *arg)
void acl_dns_open_cache(ACL_DNS *dns, int limit)
{
if (dns->dns_cache)
return;
if (limit <= 0)
return;
dns->dns_cache = acl_cache2_create(limit, cache_free_fn);
if (dns->dns_cache == NULL && limit > 0) {
dns->dns_cache = acl_cache2_create(limit, cache_free_fn);
}
}
void acl_dns_add_dns(ACL_DNS *dns, const char *dns_ip,
@ -523,7 +527,7 @@ void acl_dns_add_dns(ACL_DNS *dns, const char *dns_ip,
/* 设置DNS服务器地址 */
ACL_SAFE_STRNCPY(addr->ip, dns_ip, sizeof(addr->ip));
SAFE_COPY(addr->ip, dns_ip, sizeof(addr->ip));
addr->port = dns_port;
memset(&addr->addr, 0, sizeof(addr->addr));
@ -533,8 +537,8 @@ void acl_dns_add_dns(ACL_DNS *dns, const char *dns_ip,
addr->addr_len = sizeof(struct sockaddr_in);
addr->in.s_addr = addr->addr.in.sin_addr.s_addr;
acl_mask_addr((unsigned char*) &addr->addr.in.sin_addr.s_addr,
sizeof(addr->addr.in.sin_addr.s_addr), mask_length);
acl_mask_addr((unsigned char*) &addr->in.s_addr,
sizeof(addr->in.s_addr), mask_length);
/* 将该DNS地址添加进数组中 */
@ -559,7 +563,7 @@ void acl_dns_add_host(ACL_DNS *dns, const char *domain, const char *ip_list)
char *ip = (char*) iter.data;
ACL_HOSTNAME *phost = acl_mycalloc(1, sizeof(ACL_HOSTNAME));
ACL_SAFE_STRNCPY(phost->ip, ip, sizeof(phost->ip));
SAFE_COPY(phost->ip, ip, sizeof(phost->ip));
phost->saddr.sa.sa_family = AF_INET;
phost->saddr.in.sin_addr.s_addr = inet_addr(ip);
(void) acl_array_append(dns_db->h_db, phost);
@ -579,8 +583,9 @@ void acl_dns_add_group(ACL_DNS *dns, const char *group, const char *refer,
ACL_DOMAIN_GROUP *dmgrp;
ACL_ITER iter;
if (dns->groups == NULL)
if (dns->groups == NULL) {
dns->groups = acl_array_create(10);
}
acl_foreach(iter, dns->groups) {
dmgrp = (ACL_DOMAIN_GROUP*) iter.data;
@ -593,32 +598,34 @@ void acl_dns_add_group(ACL_DNS *dns, const char *group, const char *refer,
dmgrp = (ACL_DOMAIN_GROUP*) acl_mycalloc(1, sizeof(ACL_DOMAIN_GROUP));
ACL_SAFE_STRNCPY(dmgrp->group, group, sizeof(dmgrp->group));
SAFE_COPY(dmgrp->group, group, sizeof(dmgrp->group));
acl_lowercase(dmgrp->group);
dmgrp->group_len = (int) strlen(dmgrp->group);
if (refer == NULL || *refer == 0)
ACL_SAFE_STRNCPY(dmgrp->domain, dmgrp->group, sizeof(dmgrp->domain));
else {
ACL_SAFE_STRNCPY(dmgrp->domain, refer, sizeof(dmgrp->domain));
if (refer == NULL || *refer == 0) {
SAFE_COPY(dmgrp->domain, dmgrp->group, sizeof(dmgrp->domain));
} else {
SAFE_COPY(dmgrp->domain, refer, sizeof(dmgrp->domain));
acl_lowercase(dmgrp->domain);
}
if (excepts)
if (excepts) {
dmgrp->excepts = acl_argv_split(excepts, ",; \t");
else
} else {
dmgrp->excepts = NULL;
}
acl_array_append(dns->groups, dmgrp);
if (ip_list && *ip_list)
if (ip_list && *ip_list) {
acl_dns_add_host(dns, dmgrp->domain, ip_list);
}
}
ACL_DNS_REQ *acl_dns_lookup(ACL_DNS *dns, const char *domain_in,
void (*callback)(ACL_DNS_DB *, void *, int), void *ctx)
{
char key[RFC1035_MAXHOSTNAMESZ + 16], domain[RFC1035_MAXHOSTNAMESZ];
ACL_DNS_REQ *handle;
ACL_DNS_REQ *req;
/* 先检查是否匹配域名组 */
if (dns->groups) {
@ -627,9 +634,13 @@ ACL_DNS_REQ *acl_dns_lookup(ACL_DNS *dns, const char *domain_in,
acl_foreach(iter, dns->groups) {
ACL_ITER iter2;
ACL_DOMAIN_GROUP *tmp = (ACL_DOMAIN_GROUP*) iter.data;
#define NEQ acl_strrncasecmp
/* 先找到域名组对象 */
if (acl_strrncasecmp(tmp->group, domain_in, tmp->group_len))
if (NEQ(tmp->group, domain_in, tmp->group_len)) {
continue;
}
/* 检查该域名是否是域名组的例外域名 */
if (!tmp->excepts) {
dmgrp = tmp;
@ -637,18 +648,21 @@ ACL_DNS_REQ *acl_dns_lookup(ACL_DNS *dns, const char *domain_in,
}
acl_foreach(iter2, tmp->excepts) {
char *except = (char*) iter2.data;
if (strcasecmp(except, domain_in) == 0)
if (strcasecmp(except, domain_in) == 0) {
goto END_FOREACH_TAG;
}
}
}
END_FOREACH_TAG:
if (dmgrp)
ACL_SAFE_STRNCPY(domain, dmgrp->domain, sizeof(domain));
else
ACL_SAFE_STRNCPY(domain, domain_in, sizeof(domain));
} else
ACL_SAFE_STRNCPY(domain, domain_in, sizeof(domain));
if (dmgrp) {
SAFE_COPY(domain, dmgrp->domain, sizeof(domain));
} else {
SAFE_COPY(domain, domain_in, sizeof(domain));
}
} else {
SAFE_COPY(domain, domain_in, sizeof(domain));
}
acl_lowercase(domain);
@ -664,44 +678,48 @@ END_FOREACH_TAG:
snprintf(key, sizeof(key), "%s:%d", domain, dns->qid);
acl_lowercase(key);
handle = (ACL_DNS_REQ*) acl_htable_find(dns->lookup_table, key);
req = (ACL_DNS_REQ*) acl_htable_find(dns->lookup_table, key);
/* XXX: 不应存在相同的键存在, 因为该键是由域名及自动ID组成 */
if (handle != NULL) {
acl_msg_warn("%s(%d): key(%s) exist", __FUNCTION__, __LINE__, key);
if (req != NULL) {
acl_msg_warn("%s(%d): key(%s) exist",
__FUNCTION__, __LINE__, key);
callback(NULL, ctx, ACL_DNS_ERR_EXIST);
return NULL;
}
/* 分配新的查询对象 */
handle = (ACL_DNS_REQ*) acl_mycalloc(1, sizeof(ACL_DNS_REQ));
handle->dns = dns;
handle->callback = callback;
handle->ctx = ctx;
handle->qid = dns->qid;
ACL_SAFE_STRNCPY(handle->key, key, sizeof(handle->key));
req = (ACL_DNS_REQ*) acl_mycalloc(1, sizeof(ACL_DNS_REQ));
req->dns = dns;
req->callback = callback;
req->ctx = ctx;
req->qid = dns->qid;
SAFE_COPY(req->key, key, sizeof(req->key));
/* 添加进查询对象表中 */
if (acl_htable_enter(dns->lookup_table, key, handle) == NULL)
if (acl_htable_enter(dns->lookup_table, key, req) == NULL) {
acl_msg_fatal("%s(%d): enter htable error(%s)",
__FUNCTION__, __LINE__, acl_last_serror());
}
if (dns_lookup_send(dns, handle, domain) < 0) {
if (dns_lookup_send(dns, req, domain) < 0) {
acl_htable_delete(dns->lookup_table, key, NULL);
acl_myfree(handle);
acl_myfree(req);
callback(NULL, ctx, ACL_DNS_ERR_BUILD_REQ);
return NULL;
}
return handle;
return req;
}
void acl_dns_cancel(ACL_DNS_REQ *handle)
void acl_dns_cancel(ACL_DNS_REQ *req)
{
if (handle == NULL || handle->dns == NULL) {
if (req == NULL || req->dns == NULL) {
acl_msg_error("%s(%d): input error", __FUNCTION__, __LINE__);
return;
}
acl_htable_delete(handle->dns->lookup_table, handle->key, NULL);
acl_myfree(handle);
acl_htable_delete(req->dns->lookup_table, req->key, NULL);
acl_myfree(req);
}
const char *acl_dns_serror(int errnum)
@ -738,7 +756,7 @@ const char *acl_dns_serror(int errnum)
{ ACL_DNS_ERR_BUILD_REQ, "Can't build query packet" },
{ 0, 0 }
};
const char *unknown = "Unknown Error";
static const char *unknown = "Unknown Error";
size_t i;
for (i = 0; errmsg[i].msg != NULL; ++i) {

View File

@ -73,8 +73,8 @@ static int udp_res_lookup(ACL_RES *res, const char *data, int dlen, char *buf, i
acl_msg_fatal("%s: socket create error", myname);
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(res->dns_port);
addr.sin_family = AF_INET;
addr.sin_port = htons(res->dns_port);
addr.sin_addr.s_addr = inet_addr(res->dns_ip);
ret = sendto(fd, data, dlen, 0, (struct sockaddr *) &addr,