Added PackageFetcher (#2917)

* Added PackageFetcher

* slight refactoring

* Additional cleanup and first pass according to review

* Last pass according to review
This commit is contained in:
Johan Mabille 2023-10-19 15:26:57 +02:00 committed by GitHub
parent e63fe80d61
commit 00dd002fe6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1353 additions and 2114 deletions

View File

@ -154,7 +154,6 @@ set(LIBMAMBA_SOURCES
${LIBMAMBA_SOURCE_DIR}/core/environment.cpp
${LIBMAMBA_SOURCE_DIR}/core/environments_manager.cpp
${LIBMAMBA_SOURCE_DIR}/core/error_handling.cpp
${LIBMAMBA_SOURCE_DIR}/core/fetch.cpp
${LIBMAMBA_SOURCE_DIR}/core/transaction_context.cpp
${LIBMAMBA_SOURCE_DIR}/core/link.cpp
${LIBMAMBA_SOURCE_DIR}/core/history.cpp
@ -169,6 +168,7 @@ set(LIBMAMBA_SOURCES
${LIBMAMBA_SOURCE_DIR}/core/progress_bar.cpp
${LIBMAMBA_SOURCE_DIR}/core/progress_bar_impl.cpp
${LIBMAMBA_SOURCE_DIR}/core/pinning.cpp
${LIBMAMBA_SOURCE_DIR}/core/package_fetcher.cpp
${LIBMAMBA_SOURCE_DIR}/core/package_info.cpp
${LIBMAMBA_SOURCE_DIR}/core/package_paths.cpp
${LIBMAMBA_SOURCE_DIR}/core/query.cpp
@ -179,7 +179,6 @@ set(LIBMAMBA_SOURCES
${LIBMAMBA_SOURCE_DIR}/core/subdirdata.cpp
${LIBMAMBA_SOURCE_DIR}/core/thread_utils.cpp
${LIBMAMBA_SOURCE_DIR}/core/transaction.cpp
${LIBMAMBA_SOURCE_DIR}/core/package_download.cpp
${LIBMAMBA_SOURCE_DIR}/core/util.cpp
${LIBMAMBA_SOURCE_DIR}/core/fsutil.cpp
${LIBMAMBA_SOURCE_DIR}/core/util_os.cpp
@ -249,7 +248,6 @@ set(LIBMAMBA_PUBLIC_HEADERS
${LIBMAMBA_INCLUDE_DIR}/mamba/core/environment.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/environments_manager.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/error_handling.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/fetch.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/satisfiability_error.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/history.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/link.hpp
@ -257,6 +255,7 @@ set(LIBMAMBA_PUBLIC_HEADERS
${LIBMAMBA_INCLUDE_DIR}/mamba/core/menuinst.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/output.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/package_cache.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/package_fetcher.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/package_handling.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/package_info.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/package_paths.hpp
@ -272,7 +271,6 @@ set(LIBMAMBA_PUBLIC_HEADERS
${LIBMAMBA_INCLUDE_DIR}/mamba/core/solver.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/subdirdata.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/thread_utils.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/package_download.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/transaction.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/transaction_context.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/util.hpp

View File

@ -131,6 +131,15 @@ namespace mamba
DownloadMonitor* monitor = nullptr
);
DownloadResult download(
DownloadRequest request,
const Context& context,
DownloadOptions options = {},
DownloadMonitor* monitor = nullptr
);
bool check_resource_exists(const std::string& url, const Context& context);
}
#endif

View File

@ -13,6 +13,7 @@
#include "mamba/core/context.hpp"
#include "mamba/core/download.hpp"
#include "mamba/core/package_fetcher.hpp"
namespace mamba
{
@ -22,20 +23,20 @@ namespace mamba
bool no_clear_progress_bar = false;
};
class DownloadProgressBar : public DownloadMonitor
class SubdirDataMonitor : public DownloadMonitor
{
public:
static bool can_monitor(const Context& context);
explicit DownloadProgressBar(MonitorOptions options = {});
virtual ~DownloadProgressBar() = default;
explicit SubdirDataMonitor(MonitorOptions options = {});
virtual ~SubdirDataMonitor() = default;
DownloadProgressBar(const DownloadProgressBar&) = delete;
DownloadProgressBar& operator=(const DownloadProgressBar&) = delete;
SubdirDataMonitor(const SubdirDataMonitor&) = delete;
SubdirDataMonitor& operator=(const SubdirDataMonitor&) = delete;
DownloadProgressBar(DownloadProgressBar&&) = delete;
DownloadProgressBar& operator=(DownloadProgressBar&&) = delete;
SubdirDataMonitor(SubdirDataMonitor&&) = delete;
SubdirDataMonitor& operator=(SubdirDataMonitor&&) = delete;
void reset_options(MonitorOptions options);
@ -52,14 +53,61 @@ namespace mamba
void complete_checking_progress_bar(std::size_t index);
std::function<void(ProgressBarRepr&)> download_repr(std::size_t index);
using time_point = std::chrono::steady_clock::time_point;
std::vector<time_point> m_throttle_time;
std::vector<ProgressProxy> m_progress_bar;
MonitorOptions m_options;
};
class PackageDownloadMonitor : public DownloadMonitor
{
public:
static bool can_monitor(const Context& context);
PackageDownloadMonitor() = default;
virtual ~PackageDownloadMonitor();
PackageDownloadMonitor(const PackageDownloadMonitor&) = delete;
PackageDownloadMonitor& operator=(const PackageDownloadMonitor&) = delete;
PackageDownloadMonitor(PackageDownloadMonitor&&) = delete;
PackageDownloadMonitor& operator=(PackageDownloadMonitor&&) = delete;
// Requires extract_tasks.size() >= requests.size()
// Requires for i in [0, dl_requests.size()), extract_tasks[i].needs_download()
void observe(
MultiDownloadRequest& dl_requests,
std::vector<PackageExtractTask>& extract_tasks,
DownloadOptions& options
);
void end_monitoring();
private:
void init_extract_bar(ProgressProxy& extract_bar);
void init_download_bar(ProgressProxy& download_bar);
void init_aggregated_extract();
void init_aggregated_download();
void update_extract_bar(std::size_t index, PackageExtractEvent event);
void observe_impl(MultiDownloadRequest& requests, DownloadOptions& options) override;
void on_done_impl() override;
void on_unexpected_termination_impl() override;
void update_progress_bar(std::size_t index, const DownloadEvent& event);
void update_progress_bar(std::size_t index, const DownloadProgress& progress);
void update_progress_bar(std::size_t index, const DownloadError& error);
void update_progress_bar(std::size_t index, const DownloadSuccess& success);
std::vector<ProgressProxy> m_extract_bar;
using time_point = std::chrono::steady_clock::time_point;
std::vector<time_point> m_throttle_time;
std::vector<ProgressProxy> m_download_bar;
};
}
#endif

View File

@ -1,204 +0,0 @@
// Copyright (c) 2019, QuantStack and Mamba Contributors
//
// Distributed under the terms of the BSD 3-Clause License.
//
// The full license is in the file LICENSE, distributed with this software.
#ifndef MAMBA_CORE_FETCH_HPP
#define MAMBA_CORE_FETCH_HPP
#include <fstream>
#include <memory>
#include <string>
#include <vector>
///////////////////////////////////////
// TODO to remove
// For now used for curl_off_t in progress_callback and for CURLcode in set_result
extern "C"
{
#include <curl/curl.h>
}
////////////////////////////////////
#include "progress_bar.hpp"
namespace mamba
{
struct ZstdStream;
struct Bzip2Stream;
class Context;
class CURLHandle;
class CURLMultiHandle;
/******************************
* Config and Context params *
******************************/
void get_config(
const Context& context,
bool& set_low_speed_opt,
bool& set_ssl_no_revoke,
long& connect_timeout_secs,
std::string& ssl_verify
);
std::size_t get_default_retry_timeout(const Context& context);
/*******************
* DownloadTarget *
*******************/
class DownloadTarget
{
public:
DownloadTarget(
Context& context,
const std::string& name,
const std::string& url,
const std::string& filename
);
~DownloadTarget();
DownloadTarget(const DownloadTarget&) = delete;
DownloadTarget& operator=(const DownloadTarget&) = delete;
DownloadTarget(DownloadTarget&&) = delete;
DownloadTarget& operator=(DownloadTarget&&) = delete;
static size_t write_callback(char* ptr, size_t size, size_t nmemb, void* self);
static size_t header_callback(char* buffer, size_t size, size_t nitems, void* self);
static int progress_callback(
void*,
curl_off_t total_to_download,
curl_off_t now_downloaded,
curl_off_t,
curl_off_t
);
void set_mod_etag_headers(const std::string& mod, const std::string& etag);
void set_progress_bar(ProgressProxy progress_proxy);
void set_expected_size(std::size_t size);
void set_head_only(bool yes);
const std::string& get_name() const;
const std::string& get_url() const;
const std::string& get_etag() const;
const std::string& get_mod() const;
const std::string& get_cache_control() const;
std::size_t get_expected_size() const;
int get_http_status() const;
std::size_t get_downloaded_size() const;
std::size_t get_speed();
void init_curl_ssl();
void init_curl_target(const std::string& url);
template <class C>
inline void set_finalize_callback(bool (C::*cb)(const DownloadTarget&), C* data)
{
m_finalize_callback = std::bind(cb, data, std::placeholders::_1);
}
void set_ignore_failure(bool yes)
{
m_ignore_failure = yes;
}
bool get_ignore_failure() const
{
return m_ignore_failure;
}
std::size_t get_result() const;
// TODO find a way to move this from the API
void set_result(CURLcode res);
bool resource_exists();
bool perform();
bool check_result();
bool finalize();
std::string get_transfer_msg();
bool can_retry();
bool retry();
std::chrono::steady_clock::time_point progress_throttle_time() const;
void set_progress_throttle_time(const std::chrono::steady_clock::time_point& time);
const CURLHandle& get_curl_handle() const;
const Context& context() const
{
return m_context;
}
private:
Context& m_context;
std::unique_ptr<ZstdStream> m_zstd_stream;
std::unique_ptr<Bzip2Stream> m_bzip2_stream;
std::unique_ptr<CURLHandle> m_curl_handle;
std::function<bool(const DownloadTarget&)> m_finalize_callback;
std::string m_name, m_filename, m_url;
int m_http_status;
std::size_t m_downloaded_size;
char* m_effective_url;
std::string m_etag, m_mod, m_cache_control;
// validation
std::size_t m_expected_size;
// retry & backoff
std::chrono::steady_clock::time_point m_next_retry;
std::size_t m_retry_wait_seconds;
std::size_t m_retries;
bool m_has_progress_bar;
bool m_ignore_failure;
ProgressProxy m_progress_bar;
std::ofstream m_file;
std::function<void(ProgressBarRepr&)> download_repr();
std::chrono::steady_clock::time_point m_progress_throttle_time;
};
class MultiDownloadTarget
{
public:
explicit MultiDownloadTarget(const Context& context);
~MultiDownloadTarget();
void add(DownloadTarget* target);
bool download(int options);
private:
bool check_msgs(bool failfast);
const Context& m_context;
std::vector<DownloadTarget*> m_targets;
std::vector<DownloadTarget*> m_retry_targets;
std::unique_ptr<CURLMultiHandle> p_curl_handle;
};
const int MAMBA_DOWNLOAD_FAILFAST = 1 << 0;
const int MAMBA_DOWNLOAD_SORT = 1 << 1;
const int MAMBA_NO_CLEAR_PROGRESS_BARS = 1 << 2;
} // namespace mamba
#endif // MAMBA_FETCH_HPP

View File

@ -1,105 +0,0 @@
// Copyright (c) 2023, QuantStack and Mamba Contributors
//
// Distributed under the terms of the BSD 3-Clause License.
//
// The full license is in the file LICENSE, distributed with this software.
#ifndef MAMBA_CORE_PACKAGE_DOWNLOAD_HPP
#define MAMBA_CORE_PACKAGE_DOWNLOAD_HPP
#include <future>
#include <memory>
#include <set>
#include <string>
#include <tuple>
#include <vector>
#include "mamba/fs/filesystem.hpp"
#include "fetch.hpp"
#include "package_cache.hpp"
#include "progress_bar.hpp"
#include "tasksync.hpp"
#include "thread_utils.hpp"
namespace mamba
{
class ChannelContext;
class PackageDownloadExtractTarget
{
public:
enum VALIDATION_RESULT
{
UNDEFINED = 0,
VALID = 1,
SHA256_ERROR,
MD5SUM_ERROR,
SIZE_ERROR,
EXTRACT_ERROR
};
// TODO: REVIEW: consider caputring a reference to the context from the ChannelContext, if
// that makes sense.
PackageDownloadExtractTarget(const PackageInfo& pkg_info, ChannelContext& channel_context);
void write_repodata_record(const fs::u8path& base_path);
void add_url();
bool finalize_callback(const DownloadTarget& target);
bool finished();
void validate();
bool extract(const Context& context);
bool extract_from_cache(const Context& context);
bool validate_extract(const Context& context);
const std::string& name() const;
std::size_t expected_size() const;
VALIDATION_RESULT validation_result() const;
void clear_cache() const;
DownloadTarget* target(Context& context, MultiPackageCache& cache);
std::exception m_decompress_exception;
private:
bool m_finished;
PackageInfo m_package_info;
std::string m_sha256, m_md5;
std::size_t m_expected_size;
bool m_has_progress_bars = false;
ProgressProxy m_download_bar, m_extract_bar;
std::unique_ptr<DownloadTarget> m_target;
std::string m_url, m_name, m_filename;
fs::u8path m_tarball_path, m_cache_path;
std::future<bool> m_extract_future;
VALIDATION_RESULT m_validation_result = VALIDATION_RESULT::UNDEFINED;
TaskSynchronizer m_tasksync;
std::function<void(ProgressBarRepr&)> extract_repr();
std::function<void(ProgressProxy&)> extract_progress_callback();
};
class DownloadExtractSemaphore
{
public:
static std::ptrdiff_t get_max();
static void set_max(int value);
private:
static counting_semaphore semaphore;
friend class PackageDownloadExtractTarget;
};
} // namespace mamba
#endif

View File

@ -0,0 +1,154 @@
// Copyright (c) 2023, QuantStack and Mamba Contributors
//
// Distributed under the terms of the BSD 3-Clause License.
//
// The full license is in the file LICENSE, distributed with this software.
#ifndef MAMBA_CORE_PACKAGE_FETCHER_HPP
#define MAMBA_CORE_PACKAGE_FETCHER_HPP
#include <functional>
#include "mamba/core/channel.hpp"
#include "mamba/core/download.hpp"
#include "mamba/core/package_cache.hpp"
#include "mamba/core/package_handling.hpp"
#include "mamba/core/package_info.hpp"
#include "mamba/core/thread_utils.hpp"
namespace mamba
{
class PackageFetcher;
enum class PackageExtractEvent
{
validate_update,
validate_success,
validate_failure,
extract_update,
extract_success,
extract_failure
};
class PackageExtractTask
{
public:
struct Result
{
bool valid;
bool extracted;
};
using progress_callback_t = std::function<void(PackageExtractEvent)>;
PackageExtractTask(PackageFetcher* fetcher, ExtractOptions options);
const std::string& name() const;
bool needs_download() const;
void set_progress_callback(progress_callback_t cb);
Result run();
Result run(std::size_t downloaded_size);
private:
progress_callback_t* get_progress_callback();
PackageFetcher* p_fetcher;
ExtractOptions m_options;
std::optional<progress_callback_t> m_progress_callback = std::nullopt;
};
class PackageFetcher
{
public:
enum class ValidationResult
{
UNDEFINED = 0,
VALID = 1,
SHA256_ERROR,
MD5SUM_ERROR,
SIZE_ERROR,
EXTRACT_ERROR
};
using post_download_success_t = std::function<void(std::size_t)>;
using progress_callback_t = std::function<void(PackageExtractEvent)>;
PackageFetcher(
const PackageInfo& pkg_info,
ChannelContext& channel_context,
MultiPackageCache& caches
);
const std::string& name() const;
bool needs_download() const;
bool needs_extract() const;
DownloadRequest
build_download_request(std::optional<post_download_success_t> callback = std::nullopt);
ValidationResult
validate(std::size_t downloaded_size, progress_callback_t* cb = nullptr) const;
bool extract(const ExtractOptions& options, progress_callback_t* cb = nullptr);
// The PackageFetcher object should be stable in memory (i.e. not moved) after this
// method has been called, until the PackageExtractTask has been completed.
PackageExtractTask build_extract_task(ExtractOptions options);
void clear_cache() const;
private:
struct CheckSumParams
{
std::string expected;
std::string actual;
std::string name;
ValidationResult error;
};
const std::string& filename() const;
const std::string& url() const;
const std::string& sha256() const;
const std::string& md5() const;
std::size_t expected_size() const;
ValidationResult validate_size(std::size_t downloaded_size) const;
ValidationResult validate_checksum(CheckSumParams params) const;
void write_repodata_record(const fs::u8path& base_path) const;
void update_urls_txt() const;
void update_monitor(progress_callback_t* cb, PackageExtractEvent event) const;
PackageInfo m_package_info;
std::string m_url = "";
fs::u8path m_tarball_path;
fs::u8path m_cache_path;
bool m_needs_download = false;
bool m_needs_extract = false;
};
class PackageFetcherSemaphore
{
public:
static std::ptrdiff_t get_max();
static void set_max(int value);
private:
static counting_semaphore semaphore;
friend class PackageFetcher;
};
}
#endif

View File

@ -22,8 +22,6 @@
#include "mamba/core/util.hpp"
#include "mamba/fs/filesystem.hpp"
#include "package_handling.hpp"
namespace mamba
{
class DownloadMonitor;

View File

@ -100,10 +100,10 @@ namespace mamba
}
expected_t<void> download_res;
if (DownloadProgressBar::can_monitor(ctx))
if (SubdirDataMonitor::can_monitor(ctx))
{
DownloadProgressBar check_monitor({ true, true });
DownloadProgressBar index_monitor;
SubdirDataMonitor check_monitor({ true, true });
SubdirDataMonitor index_monitor;
download_res = MSubdirData::download_indexes(subdirs, ctx, &check_monitor, &index_monitor);
}
else

View File

@ -18,7 +18,7 @@
#include "mamba/core/environment.hpp"
#include "mamba/core/fsutil.hpp"
#include "mamba/core/output.hpp"
#include "mamba/core/package_download.hpp"
#include "mamba/core/package_fetcher.hpp"
#include "mamba/core/util.hpp"
#include "mamba/util/build.hpp"
#include "mamba/util/string.hpp"
@ -904,7 +904,7 @@ namespace mamba
void extract_threads_hook(const Context& context)
{
DownloadExtractSemaphore::set_max(context.threads_params.extract_threads);
PackageFetcherSemaphore::set_max(context.threads_params.extract_threads);
}
}

View File

@ -17,9 +17,9 @@
#include "mamba/api/install.hpp"
#include "mamba/core/activation.hpp"
#include "mamba/core/channel.hpp"
#include "mamba/core/download.hpp"
#include "mamba/core/env_lockfile.hpp"
#include "mamba/core/environments_manager.hpp"
#include "mamba/core/fetch.hpp"
#include "mamba/core/output.hpp"
#include "mamba/core/package_cache.hpp"
#include "mamba/core/pinning.hpp"
@ -734,14 +734,10 @@ namespace mamba
{
LOG_INFO << "Downloading lockfile";
tmp_lock_file = std::make_unique<TemporaryFile>();
DownloadTarget dt(
channel_context.context(),
"Environment Lockfile",
lockfile,
tmp_lock_file->path()
);
bool success = dt.perform();
if (!success || dt.get_http_status() != 200)
DownloadRequest request("Environment Lockfile", lockfile, tmp_lock_file->path());
DownloadResult res = download(std::move(request), channel_context.context());
if (!res || res.value().transfer.http_status != 200)
{
throw std::runtime_error(
fmt::format("Could not download environment lockfile from {}", lockfile)

View File

@ -6,6 +6,8 @@
#include <iostream>
#include <solv/solver.h>
#include "mamba/api/channel_loader.hpp"
#include "mamba/api/configuration.hpp"
#include "mamba/api/repoquery.hpp"

View File

@ -4,6 +4,8 @@
//
// The full license is in the file LICENSE, distributed with this software.
#include <solv/solver.h>
#include "mamba/api/channel_loader.hpp"
#include "mamba/api/configuration.hpp"
#include "mamba/api/update.hpp"

View File

@ -234,7 +234,6 @@ namespace mamba
CURLHandle::CURLHandle() //(const Context& ctx)
: m_handle(curl_easy_init())
, m_result(CURLE_OK)
{
if (m_handle == nullptr)
{
@ -253,7 +252,6 @@ namespace mamba
rhs.m_handle = nullptr;
rhs.p_headers = nullptr;
std::swap(m_errorbuffer, rhs.m_errorbuffer);
std::swap(m_result, rhs.m_result);
set_opt(CURLOPT_ERRORBUFFER, m_errorbuffer.data());
}
@ -261,7 +259,6 @@ namespace mamba
{
using std::swap;
swap(m_handle, rhs.m_handle);
swap(m_result, rhs.m_result);
swap(p_headers, rhs.p_headers);
swap(m_errorbuffer, rhs.m_errorbuffer);
set_opt(CURLOPT_ERRORBUFFER, m_errorbuffer.data());
@ -459,34 +456,9 @@ namespace mamba
return get_info<std::string>(CURLINFO_EFFECTIVE_URL).value();
}
std::size_t CURLHandle::get_result() const
CURLcode CURLHandle::perform()
{
return static_cast<std::size_t>(m_result);
}
bool CURLHandle::is_curl_res_ok() const
{
return is_curl_res_ok(m_result);
}
void CURLHandle::set_result(CURLcode res)
{
m_result = res;
}
std::string CURLHandle::get_res_error() const
{
return get_res_error(m_result);
}
bool CURLHandle::can_proceed()
{
return can_retry(m_result);
}
void CURLHandle::perform()
{
m_result = curl_easy_perform(m_handle);
return curl_easy_perform(m_handle);
}
CURLId CURLHandle::get_id() const

View File

@ -140,16 +140,7 @@ namespace mamba
const char* get_error_buffer() const;
std::string get_curl_effective_url() const;
[[deprecated]] std::size_t get_result() const;
[[deprecated]] bool is_curl_res_ok() const;
[[deprecated]] void set_result(CURLcode res);
[[deprecated]] std::string get_res_error() const;
// Side-effect programming, to remove
[[deprecated]] bool can_proceed();
void perform();
CURLcode perform();
CURLId get_id() const;
@ -161,7 +152,6 @@ namespace mamba
private:
CURL* m_handle;
CURLcode m_result; // Enum range from 0 to 99
curl_slist* p_headers = nullptr;
std::array<char, CURL_ERROR_SIZE> m_errorbuffer;

View File

@ -90,6 +90,30 @@ namespace mamba
remote_fetch_params.curl_initialized = true;
}
}
struct EnvRemoteParams
{
bool set_low_speed_opt = false;
bool set_ssl_no_revoke = false;
};
EnvRemoteParams get_env_remote_params(const Context& context)
{
// TODO: we should probably store set_low_speed_limit and set_ssl_no_revoke in
// RemoteFetchParams if the request is slower than 30b/s for 60 seconds, cancel.
const std::string no_low_speed_limit = std::getenv("MAMBA_NO_LOW_SPEED_LIMIT")
? std::getenv("MAMBA_NO_LOW_SPEED_LIMIT")
: "0";
const bool set_low_speed_opt = (no_low_speed_limit == "0");
const std::string ssl_no_revoke_env = std::getenv("MAMBA_SSL_NO_REVOKE")
? std::getenv("MAMBA_SSL_NO_REVOKE")
: "0";
const bool set_ssl_no_revoke = context.remote_fetch_params.ssl_no_revoke
|| (ssl_no_revoke_env != "0");
return { set_low_speed_opt, set_ssl_no_revoke };
}
}
/**********************************
@ -231,18 +255,7 @@ namespace mamba
void DownloadAttempt::configure_handle(const Context& context)
{
// TODO: we should probably store set_low_speed_limit and set_ssl_no_revoke in
// RemoteFetchParams if the request is slower than 30b/s for 60 seconds, cancel.
const std::string no_low_speed_limit = std::getenv("MAMBA_NO_LOW_SPEED_LIMIT")
? std::getenv("MAMBA_NO_LOW_SPEED_LIMIT")
: "0";
const bool set_low_speed_opt = (no_low_speed_limit == "0");
const std::string ssl_no_revoke_env = std::getenv("MAMBA_SSL_NO_REVOKE")
? std::getenv("MAMBA_SSL_NO_REVOKE")
: "0";
const bool set_ssl_no_revoke = context.remote_fetch_params.ssl_no_revoke
|| (ssl_no_revoke_env != "0");
const auto [set_low_speed_opt, set_ssl_no_revoke] = get_env_remote_params(context);
m_handle.configure_handle(
util::file_uri_unc2_to_unc4(p_request->url),
@ -827,7 +840,7 @@ namespace mamba
{
if (!context.remote_fetch_params.curl_initialized)
{
// TODO: MOve this into an object that would be autmotacially initialized
// TODO: Move this into an object that would be automatically initialized
// upon construction, and passed by const reference to this function instead
// of context.
Context& ctx = const_cast<Context&>(context);
@ -847,4 +860,35 @@ namespace mamba
return dl.download();
}
}
DownloadResult
download(DownloadRequest request, const Context& context, DownloadOptions options, DownloadMonitor* monitor)
{
MultiDownloadRequest req(1u, std::move(request));
auto res = download(std::move(req), context, std::move(options), monitor);
return std::move(res.front());
}
bool check_resource_exists(const std::string& url, const Context& context)
{
if (!context.remote_fetch_params.curl_initialized)
{
// TODO: Move this into an object that would be automatically initialized
// upon construction, and passed by const reference to this function instead
// of context.
Context& ctx = const_cast<Context&>(context);
init_remote_fetch_params(ctx.remote_fetch_params);
}
const auto [set_low_speed_opt, set_ssl_no_revoke] = get_env_remote_params(context);
return curl::check_resource_exists(
util::file_uri_unc2_to_unc4(url),
set_low_speed_opt,
context.remote_fetch_params.connect_timeout_secs,
set_ssl_no_revoke,
proxy_match(url, context.remote_fetch_params.proxy_servers),
context.remote_fetch_params.ssl_verify
);
}
}

View File

@ -4,112 +4,47 @@
namespace mamba
{
DownloadProgressBar::DownloadProgressBar(MonitorOptions options)
: m_options(std::move(options))
{
}
/*******************************
* ProgressBar for downloading *
*******************************/
void DownloadProgressBar::reset_options(MonitorOptions options)
namespace
{
m_options = std::move(options);
}
using time_point = std::chrono::steady_clock::time_point;
bool DownloadProgressBar::can_monitor(const Context& context)
{
return !(
context.graphics_params.no_progress_bars || context.output_params.json
|| context.output_params.quiet
);
}
void DownloadProgressBar::observe_impl(MultiDownloadRequest& requests, DownloadOptions& options)
{
m_throttle_time.resize(requests.size(), std::chrono::steady_clock::now());
m_progress_bar.reserve(requests.size());
for (std::size_t i = 0; i < requests.size(); ++i)
void update_progress_bar(
ProgressProxy& progress_bar,
time_point& throttle_time,
const DownloadProgress& progress
)
{
m_progress_bar.push_back(Console::instance().add_progress_bar(requests[i].name));
m_progress_bar.back().set_repr_hook(download_repr(i));
const auto now = std::chrono::steady_clock::now();
if (m_options.checking_download)
const auto throttle_treshold = std::chrono::milliseconds(50);
if (now - throttle_time < throttle_treshold)
{
m_progress_bar.back().repr().postfix.set_value("Checking");
return;
}
requests[i].progress = [this, i](const DownloadEvent& e) { update_progress_bar(i, e); };
}
auto& pbar_manager = Console::instance().progress_bar_manager();
if (!pbar_manager.started())
{
pbar_manager.watch_print();
}
options.on_unexpected_termination = [this]() { on_unexpected_termination(); };
}
void DownloadProgressBar::on_done_impl()
{
auto& pbar_manager = Console::instance().progress_bar_manager();
if (pbar_manager.started())
{
pbar_manager.terminate();
if (!m_options.no_clear_progress_bar)
else
{
pbar_manager.clear_progress_bars();
throttle_time = now;
}
}
m_throttle_time.clear();
m_progress_bar.clear();
m_options = MonitorOptions{};
}
void DownloadProgressBar::on_unexpected_termination_impl()
{
Console::instance().progress_bar_manager().terminate();
}
if (!progress.total_to_download)
{
progress_bar.activate_spinner();
}
else
{
progress_bar.deactivate_spinner();
}
void DownloadProgressBar::update_progress_bar(std::size_t index, const DownloadEvent& event)
{
std::visit([this, index](auto&& arg) { update_progress_bar(index, arg); }, event);
}
void DownloadProgressBar::update_progress_bar(std::size_t index, const DownloadProgress& progress)
{
auto now = std::chrono::steady_clock::now();
const auto throttle_treshold = std::chrono::milliseconds(50);
if (now - m_throttle_time[index] < throttle_treshold)
{
return;
}
else
{
m_throttle_time[index] = now;
progress_bar.update_progress(progress.downloaded_size, progress.total_to_download);
progress_bar.set_speed(progress.speed_Bps);
}
ProgressProxy& progress_bar = m_progress_bar[index];
if (!progress.total_to_download)
void update_progress_bar(ProgressProxy& progress_bar, const DownloadError& error)
{
progress_bar.activate_spinner();
}
else
{
progress_bar.deactivate_spinner();
}
progress_bar.update_progress(progress.downloaded_size, progress.total_to_download);
progress_bar.set_speed(progress.speed_Bps);
}
void DownloadProgressBar::update_progress_bar(std::size_t index, const DownloadError& error)
{
if (m_options.checking_download)
{
complete_checking_progress_bar(index);
}
else
{
ProgressProxy& progress_bar = m_progress_bar[index];
if (error.transfer.has_value())
{
const int http_status = error.transfer.value().http_status;
@ -122,17 +57,9 @@ namespace mamba
progress_bar.set_full();
progress_bar.mark_as_completed();
}
}
void DownloadProgressBar::update_progress_bar(std::size_t index, const DownloadSuccess& success)
{
if (m_options.checking_download)
void update_progress_bar(ProgressProxy& progress_bar, const DownloadSuccess& success)
{
complete_checking_progress_bar(index);
}
else
{
ProgressProxy& progress_bar = m_progress_bar[index];
if (success.transfer.http_status == 304)
{
auto& r = progress_bar.repr();
@ -165,9 +92,145 @@ namespace mamba
r.print(console_stream, 0, false);
}
}
std::function<void(ProgressBarRepr&)> download_repr()
{
return [](ProgressBarRepr& r)
{
r.current.set_value(fmt::format(
"{:>7}",
to_human_readable_filesize(static_cast<double>(r.progress_bar().current()), 1)
));
std::string total_str;
if (!r.progress_bar().total()
|| (r.progress_bar().total() == std::numeric_limits<std::size_t>::max()))
{
total_str = "??.?MB";
}
else
{
total_str = to_human_readable_filesize(
static_cast<double>(r.progress_bar().total()),
1
);
}
r.total.set_value(fmt::format("{:>7}", total_str));
auto speed = r.progress_bar().speed();
r.speed.set_value(fmt::format(
"@ {:>7}/s",
speed ? to_human_readable_filesize(static_cast<double>(speed), 1) : "??.?MB"
));
r.separator.set_value("/");
};
}
}
void DownloadProgressBar::complete_checking_progress_bar(std::size_t index)
/*********************
* SubdirDataMonitor *
*********************/
SubdirDataMonitor::SubdirDataMonitor(MonitorOptions options)
: m_options(std::move(options))
{
}
void SubdirDataMonitor::reset_options(MonitorOptions options)
{
m_options = std::move(options);
}
bool SubdirDataMonitor::can_monitor(const Context& context)
{
return !(
context.graphics_params.no_progress_bars || context.output_params.json
|| context.output_params.quiet
);
}
void SubdirDataMonitor::observe_impl(MultiDownloadRequest& requests, DownloadOptions& options)
{
m_throttle_time.resize(requests.size(), std::chrono::steady_clock::now());
m_progress_bar.reserve(requests.size());
for (std::size_t i = 0; i < requests.size(); ++i)
{
m_progress_bar.push_back(Console::instance().add_progress_bar(requests[i].name));
m_progress_bar.back().set_repr_hook(download_repr());
if (m_options.checking_download)
{
m_progress_bar.back().repr().postfix.set_value("Checking");
}
requests[i].progress = [this, i](const DownloadEvent& e) { update_progress_bar(i, e); };
}
auto& pbar_manager = Console::instance().progress_bar_manager();
if (!pbar_manager.started())
{
pbar_manager.watch_print();
}
options.on_unexpected_termination = [this]() { on_unexpected_termination(); };
}
void SubdirDataMonitor::on_done_impl()
{
auto& pbar_manager = Console::instance().progress_bar_manager();
if (pbar_manager.started())
{
pbar_manager.terminate();
if (!m_options.no_clear_progress_bar)
{
pbar_manager.clear_progress_bars();
}
}
m_throttle_time.clear();
m_progress_bar.clear();
m_options = MonitorOptions{};
}
void SubdirDataMonitor::on_unexpected_termination_impl()
{
Console::instance().progress_bar_manager().terminate();
}
void SubdirDataMonitor::update_progress_bar(std::size_t index, const DownloadEvent& event)
{
std::visit([this, index](auto&& arg) { update_progress_bar(index, arg); }, event);
}
void SubdirDataMonitor::update_progress_bar(std::size_t index, const DownloadProgress& progress)
{
mamba::update_progress_bar(m_progress_bar[index], m_throttle_time[index], progress);
}
void SubdirDataMonitor::update_progress_bar(std::size_t index, const DownloadError& error)
{
if (m_options.checking_download)
{
complete_checking_progress_bar(index);
}
else
{
mamba::update_progress_bar(m_progress_bar[index], error);
}
}
void SubdirDataMonitor::update_progress_bar(std::size_t index, const DownloadSuccess& success)
{
if (m_options.checking_download)
{
complete_checking_progress_bar(index);
}
else
{
mamba::update_progress_bar(m_progress_bar[index], success);
}
}
void SubdirDataMonitor::complete_checking_progress_bar(std::size_t index)
{
ProgressProxy& progress_bar = m_progress_bar[index];
progress_bar.repr().postfix.set_value("Checked");
@ -176,35 +239,263 @@ namespace mamba
progress_bar.mark_as_completed();
}
std::function<void(ProgressBarRepr&)> DownloadProgressBar::download_repr(std::size_t index)
/**************************
* PackageDownloadMonitor *
**************************/
bool PackageDownloadMonitor::can_monitor(const Context& context)
{
return [this, index](ProgressBarRepr& r)
return SubdirDataMonitor::can_monitor(context);
}
PackageDownloadMonitor::~PackageDownloadMonitor()
{
end_monitoring();
}
void PackageDownloadMonitor::observe(
MultiDownloadRequest& dl_requests,
std::vector<PackageExtractTask>& extract_tasks,
DownloadOptions& options
)
{
assert(extract_tasks.size() >= dl_requests.size());
auto& pbar_manager = Console::instance().init_progress_bar_manager(ProgressBarMode::aggregated
);
m_extract_bar.reserve(extract_tasks.size());
m_throttle_time.resize(dl_requests.size(), std::chrono::steady_clock::now());
m_download_bar.reserve(dl_requests.size());
for (size_t i = 0; i < extract_tasks.size(); ++i)
{
ProgressProxy& progress_bar = m_progress_bar[index];
r.current.set_value(fmt::format(
"{:>7}",
to_human_readable_filesize(static_cast<double>(progress_bar.current()), 1)
));
m_extract_bar.push_back(Console::instance().add_progress_bar(extract_tasks[i].name(), 1));
init_extract_bar(m_extract_bar.back());
extract_tasks[i].set_progress_callback([=](PackageExtractEvent e)
{ update_extract_bar(i, e); });
std::string total_str;
if (!progress_bar.total()
|| (progress_bar.total() == std::numeric_limits<std::size_t>::max()))
if (i < dl_requests.size())
{
total_str = "??.?MB";
assert(extract_tasks[i].needs_download());
m_download_bar.push_back(Console::instance().add_progress_bar(dl_requests[i].name));
init_download_bar(m_download_bar.back());
dl_requests[i].progress = [this, i](const DownloadEvent& e)
{ update_progress_bar(i, e); };
}
else
}
init_aggregated_download();
init_aggregated_extract();
pbar_manager.start();
pbar_manager.watch_print();
options.on_unexpected_termination = [this]() { on_unexpected_termination(); };
}
void PackageDownloadMonitor::end_monitoring()
{
auto& pbar_manager = Console::instance().progress_bar_manager();
if (pbar_manager.started())
{
pbar_manager.terminate();
}
m_throttle_time.clear();
m_download_bar.clear();
m_extract_bar.clear();
}
void PackageDownloadMonitor::init_extract_bar(ProgressProxy& extract_bar)
{
extract_bar.activate_spinner();
extract_bar.set_progress_hook(
[](ProgressProxy& bar) -> void
{
total_str = to_human_readable_filesize(static_cast<double>(progress_bar.total()), 1);
if (bar.started())
{
bar.set_progress(0, 1);
}
}
r.total.set_value(fmt::format("{:>7}", total_str));
);
extract_bar.set_repr_hook(
[](ProgressBarRepr& r) -> void
{
if (r.progress_bar().started())
{
r.postfix.set_value("Extracting");
}
else
{
r.postfix.set_value("Extracted");
}
}
);
Console::instance().progress_bar_manager().add_label("Extract", extract_bar);
}
auto speed = progress_bar.speed();
r.speed.set_value(fmt::format(
"@ {:>7}/s",
speed ? to_human_readable_filesize(static_cast<double>(speed), 1) : "??.?MB"
));
void PackageDownloadMonitor::init_download_bar(ProgressProxy& download_bar)
{
download_bar.set_repr_hook(download_repr());
Console::instance().progress_bar_manager().add_label("Download", download_bar);
}
r.separator.set_value("/");
};
void PackageDownloadMonitor::init_aggregated_extract()
{
auto& pbar_manager = static_cast<AggregatedBarManager&>(
Console::instance().progress_bar_manager()
);
auto* extract_bar = pbar_manager.aggregated_bar("Extract");
if (extract_bar)
{
// lambda capture required because we are calling non const methods
// on extract_bar.
extract_bar->set_repr_hook(
[extract_bar](ProgressBarRepr& repr) -> void
{
auto active_tasks = extract_bar->active_tasks().size();
if (active_tasks == 0)
{
repr.prefix.set_value(fmt::format("{:<16}", "Extracting"));
repr.postfix.set_value(fmt::format("{:<25}", ""));
}
else
{
repr.prefix.set_value(
fmt::format("{:<11} {:>4}", "Extracting", fmt::format("({})", active_tasks))
);
repr.postfix.set_value(fmt::format("{:<25}", extract_bar->last_active_task()));
}
repr.current.set_value(fmt::format("{:>3}", extract_bar->current()));
repr.separator.set_value("/");
std::string total_str;
if (extract_bar->total() == std::numeric_limits<std::size_t>::max())
{
total_str = "?";
}
else
{
total_str = std::to_string(extract_bar->total());
}
repr.total.set_value(fmt::format("{:>3}", total_str));
}
);
}
}
void PackageDownloadMonitor::init_aggregated_download()
{
auto& pbar_manager = static_cast<AggregatedBarManager&>(
Console::instance().progress_bar_manager()
);
auto* dl_bar = pbar_manager.aggregated_bar("Download");
if (dl_bar)
{
// lambda capture required because we are calling non const methods
// on dl_bar.
dl_bar->set_repr_hook(
[dl_bar](ProgressBarRepr& repr) -> void
{
auto active_tasks = dl_bar->active_tasks().size();
if (active_tasks == 0)
{
repr.prefix.set_value(fmt::format("{:<16}", "Downloading"));
repr.postfix.set_value(fmt::format("{:<25}", ""));
}
else
{
repr.prefix.set_value(
fmt::format("{:<11} {:>4}", "Downloading", fmt::format("({})", active_tasks))
);
repr.postfix.set_value(fmt::format("{:<25}", dl_bar->last_active_task()));
}
repr.current.set_value(
fmt::format("{:>7}", to_human_readable_filesize(double(dl_bar->current()), 1))
);
repr.separator.set_value("/");
std::string total_str;
if (dl_bar->total() == std::numeric_limits<std::size_t>::max())
{
total_str = "??.?MB";
}
else
{
total_str = to_human_readable_filesize(double(dl_bar->total()), 1);
}
repr.total.set_value(fmt::format("{:>7}", total_str));
auto speed = dl_bar->avg_speed(std::chrono::milliseconds(500));
repr.speed.set_value(
speed ? fmt::format("@ {:>7}/s", to_human_readable_filesize(double(speed), 1))
: ""
);
}
);
}
}
void PackageDownloadMonitor::update_extract_bar(std::size_t index, PackageExtractEvent event)
{
switch (event)
{
case PackageExtractEvent::validate_update:
m_extract_bar[index].set_postfix("validating");
break;
case PackageExtractEvent::validate_success:
m_extract_bar[index].set_postfix("validated");
break;
case PackageExtractEvent::validate_failure:
m_extract_bar[index].set_postfix("validation failed");
break;
case PackageExtractEvent::extract_update:
m_extract_bar[index].update_progress(0, 1);
break;
case PackageExtractEvent::extract_success:
m_extract_bar[index].set_full();
m_extract_bar[index].mark_as_completed();
break;
case PackageExtractEvent::extract_failure:
default:
m_extract_bar[index].set_postfix("extraction failed");
m_extract_bar[index].mark_as_completed();
break;
}
}
void PackageDownloadMonitor::observe_impl(MultiDownloadRequest&, DownloadOptions&)
{
// Nothing to do, everything has been initialized in the public observe overload
}
void PackageDownloadMonitor::on_done_impl()
{
// Nothing to do, everything will be done in end_monitoring
}
void PackageDownloadMonitor::on_unexpected_termination_impl()
{
Console::instance().progress_bar_manager().terminate();
}
void PackageDownloadMonitor::update_progress_bar(std::size_t index, const DownloadEvent& event)
{
std::visit([this, index](auto&& arg) { update_progress_bar(index, arg); }, event);
}
void
PackageDownloadMonitor::update_progress_bar(std::size_t index, const DownloadProgress& progress)
{
mamba::update_progress_bar(m_download_bar[index], m_throttle_time[index], progress);
}
void PackageDownloadMonitor::update_progress_bar(std::size_t index, const DownloadError& error)
{
mamba::update_progress_bar(m_download_bar[index], error);
}
void
PackageDownloadMonitor::update_progress_bar(std::size_t index, const DownloadSuccess& success)
{
mamba::update_progress_bar(m_download_bar[index], success);
}
}

View File

@ -1,930 +0,0 @@
// Copyright (c) 2019, QuantStack and Mamba Contributors
//
// Distributed under the terms of the BSD 3-Clause License.
//
// The full license is in the file LICENSE, distributed with this software.
#include <string_view>
#include <spdlog/spdlog.h>
#include "mamba/core/context.hpp"
#include "mamba/core/fetch.hpp"
#include "mamba/core/output.hpp"
#include "mamba/core/thread_utils.hpp"
#include "mamba/core/util.hpp"
#include "mamba/util/build.hpp"
#include "mamba/util/string.hpp"
#include "mamba/util/url.hpp"
#include "mamba/util/url_manip.hpp"
#include "compression.hpp"
#include "curl.hpp"
#include "progress_bar_impl.hpp"
namespace mamba
{
/*****************************
* Config and Context params *
*****************************/
void get_config(
const Context& context,
bool& set_low_speed_opt,
bool& set_ssl_no_revoke,
long& connect_timeout_secs,
std::string& ssl_verify
)
{
// Don't know if it's better to store these...
// for now only called twice, and if modified during execution we better not...
// if the request is slower than 30b/s for 60 seconds, cancel.
std::string no_low_speed_limit = std::getenv("MAMBA_NO_LOW_SPEED_LIMIT")
? std::getenv("MAMBA_NO_LOW_SPEED_LIMIT")
: "0";
set_low_speed_opt = (no_low_speed_limit == "0");
std::string ssl_no_revoke_env = std::getenv("MAMBA_SSL_NO_REVOKE")
? std::getenv("MAMBA_SSL_NO_REVOKE")
: "0";
set_ssl_no_revoke = (context.remote_fetch_params.ssl_no_revoke || (ssl_no_revoke_env != "0"));
connect_timeout_secs = context.remote_fetch_params.connect_timeout_secs;
ssl_verify = context.remote_fetch_params.ssl_verify;
}
std::size_t get_default_retry_timeout(const Context& context)
{
return static_cast<std::size_t>(context.remote_fetch_params.retry_timeout);
}
/*********************************
* DownloadTarget implementation *
*********************************/
DownloadTarget::DownloadTarget(
Context& context,
const std::string& name,
const std::string& url,
const std::string& filename
)
: m_context(context)
, m_name(name)
, m_filename(filename)
, m_url(util::file_uri_unc2_to_unc4(url))
, m_http_status(10000)
, m_downloaded_size(0)
, m_effective_url(nullptr)
, m_expected_size(0)
, m_retry_wait_seconds(get_default_retry_timeout(context))
, m_retries(0)
, m_has_progress_bar(false)
, m_ignore_failure(false)
{
m_curl_handle = std::make_unique<CURLHandle>();
init_curl_ssl();
init_curl_target(m_url);
}
DownloadTarget::~DownloadTarget()
{
}
int
curl_debug_callback(CURL* /* handle */, curl_infotype type, char* data, size_t size, void* userptr)
{
auto* logger = reinterpret_cast<spdlog::logger*>(userptr);
auto log = Console::hide_secrets(std::string_view(data, size));
switch (type)
{
case CURLINFO_TEXT:
logger->info(fmt::format("* {}", log));
break;
case CURLINFO_HEADER_OUT:
logger->info(fmt::format("> {}", log));
break;
case CURLINFO_HEADER_IN:
logger->info(fmt::format("< {}", log));
break;
default:
break;
}
return 0;
}
void DownloadTarget::init_curl_ssl()
{
auto& ctx = m_context;
if (!ctx.remote_fetch_params.curl_initialized)
{
if (ctx.remote_fetch_params.ssl_verify == "<false>")
{
LOG_DEBUG << "'ssl_verify' not activated, skipping cURL SSL init";
ctx.remote_fetch_params.curl_initialized = true;
return;
}
#ifdef LIBMAMBA_STATIC_DEPS
auto init_res = m_curl_handle->get_ssl_backend_info();
switch (init_res.second)
{
case CurlLogLevel::kInfo:
{
LOG_INFO << init_res.first;
break;
}
case CurlLogLevel::kWarning:
{
LOG_WARNING << init_res.first;
break;
}
case CurlLogLevel::kError:
{
LOG_ERROR << init_res.first;
break;
}
}
#endif
if (!ctx.remote_fetch_params.ssl_verify.size()
&& std::getenv("REQUESTS_CA_BUNDLE") != nullptr)
{
ctx.remote_fetch_params.ssl_verify = std::getenv("REQUESTS_CA_BUNDLE");
LOG_INFO << "Using REQUESTS_CA_BUNDLE " << ctx.remote_fetch_params.ssl_verify;
}
else if (ctx.remote_fetch_params.ssl_verify == "<system>" && util::on_linux)
{
std::array<std::string, 6> cert_locations{
"/etc/ssl/certs/ca-certificates.crt", // Debian/Ubuntu/Gentoo etc.
"/etc/pki/tls/certs/ca-bundle.crt", // Fedora/RHEL 6
"/etc/ssl/ca-bundle.pem", // OpenSUSE
"/etc/pki/tls/cacert.pem", // OpenELEC
"/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", // CentOS/RHEL 7
"/etc/ssl/cert.pem", // Alpine Linux
};
bool found = false;
for (const auto& loc : cert_locations)
{
if (fs::exists(loc))
{
ctx.remote_fetch_params.ssl_verify = loc;
found = true;
}
}
if (!found)
{
LOG_ERROR << "No CA certificates found on system";
throw std::runtime_error("Aborting.");
}
}
ctx.remote_fetch_params.curl_initialized = true;
}
}
void DownloadTarget::init_curl_target(const std::string& url)
{
// Get config
bool set_low_speed_opt, set_ssl_no_revoke;
long connect_timeout_secs;
std::string ssl_verify;
get_config(m_context, set_low_speed_opt, set_ssl_no_revoke, connect_timeout_secs, ssl_verify);
// Configure curl handle
m_curl_handle->configure_handle(
url,
set_low_speed_opt,
connect_timeout_secs,
set_ssl_no_revoke,
proxy_match(url, context().remote_fetch_params.proxy_servers),
ssl_verify
);
m_curl_handle->set_opt(CURLOPT_HEADERFUNCTION, &DownloadTarget::header_callback);
m_curl_handle->set_opt(CURLOPT_HEADERDATA, this);
if (util::ends_with(url, ".json.zst"))
{
m_zstd_stream = std::make_unique<ZstdStream>(&DownloadTarget::write_callback, this);
if (util::ends_with(m_filename, ".zst"))
{
m_filename = m_filename.substr(0, m_filename.size() - 4);
}
m_curl_handle->set_opt(CURLOPT_WRITEFUNCTION, ZstdStream::write_callback);
m_curl_handle->set_opt(CURLOPT_WRITEDATA, m_zstd_stream.get());
}
else if (util::ends_with(url, ".json.bz2"))
{
m_bzip2_stream = std::make_unique<Bzip2Stream>(&DownloadTarget::write_callback, this);
if (util::ends_with(m_filename, ".bz2"))
{
m_filename = m_filename.substr(0, m_filename.size() - 4);
}
m_curl_handle->set_opt(CURLOPT_WRITEFUNCTION, Bzip2Stream::write_callback);
m_curl_handle->set_opt(CURLOPT_WRITEDATA, m_bzip2_stream.get());
}
else
{
m_curl_handle->set_opt(CURLOPT_WRITEFUNCTION, &DownloadTarget::write_callback);
m_curl_handle->set_opt(CURLOPT_WRITEDATA, this);
}
if (util::ends_with(url, ".json"))
{
// accept all encodings supported by the libcurl build
m_curl_handle->set_opt(CURLOPT_ACCEPT_ENCODING, "");
m_curl_handle->add_header("Content-Type: application/json");
}
std::string user_agent = fmt::format(
"User-Agent: {} {}",
m_context.remote_fetch_params.user_agent,
curl_version()
);
m_curl_handle->add_header(user_agent);
m_curl_handle->set_opt_header();
m_curl_handle->set_opt(CURLOPT_VERBOSE, m_context.output_params.verbosity >= 2);
// get url host
const auto url_parsed = util::URL::parse(url);
auto host = url_parsed.host();
const auto port = url_parsed.port();
if (port.size())
{
host += ":" + port;
}
if (m_context.authentication_info().count(host))
{
const auto& auth = m_context.authentication_info().at(host);
if (std::holds_alternative<specs::BearerToken>(auth))
{
m_curl_handle->add_header(
fmt::format("Authorization: Bearer {}", std::get<specs::BearerToken>(auth).token)
);
}
}
auto logger = spdlog::get("libcurl");
m_curl_handle->set_opt(CURLOPT_DEBUGFUNCTION, curl_debug_callback);
m_curl_handle->set_opt(CURLOPT_DEBUGDATA, logger.get());
}
bool DownloadTarget::can_retry()
{
if (!m_curl_handle->can_proceed())
{
return false;
}
return m_retries < size_t(m_context.remote_fetch_params.max_retries)
&& (m_http_status == 413 || m_http_status == 429 || m_http_status >= 500)
&& !util::starts_with(m_url, "file://");
}
bool DownloadTarget::retry()
{
auto now = std::chrono::steady_clock::now();
if (now >= m_next_retry)
{
if (m_file.is_open())
{
m_file.close();
}
if (fs::exists(m_filename))
{
fs::remove(m_filename);
}
init_curl_target(m_url);
if (m_has_progress_bar)
{
m_curl_handle->set_opt(CURLOPT_XFERINFOFUNCTION, &DownloadTarget::progress_callback);
m_curl_handle->set_opt(CURLOPT_XFERINFODATA, this);
}
m_retry_wait_seconds = m_retry_wait_seconds
* static_cast<std::size_t>(m_context.remote_fetch_params.retry_backoff
);
m_next_retry = now + std::chrono::seconds(m_retry_wait_seconds);
m_retries++;
return true;
}
else
{
return false;
}
}
size_t DownloadTarget::write_callback(char* ptr, size_t size, size_t nmemb, void* self)
{
auto* s = reinterpret_cast<DownloadTarget*>(self);
auto expected_write_size = size * nmemb;
if (!s->m_file.is_open())
{
s->m_file = open_ofstream(s->m_filename, std::ios::binary);
if (!s->m_file)
{
LOG_ERROR << "Could not open file for download " << s->m_filename << ": "
<< strerror(errno);
// Return a size _different_ than the expected write size to signal an error
return expected_write_size + 1;
}
}
s->m_file.write(ptr, static_cast<std::streamsize>(expected_write_size));
if (!s->m_file)
{
LOG_ERROR << "Could not write to file " << s->m_filename << ": " << strerror(errno);
// Return a size _different_ than the expected write size to signal an error
return expected_write_size + 1;
}
return expected_write_size;
}
size_t DownloadTarget::header_callback(char* buffer, size_t size, size_t nitems, void* self)
{
auto* s = reinterpret_cast<DownloadTarget*>(self);
std::string_view header(buffer, size * nitems);
auto colon_idx = header.find(':');
if (colon_idx != std::string_view::npos)
{
std::string_view key, value;
key = header.substr(0, colon_idx);
colon_idx++;
// remove spaces
while (std::isspace(header[colon_idx]))
{
++colon_idx;
}
// remove \r\n header ending
auto header_end = header.find_first_of("\r\n");
value = header.substr(colon_idx, (header_end > colon_idx) ? header_end - colon_idx : 0);
// http headers are case insensitive!
std::string lkey = util::to_lower(key);
if (lkey == "etag")
{
s->m_etag = value;
}
else if (lkey == "cache-control")
{
s->m_cache_control = value;
}
else if (lkey == "last-modified")
{
s->m_mod = value;
}
}
return nitems * size;
}
std::function<void(ProgressBarRepr&)> DownloadTarget::download_repr()
{
return [&](ProgressBarRepr& r) -> void
{
r.current.set_value(fmt::format(
"{:>7}",
to_human_readable_filesize(static_cast<double>(m_progress_bar.current()), 1)
));
std::string total_str;
if (!m_progress_bar.total()
|| (m_progress_bar.total() == std::numeric_limits<std::size_t>::max()))
{
total_str = "??.?MB";
}
else
{
total_str = to_human_readable_filesize(static_cast<double>(m_progress_bar.total()), 1);
}
r.total.set_value(fmt::format("{:>7}", total_str));
auto speed = m_progress_bar.speed();
r.speed.set_value(fmt::format(
"@ {:>7}/s",
speed ? to_human_readable_filesize(static_cast<double>(speed), 1) : "??.?MB"
));
r.separator.set_value("/");
};
}
std::chrono::steady_clock::time_point DownloadTarget::progress_throttle_time() const
{
return m_progress_throttle_time;
}
void DownloadTarget::set_progress_throttle_time(const std::chrono::steady_clock::time_point& time)
{
m_progress_throttle_time = time;
}
int DownloadTarget::progress_callback(
void* f,
curl_off_t total_to_download,
curl_off_t now_downloaded,
curl_off_t,
curl_off_t
)
{
auto* target = static_cast<DownloadTarget*>(f);
auto now = std::chrono::steady_clock::now();
if (now - target->progress_throttle_time() < std::chrono::milliseconds(50))
{
return 0;
}
else
{
target->set_progress_throttle_time(now);
}
if (!total_to_download && !target->get_expected_size())
{
target->m_progress_bar.activate_spinner();
}
else
{
target->m_progress_bar.deactivate_spinner();
}
if (!total_to_download && target->get_expected_size())
{
target->m_progress_bar.update_current(static_cast<std::size_t>(now_downloaded));
}
else
{
target->m_progress_bar.update_progress(
static_cast<std::size_t>(now_downloaded),
static_cast<std::size_t>(total_to_download)
);
}
target->m_progress_bar.set_speed(target->get_speed());
return 0;
}
void DownloadTarget::set_mod_etag_headers(const std::string& lmod, const std::string& letag)
{
auto to_header = [](const std::string& key, const std::string& value)
{ return key + ": " + value; };
if (!letag.empty())
{
m_curl_handle->add_header(to_header("If-None-Match", letag));
}
if (!lmod.empty())
{
m_curl_handle->add_header(to_header("If-Modified-Since", lmod));
}
}
void DownloadTarget::set_progress_bar(ProgressProxy progress_proxy)
{
m_has_progress_bar = true;
m_progress_bar = progress_proxy;
m_progress_bar.set_repr_hook(download_repr());
m_curl_handle->set_opt(CURLOPT_XFERINFOFUNCTION, &DownloadTarget::progress_callback);
m_curl_handle->set_opt(CURLOPT_XFERINFODATA, this);
m_curl_handle->set_opt(CURLOPT_NOPROGRESS, 0L);
}
void DownloadTarget::set_expected_size(std::size_t size)
{
m_expected_size = size;
}
void DownloadTarget::set_head_only(bool yes)
{
m_curl_handle->set_opt(CURLOPT_NOBODY, yes);
}
const std::string& DownloadTarget::get_name() const
{
return m_name;
}
const std::string& DownloadTarget::get_url() const
{
return m_url;
}
const std::string& DownloadTarget::get_etag() const
{
return m_etag;
}
const std::string& DownloadTarget::get_mod() const
{
return m_mod;
}
const std::string& DownloadTarget::get_cache_control() const
{
return m_cache_control;
}
std::size_t DownloadTarget::get_expected_size() const
{
return m_expected_size;
}
int DownloadTarget::get_http_status() const
{
return m_http_status;
}
std::size_t DownloadTarget::get_downloaded_size() const
{
return m_downloaded_size;
}
std::size_t DownloadTarget::get_speed()
{
auto speed = m_curl_handle->get_info<std::size_t>(CURLINFO_SPEED_DOWNLOAD_T);
// TODO Should we just drop all code below with progress_bar and use value_or(0) in get_info
// above instead?
if (!speed.has_value())
{
if (m_has_progress_bar)
{
return m_progress_bar.avg_speed();
}
else
{
return 0;
}
}
return speed.value();
}
bool DownloadTarget::resource_exists()
{
init_curl_ssl();
bool set_low_speed_opt, set_ssl_no_revoke;
long connect_timeout_secs;
std::string ssl_verify;
get_config(m_context, set_low_speed_opt, set_ssl_no_revoke, connect_timeout_secs, ssl_verify);
return curl::check_resource_exists(
m_url,
set_low_speed_opt,
connect_timeout_secs,
set_ssl_no_revoke,
proxy_match(m_url, context().remote_fetch_params.proxy_servers),
ssl_verify
);
}
bool DownloadTarget::perform()
{
LOG_INFO << "Downloading to filename: " << m_filename;
m_curl_handle->perform();
return (check_result() && finalize());
}
bool DownloadTarget::check_result()
{
if (!m_curl_handle->is_curl_res_ok())
{
std::stringstream err;
err << "Download error (" << m_curl_handle->get_result() << ") "
<< m_curl_handle->get_res_error() << " [" << m_curl_handle->get_curl_effective_url()
<< "]\n";
if (m_curl_handle->get_error_buffer()[0] != '\0')
{
err << m_curl_handle->get_error_buffer();
}
LOG_INFO << err.str();
m_next_retry = std::chrono::steady_clock::now()
+ std::chrono::seconds(m_retry_wait_seconds);
if (m_has_progress_bar)
{
m_progress_bar.update_progress(0, 1);
// m_progress_bar.set_elapsed_time();
m_progress_bar.set_postfix(m_curl_handle->get_res_error());
}
if (!m_ignore_failure && !can_retry())
{
throw std::runtime_error(err.str());
}
return false;
}
else
{
return true;
}
}
std::size_t DownloadTarget::get_result() const
{
return m_curl_handle->get_result();
}
void DownloadTarget::set_result(CURLcode res)
{
m_curl_handle->set_result(res);
}
bool DownloadTarget::finalize()
{
auto avg_speed = get_speed();
m_http_status = m_curl_handle->get_info<int>(CURLINFO_RESPONSE_CODE).value_or(10000);
m_effective_url = m_curl_handle->get_info<char*>(CURLINFO_EFFECTIVE_URL).value();
m_downloaded_size = m_curl_handle->get_info<std::size_t>(CURLINFO_SIZE_DOWNLOAD_T).value_or(0);
LOG_INFO << get_transfer_msg();
if (can_retry())
{
// this request didn't work!
// respect Retry-After header if present, otherwise use default timeout
m_retry_wait_seconds = m_curl_handle->get_info<std::size_t>(CURLINFO_RETRY_AFTER)
.value_or(0);
if (!m_retry_wait_seconds)
{
m_retry_wait_seconds = get_default_retry_timeout(m_context);
}
m_next_retry = std::chrono::steady_clock::now()
+ std::chrono::seconds(m_retry_wait_seconds);
std::stringstream msg;
msg << "Failed (" << m_http_status << "), retry in " << m_retry_wait_seconds << "s";
if (m_has_progress_bar)
{
m_progress_bar.update_progress(0, m_downloaded_size);
m_progress_bar.set_postfix(msg.str());
}
return false;
}
m_file.close();
if (m_has_progress_bar)
{
m_progress_bar.set_speed(avg_speed);
m_progress_bar.set_total(m_downloaded_size);
m_progress_bar.set_full();
m_progress_bar.set_postfix("Downloaded");
}
bool ret = true;
if (m_finalize_callback)
{
ret = m_finalize_callback(*this);
}
else
{
if (m_has_progress_bar)
{
m_progress_bar.mark_as_completed();
}
else
{
Console::instance().print(m_name + " completed");
}
}
if (m_has_progress_bar)
{
// make sure total value is up-to-date
m_progress_bar.update_repr(false);
// select field to display and make sure they are
// properly set if not yet printed by the progress bar manager
ProgressBarRepr r = m_progress_bar.repr();
r.prefix.set_format("{:<50}", 50);
r.progress.deactivate();
r.current.deactivate();
r.separator.deactivate();
auto console_stream = Console::stream();
r.print(console_stream, 0, false);
}
return ret;
}
std::string DownloadTarget::get_transfer_msg()
{
std::stringstream ss;
ss << "Transfer finalized, status: " << m_http_status << " [" << m_effective_url << "] "
<< m_downloaded_size << " bytes";
return ss.str();
}
const CURLHandle& DownloadTarget::get_curl_handle() const
{
return *m_curl_handle;
}
/**************************************
* MultiDownloadTarget implementation *
**************************************/
MultiDownloadTarget::MultiDownloadTarget(const Context& context)
: m_context(context)
{
p_curl_handle = std::make_unique<CURLMultiHandle>(m_context.threads_params.download_threads);
}
MultiDownloadTarget::~MultiDownloadTarget() = default;
void MultiDownloadTarget::add(DownloadTarget* target)
{
if (!target)
{
return;
}
if (&target->context() != &m_context)
{
throw std::invalid_argument(
"DownloadTarget's context is not the same instance as MultiDownloadTarget's context"
);
}
p_curl_handle->add_handle(target->get_curl_handle());
m_targets.push_back(target);
}
bool MultiDownloadTarget::check_msgs(bool failfast)
{
while (auto resp = p_curl_handle->pop_message())
{
const auto& msg = resp.value();
if (!msg.m_transfer_done)
{
// We are only interested in messages about finished transfers
continue;
}
DownloadTarget* current_target = nullptr;
for (const auto& target : m_targets)
{
if (target->get_curl_handle().get_id() == msg.m_handle_id)
{
current_target = target;
break;
}
}
if (!current_target)
{
throw std::runtime_error("Could not find target associated with multi request");
}
current_target->set_result(msg.m_transfer_result);
if (!current_target->check_result() && current_target->can_retry())
{
p_curl_handle->remove_handle(current_target->get_curl_handle());
m_retry_targets.push_back(current_target);
}
else
{
LOG_INFO << "Transfer done for '" << current_target->get_name() << "'";
// We are only interested in messages about finished transfers
p_curl_handle->remove_handle(current_target->get_curl_handle());
// flush file & finalize transfer
if (!current_target->finalize())
{
// transfer did not work! can we retry?
if (current_target->can_retry())
{
LOG_INFO << "Setting retry for '" << current_target->get_name() << "'";
m_retry_targets.push_back(current_target);
}
else
{
if (failfast && current_target->get_ignore_failure() == false)
{
throw std::runtime_error(
"Multi-download failed. Reason: " + current_target->get_transfer_msg()
);
}
}
}
}
}
return true;
}
bool MultiDownloadTarget::download(int options)
{
bool failfast = options & MAMBA_DOWNLOAD_FAILFAST;
bool sort = options & MAMBA_DOWNLOAD_SORT;
bool no_clear_progress_bars = options & MAMBA_NO_CLEAR_PROGRESS_BARS;
auto& ctx = m_context;
if (m_targets.empty())
{
LOG_INFO << "All targets to download are cached";
return true;
}
if (sort)
{
std::sort(
m_targets.begin(),
m_targets.end(),
[](DownloadTarget* a, DownloadTarget* b) -> bool
{ return a->get_expected_size() > b->get_expected_size(); }
);
}
LOG_INFO << "Starting to download targets";
auto& pbar_manager = Console::instance().progress_bar_manager();
interruption_guard g([]() { Console::instance().progress_bar_manager().terminate(); });
// be sure the progress bar manager was not already started
// it would mean this code is part of a larger process using progress bars
bool pbar_manager_started = pbar_manager.started();
if (!(ctx.graphics_params.no_progress_bars || ctx.output_params.json
|| ctx.output_params.quiet || pbar_manager_started))
{
pbar_manager.watch_print();
}
std::size_t still_running = size_t(0);
std::size_t repeats = 0;
do
{
still_running = p_curl_handle->perform();
check_msgs(failfast);
if (!m_retry_targets.empty())
{
auto it = m_retry_targets.begin();
while (it != m_retry_targets.end())
{
if ((*it)->retry())
{
p_curl_handle->add_handle((*it)->get_curl_handle());
it = m_retry_targets.erase(it);
still_running = 1;
}
else
{
++it;
}
}
}
std::size_t timeout = p_curl_handle->get_timeout();
if (timeout == 0u)
{
continue;
}
std::size_t numfds = p_curl_handle->wait(timeout);
// 'numfds' being zero means either a timeout or no file descriptors to
// wait for. Try timeout on first occurrence, then assume no file
// descriptors and no file descriptors to wait for means wait for 100
// milliseconds.
if (!numfds)
{
repeats++;
if (repeats > 1)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
else
{
repeats = 0;
}
}
} while ((still_running || !m_retry_targets.empty()) && !is_sig_interrupted());
if (is_sig_interrupted())
{
Console::instance().print("Download interrupted");
return false;
}
if (!(ctx.graphics_params.no_progress_bars || ctx.output_params.json
|| ctx.output_params.quiet || pbar_manager_started))
{
pbar_manager.terminate();
if (!no_clear_progress_bars)
{
pbar_manager.clear_progress_bars();
}
}
return true;
}
} // namespace mamba

View File

@ -1,437 +0,0 @@
// Copyright (c) 2019, QuantStack and Mamba Contributors
//
// Distributed under the terms of the BSD 3-Clause License.
//
// The full license is in the file LICENSE, distributed with this software.
#include <iostream>
#include <fmt/color.h>
#include <fmt/format.h>
#include <fmt/ostream.h>
#include "mamba/core/channel.hpp"
#include "mamba/core/context.hpp"
#include "mamba/core/execution.hpp"
#include "mamba/core/fetch.hpp"
#include "mamba/core/output.hpp"
#include "mamba/core/package_cache.hpp"
#include "mamba/core/package_download.hpp"
#include "mamba/core/package_handling.hpp"
#include "mamba/core/progress_bar.hpp"
#include "mamba/core/thread_utils.hpp"
#include "mamba/core/util.hpp"
#include "mamba/core/validate.hpp"
#include "mamba/fs/filesystem.hpp"
#include "mamba/util/string.hpp"
#include "progress_bar_impl.hpp"
namespace mamba
{
/********************************
* PackageDownloadExtractTarget *
********************************/
counting_semaphore DownloadExtractSemaphore::semaphore(0);
std::ptrdiff_t DownloadExtractSemaphore::get_max()
{
return DownloadExtractSemaphore::semaphore.get_max();
}
void DownloadExtractSemaphore::set_max(int value)
{
DownloadExtractSemaphore::semaphore.set_max(value);
}
PackageDownloadExtractTarget::PackageDownloadExtractTarget(
const PackageInfo& pkg_info,
ChannelContext& channel_context
)
: m_finished(false)
, m_package_info(pkg_info)
{
m_filename = pkg_info.fn;
// only do this for micromamba for now
if (channel_context.context().command_params.is_micromamba)
{
m_url = channel_context.make_channel(pkg_info.url).urls(true)[0];
}
else
{
m_url = pkg_info.url;
}
m_name = pkg_info.name;
m_expected_size = pkg_info.size;
m_sha256 = pkg_info.sha256;
m_md5 = pkg_info.md5;
auto& ctx = channel_context.context();
m_has_progress_bars = !(
ctx.graphics_params.no_progress_bars || ctx.output_params.quiet || ctx.output_params.json
);
}
void PackageDownloadExtractTarget::write_repodata_record(const fs::u8path& base_path)
{
fs::u8path repodata_record_path = base_path / "info" / "repodata_record.json";
fs::u8path index_path = base_path / "info" / "index.json";
nlohmann::json index, solvable_json;
std::ifstream index_file = open_ifstream(index_path);
index_file >> index;
solvable_json = m_package_info.json_record();
index.insert(solvable_json.cbegin(), solvable_json.cend());
if (index.find("size") == index.end() || index["size"] == 0)
{
index["size"] = fs::file_size(m_tarball_path);
}
std::ofstream repodata_record(repodata_record_path.std_path());
repodata_record << index.dump(4);
}
static std::mutex urls_txt_mutex;
void PackageDownloadExtractTarget::add_url()
{
std::lock_guard<std::mutex> lock(urls_txt_mutex);
const auto urls_file_path = m_cache_path / "urls.txt";
std::ofstream urls_txt(urls_file_path.std_path(), std::ios::app);
urls_txt << m_url << std::endl;
}
void PackageDownloadExtractTarget::validate()
{
m_validation_result = VALIDATION_RESULT::VALID;
if (m_expected_size && (m_target->get_downloaded_size() != m_expected_size))
{
LOG_ERROR << "File not valid: file size doesn't match expectation " << m_tarball_path
<< "\nExpected: " << m_expected_size
<< "\nActual: " << m_target->get_downloaded_size() << "\n";
if (m_has_progress_bars)
{
m_download_bar.set_postfix("validation failed");
m_download_bar.mark_as_completed();
}
Console::instance().print(m_filename + " tarball has incorrect size");
m_validation_result = SIZE_ERROR;
return;
}
interruption_point();
if (!m_sha256.empty())
{
auto sha256sum = validation::sha256sum(m_tarball_path);
if (m_sha256 != sha256sum)
{
m_validation_result = SHA256_ERROR;
if (m_has_progress_bars)
{
m_download_bar.set_postfix("validation failed");
m_download_bar.mark_as_completed();
}
Console::instance().print(m_filename + " tarball has incorrect checksum");
LOG_ERROR << "File not valid: SHA256 sum doesn't match expectation " << m_tarball_path
<< "\nExpected: " << m_sha256 << "\nActual: " << sha256sum << "\n";
}
return;
}
if (!m_md5.empty())
{
auto md5sum = validation::md5sum(m_tarball_path);
if (m_md5 != md5sum)
{
m_validation_result = MD5SUM_ERROR;
if (m_has_progress_bars)
{
m_download_bar.set_postfix("validation failed");
m_download_bar.mark_as_completed();
}
Console::instance().print(m_filename + " tarball has incorrect checksum");
LOG_ERROR << "File not valid: MD5 sum doesn't match expectation " << m_tarball_path
<< "\nExpected: " << m_md5 << "\nActual: " << md5sum << "\n";
}
}
}
std::function<void(ProgressBarRepr&)> PackageDownloadExtractTarget::extract_repr()
{
return [&](ProgressBarRepr& r) -> void
{
if (r.progress_bar().started())
{
r.postfix.set_value("Extracting");
}
else
{
r.postfix.set_value("Extracted");
}
};
}
std::function<void(ProgressProxy&)> PackageDownloadExtractTarget::extract_progress_callback()
{
return [&](ProgressProxy& bar) -> void
{
if (bar.started())
{
bar.set_progress(0, 1);
}
};
}
bool PackageDownloadExtractTarget::extract(const Context& context)
{
// Extracting is __not__ yet thread safe it seems...
interruption_point();
if (m_has_progress_bars)
{
m_extract_bar.start();
}
LOG_DEBUG << "Waiting for decompression " << m_tarball_path;
if (m_has_progress_bars)
{
m_extract_bar.update_progress(0, 1);
}
{
std::lock_guard<counting_semaphore> lock(DownloadExtractSemaphore::semaphore);
interruption_point();
LOG_DEBUG << "Decompressing '" << m_tarball_path.string() << "'";
fs::u8path extract_path;
try
{
std::string fn = m_filename;
if (util::ends_with(fn, ".tar.bz2"))
{
fn = fn.substr(0, fn.size() - 8);
}
else if (util::ends_with(fn, ".conda"))
{
fn = fn.substr(0, fn.size() - 6);
}
else
{
LOG_ERROR << "Unknown package format '" << m_filename << "'";
throw std::runtime_error("Unknown package format.");
}
// Be sure the first writable cache doesn't contain invalid extracted package
extract_path = m_cache_path / fn;
if (fs::exists(extract_path))
{
LOG_DEBUG << "Removing '" << extract_path.string()
<< "' before extracting it again";
fs::remove_all(extract_path);
}
const auto extract_options = mamba::ExtractOptions::from_context(context);
// Use non-subproc version if concurrency is disabled to avoid
// any potential subprocess issues
if (DownloadExtractSemaphore::get_max() == 1)
{
mamba::extract(m_tarball_path, extract_path, extract_options);
}
else
{
mamba::extract_subproc(m_tarball_path, extract_path, extract_options);
}
// mamba::extract(m_tarball_path, extract_path);
interruption_point();
LOG_DEBUG << "Extracted to '" << extract_path.string() << "'";
write_repodata_record(extract_path);
add_url();
if (m_has_progress_bars)
{
m_extract_bar.set_full();
m_extract_bar.mark_as_completed();
}
}
catch (std::exception& e)
{
Console::instance().print(m_filename + " extraction failed");
LOG_ERROR << "Error when extracting package: " << e.what();
m_decompress_exception = e;
m_validation_result = VALIDATION_RESULT::EXTRACT_ERROR;
if (m_has_progress_bars)
{
m_extract_bar.set_postfix("extraction failed");
m_extract_bar.mark_as_completed();
}
return false;
}
}
return true;
}
bool PackageDownloadExtractTarget::extract_from_cache(const Context& context)
{
this->extract(context);
m_finished = true;
return true;
}
bool PackageDownloadExtractTarget::validate_extract(const Context& context)
{
using std::chrono::nanoseconds;
if (m_has_progress_bars)
{
m_extract_bar.start();
m_extract_bar.set_postfix("validating");
}
validate();
// Validation
if (m_validation_result != VALIDATION_RESULT::VALID)
{
if (m_has_progress_bars)
{
m_extract_bar.set_postfix("validation failed");
}
LOG_WARNING << "'" << m_tarball_path.string() << "' validation failed";
// abort here, but set finished to true
m_finished = true;
return true;
}
if (m_has_progress_bars)
{
m_extract_bar.set_postfix("validated");
}
LOG_DEBUG << "'" << m_tarball_path.string() << "' successfully validated";
bool result = this->extract(context);
m_finished = true;
return result;
}
bool PackageDownloadExtractTarget::finalize_callback(const DownloadTarget& target)
{
if (m_has_progress_bars)
{
m_download_bar.repr().postfix.set_value("Downloaded").deactivate();
m_download_bar.mark_as_completed();
}
if (m_target->get_http_status() >= 400)
{
LOG_ERROR << "Failed to download package from " << m_url << " (status "
<< m_target->get_http_status() << ")";
m_validation_result = VALIDATION_RESULT::UNDEFINED;
return false;
}
LOG_INFO << "Download finished, validating '" << m_tarball_path.string() << "'";
MainExecutor::instance().schedule(
m_tasksync.synchronized([&] { validate_extract(target.context()); })
);
return true;
}
bool PackageDownloadExtractTarget::finished()
{
return m_finished;
}
auto PackageDownloadExtractTarget::validation_result() const -> VALIDATION_RESULT
{
return m_validation_result;
}
void PackageDownloadExtractTarget::clear_cache() const
{
fs::remove_all(m_tarball_path);
fs::u8path dest_dir = strip_package_extension(m_tarball_path.string());
if (fs::exists(dest_dir))
{
fs::remove_all(dest_dir);
}
}
const std::string& PackageDownloadExtractTarget::name() const
{
return m_name;
}
std::size_t PackageDownloadExtractTarget::expected_size() const
{
return m_expected_size;
}
// todo remove cache from this interface
DownloadTarget* PackageDownloadExtractTarget::target(Context& context, MultiPackageCache& caches)
{
// tarball can be removed, it's fine if only the correct dest dir exists
// 1. If there is extracted cache, use it, otherwise next.
// 2. If there is valid tarball, extract it, otherwise next.
// 3. Run the full download pipeline.
fs::u8path extracted_cache = caches.get_extracted_dir_path(m_package_info);
if (extracted_cache.empty())
{
fs::u8path tarball_cache = caches.get_tarball_path(m_package_info);
// Compute the first writable cache and clean its status for the current package
caches.first_writable_cache(true).clear_query_cache(m_package_info);
m_cache_path = caches.first_writable_path();
if (m_has_progress_bars)
{
m_extract_bar = Console::instance().add_progress_bar(m_name, 1);
m_extract_bar.activate_spinner();
m_extract_bar.set_progress_hook(extract_progress_callback());
m_extract_bar.set_repr_hook(extract_repr());
Console::instance().progress_bar_manager().add_label("Extract", m_extract_bar);
}
if (!tarball_cache.empty())
{
LOG_DEBUG << "Found valid tarball cache at '" << tarball_cache.string() << "'";
m_tarball_path = tarball_cache / m_filename;
m_validation_result = VALIDATION_RESULT::VALID;
MainExecutor::instance().schedule(
m_tasksync.synchronized([&] { extract_from_cache(context); })
);
LOG_DEBUG << "Using cached tarball '" << m_filename << "'";
return nullptr;
}
else
{
caches.clear_query_cache(m_package_info);
// need to download this file
LOG_DEBUG << "Adding '" << m_name << "' to download targets from '" << m_url << "'";
m_tarball_path = m_cache_path / m_filename;
m_target = std::make_unique<DownloadTarget>(
context,
m_name,
m_url,
m_tarball_path.string()
);
m_target->set_finalize_callback(&PackageDownloadExtractTarget::finalize_callback, this);
m_target->set_expected_size(m_expected_size);
if (m_has_progress_bars)
{
m_download_bar = Console::instance().add_progress_bar(m_name, m_expected_size);
m_target->set_progress_bar(m_download_bar);
Console::instance().progress_bar_manager().add_label("Download", m_download_bar);
}
return m_target.get();
}
}
LOG_DEBUG << "Using cached '" << m_name << "'";
m_finished = true;
return nullptr;
}
}

View File

@ -0,0 +1,419 @@
#include "mamba/core/invoke.hpp"
#include "mamba/core/package_fetcher.hpp"
#include "mamba/core/util.hpp"
#include "mamba/core/validate.hpp"
#include "mamba/util/string.hpp"
namespace mamba
{
/**********************
* PackageExtractTask *
**********************/
PackageExtractTask::PackageExtractTask(PackageFetcher* fetcher, ExtractOptions options)
: p_fetcher(fetcher)
, m_options(std::move(options))
{
}
const std::string& PackageExtractTask::name() const
{
return p_fetcher->name();
}
bool PackageExtractTask::needs_download() const
{
return p_fetcher->needs_download();
}
void PackageExtractTask::set_progress_callback(progress_callback_t cb)
{
m_progress_callback = std::move(cb);
}
auto PackageExtractTask::run() -> Result
{
bool is_valid = true;
bool is_extracted = p_fetcher->extract(m_options);
return { is_valid, is_extracted };
}
auto PackageExtractTask::run(std::size_t downloaded_size) -> Result
{
using ValidationResult = PackageFetcher::ValidationResult;
ValidationResult validation_res = p_fetcher->validate(downloaded_size, get_progress_callback());
const bool is_valid = validation_res == ValidationResult::VALID;
bool is_extracted = false;
if (is_valid)
{
is_extracted = p_fetcher->extract(m_options, get_progress_callback());
}
return { is_valid, is_extracted };
}
auto PackageExtractTask::get_progress_callback() -> progress_callback_t*
{
if (m_progress_callback.has_value())
{
return &m_progress_callback.value();
}
else
{
return nullptr;
}
}
/*******************
* PatckageFetcher *
*******************/
PackageFetcher::PackageFetcher(
const PackageInfo& pkg_info,
ChannelContext& channel_context,
MultiPackageCache& caches
)
: m_package_info(pkg_info)
{
// FIXME: only do this for micromamba for now
if (channel_context.context().command_params.is_micromamba)
{
m_url = channel_context.make_channel(pkg_info.url).urls(true)[0];
}
else
{
m_url = pkg_info.url;
}
const fs::u8path extracted_cache = caches.get_extracted_dir_path(m_package_info);
if (extracted_cache.empty())
{
const fs::u8path tarball_cache = caches.get_tarball_path(m_package_info);
auto& cache = caches.first_writable_cache(true);
m_cache_path = cache.path();
if (!tarball_cache.empty())
{
LOG_DEBUG << "Found valid tarball cache at '" << tarball_cache.string() << "'";
cache.clear_query_cache(m_package_info);
m_tarball_path = tarball_cache / filename();
m_needs_extract = true;
LOG_DEBUG << "Using cached tarball '" << filename() << "'";
}
else
{
caches.clear_query_cache(m_package_info);
// need to download this file
LOG_DEBUG << "Adding '" << name() << "' to download targets from '" << url() << "'";
m_tarball_path = m_cache_path / filename();
m_needs_extract = true;
m_needs_download = true;
}
}
else
{
LOG_DEBUG << "Using cached '" << name() << "'";
}
}
const std::string& PackageFetcher::name() const
{
return m_package_info.name;
}
bool PackageFetcher::needs_download() const
{
return m_needs_download;
}
bool PackageFetcher::needs_extract() const
{
return m_needs_extract;
}
DownloadRequest
PackageFetcher::build_download_request(std::optional<post_download_success_t> callback)
{
DownloadRequest request(name(), url(), m_tarball_path.string());
request.expected_size = expected_size();
request.on_success = [this, cb = std::move(callback)](const DownloadSuccess& success)
{
LOG_INFO << "Download finished, tarball available at '" << m_tarball_path.string() << "'";
if (cb.has_value())
{
cb.value()(success.transfer.downloaded_size);
}
m_needs_download = false;
return expected_t<void>();
};
request.on_failure = [](const DownloadError& error)
{
if (error.transfer.has_value())
{
LOG_ERROR << "Failed to download package from "
<< error.transfer.value().effective_url << " (status "
<< error.transfer.value().http_status << ")";
}
else
{
LOG_WARNING << error.message;
}
if (error.retry_wait_seconds.has_value())
{
LOG_WARNING << "Retrying in " << error.retry_wait_seconds.value() << " seconds";
}
};
return request;
}
auto PackageFetcher::validate(std::size_t downloaded_size, progress_callback_t* cb) const
-> ValidationResult
{
update_monitor(cb, PackageExtractEvent::validate_update);
ValidationResult res = validate_size(downloaded_size);
if (res != ValidationResult::VALID)
{
update_monitor(cb, PackageExtractEvent::validate_failure);
return res;
}
interruption_point();
if (!sha256().empty())
{
res = validate_checksum({ sha256(),
validation::sha256sum(m_tarball_path),
"SHA256",
ValidationResult::SHA256_ERROR });
}
else if (!md5().empty())
{
res = validate_checksum(
{ md5(), validation::md5sum(m_tarball_path), "MD5", ValidationResult::MD5SUM_ERROR }
);
}
auto event = res == ValidationResult::VALID ? PackageExtractEvent::validate_success
: PackageExtractEvent::validate_failure;
update_monitor(cb, event);
return res;
}
namespace
{
fs::u8path get_extract_path(const std::string& filename, const fs::u8path& cache_path)
{
std::string fn = filename;
if (util::ends_with(fn, ".tar.bz2"))
{
fn = fn.substr(0, fn.size() - 8);
}
else if (util::ends_with(fn, ".conda"))
{
fn = fn.substr(0, fn.size() - 6);
}
else
{
LOG_ERROR << "Unknown package format '" << filename << "'";
throw std::runtime_error("Unknown package format.");
}
return cache_path / fn;
}
void clear_extract_path(const fs::u8path& path)
{
if (fs::exists(path))
{
LOG_DEBUG << "Removing '" << path.string() << "' before extracting it again";
fs::remove_all(path);
}
}
void extract_impl(
const fs::u8path& tarball_path,
const fs::u8path& extract_path,
const ExtractOptions& options
)
{
// Use non-subproc version if concurrency is disabled to avoid
// any potential subprocess issues
if (PackageFetcherSemaphore::get_max() == 1)
{
mamba::extract(tarball_path, extract_path, options);
}
else
{
mamba::extract_subproc(tarball_path, extract_path, options);
}
}
}
bool PackageFetcher::extract(const ExtractOptions& options, progress_callback_t* cb)
{
// Extracting is __not__ yet thread safe it seems...
interruption_point();
LOG_DEBUG << "Waiting for decompression " << m_tarball_path;
update_monitor(cb, PackageExtractEvent::extract_update);
{
std::lock_guard<counting_semaphore> lock(PackageFetcherSemaphore::semaphore);
interruption_point();
LOG_DEBUG << "Decompressing '" << m_tarball_path.string() << "'";
try
{
const fs::u8path extract_path = get_extract_path(filename(), m_cache_path);
// Be sure the first writable cache doesn't contain invalid extracted package
clear_extract_path(extract_path);
extract_impl(m_tarball_path, extract_path, options);
interruption_point();
LOG_DEBUG << "Extracted to '" << extract_path.string() << "'";
write_repodata_record(extract_path);
update_urls_txt();
update_monitor(cb, PackageExtractEvent::extract_success);
}
catch (std::exception& e)
{
Console::instance().print(filename() + " extraction failed");
LOG_ERROR << "Error when extracting package: " << e.what();
update_monitor(cb, PackageExtractEvent::extract_failure);
return false;
}
}
m_needs_extract = false;
return true;
}
PackageExtractTask PackageFetcher::build_extract_task(ExtractOptions options)
{
return { this, std::move(options) };
}
void PackageFetcher::clear_cache() const
{
fs::remove_all(m_tarball_path);
const fs::u8path dest_dir = strip_package_extension(m_tarball_path.string());
fs::remove_all(dest_dir);
}
/*******************
* Private methods *
*******************/
const std::string& PackageFetcher::filename() const
{
return m_package_info.fn;
}
const std::string& PackageFetcher::url() const
{
return m_url;
}
const std::string& PackageFetcher::sha256() const
{
return m_package_info.sha256;
}
const std::string& PackageFetcher::md5() const
{
return m_package_info.md5;
}
std::size_t PackageFetcher::expected_size() const
{
return m_package_info.size;
}
auto PackageFetcher::validate_size(std::size_t downloaded_size) const -> ValidationResult
{
auto res = ValidationResult::VALID;
if (expected_size() && expected_size() != downloaded_size)
{
res = ValidationResult::SIZE_ERROR;
LOG_ERROR << "File not valid: file size doesn't match expectation " << m_tarball_path
<< "\nExpected: " << expected_size() << "\nActual: " << downloaded_size
<< "\n";
Console::instance().print(filename() + " tarball has incorrect size");
}
return res;
}
auto PackageFetcher::validate_checksum(CheckSumParams params) const -> ValidationResult
{
auto res = ValidationResult::VALID;
if (params.actual != params.expected)
{
res = params.error;
LOG_ERROR << "File not valid: " << params.name << " doesn't match expectation "
<< m_tarball_path << "\nExpected: " << params.expected
<< "\nActual: " << params.actual << "\n";
Console::instance().print(filename() + " tarball has incorrect " + params.name);
// TODO: terminate monitor
}
return res;
}
void PackageFetcher::write_repodata_record(const fs::u8path& base_path) const
{
const fs::u8path repodata_record_path = base_path / "info" / "repodata_record.json";
const fs::u8path index_path = base_path / "info" / "index.json";
nlohmann::json index, solvable_json;
std::ifstream index_file = open_ifstream(index_path);
index_file >> index;
solvable_json = m_package_info.json_record();
index.insert(solvable_json.cbegin(), solvable_json.cend());
if (index.find("size") == index.end() || index["size"] == 0)
{
index["size"] = fs::file_size(m_tarball_path);
}
std::ofstream repodata_record(repodata_record_path.std_path());
repodata_record << index.dump(4);
}
namespace
{
std::mutex urls_txt_mutex;
}
void PackageFetcher::update_urls_txt() const
{
// TODO: check if this lock is really required
std::lock_guard<std::mutex> lock(urls_txt_mutex);
const auto urls_file_path = m_cache_path / "urls.txt";
std::ofstream urls_txt(urls_file_path.std_path(), std::ios::app);
urls_txt << m_url << std::endl;
}
void PackageFetcher::update_monitor(progress_callback_t* cb, PackageExtractEvent event) const
{
if (cb)
{
safe_invoke(*cb, event);
}
}
/***************************
* PackageFetcherSemaphore *
***************************/
counting_semaphore PackageFetcherSemaphore::semaphore(0);
std::ptrdiff_t PackageFetcherSemaphore::get_max()
{
return PackageFetcherSemaphore::semaphore.get_max();
}
void PackageFetcherSemaphore::set_max(int value)
{
PackageFetcherSemaphore::semaphore.set_max(value);
}
}

View File

@ -24,14 +24,17 @@ extern "C" // Incomplete header
#include "mamba/core/channel.hpp"
#include "mamba/core/context.hpp"
#include "mamba/core/download_progress_bar.hpp"
#include "mamba/core/env_lockfile.hpp"
#include "mamba/core/execution.hpp"
#include "mamba/core/link.hpp"
#include "mamba/core/match_spec.hpp"
#include "mamba/core/output.hpp"
#include "mamba/core/package_download.hpp"
#include "mamba/core/package_fetcher.hpp"
#include "mamba/core/pool.hpp"
#include "mamba/core/thread_utils.hpp"
#include "mamba/core/transaction.hpp"
#include "mamba/core/validate.hpp"
#include "mamba/util/flat_set.hpp"
#include "mamba/util/string.hpp"
#include "solv-cpp/pool.hpp"
@ -825,208 +828,228 @@ namespace mamba
add_json(to_unlink, "UNLINK");
}
bool MTransaction::fetch_extract_packages()
namespace
{
std::vector<std::unique_ptr<PackageDownloadExtractTarget>> targets;
MultiDownloadTarget multi_dl{ m_pool.context() };
auto& pbar_manager = Console::instance().init_progress_bar_manager(ProgressBarMode::aggregated
);
auto& aggregated_pbar_manager = dynamic_cast<AggregatedBarManager&>(pbar_manager);
auto& channel_context = m_pool.channel_context();
auto& ctx = channel_context.context();
DownloadExtractSemaphore::set_max(ctx.threads_params.extract_threads);
if (ctx.experimental && ctx.validation_params.verify_artifacts)
using FetcherList = std::vector<PackageFetcher>;
// Free functions instead of private method to avoid exposing downloaders
// and package fetchers in the header. Ideally we may want a pimpl or
// a private implementation header when we refactor this class.
FetcherList
build_fetchers(MPool& pool, const Solution& solution, MultiPackageCache& multi_cache)
{
LOG_INFO << "Content trust is enabled, package(s) signatures will be verified";
}
FetcherList fetchers;
auto& channel_context = pool.channel_context();
auto& ctx = channel_context.context();
for_each_to_install(
m_solution.actions,
[&](const auto& pkg)
if (ctx.experimental && ctx.validation_params.verify_artifacts)
{
if (ctx.experimental && ctx.validation_params.verify_artifacts)
{
const auto& repo_checker = channel_context.make_channel(pkg.channel)
.repo_checker(ctx, m_multi_cache);
repo_checker.verify_package(
pkg.json_signable(),
nlohmann::json::parse(pkg.signatures)
);
LOG_DEBUG << "'" << pkg.name << "' trusted from '" << pkg.channel << "'";
}
targets.emplace_back(
std::make_unique<PackageDownloadExtractTarget>(pkg, m_pool.channel_context())
);
DownloadTarget* download_target = targets.back()->target(ctx, m_multi_cache);
if (download_target != nullptr)
{
multi_dl.add(download_target);
}
LOG_INFO << "Content trust is enabled, package(s) signatures will be verified";
}
);
for_each_to_install(
solution.actions,
[&](const auto& pkg)
{
if (ctx.experimental && ctx.validation_params.verify_artifacts)
{
const auto& repo_checker = channel_context.make_channel(pkg.channel)
.repo_checker(ctx, multi_cache);
repo_checker.verify_package(
pkg.json_signable(),
nlohmann::json::parse(pkg.signatures)
);
if (ctx.experimental && ctx.validation_params.verify_artifacts)
{
auto out = Console::stream();
fmt::print(
out,
"Content trust verifications successful, {} ",
fmt::styled("package(s) are trusted", ctx.graphics_params.palette.safe)
LOG_DEBUG << "'" << pkg.name << "' trusted from '" << pkg.channel << "'";
}
fetchers.emplace_back(pkg, channel_context, multi_cache);
}
);
LOG_INFO << "All package(s) are trusted";
if (ctx.experimental && ctx.validation_params.verify_artifacts)
{
auto out = Console::stream();
fmt::print(
out,
"Content trust verifications successful, {} ",
fmt::styled("package(s) are trusted", ctx.graphics_params.palette.safe)
);
LOG_INFO << "All package(s) are trusted";
}
return fetchers;
}
if (!(ctx.graphics_params.no_progress_bars || ctx.output_params.json
|| ctx.output_params.quiet))
using ExtractTaskList = std::vector<PackageExtractTask>;
ExtractTaskList
build_extract_tasks(const Context& context, FetcherList& fetchers, std::size_t extract_size)
{
interruption_guard g([]() { Console::instance().progress_bar_manager().terminate(); });
auto extract_options = ExtractOptions::from_context(context);
ExtractTaskList extract_tasks;
extract_tasks.reserve(extract_size);
std::transform(
fetchers.begin(),
fetchers.begin() + static_cast<std::ptrdiff_t>(extract_size),
std::back_inserter(extract_tasks),
[extract_options](auto& f) { return f.build_extract_task(extract_options); }
);
return extract_tasks;
}
auto* dl_bar = aggregated_pbar_manager.aggregated_bar("Download");
if (dl_bar)
using ExtractTrackerList = std::vector<std::future<PackageExtractTask::Result>>;
MultiDownloadRequest build_download_requests(
FetcherList& fetchers,
ExtractTaskList& extract_tasks,
ExtractTrackerList& extract_trackers,
std::size_t download_size
)
{
MultiDownloadRequest download_requests;
download_requests.reserve(download_size);
for (auto [fit, eit] = std::tuple{ fetchers.begin(), extract_tasks.begin() };
fit != fetchers.begin() + static_cast<std::ptrdiff_t>(download_size);
++fit, ++eit)
{
dl_bar->set_repr_hook(
[=](ProgressBarRepr& repr) -> void
auto ceit = eit; // Apple Clang cannot capture eit
auto task = std::make_shared<std::packaged_task<PackageExtractTask::Result(std::size_t)>>(
[ceit](std::size_t downloaded_size) { return ceit->run(downloaded_size); }
);
extract_trackers.push_back(task->get_future());
download_requests.push_back(fit->build_download_request(
[extract_task = std::move(task)](std::size_t downloaded_size)
{
auto active_tasks = dl_bar->active_tasks().size();
if (active_tasks == 0)
{
repr.prefix.set_value(fmt::format("{:<16}", "Downloading"));
repr.postfix.set_value(fmt::format("{:<25}", ""));
}
else
{
repr.prefix.set_value(fmt::format(
"{:<11} {:>4}",
"Downloading",
fmt::format("({})", active_tasks)
));
repr.postfix.set_value(fmt::format("{:<25}", dl_bar->last_active_task()));
}
repr.current.set_value(fmt::format(
"{:>7}",
to_human_readable_filesize(double(dl_bar->current()), 1)
));
repr.separator.set_value("/");
std::string total_str;
if (dl_bar->total() == std::numeric_limits<std::size_t>::max())
{
total_str = "??.?MB";
}
else
{
total_str = to_human_readable_filesize(double(dl_bar->total()), 1);
}
repr.total.set_value(fmt::format("{:>7}", total_str));
auto speed = dl_bar->avg_speed(std::chrono::milliseconds(500));
repr.speed.set_value(
speed
? fmt::format("@ {:>7}/s", to_human_readable_filesize(double(speed), 1))
: ""
MainExecutor::instance().schedule(
[t = std::move(extract_task)](std::size_t ds) { (*t)(ds); },
downloaded_size
);
}
);
));
}
auto* extract_bar = aggregated_pbar_manager.aggregated_bar("Extract");
if (extract_bar)
{
extract_bar->set_repr_hook(
[=](ProgressBarRepr& repr) -> void
{
auto active_tasks = extract_bar->active_tasks().size();
if (active_tasks == 0)
{
repr.prefix.set_value(fmt::format("{:<16}", "Extracting"));
repr.postfix.set_value(fmt::format("{:<25}", ""));
}
else
{
repr.prefix.set_value(fmt::format(
"{:<11} {:>4}",
"Extracting",
fmt::format("({})", active_tasks)
));
repr.postfix.set_value(
fmt::format("{:<25}", extract_bar->last_active_task())
);
}
repr.current.set_value(fmt::format("{:>3}", extract_bar->current()));
repr.separator.set_value("/");
std::string total_str;
if (extract_bar->total() == std::numeric_limits<std::size_t>::max())
{
total_str = "?";
}
else
{
total_str = std::to_string(extract_bar->total());
}
repr.total.set_value(fmt::format("{:>3}", total_str));
}
);
}
pbar_manager.start();
pbar_manager.watch_print();
return download_requests;
}
bool downloaded = multi_dl.download(MAMBA_DOWNLOAD_FAILFAST | MAMBA_DOWNLOAD_SORT);
bool all_valid = true;
void schedule_remaining_extractions(
ExtractTaskList& extract_tasks,
ExtractTrackerList& extract_trackers,
std::size_t download_size
)
{
// We schedule extractions for packages that don't need to be downloaded,
// because downloading a package already triggers its extraction.
for (auto it = extract_tasks.begin() + static_cast<std::ptrdiff_t>(download_size);
it != extract_tasks.end();
++it)
{
std::packaged_task task{ [=] { return it->run(); } };
extract_trackers.push_back(task.get_future());
MainExecutor::instance().schedule(std::move(task));
}
}
if (!downloaded)
bool trigger_download(
MultiDownloadRequest requests,
const Context& context,
DownloadOptions options,
PackageDownloadMonitor* monitor
)
{
auto result = download(std::move(requests), context, options, monitor);
bool all_downloaded = std::all_of(
result.begin(),
result.end(),
[](const auto& r) { return r; }
);
return all_downloaded;
}
bool clear_invalid_caches(const FetcherList& fetchers, ExtractTrackerList& trackers)
{
bool all_valid = true;
for (auto [fit, eit] = std::tuple{ fetchers.begin(), trackers.begin() };
eit != trackers.end();
++fit, ++eit)
{
PackageExtractTask::Result res = eit->get();
if (!res.valid || !res.extracted)
{
fit->clear_cache();
all_valid = false;
}
}
return all_valid;
}
}
bool MTransaction::fetch_extract_packages()
{
auto& ctx = m_pool.channel_context().context();
PackageFetcherSemaphore::set_max(ctx.threads_params.extract_threads);
FetcherList fetchers = build_fetchers(m_pool, m_solution, m_multi_cache);
auto download_end = std::partition(
fetchers.begin(),
fetchers.end(),
[](const auto& f) { return f.needs_download(); }
);
auto extract_end = std::partition(
download_end,
fetchers.end(),
[](const auto& f) { return f.needs_extract(); }
);
// At this point:
// - [fetchers.begin(), download_end) contains packages that need to be downloaded,
// validated and extracted
// - [download_end, extract_end) contains packages that need to be extracted only
// - [extract_end, fetchers.end()) contains packages already installed and extracted
auto download_size = static_cast<std::size_t>(std::distance(fetchers.begin(), download_end));
auto extract_size = static_cast<std::size_t>(std::distance(fetchers.begin(), extract_end));
ExtractTaskList extract_tasks = build_extract_tasks(ctx, fetchers, extract_size);
ExtractTrackerList extract_trackers;
extract_trackers.reserve(extract_tasks.size());
MultiDownloadRequest download_requests = build_download_requests(
fetchers,
extract_tasks,
extract_trackers,
download_size
);
std::unique_ptr<PackageDownloadMonitor> monitor = nullptr;
DownloadOptions download_options{ true, true };
if (PackageDownloadMonitor::can_monitor(ctx))
{
monitor = std::make_unique<PackageDownloadMonitor>();
monitor->observe(download_requests, extract_tasks, download_options);
}
schedule_remaining_extractions(extract_tasks, extract_trackers, download_size);
bool all_downloaded = trigger_download(
std::move(download_requests),
ctx,
download_options,
monitor.get()
);
if (!all_downloaded)
{
LOG_ERROR << "Download didn't finish!";
return false;
}
// make sure that all targets have finished extracting
while (!is_sig_interrupted())
// Blocks until all extraction are done
for (auto& task : extract_trackers)
{
bool all_finished = true;
for (const auto& t : targets)
{
if (!t->finished())
{
all_finished = false;
break;
}
}
if (all_finished)
{
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
task.wait();
}
if (!(ctx.graphics_params.no_progress_bars || ctx.output_params.json
|| ctx.output_params.quiet))
const bool all_valid = clear_invalid_caches(fetchers, extract_trackers);
// TODO: see if we can move this into the caller
if (!all_valid)
{
pbar_manager.terminate();
pbar_manager.clear_progress_bars();
throw std::runtime_error(std::string("Found incorrect downloads. Aborting"));
}
for (const auto& t : targets)
{
if (t->validation_result() != PackageDownloadExtractTarget::VALIDATION_RESULT::VALID
&& t->validation_result()
!= PackageDownloadExtractTarget::VALIDATION_RESULT::UNDEFINED)
{
t->clear_cache();
all_valid = false;
throw std::runtime_error(
std::string("Found incorrect download: ") + t->name() + ". Aborting"
);
}
}
return !is_sig_interrupted() && downloaded && all_valid;
return !is_sig_interrupted() && all_valid;
}
bool MTransaction::empty()

View File

@ -14,7 +14,7 @@
#include <openssl/evp.h>
#include "mamba/core/context.hpp"
#include "mamba/core/fetch.hpp"
#include "mamba/core/download.hpp"
#include "mamba/core/output.hpp"
#include "mamba/core/validate.hpp"
#include "mamba/util/string.hpp"
@ -1446,16 +1446,11 @@ namespace mamba::validation
const auto url = mamba::util::URL::parse(base_url + "/key_mgr.json");
auto dl_target = std::make_unique<mamba::DownloadTarget>(
context,
"key_mgr.json",
url.pretty_str(),
tmp_metadata_path.string()
);
if (dl_target->resource_exists())
if (check_resource_exists(url.pretty_str(), context))
{
if (dl_target->perform())
DownloadRequest request("key_mgr.json", url.pretty_str(), tmp_metadata_path.string());
DownloadResult res = download(std::move(request), context);
if (res)
{
KeyMgrRole key_mgr = create_key_mgr(tmp_metadata_path);
@ -1611,16 +1606,12 @@ namespace mamba::validation
const auto url = mamba::util::URL::parse(base_url + "/pkg_mgr.json");
auto dl_target = std::make_unique<mamba::DownloadTarget>(
context,
"pkg_mgr.json",
url.pretty_str(),
tmp_metadata_path.string()
);
if (dl_target->resource_exists())
if (check_resource_exists(url.pretty_str(), context))
{
if (dl_target->perform())
DownloadRequest request("pkg_mgr.json", url.pretty_str(), tmp_metadata_path.string());
DownloadResult res = download(std::move(request), context);
if (res)
{
PkgMgrRole pkg_mgr = create_pkg_mgr(tmp_metadata_path);
@ -2164,16 +2155,12 @@ namespace mamba::validation
auto url = ::mamba::util::concat(m_base_url, "/", f.string());
tmp_file_path = tmp_dir_path / f;
auto dl_target = std::make_unique<mamba::DownloadTarget>(
m_context,
f.string(),
url,
tmp_file_path.string()
);
if (dl_target->resource_exists())
if (check_resource_exists(url, m_context))
{
if (dl_target->perform())
DownloadRequest request(f.string(), url, tmp_file_path.string());
DownloadResult res = download(std::move(request), m_context);
if (res)
{
break;
}

View File

@ -7,7 +7,6 @@ __all__ = [
"ChannelPriority",
"CompressedProblemsGraph",
"Context",
"DownloadTargetList",
"ExtraPkgInfo",
"History",
"Key",
@ -19,8 +18,6 @@ __all__ = [
"MAMBA_CLEAN_LOCKS",
"MAMBA_CLEAN_PKGS",
"MAMBA_CLEAN_TARBALLS",
"MAMBA_DOWNLOAD_FAILFAST",
"MAMBA_DOWNLOAD_SORT",
"MAMBA_FORCE_REINSTALL",
"MAMBA_NO_DEPS",
"MAMBA_ONLY_DEPS",
@ -829,11 +826,6 @@ class Context:
pass
pass
class DownloadTargetList:
def __init__(self) -> None: ...
def download(self, arg0: int) -> bool: ...
pass
class ExtraPkgInfo:
def __init__(self) -> None: ...
@property
@ -1675,8 +1667,6 @@ MAMBA_CLEAN_INDEX = 2
MAMBA_CLEAN_LOCKS = 16
MAMBA_CLEAN_PKGS = 4
MAMBA_CLEAN_TARBALLS = 8
MAMBA_DOWNLOAD_FAILFAST = 1
MAMBA_DOWNLOAD_SORT = 2
MAMBA_FORCE_REINSTALL = 4
MAMBA_NO_DEPS = 1
MAMBA_ONLY_DEPS = 2

View File

@ -14,6 +14,7 @@
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <pybind11/stl_bind.h>
#include <solv/solver.h>
#include "mamba/api/clean.hpp"
#include "mamba/api/configuration.hpp"
@ -21,7 +22,6 @@
#include "mamba/core/context.hpp"
#include "mamba/core/download_progress_bar.hpp"
#include "mamba/core/execution.hpp"
#include "mamba/core/fetch.hpp"
#include "mamba/core/output.hpp"
#include "mamba/core/package_handling.hpp"
#include "mamba/core/pool.hpp"
@ -208,14 +208,14 @@ namespace mambapy
bool download()
{
using namespace mamba;
// TODO: expose DownloadProgressBar to libmambapy and remove this
// TODO: expose SubdirDataMonitor to libmambapy and remove this
// logic
Context& ctx = mambapy::singletons.context();
expected_t<void> download_res;
if (DownloadProgressBar::can_monitor(ctx))
if (SubdirDataMonitor::can_monitor(ctx))
{
DownloadProgressBar check_monitor({ true, true });
DownloadProgressBar index_monitor;
SubdirDataMonitor check_monitor({ true, true });
SubdirDataMonitor index_monitor;
download_res = MSubdirData::download_indexes(
m_subdirs,
ctx,
@ -266,10 +266,7 @@ PYBIND11_MODULE(bindings, m)
auto pyPackageInfo = py::class_<PackageInfo>(m, "PackageInfo");
auto pyPrefixData = py::class_<PrefixData>(m, "PrefixData");
auto pySolver = py::class_<MSolver>(m, "Solver");
auto pyMultiDownloadTarget = py::class_<MultiDownloadTarget, std::unique_ptr<MultiDownloadTarget>>(
m,
"DownloadTargetList"
);
// only used in a return type; does it belong in the module?
auto pyRootRole = py::class_<validation::RootRole>(m, "RootRole");
@ -663,12 +660,6 @@ PYBIND11_MODULE(bindings, m)
m.def("cache_fn_url", &cache_fn_url);
m.def("create_cache_dir", &create_cache_dir);
pyMultiDownloadTarget
.def(py::init(
[] { return std::make_unique<MultiDownloadTarget>(mambapy::singletons.context()); }
))
.def("download", &MultiDownloadTarget::download);
py::enum_<ChannelPriority>(m, "ChannelPriority")
.value("Flexible", ChannelPriority::Flexible)
.value("Strict", ChannelPriority::Strict)
@ -1330,10 +1321,6 @@ PYBIND11_MODULE(bindings, m)
m.attr("MAMBA_ONLY_DEPS") = PY_MAMBA_ONLY_DEPS;
m.attr("MAMBA_FORCE_REINSTALL") = PY_MAMBA_FORCE_REINSTALL;
// DOWNLOAD FLAGS
m.attr("MAMBA_DOWNLOAD_FAILFAST") = MAMBA_DOWNLOAD_FAILFAST;
m.attr("MAMBA_DOWNLOAD_SORT") = MAMBA_DOWNLOAD_SORT;
// CLEAN FLAGS
m.attr("MAMBA_CLEAN_ALL") = MAMBA_CLEAN_ALL;
m.attr("MAMBA_CLEAN_INDEX") = MAMBA_CLEAN_INDEX;

View File

@ -6,6 +6,8 @@
#include <string>
#include <solv/solver.h>
#include "mamba/api/configuration.hpp"
#include "mamba/api/create.hpp"
#include "mamba/api/remove.hpp"

View File

@ -4,6 +4,8 @@
//
// The full license is in the file LICENSE, distributed with this software.
#include <solv/solver.h>
#include "mamba/api/configuration.hpp"
#include "mamba/api/repoquery.hpp"

View File

@ -7,6 +7,7 @@
#include <fmt/color.h>
#include <fmt/format.h>
#include <reproc++/run.hpp>
#include <solv/solver.h>
#include "mamba/api/channel_loader.hpp"
#include "mamba/api/configuration.hpp"