aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-03 09:38:05 +0200
committerGitHub Enterprise <[email protected]>2025-10-03 09:38:05 +0200
commitf5727a1e4d6bfb833e37e1210691d351456dbe3a (patch)
treecbf7688c4e31ba7db429a98c7c9f813fa0a826be /src/zenserver/projectstore
parentmove projectstore to zenstore (#541) (diff)
downloadzen-f5727a1e4d6bfb833e37e1210691d351456dbe3a.tar.xz
zen-f5727a1e4d6bfb833e37e1210691d351456dbe3a.zip
move remoteproject to remotestorelib (#542)
* move remoteproject code to remotestorelib
Diffstat (limited to 'src/zenserver/projectstore')
-rw-r--r--src/zenserver/projectstore/buildsremoteprojectstore.cpp545
-rw-r--r--src/zenserver/projectstore/buildsremoteprojectstore.h32
-rw-r--r--src/zenserver/projectstore/fileremoteprojectstore.cpp341
-rw-r--r--src/zenserver/projectstore/fileremoteprojectstore.h20
-rw-r--r--src/zenserver/projectstore/httpprojectstore.cpp13
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.cpp402
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.h32
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp3519
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h178
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.cpp336
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.h18
11 files changed, 5 insertions, 5431 deletions
diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.cpp b/src/zenserver/projectstore/buildsremoteprojectstore.cpp
deleted file mode 100644
index b8b499f7f..000000000
--- a/src/zenserver/projectstore/buildsremoteprojectstore.cpp
+++ /dev/null
@@ -1,545 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "buildsremoteprojectstore.h"
-
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compress.h>
-#include <zencore/fmtutils.h>
-#include <zencore/scopeguard.h>
-
-#include <zenhttp/httpclientauth.h>
-#include <zenutil/jupiter/jupiterbuildstorage.h>
-
-namespace zen {
-
-using namespace std::literals;
-
-static const std::string_view OplogContainerPartName = "oplogcontainer"sv;
-
-class BuildsRemoteStore : public RemoteProjectStore
-{
-public:
- BuildsRemoteStore(std::unique_ptr<BuildStorage::Statistics>&& BuildStorageStats,
- std::unique_ptr<HttpClient>&& BuildStorageHttp,
- std::unique_ptr<BuildStorage>&& BuildStorage,
- std::string_view Url,
- std::string_view Namespace,
- std::string_view Bucket,
- const Oid& BuildId,
- const IoBuffer& MetaData,
- bool ForceDisableBlocks,
- bool ForceDisableTempBlocks)
- : m_BuildStorageStats(std::move(BuildStorageStats))
- , m_BuildStorageHttp(std::move(BuildStorageHttp))
- , m_BuildStorage(std::move(BuildStorage))
- , m_Url(Url)
- , m_Namespace(Namespace)
- , m_Bucket(Bucket)
- , m_BuildId(BuildId)
- , m_MetaData(MetaData)
- , m_EnableBlocks(!ForceDisableBlocks)
- , m_UseTempBlocks(!ForceDisableTempBlocks)
- {
- m_MetaData.MakeOwned();
- }
-
- virtual RemoteStoreInfo GetInfo() const override
- {
- return {.CreateBlocks = m_EnableBlocks,
- .UseTempBlockFiles = m_UseTempBlocks,
- .AllowChunking = true,
- .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_BuildId),
- .Description = fmt::format("[cloud] {}. SessionId: {}. {}/{}/{}"sv,
- m_Url,
- m_BuildStorageHttp->GetSessionId(),
- m_Namespace,
- m_Bucket,
- m_BuildId)};
- }
-
- virtual Stats GetStats() const override
- {
- return {
- .m_SentBytes = m_BuildStorageStats->TotalBytesWritten.load(),
- .m_ReceivedBytes = m_BuildStorageStats->TotalBytesRead.load(),
- .m_RequestTimeNS = m_BuildStorageStats->TotalRequestTimeUs.load() * 1000,
- .m_RequestCount = m_BuildStorageStats->TotalRequestCount.load(),
- .m_PeakSentBytes = m_BuildStorageStats->PeakSentBytes.load(),
- .m_PeakReceivedBytes = m_BuildStorageStats->PeakReceivedBytes.load(),
- .m_PeakBytesPerSec = m_BuildStorageStats->PeakBytesPerSec.load(),
- };
- }
-
- virtual CreateContainerResult CreateContainer() override
- {
- ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero);
-
- CreateContainerResult Result;
- Stopwatch Timer;
- auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; });
-
- CbObject Payload = LoadCompactBinaryObject(m_MetaData);
- try
- {
- CbObject PutBuildResult = m_BuildStorage->PutBuild(m_BuildId, Payload);
- ZEN_UNUSED(PutBuildResult);
- m_OplogBuildPartId = Oid::NewOid();
- }
- catch (const HttpClientError& Ex)
- {
- Result.ErrorCode = MakeErrorCode(Ex);
- Result.Reason =
- fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what());
- }
- catch (const std::exception& Ex)
- {
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason =
- fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what());
- }
-
- return Result;
- }
-
- virtual SaveResult SaveContainer(const IoBuffer& Payload) override
- {
- ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
-
- SaveResult Result;
- Stopwatch Timer;
- auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; });
-
- try
- {
- CbObject ObjectPayload = LoadCompactBinaryObject(Payload);
-
- std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult =
- m_BuildStorage->PutBuildPart(m_BuildId, m_OplogBuildPartId, OplogContainerPartName, ObjectPayload);
- Result.RawHash = PutBuildPartResult.first;
- Result.Needs = std::unordered_set<IoHash, IoHash::Hasher>(PutBuildPartResult.second.begin(), PutBuildPartResult.second.end());
- }
- catch (const HttpClientError& Ex)
- {
- Result.ErrorCode = MakeErrorCode(Ex);
- Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
- m_Url,
- m_Namespace,
- m_Bucket,
- m_BuildId,
- m_OplogBuildPartId,
- Ex.what());
- }
- catch (const std::exception& Ex)
- {
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
- m_Url,
- m_Namespace,
- m_Bucket,
- m_BuildId,
- m_OplogBuildPartId,
- Ex.what());
- }
-
- return Result;
- }
-
- virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload,
- const IoHash& RawHash,
- ChunkBlockDescription&& Block) override
- {
- ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
-
- SaveAttachmentResult Result;
- Stopwatch Timer;
- auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; });
-
- try
- {
- m_BuildStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
-
- if (Block.BlockHash == RawHash)
- {
- try
- {
- CbObjectWriter BlockMetaData;
- BlockMetaData.AddString("createdBy", GetRunningExecutablePath().stem().string());
- CbObject MetaPayload = BuildChunkBlockDescription(Block, BlockMetaData.Save());
- if (!m_BuildStorage->PutBlockMetadata(m_BuildId, RawHash, MetaPayload))
- {
- ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}. Reason: '{}'",
- m_Url,
- m_Namespace,
- m_Bucket,
- m_BuildId,
- RawHash,
- "not found");
- }
- }
- catch (const HttpClientError& Ex)
- {
- Result.ErrorCode = MakeErrorCode(Ex);
- Result.Reason = fmt::format("Failed saving block attachment meta data to {}/{}/{}/{}. Reason: '{}'",
- m_Url,
- m_Namespace,
- m_Bucket,
- m_BuildId,
- Ex.what());
- }
- catch (const std::exception& Ex)
- {
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("Failed saving block attachment meta data to {}/{}/{}/{}. Reason: '{}'",
- m_Url,
- m_Namespace,
- m_Bucket,
- m_BuildId,
- Ex.what());
- }
- }
- }
- catch (const HttpClientError& Ex)
- {
- Result.ErrorCode = MakeErrorCode(Ex);
- Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'",
- m_Url,
- m_Namespace,
- m_Bucket,
- m_BuildId,
- Ex.what());
- }
- catch (const std::exception& Ex)
- {
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'",
- m_Url,
- m_Namespace,
- m_Bucket,
- m_BuildId,
- Ex.what());
- }
-
- return Result;
- }
-
- virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
- {
- SaveAttachmentsResult Result;
- Stopwatch Timer;
- auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; });
- for (const SharedBuffer& Chunk : Chunks)
- {
- CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer());
- SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {});
- if (ChunkResult.ErrorCode)
- {
- return SaveAttachmentsResult{ChunkResult};
- }
- }
- return Result;
- }
-
- virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override
- {
- ZEN_UNUSED(RawHash);
- ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
-
- FinalizeResult Result;
- Stopwatch Timer;
- auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; });
-
- try
- {
- std::vector<IoHash> Needs = m_BuildStorage->FinalizeBuildPart(m_BuildId, m_OplogBuildPartId, RawHash);
- Result.Needs = std::unordered_set<IoHash, IoHash::Hasher>(Needs.begin(), Needs.end());
- }
- catch (const HttpClientError& Ex)
- {
- Result.ErrorCode = Ex.GetInternalErrorCode() != 0 ? Ex.GetInternalErrorCode()
- : Ex.GetHttpResponseCode() != HttpResponseCode::ImATeapot ? (int)Ex.GetHttpResponseCode()
- : 0;
- Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
- m_Url,
- m_Namespace,
- m_Bucket,
- m_BuildId,
- m_OplogBuildPartId,
- Ex.what());
- }
- catch (const std::exception& Ex)
- {
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
- m_Url,
- m_Namespace,
- m_Bucket,
- m_BuildId,
- m_OplogBuildPartId,
- Ex.what());
- }
-
- if (!Result.ErrorCode && Result.Needs.empty())
- {
- try
- {
- m_BuildStorage->FinalizeBuild(m_BuildId);
- }
- catch (const HttpClientError& Ex)
- {
- Result.ErrorCode = Ex.GetInternalErrorCode() != 0 ? Ex.GetInternalErrorCode()
- : Ex.GetHttpResponseCode() != HttpResponseCode::ImATeapot ? (int)Ex.GetHttpResponseCode()
- : 0;
- Result.Reason = fmt::format("Failed finalizing oplog container build to {}/{}/{}/{}. Reason: '{}'",
- m_Url,
- m_Namespace,
- m_Bucket,
- m_BuildId,
- Ex.what());
- }
- catch (const std::exception& Ex)
- {
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("Failed finalizing oplog container build to {}/{}/{}/{}. Reason: '{}'",
- m_Url,
- m_Namespace,
- m_Bucket,
- m_BuildId,
- Ex.what());
- }
- }
-
- return Result;
- }
-
- virtual LoadContainerResult LoadContainer() override
- {
- ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero);
-
- LoadContainerResult Result;
- Stopwatch Timer;
- auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; });
-
- try
- {
- CbObject BuildObject = m_BuildStorage->GetBuild(m_BuildId);
-
- CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView();
- if (!PartsObject)
- {
- throw std::runtime_error(fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv,
- m_Url,
- m_Namespace,
- m_Bucket,
- m_BuildId));
- }
- m_OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId();
- if (m_OplogBuildPartId == Oid::Zero)
- {
- throw std::runtime_error(fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv,
- m_Url,
- m_Namespace,
- m_Bucket,
- m_BuildId,
- OplogContainerPartName));
- }
-
- Result.ContainerObject = m_BuildStorage->GetBuildPart(m_BuildId, m_OplogBuildPartId);
- }
- catch (const HttpClientError& Ex)
- {
- Result.ErrorCode = MakeErrorCode(Ex);
- Result.Reason = fmt::format("Failed fetching oplog container build part to {}/{}/{}/{}. Reason: '{}'",
- m_Url,
- m_Namespace,
- m_Bucket,
- m_BuildId,
- Ex.what());
- }
- catch (const std::exception& Ex)
- {
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("Failed fetching oplog container build part to {}/{}/{}/{}. Reason: '{}'",
- m_Url,
- m_Namespace,
- m_Bucket,
- m_BuildId,
- Ex.what());
- }
-
- return Result;
- }
-
- virtual GetKnownBlocksResult GetKnownBlocks() override
- {
- ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
-
- GetKnownBlocksResult Result;
- Stopwatch Timer;
- auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; });
-
- try
- {
- CbObject KnownBlocks = m_BuildStorage->FindBlocks(m_BuildId, 10000u);
- std::optional<std::vector<ChunkBlockDescription>> Blocks = ParseChunkBlockDescriptionList(KnownBlocks);
- Result.Blocks.reserve(Blocks.value().size());
- for (ChunkBlockDescription& BlockDescription : Blocks.value())
- {
- Result.Blocks.push_back(ThinChunkBlockDescription{.BlockHash = BlockDescription.BlockHash,
- .ChunkRawHashes = std::move(BlockDescription.ChunkRawHashes)});
- }
- }
- catch (const HttpClientError& Ex)
- {
- Result.ErrorCode = MakeErrorCode(Ex);
- Result.Reason =
- fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what());
- }
- catch (const std::exception& Ex)
- {
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason =
- fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what());
- }
-
- return Result;
- }
-
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
- {
- ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
-
- LoadAttachmentResult Result;
- Stopwatch Timer;
- auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; });
-
- try
- {
- Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash);
- }
- catch (const HttpClientError& Ex)
- {
- Result.ErrorCode = MakeErrorCode(Ex);
- Result.Reason =
- fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what());
- }
- catch (const std::exception& Ex)
- {
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason =
- fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what());
- }
-
- return Result;
- }
-
- virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
- {
- LoadAttachmentsResult Result;
- Stopwatch Timer;
- auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; });
- for (const IoHash& Hash : RawHashes)
- {
- LoadAttachmentResult ChunkResult = LoadAttachment(Hash);
- if (ChunkResult.ErrorCode)
- {
- return LoadAttachmentsResult{ChunkResult};
- }
- ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000)));
- Result.Chunks.emplace_back(
- std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))});
- }
- return Result;
- }
-
-private:
- static int MakeErrorCode(const HttpClientError& Ex)
- {
- return Ex.GetInternalErrorCode() != 0 ? Ex.GetInternalErrorCode()
- : Ex.GetHttpResponseCode() != HttpResponseCode::ImATeapot ? (int)Ex.GetHttpResponseCode()
- : 0;
- }
-
- std::unique_ptr<BuildStorage::Statistics> m_BuildStorageStats;
- std::unique_ptr<HttpClient> m_BuildStorageHttp;
- std::unique_ptr<BuildStorage> m_BuildStorage;
- const std::string m_Url;
- const std::string m_Namespace;
- const std::string m_Bucket;
- const Oid m_BuildId;
- IoBuffer m_MetaData;
- Oid m_OplogBuildPartId = Oid::Zero;
- const bool m_EnableBlocks = true;
- const bool m_UseTempBlocks = true;
- const bool m_AllowRedirect = false;
-};
-
-std::shared_ptr<RemoteProjectStore>
-CreateJupiterBuildsRemoteStore(const BuildsRemoteStoreOptions& Options,
- const std::filesystem::path& TempFilePath,
- bool Quiet,
- bool Unattended)
-{
- std::string Url = Options.Url;
- if (Url.find("://"sv) == std::string::npos)
- {
- // Assume https URL
- Url = fmt::format("https://{}"sv, Url);
- }
-
- // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
- // 2) Access token as parameter in request
- // 3) Environment variable (different win vs linux/mac)
- // 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
-
- std::function<HttpClientAccessToken()> TokenProvider;
- if (!Options.OpenIdProvider.empty())
- {
- TokenProvider = httpclientauth::CreateFromOpenIdProvider(Options.AuthManager, Options.OpenIdProvider);
- }
- else if (!Options.AccessToken.empty())
- {
- TokenProvider = httpclientauth::CreateFromStaticToken(Options.AccessToken);
- }
- else if (!Options.OidcExePath.empty())
- {
- if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(Options.OidcExePath, Url, Quiet, Unattended);
- TokenProviderMaybe)
- {
- TokenProvider = TokenProviderMaybe.value();
- }
- }
-
- if (!TokenProvider)
- {
- TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager);
- }
-
- HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient",
- .ConnectTimeout = std::chrono::milliseconds(2000),
- .Timeout = std::chrono::milliseconds(1800000),
- .AccessTokenProvider = std::move(TokenProvider),
- .AssumeHttp2 = Options.AssumeHttp2,
- .AllowResume = true,
- .RetryCount = 4};
-
- std::unique_ptr<BuildStorage::Statistics> BuildStorageStats(std::make_unique<BuildStorage::Statistics>());
-
- std::unique_ptr<HttpClient> BuildStorageHttp = std::make_unique<HttpClient>(Url, ClientSettings);
-
- std::unique_ptr<BuildStorage> BuildStorage =
- CreateJupiterBuildStorage(Log(), *BuildStorageHttp, *BuildStorageStats, Options.Namespace, Options.Bucket, false, TempFilePath);
-
- std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(std::move(BuildStorageStats),
- std::move(BuildStorageHttp),
- std::move(BuildStorage),
- Url,
- Options.Namespace,
- Options.Bucket,
- Options.BuildId,
- Options.MetaData,
- Options.ForceDisableBlocks,
- Options.ForceDisableTempBlocks);
- return RemoteStore;
-}
-
-} // namespace zen
diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.h b/src/zenserver/projectstore/buildsremoteprojectstore.h
deleted file mode 100644
index 24ab9678d..000000000
--- a/src/zenserver/projectstore/buildsremoteprojectstore.h
+++ /dev/null
@@ -1,32 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include "remoteprojectstore.h"
-
-namespace zen {
-
-class AuthMgr;
-
-struct BuildsRemoteStoreOptions : RemoteStoreOptions
-{
- std::string Url;
- std::string Namespace;
- std::string Bucket;
- Oid BuildId;
- std::string OpenIdProvider;
- std::string AccessToken;
- AuthMgr& AuthManager;
- std::filesystem::path OidcExePath;
- bool ForceDisableBlocks = false;
- bool ForceDisableTempBlocks = false;
- bool AssumeHttp2 = false;
- IoBuffer MetaData;
-};
-
-std::shared_ptr<RemoteProjectStore> CreateJupiterBuildsRemoteStore(const BuildsRemoteStoreOptions& Options,
- const std::filesystem::path& TempFilePath,
- bool Quiet,
- bool Unattended);
-
-} // namespace zen
diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp
deleted file mode 100644
index 348f5fb8a..000000000
--- a/src/zenserver/projectstore/fileremoteprojectstore.cpp
+++ /dev/null
@@ -1,341 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "fileremoteprojectstore.h"
-
-#include <zencore/compactbinaryutil.h>
-#include <zencore/compress.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/timer.h>
-#include <zenhttp/httpcommon.h>
-
-namespace zen {
-
-using namespace std::literals;
-
-class LocalExportProjectStore : public RemoteProjectStore
-{
-public:
- LocalExportProjectStore(std::string_view Name,
- std::string_view OptionalBaseName,
- const std::filesystem::path& FolderPath,
- bool ForceDisableBlocks,
- bool ForceEnableTempBlocks)
- : m_Name(Name)
- , m_OptionalBaseName(OptionalBaseName)
- , m_OutputPath(FolderPath)
- {
- if (ForceDisableBlocks)
- {
- m_EnableBlocks = false;
- }
- if (ForceEnableTempBlocks)
- {
- m_UseTempBlocks = true;
- }
- }
-
- virtual RemoteStoreInfo GetInfo() const override
- {
- return {
- .CreateBlocks = m_EnableBlocks,
- .UseTempBlockFiles = m_UseTempBlocks,
- .AllowChunking = true,
- .ContainerName = m_Name,
- .Description =
- fmt::format("[file] {}/{}{}{}"sv, m_OutputPath, m_Name, m_OptionalBaseName.empty() ? "" : " Base: ", m_OptionalBaseName)};
- }
-
- virtual Stats GetStats() const override
- {
- return {.m_SentBytes = m_SentBytes.load(),
- .m_ReceivedBytes = m_ReceivedBytes.load(),
- .m_RequestTimeNS = m_RequestTimeNS.load(),
- .m_RequestCount = m_RequestCount.load(),
- .m_PeakSentBytes = m_PeakSentBytes.load(),
- .m_PeakReceivedBytes = m_PeakReceivedBytes.load(),
- .m_PeakBytesPerSec = m_PeakBytesPerSec.load()};
- }
-
- virtual CreateContainerResult CreateContainer() override
- {
- // Nothing to do here
- return {};
- }
-
- virtual SaveResult SaveContainer(const IoBuffer& Payload) override
- {
- Stopwatch Timer;
- SaveResult Result;
-
- {
- CbObject ContainerObject = LoadCompactBinaryObject(Payload);
-
- ContainerObject.IterateAttachments([&](CbFieldView FieldView) {
- IoHash AttachmentHash = FieldView.AsBinaryAttachment();
- std::filesystem::path AttachmentPath = GetAttachmentPath(AttachmentHash);
- if (!IsFile(AttachmentPath))
- {
- Result.Needs.insert(AttachmentHash);
- }
- });
- }
-
- std::filesystem::path ContainerPath = m_OutputPath;
- ContainerPath.append(m_Name);
-
- try
- {
- CreateDirectories(m_OutputPath);
- BasicFile ContainerFile;
- ContainerFile.Open(ContainerPath, BasicFile::Mode::kTruncate);
- std::error_code Ec;
- ContainerFile.WriteAll(Payload, Ec);
- if (Ec)
- {
- throw std::system_error(Ec, Ec.message());
- }
- Result.RawHash = IoHash::HashBuffer(Payload);
- }
- catch (const std::exception& Ex)
- {
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("Failed saving oplog container to '{}'. Reason: {}", ContainerPath, Ex.what());
- }
- AddStats(Payload.GetSize(), 0, Timer.GetElapsedTimeUs() * 1000);
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- return Result;
- }
-
- virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override
- {
- Stopwatch Timer;
- SaveAttachmentResult Result;
- std::filesystem::path ChunkPath = GetAttachmentPath(RawHash);
- if (!IsFile(ChunkPath))
- {
- try
- {
- CreateDirectories(ChunkPath.parent_path());
-
- BasicFile ChunkFile;
- ChunkFile.Open(ChunkPath, BasicFile::Mode::kTruncate);
- size_t Offset = 0;
- for (const SharedBuffer& Segment : Payload.GetSegments())
- {
- ChunkFile.Write(Segment.GetView(), Offset);
- Offset += Segment.GetSize();
- }
- }
- catch (const std::exception& Ex)
- {
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("Failed saving oplog attachment to '{}'. Reason: {}", ChunkPath, Ex.what());
- }
- }
- AddStats(Payload.GetSize(), 0, Timer.GetElapsedTimeUs() * 1000);
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- return Result;
- }
-
- virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
- {
- Stopwatch Timer;
-
- for (const SharedBuffer& Chunk : Chunks)
- {
- CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer());
- SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {});
- if (ChunkResult.ErrorCode)
- {
- ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- return SaveAttachmentsResult{ChunkResult};
- }
- }
- SaveAttachmentsResult Result;
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- return Result;
- }
-
- virtual FinalizeResult FinalizeContainer(const IoHash&) override { return {}; }
-
- virtual LoadContainerResult LoadContainer() override { return LoadContainer(m_Name); }
-
- virtual GetKnownBlocksResult GetKnownBlocks() override
- {
- if (m_OptionalBaseName.empty())
- {
- return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}};
- }
- LoadContainerResult LoadResult = LoadContainer(m_OptionalBaseName);
- if (LoadResult.ErrorCode)
- {
- return GetKnownBlocksResult{LoadResult};
- }
- Stopwatch Timer;
- std::vector<IoHash> BlockHashes = GetBlockHashesFromOplog(LoadResult.ContainerObject);
- if (BlockHashes.empty())
- {
- return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent),
- .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}};
- }
- std::vector<IoHash> ExistingBlockHashes;
- for (const IoHash& RawHash : BlockHashes)
- {
- std::filesystem::path ChunkPath = GetAttachmentPath(RawHash);
- if (IsFile(ChunkPath))
- {
- ExistingBlockHashes.push_back(RawHash);
- }
- }
- if (ExistingBlockHashes.empty())
- {
- return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent),
- .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}};
- }
- std::vector<ThinChunkBlockDescription> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes);
- GetKnownBlocksResult Result{{.ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}};
- Result.Blocks = std::move(KnownBlocks);
- return Result;
- }
-
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
- {
- Stopwatch Timer;
- LoadAttachmentResult Result;
- std::filesystem::path ChunkPath = GetAttachmentPath(RawHash);
- if (!IsFile(ChunkPath))
- {
- Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound);
- Result.Reason = fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string());
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- return Result;
- }
- {
- BasicFile ChunkFile;
- ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead);
- Result.Bytes = ChunkFile.ReadAll();
- }
- AddStats(0, Result.Bytes.GetSize(), Timer.GetElapsedTimeUs() * 1000);
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- return Result;
- }
-
- virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
- {
- Stopwatch Timer;
- LoadAttachmentsResult Result;
- for (const IoHash& Hash : RawHashes)
- {
- LoadAttachmentResult ChunkResult = LoadAttachment(Hash);
- if (ChunkResult.ErrorCode)
- {
- ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- return LoadAttachmentsResult{ChunkResult};
- }
- ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000)));
- Result.Chunks.emplace_back(
- std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))});
- }
- return Result;
- }
-
-private:
- LoadContainerResult LoadContainer(const std::string& Name)
- {
- Stopwatch Timer;
- LoadContainerResult Result;
- std::filesystem::path SourcePath = m_OutputPath;
- SourcePath.append(Name);
- if (!IsFile(SourcePath))
- {
- Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound);
- Result.Reason = fmt::format("Failed loading oplog container from '{}'. Reason: 'The file does not exist'", SourcePath.string());
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- return Result;
- }
- IoBuffer ContainerPayload;
- {
- BasicFile ContainerFile;
- ContainerFile.Open(SourcePath, BasicFile::Mode::kRead);
- ContainerPayload = ContainerFile.ReadAll();
- }
- AddStats(0, ContainerPayload.GetSize(), Timer.GetElapsedTimeUs() * 1000);
- CbValidateError ValidateResult = CbValidateError::None;
- if (Result.ContainerObject = ValidateAndReadCompactBinaryObject(std::move(ContainerPayload), ValidateResult);
- ValidateResult != CbValidateError::None || !Result.ContainerObject)
- {
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("The file {} is not formatted as a compact binary object ('{}')",
- SourcePath.string(),
- ToString(ValidateResult));
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- return Result;
- }
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- return Result;
- }
-
- std::filesystem::path GetAttachmentPath(const IoHash& RawHash) const
- {
- ExtendablePathBuilder<128> ShardedPath;
- ShardedPath.Append(m_OutputPath.c_str());
- ExtendableStringBuilder<64> HashString;
- RawHash.ToHexString(HashString);
- const char* str = HashString.c_str();
- ShardedPath.AppendSeparator();
- ShardedPath.AppendAsciiRange(str, str + 3);
-
- ShardedPath.AppendSeparator();
- ShardedPath.AppendAsciiRange(str + 3, str + 5);
-
- ShardedPath.AppendSeparator();
- ShardedPath.AppendAsciiRange(str + 5, str + 40);
-
- return ShardedPath.ToPath();
- }
-
- void AddStats(uint64_t UploadedBytes, uint64_t DownloadedBytes, uint64_t ElapsedNS)
- {
- m_SentBytes.fetch_add(UploadedBytes);
- m_ReceivedBytes.fetch_add(DownloadedBytes);
- m_RequestTimeNS.fetch_add(ElapsedNS);
- SetAtomicMax(m_PeakSentBytes, UploadedBytes);
- SetAtomicMax(m_PeakReceivedBytes, DownloadedBytes);
- if (ElapsedNS > 0)
- {
- uint64_t BytesPerSec = (gsl::narrow<uint64_t>(UploadedBytes + DownloadedBytes) * 1000000) / ElapsedNS;
- SetAtomicMax(m_PeakBytesPerSec, BytesPerSec);
- }
-
- m_RequestCount.fetch_add(1);
- }
-
- const std::string m_Name;
- const std::string m_OptionalBaseName;
- const std::filesystem::path m_OutputPath;
- bool m_EnableBlocks = true;
- bool m_UseTempBlocks = false;
-
- std::atomic_uint64_t m_SentBytes = {};
- std::atomic_uint64_t m_ReceivedBytes = {};
- std::atomic_uint64_t m_RequestTimeNS = {};
- std::atomic_uint64_t m_RequestCount = {};
- std::atomic_uint64_t m_PeakSentBytes = {};
- std::atomic_uint64_t m_PeakReceivedBytes = {};
- std::atomic_uint64_t m_PeakBytesPerSec = {};
-};
-
-std::shared_ptr<RemoteProjectStore>
-CreateFileRemoteStore(const FileRemoteStoreOptions& Options)
-{
- std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<LocalExportProjectStore>(Options.Name,
- Options.OptionalBaseName,
- std::filesystem::path(Options.FolderPath),
- Options.ForceDisableBlocks,
- Options.ForceEnableTempBlocks);
- return RemoteStore;
-}
-
-} // namespace zen
diff --git a/src/zenserver/projectstore/fileremoteprojectstore.h b/src/zenserver/projectstore/fileremoteprojectstore.h
deleted file mode 100644
index 8da9692d5..000000000
--- a/src/zenserver/projectstore/fileremoteprojectstore.h
+++ /dev/null
@@ -1,20 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include "remoteprojectstore.h"
-
-namespace zen {
-
-struct FileRemoteStoreOptions : RemoteStoreOptions
-{
- std::filesystem::path FolderPath;
- std::string Name;
- std::string OptionalBaseName;
- bool ForceDisableBlocks = false;
- bool ForceEnableTempBlocks = false;
-};
-
-std::shared_ptr<RemoteProjectStore> CreateFileRemoteStore(const FileRemoteStoreOptions& Options);
-
-} // namespace zen
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp
index c227c0f9e..9fc9e4605 100644
--- a/src/zenserver/projectstore/httpprojectstore.cpp
+++ b/src/zenserver/projectstore/httpprojectstore.cpp
@@ -2,8 +2,6 @@
#include "httpprojectstore.h"
-#include "remoteprojectstore.h"
-
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryutil.h>
@@ -16,18 +14,17 @@
#include <zencore/stream.h>
#include <zencore/trace.h>
#include <zenhttp/packageformat.h>
+#include <zenremotestore/projectstore/buildsremoteprojectstore.h>
+#include <zenremotestore/projectstore/fileremoteprojectstore.h>
+#include <zenremotestore/projectstore/jupiterremoteprojectstore.h>
+#include <zenremotestore/projectstore/remoteprojectstore.h>
+#include <zenremotestore/projectstore/zenremoteprojectstore.h>
#include <zenstore/oplogreferencedset.h>
#include <zenstore/projectstore.h>
#include <zenstore/zenstore.h>
#include <zenutil/openprocesscache.h>
#include <zenutil/workerpools.h>
-#include "buildsremoteprojectstore.h"
-#include "fileremoteprojectstore.h"
-#include "jupiterremoteprojectstore.h"
-#include "remoteprojectstore.h"
-#include "zenremoteprojectstore.h"
-
namespace zen {
const FLLMTag&
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
deleted file mode 100644
index da3f92353..000000000
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
+++ /dev/null
@@ -1,402 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "jupiterremoteprojectstore.h"
-
-#include <zencore/compactbinaryutil.h>
-#include <zencore/compress.h>
-#include <zencore/fmtutils.h>
-
-#include <zenhttp/httpclientauth.h>
-
-#include <zenutil/jupiter/jupiterclient.h>
-#include <zenutil/jupiter/jupitersession.h>
-
-namespace zen {
-
-using namespace std::literals;
-
-class JupiterRemoteStore : public RemoteProjectStore
-{
-public:
- JupiterRemoteStore(Ref<JupiterClient>&& InJupiterClient,
- std::string_view Namespace,
- std::string_view Bucket,
- const IoHash& Key,
- const IoHash& OptionalBaseKey,
- bool ForceDisableBlocks,
- bool ForceDisableTempBlocks,
- const std::filesystem::path& TempFilePath)
- : m_JupiterClient(std::move(InJupiterClient))
- , m_Namespace(Namespace)
- , m_Bucket(Bucket)
- , m_Key(Key)
- , m_OptionalBaseKey(OptionalBaseKey)
- , m_TempFilePath(TempFilePath)
- , m_EnableBlocks(!ForceDisableBlocks)
- , m_UseTempBlocks(!ForceDisableTempBlocks)
- {
- }
-
- virtual RemoteStoreInfo GetInfo() const override
- {
- return {.CreateBlocks = m_EnableBlocks,
- .UseTempBlockFiles = m_UseTempBlocks,
- .AllowChunking = true,
- .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_Key),
- .Description = fmt::format("[cloud] {}. SessionId: {}. {}/{}/{}{}"sv,
- m_JupiterClient->ServiceUrl(),
- m_JupiterClient->Client().GetSessionId(),
- m_Namespace,
- m_Bucket,
- m_Key,
- m_OptionalBaseKey == IoHash::Zero ? "" : fmt::format(" Base {}", m_OptionalBaseKey))};
- }
-
- virtual Stats GetStats() const override
- {
- return {.m_SentBytes = m_SentBytes.load(),
- .m_ReceivedBytes = m_ReceivedBytes.load(),
- .m_RequestTimeNS = m_RequestTimeNS.load(),
- .m_RequestCount = m_RequestCount.load(),
- .m_PeakSentBytes = m_PeakSentBytes.load(),
- .m_PeakReceivedBytes = m_PeakReceivedBytes.load(),
- .m_PeakBytesPerSec = m_PeakBytesPerSec.load()};
- }
-
- virtual CreateContainerResult CreateContainer() override
- {
- // Nothing to do here
- return {};
- }
-
- virtual SaveResult SaveContainer(const IoBuffer& Payload) override
- {
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject);
- AddStats(PutResult);
-
- SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash};
- if (Result.ErrorCode)
- {
- Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_Key,
- Result.Reason);
- }
- return Result;
- }
-
- virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override
- {
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- JupiterResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload);
- AddStats(PutResult);
-
- SaveAttachmentResult Result{ConvertResult(PutResult)};
- if (Result.ErrorCode)
- {
- Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- RawHash,
- Result.Reason);
- }
- return Result;
- }
-
- virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
- {
- SaveAttachmentsResult Result;
- for (const SharedBuffer& Chunk : Chunks)
- {
- CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer());
- SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {});
- if (ChunkResult.ErrorCode)
- {
- return SaveAttachmentsResult{ChunkResult};
- }
- }
- return Result;
- }
-
- virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override
- {
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- FinalizeRefResult FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash);
- AddStats(FinalizeRefResult);
-
- FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}};
- if (Result.ErrorCode)
- {
- Result.Reason = fmt::format("Failed finalizing oplog container to {}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_Key,
- Result.Reason);
- }
- return Result;
- }
-
- virtual LoadContainerResult LoadContainer() override { return LoadContainer(m_Key); }
-
- virtual GetKnownBlocksResult GetKnownBlocks() override
- {
- if (m_OptionalBaseKey == IoHash::Zero)
- {
- return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}};
- }
- LoadContainerResult LoadResult = LoadContainer(m_OptionalBaseKey);
- if (LoadResult.ErrorCode)
- {
- return GetKnownBlocksResult{LoadResult};
- }
- std::vector<IoHash> BlockHashes = GetBlockHashesFromOplog(LoadResult.ContainerObject);
- if (BlockHashes.empty())
- {
- return GetKnownBlocksResult{
- {.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds}};
- }
-
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- JupiterExistsResult ExistsResult =
- Session.CompressedBlobExists(m_Namespace, std::set<IoHash>(BlockHashes.begin(), BlockHashes.end()));
- AddStats(ExistsResult);
-
- if (ExistsResult.ErrorCode)
- {
- return GetKnownBlocksResult{{.ErrorCode = ExistsResult.ErrorCode,
- .ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds,
- .Reason = fmt::format("Failed checking attachment existance in {}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- ExistsResult.Reason)}};
- }
-
- Stopwatch Timer;
- std::vector<IoHash> ExistingBlockHashes;
- for (const IoHash& RawHash : BlockHashes)
- {
- if (!ExistsResult.Needs.contains(RawHash))
- {
- ExistingBlockHashes.push_back(RawHash);
- }
- }
- if (ExistingBlockHashes.empty())
- {
- return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent),
- .ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds}};
- }
- std::vector<ThinChunkBlockDescription> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes);
-
- GetKnownBlocksResult Result{
- {.ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000.0}};
- Result.Blocks = std::move(KnownBlocks);
- return Result;
- }
-
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
- {
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath);
- AddStats(GetResult);
-
- LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)};
- if (GetResult.ErrorCode)
- {
- Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- RawHash,
- Result.Reason);
- }
- return Result;
- }
-
- virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
- {
- LoadAttachmentsResult Result;
- for (const IoHash& Hash : RawHashes)
- {
- LoadAttachmentResult ChunkResult = LoadAttachment(Hash);
- if (ChunkResult.ErrorCode)
- {
- return LoadAttachmentsResult{ChunkResult};
- }
- ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000)));
- Result.Chunks.emplace_back(
- std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))});
- }
- return Result;
- }
-
-private:
- LoadContainerResult LoadContainer(const IoHash& Key)
- {
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- JupiterResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject);
- AddStats(GetResult);
- if (GetResult.ErrorCode || !GetResult.Success)
- {
- LoadContainerResult Result{ConvertResult(GetResult)};
- Result.Reason = fmt::format("Failed fetching oplog container from {}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- Key,
- Result.Reason);
- return Result;
- }
-
- CbValidateError ValidateResult = CbValidateError::None;
- if (CbObject ContainerObject = ValidateAndReadCompactBinaryObject(IoBuffer(GetResult.Response), ValidateResult);
- ValidateResult != CbValidateError::None || !ContainerObject)
- {
- return LoadContainerResult{
- RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- .ElapsedSeconds = GetResult.ElapsedSeconds,
- .Reason = fmt::format("The ref {}/{}/{}/{} is not formatted as a compact binary object"sv,
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- Key)},
- {}};
- }
- else
- {
- return LoadContainerResult{ConvertResult(GetResult), std::move(ContainerObject)};
- }
- }
-
- void AddStats(const JupiterResult& Result)
- {
- m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.SentBytes));
- m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.ReceivedBytes));
- m_RequestTimeNS.fetch_add(static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000));
- SetAtomicMax(m_PeakSentBytes, Result.SentBytes);
- SetAtomicMax(m_PeakReceivedBytes, Result.ReceivedBytes);
- if (Result.ElapsedSeconds > 0.0)
- {
- uint64_t BytesPerSec = static_cast<uint64_t>((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds);
- SetAtomicMax(m_PeakBytesPerSec, BytesPerSec);
- }
-
- m_RequestCount.fetch_add(1);
- }
-
- static Result ConvertResult(const JupiterResult& Response)
- {
- std::string Text;
- int32_t ErrorCode = 0;
- if (Response.ErrorCode != 0 || !Response.Success)
- {
- if (Response.Response)
- {
- HttpContentType ContentType = Response.Response.GetContentType();
- if (ContentType == ZenContentType::kText || ContentType == ZenContentType::kJSON)
- {
- ExtendableStringBuilder<256> SB;
- SB.Append("\n");
- SB.Append(std::string_view(reinterpret_cast<const std::string::value_type*>(Response.Response.GetData()),
- Response.Response.GetSize()));
- Text = SB.ToString();
- }
- else if (ContentType == ZenContentType::kCbObject)
- {
- ExtendableStringBuilder<256> SB;
- SB.Append("\n");
- CompactBinaryToJson(Response.Response.GetView(), SB);
- Text = SB.ToString();
- }
- }
- }
- if (Response.ErrorCode != 0)
- {
- ErrorCode = Response.ErrorCode;
- }
- else if (!Response.Success)
- {
- ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- }
- return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text};
- }
-
- Ref<JupiterClient> m_JupiterClient;
- const std::string m_Namespace;
- const std::string m_Bucket;
- const IoHash m_Key;
- const IoHash m_OptionalBaseKey;
- std::filesystem::path m_TempFilePath;
- const bool m_EnableBlocks = true;
- const bool m_UseTempBlocks = true;
- const bool m_AllowRedirect = false;
-
- std::atomic_uint64_t m_SentBytes = {};
- std::atomic_uint64_t m_ReceivedBytes = {};
- std::atomic_uint64_t m_RequestTimeNS = {};
- std::atomic_uint64_t m_RequestCount = {};
- std::atomic_uint64_t m_PeakSentBytes = {};
- std::atomic_uint64_t m_PeakReceivedBytes = {};
- std::atomic_uint64_t m_PeakBytesPerSec = {};
-};
-
-std::shared_ptr<RemoteProjectStore>
-CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath, bool Quiet, bool Unattended)
-{
- std::string Url = Options.Url;
- if (Url.find("://"sv) == std::string::npos)
- {
- // Assume https URL
- Url = fmt::format("https://{}"sv, Url);
- }
- JupiterClientOptions ClientOptions{.Name = "Remote store"sv,
- .ServiceUrl = Url,
- .ConnectTimeout = std::chrono::milliseconds(2000),
- .Timeout = std::chrono::milliseconds(1800000),
- .AssumeHttp2 = Options.AssumeHttp2,
- .AllowResume = true,
- .RetryCount = 4};
- // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
- // 2) Access token as parameter in request
- // 3) Environment variable (different win vs linux/mac)
- // 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
-
- std::function<HttpClientAccessToken()> TokenProvider;
- if (!Options.OpenIdProvider.empty())
- {
- TokenProvider = httpclientauth::CreateFromOpenIdProvider(Options.AuthManager, Options.OpenIdProvider);
- }
- else if (!Options.AccessToken.empty())
- {
- TokenProvider = httpclientauth::CreateFromStaticToken(Options.AccessToken);
- }
- else if (!Options.OidcExePath.empty())
- {
- if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(Options.OidcExePath, Url, Quiet, Unattended);
- TokenProviderMaybe)
- {
- TokenProvider = TokenProviderMaybe.value();
- }
- }
-
- if (!TokenProvider)
- {
- TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager);
- }
-
- Ref<JupiterClient> Client(new JupiterClient(ClientOptions, std::move(TokenProvider)));
-
- std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<JupiterRemoteStore>(std::move(Client),
- Options.Namespace,
- Options.Bucket,
- Options.Key,
- Options.OptionalBaseKey,
- Options.ForceDisableBlocks,
- Options.ForceDisableTempBlocks,
- TempFilePath);
- return RemoteStore;
-}
-
-} // namespace zen
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.h b/src/zenserver/projectstore/jupiterremoteprojectstore.h
deleted file mode 100644
index d7a22700a..000000000
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.h
+++ /dev/null
@@ -1,32 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include "remoteprojectstore.h"
-
-namespace zen {
-
-class AuthMgr;
-
-struct JupiterRemoteStoreOptions : RemoteStoreOptions
-{
- std::string Url;
- std::string Namespace;
- std::string Bucket;
- IoHash Key;
- IoHash OptionalBaseKey;
- std::string OpenIdProvider;
- std::string AccessToken;
- AuthMgr& AuthManager;
- std::filesystem::path OidcExePath;
- bool ForceDisableBlocks = false;
- bool ForceDisableTempBlocks = false;
- bool AssumeHttp2 = false;
-};
-
-std::shared_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options,
- const std::filesystem::path& TempFilePath,
- bool Quiet,
- bool Unattended);
-
-} // namespace zen
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
deleted file mode 100644
index c31c2297e..000000000
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ /dev/null
@@ -1,3519 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "remoteprojectstore.h"
-
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinaryutil.h>
-#include <zencore/compress.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/scopeguard.h>
-#include <zencore/stream.h>
-#include <zencore/timer.h>
-#include <zencore/workthreadpool.h>
-#include <zenhttp/httpcommon.h>
-#include <zenstore/cidstore.h>
-#include <zenutil/chunkedfile.h>
-#include <zenutil/workerpools.h>
-
-#include <unordered_map>
-
-#if ZEN_WITH_TESTS
-# include <zencore/testing.h>
-# include <zencore/testutils.h>
-# include "fileremoteprojectstore.h"
-#endif // ZEN_WITH_TESTS
-
-namespace zen {
-
-/*
- OplogContainer
- Binary("ops") // Compressed CompactBinary object to hide attachment references, also makes the oplog smaller
- {
- CbArray("ops")
- {
- CbObject Op
- (CbFieldType::BinaryAttachment Attachments[])
- (OpData)
- }
- }
- CbArray("blocks")
- CbObject
- CbFieldType::BinaryAttachment "rawhash" // Optional, only if we are creating blocks (Jupiter/File)
- CbArray("chunks")
- CbFieldType::Hash // Chunk hashes
- CbArray("chunks") // Optional, only if we are not creating blocks (Zen)
- CbFieldType::BinaryAttachment // Chunk attachment hashes
-
- CbArray("chunkedfiles");
- CbFieldType::Hash "rawhash"
- CbFieldType::Integer "rawsize"
- CbArray("chunks");
- CbFieldType::Hash "chunkhash"
- CbArray("sequence");
- CbFieldType::Integer chunks index
-
- CompressedBinary ChunkBlock
- {
- VarUInt ChunkCount
- VarUInt ChunkSizes[ChunkCount]
- uint8_t[chunksize])[ChunkCount]
- }
-*/
-namespace remotestore_impl {
- ////////////////////////////// AsyncRemoteResult
-
- struct AsyncRemoteResult
- {
- void SetError(int32_t ErrorCode, const std::string& ErrorReason, const std::string ErrorText)
- {
- int32_t Expected = 0;
- if (m_ErrorCode.compare_exchange_weak(Expected, ErrorCode ? ErrorCode : -1))
- {
- m_ErrorReason = ErrorReason;
- m_ErrorText = ErrorText;
- }
- }
- bool IsError() const { return m_ErrorCode.load() != 0; }
- int GetError() const { return m_ErrorCode.load(); };
- const std::string& GetErrorReason() const { return m_ErrorReason; };
- const std::string& GetErrorText() const { return m_ErrorText; };
- RemoteProjectStore::Result ConvertResult(double ElapsedSeconds = 0.0) const
- {
- return RemoteProjectStore::Result{m_ErrorCode, ElapsedSeconds, m_ErrorReason, m_ErrorText};
- }
-
- private:
- std::atomic<int32_t> m_ErrorCode = 0;
- std::string m_ErrorReason;
- std::string m_ErrorText;
- };
-
- void ReportProgress(JobContext* OptionalContext,
- std::string_view CurrentOp,
- std::string_view Details,
- ptrdiff_t Total,
- ptrdiff_t Remaining)
- {
- if (OptionalContext)
- {
- ZEN_ASSERT(Total > 0);
- OptionalContext->ReportProgress(CurrentOp, Details, Total, Remaining);
- }
- }
-
- void ReportMessage(JobContext* OptionalContext, std::string_view Message)
- {
- if (OptionalContext)
- {
- OptionalContext->ReportMessage(Message);
- }
- ZEN_INFO("{}", Message);
- }
-
- bool IsCancelled(JobContext* OptionalContext)
- {
- if (!OptionalContext)
- {
- return false;
- }
- return OptionalContext->IsCancelled();
- }
-
- std::string GetStats(const RemoteProjectStore::Stats& Stats, uint64_t ElapsedWallTimeMS)
- {
- return fmt::format(
- "Sent: {} ({}bits/s) Recv: {} ({}bits/s)",
- NiceBytes(Stats.m_SentBytes),
- NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u),
- NiceBytes(Stats.m_ReceivedBytes),
- NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u));
- }
-
- void LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats)
- {
- ZEN_INFO("Oplog request count: {}. Average request size: {}. Average request time: {}, Peak request speed: {}bits/s",
- Stats.m_RequestCount,
- NiceBytes(Stats.m_RequestCount > 0 ? (Stats.m_ReceivedBytes + Stats.m_SentBytes) / Stats.m_RequestCount : 0u),
- NiceLatencyNs(Stats.m_RequestCount > 0 ? (Stats.m_RequestTimeNS / Stats.m_RequestCount) : 0u),
- NiceNum(Stats.m_PeakBytesPerSec));
- ZEN_INFO(
- "Oplog sent request avg: {} ({}bits/s). Peak: {}",
- NiceBytes(Stats.m_RequestCount > 0u ? static_cast<uint64_t>((Stats.m_SentBytes) / Stats.m_RequestCount) : 0u),
- NiceNum(Stats.m_RequestTimeNS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 8 * 1000000000) / Stats.m_RequestTimeNS) : 0u),
- NiceBytes(Stats.m_PeakSentBytes));
- ZEN_INFO(
- "Oplog recv request avg: {} ({}bits/s). Peak: {}",
- NiceBytes(Stats.m_RequestCount > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes) / Stats.m_RequestCount) : 0u),
- NiceNum(Stats.m_RequestTimeNS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 8 * 1000000000) / Stats.m_RequestTimeNS)
- : 0u),
- NiceBytes(Stats.m_PeakReceivedBytes));
- }
-
- size_t AddBlock(RwLock& BlocksLock, std::vector<ChunkBlockDescription>& Blocks)
- {
- size_t BlockIndex;
- {
- RwLock::ExclusiveLockScope _(BlocksLock);
- BlockIndex = Blocks.size();
- Blocks.resize(BlockIndex + 1);
- }
- return BlockIndex;
- }
-
- RemoteProjectStore::Result WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, JobContext* OptionalContext)
- {
- using namespace std::literals;
-
- Stopwatch Timer;
-
- CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
- const uint64_t OpCount = OpsArray.Num();
-
- ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount));
-
- const size_t OpsBatchSize = 8192;
- std::vector<uint8_t> OpsData;
- std::vector<size_t> OpDataOffsets;
- size_t OpsCompleteCount = 0;
-
- OpsData.reserve(OpsBatchSize);
-
- auto AppendBatch = [&]() {
- std::vector<CbObjectView> Ops;
- Ops.reserve(OpDataOffsets.size());
- for (size_t OpDataOffset : OpDataOffsets)
- {
- Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset]));
- }
- std::vector<ProjectStore::LogSequenceNumber> OpLsns = Oplog.AppendNewOplogEntries(Ops);
- OpsCompleteCount += OpLsns.size();
- OpsData.clear();
- OpDataOffsets.clear();
- ReportProgress(OptionalContext,
- "Writing oplog"sv,
- fmt::format("{} remaining...", OpCount - OpsCompleteCount),
- OpCount,
- OpCount - OpsCompleteCount);
- };
-
- BinaryWriter Writer;
- for (CbFieldView OpEntry : OpsArray)
- {
- CbObjectView Op = OpEntry.AsObjectView();
- Op.CopyTo(Writer);
- OpDataOffsets.push_back(OpsData.size());
- OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const uint8_t*)Writer.GetData()) + Writer.GetSize());
- Writer.Reset();
-
- if (OpDataOffsets.size() == OpsBatchSize)
- {
- AppendBatch();
- }
- }
- if (!OpDataOffsets.empty())
- {
- AppendBatch();
- }
-
- if (OpCount > 0)
- {
- ReportProgress(OptionalContext, "Writing oplog"sv, ""sv, OpCount, 0);
- }
-
- return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0};
- }
-
- struct DownloadInfo
- {
- uint64_t OplogSizeBytes = 0;
- std::atomic<uint64_t> AttachmentsDownloaded = 0;
- std::atomic<uint64_t> AttachmentBlocksDownloaded = 0;
- std::atomic<uint64_t> AttachmentBytesDownloaded = 0;
- std::atomic<uint64_t> AttachmentBlockBytesDownloaded = 0;
- std::atomic<uint64_t> AttachmentsStored = 0;
- std::atomic<uint64_t> AttachmentBytesStored = 0;
- std::atomic_size_t MissingAttachmentCount = 0;
- };
-
- void DownloadAndSaveBlockChunks(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- bool IgnoreMissingAttachments,
- JobContext* OptionalContext,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
- Latch& AttachmentsDownloadLatch,
- Latch& AttachmentsWriteLatch,
- AsyncRemoteResult& RemoteResult,
- DownloadInfo& Info,
- Stopwatch& LoadAttachmentsTimer,
- std::atomic_uint64_t& DownloadStartMS,
- const std::vector<IoHash>& Chunks)
- {
- AttachmentsDownloadLatch.AddCount(1);
- NetworkWorkerPool.ScheduleWork(
- [&RemoteStore,
- &ChunkStore,
- &WorkerPool,
- &AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- &RemoteResult,
- Chunks = Chunks,
- &Info,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- uint64_t Unset = (std::uint64_t)-1;
- DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
- if (Result.ErrorCode)
- {
- ReportMessage(OptionalContext,
- fmt::format("Failed to load attachments with {} chunks ({}): {}",
- Chunks.size(),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- Info.MissingAttachmentCount.fetch_add(1);
- if (IgnoreMissingAttachments)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- }
- return;
- }
- Info.AttachmentsDownloaded.fetch_add(Chunks.size());
- ZEN_INFO("Loaded {} bulk attachments in {}",
- Chunks.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
- if (RemoteResult.IsError())
- {
- return;
- }
- AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- if (!Chunks.empty())
- {
- try
- {
- std::vector<IoBuffer> WriteAttachmentBuffers;
- std::vector<IoHash> WriteRawHashes;
- WriteAttachmentBuffers.reserve(Chunks.size());
- WriteRawHashes.reserve(Chunks.size());
-
- for (const auto& It : Chunks)
- {
- uint64_t ChunkSize = It.second.GetCompressedSize();
- Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
- WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer());
- WriteRawHashes.push_back(It.first);
- }
- std::vector<CidStore::InsertResult> InsertResults =
- ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
-
- for (size_t Index = 0; Index < InsertResults.size(); Index++)
- {
- if (InsertResults[Index].New)
- {
- Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
- Info.AttachmentsStored.fetch_add(1);
- }
- }
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to bulk save {} attachments", Chunks.size()),
- Ex.what());
- }
- }
- },
- WorkerThreadPool::EMode::EnableBacklog);
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to bulk load {} attachments", Chunks.size()),
- Ex.what());
- }
- },
- WorkerThreadPool::EMode::EnableBacklog);
- };
-
- void DownloadAndSaveBlock(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- bool IgnoreMissingAttachments,
- JobContext* OptionalContext,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
- Latch& AttachmentsDownloadLatch,
- Latch& AttachmentsWriteLatch,
- AsyncRemoteResult& RemoteResult,
- DownloadInfo& Info,
- Stopwatch& LoadAttachmentsTimer,
- std::atomic_uint64_t& DownloadStartMS,
- const IoHash& BlockHash,
- const std::vector<IoHash>& Chunks,
- uint32_t RetriesLeft)
- {
- AttachmentsDownloadLatch.AddCount(1);
- NetworkWorkerPool.ScheduleWork(
- [&AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- &ChunkStore,
- &RemoteStore,
- &NetworkWorkerPool,
- &WorkerPool,
- BlockHash,
- &RemoteResult,
- &Info,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext,
- RetriesLeft,
- Chunks = Chunks]() {
- auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- uint64_t Unset = (std::uint64_t)-1;
- DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
- if (BlockResult.ErrorCode)
- {
- ReportMessage(OptionalContext,
- fmt::format("Failed to download block attachment {} ({}): {}",
- BlockHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
- {
- RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
- }
- return;
- }
- if (RemoteResult.IsError())
- {
- return;
- }
- uint64_t BlockSize = BlockResult.Bytes.GetSize();
- Info.AttachmentBlocksDownloaded.fetch_add(1);
- ZEN_INFO("Loaded block attachment '{}' in {} ({})",
- BlockHash,
- NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
- NiceBytes(BlockSize));
- Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
-
- AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- &ChunkStore,
- &RemoteStore,
- &NetworkWorkerPool,
- &WorkerPool,
- BlockHash,
- &RemoteResult,
- &Info,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext,
- RetriesLeft,
- Chunks = Chunks,
- Bytes = std::move(BlockResult.Bytes)]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- ZEN_ASSERT(Bytes.Size() > 0);
- std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
- WantedChunks.reserve(Chunks.size());
- WantedChunks.insert(Chunks.begin(), Chunks.end());
- std::vector<IoBuffer> WriteAttachmentBuffers;
- std::vector<IoHash> WriteRawHashes;
-
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize);
- if (!Compressed)
- {
- if (RetriesLeft > 0)
- {
- ReportMessage(
- OptionalContext,
- fmt::format(
- "Block attachment {} is malformed, can't parse as compressed binary, retrying download",
- BlockHash));
- return DownloadAndSaveBlock(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
- Info,
- LoadAttachmentsTimer,
- DownloadStartMS,
- BlockHash,
- std::move(Chunks),
- RetriesLeft - 1);
- }
- ReportMessage(
- OptionalContext,
- fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash));
- RemoteResult.SetError(
- gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash),
- {});
- return;
- }
- SharedBuffer BlockPayload = Compressed.Decompress();
- if (!BlockPayload)
- {
- if (RetriesLeft > 0)
- {
- ReportMessage(
- OptionalContext,
- fmt::format("Block attachment {} is malformed, can't decompress payload, retrying download",
- BlockHash));
- return DownloadAndSaveBlock(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
- Info,
- LoadAttachmentsTimer,
- DownloadStartMS,
- BlockHash,
- std::move(Chunks),
- RetriesLeft - 1);
- }
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash));
- RemoteResult.SetError(
- gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash),
- {});
- return;
- }
- if (RawHash != BlockHash)
- {
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash));
- RemoteResult.SetError(
- gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash),
- {});
- return;
- }
-
- uint64_t BlockHeaderSize = 0;
- bool StoreChunksOK = IterateChunkBlock(
- BlockPayload,
- [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk,
- const IoHash& AttachmentRawHash) {
- if (WantedChunks.contains(AttachmentRawHash))
- {
- WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
- IoHash RawHash;
- uint64_t RawSize;
- ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(),
- RawHash,
- RawSize));
- ZEN_ASSERT(RawHash == AttachmentRawHash);
- WriteRawHashes.emplace_back(AttachmentRawHash);
- WantedChunks.erase(AttachmentRawHash);
- }
- },
- BlockHeaderSize);
-
- if (!StoreChunksOK)
- {
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} has invalid format ({}): {}",
- BlockHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Invalid format for block {}", BlockHash),
- {});
- return;
- }
-
- ZEN_ASSERT(WantedChunks.empty());
-
- if (!WriteAttachmentBuffers.empty())
- {
- auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
- for (size_t Index = 0; Index < Results.size(); Index++)
- {
- const auto& Result = Results[Index];
- if (Result.New)
- {
- Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
- Info.AttachmentsStored.fetch_add(1);
- }
- }
- }
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed save block attachment {}", BlockHash),
- Ex.what());
- }
- },
- WorkerThreadPool::EMode::EnableBacklog);
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to block attachment {}", BlockHash),
- Ex.what());
- }
- },
- WorkerThreadPool::EMode::EnableBacklog);
- };
-
- void DownloadAndSaveAttachment(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- bool IgnoreMissingAttachments,
- JobContext* OptionalContext,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
- Latch& AttachmentsDownloadLatch,
- Latch& AttachmentsWriteLatch,
- AsyncRemoteResult& RemoteResult,
- DownloadInfo& Info,
- Stopwatch& LoadAttachmentsTimer,
- std::atomic_uint64_t& DownloadStartMS,
- const IoHash& RawHash)
- {
- AttachmentsDownloadLatch.AddCount(1);
- NetworkWorkerPool.ScheduleWork(
- [&RemoteStore,
- &ChunkStore,
- &WorkerPool,
- &RemoteResult,
- &AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- RawHash,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- &Info,
- IgnoreMissingAttachments,
- OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- uint64_t Unset = (std::uint64_t)-1;
- DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
- if (AttachmentResult.ErrorCode)
- {
- ReportMessage(OptionalContext,
- fmt::format("Failed to download large attachment {}: '{}', error code : {}",
- RawHash,
- AttachmentResult.Reason,
- AttachmentResult.ErrorCode));
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
- {
- RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
- }
- return;
- }
- uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize();
- ZEN_INFO("Loaded large attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
- NiceBytes(AttachmentSize));
- Info.AttachmentsDownloaded.fetch_add(1);
- if (RemoteResult.IsError())
- {
- return;
- }
- Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
-
- AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&AttachmentsWriteLatch,
- &RemoteResult,
- &Info,
- &ChunkStore,
- RawHash,
- AttachmentSize,
- Bytes = std::move(AttachmentResult.Bytes),
- OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash);
- if (InsertResult.New)
- {
- Info.AttachmentBytesStored.fetch_add(AttachmentSize);
- Info.AttachmentsStored.fetch_add(1);
- }
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Saving attachment {} failed", RawHash),
- Ex.what());
- }
- },
- WorkerThreadPool::EMode::EnableBacklog);
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Loading attachment {} failed", RawHash),
- Ex.what());
- }
- },
- WorkerThreadPool::EMode::EnableBacklog);
- };
-
- void CreateBlock(WorkerThreadPool& WorkerPool,
- Latch& OpSectionsLatch,
- std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock,
- RwLock& SectionsLock,
- std::vector<ChunkBlockDescription>& Blocks,
- size_t BlockIndex,
- const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock,
- AsyncRemoteResult& RemoteResult)
- {
- OpSectionsLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&Blocks,
- &SectionsLock,
- &OpSectionsLatch,
- BlockIndex,
- Chunks = std::move(ChunksInBlock),
- &AsyncOnBlock,
- &RemoteResult]() mutable {
- auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- size_t ChunkCount = Chunks.size();
- try
- {
- ZEN_ASSERT(ChunkCount > 0);
- Stopwatch Timer;
- ChunkBlockDescription Block;
- CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(Chunks), Block);
- IoHash BlockHash = CompressedBlock.DecodeRawHash();
- {
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
- RwLock::SharedLockScope __(SectionsLock);
- Blocks[BlockIndex] = Block;
- }
- uint64_t BlockSize = CompressedBlock.GetCompressedSize();
- AsyncOnBlock(std::move(CompressedBlock), std::move(Block));
- ZEN_INFO("Generated block with {} attachments in {} ({})",
- ChunkCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
- NiceBytes(BlockSize));
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed creating block {} with {} chunks", BlockIndex, ChunkCount),
- Ex.what());
- }
- },
- WorkerThreadPool::EMode::EnableBacklog);
- }
-
- struct UploadInfo
- {
- uint64_t OplogSizeBytes = 0;
- std::atomic<uint64_t> AttachmentsUploaded = 0;
- std::atomic<uint64_t> AttachmentBlocksUploaded = 0;
- std::atomic<uint64_t> AttachmentBytesUploaded = 0;
- std::atomic<uint64_t> AttachmentBlockBytesUploaded = 0;
- };
-
- struct CreatedBlock
- {
- IoBuffer Payload;
- ChunkBlockDescription Block;
- };
-
- void UploadAttachments(WorkerThreadPool& WorkerPool,
- CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- const std::unordered_set<IoHash, IoHash::Hasher>& LargeAttachments,
- const std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>>& BlockChunks,
- const std::unordered_map<IoHash, CreatedBlock, IoHash::Hasher>& CreatedBlocks,
- const tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher>& LooseFileAttachments,
- const std::unordered_set<IoHash, IoHash::Hasher>& Needs,
- bool ForceAll,
- UploadInfo& Info,
- AsyncRemoteResult& RemoteResult,
- JobContext* OptionalContext)
- {
- using namespace std::literals;
-
- if (Needs.empty() && !ForceAll)
- {
- return;
- }
-
- ReportMessage(OptionalContext, "Filtering needed attachments for upload...");
-
- std::unordered_set<IoHash, IoHash::Hasher> AttachmentsToUpload;
- std::unordered_map<IoHash, FetchChunkFunc, IoHash::Hasher> BulkBlockAttachmentsToUpload;
-
- size_t BlockAttachmentCountToUpload = 0;
- size_t LargeAttachmentCountToUpload = 0;
- size_t BulkAttachmentCountToUpload = 0;
- AttachmentsToUpload.reserve(ForceAll ? CreatedBlocks.size() + LargeAttachments.size() : Needs.size());
-
- std::unordered_set<IoHash, IoHash::Hasher> UnknownAttachments(Needs);
-
- for (const auto& CreatedBlock : CreatedBlocks)
- {
- if (ForceAll || Needs.contains(CreatedBlock.first))
- {
- AttachmentsToUpload.insert(CreatedBlock.first);
- BlockAttachmentCountToUpload++;
- UnknownAttachments.erase(CreatedBlock.first);
- }
- }
- for (const IoHash& LargeAttachment : LargeAttachments)
- {
- if (ForceAll || Needs.contains(LargeAttachment))
- {
- AttachmentsToUpload.insert(LargeAttachment);
- LargeAttachmentCountToUpload++;
- UnknownAttachments.erase(LargeAttachment);
- }
- }
- for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& BlockHashes : BlockChunks)
- {
- for (const std::pair<IoHash, FetchChunkFunc>& Chunk : BlockHashes)
- {
- if (ForceAll || Needs.contains(Chunk.first))
- {
- BulkBlockAttachmentsToUpload.insert(std::make_pair(Chunk.first, Chunk.second));
- BulkAttachmentCountToUpload++;
- UnknownAttachments.erase(Chunk.first);
- }
- }
- }
-
- if (AttachmentsToUpload.empty() && BulkBlockAttachmentsToUpload.empty())
- {
- ReportMessage(OptionalContext, "No attachments needed");
- return;
- }
-
- if (!UnknownAttachments.empty())
- {
- RemoteResult.SetError(
- gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Upload requested of {} missing attachments, the base container referenced blocks that are no longer available",
- UnknownAttachments.size()),
- "");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return;
- }
-
- if (IsCancelled(OptionalContext))
- {
- if (!RemoteResult.IsError())
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- }
- return;
- }
-
- ReportMessage(OptionalContext,
- fmt::format("Saving {} attachments ({} blocks, {} attachments, {} bulk attachments)",
- AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(),
- BlockAttachmentCountToUpload,
- LargeAttachmentCountToUpload,
- BulkAttachmentCountToUpload));
-
- Stopwatch Timer;
-
- ptrdiff_t AttachmentsToSave(0);
- Latch SaveAttachmentsLatch(1);
-
- for (const IoHash& RawHash : AttachmentsToUpload)
- {
- if (RemoteResult.IsError())
- {
- break;
- }
-
- SaveAttachmentsLatch.AddCount(1);
- AttachmentsToSave++;
- WorkerPool.ScheduleWork(
- [&ChunkStore,
- &RemoteStore,
- &SaveAttachmentsLatch,
- &RemoteResult,
- RawHash,
- &CreatedBlocks,
- &LooseFileAttachments,
- &Info,
- OptionalContext]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- IoBuffer Payload;
- ChunkBlockDescription Block;
- if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
- {
- Payload = BlockIt->second.Payload;
- Block = BlockIt->second.Block;
- }
- else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end())
- {
- Payload = LooseTmpFileIt->second(RawHash);
- }
- else
- {
- Payload = ChunkStore.FindChunkByCid(RawHash);
- }
- if (!Payload)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to find attachment {}", RawHash),
- {});
- ZEN_WARN("Failed to save attachment '{}' ({}): {}",
- RawHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
- return;
- }
- const bool IsBlock = Block.BlockHash == RawHash;
- size_t PayloadSize = Payload.GetSize();
- RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash, std::move(Block));
- if (Result.ErrorCode)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ReportMessage(OptionalContext,
- fmt::format("Failed to save attachment '{}', {} ({}): {}",
- RawHash,
- NiceBytes(PayloadSize),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- return;
- }
- if (IsBlock)
- {
- Info.AttachmentBlocksUploaded.fetch_add(1);
- Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize);
- ZEN_INFO("Saved block attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(PayloadSize));
- }
- else
- {
- Info.AttachmentsUploaded.fetch_add(1);
- Info.AttachmentBytesUploaded.fetch_add(PayloadSize);
- ZEN_INFO("Saved large attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(PayloadSize));
- }
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("To upload attachment {}", RawHash),
- Ex.what());
- }
- },
- WorkerThreadPool::EMode::EnableBacklog);
- }
-
- if (IsCancelled(OptionalContext))
- {
- if (!RemoteResult.IsError())
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- }
- return;
- }
-
- if (!BulkBlockAttachmentsToUpload.empty())
- {
- for (const std::vector<std::pair<IoHash, FetchChunkFunc>>& Chunks : BlockChunks)
- {
- if (RemoteResult.IsError())
- {
- break;
- }
-
- std::vector<IoHash> NeededChunks;
- NeededChunks.reserve(Chunks.size());
- for (const std::pair<IoHash, FetchChunkFunc>& Chunk : Chunks)
- {
- const IoHash& ChunkHash = Chunk.first;
- if (BulkBlockAttachmentsToUpload.contains(ChunkHash) && !AttachmentsToUpload.contains(ChunkHash))
- {
- NeededChunks.push_back(Chunk.first);
- }
- }
- if (NeededChunks.empty())
- {
- continue;
- }
-
- SaveAttachmentsLatch.AddCount(1);
- AttachmentsToSave++;
- WorkerPool.ScheduleWork(
- [&RemoteStore,
- &ChunkStore,
- &SaveAttachmentsLatch,
- &RemoteResult,
- NeededChunks = std::move(NeededChunks),
- &BulkBlockAttachmentsToUpload,
- &Info,
- OptionalContext]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- try
- {
- size_t ChunksSize = 0;
- std::vector<SharedBuffer> ChunkBuffers;
- ChunkBuffers.reserve(NeededChunks.size());
- for (const IoHash& Chunk : NeededChunks)
- {
- auto It = BulkBlockAttachmentsToUpload.find(Chunk);
- ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end());
- CompressedBuffer ChunkPayload = It->second(It->first).second;
- if (!ChunkPayload)
- {
- RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
- fmt::format("Missing chunk {}"sv, Chunk),
- fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
- ChunkBuffers.clear();
- break;
- }
- ChunksSize += ChunkPayload.GetCompressedSize();
- ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).GetCompressed().Flatten().AsIoBuffer()));
- }
- RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
- if (Result.ErrorCode)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ReportMessage(OptionalContext,
- fmt::format("Failed to save attachments with {} chunks ({}): {}",
- NeededChunks.size(),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- return;
- }
- Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size());
- Info.AttachmentBytesUploaded.fetch_add(ChunksSize);
-
- ZEN_INFO("Saved {} bulk attachments in {} ({})",
- NeededChunks.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(ChunksSize));
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to buck upload {} attachments", NeededChunks.size()),
- Ex.what());
- }
- },
- WorkerThreadPool::EMode::EnableBacklog);
- }
- }
-
- SaveAttachmentsLatch.CountDown();
- while (!SaveAttachmentsLatch.Wait(1000))
- {
- ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining();
- if (IsCancelled(OptionalContext))
- {
- if (!RemoteResult.IsError())
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- }
- }
- uint64_t PartialTransferWallTimeMS = Timer.GetElapsedTimeMs();
- ReportProgress(OptionalContext,
- "Saving attachments"sv,
- fmt::format("{} remaining... {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)),
- AttachmentsToSave,
- Remaining);
- }
- uint64_t ElapsedTimeMS = Timer.GetElapsedTimeMs();
- if (AttachmentsToSave > 0)
- {
- ReportProgress(OptionalContext,
- "Saving attachments"sv,
- fmt::format("{}", GetStats(RemoteStore.GetStats(), ElapsedTimeMS)),
- AttachmentsToSave,
- 0);
- }
- ReportMessage(OptionalContext,
- fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {} {}",
- AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(),
- BlockAttachmentCountToUpload,
- LargeAttachmentCountToUpload,
- BulkAttachmentCountToUpload,
- NiceTimeSpanMs(ElapsedTimeMS),
- GetStats(RemoteStore.GetStats(), ElapsedTimeMS)));
- }
-
-} // namespace remotestore_impl
-
-std::vector<IoHash>
-GetBlockHashesFromOplog(CbObjectView ContainerObject)
-{
- using namespace std::literals;
- std::vector<ChunkBlockDescription> Result;
- CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
-
- std::vector<IoHash> BlockHashes;
- BlockHashes.reserve(BlocksArray.Num());
- for (CbFieldView BlockField : BlocksArray)
- {
- CbObjectView BlockView = BlockField.AsObjectView();
- IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
- BlockHashes.push_back(BlockHash);
- }
- return BlockHashes;
-}
-
-std::vector<ThinChunkBlockDescription>
-GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes)
-{
- using namespace std::literals;
- std::vector<ThinChunkBlockDescription> Result;
- CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
- tsl::robin_set<IoHash, IoHash::Hasher> IncludeSet;
- IncludeSet.insert(IncludeBlockHashes.begin(), IncludeBlockHashes.end());
-
- Result.reserve(IncludeBlockHashes.size());
- for (CbFieldView BlockField : BlocksArray)
- {
- CbObjectView BlockView = BlockField.AsObjectView();
- IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
- if (IncludeSet.contains(BlockHash))
- {
- std::vector<IoHash> ChunkHashes;
- CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView();
- if (BlockHash == IoHash::Zero)
- {
- continue;
- }
- ChunkHashes.reserve(ChunksArray.Num());
- for (CbFieldView ChunkField : ChunksArray)
- {
- ChunkHashes.push_back(ChunkField.AsHash());
- }
- Result.push_back(ThinChunkBlockDescription{.BlockHash = BlockHash, .ChunkRawHashes = std::move(ChunkHashes)});
- }
- }
- return Result;
-}
-
-CbObject
-BuildContainer(CidStore& ChunkStore,
- ProjectStore::Project& Project,
- ProjectStore::Oplog& Oplog,
- size_t MaxBlockSize,
- size_t MaxChunksPerBlock,
- size_t MaxChunkEmbedSize,
- size_t ChunkFileSizeLimit,
- bool BuildBlocks,
- bool IgnoreMissingAttachments,
- bool AllowChunking,
- const std::vector<ThinChunkBlockDescription>& KnownBlocks,
- WorkerThreadPool& WorkerPool,
- const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock,
- const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
- const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
- bool EmbedLooseFiles,
- JobContext* OptionalContext,
- remotestore_impl::AsyncRemoteResult& RemoteResult)
-{
- using namespace std::literals;
-
- size_t OpCount = 0;
-
- CbObject OplogContainerObject;
- {
- struct FoundAttachment
- {
- std::filesystem::path RawPath; // If not stored in cid
- uint64_t Size = 0;
- Oid Key = Oid::Zero;
- };
-
- std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments;
-
- RwLock BlocksLock;
- std::vector<ChunkBlockDescription> Blocks;
- CompressedBuffer OpsBuffer;
-
- std::filesystem::path AttachmentTempPath = Oplog.TempPath();
- AttachmentTempPath.append(".pending");
- CreateDirectories(AttachmentTempPath);
-
- auto RewriteOp = [&](const Oid& Key, CbObjectView Op, const std::function<void(CbObjectView)>& CB) {
- bool OpRewritten = false;
- CbArrayView Files = Op["files"sv].AsArrayView();
- if (Files.Num() == 0)
- {
- CB(Op);
- return;
- }
-
- CbWriter Cbo;
- Cbo.BeginArray("files"sv);
-
- for (CbFieldView& Field : Files)
- {
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- CB(Op);
- return;
- }
-
- bool CopyField = true;
-
- if (CbObjectView View = Field.AsObjectView())
- {
- IoHash DataHash = View["data"sv].AsHash();
-
- if (DataHash == IoHash::Zero)
- {
- std::string_view ServerPath = View["serverpath"sv].AsString();
- std::filesystem::path FilePath = Project.RootDir / ServerPath;
- if (!IsFile(FilePath))
- {
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Missing attachment '{}' for op '{}'", FilePath, View["id"sv].AsObjectId()));
- if (IgnoreMissingAttachments)
- {
- continue;
- }
- else
- {
- ExtendableStringBuilder<1024> Sb;
- Sb.Append("Failed to find attachment '");
- Sb.Append(FilePath.string());
- Sb.Append("' for op: \n");
- View.ToJson(Sb);
- throw std::runtime_error(Sb.ToString());
- }
- }
-
- {
- Stopwatch HashTimer;
- SharedBuffer DataBuffer(IoBufferBuilder::MakeFromFile(FilePath));
- DataHash = IoHash::HashBuffer(CompositeBuffer(DataBuffer));
- ZEN_INFO("Hashed loose file '{}' {}: {} in {}",
- FilePath,
- NiceBytes(DataBuffer.GetSize()),
- DataHash,
- NiceTimeSpanMs(HashTimer.GetElapsedTimeMs()));
- }
-
- // Rewrite file array entry with new data reference
- CbObjectWriter Writer;
- RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
- if (Field.GetName() == "data"sv)
- {
- // omit this field as we will write it explicitly ourselves
- return true;
- }
- return false;
- });
- Writer.AddBinaryAttachment("data"sv, DataHash);
- UploadAttachments.insert_or_assign(DataHash, FoundAttachment{.RawPath = FilePath, .Key = Key});
-
- CbObject RewrittenOp = Writer.Save();
- Cbo.AddObject(std::move(RewrittenOp));
- CopyField = false;
- }
- }
-
- if (CopyField)
- {
- Cbo.AddField(Field);
- }
- else
- {
- OpRewritten = true;
- }
- }
-
- if (!OpRewritten)
- {
- CB(Op);
- return;
- }
-
- Cbo.EndArray();
- CbArray FilesArray = Cbo.Save().AsArray();
-
- CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool {
- if (Field.GetName() == "files"sv)
- {
- NewWriter.AddArray("files"sv, FilesArray);
-
- return true;
- }
-
- return false;
- });
- CB(RewrittenOp);
- };
-
- remotestore_impl::ReportMessage(OptionalContext, "Building exported oplog and collecting attachments");
-
- Stopwatch Timer;
-
- size_t TotalOpCount = Oplog.GetOplogEntryCount();
- CompressedBuffer CompressedOpsSection;
- {
- Stopwatch RewriteOplogTimer;
- CbObjectWriter SectionOpsWriter;
- SectionOpsWriter.BeginArray("ops"sv);
- {
- Oplog.IterateOplogWithKey([&](int, const Oid& Key, CbObjectView Op) {
- if (RemoteResult.IsError())
- {
- return;
- }
- Op.IterateAttachments([&](CbFieldView FieldView) {
- UploadAttachments.insert_or_assign(FieldView.AsAttachment(), FoundAttachment{.Key = Key});
- });
- if (EmbedLooseFiles)
- {
- RewriteOp(Key, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; });
- }
- else
- {
- SectionOpsWriter << Op;
- }
- OpCount++;
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return;
- }
- if (OpCount % 1000 == 0)
- {
- remotestore_impl::ReportProgress(OptionalContext,
- "Building oplog"sv,
- fmt::format("{} ops processed", OpCount),
- TotalOpCount,
- TotalOpCount - OpCount);
- }
- });
- if (RemoteResult.IsError())
- {
- return {};
- }
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return {};
- }
- if (TotalOpCount > 0)
- {
- remotestore_impl::ReportProgress(OptionalContext,
- "Building oplog"sv,
- fmt::format("{} ops processed", OpCount),
- TotalOpCount,
- 0);
- }
- }
- SectionOpsWriter.EndArray(); // "ops"
-
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Rewrote {} ops to new oplog in {}",
- OpCount,
- NiceTimeSpanMs(static_cast<uint64_t>(RewriteOplogTimer.GetElapsedTimeMs()))));
-
- {
- Stopwatch CompressOpsTimer;
- CompressedOpsSection =
- CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer(), OodleCompressor::Mermaid, OodleCompressionLevel::Fast);
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Compressed oplog section {} ({} -> {}) in {}",
- CompressedOpsSection.DecodeRawHash(),
- NiceBytes(CompressedOpsSection.DecodeRawSize()),
- NiceBytes(CompressedOpsSection.GetCompressedSize()),
- NiceTimeSpanMs(static_cast<uint64_t>(CompressOpsTimer.GetElapsedTimeMs()))));
- }
- }
-
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return {};
- }
-
- auto FindReuseBlocks = [](const std::vector<ThinChunkBlockDescription>& KnownBlocks,
- const std::unordered_set<IoHash, IoHash::Hasher>& Attachments,
- JobContext* OptionalContext) -> std::vector<size_t> {
- std::vector<size_t> ReuseBlockIndexes;
- if (!Attachments.empty() && !KnownBlocks.empty())
- {
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Checking {} Attachments against {} known blocks for reuse", Attachments.size(), KnownBlocks.size()));
- Stopwatch ReuseTimer;
-
- for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++)
- {
- const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
- size_t BlockAttachmentCount = KnownBlock.ChunkRawHashes.size();
- if (BlockAttachmentCount == 0)
- {
- continue;
- }
- size_t FoundAttachmentCount = 0;
- for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes)
- {
- if (Attachments.contains(KnownHash))
- {
- FoundAttachmentCount++;
- }
- }
-
- size_t ReusePercent = (FoundAttachmentCount * 100) / BlockAttachmentCount;
- // TODO: Configure reuse-level
- if (ReusePercent > 80)
- {
- ZEN_DEBUG("Reusing block {}. {} attachments found, usage level: {}%",
- KnownBlock.BlockHash,
- FoundAttachmentCount,
- ReusePercent);
- ReuseBlockIndexes.push_back(KnownBlockIndex);
- }
- else if (FoundAttachmentCount > 0)
- {
- ZEN_DEBUG("Skipping block {}. {} attachments found, usage level: {}%",
- KnownBlock.BlockHash,
- FoundAttachmentCount,
- ReusePercent);
- }
- }
- }
- return ReuseBlockIndexes;
- };
-
- std::unordered_set<IoHash, IoHash::Hasher> FoundHashes;
- FoundHashes.reserve(UploadAttachments.size());
- for (const auto& It : UploadAttachments)
- {
- FoundHashes.insert(It.first);
- }
-
- size_t ReusedAttachmentCount = 0;
- std::vector<size_t> ReusedBlockIndexes = FindReuseBlocks(KnownBlocks, FoundHashes, OptionalContext);
- for (size_t KnownBlockIndex : ReusedBlockIndexes)
- {
- const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
- for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes)
- {
- if (UploadAttachments.erase(KnownHash) == 1)
- {
- ReusedAttachmentCount++;
- }
- }
- }
-
- struct ChunkedFile
- {
- IoBuffer Source;
-
- ChunkedInfoWithSource Chunked;
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkLoookup;
- };
- std::vector<ChunkedFile> ChunkedFiles;
-
- auto ChunkFile = [](const IoHash& RawHash, IoBuffer& RawData, const IoBufferFileReference& FileRef, JobContext*) -> ChunkedFile {
- ChunkedFile Chunked;
- Stopwatch Timer;
-
- uint64_t Offset = FileRef.FileChunkOffset;
- uint64_t Size = FileRef.FileChunkSize;
-
- BasicFile SourceFile;
- SourceFile.Attach(FileRef.FileHandle);
- auto __ = MakeGuard([&SourceFile]() { SourceFile.Detach(); });
-
- Chunked.Chunked = ChunkData(SourceFile, Offset, Size, UShaderByteCodeParams);
- ZEN_ASSERT(Chunked.Chunked.Info.RawHash == RawHash);
- Chunked.Source = RawData;
-
- ZEN_INFO("Chunked large attachment '{}' {} into {} chunks in {}",
- RawHash,
- NiceBytes(Chunked.Chunked.Info.RawSize),
- Chunked.Chunked.Info.ChunkHashes.size(),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
-
- return Chunked;
- };
-
- RwLock ResolveLock;
- std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes;
- std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
- std::unordered_map<IoHash, size_t, IoHash::Hasher> ChunkedUploadAttachments;
- std::unordered_map<IoHash, std::pair<uint64_t, IoBuffer>, IoHash::Hasher> LooseUploadAttachments;
- std::unordered_set<IoHash, IoHash::Hasher> MissingHashes;
-
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount));
-
- Latch ResolveAttachmentsLatch(1);
- for (auto& It : UploadAttachments)
- {
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return {};
- }
-
- ResolveAttachmentsLatch.AddCount(1);
-
- WorkerPool.ScheduleWork(
- [&ChunkStore,
- UploadAttachment = &It.second,
- RawHash = It.first,
- &ResolveAttachmentsLatch,
- &ResolveLock,
- &ChunkedHashes,
- &LargeChunkHashes,
- &ChunkedUploadAttachments,
- &LooseUploadAttachments,
- &MissingHashes,
- &OnLargeAttachment,
- &AttachmentTempPath,
- &ChunkFile,
- &ChunkedFiles,
- MaxChunkEmbedSize,
- ChunkFileSizeLimit,
- AllowChunking,
- &RemoteResult,
- OptionalContext]() {
- auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); });
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- return;
- }
-
- try
- {
- if (!UploadAttachment->RawPath.empty())
- {
- const std::filesystem::path& FilePath = UploadAttachment->RawPath;
- IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath);
- if (RawData)
- {
- if (AllowChunking && RawData.GetSize() > ChunkFileSizeLimit)
- {
- IoBufferFileReference FileRef;
- (void)RawData.GetFileReference(FileRef);
-
- ChunkedFile Chunked = ChunkFile(RawHash, RawData, FileRef, OptionalContext);
- ResolveLock.WithExclusiveLock(
- [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() {
- ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size());
- ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size());
- for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
- {
- ChunkedHashes.insert(ChunkHash);
- }
- ChunkedFiles.emplace_back(std::move(Chunked));
- });
- }
- else if (RawData.GetSize() > (MaxChunkEmbedSize * 2))
- {
- // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't
- // it will be a loose attachment instead of going into a block
- OnLargeAttachment(RawHash, [RawData = std::move(RawData), AttachmentTempPath](const IoHash& RawHash) {
- size_t RawSize = RawData.GetSize();
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)),
- OodleCompressor::Mermaid,
- OodleCompressionLevel::VeryFast);
-
- std::filesystem::path AttachmentPath = AttachmentTempPath;
- AttachmentPath.append(RawHash.ToHexString());
- IoBuffer TempAttachmentBuffer =
- WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath);
- ZEN_INFO("Saved temp attachment to '{}', {} ({})",
- AttachmentPath,
- NiceBytes(RawSize),
- NiceBytes(TempAttachmentBuffer.GetSize()));
- return TempAttachmentBuffer;
- });
- ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
- }
- else
- {
- uint64_t RawSize = RawData.GetSize();
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData),
- OodleCompressor::Mermaid,
- OodleCompressionLevel::VeryFast);
-
- std::filesystem::path AttachmentPath = AttachmentTempPath;
- AttachmentPath.append(RawHash.ToHexString());
-
- uint64_t CompressedSize = Compressed.GetCompressedSize();
- IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath);
- ZEN_INFO("Saved temp attachment to '{}', {} ({})",
- AttachmentPath,
- NiceBytes(RawSize),
- NiceBytes(TempAttachmentBuffer.GetSize()));
-
- if (CompressedSize > MaxChunkEmbedSize)
- {
- OnLargeAttachment(RawHash,
- [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; });
- ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
- }
- else
- {
- UploadAttachment->Size = CompressedSize;
- ResolveLock.WithExclusiveLock(
- [RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() {
- LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data)));
- });
- }
- }
- }
- else
- {
- ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
- }
- }
- else
- {
- IoBuffer Data = ChunkStore.FindChunkByCid(RawHash);
- if (Data)
- {
- auto GetForChunking =
- [](size_t ChunkFileSizeLimit, const IoBuffer& Data, IoBufferFileReference& OutFileRef) -> bool {
- if (Data.IsWholeFile())
- {
- IoHash VerifyRawHash;
- uint64_t VerifyRawSize;
- CompressedBuffer Compressed =
- CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize);
- if (Compressed)
- {
- if (VerifyRawSize > ChunkFileSizeLimit)
- {
- OodleCompressor Compressor;
- OodleCompressionLevel CompressionLevel;
- uint64_t BlockSize;
- if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
- {
- if (CompressionLevel == OodleCompressionLevel::None)
- {
- CompositeBuffer Decompressed = Compressed.DecompressToComposite();
- if (Decompressed)
- {
- std::span<const SharedBuffer> Segments = Decompressed.GetSegments();
- if (Segments.size() == 1)
- {
- IoBuffer DecompressedData = Segments[0].AsIoBuffer();
- if (DecompressedData.GetFileReference(OutFileRef))
- {
- return true;
- }
- }
- }
- }
- }
- }
- }
- }
- return false;
- };
-
- IoBufferFileReference FileRef;
- if (AllowChunking && GetForChunking(ChunkFileSizeLimit, Data, FileRef))
- {
- ChunkedFile Chunked = ChunkFile(RawHash, Data, FileRef, OptionalContext);
- ResolveLock.WithExclusiveLock(
- [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() {
- ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size());
- ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size());
- for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes)
- {
- ChunkedHashes.insert(ChunkHash);
- }
- ChunkedFiles.emplace_back(std::move(Chunked));
- });
- }
- else if (Data.GetSize() > MaxChunkEmbedSize)
- {
- OnLargeAttachment(RawHash,
- [&ChunkStore](const IoHash& RawHash) { return ChunkStore.FindChunkByCid(RawHash); });
- ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
- }
- else
- {
- UploadAttachment->Size = Data.GetSize();
- }
- }
- else
- {
- ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); });
- }
- }
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to resolve attachment {}", RawHash),
- Ex.what());
- }
- },
- WorkerThreadPool::EMode::EnableBacklog);
- }
- ResolveAttachmentsLatch.CountDown();
-
- while (!ResolveAttachmentsLatch.Wait(1000))
- {
- ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining();
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- while (!ResolveAttachmentsLatch.Wait(1000))
- {
- Remaining = ResolveAttachmentsLatch.Remaining();
- remotestore_impl::ReportProgress(OptionalContext,
- "Resolving attachments"sv,
- fmt::format("Aborting, {} attachments remaining...", Remaining),
- UploadAttachments.size(),
- Remaining);
- }
- remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, "Aborted"sv, UploadAttachments.size(), 0);
- return {};
- }
- remotestore_impl::ReportProgress(OptionalContext,
- "Resolving attachments"sv,
- fmt::format("{} remaining...", Remaining),
- UploadAttachments.size(),
- Remaining);
- }
- if (UploadAttachments.size() > 0)
- {
- remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, ""sv, UploadAttachments.size(), 0);
- }
-
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return {};
- }
-
- for (const IoHash& AttachmentHash : MissingHashes)
- {
- auto It = UploadAttachments.find(AttachmentHash);
- ZEN_ASSERT(It != UploadAttachments.end());
- std::optional<CbObject> Op = Oplog.GetOpByKey(It->second.Key);
- ZEN_ASSERT(Op.has_value());
-
- if (IgnoreMissingAttachments)
- {
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Missing attachment '{}' for op '{}'", AttachmentHash, It->second.Key));
- }
- else
- {
- ExtendableStringBuilder<1024> Sb;
- Sb.Append("Failed to find attachment '");
- Sb.Append(AttachmentHash.ToHexString());
- Sb.Append("' for op: \n");
- Op.value().ToJson(Sb);
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
- return {};
- }
- UploadAttachments.erase(AttachmentHash);
- }
-
- for (const auto& It : ChunkedUploadAttachments)
- {
- UploadAttachments.erase(It.first);
- }
- for (const auto& It : LargeChunkHashes)
- {
- UploadAttachments.erase(It);
- }
-
- std::vector<size_t> ReusedBlockFromChunking = FindReuseBlocks(KnownBlocks, ChunkedHashes, OptionalContext);
- for (size_t KnownBlockIndex : ReusedBlockIndexes)
- {
- const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
- for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes)
- {
- if (ChunkedHashes.erase(KnownHash) == 1)
- {
- ReusedAttachmentCount++;
- }
- }
- }
-
- ReusedBlockIndexes.insert(ReusedBlockIndexes.end(), ReusedBlockFromChunking.begin(), ReusedBlockFromChunking.end());
- std::sort(ReusedBlockIndexes.begin(), ReusedBlockIndexes.end());
- auto UniqueKnownBlocksEnd = std::unique(ReusedBlockIndexes.begin(), ReusedBlockIndexes.end());
- size_t ReuseBlockCount = std::distance(ReusedBlockIndexes.begin(), UniqueKnownBlocksEnd);
- if (ReuseBlockCount > 0)
- {
- Blocks.reserve(ReuseBlockCount);
- for (auto It = ReusedBlockIndexes.begin(); It != UniqueKnownBlocksEnd; It++)
- {
- Blocks.push_back({KnownBlocks[*It]});
- }
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Reused {} attachments from {} blocks", ReusedAttachmentCount, ReuseBlockCount));
- }
-
- std::vector<std::pair<IoHash, Oid>> SortedUploadAttachments;
- SortedUploadAttachments.reserve(UploadAttachments.size());
- for (const auto& It : UploadAttachments)
- {
- SortedUploadAttachments.push_back(std::make_pair(It.first, It.second.Key));
- }
-
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return {};
- }
-
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Sorting {} attachments from {} ops", SortedUploadAttachments.size(), TotalOpCount));
-
- // Sort attachments so we get predictable blocks for the same oplog upload
- std::sort(SortedUploadAttachments.begin(),
- SortedUploadAttachments.end(),
- [](const std::pair<IoHash, Oid>& Lhs, const std::pair<IoHash, Oid>& Rhs) {
- if (Lhs.second == Rhs.second)
- {
- // Same key, sort by raw hash
- return Lhs.first < Rhs.first;
- }
- // Sort by key
- return Lhs.second < Rhs.second;
- });
-
- std::vector<size_t> ChunkedFilesOrder;
- ChunkedFilesOrder.reserve(ChunkedFiles.size());
- for (size_t Index = 0; Index < ChunkedFiles.size(); Index++)
- {
- ChunkedFilesOrder.push_back(Index);
- }
- std::sort(ChunkedFilesOrder.begin(), ChunkedFilesOrder.end(), [&ChunkedFiles](size_t Lhs, size_t Rhs) {
- return ChunkedFiles[Lhs].Chunked.Info.RawHash < ChunkedFiles[Rhs].Chunked.Info.RawHash;
- });
-
- // SortedUploadAttachments now contains all whole chunks with size to be composed into blocks and uploaded
- // ChunkedHashes contains all chunked up chunks to be composed into blocks
-
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return {};
- }
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Assembling {} attachments and {} chunked parts from {} ops into blocks and loose attachments",
- SortedUploadAttachments.size(),
- ChunkedHashes.size(),
- TotalOpCount));
-
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return {};
- }
-
- // SortedUploadAttachments now contains all whole chunks with size to be composed into blocks and uploaded
- // ChunkedHashes contains all chunked up chunks to be composed into blocks
-
- size_t ChunkAssembleCount = SortedUploadAttachments.size() + ChunkedHashes.size();
- size_t ChunksAssembled = 0;
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Assembling {} attachments from {} ops into blocks", ChunkAssembleCount, TotalOpCount));
-
- Latch BlockCreateLatch(1);
- size_t GeneratedBlockCount = 0;
- size_t BlockSize = 0;
- std::vector<std::pair<IoHash, FetchChunkFunc>> ChunksInBlock;
-
- Oid LastOpKey = Oid::Zero;
- uint32_t ComposedBlocks = 0;
-
- uint64_t CreateBlocksStartMS = Timer.GetElapsedTimeMs();
- try
- {
- uint64_t FetchAttachmentsStartMS = Timer.GetElapsedTimeMs();
- std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes;
- auto NewBlock = [&]() {
- size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks);
- size_t ChunkCount = ChunksInBlock.size();
- if (BuildBlocks)
- {
- remotestore_impl::CreateBlock(WorkerPool,
- BlockCreateLatch,
- std::move(ChunksInBlock),
- BlocksLock,
- Blocks,
- BlockIndex,
- AsyncOnBlock,
- RemoteResult);
- ComposedBlocks++;
- }
- else
- {
- ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size());
- OnBlockChunks(std::move(ChunksInBlock));
- }
- {
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
- RwLock::SharedLockScope _(BlocksLock);
- Blocks[BlockIndex].ChunkRawHashes.insert(Blocks[BlockIndex].ChunkRawHashes.end(),
- BlockAttachmentHashes.begin(),
- BlockAttachmentHashes.end());
- }
- uint64_t NowMS = Timer.GetElapsedTimeMs();
- ZEN_INFO("Assembled block {} with {} chunks in {} ({})",
- BlockIndex,
- ChunkCount,
- NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS),
- NiceBytes(BlockSize));
- FetchAttachmentsStartMS = NowMS;
- BlockAttachmentHashes.clear();
- ChunksInBlock.clear();
- BlockSize = 0;
- GeneratedBlockCount++;
- };
-
- for (auto HashIt = SortedUploadAttachments.begin(); HashIt != SortedUploadAttachments.end(); HashIt++)
- {
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- break;
- }
- if (ChunksAssembled % 1000 == 0)
- {
- remotestore_impl::ReportProgress(
- OptionalContext,
- "Assembling blocks"sv,
- fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
- ChunkAssembleCount,
- ChunkAssembleCount - ChunksAssembled);
- }
- const IoHash& RawHash(HashIt->first);
- const Oid CurrentOpKey = HashIt->second;
- const IoHash& AttachmentHash(HashIt->first);
- auto InfoIt = UploadAttachments.find(RawHash);
- ZEN_ASSERT(InfoIt != UploadAttachments.end());
- uint64_t PayloadSize = InfoIt->second.Size;
-
- if (BlockAttachmentHashes.insert(AttachmentHash).second)
- {
- if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end())
- {
- ChunksInBlock.emplace_back(std::make_pair(
- RawHash,
- [RawSize = It->second.first,
- IoBuffer = SharedBuffer(It->second.second)](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> {
- return std::make_pair(RawSize, CompressedBuffer::FromCompressedNoValidate(IoBuffer.AsIoBuffer()));
- }));
- LooseUploadAttachments.erase(It);
- }
- else
- {
- ChunksInBlock.emplace_back(
- std::make_pair(RawHash, [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompressedBuffer> {
- IoBuffer Chunk = ChunkStore.FindChunkByCid(RawHash);
- if (!Chunk)
- {
- throw std::runtime_error(fmt::format("Failed to find chunk {} in cid store", RawHash));
- }
- IoHash ValidateRawHash;
- uint64_t RawSize = 0;
- CompressedBuffer Compressed =
- CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), ValidateRawHash, RawSize);
- if (!Compressed)
- {
- throw std::runtime_error(
- fmt::format("Chunk {} in cid store is malformed (not a compressed buffer)", RawHash));
- }
- if (RawHash != ValidateRawHash)
- {
- throw std::runtime_error(
- fmt::format("Chunk {} in cid store is malformed (mismatching raw hash)", RawHash));
- }
- return {RawSize, Compressed};
- }));
- }
- BlockSize += PayloadSize;
-
- if ((BlockSize >= MaxBlockSize || ChunksInBlock.size() > MaxChunksPerBlock) && (CurrentOpKey != LastOpKey))
- {
- NewBlock();
- }
- LastOpKey = CurrentOpKey;
- ChunksAssembled++;
- }
- }
- if (!RemoteResult.IsError())
- {
- // Keep the chunked files as separate blocks to make the blocks generated
- // more consistent
- if (BlockSize > 0)
- {
- NewBlock();
- }
-
- for (size_t ChunkedFileIndex : ChunkedFilesOrder)
- {
- const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex];
- const ChunkedInfoWithSource& Chunked = ChunkedFile.Chunked;
- size_t ChunkCount = Chunked.Info.ChunkHashes.size();
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++)
- {
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- break;
- }
- if (ChunksAssembled % 1000 == 0)
- {
- remotestore_impl::ReportProgress(
- OptionalContext,
- "Assembling blocks"sv,
- fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
- ChunkAssembleCount,
- ChunkAssembleCount - ChunksAssembled);
- }
- const IoHash& ChunkHash = ChunkedFile.Chunked.Info.ChunkHashes[ChunkIndex];
- if (auto FindIt = ChunkedHashes.find(ChunkHash); FindIt != ChunkedHashes.end())
- {
- if (BlockAttachmentHashes.insert(ChunkHash).second)
- {
- const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex];
- ChunksInBlock.emplace_back(
- std::make_pair(ChunkHash,
- [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](
- const IoHash&) -> std::pair<uint64_t, CompressedBuffer> {
- return {Size,
- CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)),
- OodleCompressor::Mermaid,
- OodleCompressionLevel::None)};
- }));
- BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size;
- if (BuildBlocks)
- {
- if (BlockSize >= MaxBlockSize || ChunksInBlock.size() > MaxChunksPerBlock)
- {
- NewBlock();
- }
- }
- ChunksAssembled++;
- }
- ChunkedHashes.erase(FindIt);
- }
- }
- }
- }
-
- if (BlockSize > 0 && !RemoteResult.IsError())
- {
- if (!remotestore_impl::IsCancelled(OptionalContext))
- {
- NewBlock();
- }
- }
-
- if (ChunkAssembleCount > 0)
- {
- remotestore_impl::ReportProgress(
- OptionalContext,
- "Assembling blocks"sv,
- fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
- ChunkAssembleCount,
- 0);
- }
-
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and in {}",
- ChunkAssembleCount,
- TotalOpCount,
- GeneratedBlockCount,
- NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs()))));
-
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- BlockCreateLatch.CountDown();
- while (!BlockCreateLatch.Wait(1000))
- {
- ptrdiff_t Remaining = BlockCreateLatch.Remaining();
- remotestore_impl::ReportProgress(OptionalContext,
- "Assembling blocks"sv,
- fmt::format("Aborting, {} blocks remaining...", Remaining),
- GeneratedBlockCount,
- Remaining);
- }
- if (GeneratedBlockCount > 0)
- {
- remotestore_impl::ReportProgress(OptionalContext,
- "Assembling blocks"sv,
- fmt::format("Aborting, {} blocks remaining...", 0),
- GeneratedBlockCount,
- 0);
- }
- return {};
- }
- }
- catch (const std::exception& Ex)
- {
- BlockCreateLatch.CountDown();
- while (!BlockCreateLatch.Wait(1000))
- {
- }
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), "Block creation failed", Ex.what());
- throw;
- }
-
- BlockCreateLatch.CountDown();
- while (!BlockCreateLatch.Wait(1000))
- {
- ptrdiff_t Remaining = BlockCreateLatch.Remaining();
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- while (!BlockCreateLatch.Wait(1000))
- {
- Remaining = BlockCreateLatch.Remaining();
- remotestore_impl::ReportProgress(OptionalContext,
- "Creating blocks"sv,
- fmt::format("Aborting, {} blocks remaining...", Remaining),
- GeneratedBlockCount,
- Remaining);
- }
- remotestore_impl::ReportProgress(OptionalContext, "Creating blocks"sv, "Aborted"sv, GeneratedBlockCount, 0);
- return {};
- }
- remotestore_impl::ReportProgress(OptionalContext,
- "Creating blocks"sv,
- fmt::format("{} remaining...", Remaining),
- GeneratedBlockCount,
- Remaining);
- }
-
- if (GeneratedBlockCount > 0)
- {
- uint64_t NowMS = Timer.GetElapsedTimeMs();
- remotestore_impl::ReportProgress(OptionalContext, "Creating blocks"sv, ""sv, GeneratedBlockCount, 0);
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Created {} blocks in {}", GeneratedBlockCount, NiceTimeSpanMs(NowMS - CreateBlocksStartMS)));
- }
-
- if (!RemoteResult.IsError())
- {
- CbObjectWriter OplogContinerWriter;
- RwLock::SharedLockScope _(BlocksLock);
- OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer());
- OplogContinerWriter.BeginArray("blocks"sv);
- {
- for (const ChunkBlockDescription& B : Blocks)
- {
- ZEN_ASSERT(!B.ChunkRawHashes.empty());
- if (BuildBlocks)
- {
- ZEN_ASSERT(B.BlockHash != IoHash::Zero);
-
- OplogContinerWriter.BeginObject();
- {
- OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash);
- OplogContinerWriter.BeginArray("chunks"sv);
- {
- for (const IoHash& RawHash : B.ChunkRawHashes)
- {
- OplogContinerWriter.AddHash(RawHash);
- }
- }
- OplogContinerWriter.EndArray(); // "chunks"
- }
- OplogContinerWriter.EndObject();
- continue;
- }
-
- ZEN_ASSERT(B.BlockHash == IoHash::Zero);
- OplogContinerWriter.BeginObject();
- {
- OplogContinerWriter.BeginArray("chunks"sv);
- {
- for (const IoHash& RawHash : B.ChunkRawHashes)
- {
- OplogContinerWriter.AddBinaryAttachment(RawHash);
- }
- }
- OplogContinerWriter.EndArray();
- }
- OplogContinerWriter.EndObject();
- }
- }
- OplogContinerWriter.EndArray(); // "blocks"sv
- OplogContinerWriter.BeginArray("chunkedfiles"sv);
- {
- for (const ChunkedFile& F : ChunkedFiles)
- {
- OplogContinerWriter.BeginObject();
- {
- OplogContinerWriter.AddHash("rawhash"sv, F.Chunked.Info.RawHash);
- OplogContinerWriter.AddInteger("rawsize"sv, F.Chunked.Info.RawSize);
- OplogContinerWriter.BeginArray("chunks"sv);
- {
- for (const IoHash& RawHash : F.Chunked.Info.ChunkHashes)
- {
- OplogContinerWriter.AddHash(RawHash);
- }
- }
- OplogContinerWriter.EndArray(); // "chunks"
- OplogContinerWriter.BeginArray("sequence"sv);
- {
- for (uint32_t ChunkIndex : F.Chunked.Info.ChunkSequence)
- {
- OplogContinerWriter.AddInteger(ChunkIndex);
- }
- }
- OplogContinerWriter.EndArray(); // "sequence"
- }
- OplogContinerWriter.EndObject();
- }
- }
- OplogContinerWriter.EndArray(); // "chunkedfiles"sv
-
- OplogContinerWriter.BeginArray("chunks"sv);
- {
- for (const IoHash& AttachmentHash : LargeChunkHashes)
- {
- OplogContinerWriter.AddBinaryAttachment(AttachmentHash);
- }
- }
- OplogContinerWriter.EndArray(); // "chunks"
-
- OplogContainerObject = OplogContinerWriter.Save();
- }
- }
- return OplogContainerObject;
-}
-
-RemoteProjectStore::LoadContainerResult
-BuildContainer(CidStore& ChunkStore,
- ProjectStore::Project& Project,
- ProjectStore::Oplog& Oplog,
- size_t MaxBlockSize,
- size_t MaxChunksPerBlock,
- size_t MaxChunkEmbedSize,
- size_t ChunkFileSizeLimit,
- bool BuildBlocks,
- bool IgnoreMissingAttachments,
- bool AllowChunking,
- const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock,
- const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
- const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
- bool EmbedLooseFiles)
-{
- WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background);
-
- remotestore_impl::AsyncRemoteResult RemoteResult;
- CbObject ContainerObject = BuildContainer(ChunkStore,
- Project,
- Oplog,
- MaxBlockSize,
- MaxChunksPerBlock,
- MaxChunkEmbedSize,
- ChunkFileSizeLimit,
- BuildBlocks,
- IgnoreMissingAttachments,
- AllowChunking,
- {},
- WorkerPool,
- AsyncOnBlock,
- OnLargeAttachment,
- OnBlockChunks,
- EmbedLooseFiles,
- nullptr,
- RemoteResult);
- return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject};
-}
-
-RemoteProjectStore::Result
-SaveOplog(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- ProjectStore::Project& Project,
- ProjectStore::Oplog& Oplog,
- size_t MaxBlockSize,
- size_t MaxChunksPerBlock,
- size_t MaxChunkEmbedSize,
- size_t ChunkFileSizeLimit,
- bool EmbedLooseFiles,
- bool ForceUpload,
- bool IgnoreMissingAttachments,
- JobContext* OptionalContext)
-{
- using namespace std::literals;
-
- Stopwatch Timer;
-
- remotestore_impl::UploadInfo Info;
-
- WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background);
- WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(EWorkloadType::Background);
-
- const RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo();
-
- std::filesystem::path AttachmentTempPath;
- if (RemoteStoreInfo.UseTempBlockFiles)
- {
- AttachmentTempPath = Oplog.TempPath();
- AttachmentTempPath.append(".pending");
- CreateDirectories(AttachmentTempPath);
- }
-
- remotestore_impl::AsyncRemoteResult RemoteResult;
- RwLock AttachmentsLock;
- std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments;
- std::unordered_map<IoHash, remotestore_impl::CreatedBlock, IoHash::Hasher> CreatedBlocks;
- tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles;
-
- auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock,
- ChunkBlockDescription&& Block) {
- std::filesystem::path BlockPath = AttachmentTempPath;
- BlockPath.append(Block.BlockHash.ToHexString());
- try
- {
- IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock).GetCompressed(), BlockPath);
- RwLock::ExclusiveLockScope __(AttachmentsLock);
- CreatedBlocks.insert({Block.BlockHash, {.Payload = std::move(BlockBuffer), .Block = std::move(Block)}});
- ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockBuffer.GetSize()));
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- Ex.what(),
- "Unable to create temp block file");
- return;
- }
- };
-
- auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock,
- ChunkBlockDescription&& Block) {
- IoHash BlockHash = Block.BlockHash;
- RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash, std::move(Block));
- if (Result.ErrorCode)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Failed to save attachment ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return;
- }
- Info.AttachmentBlocksUploaded.fetch_add(1);
- Info.AttachmentBlockBytesUploaded.fetch_add(CompressedBlock.GetCompressedSize());
- ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize()));
- };
-
- std::vector<std::vector<std::pair<IoHash, FetchChunkFunc>>> BlockChunks;
- auto OnBlockChunks = [&BlockChunks](std::vector<std::pair<IoHash, FetchChunkFunc>>&& Chunks) {
- BlockChunks.push_back({Chunks.begin(), Chunks.end()});
- ZEN_DEBUG("Found {} block chunks", Chunks.size());
- };
-
- auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments, &LooseLargeFiles](const IoHash& AttachmentHash,
- TGetAttachmentBufferFunc&& GetBufferFunc) {
- {
- RwLock::ExclusiveLockScope _(AttachmentsLock);
- LargeAttachments.insert(AttachmentHash);
- LooseLargeFiles.insert_or_assign(AttachmentHash, std::move(GetBufferFunc));
- }
- ZEN_DEBUG("Found attachment {}", AttachmentHash);
- };
-
- std::function<void(CompressedBuffer&&, ChunkBlockDescription &&)> OnBlock;
- if (RemoteStoreInfo.UseTempBlockFiles)
- {
- OnBlock = MakeTempBlock;
- }
- else
- {
- OnBlock = UploadBlock;
- }
-
- std::vector<ThinChunkBlockDescription> KnownBlocks;
-
- uint64_t TransferWallTimeMS = 0;
-
- RemoteProjectStore::CreateContainerResult ContainerResult = RemoteStore.CreateContainer();
- if (ContainerResult.ErrorCode)
- {
- RemoteProjectStore::Result Result = {.ErrorCode = ContainerResult.ErrorCode,
- .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
- .Text = fmt::format("Failed to create container for oplog '{}' ({}): {}",
- RemoteStoreInfo.ContainerName,
- ContainerResult.ErrorCode,
- ContainerResult.Reason)};
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return Result;
- }
-
- if (RemoteStoreInfo.CreateBlocks)
- {
- remotestore_impl::ReportMessage(OptionalContext, fmt::format("Fetching known blocks from '{}'", RemoteStoreInfo.Description));
- Stopwatch GetKnownBlocksTimer;
- RemoteProjectStore::GetKnownBlocksResult KnownBlocksResult = RemoteStore.GetKnownBlocks();
- TransferWallTimeMS += GetKnownBlocksTimer.GetElapsedTimeMs();
-
- if (KnownBlocksResult.ErrorCode == static_cast<int>(HttpResponseCode::NoContent))
- {
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("No known blocks in '{}', uploading all attachments", RemoteStoreInfo.Description));
- }
- else if (KnownBlocksResult.ErrorCode)
- {
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Failed to get known blocks from '{}' ({}): {}, uploading all attachments",
- RemoteStoreInfo.Description,
- KnownBlocksResult.ErrorCode,
- KnownBlocksResult.Reason));
- }
- else
- {
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Fetched {} known blocks from '{}' in {}",
- KnownBlocksResult.Blocks.size(),
- RemoteStoreInfo.Description,
- NiceTimeSpanMs(static_cast<uint64_t>(KnownBlocksResult.ElapsedSeconds * 1000.0))));
- KnownBlocks = std::move(KnownBlocksResult.Blocks);
- }
- }
-
- CbObject OplogContainerObject = BuildContainer(ChunkStore,
- Project,
- Oplog,
- MaxBlockSize,
- MaxChunksPerBlock,
- MaxChunkEmbedSize,
- ChunkFileSizeLimit,
- RemoteStoreInfo.CreateBlocks,
- IgnoreMissingAttachments,
- RemoteStoreInfo.AllowChunking,
- KnownBlocks,
- WorkerPool,
- OnBlock,
- OnLargeAttachment,
- OnBlockChunks,
- EmbedLooseFiles,
- OptionalContext,
- /* out */ RemoteResult);
- if (!RemoteResult.IsError())
- {
- Info.OplogSizeBytes = OplogContainerObject.GetSize();
-
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteProjectStore::Result Result = {.ErrorCode = 0,
- .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
- .Text = "Operation cancelled"};
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return Result;
- }
-
- uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num();
- uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num();
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Saving oplog container '{}' with {} attachments and {} blocks...",
- RemoteStoreInfo.ContainerName,
- ChunkCount,
- BlockCount));
- Stopwatch SaveContainerTimer;
- IoBuffer ContainerPayload = OplogContainerObject.GetBuffer().AsIoBuffer();
- ContainerPayload.SetContentType(ZenContentType::kCbObject);
- RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(std::move(ContainerPayload));
- TransferWallTimeMS += SaveContainerTimer.GetElapsedTimeMs();
- if (ContainerSaveResult.ErrorCode)
- {
- RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container");
- RemoteProjectStore::Result Result = {
- .ErrorCode = 0,
- .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
- .Text = fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())};
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- return Result;
- }
- else
- {
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Saved container '{}' in {}",
- RemoteStoreInfo.ContainerName,
- NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000.0))));
- }
-
- {
- Stopwatch UploadAttachmentsTimer;
- UploadAttachments(NetworkWorkerPool,
- ChunkStore,
- RemoteStore,
- LargeAttachments,
- BlockChunks,
- CreatedBlocks,
- LooseLargeFiles,
- ContainerSaveResult.Needs,
- ForceUpload,
- Info,
- RemoteResult,
- OptionalContext);
- TransferWallTimeMS += UploadAttachmentsTimer.GetElapsedTimeMs();
- }
-
- uint32_t Try = 0;
- while (!RemoteResult.IsError())
- {
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteProjectStore::Result Result = {.ErrorCode = 0,
- .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
- .Text = "Operation cancelled"};
- remotestore_impl::ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Text));
- return Result;
- }
-
- remotestore_impl::ReportMessage(OptionalContext, "Finalizing oplog container...");
- RemoteProjectStore::FinalizeResult ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash);
- if (ContainerFinalizeResult.ErrorCode)
- {
- RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text);
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Failed to finalize oplog container {} ({}): {}",
- ContainerSaveResult.RawHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- RemoteProjectStore::Result Result = RemoteResult.ConvertResult();
- return Result;
- }
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Finalized container '{}' in {}",
- RemoteStoreInfo.ContainerName,
- NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000.0))));
-
- if (ContainerFinalizeResult.Needs.empty())
- {
- break;
- }
-
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteProjectStore::Result Result = {.ErrorCode = 0,
- .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
- .Text = "Operation cancelled"};
- return Result;
- }
-
- const uint32_t MaxTries = 8;
- if (Try < MaxTries)
- {
- Try++;
-
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Finalize of container '{}' reported {} missing attachments. Uploading missing attachements. Try {}",
- RemoteStoreInfo.ContainerName,
- ContainerFinalizeResult.Needs.size(),
- Try));
-
- Stopwatch UploadAttachmentsTimer;
- UploadAttachments(NetworkWorkerPool,
- ChunkStore,
- RemoteStore,
- LargeAttachments,
- BlockChunks,
- CreatedBlocks,
- LooseLargeFiles,
- ContainerFinalizeResult.Needs,
- false,
- Info,
- RemoteResult,
- OptionalContext);
- TransferWallTimeMS += UploadAttachmentsTimer.GetElapsedTimeMs();
- }
- else
- {
- RemoteResult.SetError(
- gsl::narrow<int>(HttpResponseCode::InternalServerError),
- "Failed to save oplog container",
- fmt::format("Giving up finalize oplog container {} after {} retries, still getting reports of missing attachments",
- ContainerSaveResult.RawHash,
- ContainerFinalizeResult.Needs.size()));
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Failed to finalize oplog container container {} ({}): {}",
- ContainerSaveResult.RawHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- break;
- }
- }
-
- LooseLargeFiles.clear();
- CreatedBlocks.clear();
- }
- RemoteProjectStore::Result Result = RemoteResult.ConvertResult();
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
-
- remotestore_impl::LogRemoteStoreStatsDetails(RemoteStore.GetStats());
-
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}",
- RemoteStoreInfo.ContainerName,
- RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
- NiceBytes(Info.OplogSizeBytes),
- Info.AttachmentBlocksUploaded.load(),
- NiceBytes(Info.AttachmentBlockBytesUploaded.load()),
- Info.AttachmentsUploaded.load(),
- NiceBytes(Info.AttachmentBytesUploaded.load()),
- remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)));
-
- return Result;
-};
-
-RemoteProjectStore::Result
-ParseOplogContainer(const CbObject& ContainerObject,
- const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
- const std::function<bool(const IoHash& RawHash)>& HasAttachment,
- const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
- const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
- const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
- CbObject& OutOplogSection,
- JobContext* OptionalContext)
-{
- using namespace std::literals;
-
- Stopwatch Timer;
-
- MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
- IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
- IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer();
-
- CbValidateError ValidateResult = CbValidateError::None;
- if (CbObject SectionObject = ValidateAndReadCompactBinaryObject(std::move(SectionPayload), ValidateResult);
- ValidateResult == CbValidateError::None && ContainerObject)
- {
- OutOplogSection = SectionObject;
- }
- else
- {
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Failed to save oplog container: '{}' ('{}')", "Section has unexpected data type", ToString(ValidateResult)));
- return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest),
- Timer.GetElapsedTimeMs() / 1000.0,
- "Section has unexpected data type",
- "Failed to save oplog container"};
- }
- std::unordered_set<IoHash, IoHash::Hasher> OpsAttachments;
- {
- CbArrayView OpsArray = OutOplogSection["ops"sv].AsArrayView();
- for (CbFieldView OpEntry : OpsArray)
- {
- OpEntry.IterateAttachments([&](CbFieldView FieldView) { OpsAttachments.insert(FieldView.AsAttachment()); });
- }
- }
- {
- std::vector<IoHash> ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end());
- OnReferencedAttachments(ReferencedAttachments);
- }
- remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size()));
-
- CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView();
- for (CbFieldView ChunkedFileField : ChunkedFilesArray)
- {
- CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView();
- IoHash RawHash = ChunkedFileView["rawhash"sv].AsHash();
- if (OpsAttachments.contains(RawHash) && (!HasAttachment(RawHash)))
- {
- ChunkedInfo Chunked;
- Chunked.RawHash = RawHash;
- Chunked.RawSize = ChunkedFileView["rawsize"sv].AsUInt64();
- CbArrayView ChunksArray = ChunkedFileView["chunks"sv].AsArrayView();
- Chunked.ChunkHashes.reserve(ChunksArray.Num());
- for (CbFieldView ChunkField : ChunksArray)
- {
- const IoHash ChunkHash = ChunkField.AsHash();
- Chunked.ChunkHashes.emplace_back(ChunkHash);
- }
- OnReferencedAttachments(Chunked.ChunkHashes);
- OpsAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end());
- CbArrayView SequenceArray = ChunkedFileView["sequence"sv].AsArrayView();
- Chunked.ChunkSequence.reserve(SequenceArray.Num());
- for (CbFieldView SequenceField : SequenceArray)
- {
- uint32_t SequenceIndex = SequenceField.AsUInt32();
- ZEN_ASSERT(SequenceIndex < Chunked.ChunkHashes.size());
- Chunked.ChunkSequence.push_back(SequenceIndex);
- }
- OnChunkedAttachment(Chunked);
- ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks",
- Chunked.RawHash,
- NiceBytes(Chunked.RawSize),
- Chunked.ChunkHashes.size());
- }
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK),
- .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
- .Reason = "Operation cancelled"};
- }
- }
-
- size_t NeedBlockCount = 0;
- CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
- for (CbFieldView BlockField : BlocksArray)
- {
- CbObjectView BlockView = BlockField.AsObjectView();
- IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
-
- CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView();
-
- std::vector<IoHash> NeededChunks;
- NeededChunks.reserve(ChunksArray.Num());
- if (BlockHash == IoHash::Zero)
- {
- for (CbFieldView ChunkField : ChunksArray)
- {
- IoHash ChunkHash = ChunkField.AsBinaryAttachment();
- if (OpsAttachments.contains(ChunkHash) && !HasAttachment(ChunkHash))
- {
- NeededChunks.emplace_back(ChunkHash);
- }
- }
- }
- else
- {
- for (CbFieldView ChunkField : ChunksArray)
- {
- const IoHash ChunkHash = ChunkField.AsHash();
- if (OpsAttachments.contains(ChunkHash) && !HasAttachment(ChunkHash))
- {
- NeededChunks.emplace_back(ChunkHash);
- }
- }
- }
-
- if (!NeededChunks.empty())
- {
- OnNeedBlock(BlockHash, std::move(NeededChunks));
- if (BlockHash != IoHash::Zero)
- {
- NeedBlockCount++;
- }
- }
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK),
- .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
- .Reason = "Operation cancelled"};
- }
- }
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
-
- size_t NeedAttachmentCount = 0;
- CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
- for (CbFieldView LargeChunksField : LargeChunksArray)
- {
- IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
- if (OpsAttachments.contains(AttachmentHash) && !HasAttachment(AttachmentHash))
- {
- OnNeedAttachment(AttachmentHash);
- NeedAttachmentCount++;
- }
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK),
- .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
- .Reason = "Operation cancelled"};
- }
- };
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num()));
-
- return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0};
-}
-
-RemoteProjectStore::Result
-SaveOplogContainer(ProjectStore::Oplog& Oplog,
- const CbObject& ContainerObject,
- const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
- const std::function<bool(const IoHash& RawHash)>& HasAttachment,
- const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
- const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
- const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
- JobContext* OptionalContext)
-{
- using namespace std::literals;
-
- Stopwatch Timer;
- CbObject OplogSection;
- RemoteProjectStore::Result Result = ParseOplogContainer(ContainerObject,
- OnReferencedAttachments,
- HasAttachment,
- OnNeedBlock,
- OnNeedAttachment,
- OnChunkedAttachment,
- OplogSection,
- OptionalContext);
- if (Result.ErrorCode != 0)
- {
- return Result;
- }
- Result = remotestore_impl::WriteOplogSection(Oplog, OplogSection, OptionalContext);
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- return Result;
-}
-
-RemoteProjectStore::Result
-LoadOplog(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- ProjectStore::Oplog& Oplog,
- bool ForceDownload,
- bool IgnoreMissingAttachments,
- bool CleanOplog,
- JobContext* OptionalContext)
-{
- using namespace std::literals;
-
- remotestore_impl::DownloadInfo Info;
-
- Stopwatch Timer;
-
- WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background);
- WorkerThreadPool& NetworkWorkerPool = GetSmallWorkerPool(EWorkloadType::Background);
-
- std::unordered_set<IoHash, IoHash::Hasher> Attachments;
- uint64_t BlockCountToDownload = 0;
-
- RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo();
- remotestore_impl::ReportMessage(OptionalContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName));
-
- uint64_t TransferWallTimeMS = 0;
-
- Stopwatch LoadContainerTimer;
- RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer();
- TransferWallTimeMS += LoadContainerTimer.GetElapsedTimeMs();
- if (LoadContainerResult.ErrorCode)
- {
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Failed to load oplog container: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode));
- return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode,
- .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
- .Reason = LoadContainerResult.Reason,
- .Text = LoadContainerResult.Text};
- }
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Loaded container in {} ({})",
- NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)),
- NiceBytes(LoadContainerResult.ContainerObject.GetSize())));
- Info.OplogSizeBytes = LoadContainerResult.ContainerObject.GetSize();
-
- remotestore_impl::AsyncRemoteResult RemoteResult;
- Latch AttachmentsDownloadLatch(1);
- Latch AttachmentsWriteLatch(1);
- std::atomic_size_t AttachmentCount = 0;
-
- Stopwatch LoadAttachmentsTimer;
- std::atomic_uint64_t DownloadStartMS = (std::uint64_t)-1;
-
- auto HasAttachment = [&Oplog, &ChunkStore, ForceDownload](const IoHash& RawHash) {
- if (ForceDownload)
- {
- return false;
- }
- if (ChunkStore.ContainsChunk(RawHash))
- {
- return true;
- }
- return false;
- };
-
- auto OnNeedBlock = [&RemoteStore,
- &ChunkStore,
- &NetworkWorkerPool,
- &WorkerPool,
- &AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- &AttachmentCount,
- &RemoteResult,
- &BlockCountToDownload,
- &Info,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext](const IoHash& BlockHash, std::vector<IoHash>&& Chunks) {
- if (RemoteResult.IsError())
- {
- return;
- }
-
- BlockCountToDownload++;
- AttachmentCount.fetch_add(1);
- if (BlockHash == IoHash::Zero)
- {
- DownloadAndSaveBlockChunks(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
- Info,
- LoadAttachmentsTimer,
- DownloadStartMS,
- Chunks);
- }
- else
- {
- DownloadAndSaveBlock(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
- Info,
- LoadAttachmentsTimer,
- DownloadStartMS,
- BlockHash,
- Chunks,
- 3);
- }
- };
-
- auto OnNeedAttachment = [&RemoteStore,
- &Oplog,
- &ChunkStore,
- &NetworkWorkerPool,
- &WorkerPool,
- &AttachmentsDownloadLatch,
- &AttachmentsWriteLatch,
- &RemoteResult,
- &Attachments,
- &AttachmentCount,
- &LoadAttachmentsTimer,
- &DownloadStartMS,
- &Info,
- IgnoreMissingAttachments,
- OptionalContext](const IoHash& RawHash) {
- if (!Attachments.insert(RawHash).second)
- {
- return;
- }
- if (RemoteResult.IsError())
- {
- return;
- }
- AttachmentCount.fetch_add(1);
- DownloadAndSaveAttachment(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
- AttachmentsDownloadLatch,
- AttachmentsWriteLatch,
- RemoteResult,
- Info,
- LoadAttachmentsTimer,
- DownloadStartMS,
- RawHash);
- };
-
- std::vector<ChunkedInfo> FilesToDechunk;
- auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) {
- if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash))
- {
- FilesToDechunk.push_back(Chunked);
- }
- };
-
- auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog.CaptureAddedAttachments(RawHashes); };
-
- // Make sure we retain any attachments we download before writing the oplog
- Oplog.EnableUpdateCapture();
- auto _ = MakeGuard([&Oplog]() { Oplog.DisableUpdateCapture(); });
-
- CbObject OplogSection;
- RemoteProjectStore::Result Result = ParseOplogContainer(LoadContainerResult.ContainerObject,
- OnReferencedAttachments,
- HasAttachment,
- OnNeedBlock,
- OnNeedAttachment,
- OnChunkedAttachment,
- OplogSection,
- OptionalContext);
- if (Result.ErrorCode != 0)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- }
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download",
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
- Attachments.size(),
- BlockCountToDownload,
- FilesToDechunk.size()));
-
- AttachmentsDownloadLatch.CountDown();
- while (!AttachmentsDownloadLatch.Wait(1000))
- {
- ptrdiff_t Remaining = AttachmentsDownloadLatch.Remaining();
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- if (!RemoteResult.IsError())
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- }
- }
- uint64_t PartialTransferWallTimeMS = TransferWallTimeMS;
- if (DownloadStartMS != (uint64_t)-1)
- {
- PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
- }
- remotestore_impl::ReportProgress(
- OptionalContext,
- "Loading attachments"sv,
- fmt::format("{} remaining. {}", Remaining, remotestore_impl::GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)),
- AttachmentCount.load(),
- Remaining);
- }
- if (DownloadStartMS != (uint64_t)-1)
- {
- TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
- }
-
- if (AttachmentCount.load() > 0)
- {
- remotestore_impl::ReportProgress(OptionalContext,
- "Loading attachments"sv,
- fmt::format("{}", remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)),
- AttachmentCount.load(),
- 0);
- }
-
- AttachmentsWriteLatch.CountDown();
- while (!AttachmentsWriteLatch.Wait(1000))
- {
- ptrdiff_t Remaining = AttachmentsWriteLatch.Remaining();
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- if (!RemoteResult.IsError())
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- }
- }
- remotestore_impl::ReportProgress(OptionalContext,
- "Writing attachments"sv,
- fmt::format("{} remaining.", Remaining),
- AttachmentCount.load(),
- Remaining);
- }
-
- if (AttachmentCount.load() > 0)
- {
- remotestore_impl::ReportProgress(OptionalContext, "Writing attachments", ""sv, AttachmentCount.load(), 0);
- }
-
- if (Result.ErrorCode == 0)
- {
- if (!FilesToDechunk.empty())
- {
- remotestore_impl::ReportMessage(OptionalContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size()));
-
- Latch DechunkLatch(1);
- std::filesystem::path TempFilePath = Oplog.TempPath();
- for (const ChunkedInfo& Chunked : FilesToDechunk)
- {
- std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString();
- DechunkLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&ChunkStore,
- &DechunkLatch,
- TempFileName,
- &Chunked,
- &RemoteResult,
- IgnoreMissingAttachments,
- &Info,
- OptionalContext]() {
- auto _ = MakeGuard([&DechunkLatch, &TempFileName] {
- std::error_code Ec;
- if (IsFile(TempFileName, Ec))
- {
- RemoveFile(TempFileName, Ec);
- if (Ec)
- {
- ZEN_INFO("Failed to remove temporary file '{}'. Reason: {}", TempFileName, Ec.message());
- }
- }
- DechunkLatch.CountDown();
- });
- try
- {
- if (RemoteResult.IsError())
- {
- return;
- }
- Stopwatch Timer;
- IoBuffer TmpBuffer;
- {
- BasicFile TmpFile;
- TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate);
- {
- BasicFileWriter TmpWriter(TmpFile, 64u * 1024u);
-
- uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder();
- BLAKE3Stream HashingStream;
- for (std::uint32_t SequenceIndex : Chunked.ChunkSequence)
- {
- const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex];
- IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash);
- if (!Chunk)
- {
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
-
- // We only add 1 as the resulting missing count will be 1 for the dechunked file
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
- {
- RemoteResult.SetError(
- gsl::narrow<int>(HttpResponseCode::NotFound),
- "Missing chunk",
- fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
- }
- return;
- }
- CompositeBuffer Decompressed =
- CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite();
- for (const SharedBuffer& Segment : Decompressed.GetSegments())
- {
- MemoryView SegmentData = Segment.GetView();
- HashingStream.Append(SegmentData);
- TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset);
- Offset += SegmentData.GetSize();
- }
- }
- BLAKE3 RawHash = HashingStream.GetHash();
- ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash));
- UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash);
- TmpWriter.Write(Header.GetData(), Header.GetSize(), 0);
- }
- TmpFile.Close();
- TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName);
- }
- CidStore::InsertResult InsertResult =
- ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace);
- if (InsertResult.New)
- {
- Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize());
- Info.AttachmentsStored.fetch_add(1);
- }
-
- ZEN_INFO("Dechunked attachment {} ({}) in {}",
- Chunked.RawHash,
- NiceBytes(Chunked.RawSize),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- }
- catch (const std::exception& Ex)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to dechunck file {}", Chunked.RawHash),
- Ex.what());
- }
- },
- WorkerThreadPool::EMode::EnableBacklog);
- }
- DechunkLatch.CountDown();
-
- while (!DechunkLatch.Wait(1000))
- {
- ptrdiff_t Remaining = DechunkLatch.Remaining();
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- if (!RemoteResult.IsError())
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- }
- }
- remotestore_impl::ReportProgress(OptionalContext,
- "Dechunking attachments"sv,
- fmt::format("{} remaining...", Remaining),
- FilesToDechunk.size(),
- Remaining);
- }
- remotestore_impl::ReportProgress(OptionalContext, "Dechunking attachments"sv, ""sv, FilesToDechunk.size(), 0);
- }
- Result = RemoteResult.ConvertResult();
- }
-
- if (Result.ErrorCode == 0)
- {
- if (CleanOplog)
- {
- if (!Oplog.Reset())
- {
- Result = RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError),
- .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
- .Reason = fmt::format("Failed to clean existing oplog '{}'", Oplog.OplogId())};
- remotestore_impl::ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason));
- }
- }
- if (Result.ErrorCode == 0)
- {
- remotestore_impl::WriteOplogSection(Oplog, OplogSection, OptionalContext);
- }
- }
-
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
-
- remotestore_impl::LogRemoteStoreStatsDetails(RemoteStore.GetStats());
-
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {} {}",
- RemoteStoreInfo.ContainerName,
- Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE",
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
- NiceBytes(Info.OplogSizeBytes),
- Info.AttachmentBlocksDownloaded.load(),
- NiceBytes(Info.AttachmentBlockBytesDownloaded.load()),
- Info.AttachmentsDownloaded.load(),
- NiceBytes(Info.AttachmentBytesDownloaded.load()),
- Info.AttachmentsStored.load(),
- NiceBytes(Info.AttachmentBytesStored.load()),
- Info.MissingAttachmentCount.load(),
- remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)));
-
- return Result;
-}
-
-//////////////////////////////////////////////////////////////////////////
-// These are here to avoid vtable leakage
-
-RemoteProjectStore::RemoteProjectStore()
-{
-}
-
-RemoteProjectStore::~RemoteProjectStore()
-{
-}
-
-#if ZEN_WITH_TESTS
-
-namespace testutils {
- using namespace std::literals;
-
- static std::string OidAsString(const Oid& Id)
- {
- StringBuilder<25> OidStringBuilder;
- Id.ToString(OidStringBuilder);
- return OidStringBuilder.ToString();
- }
-
- static CbPackage CreateBulkDataOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
- {
- CbPackage Package;
- CbObjectWriter Object;
- Object << "key"sv << OidAsString(Id);
- if (!Attachments.empty())
- {
- Object.BeginArray("bulkdata");
- for (const auto& Attachment : Attachments)
- {
- CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash());
- Object.BeginObject();
- Object << "id"sv << Attachment.first;
- Object << "type"sv
- << "Standard"sv;
- Object << "data"sv << Attach;
- Object.EndObject();
-
- Package.AddAttachment(Attach);
- }
- Object.EndArray();
- }
- Package.SetObject(Object.Save());
- return Package;
- };
-
- static std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(
- const std::span<const size_t>& Sizes,
- OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast,
- uint64_t BlockSize = 0)
- {
- std::vector<std::pair<Oid, CompressedBuffer>> Result;
- Result.reserve(Sizes.size());
- for (size_t Size : Sizes)
- {
- CompressedBuffer Compressed =
- CompressedBuffer::Compress(SharedBuffer(CreateSemiRandomBlob(Size)), OodleCompressor::Mermaid, CompressionLevel, BlockSize);
- Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed));
- }
- return Result;
- }
-
-} // namespace testutils
-
-struct ExportForceDisableBlocksTrue_ForceTempBlocksFalse
-{
- static const bool ForceDisableBlocks = true;
- static const bool ForceEnableTempBlocks = false;
-};
-
-struct ExportForceDisableBlocksFalse_ForceTempBlocksFalse
-{
- static const bool ForceDisableBlocks = false;
- static const bool ForceEnableTempBlocks = false;
-};
-
-struct ExportForceDisableBlocksFalse_ForceTempBlocksTrue
-{
- static const bool ForceDisableBlocks = false;
- static const bool ForceEnableTempBlocks = true;
-};
-
-TEST_CASE_TEMPLATE("project.store.export",
- Settings,
- ExportForceDisableBlocksTrue_ForceTempBlocksFalse,
- ExportForceDisableBlocksFalse_ForceTempBlocksFalse,
- ExportForceDisableBlocksFalse_ForceTempBlocksTrue)
-{
- using namespace std::literals;
- using namespace testutils;
-
- ScopedTemporaryDirectory TempDir;
- ScopedTemporaryDirectory ExportDir;
-
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
- CidStore.Initialize(CidConfig);
-
- std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
- std::filesystem::path RootDir = TempDir.Path() / "root";
- std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
- std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
- std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";
-
- Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / "proj1"sv,
- "proj1"sv,
- RootDir.string(),
- EngineRootDir.string(),
- ProjectRootDir.string(),
- ProjectFilePath.string()));
- Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("oplog1", {});
- CHECK(Oplog);
-
- Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {}));
- Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77})));
- Oplog->AppendNewOplogEntry(
- CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99})));
- Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122})));
- Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(
- Oid::NewOid(),
- CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None)));
-
- FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 64u * 1024,
- .MaxChunksPerBlock = 1000,
- .MaxChunkEmbedSize = 32 * 1024u,
- .ChunkFileSizeLimit = 64u * 1024u},
- /*.FolderPath = */ ExportDir.Path(),
- /*.Name = */ std::string("oplog1"),
- /*OptionalBaseName = */ std::string(),
- /*.ForceDisableBlocks = */ Settings::ForceDisableBlocks,
- /*.ForceEnableTempBlocks = */ Settings::ForceEnableTempBlocks};
- std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Options);
- RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
-
- RemoteProjectStore::Result ExportResult = SaveOplog(CidStore,
- *RemoteStore,
- *Project.Get(),
- *Oplog,
- Options.MaxBlockSize,
- Options.MaxChunksPerBlock,
- Options.MaxChunkEmbedSize,
- Options.ChunkFileSizeLimit,
- true,
- false,
- false,
- nullptr);
-
- CHECK(ExportResult.ErrorCode == 0);
-
- Ref<ProjectStore::Oplog> OplogImport = Project->NewOplog("oplog2", {});
- CHECK(OplogImport);
-
- RemoteProjectStore::Result ImportResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- /*Force*/ false,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ false,
- nullptr);
- CHECK(ImportResult.ErrorCode == 0);
-
- RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- /*Force*/ true,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ false,
- nullptr);
- CHECK(ImportForceResult.ErrorCode == 0);
-
- RemoteProjectStore::Result ImportCleanResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- /*Force*/ false,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ true,
- nullptr);
- CHECK(ImportCleanResult.ErrorCode == 0);
-
- RemoteProjectStore::Result ImportForceCleanResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- /*Force*/ true,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ true,
- nullptr);
- CHECK(ImportForceCleanResult.ErrorCode == 0);
-}
-
-#endif // ZEN_WITH_TESTS
-
-void
-remoteprojectstore_forcelink()
-{
-}
-
-} // namespace zen
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
deleted file mode 100644
index 11cc58e4d..000000000
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ /dev/null
@@ -1,178 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/jobqueue.h>
-#include <zenstore/projectstore.h>
-
-#include <zenutil/chunkblock.h>
-
-#include <unordered_set>
-
-namespace zen {
-
-class CidStore;
-class WorkerThreadPool;
-struct ChunkedInfo;
-
-class RemoteProjectStore
-{
-public:
- struct Result
- {
- int32_t ErrorCode{};
- double ElapsedSeconds{};
- std::string Reason;
- std::string Text;
- };
-
- struct CreateContainerResult : public Result
- {
- };
-
- struct SaveResult : public Result
- {
- std::unordered_set<IoHash, IoHash::Hasher> Needs;
- IoHash RawHash;
- };
-
- struct FinalizeResult : public Result
- {
- std::unordered_set<IoHash, IoHash::Hasher> Needs;
- };
-
- struct SaveAttachmentResult : public Result
- {
- };
-
- struct SaveAttachmentsResult : public Result
- {
- };
-
- struct LoadAttachmentResult : public Result
- {
- IoBuffer Bytes;
- };
-
- struct LoadContainerResult : public Result
- {
- CbObject ContainerObject;
- };
-
- struct LoadAttachmentsResult : public Result
- {
- std::vector<std::pair<IoHash, CompressedBuffer>> Chunks;
- };
-
- struct GetKnownBlocksResult : public Result
- {
- std::vector<ThinChunkBlockDescription> Blocks;
- };
-
- struct RemoteStoreInfo
- {
- bool CreateBlocks;
- bool UseTempBlockFiles;
- bool AllowChunking;
- std::string ContainerName;
- std::string Description;
- };
-
- struct Stats
- {
- std::uint64_t m_SentBytes;
- std::uint64_t m_ReceivedBytes;
- std::uint64_t m_RequestTimeNS;
- std::uint64_t m_RequestCount;
- std::uint64_t m_PeakSentBytes;
- std::uint64_t m_PeakReceivedBytes;
- std::uint64_t m_PeakBytesPerSec;
- };
-
- RemoteProjectStore();
- virtual ~RemoteProjectStore();
-
- virtual RemoteStoreInfo GetInfo() const = 0;
- virtual Stats GetStats() const = 0;
-
- virtual CreateContainerResult CreateContainer() = 0;
- virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0;
- virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&& Block) = 0;
- virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0;
- virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0;
-
- virtual LoadContainerResult LoadContainer() = 0;
- virtual GetKnownBlocksResult GetKnownBlocks() = 0;
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) = 0;
- virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) = 0;
-};
-
-struct RemoteStoreOptions
-{
- static const size_t DefaultMaxBlockSize = 64u * 1024u * 1024u;
- static const size_t DefaultMaxChunksPerBlock = 4u * 1000u;
- static const size_t DefaultMaxChunkEmbedSize = 3u * 512u * 1024u;
- static const size_t DefaultChunkFileSizeLimit = 256u * 1024u * 1024u;
-
- size_t MaxBlockSize = DefaultMaxBlockSize;
- size_t MaxChunksPerBlock = DefaultMaxChunksPerBlock;
- size_t MaxChunkEmbedSize = DefaultMaxChunkEmbedSize;
- size_t ChunkFileSizeLimit = DefaultChunkFileSizeLimit;
-};
-
-typedef std::function<IoBuffer(const IoHash& AttachmentHash)> TGetAttachmentBufferFunc;
-
-RemoteProjectStore::LoadContainerResult BuildContainer(
- CidStore& ChunkStore,
- ProjectStore::Project& Project,
- ProjectStore::Oplog& Oplog,
- size_t MaxBlockSize,
- size_t MaxChunksPerBlock,
- size_t MaxChunkEmbedSize,
- size_t ChunkFileSizeLimit,
- bool BuildBlocks,
- bool IgnoreMissingAttachments,
- bool AllowChunking,
- const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock,
- const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
- const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
- bool EmbedLooseFiles);
-
-class JobContext;
-
-RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog,
- const CbObject& ContainerObject,
- const std::function<void(std::span<IoHash> RawHashes)>& OnReferencedAttachments,
- const std::function<bool(const IoHash& RawHash)>& HasAttachment,
- const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
- const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
- const std::function<void(const ChunkedInfo& Chunked)>& OnChunkedAttachment,
- JobContext* OptionalContext);
-
-RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- ProjectStore::Project& Project,
- ProjectStore::Oplog& Oplog,
- size_t MaxBlockSize,
- size_t MaxChunksPerBlock,
- size_t MaxChunkEmbedSize,
- size_t ChunkFileSizeLimit,
- bool EmbedLooseFiles,
- bool ForceUpload,
- bool IgnoreMissingAttachments,
- JobContext* OptionalContext);
-
-RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- ProjectStore::Oplog& Oplog,
- bool ForceDownload,
- bool IgnoreMissingAttachments,
- bool CleanOplog,
- JobContext* OptionalContext);
-
-std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject);
-std::vector<ThinChunkBlockDescription> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes);
-
-void remoteprojectstore_forcelink();
-
-} // namespace zen
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp
deleted file mode 100644
index 21ddd6cff..000000000
--- a/src/zenserver/projectstore/zenremoteprojectstore.cpp
+++ /dev/null
@@ -1,336 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "zenremoteprojectstore.h"
-
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinarypackage.h>
-#include <zencore/compositebuffer.h>
-#include <zencore/fmtutils.h>
-#include <zencore/stream.h>
-#include <zenhttp/httpclient.h>
-#include <zenhttp/packageformat.h>
-
-namespace zen {
-
-using namespace std::literals;
-
-class ZenRemoteStore : public RemoteProjectStore
-{
-public:
- ZenRemoteStore(std::string_view HostAddress,
- std::string_view Project,
- std::string_view Oplog,
- const std::filesystem::path& TempFilePath)
- : m_HostAddress(HostAddress)
- , m_ProjectStoreUrl(fmt::format("{}/prj"sv, m_HostAddress))
- , m_Project(Project)
- , m_Oplog(Oplog)
- , m_TempFilePath(TempFilePath)
- , m_Client(m_ProjectStoreUrl, {.LogCategory = "ZenRemoteStore", .RetryCount = 2})
- {
- }
-
- virtual RemoteStoreInfo GetInfo() const override
- {
- return {.CreateBlocks = false,
- .UseTempBlockFiles = false,
- .AllowChunking = false,
- .ContainerName = fmt::format("{}/{}", m_Project, m_Oplog),
- .Description = fmt::format("[zen] {}. SessionId: {}"sv, m_HostAddress, m_Client.GetSessionId())};
- }
-
- virtual Stats GetStats() const override
- {
- return {.m_SentBytes = m_SentBytes.load(),
- .m_ReceivedBytes = m_ReceivedBytes.load(),
- .m_RequestTimeNS = m_RequestTimeNS.load(),
- .m_RequestCount = m_RequestCount.load(),
- .m_PeakSentBytes = m_PeakSentBytes.load(),
- .m_PeakReceivedBytes = m_PeakReceivedBytes.load(),
- .m_PeakBytesPerSec = m_PeakBytesPerSec.load()};
- }
-
- virtual CreateContainerResult CreateContainer() override
- {
- // Nothing to do here
- return {};
- }
-
- virtual SaveResult SaveContainer(const IoBuffer& Payload) override
- {
- std::string SaveRequest = fmt::format("/{}/oplog/{}/save"sv, m_Project, m_Oplog);
- HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCbObject);
- AddStats(Response);
- SaveResult Result = SaveResult{ConvertResult(Response)};
-
- if (Result.ErrorCode)
- {
- Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}. Reason: '{}'",
- m_ProjectStoreUrl,
- m_Project,
- m_Oplog,
- Result.Reason);
- return Result;
- }
- CbObject ResponseObject = Response.AsObject();
- if (!ResponseObject)
- {
- Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv,
- m_ProjectStoreUrl,
- m_Project,
- m_Oplog);
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- return Result;
- }
- CbArrayView NeedsArray = ResponseObject["need"sv].AsArrayView();
- for (CbFieldView FieldView : NeedsArray)
- {
- IoHash ChunkHash = FieldView.AsHash();
- Result.Needs.insert(ChunkHash);
- }
-
- Result.RawHash = IoHash::HashBuffer(Payload);
- return Result;
- }
-
- virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override
- {
- std::string SaveRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash);
- HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCompressedBinary);
- AddStats(Response);
- SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)};
- if (Result.ErrorCode)
- {
- Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'",
- m_ProjectStoreUrl,
- m_Project,
- m_Oplog,
- RawHash,
- Result.Reason);
- }
- return Result;
- }
-
- virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
- {
- CbPackage RequestPackage;
- {
- CbObjectWriter RequestWriter;
- RequestWriter.AddString("method"sv, "putchunks"sv);
- RequestWriter.BeginArray("chunks"sv);
- {
- for (const SharedBuffer& Chunk : Chunks)
- {
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, RawHash, RawSize);
- ZEN_ASSERT(Compressed);
- RequestWriter.AddHash(RawHash);
- RequestPackage.AddAttachment(CbAttachment(Compressed, RawHash));
- }
- }
- RequestWriter.EndArray(); // "chunks"
- RequestPackage.SetObject(RequestWriter.Save());
- }
- std::string SaveRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog);
- HttpClient::Response Response = m_Client.Post(SaveRequest, RequestPackage);
- AddStats(Response);
-
- SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)};
- if (Result.ErrorCode)
- {
- Result.Reason = fmt::format("Failed saving {} oplog attachments to {}/{}/{}. Reason: '{}'",
- Chunks.size(),
- m_ProjectStoreUrl,
- m_Project,
- m_Oplog,
- Result.Reason);
- }
- return Result;
- }
-
- virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
- {
- std::string LoadRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog);
-
- CbObject Request;
- {
- CbObjectWriter RequestWriter;
- RequestWriter.AddString("method"sv, "getchunks"sv);
- RequestWriter.BeginObject("Request"sv);
- {
- RequestWriter.BeginArray("Chunks"sv);
- {
- for (const IoHash& RawHash : RawHashes)
- {
- RequestWriter.BeginObject();
- {
- RequestWriter.AddHash("RawHash", RawHash);
- }
- RequestWriter.EndObject();
- }
- }
- RequestWriter.EndArray(); // "chunks"
- }
- RequestWriter.EndObject();
- Request = RequestWriter.Save();
- }
-
- HttpClient::Response Response = m_Client.Post(LoadRequest, Request, HttpClient::Accept(ZenContentType::kCbPackage));
- AddStats(Response);
-
- LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)};
- if (Result.ErrorCode)
- {
- Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'",
- RawHashes.size(),
- m_ProjectStoreUrl,
- m_Project,
- m_Oplog,
- Result.Reason);
- }
- else
- {
- CbPackage Package = Response.AsPackage();
- std::span<const CbAttachment> Attachments = Package.GetAttachments();
- Result.Chunks.reserve(Attachments.size());
- for (const CbAttachment& Attachment : Attachments)
- {
- Result.Chunks.emplace_back(
- std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()});
- }
- }
- return Result;
- };
-
- virtual FinalizeResult FinalizeContainer(const IoHash&) override { return FinalizeResult{Result{}}; }
-
- virtual LoadContainerResult LoadContainer() override
- {
- std::string LoadRequest = fmt::format("/{}/oplog/{}/load"sv, m_Project, m_Oplog);
-
- HttpClient::Response Response = m_Client.Get(LoadRequest, HttpClient::Accept(ZenContentType::kCbObject));
- AddStats(Response);
-
- LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)};
- if (Result.ErrorCode)
- {
- Result.Reason = fmt::format("Failed fetching oplog container from {}/{}/{}. Reason: '{}'",
- m_ProjectStoreUrl,
- m_Project,
- m_Oplog,
- Result.Reason);
- }
- else
- {
- Result.ContainerObject = Response.AsObject();
- if (!Result.ContainerObject)
- {
- Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv,
- m_ProjectStoreUrl,
- m_Project,
- m_Oplog);
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- }
- }
- return Result;
- }
-
- virtual GetKnownBlocksResult GetKnownBlocks() override
- {
- return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}};
- }
-
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
- {
- std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash);
- HttpClient::Response Response =
- m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary));
- AddStats(Response);
-
- LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)};
- if (!Result.ErrorCode)
- {
- Result.Bytes = Response.ResponsePayload;
- Result.Bytes.MakeOwned();
- }
- if (!Result.ErrorCode)
- {
- Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'",
- m_ProjectStoreUrl,
- m_Project,
- m_Oplog,
- RawHash,
- Result.Reason);
- }
- return Result;
- }
-
-private:
- void AddStats(const HttpClient::Response& Result)
- {
- m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.UploadedBytes));
- m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.DownloadedBytes));
- m_RequestTimeNS.fetch_add(static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000));
- SetAtomicMax(m_PeakSentBytes, Result.UploadedBytes);
- SetAtomicMax(m_PeakReceivedBytes, Result.DownloadedBytes);
- if (Result.ElapsedSeconds > 0.0)
- {
- uint64_t BytesPerSec = static_cast<uint64_t>((Result.UploadedBytes + Result.DownloadedBytes) / Result.ElapsedSeconds);
- SetAtomicMax(m_PeakBytesPerSec, BytesPerSec);
- }
-
- m_RequestCount.fetch_add(1);
- }
-
- static Result ConvertResult(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv)
- {
- if (Response.Error)
- {
- return {.ErrorCode = Response.Error.value().ErrorCode,
- .ElapsedSeconds = Response.ElapsedSeconds,
- .Reason = Response.ErrorMessage(""),
- .Text = Response.ToText()};
- }
- if (!Response.IsSuccess())
- {
- return {.ErrorCode = static_cast<int32_t>(Response.StatusCode),
- .ElapsedSeconds = Response.ElapsedSeconds,
- .Reason = Response.ErrorMessage(ErrorPrefix),
- .Text = Response.ToText()};
- }
- return {.ErrorCode = 0, .ElapsedSeconds = Response.ElapsedSeconds};
- }
-
- const std::string m_HostAddress;
- const std::string m_ProjectStoreUrl;
- const std::string m_Project;
- const std::string m_Oplog;
- const std::filesystem::path m_TempFilePath;
-
- HttpClient m_Client;
-
- std::atomic_uint64_t m_SentBytes = {};
- std::atomic_uint64_t m_ReceivedBytes = {};
- std::atomic_uint64_t m_RequestTimeNS = {};
- std::atomic_uint64_t m_RequestCount = {};
- std::atomic_uint64_t m_PeakSentBytes = {};
- std::atomic_uint64_t m_PeakReceivedBytes = {};
- std::atomic_uint64_t m_PeakBytesPerSec = {};
-};
-
-std::shared_ptr<RemoteProjectStore>
-CreateZenRemoteStore(const ZenRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath)
-{
- std::string Url = Options.Url;
- if (Url.find("://"sv) == std::string::npos)
- {
- // Assume http URL
- Url = fmt::format("http://{}"sv, Url);
- }
- std::shared_ptr<RemoteProjectStore> RemoteStore =
- std::make_shared<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, TempFilePath);
- return RemoteStore;
-}
-
-} // namespace zen
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.h b/src/zenserver/projectstore/zenremoteprojectstore.h
deleted file mode 100644
index 7c81a597d..000000000
--- a/src/zenserver/projectstore/zenremoteprojectstore.h
+++ /dev/null
@@ -1,18 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include "remoteprojectstore.h"
-
-namespace zen {
-
-struct ZenRemoteStoreOptions : RemoteStoreOptions
-{
- std::string Url;
- std::string ProjectId;
- std::string OplogId;
-};
-
-std::shared_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath);
-
-} // namespace zen