Replace thread detaching with thread joining before main's end (#1637)

Limits the lifetime of any thread used by libmamba to the scope of `main()`.

This fixes undefined behavior caused by threads still running after `main()`
has finished. At the end of `main()`, static objects are destroyed in
an unspecified order, which can also easily lead to undefined behavior.
This change should fix both issues.

For now the implementation of `MainExecutor` is simple: it spawns a
new thread for each scheduled task and does not provide any means to track
the progress and/or completion of tasks. This reflects the previous behavior.
The intent is to enable changing the implementation of `MainExecutor`
in the future to improve execution speed and/or resource usage.
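
For illustration, here is a minimal usage sketch (not part of this commit) of the intended pattern, assuming only the `MainExecutor` API introduced below: a `MainExecutor` created at the top of `main()` owns every scheduled thread and joins them all when it goes out of scope.

```c++
#include "mamba/core/execution.hpp"

#include <cstdio>

int main()
{
    // Registers itself as the main executor; its destructor joins every
    // thread scheduled through it, so nothing outlives main()'s scope.
    mamba::MainExecutor scoped_threads;

    // The task runs on a worker thread spawned by the executor.
    scoped_threads.schedule([] { std::puts("running on a libmamba-managed thread"); });

    return 0;  // ~MainExecutor() blocks here until all tasks are done.
}
```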
Klaim (Joël Lamotte) 2022-04-27 12:01:28 +02:00 committed by GitHub
parent 53eb28d0a4
commit 51e917e124
16 changed files with 303 additions and 21 deletions

View File

@ -155,6 +155,7 @@ set(LIBMAMBA_SOURCES
${LIBMAMBA_SOURCE_DIR}/core/validate.cpp
${LIBMAMBA_SOURCE_DIR}/core/virtual_packages.cpp
${LIBMAMBA_SOURCE_DIR}/core/env_lockfile.cpp
${LIBMAMBA_SOURCE_DIR}/core/execution.cpp
# API (high-level)
${LIBMAMBA_SOURCE_DIR}/api/c_api.cpp
${LIBMAMBA_SOURCE_DIR}/api/channel_loader.cpp

View File

@ -17,6 +17,6 @@ dependencies:
- reproc-cpp
- yaml-cpp
- termcolor-cpp
- cli11 2.1.2
- cli11 >=2.2
- spdlog
- sel(win): winreg

View File

@ -0,0 +1,118 @@
// Copyright (c) 2022, QuantStack and Mamba Contributors
//
// Distributed under the terms of the BSD 3-Clause License.
//
// The full license is in the file LICENSE, distributed with this software.
#ifndef MAMBA_CORE_EXECUTION_HPP
#define MAMBA_CORE_EXECUTION_HPP
#include <vector>
#include <future>
#include <thread>
#include <atomic>
#include <mutex>
namespace mamba
{
class MainExecutorError : public std::runtime_error
{
using std::runtime_error::runtime_error;
};
// Handler for this library's main execution resources (for example threads).
// Allows scoping the lifetime of the threads used by the library.
// User code can either create an instance of this type to determine the lifetime
// of the threads itself, or just use `MainExecutor::instance()`
// to obtain a global static instance. In the latter case, `MainExecutor::instance().close()`
// has to be called before the end of `main()` to avoid undefined behavior.
// WARNING: this is a temporary solution designed to evolve. The current implementation
// uses threads directly; a future implementation will use a thread pool or a similar
// mechanism.
class MainExecutor
{
public:
// Sets itself as the main executor.
// Throws `MainExecutorError` if another instance already exists.
MainExecutor();
// Closes (see `close()`) and unregisters itself as the main executor.
// Blocks until all scheduled tasks are done and all resources are released (threads
// joined).
~MainExecutor();
// Returns a reference to the current main executor.
// If no main executor has been set prior to this call,
// a global one is created and returned. In this case the user must
// call `MainExecutor::instance().close()` before the end of `main()` to avoid
// undefined behavior.
static MainExecutor& instance();
// If the default (global) main executor is being used, closes and destroys it.
// Does nothing otherwise.
// This is mostly used for testing and by libraries using the default main executor.
static void stop_default();
// Schedules a task for execution.
// The task must be a callable taking either the provided arguments or none.
// If this executor is open, the task is scheduled for execution and will be called
// as soon as execution resources are available. The task is not guaranteed to have
// been called by the time this function returns, nor before.
// If this executor is closed, the task is ignored and will never be called.
template <typename Task, typename... Args>
void schedule(Task&& task, Args&&... args)
{
if (is_open)
{
std::scoped_lock lock{ mutex };
if (is_open) // Double check necessary for correctness
threads.emplace_back(std::forward<Task>(task), std::forward<Args>(args)...);
}
}
// Moves ownership of a thread into this executor.
// This is used when a thread needs to be manipulated in a particular way,
// but we still want to avoid having to use `std::thread::detach()`. By
// transferring ownership of the thread to this executor, we are guaranteed that
// the thread will be joined before the end of this executor's lifetime.
// If this executor is closed, no code will be executed and the thread will be destroyed,
// resulting in a call to `std::terminate()` if the thread has not already been joined.
void take_ownership(std::thread thread)
{
if (thread.joinable() && is_open)
{
std::scoped_lock lock{ mutex };
if (is_open) // Double check necessary for correctness
threads.push_back(std::move(thread));
}
}
// Closes this executor:
// only returns once all tasks scheduled before this call are finished
// and all owned execution resources (i.e. threads) are released.
// Note that if any task never ends, this function will never return either.
// Once called, this function makes all other functions no-ops, even before returning, to
// prevent running tasks from scheduling more tasks. This should be used to
// manually determine the lifetime of the executor's resources.
void close()
{
is_open = false;
std::scoped_lock lock{ mutex };
for (auto&& t : threads)
t.join();
threads.clear();
}
private:
std::atomic<bool> is_open{ true };
std::vector<std::thread> threads;
std::recursive_mutex mutex;
};
}
#endif
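
As an aside (not part of the diff), a minimal sketch of the two ways a thread can reach this executor when relying on the global instance, using only the API declared above; the printed messages are illustrative:

```c++
#include "mamba/core/execution.hpp"

#include <cstdio>
#include <thread>

int main()
{
    // Let the executor spawn and own the worker thread.
    mamba::MainExecutor::instance().schedule([] { std::puts("scheduled task"); });

    // Hand over an existing thread instead of detaching it; the executor
    // joins it when it is closed or destroyed.
    std::thread worker{ [] { std::puts("adopted thread"); } };
    mamba::MainExecutor::instance().take_ownership(std::move(worker));

    // Because the global instance is used here, close it before the end of main().
    mamba::MainExecutor::instance().close();
}
```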

View File

@ -90,6 +90,11 @@ namespace mamba
void detach();
std::thread::native_handle_type native_handle();
std::thread extract()
{
return std::move(m_thread);
}
private:
std::thread m_thread;
};

View File

@ -0,0 +1,49 @@
#include "mamba/core/execution.hpp"
#include <atomic>
#include <mutex>
#include <cassert>
namespace mamba
{
static std::atomic<MainExecutor*> main_executor{ nullptr };
static std::mutex default_executor_mutex; // TODO: replace with synchronized_value once available
static std::unique_ptr<MainExecutor> default_executor;
MainExecutor& MainExecutor::instance()
{
if (!main_executor)
{
// When no MainExecutor was created before, we create a static one.
std::scoped_lock lock{ default_executor_mutex };
if (!main_executor) // double check necessary to avoid a data race
{
default_executor = std::make_unique<MainExecutor>();
assert(main_executor == default_executor.get());
}
}
return *main_executor;
}
void MainExecutor::stop_default()
{
std::scoped_lock lock{ default_executor_mutex };
default_executor.reset();
}
MainExecutor::MainExecutor()
{
MainExecutor* expected = nullptr;
if (!main_executor.compare_exchange_strong(expected, this))
throw MainExecutorError("attempted to create multiple main executors");
}
MainExecutor::~MainExecutor()
{
close();
main_executor = nullptr;
}
}

View File

@ -9,11 +9,12 @@
#include <iostream>
#include <iomanip>
#include <utility>
#include <thread>
#include <limits>
#include <random>
#include <sstream>
#include "mamba/core/execution.hpp"
namespace cursor
{
class CursorMovementTriple
@ -968,8 +969,7 @@ namespace mamba
start();
m_marked_to_terminate = false;
m_watch_print_started = true;
std::thread t([&]() { run(); });
t.detach();
MainExecutor::instance().schedule([&] { run(); });
}
void ProgressBarManager::start()
@ -1309,11 +1309,12 @@ namespace mamba
pause();
set_full();
time_point_t stop_time_point = now() + delay;
const time_point_t stop_time_point
= now() + delay; // FIXME: can be captured by the lambda?
if (delay.count())
{
std::thread t(
MainExecutor::instance().schedule(
[&](const time_point_t& stop_time_point)
{
std::lock_guard<std::mutex> lock(m_mutex);
@ -1324,7 +1325,6 @@ namespace mamba
stop();
},
stop_time_point);
t.detach();
}
else
{

View File

@ -15,8 +15,10 @@
#include "mamba/core/output.hpp"
#include "mamba/core/pool.hpp"
#include "mamba/core/thread_utils.hpp"
#include "mamba/core/execution.hpp"
#include "mamba/core/util_scope.hpp"
#include "termcolor/termcolor.hpp"
extern "C"
@ -320,8 +322,7 @@ namespace mamba
}
LOG_INFO << "Download finished, validating '" << m_tarball_path.string() << "'";
thread v(&PackageDownloadExtractTarget::validate_extract, this);
v.detach();
MainExecutor::instance().schedule(&PackageDownloadExtractTarget::validate_extract, this);
return true;
}
@ -388,8 +389,8 @@ namespace mamba
m_tarball_path = tarball_cache / m_filename;
m_validation_result = VALIDATION_RESULT::VALID;
thread v(&PackageDownloadExtractTarget::extract_from_cache, this);
v.detach();
MainExecutor::instance().schedule(&PackageDownloadExtractTarget::extract_from_cache,
this);
LOG_DEBUG << "Using cached tarball '" << m_filename << "'";
return nullptr;
}

View File

@ -42,6 +42,7 @@ extern "C"
#include "mamba/core/util.hpp"
#include "mamba/core/output.hpp"
#include "mamba/core/thread_utils.hpp"
#include "mamba/core/execution.hpp"
#include "mamba/core/util_os.hpp"
#include "mamba/core/util_random.hpp"
@ -1021,7 +1022,7 @@ namespace mamba
return signum;
});
t.detach();
MainExecutor::instance().take_ownership(std::move(t.extract()));
{
std::unique_lock<std::mutex> l(m);

View File

@ -27,6 +27,7 @@ set(TEST_SRCS
test_virtual_packages.cpp
test_util.cpp
test_env_lockfile.cpp
test_execution.cpp
)
message(STATUS "Building libmamba C++ tests")

View File

@ -0,0 +1,98 @@
#include <gtest/gtest.h>
#include <mamba/core/execution.hpp>
namespace mamba
{
// Spawns enough threads to execute the provided task `task_count` times in total,
// with at most `tasks_per_thread` executions per thread. This makes it very likely
// that the tasks are scheduled concurrently.
// Joins all threads before returning.
template <typename Func>
void execute_tasks_from_concurrent_threads(size_t task_count,
size_t tasks_per_thread,
Func work)
{
std::vector<std::thread> producers;
size_t tasks_left_to_launch = task_count;
while (tasks_left_to_launch > 0)
{
const size_t tasks_to_generate = std::min(tasks_per_thread, tasks_left_to_launch);
producers.emplace_back(
[=]
{
for (size_t i = 0; i < tasks_to_generate; ++i)
{
work();
}
});
tasks_left_to_launch -= tasks_to_generate;
}
for (auto&& t : producers)
t.join(); // Make sure all the producers are finished before continuing.
}
TEST(execution, stop_default_always_succeeds)
{
MainExecutor::stop_default(); // Make sure no other default main executor is running.
MainExecutor::instance(); // Make sure we use the default main executor.
MainExecutor::stop_default(); // Stop the default main executor and make sure it's not
// enabled for the following tests.
MainExecutor::stop_default(); // However many times we call it, it should never
// fail.
}
TEST(execution, manual_executor_construction_destruction)
{
MainExecutor executor;
}
TEST(execution, two_main_executors_fails)
{
MainExecutor executor;
ASSERT_THROW(MainExecutor{}, MainExecutorError);
}
TEST(execution, tasks_complete_before_destruction_ends)
{
const size_t arbitrary_task_count = 2048;
const size_t arbitrary_tasks_per_generator = 24;
std::atomic<int> counter{ 0 };
{
MainExecutor executor;
execute_tasks_from_concurrent_threads(arbitrary_task_count,
arbitrary_tasks_per_generator,
[&] { executor.schedule([&] { ++counter; }); });
} // All threads from the executor must have been joined here.
EXPECT_EQ(counter, arbitrary_task_count);
}
TEST(execution, closed_prevents_more_scheduling_and_joins)
{
const size_t arbitrary_task_count = 2048;
const size_t arbitrary_tasks_per_generator = 36;
std::atomic<int> counter{ 0 };
{
MainExecutor executor;
execute_tasks_from_concurrent_threads(arbitrary_task_count,
arbitrary_tasks_per_generator,
[&] { executor.schedule([&] { ++counter; }); });
executor.close();
EXPECT_EQ(counter, arbitrary_task_count);
execute_tasks_from_concurrent_threads(
arbitrary_task_count,
arbitrary_tasks_per_generator,
[&] { executor.schedule([&] { throw "this code must never be executed"; }); });
}
EXPECT_EQ(counter,
arbitrary_task_count); // We re-check to make sure no tasks are executed anymore
// once `.close()` has been called.
}
}

View File

@ -3,6 +3,7 @@
#include "mamba/core/context.hpp"
#include "mamba/core/output.hpp"
#include "mamba/core/thread_utils.hpp"
#include "mamba/core/execution.hpp"
namespace mamba
{
@ -34,16 +35,15 @@ namespace mamba
for (size_t i = 0; i < 5; ++i)
{
mamba::thread t(
[&res]()
MainExecutor::instance().take_ownership(mamba::thread{
[&res]
{
{
std::unique_lock<std::mutex> lk(res_mutex);
++res;
}
std::this_thread::sleep_for(std::chrono::milliseconds(300));
});
t.detach();
} }.extract());
}
if (interrupt)
{

View File

@ -1,10 +1,9 @@
#include <gtest/gtest.h>
#include <thread>
#include "mamba/core/util.hpp"
#include "mamba/core/util_random.hpp"
#include "mamba/core/util_scope.hpp"
#include "mamba/core/execution.hpp"
namespace mamba

View File

@ -17,7 +17,7 @@ dependencies:
- reproc-cpp
- yaml-cpp
- termcolor-cpp
- cli11 2.1.2
- cli11 >=2.2
- spdlog
- pybind11
- pytest

View File

@ -28,6 +28,7 @@
#include "mamba/core/validate.hpp"
#include "mamba/core/virtual_packages.hpp"
#include "mamba/core/output.hpp"
#include "mamba/core/execution.hpp"
#include <stdexcept>
@ -48,6 +49,11 @@ PYBIND11_MODULE(bindings, m)
{
using namespace mamba;
// Close and destroy the main executor as soon as we are done.
// Closing makes sure all threads used by this library are done before the end
// of the program.
m.add_object("_cleanup", py::capsule([] { mamba::MainExecutor::stop_default(); }));
py::class_<fs::path>(m, "Path")
.def(py::init<std::string>())
.def("__str__", [](fs::path& self) -> std::string { return self.string(); })

View File

@ -13,6 +13,7 @@
#include "mamba/core/context.hpp"
#include "mamba/core/output.hpp"
#include "mamba/core/thread_utils.hpp"
#include "mamba/core/execution.hpp"
#include "mamba/core/util_os.hpp"
#include <CLI/CLI.hpp>
@ -24,6 +25,8 @@ using namespace mamba; // NOLINT(build/namespaces)
int
main(int argc, char** argv)
{
mamba::MainExecutor scoped_threads;
init_console();
auto& ctx = Context::instance();

View File

@ -12,6 +12,7 @@
#include "mamba/api/install.hpp"
#include "mamba/core/util_os.hpp"
#include "mamba/core/util_random.hpp"
#include "mamba/core/execution.hpp"
#include <nlohmann/json.hpp>
@ -494,7 +495,7 @@ set_run_command(CLI::App* subcom)
}
#ifndef _WIN32
std::thread t(
MainExecutor::instance().schedule(
[]()
{
signal(
@ -511,7 +512,6 @@ set_run_command(CLI::App* subcom)
proc.stop(sa);
});
});
t.detach();
#endif
// check if we need this