fix timeout (#658)

This commit is contained in:
qicosmos 2024-04-15 17:47:47 +08:00 committed by GitHub
parent ab0fb6b4b5
commit e6342743f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 82 additions and 76 deletions

View File

@ -286,12 +286,13 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
if (!ok) { if (!ok) {
co_return resp_data{std::make_error_code(std::errc::protocol_error), 404}; co_return resp_data{std::make_error_code(std::errc::protocol_error), 404};
} }
{
auto future = start_timer(conn_timeout_duration_, "connect timer"); auto time_out_guard =
timer_guard(this, conn_timeout_duration_, "connect timer");
data = co_await connect(u); data = co_await connect(u);
if (auto ec = co_await wait_future(std::move(future)); ec) { }
co_return resp_data{ec, 404}; if (socket_->is_timeout_) {
co_return resp_data{std::make_error_code(std::errc::timed_out), 404};
} }
if (!data.net_err) { if (!data.net_err) {
data.status = 200; data.status = 200;
@ -623,38 +624,27 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
void set_max_single_part_size(size_t size) { max_single_part_size_ = size; } void set_max_single_part_size(size_t size) { max_single_part_size_ = size; }
async_simple::Future<async_simple::Unit> start_timer( struct timer_guard {
std::chrono::steady_clock::duration duration, std::string msg) { timer_guard(coro_http_client *self,
is_timeout_ = false; std::chrono::steady_clock::duration duration, std::string msg)
: self(self) {
self->socket_->is_timeout_ = false;
async_simple::Promise<async_simple::Unit> promise; if (self->enable_timeout_) {
auto fut = promise.getFuture(); self->timeout(self->timer_, duration, std::move(msg))
.start([](auto &&) {
if (enable_timeout_) { });
timeout(timer_, std::move(promise), duration, std::move(msg)) }
.via(&executor_wrapper_) return;
.detach();
} }
else { ~timer_guard() {
promise.setValue(async_simple::Unit{}); if (self->enable_timeout_ && self->socket_->is_timeout_ == false) {
std::error_code ignore_ec;
self->timer_.cancel(ignore_ec);
}
} }
return fut; coro_http_client *self;
} };
async_simple::coro::Lazy<std::error_code> wait_future(
async_simple::Future<async_simple::Unit> &&future) {
if (!enable_timeout_) {
co_return std::error_code{};
}
std::error_code err_code;
timer_.cancel(err_code);
co_await std::move(future);
if (is_timeout_) {
co_return std::make_error_code(std::errc::timed_out);
}
co_return std::error_code{};
}
async_simple::coro::Lazy<resp_data> async_upload_multipart(std::string uri) { async_simple::coro::Lazy<resp_data> async_upload_multipart(std::string uri) {
std::shared_ptr<int> guard(nullptr, [this](auto) { std::shared_ptr<int> guard(nullptr, [this](auto) {
@ -684,18 +674,21 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
size_t size = 0; size_t size = 0;
if (socket_->has_closed_) { if (socket_->has_closed_) {
auto future = start_timer(conn_timeout_duration_, "connect timer"); {
auto time_out_guard =
data = co_await connect(u); timer_guard(this, conn_timeout_duration_, "connect timer");
if (ec = co_await wait_future(std::move(future)); ec) { data = co_await connect(u);
co_return resp_data{ec, 404}; }
if (socket_->is_timeout_) {
co_return resp_data{std::make_error_code(std::errc::timed_out), 404};
} }
if (data.net_err) { if (data.net_err) {
co_return data; co_return data;
} }
} }
auto future = start_timer(req_timeout_duration_, "upload timer"); auto time_out_guard =
timer_guard(this, conn_timeout_duration_, "request timer");
std::tie(ec, size) = co_await async_write(asio::buffer(header_str)); std::tie(ec, size) = co_await async_write(asio::buffer(header_str));
#ifdef INJECT_FOR_HTTP_CLIENT_TEST #ifdef INJECT_FOR_HTTP_CLIENT_TEST
if (inject_write_failed == ClientInjectAction::write_failed) { if (inject_write_failed == ClientInjectAction::write_failed) {
@ -714,7 +707,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
data = co_await send_single_part(key, part); data = co_await send_single_part(key, part);
if (data.net_err) { if (data.net_err) {
if (data.net_err == asio::error::operation_aborted) { if (socket_->is_timeout_) {
data.net_err = std::make_error_code(std::errc::timed_out); data.net_err = std::make_error_code(std::errc::timed_out);
} }
co_return data; co_return data;
@ -725,16 +718,18 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
last_part.append("--").append(BOUNDARY).append("--").append(CRCF); last_part.append("--").append(BOUNDARY).append("--").append(CRCF);
if (std::tie(ec, size) = co_await async_write(asio::buffer(last_part)); if (std::tie(ec, size) = co_await async_write(asio::buffer(last_part));
ec) { ec) {
if (socket_->is_timeout_) {
ec = std::make_error_code(std::errc::timed_out);
}
co_return resp_data{ec, 404}; co_return resp_data{ec, 404};
} }
bool is_keep_alive = true; bool is_keep_alive = true;
data = co_await handle_read(ec, size, is_keep_alive, std::move(ctx), data = co_await handle_read(ec, size, is_keep_alive, std::move(ctx),
http_method::POST); http_method::POST);
if (auto errc = co_await wait_future(std::move(future)); errc) { if (socket_->is_timeout_) {
ec = errc; ec = std::make_error_code(std::errc::timed_out);
} }
handle_result(data, ec, is_keep_alive); handle_result(data, ec, is_keep_alive);
co_return data; co_return data;
} }
@ -880,20 +875,25 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
size_t size = 0; size_t size = 0;
if (socket_->has_closed_) { if (socket_->has_closed_) {
auto future = start_timer(conn_timeout_duration_, "connect timer"); {
auto guard = timer_guard(this, conn_timeout_duration_, "connect timer");
data = co_await connect(u); data = co_await connect(u);
if (ec = co_await wait_future(std::move(future)); ec) { }
co_return resp_data{ec, 404}; if (socket_->is_timeout_) {
co_return resp_data{std::make_error_code(std::errc::timed_out), 404};
} }
if (data.net_err) { if (data.net_err) {
co_return data; co_return data;
} }
} }
auto future = start_timer(req_timeout_duration_, "upload timer"); auto time_guard =
timer_guard(this, conn_timeout_duration_, "request timer");
std::tie(ec, size) = co_await async_write(asio::buffer(header_str)); std::tie(ec, size) = co_await async_write(asio::buffer(header_str));
if (ec) { if (ec) {
if (socket_->is_timeout_) {
ec = std::make_error_code(std::errc::timed_out);
}
co_return resp_data{ec, 404}; co_return resp_data{ec, 404};
} }
@ -945,19 +945,19 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
} }
} }
} }
if (ec) {
if (ec && ec == asio::error::operation_aborted) { if (socket_->is_timeout_) {
ec = std::make_error_code(std::errc::timed_out); ec = std::make_error_code(std::errc::timed_out);
}
co_return resp_data{ec, 404}; co_return resp_data{ec, 404};
} }
bool is_keep_alive = true; bool is_keep_alive = true;
data = co_await handle_read(ec, size, is_keep_alive, std::move(ctx), data = co_await handle_read(ec, size, is_keep_alive, std::move(ctx),
http_method::POST); http_method::POST);
if (auto errc = co_await wait_future(std::move(future)); errc) { if (ec && socket_->is_timeout_) {
ec = errc; ec = std::make_error_code(std::errc::timed_out);
} }
handle_result(data, ec, is_keep_alive); handle_result(data, ec, is_keep_alive);
co_return data; co_return data;
} }
@ -1020,15 +1020,20 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
u.path = uri; u.path = uri;
} }
if (socket_->has_closed_) { if (socket_->has_closed_) {
auto conn_future = start_timer(conn_timeout_duration_, "connect timer");
host_ = proxy_host_.empty() ? u.get_host() : proxy_host_; host_ = proxy_host_.empty() ? u.get_host() : proxy_host_;
port_ = proxy_port_.empty() ? u.get_port() : proxy_port_; port_ = proxy_port_.empty() ? u.get_port() : proxy_port_;
auto guard = timer_guard(this, conn_timeout_duration_, "connect timer");
if (ec = co_await coro_io::async_connect(&executor_wrapper_, if (ec = co_await coro_io::async_connect(&executor_wrapper_,
socket_->impl_, host_, port_); socket_->impl_, host_, port_);
ec) { ec) {
break; break;
} }
if (socket_->is_timeout_) {
data.net_err = std::make_error_code(std::errc::timed_out);
co_return data;
}
if (enable_tcp_no_delay_) { if (enable_tcp_no_delay_) {
socket_->impl_.set_option(asio::ip::tcp::no_delay(true), ec); socket_->impl_.set_option(asio::ip::tcp::no_delay(true), ec);
if (ec) { if (ec) {
@ -1059,9 +1064,6 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
} }
} }
socket_->has_closed_ = false; socket_->has_closed_ = false;
if (ec = co_await wait_future(std::move(conn_future)); ec) {
break;
}
} }
std::vector<asio::const_buffer> vec; std::vector<asio::const_buffer> vec;
@ -1080,7 +1082,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
#ifdef CORO_HTTP_PRINT_REQ_HEAD #ifdef CORO_HTTP_PRINT_REQ_HEAD
CINATRA_LOG_DEBUG << req_head_str; CINATRA_LOG_DEBUG << req_head_str;
#endif #endif
auto future = start_timer(req_timeout_duration_, "request timer"); auto guard = timer_guard(this, req_timeout_duration_, "request timer");
if (has_body) { if (has_body) {
std::tie(ec, size) = co_await async_write(vec); std::tie(ec, size) = co_await async_write(vec);
} }
@ -1090,14 +1092,12 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
if (ec) { if (ec) {
break; break;
} }
data = data =
co_await handle_read(ec, size, is_keep_alive, std::move(ctx), method); co_await handle_read(ec, size, is_keep_alive, std::move(ctx), method);
if (auto errc = co_await wait_future(std::move(future)); errc) {
ec = errc;
}
} while (0); } while (0);
if (ec && socket_->is_timeout_) {
ec = std::make_error_code(std::errc::timed_out);
}
handle_result(data, ec, is_keep_alive); handle_result(data, ec, is_keep_alive);
co_return data; co_return data;
} }
@ -1179,6 +1179,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
struct socket_t { struct socket_t {
asio::ip::tcp::socket impl_; asio::ip::tcp::socket impl_;
std::atomic<bool> has_closed_ = true; std::atomic<bool> has_closed_ = true;
bool is_timeout_ = false;
asio::streambuf head_buf_; asio::streambuf head_buf_;
asio::streambuf chunked_buf_; asio::streambuf chunked_buf_;
#ifdef CINATRA_ENABLE_SSL #ifdef CINATRA_ENABLE_SSL
@ -1665,6 +1666,11 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
co_return resp_data{ec, 404}; co_return resp_data{ec, 404};
} }
if (socket_->is_timeout_) {
auto ec = std::make_error_code(std::errc::timed_out);
co_return resp_data{ec, 404};
}
if (enable_tcp_no_delay_) { if (enable_tcp_no_delay_) {
std::error_code ec; std::error_code ec;
socket_->impl_.set_option(asio::ip::tcp::no_delay(true), ec); socket_->impl_.set_option(asio::ip::tcp::no_delay(true), ec);
@ -1960,17 +1966,19 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
} }
async_simple::coro::Lazy<bool> timeout( async_simple::coro::Lazy<bool> timeout(
auto &timer, auto promise, std::chrono::steady_clock::duration duration, auto &timer, std::chrono::steady_clock::duration duration,
std::string msg) { std::string msg) {
auto watcher = std::weak_ptr(socket_);
timer.expires_after(duration); timer.expires_after(duration);
is_timeout_ = co_await timer.async_await(); auto is_timeout = co_await timer.async_await();
if (!is_timeout_) { if (!is_timeout) {
promise.setValue(async_simple::Unit());
co_return false; co_return false;
} }
CINATRA_LOG_WARNING << msg << " timeout"; if (auto socket = watcher.lock(); socket) {
close_socket(*socket_); socket_->is_timeout_ = true;
promise.setValue(async_simple::Unit()); CINATRA_LOG_WARNING << msg << " timeout";
close_socket(*socket_);
}
co_return true; co_return true;
} }
@ -2025,8 +2033,6 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
#endif #endif
std::string redirect_uri_; std::string redirect_uri_;
bool enable_follow_redirect_ = false; bool enable_follow_redirect_ = false;
bool is_timeout_ = false;
bool enable_timeout_ = false; bool enable_timeout_ = false;
std::chrono::steady_clock::duration conn_timeout_duration_ = std::chrono::steady_clock::duration conn_timeout_duration_ =
std::chrono::seconds(8); std::chrono::seconds(8);