[coro_http][fix][feature]coro_http (#661)

This commit is contained in:
qicosmos 2024-04-18 16:16:15 +08:00 committed by GitHub
parent 7349bd0db8
commit 9641d2a8d5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 255 additions and 235 deletions

View File

@ -381,23 +381,22 @@ int main() {
### websocket ### websocket
```c++ ```c++
async_simple::coro::Lazy<void> websocket(coro_http_client &client) { async_simple::coro::Lazy<void> websocket(coro_http_client &client) {
client.on_ws_close([](std::string_view reason) {
std::cout << "web socket close " << reason << std::endl;
});
client.on_ws_msg([](resp_data data) {
std::cout << data.resp_body << std::endl;
});
// connect to your websocket server. // connect to your websocket server.
bool r = co_await client.async_connect("ws://example.com/ws"); bool r = co_await client.async_connect("ws://example.com/ws");
if (!r) { if (!r) {
co_return; co_return;
} }
co_await client.async_send_ws("hello websocket"); co_await client.write_websocket("hello websocket");
co_await client.async_send_ws("test again", /*need_mask = */ false); auto data = co_await client.read_websocket();
co_await client.async_send_ws_close("ws close reason"); CHECK(data.resp_body == "hello websocket");
co_await client.write_websocket("test again");
data = co_await client.read_websocket();
CHECK(data.resp_body == "test again");
co_await client.write_websocket("ws close");
data = co_await client.read_websocket();
CHECK(data.net_err == asio::error::eof);
CHECK(data.resp_body == "ws close");
} }
``` ```

View File

@ -289,6 +289,21 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
{ {
auto time_out_guard = auto time_out_guard =
timer_guard(this, conn_timeout_duration_, "connect timer"); timer_guard(this, conn_timeout_duration_, "connect timer");
if (u.is_websocket()) {
// build websocket http header
add_header("Upgrade", "websocket");
add_header("Connection", "Upgrade");
if (ws_sec_key_.empty()) {
ws_sec_key_ = "s//GYHa/XO7Hd2F2eOGfyA=="; // provide a random string.
}
add_header("Sec-WebSocket-Key", ws_sec_key_);
add_header("Sec-WebSocket-Version", "13");
req_context<> ctx{};
data = co_await async_request(std::move(uri), http_method::GET,
std::move(ctx));
co_return data;
}
data = co_await connect(u); data = co_await connect(u);
} }
if (socket_->is_timeout_) { if (socket_->is_timeout_) {
@ -319,50 +334,41 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
void set_ws_sec_key(std::string sec_key) { ws_sec_key_ = std::move(sec_key); } void set_ws_sec_key(std::string sec_key) { ws_sec_key_ = std::move(sec_key); }
async_simple::coro::Lazy<bool> async_ws_connect(std::string uri) { async_simple::coro::Lazy<resp_data> read_websocket() {
resp_data data{}; co_return co_await async_read_ws();
auto [r, u] = handle_uri(data, uri);
if (!r) {
CINATRA_LOG_WARNING << "url error:";
co_return false;
}
req_context<> ctx{};
if (u.is_websocket()) {
// build websocket http header
add_header("Upgrade", "websocket");
add_header("Connection", "Upgrade");
if (ws_sec_key_.empty()) {
ws_sec_key_ = "s//GYHa/XO7Hd2F2eOGfyA=="; // provide a random string.
}
add_header("Sec-WebSocket-Key", ws_sec_key_);
add_header("Sec-WebSocket-Version", "13");
}
data = co_await async_request(std::move(uri), http_method::GET,
std::move(ctx));
async_read_ws().start([](auto &&) {
});
co_return !data.net_err;
} }
async_simple::coro::Lazy<resp_data> async_send_ws(const char *data, async_simple::coro::Lazy<resp_data> write_websocket(
bool need_mask = true, const char *data, opcode op = opcode::text) {
opcode op = opcode::text) {
std::string str(data); std::string str(data);
co_return co_await async_send_ws(std::span<char>(str), need_mask, op); co_return co_await write_websocket(str, op);
} }
async_simple::coro::Lazy<resp_data> async_send_ws(std::string data, async_simple::coro::Lazy<resp_data> write_websocket(
bool need_mask = true, const char *data, size_t size, opcode op = opcode::text) {
opcode op = opcode::text) { std::string str(data, size);
co_return co_await async_send_ws(std::span<char>(data), need_mask, op); co_return co_await write_websocket(str, op);
}
async_simple::coro::Lazy<resp_data> write_websocket(
std::string_view data, opcode op = opcode::text) {
std::string str(data);
co_return co_await write_websocket(str, op);
}
async_simple::coro::Lazy<resp_data> write_websocket(
std::string &data, opcode op = opcode::text) {
co_return co_await write_websocket(std::span<char>(data), op);
}
async_simple::coro::Lazy<resp_data> write_websocket(
std::string &&data, opcode op = opcode::text) {
co_return co_await write_websocket(std::span<char>(data), op);
} }
template <typename Source> template <typename Source>
async_simple::coro::Lazy<resp_data> async_send_ws(Source source, async_simple::coro::Lazy<resp_data> write_websocket(
bool need_mask = true, Source source, opcode op = opcode::text) {
opcode op = opcode::text) {
resp_data data{}; resp_data data{};
websocket ws{}; websocket ws{};
@ -376,7 +382,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
} }
if constexpr (is_span_v<Source>) { if constexpr (is_span_v<Source>) {
std::string encode_header = ws.encode_frame(source, op, need_mask); std::string encode_header = ws.encode_frame(source, op, true);
std::vector<asio::const_buffer> buffers{ std::vector<asio::const_buffer> buffers{
asio::buffer(encode_header.data(), encode_header.size()), asio::buffer(encode_header.data(), encode_header.size()),
asio::buffer(source.data(), source.size())}; asio::buffer(source.data(), source.size())};
@ -392,8 +398,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
auto result = co_await source(); auto result = co_await source();
std::span<char> msg(result.buf.data(), result.buf.size()); std::span<char> msg(result.buf.data(), result.buf.size());
std::string encode_header = std::string encode_header = ws.encode_frame(msg, op, result.eof);
ws.encode_frame(msg, op, need_mask, result.eof);
std::vector<asio::const_buffer> buffers{ std::vector<asio::const_buffer> buffers{
asio::buffer(encode_header.data(), encode_header.size()), asio::buffer(encode_header.data(), encode_header.size()),
asio::buffer(msg.data(), msg.size())}; asio::buffer(msg.data(), msg.size())};
@ -414,16 +419,9 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
co_return data; co_return data;
} }
async_simple::coro::Lazy<resp_data> async_send_ws_close( async_simple::coro::Lazy<resp_data> write_websocket_close(
std::string msg = "") { std::string msg = "") {
co_return co_await async_send_ws(std::move(msg), false, opcode::close); co_return co_await write_websocket(std::move(msg), opcode::close);
}
void on_ws_msg(std::function<void(resp_data)> on_ws_msg) {
on_ws_msg_ = std::move(on_ws_msg);
}
void on_ws_close(std::function<void(std::string_view)> on_ws_close) {
on_ws_close_ = std::move(on_ws_close);
} }
#ifdef BENCHMARK_TEST #ifdef BENCHMARK_TEST
@ -1625,14 +1623,6 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
break; break;
} }
if (chunk_size == 0) {
// all finished, no more data
chunked_buf_.consume(CRCF.size());
data.status = 200;
data.eof = true;
break;
}
if (additional_size < size_t(chunk_size + 2)) { if (additional_size < size_t(chunk_size + 2)) {
// not a complete chunk, read left chunk data. // not a complete chunk, read left chunk data.
size_t size_to_read = chunk_size + 2 - additional_size; size_t size_to_read = chunk_size + 2 - additional_size;
@ -1643,6 +1633,14 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
} }
} }
if (chunk_size == 0) {
// all finished, no more data
chunked_buf_.consume(chunked_buf_.size());
data.status = 200;
data.eof = true;
break;
}
data_ptr = asio::buffer_cast<const char *>(chunked_buf_.data()); data_ptr = asio::buffer_cast<const char *>(chunked_buf_.data());
if (ctx.stream) { if (ctx.stream) {
ec = co_await ctx.stream->async_write(data_ptr, chunk_size); ec = co_await ctx.stream->async_write(data_ptr, chunk_size);
@ -1782,15 +1780,12 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
co_return resp_data{{}, 200}; co_return resp_data{{}, 200};
} }
// this function must be called before async_ws_connect. async_simple::coro::Lazy<resp_data> async_read_ws() {
async_simple::coro::Lazy<void> async_read_ws() {
resp_data data{}; resp_data data{};
head_buf_.consume(head_buf_.size()); head_buf_.consume(head_buf_.size());
size_t header_size = 2; size_t header_size = 2;
std::shared_ptr sock = socket_; std::shared_ptr sock = socket_;
auto on_ws_msg = on_ws_msg_;
auto on_ws_close = on_ws_close_;
asio::streambuf &read_buf = sock->head_buf_; asio::streambuf &read_buf = sock->head_buf_;
bool has_init_ssl = false; bool has_init_ssl = false;
#ifdef CINATRA_ENABLE_SSL #ifdef CINATRA_ENABLE_SSL
@ -1805,14 +1800,11 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
data.status = 404; data.status = 404;
if (sock->has_closed_) { if (sock->has_closed_) {
co_return; co_return data;
} }
close_socket(*sock); close_socket(*sock);
co_return data;
if (on_ws_msg)
on_ws_msg(data);
co_return;
} }
const char *data_ptr = asio::buffer_cast<const char *>(read_buf.data()); const char *data_ptr = asio::buffer_cast<const char *>(read_buf.data());
@ -1835,9 +1827,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
data.net_err = ec; data.net_err = ec;
data.status = 404; data.status = 404;
close_socket(*sock); close_socket(*sock);
if (on_ws_msg) co_return data;
on_ws_msg(data);
co_return;
} }
} }
@ -1856,14 +1846,11 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
header_size = 2; header_size = 2;
if (is_close_frame) { if (is_close_frame) {
if (on_ws_close)
on_ws_close(data.resp_body);
std::string reason = "close"; std::string reason = "close";
auto close_str = ws.format_close_payload(close_code::normal, auto close_str = ws.format_close_payload(close_code::normal,
reason.data(), reason.size()); reason.data(), reason.size());
auto span = std::span<char>(close_str); auto span = std::span<char>(close_str);
std::string encode_header = ws.encode_frame(span, opcode::close, false); std::string encode_header = ws.encode_frame(span, opcode::close, true);
std::vector<asio::const_buffer> buffers{asio::buffer(encode_header), std::vector<asio::const_buffer> buffers{asio::buffer(encode_header),
asio::buffer(reason)}; asio::buffer(reason)};
@ -1873,12 +1860,9 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
data.net_err = asio::error::eof; data.net_err = asio::error::eof;
data.status = 404; data.status = 404;
if (on_ws_msg) co_return data;
on_ws_msg(data);
co_return;
} }
if (on_ws_msg) co_return data;
on_ws_msg(data);
} }
} }
@ -2019,8 +2003,6 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::map<std::string, multipart_t> form_data_; std::map<std::string, multipart_t> form_data_;
size_t max_single_part_size_ = 1024 * 1024; size_t max_single_part_size_ = 1024 * 1024;
std::function<void(resp_data)> on_ws_msg_;
std::function<void(std::string_view)> on_ws_close_;
std::string ws_sec_key_; std::string ws_sec_key_;
std::string host_; std::string host_;
std::string port_; std::string port_;

View File

@ -192,78 +192,84 @@ class coro_http_connection
co_await router_.route_coro(coro_handler, request_, response_, key); co_await router_.route_coro(coro_handler, request_, response_, key);
} }
else { else {
bool is_exist = false; if (default_handler_) {
std::function<void(coro_http_request & req, default_handler_(request_, response_);
coro_http_response & resp)>
handler;
std::string method_str{parser_.method()};
std::string url_path = method_str;
url_path.append(" ").append(parser_.url());
std::tie(is_exist, handler, request_.params_) =
router_.get_router_tree()->get(url_path, method_str);
if (is_exist) {
if (handler) {
(handler)(request_, response_);
}
else {
response_.set_status(status_type::not_found);
}
} }
else { else {
bool is_coro_exist = false; bool is_exist = false;
std::function<async_simple::coro::Lazy<void>( std::function<void(coro_http_request & req,
coro_http_request & req, coro_http_response & resp)> coro_http_response & resp)>
coro_handler; handler;
std::string method_str{parser_.method()};
std::tie(is_coro_exist, coro_handler, request_.params_) = std::string url_path = method_str;
router_.get_coro_router_tree()->get_coro(url_path, method_str); url_path.append(" ").append(parser_.url());
std::tie(is_exist, handler, request_.params_) =
if (is_coro_exist) { router_.get_router_tree()->get(url_path, method_str);
if (coro_handler) { if (is_exist) {
co_await coro_handler(request_, response_); if (handler) {
(handler)(request_, response_);
} }
else { else {
response_.set_status(status_type::not_found); response_.set_status(status_type::not_found);
} }
} }
else { else {
bool is_matched_regex_router = false; bool is_coro_exist = false;
// coro regex router std::function<async_simple::coro::Lazy<void>(
auto coro_regex_handlers = router_.get_coro_regex_handlers(); coro_http_request & req, coro_http_response & resp)>
if (coro_regex_handlers.size() != 0) { coro_handler;
for (auto &pair : coro_regex_handlers) {
std::string coro_regex_key{key};
if (std::regex_match(coro_regex_key, request_.matches_, std::tie(is_coro_exist, coro_handler, request_.params_) =
std::get<0>(pair))) { router_.get_coro_router_tree()->get_coro(url_path,
auto coro_handler = std::get<1>(pair); method_str);
if (coro_handler) {
co_await coro_handler(request_, response_); if (is_coro_exist) {
is_matched_regex_router = true; if (coro_handler) {
} co_await coro_handler(request_, response_);
} }
else {
response_.set_status(status_type::not_found);
} }
} }
// regex router else {
if (!is_matched_regex_router) { bool is_matched_regex_router = false;
auto regex_handlers = router_.get_regex_handlers(); // coro regex router
if (regex_handlers.size() != 0) { auto coro_regex_handlers = router_.get_coro_regex_handlers();
for (auto &pair : regex_handlers) { if (coro_regex_handlers.size() != 0) {
std::string regex_key{key}; for (auto &pair : coro_regex_handlers) {
if (std::regex_match(regex_key, request_.matches_, std::string coro_regex_key{key};
if (std::regex_match(coro_regex_key, request_.matches_,
std::get<0>(pair))) { std::get<0>(pair))) {
auto handler = std::get<1>(pair); auto coro_handler = std::get<1>(pair);
if (handler) { if (coro_handler) {
(handler)(request_, response_); co_await coro_handler(request_, response_);
is_matched_regex_router = true; is_matched_regex_router = true;
} }
} }
} }
} }
// regex router
if (!is_matched_regex_router) {
auto regex_handlers = router_.get_regex_handlers();
if (regex_handlers.size() != 0) {
for (auto &pair : regex_handlers) {
std::string regex_key{key};
if (std::regex_match(regex_key, request_.matches_,
std::get<0>(pair))) {
auto handler = std::get<1>(pair);
if (handler) {
(handler)(request_, response_);
is_matched_regex_router = true;
}
}
}
}
}
// not found
if (!is_matched_regex_router)
response_.set_status(status_type::not_found);
} }
// not found
if (!is_matched_regex_router)
response_.set_status(status_type::not_found);
} }
} }
} }
@ -407,6 +413,11 @@ class coro_http_connection
void set_multi_buf(bool r) { multi_buf_ = r; } void set_multi_buf(bool r) { multi_buf_ = r; }
void set_default_handler(
std::function<void(coro_http_request &, coro_http_response &)> &handler) {
default_handler_ = handler;
}
async_simple::coro::Lazy<bool> write_data(std::string_view message) { async_simple::coro::Lazy<bool> write_data(std::string_view message) {
std::vector<asio::const_buffer> buffers; std::vector<asio::const_buffer> buffers;
buffers.push_back(asio::buffer(message)); buffers.push_back(asio::buffer(message));
@ -524,13 +535,6 @@ class coro_http_connection
chunked_buf_.consume(size); chunked_buf_.consume(size);
if (chunk_size == 0) {
// all finished, no more data
chunked_buf_.consume(CRCF.size());
result.eof = true;
co_return result;
}
if (additional_size < size_t(chunk_size + 2)) { if (additional_size < size_t(chunk_size + 2)) {
// not a complete chunk, read left chunk data. // not a complete chunk, read left chunk data.
size_t size_to_read = chunk_size + 2 - additional_size; size_t size_to_read = chunk_size + 2 - additional_size;
@ -542,6 +546,13 @@ class coro_http_connection
} }
} }
if (chunk_size == 0) {
// all finished, no more data
chunked_buf_.consume(chunked_buf_.size());
result.eof = true;
co_return result;
}
data_ptr = asio::buffer_cast<const char *>(chunked_buf_.data()); data_ptr = asio::buffer_cast<const char *>(chunked_buf_.data());
result.data = std::string_view{data_ptr, (size_t)chunk_size}; result.data = std::string_view{data_ptr, (size_t)chunk_size};
chunked_buf_.consume(chunk_size + CRCF.size()); chunked_buf_.consume(chunk_size + CRCF.size());
@ -834,5 +845,7 @@ class coro_http_connection
#endif #endif
bool need_shrink_every_time_ = false; bool need_shrink_every_time_ = false;
bool multi_buf_ = true; bool multi_buf_ = true;
std::function<void(coro_http_request &, coro_http_response &)>
default_handler_ = nullptr;
}; };
} // namespace cinatra } // namespace cinatra

View File

@ -1,6 +1,4 @@
#pragma once #pragma once
#include <async_simple/coro/Lazy.h>
#include <algorithm> #include <algorithm>
#include <functional> #include <functional>
#include <set> #include <set>
@ -17,17 +15,6 @@
#include "ylt/util/type_traits.h" #include "ylt/util/type_traits.h"
namespace cinatra { namespace cinatra {
template <template <typename...> class U, typename T>
struct is_template_instant_of : std::false_type {};
template <template <typename...> class U, typename... args>
struct is_template_instant_of<U, U<args...>> : std::true_type {};
template <typename T>
constexpr inline bool is_lazy_v =
is_template_instant_of<async_simple::coro::Lazy,
std::remove_cvref_t<T>>::value;
template <class, class = void> template <class, class = void>
struct has_before : std::false_type {}; struct has_before : std::false_type {};
@ -64,7 +51,7 @@ class coro_http_router {
// hold keys to make sure map_handles_ key is // hold keys to make sure map_handles_ key is
// std::string_view, avoid memcpy when route // std::string_view, avoid memcpy when route
using return_type = typename util::function_traits<Func>::return_type; using return_type = typename util::function_traits<Func>::return_type;
if constexpr (is_lazy_v<return_type>) { if constexpr (coro_io::is_lazy_v<return_type>) {
std::function<async_simple::coro::Lazy<void>(coro_http_request & req, std::function<async_simple::coro::Lazy<void>(coro_http_request & req,
coro_http_response & resp)> coro_http_response & resp)>
http_handler; http_handler;
@ -78,9 +65,8 @@ class coro_http_router {
(do_before(asps, req, resp, ok), ...); (do_before(asps, req, resp, ok), ...);
if (ok) { if (ok) {
co_await handler(req, resp); co_await handler(req, resp);
(do_after(asps, req, resp, ok), ...);
} }
(do_after(asps, req, resp, ok), ...);
}; };
} }
else { else {
@ -126,8 +112,8 @@ class coro_http_router {
(do_before(asps, req, resp, ok), ...); (do_before(asps, req, resp, ok), ...);
if (ok) { if (ok) {
handler(req, resp); handler(req, resp);
(do_after(asps, req, resp, ok), ...);
} }
(do_after(asps, req, resp, ok), ...);
}; };
} }
else { else {

View File

@ -220,6 +220,61 @@ class coro_http_server {
} }
} }
template <http_method... method, typename... Aspects>
void set_websocket_proxy_handler(std::string url_path,
std::vector<std::string_view> hosts,
coro_io::load_blance_algorithm type =
coro_io::load_blance_algorithm::random,
std::vector<int> weights = {},
Aspects &&...aspects) {
if (hosts.empty()) {
throw std::invalid_argument("not config hosts yet!");
}
auto channel = std::make_shared<coro_io::channel<coro_http_client>>(
coro_io::channel<coro_http_client>::create(hosts, {.lba = type},
weights));
set_http_handler<cinatra::GET>(
url_path,
[channel](coro_http_request &req,
coro_http_response &resp) -> async_simple::coro::Lazy<void> {
websocket_result result{};
while (true) {
result = co_await req.get_conn()->read_websocket();
if (result.ec) {
break;
}
if (result.type == ws_frame_type::WS_CLOSE_FRAME) {
CINATRA_LOG_INFO << "close frame";
break;
}
co_await channel->send_request(
[&req, result](
coro_http_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
auto r =
co_await client.write_websocket(std::string(result.data));
if (r.net_err) {
co_return;
}
auto data = co_await client.read_websocket();
if (data.net_err) {
co_return;
}
auto ec = co_await req.get_conn()->write_websocket(
std::string(result.data));
if (ec) {
co_return;
}
});
}
},
std::forward<Aspects>(aspects)...);
}
void set_max_size_of_cache_files(size_t max_size = 3 * 1024 * 1024) { void set_max_size_of_cache_files(size_t max_size = 3 * 1024 * 1024) {
std::error_code ec; std::error_code ec;
for (const auto &file : for (const auto &file :
@ -495,6 +550,11 @@ class coro_http_server {
void set_shrink_to_fit(bool r) { need_shrink_every_time_ = r; } void set_shrink_to_fit(bool r) { need_shrink_every_time_ = r; }
void set_default_handler(
std::function<void(coro_http_request &, coro_http_response &)> handler) {
default_handler_ = std::move(handler);
}
size_t connection_count() { size_t connection_count() {
std::scoped_lock lock(conn_mtx_); std::scoped_lock lock(conn_mtx_);
return connections_.size(); return connections_.size();
@ -600,6 +660,9 @@ class coro_http_server {
if (need_check_) { if (need_check_) {
conn->set_check_timeout(true); conn->set_check_timeout(true);
} }
if (default_handler_) {
conn->set_default_handler(default_handler_);
}
#ifdef CINATRA_ENABLE_SSL #ifdef CINATRA_ENABLE_SSL
if (use_ssl_) { if (use_ssl_) {
@ -860,6 +923,8 @@ class coro_http_server {
#endif #endif
coro_http_router router_; coro_http_router router_;
bool need_shrink_every_time_ = false; bool need_shrink_every_time_ = false;
std::function<void(coro_http_request &, coro_http_response &)>
default_handler_ = nullptr;
}; };
using http_server = coro_http_server; using http_server = coro_http_server;

View File

@ -126,8 +126,7 @@ class websocket {
return {msg_header_, header_length}; return {msg_header_, header_length};
} }
std::string encode_frame(std::span<char> &data, opcode op, bool need_mask, std::string encode_frame(std::span<char> &data, opcode op, bool eof) {
bool eof = true) {
std::string header; std::string header;
/// Base header. /// Base header.
frame_header hdr{}; frame_header hdr{};
@ -173,11 +172,9 @@ class websocket {
/// The mask is a 32-bit value. /// The mask is a 32-bit value.
uint8_t mask[4] = {}; uint8_t mask[4] = {};
if (need_mask) { header[1] |= 0x80;
header[1] |= 0x80; uint32_t random = (uint32_t)rand();
uint32_t random = (uint32_t)rand(); memcpy(mask, &random, 4);
memcpy(mask, &random, 4);
}
size_t size = header.size(); size_t size = header.size();
header.resize(size + 4); header.resize(size + 4);

View File

@ -205,29 +205,19 @@ async_simple::coro::Lazy<void> use_websocket() {
std::this_thread::sleep_for(300ms); // wait for server start std::this_thread::sleep_for(300ms); // wait for server start
coro_http_client client{}; coro_http_client client{};
client.on_ws_close([](std::string_view reason) { auto r = co_await client.connect("ws://127.0.0.1:9001/ws_echo");
std::cout << reason << "\n"; if (r.net_err) {
assert(reason == "normal close");
});
client.on_ws_msg([](resp_data data) {
if (data.net_err) {
std::cout << data.net_err.message() << "\n";
return;
}
assert(data.resp_body == "hello websocket" ||
data.resp_body == "test again");
});
bool r = co_await client.async_ws_connect("ws://127.0.0.1:9001/ws_echo");
if (!r) {
co_return; co_return;
} }
auto result = auto result = co_await client.write_websocket("hello websocket");
co_await client.async_send_ws("hello websocket"); // mask as default.
assert(!result.net_err); assert(!result.net_err);
result = co_await client.async_send_ws("test again", /*need_mask = */ false); auto data = co_await client.read_websocket();
assert(data.resp_body == "hello websocket");
result = co_await client.write_websocket("test again");
assert(!result.net_err); assert(!result.net_err);
data = co_await client.read_websocket();
assert(data.resp_body == "test again");
} }
async_simple::coro::Lazy<void> static_file_server() { async_simple::coro::Lazy<void> static_file_server() {

View File

@ -381,23 +381,22 @@ int main() {
### websocket ### websocket
```c++ ```c++
async_simple::coro::Lazy<void> websocket(coro_http_client &client) { async_simple::coro::Lazy<void> websocket(coro_http_client &client) {
client.on_ws_close([](std::string_view reason) {
std::cout << "web socket close " << reason << std::endl;
});
client.on_ws_msg([](resp_data data) {
std::cout << data.resp_body << std::endl;
});
// connect to your websocket server. // connect to your websocket server.
bool r = co_await client.async_connect("ws://example.com/ws"); bool r = co_await client.async_connect("ws://example.com/ws");
if (!r) { if (!r) {
co_return; co_return;
} }
co_await client.async_send_ws("hello websocket"); co_await client.write_websocket("hello websocket");
co_await client.async_send_ws("test again", /*need_mask = */ false); auto data = co_await client.read_websocket();
co_await client.async_send_ws_close("ws close reason"); CHECK(data.resp_body == "hello websocket");
co_await client.write_websocket("test again");
data = co_await client.read_websocket();
CHECK(data.resp_body == "test again");
co_await client.write_websocket("ws close");
data = co_await client.read_websocket();
CHECK(data.net_err == asio::error::eof);
CHECK(data.resp_body == "ws close");
} }
``` ```

View File

@ -422,17 +422,17 @@ auto r = async_simple::coro::syncAwait(
``` ```
## websocket ## websocket
websocket 的支持需要3步 websocket 的支持需要3步
- 设置读websocket 数据的回调函数;
- 连接服务器; - 连接服务器;
- 发送websocket 数据; - 发送websocket 数据;
- 读websocket 数据;
设置websocket 读数据接口: websocket 读数据接口:
```c++ ```c++
void on_ws_msg(std::function<void(resp_data)> on_ws_msg); async_simple::coro::Lazy<resp_data> read_websocket();
``` ```
websocket 连接服务器接口: websocket 连接服务器接口:
```c++ ```c++
async_simple::coro::Lazy<bool> async_ws_connect(std::string uri); async_simple::coro::Lazy<resp_data> connect(std::string uri);
``` ```
websocket 发送数据接口: websocket 发送数据接口:
```c++ ```c++
@ -457,10 +457,8 @@ enum opcode : std::uint8_t {
/// 发送websocket 数据 /// 发送websocket 数据
/// \param msg 要发送的websocket 数据 /// \param msg 要发送的websocket 数据
/// \param need_mask 是否需要对数据进行mask默认会mask
/// \param op opcode 一般为text、binary或 close 等类型 /// \param op opcode 一般为text、binary或 close 等类型
async_simple::coro::Lazy<resp_data> async_send_ws(std::string msg, async_simple::coro::Lazy<resp_data> write_websocket(std::string msg,
bool need_mask = true,
opcode op = opcode::text); opcode op = opcode::text);
``` ```
@ -470,23 +468,15 @@ websocket 例子:
coro_http_client client; coro_http_client client;
// 连接websocket 服务器 // 连接websocket 服务器
async_simple::coro::syncAwait( async_simple::coro::syncAwait(
client.async_ws_connect("ws://localhost:8090")); client.connect("ws://localhost:8090"));
std::string send_str(len, 'a'); std::string send_str(len, 'a');
// 设置读数据回调
client.on_ws_msg([&, send_str](resp_data data) {
if (data.net_err) {
std::cout << "ws_msg net error " << data.net_err.message() << "\n";
return;
}
std::cout << "ws msg len: " << data.resp_body.size() << std::endl;
REQUIRE(data.resp_body.size() == send_str.size());
CHECK(data.resp_body == send_str);
});
// 发送websocket 数据 // 发送websocket 数据
async_simple::coro::syncAwait(client.async_send_ws(send_str)); async_simple::coro::syncAwait(client.write_websocket(std::string(send_str)));
auto data = async_simple::coro::syncAwait(client.read_websocket());
REQUIRE(data.resp_body.size() == send_str.size());
CHECK(data.resp_body == send_str);
``` ```
## 线程模型 ## 线程模型

View File

@ -311,23 +311,22 @@ int main() {
### websocket ### websocket
```cpp ```cpp
async_simple::coro::Lazy<void> websocket(coro_http_client &client) { async_simple::coro::Lazy<void> websocket(coro_http_client &client) {
client.on_ws_close([](std::string_view reason) {
std::cout << "web socket close " << reason << std::endl;
});
client.on_ws_msg([](resp_data data) {
std::cout << data.resp_body << std::endl;
});
// connect to your websocket server. // connect to your websocket server.
bool r = co_await client.async_connect("ws://example.com/ws"); bool r = co_await client.async_connect("ws://example.com/ws");
if (!r) { if (!r) {
co_return; co_return;
} }
co_await client.async_send_ws("hello websocket"); co_await client.write_websocket("hello websocket");
co_await client.async_send_ws("test again", /*need_mask = */ false); auto data = co_await client.read_websocket();
co_await client.async_send_ws_close("ws close reason"); CHECK(data.resp_body == "hello websocket");
co_await client.write_websocket("test again");
data = co_await client.read_websocket();
CHECK(data.resp_body == "test again");
co_await client.write_websocket("ws close");
data = co_await client.read_websocket();
CHECK(data.net_err == asio::error::eof);
CHECK(data.resp_body == "ws close");
} }
``` ```