[coro_io] fix client pool slow connect bug (#657)

This commit is contained in:
saipubw 2024-04-10 18:10:56 +08:00 committed by GitHub
parent 31b6e46971
commit ab0fb6b4b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 234 additions and 223 deletions

View File

@ -36,12 +36,14 @@
#include <random> #include <random>
#include <shared_mutex> #include <shared_mutex>
#include <string_view> #include <string_view>
#include <system_error>
#include <thread> #include <thread>
#include <type_traits> #include <type_traits>
#include <unordered_map> #include <unordered_map>
#include <utility> #include <utility>
#include <ylt/util/expected.hpp> #include <ylt/util/expected.hpp>
#include "async_simple/Common.h"
#include "async_simple/coro/Collect.h" #include "async_simple/coro/Collect.h"
#include "coro_io.hpp" #include "coro_io.hpp"
#include "detail/client_queue.hpp" #include "detail/client_queue.hpp"
@ -105,46 +107,14 @@ class client_pool : public std::enable_shared_from_this<
co_return; co_return;
} }
struct client_connect_helper { static auto rand_time(std::chrono::milliseconds ms) {
std::unique_ptr<client_t> client; static thread_local std::default_random_engine r;
std::weak_ptr<client_pool> pool_watcher; std::uniform_real_distribution e(0.7f, 1.3f);
std::weak_ptr<bool> spinlock_watcher; return std::chrono::milliseconds{static_cast<long>(e(r) * ms.count())};
client_connect_helper(std::unique_ptr<client_t>&& client, }
std::weak_ptr<client_pool>&& pool_watcher,
std::weak_ptr<bool>&& spinlock_watcher)
: client(std::move(client)),
pool_watcher(std::move(pool_watcher)),
spinlock_watcher(std::move(spinlock_watcher)) {}
client_connect_helper(client_connect_helper&& o)
: client(std::move(o.client)),
pool_watcher(std::move(o.pool_watcher)),
spinlock_watcher(std::move(o.spinlock_watcher)) {}
client_connect_helper& operator=(client_connect_helper&& o) {
client = std::move(o.client);
pool_watcher = std::move(o.pool_watcher);
spinlock_watcher = std::move(o.spinlock_watcher);
return *this;
}
~client_connect_helper() {
if (client) {
if (auto pool = pool_watcher.lock(); pool) {
int cnt = 0;
while (spinlock_watcher.lock()) {
std::this_thread::yield();
++cnt;
if (cnt % 10000 == 0) {
ELOG_WARN << "spinlock of client{" << client.get() << "},host:{"
<< client->get_host() << ":" << client->get_port()
<< "}cost too much time, spin count: " << cnt;
}
}
pool->collect_free_client(std::move(client));
}
}
}
};
async_simple::coro::Lazy<void> reconnect(std::unique_ptr<client_t>& client) { async_simple::coro::Lazy<void> reconnect(std::unique_ptr<client_t>& client) {
using namespace std::chrono_literals;
for (unsigned int i = 0; i < pool_config_.connect_retry_count; ++i) { for (unsigned int i = 0; i < pool_config_.connect_retry_count; ++i) {
ELOG_DEBUG << "try to reconnect client{" << client.get() << "},host:{" ELOG_DEBUG << "try to reconnect client{" << client.get() << "},host:{"
<< client->get_host() << ":" << client->get_port() << client->get_host() << ":" << client->get_port()
@ -164,9 +134,10 @@ class client_pool : public std::enable_shared_from_this<
ELOG_DEBUG << "reconnect client{" << client.get() ELOG_DEBUG << "reconnect client{" << client.get()
<< "} failed. If client close:{" << client->has_closed() << "} failed. If client close:{" << client->has_closed()
<< "}"; << "}";
auto wait_time = pool_config_.reconnect_wait_time - cost_time; auto wait_time = rand_time(
(pool_config_.reconnect_wait_time * (i + 1) - cost_time) / 1ms * 1ms);
if (wait_time.count() > 0) if (wait_time.count() > 0)
co_await coro_io::sleep_for(wait_time); co_await coro_io::sleep_for(wait_time, &client->get_executor());
} }
ELOG_WARN << "reconnect client{" << client.get() << "},host:{" ELOG_WARN << "reconnect client{" << client.get() << "},host:{"
<< client->get_host() << ":" << client->get_port() << client->get_host() << ":" << client->get_port()
@ -174,137 +145,114 @@ class client_pool : public std::enable_shared_from_this<
client = nullptr; client = nullptr;
} }
async_simple::coro::Lazy<client_connect_helper> connect_client( struct promise_handler {
client_connect_helper helper) { std::atomic<bool> flag_ = false;
ELOG_DEBUG << "try to connect client{" << helper.client.get() async_simple::Promise<std::unique_ptr<client_t>> promise_;
};
async_simple::coro::Lazy<void> connect_client(
std::unique_ptr<client_t> client, std::weak_ptr<client_pool> watcher,
std::shared_ptr<promise_handler> handler) {
ELOG_DEBUG << "try to connect client{" << client.get()
<< "} to host:" << host_name_; << "} to host:" << host_name_;
auto result = co_await helper.client->connect(host_name_); auto result = co_await client->connect(host_name_);
std::shared_ptr<client_pool> self = watcher.lock();
if (!client_t::is_ok(result)) { if (!client_t::is_ok(result)) {
ELOG_DEBUG << "connect client{" << helper.client.get() << "} to failed. "; ELOG_DEBUG << "connect client{" << client.get() << "} to failed. ";
co_await reconnect(helper.client); if (self) {
co_await reconnect(client);
}
} }
if (helper.client) { if (client) {
ELOG_DEBUG << "connect client{" << helper.client.get() << "} successful!"; ELOG_DEBUG << "connect client{" << client.get() << "} successful!";
}
auto has_get_connect = handler->flag_.exchange(true);
if (!has_get_connect) {
handler->promise_.setValue(std::move(client));
}
else {
auto conn_lim = std::min<unsigned>(10u, pool_config_.max_connection);
if (self && free_clients_.size() < conn_lim && client) {
enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
}
} }
co_return std::move(helper);
}
auto rand_time() {
static thread_local std::default_random_engine r;
std::uniform_int_distribution<int> e(-25, 25);
return std::chrono::milliseconds{100 + e(r)};
} }
async_simple::coro::Lazy<std::unique_ptr<client_t>> get_client( async_simple::coro::Lazy<std::unique_ptr<client_t>> get_client(
const typename client_t::config& client_config) { const typename client_t::config& client_config) {
std::unique_ptr<client_t> client; std::unique_ptr<client_t> client;
free_clients_.try_dequeue(client); free_clients_.try_dequeue(client);
if (!client) { if (!client) {
short_connect_clients_.try_dequeue(client); short_connect_clients_.try_dequeue(client);
} }
assert(client == nullptr || !client->has_closed());
if (client == nullptr) { if (client == nullptr) {
client = std::make_unique<client_t>(*io_context_pool_.get_executor()); std::unique_ptr<client_t> cli;
if (!client->init_config(client_config)) { auto executor = io_context_pool_.get_executor();
ELOG_ERROR << "init client config{" << client.get() << "} failed."; client = std::make_unique<client_t>(*executor);
co_return nullptr; if (!client->init_config(client_config))
} AS_UNLIKELY {
auto spinlock = std::make_shared<bool>(false); ELOG_ERROR << "init client config failed.";
co_return nullptr;
}
auto client_ptr = client.get(); auto client_ptr = client.get();
auto result = co_await async_simple::coro::collectAny( auto handler = std::make_shared<promise_handler>();
connect_client(client_connect_helper{ connect_client(std::move(client), this->weak_from_this(), handler)
std::move(client), this->shared_from_this(), spinlock}), .start([](auto&&) {
coro_io::sleep_for(rand_time())); });
if (result.index() == 0) { // connect finish in 100ms auto timer = std::make_shared<coro_io::period_timer>(
co_return std::move(std::get<0>(result).value().client); executor->get_asio_executor());
} timer->expires_after(std::chrono::milliseconds{20});
else if (result.index() == 1) { // connect time cost more than 100ms timer->async_await().start([watcher = this->weak_from_this(), handler,
ELOG_DEBUG << "slow connection of client{" << client_ptr client_ptr, timer](auto&& res) {
<< "}, try to get free client from pool."; if (res.value() && !handler->flag_) {
std::unique_ptr<client_t> cli; if (auto self = watcher.lock(); self) {
if (short_connect_clients_.try_dequeue(cli) || ++self->promise_cnt_;
free_clients_.try_dequeue(cli)) { self->promise_queue_.enqueue(handler);
spinlock = nullptr; timer->expires_after(
ELOG_DEBUG << "get free client{" << cli.get() (std::max)(std::chrono::milliseconds{0},
<< "} from pool. skip wait client{" << client_ptr self->pool_config_.max_connection_time -
<< "} connect"; std::chrono::milliseconds{20}));
co_return std::move(cli); timer->async_await().start([handler = std::move(handler),
} client_ptr = client_ptr](auto&& res) {
else { auto has_get_connect = handler->flag_.exchange(true);
auto promise = std::make_unique< if (!has_get_connect) {
async_simple::Promise<std::unique_ptr<client_t>>>(); ELOG_ERROR << "Out of max limitation of connect "
auto* promise_address = promise.get(); "time, connect "
promise_queue.enqueue(promise_address); "failed. skip wait client{"
spinlock = nullptr; << client_ptr << "} connect. ";
if (short_connect_clients_.try_dequeue(cli) || handler->promise_.setValue(std::unique_ptr<client_t>{nullptr});
free_clients_.try_dequeue(cli)) { }
collect_free_client(std::move(cli)); });
}
ELOG_DEBUG << "wait for free client waiter promise{"
<< promise_address << "} response because slow client{"
<< client_ptr << "}";
auto res = co_await collectAny(
[](auto promise)
-> async_simple::coro::Lazy<std::unique_ptr<client_t>> {
co_return co_await promise->getFuture();
}(std::move(promise)),
coro_io::sleep_for(this->pool_config_.max_connection_time));
if (res.index() == 0) {
auto& res0 = std::get<0>(res);
if (!res0.hasError()) {
auto& cli = res0.value();
ELOG_DEBUG << "get free client{" << cli.get() << "} from promise{"
<< promise_address << "}. skip wait client{"
<< client_ptr << "} connect";
co_return std::move(cli);
}
else {
ELOG_ERROR << "Unexcepted branch";
co_return nullptr;
}
}
else {
ELOG_ERROR << "Unexcepted branch. Out of max limitation of connect "
"time, connect "
"failed. skip wait client{"
<< client_ptr << "} connect. "
<< "skip wait promise {" << promise_address
<< "} response";
co_return nullptr;
} }
} }
} });
else { ELOG_DEBUG << "wait client by promise {" << &handler->promise_ << "}";
ELOG_ERROR << "unknown collectAny index while wait client{" client = co_await handler->promise_.getFuture();
<< client_ptr << "} connect"; if (client) {
co_return nullptr; executor->schedule([timer] {
std::error_code ignore_ec;
timer->cancel(ignore_ec);
});
} }
} }
else { else {
ELOG_DEBUG << "get free client{" << client.get() << "}. from queue"; ELOG_DEBUG << "get free client{" << client.get() << "}. from queue";
co_return std::move(client);
} }
co_return std::move(client);
} }
void enqueue( void enqueue(
coro_io::detail::client_queue<std::unique_ptr<client_t>>& clients, coro_io::detail::client_queue<std::unique_ptr<client_t>>& clients,
std::unique_ptr<client_t> client, bool is_short_client) { std::unique_ptr<client_t> client,
std::chrono::milliseconds collect_time) {
if (clients.enqueue(std::move(client)) == 1) { if (clients.enqueue(std::move(client)) == 1) {
std::size_t expected = 0; std::size_t expected = 0;
if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) { if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) {
ELOG_DEBUG << "start timeout client collecter of client_pool{" ELOG_DEBUG << "start timeout client collecter of client_pool{"
<< host_name_ << "}"; << host_name_ << "}";
collect_idle_timeout_client( collect_idle_timeout_client(
this->shared_from_this(), clients, this->weak_from_this(), clients,
(std::max)( (std::max)(collect_time, std::chrono::milliseconds{50}),
(is_short_client
? (std::min)(pool_config_.idle_timeout,
pool_config_.short_connect_idle_timeout)
: pool_config_.idle_timeout),
std::chrono::milliseconds{50}),
pool_config_.idle_queue_per_max_clear_count) pool_config_.idle_queue_per_max_clear_count)
.via(coro_io::get_global_executor()) .via(coro_io::get_global_executor())
.start([](auto&&) { .start([](auto&&) {
@ -314,28 +262,41 @@ class client_pool : public std::enable_shared_from_this<
} }
void collect_free_client(std::unique_ptr<client_t> client) { void collect_free_client(std::unique_ptr<client_t> client) {
ELOG_DEBUG << "collect free client{" << client.get() << "}"; if (!client->has_closed()) {
if (client && !client->has_closed()) { std::shared_ptr<promise_handler> handler;
async_simple::Promise<std::unique_ptr<client_t>>* promise = nullptr; if (promise_cnt_) {
if (promise_queue.try_dequeue(promise)) { int cnt = 0;
promise->setValue(std::move(client)); while (promise_queue_.try_dequeue(handler)) {
ELOG_DEBUG << "collect free client{" << client.get() ++cnt;
<< "} wake up promise{" << promise << "}"; auto has_get_connect = handler->flag_.exchange(true);
if (!has_get_connect) {
handler->promise_.setValue(std::move(client));
promise_cnt_ -= cnt;
ELOG_DEBUG << "collect free client{" << client.get()
<< "} and wake up promise{" << &handler->promise_ << "}";
return;
}
}
promise_cnt_ -= cnt;
} }
else if (free_clients_.size() < pool_config_.max_connection) {
ELOG_DEBUG << "collect free client{" << client.get() << "} enqueue"; if (free_clients_.size() < pool_config_.max_connection) {
enqueue(free_clients_, std::move(client), false); if (client) {
ELOG_DEBUG << "collect free client{" << client.get() << "} enqueue";
enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
}
} }
else { else {
ELOG_DEBUG << "out of max connection limit <<" ELOG_DEBUG << "out of max connection limit <<"
<< pool_config_.max_connection << ", collect free client{" << pool_config_.max_connection << ", collect free client{"
<< client.get() << "} enqueue short connect queue"; << client.get() << "} enqueue short connect queue";
enqueue(short_connect_clients_, std::move(client), true); enqueue(short_connect_clients_, std::move(client),
pool_config_.short_connect_idle_timeout);
} }
} }
else { else {
ELOG_DEBUG << "client{" << client.get() ELOG_DEBUG << "client{" << client.get()
<< "} is nullptr or is closed. we won't collect it"; << "} is closed. we won't collect it";
} }
return; return;
@ -489,8 +450,8 @@ class client_pool : public std::enable_shared_from_this<
coro_io::detail::client_queue<std::unique_ptr<client_t>> coro_io::detail::client_queue<std::unique_ptr<client_t>>
short_connect_clients_; short_connect_clients_;
client_pools_t* pools_manager_ = nullptr; client_pools_t* pools_manager_ = nullptr;
moodycamel::ConcurrentQueue<async_simple::Promise<std::unique_ptr<client_t>>*> std::atomic<int> promise_cnt_ = 0;
promise_queue; moodycamel::ConcurrentQueue<std::shared_ptr<promise_handler>> promise_queue_;
async_simple::Promise<async_simple::Unit> idle_timeout_waiter; async_simple::Promise<async_simple::Unit> idle_timeout_waiter;
std::string host_name_; std::string host_name_;
pool_config pool_config_; pool_config pool_config_;
@ -558,14 +519,6 @@ class client_pools {
iter->second = pool; iter->second = pool;
} }
} }
if (has_inserted) {
ELOG_DEBUG << "add new client pool of {" << host_name
<< "} to hash table";
}
else {
ELOG_DEBUG << "add new client pool of {" << host_name
<< "} failed, element existed.";
}
} }
return iter->second; return iter->second;
} }

View File

@ -129,9 +129,8 @@ class coro_rpc_client {
*/ */
coro_rpc_client(asio::io_context::executor_type executor, coro_rpc_client(asio::io_context::executor_type executor,
uint32_t client_id = 0) uint32_t client_id = 0)
: executor(executor), : control_(std::make_shared<control_t>(executor, false)),
timer_(executor), timer_(executor) {
socket_(std::make_shared<asio::ip::tcp::socket>(executor)) {
config_.client_id = client_id; config_.client_id = client_id;
} }
@ -142,10 +141,9 @@ class coro_rpc_client {
coro_rpc_client( coro_rpc_client(
coro_io::ExecutorWrapper<> &executor = *coro_io::get_global_executor(), coro_io::ExecutorWrapper<> &executor = *coro_io::get_global_executor(),
uint32_t client_id = 0) uint32_t client_id = 0)
: executor(executor.get_asio_executor()), : control_(
timer_(executor.get_asio_executor()), std::make_shared<control_t>(executor.get_asio_executor(), false)),
socket_(std::make_shared<asio::ip::tcp::socket>( timer_(executor.get_asio_executor()) {
executor.get_asio_executor())) {
config_.client_id = client_id; config_.client_id = client_id;
} }
@ -317,7 +315,7 @@ class coro_rpc_client {
} }
else { else {
#endif #endif
ret = co_await call_impl<func>(*socket_, std::move(args)...); ret = co_await call_impl<func>(control_->socket_, std::move(args)...);
#ifdef YLT_ENABLE_SSL #ifdef YLT_ENABLE_SSL
} }
#endif #endif
@ -325,7 +323,7 @@ class coro_rpc_client {
std::error_code err_code; std::error_code err_code;
timer_.cancel(err_code); timer_.cancel(err_code);
if (is_timeout_) { if (control_->is_timeout_) {
ret = rpc_result<R, coro_rpc_protocol>{ ret = rpc_result<R, coro_rpc_protocol>{
unexpect_t{}, rpc_error{errc::timed_out, "rpc call timed out"}}; unexpect_t{}, rpc_error{errc::timed_out, "rpc call timed out"}};
} }
@ -340,7 +338,7 @@ class coro_rpc_client {
/*! /*!
* Get inner executor * Get inner executor
*/ */
auto &get_executor() { return executor; } auto &get_executor() { return control_->executor_; }
uint32_t get_client_id() const { return config_.client_id; } uint32_t get_client_id() const { return config_.client_id; }
@ -350,7 +348,7 @@ class coro_rpc_client {
} }
has_closed_ = true; has_closed_ = true;
ELOGV(INFO, "client_id %d close", config_.client_id); ELOGV(INFO, "client_id %d close", config_.client_id);
close_socket(socket_); close_socket(control_);
} }
bool set_req_attachment(std::string_view attachment) { bool set_req_attachment(std::string_view attachment) {
@ -379,10 +377,10 @@ class coro_rpc_client {
}; };
void reset() { void reset() {
close_socket(socket_); close_socket(control_);
socket_ = control_->socket_ =
std::make_shared<asio::ip::tcp::socket>(executor.get_asio_executor()); asio::ip::tcp::socket(control_->executor_.get_asio_executor());
is_timeout_ = false; control_->is_timeout_ = false;
has_closed_ = false; has_closed_ = false;
} }
static bool is_ok(coro_rpc::err_code ec) noexcept { return !ec; } static bool is_ok(coro_rpc::err_code ec) noexcept { return !ec; }
@ -411,23 +409,23 @@ class coro_rpc_client {
}); });
std::error_code ec = co_await coro_io::async_connect( std::error_code ec = co_await coro_io::async_connect(
&executor, *socket_, config_.host, config_.port); &control_->executor_, control_->socket_, config_.host, config_.port);
std::error_code err_code; std::error_code err_code;
timer_.cancel(err_code); timer_.cancel(err_code);
if (ec) { if (ec) {
if (is_timeout_) { if (control_->is_timeout_) {
co_return errc::timed_out; co_return errc::timed_out;
} }
co_return errc::not_connected; co_return errc::not_connected;
} }
if (is_timeout_) { if (control_->is_timeout_) {
ELOGV(WARN, "client_id %d connect timeout", config_.client_id); ELOGV(WARN, "client_id %d connect timeout", config_.client_id);
co_return errc::timed_out; co_return errc::timed_out;
} }
socket_->set_option(asio::ip::tcp::no_delay(true), ec); control_->socket_.set_option(asio::ip::tcp::no_delay(true), ec);
#ifdef YLT_ENABLE_SSL #ifdef YLT_ENABLE_SSL
if (!config_.ssl_cert_path.empty()) { if (!config_.ssl_cert_path.empty()) {
@ -465,7 +463,7 @@ class coro_rpc_client {
asio::ssl::host_name_verification(config_.ssl_domain)); asio::ssl::host_name_verification(config_.ssl_domain));
ssl_stream_ = ssl_stream_ =
std::make_unique<asio::ssl::stream<asio::ip::tcp::socket &>>( std::make_unique<asio::ssl::stream<asio::ip::tcp::socket &>>(
*socket_, ssl_ctx_); control_->socket_, ssl_ctx_);
ssl_init_ret_ = true; ssl_init_ret_ = true;
} catch (std::exception &e) { } catch (std::exception &e) {
ELOGV(ERROR, "init ssl failed: %s", e.what()); ELOGV(ERROR, "init ssl failed: %s", e.what());
@ -475,14 +473,17 @@ class coro_rpc_client {
#endif #endif
async_simple::coro::Lazy<bool> timeout(auto duration, std::string err_msg) { async_simple::coro::Lazy<bool> timeout(auto duration, std::string err_msg) {
timer_.expires_after(duration); timer_.expires_after(duration);
std::weak_ptr socket_watcher = control_;
bool is_timeout = co_await timer_.async_await(); bool is_timeout = co_await timer_.async_await();
if (!is_timeout) { if (!is_timeout) {
co_return false; co_return false;
} }
if (auto self = socket_watcher.lock()) {
is_timeout_ = is_timeout; self->is_timeout_ = is_timeout;
close_socket(socket_); close_socket(self);
co_return true; co_return true;
}
co_return false;
} }
template <auto func, typename... Args> template <auto func, typename... Args>
@ -581,7 +582,7 @@ class coro_rpc_client {
ret = co_await coro_io::async_write( ret = co_await coro_io::async_write(
socket, asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN)); socket, asio::buffer(buffer.data(), coro_rpc_protocol::REQ_HEAD_LEN));
ELOGV(INFO, "client_id %d shutdown", config_.client_id); ELOGV(INFO, "client_id %d shutdown", config_.client_id);
socket_->shutdown(asio::ip::tcp::socket::shutdown_send); control_->socket_.shutdown(asio::ip::tcp::socket::shutdown_send);
r = rpc_result<R, coro_rpc_protocol>{ r = rpc_result<R, coro_rpc_protocol>{
unexpect_t{}, rpc_error{errc::io_error, ret.first.message()}}; unexpect_t{}, rpc_error{errc::io_error, ret.first.message()}};
co_return r; co_return r;
@ -656,10 +657,10 @@ class coro_rpc_client {
} }
#ifdef UNIT_TEST_INJECT #ifdef UNIT_TEST_INJECT
if (g_action == inject_action::force_inject_client_write_data_timeout) { if (g_action == inject_action::force_inject_client_write_data_timeout) {
is_timeout_ = true; control_->is_timeout_ = true;
} }
#endif #endif
if (is_timeout_) { if (control_->is_timeout_) {
r = rpc_result<R, coro_rpc_protocol>{ r = rpc_result<R, coro_rpc_protocol>{
unexpect_t{}, rpc_error{.code = errc::timed_out, .msg = {}}}; unexpect_t{}, rpc_error{.code = errc::timed_out, .msg = {}}};
} }
@ -789,13 +790,22 @@ class coro_rpc_client {
offset, std::forward<Args>(args)...); offset, std::forward<Args>(args)...);
} }
void close_socket(std::shared_ptr<asio::ip::tcp::socket> socket) { struct control_t {
asio::dispatch( asio::ip::tcp::socket socket_;
executor.get_asio_executor(), [socket = std::move(socket)]() { bool is_timeout_;
asio::error_code ignored_ec; coro_io::ExecutorWrapper<> executor_;
socket->shutdown(asio::ip::tcp::socket::shutdown_both, ignored_ec); control_t(asio::io_context::executor_type executor, bool is_timeout)
socket->close(ignored_ec); : socket_(executor), is_timeout_(is_timeout), executor_(executor) {}
}); };
static void close_socket(
std::shared_ptr<coro_rpc_client::control_t> control) {
control->executor_.schedule([control = std::move(control)]() {
asio::error_code ignored_ec;
control->socket_.shutdown(asio::ip::tcp::socket::shutdown_both,
ignored_ec);
control->socket_.close(ignored_ec);
});
} }
#ifdef UNIT_TEST_INJECT #ifdef UNIT_TEST_INJECT
@ -812,10 +822,10 @@ class coro_rpc_client {
call<func>(std::forward<Args>(args)...)); call<func>(std::forward<Args>(args)...));
} }
#endif #endif
private: private:
coro_io::ExecutorWrapper<> executor;
coro_io::period_timer timer_; coro_io::period_timer timer_;
std::shared_ptr<asio::ip::tcp::socket> socket_; std::shared_ptr<control_t> control_;
std::string read_buf_, resp_attachment_buf_; std::string read_buf_, resp_attachment_buf_;
std::string_view req_attachment_; std::string_view req_attachment_;
config config_; config config_;
@ -825,7 +835,6 @@ class coro_rpc_client {
std::unique_ptr<asio::ssl::stream<asio::ip::tcp::socket &>> ssl_stream_; std::unique_ptr<asio::ssl::stream<asio::ip::tcp::socket &>> ssl_stream_;
bool ssl_init_ret_ = true; bool ssl_init_ret_ = true;
#endif #endif
bool is_timeout_ = false;
std::atomic<bool> has_closed_ = false; std::atomic<bool> has_closed_ = false;
}; };
} // namespace coro_rpc } // namespace coro_rpc

View File

@ -197,6 +197,8 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}); });
} }
coro_io::ExecutorWrapper<> &get_executor() { return executor_wrapper_; }
#ifdef CINATRA_ENABLE_SSL #ifdef CINATRA_ENABLE_SSL
bool init_ssl(int verify_mode, const std::string &base_path, bool init_ssl(int verify_mode, const std::string &base_path,
const std::string &cert_file, const std::string &sni_hostname) { const std::string &cert_file, const std::string &sni_hostname) {

View File

@ -184,7 +184,7 @@ TEST_CASE("test reconnect retry wait time exclude reconnect cost time") {
CHECK(pool->free_client_count() == 100); CHECK(pool->free_client_count() == 100);
auto dur = std::chrono::steady_clock::now() - tp; auto dur = std::chrono::steady_clock::now() - tp;
std::cout << dur.count() << std::endl; std::cout << dur.count() << std::endl;
CHECK((dur >= 500ms && dur <= 800ms)); CHECK((dur >= 400ms && dur <= 800ms));
server.stop(); server.stop();
co_return; co_return;
}()); }());

View File

@ -29,6 +29,8 @@
#include <ylt/coro_io/client_pool.hpp> #include <ylt/coro_io/client_pool.hpp>
#include <ylt/coro_io/coro_io.hpp> #include <ylt/coro_io/coro_io.hpp>
#include <ylt/coro_rpc/coro_rpc_client.hpp> #include <ylt/coro_rpc/coro_rpc_client.hpp>
#include "ylt/easylog.hpp"
using namespace coro_rpc; using namespace coro_rpc;
using namespace async_simple::coro; using namespace async_simple::coro;
using namespace std::string_view_literals; using namespace std::string_view_literals;
@ -51,18 +53,18 @@ Lazy<void> call_echo(std::shared_ptr<coro_io::channel<coro_rpc_client>> channel,
[](coro_rpc_client &client, std::string_view hostname) -> Lazy<void> { [](coro_rpc_client &client, std::string_view hostname) -> Lazy<void> {
auto res = co_await client.call<echo>("Hello world!"); auto res = co_await client.call<echo>("Hello world!");
if (!res.has_value()) { if (!res.has_value()) {
std::cout << "coro_rpc err: \n" << res.error().msg; ELOG_ERROR << "coro_rpc err: \n" << res.error().msg;
co_return; co_return;
} }
if (res.value() != "Hello world!"sv) { if (res.value() != "Hello world!"sv) {
std::cout << "err echo resp: \n" << res.value(); ELOG_ERROR << "err echo resp: \n" << res.value();
co_return; co_return;
} }
++qps; ++qps;
co_return; co_return;
}); });
if (!res) { if (!res) {
std::cout << "client pool err: connect failed.\n"; ELOG_ERROR << "client pool err: connect failed.\n";
} }
} }
--working_echo; --working_echo;

View File

@ -18,12 +18,20 @@
#include <chrono> #include <chrono>
#include <climits> #include <climits>
#include <cstdlib> #include <cstdlib>
#include <locale>
#include <memory> #include <memory>
#include <thread> #include <thread>
#include <ylt/coro_io/client_pool.hpp> #include <ylt/coro_io/client_pool.hpp>
#include <ylt/coro_io/coro_io.hpp> #include <ylt/coro_io/coro_io.hpp>
#include <ylt/coro_rpc/coro_rpc_client.hpp> #include <ylt/coro_rpc/coro_rpc_client.hpp>
#include "async_simple/coro/Collect.h"
#include "async_simple/coro/Lazy.h"
#include "async_simple/coro/SyncAwait.h"
#include "ylt/coro_io/io_context_pool.hpp"
#include "ylt/easylog.hpp"
std::string echo(std::string_view sv); std::string echo(std::string_view sv);
using namespace coro_rpc; using namespace coro_rpc;
using namespace async_simple::coro; using namespace async_simple::coro;
using namespace std::string_view_literals; using namespace std::string_view_literals;
@ -31,34 +39,52 @@ using namespace std::string_view_literals;
std::atomic<uint64_t> qps = 0; std::atomic<uint64_t> qps = 0;
std::atomic<uint64_t> working_echo = 0; std::atomic<uint64_t> working_echo = 0;
std::atomic<uint64_t> busy_echo = 0;
struct guard {
guard(std::atomic<uint64_t> &ref) : ref(ref) { ++ref; }
~guard() { --ref; }
std::atomic<uint64_t> &ref;
};
/*! /*!
* \example helloworld/concurrency_clients.main.cpp * \example helloworld/concurrency_clients.main.cpp
* \brief demo for run concurrency clients * \brief demo for run concurrency clients
*/ */
Lazy<void> call_echo(coro_io::client_pool<coro_rpc_client> &client_pool, Lazy<std::vector<std::chrono::microseconds>> call_echo(
int cnt) { coro_io::client_pool<coro_rpc_client> &client_pool, int cnt) {
std::vector<std::chrono::microseconds> result;
result.reserve(cnt);
++working_echo; ++working_echo;
for (int i = 0; i < cnt; ++i) { for (int i = 0; i < cnt; ++i) {
auto tp = std::chrono::steady_clock::now();
auto res = co_await client_pool.send_request( auto res = co_await client_pool.send_request(
[=](coro_rpc_client &client) -> Lazy<void> { [=](coro_rpc_client &client) -> Lazy<void> {
guard g{busy_echo};
if (client.has_closed()) {
co_return;
}
auto res = co_await client.call<echo>("Hello world!"); auto res = co_await client.call<echo>("Hello world!");
if (!res.has_value()) { if (!res.has_value()) {
std::cout << "coro_rpc err: \n" << res.error().msg; ELOG_ERROR << "coro_rpc err: \n" << res.error().msg;
client.close();
co_return; co_return;
} }
if (res.value() != "Hello world!"sv) { if (res.value() != "Hello world!"sv) {
std::cout << "err echo resp: \n" << res.value(); ELOG_ERROR << "err echo resp: \n" << res.value();
co_return; co_return;
} }
++qps; ++qps;
co_return; co_return;
}); });
if (!res) { if (!res) {
std::cout << "client pool err: connect failed.\n"; ELOG_ERROR << "client pool err: connect failed.\n";
}
else {
result.push_back(std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - tp));
} }
} }
--working_echo; co_return std::move(result);
} }
Lazy<void> qps_watcher(coro_io::client_pool<coro_rpc_client> &clients) { Lazy<void> qps_watcher(coro_io::client_pool<coro_rpc_client> &clients) {
@ -68,11 +94,23 @@ Lazy<void> qps_watcher(coro_io::client_pool<coro_rpc_client> &clients) {
uint64_t cnt = qps.exchange(0); uint64_t cnt = qps.exchange(0);
std::cout << "QPS:" << cnt std::cout << "QPS:" << cnt
<< " free connection: " << clients.free_client_count() << " free connection: " << clients.free_client_count()
<< " working echo:" << working_echo << std::endl; << " working echo:" << working_echo << " busy echo:" << busy_echo
<< std::endl;
cnt = 0; cnt = 0;
} }
} }
std::vector<std::chrono::microseconds> result;
void latency_watcher() {
std::sort(result.begin(), result.end());
auto arr = {0.1, 0.3, 0.5, 0.7, 0.9, 0.95, 0.99, 0.999, 0.9999, 0.99999, 1.0};
for (auto e : arr) {
std::cout
<< (e * 100) << "% request finished in:"
<< result[std::max<std::size_t>(0, result.size() * e - 1)].count() /
1000.0
<< "ms" << std::endl;
}
}
int main() { int main() {
auto thread_cnt = std::thread::hardware_concurrency(); auto thread_cnt = std::thread::hardware_concurrency();
auto client_pool = coro_io::client_pool<coro_rpc_client>::create( auto client_pool = coro_io::client_pool<coro_rpc_client>::create(
@ -81,11 +119,17 @@ int main() {
.max_connection = thread_cnt * 20, .max_connection = thread_cnt * 20,
.client_config = {.timeout_duration = std::chrono::seconds{50}}}); .client_config = {.timeout_duration = std::chrono::seconds{50}}});
auto finish_executor = coro_io::get_global_block_executor();
for (int i = 0, lim = thread_cnt * 20; i < lim; ++i) { for (int i = 0, lim = thread_cnt * 20; i < lim; ++i) {
call_echo(*client_pool, 10000).start([](auto &&) { call_echo(*client_pool, 10000).start([finish_executor](auto &&res) {
finish_executor->schedule([res = std::move(res.value())] {
result.insert(result.end(), res.begin(), res.end());
--working_echo;
});
}); });
} }
syncAwait(qps_watcher(*client_pool)); syncAwait(qps_watcher(*client_pool));
latency_watcher();
std::cout << "Done!" << std::endl; std::cout << "Done!" << std::endl;
return 0; return 0;
} }

View File

@ -48,18 +48,18 @@ Lazy<void> call_echo(coro_io::client_pools<coro_rpc_client> &client_pools,
[=](coro_rpc_client &client) -> Lazy<void> { [=](coro_rpc_client &client) -> Lazy<void> {
auto res = co_await client.call<echo>("Hello world!"); auto res = co_await client.call<echo>("Hello world!");
if (!res.has_value()) { if (!res.has_value()) {
std::cout << "coro_rpc err: \n" << res.error().msg; ELOG_ERROR << "coro_rpc err: \n" << res.error().msg;
co_return; co_return;
} }
if (res.value() != "Hello world!"sv) { if (res.value() != "Hello world!"sv) {
std::cout << "err echo resp: \n" << res.value(); ELOG_ERROR << "err echo resp: \n" << res.value();
co_return; co_return;
} }
++qps; ++qps;
co_return; co_return;
}); });
if (!res) { if (!res) {
std::cout << "client pool err: connect failed.\n"; ELOG_ERROR << "client pool err: connect failed.\n";
} }
} }
--working_echo; --working_echo;

View File

@ -54,11 +54,11 @@ Lazy<void> call_echo(int cnt) {
for (int i = 0; i < cnt; ++i) { for (int i = 0; i < cnt; ++i) {
auto res = co_await client.call<echo>("Hello world!"); auto res = co_await client.call<echo>("Hello world!");
if (!res.has_value()) { if (!res.has_value()) {
std::cout << "coro_rpc err: \n" << res.error().msg; ELOG_ERROR << "coro_rpc err: \n" << res.error().msg;
co_return; co_return;
} }
if (res.value() != "Hello world!"sv) { if (res.value() != "Hello world!"sv) {
std::cout << "err echo resp: \n" << res.value(); ELOG_ERROR << "err echo resp: \n" << res.value();
co_return; co_return;
} }
++qps; ++qps;

View File

@ -72,9 +72,9 @@ TEST_CASE("testing client") {
std::string port = std::to_string(coro_rpc_server_port); std::string port = std::to_string(coro_rpc_server_port);
asio::io_context io_context; asio::io_context io_context;
std::promise<void> promise; std::promise<void> promise;
auto worker = std::make_unique<asio::io_context::work>(io_context);
auto future = promise.get_future(); auto future = promise.get_future();
std::thread thd([&io_context, &promise] { std::thread thd([&io_context, &promise] {
asio::io_context::work work(io_context);
promise.set_value(); promise.set_value();
io_context.run(); io_context.run();
}); });
@ -116,7 +116,7 @@ TEST_CASE("testing client") {
g_action = {}; g_action = {};
auto f = [&io_context, &port]() -> Lazy<void> { auto f = [&io_context, &port]() -> Lazy<void> {
auto client = co_await create_client(io_context, port); auto client = co_await create_client(io_context, port);
auto ret = co_await client->template call_for<hello_timeout>(20ms); auto ret = co_await client->template call_for<hello_timeout>(10ms);
CHECK_MESSAGE(ret.error().code == coro_rpc::errc::timed_out, CHECK_MESSAGE(ret.error().code == coro_rpc::errc::timed_out,
ret.error().msg); ret.error().msg);
co_return; co_return;
@ -154,7 +154,7 @@ TEST_CASE("testing client") {
} }
server.stop(); server.stop();
io_context.stop(); worker = nullptr;
thd.join(); thd.join();
} }
@ -163,8 +163,8 @@ TEST_CASE("testing client with inject server") {
std::string port = std::to_string(coro_rpc_server_port); std::string port = std::to_string(coro_rpc_server_port);
ELOGV(INFO, "inject server port: %d", port.data()); ELOGV(INFO, "inject server port: %d", port.data());
asio::io_context io_context; asio::io_context io_context;
auto worker = std::make_unique<asio::io_context::work>(io_context);
std::thread thd([&io_context] { std::thread thd([&io_context] {
asio::io_context::work work(io_context);
io_context.run(); io_context.run();
}); });
coro_rpc_server server(2, coro_rpc_server_port); coro_rpc_server server(2, coro_rpc_server_port);
@ -212,7 +212,7 @@ TEST_CASE("testing client with inject server") {
} }
server.stop(); server.stop();
io_context.stop(); worker = nullptr;
thd.join(); thd.join();
g_action = inject_action::nothing; g_action = inject_action::nothing;
} }
@ -245,15 +245,15 @@ class SSLClientTester {
std::promise<void> promise; std::promise<void> promise;
auto future = promise.get_future(); auto future = promise.get_future();
worker = std::make_unique<asio::io_context::work>(io_context);
thd = std::thread([this, &promise] { thd = std::thread([this, &promise] {
asio::io_context::work work(io_context);
promise.set_value(); promise.set_value();
io_context.run(); io_context.run();
}); });
future.wait(); future.wait();
} }
~SSLClientTester() { ~SSLClientTester() {
io_context.stop(); worker = nullptr;
thd.join(); thd.join();
} }
void inject(std::string msg, std::string& path, ssl_type type) { void inject(std::string msg, std::string& path, ssl_type type) {
@ -342,6 +342,7 @@ class SSLClientTester {
ssl_type dh; ssl_type dh;
asio::io_context io_context; asio::io_context io_context;
std::thread thd; std::thread thd;
std::unique_ptr<asio::io_context::work> worker;
}; };
TEST_CASE("testing client with ssl server") { TEST_CASE("testing client with ssl server") {