[coro_rpc] rpc client supports send_request without waiting for the response (#672)

Author: saipubw, 2024-05-17 17:59:22 +08:00 (committed by GitHub)
parent 6e684c01ef
commit a41f755d8e
16 changed files with 903 additions and 489 deletions

View File

@ -78,11 +78,11 @@ class client_pool : public std::enable_shared_from_this<
break;
}
while (true) {
ELOG_DEBUG << "start collect timeout client of pool{"
ELOG_TRACE << "start collect timeout client of pool{"
<< self->host_name_
<< "}, now client count: " << clients.size();
std::size_t is_all_cleared = clients.clear_old(clear_cnt);
ELOG_DEBUG << "finish collect timeout client of pool{"
ELOG_TRACE << "finish collect timeout client of pool{"
<< self->host_name_
<< "}, now client cnt: " << clients.size();
if (is_all_cleared != 0) [[unlikely]] {
@ -109,36 +109,42 @@ class client_pool : public std::enable_shared_from_this<
static auto rand_time(std::chrono::milliseconds ms) {
static thread_local std::default_random_engine r;
std::uniform_real_distribution e(0.7f, 1.3f);
std::uniform_real_distribution e(1.0f, 1.2f);
return std::chrono::milliseconds{static_cast<long>(e(r) * ms.count())};
}
async_simple::coro::Lazy<void> reconnect(std::unique_ptr<client_t>& client) {
static async_simple::coro::Lazy<void> reconnect(
std::unique_ptr<client_t>& client, std::weak_ptr<client_pool> watcher) {
using namespace std::chrono_literals;
for (unsigned int i = 0; i < pool_config_.connect_retry_count; ++i) {
ELOG_DEBUG << "try to reconnect client{" << client.get() << "},host:{"
std::shared_ptr<client_pool> self = watcher.lock();
uint32_t i = UINT32_MAX; // (at least connect once)
do {
ELOG_TRACE << "try to reconnect client{" << client.get() << "},host:{"
<< client->get_host() << ":" << client->get_port()
<< "}, try count:" << i
<< "max retry limit:" << pool_config_.connect_retry_count;
<< "}, try count:" << i << "max retry limit:"
<< self->pool_config_.connect_retry_count;
auto pre_time_point = std::chrono::steady_clock::now();
bool ok = client_t::is_ok(co_await client->reconnect(host_name_));
bool ok = client_t::is_ok(co_await client->connect(self->host_name_));
auto post_time_point = std::chrono::steady_clock::now();
auto cost_time = post_time_point - pre_time_point;
ELOG_DEBUG << "reconnect client{" << client.get()
ELOG_TRACE << "reconnect client{" << client.get()
<< "} cost time: " << cost_time / std::chrono::milliseconds{1}
<< "ms";
if (ok) {
ELOG_DEBUG << "reconnect client{" << client.get() << "} success";
ELOG_TRACE << "reconnect client{" << client.get() << "} success";
co_return;
}
ELOG_DEBUG << "reconnect client{" << client.get()
ELOG_TRACE << "reconnect client{" << client.get()
<< "} failed. If client close:{" << client->has_closed()
<< "}";
auto wait_time = rand_time(
(pool_config_.reconnect_wait_time * (i + 1) - cost_time) / 1ms * 1ms);
(self->pool_config_.reconnect_wait_time - cost_time) / 1ms * 1ms);
self = nullptr;
if (wait_time.count() > 0)
co_await coro_io::sleep_for(wait_time, &client->get_executor());
}
self = watcher.lock();
++i;
} while (i < self->pool_config_.connect_retry_count);
ELOG_WARN << "reconnect client{" << client.get() << "},host:{"
<< client->get_host() << ":" << client->get_port()
<< "} out of max limit, stop retry. connect failed";
@ -150,30 +156,23 @@ class client_pool : public std::enable_shared_from_this<
async_simple::Promise<std::unique_ptr<client_t>> promise_;
};
async_simple::coro::Lazy<void> connect_client(
static async_simple::coro::Lazy<void> connect_client(
std::unique_ptr<client_t> client, std::weak_ptr<client_pool> watcher,
std::shared_ptr<promise_handler> handler) {
ELOG_DEBUG << "try to connect client{" << client.get()
<< "} to host:" << host_name_;
auto result = co_await client->connect(host_name_);
std::shared_ptr<client_pool> self = watcher.lock();
if (!client_t::is_ok(result)) {
ELOG_DEBUG << "connect client{" << client.get() << "} to failed. ";
if (self) {
co_await reconnect(client);
}
}
if (client) {
ELOG_DEBUG << "connect client{" << client.get() << "} successful!";
}
co_await reconnect(client, watcher);
auto has_get_connect = handler->flag_.exchange(true);
if (!has_get_connect) {
handler->promise_.setValue(std::move(client));
}
else {
auto conn_lim = std::min<unsigned>(10u, pool_config_.max_connection);
if (self && free_clients_.size() < conn_lim && client) {
enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
if (client) {
auto self = watcher.lock();
auto conn_lim =
std::min<unsigned>(10u, self->pool_config_.max_connection);
if (self && self->free_clients_.size() < conn_lim) {
self->enqueue(self->free_clients_, std::move(client),
self->pool_config_.idle_timeout);
}
}
}
}
@ -226,7 +225,7 @@ class client_pool : public std::enable_shared_from_this<
}
}
});
ELOG_DEBUG << "wait client by promise {" << &handler->promise_ << "}";
ELOG_TRACE << "wait client by promise {" << &handler->promise_ << "}";
client = co_await handler->promise_.getFuture();
if (client) {
executor->schedule([timer] {
@ -236,7 +235,7 @@ class client_pool : public std::enable_shared_from_this<
}
}
else {
ELOG_DEBUG << "get free client{" << client.get() << "}. from queue";
ELOG_TRACE << "get free client{" << client.get() << "}. from queue";
}
co_return std::move(client);
}
@ -248,7 +247,7 @@ class client_pool : public std::enable_shared_from_this<
if (clients.enqueue(std::move(client)) == 1) {
std::size_t expected = 0;
if (clients.collecter_cnt_.compare_exchange_strong(expected, 1)) {
ELOG_DEBUG << "start timeout client collecter of client_pool{"
ELOG_TRACE << "start timeout client collecter of client_pool{"
<< host_name_ << "}";
collect_idle_timeout_client(
this->weak_from_this(), clients,
@ -272,7 +271,7 @@ class client_pool : public std::enable_shared_from_this<
if (!has_get_connect) {
handler->promise_.setValue(std::move(client));
promise_cnt_ -= cnt;
ELOG_DEBUG << "collect free client{" << client.get()
ELOG_TRACE << "collect free client{" << client.get()
<< "} and wake up promise{" << &handler->promise_ << "}";
return;
}
@ -282,12 +281,12 @@ class client_pool : public std::enable_shared_from_this<
if (free_clients_.size() < pool_config_.max_connection) {
if (client) {
ELOG_DEBUG << "collect free client{" << client.get() << "} enqueue";
ELOG_TRACE << "collect free client{" << client.get() << "} enqueue";
enqueue(free_clients_, std::move(client), pool_config_.idle_timeout);
}
}
else {
ELOG_DEBUG << "out of max connection limit <<"
ELOG_TRACE << "out of max connection limit <<"
<< pool_config_.max_connection << ", collect free client{"
<< client.get() << "} enqueue short connect queue";
enqueue(short_connect_clients_, std::move(client),
@ -295,7 +294,7 @@ class client_pool : public std::enable_shared_from_this<
}
}
else {
ELOG_DEBUG << "client{" << client.get()
ELOG_TRACE << "client{" << client.get()
<< "} is closed. we won't collect it";
}

View File

@ -317,7 +317,7 @@ inline async_simple::coro::Lazy<void> sleep_for(Duration d) {
template <typename R, typename Func, typename Executor>
struct post_helper {
void operator()(auto handler) {
asio::dispatch(e, [this, handler]() {
asio::post(e, [this, handler]() {
try {
if constexpr (std::is_same_v<R, async_simple::Try<void>>) {
func();
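
The post_helper change swaps asio::dispatch for asio::post. dispatch may invoke the handler inline when the calling thread is already running inside the target executor, whereas post always enqueues it, so the completion is guaranteed to run on a fresh stack frame after the current handler returns. A standalone illustration of that difference with plain Asio (not part of the commit):

#include <asio/dispatch.hpp>
#include <asio/io_context.hpp>
#include <asio/post.hpp>
#include <iostream>

int main() {
  asio::io_context ctx;
  asio::post(ctx, [&ctx] {
    // This lambda already runs on the io_context thread.
    asio::dispatch(ctx, [] { std::cout << "dispatch: runs inline\n"; });
    asio::post(ctx, [] { std::cout << "post: runs after this handler\n"; });
    std::cout << "outer handler returns\n";
  });
  ctx.run();
  // Output order: dispatch ..., outer handler ..., post ...
}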

View File

@ -75,7 +75,7 @@ class ExecutorWrapper : public async_simple::Executor {
context_t &context() { return executor_.context(); }
auto get_asio_executor() { return executor_; }
auto get_asio_executor() const { return executor_; }
operator ExecutorImpl() { return executor_; }

File diff suppressed because it is too large

View File

@ -35,6 +35,7 @@ enum class errc : uint16_t {
message_too_large,
server_has_ran,
invalid_rpc_result,
serial_number_conflict,
};
inline constexpr std::string_view make_error_message(errc ec) noexcept {
switch (ec) {
@ -70,6 +71,8 @@ inline constexpr std::string_view make_error_message(errc ec) noexcept {
return "server has ran";
case errc::invalid_rpc_result:
return "invalid rpc result";
case errc::serial_number_conflict:
return "serial number conflict";
default:
return "unknown user-defined error";
}
@ -103,8 +106,14 @@ inline bool operator!(errc ec) noexcept { return ec == errc::ok; }
struct rpc_error {
coro_rpc::err_code code; //!< error code
std::string msg; //!< error message
rpc_error() {}
rpc_error(coro_rpc::err_code code, std::string_view msg)
: code(code), msg(std::string{msg}) {}
rpc_error(coro_rpc::err_code code)
: code(code), msg(std::string{make_error_message(code)}) {}
uint16_t& val() { return *(uint16_t*)&(code.ec); }
const uint16_t& val() const { return *(uint16_t*)&(code.ec); }
constexpr operator bool() const noexcept { return code; }
};
STRUCT_PACK_REFL(rpc_error, val(), msg);
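
errno.h gains a serial_number_conflict code (a client that keeps several requests in flight keys them by serial number, so a duplicate key must be reportable), and rpc_error gets explicit constructors, an operator bool, and a val() accessor so STRUCT_PACK_REFL can reflect the underlying uint16_t of err_code. A small usage sketch against the declarations above (not part of the commit; it assumes err_code is implicitly constructible from errc, as its use above suggests, and uses the include path seen elsewhere in this commit):

#include <iostream>
#include <ylt/coro_rpc/impl/errno.h>

void report(const coro_rpc::rpc_error& err) {
  if (err) {  // operator bool: true when an error code is set
    std::cout << "rpc failed, code=" << err.val() << ", msg=" << err.msg << '\n';
  }
}

int main() {
  coro_rpc::rpc_error none{};  // default constructed: no error
  coro_rpc::rpc_error conflict{coro_rpc::errc::serial_number_conflict};
  report(none);      // prints nothing
  report(conflict);  // prints "... msg=serial number conflict"
}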

View File

@ -48,8 +48,7 @@ namespace protocol {
struct coro_rpc_protocol;
}
template <typename T,
typename rpc_protocol = coro_rpc::protocol::coro_rpc_protocol>
using rpc_result = expected<T, rpc_error>;
template <typename T>
using rpc_result = expected<T, coro_rpc::rpc_error>;
} // namespace coro_rpc
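
rpc_result<T> loses its protocol template parameter and is now simply expected<T, coro_rpc::rpc_error>; the router change at the bottom of this commit updates get_result() to match. Call sites read the same as before, for example (a sketch, not part of the commit):

#include <string>
#include <ylt/coro_rpc/coro_rpc_client.hpp>

std::string value_or_message(const coro_rpc::rpc_result<std::string>& r) {
  // .error() is now always a coro_rpc::rpc_error, independent of the protocol
  return r.has_value() ? r.value() : r.error().msg;
}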

View File

@ -284,6 +284,12 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
// only make socket connect (or handshake) to the host
async_simple::coro::Lazy<resp_data> connect(std::string uri) {
if (should_reset_) {
reset();
}
else {
should_reset_ = false;
}
resp_data data{};
bool no_schema = !has_schema(uri);
std::string append_uri;
@ -896,11 +902,6 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
resp_chunk_str_.clear();
}
async_simple::coro::Lazy<resp_data> reconnect(std::string uri) {
reset();
co_return co_await connect(std::move(uri));
}
std::string_view get_host() { return host_; }
std::string_view get_port() { return port_; }
@ -2119,6 +2120,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::string redirect_uri_;
bool enable_follow_redirect_ = false;
bool enable_timeout_ = false;
bool should_reset_ = false;
std::chrono::steady_clock::duration conn_timeout_duration_ =
std::chrono::seconds(8);
std::chrono::steady_clock::duration req_timeout_duration_ =
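
coro_http_client folds reconnect() into connect(): a should_reset_ flag decides whether the previous connection state is cleared before connecting again, so callers (including the reworked client_pool above) use a single connect() entry point for both the first attempt and every retry. A simplified sketch of that lazy-reset idiom with hypothetical names (not part of the commit; in the real client the point where the flag flips to true lies outside the hunk shown here):

// Hypothetical, simplified: the first connect() skips the reset, later ones clean up first.
struct connection {
  bool should_reset_ = false;

  bool connect() {
    if (should_reset_) {
      reset();              // drop the previous socket/parser state
    }
    bool ok = do_connect();
    should_reset_ = true;   // any further connect() starts from a clean slate
    return ok;
  }

  void reset() { /* close socket, clear buffers */ }
  bool do_connect() { return true; }  // stand-in for the real handshake
};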

View File

@ -156,9 +156,9 @@ TEST_CASE("test reconnect") {
struct mock_client : public coro_rpc::coro_rpc_client {
using coro_rpc::coro_rpc_client::coro_rpc_client;
async_simple::coro::Lazy<coro_rpc::errc> reconnect(
async_simple::coro::Lazy<coro_rpc::errc> connect(
const std::string &hostname) {
auto ec = co_await this->coro_rpc::coro_rpc_client::reconnect(hostname);
auto ec = co_await this->coro_rpc::coro_rpc_client::connect(hostname);
if (ec) {
co_await coro_io::sleep_for(300ms);
}
@ -184,7 +184,7 @@ TEST_CASE("test reconnect retry wait time exclude reconnect cost time") {
CHECK(pool->free_client_count() == 100);
auto dur = std::chrono::steady_clock::now() - tp;
std::cout << dur.count() << std::endl;
CHECK((dur >= 400ms && dur <= 800ms));
CHECK((dur >= 500ms && dur <= 799ms));
server.stop();
co_return;
}());

View File

@ -25,11 +25,16 @@
#include <climits>
#include <cstdlib>
#include <memory>
#include <system_error>
#include <thread>
#include <ylt/coro_io/client_pool.hpp>
#include <ylt/coro_io/coro_io.hpp>
#include <ylt/coro_rpc/coro_rpc_client.hpp>
#include "ylt/coro_io/io_context_pool.hpp"
#include "ylt/coro_rpc/impl/coro_rpc_client.hpp"
#include "ylt/coro_rpc/impl/errno.h"
#include "ylt/coro_rpc/impl/expected.hpp"
#include "ylt/easylog.hpp"
using namespace coro_rpc;
using namespace async_simple::coro;
@ -39,37 +44,74 @@ std::string echo(std::string_view sv);
std::atomic<uint64_t> qps = 0;
std::atomic<uint64_t> working_echo = 0;
/*!
* \example helloworld/concurrency_clients.main.cpp
* \brief demo for run concurrency clients
*/
Lazy<void> call_echo(std::shared_ptr<coro_io::channel<coro_rpc_client>> channel,
int cnt) {
while (true) {
++working_echo;
for (int i = 0; i < cnt; ++i) {
auto res = co_await channel->send_request(
[](coro_rpc_client &client, std::string_view hostname) -> Lazy<void> {
auto res = co_await client.call<echo>("Hello world!");
if (!res.has_value()) {
ELOG_ERROR << "coro_rpc err: \n" << res.error().msg;
co_return;
}
if (res.value() != "Hello world!"sv) {
ELOG_ERROR << "err echo resp: \n" << res.value();
co_return;
}
++qps;
co_return;
});
if (!res) {
ELOG_ERROR << "client pool err: connect failed.\n";
}
int request_cnt = 10000;
// Lazy<std::vector<std::chrono::microseconds>>
// call_echo(std::shared_ptr<coro_io::channel<coro_rpc_client>> channel) {
// std::vector<std::chrono::microseconds> result;
// result.reserve(request_cnt);
// auto tp = std::chrono::steady_clock::now();
// ++working_echo;
// for (int i = 0; i < request_cnt; ++i) {
// auto res = co_await channel->send_request(
// [](coro_rpc_client &client, std::string_view hostname) -> Lazy<void>
// {
// auto res = co_await client.call<echo>("Hello world!");
// if (!res.has_value()) {
// ELOG_ERROR << "coro_rpc err: \n" << res.error().msg;
// co_return;
// }
// if (res.value() != "Hello world!"sv) {
// ELOG_ERROR << "err echo resp: \n" << res.value();
// co_return;
// }
// co_return;
// });
// if (!res) {
// ELOG_ERROR << "client pool err: connect failed.\n";
// break;
// }
// ++qps;
// auto old_tp=tp;
// tp= std::chrono::steady_clock::now();
// result.push_back(std::chrono::duration_cast<std::chrono::microseconds>(
// tp - old_tp));
// }
// co_return std::move(result);
// }
Lazy<std::vector<std::chrono::microseconds>> call_echo(
std::shared_ptr<coro_io::channel<coro_rpc_client>> channel) {
std::vector<std::chrono::microseconds> result;
result.reserve(request_cnt);
auto tp = std::chrono::steady_clock::now();
++working_echo;
for (int i = 0; i < request_cnt; ++i) {
auto res = co_await channel->send_request(
[](coro_rpc_client &client, std::string_view hostname) {
return client.send_request<echo>("Hello world!");
});
if (!res) {
ELOG_ERROR << "client pool err: connect failed.\n"
<< std::make_error_code(res.error());
break;
}
--working_echo;
co_await coro_io::sleep_for(30s);
auto rpc_result = co_await res.value();
if (!rpc_result) {
ELOG_ERROR << "recv response failed\n" << rpc_result.error().msg;
break;
}
if (rpc_result->result() != "Hello world!") {
ELOG_ERROR << "error rpc reponse\n" << rpc_result->result();
}
++qps;
auto old_tp = tp;
tp = std::chrono::steady_clock::now();
result.push_back(
std::chrono::duration_cast<std::chrono::microseconds>(tp - old_tp));
}
co_return std::move(result);
}
Lazy<void> qps_watcher() {
@ -87,6 +129,19 @@ Lazy<void> qps_watcher() {
}
}
std::vector<std::chrono::microseconds> result;
void latency_watcher() {
std::sort(result.begin(), result.end());
auto arr = {0.1, 0.3, 0.5, 0.7, 0.9, 0.95, 0.99, 0.999, 0.9999, 0.99999, 1.0};
for (auto e : arr) {
std::cout
<< (e * 100) << "% request finished in:"
<< result[std::max<std::size_t>(0, result.size() * e - 1)].count() /
1000.0
<< "ms" << std::endl;
}
}
int main() {
auto hosts =
std::vector<std::string_view>{"127.0.0.1:8801", "localhost:8801"};
@ -95,11 +150,17 @@ int main() {
hosts, coro_io::channel<coro_rpc_client>::channel_config{
.pool_config{.max_connection = worker_cnt}});
auto chan_ptr = std::make_shared<decltype(chan)>(std::move(chan));
auto executor = coro_io::get_global_block_executor();
for (int i = 0; i < worker_cnt; ++i) {
call_echo(chan_ptr, 10000).start([](auto &&) {
call_echo(chan_ptr).start([=](auto &&res) {
executor->schedule([res = std::move(res.value())]() mutable {
result.insert(result.end(), res.begin(), res.end());
--working_echo;
});
});
}
syncAwait(qps_watcher());
latency_watcher();
std::cout << "Done!" << std::endl;
return 0;
}
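
This channel benchmark is rewritten around the feature that names the PR: inside the callback the client now calls client.send_request<echo>(...), which writes the request and returns immediately with an awaitable handle instead of blocking the coroutine until the response arrives; the response is collected later with co_await res.value() and read through ->result(). The old one-round-trip-per-call loop is kept above as a comment for comparison, per-request latencies are recorded, and each worker pushes its samples to the shared result vector on the block executor so latency_watcher() can print percentiles. A condensed sketch of the two-phase flow, reusing the includes and the echo declaration from the file above and eliding exact return types with auto (not part of the commit):

Lazy<void> pipelined_echo(
    std::shared_ptr<coro_io::channel<coro_rpc_client>> channel) {
  // Phase 1: pick a pooled client and send the request; nothing here waits for
  // the response, so the coroutine could issue more requests in the meantime.
  auto handle = co_await channel->send_request(
      [](coro_rpc_client &client, std::string_view /*host*/) {
        return client.send_request<echo>("Hello world!");
      });
  if (!handle) {
    ELOG_ERROR << "client pool err: " << std::make_error_code(handle.error());
    co_return;
  }
  // Phase 2: await the response whenever it is convenient.
  auto rpc_result = co_await handle.value();
  if (!rpc_result) {
    ELOG_ERROR << "recv response failed: " << rpc_result.error().msg;
    co_return;
  }
  if (rpc_result->result() != "Hello world!") {
    ELOG_ERROR << "unexpected echo response: " << rpc_result->result();
  }
}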

View File

@ -17,9 +17,13 @@
#include <atomic>
#include <chrono>
#include <climits>
#include <cstdint>
#include <cstdlib>
#include <locale>
#include <memory>
#include <mutex>
#include <optional>
#include <system_error>
#include <thread>
#include <ylt/coro_io/client_pool.hpp>
#include <ylt/coro_io/coro_io.hpp>
@ -27,77 +31,72 @@
#include "async_simple/coro/Collect.h"
#include "async_simple/coro/Lazy.h"
#include "async_simple/coro/Mutex.h"
#include "async_simple/coro/SyncAwait.h"
#include "ylt/coro_io/io_context_pool.hpp"
#include "ylt/coro_rpc/impl/coro_rpc_client.hpp"
#include "ylt/coro_rpc/impl/errno.h"
#include "ylt/coro_rpc/impl/expected.hpp"
#include "ylt/easylog.hpp"
#include "ylt/easylog/record.hpp"
std::string echo(std::string_view sv);
constexpr unsigned thread_cnt = 1920;
constexpr auto request_cnt = 1000;
using namespace coro_rpc;
using namespace async_simple::coro;
using namespace std::string_view_literals;
std::atomic<uint64_t> qps = 0;
auto finish_executor = coro_io::get_global_block_executor();
std::atomic<uint64_t> working_echo = 0;
std::atomic<uint64_t> busy_echo = 0;
struct guard {
guard(std::atomic<uint64_t> &ref) : ref(ref) { ++ref; }
~guard() { --ref; }
std::atomic<uint64_t> &ref;
};
/*!
* \example helloworld/concurrency_clients.main.cpp
* \brief demo for run concurrency clients
*/
Lazy<std::vector<std::chrono::microseconds>> call_echo(
coro_io::client_pool<coro_rpc_client> &client_pool, int cnt) {
std::vector<std::chrono::microseconds> result;
result.reserve(cnt);
auto client_pool =
coro_io::client_pool<coro_rpc_client>::create("127.0.0.1:8801");
std::atomic<int32_t> qps = 0;
Lazy<std::vector<std::chrono::microseconds>> send() {
std::vector<std::chrono::microseconds> latencys;
latencys.reserve(request_cnt);
auto tp = std::chrono::steady_clock::now();
++working_echo;
for (int i = 0; i < cnt; ++i) {
auto tp = std::chrono::steady_clock::now();
auto res = co_await client_pool.send_request(
[=](coro_rpc_client &client) -> Lazy<void> {
guard g{busy_echo};
if (client.has_closed()) {
co_return;
}
auto res = co_await client.call<echo>("Hello world!");
if (!res.has_value()) {
ELOG_ERROR << "coro_rpc err: \n" << res.error().msg;
client.close();
co_return;
}
if (res.value() != "Hello world!"sv) {
ELOG_ERROR << "err echo resp: \n" << res.value();
co_return;
}
++qps;
co_return;
int id = 0;
for (int i = 0; i < request_cnt; ++i) {
auto result = co_await client_pool->send_request(
[](coro_rpc_client& client) -> Lazy<coro_rpc::rpc_result<std::string>> {
co_return co_await client.call<echo>("Hello world!");
});
if (!res) {
ELOG_ERROR << "client pool err: connect failed.\n";
if (!result) {
ELOG_ERROR << "get client form client pool failed: \n"
<< std::make_error_code(result.error()).message();
continue;
}
else {
result.push_back(std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now() - tp));
auto& call_result = result.value();
if (!call_result) {
ELOG_ERROR << "call err: \n" << call_result.error().msg;
break;
}
qps.fetch_add(1, std::memory_order::release);
auto old_tp = tp;
tp = std::chrono::steady_clock::now();
latencys.push_back(
std::chrono::duration_cast<std::chrono::microseconds>(tp - old_tp));
}
co_return std::move(result);
co_return std::move(latencys);
}
Lazy<void> qps_watcher(coro_io::client_pool<coro_rpc_client> &clients) {
Lazy<void> qps_watcher() {
using namespace std::chrono_literals;
while (working_echo > 0) {
do {
co_await coro_io::sleep_for(1s);
uint64_t cnt = qps.exchange(0);
std::cout << "QPS:" << cnt
<< " free connection: " << clients.free_client_count()
<< " working echo:" << working_echo << " busy echo:" << busy_echo
<< std::endl;
cnt = 0;
}
<< " free connection: " << client_pool->free_client_count()
<< " working echo:" << working_echo << std::endl;
} while (working_echo > 0);
}
std::vector<std::chrono::microseconds> result;
void latency_watcher() {
@ -112,23 +111,18 @@ void latency_watcher() {
}
}
int main() {
auto thread_cnt = std::thread::hardware_concurrency();
auto client_pool = coro_io::client_pool<coro_rpc_client>::create(
"localhost:8801",
coro_io::client_pool<coro_rpc_client>::pool_config{
.max_connection = thread_cnt * 20,
.client_config = {.timeout_duration = std::chrono::seconds{50}}});
auto finish_executor = coro_io::get_global_block_executor();
for (int i = 0, lim = thread_cnt * 20; i < lim; ++i) {
call_echo(*client_pool, 10000).start([finish_executor](auto &&res) {
finish_executor->schedule([res = std::move(res.value())] {
result.insert(result.end(), res.begin(), res.end());
--working_echo;
auto executor = coro_io::get_global_block_executor();
for (int i = 0, lim = thread_cnt * 10; i < lim; ++i) {
coro_io::get_global_executor()->schedule([=]() {
send().start([executor](auto&& res) {
executor->schedule([res = std::move(res.value())]() mutable {
result.insert(result.end(), res.begin(), res.end());
--working_echo;
});
});
});
}
syncAwait(qps_watcher(*client_pool));
syncAwait(qps_watcher());
latency_watcher();
std::cout << "Done!" << std::endl;
return 0;
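
The client_pool benchmark gets the same treatment: send() returns its per-request latencies, and every send_request result is checked on two levels, the outer expected for pool/connection errors (reported through std::make_error_code) and the inner rpc_result for RPC-level errors, before a latency sample is taken. latency_watcher() then sorts all samples and reports percentiles by indexing at size * p - 1. A slightly more defensive standalone version of that percentile report (not part of the commit):

#include <algorithm>
#include <chrono>
#include <cstdio>
#include <vector>

void print_percentiles(std::vector<std::chrono::microseconds> samples) {
  if (samples.empty()) return;
  std::sort(samples.begin(), samples.end());
  for (double p : {0.5, 0.9, 0.95, 0.99, 0.999, 1.0}) {
    double pos = samples.size() * p - 1;  // same indexing as latency_watcher()
    std::size_t idx = pos < 0 ? 0 : static_cast<std::size_t>(pos);
    std::printf("%6.2f%% requests finished in %.3f ms\n", p * 100,
                samples[idx].count() / 1000.0);
  }
}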

View File

@ -26,6 +26,8 @@
#include <ylt/coro_io/client_pool.hpp>
#include <ylt/coro_io/coro_io.hpp>
#include <ylt/coro_rpc/coro_rpc_client.hpp>
#include "ylt/coro_io/io_context_pool.hpp"
std::string echo(std::string_view sv);
using namespace coro_rpc;
using namespace async_simple::coro;
@ -39,10 +41,15 @@ std::atomic<uint64_t> working_echo = 0;
* \brief demo for run concurrency clients
*/
Lazy<void> call_echo(coro_io::client_pools<coro_rpc_client> &client_pools,
int cnt) {
int request_cnt = 10000;
Lazy<std::vector<std::chrono::microseconds>> call_echo(
coro_io::client_pools<coro_rpc_client> &client_pools) {
++working_echo;
for (int i = 0; i < cnt; ++i) {
std::vector<std::chrono::microseconds> result;
result.reserve(request_cnt);
auto tp = std::chrono::steady_clock::now();
for (int i = 0; i < request_cnt; ++i) {
auto res = co_await client_pools.send_request(
i % 2 ? "localhost:8801" : "127.0.0.1:8801",
[=](coro_rpc_client &client) -> Lazy<void> {
@ -60,9 +67,14 @@ Lazy<void> call_echo(coro_io::client_pools<coro_rpc_client> &client_pools,
});
if (!res) {
ELOG_ERROR << "client pool err: connect failed.\n";
break;
}
auto old_tp = tp;
tp = std::chrono::steady_clock::now();
result.push_back(
std::chrono::duration_cast<std::chrono::microseconds>(tp - old_tp));
}
--working_echo;
co_return std::move(result);
}
Lazy<void> qps_watcher(coro_io::client_pools<coro_rpc_client> &clients) {
@ -78,15 +90,32 @@ Lazy<void> qps_watcher(coro_io::client_pools<coro_rpc_client> &clients) {
cnt = 0;
}
}
std::vector<std::chrono::microseconds> result;
void latency_watcher() {
std::sort(result.begin(), result.end());
auto arr = {0.1, 0.3, 0.5, 0.7, 0.9, 0.95, 0.99, 0.999, 0.9999, 0.99999, 1.0};
for (auto e : arr) {
std::cout
<< (e * 100) << "% request finished in:"
<< result[std::max<std::size_t>(0, result.size() * e - 1)].count() /
1000.0
<< "ms" << std::endl;
}
}
int main() {
auto thread_cnt = std::thread::hardware_concurrency();
auto &clients = coro_io::g_clients_pool<coro_rpc_client>();
auto executor = coro_io::get_global_block_executor();
for (int i = 0, lim = thread_cnt * 20; i < lim; ++i) {
call_echo(clients, 10000).start([](auto &&) {
call_echo(clients).start([=](auto &&res) {
executor->schedule([res = std::move(res.value())]() mutable {
result.insert(result.end(), res.begin(), res.end());
--working_echo;
});
});
}
syncAwait(qps_watcher(clients));
latency_watcher();
std::cout << "Done!" << std::endl;
return 0;
}

View File

@ -13,80 +13,114 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <async_simple/coro/Lazy.h>
#include <async_simple/coro/Sleep.h>
#include <asio/io_context.hpp>
#include <asio/steady_timer.hpp>
#include <atomic>
#include <chrono>
#include <climits>
#include <cstdint>
#include <cstdlib>
#include <system_error>
#include <locale>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <ylt/coro_io/client_pool.hpp>
#include <ylt/coro_io/coro_io.hpp>
#include <ylt/coro_rpc/coro_rpc_client.hpp>
#include "async_simple/coro/Collect.h"
#include "async_simple/coro/Lazy.h"
#include "async_simple/coro/Mutex.h"
#include "async_simple/coro/SyncAwait.h"
#include "ylt/coro_io/io_context_pool.hpp"
#include "ylt/coro_rpc/impl/coro_rpc_client.hpp"
#include "ylt/coro_rpc/impl/errno.h"
#include "ylt/coro_rpc/impl/expected.hpp"
#include "ylt/easylog.hpp"
std::string echo(std::string_view sv);
constexpr auto thread_cnt = 96 * 20;
using namespace coro_rpc;
using namespace async_simple::coro;
using namespace std::string_view_literals;
using namespace std::chrono_literals;
std::atomic<uint64_t> qps = 0;
auto finish_executor = coro_io::get_global_block_executor();
std::atomic<uint64_t> working_echo = 0;
/*!
* \example helloworld/concurrency_clients.main.cpp
* \brief demo for run concurrency clients
*/
Lazy<void> call_echo(int cnt) {
std::vector<std::unique_ptr<coro_rpc_client>> clients;
std::atomic<uint64_t>& get_qps(int id) {
static std::atomic<uint64_t> ar[thread_cnt * 8];
return ar[id * 8];
}
Lazy<std::vector<std::chrono::microseconds>> send(int id, int cnt) {
std::vector<std::chrono::microseconds> result;
auto& cli = *clients[id];
auto& qps = get_qps(id);
result.reserve(cnt);
++working_echo;
coro_rpc_client client;
auto ec = co_await client.connect("localhost:8801");
for (int i = 0; i < 3 && !ec; ++i) {
co_await coro_io::sleep_for(rand() % 10000 * 1ms);
ec = co_await client.reconnect("localhost:8801");
}
if (!ec) {
for (int i = 0; i < cnt; ++i) {
auto res = co_await client.call<echo>("Hello world!");
if (!res.has_value()) {
ELOG_ERROR << "coro_rpc err: \n" << res.error().msg;
co_return;
}
if (res.value() != "Hello world!"sv) {
ELOG_ERROR << "err echo resp: \n" << res.value();
co_return;
}
++qps;
auto tp = std::chrono::steady_clock::now();
for (int i = 0; i < cnt; ++i) {
auto res_ = co_await cli.call<echo>("Hello world!");
if (!res_.has_value()) {
ELOG_ERROR << "coro_rpc err: \n" << res_.error().msg;
continue;
}
}
else {
std::cout << "connect failed \n";
++qps;
auto old_tp = tp;
tp = std::chrono::steady_clock::now();
result.push_back(
std::chrono::duration_cast<std::chrono::microseconds>(tp - old_tp));
}
--working_echo;
co_return std::move(result);
}
Lazy<void> qps_watcher() {
using namespace std::chrono_literals;
while (working_echo > 0) {
do {
co_await coro_io::sleep_for(1s);
uint64_t cnt = qps.exchange(0);
std::cout << "QPS:" << cnt << " working echo:" << working_echo << std::endl;
uint64_t cnt = 0;
for (int i = 0; i < thread_cnt; ++i) {
auto& qps = get_qps(i);
uint64_t tmp = qps.exchange(0);
cnt += tmp;
}
std::cout << "QPS:"
<< cnt
// << " free connection: " << clients.free_client_count()
<< " working echo:" << working_echo << std::endl;
cnt = 0;
} while (working_echo > 0);
}
std::vector<std::chrono::microseconds> result;
void latency_watcher() {
std::sort(result.begin(), result.end());
auto arr = {0.1, 0.3, 0.5, 0.7, 0.9, 0.95, 0.99, 0.999, 0.9999, 0.99999, 1.0};
for (auto e : arr) {
std::cout
<< (e * 100) << "% request finished in:"
<< result[std::max<std::size_t>(0, result.size() * e - 1)].count() /
1000.0
<< "ms" << std::endl;
}
}
int main() {
auto thread_cnt = std::thread::hardware_concurrency();
for (int i = 0, lim = thread_cnt * 20; i < lim; ++i) {
call_echo(100000).start([](auto &&) {
for (int i = 0; i < thread_cnt; ++i) {
clients.emplace_back(std::make_unique<coro_rpc_client>(
coro_io::get_global_executor()->get_asio_executor()));
syncAwait(clients.back()->connect("localhost:8801"));
}
for (int i = 0, lim = thread_cnt; i < lim; ++i) {
send(i, 20000).via(&clients[i]->get_executor()).start([](auto&& res) {
finish_executor->schedule([res = std::move(res.value())] {
result.insert(result.end(), res.begin(), res.end());
});
});
}
syncAwait(qps_watcher());
latency_watcher();
std::cout << "Done!" << std::endl;
return 0;
}
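
The raw-client benchmark pre-connects one coro_rpc_client per worker, pins each send() coroutine to its client's executor with .via(&clients[i]->get_executor()), and replaces the single shared QPS counter with one counter per worker, accessed through get_qps(id) at stride id * 8; with 8-byte atomics that stride keeps each hot counter on its own 64-byte cache line, so workers do not invalidate each other's lines (false sharing). A self-contained sketch of the same idea using alignas instead of a manual stride (not part of the commit):

#include <atomic>
#include <cstdint>
#include <cstdio>

constexpr int kWorkers = 8;

struct alignas(64) padded_counter {   // one cache line per counter
  std::atomic<uint64_t> value{0};
};
padded_counter g_qps[kWorkers];

uint64_t drain_total() {              // what qps_watcher() does once per second
  uint64_t total = 0;
  for (auto& c : g_qps) total += c.value.exchange(0);
  return total;
}

int main() {
  g_qps[3].value.fetch_add(42);       // a worker bumping its own counter
  std::printf("QPS this second: %llu\n",
              static_cast<unsigned long long>(drain_total()));
}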

View File

@ -102,9 +102,11 @@ Lazy<std::string_view> nested_echo(std::string_view sv) {
coro_io::g_clients_pool<coro_rpc::coro_rpc_client>().at("127.0.0.1:8802");
assert(client != nullptr);
ELOGV(INFO, "connect another server");
auto ret = co_await client->send_request([sv](coro_rpc_client &client) {
return client.call<echo>(sv);
});
auto ret = co_await client->send_request(
[sv](coro_rpc_client &client)
-> Lazy<coro_rpc::rpc_result<std::string_view>> {
co_return co_await client.call<echo>(sv);
});
co_return ret.value().value();
}

View File

@ -56,6 +56,8 @@ Lazy<std::shared_ptr<coro_rpc_client>> create_client(
co_return client;
}
void show(auto& s) { return; }
TEST_CASE("testing client") {
{
coro_rpc::coro_rpc_client client;
@ -146,6 +148,7 @@ TEST_CASE("testing client") {
std::string arg;
arg.resize(2048);
auto ret = co_await client->template call<large_arg_fun>(arg);
show(ret);
CHECK(ret.value() == arg);
co_return;
};
@ -205,6 +208,7 @@ TEST_CASE("testing client with inject server") {
auto client = co_await create_client(io_context, port);
g_action = inject_action::close_socket_after_send_length;
auto ret = co_await client->template call<hello>();
show(ret);
REQUIRE_MESSAGE(ret.error().code == coro_rpc::errc::io_error,
ret.error().msg);
};

View File

@ -346,13 +346,6 @@ TEST_CASE("test server accept error") {
REQUIRE_MESSAGE(ret.error().code == coro_rpc::errc::io_error,
ret.error().msg);
REQUIRE(client.has_closed() == true);
ec = syncAwait(client.connect("127.0.0.1", "8810"));
REQUIRE_MESSAGE(ec == coro_rpc::errc::io_error,
std::to_string(client.get_client_id()).append(ec.message()));
ret = syncAwait(client.call<hi>());
CHECK(!ret);
REQUIRE(client.has_closed() == true);
g_action = {};
}

View File

@ -67,12 +67,11 @@ struct RPC_trait<void> {
};
using coro_rpc_protocol = coro_rpc::protocol::coro_rpc_protocol;
template <auto func>
rpc_result<util::function_return_type_t<decltype(func)>, coro_rpc_protocol>
get_result(const auto &pair) {
rpc_result<util::function_return_type_t<decltype(func)>> get_result(
const auto &pair) {
auto &&[rpc_errc, buffer] = pair;
using T = util::function_return_type_t<decltype(func)>;
using return_type = rpc_result<util::function_return_type_t<decltype(func)>,
coro_rpc_protocol>;
using return_type = rpc_result<util::function_return_type_t<decltype(func)>>;
rpc_return_type_t<T> ret;
struct_pack::err_code ec;
rpc_error err;