diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-03 09:38:05 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-03 09:38:05 +0200 |
| commit | f5727a1e4d6bfb833e37e1210691d351456dbe3a (patch) | |
| tree | cbf7688c4e31ba7db429a98c7c9f813fa0a826be /src/zenserver/projectstore | |
| parent | move projectstore to zenstore (#541) (diff) | |
| download | zen-f5727a1e4d6bfb833e37e1210691d351456dbe3a.tar.xz zen-f5727a1e4d6bfb833e37e1210691d351456dbe3a.zip | |
move remoteproject to remotestorelib (#542)
* move remoteproject code to remotestorelib
Diffstat (limited to 'src/zenserver/projectstore')
| -rw-r--r-- | src/zenserver/projectstore/buildsremoteprojectstore.cpp | 545 | ||||
| -rw-r--r-- | src/zenserver/projectstore/buildsremoteprojectstore.h | 32 | ||||
| -rw-r--r-- | src/zenserver/projectstore/fileremoteprojectstore.cpp | 341 | ||||
| -rw-r--r-- | src/zenserver/projectstore/fileremoteprojectstore.h | 20 | ||||
| -rw-r--r-- | src/zenserver/projectstore/httpprojectstore.cpp | 13 | ||||
| -rw-r--r-- | src/zenserver/projectstore/jupiterremoteprojectstore.cpp | 402 | ||||
| -rw-r--r-- | src/zenserver/projectstore/jupiterremoteprojectstore.h | 32 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 3519 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 178 | ||||
| -rw-r--r-- | src/zenserver/projectstore/zenremoteprojectstore.cpp | 336 | ||||
| -rw-r--r-- | src/zenserver/projectstore/zenremoteprojectstore.h | 18 |
11 files changed, 5 insertions, 5431 deletions
diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.cpp b/src/zenserver/projectstore/buildsremoteprojectstore.cpp deleted file mode 100644 index b8b499f7f..000000000 --- a/src/zenserver/projectstore/buildsremoteprojectstore.cpp +++ /dev/null @@ -1,545 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "buildsremoteprojectstore.h" - -#include <zencore/compactbinarybuilder.h> -#include <zencore/compress.h> -#include <zencore/fmtutils.h> -#include <zencore/scopeguard.h> - -#include <zenhttp/httpclientauth.h> -#include <zenutil/jupiter/jupiterbuildstorage.h> - -namespace zen { - -using namespace std::literals; - -static const std::string_view OplogContainerPartName = "oplogcontainer"sv; - -class BuildsRemoteStore : public RemoteProjectStore -{ -public: - BuildsRemoteStore(std::unique_ptr<BuildStorage::Statistics>&& BuildStorageStats, - std::unique_ptr<HttpClient>&& BuildStorageHttp, - std::unique_ptr<BuildStorage>&& BuildStorage, - std::string_view Url, - std::string_view Namespace, - std::string_view Bucket, - const Oid& BuildId, - const IoBuffer& MetaData, - bool ForceDisableBlocks, - bool ForceDisableTempBlocks) - : m_BuildStorageStats(std::move(BuildStorageStats)) - , m_BuildStorageHttp(std::move(BuildStorageHttp)) - , m_BuildStorage(std::move(BuildStorage)) - , m_Url(Url) - , m_Namespace(Namespace) - , m_Bucket(Bucket) - , m_BuildId(BuildId) - , m_MetaData(MetaData) - , m_EnableBlocks(!ForceDisableBlocks) - , m_UseTempBlocks(!ForceDisableTempBlocks) - { - m_MetaData.MakeOwned(); - } - - virtual RemoteStoreInfo GetInfo() const override - { - return {.CreateBlocks = m_EnableBlocks, - .UseTempBlockFiles = m_UseTempBlocks, - .AllowChunking = true, - .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_BuildId), - .Description = fmt::format("[cloud] {}. SessionId: {}. {}/{}/{}"sv, - m_Url, - m_BuildStorageHttp->GetSessionId(), - m_Namespace, - m_Bucket, - m_BuildId)}; - } - - virtual Stats GetStats() const override - { - return { - .m_SentBytes = m_BuildStorageStats->TotalBytesWritten.load(), - .m_ReceivedBytes = m_BuildStorageStats->TotalBytesRead.load(), - .m_RequestTimeNS = m_BuildStorageStats->TotalRequestTimeUs.load() * 1000, - .m_RequestCount = m_BuildStorageStats->TotalRequestCount.load(), - .m_PeakSentBytes = m_BuildStorageStats->PeakSentBytes.load(), - .m_PeakReceivedBytes = m_BuildStorageStats->PeakReceivedBytes.load(), - .m_PeakBytesPerSec = m_BuildStorageStats->PeakBytesPerSec.load(), - }; - } - - virtual CreateContainerResult CreateContainer() override - { - ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); - - CreateContainerResult Result; - Stopwatch Timer; - auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); - - CbObject Payload = LoadCompactBinaryObject(m_MetaData); - try - { - CbObject PutBuildResult = m_BuildStorage->PutBuild(m_BuildId, Payload); - ZEN_UNUSED(PutBuildResult); - m_OplogBuildPartId = Oid::NewOid(); - } - catch (const HttpClientError& Ex) - { - Result.ErrorCode = MakeErrorCode(Ex); - Result.Reason = - fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what()); - } - catch (const std::exception& Ex) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = - fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what()); - } - - return Result; - } - - virtual SaveResult SaveContainer(const IoBuffer& Payload) override - { - ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - - SaveResult Result; - Stopwatch Timer; - auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); - - try - { - CbObject ObjectPayload = LoadCompactBinaryObject(Payload); - - std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = - m_BuildStorage->PutBuildPart(m_BuildId, m_OplogBuildPartId, OplogContainerPartName, ObjectPayload); - Result.RawHash = PutBuildPartResult.first; - Result.Needs = std::unordered_set<IoHash, IoHash::Hasher>(PutBuildPartResult.second.begin(), PutBuildPartResult.second.end()); - } - catch (const HttpClientError& Ex) - { - Result.ErrorCode = MakeErrorCode(Ex); - Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", - m_Url, - m_Namespace, - m_Bucket, - m_BuildId, - m_OplogBuildPartId, - Ex.what()); - } - catch (const std::exception& Ex) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", - m_Url, - m_Namespace, - m_Bucket, - m_BuildId, - m_OplogBuildPartId, - Ex.what()); - } - - return Result; - } - - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, - const IoHash& RawHash, - ChunkBlockDescription&& Block) override - { - ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - - SaveAttachmentResult Result; - Stopwatch Timer; - auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); - - try - { - m_BuildStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); - - if (Block.BlockHash == RawHash) - { - try - { - CbObjectWriter BlockMetaData; - BlockMetaData.AddString("createdBy", GetRunningExecutablePath().stem().string()); - CbObject MetaPayload = BuildChunkBlockDescription(Block, BlockMetaData.Save()); - if (!m_BuildStorage->PutBlockMetadata(m_BuildId, RawHash, MetaPayload)) - { - ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}. Reason: '{}'", - m_Url, - m_Namespace, - m_Bucket, - m_BuildId, - RawHash, - "not found"); - } - } - catch (const HttpClientError& Ex) - { - Result.ErrorCode = MakeErrorCode(Ex); - Result.Reason = fmt::format("Failed saving block attachment meta data to {}/{}/{}/{}. Reason: '{}'", - m_Url, - m_Namespace, - m_Bucket, - m_BuildId, - Ex.what()); - } - catch (const std::exception& Ex) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Failed saving block attachment meta data to {}/{}/{}/{}. Reason: '{}'", - m_Url, - m_Namespace, - m_Bucket, - m_BuildId, - Ex.what()); - } - } - } - catch (const HttpClientError& Ex) - { - Result.ErrorCode = MakeErrorCode(Ex); - Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'", - m_Url, - m_Namespace, - m_Bucket, - m_BuildId, - Ex.what()); - } - catch (const std::exception& Ex) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'", - m_Url, - m_Namespace, - m_Bucket, - m_BuildId, - Ex.what()); - } - - return Result; - } - - virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override - { - SaveAttachmentsResult Result; - Stopwatch Timer; - auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); - for (const SharedBuffer& Chunk : Chunks) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); - SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {}); - if (ChunkResult.ErrorCode) - { - return SaveAttachmentsResult{ChunkResult}; - } - } - return Result; - } - - virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override - { - ZEN_UNUSED(RawHash); - ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - - FinalizeResult Result; - Stopwatch Timer; - auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); - - try - { - std::vector<IoHash> Needs = m_BuildStorage->FinalizeBuildPart(m_BuildId, m_OplogBuildPartId, RawHash); - Result.Needs = std::unordered_set<IoHash, IoHash::Hasher>(Needs.begin(), Needs.end()); - } - catch (const HttpClientError& Ex) - { - Result.ErrorCode = Ex.GetInternalErrorCode() != 0 ? Ex.GetInternalErrorCode() - : Ex.GetHttpResponseCode() != HttpResponseCode::ImATeapot ? (int)Ex.GetHttpResponseCode() - : 0; - Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", - m_Url, - m_Namespace, - m_Bucket, - m_BuildId, - m_OplogBuildPartId, - Ex.what()); - } - catch (const std::exception& Ex) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", - m_Url, - m_Namespace, - m_Bucket, - m_BuildId, - m_OplogBuildPartId, - Ex.what()); - } - - if (!Result.ErrorCode && Result.Needs.empty()) - { - try - { - m_BuildStorage->FinalizeBuild(m_BuildId); - } - catch (const HttpClientError& Ex) - { - Result.ErrorCode = Ex.GetInternalErrorCode() != 0 ? Ex.GetInternalErrorCode() - : Ex.GetHttpResponseCode() != HttpResponseCode::ImATeapot ? (int)Ex.GetHttpResponseCode() - : 0; - Result.Reason = fmt::format("Failed finalizing oplog container build to {}/{}/{}/{}. Reason: '{}'", - m_Url, - m_Namespace, - m_Bucket, - m_BuildId, - Ex.what()); - } - catch (const std::exception& Ex) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Failed finalizing oplog container build to {}/{}/{}/{}. Reason: '{}'", - m_Url, - m_Namespace, - m_Bucket, - m_BuildId, - Ex.what()); - } - } - - return Result; - } - - virtual LoadContainerResult LoadContainer() override - { - ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); - - LoadContainerResult Result; - Stopwatch Timer; - auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); - - try - { - CbObject BuildObject = m_BuildStorage->GetBuild(m_BuildId); - - CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); - if (!PartsObject) - { - throw std::runtime_error(fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv, - m_Url, - m_Namespace, - m_Bucket, - m_BuildId)); - } - m_OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId(); - if (m_OplogBuildPartId == Oid::Zero) - { - throw std::runtime_error(fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv, - m_Url, - m_Namespace, - m_Bucket, - m_BuildId, - OplogContainerPartName)); - } - - Result.ContainerObject = m_BuildStorage->GetBuildPart(m_BuildId, m_OplogBuildPartId); - } - catch (const HttpClientError& Ex) - { - Result.ErrorCode = MakeErrorCode(Ex); - Result.Reason = fmt::format("Failed fetching oplog container build part to {}/{}/{}/{}. Reason: '{}'", - m_Url, - m_Namespace, - m_Bucket, - m_BuildId, - Ex.what()); - } - catch (const std::exception& Ex) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Failed fetching oplog container build part to {}/{}/{}/{}. Reason: '{}'", - m_Url, - m_Namespace, - m_Bucket, - m_BuildId, - Ex.what()); - } - - return Result; - } - - virtual GetKnownBlocksResult GetKnownBlocks() override - { - ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - - GetKnownBlocksResult Result; - Stopwatch Timer; - auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); - - try - { - CbObject KnownBlocks = m_BuildStorage->FindBlocks(m_BuildId, 10000u); - std::optional<std::vector<ChunkBlockDescription>> Blocks = ParseChunkBlockDescriptionList(KnownBlocks); - Result.Blocks.reserve(Blocks.value().size()); - for (ChunkBlockDescription& BlockDescription : Blocks.value()) - { - Result.Blocks.push_back(ThinChunkBlockDescription{.BlockHash = BlockDescription.BlockHash, - .ChunkRawHashes = std::move(BlockDescription.ChunkRawHashes)}); - } - } - catch (const HttpClientError& Ex) - { - Result.ErrorCode = MakeErrorCode(Ex); - Result.Reason = - fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what()); - } - catch (const std::exception& Ex) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = - fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what()); - } - - return Result; - } - - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override - { - ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - - LoadAttachmentResult Result; - Stopwatch Timer; - auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); - - try - { - Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash); - } - catch (const HttpClientError& Ex) - { - Result.ErrorCode = MakeErrorCode(Ex); - Result.Reason = - fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what()); - } - catch (const std::exception& Ex) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = - fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what()); - } - - return Result; - } - - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override - { - LoadAttachmentsResult Result; - Stopwatch Timer; - auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); - for (const IoHash& Hash : RawHashes) - { - LoadAttachmentResult ChunkResult = LoadAttachment(Hash); - if (ChunkResult.ErrorCode) - { - return LoadAttachmentsResult{ChunkResult}; - } - ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000))); - Result.Chunks.emplace_back( - std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); - } - return Result; - } - -private: - static int MakeErrorCode(const HttpClientError& Ex) - { - return Ex.GetInternalErrorCode() != 0 ? Ex.GetInternalErrorCode() - : Ex.GetHttpResponseCode() != HttpResponseCode::ImATeapot ? (int)Ex.GetHttpResponseCode() - : 0; - } - - std::unique_ptr<BuildStorage::Statistics> m_BuildStorageStats; - std::unique_ptr<HttpClient> m_BuildStorageHttp; - std::unique_ptr<BuildStorage> m_BuildStorage; - const std::string m_Url; - const std::string m_Namespace; - const std::string m_Bucket; - const Oid m_BuildId; - IoBuffer m_MetaData; - Oid m_OplogBuildPartId = Oid::Zero; - const bool m_EnableBlocks = true; - const bool m_UseTempBlocks = true; - const bool m_AllowRedirect = false; -}; - -std::shared_ptr<RemoteProjectStore> -CreateJupiterBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, - const std::filesystem::path& TempFilePath, - bool Quiet, - bool Unattended) -{ - std::string Url = Options.Url; - if (Url.find("://"sv) == std::string::npos) - { - // Assume https URL - Url = fmt::format("https://{}"sv, Url); - } - - // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider - // 2) Access token as parameter in request - // 3) Environment variable (different win vs linux/mac) - // 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider - - std::function<HttpClientAccessToken()> TokenProvider; - if (!Options.OpenIdProvider.empty()) - { - TokenProvider = httpclientauth::CreateFromOpenIdProvider(Options.AuthManager, Options.OpenIdProvider); - } - else if (!Options.AccessToken.empty()) - { - TokenProvider = httpclientauth::CreateFromStaticToken(Options.AccessToken); - } - else if (!Options.OidcExePath.empty()) - { - if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(Options.OidcExePath, Url, Quiet, Unattended); - TokenProviderMaybe) - { - TokenProvider = TokenProviderMaybe.value(); - } - } - - if (!TokenProvider) - { - TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager); - } - - HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient", - .ConnectTimeout = std::chrono::milliseconds(2000), - .Timeout = std::chrono::milliseconds(1800000), - .AccessTokenProvider = std::move(TokenProvider), - .AssumeHttp2 = Options.AssumeHttp2, - .AllowResume = true, - .RetryCount = 4}; - - std::unique_ptr<BuildStorage::Statistics> BuildStorageStats(std::make_unique<BuildStorage::Statistics>()); - - std::unique_ptr<HttpClient> BuildStorageHttp = std::make_unique<HttpClient>(Url, ClientSettings); - - std::unique_ptr<BuildStorage> BuildStorage = - CreateJupiterBuildStorage(Log(), *BuildStorageHttp, *BuildStorageStats, Options.Namespace, Options.Bucket, false, TempFilePath); - - std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(std::move(BuildStorageStats), - std::move(BuildStorageHttp), - std::move(BuildStorage), - Url, - Options.Namespace, - Options.Bucket, - Options.BuildId, - Options.MetaData, - Options.ForceDisableBlocks, - Options.ForceDisableTempBlocks); - return RemoteStore; -} - -} // namespace zen diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.h b/src/zenserver/projectstore/buildsremoteprojectstore.h deleted file mode 100644 index 24ab9678d..000000000 --- a/src/zenserver/projectstore/buildsremoteprojectstore.h +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include "remoteprojectstore.h" - -namespace zen { - -class AuthMgr; - -struct BuildsRemoteStoreOptions : RemoteStoreOptions -{ - std::string Url; - std::string Namespace; - std::string Bucket; - Oid BuildId; - std::string OpenIdProvider; - std::string AccessToken; - AuthMgr& AuthManager; - std::filesystem::path OidcExePath; - bool ForceDisableBlocks = false; - bool ForceDisableTempBlocks = false; - bool AssumeHttp2 = false; - IoBuffer MetaData; -}; - -std::shared_ptr<RemoteProjectStore> CreateJupiterBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, - const std::filesystem::path& TempFilePath, - bool Quiet, - bool Unattended); - -} // namespace zen diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp deleted file mode 100644 index 348f5fb8a..000000000 --- a/src/zenserver/projectstore/fileremoteprojectstore.cpp +++ /dev/null @@ -1,341 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "fileremoteprojectstore.h" - -#include <zencore/compactbinaryutil.h> -#include <zencore/compress.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/timer.h> -#include <zenhttp/httpcommon.h> - -namespace zen { - -using namespace std::literals; - -class LocalExportProjectStore : public RemoteProjectStore -{ -public: - LocalExportProjectStore(std::string_view Name, - std::string_view OptionalBaseName, - const std::filesystem::path& FolderPath, - bool ForceDisableBlocks, - bool ForceEnableTempBlocks) - : m_Name(Name) - , m_OptionalBaseName(OptionalBaseName) - , m_OutputPath(FolderPath) - { - if (ForceDisableBlocks) - { - m_EnableBlocks = false; - } - if (ForceEnableTempBlocks) - { - m_UseTempBlocks = true; - } - } - - virtual RemoteStoreInfo GetInfo() const override - { - return { - .CreateBlocks = m_EnableBlocks, - .UseTempBlockFiles = m_UseTempBlocks, - .AllowChunking = true, - .ContainerName = m_Name, - .Description = - fmt::format("[file] {}/{}{}{}"sv, m_OutputPath, m_Name, m_OptionalBaseName.empty() ? "" : " Base: ", m_OptionalBaseName)}; - } - - virtual Stats GetStats() const override - { - return {.m_SentBytes = m_SentBytes.load(), - .m_ReceivedBytes = m_ReceivedBytes.load(), - .m_RequestTimeNS = m_RequestTimeNS.load(), - .m_RequestCount = m_RequestCount.load(), - .m_PeakSentBytes = m_PeakSentBytes.load(), - .m_PeakReceivedBytes = m_PeakReceivedBytes.load(), - .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; - } - - virtual CreateContainerResult CreateContainer() override - { - // Nothing to do here - return {}; - } - - virtual SaveResult SaveContainer(const IoBuffer& Payload) override - { - Stopwatch Timer; - SaveResult Result; - - { - CbObject ContainerObject = LoadCompactBinaryObject(Payload); - - ContainerObject.IterateAttachments([&](CbFieldView FieldView) { - IoHash AttachmentHash = FieldView.AsBinaryAttachment(); - std::filesystem::path AttachmentPath = GetAttachmentPath(AttachmentHash); - if (!IsFile(AttachmentPath)) - { - Result.Needs.insert(AttachmentHash); - } - }); - } - - std::filesystem::path ContainerPath = m_OutputPath; - ContainerPath.append(m_Name); - - try - { - CreateDirectories(m_OutputPath); - BasicFile ContainerFile; - ContainerFile.Open(ContainerPath, BasicFile::Mode::kTruncate); - std::error_code Ec; - ContainerFile.WriteAll(Payload, Ec); - if (Ec) - { - throw std::system_error(Ec, Ec.message()); - } - Result.RawHash = IoHash::HashBuffer(Payload); - } - catch (const std::exception& Ex) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Failed saving oplog container to '{}'. Reason: {}", ContainerPath, Ex.what()); - } - AddStats(Payload.GetSize(), 0, Timer.GetElapsedTimeUs() * 1000); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - return Result; - } - - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override - { - Stopwatch Timer; - SaveAttachmentResult Result; - std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); - if (!IsFile(ChunkPath)) - { - try - { - CreateDirectories(ChunkPath.parent_path()); - - BasicFile ChunkFile; - ChunkFile.Open(ChunkPath, BasicFile::Mode::kTruncate); - size_t Offset = 0; - for (const SharedBuffer& Segment : Payload.GetSegments()) - { - ChunkFile.Write(Segment.GetView(), Offset); - Offset += Segment.GetSize(); - } - } - catch (const std::exception& Ex) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Failed saving oplog attachment to '{}'. Reason: {}", ChunkPath, Ex.what()); - } - } - AddStats(Payload.GetSize(), 0, Timer.GetElapsedTimeUs() * 1000); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - return Result; - } - - virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override - { - Stopwatch Timer; - - for (const SharedBuffer& Chunk : Chunks) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); - SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {}); - if (ChunkResult.ErrorCode) - { - ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - return SaveAttachmentsResult{ChunkResult}; - } - } - SaveAttachmentsResult Result; - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - return Result; - } - - virtual FinalizeResult FinalizeContainer(const IoHash&) override { return {}; } - - virtual LoadContainerResult LoadContainer() override { return LoadContainer(m_Name); } - - virtual GetKnownBlocksResult GetKnownBlocks() override - { - if (m_OptionalBaseName.empty()) - { - return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}}; - } - LoadContainerResult LoadResult = LoadContainer(m_OptionalBaseName); - if (LoadResult.ErrorCode) - { - return GetKnownBlocksResult{LoadResult}; - } - Stopwatch Timer; - std::vector<IoHash> BlockHashes = GetBlockHashesFromOplog(LoadResult.ContainerObject); - if (BlockHashes.empty()) - { - return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), - .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; - } - std::vector<IoHash> ExistingBlockHashes; - for (const IoHash& RawHash : BlockHashes) - { - std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); - if (IsFile(ChunkPath)) - { - ExistingBlockHashes.push_back(RawHash); - } - } - if (ExistingBlockHashes.empty()) - { - return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), - .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; - } - std::vector<ThinChunkBlockDescription> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); - GetKnownBlocksResult Result{{.ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; - Result.Blocks = std::move(KnownBlocks); - return Result; - } - - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override - { - Stopwatch Timer; - LoadAttachmentResult Result; - std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); - if (!IsFile(ChunkPath)) - { - Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); - Result.Reason = fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string()); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - return Result; - } - { - BasicFile ChunkFile; - ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead); - Result.Bytes = ChunkFile.ReadAll(); - } - AddStats(0, Result.Bytes.GetSize(), Timer.GetElapsedTimeUs() * 1000); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - return Result; - } - - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override - { - Stopwatch Timer; - LoadAttachmentsResult Result; - for (const IoHash& Hash : RawHashes) - { - LoadAttachmentResult ChunkResult = LoadAttachment(Hash); - if (ChunkResult.ErrorCode) - { - ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - return LoadAttachmentsResult{ChunkResult}; - } - ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000))); - Result.Chunks.emplace_back( - std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); - } - return Result; - } - -private: - LoadContainerResult LoadContainer(const std::string& Name) - { - Stopwatch Timer; - LoadContainerResult Result; - std::filesystem::path SourcePath = m_OutputPath; - SourcePath.append(Name); - if (!IsFile(SourcePath)) - { - Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); - Result.Reason = fmt::format("Failed loading oplog container from '{}'. Reason: 'The file does not exist'", SourcePath.string()); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - return Result; - } - IoBuffer ContainerPayload; - { - BasicFile ContainerFile; - ContainerFile.Open(SourcePath, BasicFile::Mode::kRead); - ContainerPayload = ContainerFile.ReadAll(); - } - AddStats(0, ContainerPayload.GetSize(), Timer.GetElapsedTimeUs() * 1000); - CbValidateError ValidateResult = CbValidateError::None; - if (Result.ContainerObject = ValidateAndReadCompactBinaryObject(std::move(ContainerPayload), ValidateResult); - ValidateResult != CbValidateError::None || !Result.ContainerObject) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The file {} is not formatted as a compact binary object ('{}')", - SourcePath.string(), - ToString(ValidateResult)); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - return Result; - } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - return Result; - } - - std::filesystem::path GetAttachmentPath(const IoHash& RawHash) const - { - ExtendablePathBuilder<128> ShardedPath; - ShardedPath.Append(m_OutputPath.c_str()); - ExtendableStringBuilder<64> HashString; - RawHash.ToHexString(HashString); - const char* str = HashString.c_str(); - ShardedPath.AppendSeparator(); - ShardedPath.AppendAsciiRange(str, str + 3); - - ShardedPath.AppendSeparator(); - ShardedPath.AppendAsciiRange(str + 3, str + 5); - - ShardedPath.AppendSeparator(); - ShardedPath.AppendAsciiRange(str + 5, str + 40); - - return ShardedPath.ToPath(); - } - - void AddStats(uint64_t UploadedBytes, uint64_t DownloadedBytes, uint64_t ElapsedNS) - { - m_SentBytes.fetch_add(UploadedBytes); - m_ReceivedBytes.fetch_add(DownloadedBytes); - m_RequestTimeNS.fetch_add(ElapsedNS); - SetAtomicMax(m_PeakSentBytes, UploadedBytes); - SetAtomicMax(m_PeakReceivedBytes, DownloadedBytes); - if (ElapsedNS > 0) - { - uint64_t BytesPerSec = (gsl::narrow<uint64_t>(UploadedBytes + DownloadedBytes) * 1000000) / ElapsedNS; - SetAtomicMax(m_PeakBytesPerSec, BytesPerSec); - } - - m_RequestCount.fetch_add(1); - } - - const std::string m_Name; - const std::string m_OptionalBaseName; - const std::filesystem::path m_OutputPath; - bool m_EnableBlocks = true; - bool m_UseTempBlocks = false; - - std::atomic_uint64_t m_SentBytes = {}; - std::atomic_uint64_t m_ReceivedBytes = {}; - std::atomic_uint64_t m_RequestTimeNS = {}; - std::atomic_uint64_t m_RequestCount = {}; - std::atomic_uint64_t m_PeakSentBytes = {}; - std::atomic_uint64_t m_PeakReceivedBytes = {}; - std::atomic_uint64_t m_PeakBytesPerSec = {}; -}; - -std::shared_ptr<RemoteProjectStore> -CreateFileRemoteStore(const FileRemoteStoreOptions& Options) -{ - std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<LocalExportProjectStore>(Options.Name, - Options.OptionalBaseName, - std::filesystem::path(Options.FolderPath), - Options.ForceDisableBlocks, - Options.ForceEnableTempBlocks); - return RemoteStore; -} - -} // namespace zen diff --git a/src/zenserver/projectstore/fileremoteprojectstore.h b/src/zenserver/projectstore/fileremoteprojectstore.h deleted file mode 100644 index 8da9692d5..000000000 --- a/src/zenserver/projectstore/fileremoteprojectstore.h +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include "remoteprojectstore.h" - -namespace zen { - -struct FileRemoteStoreOptions : RemoteStoreOptions -{ - std::filesystem::path FolderPath; - std::string Name; - std::string OptionalBaseName; - bool ForceDisableBlocks = false; - bool ForceEnableTempBlocks = false; -}; - -std::shared_ptr<RemoteProjectStore> CreateFileRemoteStore(const FileRemoteStoreOptions& Options); - -} // namespace zen diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp index c227c0f9e..9fc9e4605 100644 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ b/src/zenserver/projectstore/httpprojectstore.cpp @@ -2,8 +2,6 @@ #include "httpprojectstore.h" -#include "remoteprojectstore.h" - #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryutil.h> @@ -16,18 +14,17 @@ #include <zencore/stream.h> #include <zencore/trace.h> #include <zenhttp/packageformat.h> +#include <zenremotestore/projectstore/buildsremoteprojectstore.h> +#include <zenremotestore/projectstore/fileremoteprojectstore.h> +#include <zenremotestore/projectstore/jupiterremoteprojectstore.h> +#include <zenremotestore/projectstore/remoteprojectstore.h> +#include <zenremotestore/projectstore/zenremoteprojectstore.h> #include <zenstore/oplogreferencedset.h> #include <zenstore/projectstore.h> #include <zenstore/zenstore.h> #include <zenutil/openprocesscache.h> #include <zenutil/workerpools.h> -#include "buildsremoteprojectstore.h" -#include "fileremoteprojectstore.h" -#include "jupiterremoteprojectstore.h" -#include "remoteprojectstore.h" -#include "zenremoteprojectstore.h" - namespace zen { const FLLMTag& diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp deleted file mode 100644 index da3f92353..000000000 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp +++ /dev/null @@ -1,402 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "jupiterremoteprojectstore.h" - -#include <zencore/compactbinaryutil.h> -#include <zencore/compress.h> -#include <zencore/fmtutils.h> - -#include <zenhttp/httpclientauth.h> - -#include <zenutil/jupiter/jupiterclient.h> -#include <zenutil/jupiter/jupitersession.h> - -namespace zen { - -using namespace std::literals; - -class JupiterRemoteStore : public RemoteProjectStore -{ -public: - JupiterRemoteStore(Ref<JupiterClient>&& InJupiterClient, - std::string_view Namespace, - std::string_view Bucket, - const IoHash& Key, - const IoHash& OptionalBaseKey, - bool ForceDisableBlocks, - bool ForceDisableTempBlocks, - const std::filesystem::path& TempFilePath) - : m_JupiterClient(std::move(InJupiterClient)) - , m_Namespace(Namespace) - , m_Bucket(Bucket) - , m_Key(Key) - , m_OptionalBaseKey(OptionalBaseKey) - , m_TempFilePath(TempFilePath) - , m_EnableBlocks(!ForceDisableBlocks) - , m_UseTempBlocks(!ForceDisableTempBlocks) - { - } - - virtual RemoteStoreInfo GetInfo() const override - { - return {.CreateBlocks = m_EnableBlocks, - .UseTempBlockFiles = m_UseTempBlocks, - .AllowChunking = true, - .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_Key), - .Description = fmt::format("[cloud] {}. SessionId: {}. {}/{}/{}{}"sv, - m_JupiterClient->ServiceUrl(), - m_JupiterClient->Client().GetSessionId(), - m_Namespace, - m_Bucket, - m_Key, - m_OptionalBaseKey == IoHash::Zero ? "" : fmt::format(" Base {}", m_OptionalBaseKey))}; - } - - virtual Stats GetStats() const override - { - return {.m_SentBytes = m_SentBytes.load(), - .m_ReceivedBytes = m_ReceivedBytes.load(), - .m_RequestTimeNS = m_RequestTimeNS.load(), - .m_RequestCount = m_RequestCount.load(), - .m_PeakSentBytes = m_PeakSentBytes.load(), - .m_PeakReceivedBytes = m_PeakReceivedBytes.load(), - .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; - } - - virtual CreateContainerResult CreateContainer() override - { - // Nothing to do here - return {}; - } - - virtual SaveResult SaveContainer(const IoBuffer& Payload) override - { - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject); - AddStats(PutResult); - - SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash}; - if (Result.ErrorCode) - { - Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_Key, - Result.Reason); - } - return Result; - } - - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override - { - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - JupiterResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); - AddStats(PutResult); - - SaveAttachmentResult Result{ConvertResult(PutResult)}; - if (Result.ErrorCode) - { - Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - RawHash, - Result.Reason); - } - return Result; - } - - virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override - { - SaveAttachmentsResult Result; - for (const SharedBuffer& Chunk : Chunks) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); - SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {}); - if (ChunkResult.ErrorCode) - { - return SaveAttachmentsResult{ChunkResult}; - } - } - return Result; - } - - virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override - { - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - FinalizeRefResult FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash); - AddStats(FinalizeRefResult); - - FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}}; - if (Result.ErrorCode) - { - Result.Reason = fmt::format("Failed finalizing oplog container to {}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - m_Key, - Result.Reason); - } - return Result; - } - - virtual LoadContainerResult LoadContainer() override { return LoadContainer(m_Key); } - - virtual GetKnownBlocksResult GetKnownBlocks() override - { - if (m_OptionalBaseKey == IoHash::Zero) - { - return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}}; - } - LoadContainerResult LoadResult = LoadContainer(m_OptionalBaseKey); - if (LoadResult.ErrorCode) - { - return GetKnownBlocksResult{LoadResult}; - } - std::vector<IoHash> BlockHashes = GetBlockHashesFromOplog(LoadResult.ContainerObject); - if (BlockHashes.empty()) - { - return GetKnownBlocksResult{ - {.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds}}; - } - - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - JupiterExistsResult ExistsResult = - Session.CompressedBlobExists(m_Namespace, std::set<IoHash>(BlockHashes.begin(), BlockHashes.end())); - AddStats(ExistsResult); - - if (ExistsResult.ErrorCode) - { - return GetKnownBlocksResult{{.ErrorCode = ExistsResult.ErrorCode, - .ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds, - .Reason = fmt::format("Failed checking attachment existance in {}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - ExistsResult.Reason)}}; - } - - Stopwatch Timer; - std::vector<IoHash> ExistingBlockHashes; - for (const IoHash& RawHash : BlockHashes) - { - if (!ExistsResult.Needs.contains(RawHash)) - { - ExistingBlockHashes.push_back(RawHash); - } - } - if (ExistingBlockHashes.empty()) - { - return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), - .ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds}}; - } - std::vector<ThinChunkBlockDescription> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); - - GetKnownBlocksResult Result{ - {.ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000.0}}; - Result.Blocks = std::move(KnownBlocks); - return Result; - } - - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override - { - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); - AddStats(GetResult); - - LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)}; - if (GetResult.ErrorCode) - { - Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - RawHash, - Result.Reason); - } - return Result; - } - - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override - { - LoadAttachmentsResult Result; - for (const IoHash& Hash : RawHashes) - { - LoadAttachmentResult ChunkResult = LoadAttachment(Hash); - if (ChunkResult.ErrorCode) - { - return LoadAttachmentsResult{ChunkResult}; - } - ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000))); - Result.Chunks.emplace_back( - std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); - } - return Result; - } - -private: - LoadContainerResult LoadContainer(const IoHash& Key) - { - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - JupiterResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject); - AddStats(GetResult); - if (GetResult.ErrorCode || !GetResult.Success) - { - LoadContainerResult Result{ConvertResult(GetResult)}; - Result.Reason = fmt::format("Failed fetching oplog container from {}/{}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - Key, - Result.Reason); - return Result; - } - - CbValidateError ValidateResult = CbValidateError::None; - if (CbObject ContainerObject = ValidateAndReadCompactBinaryObject(IoBuffer(GetResult.Response), ValidateResult); - ValidateResult != CbValidateError::None || !ContainerObject) - { - return LoadContainerResult{ - RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - .ElapsedSeconds = GetResult.ElapsedSeconds, - .Reason = fmt::format("The ref {}/{}/{}/{} is not formatted as a compact binary object"sv, - m_JupiterClient->ServiceUrl(), - m_Namespace, - m_Bucket, - Key)}, - {}}; - } - else - { - return LoadContainerResult{ConvertResult(GetResult), std::move(ContainerObject)}; - } - } - - void AddStats(const JupiterResult& Result) - { - m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.SentBytes)); - m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.ReceivedBytes)); - m_RequestTimeNS.fetch_add(static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000)); - SetAtomicMax(m_PeakSentBytes, Result.SentBytes); - SetAtomicMax(m_PeakReceivedBytes, Result.ReceivedBytes); - if (Result.ElapsedSeconds > 0.0) - { - uint64_t BytesPerSec = static_cast<uint64_t>((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds); - SetAtomicMax(m_PeakBytesPerSec, BytesPerSec); - } - - m_RequestCount.fetch_add(1); - } - - static Result ConvertResult(const JupiterResult& Response) - { - std::string Text; - int32_t ErrorCode = 0; - if (Response.ErrorCode != 0 || !Response.Success) - { - if (Response.Response) - { - HttpContentType ContentType = Response.Response.GetContentType(); - if (ContentType == ZenContentType::kText || ContentType == ZenContentType::kJSON) - { - ExtendableStringBuilder<256> SB; - SB.Append("\n"); - SB.Append(std::string_view(reinterpret_cast<const std::string::value_type*>(Response.Response.GetData()), - Response.Response.GetSize())); - Text = SB.ToString(); - } - else if (ContentType == ZenContentType::kCbObject) - { - ExtendableStringBuilder<256> SB; - SB.Append("\n"); - CompactBinaryToJson(Response.Response.GetView(), SB); - Text = SB.ToString(); - } - } - } - if (Response.ErrorCode != 0) - { - ErrorCode = Response.ErrorCode; - } - else if (!Response.Success) - { - ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - } - return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text}; - } - - Ref<JupiterClient> m_JupiterClient; - const std::string m_Namespace; - const std::string m_Bucket; - const IoHash m_Key; - const IoHash m_OptionalBaseKey; - std::filesystem::path m_TempFilePath; - const bool m_EnableBlocks = true; - const bool m_UseTempBlocks = true; - const bool m_AllowRedirect = false; - - std::atomic_uint64_t m_SentBytes = {}; - std::atomic_uint64_t m_ReceivedBytes = {}; - std::atomic_uint64_t m_RequestTimeNS = {}; - std::atomic_uint64_t m_RequestCount = {}; - std::atomic_uint64_t m_PeakSentBytes = {}; - std::atomic_uint64_t m_PeakReceivedBytes = {}; - std::atomic_uint64_t m_PeakBytesPerSec = {}; -}; - -std::shared_ptr<RemoteProjectStore> -CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath, bool Quiet, bool Unattended) -{ - std::string Url = Options.Url; - if (Url.find("://"sv) == std::string::npos) - { - // Assume https URL - Url = fmt::format("https://{}"sv, Url); - } - JupiterClientOptions ClientOptions{.Name = "Remote store"sv, - .ServiceUrl = Url, - .ConnectTimeout = std::chrono::milliseconds(2000), - .Timeout = std::chrono::milliseconds(1800000), - .AssumeHttp2 = Options.AssumeHttp2, - .AllowResume = true, - .RetryCount = 4}; - // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider - // 2) Access token as parameter in request - // 3) Environment variable (different win vs linux/mac) - // 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider - - std::function<HttpClientAccessToken()> TokenProvider; - if (!Options.OpenIdProvider.empty()) - { - TokenProvider = httpclientauth::CreateFromOpenIdProvider(Options.AuthManager, Options.OpenIdProvider); - } - else if (!Options.AccessToken.empty()) - { - TokenProvider = httpclientauth::CreateFromStaticToken(Options.AccessToken); - } - else if (!Options.OidcExePath.empty()) - { - if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(Options.OidcExePath, Url, Quiet, Unattended); - TokenProviderMaybe) - { - TokenProvider = TokenProviderMaybe.value(); - } - } - - if (!TokenProvider) - { - TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager); - } - - Ref<JupiterClient> Client(new JupiterClient(ClientOptions, std::move(TokenProvider))); - - std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<JupiterRemoteStore>(std::move(Client), - Options.Namespace, - Options.Bucket, - Options.Key, - Options.OptionalBaseKey, - Options.ForceDisableBlocks, - Options.ForceDisableTempBlocks, - TempFilePath); - return RemoteStore; -} - -} // namespace zen diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.h b/src/zenserver/projectstore/jupiterremoteprojectstore.h deleted file mode 100644 index d7a22700a..000000000 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.h +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include "remoteprojectstore.h" - -namespace zen { - -class AuthMgr; - -struct JupiterRemoteStoreOptions : RemoteStoreOptions -{ - std::string Url; - std::string Namespace; - std::string Bucket; - IoHash Key; - IoHash OptionalBaseKey; - std::string OpenIdProvider; - std::string AccessToken; - AuthMgr& AuthManager; - std::filesystem::path OidcExePath; - bool ForceDisableBlocks = false; - bool ForceDisableTempBlocks = false; - bool AssumeHttp2 = false; -}; - -std::shared_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, - const std::filesystem::path& TempFilePath, - bool Quiet, - bool Unattended); - -} // namespace zen diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp deleted file mode 100644 index c31c2297e..000000000 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ /dev/null @@ -1,3519 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "remoteprojectstore.h" - -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinaryutil.h> -#include <zencore/compress.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/scopeguard.h> -#include <zencore/stream.h> -#include <zencore/timer.h> -#include <zencore/workthreadpool.h> -#include <zenhttp/httpcommon.h> -#include <zenstore/cidstore.h> -#include <zenutil/chunkedfile.h> -#include <zenutil/workerpools.h> - -#include <unordered_map> - -#if ZEN_WITH_TESTS -# include <zencore/testing.h> -# include <zencore/testutils.h> -# include "fileremoteprojectstore.h" -#endif // ZEN_WITH_TESTS - -namespace zen { - -/* - OplogContainer - Binary("ops") // Compressed CompactBinary object to hide attachment references, also makes the oplog smaller - { - CbArray("ops") - { - CbObject Op - (CbFieldType::BinaryAttachment Attachments[]) - (OpData) - } - } - CbArray("blocks") - CbObject - CbFieldType::BinaryAttachment "rawhash" // Optional, only if we are creating blocks (Jupiter/File) - CbArray("chunks") - CbFieldType::Hash // Chunk hashes - CbArray("chunks") // Optional, only if we are not creating blocks (Zen) - CbFieldType::BinaryAttachment // Chunk attachment hashes - - CbArray("chunkedfiles"); - CbFieldType::Hash "rawhash" - CbFieldType::Integer "rawsize" - CbArray("chunks"); - CbFieldType::Hash "chunkhash" - CbArray("sequence"); - CbFieldType::Integer chunks index - - CompressedBinary ChunkBlock - { - VarUInt ChunkCount - VarUInt ChunkSizes[ChunkCount] - uint8_t[chunksize])[ChunkCount] - } -*/ -namespace remotestore_impl { - ////////////////////////////// AsyncRemoteResult - - struct AsyncRemoteResult - { - void SetError(int32_t ErrorCode, const std::string& ErrorReason, const std::string ErrorText) - { - int32_t Expected = 0; - if (m_ErrorCode.compare_exchange_weak(Expected, ErrorCode ? ErrorCode : -1)) - { - m_ErrorReason = ErrorReason; - m_ErrorText = ErrorText; - } - } - bool IsError() const { return m_ErrorCode.load() != 0; } - int GetError() const { return m_ErrorCode.load(); }; - const std::string& GetErrorReason() const { return m_ErrorReason; }; - const std::string& GetErrorText() const { return m_ErrorText; }; - RemoteProjectStore::Result ConvertResult(double ElapsedSeconds = 0.0) const - { - return RemoteProjectStore::Result{m_ErrorCode, ElapsedSeconds, m_ErrorReason, m_ErrorText}; - } - - private: - std::atomic<int32_t> m_ErrorCode = 0; - std::string m_ErrorReason; - std::string m_ErrorText; - }; - - void ReportProgress(JobContext* OptionalContext, - std::string_view CurrentOp, - std::string_view Details, - ptrdiff_t Total, - ptrdiff_t Remaining) - { - if (OptionalContext) - { - ZEN_ASSERT(Total > 0); - OptionalContext->ReportProgress(CurrentOp, Details, Total, Remaining); - } - } - - void ReportMessage(JobContext* OptionalContext, std::string_view Message) - { - if (OptionalContext) - { - OptionalContext->ReportMessage(Message); - } - ZEN_INFO("{}", Message); - } - - bool IsCancelled(JobContext* OptionalContext) - { - if (!OptionalContext) - { - return false; - } - return OptionalContext->IsCancelled(); - } - - std::string GetStats(const RemoteProjectStore::Stats& Stats, uint64_t ElapsedWallTimeMS) - { - return fmt::format( - "Sent: {} ({}bits/s) Recv: {} ({}bits/s)", - NiceBytes(Stats.m_SentBytes), - NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u), - NiceBytes(Stats.m_ReceivedBytes), - NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u)); - } - - void LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats) - { - ZEN_INFO("Oplog request count: {}. Average request size: {}. Average request time: {}, Peak request speed: {}bits/s", - Stats.m_RequestCount, - NiceBytes(Stats.m_RequestCount > 0 ? (Stats.m_ReceivedBytes + Stats.m_SentBytes) / Stats.m_RequestCount : 0u), - NiceLatencyNs(Stats.m_RequestCount > 0 ? (Stats.m_RequestTimeNS / Stats.m_RequestCount) : 0u), - NiceNum(Stats.m_PeakBytesPerSec)); - ZEN_INFO( - "Oplog sent request avg: {} ({}bits/s). Peak: {}", - NiceBytes(Stats.m_RequestCount > 0u ? static_cast<uint64_t>((Stats.m_SentBytes) / Stats.m_RequestCount) : 0u), - NiceNum(Stats.m_RequestTimeNS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 8 * 1000000000) / Stats.m_RequestTimeNS) : 0u), - NiceBytes(Stats.m_PeakSentBytes)); - ZEN_INFO( - "Oplog recv request avg: {} ({}bits/s). Peak: {}", - NiceBytes(Stats.m_RequestCount > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes) / Stats.m_RequestCount) : 0u), - NiceNum(Stats.m_RequestTimeNS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 8 * 1000000000) / Stats.m_RequestTimeNS) - : 0u), - NiceBytes(Stats.m_PeakReceivedBytes)); - } - - size_t AddBlock(RwLock& BlocksLock, std::vector<ChunkBlockDescription>& Blocks) - { - size_t BlockIndex; - { - RwLock::ExclusiveLockScope _(BlocksLock); - BlockIndex = Blocks.size(); - Blocks.resize(BlockIndex + 1); - } - return BlockIndex; - } - - RemoteProjectStore::Result WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, JobContext* OptionalContext) - { - using namespace std::literals; - - Stopwatch Timer; - - CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); - const uint64_t OpCount = OpsArray.Num(); - - ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount)); - - const size_t OpsBatchSize = 8192; - std::vector<uint8_t> OpsData; - std::vector<size_t> OpDataOffsets; - size_t OpsCompleteCount = 0; - - OpsData.reserve(OpsBatchSize); - - auto AppendBatch = [&]() { - std::vector<CbObjectView> Ops; - Ops.reserve(OpDataOffsets.size()); - for (size_t OpDataOffset : OpDataOffsets) - { - Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset])); - } - std::vector<ProjectStore::LogSequenceNumber> OpLsns = Oplog.AppendNewOplogEntries(Ops); - OpsCompleteCount += OpLsns.size(); - OpsData.clear(); - OpDataOffsets.clear(); - ReportProgress(OptionalContext, - "Writing oplog"sv, - fmt::format("{} remaining...", OpCount - OpsCompleteCount), - OpCount, - OpCount - OpsCompleteCount); - }; - - BinaryWriter Writer; - for (CbFieldView OpEntry : OpsArray) - { - CbObjectView Op = OpEntry.AsObjectView(); - Op.CopyTo(Writer); - OpDataOffsets.push_back(OpsData.size()); - OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const uint8_t*)Writer.GetData()) + Writer.GetSize()); - Writer.Reset(); - - if (OpDataOffsets.size() == OpsBatchSize) - { - AppendBatch(); - } - } - if (!OpDataOffsets.empty()) - { - AppendBatch(); - } - - if (OpCount > 0) - { - ReportProgress(OptionalContext, "Writing oplog"sv, ""sv, OpCount, 0); - } - - return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}; - } - - struct DownloadInfo - { - uint64_t OplogSizeBytes = 0; - std::atomic<uint64_t> AttachmentsDownloaded = 0; - std::atomic<uint64_t> AttachmentBlocksDownloaded = 0; - std::atomic<uint64_t> AttachmentBytesDownloaded = 0; - std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0; - std::atomic<uint64_t> AttachmentsStored = 0; - std::atomic<uint64_t> AttachmentBytesStored = 0; - std::atomic_size_t MissingAttachmentCount = 0; - }; - - void DownloadAndSaveBlockChunks(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - bool IgnoreMissingAttachments, - JobContext* OptionalContext, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, - Latch& AttachmentsDownloadLatch, - Latch& AttachmentsWriteLatch, - AsyncRemoteResult& RemoteResult, - DownloadInfo& Info, - Stopwatch& LoadAttachmentsTimer, - std::atomic_uint64_t& DownloadStartMS, - const std::vector<IoHash>& Chunks) - { - AttachmentsDownloadLatch.AddCount(1); - NetworkWorkerPool.ScheduleWork( - [&RemoteStore, - &ChunkStore, - &WorkerPool, - &AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - &RemoteResult, - Chunks = Chunks, - &Info, - &LoadAttachmentsTimer, - &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext]() { - auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - uint64_t Unset = (std::uint64_t)-1; - DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); - if (Result.ErrorCode) - { - ReportMessage(OptionalContext, - fmt::format("Failed to load attachments with {} chunks ({}): {}", - Chunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - Info.MissingAttachmentCount.fetch_add(1); - if (IgnoreMissingAttachments) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - } - return; - } - Info.AttachmentsDownloaded.fetch_add(Chunks.size()); - ZEN_INFO("Loaded {} bulk attachments in {}", - Chunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - if (RemoteResult.IsError()) - { - return; - } - AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - if (!Chunks.empty()) - { - try - { - std::vector<IoBuffer> WriteAttachmentBuffers; - std::vector<IoHash> WriteRawHashes; - WriteAttachmentBuffers.reserve(Chunks.size()); - WriteRawHashes.reserve(Chunks.size()); - - for (const auto& It : Chunks) - { - uint64_t ChunkSize = It.second.GetCompressedSize(); - Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); - WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer()); - WriteRawHashes.push_back(It.first); - } - std::vector<CidStore::InsertResult> InsertResults = - ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); - - for (size_t Index = 0; Index < InsertResults.size(); Index++) - { - if (InsertResults[Index].New) - { - Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); - Info.AttachmentsStored.fetch_add(1); - } - } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to bulk save {} attachments", Chunks.size()), - Ex.what()); - } - } - }, - WorkerThreadPool::EMode::EnableBacklog); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to bulk load {} attachments", Chunks.size()), - Ex.what()); - } - }, - WorkerThreadPool::EMode::EnableBacklog); - }; - - void DownloadAndSaveBlock(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - bool IgnoreMissingAttachments, - JobContext* OptionalContext, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, - Latch& AttachmentsDownloadLatch, - Latch& AttachmentsWriteLatch, - AsyncRemoteResult& RemoteResult, - DownloadInfo& Info, - Stopwatch& LoadAttachmentsTimer, - std::atomic_uint64_t& DownloadStartMS, - const IoHash& BlockHash, - const std::vector<IoHash>& Chunks, - uint32_t RetriesLeft) - { - AttachmentsDownloadLatch.AddCount(1); - NetworkWorkerPool.ScheduleWork( - [&AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - &ChunkStore, - &RemoteStore, - &NetworkWorkerPool, - &WorkerPool, - BlockHash, - &RemoteResult, - &Info, - &LoadAttachmentsTimer, - &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext, - RetriesLeft, - Chunks = Chunks]() { - auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - uint64_t Unset = (std::uint64_t)-1; - DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); - if (BlockResult.ErrorCode) - { - ReportMessage(OptionalContext, - fmt::format("Failed to download block attachment {} ({}): {}", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) - { - RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); - } - return; - } - if (RemoteResult.IsError()) - { - return; - } - uint64_t BlockSize = BlockResult.Bytes.GetSize(); - Info.AttachmentBlocksDownloaded.fetch_add(1); - ZEN_INFO("Loaded block attachment '{}' in {} ({})", - BlockHash, - NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), - NiceBytes(BlockSize)); - Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); - - AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - &ChunkStore, - &RemoteStore, - &NetworkWorkerPool, - &WorkerPool, - BlockHash, - &RemoteResult, - &Info, - &LoadAttachmentsTimer, - &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext, - RetriesLeft, - Chunks = Chunks, - Bytes = std::move(BlockResult.Bytes)]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - ZEN_ASSERT(Bytes.Size() > 0); - std::unordered_set<IoHash, IoHash::Hasher> WantedChunks; - WantedChunks.reserve(Chunks.size()); - WantedChunks.insert(Chunks.begin(), Chunks.end()); - std::vector<IoBuffer> WriteAttachmentBuffers; - std::vector<IoHash> WriteRawHashes; - - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize); - if (!Compressed) - { - if (RetriesLeft > 0) - { - ReportMessage( - OptionalContext, - fmt::format( - "Block attachment {} is malformed, can't parse as compressed binary, retrying download", - BlockHash)); - return DownloadAndSaveBlock(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, - Info, - LoadAttachmentsTimer, - DownloadStartMS, - BlockHash, - std::move(Chunks), - RetriesLeft - 1); - } - ReportMessage( - OptionalContext, - fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash)); - RemoteResult.SetError( - gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash), - {}); - return; - } - SharedBuffer BlockPayload = Compressed.Decompress(); - if (!BlockPayload) - { - if (RetriesLeft > 0) - { - ReportMessage( - OptionalContext, - fmt::format("Block attachment {} is malformed, can't decompress payload, retrying download", - BlockHash)); - return DownloadAndSaveBlock(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, - Info, - LoadAttachmentsTimer, - DownloadStartMS, - BlockHash, - std::move(Chunks), - RetriesLeft - 1); - } - ReportMessage(OptionalContext, - fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash)); - RemoteResult.SetError( - gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash), - {}); - return; - } - if (RawHash != BlockHash) - { - ReportMessage(OptionalContext, - fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash)); - RemoteResult.SetError( - gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash), - {}); - return; - } - - uint64_t BlockHeaderSize = 0; - bool StoreChunksOK = IterateChunkBlock( - BlockPayload, - [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk, - const IoHash& AttachmentRawHash) { - if (WantedChunks.contains(AttachmentRawHash)) - { - WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); - IoHash RawHash; - uint64_t RawSize; - ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), - RawHash, - RawSize)); - ZEN_ASSERT(RawHash == AttachmentRawHash); - WriteRawHashes.emplace_back(AttachmentRawHash); - WantedChunks.erase(AttachmentRawHash); - } - }, - BlockHeaderSize); - - if (!StoreChunksOK) - { - ReportMessage(OptionalContext, - fmt::format("Block attachment {} has invalid format ({}): {}", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Invalid format for block {}", BlockHash), - {}); - return; - } - - ZEN_ASSERT(WantedChunks.empty()); - - if (!WriteAttachmentBuffers.empty()) - { - auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); - for (size_t Index = 0; Index < Results.size(); Index++) - { - const auto& Result = Results[Index]; - if (Result.New) - { - Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); - Info.AttachmentsStored.fetch_add(1); - } - } - } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed save block attachment {}", BlockHash), - Ex.what()); - } - }, - WorkerThreadPool::EMode::EnableBacklog); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to block attachment {}", BlockHash), - Ex.what()); - } - }, - WorkerThreadPool::EMode::EnableBacklog); - }; - - void DownloadAndSaveAttachment(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - bool IgnoreMissingAttachments, - JobContext* OptionalContext, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, - Latch& AttachmentsDownloadLatch, - Latch& AttachmentsWriteLatch, - AsyncRemoteResult& RemoteResult, - DownloadInfo& Info, - Stopwatch& LoadAttachmentsTimer, - std::atomic_uint64_t& DownloadStartMS, - const IoHash& RawHash) - { - AttachmentsDownloadLatch.AddCount(1); - NetworkWorkerPool.ScheduleWork( - [&RemoteStore, - &ChunkStore, - &WorkerPool, - &RemoteResult, - &AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - RawHash, - &LoadAttachmentsTimer, - &DownloadStartMS, - &Info, - IgnoreMissingAttachments, - OptionalContext]() { - auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - uint64_t Unset = (std::uint64_t)-1; - DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); - if (AttachmentResult.ErrorCode) - { - ReportMessage(OptionalContext, - fmt::format("Failed to download large attachment {}: '{}', error code : {}", - RawHash, - AttachmentResult.Reason, - AttachmentResult.ErrorCode)); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) - { - RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); - } - return; - } - uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize(); - ZEN_INFO("Loaded large attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)), - NiceBytes(AttachmentSize)); - Info.AttachmentsDownloaded.fetch_add(1); - if (RemoteResult.IsError()) - { - return; - } - Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); - - AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&AttachmentsWriteLatch, - &RemoteResult, - &Info, - &ChunkStore, - RawHash, - AttachmentSize, - Bytes = std::move(AttachmentResult.Bytes), - OptionalContext]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash); - if (InsertResult.New) - { - Info.AttachmentBytesStored.fetch_add(AttachmentSize); - Info.AttachmentsStored.fetch_add(1); - } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Saving attachment {} failed", RawHash), - Ex.what()); - } - }, - WorkerThreadPool::EMode::EnableBacklog); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Loading attachment {} failed", RawHash), - Ex.what()); - } - }, - WorkerThreadPool::EMode::EnableBacklog); - }; - - void CreateBlock(WorkerThreadPool& WorkerPool, - Latch& OpSectionsLatch, - std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock, - RwLock& SectionsLock, - std::vector<ChunkBlockDescription>& Blocks, - size_t BlockIndex, - const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, - AsyncRemoteResult& RemoteResult) - { - OpSectionsLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&Blocks, - &SectionsLock, - &OpSectionsLatch, - BlockIndex, - Chunks = std::move(ChunksInBlock), - &AsyncOnBlock, - &RemoteResult]() mutable { - auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - size_t ChunkCount = Chunks.size(); - try - { - ZEN_ASSERT(ChunkCount > 0); - Stopwatch Timer; - ChunkBlockDescription Block; - CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(Chunks), Block); - IoHash BlockHash = CompressedBlock.DecodeRawHash(); - { - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index - RwLock::SharedLockScope __(SectionsLock); - Blocks[BlockIndex] = Block; - } - uint64_t BlockSize = CompressedBlock.GetCompressedSize(); - AsyncOnBlock(std::move(CompressedBlock), std::move(Block)); - ZEN_INFO("Generated block with {} attachments in {} ({})", - ChunkCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - NiceBytes(BlockSize)); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed creating block {} with {} chunks", BlockIndex, ChunkCount), - Ex.what()); - } - }, - WorkerThreadPool::EMode::EnableBacklog); - } - - struct UploadInfo - { - uint64_t OplogSizeBytes = 0; - std::atomic<uint64_t> AttachmentsUploaded = 0; - std::atomic<uint64_t> AttachmentBlocksUploaded = 0; - std::atomic<uint64_t> AttachmentBytesUploaded = 0; - std::atomic<uint64_t> AttachmentBlockBytesUploaded = 0; - }; - - struct CreatedBlock - { - IoBuffer Payload; - ChunkBlockDescription Block; - }; - - void UploadAttachments(WorkerThreadPool& WorkerPool, - CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments, - const std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>>& BlockChunks, - const std::unordered_map<IoHash, CreatedBlock, IoHash::Hasher>& CreatedBlocks, - const tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher>& LooseFileAttachments, - const std::unordered_set<IoHash, IoHash::Hasher>& Needs, - bool ForceAll, - UploadInfo& Info, - AsyncRemoteResult& RemoteResult, - JobContext* OptionalContext) - { - using namespace std::literals; - - if (Needs.empty() && !ForceAll) - { - return; - } - - ReportMessage(OptionalContext, "Filtering needed attachments for upload..."); - - std::unordered_set<IoHash, IoHash::Hasher> AttachmentsToUpload; - std::unordered_map<IoHash, FetchChunkFunc, IoHash::Hasher> BulkBlockAttachmentsToUpload; - - size_t BlockAttachmentCountToUpload = 0; - size_t LargeAttachmentCountToUpload = 0; - size_t BulkAttachmentCountToUpload = 0; - AttachmentsToUpload.reserve(ForceAll ? CreatedBlocks.size() + LargeAttachments.size() : Needs.size()); - - std::unordered_set<IoHash, IoHash::Hasher> UnknownAttachments(Needs); - - for (const auto& CreatedBlock : CreatedBlocks) - { - if (ForceAll || Needs.contains(CreatedBlock.first)) - { - AttachmentsToUpload.insert(CreatedBlock.first); - BlockAttachmentCountToUpload++; - UnknownAttachments.erase(CreatedBlock.first); - } - } - for (const IoHash& LargeAttachment : LargeAttachments) - { - if (ForceAll || Needs.contains(LargeAttachment)) - { - AttachmentsToUpload.insert(LargeAttachment); - LargeAttachmentCountToUpload++; - UnknownAttachments.erase(LargeAttachment); - } - } - for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& BlockHashes : BlockChunks) - { - for (const std::pair<IoHash, FetchChunkFunc>& Chunk : BlockHashes) - { - if (ForceAll || Needs.contains(Chunk.first)) - { - BulkBlockAttachmentsToUpload.insert(std::make_pair(Chunk.first, Chunk.second)); - BulkAttachmentCountToUpload++; - UnknownAttachments.erase(Chunk.first); - } - } - } - - if (AttachmentsToUpload.empty() && BulkBlockAttachmentsToUpload.empty()) - { - ReportMessage(OptionalContext, "No attachments needed"); - return; - } - - if (!UnknownAttachments.empty()) - { - RemoteResult.SetError( - gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Upload requested of {} missing attachments, the base container referenced blocks that are no longer available", - UnknownAttachments.size()), - ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return; - } - - if (IsCancelled(OptionalContext)) - { - if (!RemoteResult.IsError()) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - } - return; - } - - ReportMessage(OptionalContext, - fmt::format("Saving {} attachments ({} blocks, {} attachments, {} bulk attachments)", - AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(), - BlockAttachmentCountToUpload, - LargeAttachmentCountToUpload, - BulkAttachmentCountToUpload)); - - Stopwatch Timer; - - ptrdiff_t AttachmentsToSave(0); - Latch SaveAttachmentsLatch(1); - - for (const IoHash& RawHash : AttachmentsToUpload) - { - if (RemoteResult.IsError()) - { - break; - } - - SaveAttachmentsLatch.AddCount(1); - AttachmentsToSave++; - WorkerPool.ScheduleWork( - [&ChunkStore, - &RemoteStore, - &SaveAttachmentsLatch, - &RemoteResult, - RawHash, - &CreatedBlocks, - &LooseFileAttachments, - &Info, - OptionalContext]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - IoBuffer Payload; - ChunkBlockDescription Block; - if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) - { - Payload = BlockIt->second.Payload; - Block = BlockIt->second.Block; - } - else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end()) - { - Payload = LooseTmpFileIt->second(RawHash); - } - else - { - Payload = ChunkStore.FindChunkByCid(RawHash); - } - if (!Payload) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to find attachment {}", RawHash), - {}); - ZEN_WARN("Failed to save attachment '{}' ({}): {}", - RawHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - const bool IsBlock = Block.BlockHash == RawHash; - size_t PayloadSize = Payload.GetSize(); - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash, std::move(Block)); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ReportMessage(OptionalContext, - fmt::format("Failed to save attachment '{}', {} ({}): {}", - RawHash, - NiceBytes(PayloadSize), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - return; - } - if (IsBlock) - { - Info.AttachmentBlocksUploaded.fetch_add(1); - Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize); - ZEN_INFO("Saved block attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(PayloadSize)); - } - else - { - Info.AttachmentsUploaded.fetch_add(1); - Info.AttachmentBytesUploaded.fetch_add(PayloadSize); - ZEN_INFO("Saved large attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(PayloadSize)); - } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("To upload attachment {}", RawHash), - Ex.what()); - } - }, - WorkerThreadPool::EMode::EnableBacklog); - } - - if (IsCancelled(OptionalContext)) - { - if (!RemoteResult.IsError()) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - } - return; - } - - if (!BulkBlockAttachmentsToUpload.empty()) - { - for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& Chunks : BlockChunks) - { - if (RemoteResult.IsError()) - { - break; - } - - std::vector<IoHash> NeededChunks; - NeededChunks.reserve(Chunks.size()); - for (const std::pair<IoHash, FetchChunkFunc>& Chunk : Chunks) - { - const IoHash& ChunkHash = Chunk.first; - if (BulkBlockAttachmentsToUpload.contains(ChunkHash) && !AttachmentsToUpload.contains(ChunkHash)) - { - NeededChunks.push_back(Chunk.first); - } - } - if (NeededChunks.empty()) - { - continue; - } - - SaveAttachmentsLatch.AddCount(1); - AttachmentsToSave++; - WorkerPool.ScheduleWork( - [&RemoteStore, - &ChunkStore, - &SaveAttachmentsLatch, - &RemoteResult, - NeededChunks = std::move(NeededChunks), - &BulkBlockAttachmentsToUpload, - &Info, - OptionalContext]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - try - { - size_t ChunksSize = 0; - std::vector<SharedBuffer> ChunkBuffers; - ChunkBuffers.reserve(NeededChunks.size()); - for (const IoHash& Chunk : NeededChunks) - { - auto It = BulkBlockAttachmentsToUpload.find(Chunk); - ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end()); - CompressedBuffer ChunkPayload = It->second(It->first).second; - if (!ChunkPayload) - { - RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), - fmt::format("Missing chunk {}"sv, Chunk), - fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); - ChunkBuffers.clear(); - break; - } - ChunksSize += ChunkPayload.GetCompressedSize(); - ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).GetCompressed().Flatten().AsIoBuffer())); - } - RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ReportMessage(OptionalContext, - fmt::format("Failed to save attachments with {} chunks ({}): {}", - NeededChunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - return; - } - Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size()); - Info.AttachmentBytesUploaded.fetch_add(ChunksSize); - - ZEN_INFO("Saved {} bulk attachments in {} ({})", - NeededChunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(ChunksSize)); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to buck upload {} attachments", NeededChunks.size()), - Ex.what()); - } - }, - WorkerThreadPool::EMode::EnableBacklog); - } - } - - SaveAttachmentsLatch.CountDown(); - while (!SaveAttachmentsLatch.Wait(1000)) - { - ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining(); - if (IsCancelled(OptionalContext)) - { - if (!RemoteResult.IsError()) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - } - } - uint64_t PartialTransferWallTimeMS = Timer.GetElapsedTimeMs(); - ReportProgress(OptionalContext, - "Saving attachments"sv, - fmt::format("{} remaining... {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)), - AttachmentsToSave, - Remaining); - } - uint64_t ElapsedTimeMS = Timer.GetElapsedTimeMs(); - if (AttachmentsToSave > 0) - { - ReportProgress(OptionalContext, - "Saving attachments"sv, - fmt::format("{}", GetStats(RemoteStore.GetStats(), ElapsedTimeMS)), - AttachmentsToSave, - 0); - } - ReportMessage(OptionalContext, - fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {} {}", - AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(), - BlockAttachmentCountToUpload, - LargeAttachmentCountToUpload, - BulkAttachmentCountToUpload, - NiceTimeSpanMs(ElapsedTimeMS), - GetStats(RemoteStore.GetStats(), ElapsedTimeMS))); - } - -} // namespace remotestore_impl - -std::vector<IoHash> -GetBlockHashesFromOplog(CbObjectView ContainerObject) -{ - using namespace std::literals; - std::vector<ChunkBlockDescription> Result; - CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); - - std::vector<IoHash> BlockHashes; - BlockHashes.reserve(BlocksArray.Num()); - for (CbFieldView BlockField : BlocksArray) - { - CbObjectView BlockView = BlockField.AsObjectView(); - IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); - BlockHashes.push_back(BlockHash); - } - return BlockHashes; -} - -std::vector<ThinChunkBlockDescription> -GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes) -{ - using namespace std::literals; - std::vector<ThinChunkBlockDescription> Result; - CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); - tsl::robin_set<IoHash, IoHash::Hasher> IncludeSet; - IncludeSet.insert(IncludeBlockHashes.begin(), IncludeBlockHashes.end()); - - Result.reserve(IncludeBlockHashes.size()); - for (CbFieldView BlockField : BlocksArray) - { - CbObjectView BlockView = BlockField.AsObjectView(); - IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); - if (IncludeSet.contains(BlockHash)) - { - std::vector<IoHash> ChunkHashes; - CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView(); - if (BlockHash == IoHash::Zero) - { - continue; - } - ChunkHashes.reserve(ChunksArray.Num()); - for (CbFieldView ChunkField : ChunksArray) - { - ChunkHashes.push_back(ChunkField.AsHash()); - } - Result.push_back(ThinChunkBlockDescription{.BlockHash = BlockHash, .ChunkRawHashes = std::move(ChunkHashes)}); - } - } - return Result; -} - -CbObject -BuildContainer(CidStore& ChunkStore, - ProjectStore::Project& Project, - ProjectStore::Oplog& Oplog, - size_t MaxBlockSize, - size_t MaxChunksPerBlock, - size_t MaxChunkEmbedSize, - size_t ChunkFileSizeLimit, - bool BuildBlocks, - bool IgnoreMissingAttachments, - bool AllowChunking, - const std::vector<ThinChunkBlockDescription>& KnownBlocks, - WorkerThreadPool& WorkerPool, - const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, - const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, - const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, - bool EmbedLooseFiles, - JobContext* OptionalContext, - remotestore_impl::AsyncRemoteResult& RemoteResult) -{ - using namespace std::literals; - - size_t OpCount = 0; - - CbObject OplogContainerObject; - { - struct FoundAttachment - { - std::filesystem::path RawPath; // If not stored in cid - uint64_t Size = 0; - Oid Key = Oid::Zero; - }; - - std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments; - - RwLock BlocksLock; - std::vector<ChunkBlockDescription> Blocks; - CompressedBuffer OpsBuffer; - - std::filesystem::path AttachmentTempPath = Oplog.TempPath(); - AttachmentTempPath.append(".pending"); - CreateDirectories(AttachmentTempPath); - - auto RewriteOp = [&](const Oid& Key, CbObjectView Op, const std::function<void(CbObjectView)>& CB) { - bool OpRewritten = false; - CbArrayView Files = Op["files"sv].AsArrayView(); - if (Files.Num() == 0) - { - CB(Op); - return; - } - - CbWriter Cbo; - Cbo.BeginArray("files"sv); - - for (CbFieldView& Field : Files) - { - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - CB(Op); - return; - } - - bool CopyField = true; - - if (CbObjectView View = Field.AsObjectView()) - { - IoHash DataHash = View["data"sv].AsHash(); - - if (DataHash == IoHash::Zero) - { - std::string_view ServerPath = View["serverpath"sv].AsString(); - std::filesystem::path FilePath = Project.RootDir / ServerPath; - if (!IsFile(FilePath)) - { - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Missing attachment '{}' for op '{}'", FilePath, View["id"sv].AsObjectId())); - if (IgnoreMissingAttachments) - { - continue; - } - else - { - ExtendableStringBuilder<1024> Sb; - Sb.Append("Failed to find attachment '"); - Sb.Append(FilePath.string()); - Sb.Append("' for op: \n"); - View.ToJson(Sb); - throw std::runtime_error(Sb.ToString()); - } - } - - { - Stopwatch HashTimer; - SharedBuffer DataBuffer(IoBufferBuilder::MakeFromFile(FilePath)); - DataHash = IoHash::HashBuffer(CompositeBuffer(DataBuffer)); - ZEN_INFO("Hashed loose file '{}' {}: {} in {}", - FilePath, - NiceBytes(DataBuffer.GetSize()), - DataHash, - NiceTimeSpanMs(HashTimer.GetElapsedTimeMs())); - } - - // Rewrite file array entry with new data reference - CbObjectWriter Writer; - RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { - if (Field.GetName() == "data"sv) - { - // omit this field as we will write it explicitly ourselves - return true; - } - return false; - }); - Writer.AddBinaryAttachment("data"sv, DataHash); - UploadAttachments.insert_or_assign(DataHash, FoundAttachment{.RawPath = FilePath, .Key = Key}); - - CbObject RewrittenOp = Writer.Save(); - Cbo.AddObject(std::move(RewrittenOp)); - CopyField = false; - } - } - - if (CopyField) - { - Cbo.AddField(Field); - } - else - { - OpRewritten = true; - } - } - - if (!OpRewritten) - { - CB(Op); - return; - } - - Cbo.EndArray(); - CbArray FilesArray = Cbo.Save().AsArray(); - - CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { - if (Field.GetName() == "files"sv) - { - NewWriter.AddArray("files"sv, FilesArray); - - return true; - } - - return false; - }); - CB(RewrittenOp); - }; - - remotestore_impl::ReportMessage(OptionalContext, "Building exported oplog and collecting attachments"); - - Stopwatch Timer; - - size_t TotalOpCount = Oplog.GetOplogEntryCount(); - CompressedBuffer CompressedOpsSection; - { - Stopwatch RewriteOplogTimer; - CbObjectWriter SectionOpsWriter; - SectionOpsWriter.BeginArray("ops"sv); - { - Oplog.IterateOplogWithKey([&](int, const Oid& Key, CbObjectView Op) { - if (RemoteResult.IsError()) - { - return; - } - Op.IterateAttachments([&](CbFieldView FieldView) { - UploadAttachments.insert_or_assign(FieldView.AsAttachment(), FoundAttachment{.Key = Key}); - }); - if (EmbedLooseFiles) - { - RewriteOp(Key, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; }); - } - else - { - SectionOpsWriter << Op; - } - OpCount++; - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return; - } - if (OpCount % 1000 == 0) - { - remotestore_impl::ReportProgress(OptionalContext, - "Building oplog"sv, - fmt::format("{} ops processed", OpCount), - TotalOpCount, - TotalOpCount - OpCount); - } - }); - if (RemoteResult.IsError()) - { - return {}; - } - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return {}; - } - if (TotalOpCount > 0) - { - remotestore_impl::ReportProgress(OptionalContext, - "Building oplog"sv, - fmt::format("{} ops processed", OpCount), - TotalOpCount, - 0); - } - } - SectionOpsWriter.EndArray(); // "ops" - - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Rewrote {} ops to new oplog in {}", - OpCount, - NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs())))); - - { - Stopwatch CompressOpsTimer; - CompressedOpsSection = - CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer(), OodleCompressor::Mermaid, OodleCompressionLevel::Fast); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Compressed oplog section {} ({} -> {}) in {}", - CompressedOpsSection.DecodeRawHash(), - NiceBytes(CompressedOpsSection.DecodeRawSize()), - NiceBytes(CompressedOpsSection.GetCompressedSize()), - NiceTimeSpanMs(static_cast<uint64_t>(CompressOpsTimer.GetElapsedTimeMs())))); - } - } - - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return {}; - } - - auto FindReuseBlocks = [](const std::vector<ThinChunkBlockDescription>& KnownBlocks, - const std::unordered_set<IoHash, IoHash::Hasher>& Attachments, - JobContext* OptionalContext) -> std::vector<size_t> { - std::vector<size_t> ReuseBlockIndexes; - if (!Attachments.empty() && !KnownBlocks.empty()) - { - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Checking {} Attachments against {} known blocks for reuse", Attachments.size(), KnownBlocks.size())); - Stopwatch ReuseTimer; - - for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++) - { - const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; - size_t BlockAttachmentCount = KnownBlock.ChunkRawHashes.size(); - if (BlockAttachmentCount == 0) - { - continue; - } - size_t FoundAttachmentCount = 0; - for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes) - { - if (Attachments.contains(KnownHash)) - { - FoundAttachmentCount++; - } - } - - size_t ReusePercent = (FoundAttachmentCount * 100) / BlockAttachmentCount; - // TODO: Configure reuse-level - if (ReusePercent > 80) - { - ZEN_DEBUG("Reusing block {}. {} attachments found, usage level: {}%", - KnownBlock.BlockHash, - FoundAttachmentCount, - ReusePercent); - ReuseBlockIndexes.push_back(KnownBlockIndex); - } - else if (FoundAttachmentCount > 0) - { - ZEN_DEBUG("Skipping block {}. {} attachments found, usage level: {}%", - KnownBlock.BlockHash, - FoundAttachmentCount, - ReusePercent); - } - } - } - return ReuseBlockIndexes; - }; - - std::unordered_set<IoHash, IoHash::Hasher> FoundHashes; - FoundHashes.reserve(UploadAttachments.size()); - for (const auto& It : UploadAttachments) - { - FoundHashes.insert(It.first); - } - - size_t ReusedAttachmentCount = 0; - std::vector<size_t> ReusedBlockIndexes = FindReuseBlocks(KnownBlocks, FoundHashes, OptionalContext); - for (size_t KnownBlockIndex : ReusedBlockIndexes) - { - const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; - for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes) - { - if (UploadAttachments.erase(KnownHash) == 1) - { - ReusedAttachmentCount++; - } - } - } - - struct ChunkedFile - { - IoBuffer Source; - - ChunkedInfoWithSource Chunked; - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkLoookup; - }; - std::vector<ChunkedFile> ChunkedFiles; - - auto ChunkFile = [](const IoHash& RawHash, IoBuffer& RawData, const IoBufferFileReference& FileRef, JobContext*) -> ChunkedFile { - ChunkedFile Chunked; - Stopwatch Timer; - - uint64_t Offset = FileRef.FileChunkOffset; - uint64_t Size = FileRef.FileChunkSize; - - BasicFile SourceFile; - SourceFile.Attach(FileRef.FileHandle); - auto __ = MakeGuard([&SourceFile]() { SourceFile.Detach(); }); - - Chunked.Chunked = ChunkData(SourceFile, Offset, Size, UShaderByteCodeParams); - ZEN_ASSERT(Chunked.Chunked.Info.RawHash == RawHash); - Chunked.Source = RawData; - - ZEN_INFO("Chunked large attachment '{}' {} into {} chunks in {}", - RawHash, - NiceBytes(Chunked.Chunked.Info.RawSize), - Chunked.Chunked.Info.ChunkHashes.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - - return Chunked; - }; - - RwLock ResolveLock; - std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes; - std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; - std::unordered_map<IoHash, size_t, IoHash::Hasher> ChunkedUploadAttachments; - std::unordered_map<IoHash, std::pair<uint64_t, IoBuffer>, IoHash::Hasher> LooseUploadAttachments; - std::unordered_set<IoHash, IoHash::Hasher> MissingHashes; - - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount)); - - Latch ResolveAttachmentsLatch(1); - for (auto& It : UploadAttachments) - { - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return {}; - } - - ResolveAttachmentsLatch.AddCount(1); - - WorkerPool.ScheduleWork( - [&ChunkStore, - UploadAttachment = &It.second, - RawHash = It.first, - &ResolveAttachmentsLatch, - &ResolveLock, - &ChunkedHashes, - &LargeChunkHashes, - &ChunkedUploadAttachments, - &LooseUploadAttachments, - &MissingHashes, - &OnLargeAttachment, - &AttachmentTempPath, - &ChunkFile, - &ChunkedFiles, - MaxChunkEmbedSize, - ChunkFileSizeLimit, - AllowChunking, - &RemoteResult, - OptionalContext]() { - auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); }); - if (remotestore_impl::IsCancelled(OptionalContext)) - { - return; - } - - try - { - if (!UploadAttachment->RawPath.empty()) - { - const std::filesystem::path& FilePath = UploadAttachment->RawPath; - IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath); - if (RawData) - { - if (AllowChunking && RawData.GetSize() > ChunkFileSizeLimit) - { - IoBufferFileReference FileRef; - (void)RawData.GetFileReference(FileRef); - - ChunkedFile Chunked = ChunkFile(RawHash, RawData, FileRef, OptionalContext); - ResolveLock.WithExclusiveLock( - [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() { - ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size()); - ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size()); - for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) - { - ChunkedHashes.insert(ChunkHash); - } - ChunkedFiles.emplace_back(std::move(Chunked)); - }); - } - else if (RawData.GetSize() > (MaxChunkEmbedSize * 2)) - { - // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't - // it will be a loose attachment instead of going into a block - OnLargeAttachment(RawHash, [RawData = std::move(RawData), AttachmentTempPath](const IoHash& RawHash) { - size_t RawSize = RawData.GetSize(); - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)), - OodleCompressor::Mermaid, - OodleCompressionLevel::VeryFast); - - std::filesystem::path AttachmentPath = AttachmentTempPath; - AttachmentPath.append(RawHash.ToHexString()); - IoBuffer TempAttachmentBuffer = - WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath); - ZEN_INFO("Saved temp attachment to '{}', {} ({})", - AttachmentPath, - NiceBytes(RawSize), - NiceBytes(TempAttachmentBuffer.GetSize())); - return TempAttachmentBuffer; - }); - ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); - } - else - { - uint64_t RawSize = RawData.GetSize(); - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData), - OodleCompressor::Mermaid, - OodleCompressionLevel::VeryFast); - - std::filesystem::path AttachmentPath = AttachmentTempPath; - AttachmentPath.append(RawHash.ToHexString()); - - uint64_t CompressedSize = Compressed.GetCompressedSize(); - IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath); - ZEN_INFO("Saved temp attachment to '{}', {} ({})", - AttachmentPath, - NiceBytes(RawSize), - NiceBytes(TempAttachmentBuffer.GetSize())); - - if (CompressedSize > MaxChunkEmbedSize) - { - OnLargeAttachment(RawHash, - [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; }); - ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); - } - else - { - UploadAttachment->Size = CompressedSize; - ResolveLock.WithExclusiveLock( - [RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() { - LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data))); - }); - } - } - } - else - { - ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); - } - } - else - { - IoBuffer Data = ChunkStore.FindChunkByCid(RawHash); - if (Data) - { - auto GetForChunking = - [](size_t ChunkFileSizeLimit, const IoBuffer& Data, IoBufferFileReference& OutFileRef) -> bool { - if (Data.IsWholeFile()) - { - IoHash VerifyRawHash; - uint64_t VerifyRawSize; - CompressedBuffer Compressed = - CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize); - if (Compressed) - { - if (VerifyRawSize > ChunkFileSizeLimit) - { - OodleCompressor Compressor; - OodleCompressionLevel CompressionLevel; - uint64_t BlockSize; - if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) - { - if (CompressionLevel == OodleCompressionLevel::None) - { - CompositeBuffer Decompressed = Compressed.DecompressToComposite(); - if (Decompressed) - { - std::span<const SharedBuffer> Segments = Decompressed.GetSegments(); - if (Segments.size() == 1) - { - IoBuffer DecompressedData = Segments[0].AsIoBuffer(); - if (DecompressedData.GetFileReference(OutFileRef)) - { - return true; - } - } - } - } - } - } - } - } - return false; - }; - - IoBufferFileReference FileRef; - if (AllowChunking && GetForChunking(ChunkFileSizeLimit, Data, FileRef)) - { - ChunkedFile Chunked = ChunkFile(RawHash, Data, FileRef, OptionalContext); - ResolveLock.WithExclusiveLock( - [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() { - ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size()); - ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size()); - for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) - { - ChunkedHashes.insert(ChunkHash); - } - ChunkedFiles.emplace_back(std::move(Chunked)); - }); - } - else if (Data.GetSize() > MaxChunkEmbedSize) - { - OnLargeAttachment(RawHash, - [&ChunkStore](const IoHash& RawHash) { return ChunkStore.FindChunkByCid(RawHash); }); - ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); - } - else - { - UploadAttachment->Size = Data.GetSize(); - } - } - else - { - ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); - } - } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to resolve attachment {}", RawHash), - Ex.what()); - } - }, - WorkerThreadPool::EMode::EnableBacklog); - } - ResolveAttachmentsLatch.CountDown(); - - while (!ResolveAttachmentsLatch.Wait(1000)) - { - ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining(); - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - while (!ResolveAttachmentsLatch.Wait(1000)) - { - Remaining = ResolveAttachmentsLatch.Remaining(); - remotestore_impl::ReportProgress(OptionalContext, - "Resolving attachments"sv, - fmt::format("Aborting, {} attachments remaining...", Remaining), - UploadAttachments.size(), - Remaining); - } - remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, "Aborted"sv, UploadAttachments.size(), 0); - return {}; - } - remotestore_impl::ReportProgress(OptionalContext, - "Resolving attachments"sv, - fmt::format("{} remaining...", Remaining), - UploadAttachments.size(), - Remaining); - } - if (UploadAttachments.size() > 0) - { - remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, ""sv, UploadAttachments.size(), 0); - } - - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return {}; - } - - for (const IoHash& AttachmentHash : MissingHashes) - { - auto It = UploadAttachments.find(AttachmentHash); - ZEN_ASSERT(It != UploadAttachments.end()); - std::optional<CbObject> Op = Oplog.GetOpByKey(It->second.Key); - ZEN_ASSERT(Op.has_value()); - - if (IgnoreMissingAttachments) - { - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Missing attachment '{}' for op '{}'", AttachmentHash, It->second.Key)); - } - else - { - ExtendableStringBuilder<1024> Sb; - Sb.Append("Failed to find attachment '"); - Sb.Append(AttachmentHash.ToHexString()); - Sb.Append("' for op: \n"); - Op.value().ToJson(Sb); - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {}); - return {}; - } - UploadAttachments.erase(AttachmentHash); - } - - for (const auto& It : ChunkedUploadAttachments) - { - UploadAttachments.erase(It.first); - } - for (const auto& It : LargeChunkHashes) - { - UploadAttachments.erase(It); - } - - std::vector<size_t> ReusedBlockFromChunking = FindReuseBlocks(KnownBlocks, ChunkedHashes, OptionalContext); - for (size_t KnownBlockIndex : ReusedBlockIndexes) - { - const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; - for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes) - { - if (ChunkedHashes.erase(KnownHash) == 1) - { - ReusedAttachmentCount++; - } - } - } - - ReusedBlockIndexes.insert(ReusedBlockIndexes.end(), ReusedBlockFromChunking.begin(), ReusedBlockFromChunking.end()); - std::sort(ReusedBlockIndexes.begin(), ReusedBlockIndexes.end()); - auto UniqueKnownBlocksEnd = std::unique(ReusedBlockIndexes.begin(), ReusedBlockIndexes.end()); - size_t ReuseBlockCount = std::distance(ReusedBlockIndexes.begin(), UniqueKnownBlocksEnd); - if (ReuseBlockCount > 0) - { - Blocks.reserve(ReuseBlockCount); - for (auto It = ReusedBlockIndexes.begin(); It != UniqueKnownBlocksEnd; It++) - { - Blocks.push_back({KnownBlocks[*It]}); - } - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Reused {} attachments from {} blocks", ReusedAttachmentCount, ReuseBlockCount)); - } - - std::vector<std::pair<IoHash, Oid>> SortedUploadAttachments; - SortedUploadAttachments.reserve(UploadAttachments.size()); - for (const auto& It : UploadAttachments) - { - SortedUploadAttachments.push_back(std::make_pair(It.first, It.second.Key)); - } - - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return {}; - } - - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Sorting {} attachments from {} ops", SortedUploadAttachments.size(), TotalOpCount)); - - // Sort attachments so we get predictable blocks for the same oplog upload - std::sort(SortedUploadAttachments.begin(), - SortedUploadAttachments.end(), - [](const std::pair<IoHash, Oid>& Lhs, const std::pair<IoHash, Oid>& Rhs) { - if (Lhs.second == Rhs.second) - { - // Same key, sort by raw hash - return Lhs.first < Rhs.first; - } - // Sort by key - return Lhs.second < Rhs.second; - }); - - std::vector<size_t> ChunkedFilesOrder; - ChunkedFilesOrder.reserve(ChunkedFiles.size()); - for (size_t Index = 0; Index < ChunkedFiles.size(); Index++) - { - ChunkedFilesOrder.push_back(Index); - } - std::sort(ChunkedFilesOrder.begin(), ChunkedFilesOrder.end(), [&ChunkedFiles](size_t Lhs, size_t Rhs) { - return ChunkedFiles[Lhs].Chunked.Info.RawHash < ChunkedFiles[Rhs].Chunked.Info.RawHash; - }); - - // SortedUploadAttachments now contains all whole chunks with size to be composed into blocks and uploaded - // ChunkedHashes contains all chunked up chunks to be composed into blocks - - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return {}; - } - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Assembling {} attachments and {} chunked parts from {} ops into blocks and loose attachments", - SortedUploadAttachments.size(), - ChunkedHashes.size(), - TotalOpCount)); - - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return {}; - } - - // SortedUploadAttachments now contains all whole chunks with size to be composed into blocks and uploaded - // ChunkedHashes contains all chunked up chunks to be composed into blocks - - size_t ChunkAssembleCount = SortedUploadAttachments.size() + ChunkedHashes.size(); - size_t ChunksAssembled = 0; - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Assembling {} attachments from {} ops into blocks", ChunkAssembleCount, TotalOpCount)); - - Latch BlockCreateLatch(1); - size_t GeneratedBlockCount = 0; - size_t BlockSize = 0; - std::vector<std::pair<IoHash, FetchChunkFunc>> ChunksInBlock; - - Oid LastOpKey = Oid::Zero; - uint32_t ComposedBlocks = 0; - - uint64_t CreateBlocksStartMS = Timer.GetElapsedTimeMs(); - try - { - uint64_t FetchAttachmentsStartMS = Timer.GetElapsedTimeMs(); - std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes; - auto NewBlock = [&]() { - size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks); - size_t ChunkCount = ChunksInBlock.size(); - if (BuildBlocks) - { - remotestore_impl::CreateBlock(WorkerPool, - BlockCreateLatch, - std::move(ChunksInBlock), - BlocksLock, - Blocks, - BlockIndex, - AsyncOnBlock, - RemoteResult); - ComposedBlocks++; - } - else - { - ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size()); - OnBlockChunks(std::move(ChunksInBlock)); - } - { - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index - RwLock::SharedLockScope _(BlocksLock); - Blocks[BlockIndex].ChunkRawHashes.insert(Blocks[BlockIndex].ChunkRawHashes.end(), - BlockAttachmentHashes.begin(), - BlockAttachmentHashes.end()); - } - uint64_t NowMS = Timer.GetElapsedTimeMs(); - ZEN_INFO("Assembled block {} with {} chunks in {} ({})", - BlockIndex, - ChunkCount, - NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS), - NiceBytes(BlockSize)); - FetchAttachmentsStartMS = NowMS; - BlockAttachmentHashes.clear(); - ChunksInBlock.clear(); - BlockSize = 0; - GeneratedBlockCount++; - }; - - for (auto HashIt = SortedUploadAttachments.begin(); HashIt != SortedUploadAttachments.end(); HashIt++) - { - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - break; - } - if (ChunksAssembled % 1000 == 0) - { - remotestore_impl::ReportProgress( - OptionalContext, - "Assembling blocks"sv, - fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), - ChunkAssembleCount, - ChunkAssembleCount - ChunksAssembled); - } - const IoHash& RawHash(HashIt->first); - const Oid CurrentOpKey = HashIt->second; - const IoHash& AttachmentHash(HashIt->first); - auto InfoIt = UploadAttachments.find(RawHash); - ZEN_ASSERT(InfoIt != UploadAttachments.end()); - uint64_t PayloadSize = InfoIt->second.Size; - - if (BlockAttachmentHashes.insert(AttachmentHash).second) - { - if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end()) - { - ChunksInBlock.emplace_back(std::make_pair( - RawHash, - [RawSize = It->second.first, - IoBuffer = SharedBuffer(It->second.second)](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { - return std::make_pair(RawSize, CompressedBuffer::FromCompressedNoValidate(IoBuffer.AsIoBuffer())); - })); - LooseUploadAttachments.erase(It); - } - else - { - ChunksInBlock.emplace_back( - std::make_pair(RawHash, [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompressedBuffer> { - IoBuffer Chunk = ChunkStore.FindChunkByCid(RawHash); - if (!Chunk) - { - throw std::runtime_error(fmt::format("Failed to find chunk {} in cid store", RawHash)); - } - IoHash ValidateRawHash; - uint64_t RawSize = 0; - CompressedBuffer Compressed = - CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), ValidateRawHash, RawSize); - if (!Compressed) - { - throw std::runtime_error( - fmt::format("Chunk {} in cid store is malformed (not a compressed buffer)", RawHash)); - } - if (RawHash != ValidateRawHash) - { - throw std::runtime_error( - fmt::format("Chunk {} in cid store is malformed (mismatching raw hash)", RawHash)); - } - return {RawSize, Compressed}; - })); - } - BlockSize += PayloadSize; - - if ((BlockSize >= MaxBlockSize || ChunksInBlock.size() > MaxChunksPerBlock) && (CurrentOpKey != LastOpKey)) - { - NewBlock(); - } - LastOpKey = CurrentOpKey; - ChunksAssembled++; - } - } - if (!RemoteResult.IsError()) - { - // Keep the chunked files as separate blocks to make the blocks generated - // more consistent - if (BlockSize > 0) - { - NewBlock(); - } - - for (size_t ChunkedFileIndex : ChunkedFilesOrder) - { - const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex]; - const ChunkedInfoWithSource& Chunked = ChunkedFile.Chunked; - size_t ChunkCount = Chunked.Info.ChunkHashes.size(); - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++) - { - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - break; - } - if (ChunksAssembled % 1000 == 0) - { - remotestore_impl::ReportProgress( - OptionalContext, - "Assembling blocks"sv, - fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), - ChunkAssembleCount, - ChunkAssembleCount - ChunksAssembled); - } - const IoHash& ChunkHash = ChunkedFile.Chunked.Info.ChunkHashes[ChunkIndex]; - if (auto FindIt = ChunkedHashes.find(ChunkHash); FindIt != ChunkedHashes.end()) - { - if (BlockAttachmentHashes.insert(ChunkHash).second) - { - const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex]; - ChunksInBlock.emplace_back( - std::make_pair(ChunkHash, - [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size]( - const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { - return {Size, - CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)), - OodleCompressor::Mermaid, - OodleCompressionLevel::None)}; - })); - BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size; - if (BuildBlocks) - { - if (BlockSize >= MaxBlockSize || ChunksInBlock.size() > MaxChunksPerBlock) - { - NewBlock(); - } - } - ChunksAssembled++; - } - ChunkedHashes.erase(FindIt); - } - } - } - } - - if (BlockSize > 0 && !RemoteResult.IsError()) - { - if (!remotestore_impl::IsCancelled(OptionalContext)) - { - NewBlock(); - } - } - - if (ChunkAssembleCount > 0) - { - remotestore_impl::ReportProgress( - OptionalContext, - "Assembling blocks"sv, - fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), - ChunkAssembleCount, - 0); - } - - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and in {}", - ChunkAssembleCount, - TotalOpCount, - GeneratedBlockCount, - NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs())))); - - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - BlockCreateLatch.CountDown(); - while (!BlockCreateLatch.Wait(1000)) - { - ptrdiff_t Remaining = BlockCreateLatch.Remaining(); - remotestore_impl::ReportProgress(OptionalContext, - "Assembling blocks"sv, - fmt::format("Aborting, {} blocks remaining...", Remaining), - GeneratedBlockCount, - Remaining); - } - if (GeneratedBlockCount > 0) - { - remotestore_impl::ReportProgress(OptionalContext, - "Assembling blocks"sv, - fmt::format("Aborting, {} blocks remaining...", 0), - GeneratedBlockCount, - 0); - } - return {}; - } - } - catch (const std::exception& Ex) - { - BlockCreateLatch.CountDown(); - while (!BlockCreateLatch.Wait(1000)) - { - } - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), "Block creation failed", Ex.what()); - throw; - } - - BlockCreateLatch.CountDown(); - while (!BlockCreateLatch.Wait(1000)) - { - ptrdiff_t Remaining = BlockCreateLatch.Remaining(); - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - while (!BlockCreateLatch.Wait(1000)) - { - Remaining = BlockCreateLatch.Remaining(); - remotestore_impl::ReportProgress(OptionalContext, - "Creating blocks"sv, - fmt::format("Aborting, {} blocks remaining...", Remaining), - GeneratedBlockCount, - Remaining); - } - remotestore_impl::ReportProgress(OptionalContext, "Creating blocks"sv, "Aborted"sv, GeneratedBlockCount, 0); - return {}; - } - remotestore_impl::ReportProgress(OptionalContext, - "Creating blocks"sv, - fmt::format("{} remaining...", Remaining), - GeneratedBlockCount, - Remaining); - } - - if (GeneratedBlockCount > 0) - { - uint64_t NowMS = Timer.GetElapsedTimeMs(); - remotestore_impl::ReportProgress(OptionalContext, "Creating blocks"sv, ""sv, GeneratedBlockCount, 0); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Created {} blocks in {}", GeneratedBlockCount, NiceTimeSpanMs(NowMS - CreateBlocksStartMS))); - } - - if (!RemoteResult.IsError()) - { - CbObjectWriter OplogContinerWriter; - RwLock::SharedLockScope _(BlocksLock); - OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer()); - OplogContinerWriter.BeginArray("blocks"sv); - { - for (const ChunkBlockDescription& B : Blocks) - { - ZEN_ASSERT(!B.ChunkRawHashes.empty()); - if (BuildBlocks) - { - ZEN_ASSERT(B.BlockHash != IoHash::Zero); - - OplogContinerWriter.BeginObject(); - { - OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash); - OplogContinerWriter.BeginArray("chunks"sv); - { - for (const IoHash& RawHash : B.ChunkRawHashes) - { - OplogContinerWriter.AddHash(RawHash); - } - } - OplogContinerWriter.EndArray(); // "chunks" - } - OplogContinerWriter.EndObject(); - continue; - } - - ZEN_ASSERT(B.BlockHash == IoHash::Zero); - OplogContinerWriter.BeginObject(); - { - OplogContinerWriter.BeginArray("chunks"sv); - { - for (const IoHash& RawHash : B.ChunkRawHashes) - { - OplogContinerWriter.AddBinaryAttachment(RawHash); - } - } - OplogContinerWriter.EndArray(); - } - OplogContinerWriter.EndObject(); - } - } - OplogContinerWriter.EndArray(); // "blocks"sv - OplogContinerWriter.BeginArray("chunkedfiles"sv); - { - for (const ChunkedFile& F : ChunkedFiles) - { - OplogContinerWriter.BeginObject(); - { - OplogContinerWriter.AddHash("rawhash"sv, F.Chunked.Info.RawHash); - OplogContinerWriter.AddInteger("rawsize"sv, F.Chunked.Info.RawSize); - OplogContinerWriter.BeginArray("chunks"sv); - { - for (const IoHash& RawHash : F.Chunked.Info.ChunkHashes) - { - OplogContinerWriter.AddHash(RawHash); - } - } - OplogContinerWriter.EndArray(); // "chunks" - OplogContinerWriter.BeginArray("sequence"sv); - { - for (uint32_t ChunkIndex : F.Chunked.Info.ChunkSequence) - { - OplogContinerWriter.AddInteger(ChunkIndex); - } - } - OplogContinerWriter.EndArray(); // "sequence" - } - OplogContinerWriter.EndObject(); - } - } - OplogContinerWriter.EndArray(); // "chunkedfiles"sv - - OplogContinerWriter.BeginArray("chunks"sv); - { - for (const IoHash& AttachmentHash : LargeChunkHashes) - { - OplogContinerWriter.AddBinaryAttachment(AttachmentHash); - } - } - OplogContinerWriter.EndArray(); // "chunks" - - OplogContainerObject = OplogContinerWriter.Save(); - } - } - return OplogContainerObject; -} - -RemoteProjectStore::LoadContainerResult -BuildContainer(CidStore& ChunkStore, - ProjectStore::Project& Project, - ProjectStore::Oplog& Oplog, - size_t MaxBlockSize, - size_t MaxChunksPerBlock, - size_t MaxChunkEmbedSize, - size_t ChunkFileSizeLimit, - bool BuildBlocks, - bool IgnoreMissingAttachments, - bool AllowChunking, - const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, - const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, - const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, - bool EmbedLooseFiles) -{ - WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); - - remotestore_impl::AsyncRemoteResult RemoteResult; - CbObject ContainerObject = BuildContainer(ChunkStore, - Project, - Oplog, - MaxBlockSize, - MaxChunksPerBlock, - MaxChunkEmbedSize, - ChunkFileSizeLimit, - BuildBlocks, - IgnoreMissingAttachments, - AllowChunking, - {}, - WorkerPool, - AsyncOnBlock, - OnLargeAttachment, - OnBlockChunks, - EmbedLooseFiles, - nullptr, - RemoteResult); - return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject}; -} - -RemoteProjectStore::Result -SaveOplog(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - ProjectStore::Project& Project, - ProjectStore::Oplog& Oplog, - size_t MaxBlockSize, - size_t MaxChunksPerBlock, - size_t MaxChunkEmbedSize, - size_t ChunkFileSizeLimit, - bool EmbedLooseFiles, - bool ForceUpload, - bool IgnoreMissingAttachments, - JobContext* OptionalContext) -{ - using namespace std::literals; - - Stopwatch Timer; - - remotestore_impl::UploadInfo Info; - - WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); - WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(EWorkloadType::Background); - - const RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo(); - - std::filesystem::path AttachmentTempPath; - if (RemoteStoreInfo.UseTempBlockFiles) - { - AttachmentTempPath = Oplog.TempPath(); - AttachmentTempPath.append(".pending"); - CreateDirectories(AttachmentTempPath); - } - - remotestore_impl::AsyncRemoteResult RemoteResult; - RwLock AttachmentsLock; - std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments; - std::unordered_map<IoHash, remotestore_impl::CreatedBlock, IoHash::Hasher> CreatedBlocks; - tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles; - - auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, - ChunkBlockDescription&& Block) { - std::filesystem::path BlockPath = AttachmentTempPath; - BlockPath.append(Block.BlockHash.ToHexString()); - try - { - IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock).GetCompressed(), BlockPath); - RwLock::ExclusiveLockScope __(AttachmentsLock); - CreatedBlocks.insert({Block.BlockHash, {.Payload = std::move(BlockBuffer), .Block = std::move(Block)}}); - ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockBuffer.GetSize())); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - Ex.what(), - "Unable to create temp block file"); - return; - } - }; - - auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, - ChunkBlockDescription&& Block) { - IoHash BlockHash = Block.BlockHash; - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash, std::move(Block)); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Failed to save attachment ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return; - } - Info.AttachmentBlocksUploaded.fetch_add(1); - Info.AttachmentBlockBytesUploaded.fetch_add(CompressedBlock.GetCompressedSize()); - ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize())); - }; - - std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>> BlockChunks; - auto OnBlockChunks = [&BlockChunks](std::vector<std::pair<IoHash, FetchChunkFunc>>&& Chunks) { - BlockChunks.push_back({Chunks.begin(), Chunks.end()}); - ZEN_DEBUG("Found {} block chunks", Chunks.size()); - }; - - auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments, &LooseLargeFiles](const IoHash& AttachmentHash, - TGetAttachmentBufferFunc&& GetBufferFunc) { - { - RwLock::ExclusiveLockScope _(AttachmentsLock); - LargeAttachments.insert(AttachmentHash); - LooseLargeFiles.insert_or_assign(AttachmentHash, std::move(GetBufferFunc)); - } - ZEN_DEBUG("Found attachment {}", AttachmentHash); - }; - - std::function<void(CompressedBuffer&&, ChunkBlockDescription &&)> OnBlock; - if (RemoteStoreInfo.UseTempBlockFiles) - { - OnBlock = MakeTempBlock; - } - else - { - OnBlock = UploadBlock; - } - - std::vector<ThinChunkBlockDescription> KnownBlocks; - - uint64_t TransferWallTimeMS = 0; - - RemoteProjectStore::CreateContainerResult ContainerResult = RemoteStore.CreateContainer(); - if (ContainerResult.ErrorCode) - { - RemoteProjectStore::Result Result = {.ErrorCode = ContainerResult.ErrorCode, - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Text = fmt::format("Failed to create container for oplog '{}' ({}): {}", - RemoteStoreInfo.ContainerName, - ContainerResult.ErrorCode, - ContainerResult.Reason)}; - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return Result; - } - - if (RemoteStoreInfo.CreateBlocks) - { - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Fetching known blocks from '{}'", RemoteStoreInfo.Description)); - Stopwatch GetKnownBlocksTimer; - RemoteProjectStore::GetKnownBlocksResult KnownBlocksResult = RemoteStore.GetKnownBlocks(); - TransferWallTimeMS += GetKnownBlocksTimer.GetElapsedTimeMs(); - - if (KnownBlocksResult.ErrorCode == static_cast<int>(HttpResponseCode::NoContent)) - { - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("No known blocks in '{}', uploading all attachments", RemoteStoreInfo.Description)); - } - else if (KnownBlocksResult.ErrorCode) - { - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Failed to get known blocks from '{}' ({}): {}, uploading all attachments", - RemoteStoreInfo.Description, - KnownBlocksResult.ErrorCode, - KnownBlocksResult.Reason)); - } - else - { - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Fetched {} known blocks from '{}' in {}", - KnownBlocksResult.Blocks.size(), - RemoteStoreInfo.Description, - NiceTimeSpanMs(static_cast<uint64_t>(KnownBlocksResult.ElapsedSeconds * 1000.0)))); - KnownBlocks = std::move(KnownBlocksResult.Blocks); - } - } - - CbObject OplogContainerObject = BuildContainer(ChunkStore, - Project, - Oplog, - MaxBlockSize, - MaxChunksPerBlock, - MaxChunkEmbedSize, - ChunkFileSizeLimit, - RemoteStoreInfo.CreateBlocks, - IgnoreMissingAttachments, - RemoteStoreInfo.AllowChunking, - KnownBlocks, - WorkerPool, - OnBlock, - OnLargeAttachment, - OnBlockChunks, - EmbedLooseFiles, - OptionalContext, - /* out */ RemoteResult); - if (!RemoteResult.IsError()) - { - Info.OplogSizeBytes = OplogContainerObject.GetSize(); - - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteProjectStore::Result Result = {.ErrorCode = 0, - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Text = "Operation cancelled"}; - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return Result; - } - - uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num(); - uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num(); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Saving oplog container '{}' with {} attachments and {} blocks...", - RemoteStoreInfo.ContainerName, - ChunkCount, - BlockCount)); - Stopwatch SaveContainerTimer; - IoBuffer ContainerPayload = OplogContainerObject.GetBuffer().AsIoBuffer(); - ContainerPayload.SetContentType(ZenContentType::kCbObject); - RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(std::move(ContainerPayload)); - TransferWallTimeMS += SaveContainerTimer.GetElapsedTimeMs(); - if (ContainerSaveResult.ErrorCode) - { - RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container"); - RemoteProjectStore::Result Result = { - .ErrorCode = 0, - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Text = fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())}; - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return Result; - } - else - { - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Saved container '{}' in {}", - RemoteStoreInfo.ContainerName, - NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000.0)))); - } - - { - Stopwatch UploadAttachmentsTimer; - UploadAttachments(NetworkWorkerPool, - ChunkStore, - RemoteStore, - LargeAttachments, - BlockChunks, - CreatedBlocks, - LooseLargeFiles, - ContainerSaveResult.Needs, - ForceUpload, - Info, - RemoteResult, - OptionalContext); - TransferWallTimeMS += UploadAttachmentsTimer.GetElapsedTimeMs(); - } - - uint32_t Try = 0; - while (!RemoteResult.IsError()) - { - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteProjectStore::Result Result = {.ErrorCode = 0, - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Text = "Operation cancelled"}; - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Text)); - return Result; - } - - remotestore_impl::ReportMessage(OptionalContext, "Finalizing oplog container..."); - RemoteProjectStore::FinalizeResult ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash); - if (ContainerFinalizeResult.ErrorCode) - { - RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Failed to finalize oplog container {} ({}): {}", - ContainerSaveResult.RawHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); - return Result; - } - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Finalized container '{}' in {}", - RemoteStoreInfo.ContainerName, - NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000.0)))); - - if (ContainerFinalizeResult.Needs.empty()) - { - break; - } - - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteProjectStore::Result Result = {.ErrorCode = 0, - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Text = "Operation cancelled"}; - return Result; - } - - const uint32_t MaxTries = 8; - if (Try < MaxTries) - { - Try++; - - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Finalize of container '{}' reported {} missing attachments. Uploading missing attachements. Try {}", - RemoteStoreInfo.ContainerName, - ContainerFinalizeResult.Needs.size(), - Try)); - - Stopwatch UploadAttachmentsTimer; - UploadAttachments(NetworkWorkerPool, - ChunkStore, - RemoteStore, - LargeAttachments, - BlockChunks, - CreatedBlocks, - LooseLargeFiles, - ContainerFinalizeResult.Needs, - false, - Info, - RemoteResult, - OptionalContext); - TransferWallTimeMS += UploadAttachmentsTimer.GetElapsedTimeMs(); - } - else - { - RemoteResult.SetError( - gsl::narrow<int>(HttpResponseCode::InternalServerError), - "Failed to save oplog container", - fmt::format("Giving up finalize oplog container {} after {} retries, still getting reports of missing attachments", - ContainerSaveResult.RawHash, - ContainerFinalizeResult.Needs.size())); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Failed to finalize oplog container container {} ({}): {}", - ContainerSaveResult.RawHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - break; - } - } - - LooseLargeFiles.clear(); - CreatedBlocks.clear(); - } - RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - - remotestore_impl::LogRemoteStoreStatsDetails(RemoteStore.GetStats()); - - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}", - RemoteStoreInfo.ContainerName, - RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), - NiceBytes(Info.OplogSizeBytes), - Info.AttachmentBlocksUploaded.load(), - NiceBytes(Info.AttachmentBlockBytesUploaded.load()), - Info.AttachmentsUploaded.load(), - NiceBytes(Info.AttachmentBytesUploaded.load()), - remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS))); - - return Result; -}; - -RemoteProjectStore::Result -ParseOplogContainer(const CbObject& ContainerObject, - const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments, - const std::function<bool(const IoHash& RawHash)>& HasAttachment, - const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, - const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, - const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment, - CbObject& OutOplogSection, - JobContext* OptionalContext) -{ - using namespace std::literals; - - Stopwatch Timer; - - MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); - IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); - IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); - - CbValidateError ValidateResult = CbValidateError::None; - if (CbObject SectionObject = ValidateAndReadCompactBinaryObject(std::move(SectionPayload), ValidateResult); - ValidateResult == CbValidateError::None && ContainerObject) - { - OutOplogSection = SectionObject; - } - else - { - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Failed to save oplog container: '{}' ('{}')", "Section has unexpected data type", ToString(ValidateResult))); - return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), - Timer.GetElapsedTimeMs() / 1000.0, - "Section has unexpected data type", - "Failed to save oplog container"}; - } - std::unordered_set<IoHash, IoHash::Hasher> OpsAttachments; - { - CbArrayView OpsArray = OutOplogSection["ops"sv].AsArrayView(); - for (CbFieldView OpEntry : OpsArray) - { - OpEntry.IterateAttachments([&](CbFieldView FieldView) { OpsAttachments.insert(FieldView.AsAttachment()); }); - } - } - { - std::vector<IoHash> ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end()); - OnReferencedAttachments(ReferencedAttachments); - } - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size())); - - CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView(); - for (CbFieldView ChunkedFileField : ChunkedFilesArray) - { - CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView(); - IoHash RawHash = ChunkedFileView["rawhash"sv].AsHash(); - if (OpsAttachments.contains(RawHash) && (!HasAttachment(RawHash))) - { - ChunkedInfo Chunked; - Chunked.RawHash = RawHash; - Chunked.RawSize = ChunkedFileView["rawsize"sv].AsUInt64(); - CbArrayView ChunksArray = ChunkedFileView["chunks"sv].AsArrayView(); - Chunked.ChunkHashes.reserve(ChunksArray.Num()); - for (CbFieldView ChunkField : ChunksArray) - { - const IoHash ChunkHash = ChunkField.AsHash(); - Chunked.ChunkHashes.emplace_back(ChunkHash); - } - OnReferencedAttachments(Chunked.ChunkHashes); - OpsAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end()); - CbArrayView SequenceArray = ChunkedFileView["sequence"sv].AsArrayView(); - Chunked.ChunkSequence.reserve(SequenceArray.Num()); - for (CbFieldView SequenceField : SequenceArray) - { - uint32_t SequenceIndex = SequenceField.AsUInt32(); - ZEN_ASSERT(SequenceIndex < Chunked.ChunkHashes.size()); - Chunked.ChunkSequence.push_back(SequenceIndex); - } - OnChunkedAttachment(Chunked); - ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks", - Chunked.RawHash, - NiceBytes(Chunked.RawSize), - Chunked.ChunkHashes.size()); - } - if (remotestore_impl::IsCancelled(OptionalContext)) - { - return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK), - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Reason = "Operation cancelled"}; - } - } - - size_t NeedBlockCount = 0; - CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); - for (CbFieldView BlockField : BlocksArray) - { - CbObjectView BlockView = BlockField.AsObjectView(); - IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); - - CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView(); - - std::vector<IoHash> NeededChunks; - NeededChunks.reserve(ChunksArray.Num()); - if (BlockHash == IoHash::Zero) - { - for (CbFieldView ChunkField : ChunksArray) - { - IoHash ChunkHash = ChunkField.AsBinaryAttachment(); - if (OpsAttachments.contains(ChunkHash) && !HasAttachment(ChunkHash)) - { - NeededChunks.emplace_back(ChunkHash); - } - } - } - else - { - for (CbFieldView ChunkField : ChunksArray) - { - const IoHash ChunkHash = ChunkField.AsHash(); - if (OpsAttachments.contains(ChunkHash) && !HasAttachment(ChunkHash)) - { - NeededChunks.emplace_back(ChunkHash); - } - } - } - - if (!NeededChunks.empty()) - { - OnNeedBlock(BlockHash, std::move(NeededChunks)); - if (BlockHash != IoHash::Zero) - { - NeedBlockCount++; - } - } - if (remotestore_impl::IsCancelled(OptionalContext)) - { - return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK), - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Reason = "Operation cancelled"}; - } - } - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num())); - - size_t NeedAttachmentCount = 0; - CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView(); - for (CbFieldView LargeChunksField : LargeChunksArray) - { - IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); - if (OpsAttachments.contains(AttachmentHash) && !HasAttachment(AttachmentHash)) - { - OnNeedAttachment(AttachmentHash); - NeedAttachmentCount++; - } - if (remotestore_impl::IsCancelled(OptionalContext)) - { - return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK), - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Reason = "Operation cancelled"}; - } - }; - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num())); - - return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}; -} - -RemoteProjectStore::Result -SaveOplogContainer(ProjectStore::Oplog& Oplog, - const CbObject& ContainerObject, - const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments, - const std::function<bool(const IoHash& RawHash)>& HasAttachment, - const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, - const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, - const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment, - JobContext* OptionalContext) -{ - using namespace std::literals; - - Stopwatch Timer; - CbObject OplogSection; - RemoteProjectStore::Result Result = ParseOplogContainer(ContainerObject, - OnReferencedAttachments, - HasAttachment, - OnNeedBlock, - OnNeedAttachment, - OnChunkedAttachment, - OplogSection, - OptionalContext); - if (Result.ErrorCode != 0) - { - return Result; - } - Result = remotestore_impl::WriteOplogSection(Oplog, OplogSection, OptionalContext); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - return Result; -} - -RemoteProjectStore::Result -LoadOplog(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - ProjectStore::Oplog& Oplog, - bool ForceDownload, - bool IgnoreMissingAttachments, - bool CleanOplog, - JobContext* OptionalContext) -{ - using namespace std::literals; - - remotestore_impl::DownloadInfo Info; - - Stopwatch Timer; - - WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); - WorkerThreadPool& NetworkWorkerPool = GetSmallWorkerPool(EWorkloadType::Background); - - std::unordered_set<IoHash, IoHash::Hasher> Attachments; - uint64_t BlockCountToDownload = 0; - - RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo(); - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName)); - - uint64_t TransferWallTimeMS = 0; - - Stopwatch LoadContainerTimer; - RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer(); - TransferWallTimeMS += LoadContainerTimer.GetElapsedTimeMs(); - if (LoadContainerResult.ErrorCode) - { - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Failed to load oplog container: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode)); - return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode, - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Reason = LoadContainerResult.Reason, - .Text = LoadContainerResult.Text}; - } - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Loaded container in {} ({})", - NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)), - NiceBytes(LoadContainerResult.ContainerObject.GetSize()))); - Info.OplogSizeBytes = LoadContainerResult.ContainerObject.GetSize(); - - remotestore_impl::AsyncRemoteResult RemoteResult; - Latch AttachmentsDownloadLatch(1); - Latch AttachmentsWriteLatch(1); - std::atomic_size_t AttachmentCount = 0; - - Stopwatch LoadAttachmentsTimer; - std::atomic_uint64_t DownloadStartMS = (std::uint64_t)-1; - - auto HasAttachment = [&Oplog, &ChunkStore, ForceDownload](const IoHash& RawHash) { - if (ForceDownload) - { - return false; - } - if (ChunkStore.ContainsChunk(RawHash)) - { - return true; - } - return false; - }; - - auto OnNeedBlock = [&RemoteStore, - &ChunkStore, - &NetworkWorkerPool, - &WorkerPool, - &AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - &AttachmentCount, - &RemoteResult, - &BlockCountToDownload, - &Info, - &LoadAttachmentsTimer, - &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext](const IoHash& BlockHash, std::vector<IoHash>&& Chunks) { - if (RemoteResult.IsError()) - { - return; - } - - BlockCountToDownload++; - AttachmentCount.fetch_add(1); - if (BlockHash == IoHash::Zero) - { - DownloadAndSaveBlockChunks(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, - Info, - LoadAttachmentsTimer, - DownloadStartMS, - Chunks); - } - else - { - DownloadAndSaveBlock(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, - Info, - LoadAttachmentsTimer, - DownloadStartMS, - BlockHash, - Chunks, - 3); - } - }; - - auto OnNeedAttachment = [&RemoteStore, - &Oplog, - &ChunkStore, - &NetworkWorkerPool, - &WorkerPool, - &AttachmentsDownloadLatch, - &AttachmentsWriteLatch, - &RemoteResult, - &Attachments, - &AttachmentCount, - &LoadAttachmentsTimer, - &DownloadStartMS, - &Info, - IgnoreMissingAttachments, - OptionalContext](const IoHash& RawHash) { - if (!Attachments.insert(RawHash).second) - { - return; - } - if (RemoteResult.IsError()) - { - return; - } - AttachmentCount.fetch_add(1); - DownloadAndSaveAttachment(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, - AttachmentsDownloadLatch, - AttachmentsWriteLatch, - RemoteResult, - Info, - LoadAttachmentsTimer, - DownloadStartMS, - RawHash); - }; - - std::vector<ChunkedInfo> FilesToDechunk; - auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) { - if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash)) - { - FilesToDechunk.push_back(Chunked); - } - }; - - auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog.CaptureAddedAttachments(RawHashes); }; - - // Make sure we retain any attachments we download before writing the oplog - Oplog.EnableUpdateCapture(); - auto _ = MakeGuard([&Oplog]() { Oplog.DisableUpdateCapture(); }); - - CbObject OplogSection; - RemoteProjectStore::Result Result = ParseOplogContainer(LoadContainerResult.ContainerObject, - OnReferencedAttachments, - HasAttachment, - OnNeedBlock, - OnNeedAttachment, - OnChunkedAttachment, - OplogSection, - OptionalContext); - if (Result.ErrorCode != 0) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - } - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), - Attachments.size(), - BlockCountToDownload, - FilesToDechunk.size())); - - AttachmentsDownloadLatch.CountDown(); - while (!AttachmentsDownloadLatch.Wait(1000)) - { - ptrdiff_t Remaining = AttachmentsDownloadLatch.Remaining(); - if (remotestore_impl::IsCancelled(OptionalContext)) - { - if (!RemoteResult.IsError()) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - } - } - uint64_t PartialTransferWallTimeMS = TransferWallTimeMS; - if (DownloadStartMS != (uint64_t)-1) - { - PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); - } - remotestore_impl::ReportProgress( - OptionalContext, - "Loading attachments"sv, - fmt::format("{} remaining. {}", Remaining, remotestore_impl::GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)), - AttachmentCount.load(), - Remaining); - } - if (DownloadStartMS != (uint64_t)-1) - { - TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); - } - - if (AttachmentCount.load() > 0) - { - remotestore_impl::ReportProgress(OptionalContext, - "Loading attachments"sv, - fmt::format("{}", remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)), - AttachmentCount.load(), - 0); - } - - AttachmentsWriteLatch.CountDown(); - while (!AttachmentsWriteLatch.Wait(1000)) - { - ptrdiff_t Remaining = AttachmentsWriteLatch.Remaining(); - if (remotestore_impl::IsCancelled(OptionalContext)) - { - if (!RemoteResult.IsError()) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - } - } - remotestore_impl::ReportProgress(OptionalContext, - "Writing attachments"sv, - fmt::format("{} remaining.", Remaining), - AttachmentCount.load(), - Remaining); - } - - if (AttachmentCount.load() > 0) - { - remotestore_impl::ReportProgress(OptionalContext, "Writing attachments", ""sv, AttachmentCount.load(), 0); - } - - if (Result.ErrorCode == 0) - { - if (!FilesToDechunk.empty()) - { - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size())); - - Latch DechunkLatch(1); - std::filesystem::path TempFilePath = Oplog.TempPath(); - for (const ChunkedInfo& Chunked : FilesToDechunk) - { - std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString(); - DechunkLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&ChunkStore, - &DechunkLatch, - TempFileName, - &Chunked, - &RemoteResult, - IgnoreMissingAttachments, - &Info, - OptionalContext]() { - auto _ = MakeGuard([&DechunkLatch, &TempFileName] { - std::error_code Ec; - if (IsFile(TempFileName, Ec)) - { - RemoveFile(TempFileName, Ec); - if (Ec) - { - ZEN_INFO("Failed to remove temporary file '{}'. Reason: {}", TempFileName, Ec.message()); - } - } - DechunkLatch.CountDown(); - }); - try - { - if (RemoteResult.IsError()) - { - return; - } - Stopwatch Timer; - IoBuffer TmpBuffer; - { - BasicFile TmpFile; - TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate); - { - BasicFileWriter TmpWriter(TmpFile, 64u * 1024u); - - uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder(); - BLAKE3Stream HashingStream; - for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) - { - const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex]; - IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash); - if (!Chunk) - { - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); - - // We only add 1 as the resulting missing count will be 1 for the dechunked file - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) - { - RemoteResult.SetError( - gsl::narrow<int>(HttpResponseCode::NotFound), - "Missing chunk", - fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); - } - return; - } - CompositeBuffer Decompressed = - CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite(); - for (const SharedBuffer& Segment : Decompressed.GetSegments()) - { - MemoryView SegmentData = Segment.GetView(); - HashingStream.Append(SegmentData); - TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset); - Offset += SegmentData.GetSize(); - } - } - BLAKE3 RawHash = HashingStream.GetHash(); - ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash)); - UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash); - TmpWriter.Write(Header.GetData(), Header.GetSize(), 0); - } - TmpFile.Close(); - TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName); - } - CidStore::InsertResult InsertResult = - ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace); - if (InsertResult.New) - { - Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize()); - Info.AttachmentsStored.fetch_add(1); - } - - ZEN_INFO("Dechunked attachment {} ({}) in {}", - Chunked.RawHash, - NiceBytes(Chunked.RawSize), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to dechunck file {}", Chunked.RawHash), - Ex.what()); - } - }, - WorkerThreadPool::EMode::EnableBacklog); - } - DechunkLatch.CountDown(); - - while (!DechunkLatch.Wait(1000)) - { - ptrdiff_t Remaining = DechunkLatch.Remaining(); - if (remotestore_impl::IsCancelled(OptionalContext)) - { - if (!RemoteResult.IsError()) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - } - } - remotestore_impl::ReportProgress(OptionalContext, - "Dechunking attachments"sv, - fmt::format("{} remaining...", Remaining), - FilesToDechunk.size(), - Remaining); - } - remotestore_impl::ReportProgress(OptionalContext, "Dechunking attachments"sv, ""sv, FilesToDechunk.size(), 0); - } - Result = RemoteResult.ConvertResult(); - } - - if (Result.ErrorCode == 0) - { - if (CleanOplog) - { - if (!Oplog.Reset()) - { - Result = RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError), - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Reason = fmt::format("Failed to clean existing oplog '{}'", Oplog.OplogId())}; - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason)); - } - } - if (Result.ErrorCode == 0) - { - remotestore_impl::WriteOplogSection(Oplog, OplogSection, OptionalContext); - } - } - - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - - remotestore_impl::LogRemoteStoreStatsDetails(RemoteStore.GetStats()); - - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {} {}", - RemoteStoreInfo.ContainerName, - Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), - NiceBytes(Info.OplogSizeBytes), - Info.AttachmentBlocksDownloaded.load(), - NiceBytes(Info.AttachmentBlockBytesDownloaded.load()), - Info.AttachmentsDownloaded.load(), - NiceBytes(Info.AttachmentBytesDownloaded.load()), - Info.AttachmentsStored.load(), - NiceBytes(Info.AttachmentBytesStored.load()), - Info.MissingAttachmentCount.load(), - remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS))); - - return Result; -} - -////////////////////////////////////////////////////////////////////////// -// These are here to avoid vtable leakage - -RemoteProjectStore::RemoteProjectStore() -{ -} - -RemoteProjectStore::~RemoteProjectStore() -{ -} - -#if ZEN_WITH_TESTS - -namespace testutils { - using namespace std::literals; - - static std::string OidAsString(const Oid& Id) - { - StringBuilder<25> OidStringBuilder; - Id.ToString(OidStringBuilder); - return OidStringBuilder.ToString(); - } - - static CbPackage CreateBulkDataOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) - { - CbPackage Package; - CbObjectWriter Object; - Object << "key"sv << OidAsString(Id); - if (!Attachments.empty()) - { - Object.BeginArray("bulkdata"); - for (const auto& Attachment : Attachments) - { - CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash()); - Object.BeginObject(); - Object << "id"sv << Attachment.first; - Object << "type"sv - << "Standard"sv; - Object << "data"sv << Attach; - Object.EndObject(); - - Package.AddAttachment(Attach); - } - Object.EndArray(); - } - Package.SetObject(Object.Save()); - return Package; - }; - - static std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments( - const std::span<const size_t>& Sizes, - OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast, - uint64_t BlockSize = 0) - { - std::vector<std::pair<Oid, CompressedBuffer>> Result; - Result.reserve(Sizes.size()); - for (size_t Size : Sizes) - { - CompressedBuffer Compressed = - CompressedBuffer::Compress(SharedBuffer(CreateSemiRandomBlob(Size)), OodleCompressor::Mermaid, CompressionLevel, BlockSize); - Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed)); - } - return Result; - } - -} // namespace testutils - -struct ExportForceDisableBlocksTrue_ForceTempBlocksFalse -{ - static const bool ForceDisableBlocks = true; - static const bool ForceEnableTempBlocks = false; -}; - -struct ExportForceDisableBlocksFalse_ForceTempBlocksFalse -{ - static const bool ForceDisableBlocks = false; - static const bool ForceEnableTempBlocks = false; -}; - -struct ExportForceDisableBlocksFalse_ForceTempBlocksTrue -{ - static const bool ForceDisableBlocks = false; - static const bool ForceEnableTempBlocks = true; -}; - -TEST_CASE_TEMPLATE("project.store.export", - Settings, - ExportForceDisableBlocksTrue_ForceTempBlocksFalse, - ExportForceDisableBlocksFalse_ForceTempBlocksFalse, - ExportForceDisableBlocksFalse_ForceTempBlocksTrue) -{ - using namespace std::literals; - using namespace testutils; - - ScopedTemporaryDirectory TempDir; - ScopedTemporaryDirectory ExportDir; - - GcManager Gc; - CidStore CidStore(Gc); - CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); - - std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); - std::filesystem::path RootDir = TempDir.Path() / "root"; - std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; - std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; - std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; - - Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / "proj1"sv, - "proj1"sv, - RootDir.string(), - EngineRootDir.string(), - ProjectRootDir.string(), - ProjectFilePath.string())); - Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("oplog1", {}); - CHECK(Oplog); - - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77}))); - Oplog->AppendNewOplogEntry( - CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99}))); - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122}))); - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage( - Oid::NewOid(), - CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None))); - - FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 64u * 1024, - .MaxChunksPerBlock = 1000, - .MaxChunkEmbedSize = 32 * 1024u, - .ChunkFileSizeLimit = 64u * 1024u}, - /*.FolderPath = */ ExportDir.Path(), - /*.Name = */ std::string("oplog1"), - /*OptionalBaseName = */ std::string(), - /*.ForceDisableBlocks = */ Settings::ForceDisableBlocks, - /*.ForceEnableTempBlocks = */ Settings::ForceEnableTempBlocks}; - std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Options); - RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); - - RemoteProjectStore::Result ExportResult = SaveOplog(CidStore, - *RemoteStore, - *Project.Get(), - *Oplog, - Options.MaxBlockSize, - Options.MaxChunksPerBlock, - Options.MaxChunkEmbedSize, - Options.ChunkFileSizeLimit, - true, - false, - false, - nullptr); - - CHECK(ExportResult.ErrorCode == 0); - - Ref<ProjectStore::Oplog> OplogImport = Project->NewOplog("oplog2", {}); - CHECK(OplogImport); - - RemoteProjectStore::Result ImportResult = LoadOplog(CidStore, - *RemoteStore, - *OplogImport, - /*Force*/ false, - /*IgnoreMissingAttachments*/ false, - /*CleanOplog*/ false, - nullptr); - CHECK(ImportResult.ErrorCode == 0); - - RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore, - *RemoteStore, - *OplogImport, - /*Force*/ true, - /*IgnoreMissingAttachments*/ false, - /*CleanOplog*/ false, - nullptr); - CHECK(ImportForceResult.ErrorCode == 0); - - RemoteProjectStore::Result ImportCleanResult = LoadOplog(CidStore, - *RemoteStore, - *OplogImport, - /*Force*/ false, - /*IgnoreMissingAttachments*/ false, - /*CleanOplog*/ true, - nullptr); - CHECK(ImportCleanResult.ErrorCode == 0); - - RemoteProjectStore::Result ImportForceCleanResult = LoadOplog(CidStore, - *RemoteStore, - *OplogImport, - /*Force*/ true, - /*IgnoreMissingAttachments*/ false, - /*CleanOplog*/ true, - nullptr); - CHECK(ImportForceCleanResult.ErrorCode == 0); -} - -#endif // ZEN_WITH_TESTS - -void -remoteprojectstore_forcelink() -{ -} - -} // namespace zen diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h deleted file mode 100644 index 11cc58e4d..000000000 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ /dev/null @@ -1,178 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/jobqueue.h> -#include <zenstore/projectstore.h> - -#include <zenutil/chunkblock.h> - -#include <unordered_set> - -namespace zen { - -class CidStore; -class WorkerThreadPool; -struct ChunkedInfo; - -class RemoteProjectStore -{ -public: - struct Result - { - int32_t ErrorCode{}; - double ElapsedSeconds{}; - std::string Reason; - std::string Text; - }; - - struct CreateContainerResult : public Result - { - }; - - struct SaveResult : public Result - { - std::unordered_set<IoHash, IoHash::Hasher> Needs; - IoHash RawHash; - }; - - struct FinalizeResult : public Result - { - std::unordered_set<IoHash, IoHash::Hasher> Needs; - }; - - struct SaveAttachmentResult : public Result - { - }; - - struct SaveAttachmentsResult : public Result - { - }; - - struct LoadAttachmentResult : public Result - { - IoBuffer Bytes; - }; - - struct LoadContainerResult : public Result - { - CbObject ContainerObject; - }; - - struct LoadAttachmentsResult : public Result - { - std::vector<std::pair<IoHash, CompressedBuffer>> Chunks; - }; - - struct GetKnownBlocksResult : public Result - { - std::vector<ThinChunkBlockDescription> Blocks; - }; - - struct RemoteStoreInfo - { - bool CreateBlocks; - bool UseTempBlockFiles; - bool AllowChunking; - std::string ContainerName; - std::string Description; - }; - - struct Stats - { - std::uint64_t m_SentBytes; - std::uint64_t m_ReceivedBytes; - std::uint64_t m_RequestTimeNS; - std::uint64_t m_RequestCount; - std::uint64_t m_PeakSentBytes; - std::uint64_t m_PeakReceivedBytes; - std::uint64_t m_PeakBytesPerSec; - }; - - RemoteProjectStore(); - virtual ~RemoteProjectStore(); - - virtual RemoteStoreInfo GetInfo() const = 0; - virtual Stats GetStats() const = 0; - - virtual CreateContainerResult CreateContainer() = 0; - virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0; - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&& Block) = 0; - virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0; - virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0; - - virtual LoadContainerResult LoadContainer() = 0; - virtual GetKnownBlocksResult GetKnownBlocks() = 0; - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) = 0; - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) = 0; -}; - -struct RemoteStoreOptions -{ - static const size_t DefaultMaxBlockSize = 64u * 1024u * 1024u; - static const size_t DefaultMaxChunksPerBlock = 4u * 1000u; - static const size_t DefaultMaxChunkEmbedSize = 3u * 512u * 1024u; - static const size_t DefaultChunkFileSizeLimit = 256u * 1024u * 1024u; - - size_t MaxBlockSize = DefaultMaxBlockSize; - size_t MaxChunksPerBlock = DefaultMaxChunksPerBlock; - size_t MaxChunkEmbedSize = DefaultMaxChunkEmbedSize; - size_t ChunkFileSizeLimit = DefaultChunkFileSizeLimit; -}; - -typedef std::function<IoBuffer(const IoHash& AttachmentHash)> TGetAttachmentBufferFunc; - -RemoteProjectStore::LoadContainerResult BuildContainer( - CidStore& ChunkStore, - ProjectStore::Project& Project, - ProjectStore::Oplog& Oplog, - size_t MaxBlockSize, - size_t MaxChunksPerBlock, - size_t MaxChunkEmbedSize, - size_t ChunkFileSizeLimit, - bool BuildBlocks, - bool IgnoreMissingAttachments, - bool AllowChunking, - const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, - const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, - const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, - bool EmbedLooseFiles); - -class JobContext; - -RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, - const CbObject& ContainerObject, - const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments, - const std::function<bool(const IoHash& RawHash)>& HasAttachment, - const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, - const std::function<void(const IoHash& RawHash)>& OnNeedAttachment, - const std::function<void(const ChunkedInfo& Chunked)>& OnChunkedAttachment, - JobContext* OptionalContext); - -RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - ProjectStore::Project& Project, - ProjectStore::Oplog& Oplog, - size_t MaxBlockSize, - size_t MaxChunksPerBlock, - size_t MaxChunkEmbedSize, - size_t ChunkFileSizeLimit, - bool EmbedLooseFiles, - bool ForceUpload, - bool IgnoreMissingAttachments, - JobContext* OptionalContext); - -RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - ProjectStore::Oplog& Oplog, - bool ForceDownload, - bool IgnoreMissingAttachments, - bool CleanOplog, - JobContext* OptionalContext); - -std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject); -std::vector<ThinChunkBlockDescription> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes); - -void remoteprojectstore_forcelink(); - -} // namespace zen diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp deleted file mode 100644 index 21ddd6cff..000000000 --- a/src/zenserver/projectstore/zenremoteprojectstore.cpp +++ /dev/null @@ -1,336 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "zenremoteprojectstore.h" - -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinarypackage.h> -#include <zencore/compositebuffer.h> -#include <zencore/fmtutils.h> -#include <zencore/stream.h> -#include <zenhttp/httpclient.h> -#include <zenhttp/packageformat.h> - -namespace zen { - -using namespace std::literals; - -class ZenRemoteStore : public RemoteProjectStore -{ -public: - ZenRemoteStore(std::string_view HostAddress, - std::string_view Project, - std::string_view Oplog, - const std::filesystem::path& TempFilePath) - : m_HostAddress(HostAddress) - , m_ProjectStoreUrl(fmt::format("{}/prj"sv, m_HostAddress)) - , m_Project(Project) - , m_Oplog(Oplog) - , m_TempFilePath(TempFilePath) - , m_Client(m_ProjectStoreUrl, {.LogCategory = "ZenRemoteStore", .RetryCount = 2}) - { - } - - virtual RemoteStoreInfo GetInfo() const override - { - return {.CreateBlocks = false, - .UseTempBlockFiles = false, - .AllowChunking = false, - .ContainerName = fmt::format("{}/{}", m_Project, m_Oplog), - .Description = fmt::format("[zen] {}. SessionId: {}"sv, m_HostAddress, m_Client.GetSessionId())}; - } - - virtual Stats GetStats() const override - { - return {.m_SentBytes = m_SentBytes.load(), - .m_ReceivedBytes = m_ReceivedBytes.load(), - .m_RequestTimeNS = m_RequestTimeNS.load(), - .m_RequestCount = m_RequestCount.load(), - .m_PeakSentBytes = m_PeakSentBytes.load(), - .m_PeakReceivedBytes = m_PeakReceivedBytes.load(), - .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; - } - - virtual CreateContainerResult CreateContainer() override - { - // Nothing to do here - return {}; - } - - virtual SaveResult SaveContainer(const IoBuffer& Payload) override - { - std::string SaveRequest = fmt::format("/{}/oplog/{}/save"sv, m_Project, m_Oplog); - HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCbObject); - AddStats(Response); - SaveResult Result = SaveResult{ConvertResult(Response)}; - - if (Result.ErrorCode) - { - Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}. Reason: '{}'", - m_ProjectStoreUrl, - m_Project, - m_Oplog, - Result.Reason); - return Result; - } - CbObject ResponseObject = Response.AsObject(); - if (!ResponseObject) - { - Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, - m_ProjectStoreUrl, - m_Project, - m_Oplog); - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - return Result; - } - CbArrayView NeedsArray = ResponseObject["need"sv].AsArrayView(); - for (CbFieldView FieldView : NeedsArray) - { - IoHash ChunkHash = FieldView.AsHash(); - Result.Needs.insert(ChunkHash); - } - - Result.RawHash = IoHash::HashBuffer(Payload); - return Result; - } - - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override - { - std::string SaveRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); - HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCompressedBinary); - AddStats(Response); - SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)}; - if (Result.ErrorCode) - { - Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'", - m_ProjectStoreUrl, - m_Project, - m_Oplog, - RawHash, - Result.Reason); - } - return Result; - } - - virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override - { - CbPackage RequestPackage; - { - CbObjectWriter RequestWriter; - RequestWriter.AddString("method"sv, "putchunks"sv); - RequestWriter.BeginArray("chunks"sv); - { - for (const SharedBuffer& Chunk : Chunks) - { - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, RawHash, RawSize); - ZEN_ASSERT(Compressed); - RequestWriter.AddHash(RawHash); - RequestPackage.AddAttachment(CbAttachment(Compressed, RawHash)); - } - } - RequestWriter.EndArray(); // "chunks" - RequestPackage.SetObject(RequestWriter.Save()); - } - std::string SaveRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog); - HttpClient::Response Response = m_Client.Post(SaveRequest, RequestPackage); - AddStats(Response); - - SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)}; - if (Result.ErrorCode) - { - Result.Reason = fmt::format("Failed saving {} oplog attachments to {}/{}/{}. Reason: '{}'", - Chunks.size(), - m_ProjectStoreUrl, - m_Project, - m_Oplog, - Result.Reason); - } - return Result; - } - - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override - { - std::string LoadRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog); - - CbObject Request; - { - CbObjectWriter RequestWriter; - RequestWriter.AddString("method"sv, "getchunks"sv); - RequestWriter.BeginObject("Request"sv); - { - RequestWriter.BeginArray("Chunks"sv); - { - for (const IoHash& RawHash : RawHashes) - { - RequestWriter.BeginObject(); - { - RequestWriter.AddHash("RawHash", RawHash); - } - RequestWriter.EndObject(); - } - } - RequestWriter.EndArray(); // "chunks" - } - RequestWriter.EndObject(); - Request = RequestWriter.Save(); - } - - HttpClient::Response Response = m_Client.Post(LoadRequest, Request, HttpClient::Accept(ZenContentType::kCbPackage)); - AddStats(Response); - - LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)}; - if (Result.ErrorCode) - { - Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'", - RawHashes.size(), - m_ProjectStoreUrl, - m_Project, - m_Oplog, - Result.Reason); - } - else - { - CbPackage Package = Response.AsPackage(); - std::span<const CbAttachment> Attachments = Package.GetAttachments(); - Result.Chunks.reserve(Attachments.size()); - for (const CbAttachment& Attachment : Attachments) - { - Result.Chunks.emplace_back( - std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()}); - } - } - return Result; - }; - - virtual FinalizeResult FinalizeContainer(const IoHash&) override { return FinalizeResult{Result{}}; } - - virtual LoadContainerResult LoadContainer() override - { - std::string LoadRequest = fmt::format("/{}/oplog/{}/load"sv, m_Project, m_Oplog); - - HttpClient::Response Response = m_Client.Get(LoadRequest, HttpClient::Accept(ZenContentType::kCbObject)); - AddStats(Response); - - LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)}; - if (Result.ErrorCode) - { - Result.Reason = fmt::format("Failed fetching oplog container from {}/{}/{}. Reason: '{}'", - m_ProjectStoreUrl, - m_Project, - m_Oplog, - Result.Reason); - } - else - { - Result.ContainerObject = Response.AsObject(); - if (!Result.ContainerObject) - { - Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, - m_ProjectStoreUrl, - m_Project, - m_Oplog); - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - } - } - return Result; - } - - virtual GetKnownBlocksResult GetKnownBlocks() override - { - return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}}; - } - - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override - { - std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); - HttpClient::Response Response = - m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary)); - AddStats(Response); - - LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)}; - if (!Result.ErrorCode) - { - Result.Bytes = Response.ResponsePayload; - Result.Bytes.MakeOwned(); - } - if (!Result.ErrorCode) - { - Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'", - m_ProjectStoreUrl, - m_Project, - m_Oplog, - RawHash, - Result.Reason); - } - return Result; - } - -private: - void AddStats(const HttpClient::Response& Result) - { - m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.UploadedBytes)); - m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.DownloadedBytes)); - m_RequestTimeNS.fetch_add(static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000)); - SetAtomicMax(m_PeakSentBytes, Result.UploadedBytes); - SetAtomicMax(m_PeakReceivedBytes, Result.DownloadedBytes); - if (Result.ElapsedSeconds > 0.0) - { - uint64_t BytesPerSec = static_cast<uint64_t>((Result.UploadedBytes + Result.DownloadedBytes) / Result.ElapsedSeconds); - SetAtomicMax(m_PeakBytesPerSec, BytesPerSec); - } - - m_RequestCount.fetch_add(1); - } - - static Result ConvertResult(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) - { - if (Response.Error) - { - return {.ErrorCode = Response.Error.value().ErrorCode, - .ElapsedSeconds = Response.ElapsedSeconds, - .Reason = Response.ErrorMessage(""), - .Text = Response.ToText()}; - } - if (!Response.IsSuccess()) - { - return {.ErrorCode = static_cast<int32_t>(Response.StatusCode), - .ElapsedSeconds = Response.ElapsedSeconds, - .Reason = Response.ErrorMessage(ErrorPrefix), - .Text = Response.ToText()}; - } - return {.ErrorCode = 0, .ElapsedSeconds = Response.ElapsedSeconds}; - } - - const std::string m_HostAddress; - const std::string m_ProjectStoreUrl; - const std::string m_Project; - const std::string m_Oplog; - const std::filesystem::path m_TempFilePath; - - HttpClient m_Client; - - std::atomic_uint64_t m_SentBytes = {}; - std::atomic_uint64_t m_ReceivedBytes = {}; - std::atomic_uint64_t m_RequestTimeNS = {}; - std::atomic_uint64_t m_RequestCount = {}; - std::atomic_uint64_t m_PeakSentBytes = {}; - std::atomic_uint64_t m_PeakReceivedBytes = {}; - std::atomic_uint64_t m_PeakBytesPerSec = {}; -}; - -std::shared_ptr<RemoteProjectStore> -CreateZenRemoteStore(const ZenRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath) -{ - std::string Url = Options.Url; - if (Url.find("://"sv) == std::string::npos) - { - // Assume http URL - Url = fmt::format("http://{}"sv, Url); - } - std::shared_ptr<RemoteProjectStore> RemoteStore = - std::make_shared<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, TempFilePath); - return RemoteStore; -} - -} // namespace zen diff --git a/src/zenserver/projectstore/zenremoteprojectstore.h b/src/zenserver/projectstore/zenremoteprojectstore.h deleted file mode 100644 index 7c81a597d..000000000 --- a/src/zenserver/projectstore/zenremoteprojectstore.h +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include "remoteprojectstore.h" - -namespace zen { - -struct ZenRemoteStoreOptions : RemoteStoreOptions -{ - std::string Url; - std::string ProjectId; - std::string OplogId; -}; - -std::shared_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath); - -} // namespace zen |