mirror of https://github.com/mamba-org/mamba.git
maint: use `synchronized_value` where we use a mutex to protect data (#3992)
This commit is contained in:
parent
a91898370c
commit
14a94d379a
|
@ -14,6 +14,7 @@
|
|||
#include <vector>
|
||||
|
||||
#include "mamba/core/error_handling.hpp"
|
||||
#include "mamba/util/synchronized_value.hpp"
|
||||
|
||||
namespace mamba
|
||||
{
|
||||
|
@ -71,10 +72,10 @@ namespace mamba
|
|||
return;
|
||||
}
|
||||
|
||||
std::scoped_lock lock{ threads_mutex };
|
||||
auto synched_threads = threads.synchronize();
|
||||
if (is_open) // Double check necessary for correctness
|
||||
{
|
||||
threads.emplace_back(std::forward<Task>(task), std::forward<Args>(args)...);
|
||||
synched_threads->emplace_back(std::forward<Task>(task), std::forward<Args>(args)...);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -92,10 +93,10 @@ namespace mamba
|
|||
return;
|
||||
}
|
||||
|
||||
std::scoped_lock lock{ threads_mutex };
|
||||
auto synched_threads = threads.synchronize();
|
||||
if (is_open) // Double check necessary for correctness
|
||||
{
|
||||
threads.push_back(std::move(thread));
|
||||
synched_threads->push_back(std::move(thread));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -116,12 +117,12 @@ namespace mamba
|
|||
|
||||
invoke_close_handlers();
|
||||
|
||||
std::scoped_lock lock{ threads_mutex };
|
||||
for (auto&& t : threads)
|
||||
auto synched_threads = threads.synchronize();
|
||||
for (auto&& t : *synched_threads)
|
||||
{
|
||||
t.join();
|
||||
}
|
||||
threads.clear();
|
||||
synched_threads->clear();
|
||||
}
|
||||
|
||||
using on_close_handler = std::function<void()>;
|
||||
|
@ -133,21 +134,20 @@ namespace mamba
|
|||
return;
|
||||
}
|
||||
|
||||
std::scoped_lock lock{ handlers_mutex };
|
||||
auto handlers = close_handlers.synchronize();
|
||||
if (is_open) // Double check needed to avoid adding new handles while closing.
|
||||
{
|
||||
close_handlers.push_back(std::move(handler));
|
||||
handlers->push_back(std::move(handler));
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
std::atomic<bool> is_open{ true };
|
||||
std::vector<std::thread> threads;
|
||||
std::recursive_mutex threads_mutex; // TODO: replace by synchronized_value once available
|
||||
|
||||
std::vector<on_close_handler> close_handlers;
|
||||
std::recursive_mutex handlers_mutex; // TODO: replace by synchronized_value once available
|
||||
using Threads = std::vector<std::thread>;
|
||||
using CloseHandlers = std::vector<on_close_handler>;
|
||||
util::synchronized_value<Threads, std::recursive_mutex> threads;
|
||||
util::synchronized_value<CloseHandlers, std::recursive_mutex> close_handlers;
|
||||
|
||||
void invoke_close_handlers();
|
||||
};
|
||||
|
|
|
@ -186,7 +186,7 @@ namespace mamba
|
|||
|
||||
inline void counting_semaphore::lock()
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(m_access_mutex);
|
||||
std::unique_lock lock{ m_access_mutex };
|
||||
m_cv.wait(lock, [&]() { return m_value > 0; });
|
||||
--m_value;
|
||||
}
|
||||
|
@ -194,7 +194,7 @@ namespace mamba
|
|||
inline void counting_semaphore::unlock()
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_access_mutex);
|
||||
std::unique_lock lock{ m_access_mutex };
|
||||
if (++m_value <= 0)
|
||||
{
|
||||
return;
|
||||
|
|
|
@ -9,7 +9,6 @@
|
|||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
@ -17,6 +16,7 @@
|
|||
#include <fmt/core.h>
|
||||
|
||||
#include "mamba/download/request.hpp"
|
||||
#include "mamba/util/synchronized_value.hpp"
|
||||
|
||||
namespace mamba::download
|
||||
{
|
||||
|
@ -66,6 +66,16 @@ namespace mamba::download
|
|||
MirrorRequest& operator=(MirrorRequest&&) = default;
|
||||
};
|
||||
|
||||
struct MirrorStats // Moved out of Mirror internals because of compilers not agreeing:
|
||||
// https://godbolt.org/z/GcjWhrb9W
|
||||
{
|
||||
std::optional<std::size_t> allowed_connections = std::nullopt;
|
||||
std::size_t max_tried_connections = 0;
|
||||
std::size_t running_transfers = 0;
|
||||
std::size_t successful_transfers = 0;
|
||||
std::size_t failed_transfers = 0;
|
||||
};
|
||||
|
||||
// A Mirror represents a location from where an asset can be downloaded.
|
||||
// It handles the generation of required requests to get the asset, and
|
||||
// provides some statistics about its usage.
|
||||
|
@ -109,16 +119,13 @@ namespace mamba::download
|
|||
MirrorID m_id;
|
||||
size_t m_max_retries;
|
||||
|
||||
// TODO: use synchronized value
|
||||
std::mutex m_stats_mutex;
|
||||
std::optional<std::size_t> m_allowed_connections = std::nullopt;
|
||||
std::size_t m_max_tried_connections = 0;
|
||||
std::size_t m_running_transfers = 0;
|
||||
std::size_t m_successful_transfers = 0;
|
||||
std::size_t m_failed_transfers = 0;
|
||||
static_assert(std::default_initializable<MirrorStats>);
|
||||
|
||||
util::synchronized_value<MirrorStats> m_stats;
|
||||
};
|
||||
|
||||
std::unique_ptr<Mirror> make_mirror(std::string url);
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
|
|
@ -8,9 +8,11 @@
|
|||
#define MAMBA_UTIL_SYNCHRONIZED_VALUE_HPP
|
||||
|
||||
#include <concepts>
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <shared_mutex>
|
||||
#include <tuple>
|
||||
#include <utility>
|
||||
|
||||
namespace mamba::util
|
||||
{
|
||||
|
@ -35,6 +37,26 @@ namespace mamba::util
|
|||
template <class T, template <class...> class U>
|
||||
constexpr bool is_type_instance_of_v = is_type_instance_of<T, U>::value;
|
||||
|
||||
/// `true` if the instances of two provided types can be compared with operator==.
|
||||
/// Notice that this concept is less restrictive than `std::equality_comparable_with`,
|
||||
/// which requires the existence of a common reference type for T and U. This additional
|
||||
/// restriction makes it impossible to use it in the context here (originally of sparrow), where
|
||||
/// we want to compare objects that are logically similar while being "physically" different.
|
||||
// Source:
|
||||
// https://github.com/man-group/sparrow/blob/66f70418cf1b00cc294c99bbbe04b5b4d2f83c98/include/sparrow/utils/mp_utils.hpp#L604-L619
|
||||
|
||||
template <class T, class U>
|
||||
concept weakly_equality_comparable_with = requires(
|
||||
const std::remove_reference_t<T>& t,
|
||||
const std::remove_reference_t<U>& u
|
||||
) {
|
||||
{ t == u } -> std::convertible_to<bool>;
|
||||
{ t != u } -> std::convertible_to<bool>;
|
||||
{ u == t } -> std::convertible_to<bool>;
|
||||
{ u != t } -> std::convertible_to<bool>;
|
||||
};
|
||||
|
||||
|
||||
/////////////////////////////
|
||||
|
||||
|
||||
|
@ -256,7 +278,9 @@ namespace mamba::util
|
|||
|
||||
/// Constructs with a provided value as initializer for the stored object.
|
||||
template <typename V>
|
||||
requires std::assignable_from<T&, V> and (not std::same_as<this_type, std::decay_t<V>>)
|
||||
requires(not std::same_as<T, std::decay_t<V>>)
|
||||
and (not std::same_as<this_type, std::decay_t<V>>)
|
||||
and std::assignable_from<T&, V>
|
||||
synchronized_value(V&& value) noexcept
|
||||
: m_value(std::forward<V>(value))
|
||||
{
|
||||
|
@ -266,6 +290,11 @@ namespace mamba::util
|
|||
// the definition here.
|
||||
}
|
||||
|
||||
/// Constructs with a provided value as initializer for the stored object.
|
||||
// NOTE: this is redundant with the generic impl, but required to workaround
|
||||
// apple-clang failing to properly constrain the generic impl.
|
||||
synchronized_value(T value) noexcept;
|
||||
|
||||
/// Constructs with a provided initializer list used to initialize the stored object.
|
||||
template <typename V>
|
||||
requires std::constructible_from<T, std::initializer_list<V>>
|
||||
|
@ -284,14 +313,17 @@ namespace mamba::util
|
|||
the call. If `SharedMutex<M> == true`, the lock is a shared-lock for the provided
|
||||
`synchronized_value`'s mutex.
|
||||
*/
|
||||
template <std::equality_comparable_with<T> U, Mutex OtherMutex>
|
||||
template <std::default_initializable U, Mutex OtherMutex>
|
||||
requires std::assignable_from<T&, U>
|
||||
auto operator=(const synchronized_value<U, OtherMutex>& other) -> synchronized_value&;
|
||||
|
||||
/** Locks and assign the provided value to the stored object.
|
||||
The lock is released before the end of the call.
|
||||
*/
|
||||
template <typename V>
|
||||
requires std::assignable_from<T&, V> and (not std::same_as<this_type, std::decay_t<V>>)
|
||||
requires(not std::same_as<T, std::decay_t<V>>)
|
||||
and (not std::same_as<this_type, std::decay_t<V>>)
|
||||
and std::assignable_from<T&, V>
|
||||
auto operator=(V&& value) noexcept -> synchronized_value&
|
||||
{
|
||||
// NOTE: when moving the definition outside the class,
|
||||
|
@ -303,6 +335,13 @@ namespace mamba::util
|
|||
return *this;
|
||||
}
|
||||
|
||||
/** Locks and assign the provided value to the stored object.
|
||||
The lock is released before the end of the call.
|
||||
*/
|
||||
// NOTE: this is redundant with the generic impl, but required to workaround
|
||||
// apple-clang failing to properly constrain the generic impl.
|
||||
auto operator=(const T& value) noexcept -> synchronized_value&;
|
||||
|
||||
/** Locks and return the value of the current object.
|
||||
The lock is released before the end of the call.
|
||||
If `SharedMutex<M> == true`, the lock is a shared-lock.
|
||||
|
@ -465,12 +504,12 @@ namespace mamba::util
|
|||
/** Locks (shared if possible) and compare equality of the stored object's value with the
|
||||
provided value.
|
||||
*/
|
||||
auto operator==(const std::equality_comparable_with<T> auto& other_value) const -> bool;
|
||||
auto operator==(const weakly_equality_comparable_with<T> auto& other_value) const -> bool;
|
||||
|
||||
/** Locks both (shared if possible) and compare equality of the stored object's value with
|
||||
the provided value.
|
||||
*/
|
||||
template <std::equality_comparable_with<T> U, Mutex OtherMutex>
|
||||
template <weakly_equality_comparable_with<T> U, Mutex OtherMutex>
|
||||
auto operator==(const synchronized_value<U, OtherMutex>& other_value) const -> bool;
|
||||
|
||||
auto swap(synchronized_value& other) -> void;
|
||||
|
@ -508,6 +547,12 @@ namespace mamba::util
|
|||
template <std::default_initializable T, Mutex M>
|
||||
synchronized_value<T, M>::synchronized_value() noexcept(std::is_nothrow_default_constructible_v<T>) = default;
|
||||
|
||||
template <std::default_initializable T, Mutex M>
|
||||
synchronized_value<T, M>::synchronized_value(T value) noexcept
|
||||
: m_value(std::move(value))
|
||||
{
|
||||
}
|
||||
|
||||
template <std::default_initializable T, Mutex M>
|
||||
synchronized_value<T, M>::synchronized_value(const synchronized_value& other)
|
||||
{
|
||||
|
@ -516,7 +561,8 @@ namespace mamba::util
|
|||
}
|
||||
|
||||
template <std::default_initializable T, Mutex M>
|
||||
template <std::equality_comparable_with<T> U, Mutex OtherMutex>
|
||||
template <std::default_initializable U, Mutex OtherMutex>
|
||||
requires std::assignable_from<T&, U>
|
||||
auto synchronized_value<T, M>::operator=(const synchronized_value<U, OtherMutex>& other)
|
||||
-> synchronized_value<T, M>&
|
||||
{
|
||||
|
@ -526,6 +572,14 @@ namespace mamba::util
|
|||
return *this;
|
||||
}
|
||||
|
||||
template <std::default_initializable T, Mutex M>
|
||||
auto synchronized_value<T, M>::operator=(const T& value) noexcept -> synchronized_value&
|
||||
{
|
||||
auto _ = lock_as_exclusive(m_mutex);
|
||||
m_value = value;
|
||||
return *this;
|
||||
}
|
||||
|
||||
template <std::default_initializable T, Mutex M>
|
||||
template <typename V>
|
||||
requires std::constructible_from<T, std::initializer_list<V>>
|
||||
|
@ -584,7 +638,8 @@ namespace mamba::util
|
|||
}
|
||||
|
||||
template <std::default_initializable T, Mutex M>
|
||||
auto synchronized_value<T, M>::operator==(const std::equality_comparable_with<T> auto& other_value
|
||||
auto
|
||||
synchronized_value<T, M>::operator==(const weakly_equality_comparable_with<T> auto& other_value
|
||||
) const -> bool
|
||||
{
|
||||
auto _ = lock_as_readonly(m_mutex);
|
||||
|
@ -592,7 +647,7 @@ namespace mamba::util
|
|||
}
|
||||
|
||||
template <std::default_initializable T, Mutex M>
|
||||
template <std::equality_comparable_with<T> U, Mutex OtherMutex>
|
||||
template <weakly_equality_comparable_with<T> U, Mutex OtherMutex>
|
||||
auto
|
||||
synchronized_value<T, M>::operator==(const synchronized_value<U, OtherMutex>& other_value) const
|
||||
-> bool
|
||||
|
@ -623,7 +678,6 @@ namespace mamba::util
|
|||
{
|
||||
return std::make_tuple(std::forward<SynchronizedValues>(sync_values).synchronize()...);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
|
|
@ -8,8 +8,8 @@ namespace mamba
|
|||
|
||||
void MainExecutor::invoke_close_handlers()
|
||||
{
|
||||
std::scoped_lock lock{ handlers_mutex };
|
||||
for (auto&& handler : close_handlers)
|
||||
auto synched_handlers = close_handlers.synchronize();
|
||||
for (auto&& handler : *synched_handlers)
|
||||
{
|
||||
const auto result = safe_invoke(handler);
|
||||
if (!result)
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include "mamba/core/util.hpp"
|
||||
#include "mamba/specs/conda_url.hpp"
|
||||
#include "mamba/util/string.hpp"
|
||||
#include "mamba/util/synchronized_value.hpp"
|
||||
#include "mamba/util/url_manip.hpp"
|
||||
|
||||
#include "progress_bar_impl.hpp"
|
||||
|
@ -272,6 +273,9 @@ namespace mamba
|
|||
* Console *
|
||||
***********/
|
||||
|
||||
|
||||
using ConsoleBuffer = std::vector<std::string>;
|
||||
|
||||
class ConsoleData
|
||||
{
|
||||
public:
|
||||
|
@ -281,23 +285,33 @@ namespace mamba
|
|||
{
|
||||
}
|
||||
|
||||
ConsoleData(const ConsoleData&) = delete;
|
||||
ConsoleData& operator=(const ConsoleData&) = delete;
|
||||
|
||||
ConsoleData(ConsoleData&&) noexcept = delete;
|
||||
ConsoleData& operator=(ConsoleData&&) noexcept = delete;
|
||||
|
||||
|
||||
const Context& m_context;
|
||||
|
||||
std::mutex m_mutex;
|
||||
std::unique_ptr<ProgressBarManager> p_progress_bar_manager;
|
||||
|
||||
std::string json_hier;
|
||||
unsigned int json_index;
|
||||
unsigned int json_index = 0;
|
||||
nlohmann::json json_log;
|
||||
bool is_json_print_cancelled = false;
|
||||
|
||||
std::vector<std::string> m_buffer;
|
||||
struct Data
|
||||
{
|
||||
std::unique_ptr<ProgressBarManager> progress_bar_manager;
|
||||
ConsoleBuffer buffer;
|
||||
};
|
||||
|
||||
util::synchronized_value<Data> m_synched_data;
|
||||
|
||||
TaskSynchronizer m_tasksync;
|
||||
};
|
||||
|
||||
Console::Console(const Context& context)
|
||||
: p_data(new ConsoleData{ context })
|
||||
: p_data(std::make_unique<ConsoleData>(context))
|
||||
{
|
||||
set_singleton(*this);
|
||||
|
||||
|
@ -346,11 +360,11 @@ namespace mamba
|
|||
{
|
||||
if (force_print || !(context().output_params.quiet || context().output_params.json))
|
||||
{
|
||||
const std::lock_guard<std::mutex> lock(p_data->m_mutex);
|
||||
auto synched_data = p_data->m_synched_data.synchronize();
|
||||
|
||||
if (p_data->p_progress_bar_manager && p_data->p_progress_bar_manager->started())
|
||||
if (synched_data->progress_bar_manager && synched_data->progress_bar_manager->started())
|
||||
{
|
||||
p_data->m_buffer.push_back(hide_secrets(str));
|
||||
synched_data->buffer.push_back(hide_secrets(str));
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -362,12 +376,8 @@ namespace mamba
|
|||
void Console::print_buffer(std::ostream& ostream)
|
||||
{
|
||||
auto& data = instance().p_data;
|
||||
decltype(data->m_buffer) tmp;
|
||||
|
||||
{
|
||||
const std::lock_guard<std::mutex> lock(data->m_mutex);
|
||||
data->m_buffer.swap(tmp);
|
||||
}
|
||||
ConsoleBuffer tmp;
|
||||
data->m_synched_data->buffer.swap(tmp);
|
||||
|
||||
for (const auto& message : tmp)
|
||||
{
|
||||
|
@ -436,11 +446,11 @@ namespace mamba
|
|||
}
|
||||
else
|
||||
{
|
||||
return p_data->p_progress_bar_manager->add_progress_bar(
|
||||
return p_data->m_synched_data->progress_bar_manager->add_progress_bar(
|
||||
name,
|
||||
{
|
||||
/* .graphics = */ context().graphics_params,
|
||||
/* .ascii_only = */ context().ascii_only,
|
||||
.graphics = context().graphics_params,
|
||||
.ascii_only = context().ascii_only,
|
||||
},
|
||||
expected_total
|
||||
);
|
||||
|
@ -449,31 +459,35 @@ namespace mamba
|
|||
|
||||
void Console::clear_progress_bars()
|
||||
{
|
||||
return p_data->p_progress_bar_manager->clear_progress_bars();
|
||||
return p_data->m_synched_data->progress_bar_manager->clear_progress_bars();
|
||||
}
|
||||
|
||||
ProgressBarManager& Console::init_progress_bar_manager(ProgressBarMode mode)
|
||||
{
|
||||
p_data->p_progress_bar_manager = make_progress_bar_manager(mode);
|
||||
p_data->p_progress_bar_manager->register_print_hook(Console::print_buffer);
|
||||
p_data->p_progress_bar_manager->register_print_hook(MessageLogger::print_buffer);
|
||||
p_data->p_progress_bar_manager->register_pre_start_hook(MessageLogger::activate_buffer);
|
||||
p_data->p_progress_bar_manager->register_post_stop_hook(MessageLogger::deactivate_buffer);
|
||||
auto new_progress_bar_manager = make_progress_bar_manager(mode);
|
||||
new_progress_bar_manager->register_print_hook(Console::print_buffer);
|
||||
new_progress_bar_manager->register_print_hook(MessageLogger::print_buffer);
|
||||
new_progress_bar_manager->register_pre_start_hook(MessageLogger::activate_buffer);
|
||||
new_progress_bar_manager->register_post_stop_hook(MessageLogger::deactivate_buffer);
|
||||
|
||||
return *(p_data->p_progress_bar_manager);
|
||||
auto synched_data = p_data->m_synched_data.synchronize();
|
||||
synched_data->progress_bar_manager = std::move(new_progress_bar_manager);
|
||||
|
||||
return *(synched_data->progress_bar_manager); // unsafe!
|
||||
}
|
||||
|
||||
void Console::terminate_progress_bar_manager()
|
||||
{
|
||||
if (p_data->p_progress_bar_manager)
|
||||
auto synched_data = p_data->m_synched_data.synchronize();
|
||||
if (synched_data->progress_bar_manager)
|
||||
{
|
||||
p_data->p_progress_bar_manager->terminate();
|
||||
synched_data->progress_bar_manager->terminate();
|
||||
}
|
||||
}
|
||||
|
||||
ProgressBarManager& Console::progress_bar_manager()
|
||||
{
|
||||
return *(p_data->p_progress_bar_manager);
|
||||
return *(p_data->m_synched_data->progress_bar_manager);
|
||||
}
|
||||
|
||||
void Console::json_print()
|
||||
|
@ -543,12 +557,10 @@ namespace mamba
|
|||
* MessageLogger *
|
||||
*****************/
|
||||
|
||||
struct MessageLoggerData
|
||||
{
|
||||
static std::mutex m_mutex;
|
||||
static bool use_buffer;
|
||||
static std::vector<std::pair<std::string, log_level>> m_buffer;
|
||||
};
|
||||
static std::atomic<bool> message_logger_use_buffer;
|
||||
|
||||
using MessageLoggerBuffer = std::vector<std::pair<std::string, log_level>>;
|
||||
static util::synchronized_value<MessageLoggerBuffer> message_logger_buffer;
|
||||
|
||||
MessageLogger::MessageLogger(log_level level)
|
||||
: m_level(level)
|
||||
|
@ -558,14 +570,13 @@ namespace mamba
|
|||
|
||||
MessageLogger::~MessageLogger()
|
||||
{
|
||||
if (!MessageLoggerData::use_buffer && Console::is_available())
|
||||
if (!message_logger_use_buffer && Console::is_available())
|
||||
{
|
||||
emit(m_stream.str(), m_level);
|
||||
}
|
||||
else
|
||||
{
|
||||
const std::lock_guard<std::mutex> lock(MessageLoggerData::m_mutex);
|
||||
MessageLoggerData::m_buffer.push_back({ m_stream.str(), m_level });
|
||||
message_logger_buffer->push_back({ m_stream.str(), m_level });
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -608,22 +619,18 @@ namespace mamba
|
|||
|
||||
void MessageLogger::activate_buffer()
|
||||
{
|
||||
MessageLoggerData::use_buffer = true;
|
||||
message_logger_use_buffer = true;
|
||||
}
|
||||
|
||||
void MessageLogger::deactivate_buffer()
|
||||
{
|
||||
MessageLoggerData::use_buffer = false;
|
||||
message_logger_use_buffer = false;
|
||||
}
|
||||
|
||||
void MessageLogger::print_buffer(std::ostream& /*ostream*/)
|
||||
{
|
||||
decltype(MessageLoggerData::m_buffer) tmp;
|
||||
|
||||
{
|
||||
const std::lock_guard<std::mutex> lock(MessageLoggerData::m_mutex);
|
||||
MessageLoggerData::m_buffer.swap(tmp);
|
||||
}
|
||||
MessageLoggerBuffer tmp;
|
||||
message_logger_buffer->swap(tmp);
|
||||
|
||||
for (const auto& [msg, level] : tmp)
|
||||
{
|
||||
|
|
|
@ -458,7 +458,7 @@ namespace mamba
|
|||
void PackageFetcher::update_urls_txt() const
|
||||
{
|
||||
// TODO: check if this lock is really required
|
||||
std::lock_guard<std::mutex> lock(urls_txt_mutex);
|
||||
std::unique_lock lock{ urls_txt_mutex };
|
||||
const auto urls_file_path = m_cache_path / "urls.txt";
|
||||
std::ofstream urls_txt(urls_file_path.std_path(), std::ios::app);
|
||||
urls_txt << url() << std::endl;
|
||||
|
|
|
@ -740,7 +740,7 @@ namespace mamba
|
|||
void extract(const fs::u8path& file, const fs::u8path& dest, const ExtractOptions& options)
|
||||
{
|
||||
static std::mutex extract_mutex;
|
||||
std::lock_guard<std::mutex> lock(extract_mutex);
|
||||
std::unique_lock lock{ extract_mutex };
|
||||
|
||||
if (util::ends_with(file.string(), ".tar.bz2"))
|
||||
{
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
#include <mutex>
|
||||
|
||||
#include "mamba/util/build.hpp"
|
||||
#include "mamba/util/synchronized_value.hpp"
|
||||
|
||||
extern "C"
|
||||
{
|
||||
|
@ -105,19 +106,18 @@ namespace mamba
|
|||
|
||||
static std::atomic<MainExecutor*> main_executor{ nullptr };
|
||||
|
||||
static std::unique_ptr<MainExecutor> default_executor;
|
||||
static std::mutex default_executor_mutex; // TODO: replace by synchronized_value once available
|
||||
static util::synchronized_value<std::unique_ptr<MainExecutor>> default_executor;
|
||||
|
||||
MainExecutor& MainExecutor::instance()
|
||||
{
|
||||
if (!main_executor)
|
||||
{
|
||||
// When no MainExecutor was created before we create a static one.
|
||||
std::scoped_lock lock{ default_executor_mutex };
|
||||
auto synched_default_executor = default_executor.synchronize();
|
||||
if (!main_executor) // double check necessary to avoid data race
|
||||
{
|
||||
default_executor = std::make_unique<MainExecutor>();
|
||||
assert(main_executor == default_executor.get());
|
||||
*synched_default_executor = std::make_unique<MainExecutor>();
|
||||
assert(main_executor == synched_default_executor->get());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -126,8 +126,7 @@ namespace mamba
|
|||
|
||||
void MainExecutor::stop_default()
|
||||
{
|
||||
std::scoped_lock lock{ default_executor_mutex };
|
||||
default_executor.reset();
|
||||
default_executor->reset();
|
||||
}
|
||||
|
||||
MainExecutor::MainExecutor()
|
||||
|
|
|
@ -1065,6 +1065,32 @@ namespace mamba
|
|||
#pragma GCC diagnostic pop
|
||||
#endif
|
||||
|
||||
struct LockedFilesRegistry_Data // made public to workaround CWG2335, should be private
|
||||
// otherwise
|
||||
{
|
||||
// TODO: replace by something like boost::multiindex or equivalent to avoid having
|
||||
// to handle 2 hashmaps
|
||||
std::unordered_map<fs::u8path, std::weak_ptr<LockFileOwner>> locked_files; // TODO:
|
||||
// consider
|
||||
// replacing
|
||||
// by
|
||||
// real
|
||||
// concurrent
|
||||
// set
|
||||
// to
|
||||
// avoid
|
||||
// having
|
||||
// to
|
||||
// lock
|
||||
// the
|
||||
// whole
|
||||
// container
|
||||
|
||||
std::unordered_map<int, fs::u8path> fd_to_locked_path; // this is a workaround the
|
||||
// usage of file descriptors
|
||||
// on linux instead of paths
|
||||
};
|
||||
|
||||
class LockedFilesRegistry
|
||||
{
|
||||
public:
|
||||
|
@ -1105,10 +1131,10 @@ namespace mamba
|
|||
}
|
||||
|
||||
const auto absolute_file_path = fs::absolute(file_path);
|
||||
std::scoped_lock lock{ mutex };
|
||||
auto data = m_data.synchronize();
|
||||
|
||||
const auto it = locked_files.find(absolute_file_path);
|
||||
if (it != locked_files.end())
|
||||
const auto it = data->locked_files.find(absolute_file_path);
|
||||
if (it != data->locked_files.end())
|
||||
{
|
||||
if (auto lockedfile = it->second.lock())
|
||||
{
|
||||
|
@ -1123,8 +1149,8 @@ namespace mamba
|
|||
{
|
||||
auto lockedfile = std::make_shared<LockFileOwner>(absolute_file_path, timeout);
|
||||
auto tracker = std::weak_ptr{ lockedfile };
|
||||
locked_files.insert_or_assign(absolute_file_path, std::move(tracker));
|
||||
fd_to_locked_path.insert_or_assign(lockedfile->fd(), absolute_file_path);
|
||||
data->locked_files.insert_or_assign(absolute_file_path, std::move(tracker));
|
||||
data->fd_to_locked_path.insert_or_assign(lockedfile->fd(), absolute_file_path);
|
||||
assert(is_lockfile_locked(*lockedfile));
|
||||
return lockedfile;
|
||||
}
|
||||
|
@ -1135,9 +1161,9 @@ namespace mamba
|
|||
bool is_locked(const fs::u8path& file_path) const
|
||||
{
|
||||
const auto absolute_file_path = fs::absolute(file_path);
|
||||
std::scoped_lock lock{ mutex };
|
||||
auto it = locked_files.find(file_path);
|
||||
if (it != locked_files.end())
|
||||
auto data = m_data.synchronize();
|
||||
auto it = data->locked_files.find(file_path);
|
||||
if (it != data->locked_files.end())
|
||||
{
|
||||
return !it->second.expired();
|
||||
}
|
||||
|
@ -1150,9 +1176,9 @@ namespace mamba
|
|||
// note: the resulting value will be obsolete before returning.
|
||||
bool is_locked(int fd) const
|
||||
{
|
||||
std::scoped_lock lock{ mutex };
|
||||
const auto it = fd_to_locked_path.find(fd);
|
||||
if (it != fd_to_locked_path.end())
|
||||
auto data = m_data.synchronize();
|
||||
const auto it = data->fd_to_locked_path.find(fd);
|
||||
if (it != data->fd_to_locked_path.end())
|
||||
{
|
||||
return is_locked(it->second);
|
||||
}
|
||||
|
@ -1162,30 +1188,12 @@ namespace mamba
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private:
|
||||
|
||||
std::atomic_bool m_is_file_locking_allowed{ true };
|
||||
std::atomic<std::chrono::seconds> m_default_lock_timeout{ std::chrono::seconds::zero() };
|
||||
|
||||
// TODO: replace by something like boost::multiindex or equivalent to avoid having to
|
||||
// handle 2 hashmaps
|
||||
std::unordered_map<fs::u8path, std::weak_ptr<LockFileOwner>> locked_files; // TODO:
|
||||
// consider
|
||||
// replacing
|
||||
// by real
|
||||
// concurrent
|
||||
// set to
|
||||
// avoid
|
||||
// having to
|
||||
// lock the
|
||||
// whole
|
||||
// container
|
||||
|
||||
std::unordered_map<int, fs::u8path> fd_to_locked_path; // this is a workaround the
|
||||
// usage of file descriptors on
|
||||
// linux instead of paths
|
||||
mutable std::recursive_mutex mutex; // TODO: replace by synchronized_value once
|
||||
// available
|
||||
util::synchronized_value<LockedFilesRegistry_Data, std::recursive_mutex> m_data;
|
||||
};
|
||||
|
||||
static LockedFilesRegistry files_locked_by_this_process;
|
||||
|
|
|
@ -92,23 +92,27 @@ namespace mamba::download
|
|||
|
||||
std::size_t Mirror::successful_transfers() const
|
||||
{
|
||||
return m_successful_transfers;
|
||||
return m_stats->successful_transfers;
|
||||
}
|
||||
|
||||
std::size_t Mirror::failed_transfers() const
|
||||
{
|
||||
return m_failed_transfers;
|
||||
return m_stats->failed_transfers;
|
||||
}
|
||||
|
||||
bool Mirror::can_accept_more_connections() const
|
||||
{
|
||||
return !m_allowed_connections.has_value() || m_running_transfers < m_allowed_connections;
|
||||
const auto stats = m_stats.synchronize();
|
||||
return !stats->allowed_connections.has_value()
|
||||
|| stats->running_transfers < stats->allowed_connections;
|
||||
}
|
||||
|
||||
bool Mirror::can_retry_with_fewer_connections() const
|
||||
{
|
||||
return m_running_transfers > 0
|
||||
|| (m_successful_transfers > 0 && m_failed_transfers < m_max_tried_connections);
|
||||
const auto stats = m_stats.synchronize();
|
||||
return stats->running_transfers > 0
|
||||
|| (stats->successful_transfers > 0
|
||||
&& stats->failed_transfers < stats->max_tried_connections);
|
||||
}
|
||||
|
||||
namespace
|
||||
|
@ -118,40 +122,40 @@ namespace mamba::download
|
|||
|
||||
void Mirror::cap_allowed_connections()
|
||||
{
|
||||
lock_guard lock(m_stats_mutex);
|
||||
if (m_running_transfers > 0)
|
||||
auto stats = m_stats.synchronize();
|
||||
if (stats->running_transfers > 0)
|
||||
{
|
||||
m_allowed_connections = m_running_transfers;
|
||||
stats->allowed_connections = stats->running_transfers;
|
||||
}
|
||||
else
|
||||
{
|
||||
m_allowed_connections = std::size_t(1);
|
||||
stats->allowed_connections = std::size_t(1);
|
||||
}
|
||||
}
|
||||
|
||||
void Mirror::increase_running_transfers()
|
||||
{
|
||||
lock_guard lock(m_stats_mutex);
|
||||
++m_running_transfers;
|
||||
if (m_max_tried_connections < m_running_transfers)
|
||||
auto stats = m_stats.synchronize();
|
||||
++stats->running_transfers;
|
||||
if (stats->max_tried_connections < stats->running_transfers)
|
||||
{
|
||||
m_max_tried_connections = m_running_transfers;
|
||||
stats->max_tried_connections = stats->running_transfers;
|
||||
}
|
||||
}
|
||||
|
||||
void Mirror::update_transfers_done(bool success, bool record_success)
|
||||
{
|
||||
lock_guard lock(m_stats_mutex);
|
||||
--m_running_transfers;
|
||||
auto stats = m_stats.synchronize();
|
||||
--stats->running_transfers;
|
||||
if (record_success)
|
||||
{
|
||||
if (success)
|
||||
{
|
||||
++m_successful_transfers;
|
||||
++stats->successful_transfers;
|
||||
}
|
||||
else
|
||||
{
|
||||
++m_failed_transfers;
|
||||
++stats->failed_transfers;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -95,6 +95,26 @@ namespace
|
|||
auto operator<=>(const ValueType&) const noexcept = default;
|
||||
};
|
||||
|
||||
struct ConvertibleToValueType
|
||||
{
|
||||
int i = 0;
|
||||
|
||||
operator ValueType() const
|
||||
{
|
||||
return { i };
|
||||
}
|
||||
};
|
||||
|
||||
struct ComparableToValueType
|
||||
{
|
||||
int j = 0;
|
||||
};
|
||||
|
||||
bool operator==(const ValueType& left, const ComparableToValueType& right)
|
||||
{
|
||||
return left.x == right.j;
|
||||
}
|
||||
|
||||
// NOTE: We do not use TEMPLATE_TEST_CASE or TEMPLATE_LIST_TEST_CASE here because code coverage
|
||||
// tools (such as gcov/lcov) do not properly attribute coverage to tests instantiated via
|
||||
// template test cases. Instead, we use individual TEST_CASEs for each mutex type, and factorize
|
||||
|
@ -113,6 +133,22 @@ namespace
|
|||
synchronized_value a;
|
||||
}
|
||||
|
||||
SECTION("compatible value assignation")
|
||||
{
|
||||
synchronized_value a;
|
||||
a = ConvertibleToValueType{ 1234 };
|
||||
REQUIRE(a->x == 1234);
|
||||
}
|
||||
|
||||
SECTION("compatible comparison")
|
||||
{
|
||||
synchronized_value a;
|
||||
ComparableToValueType x{ a->x };
|
||||
REQUIRE(a == x);
|
||||
ComparableToValueType y{ a->x + 1 };
|
||||
REQUIRE(a != y);
|
||||
}
|
||||
|
||||
static constexpr auto initial_value = ValueType{ 42 };
|
||||
synchronized_value sv{ initial_value };
|
||||
|
||||
|
|
Loading…
Reference in New Issue