* Refactored Subdir Metadata

* Plugged downloader into subdirdata

* Simplified Donwloader APIs

* Added DownloadMonitor and plugged it into channel_loader

* Simplified subdir API

* Fixed python bindings

* Changes according to review (on going)

* Added json parsers to subdirmetadata
This commit is contained in:
Johan Mabille 2023-09-29 16:19:14 +02:00 committed by GitHub
parent 146827c0eb
commit 474be8b9b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1667 additions and 1145 deletions

View File

@ -147,6 +147,7 @@ set(LIBMAMBA_SOURCES
${LIBMAMBA_SOURCE_DIR}/core/channel.cpp
${LIBMAMBA_SOURCE_DIR}/core/context.cpp
${LIBMAMBA_SOURCE_DIR}/core/download.cpp
${LIBMAMBA_SOURCE_DIR}/core/download_progress_bar.cpp
${LIBMAMBA_SOURCE_DIR}/core/environment.cpp
${LIBMAMBA_SOURCE_DIR}/core/environments_manager.cpp
${LIBMAMBA_SOURCE_DIR}/core/error_handling.cpp
@ -209,6 +210,7 @@ endforeach()
set(LIBMAMBA_PUBLIC_HEADERS
${LIBMAMBA_INCLUDE_DIR}/mamba/version.hpp
# Utility library
${LIBMAMBA_INCLUDE_DIR}/mamba/util/json.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/util/deprecation.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/util/build.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/util/cast.hpp
@ -238,6 +240,7 @@ set(LIBMAMBA_PUBLIC_HEADERS
${LIBMAMBA_INCLUDE_DIR}/mamba/core/palette.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/context.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/download.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/download_progress_bar.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/environment.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/environments_manager.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/error_handling.hpp

View File

@ -15,6 +15,7 @@
#include <tl/expected.hpp>
#include "mamba/core/context.hpp"
#include "mamba/core/error_handling.hpp"
namespace mamba
{
@ -23,7 +24,7 @@ namespace mamba
int http_status = 0;
std::string effective_url = "";
std::size_t downloaded_size = 0;
std::size_t average_speed = 0;
std::size_t average_speed_Bps = 0;
};
struct DownloadSuccess
@ -48,6 +49,7 @@ namespace mamba
{
std::size_t downloaded_size = 0;
std::size_t total_to_download = 0;
std::size_t speed_Bps = 0;
};
using DownloadEvent = std::variant<DownloadProgress, DownloadError, DownloadSuccess>;
@ -57,7 +59,7 @@ namespace mamba
using progress_callback_t = std::function<void(const DownloadEvent&)>;
// TODO: remove these functions when we plug a library with continuation
using on_success_callback_t = std::function<bool(const DownloadSuccess&)>;
using on_success_callback_t = std::function<expected_t<void>(const DownloadSuccess&)>;
using on_failure_callback_t = std::function<void(const DownloadError&)>;
std::string name;
@ -66,8 +68,8 @@ namespace mamba
bool head_only;
bool ignore_failure;
std::optional<std::size_t> expected_size = std::nullopt;
std::optional<std::string> if_none_match = std::nullopt;
std::optional<std::string> if_modified_since = std::nullopt;
std::optional<std::string> etag = std::nullopt;
std::optional<std::string> last_modified = std::nullopt;
std::optional<progress_callback_t> progress = std::nullopt;
std::optional<on_success_callback_t> on_success = std::nullopt;
@ -82,29 +84,53 @@ namespace mamba
);
};
using DownloadRequestList = std::vector<DownloadRequest>;
struct MultiDownloadRequest
{
DownloadRequestList requests;
};
using MultiDownloadRequest = std::vector<DownloadRequest>;
using DownloadResult = tl::expected<DownloadSuccess, DownloadError>;
using DownloadResultList = std::vector<DownloadResult>;
struct MultiDownloadResult
{
DownloadResultList results;
};
using MultiDownloadResult = std::vector<DownloadResult>;
struct DownloadOptions
{
using termination_function = std::optional<std::function<void()>>;
bool fail_fast = false;
bool sort = true;
termination_function on_unexpected_termination = std::nullopt;
};
MultiDownloadResult
download(MultiDownloadRequest requests, const Context& context, DownloadOptions options = {});
class DownloadMonitor
{
public:
virtual ~DownloadMonitor() = default;
DownloadMonitor(const DownloadMonitor&) = delete;
DownloadMonitor& operator=(const DownloadMonitor&) = delete;
DownloadMonitor(DownloadMonitor&&) = delete;
DownloadMonitor& operator=(DownloadMonitor&&) = delete;
void observe(MultiDownloadRequest& requests, DownloadOptions& options);
void on_done();
void on_unexpected_termination();
protected:
DownloadMonitor() = default;
private:
virtual void observe_impl(MultiDownloadRequest& requests, DownloadOptions& options) = 0;
virtual void on_done_impl() = 0;
virtual void on_unexpected_termination_impl() = 0;
};
MultiDownloadResult download(
MultiDownloadRequest requests,
const Context& context,
DownloadOptions options = {},
DownloadMonitor* monitor = nullptr
);
}
#endif

View File

@ -0,0 +1,65 @@
// Copyright (c) 2019, QuantStack and Mamba Contributors
//
// Distributed under the terms of the BSD 3-Clause License.
//
// The full license is in the file LICENSE, distributed with this software.
#ifndef MAMBA_CORE_DOWNLOAD_PROGRESS_BAR_HPP
#define MAMBA_CORE_DOWNLOAD_PROGRESS_BAR_HPP
#include <chrono>
#include <functional>
#include <vector>
#include "mamba/core/context.hpp"
#include "mamba/core/download.hpp"
namespace mamba
{
struct MonitorOptions
{
bool checking_download = false;
bool no_clear_progress_bar = false;
};
class DownloadProgressBar : public DownloadMonitor
{
public:
static bool can_monitor(const Context& context);
explicit DownloadProgressBar(MonitorOptions options = {});
virtual ~DownloadProgressBar() = default;
DownloadProgressBar(const DownloadProgressBar&) = delete;
DownloadProgressBar& operator=(const DownloadProgressBar&) = delete;
DownloadProgressBar(DownloadProgressBar&&) = delete;
DownloadProgressBar& operator=(DownloadProgressBar&&) = delete;
void reset_options(MonitorOptions options);
private:
void observe_impl(MultiDownloadRequest& requests, DownloadOptions& options) override;
void on_done_impl() override;
void on_unexpected_termination_impl() override;
void update_progress_bar(std::size_t index, const DownloadEvent& event);
void update_progress_bar(std::size_t index, const DownloadProgress& progress);
void update_progress_bar(std::size_t index, const DownloadError& error);
void update_progress_bar(std::size_t index, const DownloadSuccess& success);
void complete_checking_progress_bar(std::size_t index);
std::function<void(ProgressBarRepr&)> download_repr(std::size_t index);
using time_point = std::chrono::steady_clock::time_point;
std::vector<time_point> m_throttle_time;
std::vector<ProgressProxy> m_progress_bar;
MonitorOptions m_options;
};
}
#endif

View File

@ -11,10 +11,12 @@
#include <regex>
#include <string>
#include <nlohmann/json_fwd.hpp>
#include "mamba/core/channel.hpp"
#include "mamba/core/context.hpp"
#include "mamba/core/download.hpp"
#include "mamba/core/error_handling.hpp"
#include "mamba/core/fetch.hpp"
#include "mamba/core/mamba_fs.hpp"
#include "mamba/core/package_cache.hpp"
#include "mamba/core/pool.hpp"
@ -25,46 +27,70 @@
namespace mamba
{
struct subdir_metadata
class DownloadMonitor;
class MSubdirMetadata
{
struct checked_at
public:
struct HttpMetadata
{
std::string url;
std::string etag;
std::string last_modified;
std::string cache_control;
};
using expected_subdir_metadata = tl::expected<MSubdirMetadata, mamba_error>;
static expected_subdir_metadata read(const fs::u8path& file);
void write(const fs::u8path& file);
bool check_valid_metadata(const fs::u8path& file);
const std::string& url() const;
const std::string& etag() const;
const std::string& last_modified() const;
const std::string& cache_control() const;
bool has_zst() const;
void store_http_metadata(HttpMetadata data);
void store_file_metadata(const fs::u8path& file);
void set_zst(bool value);
private:
static expected_subdir_metadata
from_state_file(const fs::u8path& state_file, const fs::u8path& repodata_file);
static expected_subdir_metadata from_repodata_file(const fs::u8path& json);
#ifdef _WIN32
using time_type = std::chrono::system_clock::time_point;
#else
using time_type = fs::file_time_type;
#endif
HttpMetadata m_http;
time_type m_stored_mtime;
std::size_t m_stored_file_size;
struct CheckedAt
{
bool value;
std::time_t last_checked;
bool has_expired() const
{
// difference in seconds, check every 14 days
return std::difftime(std::time(nullptr), last_checked) > 60 * 60 * 24 * 14;
}
bool has_expired() const;
};
static tl::expected<subdir_metadata, mamba_error> from_stream(std::istream& in);
std::optional<CheckedAt> m_has_zst;
std::string url;
std::string etag;
std::string mod;
std::string cache_control;
#ifdef _WIN32
std::chrono::system_clock::time_point stored_mtime;
#else
fs::file_time_type stored_mtime;
#endif
std::size_t stored_file_size;
std::optional<checked_at> has_zst;
std::optional<checked_at> has_bz2;
std::optional<checked_at> has_jlap;
friend void to_json(nlohmann::json& j, const CheckedAt& ca);
friend void from_json(const nlohmann::json& j, CheckedAt& ca);
void store_file_metadata(const fs::u8path& path);
bool check_valid_metadata(const fs::u8path& path);
void serialize_to_stream(std::ostream& out) const;
void serialize_to_stream_tiny(std::ostream& out) const;
bool check_zst(ChannelContext& channel_context, const Channel* channel);
friend void to_json(nlohmann::json& j, const MSubdirMetadata& data);
friend void from_json(const nlohmann::json& j, MSubdirMetadata& data);
};
/**
* Represents a channel subdirectory (i.e. a platform)
* packages index. Handles downloading of the index
@ -88,27 +114,24 @@ namespace mamba
MSubdirData(const MSubdirData&) = delete;
MSubdirData& operator=(const MSubdirData&) = delete;
MSubdirData(MSubdirData&&);
MSubdirData& operator=(MSubdirData&&);
MSubdirData(MSubdirData&&) = default;
MSubdirData& operator=(MSubdirData&&) = default;
// TODO return seconds as double
fs::file_time_type::duration
check_cache(const fs::u8path& cache_file, const fs::file_time_type::clock::time_point& ref) const;
bool loaded() const;
bool forbid_cache();
bool is_noarch() const;
bool is_loaded() const;
void clear_cache();
expected_t<std::string> cache_path() const;
const std::string& name() const;
expected_t<std::string> cache_path() const;
std::vector<std::unique_ptr<DownloadTarget>>& check_targets();
DownloadTarget* target();
static expected_t<void> download_indexes(
std::vector<MSubdirData>& subdirs,
const Context& context,
DownloadMonitor* check_monitor = nullptr,
DownloadMonitor* download_monitor = nullptr
);
bool finalize_check(const DownloadTarget& target);
bool finalize_transfer(const DownloadTarget& target);
void finalize_checks();
expected_t<MRepo> create_repo(MPool& pool);
expected_t<MRepo> create_repo(MPool& pool) const;
private:
@ -121,15 +144,18 @@ namespace mamba
const std::string& repodata_fn = "repodata.json"
);
bool load(MultiPackageCache& caches, ChannelContext& channel_context);
void check_repodata_existence();
void create_target();
std::size_t get_cache_control_max_age(const std::string& val);
void load(MultiPackageCache& caches, ChannelContext& channel_context, const Channel& channel);
void load_cache(MultiPackageCache& caches, ChannelContext& channel_context);
void update_metadata_zst(ChannelContext& context, const Channel& channel);
MultiDownloadRequest build_check_requests();
DownloadRequest build_index_request();
expected_t<void> use_existing_cache();
expected_t<void> finalize_transfer(MSubdirMetadata::HttpMetadata http_data);
void refresh_last_write_time(const fs::u8path& json_file, const fs::u8path& solv_file);
std::unique_ptr<DownloadTarget> m_target = nullptr;
std::vector<std::unique_ptr<DownloadTarget>> m_check_targets;
bool m_loaded = false;
bool m_json_cache_valid = false;
bool m_solv_cache_valid = false;
@ -137,20 +163,15 @@ namespace mamba
fs::u8path m_expired_cache_path;
fs::u8path m_writable_pkgs_dir;
ProgressProxy m_progress_bar;
ProgressProxy m_progress_bar_check;
bool m_loaded;
bool m_download_complete;
std::string m_repodata_url;
std::string m_name;
std::string m_json_fn;
std::string m_solv_fn;
bool m_is_noarch;
subdir_metadata m_metadata;
MSubdirMetadata m_metadata;
std::unique_ptr<TemporaryFile> m_temp_file;
const Channel* p_channel = nullptr;
Context* p_context = nullptr;
Context* p_context;
};
// Contrary to conda original function, this one expects a full url

View File

@ -0,0 +1,59 @@
// Copyright (c) 2019, QuantStack and Mamba Contributors
//
// Distributed under the terms of the BSD 3-Clause License.
//
// The full license is in the file LICENSE, distributed with this software.
#ifndef MAMBA_UTIL_JSON_HPP
#define MAMBA_UTIL_JSON_HPP
#include <nlohmann/json.hpp>
NLOHMANN_JSON_NAMESPACE_BEGIN
template <typename T>
struct adl_serializer<std::optional<T>>
{
static void to_json(json& j, const std::optional<T>& opt)
{
if (opt.has_value())
{
j = opt.value();
}
else
{
j = nullptr;
}
}
static void from_json(const json& j, std::optional<T>& opt)
{
if (!j.is_null())
{
opt = j.template get<T>();
}
else
{
opt = std::nullopt;
}
}
};
NLOHMANN_JSON_NAMESPACE_END
namespace mamba::util
{
template <typename Json, std::size_t N, typename T>
void deserialize_maybe_missing(Json&& j, const char (&name)[N], T& t)
{
if (j.contains(name))
{
t = std::forward<Json>(j)[name].template get<T>();
}
else
{
t = {};
}
}
}
#endif

View File

@ -58,6 +58,11 @@ namespace mamba::util
*/
[[nodiscard]] auto url_get_scheme(std::string_view url) -> std::string_view;
/*
* Return true if @p url is a file URI, i.e. if it starts with "file://".
*/
[[nodiscard]] auto is_file_uri(std::string_view url) -> bool;
/**
* Retrun true if @p url starts with a URL scheme.
*/

View File

@ -6,11 +6,12 @@
#include "mamba/api/channel_loader.hpp"
#include "mamba/core/channel.hpp"
#include "mamba/core/download.hpp"
#include "mamba/core/download_progress_bar.hpp"
#include "mamba/core/output.hpp"
#include "mamba/core/prefix_data.hpp"
#include "mamba/core/repo.hpp"
#include "mamba/core/subdirdata.hpp"
#include "mamba/core/thread_utils.hpp"
#include "mamba/util/string.hpp"
namespace mamba
@ -53,7 +54,6 @@ namespace mamba
std::vector<std::string> channel_urls = ctx.channels;
std::vector<MSubdirData> subdirs;
MultiDownloadTarget multi_dl{ ctx };
std::vector<std::pair<int, int>> priorities;
int max_prio = static_cast<int>(channel_urls.size());
@ -99,42 +99,26 @@ namespace mamba
}
}
for (auto& subdir : subdirs)
expected_t<void> download_res;
if (DownloadProgressBar::can_monitor(ctx))
{
for (auto& check_target : subdir.check_targets())
{
multi_dl.add(check_target.get());
}
DownloadProgressBar check_monitor({ true, true });
DownloadProgressBar index_monitor;
download_res = MSubdirData::download_indexes(subdirs, ctx, &check_monitor, &index_monitor);
}
else
{
download_res = MSubdirData::download_indexes(subdirs, ctx);
}
multi_dl.download(MAMBA_NO_CLEAR_PROGRESS_BARS);
if (is_sig_interrupted())
if (!download_res)
{
error_list.push_back(mamba_error("Interrupted by user", mamba_error_code::user_interrupted)
);
return tl::unexpected(mamba_aggregated_error(std::move(error_list)));
}
for (auto& subdir : subdirs)
{
if (!subdir.check_targets().empty())
mamba_error error = download_res.error();
mamba_error_code ec = error.error_code();
error_list.push_back(std::move(error));
if (ec == mamba_error_code::user_interrupted)
{
// recreate final download target in case HEAD requests succeeded
subdir.finalize_checks();
}
multi_dl.add(subdir.target());
}
// TODO load local channels even when offline if (!ctx.offline)
if (!ctx.offline)
{
try
{
multi_dl.download(MAMBA_DOWNLOAD_FAILFAST);
}
catch (const std::runtime_error& e)
{
error_list.push_back(mamba_error(e.what(), mamba_error_code::repodata_not_loaded));
return tl::unexpected(mamba_aggregated_error(std::move(error_list)));
}
}
@ -151,9 +135,9 @@ namespace mamba
for (std::size_t i = 0; i < subdirs.size(); ++i)
{
auto& subdir = subdirs[i];
if (!subdir.loaded())
if (!subdir.is_loaded())
{
if (!ctx.offline && util::ends_with(subdir.name(), "/noarch"))
if (!ctx.offline && subdir.is_noarch())
{
error_list.push_back(mamba_error(
"Subdir " + subdir.name() + " not loaded!",

View File

@ -243,7 +243,7 @@ namespace mamba
// Set error buffer
m_errorbuffer[0] = '\0';
set_opt(CURLOPT_ERRORBUFFER, m_errorbuffer);
set_opt(CURLOPT_ERRORBUFFER, m_errorbuffer.data());
}
CURLHandle::CURLHandle(CURLHandle&& rhs)
@ -254,6 +254,7 @@ namespace mamba
rhs.p_headers = nullptr;
std::swap(m_errorbuffer, rhs.m_errorbuffer);
std::swap(m_result, rhs.m_result);
set_opt(CURLOPT_ERRORBUFFER, m_errorbuffer.data());
}
CURLHandle& CURLHandle::operator=(CURLHandle&& rhs)
@ -263,6 +264,8 @@ namespace mamba
swap(m_result, rhs.m_result);
swap(p_headers, rhs.p_headers);
swap(m_errorbuffer, rhs.m_errorbuffer);
set_opt(CURLOPT_ERRORBUFFER, m_errorbuffer.data());
rhs.set_opt(CURLOPT_ERRORBUFFER, rhs.m_errorbuffer.data());
return *this;
}
@ -448,7 +451,7 @@ namespace mamba
const char* CURLHandle::get_error_buffer() const
{
return m_errorbuffer;
return m_errorbuffer.data();
}
std::string CURLHandle::get_curl_effective_url() const
@ -661,4 +664,14 @@ namespace mamba
return static_cast<std::size_t>(numfds);
}
std::size_t CURLMultiHandle::poll(size_t timeout)
{
int numfds = 0;
CURLMcode code = curl_multi_poll(p_handle, NULL, 0, static_cast<int>(timeout), &numfds);
if (code != CURLM_OK)
{
throw std::runtime_error(curl_multi_strerror(code));
}
return static_cast<std::size_t>(numfds);
}
} // namespace mamba

View File

@ -104,9 +104,13 @@ namespace mamba
public:
CURLHandle();
~CURLHandle();
CURLHandle(const CURLHandle&) = delete;
CURLHandle& operator=(const CURLHandle&) = delete;
CURLHandle(CURLHandle&& rhs);
CURLHandle& operator=(CURLHandle&& rhs);
~CURLHandle();
const std::pair<std::string_view, CurlLogLevel> get_ssl_backend_info();
@ -159,7 +163,7 @@ namespace mamba
CURL* m_handle;
CURLcode m_result; // Enum range from 0 to 99
curl_slist* p_headers = nullptr;
char m_errorbuffer[CURL_ERROR_SIZE];
std::array<char, CURL_ERROR_SIZE> m_errorbuffer;
friend CURL* unwrap(const CURLHandle&);
};
@ -183,6 +187,9 @@ namespace mamba
explicit CURLMultiHandle(std::size_t max_parallel_downloads);
~CURLMultiHandle();
CURLMultiHandle(const CURLMultiHandle&) = delete;
CURLMultiHandle& operator=(const CURLMultiHandle&) = delete;
CURLMultiHandle(CURLMultiHandle&&);
CURLMultiHandle& operator=(CURLMultiHandle&&);
@ -193,6 +200,7 @@ namespace mamba
response_type pop_message();
std::size_t get_timeout(std::size_t max_timeout = 1000u) const;
std::size_t wait(std::size_t timeout);
std::size_t poll(std::size_t timeout);
private:

View File

@ -1,8 +1,13 @@
#include "mamba/core/download.hpp"
#include "mamba/core/invoke.hpp"
#include "mamba/core/thread_utils.hpp"
#include "mamba/core/util.hpp"
#include "mamba/core/util_scope.hpp"
#include "mamba/util/build.hpp"
#include "mamba/util/iterator.hpp"
#include "mamba/util/string.hpp"
#include "mamba/util/url.hpp"
#include "mamba/util/url_manip.hpp"
#include "curl.hpp"
#include "download_impl.hpp"
@ -10,17 +15,91 @@
namespace mamba
{
namespace
{
constexpr std::array<const char*, 6> cert_locations{
"/etc/ssl/certs/ca-certificates.crt", // Debian/Ubuntu/Gentoo etc.
"/etc/pki/tls/certs/ca-bundle.crt", // Fedora/RHEL 6
"/etc/ssl/ca-bundle.pem", // OpenSUSE
"/etc/pki/tls/cacert.pem", // OpenELEC
"/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", // CentOS/RHEL 7
"/etc/ssl/cert.pem", // Alpine Linux
};
void init_remote_fetch_params(Context::RemoteFetchParams& remote_fetch_params)
{
if (!remote_fetch_params.curl_initialized)
{
if (remote_fetch_params.ssl_verify == "<false>")
{
LOG_DEBUG << "'ssl_verify' not activated, skipping cURL SSL init";
remote_fetch_params.curl_initialized = true;
return;
}
#ifdef LIBMAMBA_STATIC_DEPS
CURLHandle handle;
auto init_res = handle.get_ssl_backend_info();
switch (init_res.second)
{
case CurlLogLevel::kInfo:
{
LOG_INFO << init_res.first;
break;
}
case CurlLogLevel::kWarning:
{
LOG_WARNING << init_res.first;
break;
}
case CurlLogLevel::kError:
{
LOG_ERROR << init_res.first;
break;
}
}
#endif
if (!remote_fetch_params.ssl_verify.size()
&& std::getenv("REQUESTS_CA_BUNDLE") != nullptr)
{
remote_fetch_params.ssl_verify = std::getenv("REQUESTS_CA_BUNDLE");
LOG_INFO << "Using REQUESTS_CA_BUNDLE " << remote_fetch_params.ssl_verify;
}
else if (remote_fetch_params.ssl_verify == "<system>" && util::on_linux)
{
bool found = false;
for (const auto& loc : cert_locations)
{
if (fs::exists(loc))
{
remote_fetch_params.ssl_verify = loc;
found = true;
}
}
if (!found)
{
const std::string msg = "No CA certificates found on system, aborting";
LOG_ERROR << msg;
throw mamba_error(msg, mamba_error_code::openssl_failed);
}
}
remote_fetch_params.curl_initialized = true;
}
}
}
/**********************************
* DownloadAttempt implementation *
**********************************/
DownloadAttempt::DownloadAttempt(const DownloadRequest& request)
: p_request(&request)
, p_stream(nullptr)
{
p_stream = make_compression_stream(
p_request->url,
[this](char* in, std::size_t size) { return this->write_data(in, size); }
);
m_retry_wait_seconds = std::size_t(0);
}
@ -31,6 +110,10 @@ namespace mamba
on_failure_callback error
)
{
p_stream = make_compression_stream(
p_request->url,
[this](char* in, std::size_t size) { return this->write_data(in, size); }
);
m_retry_wait_seconds = static_cast<std::size_t>(context.remote_fetch_params.retry_timeout);
configure_handle(context);
downloader.add_handle(m_handle);
@ -44,7 +127,8 @@ namespace mamba
{
bool is_http_status_ok(int http_status)
{
return http_status / 100 == 2;
// Note: http_status == 0 for files
return http_status / 100 == 2 || http_status == 304 || http_status == 0;
}
}
@ -161,7 +245,7 @@ namespace mamba
|| (ssl_no_revoke_env != "0");
m_handle.configure_handle(
p_request->url,
util::file_uri_unc2_to_unc4(p_request->url),
set_low_speed_opt,
context.remote_fetch_params.connect_timeout_secs,
set_ssl_no_revoke,
@ -231,14 +315,14 @@ namespace mamba
}
}
if (p_request->if_none_match.has_value())
if (p_request->etag.has_value())
{
m_handle.add_header("If-None-Match:" + p_request->if_none_match.value());
m_handle.add_header("If-None-Match:" + p_request->etag.value());
}
if (p_request->if_modified_since.has_value())
if (p_request->last_modified.has_value())
{
m_handle.add_header("If-Modified-Since:" + p_request->if_modified_since.value());
m_handle.add_header("If-Modified-Since:" + p_request->last_modified.value());
}
m_handle.set_opt_header();
@ -309,6 +393,7 @@ namespace mamba
s->m_last_modified = value;
}
}
return buffer_size;
}
@ -326,9 +411,13 @@ namespace mamba
)
{
auto* self = reinterpret_cast<DownloadAttempt*>(f);
self->p_request->progress.value()(DownloadProgress{
static_cast<std::size_t>(total_to_download),
static_cast<std::size_t>(now_downloaded) });
const auto speed_Bps = self->m_handle.get_info<std::size_t>(CURLINFO_SPEED_DOWNLOAD_T)
.value_or(0);
const size_t total = total_to_download ? static_cast<std::size_t>(total_to_download)
: self->p_request->expected_size.value_or(0);
self->p_request->progress.value()(
DownloadProgress{ static_cast<std::size_t>(now_downloaded), total, speed_Bps }
);
return 0;
}
@ -355,10 +444,16 @@ namespace mamba
TransferData DownloadAttempt::get_transfer_data() const
{
// Curl transforms file URI like file:///C/something into file://C/something, which
// may lead to wrong comparisons later. When the URL is a file URI, we know there is
// no redirection and we can use the input URL as the effective URL.
std::string url = util::is_file_uri(p_request->url)
? p_request->url
: m_handle.get_info<char*>(CURLINFO_EFFECTIVE_URL).value();
return {
/* .http_status = */ m_handle.get_info<int>(CURLINFO_RESPONSE_CODE)
.value_or(http::ARBITRARY_ERROR),
/* .effective_url = */ m_handle.get_info<char*>(CURLINFO_EFFECTIVE_URL).value(),
/* .effective_url = */ std::move(url),
/* .dwonloaded_size = */ m_handle.get_info<std::size_t>(CURLINFO_SIZE_DOWNLOAD_T).value_or(0),
/* .average_speed = */ m_handle.get_info<std::size_t>(CURLINFO_SPEED_DOWNLOAD_T).value_or(0)
};
@ -396,7 +491,7 @@ namespace mamba
DownloadSuccess DownloadAttempt::build_download_success(TransferData data) const
{
return { /*.filename = */ p_request->filename,
/*.trnasfer = */ std::move(data),
/*.transfer = */ std::move(data),
/*.cache_control = */ m_cache_control,
/*.etag = */ m_etag,
/*.last_modified = */ m_last_modified };
@ -419,6 +514,7 @@ namespace mamba
auto DownloadTracker::prepare_new_attempt(CURLMultiHandle& handle, const Context& context)
-> completion_map_entry
{
m_state = DownloadState::PREPARING;
m_next_retry = std::nullopt;
CURLId id = m_attempt.prepare_download(
@ -426,9 +522,9 @@ namespace mamba
context,
[this](DownloadSuccess res)
{
bool finalize_res = invoke_on_success(res);
set_state(finalize_res);
throw_if_required(res);
expected_t<void> finalize_res = invoke_on_success(res);
set_state(finalize_res.has_value());
throw_if_required(finalize_res);
save(std::move(res));
return is_waiting();
},
@ -451,25 +547,31 @@ namespace mamba
|| m_next_retry.value() < std::chrono::steady_clock::now());
}
void DownloadTracker::set_transfer_started()
{
m_state = DownloadState::RUNNING;
}
const DownloadResult& DownloadTracker::get_result() const
{
return m_attempt_results.back();
}
bool DownloadTracker::invoke_on_success(const DownloadSuccess& res) const
expected_t<void> DownloadTracker::invoke_on_success(const DownloadSuccess& res) const
{
if (p_request->on_success.has_value())
{
return p_request->on_success.value()(res);
auto ret = safe_invoke(p_request->on_success.value(), res);
return ret.has_value() ? ret.value() : forward_error(ret);
}
return true;
return expected_t<void>();
}
void DownloadTracker::invoke_on_failure(const DownloadError& res) const
{
if (p_request->on_failure.has_value())
{
p_request->on_failure.value()(res);
safe_invoke(p_request->on_failure.value(), res);
}
}
@ -518,18 +620,11 @@ namespace mamba
}
}
void DownloadTracker::throw_if_required(const DownloadSuccess& res)
void DownloadTracker::throw_if_required(const expected_t<void>& res)
{
if (m_state == DownloadState::FAILED && !p_request->ignore_failure && m_options.fail_fast)
{
throw std::runtime_error(
"Multi-download failed. Reason: "
+ build_transfer_message(
res.transfer.http_status,
res.transfer.effective_url,
res.transfer.downloaded_size
)
);
throw res.error();
}
}
@ -567,20 +662,20 @@ namespace mamba
if (m_options.sort)
{
std::sort(
m_requests.requests.begin(),
m_requests.requests.end(),
m_requests.begin(),
m_requests.end(),
[](const DownloadRequest& a, const DownloadRequest& b) -> bool
{ return a.expected_size.value_or(SIZE_MAX) > b.expected_size.value_or(SIZE_MAX); }
);
}
m_trackers.reserve(m_requests.requests.size());
m_trackers.reserve(m_requests.size());
std::size_t max_retries = static_cast<std::size_t>(context.remote_fetch_params.max_retries);
DownloadTrackerOptions tracker_options{ max_retries, options.fail_fast };
std::transform(
m_requests.requests.begin(),
m_requests.requests.end(),
std::inserter(m_trackers, m_trackers.begin()),
m_requests.begin(),
m_requests.end(),
std::back_inserter(m_trackers),
[tracker_options](const DownloadRequest& req)
{ return DownloadTracker(req, tracker_options); }
);
@ -591,6 +686,11 @@ namespace mamba
{
while (!download_done())
{
if (is_sig_interrupted())
{
invoke_unexpected_termination();
break;
}
prepare_next_downloads();
update_downloads();
}
@ -614,6 +714,7 @@ namespace mamba
);
if (success)
{
tracker.set_transfer_started();
++running_attempts;
}
}
@ -622,6 +723,12 @@ namespace mamba
void Downloader::update_downloads()
{
std::size_t still_running = m_curl_handle.perform();
if (still_running == m_waiting_count)
{
m_curl_handle.wait(m_curl_handle.get_timeout());
}
while (auto resp = m_curl_handle.pop_message())
{
const auto& msg = resp.value();
@ -658,7 +765,7 @@ namespace mamba
MultiDownloadResult Downloader::build_result() const
{
DownloadResultList result;
MultiDownloadResult result;
result.reserve(m_trackers.size());
std::transform(
m_trackers.begin(),
@ -666,7 +773,15 @@ namespace mamba
std::inserter(result, result.begin()),
[](const DownloadTracker& tracker) { return tracker.get_result(); }
);
return { result };
return result;
}
void Downloader::invoke_unexpected_termination() const
{
if (m_options.on_unexpected_termination.has_value())
{
safe_invoke(m_options.on_unexpected_termination.value());
}
}
/*****************************
@ -688,10 +803,48 @@ namespace mamba
{
}
MultiDownloadResult
download(MultiDownloadRequest requests, const Context& context, DownloadOptions options)
void DownloadMonitor::observe(MultiDownloadRequest& requests, DownloadOptions& options)
{
Downloader dl(std::move(requests), std::move(options), context);
return dl.download();
observe_impl(requests, options);
}
void DownloadMonitor::on_done()
{
on_done_impl();
}
void DownloadMonitor::on_unexpected_termination()
{
on_done_impl();
}
MultiDownloadResult download(
MultiDownloadRequest requests,
const Context& context,
DownloadOptions options,
DownloadMonitor* monitor
)
{
if (!context.remote_fetch_params.curl_initialized)
{
// TODO: MOve this into an object that would be autmotacially initialized
// upon construction, and passed by const reference to this function instead
// of context.
Context& ctx = const_cast<Context&>(context);
init_remote_fetch_params(ctx.remote_fetch_params);
}
if (monitor)
{
monitor->observe(requests, options);
on_scope_exit guard([monitor]() { monitor->on_done(); });
Downloader dl(std::move(requests), std::move(options), context);
return dl.download();
}
else
{
Downloader dl(std::move(requests), std::move(options), context);
return dl.download();
}
}
}

View File

@ -105,12 +105,13 @@ namespace mamba
-> completion_map_entry;
bool can_start_transfer() const;
void set_transfer_started();
const DownloadResult& get_result() const;
private:
bool invoke_on_success(const DownloadSuccess&) const;
expected_t<void> invoke_on_success(const DownloadSuccess&) const;
void invoke_on_failure(const DownloadError&) const;
bool is_waiting() const;
@ -118,7 +119,15 @@ namespace mamba
void set_state(bool success);
void set_state(const DownloadError& res);
void throw_if_required(const DownloadSuccess&);
/**
* Invoked when the download succeeded but the download callback
* failed.
*/
void throw_if_required(const expected_t<void>&);
/**
* Invoked when the download failed.
*/
void throw_if_required(const DownloadError&);
void save(DownloadSuccess&&);
@ -147,6 +156,7 @@ namespace mamba
void update_downloads();
bool download_done() const;
MultiDownloadResult build_result() const;
void invoke_unexpected_termination() const;
MultiDownloadRequest m_requests;
DownloadOptions m_options;

View File

@ -0,0 +1,210 @@
#include "mamba/core/download_progress_bar.hpp"
#include "progress_bar_impl.hpp"
namespace mamba
{
DownloadProgressBar::DownloadProgressBar(MonitorOptions options)
: m_options(std::move(options))
{
}
void DownloadProgressBar::reset_options(MonitorOptions options)
{
m_options = std::move(options);
}
bool DownloadProgressBar::can_monitor(const Context& context)
{
return !(
context.graphics_params.no_progress_bars || context.output_params.json
|| context.output_params.quiet
);
}
void DownloadProgressBar::observe_impl(MultiDownloadRequest& requests, DownloadOptions& options)
{
m_throttle_time.resize(requests.size(), std::chrono::steady_clock::now());
m_progress_bar.reserve(requests.size());
for (std::size_t i = 0; i < requests.size(); ++i)
{
m_progress_bar.push_back(Console::instance().add_progress_bar(requests[i].name));
m_progress_bar.back().set_repr_hook(download_repr(i));
if (m_options.checking_download)
{
m_progress_bar.back().repr().postfix.set_value("Checking");
}
requests[i].progress = [this, i](const DownloadEvent& e) { update_progress_bar(i, e); };
}
auto& pbar_manager = Console::instance().progress_bar_manager();
if (!pbar_manager.started())
{
pbar_manager.watch_print();
}
options.on_unexpected_termination = [this]() { on_unexpected_termination(); };
}
void DownloadProgressBar::on_done_impl()
{
auto& pbar_manager = Console::instance().progress_bar_manager();
if (pbar_manager.started())
{
pbar_manager.terminate();
if (!m_options.no_clear_progress_bar)
{
pbar_manager.clear_progress_bars();
}
}
m_throttle_time.clear();
m_progress_bar.clear();
m_options = MonitorOptions{};
}
void DownloadProgressBar::on_unexpected_termination_impl()
{
Console::instance().progress_bar_manager().terminate();
}
void DownloadProgressBar::update_progress_bar(std::size_t index, const DownloadEvent& event)
{
std::visit([this, index](auto&& arg) { update_progress_bar(index, arg); }, event);
}
void DownloadProgressBar::update_progress_bar(std::size_t index, const DownloadProgress& progress)
{
auto now = std::chrono::steady_clock::now();
const auto throttle_treshold = std::chrono::milliseconds(50);
if (now - m_throttle_time[index] < throttle_treshold)
{
return;
}
else
{
m_throttle_time[index] = now;
}
ProgressProxy& progress_bar = m_progress_bar[index];
if (!progress.total_to_download)
{
progress_bar.activate_spinner();
}
else
{
progress_bar.deactivate_spinner();
}
progress_bar.update_progress(progress.downloaded_size, progress.total_to_download);
progress_bar.set_speed(progress.speed_Bps);
}
void DownloadProgressBar::update_progress_bar(std::size_t index, const DownloadError& error)
{
if (m_options.checking_download)
{
complete_checking_progress_bar(index);
}
else
{
ProgressProxy& progress_bar = m_progress_bar[index];
if (error.transfer.has_value())
{
const int http_status = error.transfer.value().http_status;
progress_bar.set_postfix(std::to_string(http_status) + " failed");
}
else
{
progress_bar.set_postfix("failed");
}
progress_bar.set_full();
progress_bar.mark_as_completed();
}
}
void DownloadProgressBar::update_progress_bar(std::size_t index, const DownloadSuccess& success)
{
if (m_options.checking_download)
{
complete_checking_progress_bar(index);
}
else
{
ProgressProxy& progress_bar = m_progress_bar[index];
if (success.transfer.http_status == 304)
{
auto& r = progress_bar.repr();
r.postfix.set_format("{:>20}", 20);
r.prefix.set_format("{:<50}", 50);
progress_bar.set_postfix("No change");
progress_bar.mark_as_completed();
r.total.deactivate();
r.speed.deactivate();
r.elapsed.deactivate();
}
else
{
progress_bar.repr().postfix.set_value("Downloaded").deactivate();
progress_bar.mark_as_completed();
// make sure total value is up-to-date
progress_bar.update_repr(false);
// select field to display and make sure they are
// properly set if not yet printed by the progress bar manager
ProgressBarRepr r = progress_bar.repr();
r.prefix.set_format("{:<50}", 50);
r.progress.deactivate();
r.current.deactivate();
r.separator.deactivate();
auto console_stream = Console::stream();
r.print(console_stream, 0, false);
}
}
}
void DownloadProgressBar::complete_checking_progress_bar(std::size_t index)
{
ProgressProxy& progress_bar = m_progress_bar[index];
progress_bar.repr().postfix.set_value("Checked");
progress_bar.repr().speed.deactivate();
progress_bar.repr().total.deactivate();
progress_bar.mark_as_completed();
}
std::function<void(ProgressBarRepr&)> DownloadProgressBar::download_repr(std::size_t index)
{
return [this, index](ProgressBarRepr& r)
{
ProgressProxy& progress_bar = m_progress_bar[index];
r.current.set_value(fmt::format(
"{:>7}",
to_human_readable_filesize(static_cast<double>(progress_bar.current()), 1)
));
std::string total_str;
if (!progress_bar.total()
|| (progress_bar.total() == std::numeric_limits<std::size_t>::max()))
{
total_str = "??.?MB";
}
else
{
total_str = to_human_readable_filesize(static_cast<double>(progress_bar.total()), 1);
}
r.total.set_value(fmt::format("{:>7}", total_str));
auto speed = progress_bar.speed();
r.speed.set_value(fmt::format(
"@ {:>7}/s",
speed ? to_human_readable_filesize(static_cast<double>(speed), 1) : "??.?MB"
));
r.separator.set_value("/");
};
}
}

File diff suppressed because it is too large Load Diff

View File

@ -8,39 +8,8 @@
#include <string_view>
#include <utility>
#include <nlohmann/json.hpp>
#include "mamba/specs/repo_data.hpp"
NLOHMANN_JSON_NAMESPACE_BEGIN
template <typename T>
struct adl_serializer<std::optional<T>>
{
static void to_json(json& j, const std::optional<T>& opt)
{
if (opt.has_value())
{
j = opt.value();
}
else
{
j = nullptr;
}
}
static void from_json(const json& j, std::optional<T>& opt)
{
if (!j.is_null())
{
opt = j.template get<T>();
}
else
{
opt = std::nullopt;
}
}
};
NLOHMANN_JSON_NAMESPACE_END
#include "mamba/util/json.hpp"
namespace mamba::specs
{
@ -76,24 +45,10 @@ namespace mamba::specs
j["timestamp"] = p.timestamp;
}
namespace
{
template <typename Json, std::size_t N, typename T>
void deserialize_maybe_missing(Json&& j, const char (&name)[N], T& t)
{
if (j.contains(name))
{
t = std::forward<Json>(j)[name].template get<T>();
}
else
{
t = {};
}
}
}
void from_json(const nlohmann::json& j, RepoDataPackage& p)
{
using mamba::util::deserialize_maybe_missing;
p.name = j.at("name");
p.version = Version::parse(j.at("version").template get<std::string_view>());
p.build_string = j.at("build");
@ -165,6 +120,7 @@ namespace mamba::specs
void from_json(const nlohmann::json& j, RepoData& data)
{
using mamba::util::deserialize_maybe_missing;
deserialize_maybe_missing(j, "version", data.version);
deserialize_maybe_missing(j, "info", data.info);
deserialize_maybe_missing(j, "packages", data.packages);

View File

@ -227,6 +227,11 @@ namespace mamba::util
return "";
}
auto is_file_uri(std::string_view url) -> bool
{
return url_get_scheme(url) == "file";
}
auto url_has_scheme(std::string_view url) -> bool
{
return !url_get_scheme(url).empty();

View File

@ -589,12 +589,6 @@ namespace mamba
}
}
namespace detail
{
// read the header that contains json like {"_mod": "...", ...}
tl::expected<subdir_metadata, mamba_error> read_metadata(const fs::u8path& file);
}
#ifdef _WIN32
std::chrono::system_clock::time_point filetime_to_unix_test(const fs::file_time_type& filetime)
{
@ -610,47 +604,47 @@ namespace mamba
TEST_SUITE("subdirdata")
{
TEST_CASE("parse_mod_etag")
TEST_CASE("parse_last_modified_etag")
{
fs::u8path cache_folder = fs::u8path{ test_data_dir / "repodata_json_cache" };
auto mq = detail::read_metadata(cache_folder / "test_1.json");
auto mq = MSubdirMetadata::read(cache_folder / "test_1.json");
CHECK(mq.has_value());
auto j = mq.value();
CHECK_EQ(j.mod, "Fri, 11 Feb 2022 13:52:44 GMT");
CHECK_EQ(j.last_modified(), "Fri, 11 Feb 2022 13:52:44 GMT");
CHECK_EQ(
j.url,
j.url(),
"file:///Users/wolfvollprecht/Programs/mamba/mamba/tests/channel_a/linux-64/repodata.json"
);
j = detail::read_metadata(cache_folder / "test_2.json").value();
CHECK_EQ(j.mod, "Fri, 11 Feb 2022 13:52:44 GMT");
j = MSubdirMetadata::read(cache_folder / "test_2.json").value();
CHECK_EQ(j.last_modified(), "Fri, 11 Feb 2022 13:52:44 GMT");
CHECK_EQ(
j.url,
j.url(),
"file:///Users/wolfvollprecht/Programs/mamba/mamba/tests/channel_a/linux-64/repodata.json"
);
j = detail::read_metadata(cache_folder / "test_5.json").value();
CHECK_EQ(j.mod, "Fri, 11 Feb 2022 13:52:44 GMT");
j = MSubdirMetadata::read(cache_folder / "test_5.json").value();
CHECK_EQ(j.last_modified(), "Fri, 11 Feb 2022 13:52:44 GMT");
CHECK_EQ(
j.url,
j.url(),
"file:///Users/wolfvollprecht/Programs/mamba/mamba/tests/channel_a/linux-64/repodata.json"
);
j = detail::read_metadata(cache_folder / "test_4.json").value();
CHECK_EQ(j.cache_control, "{{}}\",,,\"");
CHECK_EQ(j.etag, "\n\n\"\"randome ecx,,ssd\n,,\"");
CHECK_EQ(j.mod, "Fri, 11 Feb 2022 13:52:44 GMT");
j = MSubdirMetadata::read(cache_folder / "test_4.json").value();
CHECK_EQ(j.cache_control(), "{{}}\",,,\"");
CHECK_EQ(j.etag(), "\n\n\"\"randome ecx,,ssd\n,,\"");
CHECK_EQ(j.last_modified(), "Fri, 11 Feb 2022 13:52:44 GMT");
CHECK_EQ(
j.url,
j.url(),
"file:///Users/wolfvollprecht/Programs/mamba/mamba/tests/channel_a/linux-64/repodata.json"
);
mq = detail::read_metadata(cache_folder / "test_3.json");
mq = MSubdirMetadata::read(cache_folder / "test_3.json");
CHECK(mq.has_value() == false);
j = detail::read_metadata(cache_folder / "test_6.json").value();
CHECK_EQ(j.mod, "Thu, 02 Apr 2020 20:21:27 GMT");
CHECK_EQ(j.url, "https://conda.anaconda.org/intake/osx-arm64");
j = MSubdirMetadata::read(cache_folder / "test_6.json").value();
CHECK_EQ(j.last_modified(), "Thu, 02 Apr 2020 20:21:27 GMT");
CHECK_EQ(j.url(), "https://conda.anaconda.org/intake/osx-arm64");
auto state_file = cache_folder / "test_7.state.json";
// set file_mtime
@ -681,13 +675,12 @@ namespace mamba
ofs << jstate.dump(4);
}
j = detail::read_metadata(cache_folder / "test_7.json").value();
CHECK_EQ(j.cache_control, "something");
CHECK_EQ(j.etag, "something else");
CHECK_EQ(j.mod, "Fri, 11 Feb 2022 13:52:44 GMT");
CHECK_EQ(j.url, "https://conda.anaconda.org/conda-forge/noarch/repodata.json.zst");
CHECK_EQ(j.has_zst.value().value, true);
CHECK_EQ(j.has_zst.value().last_checked, parse_utc_timestamp("2023-01-06T16:33:06Z"));
j = MSubdirMetadata::read(cache_folder / "test_7.json").value();
CHECK_EQ(j.cache_control(), "something");
CHECK_EQ(j.etag(), "something else");
CHECK_EQ(j.last_modified(), "Fri, 11 Feb 2022 13:52:44 GMT");
CHECK_EQ(j.url(), "https://conda.anaconda.org/conda-forge/noarch/repodata.json.zst");
CHECK_EQ(j.has_zst(), false);
}
}
} // namespace mamba

View File

@ -31,9 +31,9 @@ namespace mamba
MultiDownloadRequest dl_request{ std::vector{ std::move(request) } };
context.output_params.quiet = true;
MultiDownloadResult res = download(dl_request, context);
CHECK_EQ(res.results.size(), std::size_t(1));
CHECK(!res.results[0]);
CHECK_EQ(res.results[0].error().attempt_number, std::size_t(1));
CHECK_EQ(res.size(), std::size_t(1));
CHECK(!res[0]);
CHECK_EQ(res[0].error().attempt_number, std::size_t(1));
#endif
}

View File

@ -15,6 +15,7 @@
#include <solv/solver.h>
#include "mamba/core/channel.hpp"
#include "mamba/core/download.hpp"
#include "mamba/core/mamba_fs.hpp"
#include "mamba/core/package_info.hpp"
#include "mamba/core/pool.hpp"
@ -76,7 +77,7 @@ TEST_SUITE("conflict_map")
}
}
TEST_SUITE_BEGIN("satifiability_error");
TEST_SUITE_BEGIN("satisfiability_error");
namespace
{
@ -329,7 +330,6 @@ namespace
*/
auto load_channels(MPool& pool, MultiPackageCache& cache, std::vector<std::string>&& channels)
{
MultiDownloadTarget dlist{ mambatests::context() };
auto sub_dirs = std::vector<MSubdirData>();
for (const auto* chan : pool.channel_context().get_channels(channels))
{
@ -338,11 +338,12 @@ namespace
auto sub_dir = expected_value_or_throw(
MSubdirData::create(pool.channel_context(), *chan, platform, url, cache)
);
dlist.add(sub_dir.target());
sub_dirs.push_back(std::move(sub_dir));
}
}
dlist.download(MAMBA_DOWNLOAD_FAILFAST);
MSubdirData::download_indexes(sub_dirs, mambatests::context());
for (auto& sub_dir : sub_dirs)
{
sub_dir.create_repo(pool);
@ -394,7 +395,7 @@ namespace
}
}
TEST_CASE("Test create_conda_forge utility ")
TEST_CASE("Test create_conda_forge utility")
{
ChannelContext channel_context{ mambatests::context() };
auto solver = create_conda_forge(channel_context, { "xtensor>=0.7" });

View File

@ -1,67 +0,0 @@
// Copyright (c) 2022, QuantStack and Mamba Contributors
//
// Distributed under the terms of the BSD 3-Clause License.
//
// The full license is in the file LICENSE, distributed with this software.
#include <array>
#include <doctest/doctest.h>
#include "mamba/core/subdirdata.hpp"
#include "mambatests.hpp"
namespace mamba
{
TEST_SUITE("transfer")
{
TEST_CASE("file_not_exist")
{
#ifdef __linux__
auto& context = mambatests::context();
context.output_params.quiet = true;
{
mamba::ChannelContext channel_context{ context };
const mamba::Channel& c = channel_context.make_channel("conda-forge");
mamba::MultiDownloadTarget multi_dl{ context };
mamba::MultiPackageCache pkg_cache({ "/tmp/" }, context.validation_params);
mamba::MSubdirData cf = mamba::MSubdirData::create(
channel_context,
c,
"linux-64",
"file:///nonexistent/repodata.json",
pkg_cache
)
.value();
multi_dl.add(cf.target());
// file:// url should not retry
CHECK_EQ(cf.target()->can_retry(), false);
multi_dl.download(MAMBA_DOWNLOAD_FAILFAST);
// File does not exist
CHECK_EQ(cf.target()->get_result(), 37);
}
{
mamba::ChannelContext channel_context{ context };
const mamba::Channel& c = channel_context.make_channel("conda-forge");
mamba::MultiDownloadTarget multi_dl{ channel_context.context() };
mamba::MultiPackageCache pkg_cache({ "/tmp/" }, context.validation_params);
mamba::MSubdirData cf = mamba::MSubdirData::create(
channel_context,
c,
"noarch",
"file:///nonexistent/repodata.json",
pkg_cache
)
.value();
multi_dl.add(cf.target());
CHECK_THROWS_AS(multi_dl.download(MAMBA_DOWNLOAD_FAILFAST), std::runtime_error);
}
context.output_params.quiet = false;
#endif
}
}
} // namespace mamba

View File

@ -112,6 +112,8 @@ __all__ = [
"SpecBase",
"SpecImpl",
"SubdirData",
"SubdirIndex",
"SubdirIndexEntry",
"TimeRef",
"Transaction",
"cache_fn_url",
@ -829,7 +831,6 @@ class Context:
class DownloadTargetList:
def __init__(self) -> None: ...
def add(self, arg0: SubdirData) -> None: ...
def download(self, arg0: int) -> bool: ...
pass
@ -1546,16 +1547,52 @@ class SpecImpl(SpecBase):
pass
class SubdirData:
def __init__(
self, arg0: Channel, arg1: str, arg2: str, arg3: MultiPackageCache, arg4: str
) -> None: ...
def cache_path(self) -> str: ...
def create_repo(self, arg0: Pool) -> Repo: ...
def download_and_check_targets(self, arg0: DownloadTargetList) -> bool: ...
def finalize_checks(self) -> None: ...
def loaded(self) -> bool: ...
pass
class SubdirIndex:
def __getitem__(self, arg0: int) -> SubdirIndexEntry: ...
def __init__(self) -> None: ...
def __iter__(self) -> typing.Iterator: ...
def __len__(self) -> int: ...
def create(
self,
arg0: Channel,
arg1: str,
arg2: str,
arg3: MultiPackageCache,
arg4: str,
arg5: str,
) -> None: ...
def download(self) -> bool: ...
pass
class SubdirIndexEntry:
def __init__(self) -> None: ...
@property
def channel(self) -> Channel:
"""
:type: Channel
"""
@property
def platform(self) -> str:
"""
:type: str
"""
@property
def subdir(self) -> SubdirData:
"""
:type: SubdirData
"""
@property
def url(self) -> str:
"""
:type: str
"""
pass
class TimeRef:
@typing.overload
def __init__(self) -> None: ...

View File

@ -13,12 +13,15 @@
#include <pybind11/operators.h>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <pybind11/stl_bind.h>
#include "mamba/api/clean.hpp"
#include "mamba/api/configuration.hpp"
#include "mamba/core/channel.hpp"
#include "mamba/core/context.hpp"
#include "mamba/core/download_progress_bar.hpp"
#include "mamba/core/execution.hpp"
#include "mamba/core/fetch.hpp"
#include "mamba/core/output.hpp"
#include "mamba/core/package_handling.hpp"
#include "mamba/core/pool.hpp"
@ -170,6 +173,98 @@ namespace mambapy
};
Singletons singletons;
// MSubdirData objects are movable only, and they need to be moved into
// a std::vector before we call MSudbirData::download. Since we cannot
// replicate the move semantics in Python, we encapsulate the creation
// and the storage of MSubdirData objects in this class, to avoid
// potential dangling references in Python.
class SubdirIndex
{
public:
struct Entry
{
mamba::MSubdirData* p_subdirdata = nullptr;
std::string m_platform = "";
const mamba::Channel* p_channel = nullptr;
std::string m_url = "";
};
using entry_list = std::vector<Entry>;
using iterator = entry_list::const_iterator;
void create(
mamba::ChannelContext& channel_context,
const mamba::Channel& channel,
const std::string& platform,
const std::string& full_url,
mamba::MultiPackageCache& caches,
const std::string& repodata_fn,
const std::string& url
)
{
using namespace mamba;
m_subdirs.push_back(extract(
MSubdirData::create(channel_context, channel, platform, full_url, caches, repodata_fn)
));
m_entries.push_back({ nullptr, platform, &channel, url });
for (size_t i = 0; i < m_subdirs.size(); ++i)
{
m_entries[i].p_subdirdata = &(m_subdirs[i]);
}
}
bool download()
{
using namespace mamba;
// TODO: expose DownloadProgressBar to libmambapy and remove this
// logic
Context& ctx = mambapy::singletons.context();
expected_t<void> download_res;
if (DownloadProgressBar::can_monitor(ctx))
{
DownloadProgressBar check_monitor({ true, true });
DownloadProgressBar index_monitor;
download_res = MSubdirData::download_indexes(
m_subdirs,
ctx,
&check_monitor,
&index_monitor
);
}
else
{
download_res = MSubdirData::download_indexes(m_subdirs, ctx);
}
return download_res.has_value();
}
std::size_t size() const
{
return m_entries.size();
}
const Entry& operator[](std::size_t index) const
{
return m_entries[index];
}
iterator begin() const
{
return m_entries.begin();
}
iterator end() const
{
return m_entries.end();
}
private:
std::vector<mamba::MSubdirData> m_subdirs;
entry_list m_entries;
};
}
PYBIND11_MODULE(bindings, m)
@ -520,47 +615,58 @@ PYBIND11_MODULE(bindings, m)
);
py::class_<MSubdirData>(m, "SubdirData")
.def(py::init(
[](const Channel& channel,
const std::string& platform,
const std::string& url,
MultiPackageCache& caches,
const std::string& repodata_fn) -> MSubdirData
{
auto sres = MSubdirData::create(
mambapy::singletons.channel_context(),
channel,
platform,
url,
caches,
repodata_fn
);
return extract(std::move(sres));
}
))
.def(
"create_repo",
[](MSubdirData& subdir, MPool& pool) -> MRepo
{ return extract(subdir.create_repo(pool)); }
)
.def("loaded", &MSubdirData::loaded)
.def("loaded", &MSubdirData::is_loaded)
.def(
"cache_path",
[](const MSubdirData& self) -> std::string { return extract(self.cache_path()); }
)
);
using mambapy::SubdirIndex;
using SubdirIndexEntry = SubdirIndex::Entry;
py::class_<SubdirIndexEntry>(m, "SubdirIndexEntry")
.def(py::init<>())
.def_readonly("subdir", &SubdirIndexEntry::p_subdirdata, py::return_value_policy::reference)
.def_readonly("platform", &SubdirIndexEntry::m_platform)
.def_readonly("channel", &SubdirIndexEntry::p_channel, py::return_value_policy::reference)
.def_readonly("url", &SubdirIndexEntry::m_url);
py::class_<SubdirIndex>(m, "SubdirIndex")
.def(py::init<>())
.def(
"download_and_check_targets",
[](MSubdirData& self, MultiDownloadTarget& multi_download) -> bool
"create",
[](SubdirIndex& self,
const Channel& channel,
const std::string& platform,
const std::string& full_url,
MultiPackageCache& caches,
const std::string& repodata_fn,
const std::string& url)
{
for (auto& check_target : self.check_targets())
{
multi_download.add(check_target.get());
}
multi_download.download(MAMBA_NO_CLEAR_PROGRESS_BARS);
return self.check_targets().size();
self.create(
mambapy::singletons.channel_context(),
channel,
platform,
full_url,
caches,
repodata_fn,
url
);
}
)
.def("finalize_checks", &MSubdirData::finalize_checks);
.def("download", &SubdirIndex::download)
.def("__len__", &SubdirIndex::size)
.def("__getitem__", &SubdirIndex::operator[])
.def(
"__iter__",
[](SubdirIndex& self) { return py::make_iterator(self.begin(), self.end()); },
py::keep_alive<0, 1>()
);
m.def("cache_fn_url", &cache_fn_url);
m.def("create_cache_dir", &create_cache_dir);
@ -569,10 +675,6 @@ PYBIND11_MODULE(bindings, m)
.def(py::init(
[] { return std::make_unique<MultiDownloadTarget>(mambapy::singletons.context()); }
))
.def(
"add",
[](MultiDownloadTarget& self, MSubdirData& sub) -> void { self.add(sub.target()); }
)
.def("download", &MultiDownloadTarget::download);
py::enum_<ChannelPriority>(m, "ChannelPriority")

View File

@ -57,10 +57,6 @@ def get_index(
# Remove duplicates but retain order
all_channels = list(OrderedDict.fromkeys(all_channels))
dlist = api.DownloadTargetList()
index = []
def fixup_channel_spec(spec):
at_count = spec.count("@")
if at_count > 1:
@ -78,33 +74,15 @@ def get_index(
pkgs_dirs = api.MultiPackageCache(context.pkgs_dirs)
api.create_cache_dir(str(pkgs_dirs.first_writable_path))
index = api.SubdirIndex()
for channel in api.get_channels(all_channels):
for channel_platform, url in channel.platform_urls(with_credentials=True):
full_url = CondaHttpAuth.add_binstar_token(url)
sd = api.SubdirData(
channel, channel_platform, full_url, pkgs_dirs, repodata_fn
index.create(
channel, channel_platform, full_url, pkgs_dirs, repodata_fn, url
)
needs_finalising = sd.download_and_check_targets(dlist)
index.append(
(
sd,
{
"platform": channel_platform,
"url": url,
"channel": channel,
"needs_finalising": needs_finalising,
},
)
)
for sd, info in index:
if info["needs_finalising"]:
sd.finalize_checks()
dlist.add(sd)
is_downloaded = dlist.download(api.MAMBA_DOWNLOAD_FAILFAST)
is_downloaded = index.download()
if not is_downloaded:
raise RuntimeError("Error downloading repodata.")
@ -141,16 +119,16 @@ def load_channels(
subprio_index = len(index)
if has_priority:
# first, count unique channels
n_channels = len(set([entry["channel"].canonical_name for _, entry in index]))
current_channel = index[0][1]["channel"].canonical_name
n_channels = len(set([entry.channel.canonical_name for entry in index]))
current_channel = index[0].channel.canonical_name
channel_prio = n_channels
for subdir, entry in index:
for entry in index:
# add priority here
if has_priority:
if entry["channel"].canonical_name != current_channel:
if entry.channel.canonical_name != current_channel:
channel_prio -= 1
current_channel = entry["channel"].canonical_name
current_channel = entry.channel.canonical_name
priority = channel_prio
else:
priority = 0
@ -160,19 +138,19 @@ def load_channels(
subpriority = subprio_index
subprio_index -= 1
if not subdir.loaded() and entry["platform"] != "noarch":
if not entry.subdir.loaded() and entry.platform != "noarch":
# ignore non-loaded subdir if channel is != noarch
continue
if context.verbosity != 0 and not context.json:
print(
"Channel: {}, platform: {}, prio: {} : {}".format(
entry["channel"], entry["platform"], priority, subpriority
entry.channel, entry.platform, priority, subpriority
)
)
print("Cache path: ", subdir.cache_path())
print("Cache path: ", entry.subdir.cache_path())
repo = subdir.create_repo(pool)
repo = entry.subdir.create_repo(pool)
repo.set_priority(priority, subpriority)
repos.append(repo)
@ -301,13 +279,13 @@ def to_conda_channel(channel, platform):
def to_package_record_from_subjson(entry, pkg, jsn_string):
channel_url = entry["url"]
channel_url = entry.url
info = json.loads(jsn_string)
info["fn"] = pkg
info["channel"] = to_conda_channel(entry["channel"], entry["platform"])
info["channel"] = to_conda_channel(entry.channel, entry.platform)
info["url"] = join_url(channel_url, pkg)
if not info.get("subdir"):
info["subdir"] = entry["platform"]
info["subdir"] = entry.platform
package_record = PackageRecord(**info)
return package_record
@ -360,9 +338,9 @@ def compute_final_precs(
final_precs = IndexedSet(prefix_records)
lookup_dict = {}
for _, entry in index:
for entry in index:
lookup_dict[
entry["channel"].platform_url(entry["platform"], with_credentials=False)
entry.channel.platform_url(entry.platform, with_credentials=False)
] = entry
i_rec: PackageRecord