diff --git a/app/wizard_demo/httpd_proxy/http_servlet.cpp b/app/wizard_demo/httpd_proxy/http_servlet.cpp index 5f613a849..2fca601e0 100644 --- a/app/wizard_demo/httpd_proxy/http_servlet.cpp +++ b/app/wizard_demo/httpd_proxy/http_servlet.cpp @@ -1,11 +1,14 @@ #include "stdafx.h" #include "tcp_transfer.h" +#include "http_transfer.h" #include "http_servlet.h" -http_servlet::http_servlet(acl::socket_stream* stream, acl::session* session) +http_servlet::http_servlet(acl::socket_stream* stream, acl::session* session, + int port /* 80 */) : acl::HttpServlet(stream, session) +, port_(port) { - handlers_["/hello"] = &http_servlet::onHello; + handlers_["/hello"] = &http_servlet::on_hello; } http_servlet::~http_servlet(void) @@ -39,17 +42,19 @@ bool http_servlet::doOther(request_t&, response_t& res, const char* method) bool http_servlet::doGet(request_t& req, response_t& res) { - return doPost(req, res); + const char* path = req.getPathInfo(); + handler_t handler = path && *path ? handlers_[path] : NULL; + return handler ? (this->*handler)(req, res) : transfer_get(req, res); } bool http_servlet::doPost(request_t& req, response_t& res) { const char* path = req.getPathInfo(); handler_t handler = path && *path ? handlers_[path] : NULL; - return handler ? (this->*handler)(req, res) : onDefault(req, res); + return handler ? (this->*handler)(req, res) : transfer_post(req, res); } -bool http_servlet::onHello(request_t& req, response_t& res) +bool http_servlet::on_hello(request_t& req, response_t& res) { res.setContentType("text/html; charset=utf-8") // 设置响应字符集 .setKeepAlive(req.isKeepAlive()) // 设置是否保持长连接 @@ -89,30 +94,26 @@ bool http_servlet::onHello(request_t& req, response_t& res) return res.write(buf) && res.write(NULL, 0); } -bool http_servlet::onDefault(request_t& req, response_t& res) +bool http_servlet::transfer_get(request_t& req, response_t& res) { - const char* host = req.getRemoteHost(); - if (host == NULL || *host == 0) { - logger_error("no Host in request head"); - return false; - } + http_transfer fiber_peer(acl::HTTP_METHOD_GET, req, res, port_); + fiber_peer.start(); - acl::string buf(host); - char* port = strrchr(buf.c_str(), ':'); - if (port == NULL || *(port + 1) == 0) { - port = "80"; - } else { - *port++ = 0; - } + bool keep_alive; + fiber_peer.wait(&keep_alive); + return keep_alive && req.isKeepAlive(); +} - acl::string peer_addr; - peer_addr.format("%s|%s", buf.c_str(), port); - acl::socket_stream peer; - if (!peer.open(buf.c_str(), 0, 0)) { - logger_error("connect %s error %s", buf.c_str(), acl::last_serror()); - return false; - } - return true; +bool http_servlet::transfer_post(request_t& req, response_t& res) +{ + http_transfer fiber_peer(acl::HTTP_METHOD_POST, req, res, port_); + fiber_peer.start(); + + bool keep_alive; + fiber_peer.wait(&keep_alive); + + printf("transfer_post finished\r\n"); + return keep_alive && req.isKeepAlive(); } bool http_servlet::doConnect(request_t& req, response_t& res) @@ -144,11 +145,11 @@ bool http_servlet::doConnect(request_t& req, response_t& res) } acl::socket_stream& local = req.getSocketStream(); - doTcpProxy(local, peer); + transfer_tcp(local, peer); return false; } -bool http_servlet::doTcpProxy(acl::socket_stream& local, acl::socket_stream& peer) +bool http_servlet::transfer_tcp(acl::socket_stream& local, acl::socket_stream& peer) { tcp_transfer fiber_local(local, peer); tcp_transfer fiber_peer(peer, local); @@ -162,7 +163,7 @@ bool http_servlet::doTcpProxy(acl::socket_stream& local, acl::socket_stream& pee fiber_local.wait(); fiber_peer.wait(); - printf("doProxy finished, local fd=%d, peer fd=%d\r\n", + printf("transfer_tcp finished, local fd=%d, peer fd=%d\r\n", fiber_local.get_input().sock_handle(), fiber_local.get_output().sock_handle()); return true; diff --git a/app/wizard_demo/httpd_proxy/http_servlet.h b/app/wizard_demo/httpd_proxy/http_servlet.h index 52145f1f7..e1ba7f7de 100644 --- a/app/wizard_demo/httpd_proxy/http_servlet.h +++ b/app/wizard_demo/httpd_proxy/http_servlet.h @@ -3,7 +3,7 @@ class http_servlet : public acl::HttpServlet { public: - http_servlet(acl::socket_stream*, acl::session*); + http_servlet(acl::socket_stream*, acl::session*, int port = 80); ~http_servlet(); protected: @@ -22,11 +22,13 @@ protected: bool doConnect(request_t&, response_t&); private: + int port_; typedef bool (http_servlet::*handler_t)(request_t&,response_t&); std::map handlers_; - bool onDefault(request_t&, response_t&); - bool onHello(request_t&, response_t&); + bool on_hello(request_t&, response_t&); + bool transfer_get(request_t&, response_t&); + bool transfer_post(request_t&, response_t&); - bool doTcpProxy(acl::socket_stream& local, acl::socket_stream& peer); + bool transfer_tcp(acl::socket_stream& local, acl::socket_stream& peer); }; diff --git a/app/wizard_demo/httpd_proxy/http_transfer.cpp b/app/wizard_demo/httpd_proxy/http_transfer.cpp index 99789e909..f767e7aef 100644 --- a/app/wizard_demo/httpd_proxy/http_transfer.cpp +++ b/app/wizard_demo/httpd_proxy/http_transfer.cpp @@ -1,27 +1,198 @@ #include "stdafx.h" #include "http_transfer.h" -http_transfer::http_transfer(request_t& req, response_t& res) -: req_(req) +http_transfer::http_transfer(acl::http_method_t method, request_t& req, + response_t& res, int port) +: port_(port) +, method_(method) +, req_(req) , res_(res) -, peer_(NULL) +, client_(NULL) {} -http_transfer::~http_transfer(void) {} - -void http_transfer::set_peer(http_transfer& peer) { - peer_ = &peer; +http_transfer::~http_transfer(void) { + delete client_; } -void http_transfer::wait(void) { - (void) box_.pop(); +void http_transfer::wait(bool* keep_alive) { + bool* res = box_.pop(); + assert(res); + *keep_alive = *res; + delete res; } void http_transfer::run(void) { - - if (peer_) { - peer_->kill(); + bool* res = new bool; + switch (method_) { + case acl::HTTP_METHOD_GET: + *res = transfer_get(); + break; + case acl::HTTP_METHOD_POST: + *res = transfer_post(); + break; + default: + logger_error("not support method: %d", (int) method_); + *res = false; + break; } - box_.push(NULL); + box_.push(res); +} + +bool http_transfer::open_peer(request_t& req, acl::socket_stream& conn) +{ + const char* host = req.getRemoteHost(); + if (host == NULL || *host == 0) { + logger_error("no Host in request head"); + return false; + } + + acl::string buf(host); + + char* ptr = strrchr(buf.c_str(), ':'); + if (ptr != NULL && *(ptr + 1) != 0) { + *ptr++ = 0; + int port = atoi(ptr); + if (port > 0 && port < 65535) { + port_ = port; + } + } + + acl::string addr; + addr.format("%s|%d", buf.c_str(), port_); + + if (conn.open(addr, 0, 0)) { + logger("connect %s ok", addr.c_str()); + return true; + } + + logger_error("connect %s error %s", addr.c_str(), acl::last_serror()); + return false; +} + +bool http_transfer::transfer_request_head(acl::socket_stream& conn) { + acl::string header; + req_.sprint_header(header, NULL); + if (header.empty()) { + logger_error("http request head empty"); + return false; + } + + header += "\r\n"; + + if (conn.write(header) == -1) { + logger_error("write request header error"); + return false; + } + + printf(">>>send head: [%s]\r\n", header.c_str()); + client_ = new acl::http_client(&conn, true); + return true; +} + +bool http_transfer::transfer_request_body(acl::socket_stream& conn) { + long long length = req_.getContentLength(); + if (length <= 0) { + return true; + } + + acl::istream& in = req_.getInputStream(); + long long n = 0; + char buf[8192]; + + while (n < length) { + int ret = in.read(buf, sizeof(buf), false); + if (ret == -1) { + logger_error("read request body error"); + return false; + } + + if (conn.write(buf, ret) == -1) { + logger_error("send request body error"); + return false; + } + + n += ret; + } + + return true; +} + +bool http_transfer::transfer_get(void) { + if (!open_peer(req_, conn_)) { + logger_error("open server error"); + return false; + } + + if (!transfer_request_head(conn_)) { + logger_error("transfer_request_head error"); + return false; + } else { + return transfer_response(); + } +} + +bool http_transfer::transfer_post(void) { + if (!open_peer(req_, conn_)) { + logger_error("open server error"); + return false; + } + + if (!transfer_request_head(conn_)) { + logger_error("transfer_request_head error"); + return false; + } else if (!transfer_request_body(conn_)) { + logger_error("transfer_request_body error"); + return false; + } else { + return transfer_response(); + } +} + +bool http_transfer::transfer_response(void) { + assert(client_); + if (!client_->read_head()) { + logger_error("read response head error"); + return false; + } + + acl::string header; + client_->sprint_header(header, NULL); + if (header.empty()) { + logger_error("response header empty"); + return false; + } + + header += "\r\n"; + + printf("response head:\r\n[%s]\r\n", header.c_str()); + + acl::ostream& out = res_.getOutputStream(); + if (out.write(header) == -1) { + logger_error("send response head error"); + return false; + } + + long long length = client_->body_length(); + if (length == 0) { + return client_->is_server_keep_alive(); + } + + char buf[8192]; + while (true) { + int ret = client_->read_body(buf, sizeof(buf)); + if (ret <= 0) { + break; + } else if (out.write(buf, ret) == -1) { + logger_error("send response body error"); + return false; + } + } + + + if (length < 0) { + return false; + } + + return client_->is_server_keep_alive(); } diff --git a/app/wizard_demo/httpd_proxy/http_transfer.h b/app/wizard_demo/httpd_proxy/http_transfer.h index 364e8a9e0..fe34b3acc 100644 --- a/app/wizard_demo/httpd_proxy/http_transfer.h +++ b/app/wizard_demo/httpd_proxy/http_transfer.h @@ -3,20 +3,33 @@ class http_transfer : public acl::fiber { public: - http_transfer(request_t& req, response_t& res); + http_transfer(acl::http_method_t method, request_t& req, + response_t& res, int port); ~http_transfer(void); - void set_peer(http_transfer& peer); - void wait(void); + void wait(bool* keep_alive); protected: // @override void run(void); private: - acl::fiber_tbox box_; + acl::fiber_tbox box_; + + int port_; + acl::http_method_t method_; request_t& req_; response_t& res_; - http_transfer* peer_; + acl::socket_stream conn_; + acl::http_client* client_; + + bool open_peer(request_t& req, acl::socket_stream& conn); + + bool transfer_get(void); + bool transfer_post(void); + + bool transfer_request_head(acl::socket_stream& conn); + bool transfer_request_body(acl::socket_stream& conn); + bool transfer_response(void); }; diff --git a/lib_fiber/c/src/event/event_iocp.c b/lib_fiber/c/src/event/event_iocp.c index 491f56ca4..b917b494d 100644 --- a/lib_fiber/c/src/event/event_iocp.c +++ b/lib_fiber/c/src/event/event_iocp.c @@ -396,8 +396,8 @@ static int iocp_add_write(EVENT_IOCP *ev, FILE_EVENT *fe) fe->mask |= EVENT_WRITE; return 0; } else { - msg_warn("%s(%d): WriteFile error(%d, %s)", - __FUNCTION__, __LINE__, acl_fiber_last_error(), last_serror()); + msg_warn("%s(%d): WriteFile error(%d, %s)", __FUNCTION__, + __LINE__, acl_fiber_last_error(), last_serror()); fe->mask |= EVENT_ERR; assert(fe->writer); array_append(ev->events, fe->writer); @@ -456,7 +456,11 @@ static void iocp_event_save(EVENT_IOCP *ei, IOCP_EVENT *event, fe->mask &= ~EVENT_READ; } else if ((event->type & (IOCP_EVENT_WRITE | IOCP_EVENT_POLLW))) { if (fe->status & STATUS_CONNECTING) { - // just for the calling of getpeername() + // Just for the calling of getpeername(): + // If the socket is ready for connecting server, we + // should set SO_UPDATE_CONNECT_CONTEXT here, because + // the peer address wasn't associated with the socket + // automaticlly in IOCP mode. DWORD val = 1; setsockopt(fe->fd, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, (char *)&val, sizeof(DWORD)); @@ -483,7 +487,7 @@ static int iocp_wait(EVENT *ev, int timeout) (OVERLAPPED**) &event, timeout); if (event == NULL) { - break; + break; } if (event->type & IOCP_EVENT_DEAD) { @@ -492,6 +496,11 @@ static int iocp_wait(EVENT *ev, int timeout) } event->refer--; + + // If the associated FILE_EVENT with the event has gone in + // iocp_close_sock(), we should check the event's refer and + // free it when refer is 0. + if (event->fe == NULL) { if (event->refer == 0) { mem_free(event);