[coro_rpc] [coro_http] use global executor instead of inner io_context (#358)

This commit is contained in:
saipubw 2023-07-10 16:33:20 +08:00 committed by GitHub
parent a50c4e1e0e
commit fa92e01e32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 136 additions and 110 deletions

View File

@ -132,32 +132,15 @@ class coro_rpc_client {
* Create client with io_context
* @param io_context asio io_context, async event handler
*/
coro_rpc_client(coro_io::ExecutorWrapper<> &executor, uint32_t client_id = 0)
coro_rpc_client(
coro_io::ExecutorWrapper<> &executor = *coro_io::get_global_executor(),
uint32_t client_id = 0)
: executor(executor.get_asio_executor()),
socket_(executor.get_asio_executor()) {
config_.client_id = client_id;
read_buf_.resize(default_read_buf_size_);
}
/*!
* Create client
*/
coro_rpc_client(uint32_t client_id = 0)
: inner_io_context_(std::make_unique<asio::io_context>()),
executor(inner_io_context_->get_executor()),
socket_(inner_io_context_->get_executor()) {
config_.client_id = client_id;
std::promise<void> promise;
thd_ = std::thread([this, &promise] {
work_ = std::make_unique<asio::io_context::work>(*inner_io_context_);
executor.schedule([&] {
promise.set_value();
});
inner_io_context_->run();
});
promise.get_future().wait();
}
[[nodiscard]] bool init_config(const config &conf) {
config_ = conf;
#ifdef YLT_ENABLE_SSL
@ -194,7 +177,7 @@ class coro_rpc_client {
config_.timeout_duration =
std::chrono::duration_cast<std::chrono::milliseconds>(timeout_duration);
reset();
return connect(true);
return connect(is_reconnect_t{true});
}
[[nodiscard]] async_simple::coro::Lazy<std::errc> reconnect(
@ -207,7 +190,7 @@ class coro_rpc_client {
config_.timeout_duration =
std::chrono::duration_cast<std::chrono::milliseconds>(timeout_duration);
reset();
return connect(true);
return connect(is_reconnect_t{true});
}
/*!
* Connect server
@ -231,7 +214,7 @@ class coro_rpc_client {
return connect();
}
[[nodiscard]] async_simple::coro::Lazy<std::errc> connect(
std::string endpoint,
std::string_view endpoint,
std::chrono::steady_clock::duration timeout_duration =
std::chrono::seconds(5)) {
auto pos = endpoint.find(':');
@ -254,10 +237,7 @@ class coro_rpc_client {
}
#endif
~coro_rpc_client() {
close();
stop_inner_io_context();
}
~coro_rpc_client() { close(); }
/*!
* Call RPC function with default timeout (5 second)
@ -359,6 +339,12 @@ class coro_rpc_client {
friend class coro_io::client_pool;
private:
// the const char * will convert to bool instead of std::string_view
// use this struct to prevent it.
struct is_reconnect_t {
bool value = false;
};
void reset() {
close_socket();
socket_ = decltype(socket_)(executor.get_asio_executor());
@ -368,14 +354,14 @@ class coro_rpc_client {
static bool is_ok(std::errc ec) noexcept { return ec == std::errc{}; }
[[nodiscard]] async_simple::coro::Lazy<std::errc> connect(
bool is_reconnect = false) {
is_reconnect_t is_reconnect = is_reconnect_t{false}) {
#ifdef YLT_ENABLE_SSL
if (!ssl_init_ret_) {
std::cout << "ssl_init_ret_: " << ssl_init_ret_ << std::endl;
co_return std::errc::not_connected;
}
#endif
if (!is_reconnect && has_closed_)
if (!is_reconnect.value && has_closed_)
AS_UNLIKELY {
ELOGV(ERROR,
"a closed client is not allowed connect again, please use "
@ -756,26 +742,6 @@ class coro_rpc_client {
has_closed_ = true;
}
void stop_inner_io_context() {
if (thd_.joinable()) {
work_ = nullptr;
if (thd_.get_id() == std::this_thread::get_id()) {
// we are now running in inner_io_context_, so destruction it in
// another thread
std::thread thrd{[ioc = std::move(inner_io_context_),
thd = std::move(thd_)]() mutable {
thd.join();
}};
thrd.detach();
}
else {
thd_.join();
}
}
return;
}
#ifdef UNIT_TEST_INJECT
public:
std::errc sync_connect(const std::string &host, const std::string &port) {
@ -790,9 +756,6 @@ class coro_rpc_client {
}
#endif
private:
std::unique_ptr<asio::io_context> inner_io_context_;
std::unique_ptr<asio::io_context::work> work_;
std::thread thd_;
coro_io::ExecutorWrapper<> executor;
asio::ip::tcp::socket socket_;
std::vector<std::byte> read_buf_;

View File

@ -21,6 +21,7 @@
#include <utility>
#include <ylt/coro_io/coro_file.hpp>
#include <ylt/coro_io/coro_io.hpp>
#include <ylt/coro_io/io_context_pool.hpp>
#include "http_parser.hpp"
#include "response_cv.hpp"
@ -181,28 +182,14 @@ class coro_http_client {
std::string domain;
#endif
};
coro_http_client()
: io_ctx_(std::make_unique<asio::io_context>()),
socket_(std::make_shared<socket_t>(io_ctx_->get_executor())),
executor_wrapper_(io_ctx_->get_executor()),
timer_(&executor_wrapper_) {
std::promise<void> promise;
io_thd_ = std::thread([this, &promise] {
work_ = std::make_unique<asio::io_context::work>(*io_ctx_);
executor_wrapper_.schedule([&] {
promise.set_value();
});
io_ctx_->run();
});
promise.get_future().wait();
}
coro_http_client(asio::io_context::executor_type executor)
: socket_(std::make_shared<socket_t>(executor)),
executor_wrapper_(executor),
timer_(&executor_wrapper_) {}
coro_http_client(coro_io::ExecutorWrapper<> *executor)
coro_http_client(
coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor())
: coro_http_client(executor->get_asio_executor()) {}
bool init_config(const config &conf) {
@ -237,22 +224,7 @@ class coro_http_client {
return true;
}
~coro_http_client() {
async_close();
if (io_thd_.joinable()) {
work_ = nullptr;
if (io_thd_.get_id() == std::this_thread::get_id()) {
std::thread thrd{[io_ctx = std::move(io_ctx_),
io_thd = std::move(io_thd_)]() mutable {
io_thd.join();
}};
thrd.detach();
}
else {
io_thd_.join();
}
}
}
~coro_http_client() { async_close(); }
void async_close() {
if (socket_->has_closed_)
@ -1649,12 +1621,9 @@ class coro_http_client {
return has_http_scheme;
}
std::unique_ptr<asio::io_context> io_ctx_;
coro_io::ExecutorWrapper<> executor_wrapper_;
std::unique_ptr<asio::io_context::work> work_;
coro_io::period_timer timer_;
std::thread io_thd_;
std::shared_ptr<socket_t> socket_;
asio::streambuf read_buf_;
simple_buffer body_{};

View File

@ -1,7 +1,6 @@
if("${yaLanTingLibs_SOURCE_DIR}" STREQUAL "${CMAKE_SOURCE_DIR}")
# if is the subproject of yalantinglibs
# do nothing
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/output/examples/coro_http)
else()
# else find installed yalantinglibs
cmake_minimum_required(VERSION 3.15)

View File

@ -1,7 +1,6 @@
if("${yaLanTingLibs_SOURCE_DIR}" STREQUAL "${CMAKE_SOURCE_DIR}")
# if is the subproject of yalantinglibs
# do nothing
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/output/examples)
else()
# else find installed yalantinglibs
cmake_minimum_required(VERSION 3.15)

View File

@ -40,5 +40,6 @@ add_executable(coro_rpc_example_channel channel.cpp)
add_executable(coro_rpc_example_client_pool client_pool.cpp)
add_executable(coro_rpc_example_client_pools client_pools.cpp)
add_executable(coro_rpc_example_client client.cpp)
add_executable(coro_rpc_example_concurrent_clients concurrent_clients.cpp)
add_executable(coro_rpc_example_server server.cpp rpc_service.cpp)

View File

@ -51,11 +51,11 @@ Lazy<void> call_echo(std::shared_ptr<coro_io::channel<coro_rpc_client>> channel,
[](coro_rpc_client &client, std::string_view hostname) -> Lazy<void> {
auto res = co_await client.call<echo>("Hello world!");
if (!res.has_value()) {
std::cout << "coro_rpc err: \n";
std::cout << "coro_rpc err: \n" << res.error().msg;
co_return;
}
if (res.value() != "Hello world!"sv) {
std::cout << "err echo resp: \n";
std::cout << "err echo resp: \n" << res.value();
co_return;
}
++qps;

View File

@ -44,11 +44,11 @@ Lazy<void> call_echo(coro_io::client_pool<coro_rpc_client> &client_pool,
[=](coro_rpc_client &client) -> Lazy<void> {
auto res = co_await client.call<echo>("Hello world!");
if (!res.has_value()) {
std::cout << "coro_rpc err: \n";
std::cout << "coro_rpc err: \n" << res.error().msg;
co_return;
}
if (res.value() != "Hello world!"sv) {
std::cout << "err echo resp: \n";
std::cout << "err echo resp: \n" << res.value();
co_return;
}
++qps;

View File

@ -48,11 +48,11 @@ Lazy<void> call_echo(coro_io::client_pools<coro_rpc_client> &client_pools,
[=](coro_rpc_client &client) -> Lazy<void> {
auto res = co_await client.call<echo>("Hello world!");
if (!res.has_value()) {
std::cout << "coro_rpc err: \n";
std::cout << "coro_rpc err: \n" << res.error().msg;
co_return;
}
if (res.value() != "Hello world!"sv) {
std::cout << "err echo resp: \n";
std::cout << "err echo resp: \n" << res.value();
co_return;
}
++qps;

View File

@ -0,0 +1,92 @@
/*
* Copyright (c) 2023, Alibaba Group Holding Limited;
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <async_simple/coro/Lazy.h>
#include <async_simple/coro/Sleep.h>
#include <asio/io_context.hpp>
#include <asio/steady_timer.hpp>
#include <atomic>
#include <chrono>
#include <climits>
#include <cstdlib>
#include <system_error>
#include <thread>
#include <ylt/coro_io/client_pool.hpp>
#include <ylt/coro_io/coro_io.hpp>
#include <ylt/coro_rpc/coro_rpc_client.hpp>
#include "ylt/coro_rpc/impl/coro_rpc_client.hpp"
std::string echo(std::string_view sv);
using namespace coro_rpc;
using namespace async_simple::coro;
using namespace std::string_view_literals;
using namespace std::chrono_literals;
std::atomic<uint64_t> qps = 0;
std::atomic<uint64_t> working_echo = 0;
/*!
* \example helloworld/concurrency_clients.main.cpp
* \brief demo for run concurrency clients
*/
Lazy<void> call_echo(int cnt) {
++working_echo;
coro_rpc_client client;
std::errc ec = co_await client.connect("localhost:8801");
for (int i = 0; i < 3 && ec != std::errc{}; ++i) {
co_await coro_io::sleep_for(rand() % 10000 * 1ms);
ec = co_await client.reconnect("localhost:8801");
}
if (ec == std::errc{}) {
for (int i = 0; i < cnt; ++i) {
auto res = co_await client.call<echo>("Hello world!");
if (!res.has_value()) {
std::cout << "coro_rpc err: \n" << res.error().msg;
co_return;
}
if (res.value() != "Hello world!"sv) {
std::cout << "err echo resp: \n" << res.value();
co_return;
}
++qps;
}
}
else {
std::cout << "connect failed \n";
}
--working_echo;
}
Lazy<void> qps_watcher() {
using namespace std::chrono_literals;
while (working_echo > 0) {
co_await coro_io::sleep_for(1s);
uint64_t cnt = qps.exchange(0);
std::cout << "QPS:" << cnt << " working echo:" << working_echo << std::endl;
cnt = 0;
}
}
int main() {
auto thread_cnt = std::thread::hardware_concurrency();
for (int i = 0, lim = thread_cnt * 20; i < lim; ++i) {
call_echo(100000).start([](auto &&) {
});
}
syncAwait(qps_watcher());
std::cout << "Done!" << std::endl;
return 0;
}

View File

@ -28,6 +28,7 @@
#include "doctest.h"
#include "inject_action.hpp"
#include "rpc_api.hpp"
#include "ylt/coro_io/io_context_pool.hpp"
#ifdef _MSC_VER
#define CORO_RPC_FUNCTION_SIGNATURE __FUNCSIG__
@ -138,7 +139,8 @@ struct ServerTester : TesterConfig {
g_client_id++);
}
else {
client = std::make_shared<coro_rpc_client>(g_client_id++);
client = std::make_shared<coro_rpc_client>(
*coro_io::get_global_executor(), g_client_id++);
}
#ifdef YLT_ENABLE_SSL
if (use_ssl) {
@ -355,7 +357,8 @@ struct ServerTester : TesterConfig {
g_client_id++);
}
else {
client = std::make_shared<coro_rpc_client>(g_client_id++);
client = std::make_shared<coro_rpc_client>(
*coro_io::get_global_executor(), g_client_id++);
}
#ifdef YLT_ENABLE_SSL
if (use_ssl) {

View File

@ -368,7 +368,7 @@ TEST_CASE("testing client with eof") {
server.async_start().start([](auto&&) {
});
CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout");
coro_rpc_client client(g_client_id++);
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
auto ec = client.sync_connect("127.0.0.1", "8801");
REQUIRE_MESSAGE(ec == std::errc{}, make_error_code(ec).message());
@ -390,7 +390,7 @@ TEST_CASE("testing client with shutdown") {
server.async_start().start([](auto&&) {
});
CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout");
coro_rpc_client client(g_client_id++);
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
auto ec = client.sync_connect("127.0.0.1", "8801");
REQUIRE_MESSAGE(ec == std::errc{}, make_error_code(ec).message());
server.register_handler<hello, client_hello>();
@ -420,7 +420,7 @@ TEST_CASE("testing client timeout") {
SUBCASE("connect, ip timeout") {
g_action = {};
// https://stackoverflow.com/questions/100841/artificially-create-a-connection-timeout-error
coro_rpc_client client(g_client_id++);
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
auto ret = client.connect("10.255.255.1", "8801", 5ms);
auto val = syncAwait(ret);
CHECK_MESSAGE(val == std::errc::timed_out, make_error_code(val).message());
@ -439,14 +439,14 @@ TEST_CASE("testing client timeout") {
// }
}
TEST_CASE("testing client connect err") {
coro_rpc_client client(g_client_id++);
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
auto val = syncAwait(client.connect("127.0.0.1", "8801"));
CHECK_MESSAGE(val == std::errc::not_connected,
make_error_code(val).message());
}
#ifdef UNIT_TEST_INJECT
TEST_CASE("testing client sync connect, unit test inject only") {
coro_rpc_client client(g_client_id++);
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
auto val = client.sync_connect("127.0.0.1", "8801");
CHECK_MESSAGE(val == std::errc::not_connected,
make_error_code(val).message());
@ -457,7 +457,7 @@ TEST_CASE("testing client sync connect, unit test inject only") {
server.async_start().start([](auto&&) {
});
CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout");
coro_rpc_client client2(g_client_id++);
coro_rpc_client client2(*coro_io::get_global_executor(), g_client_id++);
bool ok = client2.init_ssl("../openssl_files", "server.crt");
CHECK(ok == true);
val = client2.sync_connect("127.0.0.1", "8801");
@ -474,7 +474,7 @@ TEST_CASE("testing client call timeout") {
// coro_rpc_server server(2, 8801);
// server.async_start().start([](auto&&) {
// });
coro_rpc_client client(g_client_id++);
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
// auto ec_lazy = client.connect("127.0.0.1", "8801", 5ms);
// auto ec = syncAwait(ec_lazy);
// assert(ec == std::errc{});
@ -492,7 +492,7 @@ TEST_CASE("testing client call timeout") {
server.async_start().start([](auto&&) {
});
CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout");
coro_rpc_client client(g_client_id++);
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
auto ec_lazy = client.connect("127.0.0.1", "8801");
auto ec = syncAwait(ec_lazy);
REQUIRE(ec == std::errc{});

View File

@ -291,7 +291,7 @@ TEST_CASE("test server accept error") {
server.async_start().start([](auto &&) {
});
CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout");
coro_rpc_client client(g_client_id++);
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
ELOGV(INFO, "run test server accept error, client_id %d",
client.get_client_id());
auto ec = syncAwait(client.connect("127.0.0.1", "8810"));
@ -386,7 +386,7 @@ TEST_CASE("testing coro rpc write error") {
server.async_start().start([](auto &&) {
});
CHECK_MESSAGE(server.wait_for_start(3s), "server start timeout");
coro_rpc_client client(g_client_id++);
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
ELOGV(INFO, "run testing coro rpc write error, client_id %d",
client.get_client_id());
auto ec = syncAwait(client.connect("127.0.0.1", "8810"));

View File

@ -39,7 +39,7 @@ TEST_CASE("test varadic param") {
}
});
REQUIRE_MESSAGE(server->wait_for_start(3s), "server start timeout");
coro_rpc_client client(g_client_id++);
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
syncAwait(client.connect("localhost", std::to_string(server->port())));