add websocket in lib_acl_cpp.

This commit is contained in:
zhengshuxin 2016-09-20 19:55:38 +08:00
parent a4387ea170
commit 3846cda043
52 changed files with 2438 additions and 57 deletions

View File

@ -37,8 +37,8 @@
#define DELIMITER_LEFT "$<"
#define DELIMITER_RIGHT ">"
#define SECTIONTAG_HEAD "<!-- "DELIMITER_LEFT
#define SECTIONTAG_TAIL DELIMITER_RIGHT" -->"
#define SECTIONTAG_HEAD "<!-- " DELIMITER_LEFT
#define SECTIONTAG_TAIL DELIMITER_RIGHT " -->"
/* The object, holding template data */
typedef struct template_s tpl_t;

View File

@ -82,10 +82,15 @@ ifneq ($(SYSPATH),)
endif
###########################################################
CFLAGS += -I../../../lib_acl/include -I../../../lib_protocol/include -I../../../lib_acl_cpp/include
ACL_PATH = ../../..
CFLAGS += -I$(ACL_PATH)/lib_acl/include \
-I$(ACL_PATH)/lib_protocol/include \
-I$(ACL_PATH)/lib_acl_cpp/include
EXTLIBS =
LDFLAGS = -L../../../lib_acl_cpp/lib -l_acl_cpp -L../../../lib_protocol/lib -l_protocol -L../../../lib_acl/lib -l_acl \
$(EXTLIBS) $(SYSLIB)
LDFLAGS = -L$(ACL_PATH)/lib_acl_cpp/lib -l_acl_cpp \
-L$(ACL_PATH)/lib_protocol/lib -l_protocol \
-L$(ACL_PATH)/lib_acl/lib -l_acl \
$(EXTLIBS) $(SYSLIB)
COMPILE = $(CC) $(CFLAGS)
LINK = $(CC) $(OBJ) $(LDFLAGS)

View File

@ -82,16 +82,17 @@ ifneq ($(SYSPATH),)
endif
###########################################################
CFLAGS += -I../../../lib_acl/include \
-I../../../lib_protocol/include \
-I../../../lib_acl_cpp/include \
-I../../../lib_fiber/c/include \
-I../../../lib_fiber/cpp/include
ACL_PATH = ../../..
CFLAGS += -I$(ACL_PATH)/lib_acl/include \
-I$(ACL_PATH)/lib_protocol/include \
-I$(ACL_PATH)/lib_acl_cpp/include \
-I$(ACL_PATH)/lib_fiber/c/include \
-I$(ACL_PATH)/lib_fiber/cpp/include
EXTLIBS =
LDFLAGS = -L../../../lib_fiber/lib -l_fiber_cpp -l_fiber \
-L../../../lib_acl_cpp/lib -l_acl_cpp \
-L../../../lib_protocol/lib -l_protocol \
-L../../../lib_acl/lib -l_acl \
LDFLAGS = -L$(ACL_PATH)/lib_fiber/lib -l_fiber_cpp -l_fiber \
-L$(ACL_PATH)/lib_acl_cpp/lib -l_acl_cpp \
-L$(ACL_PATH)/lib_protocol/lib -l_protocol \
-L$(ACL_PATH)/lib_acl/lib -l_acl \
$(EXTLIBS) $(SYSLIB)
COMPILE = $(CC) $(CFLAGS)

View File

@ -0,0 +1,2 @@
include ./Makefile.in
PROG = fiber_chat

View File

@ -0,0 +1,124 @@
CC = g++
CFLAGS = -c -g -W -Wall -Wcast-qual -Wcast-align \
-Waggregate-return -Wno-long-long \
-Wpointer-arith -Werror -Wshadow -O3 \
-D_REENTRANT -D_POSIX_PTHREAD_SEMANTICS -D_USE_FAST_MACRO
###########################################################
#Check system:
# Linux, SunOS, Solaris, BSD variants, AIX, HP-UX
SYSLIB = -lpthread -lz -ldl
CHECKSYSRES = @echo "Unknow system type!";exit 1
UNIXNAME = $(shell uname -sm)
OSTYPE = $(shell uname -p)
RPATH = linux64
ifeq ($(CC),)
CC = gcc
endif
# For FreeBSD
ifeq ($(findstring FreeBSD, $(UNIXNAME)), FreeBSD)
ifeq ($(findstring gcc, $(CC)), gcc)
CFLAGS += -Wstrict-prototypes
endif
CFLAGS += -DFREEBSD -D_REENTRANT
SYSLIB = -lcrypt -lpthread -lz
RPATH = freebsd
endif
# For Darwin
ifeq ($(findstring Darwin, $(UNIXNAME)), Darwin)
CFLAGS += -DMACOSX -Wno-invalid-source-encoding \
-Wno-extended-offsetof
UNIXTYPE = MACOSX
SYSLIB += -liconv -rdynamic
RPATH = macos
endif
#Path for Linux
ifeq ($(findstring Linux, $(UNIXNAME)), Linux)
ifeq ($CC, "gcc")
CFLAGS += -Wstrict-prototypes
endif
ifeq ($(findstring i686, $(OSTYPE)), i686)
RPATH = linux32
endif
ifeq ($(findstring x86_64, $(OSTYPE)), x86_64)
RPATH = linux64
endif
CFLAGS += -DLINUX2 -D_REENTRANT
SYSLIB += -lcrypt
endif
#Path for SunOS
ifeq ($(findstring SunOS, $(UNIXNAME)), SunOS)
ifeq ($(findstring 86, $(UNIXNAME)), 86)
SYSLIB += -lsocket -lnsl -lrt
endif
ifeq ($(findstring sun4u, $(UNIXNAME)), sun4u)
SYSLIB += -lsocket -lnsl -lrt
endif
ifeq ($CC, "gcc")
CFLAGS += -Wstrict-prototypes
endif
CFLAGS += -DSUNOS5 -D_REENTRANT
RPATH = sunos_x86
endif
#Path for HP-UX
ifeq ($(findstring HP-UX, $(UNIXNAME)), HP-UX)
ifeq ($CC, "gcc")
CFLAGS += -Wstrict-prototypes
endif
CFLAGS += -DHP_UX -DHPUX11
PLAT_NAME=hp-ux
endif
#Find system type.
ifneq ($(SYSPATH),)
CHECKSYSRES = @echo "System is $(shell uname -sm)"
endif
###########################################################
ACL_PATH = ../../..
CFLAGS += -I$(ACL_PATH)/lib_acl/include \
-I$(ACL_PATH)/lib_protocol/include \
-I$(ACL_PATH)/lib_acl_cpp/include \
-I$(ACL_PATH)/lib_fiber/c/include \
-I$(ACL_PATH)/lib_fiber/cpp/include
EXTLIBS =
LDFLAGS = -L$(ACL_PATH)/lib_fiber/lib -l_fiber_cpp -l_fiber \
-L$(ACL_PATH)/lib_acl_cpp/lib -l_acl_cpp \
-L$(ACL_PATH)/lib_protocol/lib -l_protocol \
-L$(ACL_PATH)/lib_acl/lib -l_acl \
$(EXTLIBS) $(SYSLIB)
COMPILE = $(CC) $(CFLAGS)
LINK = $(CC) $(OBJ) $(LDFLAGS)
###########################################################
OBJ_PATH = .
#Project's objs
SRC = $(wildcard *.cpp)
OBJ = $(patsubst %.cpp, $(OBJ_PATH)/%.o, $(notdir $(SRC)))
$(OBJ_PATH)/%.o: %.cpp
$(COMPILE) $< -o $@
.PHONY = all clean
all: RM $(OBJ)
$(LINK) -o $(PROG)
@echo ""
@echo "All ok! Output:$(PROG)"
@echo ""
RM:
rm -f $(PROG)
clean:
rm -f $(PROG)
rm -f $(OBJ)
install:
cp $(PROG) ../../../dist/master/libexec/$(RPATH)/
cp $(PROG).cf ../../../dist/master/conf/service/
###########################################################

View File

@ -0,0 +1,107 @@
service fiber_chat
# 进程是否禁止运行
master_disable = no
# 服务地址及端口号
# for master_type = inet
# master_service = 127.0.0.1:5001
# for master_type = unix
# master_service = echo.sock
# for master_type = sock
master_service = 127.0.0.1:5001, 5002
# 服务监听为域套接口
# master_service = aio_echo.sock
# 服务类型
# master_type = inet
# master_type = unix
master_type = sock
# 当子进程异常退出时如果该值非空则将子进程异常退出的消息通知该服务
# master_notify_addr = 127.0.0.1:5801
# 邮件通知接收者
# master_notify_recipients = zhengshuxin@hotmail.com
# 是否允许延迟接受客户端连接如果为0则表示关闭该功能如果大于0则表示打开此功能
# 并且此值代表延迟接受连接的超时值超过此值时如果客户端依然没有发来数据则操作
# 系统会在系统层直接关闭该连接
# master_defer_accept = 0
# 是否只允许私有访问, 如果为 y, 则域套接口创建在 {install_path}/var/log/private/ 目录下,
# 如果为 n, 则域套接口创建在 {install_path}/var/log/public/ 目录下,
master_private = n
master_unpriv = n
# 是否需要 chroot: n -- no, y -- yes
master_chroot = n
# 每隔多长时间触发一次单位为秒(仅对 trigger 模式有效)
master_wakeup = -
# 最大进程数
master_maxproc = 1
# 预启动进程数该值不得大于 master_maxproc
master_prefork = 1
# 进程程序名
master_command = fiber_chat
# 进程日志记录文件
master_log = {install_path}/var/log/fiber_chat
# 调试日志方式格式tag:level; tag:level; tab:level, all:1; 101:2
# master_debug =
# 进程启动参数只能为: -u [是否允许以某普通用户的身份运行]
# master_args =
# 传递给服务子进程的环境变量, 可以通过 getenv("SERVICE_ENV") 获得此值
# master_env = mempool_limit:512000000
# 当启动多个子进程实例时该开关控制多个子进程在接收连接时是否向 acl_master 发送消息报告自己的状态
# master_status_notify = 1
# 是否允许产生 core 文件
# fiber_enable_core = 1
# 每个进程实例处理连接数的最大次数超过此值后进程实例主动退出
fiber_use_limit = 0
# 每个进程实例的空闲超时时间超过此值后进程实例主动退出
fiber_idle_limit = 0
# 进程运行时所在的路径
fiber_queue_dir = {install_path}/var
# 读写超时时间, 单位为秒
fiber_rw_timeout = 120
# 读缓冲区的缓冲区大小
fiber_buf_size = 8192
# 进程运行时的用户身份
fiber_owner = root
# 当启用 master_dispatch 连接分开服务后该配置指定 master_dispatch 所监听的
# 域套接口的全路径这样本子进程就可以从 master_dispatch 获得客户端连接
# fiber_dispatch_addr = {install_path}/var/private/dispatch.sock
# fiber_dispatch_addr 开启后下面参数控制本服务进程发给前端 master_dispatch 的服务标识信息
# fiber_dispatch_type = default
# 线程的堆栈空间大小单位为字节
fiber_stack_size = 64000
# 允许访问 udserver 的客户端IP地址范围
# fiber_access_allow = 127.0.0.1:255.255.255.255, 127.0.0.1:127.0.0.1
fiber_access_allow = all
# acl_master 退出时如果该值置1则该程序不等所有连接处理完毕便立即退出
fiber_quick_abort = 1
############################################################################
# 应用自己的配置选项
# mysql 服务地址
# mysql_dbaddr = /tmp/mysql.sock
# mysql_dbaddr = 10.0.250.199:3306
# 连接 mysql 数据库的连接池的最大值
# mysql_dbmax = 200
# ping mysql 连接的间隔时间, 以秒为单位
# mysql_dbping = 10
# mysql 连接空闲的时间间隔, 以秒为单位
# mysql_dbtimeout = 30
# 数据库名称
# mysql_dbname = fiber_db
# 数据库访问用户
# mysql_dbuser = fiber_user
# 数据库用户访问密码
# mysql_dbpass = 111111
# 是否输出当前内存的状态信息
# debug_mem = 1
# 是否在一个线程中连接读
# loop_read = 1
}

View File

@ -0,0 +1,358 @@
#include "stdafx.h"
#include "http_servlet.h"
static std::vector<chat_client*> clients_;
http_servlet::http_servlet(acl::socket_stream* stream, acl::session* session)
: acl::HttpServlet(stream, session)
{
}
http_servlet::~http_servlet(void)
{
}
bool http_servlet::doError(acl::HttpServletRequest&,
acl::HttpServletResponse& res)
{
res.setStatus(400);
res.setContentType("text/html; charset=");
// 发送 http 响应头
if (res.sendHeader() == false)
return false;
// 发送 http 响应体
acl::string buf;
buf.format("<root error='some error happened!' />\r\n");
(void) res.getOutputStream().write(buf);
return false;
}
bool http_servlet::doOther(acl::HttpServletRequest&,
acl::HttpServletResponse& res, const char* method)
{
res.setStatus(400);
res.setContentType("text/html; charset=");
// 发送 http 响应头
if (res.sendHeader() == false)
return false;
// 发送 http 响应体
acl::string buf;
buf.format("<root error='unkown request method %s' />\r\n", method);
(void) res.getOutputStream().write(buf);
return false;
}
bool http_servlet::doGet(acl::HttpServletRequest& req,
acl::HttpServletResponse& res)
{
return doPost(req, res);
}
bool http_servlet::doPost(acl::HttpServletRequest& req,
acl::HttpServletResponse& res)
{
// 如果需要 http session 控制,请打开下面注释,且需要保证
// 在 master_service.cpp 的函数 thread_on_read 中设置的
// memcached 服务正常工作
/*
const char* sid = req.getSession().getAttribute("sid");
if (*sid == 0)
req.getSession().setAttribute("sid", "xxxxxx");
sid = req.getSession().getAttribute("sid");
*/
// 如果需要取得浏览器 cookie 请打开下面注释
/*
*/
res.setContentType("text/xml; charset=utf-8") // 设置响应字符集
.setKeepAlive(req.isKeepAlive()) // 设置是否保持长连接
.setContentEncoding(true) // 自动支持压缩传输
.setChunkedTransferEncoding(true); // 采用 chunk 传输方式
const char* param1 = req.getParameter("name1");
const char* param2 = req.getParameter("name2");
// 创建 xml 格式的数据体
acl::xml1 body;
body.get_root()
.add_child("root", true)
.add_child("params", true)
.add_child("param", true)
.add_attr("name1", param1 ? param1 : "null")
.get_parent()
.add_child("param", true)
.add_attr("name2", param2 ? param2 : "null");
acl::string buf;
body.build_xml(buf);
// 发送 http 响应体,因为设置了 chunk 传输模式,所以需要多调用一次
// res.write 且两个参数均为 0 以表示 chunk 传输数据结束
return res.write(buf) && res.write(NULL, 0);
}
bool http_servlet::doWebsocket(acl::HttpServletRequest& req,
acl::HttpServletResponse&)
{
acl::socket_stream& ss = req.getSocketStream();
acl::websocket in(ss), out(ss);
while (true)
{
if (in.read_frame_head() == false)
{
printf("read_frame_head error\r\n");
return false;
}
bool ret;
unsigned char opcode = in.get_frame_opcode();
printf("opcode: 0x%x\r\n", opcode);
switch (opcode)
{
case acl::FRAME_PING:
ret = doPing(in, out);
break;
case acl::FRAME_PONG:
ret = doPong(in, out);
break;
case acl::FRAME_CLOSE:
ret = doClose(in, out);
break;
case acl::FRAME_TEXT:
case acl::FRAME_BINARY:
ret = doMsg(in, out);
break;
case acl::FRAME_CONTINUATION:
ret = false;
break;
default:
ret = false;
break;
}
if (ret == false)
return false;
}
// XXX: NOT REACHED
return false;
}
bool http_servlet::doPing(acl::websocket& in, acl::websocket& out)
{
unsigned long long len = in.get_frame_payload_len();
if (len == 0)
{
if (out.send_frame_pong(NULL, 0) == false)
{
remove(out.get_stream());
return false;
}
else
return true;
}
out.reset().set_frame_fin(true)
.set_frame_opcode(acl::FRAME_PONG)
.set_frame_payload_len(len);
char buf[8192];
while (true)
{
int ret = in.read_frame_data(buf, sizeof(buf) - 1);
if (ret == 0)
break;
if (ret < 0)
{
printf("read_frame_data error\r\n");
remove(out.get_stream());
return false;
}
buf[ret] = 0;
printf("read: [%s]\r\n", buf);
if (out.send_frame_data(buf, ret) == false)
{
printf("send_frame_data error\r\n");
remove(out.get_stream());
return false;
}
}
return true;
}
bool http_servlet::doPong(acl::websocket& in, acl::websocket& out)
{
unsigned long long len = in.get_frame_payload_len();
if (len == 0)
return true;
char buf[8192];
while (true)
{
int ret = in.read_frame_data(buf, sizeof(buf) - 1);
if (ret == 0)
break;
if (ret < 0)
{
printf("read_frame_data error\r\n");
remove(out.get_stream());
return false;
}
buf[ret] = 0;
printf("read: [%s]\r\n", buf);
}
return true;
}
bool http_servlet::doClose(acl::websocket&, acl::websocket& out)
{
acl::socket_stream& conn = out.get_stream();
remove(conn);
return false;
}
bool http_servlet::doMsg(acl::websocket& in, acl::websocket& out)
{
acl::string tbuf((size_t) in.get_frame_payload_len() + 1);
acl::socket_stream& conn = out.get_stream();
char buf[8192];
while (true)
{
int ret = in.read_frame_data(buf, sizeof(buf) - 1);
if (ret == 0)
break;
if (ret < 0)
{
printf("read_frame_data error\r\n");
remove(conn);
return false;
}
tbuf.append(buf, ret);
}
std::vector<acl::string>& tokens = tbuf.split2("|");
const acl::string& cmd = tokens[0];
if (cmd.equal("login", false))
{
if (doLogin(conn, tokens) == false)
{
remove(conn);
return false;
}
else
return true;
}
else if (cmd.equal("chat", false))
{
if (doChat(conn, tokens) == false)
{
remove(conn);
return false;
}
else
return true;
}
else
{
printf("unknown msg: %s\r\n", tbuf.c_str());
remove(conn);
return false;
}
return true;
}
bool http_servlet::doLogin(acl::socket_stream& conn,
const std::vector<acl::string>& tokens)
{
// format: login|user
if (tokens.size() != 2)
{
printf("format: login|user\r\n");
return false;
}
const acl::string& user = tokens[1];
chat_client* client = new chat_client(user, conn);
clients_.push_back(client);
printf("status: ok, cmd: login, user: %s\r\n", user.c_str());
return true;
}
bool http_servlet::doChat(acl::socket_stream&,
const std::vector<acl::string>& tokens)
{
// format: chat|msg|to_user
if (tokens.size() != 3)
{
printf("format: chat|msg|to_user\r\n");
return false;
}
const acl::string& msg = tokens[1];
const acl::string& to_user = tokens[2];
chat_client* to_client = find(to_user);
if (to_client == NULL)
{
printf("no exist to_user: %s\r\n", to_user.c_str());
return true;
}
acl::websocket out((to_client->get_conn()));
out.set_frame_fin(true)
.set_frame_opcode(acl::FRAME_TEXT)
.set_frame_payload_len(msg.size());
(void) out.send_frame_data(msg.c_str(), msg.size());
printf("status: ok, cmd: chat, msg: %s, to_user: %s\r\n",
msg.c_str(), to_user.c_str());
return true;
}
chat_client* http_servlet::find(const char* user)
{
for (std::vector<chat_client*>::iterator it = clients_.begin();
it != clients_.end(); ++it)
{
printf("user: %s, %s\r\n", user, (*it)->get_name().c_str());
if (*(*it) == user)
return *it;
}
return NULL;
}
void http_servlet::remove(acl::socket_stream& conn)
{
for (std::vector<chat_client*>::iterator it = clients_.begin();
it != clients_.end(); ++it)
{
acl::socket_stream* ss = &(*it)->get_conn();
if (ss == &conn)
{
clients_.erase(it);
return;
}
}
printf("not found connection: %p\r\n", &conn);
}

View File

@ -0,0 +1,67 @@
#pragma once
#include <vector>
class chat_client
{
public:
chat_client(const char* name, acl::socket_stream& conn)
: name_(name), conn_(conn) {}
~chat_client(void);
const acl::string& get_name(void) const
{
return name_;
}
acl::socket_stream& get_conn(void) const
{
return conn_;
}
bool operator == (const char* name) const
{
return name_.equal(name, false);
}
private:
acl::string name_;
acl::socket_stream& conn_;
};
class http_servlet : public acl::HttpServlet
{
public:
http_servlet(acl::socket_stream* stream, acl::session* session);
~http_servlet();
protected:
// @override
bool doError(acl::HttpServletRequest&, acl::HttpServletResponse&);
// @override
bool doOther(acl::HttpServletRequest&,
acl::HttpServletResponse&, const char* method);
// @override
bool doGet(acl::HttpServletRequest&, acl::HttpServletResponse&);
// @override
bool doPost(acl::HttpServletRequest&, acl::HttpServletResponse&);
// @override
bool doWebsocket(acl::HttpServletRequest&, acl::HttpServletResponse&);
private:
bool doPing(acl::websocket&, acl::websocket&);
bool doPong(acl::websocket&, acl::websocket&);
bool doClose(acl::websocket&, acl::websocket&);
bool doMsg(acl::websocket&, acl::websocket&);
private:
bool doLogin(acl::socket_stream&, const std::vector<acl::string>&);
bool doChat(acl::socket_stream&, const std::vector<acl::string>& tokens);
chat_client* find(const char* user);
void remove(acl::socket_stream& conn);
};

View File

@ -0,0 +1,33 @@
#include "stdafx.h"
#include "master_service.h"
int main(int argc, char *argv[])
{
acl::acl_cpp_init();
master_service& ms = acl::singleton2<master_service>::get_instance();
// 设置配置参数表
ms.set_cfg_int(var_conf_int_tab);
ms.set_cfg_int64(var_conf_int64_tab);
ms.set_cfg_str(var_conf_str_tab);
ms.set_cfg_bool(var_conf_bool_tab);
if (argc >= 2 && strcasecmp(argv[1], "help") == 0)
printf("usage: %s alone configure addr\r\n", argv[0]);
else if (argc >= 2 && strcasecmp(argv[1], "alone") == 0)
{
const char* addr = ":9001";
acl::log::stdout_open(true);
if (argc >= 4)
addr = argv[3];
printf("listen: %s\r\n", addr);
ms.run_alone(addr, argc >= 3 ? argv[2] : NULL, 0);
}
else
ms.run_daemon(argc, argv);
return 0;
}

View File

@ -0,0 +1,74 @@
#include "stdafx.h"
#include "http_servlet.h"
#include "master_service.h"
static char *var_cfg_str;
acl::master_str_tbl var_conf_str_tab[] = {
{ "str", "test_msg", &var_cfg_str },
{ 0, 0, 0 }
};
static int var_cfg_debug_enable;
acl::master_bool_tbl var_conf_bool_tab[] = {
{ "debug_enable", 1, &var_cfg_debug_enable },
{ 0, 0, 0 }
};
static int var_cfg_io_timeout;
acl::master_int_tbl var_conf_int_tab[] = {
{ "io_timeout", 120, &var_cfg_io_timeout, 0, 0 },
{ 0, 0 , 0 , 0, 0 }
};
acl::master_int64_tbl var_conf_int64_tab[] = {
{ 0, 0 , 0 , 0, 0 }
};
//////////////////////////////////////////////////////////////////////////
master_service::master_service(void)
{
}
master_service::~master_service(void)
{
}
void master_service::on_accept(acl::socket_stream& conn)
{
logger("connect from %s, fd %d", conn.get_peer(), conn.sock_handle());
conn.set_rw_timeout(120);
acl::memcache_session session("127.0.0.1:11211");
http_servlet servlet(&conn, &session);
// charset: big5, gb2312, gb18030, gbk, utf-8
servlet.setLocalCharset("utf-8");
while(servlet.doRun()) {}
logger("disconnect from %s", conn.get_peer());
}
void master_service::proc_pre_jail(void)
{
logger(">>>proc_pre_jail<<<");
}
void master_service::proc_on_init(void)
{
logger(">>>proc_on_init<<<");
}
void master_service::proc_on_exit(void)
{
logger(">>>proc_on_exit<<<");
}

View File

@ -0,0 +1,30 @@
#pragma once
extern acl::master_str_tbl var_conf_str_tab[];
extern acl::master_bool_tbl var_conf_bool_tab[];
extern acl::master_int_tbl var_conf_int_tab[];
extern acl::master_int64_tbl var_conf_int64_tab[];
//////////////////////////////////////////////////////////////////////////
class master_service : public acl::master_fiber
{
public:
master_service(void);
~master_service(void);
protected:
// @override
void on_accept(acl::socket_stream& conn);
// @override
void proc_pre_jail(void);
// @override
void proc_on_init(void);
// @override
void proc_on_exit(void);
private:
};

View File

@ -0,0 +1,8 @@
// stdafx.cpp : 只包括标准包含文件的源文件
// master_threads.pch 将成为预编译头
// stdafx.obj 将包含预编译类型信息
#include "stdafx.h"
// TODO: 在 STDAFX.H 中
//引用任何所需的附加头文件,而不是在此文件中引用

View File

@ -0,0 +1,21 @@
// stdafx.h : 标准系统包含文件的包含文件,
// 或是常用但不常更改的项目特定的包含文件
//
#pragma once
//#include <iostream>
//#include <tchar.h>
// TODO: 在此处引用程序要求的附加头文件
#include "lib_acl.h"
#include "acl_cpp/lib_acl.hpp"
#include "fiber/lib_fiber.h"
#include "fiber/lib_fiber.hpp"
#ifdef WIN32
#define snprintf _snprintf
#endif

View File

@ -0,0 +1,73 @@
<html>
<meta charset="utf-8" />
<title>WebSocket Test</title>
<script language="javascript" type="text/javascript">
var wsUri = "ws://127.0.0.1:9001/";
var output;
var websocket = null;
function init() {
output = document.getElementById("output");
testWebSocket();
}
function testWebSocket() {
if (websocket != null)
websocket.close();
websocket = new WebSocket(wsUri);
websocket.onopen = function (evt) { onOpen(evt) };
websocket.onclose = function (evt) { onClose(evt) };
websocket.onmessage = function (evt) { onMessage(evt) };
websocket.onerror = function (evt) { onError(evt) };
} function onOpen(evt) {
writeToScreen("CONNECTED");
}
function onClose(evt) {
writeToScreen("DISCONNECTED");
}
function onMessage(evt) {
document.getElementById('result').value = evt.data;
}
function onError(evt) {
writeToScreen('<span style="color: red;">ERROR:</span> ' + evt.data);
}
function doLogin(user) {
message = "login|" + user; websocket.send(message);
}
function doSend(to_user, msg) {
message = "chat|" + msg + "|" + to_user; websocket.send(message);
}
function writeToScreen(message) {
var pre = document.createElement("p");
pre.style.wordWrap = "break-word";
pre.innerHTML = message; output.insertBefore(pre);
}
function OnConnect() {
wsUri = document.getElementById("url").value;
init();
}
</script>
<body>
<fieldset>
<legend>WebSocket</legend>
<p><span>Server url:</span>
<input id="url" type="text" value="ws://127.0.0.1:9001/" />
<input type="button" value="Connected" onclick="OnConnect()" />
</p>
<p>
me: <input id="user" type="text" value="user1" />
<input type="button" value="login" onclick="doLogin(document.getElementById('user').value)" />
</p>
<p>
send to: <input id="to_user" type="text" value="user2" /> &nbsp;&nbsp;
msg: <input id="msg" type="text" value="test" />
<input type="button" value="send" onclick="doSend(document.getElementById('to_user').value, document.getElementById('msg').value)" />
</p>
<p><span>recv:</span><input id="result" type="text" value="" /></p>
</fieldset>
<div id="output"> </div>
</body>
</html>

View File

@ -0,0 +1,3 @@
#!/bin/sh
valgrind --tool=memcheck --leak-check=yes -v ./fiber_chat alone fiber_chat.cf

View File

@ -31,6 +31,7 @@ CFLAGS = -c -g -W \
-DACL_PREPARE_COMPILE \
-Winvalid-pch
#-DUSE_EPOLL \
#-Wno-tautological-compare \
#-Wno-invalid-source-encoding \
#-Wno-extended-offsetof
#-Wcast-align

View File

@ -164,15 +164,15 @@ ACL_API int acl_strrncmp(const char *s1, const char *s2, size_t n);
* @param needle {const char *}
* @return {char *} != NULL: Ok, NULL:
*/
ACL_API char *acl_rstrstr(char *haystack, const char *needle);
ACL_API char *acl_rstrstr(const char *haystack, const char *needle);
/**
*
* @param haystack {char *} Ô´×Ö·û´®
* @param haystack {const char *} Ô´×Ö·û´®
* @param needle {const char *}
* @return {char *} != NULL: Ok, NULL:
*/
ACL_API char *acl_strcasestr(char *haystack, const char *needle);
ACL_API char *acl_strcasestr(const char *haystack, const char *needle);
/**
*
@ -180,7 +180,7 @@ ACL_API char *acl_strcasestr(char *haystack, const char *needle);
* @param needle {const char *}
* @return {char *} != NULL: Ok, NULL:
*/
ACL_API char *acl_rstrcasestr(char *haystack, const char *needle);
ACL_API char *acl_rstrcasestr(const char *haystack, const char *needle);
/**
* strlen

View File

@ -175,7 +175,7 @@ static void slice3_mbuf_alloc(ACL_SLICE *slice)
#endif
SLICE3 *slice3 = (SLICE3*) slice;
MBUF3 *mbuf;
int i, incr_real = 0;
int i, incr_real = 0, n;
char *ptr;
mbuf = (MBUF3*) acl_default_malloc(__FILE__, __LINE__,
@ -188,7 +188,8 @@ static void slice3_mbuf_alloc(ACL_SLICE *slice)
slice->nalloc++;
mbuf->mslots.slots = NULL;
MBUF_SLOTS_SPACE(slice, &mbuf->mslots, slice->page_nslots, incr_real);
n = slice->page_nslots;
MBUF_SLOTS_SPACE(slice, &mbuf->mslots, n, incr_real);
acl_assert(mbuf->mslots.islots == 0);
for (i = 0; i < slice->page_nslots; i++) {
@ -448,7 +449,7 @@ static void slice2_mbuf_alloc(ACL_SLICE *slice)
#endif
SLICE2 *slice2 = (SLICE2*) slice;
MBUF2 *mbuf;
int i, incr_real = 0;
int i, incr_real = 0, n;
char *ptr;
mbuf = (MBUF2*) acl_default_malloc(__FILE__, __LINE__, slice->page_size);
@ -459,7 +460,8 @@ static void slice2_mbuf_alloc(ACL_SLICE *slice)
ptr = mbuf->payload;
slice->nalloc++;
MBUF_SLOTS_SPACE(slice, &slice2->mslots, slice->page_nslots, incr_real);
n = slice->page_nslots;
MBUF_SLOTS_SPACE(slice, &slice2->mslots, n, incr_real);
for (i = 0; i < slice->page_nslots; i++) {
ptr += SLICE2_HEAD_SIZE;
@ -807,7 +809,7 @@ static void slice1_mbuf_alloc(ACL_SLICE *slice)
SLICE1 *slice1 = (SLICE1*) slice;
MBUF1 *mbuf = (MBUF1*) acl_default_malloc(__FILE__, __LINE__,
sizeof(MBUF1));
int i, incr_real = 0;
int i, incr_real = 0, n;
char *ptr;
mbuf->buf = (void*) acl_default_malloc(__FILE__, __LINE__,
@ -816,7 +818,8 @@ static void slice1_mbuf_alloc(ACL_SLICE *slice)
ptr = (char*) mbuf->buf;
slice->nalloc++;
MBUF_SLOTS_SPACE(slice, &slice1->mslots, slice->page_nslots, incr_real);
n = slice->page_nslots;
MBUF_SLOTS_SPACE(slice, &slice1->mslots, n, incr_real);
for (i = 0; i < slice->page_nslots; i++) {
slice1->mslots.slots[slice1->mslots.islots++] = ptr;

View File

@ -15,7 +15,7 @@
#include "../charmap.h"
char *acl_rstrstr(char *haystack, const char *needle)
char *acl_rstrstr(const char *haystack, const char *needle)
{
unsigned char *cp, *haystack_end;
const unsigned char *np = 0, *needle_end;
@ -44,7 +44,7 @@ char *acl_rstrstr(char *haystack, const char *needle)
return (NULL);
}
char *acl_rstrcasestr(char *haystack, const char *needle)
char *acl_rstrcasestr(const char *haystack, const char *needle)
{
const unsigned char *cm = maptolower;
unsigned char *cp, *haystack_end;
@ -74,7 +74,7 @@ char *acl_rstrcasestr(char *haystack, const char *needle)
return (NULL);
}
char *acl_strcasestr(char *haystack, const char *needle)
char *acl_strcasestr(const char *haystack, const char *needle)
{
const unsigned char *cm = maptolower;
const unsigned char *np = 0;

View File

@ -1,6 +1,9 @@
修改历史列表:
-----------------------------------------------------------------------
437) 2016.9.17 -- 9.19
437.1) feature: 增加了 websocket 类
436) 2016.9.7
436.1) bugfix: istream.cpp 中的函数 gets 在调用 acl_vstream_gets 或
acl_vstream_gets_nonl 后判断标志位 ACL_VSTREAM_FLAG_TAGYES 的方法有误

View File

@ -123,7 +123,7 @@ public:
bool doRun(const char* memcached_addr, socket_stream* stream);
/**
* HTTP GET
* HTTP GET
*/
virtual bool doGet(HttpServletRequest&, HttpServletResponse&)
{
@ -132,7 +132,16 @@ public:
}
/**
* HTTP POST
* HTTP websocket
*/
virtual bool doWebsocket(HttpServletRequest&, HttpServletResponse&)
{
logger_error("child not implement doWebsocket yet!");
return false;
}
/**
* HTTP POST
*/
virtual bool doPost(HttpServletRequest&, HttpServletResponse&)
{
@ -141,7 +150,7 @@ public:
}
/**
* HTTP PUT
* HTTP PUT
*/
virtual bool doPut(HttpServletRequest&, HttpServletResponse&)
{
@ -150,7 +159,7 @@ public:
}
/**
* HTTP CONNECT
* HTTP CONNECT
*/
virtual bool doConnect(HttpServletRequest&, HttpServletResponse&)
{
@ -159,7 +168,7 @@ public:
}
/**
* HTTP PURGE SQUID
* HTTP PURGE SQUID
*
*/
virtual bool doPurge(HttpServletRequest&, HttpServletResponse&)
@ -169,7 +178,7 @@ public:
}
/**
* HTTP DELETE
* HTTP DELETE
*/
virtual bool doDelete(HttpServletRequest&, HttpServletResponse&)
{
@ -178,7 +187,7 @@ public:
}
/**
* HTTP HEAD
* HTTP HEAD
*/
virtual bool doHead(HttpServletRequest&, HttpServletResponse&)
{
@ -187,7 +196,7 @@ public:
}
/**
* HTTP OPTION
* HTTP OPTION
*/
virtual bool doOptions(HttpServletRequest&, HttpServletResponse&)
{
@ -196,7 +205,7 @@ public:
}
/**
* HTTP PROPFIND
* HTTP PROPFIND
*/
virtual bool doPropfind(HttpServletRequest&, HttpServletResponse&)
{
@ -205,7 +214,7 @@ public:
}
/**
* HTTP
* HTTP
* @param method {const char*}
*/
virtual bool doOther(HttpServletRequest&, HttpServletResponse&,
@ -217,7 +226,7 @@ public:
}
/**
* HTTP
* HTTP
*/
virtual bool doUnknown(HttpServletRequest&, HttpServletResponse&)
{
@ -226,7 +235,7 @@ public:
}
/**
* HTTP
* HTTP
*/
virtual bool doError(HttpServletRequest&, HttpServletResponse&)
{

View File

@ -128,6 +128,12 @@ public:
*/
istream& getInputStream(void) const;
/**
* HTTP
* @return {socket_stream&}
*/
socket_stream& getSocketStream(void) const;
/**
* HTTP
* @return {acl_int64} -1 GET

View File

@ -228,10 +228,10 @@ public:
ostream& getOutputStream(void) const;
/**
* http HttpServlet
* @param request {HttpServletRequest*}
* HTTP
* @return {socket_stream&}
*/
void setHttpServletRequest(HttpServletRequest* request);
socket_stream& getSocketStream(void) const;
/**
* http_client
@ -242,6 +242,12 @@ public:
return client_;
}
/**
* http HttpServlet
* @param request {HttpServletRequest*}
*/
void setHttpServletRequest(HttpServletRequest* request);
private:
dbuf_guard* dbuf_internal_;
dbuf_guard* dbuf_;

View File

@ -55,9 +55,9 @@ public:
*/
void reset(void);
//////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////
// HTTP 请求与 HTTP 响应通用的方法函数
//////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////
/**
* HTTP
@ -159,6 +159,12 @@ public:
return keep_alive_;
}
http_header& set_upgrade(const char* value = "websocket");
const char* get_upgrade(void) const
{
return upgrade_;
}
/**
* HTTP cookie
* @param name {const char*} cookie
@ -191,9 +197,9 @@ public:
*/
bool is_request(void) const;
/////////////////////////////////////////////////////////////////////
// HTTP 请求方法函数
/////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////
// HTTP 请求方法函数
//////////////////////////////////////////////////////////////////////
/**
* HTTP
@ -292,6 +298,37 @@ public:
http_header& add_int(const char* name, unsigned long long int value);
#endif
http_header& set_ws_origin(const char* url);
http_header& set_ws_key(const char* key);
http_header& set_ws_protocol(const char* proto);
http_header& set_ws_version(int ver);
const char* get_ws_origin(void) const
{
return ws_origin_;
}
const char* get_ws_key(void) const
{
return ws_sec_key_;
}
const char* get_ws_protocol(void) const
{
return ws_sec_proto_;
}
int get_ws_version(void) const
{
return ws_sec_ver_;
}
http_header& set_ws_accept(const char* key);
const char* get_ws_accept(void) const
{
return ws_sec_accept_;
}
/**
* url
* @param url {const char*} URL
@ -320,9 +357,9 @@ public:
*/
virtual void redicrect_reset(void) {}
/////////////////////////////////////////////////////////////////////
// HTTP 响应方法函数
/////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////
// HTTP 响应方法函数
//////////////////////////////////////////////////////////////////////
/**
* HTTP
@ -417,9 +454,20 @@ private:
#endif
bool chunked_transfer_; // 是否为 chunked 传输模式
bool transfer_gzip_; // 数据是否采用 gzip 压缩
char* upgrade_;
// just for websocket
char* ws_origin_;
char* ws_sec_key_;
char* ws_sec_proto_;
int ws_sec_ver_;
char* ws_sec_accept_;
void init(void); // 初始化
void clear(void);
void build_common(string& buf) const; // 构建通用头
void append_accept_key(const char* sec_key, string& out) const;
};
} // namespace acl end

View File

@ -0,0 +1,142 @@
/**
* Copyright (C) 2015-2018
* All rights reserved.
*
* AUTHOR(S)
* niukey@qq.com
* shuxin.zheng@qq.com
*
* VERSION
* Sun 18 Sep 2016 05:15:52 PM CST
*/
#pragma once
#include "acl_cpp/acl_cpp_define.hpp"
#include <string>
namespace acl
{
class socket_stream;
enum
{
FRAME_CONTINUATION = 0x00,
FRAME_TEXT = 0x01,
FRAME_BINARY = 0x02,
FRAME_RSV3 = 0x03,
FRAME_RSV4 = 0x04,
FRAME_RSV5 = 0x05,
FRAME_RSV6 = 0x06,
FRAME_RSV7 = 0x07,
FRAME_CLOSE = 0x08,
FRAME_PING = 0x09,
FRAME_PONG = 0x0A,
FRAME_CTL_RSVB = 0x0B,
FRAME_CTL_RSVC = 0x0C,
FRAME_CTL_RSVD = 0x0D,
FRAME_CTL_RSVE = 0x0E,
FRAME_CTL_RSVF = 0x0F,
};
struct frame_header
{
bool fin;
bool rsv1;
bool rsv2;
bool rsv3;
unsigned char opcode:4;
bool mask;
unsigned long long payload_len;
unsigned int masking_key;
};
class websocket
{
public:
websocket(socket_stream& client);
~websocket(void);
websocket& reset(void);
socket_stream& get_stream(void) const
{
return client_;
}
bool read_frame_head(void);
int read_frame_data(char* buf, size_t size);
const frame_header& get_frame_header(void) const
{
return header_;
}
websocket& set_frame_fin(bool yes);
websocket& set_frame_rsv1(bool yes);
websocket& set_frame_rsv2(bool yes);
websocket& set_frame_rsv3(bool yes);
websocket& set_frame_opcode(unsigned char type);
websocket& set_frame_payload_len(unsigned long long len);
websocket& set_frame_masking_key(unsigned int mask);
bool send_frame_data(char* data, size_t len);
bool send_frame_pong(char* data, size_t len);
bool send_frame_ping(char* data, size_t len);
bool get_frame_fin(void) const
{
return header_.fin;
}
bool get_frame_rsv1(void) const
{
return header_.rsv1;
}
bool get_frame_rsv2(void) const
{
return header_.rsv2;
}
bool get_frame_rsv3(void) const
{
return header_.rsv3;
}
unsigned char get_frame_opcode(void) const
{
return header_.opcode;
}
bool get_frame_mask(void) const
{
return header_.mask;
}
unsigned long long get_frame_payload_len(void) const
{
return header_.payload_len;
}
unsigned int get_frame_masking_key(void) const
{
return header_.masking_key;
}
unsigned long long get_frame_payload_nread(void) const
{
return payload_nread_;
}
private:
socket_stream& client_;
struct frame_header header_;
char* header_buf_;
size_t header_size_;
size_t header_len_;
unsigned long long payload_nread_;
bool header_sent_;
void make_frame_header(void);
};
} // namespace acl

View File

@ -85,6 +85,7 @@
#include "acl_cpp/http/http_utils.hpp"
#include "acl_cpp/http/http_request_pool.hpp"
#include "acl_cpp/http/http_request_manager.hpp"
#include "acl_cpp/http/websocket.hpp"
#include "acl_cpp/db/query.hpp"
#include "acl_cpp/db/mysql_conf.hpp"

View File

@ -306,6 +306,22 @@ public:
*/
int hlen(const char* key);
/**
* key
* Returns the string length of the value associated with field
* in the hash stored at key
* @param key {const char*} key
* the hash key
* @param name {const char*} key
* the field's name
* @return {int} key name 0 key
* -1
* If the key or the field do not exist, 0 is returned; If the key is
* not the hash key or error happened, -1 is returned.
*/
int hstrlen(const char* key, const char* name, size_t name_len);
int hstrlen(const char* key, const char *name);
/**
*
* scan the name and value of all fields in hash stored at key

View File

@ -669,6 +669,8 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y
<File
RelativePath=".\src\http\HttpSession.cpp">
</File>
<File
RelativePath=".\src\http\websocket.cpp">
</Filter>
<Filter
Name="hsocket"
@ -1240,6 +1242,9 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y
<File
RelativePath=".\include\acl_cpp\http\HttpSession.hpp">
</File>
<File
RelativePath=".\include\acl_cpp\http\websocket.hpp">
</File>
</Filter>
<Filter
Name="hsocket"

View File

@ -565,6 +565,10 @@
RelativePath=".\src\http\HttpSession.cpp"
>
</File>
<File
RelativePath=".\src\http\websocket.cpp"
>
</File>
</Filter>
<Filter
Name="ipc"
@ -1383,6 +1387,10 @@
RelativePath=".\include\acl_cpp\http\HttpSession.hpp"
>
</File>
<File
RelativePath=".\include\acl_cpp\http\websocket.hpp"
>
</File>
</Filter>
<Filter
Name="ipc"

View File

@ -250,6 +250,7 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y
<ClCompile Include="src\http\http_response.cpp" />
<ClCompile Include="src\http\http_service.cpp" />
<ClCompile Include="src\http\http_utils.cpp" />
<ClCompile Include="src\http\websocket.cpp" />
<ClCompile Include="src\ipc\ipc_client.cpp" />
<ClCompile Include="src\ipc\ipc_server.cpp" />
<ClCompile Include="src\ipc\ipc_service.cpp" />
@ -423,6 +424,7 @@ copy $(TargetName).pdb ..\dist\lib\win32\$(TargetName).pdb /Y
<ClInclude Include="include\acl_cpp\http\http_service.hpp" />
<ClInclude Include="include\acl_cpp\http\http_type.hpp" />
<ClInclude Include="include\acl_cpp\http\http_utils.hpp" />
<ClInclude Include="include\acl_cpp\http\websocket.hpp" />
<ClInclude Include="include\acl_cpp\ipc\ipc_client.hpp" />
<ClInclude Include="include\acl_cpp\ipc\ipc_server.hpp" />
<ClInclude Include="include\acl_cpp\ipc\ipc_service.hpp" />

View File

@ -283,6 +283,9 @@
<ClCompile Include="src\http\http_utils.cpp">
<Filter>src\http</Filter>
</ClCompile>
<ClCompile Include="src\http\websocket.cpp">
<Filter>src\http</Filter>
</ClCompile>
<ClCompile Include="src\stdlib\sha1.cpp">
<Filter>src\stdlib</Filter>
</ClCompile>
@ -627,6 +630,9 @@
<ClInclude Include="include\acl_cpp\http\http_utils.hpp">
<Filter>include\http</Filter>
</ClInclude>
<ClInclude Include="include\acl_cpp\http\websocket.hpp">
<Filter>include\http</Filter>
</ClInclude>
<ClInclude Include="include\acl_cpp\http\HttpServlet.hpp">
<Filter>include\http</Filter>
</ClInclude>

View File

@ -444,6 +444,7 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y
<ClCompile Include="src\http\http_response.cpp" />
<ClCompile Include="src\http\http_service.cpp" />
<ClCompile Include="src\http\http_utils.cpp" />
<ClCompile Include="src\http\websocket.cpp" />
<ClCompile Include="src\ipc\ipc_client.cpp" />
<ClCompile Include="src\ipc\ipc_server.cpp" />
<ClCompile Include="src\ipc\ipc_service.cpp" />
@ -617,6 +618,7 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y
<ClInclude Include="include\acl_cpp\http\http_service.hpp" />
<ClInclude Include="include\acl_cpp\http\http_type.hpp" />
<ClInclude Include="include\acl_cpp\http\http_utils.hpp" />
<ClInclude Include="include\acl_cpp\http\websocket.hpp" />
<ClInclude Include="include\acl_cpp\ipc\ipc_client.hpp" />
<ClInclude Include="include\acl_cpp\ipc\ipc_server.hpp" />
<ClInclude Include="include\acl_cpp\ipc\ipc_service.hpp" />

View File

@ -280,6 +280,9 @@
<ClCompile Include="src\http\http_utils.cpp">
<Filter>Source Files\http</Filter>
</ClCompile>
<ClCompile Include="src\http\websocket.cpp">
<Filter>Source Files\http</Filter>
</ClCompile>
<ClCompile Include="src\stdlib\sha1.cpp">
<Filter>Source Files\stdlib</Filter>
</ClCompile>
@ -624,6 +627,9 @@
<ClInclude Include="include\acl_cpp\http\http_utils.hpp">
<Filter>Header Files\http</Filter>
</ClInclude>
<ClInclude Include="include\acl_cpp\http\websocket.hpp">
<Filter>Header Files\http</Filter>
</ClInclude>
<ClInclude Include="include\acl_cpp\http\HttpServlet.hpp">
<Filter>Header Files\http</Filter>
</ClInclude>

View File

@ -444,6 +444,7 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y
<ClCompile Include="src\http\http_response.cpp" />
<ClCompile Include="src\http\http_service.cpp" />
<ClCompile Include="src\http\http_utils.cpp" />
<ClCompile Include="src\http\websocket.cpp" />
<ClCompile Include="src\ipc\ipc_client.cpp" />
<ClCompile Include="src\ipc\ipc_server.cpp" />
<ClCompile Include="src\ipc\ipc_service.cpp" />
@ -617,6 +618,7 @@ copy $(TargetName).pdb ..\dist\lib\win64\$(TargetName).pdb /Y
<ClInclude Include="include\acl_cpp\http\http_service.hpp" />
<ClInclude Include="include\acl_cpp\http\http_type.hpp" />
<ClInclude Include="include\acl_cpp\http\http_utils.hpp" />
<ClInclude Include="include\acl_cpp\http\websocket.hpp" />
<ClInclude Include="include\acl_cpp\ipc\ipc_client.hpp" />
<ClInclude Include="include\acl_cpp\ipc\ipc_server.hpp" />
<ClInclude Include="include\acl_cpp\ipc\ipc_service.hpp" />

View File

@ -280,6 +280,9 @@
<ClCompile Include="src\http\http_utils.cpp">
<Filter>Source Files\http</Filter>
</ClCompile>
<ClCompile Include="src\http\websocket.cpp">
<Filter>Source Files\http</Filter>
</ClCompile>
<ClCompile Include="src\stdlib\sha1.cpp">
<Filter>Source Files\stdlib</Filter>
</ClCompile>
@ -624,6 +627,9 @@
<ClInclude Include="include\acl_cpp\http\http_utils.hpp">
<Filter>Header Files\http</Filter>
</ClInclude>
<ClInclude Include="include\acl_cpp\http\websocket.hpp">
<Filter>Header Files\http</Filter>
</ClInclude>
<ClInclude Include="include\acl_cpp\http\HttpServlet.hpp">
<Filter>Header Files\http</Filter>
</ClInclude>

View File

@ -0,0 +1,14 @@
base_path = ../../..
PROG = websocket
include ../../Makefile.in
#Path for SunOS
ifeq ($(findstring SunOS, $(UNIXNAME)), SunOS)
EXTLIBS = -liconv
endif
ifeq ($(findstring FreeBSD, $(UNIXNAME)), FreeBSD)
EXTLIBS = -L/usr/local/lib -liconv
endif
ifeq ($(findstring Darwin, $(UNIXNAME)), Darwin)
EXTLIBS += -L/usr/lib -liconv
endif
EXTLIBS += -lz

View File

@ -0,0 +1,46 @@
<html>
<meta charset="utf-8" />
<title>WebSocket Test</title>
<script language="javascript" type="text/javascript">
var wsUri = "ws://127.0.0.1:9001/";
var output;
function init() {
output = document.getElementById("output");
testWebSocket();
}
function testWebSocket() {
websocket = new WebSocket(wsUri);
websocket.onopen = function (evt) { onOpen(evt) };
websocket.onclose = function (evt) { onClose(evt) };
websocket.onmessage = function (evt) { onMessage(evt) };
websocket.onerror = function (evt) { onError(evt) };
} function onOpen(evt) {
writeToScreen("CONNECTED");
}
function onClose(evt) { writeToScreen("DISCONNECTED"); }
function onMessage(evt) { document.getElementById('result').value = evt.data; }
function onError(evt) { writeToScreen('<span style="color: red;">ERROR:</span> ' + evt.data); }
function doSend(message) { websocket.send(message); }
function writeToScreen(message) {
var pre = document.createElement("p"); pre.style.wordWrap = "break-word"; pre.innerHTML = message; output.insertBefore(pre);
}
function OnConnect() {
wsUri = document.getElementById("url").value;
init();
}
</script>
<body>
<fieldset>
<legend>WebSocket</legend>
<p><span>Server url:</span><input id="url" type="text" value="ws://127.0.0.1:9001/"/><input type="button" value="Connected" onclick="OnConnect()" /></p>
<p><span>send:</span><input id="youname" type="text" value="test"/><input type="button" value="Submit" onclick="doSend(document.getElementById('youname').value)" /></p>
<p><span>recv:</span><input id="result" type="text" value="" /></p>
</fieldset>
<div id="output">
</div>
</body>
</html>

View File

@ -0,0 +1,229 @@
#include "stdafx.h"
#include "acl_cpp/http/websocket.hpp"
#include "http_servlet.h"
http_servlet::http_servlet(acl::redis_client_cluster& cluster, size_t max_conns)
{
// 创建 session 存储对象
session_ = new acl::redis_session(cluster, max_conns);
}
http_servlet::~http_servlet(void)
{
delete session_;
}
bool http_servlet::doUnknown(acl::HttpServletRequest&,
acl::HttpServletResponse& res)
{
res.setStatus(400);
res.setContentType("text/html; charset=");
// 发送 http 响应头
if (res.sendHeader() == false)
return false;
// 发送 http 响应体
acl::string buf("<root error='unkown request method' />\r\n");
(void) res.getOutputStream().write(buf);
return false;
}
bool http_servlet::doGet(acl::HttpServletRequest& req,
acl::HttpServletResponse& res)
{
printf("in doGet\r\n");
return doPost(req, res);
}
bool http_servlet::doPing(acl::websocket& in, acl::websocket& out)
{
unsigned long long len = in.get_frame_payload_len();
if (len == 0)
return out.send_frame_pong(NULL, 0);
out.reset().set_frame_fin(true)
.set_frame_opcode(acl::FRAME_PONG)
.set_frame_payload_len(len);
char buf[8192];
while (true)
{
int ret = in.read_frame_data(buf, sizeof(buf) - 1);
if (ret == 0)
break;
if (ret < 0)
{
printf("read_frame_data error\r\n");
return false;
}
buf[ret] = 0;
printf("read: [%s]\r\n", buf);
if (out.send_frame_data(buf, ret) == false)
{
printf("send_frame_data error\r\n");
return false;
}
}
return true;
}
bool http_servlet::doPong(acl::websocket& in, acl::websocket&)
{
unsigned long long len = in.get_frame_payload_len();
if (len == 0)
return true;
char buf[8192];
while (true)
{
int ret = in.read_frame_data(buf, sizeof(buf) - 1);
if (ret == 0)
break;
if (ret < 0)
{
printf("read_frame_data error\r\n");
return false;
}
buf[ret] = 0;
printf("read: [%s]\r\n", buf);
}
return true;
}
bool http_servlet::doClose(acl::websocket&, acl::websocket&)
{
return false;
}
bool http_servlet::doMsg(acl::websocket& in, acl::websocket& out)
{
unsigned long long len = in.get_frame_payload_len();
out.reset().set_frame_fin(true)
.set_frame_opcode(acl::FRAME_TEXT)
.set_frame_payload_len(len);
char buf[8192];
while (true)
{
int ret = in.read_frame_data(buf, sizeof(buf) - 1);
if (ret == 0)
break;
if (ret < 0)
{
printf("read_frame_data error\r\n");
return false;
}
buf[ret] = 0;
printf("read: [%s]\r\n", buf);
if (out.send_frame_data(buf, ret) == false)
{
printf("send_frame_data error\r\n");
return false;
}
}
sleep(1);
char info[256];
snprintf(info, sizeof(info), "hello world!");
out.reset().set_frame_fin(true)
.set_frame_opcode(acl::FRAME_TEXT)
.set_frame_payload_len(strlen(info));
if (out.send_frame_data(info, strlen(info)) == false)
{
printf("send_frame_data error\r\n");
return false;
}
sleep(1);
snprintf(info, sizeof(info), "hello zsx!");
out.reset().set_frame_fin(true)
.set_frame_opcode(acl::FRAME_TEXT)
.set_frame_payload_len(strlen(info));
if (out.send_frame_data(info, strlen(info)) == false)
{
printf("send_frame_data error\r\n");
return false;
}
sleep(1);
snprintf(info, sizeof(info), "GoodBye!");
out.reset().set_frame_fin(true)
.set_frame_opcode(acl::FRAME_TEXT)
.set_frame_payload_len(strlen(info));
if (out.send_frame_data(info, strlen(info)) == false)
{
printf("send_frame_data error\r\n");
return false;
}
return true;
}
bool http_servlet::doWebsocket(acl::HttpServletRequest& req,
acl::HttpServletResponse&)
{
acl::socket_stream& ss = req.getSocketStream();
acl::websocket in(ss), out(ss);
while (true)
{
if (in.read_frame_head() == false)
{
printf("read_frame_head error\r\n");
return false;
}
bool ret;
unsigned char opcode = in.get_frame_opcode();
printf("opcode: 0x%x\r\n", opcode);
switch (opcode)
{
case acl::FRAME_PING:
ret = doPing(in, out);
break;
case acl::FRAME_PONG:
ret = doPong(in, out);
break;
case acl::FRAME_CLOSE:
ret = doClose(in, out);
break;
case acl::FRAME_TEXT:
case acl::FRAME_BINARY:
ret = doMsg(in, out);
break;
case acl::FRAME_CONTINUATION:
ret = false;
break;
default:
ret = false;
break;
}
if (ret == false)
return false;
}
// XXX: NOT REACHED
return false;
}
bool http_servlet::doPost(acl::HttpServletRequest&,
acl::HttpServletResponse& res)
{
res.setContentType("text/xml; charset=utf-8") // 设置响应字符集
.setContentEncoding(true) // 设置是否压缩数据
.setChunkedTransferEncoding(false); // 采用 chunk 传输方式
printf("error\r\n");
acl::string buf("error");
// 发送 http 响应体,因为设置了 chunk 传输模式,所以需要多调用一次
// res.write 且两个参数均为 0 以表示 chunk 传输数据结束
return res.write(buf) && res.write(NULL, 0);
}

View File

@ -0,0 +1,34 @@
#pragma once
class http_servlet : public acl::HttpServlet
{
public:
http_servlet(acl::redis_client_cluster& cluster, size_t max_conns);
~http_servlet();
acl::session& get_session() const
{
return *session_;
}
protected:
// @override
bool doUnknown(acl::HttpServletRequest&, acl::HttpServletResponse&);
// @override
bool doGet(acl::HttpServletRequest&, acl::HttpServletResponse&);
// @override
bool doPost(acl::HttpServletRequest&, acl::HttpServletResponse&);
// @override
bool doWebsocket(acl::HttpServletRequest&, acl::HttpServletResponse&);
private:
acl::session* session_;
bool doPing(acl::websocket&, acl::websocket&);
bool doPong(acl::websocket&, acl::websocket&);
bool doClose(acl::websocket&, acl::websocket&);
bool doMsg(acl::websocket&, acl::websocket&);
};

View File

@ -0,0 +1,41 @@
#include "stdafx.h"
#include "master_service.h"
int main(int argc, char* argv[])
{
// 初始化 acl 库
acl::acl_cpp_init();
master_service& ms = acl::singleton2<master_service>::get_instance();
// 设置配置参数表
ms.set_cfg_int(var_conf_int_tab);
ms.set_cfg_int64(var_conf_int64_tab);
ms.set_cfg_str(var_conf_str_tab);
ms.set_cfg_bool(var_conf_bool_tab);
// 开始运行
if (argc >= 2 && strcmp(argv[1], "alone") == 0)
{
acl::log::stdout_open(true); // 日志输出至标准输出
const char* conf = NULL;
if (argc >= 3)
conf = argv[2];
const char* addr = "0.0.0.0:9001";
if (argc >= 4)
addr = argv[4];
printf("listen on: %s\r\n", addr);
ms.run_alone(addr, conf, 0, 1000); // 单独运行方式
printf("Enter any key to exit now\r\n");
getchar();
}
else
ms.run_daemon(argc, argv); // acl_master 控制模式运行
return 0;
}

View File

@ -0,0 +1,132 @@
#include "stdafx.h"
#include "http_servlet.h"
#include "master_service.h"
////////////////////////////////////////////////////////////////////////////////
// 配置内容项
char *var_cfg_redis_servers;
acl::master_str_tbl var_conf_str_tab[] = {
{ "redis_servers", "127.0.0.1:9000", &var_cfg_redis_servers },
{ 0, 0, 0 }
};
int var_cfg_keep_loop;
acl::master_bool_tbl var_conf_bool_tab[] = {
{ "keep_loop", 1, &var_cfg_keep_loop },
{ 0, 0, 0 }
};
int var_cfg_conn_timeout;
int var_cfg_rw_timeout;
int var_cfg_max_threads;
acl::master_int_tbl var_conf_int_tab[] = {
{ "conn_timeout", 10, &var_cfg_conn_timeout, 0, 0 },
{ "rw_timeout", 10, &var_cfg_rw_timeout, 0, 0 },
{ "ioctl_max_threads", 128, &var_cfg_max_threads, 0, 0 },
{ 0, 0 , 0 , 0, 0 }
};
long long int var_cfg_int64;
acl::master_int64_tbl var_conf_int64_tab[] = {
{ "int64", 120, &var_cfg_int64, 0, 0 },
{ 0, 0 , 0 , 0, 0 }
};
static acl::redis_client_cluster* session_server = NULL;
////////////////////////////////////////////////////////////////////////////////
master_service::master_service()
{
}
master_service::~master_service()
{
}
bool master_service::thread_on_read(acl::socket_stream* conn)
{
//logger("read from %s", conn->get_peer(true));
http_servlet* servlet = (http_servlet*) conn->get_ctx();
if (servlet == NULL)
logger_fatal("servlet null!");
acl::session& session = servlet->get_session();
while (true)
{
bool ret = servlet->doRun(session, conn);
if (ret == false)
return false;
if (!var_cfg_keep_loop)
return true;
}
}
bool master_service::thread_on_accept(acl::socket_stream* conn)
{
logger("connect from %s, fd: %d", conn->get_peer(true),
conn->sock_handle());
conn->set_rw_timeout(0);
conn->set_tcp_non_blocking(false);
// 使用 redis 集群来存储 session
http_servlet* servlet = new http_servlet(*session_server,
var_cfg_max_threads);
conn->set_ctx(servlet);
return true;
}
bool master_service::thread_on_timeout(acl::socket_stream* conn)
{
logger("read timeout from %s, fd: %d", conn->get_peer(),
conn->sock_handle());
return false;
}
void master_service::thread_on_close(acl::socket_stream* conn)
{
logger("disconnect from %s, fd: %d", conn->get_peer(true),
conn->sock_handle());
http_servlet* servlet = (http_servlet*) conn->get_ctx();
if (servlet)
delete servlet;
}
void master_service::thread_on_init()
{
}
void master_service::thread_on_exit()
{
}
void master_service::proc_on_init()
{
// 创建 redis 集群客户端对象,并使用 redis 集群来存储 session
session_server = new acl::redis_client_cluster;
session_server->init(NULL, var_cfg_redis_servers, var_cfg_max_threads);
}
bool master_service::proc_exit_timer(size_t nclients, size_t nthreads)
{
if (nclients == 0 || nthreads == 0)
{
logger("clients count: %d, threads count: %d",
(int) nclients, (int) nthreads);
return true;
}
return false;
}
void master_service::proc_on_exit()
{
delete session_server;
}

View File

@ -0,0 +1,88 @@
#pragma once
////////////////////////////////////////////////////////////////////////////////
// 配置内容项
extern acl::master_str_tbl var_conf_str_tab[];
extern acl::master_bool_tbl var_conf_bool_tab[];
extern acl::master_int_tbl var_conf_int_tab[];
extern acl::master_int64_tbl var_conf_int64_tab[];
////////////////////////////////////////////////////////////////////////////////
//class acl::socket_stream;
class master_service : public acl::master_threads
{
public:
master_service();
~master_service();
protected:
/**
*
* @param stream {socket_stream*}
* @return {bool} false
* false
*/
virtual bool thread_on_read(acl::socket_stream* stream);
/**
* 线线
*
* @param stream {socket_stream*}
* @return {bool} false
* thread_main
*/
virtual bool thread_on_accept(acl::socket_stream* stream);
/**
* IO true
*
* @param stream {socket_stream*}
* @return {bool} false
* thread_main
*/
virtual bool thread_on_timeout(acl::socket_stream* stream);
/**
* 线
* @param stream {socket_stream*}
*/
virtual void thread_on_close(acl::socket_stream* stream);
/**
* 线线
*/
virtual void thread_on_init();
/**
* 线线退
*/
virtual void thread_on_exit();
/**
*
*
*/
virtual void proc_on_init();
/**
* 退退
* 1) true 退
* 2) 退
* 3) (ioctl_quick_abort) 0
* 退
* 4) 退
* @param ncleints {size_t}
* @param nthreads {size_t} 线线
* @return {bool} false 退
* 退
*/
virtual bool proc_exit_timer(size_t nclients, size_t nthreads);
/**
* 退
*/
virtual void proc_on_exit();
};

View File

@ -0,0 +1,8 @@
// stdafx.cpp : 只包括标准包含文件的源文件
// master_threads.pch 将成为预编译头
// stdafx.obj 将包含预编译类型信息
#include "stdafx.h"
// TODO: 在 STDAFX.H 中
//引用任何所需的附加头文件,而不是在此文件中引用

View File

@ -0,0 +1,19 @@
// stdafx.h : 标准系统包含文件的包含文件,
// 或是常用但不常更改的项目特定的包含文件
//
#pragma once
//#include <iostream>
//#include <tchar.h>
// TODO: 在此处引用程序要求的附加头文件
#include "acl_cpp/lib_acl.hpp"
#include "lib_acl.h"
#ifdef WIN32
#define snprintf _snprintf
#endif

View File

@ -0,0 +1,114 @@
service http_server {
# 进程是否禁止运行
master_disable = no
# 服务地址及端口号
# for master_type = inet
# master_service = 127.0.0.1:5001
# for master_type = unix
# master_service = echo.sock
# for master_type = sock
master_service = 127.0.0.1:5001, 5002, :5003, echo.sock, echo2.sock
# 服务监听为域套接口
# master_service = aio_echo.sock
# 服务类型
# master_type = inet
# master_type = unix
master_type = sock
# 当子进程异常退出时如果该值非空则将子进程异常退出的消息通知该服务
# master_notify_addr = 127.0.0.1:5801
# 邮件通知接收者
# master_notify_recipients = zhengshuxin@hotmail.com
# 是否允许延迟接受客户端连接如果为0则表示关闭该功能如果大于0则表示打开此功能
# 并且此值代表延迟接受连接的超时值超过此值时如果客户端依然没有发来数据则操作
# 系统会在系统层直接关闭该连接
# master_defer_accept = 0
# 是否只允许私有访问, 如果为 y, 则域套接口创建在 {install_path}/var/log/private/ 目录下,
# 如果为 n, 则域套接口创建在 {install_path}/var/log/public/ 目录下,
master_private = n
master_unpriv = n
# 是否需要 chroot: n -- no, y -- yes
master_chroot = n
# 每隔多长时间触发一次单位为秒(仅对 trigger 模式有效)
master_wakeup = -
# 最大进程数
master_maxproc = 1
# 预启动进程数该值不得大于 master_maxproc
# master_prefork = 0
# 进程程序名
master_command = http_server
# 进程日志记录文件
master_log = {install_path}/var/log/http_server
# 调试日志方式格式tag:level; tag:level; tab:level, all:1; 101:2
# master_debug =
# 进程启动参数只能为: -u [是否允许以某普通用户的身份运行]
# master_args =
# 传递给服务子进程的环境变量, 可以通过 getenv("SERVICE_ENV") 获得此值
# master_env = mempool_limit:512000000
# master_env = logme:FALSE, priority:E_LOG_INFO, action:E_LOG_PER_DAY, flush:sync_flush, imit_size:512,\
# sync_action:E_LOG_SEM, sem_name:/tmp/ioctl_echo.sem
# 当启动多个子进程实例时该开关控制多个子进程在接收连接时是否向 acl_master 发送消息报告自己的状态
# master_status_notify = 1
# 是否允许产生 core 文件
# ioctl_enable_core = 1
# 每个进程实例处理连接数的最大次数超过此值后进程实例主动退出
ioctl_use_limit = 0
# 每个进程实例的空闲超时时间超过此值后进程实例主动退出
ioctl_idle_limit = 0
# 记录进程PID的位置(对于多进程实例来说没有意义)
ioctl_pid_dir = {install_path}/var/pid
# 进程运行时所在的路径
ioctl_queue_dir = {install_path}/var
# 读写超时时间, 单位为秒
ioctl_rw_timeout = 120
# 读缓冲区的缓冲区大小
ioctl_buf_size = 8192
# 每次 accept 时的循环接收的最大次数
ioctl_max_accept = 25
# 在并发访问量非常低的情况下如访问量在 10 / 以下时可以找开此值(即赋为1)
# 以加速事件循环过程, 从而防止服务进程阻塞在 select 上的时间过长而影响访问速度
# ioctl_enable_dog = 1
# 进程运行时的用户身份
ioctl_owner = root
# select 进行循环时的时间间隔
# 单位为秒
ioctl_delay_sec = 1
# 单位为微秒
ioctl_delay_usec = 500
# 采用事件循环的方式: select(default), poll, kernel(epoll/devpoll/kqueue)
ioctl_event_mode = kernel
# 事件引擎检查所有空闲描述符的时间间隔(毫秒)
# ioctl_check_inter = 100
# 当启用 master_dispatch 连接分开服务后该配置指定 master_dispatch 所监听的
# 域套接口的全路径这样本子进程就可以从 master_dispatch 获得客户端连接
# ioctl_dispatch_addr = {install_path}/var/private/dispatch.sock
# ioctl_dispatch_addr 开启后下面参数控制本服务进程发给前端 master_dispatch 的服务标识信息
# ioctl_dispatch_type = default
# 线程池的最大线程数
ioctl_max_threads = 250
# 线程池中工作线程等待任务时间间隔(毫秒)
# ioctl_schedule_wait = 50
# 线程任务调度的时间间隔大于此值(毫秒)后记警告日志
# ioctl_schedule_warn = 100
# 线程处理任务拥堵数超过此阀值后记警告日志设为 0 则内部只有当拥堵任务数超过总线程数的 10 倍时才报警
# ioctl_qlen_warn = 0
# 线程的堆栈空间大小单位为字节0表示使用系统缺省值
ioctl_stacksize = 0
# 允许访问 udserver 的客户端IP地址范围
ioctl_access_allow = all
# acl_master 退出时如果该值置1则该程序不等所有连接处理完毕便立即退出
ioctl_quick_abort = 1
############################################################################
# 应用自己的配置选项
}

View File

@ -101,6 +101,31 @@ HttpServlet& HttpServlet::setParseBodyLimit(int length)
return *this;
}
static bool upgradeWebsocket(HttpServletRequest& req, HttpServletResponse& res)
{
const char* ptr = req.getHeader("Connection");
if (ptr == NULL)
return false;
if (acl_strcasestr(ptr, "Upgrade") == NULL)
return false;
ptr = req.getHeader("Upgrade");
if (ptr == NULL)
return false;
if (strcasecmp(ptr, "websocket") != 0)
return false;
const char* key = req.getHeader("Sec-WebSocket-Key");
if (key == NULL || *key == 0)
{
logger_warn("no Sec-WebSocket-Key");
return false;
}
http_header& header = res.getHttpHeader();
header.set_upgrade("websocket");
header.set_ws_accept(key);
return true;
}
bool HttpServlet::start()
{
socket_stream* in;
@ -156,7 +181,16 @@ bool HttpServlet::start()
switch (method)
{
case HTTP_METHOD_GET:
ret = doGet(*req_, *res_);
if (upgradeWebsocket(*req_, *res_))
{
if (res_->sendHeader() == false)
{
logger_error("sendHeader error!");
return false;
}
ret = doWebsocket(*req_, *res_);
} else
ret = doGet(*req_, *res_);
break;
case HTTP_METHOD_POST:
ret = doPost(*req_, *res_);

View File

@ -464,6 +464,11 @@ istream& HttpServletRequest::getInputStream(void) const
return stream_;
}
socket_stream& HttpServletRequest::getSocketStream(void) const
{
return stream_;
}
void HttpServletRequest::parseParameters(const char* str)
{
const char* requestCharset = getCharacterEncoding();

View File

@ -230,6 +230,11 @@ ostream& HttpServletResponse::getOutputStream(void) const
return stream_;
}
socket_stream& HttpServletResponse::getSocketStream(void) const
{
return stream_;
}
void HttpServletResponse::setHttpServletRequest(HttpServletRequest* request)
{
request_ = request;

View File

@ -6,6 +6,7 @@
#include "acl_cpp/stdlib/util.hpp"
#include "acl_cpp/stdlib/string.hpp"
#include "acl_cpp/stdlib/url_coder.hpp"
#include "acl_cpp/stdlib/sha1.hpp"
#include "acl_cpp/http/HttpCookie.hpp"
#include "acl_cpp/http/http_header.hpp"
#endif
@ -87,6 +88,13 @@ void http_header::init()
content_length_ = -1;
chunked_transfer_ = false;
transfer_gzip_ = false;
upgrade_ = NULL;
ws_origin_ = NULL;
ws_sec_key_ = NULL;
ws_sec_proto_ = NULL;
ws_sec_ver_ = -1;
ws_sec_accept_ = NULL;
}
void http_header::clear()
@ -233,7 +241,10 @@ void http_header::build_common(string& buf) const
if (is_request_ == false && cgi_mode_)
return;
if (keep_alive_)
if (upgrade_ && *upgrade_)
buf << "Upgrade: " << upgrade_ << "\r\nConnection: Upgrade\r\n";
else if (keep_alive_)
buf << "Connection: " << "Keep-Alive\r\n";
else
buf << "Connection: " << "Close\r\n";
@ -540,6 +551,52 @@ http_header& http_header::add_format(const char* name, const char* fmt, ...)
return add_param(name, buf.c_str());
}
http_header& http_header::set_upgrade(const char* value /* = "websocket" */)
{
if (value && *value)
{
upgrade_ = dbuf_->dbuf_strdup(value);
status_ = 101; // automatic set status_ to 101
}
else
upgrade_ = NULL;
return *this;
}
http_header& http_header::set_ws_origin(const char* url)
{
if (url && *url)
ws_origin_ = dbuf_->dbuf_strdup(url);
return *this;
}
http_header& http_header::set_ws_key(const char* key)
{
if (key && *key)
ws_sec_key_ = dbuf_->dbuf_strdup(key);
return *this;
}
http_header& http_header::set_ws_protocol(const char* proto)
{
if (proto && *proto)
ws_sec_proto_ = dbuf_->dbuf_strdup(proto);
return *this;
}
http_header& http_header::set_ws_version(int ver)
{
ws_sec_ver_ = ver;
return *this;
}
http_header& http_header::set_ws_accept(const char* key)
{
if (key && *key)
ws_sec_key_ = dbuf_->dbuf_strdup(key);
return *this;
}
bool http_header::build_request(string& buf) const
{
if (url_ == NULL || *url_ == 0)
@ -582,7 +639,9 @@ bool http_header::build_request(string& buf) const
buf += tmp.c_str();
}
}
buf += " HTTP/1.1\r\n";
if (accept_compress_)
// 因为目前的 zlib_stream 仅支持于此
buf += "Accept-Encoding: gzip\r\n";
@ -603,6 +662,17 @@ bool http_header::build_request(string& buf) const
buf += "\r\n";
}
build_common(buf);
if (ws_origin_ && *ws_origin_)
buf << "Origin: " << ws_origin_ << "\r\n";
if (ws_sec_key_ && *ws_sec_key_)
buf << "Sec-WebSocket-Key: " << ws_sec_key_ << "\r\n";
if (ws_sec_proto_ && *ws_sec_proto_)
buf << "Sec-Websocket-Protocol: " << ws_sec_proto_ << "\r\n";
if (ws_sec_ver_ > 0)
buf << "Sec-WebSocket-Version: " << ws_sec_ver_ << "\r\n";
// 添加分段请求字段
if (range_from_ >= 0)
{
@ -612,7 +682,6 @@ bool http_header::build_request(string& buf) const
buf += "\r\n";
}
build_common(buf);
buf += "\r\n";
return (true);
@ -794,6 +863,35 @@ static const char *http_status(int status)
return (__maps[i].hs[pos].title);
}
void http_header::append_accept_key(const char* sec_key, string& out) const
{
string tmp(sec_key);
tmp += "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
sha1 sha;
sha.input(tmp.c_str(), tmp.size());
unsigned char digest[20];
sha.result((unsigned *) digest);
//little endian to big endian
for (int i = 0; i < 20; i += 4)
{
unsigned char c;
c = digest[i];
digest[i] = digest[i + 3];
digest[i + 3] = c;
c = digest[i + 1];
digest[i + 1] = digest[i + 2];
digest[i + 2] = c;
}
unsigned char* s = acl_base64_encode((char*) digest, 20);
out << "Sec-WebSocket-Accept: " << (char*) s << "\r\n";
acl_myfree(s);
}
bool http_header::build_response(string& out) const
{
out.clear();
@ -833,13 +931,23 @@ bool http_header::build_response(string& out) const
}
}
if (upgrade_ && *upgrade_)
{
build_common(out);
if (ws_sec_key_ && *ws_sec_key_)
append_accept_key(ws_sec_key_, out);
out << "\r\n";
return true;
}
// 添加分段响应字段
if (range_from_ >= 0 && range_to_ >= range_from_ && range_total_ > 0)
out << "Content-Range: bytes=" << range_from_ << '-'
<< range_to_ << '/' << range_total_ << "\r\n";
// 如果是 gzip 压缩数据,当非 chunked 传输时,必须取消 Content-Length 字段,
// 同时禁止保持长连接,即: Connection: close
// 如果是 gzip 压缩数据,当非 chunked 传输时,必须取消 Content-Length
// 字段,同时禁止保持长连接,即: Connection: close
if (transfer_gzip_)
{
out << "Content-Encoding: gzip\r\n";

View File

@ -0,0 +1,309 @@
#include "acl_stdafx.hpp"
#ifndef ACL_PREPARE_COMPILE
#include "acl_cpp/stdlib/log.hpp"
#include "acl_cpp/stream/socket_stream.hpp"
#include "acl_cpp/http/websocket.hpp"
#endif
namespace acl
{
websocket::websocket(socket_stream& client)
: client_(client)
, header_buf_(NULL)
, header_size_(0)
, header_len_(0)
, payload_nread_(0)
, header_sent_(false)
{
}
websocket::~websocket(void)
{
if (header_buf_)
acl_myfree(header_buf_);
}
websocket& websocket::reset(void)
{
header_.fin = false;
header_.rsv1 = false;
header_.rsv2 = false;
header_.rsv3 = false;
header_.opcode = FRAME_CONTINUATION;
header_.mask = false;
header_.payload_len = 0;
header_.masking_key = 0;
payload_nread_ = 0;
header_sent_ = false;
return *this;
}
websocket& websocket::set_frame_fin(bool yes)
{
header_.fin = yes;
return *this;
}
websocket& websocket::set_frame_rsv1(bool yes)
{
header_.rsv1 = yes;
return *this;
}
websocket& websocket::set_frame_rsv2(bool yes)
{
header_.rsv2 = yes;
return *this;
}
websocket& websocket::set_frame_rsv3(bool yes)
{
header_.rsv3 = yes;
return *this;
}
websocket& websocket::set_frame_opcode(unsigned char type)
{
header_.opcode = type;
return *this;
}
websocket& websocket::set_frame_payload_len(unsigned long long len)
{
header_.payload_len = len;
return *this;
}
websocket& websocket::set_frame_masking_key(unsigned int mask)
{
header_.masking_key = mask;
header_.mask = true;
return *this;
}
void websocket::make_frame_header(void)
{
header_len_ = 2;
if (header_.payload_len > 65535)
header_len_ += 8;
else if (header_.payload_len > 126)
header_len_ += 2;
if (header_.mask)
header_len_ += 4;
if (header_len_ > header_size_)
{
header_buf_ = (char*) acl_myrealloc(header_buf_, header_len_);
header_size_ = header_len_;
}
unsigned char* ptr = (unsigned char*) header_buf_;
if (header_.fin)
ptr[0] = 0x80;
else
ptr[0] = 0x00;
ptr[0] |= header_.opcode;
if (header_.payload_len && header_.mask)
ptr[1] = 0x80;
else
ptr[1] = 0x00;
unsigned long long offset = 1;
unsigned long long payload_len = header_.payload_len;
if (payload_len <= 125)
ptr[offset++] |= payload_len & 0xff;
else if (payload_len <= 65535)
{
ptr[offset++] |= 126;
ptr[offset++] = (unsigned char) (payload_len >> 8) & 0xff;
ptr[offset++] = payload_len & 0xff;
}
else
{
ptr[offset++] |= 127;
ptr[offset++] = (unsigned char) ((payload_len >> 56) & 0xff);
ptr[offset++] = (unsigned char) ((payload_len >> 48) & 0xff);
ptr[offset++] = (unsigned char) ((payload_len >> 40) & 0xff);
ptr[offset++] = (unsigned char) ((payload_len >> 32) & 0xff);
ptr[offset++] = (unsigned char) ((payload_len >> 24) & 0xff);
ptr[offset++] = (unsigned char) ((payload_len >> 16) & 0xff);
ptr[offset++] = (unsigned char) ((payload_len >> 8) & 0xff);
ptr[offset++] = (unsigned char) (payload_len & 0xff);
}
if (payload_len > 0 && header_.mask)
{
unsigned int masking_key = header_.masking_key;
ptr[offset++] = (unsigned char) ((masking_key >> 24) & 0xff);
ptr[offset++] = (unsigned char) ((masking_key >> 16) & 0xff);
ptr[offset++] = (unsigned char) ((masking_key >> 8) & 0xff);
ptr[offset++] = (unsigned char) (masking_key & 0xff);
// save result in masking_key for send_frame_data
memcpy(&header_.masking_key, ptr + offset - 4, 4);
}
}
bool websocket::send_frame_data(char* data, size_t len)
{
if (!header_sent_)
{
header_sent_ = true;
make_frame_header();
if (client_.write(header_buf_, header_len_) == -1)
{
logger_error("write header error %s, len: %d",
last_serror(), (int) header_len_);
return false;
}
}
if (data == NULL || len == 0)
return true;
if (header_.mask)
{
unsigned char* mask = (unsigned char*) &header_.masking_key;
for (size_t i = 0; i < len; i++)
data[i] ^= mask[i % 4];
}
if (client_.write(data, len) == -1)
{
logger_error("write frame data error %s", last_serror());
return false;
}
return true;
}
bool websocket::send_frame_pong(char* data, size_t len)
{
reset();
set_frame_fin(true);
set_frame_opcode(FRAME_PONG);
set_frame_payload_len(0);
return send_frame_data(data, len);
}
bool websocket::send_frame_ping(char* data, size_t len)
{
reset();
set_frame_fin(true);
set_frame_opcode(FRAME_PING);
set_frame_payload_len(len);
return send_frame_data(data, len);
}
static bool is_big_endian(void)
{
const int n = 1;
if (*(char*) &n)
return false;
else
return true;
}
#define swap64(val) (((val) >> 56) | \
(((val) & 0x00ff000000000000ll) >> 40) | \
(((val) & 0x0000ff0000000000ll) >> 24) | \
(((val) & 0x000000ff00000000ll) >> 8) | \
(((val) & 0x00000000ff000000ll) << 8) | \
(((val) & 0x0000000000ff0000ll) << 24) | \
(((val) & 0x000000000000ff00ll) << 40) | \
(((val) << 56)))
#define hton64(val) is_big_endian() ? val : swap64(val)
#define ntoh64(val) hton64(val)
bool websocket::read_frame_head(void)
{
reset();
int ret;
unsigned char buf[8];
if (client_.read(buf, 2) == -1)
{
logger_error("read first two char error: %s", last_serror());
return false;
}
header_.fin = (buf[0] >> 7) & 0x01;
header_.rsv1 = (buf[0] >> 6) & 0x01;
header_.rsv2 = (buf[0] >> 5) & 0x01;
header_.rsv3 = (buf[0] >> 4) & 0x01;
header_.opcode = buf[0] & 0x0f;
header_.mask = (buf[1] >> 7) & 0x01;
unsigned char payload_len = buf[1] & 0x7f;
if (payload_len <= 125)
header_.payload_len = payload_len;
// payload_len == 126 | 127
else if ((ret = client_.read(buf, payload_len == 126 ? 2 : 8)) == -1)
{
logger_error("read ext_payload_len error %s", last_serror());
return false;
}
else if (ret == 2)
{
unsigned int n;
memcpy(&n, buf, ret);
header_.payload_len = ntohl(n);
}
else // ret == 8
{
memcpy(&header_.payload_len, buf, ret);
header_.payload_len = ntoh64(header_.payload_len);
}
if (!header_.mask)
return true;
if (client_.read(&header_.masking_key, sizeof(unsigned int)) == -1)
{
logger_error("read masking_key error %s", last_serror());
return false;
}
return true;
}
int websocket::read_frame_data(char* buf, size_t size)
{
if (payload_nread_ == header_.payload_len)
return 0;
if (header_.payload_len - payload_nread_ < size)
size = (size_t) (header_.payload_len - payload_nread_);
int ret = client_.read(buf, size);
if (ret == -1)
{
logger_error("read frame data error %s", last_serror());
return -1;
}
if (header_.mask)
{
unsigned char* mask = (unsigned char*) &header_.masking_key;
for (int i = 0; i < ret; i++)
buf[i] ^= mask[i % 4];
}
payload_nread_ += ret;
return ret;
}
} // namespace acl

View File

@ -351,6 +351,23 @@ int redis_hash::hlen(const char* key)
return get_number();
}
int redis_hash::hstrlen(const char* key, const char* name)
{
return hstrlen(key, name, strlen(name));
}
int redis_hash::hstrlen(const char* key, const char* name, size_t name_len)
{
const char* names[1];
names[0] = name;
size_t names_len[1];
names_len[0] = name_len;
hash_slot(key);
build("HSTRLEN", key, names, names_len, 1);
return get_number();
}
int redis_hash::hscan(const char* key, int cursor, std::map<string, string>& out,
const char* pattern /* = NULL */, const size_t* count /* = NULL */)
{

View File

@ -324,7 +324,7 @@ int http_hdr_entry_replace2(HTTP_HDR *hh, const char *name,
HTTP_HDR_ENTRY *entry;
ACL_VSTRING *value = acl_vstring_alloc(256);
int once, n = 0, len = (int) strlen(from), i;
char *(*find_fn)(char *, const char*);
char *(*find_fn)(const char *, const char*);
if (strcasecmp(name, "Set-Cookie") == 0)
once = 0;
@ -333,7 +333,7 @@ int http_hdr_entry_replace2(HTTP_HDR *hh, const char *name,
if (ignore_case)
find_fn = acl_strcasestr;
else
find_fn = (char *(*)(char*, const char*)) strstr;
find_fn = strstr;
for (i = 0; i < hh->entry_lnk->count; i++) {
char *ptr_prev, *ptr;