support lazy callback (#647)
This commit is contained in:
parent
b8e130c57f
commit
83c5dbf642
|
@ -42,6 +42,10 @@
|
||||||
|
|
||||||
namespace coro_io {
|
namespace coro_io {
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
constexpr inline bool is_lazy_v =
|
||||||
|
util::is_specialization_v<std::remove_cvref_t<T>, async_simple::coro::Lazy>;
|
||||||
|
|
||||||
template <typename Arg, typename Derived>
|
template <typename Arg, typename Derived>
|
||||||
class callback_awaitor_base {
|
class callback_awaitor_base {
|
||||||
private:
|
private:
|
||||||
|
@ -395,9 +399,64 @@ async_simple::coro::Lazy<std::pair<
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
inline decltype(auto) select_impl(T &pair) {
|
||||||
|
using Func = std::tuple_element_t<1, std::remove_cvref_t<T>>;
|
||||||
|
using ValueType =
|
||||||
|
typename std::tuple_element_t<0, std::remove_cvref_t<T>>::ValueType;
|
||||||
|
using return_type = std::invoke_result_t<Func, async_simple::Try<ValueType>>;
|
||||||
|
|
||||||
|
auto &callback = std::get<1>(pair);
|
||||||
|
if constexpr (coro_io::is_lazy_v<return_type>) {
|
||||||
|
auto executor = std::get<0>(pair).getExecutor();
|
||||||
|
return std::make_pair(
|
||||||
|
std::move(std::get<0>(pair)),
|
||||||
|
[executor, callback = std::move(callback)](auto &&val) {
|
||||||
|
if (executor) {
|
||||||
|
callback(std::move(val)).via(executor).start([](auto &&) {
|
||||||
|
});
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
callback(std::move(val)).start([](auto &&) {
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return pair;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
template <typename... T>
|
template <typename... T>
|
||||||
auto select(T &&...args) {
|
inline auto select(T &&...args) {
|
||||||
return async_simple::coro::collectAny(std::forward<T>(args)...);
|
return async_simple::coro::collectAny(select_impl(args)...);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename T, typename Callback>
|
||||||
|
inline auto select(std::vector<T> vec, Callback callback) {
|
||||||
|
if constexpr (coro_io::is_lazy_v<Callback>) {
|
||||||
|
std::vector<async_simple::Executor *> executors;
|
||||||
|
for (auto &lazy : vec) {
|
||||||
|
executors.push_back(lazy.getExecutor());
|
||||||
|
}
|
||||||
|
|
||||||
|
return async_simple::coro::collectAny(
|
||||||
|
std::move(vec),
|
||||||
|
[executors, callback = std::move(callback)](size_t index, auto &&val) {
|
||||||
|
auto executor = executors[index];
|
||||||
|
if (executor) {
|
||||||
|
callback(index, std::move(val)).via(executor).start([](auto &&) {
|
||||||
|
});
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
callback(index, std::move(val)).start([](auto &&) {
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return async_simple::coro::collectAny(std::move(vec), std::move(callback));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename Socket, typename AsioBuffer>
|
template <typename Socket, typename AsioBuffer>
|
||||||
|
|
|
@ -152,7 +152,7 @@ struct CollectAnyAwaiter {
|
||||||
auto count = e->downCount();
|
auto count = e->downCount();
|
||||||
if (count == size + 1) {
|
if (count == size + 1) {
|
||||||
r->_idx = i;
|
r->_idx = i;
|
||||||
(*callback)(i, std::move(result));
|
(void)(*callback)(i, std::move(result));
|
||||||
c.resume();
|
c.resume();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -222,7 +222,7 @@ struct CollectAnyVariadicPairAwaiter {
|
||||||
callback](auto&& res) mutable {
|
callback](auto&& res) mutable {
|
||||||
auto count = event->downCount();
|
auto count = event->downCount();
|
||||||
if (count == std::tuple_size<InputType>() + 1) {
|
if (count == std::tuple_size<InputType>() + 1) {
|
||||||
callback(std::move(res));
|
(void)callback(std::move(res));
|
||||||
*result = I;
|
*result = I;
|
||||||
continuation.resume();
|
continuation.resume();
|
||||||
}
|
}
|
||||||
|
|
|
@ -111,7 +111,43 @@ async_simple::coro::Lazy<void> test_select_channel() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void callback_lazy() {
|
||||||
|
using namespace async_simple::coro;
|
||||||
|
auto test0 = []() mutable -> Lazy<int> {
|
||||||
|
co_return 41;
|
||||||
|
};
|
||||||
|
|
||||||
|
auto test1 = []() mutable -> Lazy<int> {
|
||||||
|
co_return 42;
|
||||||
|
};
|
||||||
|
|
||||||
|
auto collectAnyLazy = [](auto&&... args) mutable -> Lazy<size_t> {
|
||||||
|
co_return co_await collectAny(std::move(args)...);
|
||||||
|
};
|
||||||
|
|
||||||
|
syncAwait(
|
||||||
|
collectAnyLazy(std::pair{test1(), [&](auto&& val) mutable -> Lazy<void> {
|
||||||
|
CHECK(val.value() == 42);
|
||||||
|
int r = co_await test0();
|
||||||
|
int result = r + val.value();
|
||||||
|
CHECK(result == 83);
|
||||||
|
}}));
|
||||||
|
|
||||||
|
std::vector<Lazy<int>> input;
|
||||||
|
input.push_back(test1());
|
||||||
|
|
||||||
|
auto index = syncAwait(collectAnyLazy(
|
||||||
|
std::move(input), [&test0](size_t index, auto val) mutable -> Lazy<void> {
|
||||||
|
CHECK(val.value() == 42);
|
||||||
|
int r = co_await test0();
|
||||||
|
int result = r + val.value();
|
||||||
|
CHECK(result == 83);
|
||||||
|
}));
|
||||||
|
CHECK(index == 0);
|
||||||
|
}
|
||||||
|
|
||||||
TEST_CASE("test channel send recieve, test select channel and coroutine") {
|
TEST_CASE("test channel send recieve, test select channel and coroutine") {
|
||||||
async_simple::coro::syncAwait(test_coro_channel());
|
async_simple::coro::syncAwait(test_coro_channel());
|
||||||
async_simple::coro::syncAwait(test_select_channel());
|
async_simple::coro::syncAwait(test_select_channel());
|
||||||
|
callback_lazy();
|
||||||
}
|
}
|
Loading…
Reference in New Issue