diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-03 12:38:35 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-03 12:38:35 +0200 |
| commit | 5361ee1c77b68bb14237169660840d6d63a74892 (patch) | |
| tree | 3ad259133e09485a14506be38e43ec5b62a050f2 /src/zenutil | |
| parent | move chunking code to zenremotestore lib (#545) (diff) | |
| download | zen-5361ee1c77b68bb14237169660840d6d63a74892.tar.xz zen-5361ee1c77b68bb14237169660840d6d63a74892.zip | |
remove zenutil dependency in zenremotestore (#547)
* remove dependency to zenutil/workerpools.h from remoteprojectstore.cpp
* remove dependency to zenutil/workerpools.h from buildstoragecache.cpp
* remove unneded include
* move jupiter helpers to zenremotestore
* move parallelwork to zencore
* remove zenutil dependency from zenremotestore
* clean up test project dependencies - use indirect dependencies
Diffstat (limited to 'src/zenutil')
| -rw-r--r-- | src/zenutil/include/zenutil/jupiter/jupiterclient.h | 56 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/jupiter/jupiterhost.h | 35 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/jupiter/jupitersession.h | 179 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/parallelwork.h | 80 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupiterclient.cpp | 28 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupiterhost.cpp | 66 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupitersession.cpp | 871 | ||||
| -rw-r--r-- | src/zenutil/parallelwork.cpp | 264 | ||||
| -rw-r--r-- | src/zenutil/zenutil.cpp | 2 |
9 files changed, 0 insertions, 1581 deletions
diff --git a/src/zenutil/include/zenutil/jupiter/jupiterclient.h b/src/zenutil/include/zenutil/jupiter/jupiterclient.h deleted file mode 100644 index 8a51bd60a..000000000 --- a/src/zenutil/include/zenutil/jupiter/jupiterclient.h +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zenbase/refcount.h> -#include <zencore/logging.h> -#include <zenhttp/httpclient.h> - -#include <chrono> - -namespace zen { - -class IoBuffer; - -struct JupiterClientOptions -{ - std::string_view Name; - std::string_view ServiceUrl; - std::string_view DdcNamespace; - std::string_view BlobStoreNamespace; - std::string_view ComputeCluster; - std::chrono::milliseconds ConnectTimeout{5000}; - std::chrono::milliseconds Timeout{}; - bool AssumeHttp2 = false; - bool AllowResume = false; - uint8_t RetryCount = 0; -}; - -/** - * Jupiter upstream cache client - */ -class JupiterClient : public RefCounted -{ -public: - JupiterClient(const JupiterClientOptions& Options, std::function<HttpClientAccessToken()>&& TokenProvider); - ~JupiterClient(); - - std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; } - std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; } - std::string_view ComputeCluster() const { return m_ComputeCluster; } - std::string_view ServiceUrl() const { return m_HttpClient.GetBaseUri(); } - - LoggerRef Logger() { return m_Log; } - HttpClient& Client() { return m_HttpClient; } - -private: - LoggerRef m_Log; - const std::string m_DefaultDdcNamespace; - const std::string m_DefaultBlobStoreNamespace; - const std::string m_ComputeCluster; - HttpClient m_HttpClient; - - friend class JupiterSession; -}; - -} // namespace zen diff --git a/src/zenutil/include/zenutil/jupiter/jupiterhost.h b/src/zenutil/include/zenutil/jupiter/jupiterhost.h deleted file mode 100644 index 3bbc700b8..000000000 --- a/src/zenutil/include/zenutil/jupiter/jupiterhost.h +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <string> -#include <string_view> -#include <vector> - -namespace zen { - -struct HttpClientSettings; - -struct JupiterServerDiscovery -{ - struct EndPoint - { - std::string Name; - std::string BaseUrl; - bool AssumeHttp2 = false; - }; - std::vector<EndPoint> ServerEndPoints; - std::vector<EndPoint> CacheEndPoints; -}; - -JupiterServerDiscovery DiscoverJupiterEndpoints(std::string_view Host, const HttpClientSettings& ClientSettings); - -struct JupiterEndpointTestResult -{ - bool Success = false; - std::string FailureReason; -}; - -JupiterEndpointTestResult TestJupiterEndpoint(std::string_view BaseUrl, const bool AssumeHttp2); - -} // namespace zen diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h deleted file mode 100644 index b79790f25..000000000 --- a/src/zenutil/include/zenutil/jupiter/jupitersession.h +++ /dev/null @@ -1,179 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/iohash.h> -#include <zencore/logging.h> -#include <zenhttp/httpclient.h> - -#include <set> - -namespace zen { - -class IoBuffer; - -struct JupiterResult -{ - IoBuffer Response; - uint64_t SentBytes{}; - uint64_t ReceivedBytes{}; - double ElapsedSeconds{}; - int32_t ErrorCode{}; - std::string Reason; - bool Success = false; -}; - -struct PutRefResult : JupiterResult -{ - std::vector<IoHash> Needs; - IoHash RawHash; -}; - -struct FinalizeRefResult : JupiterResult -{ - std::vector<IoHash> Needs; -}; - -struct JupiterExistsResult : JupiterResult -{ - std::set<IoHash> Needs; -}; - -struct GetObjectReferencesResult : JupiterResult -{ - std::set<IoHash> References; -}; - -struct PutBuildPartResult : JupiterResult -{ - std::vector<IoHash> Needs; - IoHash RawHash; -}; - -struct FinalizeBuildPartResult : JupiterResult -{ - std::vector<IoHash> Needs; -}; - -/** - * Context for performing Jupiter operations - * - * Maintains an HTTP connection so that subsequent operations don't need to go - * through the whole connection setup process - * - */ -class JupiterSession -{ -public: - JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect); - ~JupiterSession(); - - JupiterResult Authenticate(); - - JupiterResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType); - JupiterResult GetBlob(std::string_view Namespace, const IoHash& Key); - JupiterResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {}); - JupiterResult GetObject(std::string_view Namespace, const IoHash& Key); - JupiterResult GetInlineBlob(std::string_view Namespace, - std::string_view BucketId, - const IoHash& Key, - IoHash& OutPayloadHash, - std::filesystem::path TempFolderPath = {}); - - PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); - JupiterResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); - JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); - JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob); - JupiterResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object); - - FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); - - JupiterResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key); - - GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key); - - JupiterResult BlobExists(std::string_view Namespace, const IoHash& Key); - JupiterResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key); - JupiterResult ObjectExists(std::string_view Namespace, const IoHash& Key); - - JupiterExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); - JupiterExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); - JupiterExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys); - - std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); - - JupiterResult ListBuildNamespaces(); - JupiterResult ListBuildBuckets(std::string_view Namespace); - JupiterResult ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload); - JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload); - JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); - JupiterResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); - PutBuildPartResult PutBuildPart(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - std::string_view PartName, - const IoBuffer& Payload); - JupiterResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId); - JupiterResult PutBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - ZenContentType ContentType, - const CompositeBuffer& Payload); - JupiterResult GetBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - std::filesystem::path TempFolderPath, - uint64_t Offset = 0, - uint64_t Size = (uint64_t)-1); - - JupiterResult PutMultipartBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - ZenContentType ContentType, - uint64_t PayloadSize, - std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, - std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems); - JupiterResult GetMultipartBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, - std::function<void()>&& OnComplete, - std::vector<std::function<JupiterResult()>>& OutWorkItems); - JupiterResult PutBlockMetadata(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - const IoBuffer& Payload); - FinalizeBuildPartResult FinalizeBuildPart(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - const IoHash& RawHash); - JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount); - JupiterResult GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload); - - JupiterResult PutBuildPartStats(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& BuildPartId, - IoBuffer Payload); - -private: - inline LoggerRef Log() { return m_Log; } - - JupiterResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key); - - JupiterExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys); - - LoggerRef m_Log; - HttpClient& m_HttpClient; - const bool m_AllowRedirect = false; -}; - -} // namespace zen diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h deleted file mode 100644 index 05146d644..000000000 --- a/src/zenutil/include/zenutil/parallelwork.h +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/scopeguard.h> -#include <zencore/thread.h> -#include <zencore/workthreadpool.h> - -#include <atomic> - -namespace zen { - -class ParallelWork -{ -public: - ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode); - - ~ParallelWork(); - - typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback; - typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback; - typedef std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)> UpdateCallback; - - void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError = {}) - { - m_PendingWork.AddCount(1); - try - { - WorkerPool.ScheduleWork( - [this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] { - auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); }); - try - { - while (m_PauseFlag && !m_AbortFlag) - { - Sleep(2000); - } - Work(m_AbortFlag); - } - catch (...) - { - OnError(std::current_exception(), m_AbortFlag); - } - }, - m_Mode); - } - catch (const std::exception&) - { - m_PendingWork.CountDown(); - throw; - } - } - - void Abort() { m_AbortFlag = true; } - - bool IsAborted() const { return m_AbortFlag.load(); } - - void Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback); - - void Wait(); - - Latch& PendingWork() { return m_PendingWork; } - -private: - ExceptionCallback DefaultErrorFunction(); - void RethrowErrors(); - - std::atomic<bool>& m_AbortFlag; - std::atomic<bool>& m_PauseFlag; - const WorkerThreadPool::EMode m_Mode; - bool m_DispatchComplete = false; - Latch m_PendingWork; - - RwLock m_ErrorLock; - std::vector<std::exception_ptr> m_Errors; -}; - -void parallellwork_forcelink(); - -} // namespace zen diff --git a/src/zenutil/jupiter/jupiterclient.cpp b/src/zenutil/jupiter/jupiterclient.cpp deleted file mode 100644 index dbac218a4..000000000 --- a/src/zenutil/jupiter/jupiterclient.cpp +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenutil/jupiter/jupiterclient.h> - -namespace zen { - -using namespace std::literals; - -JupiterClient::JupiterClient(const JupiterClientOptions& Options, std::function<HttpClientAccessToken()>&& TokenProvider) -: m_Log(zen::logging::Get("jupiter"sv)) -, m_DefaultDdcNamespace(Options.DdcNamespace) -, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace) -, m_ComputeCluster(Options.ComputeCluster) -, m_HttpClient(Options.ServiceUrl, - HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout, - .Timeout = Options.Timeout, - .AccessTokenProvider = std::move(TokenProvider), - .AssumeHttp2 = Options.AssumeHttp2, - .AllowResume = Options.AllowResume, - .RetryCount = Options.RetryCount}) -{ -} - -JupiterClient::~JupiterClient() -{ -} - -} // namespace zen diff --git a/src/zenutil/jupiter/jupiterhost.cpp b/src/zenutil/jupiter/jupiterhost.cpp deleted file mode 100644 index d06229cbf..000000000 --- a/src/zenutil/jupiter/jupiterhost.cpp +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenutil/jupiter/jupiterhost.h> - -#include <zencore/compactbinary.h> -#include <zencore/fmtutils.h> -#include <zenhttp/httpclient.h> - -namespace zen { - -JupiterServerDiscovery -DiscoverJupiterEndpoints(std::string_view Host, const HttpClientSettings& ClientSettings) -{ - JupiterServerDiscovery Result; - - HttpClient DiscoveryHttpClient(Host, ClientSettings); - HttpClient::Response ServerInfoResponse = DiscoveryHttpClient.Get("/api/v1/status/servers", HttpClient::Accept(HttpContentType::kJSON)); - if (!ServerInfoResponse.IsSuccess()) - { - ServerInfoResponse.ThrowError(fmt::format("Failed to get list of servers from discovery url '{}'", Host)); - } - std::string_view JsonResponse = ServerInfoResponse.AsText(); - CbObject CbPayload = LoadCompactBinaryFromJson(JsonResponse).AsObject(); - CbArrayView ServerEndpoints = CbPayload["serverEndpoints"].AsArrayView(); - Result.ServerEndPoints.reserve(ServerEndpoints.Num()); - - auto ParseEndPoints = [](CbArrayView ServerEndpoints) -> std::vector<JupiterServerDiscovery::EndPoint> { - std::vector<JupiterServerDiscovery::EndPoint> Result; - - Result.reserve(ServerEndpoints.Num()); - for (CbFieldView ServerEndpointView : ServerEndpoints) - { - CbObjectView ServerEndPoint = ServerEndpointView.AsObjectView(); - Result.push_back(JupiterServerDiscovery::EndPoint{.Name = std::string(ServerEndPoint["name"].AsString()), - .BaseUrl = std::string(ServerEndPoint["baseUrl"].AsString()), - .AssumeHttp2 = ServerEndPoint["baseUrl"].AsBool(false)}); - } - return Result; - }; - - Result.ServerEndPoints = ParseEndPoints(CbPayload["serverEndpoints"].AsArrayView()); - Result.CacheEndPoints = ParseEndPoints(CbPayload["cacheEndpoints"].AsArrayView()); - - return Result; -} - -JupiterEndpointTestResult -TestJupiterEndpoint(std::string_view BaseUrl, const bool AssumeHttp2) -{ - HttpClientSettings TestClientSettings{.LogCategory = "httpbuildsclient", - .ConnectTimeout = std::chrono::milliseconds{1000}, - .Timeout = std::chrono::milliseconds{2000}, - .AssumeHttp2 = AssumeHttp2, - .AllowResume = true, - .RetryCount = 0}; - - HttpClient TestHttpClient(BaseUrl, TestClientSettings); - HttpClient::Response TestResponse = TestHttpClient.Get("/health/live"); - if (TestResponse.IsSuccess()) - { - return {.Success = true}; - } - return {.Success = false, .FailureReason = TestResponse.ErrorMessage("")}; -} - -} // namespace zen diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp deleted file mode 100644 index 4b594239e..000000000 --- a/src/zenutil/jupiter/jupitersession.cpp +++ /dev/null @@ -1,871 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenutil/jupiter/jupitersession.h> - -#include <zencore/compactbinary.h> -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinaryutil.h> -#include <zencore/compositebuffer.h> -#include <zencore/compress.h> -#include <zencore/fmtutils.h> -#include <zencore/trace.h> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <json11.hpp> -ZEN_THIRD_PARTY_INCLUDES_END - -using namespace std::literals; - -namespace zen { - -namespace detail { - JupiterResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) - { - if (Response.Error) - { - return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), - .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), - .ElapsedSeconds = Response.ElapsedSeconds, - .ErrorCode = Response.Error.value().ErrorCode, - .Reason = Response.ErrorMessage(ErrorPrefix), - .Success = false}; - } - if (!Response.IsSuccess()) - { - return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), - .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), - .ElapsedSeconds = Response.ElapsedSeconds, - .ErrorCode = static_cast<int32_t>(Response.StatusCode), - .Reason = Response.ErrorMessage(ErrorPrefix), - .Success = false}; - } - return {.Response = Response.ResponsePayload, - .SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), - .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), - .ElapsedSeconds = Response.ElapsedSeconds, - .ErrorCode = 0, - .Success = true}; - } -} // namespace detail - -JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect) -: m_Log(InLog) -, m_HttpClient(InHttpClient) -, m_AllowRedirect(AllowRedirect) -{ -} - -JupiterSession::~JupiterSession() -{ -} - -JupiterResult -JupiterSession::Authenticate() -{ - bool OK = m_HttpClient.Authenticate(); - return {.Success = OK}; -} - -JupiterResult -JupiterSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) -{ - ZEN_TRACE_CPU("JupiterClient::GetRef"); - - HttpClient::Response Response = - m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), {HttpClient::Accept(RefType)}); - - return detail::ConvertResponse(Response, "JupiterSession::GetRef"sv); -} - -JupiterResult -JupiterSession::GetBlob(std::string_view Namespace, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::GetBlob"); - HttpClient::Response Response = - m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), {HttpClient::Accept(ZenContentType::kBinary)}); - - return detail::ConvertResponse(Response); -} - -JupiterResult -JupiterSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath) -{ - ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob"); - - HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), - TempFolderPath, - {HttpClient::Accept(ZenContentType::kCompressedBinary)}); - - return detail::ConvertResponse(Response); -} - -JupiterResult -JupiterSession::GetInlineBlob(std::string_view Namespace, - std::string_view BucketId, - const IoHash& Key, - IoHash& OutPayloadHash, - std::filesystem::path TempFolderPath) -{ - ZEN_TRACE_CPU("JupiterClient::GetInlineBlob"); - - HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), - TempFolderPath, - {{"Accept", "application/x-jupiter-inline"}}); - - JupiterResult Result = detail::ConvertResponse(Response); - - if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end()) - { - const std::string& PayloadHashHeader = It->second; - if (PayloadHashHeader.length() == IoHash::StringLength) - { - OutPayloadHash = IoHash::FromHexString(PayloadHashHeader); - } - } - - return Result; -} - -JupiterResult -JupiterSession::GetObject(std::string_view Namespace, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::GetObject"); - - HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), - {HttpClient::Accept(ZenContentType::kCbObject)}); - - return detail::ConvertResponse(Response); -} - -PutRefResult -JupiterSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) -{ - ZEN_TRACE_CPU("JupiterClient::PutRef"); - - Ref.SetContentType(RefType); - - IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); - - HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), - Ref, - {{"X-Jupiter-IoHash", Hash.ToHexString()}}); - - PutRefResult Result = {detail::ConvertResponse(Response)}; - if (Result.Success) - { - std::string JsonError; - json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); - if (JsonError.empty()) - { - json11::Json::array Needs = Json["needs"].array_items(); - for (const auto& Need : Needs) - { - Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); - } - } - Result.RawHash = Hash; - } - return Result; -} - -FinalizeRefResult -JupiterSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) -{ - ZEN_TRACE_CPU("JupiterClient::FinalizeRef"); - - HttpClient::Response Response = - m_HttpClient.Post(fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()), - {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); - - FinalizeRefResult Result = {detail::ConvertResponse(Response)}; - - if (Result.Success) - { - std::string JsonError; - json11::Json Json = json11::Json::parse(std::string(Response.ToText()), JsonError); - if (JsonError.empty()) - { - json11::Json::array Needs = Json["needs"].array_items(); - for (const auto& Need : Needs) - { - Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); - } - } - } - return Result; -} - -JupiterResult -JupiterSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) -{ - ZEN_TRACE_CPU("JupiterClient::PutBlob"); - - HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob); - - return detail::ConvertResponse(Response); -} - -JupiterResult -JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) -{ - ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); - - Blob.SetContentType(ZenContentType::kCompressedBinary); - HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob); - - return detail::ConvertResponse(Response); -} - -JupiterResult -JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload) -{ - ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); - - HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), - Payload, - ZenContentType::kCompressedBinary); - - return detail::ConvertResponse(Response); -} - -JupiterResult -JupiterSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) -{ - ZEN_TRACE_CPU("JupiterClient::PutObject"); - - Object.SetContentType(ZenContentType::kCbObject); - HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object); - - return detail::ConvertResponse(Response); -} - -JupiterResult -JupiterSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::RefExists"); - - HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString())); - - return detail::ConvertResponse(Response); -} - -GetObjectReferencesResult -JupiterSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::GetObjectReferences"); - - HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()), - {HttpClient::Accept(ZenContentType::kCbObject)}); - - GetObjectReferencesResult Result = {detail::ConvertResponse(Response)}; - - if (Result.Success) - { - const CbObject ReferencesResponse = Response.AsObject(); - for (auto& Item : ReferencesResponse["references"sv]) - { - Result.References.insert(Item.AsHash()); - } - } - return Result; -} - -JupiterResult -JupiterSession::BlobExists(std::string_view Namespace, const IoHash& Key) -{ - return CacheTypeExists(Namespace, "blobs"sv, Key); -} - -JupiterResult -JupiterSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key) -{ - return CacheTypeExists(Namespace, "compressed-blobs"sv, Key); -} - -JupiterResult -JupiterSession::ObjectExists(std::string_view Namespace, const IoHash& Key) -{ - return CacheTypeExists(Namespace, "objects"sv, Key); -} - -JupiterExistsResult -JupiterSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) -{ - return CacheTypeExists(Namespace, "blobs"sv, Keys); -} - -JupiterExistsResult -JupiterSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) -{ - return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys); -} - -JupiterExistsResult -JupiterSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys) -{ - return CacheTypeExists(Namespace, "objects"sv, Keys); -} - -std::vector<IoHash> -JupiterSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) -{ - // ExtendableStringBuilder<256> Uri; - // Uri << m_CacheClient->ServiceUrl(); - // Uri << "/api/v1/s/" << Namespace; - - ZEN_UNUSED(Namespace, BucketId, ChunkHashes); - - return {}; -} - -JupiterResult -JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); - - HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString())); - - return detail::ConvertResponse(Response); -} - -JupiterExistsResult -JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys) -{ - ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); - - ExtendableStringBuilder<256> Body; - Body << "["; - for (const auto& Key : Keys) - { - Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\""; - } - Body << "]"; - IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size()); - Payload.SetContentType(ZenContentType::kJSON); - - HttpClient::Response Response = - m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace), Payload, {HttpClient::Accept(ZenContentType::kCbObject)}); - - JupiterExistsResult Result = {detail::ConvertResponse(Response)}; - - if (Result.Success) - { - const CbObject ExistsResponse = Response.AsObject(); - for (auto& Item : ExistsResponse["needs"sv]) - { - Result.Needs.insert(Item.AsHash()); - } - } - return Result; -} - -JupiterResult -JupiterSession::ListBuildNamespaces() -{ - HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds"), {HttpClient::Accept(ZenContentType::kJSON)}); - return detail::ConvertResponse(Response, "JupiterSession::ListBuildNamespaces"sv); -} - -JupiterResult -JupiterSession::ListBuildBuckets(std::string_view Namespace) -{ - HttpClient::Response Response = - m_HttpClient.Get(fmt::format("/api/v2/builds/{}", Namespace), {HttpClient::Accept(ZenContentType::kJSON)}); - return detail::ConvertResponse(Response, "JupiterSession::ListBuildBuckets"sv); -} - -JupiterResult -JupiterSession::ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload) -{ - ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - std::string OptionalBucketPath = BucketId.empty() ? "" : fmt::format("/{}", BucketId); - HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}{}/search", Namespace, OptionalBucketPath), - Payload, - {HttpClient::Accept(ZenContentType::kCbObject)}); - return detail::ConvertResponse(Response, "JupiterSession::ListBuilds"sv); -} - -JupiterResult -JupiterSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload) -{ - ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), Payload); - return detail::ConvertResponse(Response, "JupiterSession::PutBuild"sv); -} - -JupiterResult -JupiterSession::GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) -{ - HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), - HttpClient::Accept(ZenContentType::kCbObject)); - return detail::ConvertResponse(Response, "JupiterSession::GetBuild"sv); -} - -JupiterResult -JupiterSession::FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) -{ - HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/finalize", Namespace, BucketId, BuildId)); - return detail::ConvertResponse(Response, "JupiterSession::FinalizeBuild"sv); -} - -PutBuildPartResult -JupiterSession::PutBuildPart(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - std::string_view PartName, - const IoBuffer& Payload) -{ - ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - - IoHash Hash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); - - HttpClient::Response Response = - m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/{}", Namespace, BucketId, BuildId, PartId, PartName), - Payload, - {{"X-Jupiter-IoHash", Hash.ToHexString()}}); - - PutBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::PutBuildPart"sv)}; - if (Result.Success) - { - std::string JsonError; - json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); - if (JsonError.empty()) - { - json11::Json::array Needs = Json["needs"].array_items(); - for (const auto& Need : Needs) - { - Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); - } - } - Result.RawHash = Hash; - } - return Result; -} - -JupiterResult -JupiterSession::GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId) -{ - HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}", Namespace, BucketId, BuildId, PartId), - HttpClient::Accept(ZenContentType::kCbObject)); - return detail::ConvertResponse(Response, "JupiterSession::GetBuildPart"sv); -} - -JupiterResult -JupiterSession::PutBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - ZenContentType ContentType, - const CompositeBuffer& Payload) -{ - HttpClient::Response Response = - m_HttpClient.Upload(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()), - Payload, - ContentType); - return detail::ConvertResponse(Response, "JupiterSession::PutBuildBlob"sv); -} - -JupiterResult -JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - ZenContentType ContentType, - uint64_t PayloadSize, - std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, - std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems) -{ - struct MultipartUploadResponse - { - struct Part - { - uint64_t FirstByte; - uint64_t LastByte; - std::string PartId; - std::string QueryString; - }; - - std::string UploadId; - std::string BlobName; - std::vector<Part> Parts; - - static MultipartUploadResponse Parse(CbObject& Payload) - { - MultipartUploadResponse Result; - Result.UploadId = Payload["uploadId"sv].AsString(); - Result.BlobName = Payload["blobName"sv].AsString(); - CbArrayView PartsArray = Payload["parts"sv].AsArrayView(); - Result.Parts.reserve(PartsArray.Num()); - for (CbFieldView PartView : PartsArray) - { - CbObjectView PartObject = PartView.AsObjectView(); - Result.Parts.emplace_back(Part{ - .FirstByte = PartObject["firstByte"sv].AsUInt64(), - .LastByte = PartObject["lastByte"sv].AsUInt64(), - .PartId = std::string(PartObject["partId"sv].AsString()), - .QueryString = std::string(PartObject["queryString"sv].AsString()), - }); - } - return Result; - } - }; - - CbObjectWriter StartMultipartPayloadWriter; - StartMultipartPayloadWriter.AddInteger("blobLength"sv, PayloadSize); - CbObject StartMultipartPayload = StartMultipartPayloadWriter.Save(); - - std::string StartMultipartResponseRequestString = - fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/startMultipartUpload", Namespace, BucketId, BuildId, Hash.ToHexString()); - // ZEN_INFO("POST: {}", StartMultipartResponseRequestString); - HttpClient::Response StartMultipartResponse = - m_HttpClient.Post(StartMultipartResponseRequestString, StartMultipartPayload, HttpClient::Accept(ZenContentType::kCbObject)); - if (!StartMultipartResponse.IsSuccess()) - { - ZEN_WARN("{}", StartMultipartResponse.ErrorMessage("startMultipartUpload: ")); - return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv); - } - CbValidateError ValidateResult = CbValidateError::None; - CbObject ResponseObject = ValidateAndReadCompactBinaryObject(IoBuffer(StartMultipartResponse.ResponsePayload), ValidateResult); - if (ValidateResult != CbValidateError::None) - { - JupiterResult Result = detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv); - Result.ErrorCode = (int32)HttpResponseCode::UnsupportedMediaType; - Result.Reason = fmt::format("Invalid multipart response object format: '{}'", ToString(ValidateResult)); - return Result; - } - - struct WorkloadData - { - MultipartUploadResponse PartDescription; - std::function<IoBuffer(uint64_t Offset, uint64_t Size)> Transmitter; - std::atomic<size_t> PartsLeft; - }; - - std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); - - Workload->PartDescription = MultipartUploadResponse::Parse(ResponseObject); - Workload->Transmitter = std::move(Transmitter); - Workload->PartsLeft = Workload->PartDescription.Parts.size(); - - for (size_t PartIndex = 0; PartIndex < Workload->PartDescription.Parts.size(); PartIndex++) - { - OutWorkItems.emplace_back([this, Namespace, BucketId, BuildId, Hash, ContentType, Workload, PartIndex]( - bool& OutIsComplete) -> JupiterResult { - const MultipartUploadResponse::Part& Part = Workload->PartDescription.Parts[PartIndex]; - IoBuffer PartPayload = Workload->Transmitter(Part.FirstByte, Part.LastByte - Part.FirstByte); - std::string MultipartUploadResponseRequestString = - fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}", - Namespace, - BucketId, - BuildId, - Hash.ToHexString(), - Part.QueryString, - m_AllowRedirect ? "true"sv : "false"sv); - // ZEN_INFO("PUT: {}", MultipartUploadResponseRequestString); - HttpClient::Response MultipartUploadResponse = m_HttpClient.Put(MultipartUploadResponseRequestString, PartPayload); - if (!MultipartUploadResponse.IsSuccess()) - { - ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(MultipartUploadResponseRequestString)); - } - OutIsComplete = Workload->PartsLeft.fetch_sub(1) == 1; - if (OutIsComplete) - { - int64_t TotalUploadedBytes = MultipartUploadResponse.UploadedBytes; - int64_t TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes; - double TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds; - HttpClient::Response MultipartEndResponse = MultipartUploadResponse; - while (MultipartEndResponse.IsSuccess()) - { - CbObjectWriter CompletePayloadWriter; - CompletePayloadWriter.AddString("blobName"sv, Workload->PartDescription.BlobName); - CompletePayloadWriter.AddString("uploadId"sv, Workload->PartDescription.UploadId); - CompletePayloadWriter.AddBool("isCompressed"sv, ContentType == ZenContentType::kCompressedBinary); - CompletePayloadWriter.BeginArray("partIds"sv); - std::unordered_map<std::string, size_t> PartNameToIndex; - for (size_t UploadPartIndex = 0; UploadPartIndex < Workload->PartDescription.Parts.size(); UploadPartIndex++) - { - const MultipartUploadResponse::Part& PartDescription = Workload->PartDescription.Parts[UploadPartIndex]; - PartNameToIndex.insert({PartDescription.PartId, UploadPartIndex}); - CompletePayloadWriter.AddString(PartDescription.PartId); - } - CompletePayloadWriter.EndArray(); // "partIds" - CbObject CompletePayload = CompletePayloadWriter.Save(); - - std::string MultipartEndResponseRequestString = - fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/completeMultipart", Namespace, BucketId, BuildId, Hash.ToHexString()); - - MultipartEndResponse = m_HttpClient.Post(MultipartEndResponseRequestString, - CompletePayload, - HttpClient::Accept(ZenContentType::kCbObject)); - TotalUploadedBytes += MultipartEndResponse.UploadedBytes; - TotalDownloadedBytes += MultipartEndResponse.DownloadedBytes; - TotalElapsedSeconds += MultipartEndResponse.ElapsedSeconds; - if (MultipartEndResponse.IsSuccess()) - { - CbObject ResponseObject = MultipartEndResponse.AsObject(); - CbArrayView MissingPartsArrayView = ResponseObject["missingParts"sv].AsArrayView(); - if (MissingPartsArrayView.Num() == 0) - { - break; - } - else - { - for (CbFieldView PartIdView : MissingPartsArrayView) - { - std::string RetryPartId(PartIdView.AsString()); - size_t RetryPartIndex = PartNameToIndex.at(RetryPartId); - const MultipartUploadResponse::Part& RetryPart = Workload->PartDescription.Parts[RetryPartIndex]; - IoBuffer RetryPartPayload = - Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte - 1); - std::string RetryMultipartUploadResponseRequestString = - fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}", - Namespace, - BucketId, - BuildId, - Hash.ToHexString(), - RetryPart.QueryString, - m_AllowRedirect ? "true"sv : "false"sv); - - MultipartUploadResponse = m_HttpClient.Put(RetryMultipartUploadResponseRequestString, RetryPartPayload); - TotalUploadedBytes = MultipartUploadResponse.UploadedBytes; - TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes; - TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds; - if (!MultipartUploadResponse.IsSuccess()) - { - ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(RetryMultipartUploadResponseRequestString)); - MultipartEndResponse = MultipartUploadResponse; - } - } - } - } - else - { - ZEN_WARN("{}", MultipartEndResponse.ErrorMessage(MultipartEndResponseRequestString)); - } - } - MultipartEndResponse.UploadedBytes = TotalUploadedBytes; - MultipartEndResponse.DownloadedBytes = TotalDownloadedBytes; - MultipartEndResponse.ElapsedSeconds = TotalElapsedSeconds; - return detail::ConvertResponse(MultipartEndResponse, "JupiterSession::PutMultipartBuildBlob"sv); - } - return detail::ConvertResponse(MultipartUploadResponse, "JupiterSession::PutMultipartBuildBlob"sv); - }); - } - return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv); -} - -JupiterResult -JupiterSession::GetMultipartBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, - std::function<void()>&& OnComplete, - std::vector<std::function<JupiterResult()>>& OutWorkItems) -{ - std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", - Namespace, - BucketId, - BuildId, - Hash.ToHexString(), - m_AllowRedirect ? "true"sv : "false"sv); - HttpClient::Response Response = - m_HttpClient.Get(RequestUrl, HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", 0, ChunkSize - 1)}})); - if (Response.IsSuccess()) - { - if (std::string_view ContentRange = Response.Header.Entries["Content-Range"]; !ContentRange.empty()) - { - if (std::string_view::size_type SizeDelimiterPos = ContentRange.find('/'); SizeDelimiterPos != std::string_view::npos) - { - if (std::optional<uint64_t> TotalSizeMaybe = ParseInt<uint64_t>(ContentRange.substr(SizeDelimiterPos + 1)); - TotalSizeMaybe.has_value()) - { - uint64_t TotalSize = TotalSizeMaybe.value(); - uint64_t PayloadSize = Response.ResponsePayload.GetSize(); - - OnReceive(0, Response.ResponsePayload); - - if (TotalSize > PayloadSize) - { - struct WorkloadData - { - std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive; - std::function<void()> OnComplete; - std::atomic<uint64_t> BytesRemaining; - }; - - std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); - Workload->OnReceive = std::move(OnReceive); - Workload->OnComplete = std::move(OnComplete); - Workload->BytesRemaining = TotalSize - PayloadSize; - - uint64_t Offset = PayloadSize; - while (Offset < TotalSize) - { - uint64_t PartSize = Min(ChunkSize, TotalSize - Offset); - OutWorkItems.emplace_back([this, - Namespace = std::string(Namespace), - BucketId = std::string(BucketId), - BuildId = Oid(BuildId), - Hash = IoHash(Hash), - TotalSize, - Workload, - Offset, - PartSize]() -> JupiterResult { - std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", - Namespace, - BucketId, - BuildId, - Hash.ToHexString(), - m_AllowRedirect ? "true"sv : "false"sv); - HttpClient::Response Response = m_HttpClient.Get( - RequestUrl, - HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}})); - if (Response.IsSuccess()) - { - Workload->OnReceive(Offset, Response.ResponsePayload); - uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize()); - if (ByteRemaning == Response.ResponsePayload.GetSize()) - { - Workload->OnComplete(); - } - } - return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); - }); - Offset += PartSize; - } - } - else - { - OnComplete(); - } - return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); - } - } - } - OnReceive(0, Response.ResponsePayload); - OnComplete(); - } - return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); -} - -JupiterResult -JupiterSession::GetBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - std::filesystem::path TempFolderPath, - uint64_t Offset, - uint64_t Size) -{ - HttpClient::KeyValueMap Headers; - if (Offset != 0 || Size != (uint64_t)-1) - { - Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", Offset, Offset + Size - 1)}); - } - HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", - Namespace, - BucketId, - BuildId, - Hash.ToHexString(), - m_AllowRedirect ? "true"sv : "false"sv), - TempFolderPath, - Headers); - if (Response.IsSuccess()) - { - // If we get a redirect to S3 or a non-Jupiter endpoint the content type will not be correct, validate it and set it - if (m_AllowRedirect && (Response.ResponsePayload.GetContentType() == HttpContentType::kBinary)) - { - IoHash ValidateRawHash; - uint64_t ValidateRawSize = 0; - ZEN_ASSERT_SLOW(CompressedBuffer::ValidateCompressedHeader(Response.ResponsePayload, ValidateRawHash, ValidateRawSize)); - ZEN_ASSERT_SLOW(ValidateRawHash == Hash); - ZEN_ASSERT_SLOW(ValidateRawSize > 0); - ZEN_UNUSED(ValidateRawHash, ValidateRawSize); - Response.ResponsePayload.SetContentType(ZenContentType::kCompressedBinary); - } - } - return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv); -} - -JupiterResult -JupiterSession::PutBlockMetadata(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - const IoBuffer& Payload) -{ - ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - HttpClient::Response Response = - m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, Hash.ToHexString()), - Payload); - return detail::ConvertResponse(Response, "JupiterSession::PutBlockMetadata"sv); -} - -FinalizeBuildPartResult -JupiterSession::FinalizeBuildPart(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - const IoHash& RawHash) -{ - HttpClient::Response Response = m_HttpClient.Post( - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/finalize/{}", Namespace, BucketId, BuildId, PartId, RawHash.ToHexString()), - HttpClient::Accept(ZenContentType::kCbObject)); - - FinalizeBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::FinalizeBuildPart"sv)}; - if (Result.Success) - { - std::string JsonError; - json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); - if (JsonError.empty()) - { - json11::Json::array Needs = Json["needs"].array_items(); - for (const auto& Need : Needs) - { - Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); - } - } - } - return Result; -} - -JupiterResult -JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount) -{ - const std::string Parameters = MaxBlockCount == (uint64_t)-1 ? "" : fmt::format("?count={}", MaxBlockCount); - HttpClient::Response Response = - m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks{}", Namespace, BucketId, BuildId, Parameters), - HttpClient::Accept(ZenContentType::kCbObject)); - return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv); -} - -JupiterResult -JupiterSession::GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload) -{ - ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - HttpClient::Response Response = - m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/blocks/getBlockMetadata", Namespace, BucketId, BuildId), - Payload, - HttpClient::Accept(ZenContentType::kCbObject)); - return detail::ConvertResponse(Response, "JupiterSession::GetBlockMetadata"sv); -} - -JupiterResult -JupiterSession::PutBuildPartStats(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& BuildPartId, - IoBuffer Payload) -{ - ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - HttpClient::Response Response = - m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/stats", Namespace, BucketId, BuildId, BuildPartId), - Payload, - HttpClient::Accept(ZenContentType::kCbObject)); - return detail::ConvertResponse(Response, "JupiterSession::PutBuildPartStats"sv); -} - -} // namespace zen diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp deleted file mode 100644 index 95417078a..000000000 --- a/src/zenutil/parallelwork.cpp +++ /dev/null @@ -1,264 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenutil/parallelwork.h> - -#include <zencore/callstack.h> -#include <zencore/except.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> - -#include <typeinfo> - -#if ZEN_WITH_TESTS -# include <zencore/testing.h> -#endif // ZEN_WITH_TESTS - -namespace zen { - -ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag, WorkerThreadPool::EMode Mode) -: m_AbortFlag(AbortFlag) -, m_PauseFlag(PauseFlag) -, m_Mode(Mode) -, m_PendingWork(1) -{ -} - -ParallelWork::~ParallelWork() -{ - try - { - if (!m_DispatchComplete) - { - ZEN_ASSERT(m_PendingWork.Remaining() > 0); - ZEN_WARN( - "ParallelWork disposed without explicit wait for completion, likely caused by an exception, waiting for dispatched threads " - "to complete"); - m_PendingWork.CountDown(); - m_DispatchComplete = true; - } - const bool WaitSucceeded = m_PendingWork.Wait(); - const ptrdiff_t RemainingWork = m_PendingWork.Remaining(); - if (!WaitSucceeded) - { - ZEN_ERROR("ParallelWork::~ParallelWork(): waiting for latch failed, pending work count at {}", RemainingWork); - } - if (RemainingWork != 0) - { - void* Frames[8]; - uint32_t FrameCount = GetCallstack(2, 8, Frames); - CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames); - auto _ = MakeGuard([Callstack]() { FreeCallstack(Callstack); }); - ZEN_WARN("ParallelWork::~ParallelWork(): waited for outstanding work but pending work count is {} instead of 0", RemainingWork); - - uint32_t WaitedMs = 0; - while (m_PendingWork.Remaining() > 0 && WaitedMs < 2000) - { - Sleep(50); - WaitedMs += 50; - } - ptrdiff_t RemainingWorkAfterSafetyWait = m_PendingWork.Remaining(); - if (RemainingWorkAfterSafetyWait != 0) - { - ZEN_ERROR("ParallelWork::~ParallelWork(): safety wait for {} tasks failed, pending work count at {} after {}\n{}", - RemainingWork, - RemainingWorkAfterSafetyWait, - NiceLatencyNs(WaitedMs * 1000000u), - CallstackToString(Callstack, " ")) - } - else - { - ZEN_ERROR("ParallelWork::~ParallelWork(): safety wait for {} tasks completed after {}\n{}", - RemainingWork, - NiceLatencyNs(WaitedMs * 1000000u), - CallstackToString(Callstack, " ")); - } - } - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Exception in ParallelWork::~ParallelWork(): {}", Ex.what()); - } -} - -ParallelWork::ExceptionCallback -ParallelWork::DefaultErrorFunction() -{ - return [&](std::exception_ptr Ex, std::atomic<bool>& AbortFlag) { - m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); }); - AbortFlag = true; - }; -} - -void -ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback) -{ - ZEN_ASSERT(!m_DispatchComplete); - ZEN_ASSERT(m_PendingWork.Remaining() > 0); - m_PendingWork.CountDown(); - m_DispatchComplete = true; - - while (!m_PendingWork.Wait(UpdateIntervalMS)) - { - UpdateCallback(m_AbortFlag.load(), m_PauseFlag.load(), m_PendingWork.Remaining()); - } - - RethrowErrors(); -} - -void -ParallelWork::Wait() -{ - ZEN_ASSERT(!m_DispatchComplete); - ZEN_ASSERT(m_PendingWork.Remaining() > 0); - m_PendingWork.CountDown(); - m_DispatchComplete = true; - - const bool WaitSucceeded = m_PendingWork.Wait(); - const ptrdiff_t RemainingWork = m_PendingWork.Remaining(); - if (!WaitSucceeded) - { - ZEN_ERROR("ParallelWork::Wait(): waiting for latch failed, pending work count at {}", RemainingWork); - } - else if (RemainingWork != 0) - { - ZEN_ERROR("ParallelWork::Wait(): pending work count at {} after successful wait for latch", RemainingWork); - } - - RethrowErrors(); -} - -void -ParallelWork::RethrowErrors() -{ - if (!m_Errors.empty()) - { - if (m_Errors.size() > 1) - { - ZEN_INFO("Multiple exceptions thrown during ParallelWork execution, dropping the following exceptions:"); - auto It = m_Errors.begin() + 1; - while (It != m_Errors.end()) - { - try - { - std::rethrow_exception(*It); - } - catch (const std::exception& Ex) - { - ZEN_INFO(" {}", Ex.what()); - } - It++; - } - } - std::exception_ptr Ex = m_Errors.front(); - m_Errors.clear(); - std::rethrow_exception(Ex); - } -} - -#if ZEN_WITH_TESTS - -TEST_CASE("parallellwork.nowork") -{ - std::atomic<bool> AbortFlag; - std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - Work.Wait(); -} - -TEST_CASE("parallellwork.basic") -{ - WorkerThreadPool WorkerPool(2); - - std::atomic<bool> AbortFlag; - std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - for (uint32_t I = 0; I < 5; I++) - { - Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); }); - } - Work.Wait(); -} - -TEST_CASE("parallellwork.throws_in_work") -{ - WorkerThreadPool WorkerPool(2); - - std::atomic<bool> AbortFlag; - std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - for (uint32_t I = 0; I < 10; I++) - { - Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) { - ZEN_UNUSED(AbortFlag); - if (I > 3) - { - throw std::runtime_error("We throw in async thread"); - } - else - { - Sleep(10); - } - }); - } - CHECK_THROWS_WITH(Work.Wait(), "We throw in async thread"); -} - -TEST_CASE("parallellwork.throws_in_dispatch") -{ - WorkerThreadPool WorkerPool(2); - std::atomic<uint32_t> ExecutedCount; - try - { - std::atomic<bool> AbortFlag; - std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - for (uint32_t I = 0; I < 5; I++) - { - Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) { - if (AbortFlag.load()) - { - return; - } - ExecutedCount++; - }); - if (I == 3) - { - throw std::runtime_error("We throw in dispatcher thread"); - } - } - CHECK(false); - } - catch (const std::runtime_error& Ex) - { - CHECK_EQ("We throw in dispatcher thread", std::string(Ex.what())); - CHECK_LE(ExecutedCount.load(), 4); - } -} - -TEST_CASE("parallellwork.limitqueue") -{ - WorkerThreadPool WorkerPool(2); - - std::atomic<bool> AbortFlag; - std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::DisableBacklog); - for (uint32_t I = 0; I < 5; I++) - { - Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { - if (AbortFlag.load()) - { - return; - } - Sleep(10); - }); - } - Work.Wait(); -} - -void -parallellwork_forcelink() -{ -} -#endif // ZEN_WITH_TESTS - -} // namespace zen diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp index 88be8a244..586fd1513 100644 --- a/src/zenutil/zenutil.cpp +++ b/src/zenutil/zenutil.cpp @@ -7,7 +7,6 @@ # include <zenutil/cache/cacherequests.h> # include <zenutil/cache/rpcrecording.h> # include <zenutil/commandlineoptions.h> -# include <zenutil/parallelwork.h> # include <zenutil/wildcard.h> namespace zen { @@ -19,7 +18,6 @@ zenutil_forcelinktests() cache::rpcrecord_forcelink(); cacherequests_forcelink(); commandlineoptions_forcelink(); - parallellwork_forcelink(); wildcard_forcelink(); } |