test httpd_proxy demo

This commit is contained in:
shuxin   zheng 2022-01-24 18:36:08 +08:00
parent 0c6ace92e0
commit d7fa5eab36
5 changed files with 251 additions and 55 deletions

View File

@ -1,11 +1,14 @@
#include "stdafx.h"
#include "tcp_transfer.h"
#include "http_transfer.h"
#include "http_servlet.h"
http_servlet::http_servlet(acl::socket_stream* stream, acl::session* session)
http_servlet::http_servlet(acl::socket_stream* stream, acl::session* session,
int port /* 80 */)
: acl::HttpServlet(stream, session)
, port_(port)
{
handlers_["/hello"] = &http_servlet::onHello;
handlers_["/hello"] = &http_servlet::on_hello;
}
http_servlet::~http_servlet(void)
@ -39,17 +42,19 @@ bool http_servlet::doOther(request_t&, response_t& res, const char* method)
bool http_servlet::doGet(request_t& req, response_t& res)
{
return doPost(req, res);
const char* path = req.getPathInfo();
handler_t handler = path && *path ? handlers_[path] : NULL;
return handler ? (this->*handler)(req, res) : transfer_get(req, res);
}
bool http_servlet::doPost(request_t& req, response_t& res)
{
const char* path = req.getPathInfo();
handler_t handler = path && *path ? handlers_[path] : NULL;
return handler ? (this->*handler)(req, res) : onDefault(req, res);
return handler ? (this->*handler)(req, res) : transfer_post(req, res);
}
bool http_servlet::onHello(request_t& req, response_t& res)
bool http_servlet::on_hello(request_t& req, response_t& res)
{
res.setContentType("text/html; charset=utf-8") // 设置响应字符集
.setKeepAlive(req.isKeepAlive()) // 设置是否保持长连接
@ -89,30 +94,26 @@ bool http_servlet::onHello(request_t& req, response_t& res)
return res.write(buf) && res.write(NULL, 0);
}
bool http_servlet::onDefault(request_t& req, response_t& res)
bool http_servlet::transfer_get(request_t& req, response_t& res)
{
const char* host = req.getRemoteHost();
if (host == NULL || *host == 0) {
logger_error("no Host in request head");
return false;
}
http_transfer fiber_peer(acl::HTTP_METHOD_GET, req, res, port_);
fiber_peer.start();
acl::string buf(host);
char* port = strrchr(buf.c_str(), ':');
if (port == NULL || *(port + 1) == 0) {
port = "80";
} else {
*port++ = 0;
}
bool keep_alive;
fiber_peer.wait(&keep_alive);
return keep_alive && req.isKeepAlive();
}
acl::string peer_addr;
peer_addr.format("%s|%s", buf.c_str(), port);
acl::socket_stream peer;
if (!peer.open(buf.c_str(), 0, 0)) {
logger_error("connect %s error %s", buf.c_str(), acl::last_serror());
return false;
}
return true;
bool http_servlet::transfer_post(request_t& req, response_t& res)
{
http_transfer fiber_peer(acl::HTTP_METHOD_POST, req, res, port_);
fiber_peer.start();
bool keep_alive;
fiber_peer.wait(&keep_alive);
printf("transfer_post finished\r\n");
return keep_alive && req.isKeepAlive();
}
bool http_servlet::doConnect(request_t& req, response_t& res)
@ -144,11 +145,11 @@ bool http_servlet::doConnect(request_t& req, response_t& res)
}
acl::socket_stream& local = req.getSocketStream();
doTcpProxy(local, peer);
transfer_tcp(local, peer);
return false;
}
bool http_servlet::doTcpProxy(acl::socket_stream& local, acl::socket_stream& peer)
bool http_servlet::transfer_tcp(acl::socket_stream& local, acl::socket_stream& peer)
{
tcp_transfer fiber_local(local, peer);
tcp_transfer fiber_peer(peer, local);
@ -162,7 +163,7 @@ bool http_servlet::doTcpProxy(acl::socket_stream& local, acl::socket_stream& pee
fiber_local.wait();
fiber_peer.wait();
printf("doProxy finished, local fd=%d, peer fd=%d\r\n",
printf("transfer_tcp finished, local fd=%d, peer fd=%d\r\n",
fiber_local.get_input().sock_handle(),
fiber_local.get_output().sock_handle());
return true;

View File

@ -3,7 +3,7 @@
class http_servlet : public acl::HttpServlet
{
public:
http_servlet(acl::socket_stream*, acl::session*);
http_servlet(acl::socket_stream*, acl::session*, int port = 80);
~http_servlet();
protected:
@ -22,11 +22,13 @@ protected:
bool doConnect(request_t&, response_t&);
private:
int port_;
typedef bool (http_servlet::*handler_t)(request_t&,response_t&);
std::map<std::string, handler_t> handlers_;
bool onDefault(request_t&, response_t&);
bool onHello(request_t&, response_t&);
bool on_hello(request_t&, response_t&);
bool transfer_get(request_t&, response_t&);
bool transfer_post(request_t&, response_t&);
bool doTcpProxy(acl::socket_stream& local, acl::socket_stream& peer);
bool transfer_tcp(acl::socket_stream& local, acl::socket_stream& peer);
};

View File

@ -1,27 +1,198 @@
#include "stdafx.h"
#include "http_transfer.h"
http_transfer::http_transfer(request_t& req, response_t& res)
: req_(req)
http_transfer::http_transfer(acl::http_method_t method, request_t& req,
response_t& res, int port)
: port_(port)
, method_(method)
, req_(req)
, res_(res)
, peer_(NULL)
, client_(NULL)
{}
http_transfer::~http_transfer(void) {}
void http_transfer::set_peer(http_transfer& peer) {
peer_ = &peer;
http_transfer::~http_transfer(void) {
delete client_;
}
void http_transfer::wait(void) {
(void) box_.pop();
void http_transfer::wait(bool* keep_alive) {
bool* res = box_.pop();
assert(res);
*keep_alive = *res;
delete res;
}
void http_transfer::run(void) {
if (peer_) {
peer_->kill();
bool* res = new bool;
switch (method_) {
case acl::HTTP_METHOD_GET:
*res = transfer_get();
break;
case acl::HTTP_METHOD_POST:
*res = transfer_post();
break;
default:
logger_error("not support method: %d", (int) method_);
*res = false;
break;
}
box_.push(NULL);
box_.push(res);
}
bool http_transfer::open_peer(request_t& req, acl::socket_stream& conn)
{
const char* host = req.getRemoteHost();
if (host == NULL || *host == 0) {
logger_error("no Host in request head");
return false;
}
acl::string buf(host);
char* ptr = strrchr(buf.c_str(), ':');
if (ptr != NULL && *(ptr + 1) != 0) {
*ptr++ = 0;
int port = atoi(ptr);
if (port > 0 && port < 65535) {
port_ = port;
}
}
acl::string addr;
addr.format("%s|%d", buf.c_str(), port_);
if (conn.open(addr, 0, 0)) {
logger("connect %s ok", addr.c_str());
return true;
}
logger_error("connect %s error %s", addr.c_str(), acl::last_serror());
return false;
}
bool http_transfer::transfer_request_head(acl::socket_stream& conn) {
acl::string header;
req_.sprint_header(header, NULL);
if (header.empty()) {
logger_error("http request head empty");
return false;
}
header += "\r\n";
if (conn.write(header) == -1) {
logger_error("write request header error");
return false;
}
printf(">>>send head: [%s]\r\n", header.c_str());
client_ = new acl::http_client(&conn, true);
return true;
}
bool http_transfer::transfer_request_body(acl::socket_stream& conn) {
long long length = req_.getContentLength();
if (length <= 0) {
return true;
}
acl::istream& in = req_.getInputStream();
long long n = 0;
char buf[8192];
while (n < length) {
int ret = in.read(buf, sizeof(buf), false);
if (ret == -1) {
logger_error("read request body error");
return false;
}
if (conn.write(buf, ret) == -1) {
logger_error("send request body error");
return false;
}
n += ret;
}
return true;
}
bool http_transfer::transfer_get(void) {
if (!open_peer(req_, conn_)) {
logger_error("open server error");
return false;
}
if (!transfer_request_head(conn_)) {
logger_error("transfer_request_head error");
return false;
} else {
return transfer_response();
}
}
bool http_transfer::transfer_post(void) {
if (!open_peer(req_, conn_)) {
logger_error("open server error");
return false;
}
if (!transfer_request_head(conn_)) {
logger_error("transfer_request_head error");
return false;
} else if (!transfer_request_body(conn_)) {
logger_error("transfer_request_body error");
return false;
} else {
return transfer_response();
}
}
bool http_transfer::transfer_response(void) {
assert(client_);
if (!client_->read_head()) {
logger_error("read response head error");
return false;
}
acl::string header;
client_->sprint_header(header, NULL);
if (header.empty()) {
logger_error("response header empty");
return false;
}
header += "\r\n";
printf("response head:\r\n[%s]\r\n", header.c_str());
acl::ostream& out = res_.getOutputStream();
if (out.write(header) == -1) {
logger_error("send response head error");
return false;
}
long long length = client_->body_length();
if (length == 0) {
return client_->is_server_keep_alive();
}
char buf[8192];
while (true) {
int ret = client_->read_body(buf, sizeof(buf));
if (ret <= 0) {
break;
} else if (out.write(buf, ret) == -1) {
logger_error("send response body error");
return false;
}
}
if (length < 0) {
return false;
}
return client_->is_server_keep_alive();
}

View File

@ -3,20 +3,33 @@
class http_transfer : public acl::fiber
{
public:
http_transfer(request_t& req, response_t& res);
http_transfer(acl::http_method_t method, request_t& req,
response_t& res, int port);
~http_transfer(void);
void set_peer(http_transfer& peer);
void wait(void);
void wait(bool* keep_alive);
protected:
// @override
void run(void);
private:
acl::fiber_tbox<int> box_;
acl::fiber_tbox<bool> box_;
int port_;
acl::http_method_t method_;
request_t& req_;
response_t& res_;
http_transfer* peer_;
acl::socket_stream conn_;
acl::http_client* client_;
bool open_peer(request_t& req, acl::socket_stream& conn);
bool transfer_get(void);
bool transfer_post(void);
bool transfer_request_head(acl::socket_stream& conn);
bool transfer_request_body(acl::socket_stream& conn);
bool transfer_response(void);
};

View File

@ -396,8 +396,8 @@ static int iocp_add_write(EVENT_IOCP *ev, FILE_EVENT *fe)
fe->mask |= EVENT_WRITE;
return 0;
} else {
msg_warn("%s(%d): WriteFile error(%d, %s)",
__FUNCTION__, __LINE__, acl_fiber_last_error(), last_serror());
msg_warn("%s(%d): WriteFile error(%d, %s)", __FUNCTION__,
__LINE__, acl_fiber_last_error(), last_serror());
fe->mask |= EVENT_ERR;
assert(fe->writer);
array_append(ev->events, fe->writer);
@ -456,7 +456,11 @@ static void iocp_event_save(EVENT_IOCP *ei, IOCP_EVENT *event,
fe->mask &= ~EVENT_READ;
} else if ((event->type & (IOCP_EVENT_WRITE | IOCP_EVENT_POLLW))) {
if (fe->status & STATUS_CONNECTING) {
// just for the calling of getpeername()
// Just for the calling of getpeername():
// If the socket is ready for connecting server, we
// should set SO_UPDATE_CONNECT_CONTEXT here, because
// the peer address wasn't associated with the socket
// automaticlly in IOCP mode.
DWORD val = 1;
setsockopt(fe->fd, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT,
(char *)&val, sizeof(DWORD));
@ -483,7 +487,7 @@ static int iocp_wait(EVENT *ev, int timeout)
(OVERLAPPED**) &event, timeout);
if (event == NULL) {
break;
break;
}
if (event->type & IOCP_EVENT_DEAD) {
@ -492,6 +496,11 @@ static int iocp_wait(EVENT *ev, int timeout)
}
event->refer--;
// If the associated FILE_EVENT with the event has gone in
// iocp_close_sock(), we should check the event's refer and
// free it when refer is 0.
if (event->fe == NULL) {
if (event->refer == 0) {
mem_free(event);