[doc] add coro_rpc doc (#681)

* add coro_rpc doc

* fix doc

* update en doc

* fix code
saipubw 2024-05-24 10:58:27 +08:00 committed by GitHub
parent f786c5a8b9
commit 5f3bd02292
22 changed files with 1529 additions and 313 deletions

View File

@ -130,10 +130,12 @@ class context_base {
/*finish here*/
self_->status_ = context_status::finish_response;
}
const context_info_t<rpc_protocol> *get_context() const noexcept {
const context_info_t<rpc_protocol> *get_context_info() const noexcept {
return self_.get();
}
context_info_t<rpc_protocol> *get_context_info() noexcept {
return self_.get();
}
context_info_t<rpc_protocol> *get_context() noexcept { return self_.get(); }
};
template <typename T>

View File

@ -73,6 +73,11 @@ class client_pool;
namespace coro_rpc {
inline uint64_t get_global_client_id() {
static std::atomic<uint64_t> cid = 0;
return cid.fetch_add(1, std::memory_order::relaxed);
}
#ifdef GENERATE_BENCHMARK_DATA
std::string benchmark_file_path = "./";
#endif
@ -105,7 +110,7 @@ struct async_rpc_result_value_t {
async_rpc_result_value_t(T &&result) : result_(std::move(result)) {}
T &result() noexcept { return result_; }
const T &result() const noexcept { return result_; }
std::string_view attachment() const noexcept {
std::string_view get_attachment() const noexcept {
return buffer_.resp_attachment_buf_;
}
resp_body release_buffer() { return std::move(buffer_); }
@ -155,12 +160,12 @@ class coro_rpc_client {
const inline static rpc_error connect_error = {errc::io_error,
"client has been closed"};
struct config {
uint32_t client_id = 0;
uint64_t client_id = get_global_client_id();
std::chrono::milliseconds timeout_duration =
std::chrono::milliseconds{5000};
std::string host;
std::string port;
bool enable_tcp_no_delay_ = true;
bool enable_tcp_no_delay = true;
#ifdef YLT_ENABLE_SSL
std::filesystem::path ssl_cert_path;
std::string ssl_domain;
@ -172,7 +177,7 @@ class coro_rpc_client {
* @param io_context asio io_context, async event handler
*/
coro_rpc_client(asio::io_context::executor_type executor,
uint32_t client_id = 0)
uint64_t client_id = get_global_client_id())
: control_(std::make_shared<control_t>(executor, false)),
timer_(std::make_unique<coro_io::period_timer>(executor)) {
config_.client_id = client_id;
@ -184,7 +189,7 @@ class coro_rpc_client {
*/
coro_rpc_client(
coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor(),
uint32_t client_id = 0)
uint64_t client_id = get_global_client_id())
: control_(
std::make_shared<control_t>(executor->get_asio_executor(), false)),
timer_(std::make_unique<coro_io::period_timer>(
@ -424,7 +429,7 @@ class coro_rpc_client {
ELOGV(WARN, "client_id %d connect timeout", config_.client_id);
co_return errc::timed_out;
}
if (config_.enable_tcp_no_delay_ == true) {
if (config_.enable_tcp_no_delay == true) {
control_->socket_.set_option(asio::ip::tcp::no_delay(true), ec);
}
@ -738,7 +743,7 @@ class coro_rpc_client {
call<func>(std::forward<Args>(args)...));
}
#endif
private:
template <auto func, typename... Args>
async_simple::coro::Lazy<rpc_error> send_request_for_impl(
auto duration, uint32_t &id, coro_io::period_timer &timer,

View File

@ -29,6 +29,7 @@
#include <memory>
#include <mutex>
#include <system_error>
#include <thread>
#include <unordered_map>
#include <vector>
#include <ylt/easylog.hpp>
@ -68,29 +69,36 @@ class coro_rpc_server_base {
* TODO: add doc
* @param thread_num the number of io_context.
* @param port the server port to listen.
* @param listen address of server
* @param conn_timeout_duration client connection timeout. 0 for no timeout.
* default no timeout.
* @param is_enable_tcp_no_delay is tcp socket allow
*/
coro_rpc_server_base(size_t thread_num, unsigned short port,
coro_rpc_server_base(size_t thread_num = std::thread::hardware_concurrency(),
unsigned short port = 9001,
std::string address = "0.0.0.0",
std::chrono::steady_clock::duration
conn_timeout_duration = std::chrono::seconds(0))
conn_timeout_duration = std::chrono::seconds(0),
bool is_enable_tcp_no_delay = true)
: pool_(thread_num),
acceptor_(pool_.get_executor()->get_asio_executor()),
port_(port),
conn_timeout_duration_(conn_timeout_duration),
flag_{stat::init} {
flag_{stat::init},
is_enable_tcp_no_delay_(is_enable_tcp_no_delay) {
init_address(std::move(address));
}
coro_rpc_server_base(size_t thread_num,
std::string address /* = "0.0.0.0:9001" */,
coro_rpc_server_base(size_t thread_num = std::thread::hardware_concurrency(),
std::string address = "0.0.0.0:9001",
std::chrono::steady_clock::duration
conn_timeout_duration = std::chrono::seconds(0))
conn_timeout_duration = std::chrono::seconds(0),
bool is_enable_tcp_no_delay = true)
: pool_(thread_num),
acceptor_(pool_.get_executor()->get_asio_executor()),
conn_timeout_duration_(conn_timeout_duration),
flag_{stat::init} {
flag_{stat::init},
is_enable_tcp_no_delay_(is_enable_tcp_no_delay) {
init_address(std::move(address));
}
@ -99,7 +107,13 @@ class coro_rpc_server_base {
acceptor_(pool_.get_executor()->get_asio_executor()),
port_(config.port),
conn_timeout_duration_(config.conn_timeout_duration),
flag_{stat::init} {
flag_{stat::init},
is_enable_tcp_no_delay_(config.is_enable_tcp_no_delay) {
#ifdef YLT_ENABLE_SSL
if (config.ssl_config) {
init_ssl_context_helper(context_, config.ssl_config.value());
}
#endif
init_address(config.address);
}
@ -109,7 +123,7 @@ class coro_rpc_server_base {
}
#ifdef YLT_ENABLE_SSL
void init_ssl_context(const ssl_configure &conf) {
void init_ssl(const ssl_configure &conf) {
use_ssl_ = init_ssl_context_helper(context_, conf);
}
#endif
@ -122,19 +136,19 @@ class coro_rpc_server_base {
* @return error code if start failed, otherwise block until server stop.
*/
[[nodiscard]] coro_rpc::err_code start() noexcept {
auto ret = async_start();
if (ret) {
ret.value().wait();
return ret.value().value();
}
else {
return ret.error();
}
return async_start().get();
}
[[nodiscard]] coro_rpc::expected<async_simple::Future<coro_rpc::err_code>,
coro_rpc::err_code>
async_start() noexcept {
private:
async_simple::Future<coro_rpc::err_code> make_error_future(
coro_rpc::err_code &&err) {
async_simple::Promise<coro_rpc::err_code> p;
p.setValue(std::move(err));
return p.getFuture();
}
public:
async_simple::Future<coro_rpc::err_code> async_start() noexcept {
{
std::unique_lock lock(start_mtx_);
if (flag_ != stat::init) {
@ -144,8 +158,8 @@ class coro_rpc_server_base {
else if (flag_ == stat::stop) {
ELOGV(INFO, "has stoped");
}
return coro_rpc::unexpected<coro_rpc::err_code>{
coro_rpc::errc::server_has_ran};
return make_error_future(
coro_rpc::err_code{coro_rpc::errc::server_has_ran});
}
errc_ = listen();
if (!errc_) {
@ -177,7 +191,7 @@ class coro_rpc_server_base {
return std::move(future);
}
else {
return coro_rpc::unexpected<coro_rpc::err_code>{errc_};
return make_error_future(coro_rpc::err_code{errc_});
}
}
@ -387,7 +401,9 @@ class coro_rpc_server_base {
int64_t conn_id = ++conn_id_;
ELOGV(INFO, "new client conn_id %d coming", conn_id);
socket.set_option(asio::ip::tcp::no_delay(true), error);
if (is_enable_tcp_no_delay_) {
socket.set_option(asio::ip::tcp::no_delay(true), error);
}
auto conn = std::make_shared<coro_connection>(executor, std::move(socket),
conn_timeout_duration_);
conn->set_quit_callback(
@ -459,6 +475,7 @@ class coro_rpc_server_base {
std::atomic<uint16_t> port_;
std::string address_;
bool is_enable_tcp_no_delay_;
coro_rpc::err_code errc_ = {};
std::chrono::steady_clock::duration conn_timeout_duration_;

View File

@ -25,19 +25,22 @@
namespace coro_rpc {
namespace config {
struct coro_rpc_config_base {
uint16_t port = 8801;
struct config_base {
bool is_enable_tcp_no_delay = true;
uint16_t port = 9001;
unsigned thread_num = std::thread::hardware_concurrency();
std::chrono::steady_clock::duration conn_timeout_duration =
std::chrono::seconds{0};
std::string address = "0.0.0.0";
#ifdef YLT_ENABLE_SSL
std::optional<ssl_configure> ssl_config = std::nullopt;
#endif
};
struct coro_rpc_default_config : public coro_rpc_config_base {
struct config_t : public config_base {
using rpc_protocol = coro_rpc::protocol::coro_rpc_protocol;
using executor_pool_t = coro_io::io_context_pool;
};
} // namespace config
using coro_rpc_server = coro_rpc_server_base<config::coro_rpc_default_config>;
using coro_rpc_server = coro_rpc_server_base<config_t>;
} // namespace coro_rpc

View File

@ -23,7 +23,7 @@ TEST_CASE("test RR") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
auto hosts =
std::vector<std::string_view>{"127.0.0.1:8801", "localhost:8801"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts);
@ -62,10 +62,10 @@ TEST_CASE("test WRR") {
coro_rpc::coro_rpc_server server1(1, 8801);
auto res = server1.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
coro_rpc::coro_rpc_server server2(1, 8802);
auto res2 = server2.async_start();
REQUIRE_MESSAGE(res2, "server start failed");
REQUIRE_MESSAGE(!res2.hasResult(), "server start failed");
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
auto hosts =
@ -119,7 +119,7 @@ TEST_CASE("test Random") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
auto hosts =
std::vector<std::string_view>{"127.0.0.1:8801", "localhost:8801"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(
@ -148,7 +148,7 @@ TEST_CASE("test single host") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
auto hosts = std::vector<std::string_view>{"127.0.0.1:8801"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts);
for (int i = 0; i < 100; ++i) {
@ -168,7 +168,7 @@ TEST_CASE("test send_request config") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 9813);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
auto hosts = std::vector<std::string_view>{"127.0.0.1:9813"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts);
for (int i = 0; i < 100; ++i) {

View File

@ -87,7 +87,7 @@ TEST_CASE("test client pool") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
auto is_started = server.async_start();
REQUIRE(is_started);
REQUIRE(is_started.hasResult() == false);
auto pool = coro_io::client_pool<coro_rpc::coro_rpc_client>::create(
"127.0.0.1:8801", {.max_connection = 100,
.idle_timeout = 300ms,
@ -114,7 +114,7 @@ TEST_CASE("test idle timeout yield") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
auto is_started = server.async_start();
REQUIRE(is_started);
REQUIRE(!is_started.hasResult());
auto pool = coro_io::client_pool<coro_rpc::coro_rpc_client>::create(
"127.0.0.1:8801", {.max_connection = 100,
.idle_queue_per_max_clear_count = 1,
@ -142,7 +142,7 @@ TEST_CASE("test reconnect") {
async_simple::Promise<async_simple::Unit> p;
coro_io::sleep_for(700ms).start([&server, &p](auto &&) {
auto server_is_started = server.async_start();
REQUIRE(server_is_started);
REQUIRE(!server_is_started.hasResult());
});
auto res = co_await event(100, *pool, cv, lock);
@ -177,7 +177,7 @@ TEST_CASE("test reconnect retry wait time exclude reconnect cost time") {
async_simple::Promise<async_simple::Unit> p;
coro_io::sleep_for(350ms).start([&server, &p](auto &&) {
auto server_is_started = server.async_start();
REQUIRE(server_is_started);
REQUIRE(!server_is_started.hasResult());
});
auto res = co_await event<mock_client>(100, *pool, cv, lock);
CHECK(res);
@ -196,7 +196,7 @@ TEST_CASE("test collect_free_client") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
auto is_started = server.async_start();
REQUIRE(is_started);
REQUIRE(!is_started.hasResult());
auto pool = coro_io::client_pool<coro_rpc::coro_rpc_client>::create(
"127.0.0.1:8801", {.max_connection = 100, .idle_timeout = 300ms});

View File

@ -31,7 +31,7 @@ int main() {
coro_rpc::coro_rpc_server server(std::thread::hardware_concurrency(), 0);
register_handlers(server);
auto started = server.async_start();
if (!started) {
if (started.hasResult()) {
ELOGV(ERROR, "server started failed");
return -1;
}
@ -118,7 +118,7 @@ int main() {
server.stop();
started->wait();
started.wait();
pool.stop();
thd.join();

View File

@ -52,7 +52,7 @@ void async_echo_by_callback(
/* rpc function runs in global io thread pool */
coro_io::post([conn, data]() mutable {
/* send work to global non-io thread pool */
auto *ctx = conn.get_context();
auto *ctx = conn.get_context_info();
conn.response_msg(data); /*response here*/
}).start([](auto &&) {
});

View File

@ -39,7 +39,7 @@ int main() {
server2.register_handler<echo>();
// async start server
auto res = server2.async_start();
assert(res.has_value());
assert(!res.hasResult());
// sync start server & sync await server stop
return !server.start();

View File

@ -6,7 +6,7 @@
std::string echo(std::string str) { return str; }
void upload_file(coro_rpc::context<std::errc> conn, file_part part) {
auto &ctx = *conn.get_context();
auto &ctx = *conn.get_context_info();
if (!ctx.tag().has_value()) {
auto filename = std::to_string(std::time(0)) +
std::filesystem::path(part.filename).extension().string();
@ -27,7 +27,7 @@ void upload_file(coro_rpc::context<std::errc> conn, file_part part) {
void download_file(coro_rpc::context<response_part> conn,
std::string filename) {
auto &ctx = *conn.get_context();
auto &ctx = *conn.get_context_info();
if (!ctx.tag().has_value()) {
std::string actual_filename =
std::filesystem::path(filename).filename().string();

View File

@ -50,9 +50,9 @@ int long_run_func(int val) {
void echo_with_attachment(coro_rpc::context<void> conn) {
ELOGV(INFO, "call function echo_with_attachment, conn ID:%d",
conn.get_context()->get_connection_id());
auto str = conn.get_context()->release_request_attachment();
conn.get_context()->set_response_attachment(std::move(str));
conn.get_context_info()->get_connection_id());
auto str = conn.get_context_info()->release_request_attachment();
conn.get_context_info()->set_response_attachment(std::move(str));
conn.response_msg();
}
template <typename T>
@ -81,7 +81,7 @@ void test_context() {
return;
}
void test_callback_context(coro_rpc::context<void> conn) {
auto *ctx = conn.get_context();
auto *ctx = conn.get_context_info();
test_ctx_impl(ctx, "test_callback_context");
[](coro_rpc::context<void> conn) -> async_simple::coro::Lazy<void> {
co_await coro_io::sleep_for(514ms);

View File

@ -84,12 +84,12 @@ TEST_CASE("testing client") {
future.wait();
coro_rpc_server server(2, coro_rpc_server_port);
#ifdef YLT_ENABLE_SSL
server.init_ssl_context(
server.init_ssl(
ssl_configure{"../openssl_files", "server.crt", "server.key"});
#endif
auto res = server.async_start();
CHECK_MESSAGE(res, "server start failed");
CHECK_MESSAGE(!res.hasResult(), "server start failed");
SUBCASE("call rpc, function not registered") {
g_action = {};
@ -172,11 +172,11 @@ TEST_CASE("testing client with inject server") {
});
coro_rpc_server server(2, coro_rpc_server_port);
#ifdef YLT_ENABLE_SSL
server.init_ssl_context(
server.init_ssl(
ssl_configure{"../openssl_files", "server.crt", "server.key"});
#endif
auto res = server.async_start();
CHECK_MESSAGE(res, "server start failed");
CHECK_MESSAGE(!res.hasResult(), "server start failed");
server.register_handler<hello>();
@ -242,10 +242,10 @@ class SSLClientTester {
inject("server key", server_key_path, server_key);
inject("dh", dh_path, dh);
ssl_configure config{base_path, server_crt_path, server_key_path, dh_path};
server.init_ssl_context(config);
server.init_ssl(config);
server.template register_handler<hi>();
auto res = server.async_start();
CHECK_MESSAGE(res, "server start timeout");
CHECK_MESSAGE(!res.hasResult(), "server start timeout");
std::promise<void> promise;
auto future = promise.get_future();
@ -371,7 +371,7 @@ TEST_CASE("testing client with eof") {
coro_rpc_server server(2, 8801);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
auto ec = client.sync_connect("127.0.0.1", "8801");
REQUIRE_MESSAGE(!ec, ec.message());
@ -393,7 +393,7 @@ TEST_CASE("testing client with attachment") {
coro_rpc_server server(2, 8801);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
auto ec = client.sync_connect("127.0.0.1", "8801");
REQUIRE_MESSAGE(!ec, ec.message());
@ -418,7 +418,7 @@ TEST_CASE("testing client with context response user-defined error") {
g_action = {};
coro_rpc_server server(2, 8801);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
auto ec = client.sync_connect("127.0.0.1", "8801");
REQUIRE(!ec);
@ -437,7 +437,7 @@ TEST_CASE("testing client with shutdown") {
g_action = {};
coro_rpc_server server(2, 8801);
auto res = server.async_start();
CHECK_MESSAGE(res, "server start timeout");
CHECK_MESSAGE(!res.hasResult(), "server start timeout");
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
auto ec = client.sync_connect("127.0.0.1", "8801");
REQUIRE_MESSAGE(!ec, ec.message());
@ -502,7 +502,7 @@ TEST_CASE("testing client sync connect, unit test inject only") {
g_action = {};
coro_rpc_server server(2, 8801);
auto res = server.async_start();
CHECK_MESSAGE(res, "server start timeout");
CHECK_MESSAGE(!res.hasResult(), "server start timeout");
coro_rpc_client client2(*coro_io::get_global_executor(), g_client_id++);
bool ok = client2.init_ssl("../openssl_files", "server.crt");
CHECK(ok == true);
@ -537,7 +537,7 @@ TEST_CASE("testing client call timeout") {
server.register_handler<hello_timeout>();
server.register_handler<hi>();
auto res = server.async_start();
CHECK_MESSAGE(res, "server start timeout");
CHECK_MESSAGE(!res.hasResult(), "server start timeout");
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
auto ec_lazy = client.connect("127.0.0.1", "8801");
auto ec = syncAwait(ec_lazy);

View File

@ -37,12 +37,12 @@ struct CoroServerTester : ServerTester {
server(2, config.port, config.address, config.conn_timeout_duration) {
#ifdef YLT_ENABLE_SSL
if (use_ssl) {
server.init_ssl_context(
server.init_ssl(
ssl_configure{"../openssl_files", "server.crt", "server.key"});
}
#endif
auto res = server.async_start();
CHECK_MESSAGE(res, "server start timeout");
CHECK_MESSAGE(!res.hasResult(), "server start timeout");
}
~CoroServerTester() { server.stop(); }
@ -231,9 +231,9 @@ struct CoroServerTester : ServerTester {
{
auto new_server = coro_rpc_server(2, std::stoi(this->port_));
auto ec = new_server.async_start();
REQUIRE(!ec);
REQUIRE_MESSAGE(ec.error() == coro_rpc::errc::address_in_used,
ec.error().message());
REQUIRE(ec.hasResult());
REQUIRE_MESSAGE(ec.value() == coro_rpc::errc::address_in_used,
ec.value().message());
}
ELOGV(INFO, "OH NO");
}
@ -312,7 +312,7 @@ TEST_CASE("testing coro rpc server stop") {
ELOGV(INFO, "run testing coro rpc server stop");
coro_rpc_server server(2, 8810);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
SUBCASE("stop twice") {
server.stop();
server.stop();
@ -335,7 +335,7 @@ TEST_CASE("test server accept error") {
coro_rpc_server server(2, 8810);
server.register_handler<hi>();
auto res = server.async_start();
CHECK_MESSAGE(res, "server start timeout");
CHECK_MESSAGE(!res.hasResult(), "server start timeout");
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
ELOGV(INFO, "run test server accept error, client_id %d",
client.get_client_id());
@ -356,7 +356,7 @@ TEST_CASE("test server write queue") {
coro_rpc_server server(2, 8810);
server.register_handler<coro_fun_with_delay_return_void_cost_long_time>();
auto res = server.async_start();
CHECK_MESSAGE(res, "server start timeout");
CHECK_MESSAGE(!res.hasResult(), "server start timeout");
std::string buffer;
buffer.reserve(coro_rpc_protocol::REQ_HEAD_LEN +
struct_pack::get_needed_size(std::monostate{}));
@ -420,7 +420,7 @@ TEST_CASE("testing coro rpc write error") {
coro_rpc_server server(2, 8810);
server.register_handler<hi>();
auto res = server.async_start();
CHECK_MESSAGE(res, "server start failed");
CHECK_MESSAGE(!res.hasResult(), "server start failed");
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
ELOGV(INFO, "run testing coro rpc write error, client_id %d",
client.get_client_id());

View File

@ -31,7 +31,7 @@ TEST_CASE("test varadic param") {
std::thread::hardware_concurrency(), 8808);
server->register_handler<test_func>();
auto res = server->async_start();
REQUIRE_MESSAGE(res, "server start failed");
REQUIRE_MESSAGE(!res.hasResult(), "server start failed");
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
syncAwait(client.connect("localhost", std::to_string(server->port())));

View File

@ -5,6 +5,8 @@ INPUT=../include/ylt/struct_pack.hpp \
../include/ylt/coro_rpc/coro_rpc_server.hpp \
../include/ylt/coro_rpc/coro_rpc_client.hpp \
docs/en/coro_rpc/coro_rpc_introduction.md \
docs/en/coro_rpc/coro_rpc_server.md \
docs/en/coro_rpc/coro_rpc_client.md \
docs/en/struct_pack/struct_pack_intro.md
MARKDOWN_SUPPORT = YES

View File

@ -2,7 +2,9 @@ PROJECT_NAME=yaLanTingLibs
GENERATE_HTML=yes
GENERATE_LATEX=no
INPUT=docs/zh/coro_rpc/coro_rpc_doc.hpp \
docs/zh/coro_rpc/coro_rpc_intro.md \
docs/zh/coro_rpc/coro_rpc_introduction.md \
docs/zh/coro_rpc/coro_rpc_server.md \
docs/zh/coro_rpc/coro_rpc_client.md \
docs/zh/struct_pack/struct_pack_doc.hpp \
docs/zh/struct_pack/struct_pack_intro.md

View File

@ -0,0 +1,261 @@
# Introduction to coro_rpc Client
## Base Usage
The class `coro_rpc::coro_rpc_client` is the client side of coro_rpc, allowing users to send RPC requests to the server.
Below, we will demonstrate the basic usage of rpc_client.
```cpp
using namespace async_simple;
using namespace coro_rpc;
int add(int a,int b);
Lazy<void> example() {
coro_rpc_client client;
coro_rpc::err_code ec = co_await client.connect("localhost:9001");
if (ec) { /*check if connection error*/
std::cout<<ec.message()<<std::endl;
co_return;
}
rpc_result result = co_await client.call<add>(1,2);
/*rpc_result is type of expected<T,rpc_error>, which T is rpc return type*/
if (!result.has_value()) {
/*call result.error() to get rpc error message*/
std::cout<<"error code:"<<result.error().val()<< ", error message:"<<result.error().msg()<<std::endl;
co_return;
}
assert(result.value()==3); /*call result.value() to get rpc return type*/
}
```
You can use the `set_req_attachment` function to set the attachment for the current request. This is a piece of binary data that will be sent directly to the server without serialization. Similarly, you can use `get_resp_attachment()` and `release_resp_attachment()` to retrieve the attachment returned by the RPC request.
```cpp
using namespace async_simple;
using namespace coro_rpc;
void attachment_echo() {
auto ctx=coro_rpc::get_context();
ctx->set_response_attachment(ctx->release_request_attachment());
}
Lazy<std::string> example(coro_rpc_client& client, std::string_view attachment) {
client.set_req_attachment(attachment);
rpc_result result = co_await client.call<attachment_echo>();
if (result.has_value()) {
assert(client.get_resp_attachment()==attachment);
co_return std::move(client.release_resp_attachment());
}
co_return "";
}
```
By default, the RPC client will wait for 5 seconds after sending a request/establishing a connection. If no response is received after 5 seconds, it will return a timeout error. Users can also customize the wait duration by calling the `call_for` function.
```cpp
client.connect("127.0.0.1:9001", std::chrono::seconds{10});
auto result = co_await client.call_for<add>(std::chrono::seconds{10},1,2);
assert(result.value() == 3);
```
The duration can be any `std::chrono::duration` type; common examples include `std::chrono::seconds` and `std::chrono::milliseconds`. Notably, if the duration is set to zero, the call never times out.
## SSL support
coro_rpc supports using OpenSSL to encrypt connections. After installing OpenSSL and importing yalantinglibs into your project with CMake's `find_package` or `fetch_content`, you can enable SSL support by setting the CMake option `YLT_ENABLE_SSL=ON`. Alternatively, you can manually define the `YLT_ENABLE_SSL` macro and link OpenSSL yourself.
Once SSL support has been enabled, users can invoke the `init_ssl` function before establishing a connection to the server. This creates an encrypted link between the client and the server. It's important to note that the coro_rpc server must also be compiled with SSL support enabled, and its `init_ssl` method must be called to enable SSL before starting the server.
```cpp
client.init_ssl("./","server.crt");
```
The first string is the base path where the SSL certificate is located; the second string is the path of the SSL certificate relative to that base path.
## Conversion and compile-time checking of RPC parameters
coro_rpc will perform compile-time checks on the validity of arguments during invocation. For example, for the following rpc function:
```cpp
inline std::string echo(std::string str) { return str; }
```
Next, when the current client invokes the rpc function:
```cpp
client.call<echo>(42); // Parameter does not match, compilation error
client.call<echo>(); // Missing parameter, compilation error
client.call<echo>("", 0); // Extra parameters, compilation error
client.call<echo>("hello, coro_rpc"); // The string literal can be converted to std::string, compilation succeeds
```
## Connect Option
The `coro_rpc_client` provides an `init_config` function for configuring connection options. The following code snippet lists the configurable options.
```cpp
using namespace coro_rpc;
using namespace std::chrono;
void set_config(coro_rpc_client& client) {
client.init_config(config{
.timeout_duration = 5s, // Timeout duration for requests and connections
.host = "localhost", // Server hostname
.port = "9001", // Server port
.enable_tcp_no_delay = true, // Whether to disable socket-level delayed sending of requests
/* The following options are available only when SSL support is activated */
.ssl_cert_path = "./server.crt", // Path to the SSL certificate
.ssl_domain = "localhost"
});
}
```
## Calling Model
Each `coro_rpc_client` is bound to a specific I/O thread. By default, it selects an I/O thread via round-robin from the global I/O thread pool. Users can also manually bind it to a specific I/O thread.
```cpp
auto executor=coro_io::get_global_executor();
coro_rpc_client client(executor),client2(executor);
// Both clients are bound to the same IO thread.
```
Each time a coroutine-based IO task is initiated (such as `connect`, `call`, `send_request`), the client internally submits the IO event to the operating system. When the IO event is completed, the coroutine is then resumed on the bound IO thread to continue execution. For example, in the following code, the task switches to the IO thread for execution after calling connect.
```cpp
/*run in thread 1*/
coro_rpc_client cli;
co_await cli.connect("localhost:9001");
/*run in thread 2*/
do_something();
```
## Connection Pool and Load Balancing
`coro_io` offers a connection pool `client_pool` and a load balancer `channel`. Users can manage `coro_rpc`/`coro_http` connections through the `client_pool`, and can use `channel` to achieve load balancing among multiple hosts. For more details, please refer to the documentation of `coro_io`.
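As a brief illustration, the sketch below creates a pool for a single host and issues one RPC call through it. The pool options mirror those used in the client_pool tests earlier in this diff; the exact signature of `client_pool::send_request` and the wrapping of its return value are assumptions that should be checked against the `coro_io` documentation.
```cpp
#include <cassert>
#include <ylt/coro_io/client_pool.hpp>
#include <ylt/coro_rpc/coro_rpc_client.hpp>

using namespace coro_rpc;
using namespace async_simple::coro;
using namespace std::chrono_literals;

int add(int a, int b); // rpc function assumed to be registered on the server

Lazy<void> pool_example() {
  // At most 100 connections to this host; idle connections are reclaimed
  // after 300ms (options taken from the tests above).
  auto pool = coro_io::client_pool<coro_rpc_client>::create(
      "127.0.0.1:9001", {.max_connection = 100, .idle_timeout = 300ms});
  // Borrow a client, run one call on it, then hand it back to the pool.
  auto ret = co_await pool->send_request(
      [](coro_rpc_client &client) -> Lazy<rpc_result<int>> {
        co_return co_await client.call<add>(1, 2);
      });
  // The outer result reports pool/connection errors, the inner rpc_result
  // reports the RPC outcome (assumed wrapping).
  if (ret.has_value() && ret.value().has_value()) {
    assert(ret.value().value() == 3);
  }
}
```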
## Connection Reuse
The `coro_rpc_client` can achieve connection reuse through the `send_request` function. This function is thread-safe, allowing multiple threads to call `send_request` on the same client concurrently. The return value of the function is `Lazy<Lazy<async_rpc_result<T>>>`. The first `co_await` waits for the request to be sent, and the second `co_await` waits for the rpc result to return.
Connection reuse allows us to reduce the number of connections under high concurrency, eliminating the need to create new connections. It also improves the throughput of each connection.
Here's a simple example code snippet:
```cpp
using namespace coro_rpc;
using namespace async_simple::coro;
std::string_view echo(std::string_view);
Lazy<void> example(coro_rpc_client& client) {
// Wait for the request to be fully sent
Lazy<async_rpc_result> handler = co_await client.send_request<echo>("Hello");
// Then wait for the server to return the RPC request result
async_rpc_result result = co_await handler;
if (result) {
assert(result->result() == "Hello");
}
else {
// error handle
std::cout<<result.error().msg()<<std::endl;
}
}
```
We can call send_request multiple times to implement connection reuse:
```cpp
using namespace coro_rpc;
using namespace async_simple::coro;
std::string_view echo(std::string_view);
Lazy<void> example(coro_rpc_client& client) {
std::vector<Lazy<async_rpc_result>> handlers;
// First, send 10 requests consecutively
for (int i=0;i<10;++i) {
handlers.push_back(co_await client.send_request<echo>(std::to_string(i)));
}
// Next, wait for all the requests to return
std::vector<async_rpc_result> results = co_await collectAll(std::move(handlers));
for (int i=0;i<10;++i) {
assert(results[i]->result() == std::to_string(i));
}
co_return;
}
```
### Attachment
When using the `send_request` method, since multiple requests might be sent simultaneously, we should not call the `set_req_attachment` method to send an attachment to the server, nor should we call the `get_resp_attachment` and `release_resp_attachment` methods to get the attachment returned by the server.
Instead, we can set the attachment when sending a request by calling the `send_request_with_attachment` function. Additionally, we can retrieve the attachment by calling the `->get_attachment()` and `->release_buffer()` methods of `async_rpc_result`.
```cpp
using namespace coro_rpc;
using namespace async_simple::coro;
int add(int a, int b);
Lazy<std::string> example(coro_rpc_client& client) {
async_rpc_result result = co_await co_await client.send_request_with_attachment<add>("Hello", 1, 2);
assert(result->result() == 3);
assert(result->get_attachment() == "Hello");
co_return std::move(result->release_buffer().resp_attachment_buf_);
}
```
### Execution order
When the called rpc function is a coroutine rpc function or a callback rpc function, the rpc requests may not necessarily be executed in order. The server might execute multiple rpc requests simultaneously.
For example, suppose there is the following code:
```cpp
using namespace async_simple::coro;
Lazy<void> sleep(int seconds) {
co_await coro_io::sleep_for(std::chrono::seconds{seconds}); // Yield the coroutine here
co_return;
}
```
Server registration and startup:
```cpp
using namespace coro_rpc;
void start() {
coro_rpc_server server(/* thread = */1,/* port = */ 8801);
server.register_handler<sleep>();
server.start();
}
```
The client consecutively calls the sleep function twice on the same connection, sleeping for 2 seconds the first time and 1 second the second time.
```cpp
using namespace async_simple::coro;
using namespace coro_rpc;
Lazy<void> call() {
coro_rpc_client cli,cli2;
co_await cli.connect("localhost:8801");
co_await cli2.connect("localhost:8801");
auto handler1 = co_await cli.send_request<sleep>(2);
auto handler2 = co_await cli.send_request<sleep>(1);
auto handler3 = co_await cli2.send_request<sleep>(0);
handler2.start([](auto&&){
std::cout<<"handler2 return"<<std::endl;
});
handler3.start([](auto&&){
std::cout<<"handler3 return"<<std::endl;
});
co_await handler1;
std::cout<<"handler1 return"<<std::endl;
}
```
Under normal circumstances, handler3 will return first, followed by handler2, and finally handler1. Although the server only has one IO thread for executing rpc functions, the coroutine function will yield the coroutine when calling `coro_io::sleep`, thus ensuring that other connections will not be blocked.
### Socket Delayed Sending
When using connection reuse, you can try setting the option `enable_tcp_no_delay` to `false`. This allows the underlying implementation to batch multiple small requests together for sending, thereby increasing throughput, but it may lead to increased latency.
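For instance, reusing the `config` fields listed in the Connect Option section above, a minimal sketch of turning the option off could look like this:
```cpp
using namespace coro_rpc;
using namespace std::chrono;

void enable_write_batching(coro_rpc_client &client) {
  client.init_config(config{
      .timeout_duration = 5s,
      .host = "localhost",
      .port = "9001",
      // Let the kernel coalesce small writes: higher throughput when many
      // small pipelined requests share one connection, at some latency cost.
      .enable_tcp_no_delay = false,
  });
}
```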
## Thread-safe
For multiple coro_rpc_client instances, they do not interfere with each other and can be safely called in different threads respectively.
When calling a single `coro_rpc_client` simultaneously in multiple threads, it is necessary to note that only some member functions are thread-safe, including `send_request()`, `close()`, `connect()`, `get_executor()`, `get_pipeline_size()`, `get_client_id()`, `get_config()`, etc. If the user has not called the `connect()` function again with an endpoint or hostname, then the `get_port()` and `get_host()` functions are also thread-safe.
It is important to note that the `call`, `get_resp_attachment`, `set_req_attachment`, `release_resp_attachment`, and `init_config` functions are not thread-safe and must not be called by multiple threads simultaneously. In this case, only `send_request` can be used for multiple threads to make concurrent requests over a single connection.
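The sketch below illustrates that usage, assuming a client that has already connected successfully: several threads pipeline requests through `send_request` on the same client, each from its own coroutine.
```cpp
#include <cassert>
#include <string>
#include <thread>
#include <vector>

using namespace coro_rpc;
using namespace async_simple::coro;

std::string_view echo(std::string_view); // rpc function registered on the server

Lazy<void> one_call(coro_rpc_client &client, int id) {
  // send_request() is thread-safe, so several threads may pipeline requests
  // over this single connection concurrently.
  auto handler = co_await client.send_request<echo>(std::to_string(id));
  auto result = co_await handler;
  if (result) {
    assert(result->result() == std::to_string(id));
  }
}

void concurrent_calls(coro_rpc_client &client) {
  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i) {
    threads.emplace_back([&client, i] { syncAwait(one_call(client, i)); });
  }
  for (auto &t : threads) {
    t.join();
  }
}
```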

View File

@ -1,6 +1,6 @@
# Introduction
coro_rpc is a high-performance Remote Procedure Call (RPC) framework in C++20, based on stackless coroutine and compile-time reflection. In an `echo` benchmark test on localhost, it reaches a peak QPS of 20 million, which exceeds other RPC libraries, such as grpc and brpc. Rather than high performance, the most key feature of coro_rpc is easy to use: as a header-only library, it does not need to be compiled or installed separately. It allows building an RPC client and server with a few lines of C++ code.
coro_rpc is a high-performance Remote Procedure Call (RPC) framework in C++20, based on stackless coroutines and compile-time reflection. In an `echo` benchmark on localhost with a 96-core CPU, it reaches a peak of 20 million QPS (pipelined) or 4.5 million QPS (2000 connections in ping-pong mode), which exceeds other RPC libraries such as grpc and brpc. Beyond high performance, the key feature of coro_rpc is ease of use: as a header-only library, it does not need to be compiled or installed separately. It allows building an RPC client and server with a few lines of C++ code.
The core design goal of coro_rpc is usability. Instead of exposing too many troublesome details of the underlying RPC framework, coro_rpc provides a simplifying abstraction that allows programmers to concentrate principally on business logic and implement an RPC service without much effort. Given such simplicity, coro_rpc goes back to the essence of RPC: a remote function call similar to a normal function call except for the underlying network I/O. So coro_rpc user does not need to care about the underlying networking, data serialization, and so on but focus on up-layer implementations. And coro_rpc provides simple and straightforward APIs to users. Let's see one simple demo below
@ -250,7 +250,7 @@ example::EchoService_Stub stub(&channel);
```cpp
# include <coro_rpc/coro_rpc_client.hpp>
Lazy<void> say_hello(){
Lazy<void> say_hello() {
coro_rpc_client client;
co_await client.connect("localhost", /*port =*/"9000");
while (true){
@ -262,77 +262,6 @@ Lazy<void> say_hello(){
One core feature of coro_rpc is stackless coroutine where users could write asynchronous code in a synchronous manner, which is more simple and easy to understand.
# More features
## Real-time Tasks and Non-Real-time Tasks
The examples shown earlier do not demonstrate how responses are sent back to the client with the results, because by default the coro_rpc framework will help the user to serialize and send the results to client automatically. And the user is completely unaware and only needs to focus on the business logic. It should be noted that, in this scenario, the response callback is executed in the I/O thread, which is suitable for real-time critical scenarios, with the disadvantage of blocking the I/O thread. What if the user does not want to execute the business logic in the io thread, but rather in a thread or thread pool and delays sending messages?
coro_rpc has taken this problem into account. coro_rpc considers that RPC tasks are divided into real-time and Non-real-time tasks. real-time tasks are executed in the I/O thread and sent to the client immediately, with better timeliness and lower latency; Non-real-time tasks can be scheduled in a separate thread, and the requests are sent to the server at some point in the future; coro_rpc supports both kinds of tasks.
Switch to time-delayed task
```cpp
#include <ylt/coro_rpc/connection.hpp>
#include <ylt/coro_rpc/coro_rpc_server.hpp>
//Real-time tasks
std::string echo(std::string str) { return str; }
//Non-Real-time tasks, requests handled in separate thread
void delay_echo(coro_connection<std::string> conn, std::string str) {
std::thread([conn, str]{
conn.response_msg(str); //requests handled in separate thread
}).detach();
}
```
## Asynchronous mode
It is recommended to use coroutine on server development. However, the asynchronous call mode is also supported if user does not prefer coroutine.
- coroutine based rpc server
```cpp
#include <ylt/coro_rpc/coro_rpc_server.hpp>
std::string hello() { return "hello coro_rpc"; }
int main() {
coro_rpc_server server(/*thread_num =*/10, /*port =*/9000);
server.register_handler<hello>();
server.start();
}
```
- Asynchronous rpc server
```cpp
#include <ylt/coro_rpc/async_rpc_server.hpp>
std::string hello() { return "hello coro_rpc"; }
int main() {
async_rpc_server server(/*thread_num =*/10, /*port =*/9000);
server.register_handler<hello, echo>();
server.start();
}
```
Compile-time syntax checks
coro_rpc does a compile-time check on the legality of the arguments when it is called, e.g.:
```cpp
inline std::string echo(std::string str) { return str; }
```
When client is called via:
```cpp
client.call<echo>(42); //Parameter mismatch, compile error
client.call<echo>(); //Missing parameters, compile error
client.call<echo>("", 0); //Redundant parameters, compile error
client.call<echo>("hello, coro_rpc");//Parameters match, OK
```
# Benchmark
## System Configuration

View File

@ -0,0 +1,433 @@
# coro_rpc Server Introduction
## Server registration and startup
### Function registration
Before starting the RPC server, we need to call the `register_handler<>` function to register all RPC functions. Registration is not thread-safe and cannot be done after the RPC server has started.
```cpp
void hello();
Lazy<std::string_view> echo(std::string_view);
int add(int a, int b);
void register_rpc_function(coro_rpc_server& server) {
server.register_handler<hello, echo, add>();
}
```
### Start the server
We can start a server by calling the `.start()` method, which will block the current thread until the server exits.
```cpp
void start_server() {
coro_rpc_server server;
register_rpc_function(server);
coro_rpc::err_code ec = server.start();
/* blocks until the server stops */
}
```
If you do not want to block the current thread, you can instead start the server asynchronously by calling `async_start()`. After this function returns, the server is guaranteed to have started listening on its port (or an error has occurred). You can check `async_simple::Future<coro_rpc::err_code>::hasResult()`: if it returns false, the server started successfully and is running normally; if it returns true, startup failed or the server has already stopped. Calling `async_simple::Future<coro_rpc::err_code>::get()` then blocks until the server stops and returns the error code.
```cpp
void start_server() {
coro_rpc_server server;
register_rpc_function(server);
async_simple::Future<coro_rpc::err_code> ec = server.async_start(); /* won't block here */
assert(!ec.hasResult()); /* check that the server started successfully */
auto err = ec.get(); /* blocks here until the server stops, then returns the error code */
}
```
coro_rpc supports the registration and calling of three types of RPC functions:
1. Ordinary RPC Functions
2. Coroutine RPC Functions
3. Callback RPC Functions
## Ordinary RPC Functions
If a function is not a coroutine and its first parameter is not of type `coro_rpc::context<T>`, then the RPC function is an ordinary function.
For example, the following functions are ordinary functions:
```cpp
int add(int a, int b);
std::string_view echo(std::string_view str);
struct dummy {
std::string_view echo(std::string_view str) { return str; }
};
```
### Calling model
Ordinary functions are executed synchronously: when a connection submits a request for an ordinary function, the server executes that function on the I/O thread associated with the connection and runs it to completion. Only then is the result sent back to the client and the next request from that connection handled. For instance, if a client sends two requests, A and B, in sequence, we guarantee that B will be executed only after A has finished.
It's important to note that performing time-consuming operations within a function can not only block the current connection but may also impede other connections that are bound to the same I/O thread. Therefore, in scenarios where performance is of high concern, one should not register ordinary functions that are too taxing. Instead, one might want to consider the use of coroutine functions or callback functions as an alternative.
### Retrieving Context Information
When a function is called by coro_rpc_server, the following code can be used to obtain context information about the connection.
```cpp
using namespace coro_rpc;
void test() {
context_info_t* ctx = coro_rpc::get_context();
if (ctx->has_closed()) { // Check if the connection has been closed
throw std::runtime_error("Connection is closed!");
}
// Retrieve the connection ID and request ID
ELOGV(INFO, "Call function echo_with_attachment, connection ID: %d, request ID: %d",
ctx->get_connection_id(), ctx->get_request_id());
// Obtain the client's IP and port as well as the server's IP and port
ELOGI << "Remote endpoint: " << ctx->get_remote_endpoint() << ", local endpoint: "
<< ctx->get_local_endpoint();
// Get the name of the RPC function
ELOGI << "RPC function name: " << ctx->get_rpc_function_name();
// Get the request attachment
std::string_view sv{ctx->get_request_attachment()};
// Release the request attachment
std::string str = ctx->release_request_attachment();
// Set the response attachment
ctx->set_response_attachment(std::move(str));
}
```
An attachment is an additional piece of data that accompanies an RPC request. coro_rpc does not serialize it. Users can obtain a view of the request's accompanying attachment or release it from the context for separate manipulation. Similarly, users can also set the attachment to be sent back to the RPC client in the response.
### Error Handling
We allow the termination of an RPC call and the return of RPC error codes and error messages to the user by throwing a `coro_rpc::rpc_error` exception.
```cpp
void rpc() {
throw coro_rpc::rpc_error{coro_rpc::errc::io_error}; // Return a custom error code
throw coro_rpc::rpc_error{10404}; // Return a custom error code
throw coro_rpc::rpc_error{10404, "404 Not Found"}; // Return a custom error code and error message
}
```
An RPC error code is a 16-bit unsigned integer. The range 0-255 is reserved for error codes used by the RPC framework itself, whereas user-defined error codes can be any integer within [256, 65535]. When an RPC returns a user-defined error code, the connection will not be closed. However, if the returned error code is one reserved by the RPC framework and indicates a severe RPC error, it will result in the disconnection of the RPC link.
## Coroutine RPC Functions
If an RPC function has a return type of `async_simple::coro::Lazy<T>`, then it's considered a coroutine function. Compared to ordinary functions, coroutine functions are asynchronous, which means they can yield the I/O thread while waiting for events to complete, thus improving concurrency performance.
For instance, the following RPC function uses a coroutine to submit a heavy computation task to the global thread pool, thereby avoiding blocking the current I/O thread.
```cpp
using namespace async_simple::coro;
int heavy_calculate(int value);
Lazy<int> calculate(int value) {
auto val = co_await coro_io::post([value](){return heavy_calculate(value);}); // Submit the task to the global thread pool and yield the current I/O thread until it completes.
co_return val;
}
```
Users can also use `async_simple::Promise<T>` to submit tasks to a custom thread pool:
```cpp
using namespace async_simple::coro;
int heavy_calculate(int value);
Lazy<int> calculate(int value) {
async_simple::Promise<int> p;
std::thread th([&p,value](){
auto ret = heavy_calculate(value);
p.setValue(ret); // Task completed, wake up the RPC function
});
th.detach();
auto ret = co_await p.getFuture(); // Wait for the task to complete
co_return ret;
}
```
### Calling Model
When a connection submits a coroutine function request, the server will start a new coroutine on the I/O thread that the connection is bound to and execute the function within this new coroutine. Once the coroutine function completes, the server will send the RPC result back to the client based on its return value. If the coroutine yields during execution, the I/O thread will continue to execute other tasks, such as handling the next request or managing other connections bound to the same I/O thread.
For example, consider the following code:
```cpp
using namespace async_simple::coro;
Lazy<void> sleep(int seconds) {
co_await coro_io::sleep_for(std::chrono::seconds{seconds}); // Yield the coroutine here
co_return;
}
```
Then the server register and start:
```cpp
using namespace coro_rpc;
void start() {
coro_rpc_server server(/* thread = */1,/* port = */ 8801);
server.register_handler<sleep>();
server.start();
}
```
The client invokes the sleep function twice consecutively on the same connection, sleeping for 2 seconds the first time and 1 second the second time.
```cpp
using namespace async_simple::coro;
using namespace coro_rpc;
Lazy<void> call() {
coro_rpc_client cli,cli2;
co_await cli.connect("localhost:8801");
co_await cli2.connect("localhost:8801");
auto handler1 = co_await cli.send_request<sleep>(2);
auto handler2 = co_await cli.send_request<sleep>(1);
auto handler3 = co_await cli2.send_request<sleep>(0);
handler2.start([](auto&&){
std::cout<<"handler2 return"<<std::endl;
});
handler3.start([](auto&&){
std::cout<<"handler3 return"<<std::endl;
});
co_await handler1;
std::cout<<"handler1 return"<<std::endl;
}
```
Under normal circumstances, handler3 will return first, followed by handler2, and then handler1. Although the server has only one I/O thread for executing RPC functions, the coroutine function will yield the current coroutine when calling `coro_io::sleep`, thus ensuring other connections are not blocked.
### Obtaining Context Information
When a coroutine function is called by the coro_rpc_server, it can call `coro_rpc::get_context_in_coro()` to obtain context information. It is important to note that calling `coro_rpc::get_context()` at this point will yield incorrect context information.
```cpp
using namespace coro_rpc;
using namespace async_simple::coro;
Lazy<void> test() {
context_info_t* ctx = co_await coro_rpc::get_context_in_coro();
}
```
### Error Handling
Similar to regular functions, we can return RPC errors by throwing the `coro_rpc::rpc_error` exception, allowing for customized RPC error codes and messages.
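For example, a small sketch following the same convention as the ordinary-function example above:
```cpp
using namespace async_simple::coro;

Lazy<int> divide(int a, int b) {
  if (b == 0) {
    // User-defined error code in [256, 65535] plus a message; the connection
    // stays open and the client receives this code and message.
    throw coro_rpc::rpc_error{10001, "divide by zero"};
  }
  co_return a / b;
}
```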
## Callback RPC Functions
We also support the more traditional callback functions to implement asynchronous RPC calls. The syntax for a callback function is as follows:
```cpp
void echo(coro_rpc::context</* return type = */ std::string_view>, std::string_view param);
```
If a function's return type is `void` and the first parameter is of type `coro_rpc::context<T>`, then this function is a callback function. The `coro_rpc::context<T>` is similar to a smart pointer, holding the callback handle and context information for this RPC call.
For example, in the code below, we copy `coro_rpc::context<std::string_view>` to another thread, which then sleeps for 30 seconds before returning the result to the RPC client by calling `coro_rpc::context<std::string_view>::response_msg()`.
```cpp
using namespace std::chrono;
void echo(coro_rpc::context<std::string_view> ctx, std::string_view param) {
std::thread th([ctx, param](){
std::this_thread::sleep_for(30s);
ctx.response_msg(param);
});
th.detach();
return;
}
```
It should be noted that view types among the RPC function parameters, such as `std::string_view` and `std::span`, will have their underlying data become invalid after all copies of the `coro_rpc::context<T>` object for this RPC call are destructed.
### Calling Model
When a connection receives a request for a callback function, the I/O thread allocated to that connection will immediately execute the function until it is completed, after which other requests will be processed. Since callback functions do not have return values, the server does not immediately reply to the client after the RPC function is executed.
When the user calls `coro_rpc::context<T>::response_msg` or `coro_rpc::context<T>::response_error`, the RPC server will be notified, and only then will the result be sent to the client. Therefore, users must ensure that they actively call one of the callback functions at some point in their code.
### Obtaining Context Information
In callback functions, we can call `coro_rpc::context<T>::get_context_info()` to obtain the context information for this call. `coro_rpc::get_context()` can also be used to obtain context information before the RPC function returns; however, after the RPC function has returned, the context information pointed to by `coro_rpc::get_context()` may have been modified or invalidated. Therefore, it's recommended to use `coro_rpc::context<T>::get_context_info()` to obtain context information.
```cpp
void echo(coro_rpc::context<void> ctx) {
context_info_t* info = ctx.get_context_info();
return;
}
```
### Error Handling
In callback functions, one should not and cannot return RPC errors by throwing exceptions, because the error might not occur within the call stack of the RPC function. Instead, we can use the coro_rpc::context<T>::response_error() function to return RPC errors.
```cpp
void echo(coro_rpc::context<void> ctx) {
ctx.response_error(10015); // Custom RPC error code
ctx.response_error(10015, "my error msg"); // Custom RPC error code and error message
ctx.response_error(coro_rpc::errc::io_error); // Using the built-in error code of the RPC framework
return;
}
```
The RPC error code is a 16-bit unsigned integer. The range 0-255 is reserved for error codes used by the RPC framework, and user-defined error codes can be any integer between [256, 65535]. When an RPC returns a user-defined error code, the connection will not be terminated. However, if an error code from the RPC framework is returned, it is considered a serious RPC error, leading to the disconnection of the RPC link.
## Connections and I/O Threads
The server internally has an I/O thread pool, the size of which defaults to the number of logical threads of the CPU. After the server starts, it launches a listening task on one of the I/O threads to accept connections from clients. Each time a connection is accepted, the server selects an I/O thread through round-robin to bind it to. Subsequently, all steps including data transmission, serialization, RPC routing, etc., of that connection are executed on this I/O thread. The RPC functions are also executed on the same I/O thread.
This means if your RPC functions will block the current thread (e.g., thread sleep, synchronous file read/write), it is better to make them asynchronous to avoid blocking the I/O thread, thereby preventing other requests from being blocked. For example, `async_simple` provides coroutine locks such as `Mutex` and `Spinlock`, and components such as `Promise` and `Future` that wrap asynchronous tasks as coroutine tasks. `coro_io` offers coroutine-based asynchronous file read/write, asynchronous read/write of sockets, sleep, and the `period_timer` timer. It also allows submitting high-CPU-load tasks to the global blocking task thread pool through `coro_io::post`. coro_rpc/coro_http offer coroutine-based asynchronous RPC and HTTP calls, respectively. easylog by default submits log content to a background thread for writing, ensuring the foreground does not block.
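For example, here is a sketch of moving a blocking file read off the connection's I/O thread, mirroring the `coro_io::post` pattern from the coroutine-function section above (`read_whole_file` is a hypothetical blocking helper):
```cpp
#include <string>

using namespace async_simple::coro;

std::string read_whole_file(const std::string &path); // hypothetical blocking helper

// The blocking read runs on the global blocking-task thread pool; the
// connection's I/O thread stays free to serve other requests meanwhile.
Lazy<std::string> read_config(std::string path) {
  auto content = co_await coro_io::post([&path]() { return read_whole_file(path); });
  co_return content;
}
```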
## Parameter and Return Value Types
coro_rpc allows users to register rpc functions with multiple parameters (up to 64), and the types of arguments and return values can be user-defined aggregate structures. They also support various data structures provided by the C++ standard library and many third-party libraries. For details, see: [struct_pack type system](https://alibaba.github.io/yalantinglibs/en/struct_pack/struct_pack_type_system.html)
If your rpc argument or return value type is not supported by the struct_pack type system, we also allow users to register their own structures or custom serialization algorithms. For more details, see: [Custom feature](https://alibaba.github.io/yalantinglibs/en/struct_pack/struct_pack_intro.html#custom-type)
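For instance, a plain aggregate can be used directly as a parameter or return type without any extra serialization code; the type and function names below are purely illustrative.
```cpp
#include <string>
#include <vector>

// A plain aggregate: struct_pack serializes it field by field, so it can be
// used directly as an RPC parameter or return type.
struct person {
  int id;
  std::string name;
  std::vector<std::string> tags;
};

person make_person(int id, std::string name) {
  return person{id, std::move(name), {"tag1", "tag2"}};
}

// Server side: server.register_handler<make_person>();
// Client side:  auto result = co_await client.call<make_person>(1, std::string{"Tom"});
```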
## RPC Return Value Construction and Checking
Furthermore, for callback functions, coro_rpc will try to construct the return value type from the parameter list. If it fails to construct, it will lead to a compilation failure.
```cpp
void echo(coro_rpc::context<std::string> ctx) {
ctx.response_msg(); // Unable to construct std::string. Compilation fails.
ctx.response_msg(42); // Unable to construct std::string. Compilation fails.
ctx.response_msg(42,'A'); // Able to construct std::string, compilation passes.
ctx.response_msg("Hello"); // Able to construct std::string, compilation passes.
return;
}
```
## SSL Support
coro_rpc supports encrypting connections with OpenSSL. After installing OpenSSL and importing yalantinglibs into your project using cmake's `find_package/fetch_content`, you can enable SSL support by turning on the cmake option `YLT_ENABLE_SSL=ON`. Alternatively, you can manually add the macro `YLT_ENABLE_SSL` and manually link OpenSSL.
Once SSL support is enabled, users can call the `init_ssl` function before connecting to the server. This will establish an encrypted link between the client and the server. It should be noted that the coro_rpc server also must have SSL support enabled at compile time.
```cpp
coro_rpc_server server;
server.init_ssl({
.base_path = "./", // Base path of ssl files.
.cert_file = "server.crt", // Path of the certificate relative to base_path.
.key_file = "server.key" // Path of the private key relative to base_path.
});
```
After enabling SSL support, the server will reject all non-SSL connections.
## Advanced Settings
We provide the `coro_rpc::config_t` class, which allows users to set the details of the server:
```cpp
struct config_base {
bool is_enable_tcp_no_delay = true; /* Whether to disable Nagle's algorithm and send responses immediately */
uint16_t port = 9001; /* Listening port */
unsigned thread_num = std::thread::hardware_concurrency(); /* Number of I/O threads used internally by the rpc server; defaults to the number of logical cores */
std::chrono::steady_clock::duration conn_timeout_duration =
std::chrono::seconds{0}; /* Connection timeout; 0 means connections never time out */
std::string address="0.0.0.0"; /* Listening address */
/* The following settings are only applicable if SSL is enabled */
std::optional<ssl_configure> ssl_config = std::nullopt; // Configure whether to enable ssl
};
struct ssl_configure {
std::string base_path; // Base path of ssl files.
std::string cert_file; // Path of the certificate relative to base_path.
std::string key_file; // Path of the private key relative to base_path.
std::string dh_file; // Path of the dh_file relative to base_path (optional).
};
int start() {
coro_rpc::config_t config{};
coro_rpc_server server(config);
/* Register rpc function here... */
server.start();
}
```
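For example, assuming `config_t` exposes the `config_base` fields listed above, a server with a custom port, thread count, and connection timeout can be configured like this (a brief sketch):
```cpp
int start() {
  coro_rpc::config_t config{};
  config.port = 8801;                                      // listen on 8801 instead of 9001
  config.thread_num = 4;                                   // four internal I/O threads
  config.conn_timeout_duration = std::chrono::seconds{60}; // drop idle connections after 60s
  coro_rpc_server server(config);
  /* Register rpc functions here... */
  coro_rpc::err_code ec = server.start();
  return ec ? 1 : 0;
}
```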
## Registration and Invocation of Special RPC Functions
### Registration and Invocation of Member Functions
coro_rpc supports registering and invoking member functions:
For example, consider the following function:
```cpp
struct dummy {
std::string_view echo(std::string_view str) { return str; }
Lazy<std::string_view> coroutine_echo(std::string_view str) {co_return str;}
void callback_echo(coro_rpc::context</*return type = */ std::string_view> ctx, std::string_view str) {
ctx.response_msg(str);
}
};
```
The server can register these functions like this:
```cpp
#include "rpc_service.h"
#include <ylt/coro_rpc/coro_rpc_server.hpp>
int main() {
coro_rpc_server server;
dummy d{};
  server.register_handler<&dummy::echo,&dummy::coroutine_echo,&dummy::callback_echo>(&d); // register member functions
server.start();
}
```
It's important to note that the registered `dummy` object must remain alive while the server is running; otherwise, invoking these functions is undefined behavior.
The client can call these functions like this:
```cpp
#include "rpc_service.h"
#include <ylt/coro_rpc/coro_rpc_client.hpp>
Lazy<void> test_client() {
coro_rpc_client client;
co_await client.connect("localhost", /*port =*/"9000");
// calling RPC
{
auto result = co_await client.call<&dummy::echo>("hello");
assert(result.value() == "hello");
}
{
auto result = co_await client.call<&dummy::coroutine_echo>("hello");
assert(result.value() == "hello");
}
{
auto result = co_await client.call<&dummy::callback_echo>("hello");
assert(result.value() == "hello");
}
}
```
### Specialized Template Functions
coro_rpc allows users to register and call specialized template functions.
For example, consider the following function:
```cpp
template<typename T>
T echo(T param) { return param; }
```
The server can register these functions like this:
```cpp
#include <ylt/coro_rpc/coro_rpc_server.hpp>
using namespace coro_rpc;
int main() {
coro_rpc_server server;
  server.register_handler<echo<int>,echo<std::string>,echo<std::vector<int>>>(); // Register specialized template functions
server.start();
}
```
The client can call like this:
```cpp
using namespace coro_rpc;
using namespace async_simple::coro;
Lazy<void> rpc_call(coro_rpc_client& cli) {
  assert((co_await cli.call<echo<int>>(42)).value() == 42);
  assert((co_await cli.call<echo<std::string>>("Hello")).value() == "Hello");
  assert((co_await cli.call<echo<std::vector<int>>>(std::vector{1,2,3})).value() == (std::vector{1,2,3}));
}
```

View File

@ -0,0 +1,265 @@
# Introduction to the coro_rpc Client
## Basic Usage
The class `coro_rpc::coro_rpc_client` is the coro_rpc client; users send rpc requests to the server through it.
Below we show the basic usage of the rpc client.
```cpp
using namespace async_simple::coro;
using namespace coro_rpc;
int add(int a,int b);
Lazy<void> example() {
coro_rpc_client client;
coro_rpc::err_code ec = co_await client.connect("localhost:9001");
  if (ec) { /* check whether the connection failed */
std::cout<<ec.message()<<std::endl;
co_return;
}
  rpc_result<int> result = co_await client.call<add>(1,2);
  /* rpc_result<T> is an expected<T,rpc_error> type, where T is the rpc return value */
if (!result.has_value()) {
    /* call result.error() to get the rpc error information */
std::cout<<"error code:"<<result.error().val()<< ", error message:"<<result.error().msg()<<std::endl;
co_return;
}
  assert(result.value()==3); /* call result.value() to get the rpc return value */
}
```
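Outside of a coroutine, such a call can be driven synchronously with `syncAwait`, as the quick-start example elsewhere in this documentation does (a small sketch):
```cpp
int main() {
  // block the current thread until the coroutine above has finished
  async_simple::coro::syncAwait(example());
}
```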
We can set the attachment of the current request with the `set_req_attachment` function; this is a piece of binary data that is sent to the server without being serialized. Likewise, we can use `get_resp_attachment()` and `release_resp_attachment()` to obtain the attachment returned with the rpc response.
```cpp
using namespace async_simple::coro;
using namespace coro_rpc;
void attachment_echo() {
auto ctx=coro_rpc::get_context();
ctx->set_resp_attachment(ctx->get_req_attachment());
}
Lazy<std::string> example(coro_rpc_client& client, std::string_view attachment) {
client.set_req_attachment(attachment);
rpc_result result = co_await client.call<attachment_echo>();
if (result.has_value()) {
    assert(client.get_resp_attachment()==attachment);
    co_return std::move(client.release_resp_attachment());
}
co_return "";
}
```
By default, after sending a request or establishing a connection, the rpc client waits for 5 seconds; if no response has been received after 5 seconds, a timeout error is returned.
Users can also customize the waiting time by calling the `call_for` function.
```cpp
co_await client.connect("127.0.0.1:9001", std::chrono::seconds{10});
auto result = co_await client.call_for<add>(std::chrono::seconds{10},1,2);
assert(result.value() == 3);
```
The duration can be any `std::chrono::duration` type, such as `std::chrono::seconds` or `std::chrono::milliseconds`.
In particular, a duration of zero means the call never times out.
## SSL Support
coro_rpc supports encrypting connections with OpenSSL. After installing OpenSSL and importing yalantinglibs into your project via cmake `find_package`/`fetch_content`, you can enable SSL support by turning on the cmake option `YLT_ENABLE_SSL=ON`. Alternatively, you can manually add the macro `YLT_ENABLE_SSL` and manually link OpenSSL.
Once SSL support is enabled, the user can call the `init_ssl` function before connecting to the server, which establishes an encrypted connection between client and server. Note that the coro_rpc server must also be compiled with SSL support enabled, and must call `init_ssl` before starting.
```cpp
client.init_ssl("./","server.crt");
```
The first string is the base path where the SSL certificate is located, and the second string is the path of the SSL certificate relative to that base path.
When establishing the connection, the client uses this certificate to verify the certificate sent by the server, in order to prevent man-in-the-middle attacks. The client must therefore hold the certificate used by the server, or its root certificate.
## Argument Conversion and Compile-time Checking
coro_rpc checks the legality of the arguments at compile time at the call site. For example, given the following rpc function:
```cpp
inline std::string echo(std::string str) { return str; }
```
When the client calls this rpc function:
```cpp
client.call<echo>(42);// The argument does not match, a compilation error occurs.
client.call<echo>();// Missing argument, compilation error occurs.
client.call<echo>("", 0);// There are too many arguments, a compilation error occurs.
client.call<echo>("hello, coro_rpc");// The string literal can be converted to std::string, compilation succeeds.
```
## Connection Options
coro_rpc_client provides the `init_config` function for configuring connection options. The code below lists the configurable options.
```cpp
using namespace coro_rpc;
using namespace std::chrono;
void set_config(coro_rpc_client& client) {
client.init_config(config{
    .timeout_duration = 5s,          // timeout for connecting and for each request
    .host = "localhost",             // server hostname
    .port = "9001",                  // server port
    .enable_tcp_no_delay = true,     // whether to send data immediately instead of letting the socket layer batch small writes
    /* the following options are only available when SSL support is enabled */
    .ssl_cert_path = "./server.crt", // path of the ssl certificate
    .ssl_domain = "localhost"
});
}
```
## Invocation Model
Each `coro_rpc_client` is bound to an IO thread. By default it picks one from the global IO thread pool by round-robin; the user can also bind it to a specific IO thread manually.
```cpp
auto executor=coro_io::get_global_executor();
coro_rpc_client client(executor),client2(executor);
// both clients are bound to the same io thread
```
Whenever a coroutine-based IO task is started (such as `connect`, `call`, or `send_request`), the client internally submits the IO event to the operating system; when the IO event completes, the coroutine is resumed on the bound IO thread and continues executing.
For example, in the following code, after calling connect the task switches to the IO thread to continue execution.
```cpp
/*run in thread 1*/
coro_rpc_client cli;
co_await cli.connect("localhost:9001");
/*run in thread 2*/
do_something();
```
## Connection Pool and Load Balancing
`coro_io` provides the connection pool `client_pool` and the load balancer `channel`. Users can manage `coro_rpc`/`coro_http` connections with the `client_pool`, and use `channel` to load-balance between multiple hosts. See the `coro_io` documentation for details.
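The sketch below illustrates the rough shape of using `client_pool` with a coro_rpc client; the exact interface is described in the coro_io documentation, so treat the `create`/`send_request` calls here as an assumption rather than a reference:
```cpp
#include <ylt/coro_io/client_pool.hpp>
using namespace async_simple::coro;
std::string_view echo(std::string_view);

Lazy<void> pool_example() {
  // one pool manages many connections to the same host
  auto pool = coro_io::client_pool<coro_rpc::coro_rpc_client>::create("localhost:9001");
  // borrow a client, run the rpc call on it, then hand the client back to the pool
  auto ret = co_await pool->send_request(
      [](coro_rpc::coro_rpc_client &client) -> Lazy<void> {
        auto result = co_await client.call<echo>("hello");
        assert(result.value() == "hello");
      });
}
```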
## Connection Reuse
`coro_rpc_client` can reuse a connection through the `send_request` function. The function is thread-safe: multiple threads may call `send_request` on the same client at the same time. Its return type is `Lazy<Lazy<async_rpc_result<T>>>`. The first `co_await` waits for the request to be sent; awaiting again waits for the rpc result.
Connection reuse lets us keep the number of connections small under high concurrency, with no need to create new connections, and it also improves the throughput of each connection.
Here is a simple example:
```cpp
using namespace coro_rpc;
using namespace async_simple::coro;
std::string_view echo(std::string_view);
Lazy<void> example(coro_rpc_client& client) {
  // first, wait until the request has been sent
  Lazy<async_rpc_result<std::string_view>> handler = co_await client.send_request<echo>("Hello");
  // then wait for the server to return the rpc result
  async_rpc_result<std::string_view> result = co_await handler;
if (result) {
assert(result->result() == "Hello");
}
else {
    // error handling
std::cout<<result.error().msg()<<std::endl;
}
}
```
We can call send_request multiple times to reuse the connection:
```cpp
using namespace coro_rpc;
using namespace async_simple::coro;
std::string_view echo(std::string_view);
Lazy<void> example(coro_rpc_client& client) {
  std::vector<Lazy<async_rpc_result<std::string_view>>> handlers;
  // first, send 10 requests in a row
for (int i=0;i<10;++i) {
handlers.push_back(co_await client.send_request<echo>(std::to_string(i)));
}
  // then wait for all the requests to complete
  std::vector<async_simple::Try<async_rpc_result<std::string_view>>> results = co_await collectAll(std::move(handlers));
  for (int i=0;i<10;++i) {
    assert(results[i].value()->result() == std::to_string(i));
}
co_return;
}
```
### Attachment
When using `send_request`, several requests may be in flight on the connection at the same time, so we cannot call `set_req_attachment` to send an attachment to the server, nor can we call `get_resp_attachment` or `release_resp_attachment` to read the attachment returned by the server.
Instead, we can set the attachment of a request by calling `send_request_with_attachment`, and read the returned attachment from the `async_rpc_result` through its `->get_attachment()` and `->release_buffer()` methods.
```cpp
using namespace coro_rpc;
using namespace async_simple::coro;
int add(int a, int b);
Lazy<std::string> example(coro_rpc_client& client) {
  async_rpc_result<int> result = co_await co_await client.send_request_with_attachment<add>("Hello", 1, 2);
  assert(result->result() == 3);
assert(result->get_attachment() == "Hello");
co_return std::move(result->release_buffer().resp_attachment_buf_);
}
```
### Execution Order
When the rpc function being called is a coroutine or callback rpc function, requests are not necessarily executed in order; the server may execute several rpc requests concurrently.
For example, suppose we have the following code:
```cpp
using namespace async_simple::coro;
Lazy<void> sleep(int seconds) {
  co_await coro_io::sleep(1s * seconds); // the coroutine yields here
co_return;
}
```
The server registers the function and starts:
```cpp
using namespace coro_rpc;
void start() {
coro_rpc_server server(/* thread = */1,/* port = */ 8801);
server.register_handler<sleep>();
server.start();
}
```
The client calls the sleep function twice in a row on the same connection, sleeping 2 seconds the first time and 1 second the second time.
```cpp
using namespace async_simple::coro;
using namespace coro_rpc;
Lazy<void> call() {
coro_rpc_client cli,cli2;
co_await cli.connect("localhost:8801");
co_await cli2.connect("localhost:8801");
auto handler1 = co_await cli.send_request<sleep>(2);
auto handler2 = co_await cli.send_request<sleep>(1);
auto handler3 = co_await cli2.send_request<sleep>(0);
handler2.start([](auto&&){
std::cout<<"handler2 return"<<std::endl;
});
handler3.start([](auto&&){
    std::cout<<"handler3 return"<<std::endl;
});
co_await handler1;
std::cout<<"handler1 return"<<std::endl;
}
```
Under normal circumstances, handler3 returns first, then handler2, and finally handler1. Although the server has only one IO thread to execute rpc functions, the coroutine function yields when it calls `coro_io::sleep`, so other connections are not blocked.
### Delayed Socket Sending
When reusing a connection, you can try setting the `enable_tcp_no_delay` option to `false`. This allows the underlying implementation to pack several small requests together and send them at once, which improves throughput but may increase latency.
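For example, based on the `init_config` options shown earlier (a small sketch):
```cpp
using namespace coro_rpc;
using namespace std::chrono;
void enable_batching(coro_rpc_client& client) {
  client.init_config(config{
      .timeout_duration = 5s,
      .host = "localhost",
      .port = "9001",
      .enable_tcp_no_delay = false  // let the socket layer batch small writes
  });
}
```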
## Thread Safety
Multiple coro_rpc_client instances do not interfere with each other and can safely be used from different threads.
When a single coro_rpc_client is called from several threads at the same time, note that only some member functions are thread-safe, including `send_request()`, `close()`, `connect()`, `get_executor()`, `get_pipeline_size()`, `get_client_id()` and `get_config()`. If the user has not called connect() again with a new endpoint or hostname, `get_port()` and `get_host()` are also thread-safe.
Note that `call`, `get_resp_attachment`, `set_req_attachment`, `release_resp_attachment` and `init_config` are not thread-safe and must not be called from multiple threads at the same time. In that situation, only `send_request` can be used to issue concurrent requests over the same connection from multiple threads.
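A rough sketch (not taken from the library docs) of two threads sharing one connected client through the thread-safe `send_request` interface; `echo` is the rpc function from the earlier examples:
```cpp
using namespace async_simple::coro;
std::string_view echo(std::string_view);

void concurrent_requests(coro_rpc::coro_rpc_client& client) {
  auto work = [&client](std::string msg) {
    syncAwait([&]() -> Lazy<void> {
      auto handler = co_await client.send_request<echo>(msg);  // thread-safe
      auto result = co_await handler;                          // wait for the rpc result
      assert(result->result() == msg);
    }());
  };
  std::thread t1(work, std::string("hello"));
  std::thread t2(work, std::string("world"));
  t1.join();
  t2.join();
}
```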

View File

@ -1,7 +1,7 @@
# Introduction to coro_rpc
coro_rpc is a high-performance rpc library written in C++20, based on stackless coroutines and compile-time reflection; in a single-machine echo test it reaches 20 million qps (see the benchmark section for details).
coro_rpc is a high-performance rpc library written in C++20, based on stackless coroutines and compile-time reflection; on a single 96-core machine its echo test reaches 20 million qps (pipeline mode) or 4.5 million qps (ping-pong mode, 2000 connections) (see the benchmark section for details).
Its performance is far higher than rpc libraries such as grpc and brpc. However, high performance is not its main selling point; the main feature of coro_rpc is ease of use: no installation needed, just include the headers, and an rpc server and client can be written in a few lines of code.
The design philosophy of coro_rpc is: ease of use at the core, returning to the essence of rpc, letting users focus on business logic rather than the details of the rpc framework; rpc development takes only a few lines of code.
@ -16,7 +16,7 @@ rpc的本质是什么rpc的本质就是一个远程函数除了rpc底层
```cpp
// rpc_service.hpp
inline std::string echo(std::string str) { return str; }
inline std::string_view echo(std::string_view str) { return str; }
```
2. Register the rpc functions and start the server
@ -25,6 +25,8 @@ inline std::string echo(std::string str) { return str; }
#include "rpc_service.hpp"
#include <ylt/coro_rpc/coro_rpc_server.hpp>
using namespace coro_rpc;
int main() {
  // initialize the server
@ -46,10 +48,12 @@ rpc_client端
#include "rpc_service.hpp"
#include <ylt/coro_rpc/coro_rpc_client.hpp>
using namespace coro_rpc;
using namespace async_simple::coro;
Lazy<void> test_client() {
coro_rpc_client client;
co_await client.connect("localhost", /*port =*/"9000");
  auto r = co_await client.call<echo>("hello coro_rpc"); // call the rpc function with arguments
std::cout << r.result.value() << "\n"; //will print "hello coro_rpc"
}
@ -64,84 +68,6 @@ client调用rpc函数也同样简单56行代码就可以实现rpc调用了
The simple example above should have fully demonstrated the ease of use and the characteristics of coro_rpc, and it also reflects the essence of rpc: users can call remote functions just like local functions, and only need to focus on the business logic of the rpc function.
The ease of use of the coro_rpc interface also shows in the fact that rpc functions have almost no restrictions: an rpc function can have any number of parameters, and serialization and deserialization of the parameters are handled automatically by the rpc library, with no user involvement. The range of supported parameter types is quite broad; see [the struct_pack type system](https://alibaba.github.io/yalantinglibs/zh/struct_pack/struct_pack_type_system.html)
## RPC functions support arbitrary parameters
```cpp
// rpc_service.h
// The client only needs to include this header; the rpc definitions do not need to be exposed to the client.
void hello(){};
int get_value(int a, int b){return a + b;}
struct person {
int id;
std::string name;
int age;
};
person get_person(person p, int id);
struct dummy {
std::string echo(std::string str) { return str; }
};
// rpc_service.cpp
#include "rpc_service.h"
int get_value(int a, int b){return a + b;}
person get_person(person p, int id) {
p.id = id;
return p;
}
```
Server side:
```cpp
#include "rpc_service.h"
#include <ylt/coro_rpc/coro_rpc_server.hpp>
int main() {
coro_rpc_server server(/*thread_num =*/10, /*port =*/9000);
  server.register_handler<hello, get_value, get_person>(); // register ordinary functions with arbitrary parameter types
dummy d{};
  server.register_handler<&dummy::echo>(&d); // register a member function
  server.start(); // start the server
}
```
Client side:
```cpp
# include "rpc_service.h"
# include <coro_rpc/coro_rpc_client.hpp>
Lazy<void> test_client() {
coro_rpc_client client;
co_await client.connect("localhost", /*port =*/"9000");
  // RPC calls
co_await client.call<hello>();
co_await client.call<get_value>(1, 2);
person p{};
co_await client.call<get_person>(p, /*id =*/1);
auto r = co_await client.call<&dummy::echo>("hello coro_rpc");
std::cout << r.result.value() << "\n"; //will print "hello coro_rpc"
}
int main() {
syncAwait(test_client());
}
```
Here both the parameter and the return value of get_person are structs; serialization and deserialization are done automatically by [struct_pack](https://alibaba.github.io/yalantinglibs/zh/struct_pack/struct_pack_intro.html), a serialization library based on compile-time reflection, completely transparently to the user.
# Comparing Ease of Use with grpc and brpc
@ -259,94 +185,34 @@ example::EchoService_Stub stub(&channel);
}
```
coro_rpc (coroutine based)
coro_rpc (coroutine based)
Client:
```cpp
# include <coro_rpc/coro_rpc_client.hpp>
std::string_view echo(std::string_view);
#include <coro_rpc/coro_rpc_client.hpp>
Lazy<void> say_hello(){
coro_rpc_client client;
co_await client.connect("localhost", /*port =*/"9000");
co_await client.connect("localhost", /*port =*/"9000");
while (true){
auto r = co_await client.call<echo>("hello coro_rpc");
assert(r.result.value() == "hello coro_rpc");
}
}
```
Server:
```cpp
std::string_view echo(std::string_view sv) {
return sv;
}
void start() {
coro_rpc_server server(/*thread num = */10,/* listen port = */9000);
server.register_handler<echo>();
server.start();
}
```
A major feature of coro_rpc is its support for stackless coroutines, which lets users write asynchronous code in a synchronous style, concise and easy to understand.
# More Features of coro_rpc
## Support for Both Real-time and Delayed Tasks
In the examples above you did not see how the result of the rpc function is sent back to the client, because by default the coro_rpc framework automatically serializes the result of the rpc function and sends it to the client; the user is completely unaware of this and only needs to focus on business logic. Note that in this case the business logic of the rpc function runs in the io thread, which suits scenarios with high real-time requirements; the drawback is that it blocks the IO thread. What if the user does not want to run the business logic in the io thread, but rather in another thread or thread pool, and send the result later?
coro_rpc has taken this into account: it regards rpc tasks as either real-time tasks or delayed tasks. A real-time task is executed in the io thread and its result is sent to the client immediately, giving the best real-time behavior and lowest latency; a delayed task can be executed in a separate thread, processed with a delay, and its result sent to the client at some point in the future. coro_rpc supports both kinds of tasks.
Turning the earlier real-time task into a delayed task:
```cpp
#include <ylt/coro_rpc/context.hpp>
// real-time task: processed in the io thread and the result sent immediately
std::string echo(std::string str) { return str; }
// delayed task: processed in another, separate thread which sends the result
void delay_echo(coro_rpc::context<std::string> conn, std::string str) {
std::thread([conn, str]{
    conn.response_msg(str); // send the rpc result from the separate thread
}).detach();
}
```
## The Server Supports Both Coroutines and Asynchronous Callbacks
Developing a coro_rpc server with coroutines is recommended, but the asynchronous callback mode is also supported; users who do not want to use coroutines can use the classic asynchronous callback style.
Coroutine-based rpc server:
```cpp
#include <ylt/coro_rpc/coro_rpc_server.hpp>
std::string hello() { return "hello coro_rpc"; }
int main() {
coro_rpc_server server(/*thread_num =*/10, /*port =*/9000);
server.register_handler<hello>();
server.start();
}
```
Asynchronous-callback-based rpc server:
```cpp
#include <ylt/coro_rpc/async_rpc_server.hpp>
std::string hello() { return "hello coro_rpc"; }
int main() {
async_rpc_server server(/*thread_num =*/10, /*port =*/9000);
server.register_handler<hello>();
server.start();
}
```
Compile-time safety checks for rpc calls
coro_rpc checks the legality of the arguments at compile time at the call site, for example:
```cpp
inline std::string echo(std::string str) { return str; }
```
The client calls the rpc:
```cpp
client.call<echo>(42);// argument mismatch, compile-time error
client.call<echo>();// missing argument, compile-time error
client.call<echo>("", 0);// too many arguments, compile-time error
client.call<echo>("hello, coro_rpc");// arguments match, OK
```
# benchmark
## Test Environment

View File

@ -0,0 +1,431 @@
# Introduction to the coro_rpc Server
## Registering Functions and Starting the Server
### Function Registration
Before starting the rpc server, we need to call the `register_handler<>` function to register all rpc functions. Registration is not thread-safe and must not be done after the rpc server has been started.
```cpp
void hello();
Lazy<std::string_view> echo(std::string_view);
int add(int a, int b);
void register_rpc_function(coro_rpc_server& server) {
server.register_handler<hello, echo, add>();
}
```
### Starting the Server
We can start a server by calling the `.start()` method, which blocks the current thread until the server shuts down.
```cpp
int start_server() {
coro_rpc_server server;
  register_rpc_function(server);
  coro_rpc::err_code ec = server.start();
  /* blocks until the server is down */
}
```
If you do not want to block the current thread, you can start the server asynchronously with `async_start()`. After this function returns, the server is guaranteed to have started listening on the port (or an error has occurred). Users can check `async_simple::Future<coro_rpc::err_code>::hasResult()` to determine whether the server started successfully and is still running. Calling `async_simple::Future<coro_rpc::err_code>::get()` waits until the server stops.
```cpp
int start_server() {
coro_rpc_server server;
  register_rpc_function(server);
  async_simple::Future<coro_rpc::err_code> ec = server.async_start(); /* won't block here */
  assert(!ec.hasResult()); /* check that the server started successfully and is still running */
  auto err = ec.get(); /* block here until the server is down, then return the error code */
}
```
coro_rpc supports registering and calling three kinds of rpc functions:
1. Ordinary functions
2. Coroutine functions
3. Callback functions
## Ordinary RPC Functions
If a function is not a coroutine and its first parameter is not of type `coro_rpc::context<T>`, it is an ordinary rpc function.
For example, the following functions are all ordinary functions:
```cpp
int add(int a, int b);
std::string_view echo(std::string_view str);
struct dummy {
std::string_view echo(std::string_view str) { return str; }
};
```
### Invocation Model
Ordinary functions are always executed synchronously. When a connection receives a request for an ordinary function, the server runs that function on the IO thread bound to the connection until it finishes, returns the result to the client, and only then handles the next request on that connection. For example, if the client sends two requests A and B in order, we guarantee that B is executed after A.
Note that a long-running operation inside the function blocks not only the current connection but possibly also other connections bound to the same IO thread. Therefore, in performance-sensitive scenarios you should not register overly time-consuming ordinary functions; consider using coroutine functions or callback functions instead.
### Getting Context Information
When a function is invoked by coro_rpc_server, the context information of the connection can be obtained with the following code.
```cpp
using namespace coro_rpc;
void test() {
context_info_t* ctx = coro_rpc::get_context();
  if (ctx->has_closed()) { // check whether the connection has been closed
throw std::runtime_error("connection is close!");
}
  // get the connection ID and the request ID
ELOGV(INFO, "call function echo_with_attachment, conn ID:%d, request ID:%d",
ctx->get_connection_id(), ctx->get_request_id());
  // get the ip and port of the client and of the server
  ELOGI << "remote endpoint: " << ctx->get_remote_endpoint() << ", local endpoint: "
<< ctx->get_local_endpoint();
  // get the rpc function name
ELOGI << "rpc function name:" << ctx->get_rpc_function_name();
  // get the request attachment
std::string_view sv{ctx->get_request_attachment()};
  // release (move out) the request attachment
std::string str = ctx->release_request_attachment();
  // set the response attachment
ctx->set_response_attachment(std::move(str));
}
```
An attachment is an additional piece of data that comes with an RPC request. Coro_rpc does not serialize it, allowing users to obtain a view of the attachment that accompanies the request, or to release it from the context and move it separately. Similarly, users can also set the attachment to be sent back to the RPC client.
### Error Handling
We allow terminating the rpc call by throwing a `coro_rpc::rpc_error` exception, which returns the rpc error code and error message to the caller.
```cpp
void rpc() {
  throw coro_rpc::rpc_error{coro_rpc::errc::io_error}; // return an error code built into the rpc framework
  throw coro_rpc::rpc_error{10404}; // return a custom error code
  throw coro_rpc::rpc_error{10404,"404 Not Found"}; // return a custom error code and error message
}
```
An rpc error code is a 16-bit unsigned integer. Values 0-255 are reserved for the rpc framework; user-defined error codes can be any integer in [256,65535]. When an rpc returns a user-defined error code, the connection is not closed. If an error code built into the rpc framework is returned, it is treated as a serious rpc error and the rpc connection is closed.
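On the client side such an error shows up in the rpc result, following the error-handling pattern from the client documentation (a brief sketch; the error values are just for illustration):
```cpp
using namespace async_simple::coro;
void rpc();  // the function from the example above

Lazy<void> handle_custom_error(coro_rpc::coro_rpc_client& client) {
  auto result = co_await client.call<rpc>();
  if (!result.has_value()) {
    // a user-defined code such as 10404 ends up here; the connection stays open
    std::cout << "error code:" << result.error().val()
              << ", error message:" << result.error().msg() << std::endl;
  }
}
```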
## Coroutine RPC Functions
If the return type of an rpc function is `async_simple::coro::Lazy<T>`, we call it a coroutine function. Compared with ordinary functions, coroutine functions are asynchronous: they can temporarily yield the IO thread while waiting for an event to complete, which improves concurrency.
For example, the following rpc function uses a coroutine to submit heavy computation to the global thread pool, avoiding blocking the current I/O thread.
```cpp
using namespace async_simple::coro;
int heavy_calculate(int value);
Lazy<int> calculate(int value) {
  auto val = co_await coro_io::post([value](){return heavy_calculate(value);}); // submit the task to the global thread pool and yield the current IO thread until it completes
co_return val;
}
```
Users can also submit tasks to a custom thread pool with `async_simple::Promise<T>`:
```cpp
using namespace async_simple::coro;
int heavy_calculate(int value);
Lazy<int> calculate(int value) {
async_simple::Promise<int> p;
std::thread th([&p,value](){
auto ret = heavy_calculate(value);
    p.setValue(ret); // the task is done; wake up the rpc function
});
th.detach();
  auto ret = co_await p.getFuture(); // wait for the task to finish
co_return ret;
}
```
### Invocation Model
When a connection receives a request for a coroutine function, the server starts a new coroutine on the IO thread bound to that connection and executes the function in it. When the coroutine function finishes, the rpc result is returned to the client according to its return value. If the coroutine suspends and yields during execution, the IO thread continues executing other tasks (such as handling the next request, or handling other connections bound to this IO thread).
For example, suppose we have the following code:
```cpp
using namespace async_simple::coro;
Lazy<void> sleep(int seconds) {
  co_await coro_io::sleep(1s * seconds); // the coroutine yields here
co_return;
}
```
The server registers the function and starts:
```cpp
using namespace coro_rpc;
void start() {
coro_rpc_server server(/* thread = */1,/* port = */ 8801);
server.register_handler<sleep>();
server.start();
}
```
The client calls the sleep function twice in a row on the same connection, sleeping 2 seconds the first time and 1 second the second time.
```cpp
using namespace async_simple::coro;
using namespace coro_rpc;
Lazy<void> call() {
coro_rpc_client cli,cli2;
co_await cli.connect("localhost:8801");
co_await cli2.connect("localhost:8801");
auto handler1 = co_await cli.send_request<sleep>(2);
auto handler2 = co_await cli.send_request<sleep>(1);
auto handler3 = co_await cli2.send_request<sleep>(0);
handler2.start([](auto&&){
std::cout<<"handler2 return"<<std::endl;
});
handler3.start([](auto&&){
    std::cout<<"handler3 return"<<std::endl;
});
co_await handler1;
std::cout<<"handler1 return"<<std::endl;
}
```
Under normal circumstances, handler3 returns first, then handler2, and finally handler1. Although the server has only one IO thread to execute rpc functions, the coroutine function yields when it calls `coro_io::sleep`, so other connections are not blocked.
### Getting Context Information
When a coroutine function is invoked by coro_rpc_server, `coro_rpc::get_context_in_coro()` can be called to get the context information. Note that calling `coro_rpc::get_context()` here would return the wrong context information.
```cpp
using namespace coro_rpc;
using namespace async_simple::coro;
Lazy<void> test() {
context_info_t* ctx = co_await coro_rpc::get_context_in_coro();
}
```
### Error Handling
As with ordinary functions, we can return an rpc error by throwing a `coro_rpc::rpc_error` exception, with a custom rpc error code and error message.
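For example (a short sketch following the error-throwing pattern shown above; the error code is just for illustration):
```cpp
using namespace async_simple::coro;
Lazy<int> query(int id) {
  if (id < 0) {
    // terminate the call with a custom error code and message
    throw coro_rpc::rpc_error{10404, "404 Not Found"};
  }
  co_return id;
}
```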
## Callback RPC Functions
We also support the more traditional callback style for asynchronous RPC. A callback rpc function is written as follows:
```cpp
void echo(coro_rpc::context</* return type = */ std::string_view>, std::string_view param);
```
If a function returns void and its first parameter is of type `coro_rpc::context<T>`, it is a callback function. `coro_rpc::context<T>` is similar to a smart pointer: it holds the callback handle and the context information of this rpc call.
For example, in the code below we copy the `coro_rpc::context<std::string_view>` into another thread, which sleeps for 30 seconds and then returns the result to the rpc client by calling `coro_rpc::context<std::string_view>::response_msg()`.
```cpp
using namespace std::chrono;
void echo(coro_rpc::context<std::string_view> ctx, std::string_view param) {
std::thread th([ctx, param](){
std::this_thread::sleep_for(30s);
ctx.response_msg(param);
  });
  th.detach();
  return;
}
```
Note that for view types such as std::string_view and std::span among the rpc function parameters, the content they point to becomes invalid once all copies of the `coro_rpc::context<T>` object of this rpc call have been destroyed.
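If the parameter data needs to be used by code that does not hold a copy of the context, copy it into an owning `std::string` first; a small defensive sketch (not from the library docs):
```cpp
void echo_and_log(coro_rpc::context<std::string_view> ctx, std::string_view param) {
  ctx.response_msg(param);      // respond while ctx (and therefore param) is still valid
  std::string owned{param};     // copy before the last ctx copy goes away
  std::thread([owned = std::move(owned)] {
    // background work only touches the owning copy; using param here could dangle
  }).detach();
}
```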
### Invocation Model
When a connection receives a request for a callback function, the IO thread assigned to that connection executes the function immediately until it finishes, and then handles other requests. Since a callback function has no return value, the server does not reply to the client right after the rpc function returns.
Only when the user calls `coro_rpc::context<T>::response_msg()` or `coro_rpc::context<T>::response_error()` is the rpc server notified, and only then is the result sent to the client. The user must therefore make sure that the callback is invoked somewhere in the code.
### Getting Context Information
In a callback function we can call `coro_rpc::context<T>::get_context_info()` to get the context information. Before the rpc function returns, `coro_rpc::get_context()` can also be used; however, after the rpc function has returned, the context information pointed to by `coro_rpc::get_context()` may be modified or become invalid, so we still recommend using `coro_rpc::context<T>::get_context_info()`.
```cpp
void echo(coro_rpc::context<void> ctx) {
context_info_t* info = ctx.get_context_info();
return;
}
```
### Error Handling
In a callback function you should not, and cannot, return an rpc error by throwing an exception, because the error may not occur in the call stack of the rpc function.
Instead, we can call `coro_rpc::context<T>::response_error()` to return an rpc error.
```cpp
void echo(coro_rpc::context<void> ctx) {
  ctx.response_error(10015); // custom rpc error code
  ctx.response_error(10015, "my error msg"); // custom rpc error code and error message
  ctx.response_error(coro_rpc::errc::io_error); // error code built into the rpc framework
return;
}
```
An rpc error code is a 16-bit unsigned integer. Values 0-255 are reserved for the rpc framework; user-defined error codes can be any integer in [256,65535]. When an rpc returns a user-defined error code, the connection is not closed. If an error code built into the rpc framework is returned, it is treated as a serious rpc error and the rpc connection is closed.
## Connections and IO Threads
The server has an internal IO thread pool whose size defaults to the number of logical cores. After the server starts, it launches a listening task on one IO thread to accept connections from clients. Each time a connection is accepted, the server picks an IO thread by round-robin and binds the connection to it. All subsequent steps on that connection (receiving and sending data, serialization, rpc routing) are executed on that IO thread, and the rpc functions are executed on it as well.
This means that if your rpc function blocks the current thread (e.g. thread sleep, synchronous file IO), it is better to make it asynchronous so that the IO thread is not blocked and other requests are not held up. For example, `async_simple::coro` provides the coroutine locks `Mutex` and `Spinlock`, and the `Promise`/`Future` components that wrap asynchronous tasks as coroutine tasks. `coro_io` provides coroutine-based asynchronous file IO, asynchronous socket IO, `sleep`, and the `period_timer` timer; heavy CPU tasks can also be submitted to the global blocking-task thread pool through `coro_io::post`. `coro_rpc`/`coro_http` provide coroutine-based asynchronous rpc and http calls. `easylog` submits log content to a background thread by default, so the foreground never blocks.
## Parameter and Return Value Types
coro_rpc allows registered rpc functions to have multiple parameters (up to 64). Parameter and return value types can be user-defined aggregate structures; many data structures provided by the C++ standard library and by various third-party libraries are supported as well. For details, see: [the struct_pack type system](https://alibaba.github.io/yalantinglibs/zh/struct_pack/struct_pack_type_system.html)
If your rpc parameter or return value type is not supported by the struct_pack type system, we also allow you to register your own structures or custom serialization algorithms. For details, see: [custom types](https://alibaba.github.io/yalantinglibs/zh/struct_pack/struct_pack_intro.html#%E8%87%AA%E5%AE%9A%E4%B9%89%E7%B1%BB%E5%9E%8B%E7%9A%84%E5%BA%8F%E5%88%97%E5%8C%96)
## RPC Return Value Construction and Checking
In addition, for callback functions coro_rpc tries to construct the return value type from the argument list; if it cannot be constructed, compilation fails.
```cpp
void echo(coro_rpc::context<std::string> ctx) {
  ctx.response_msg(); // cannot construct std::string: compilation fails.
  ctx.response_msg(42); // cannot construct std::string: compilation fails.
  ctx.response_msg(42,'A'); // can construct std::string: compilation passes.
  ctx.response_msg("Hello"); // can construct std::string: compilation passes.
return;
}
```
## SSL Support
coro_rpc supports encrypting connections with OpenSSL. After installing OpenSSL and importing yalantinglibs into your project via cmake `find_package`/`fetch_content`, you can enable SSL support by turning on the cmake option `YLT_ENABLE_SSL=ON`. Alternatively, you can manually add the macro `YLT_ENABLE_SSL` and manually link OpenSSL.
Once SSL support is enabled, users can call the `init_ssl` function before connecting to the server, which establishes an encrypted connection between the client and the server. Note that the coro_rpc server must also be compiled with SSL support enabled.
```cpp
coro_rpc_server server;
server.init_ssl({
.base_path = "./", // ssl文件的基本路径
.cert_file = "server.crt", // 证书相对于base_path的路径
.key_file = "server.key" // 私钥相对于base_path的路径
});
```
After SSL support is enabled, the server rejects all non-SSL connections.
## Advanced Settings
We provide the class coro_rpc::config_t, through which users can configure the details of the server:
```cpp
struct config_base {
  bool is_enable_tcp_no_delay = true; /* whether to enable TCP_NODELAY and send data immediately without buffering */
  uint16_t port = 9001; /* listening port */
  unsigned thread_num = std::thread::hardware_concurrency(); /* number of IO threads used internally by the rpc server, defaults to the number of logical cores */
  std::chrono::steady_clock::duration conn_timeout_duration =
      std::chrono::seconds{0}; /* timeout for rpc requests; 0 seconds means rpc requests never time out automatically */
  std::string address="0.0.0.0"; /* listening address */
  /* the following setting is only available when SSL is enabled */
  std::optional<ssl_configure> ssl_config = std::nullopt; // set this to enable ssl
};
struct ssl_configure {
  std::string base_path; // base path of the ssl files
  std::string cert_file; // path of the certificate relative to base_path
  std::string key_file; // path of the private key relative to base_path
  std::string dh_file; // path of the dh file relative to base_path (optional)
};
int start() {
coro_rpc::config_t config{};
coro_rpc_server server(config);
  /* register rpc functions here... */
server.start();
}
```
## Registration and Invocation of Special RPC Functions
### Registering and Calling Member Functions
coro_rpc supports registering and calling member functions:
For example, given the following functions:
```cpp
struct dummy {
std::string_view echo(std::string_view str) { return str; }
Lazy<std::string_view> coroutine_echo(std::string_view str) {co_return str;}
void callback_echo(coro_rpc::context</*return type = */ std::string_view> ctx, std::string_view str) {
ctx.response_msg(str);
}
};
```
The server can register these functions like this:
```cpp
#include "rpc_service.h"
#include <ylt/coro_rpc/coro_rpc_server.hpp>
int main() {
coro_rpc_server server;
dummy d{};
  server.register_handler<&dummy::echo,&dummy::coroutine_echo,&dummy::callback_echo>(&d); // register member functions
server.start();
}
```
Note that the lifetime of the registered dummy object must be managed so that it stays alive while the server is running; otherwise, invoking these functions is undefined behavior.
The client can call these functions like this:
```cpp
#include "rpc_service.h"
#include <ylt/coro_rpc/coro_rpc_client.hpp>
Lazy<void> test_client() {
coro_rpc_client client;
co_await client.connect("localhost", /*port =*/"9000");
  // calling RPC
{
auto result = co_await client.call<&dummy::echo>("hello");
assert(result.value() == "hello");
}
{
auto result = co_await client.call<&dummy::coroutine_echo>("hello");
assert(result.value() == "hello");
}
{
auto result = co_await client.call<&dummy::callback_echo>("hello");
assert(result.value() == "hello");
}
}
```
### Specialized Template Functions
coro_rpc allows users to register and call specialized template functions.
For example, given the following function:
```cpp
template<typename T>
T echo(T param) { return param; }
```
The server can register these functions like this:
```cpp
#include <ylt/coro_rpc/coro_rpc_server.hpp>
using namespace coro_rpc;
int main() {
coro_rpc_server server;
  server.register_handler<echo<int>,echo<std::string>,echo<std::vector<int>>>(); // register specialized template functions
server.start();
}
```
The client can call them like this:
```cpp
using namespace coro_rpc;
using namespace async_simple::coro;
Lazy<void> rpc_call(coro_rpc_client& cli) {
  assert((co_await cli.call<echo<int>>(42)).value() == 42);
  assert((co_await cli.call<echo<std::string>>("Hello")).value() == "Hello");
  assert((co_await cli.call<echo<std::vector<int>>>(std::vector{1,2,3})).value() == (std::vector{1,2,3}));
}
```