Add mirrors (#2795)

* Add mirrors

* Add test for split_path_tag

* Return string instead of vector when getting auth header

* Simplify data callback

* Update comment
This commit is contained in:
Hind-M 2023-11-07 05:29:36 +01:00 committed by GitHub
parent 8683078464
commit 2b948ce1cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 709 additions and 3 deletions

View File

@ -162,6 +162,7 @@ set(LIBMAMBA_SOURCES
${LIBMAMBA_SOURCE_DIR}/core/history.cpp
${LIBMAMBA_SOURCE_DIR}/core/match_spec.cpp
${LIBMAMBA_SOURCE_DIR}/core/menuinst.cpp
${LIBMAMBA_SOURCE_DIR}/core/mirror.cpp
${LIBMAMBA_SOURCE_DIR}/core/output.cpp
${LIBMAMBA_SOURCE_DIR}/core/package_handling.cpp
${LIBMAMBA_SOURCE_DIR}/core/package_cache.cpp
@ -259,6 +260,7 @@ set(LIBMAMBA_PUBLIC_HEADERS
${LIBMAMBA_INCLUDE_DIR}/mamba/core/link.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/match_spec.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/menuinst.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/mirror.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/output.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/package_cache.hpp
${LIBMAMBA_INCLUDE_DIR}/mamba/core/package_fetcher.hpp

View File

@ -1,4 +1,4 @@
// Copyright (c) 2019, QuantStack and Mamba Contributors
// Copyright (c) 2023, QuantStack and Mamba Contributors
//
// Distributed under the terms of the BSD 3-Clause License.
//

View File

@ -0,0 +1,182 @@
// Copyright (c) 2023, QuantStack and Mamba Contributors
//
// Distributed under the terms of the BSD 3-Clause License.
//
// The full license is in the file LICENSE, distributed with this software.
#ifndef MAMBA_CORE_MIRROR_HPP
#define MAMBA_CORE_MIRROR_HPP
#include <chrono>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <vector>
namespace mamba
{
class CURLHandle;
enum class Protocol
{
FILE,
FTP,
HTTP,
OCI,
};
// Statistics of mirror
struct MirrorStats
{
// Maximum number of allowed parallel connections to this mirror. -1 means no
// limit. Dynamically adjusted (decreased) if no fatal (temporary) error will
// occur.
// TODO To be set using `ctx.threads_params.download_threads`
// Do we need it as an arg in Mirror constructor?
// Anyway, it will need to be linked to (or replacing?) `m_max_parallel_downloads` in
// CURLMultiHandle
// Same for the other stats?
long allowed_parallel_connections = -1;
// The maximum number of tried parallel connections to this mirror
// (including unsuccessful).
std::size_t max_tried_parallel_connections = 0;
// How many transfers from this mirror are currently in progress.
std::size_t running_transfers = 0;
// How many transfers was finished successfully from the mirror.
std::size_t successful_transfers = 0;
// How many transfers failed.
std::size_t failed_transfers = 0;
// Maximum ranges supported in a single request. This will be automatically
// adjusted when mirrors respond with 200 to a range request
// zchunk case
std::size_t max_ranges = 256;
};
class Mirror
{
public:
Mirror(const std::string& url);
virtual ~Mirror();
Mirror(const Mirror&) = delete;
Mirror& operator=(const Mirror&) = delete;
Mirror(Mirror&&) = delete;
Mirror& operator=(Mirror&&) = delete;
Protocol protocol() const;
const std::string& url() const;
std::chrono::system_clock::time_point next_retry() const;
const MirrorStats& stats() const;
void set_max_ranges(std::size_t max_ranges);
void set_allowed_parallel_connections(long allowed_parallel_connections);
bool need_wait_for_retry() const;
bool has_running_transfers() const;
void increase_running_transfers();
bool is_parallel_connections_limited_and_reached() const;
void update_statistics(bool transfer_success);
virtual std::string format_url(const std::string& path) const;
virtual std::string get_auth_header(const std::string& path) const;
virtual bool needs_preparation(const std::string& path) const;
virtual void prepare(const std::string& path, CURLHandle& handle);
protected:
Protocol m_protocol;
private:
const std::string m_url;
// TODO put these in a struct?
// Retry & backoff params
std::chrono::system_clock::time_point m_next_retry;
std::chrono::system_clock::duration m_retry_wait_seconds;
std::size_t m_retry_backoff_factor;
// count number of retries (this is not the same as failed transfers, as mutiple
// transfers can be started at the same time, but should all be retried only once)
std::size_t m_retry_counter;
MirrorStats m_stats;
};
class HTTPMirror : public Mirror
{
public:
HTTPMirror(const std::string& url);
~HTTPMirror();
bool authenticate(CURLHandle& handle, const std::string& user, const std::string& password);
};
// Utility function
// TODO leave it here or put it in a namespace, or move it to utils?
std::pair<std::string, std::string> split_path_tag(const std::string& path);
class OCIMirror : public Mirror
{
public:
using proxy_map_type = std::map<std::string, std::string>;
OCIMirror(const std::string& host, const std::string& repo_prefix, const proxy_map_type& proxy_map);
OCIMirror(
const std::string& host,
const std::string& repo_prefix,
const std::string& scope,
const std::string& username,
const std::string& password,
const proxy_map_type& proxy_map
);
~OCIMirror();
std::string get_repo(const std::string& repo) const;
std::string get_auth_url(const std::string& repo, const std::string& scope) const;
std::string get_manifest_url(const std::string& repo, const std::string& reference) const;
std::string get_preupload_url(const std::string& repo) const;
bool need_auth() const;
std::string format_url(const std::string& path) const override;
std::string get_auth_header(const std::string& path) const override;
bool needs_preparation(const std::string& path) const override;
void prepare(const std::string& path, CURLHandle& handle) override;
private:
struct AuthData
{
std::string sha256sum; // TODO what about other checksums types? i.e md5
std::string token;
};
std::map<std::string, std::unique_ptr<AuthData>> m_path_map;
std::string m_repo_prefix;
std::string m_scope;
std::string m_username;
std::string m_password;
// TODO do we really need this here? (only needs in `prepare` so far, add it as an arg
// there?)
proxy_map_type m_proxy_map;
AuthData* get_data(const std::string& path) const;
};
} // namespace mamba
#endif // MAMBA_CORE_MIRROR_HPP

View File

@ -445,6 +445,21 @@ namespace mamba
return *this;
}
CURLHandle& CURLHandle::set_url(const std::string& url, const proxy_map_type& proxies)
{
set_opt(CURLOPT_URL, url);
const auto match = proxy_match(url, proxies);
if (match)
{
set_opt(CURLOPT_PROXY, match.value());
}
else
{
set_opt(CURLOPT_PROXY, nullptr);
}
return *this;
}
const char* CURLHandle::get_error_buffer() const
{
return m_errorbuffer.data();

View File

@ -7,6 +7,7 @@
#ifndef MAMBA_CURL_HPP
#define MAMBA_CURL_HPP
#include <map>
#include <optional>
#include <stdexcept>
#include <string_view>
@ -99,6 +100,8 @@ struct std::hash<mamba::CURLId>
namespace mamba
{
using proxy_map_type = std::map<std::string, std::string>;
class CURLHandle
{
public:
@ -137,6 +140,8 @@ namespace mamba
CURLHandle& set_opt_header();
CURLHandle& set_url(const std::string& url, const proxy_map_type& proxies);
const char* get_error_buffer() const;
std::string get_curl_effective_url() const;

View File

@ -1,3 +1,9 @@
// Copyright (c) 2023, QuantStack and Mamba Contributors
//
// Distributed under the terms of the BSD 3-Clause License.
//
// The full license is in the file LICENSE, distributed with this software.
#include "mamba/core/download.hpp"
#include "mamba/core/invoke.hpp"
#include "mamba/core/thread_utils.hpp"

View File

@ -1,4 +1,4 @@
// Copyright (c) 2019, QuantStack and Mamba Contributors
// Copyright (c) 2023, QuantStack and Mamba Contributors
//
// Distributed under the terms of the BSD 3-Clause License.
//

View File

@ -0,0 +1,444 @@
// Copyright (c) 2023, QuantStack and Mamba Contributors
//
// Distributed under the terms of the BSD 3-Clause License.
//
// The full license is in the file LICENSE, distributed with this software.
#include <fmt/format.h>
#include "mamba/core/mirror.hpp"
#include "mamba/core/output.hpp"
#include "mamba/util/string.hpp"
#include "mamba/util/url_manip.hpp"
#include "curl.hpp"
namespace mamba
{
/*************************
* Mirror implementation *
*************************/
Mirror::Mirror(const std::string& url)
: m_protocol(Protocol::HTTP)
, m_url(std::string(util::rstrip(url, '/')))
, m_next_retry(std::chrono::system_clock::now())
, m_retry_wait_seconds(std::chrono::milliseconds(200))
, m_retry_backoff_factor(2)
, m_retry_counter(0)
{
}
Mirror::~Mirror() = default;
Protocol Mirror::protocol() const
{
return m_protocol;
}
const std::string& Mirror::url() const
{
return m_url;
}
std::chrono::system_clock::time_point Mirror::next_retry() const
{
return m_next_retry;
}
const MirrorStats& Mirror::stats() const
{
return m_stats;
}
void Mirror::set_max_ranges(std::size_t max_ranges)
{
m_stats.max_ranges = max_ranges;
}
void Mirror::set_allowed_parallel_connections(long allowed_parallel_connections)
{
m_stats.allowed_parallel_connections = allowed_parallel_connections;
}
bool Mirror::need_wait_for_retry() const
{
return m_retry_counter != 0 && m_next_retry > std::chrono::system_clock::now();
}
bool Mirror::has_running_transfers() const
{
return m_stats.running_transfers > 0;
}
void Mirror::increase_running_transfers()
{
m_stats.running_transfers++;
if (m_stats.max_tried_parallel_connections < m_stats.running_transfers)
{
m_stats.max_tried_parallel_connections = m_stats.running_transfers;
}
}
bool Mirror::is_parallel_connections_limited_and_reached() const
{
return m_stats.allowed_parallel_connections != -1
&& m_stats.running_transfers >= std::size_t(m_stats.allowed_parallel_connections);
}
void Mirror::update_statistics(bool transfer_success)
{
m_stats.running_transfers--;
if (transfer_success)
{
m_stats.successful_transfers++;
}
else
{
m_stats.failed_transfers++;
if (m_stats.failed_transfers == 1 || m_next_retry < std::chrono::system_clock::now())
{
m_retry_counter++;
m_retry_wait_seconds = m_retry_wait_seconds * m_retry_backoff_factor;
m_next_retry = std::chrono::system_clock::now() + m_retry_wait_seconds;
}
}
}
std::string Mirror::format_url(const std::string& path) const
{
return util::join_url(m_url, path);
}
std::string Mirror::get_auth_header(const std::string&) const
{
return {};
}
bool Mirror::needs_preparation(const std::string&) const
{
return false;
}
void Mirror::prepare(const std::string&, CURLHandle&)
{
}
/*****************************
* HTTPMirror implementation *
*****************************/
HTTPMirror::HTTPMirror(const std::string& url)
: Mirror(url)
{
}
HTTPMirror::~HTTPMirror() = default;
// TODO Maybe move this curl stuff somewhere else?
// Or remove it if authentication won't be used
bool
HTTPMirror::authenticate(CURLHandle& handle, const std::string& user, const std::string& password)
{
if (!password.empty() && !user.empty())
{
handle.set_opt(CURLOPT_USERNAME, user);
handle.set_opt(CURLOPT_PASSWORD, password);
return true;
}
else
{
LOG_WARNING << "Cannot authenticate: user or password empty";
return false;
}
}
// Utility function
std::pair<std::string, std::string> split_path_tag(const std::string& path)
{
// for OCI, if we have a filename like "xtensor-0.23.10-h2acdbc0_0.tar.bz2"
// we want to split it to `xtensor:0.23.10-h2acdbc0-0`
if (util::ends_with(path, ".json"))
{
return { path, "latest" };
}
std::pair<std::string, std::string> result;
auto parts = util::rsplit(path, "-", 2);
if (parts.size() < 2)
{
LOG_ERROR << "Could not split filename into enough parts";
throw std::runtime_error("Could not split filename into enough parts");
}
result.first = parts[0];
std::string tag;
if (parts.size() > 2)
{
std::string last_part = parts[2].substr(0, parts[2].find_first_of("."));
tag = fmt::format("{}-{}", parts[1], last_part);
}
else
{
tag = parts[1];
}
util::replace_all(tag, "_", "-");
result.second = tag;
LOG_INFO << "Splitting " << path << " to name: " << result.first << " tag: " << result.second;
return result;
}
/*****************************
* OCIMirror implementation *
*****************************/
// OCI Mirror:
// When knowing the SHA256 we can directly get to the blob
// When we do not know the SHA256 sum, we need to find the `latest` or some
// other blob
// OCI upload process
// 4 steps:
// - first get auth token with push rights
// - then
// This is what an OCI manifest (index) looks like:
// {
// "schemaVersion": 2,
// "config": {
// "mediaType": "application/vnd.unknown.config.v1+json",
// "digest":
// "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
// "size": 0
// },
// "layers": [
// {
// "mediaType": "application/vnd.unknown.layer.v1+txt",
// "digest":
// "sha256:c5be3ea75353851e1fcf3a298af3b6cfd2af3d7ff018ce52657b6dbd8f986aa4",
// "size": 13,
// "annotations": {
// "org.opencontainers.image.title": "artifact.txt"
// }
// }
// ]
// }
// Used for pull/read (doesn't need auth)
OCIMirror::OCIMirror(
const std::string& host,
const std::string& repo_prefix,
const proxy_map_type& proxy_map
)
: Mirror(host)
, m_repo_prefix(repo_prefix)
, m_scope("pull")
, m_proxy_map(proxy_map)
{
m_protocol = Protocol::OCI;
}
OCIMirror::OCIMirror(
const std::string& host,
const std::string& repo_prefix,
const std::string& scope,
const std::string& username,
const std::string& password,
const proxy_map_type& proxy_map
)
: Mirror(host)
, m_repo_prefix(repo_prefix)
, m_scope(scope)
, m_username(username)
, m_password(password)
, m_proxy_map(proxy_map)
{
m_protocol = Protocol::OCI;
}
OCIMirror::~OCIMirror() = default;
std::string OCIMirror::get_repo(const std::string& repo) const
{
if (!m_repo_prefix.empty())
{
return fmt::format("{}/{}", m_repo_prefix, repo);
}
else
{
return repo;
}
}
std::string OCIMirror::get_auth_url(const std::string& repo, const std::string& scope) const
{
return fmt::format("{}/token?scope=repository:{}:{}", this->url(), get_repo(repo), scope);
}
std::string OCIMirror::get_manifest_url(const std::string& repo, const std::string& reference) const
{
return fmt::format("{}/v2/{}/manifests/{}", this->url(), get_repo(repo), reference);
}
std::string OCIMirror::get_preupload_url(const std::string& repo) const
{
return fmt::format("{}/v2/{}/blobs/uploads/", this->url(), get_repo(repo));
}
bool OCIMirror::need_auth() const
{
return !m_username.empty() && !m_password.empty();
}
std::string OCIMirror::format_url(const std::string& path) const
{
// TODO Maybe we would need a std::string checksum to pass as arg (to be empty in Mirror)
// (in case the map data is not set yet)
// to be checked in use/call of this method
auto* data = get_data(path);
if (data)
{
auto [split_path, split_tag] = split_path_tag(path);
// Should be this format:
// https://ghcr.io/v2/wolfv/artifact/blobs/sha256:c5be3ea75353851e1fcf3a298af3b6cfd2af3d7ff018ce52657b6dbd8f986aa4
return fmt::format(
"{}/v2/{}/blobs/sha256:{}",
this->url(),
get_repo(split_path),
data->sha256sum
);
}
else
{
LOG_ERROR << "Checksum corresponding to " << path << " is not available";
return {};
}
}
std::string OCIMirror::get_auth_header(const std::string& path) const
{
if (m_username.empty() && m_password.empty())
{
return {};
}
auto* data = get_data(path);
if (data && !data->token.empty())
{
return fmt::format("Authorization: Bearer {}", data->token);
}
else
{
return {};
}
}
bool OCIMirror::needs_preparation(const std::string& path) const
{
auto* data = get_data(path);
if ((!data || (data && data->token.empty())) && need_auth())
{
return true;
}
if (!data || (data && data->sha256sum.empty()))
{
return true;
}
return false;
}
void OCIMirror::prepare(const std::string& path, CURLHandle& handle)
{
auto [split_path, split_tag] = split_path_tag(path);
auto* data = get_data(path);
if (!data)
{
m_path_map[split_path].reset(new AuthData);
data = m_path_map[split_path].get();
}
if (data && data->token.empty() && need_auth())
{
std::string auth_url = get_auth_url(split_path, m_scope);
// TODO to be checked
// Not sure this is necessary
// Do we need to call curl config beforehand using the right args in
// `configure_curl_handle`
handle.set_url(auth_url, m_proxy_map);
// IMPORTANT: Make sure the callbacks are set before calling this
// i.e CURLOPT_HEADERFUNCTION, CURLOPT_HEADERDATA, CURLOPT_WRITEFUNCTION,
// CURLOPT_WRITEDATA
if (!m_username.empty())
{
handle.set_opt(CURLOPT_USERNAME, m_username);
}
if (!m_password.empty())
{
handle.set_opt(CURLOPT_PASSWORD, m_password);
}
auto set_data_callback =
[&data](bool success, const std::string& token, const std::string&) // Args got as
// a response
// to transfer
{
if (success && !token.empty())
{
data->token = token;
}
};
// TODO set here a callback to `set_data_callback` in CURLHandle or other relevant
// class which performs the transfer finalization and sets data token
// handle.set_end_callback(set_data_callback);
}
else
{
std::string manifest_url = get_manifest_url(split_path, split_tag);
handle.set_url(manifest_url, m_proxy_map)
.add_header(get_auth_header(path))
.add_header("Accept: application/vnd.oci.image.manifest.v1+json");
auto set_data_callback =
[&data](bool success, const std::string&, const std::string& digest)
{
// digest is got from a json response like
// j["layers"][0]["digest"]
if (success && !digest.empty())
{
assert(util::starts_with(digest, "sha256:"));
data->sha256sum = digest.substr(sizeof("sha256:") - 1);
}
};
// TODO set here a callback to `set_data_callback` in CURLHandle or other
// relevant class which performs the transfer finalization and sets data sha256sum
// handle.set_end_callback(set_data_callback);
}
}
OCIMirror::AuthData* OCIMirror::get_data(const std::string& path) const
{
auto [split_path, _] = split_path_tag(path);
auto it = m_path_map.find(split_path);
if (it != m_path_map.end())
{
return it->second.get();
}
return nullptr;
}
} // namespace mamba

View File

@ -56,6 +56,7 @@ set(LIBMAMBA_TEST_SRCS
src/core/test_environments_manager.cpp
src/core/test_history.cpp
src/core/test_lockfile.cpp
src/core/test_mirror.cpp
src/core/test_pinning.cpp
src/core/test_output.cpp
src/core/test_progress_bar.cpp

View File

@ -1,4 +1,4 @@
// Copyright (c) 2022, QuantStack and Mamba Contributors
// Copyright (c) 2023, QuantStack and Mamba Contributors
//
// Distributed under the terms of the BSD 3-Clause License.
//

View File

@ -0,0 +1,51 @@
// Copyright (c) 2023, QuantStack and Mamba Contributors
//
// Distributed under the terms of the BSD 3-Clause License.
//
// The full license is in the file LICENSE, distributed with this software.
#include <doctest/doctest.h>
#include "mamba/core/mirror.hpp"
namespace mamba
{
TEST_SUITE("mirror")
{
TEST_CASE("split_path_tag")
{
SUBCASE("tar_bz2_extension")
{
auto [split_path, split_tag] = split_path_tag("xtensor-0.23.10-h2acdbc0_0.tar.bz2");
CHECK_EQ(split_path, "xtensor");
CHECK_EQ(split_tag, "0.23.10-h2acdbc0-0");
}
SUBCASE("multiple_parts")
{
auto [split_path, split_tag] = split_path_tag("x-tensor-10.23.10-h2acdbc0_0.tar.bz2");
CHECK_EQ(split_path, "x-tensor");
CHECK_EQ(split_tag, "10.23.10-h2acdbc0-0");
}
SUBCASE("more_multiple_parts")
{
auto [split_path, split_tag] = split_path_tag("x-tens-or-10.23.10-h2acdbc0_0.tar.bz2");
CHECK_EQ(split_path, "x-tens-or");
CHECK_EQ(split_tag, "10.23.10-h2acdbc0-0");
}
SUBCASE("json_extension")
{
auto [split_path, split_tag] = split_path_tag("xtensor-0.23.10-h2acdbc0_0.json");
CHECK_EQ(split_path, "xtensor-0.23.10-h2acdbc0_0.json");
CHECK_EQ(split_tag, "latest");
}
SUBCASE("not_enough_parts")
{
CHECK_THROWS_AS(split_path_tag("xtensor.tar.bz2"), std::runtime_error);
}
}
}
}