aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-02 10:01:47 +0200
committerGitHub <[email protected]>2023-05-02 10:01:47 +0200
commit075d17f8ada47e990fe94606c3d21df409223465 (patch)
treee50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/projectstore
parentfix: bundle shouldn't append content zip to zen (diff)
downloadzen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz
zen-075d17f8ada47e990fe94606c3d21df409223465.zip
moved source directories into `/src` (#264)
* moved source directories into `/src` * updated bundle.lua for new `src` path * moved some docs, icon * removed old test trees
Diffstat (limited to 'src/zenserver/projectstore')
-rw-r--r--src/zenserver/projectstore/fileremoteprojectstore.cpp235
-rw-r--r--src/zenserver/projectstore/fileremoteprojectstore.h19
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.cpp244
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.h26
-rw-r--r--src/zenserver/projectstore/projectstore.cpp4082
-rw-r--r--src/zenserver/projectstore/projectstore.h372
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp1036
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h111
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.cpp341
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.h18
10 files changed, 6484 insertions, 0 deletions
diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp
new file mode 100644
index 000000000..d7a34a6c2
--- /dev/null
+++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp
@@ -0,0 +1,235 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "fileremoteprojectstore.h"
+
+#include <zencore/compress.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/timer.h>
+
+namespace zen {
+
+using namespace std::literals;
+
+class LocalExportProjectStore : public RemoteProjectStore
+{
+public:
+ LocalExportProjectStore(std::string_view Name,
+ const std::filesystem::path& FolderPath,
+ bool ForceDisableBlocks,
+ bool ForceEnableTempBlocks)
+ : m_Name(Name)
+ , m_OutputPath(FolderPath)
+ {
+ if (ForceDisableBlocks)
+ {
+ m_EnableBlocks = false;
+ }
+ if (ForceEnableTempBlocks)
+ {
+ m_UseTempBlocks = true;
+ }
+ }
+
+ virtual RemoteStoreInfo GetInfo() const override
+ {
+ return {.CreateBlocks = m_EnableBlocks,
+ .UseTempBlockFiles = m_UseTempBlocks,
+ .Description = fmt::format("[file] {}"sv, m_OutputPath)};
+ }
+
+ virtual SaveResult SaveContainer(const IoBuffer& Payload) override
+ {
+ Stopwatch Timer;
+ SaveResult Result;
+
+ {
+ CbObject ContainerObject = LoadCompactBinaryObject(Payload);
+
+ ContainerObject.IterateAttachments([&](CbFieldView FieldView) {
+ IoHash AttachmentHash = FieldView.AsBinaryAttachment();
+ std::filesystem::path AttachmentPath = GetAttachmentPath(AttachmentHash);
+ if (!std::filesystem::exists(AttachmentPath))
+ {
+ Result.Needs.insert(AttachmentHash);
+ }
+ });
+ }
+
+ std::filesystem::path ContainerPath = m_OutputPath;
+ ContainerPath.append(m_Name);
+
+ CreateDirectories(m_OutputPath);
+ BasicFile ContainerFile;
+ ContainerFile.Open(ContainerPath, BasicFile::Mode::kTruncate);
+ std::error_code Ec;
+ ContainerFile.WriteAll(Payload, Ec);
+ if (Ec)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = Ec.message();
+ }
+ Result.RawHash = IoHash::HashBuffer(Payload);
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override
+ {
+ Stopwatch Timer;
+ SaveAttachmentResult Result;
+ std::filesystem::path ChunkPath = GetAttachmentPath(RawHash);
+ if (!std::filesystem::exists(ChunkPath))
+ {
+ try
+ {
+ CreateDirectories(ChunkPath.parent_path());
+
+ BasicFile ChunkFile;
+ ChunkFile.Open(ChunkPath, BasicFile::Mode::kTruncate);
+ size_t Offset = 0;
+ for (const SharedBuffer& Segment : Payload.GetSegments())
+ {
+ ChunkFile.Write(Segment.GetView(), Offset);
+ Offset += Segment.GetSize();
+ }
+ }
+ catch (std::exception& Ex)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = Ex.what();
+ }
+ }
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
+ {
+ Stopwatch Timer;
+
+ for (const SharedBuffer& Chunk : Chunks)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer());
+ SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash());
+ if (ChunkResult.ErrorCode)
+ {
+ ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return SaveAttachmentsResult{ChunkResult};
+ }
+ }
+ SaveAttachmentsResult Result;
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual Result FinalizeContainer(const IoHash&) override { return {}; }
+
+ virtual LoadContainerResult LoadContainer() override
+ {
+ Stopwatch Timer;
+ LoadContainerResult Result;
+ std::filesystem::path ContainerPath = m_OutputPath;
+ ContainerPath.append(m_Name);
+ if (!std::filesystem::is_regular_file(ContainerPath))
+ {
+ Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound);
+ Result.Reason = fmt::format("The file {} does not exist"sv, ContainerPath.string());
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+ IoBuffer ContainerPayload;
+ {
+ BasicFile ContainerFile;
+ ContainerFile.Open(ContainerPath, BasicFile::Mode::kRead);
+ ContainerPayload = ContainerFile.ReadAll();
+ }
+ Result.ContainerObject = LoadCompactBinaryObject(ContainerPayload);
+ if (!Result.ContainerObject)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("The file {} is not formatted as a compact binary object"sv, ContainerPath.string());
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
+ {
+ Stopwatch Timer;
+ LoadAttachmentResult Result;
+ std::filesystem::path ChunkPath = GetAttachmentPath(RawHash);
+ if (!std::filesystem::is_regular_file(ChunkPath))
+ {
+ Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound);
+ Result.Reason = fmt::format("The file {} does not exist"sv, ChunkPath.string());
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+ {
+ BasicFile ChunkFile;
+ ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead);
+ Result.Bytes = ChunkFile.ReadAll();
+ }
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
+ {
+ Stopwatch Timer;
+ LoadAttachmentsResult Result;
+ for (const IoHash& Hash : RawHashes)
+ {
+ LoadAttachmentResult ChunkResult = LoadAttachment(Hash);
+ if (ChunkResult.ErrorCode)
+ {
+ ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return LoadAttachmentsResult{ChunkResult};
+ }
+ ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000)));
+ Result.Chunks.emplace_back(
+ std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))});
+ }
+ return Result;
+ }
+
+private:
+ std::filesystem::path GetAttachmentPath(const IoHash& RawHash) const
+ {
+ ExtendablePathBuilder<128> ShardedPath;
+ ShardedPath.Append(m_OutputPath.c_str());
+ ExtendableStringBuilder<64> HashString;
+ RawHash.ToHexString(HashString);
+ const char* str = HashString.c_str();
+ ShardedPath.AppendSeparator();
+ ShardedPath.AppendAsciiRange(str, str + 3);
+
+ ShardedPath.AppendSeparator();
+ ShardedPath.AppendAsciiRange(str + 3, str + 5);
+
+ ShardedPath.AppendSeparator();
+ ShardedPath.AppendAsciiRange(str + 5, str + 40);
+
+ return ShardedPath.ToPath();
+ }
+
+ const std::string m_Name;
+ const std::filesystem::path m_OutputPath;
+ bool m_EnableBlocks = true;
+ bool m_UseTempBlocks = false;
+};
+
+std::unique_ptr<RemoteProjectStore>
+CreateFileRemoteStore(const FileRemoteStoreOptions& Options)
+{
+ std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<LocalExportProjectStore>(Options.Name,
+ std::filesystem::path(Options.FolderPath),
+ Options.ForceDisableBlocks,
+ Options.ForceEnableTempBlocks);
+ return RemoteStore;
+}
+
+} // namespace zen
diff --git a/src/zenserver/projectstore/fileremoteprojectstore.h b/src/zenserver/projectstore/fileremoteprojectstore.h
new file mode 100644
index 000000000..68d1eb71e
--- /dev/null
+++ b/src/zenserver/projectstore/fileremoteprojectstore.h
@@ -0,0 +1,19 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "remoteprojectstore.h"
+
+namespace zen {
+
+struct FileRemoteStoreOptions : RemoteStoreOptions
+{
+ std::filesystem::path FolderPath;
+ std::string Name;
+ bool ForceDisableBlocks;
+ bool ForceEnableTempBlocks;
+};
+
+std::unique_ptr<RemoteProjectStore> CreateFileRemoteStore(const FileRemoteStoreOptions& Options);
+
+} // namespace zen
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
new file mode 100644
index 000000000..66cf3c4f8
--- /dev/null
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
@@ -0,0 +1,244 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "jupiterremoteprojectstore.h"
+
+#include <zencore/compress.h>
+#include <zencore/fmtutils.h>
+
+#include <auth/authmgr.h>
+#include <upstream/jupiter.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <cpr/cpr.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+using namespace std::literals;
+
+class JupiterRemoteStore : public RemoteProjectStore
+{
+public:
+ JupiterRemoteStore(Ref<CloudCacheClient>&& CloudClient,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& Key,
+ bool ForceDisableBlocks,
+ bool ForceDisableTempBlocks)
+ : m_CloudClient(CloudClient)
+ , m_Namespace(Namespace)
+ , m_Bucket(Bucket)
+ , m_Key(Key)
+ {
+ if (ForceDisableBlocks)
+ {
+ m_EnableBlocks = false;
+ }
+ if (ForceDisableTempBlocks)
+ {
+ m_UseTempBlocks = false;
+ }
+ }
+
+ virtual RemoteStoreInfo GetInfo() const override
+ {
+ return {.CreateBlocks = m_EnableBlocks,
+ .UseTempBlockFiles = m_UseTempBlocks,
+ .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_CloudClient->ServiceUrl(), m_Namespace, m_Bucket, m_Key)};
+ }
+
+ virtual SaveResult SaveContainer(const IoBuffer& Payload) override
+ {
+ const int32_t MaxAttempts = 3;
+ PutRefResult Result;
+ {
+ CloudCacheSession Session(m_CloudClient.Get());
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject);
+ }
+ }
+
+ return SaveResult{ConvertResult(Result), {Result.Needs.begin(), Result.Needs.end()} /*, {}*/, IoHash::HashBuffer(Payload)};
+ }
+
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override
+ {
+ const int32_t MaxAttempts = 3;
+ CloudCacheResult Result;
+ {
+ CloudCacheSession Session(m_CloudClient.Get());
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.PutCompressedBlob(m_Namespace, RawHash, Payload);
+ }
+ }
+
+ return SaveAttachmentResult{ConvertResult(Result)};
+ }
+
+ virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
+ {
+ SaveAttachmentsResult Result;
+ for (const SharedBuffer& Chunk : Chunks)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer());
+ SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash());
+ if (ChunkResult.ErrorCode)
+ {
+ return SaveAttachmentsResult{ChunkResult};
+ }
+ }
+ return Result;
+ }
+
+ virtual Result FinalizeContainer(const IoHash& RawHash) override
+ {
+ const int32_t MaxAttempts = 3;
+ CloudCacheResult Result;
+ {
+ CloudCacheSession Session(m_CloudClient.Get());
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash);
+ }
+ }
+ return ConvertResult(Result);
+ }
+
+ virtual LoadContainerResult LoadContainer() override
+ {
+ const int32_t MaxAttempts = 3;
+ CloudCacheResult Result;
+ {
+ CloudCacheSession Session(m_CloudClient.Get());
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.GetRef(m_Namespace, m_Bucket, m_Key, ZenContentType::kCbObject);
+ }
+ }
+
+ if (Result.ErrorCode || !Result.Success)
+ {
+ return LoadContainerResult{ConvertResult(Result)};
+ }
+
+ CbObject ContainerObject = LoadCompactBinaryObject(Result.Response);
+ if (!ContainerObject)
+ {
+ return LoadContainerResult{
+ RemoteProjectStore::Result{
+ .ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Reason = fmt::format("The ref {}/{}/{} is not formatted as a compact binary object"sv, m_Namespace, m_Bucket, m_Key)},
+ std::move(ContainerObject)};
+ }
+
+ return LoadContainerResult{ConvertResult(Result), std::move(ContainerObject)};
+ }
+
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
+ {
+ const int32_t MaxAttempts = 3;
+ CloudCacheResult Result;
+ {
+ CloudCacheSession Session(m_CloudClient.Get());
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ {
+ Result = Session.GetCompressedBlob(m_Namespace, RawHash);
+ }
+ }
+ return LoadAttachmentResult{ConvertResult(Result), std::move(Result.Response)};
+ }
+
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
+ {
+ LoadAttachmentsResult Result;
+ for (const IoHash& Hash : RawHashes)
+ {
+ LoadAttachmentResult ChunkResult = LoadAttachment(Hash);
+ if (ChunkResult.ErrorCode)
+ {
+ return LoadAttachmentsResult{ChunkResult};
+ }
+ ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000)));
+ Result.Chunks.emplace_back(
+ std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))});
+ }
+ return Result;
+ }
+
+private:
+ static Result ConvertResult(const CloudCacheResult& Response)
+ {
+ std::string Text;
+ int32_t ErrorCode = 0;
+ if (Response.ErrorCode != 0)
+ {
+ ErrorCode = Response.ErrorCode;
+ }
+ else if (!Response.Success)
+ {
+ ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ if (Response.Response.GetContentType() == ZenContentType::kText)
+ {
+ Text =
+ std::string(reinterpret_cast<const std::string::value_type*>(Response.Response.GetData()), Response.Response.GetSize());
+ }
+ }
+ return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text};
+ }
+
+ Ref<CloudCacheClient> m_CloudClient;
+ const std::string m_Namespace;
+ const std::string m_Bucket;
+ const IoHash m_Key;
+ bool m_EnableBlocks = true;
+ bool m_UseTempBlocks = true;
+};
+
+std::unique_ptr<RemoteProjectStore>
+CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options)
+{
+ std::string Url = Options.Url;
+ if (Url.find("://"sv) == std::string::npos)
+ {
+ // Assume https URL
+ Url = fmt::format("https://{}"sv, Url);
+ }
+ CloudCacheClientOptions ClientOptions{.Name = "Remote store"sv,
+ .ServiceUrl = Url,
+ .ConnectTimeout = std::chrono::milliseconds(2000),
+ .Timeout = std::chrono::milliseconds(60000)};
+ // 1) Access token as parameter in request
+ // 2) Environment variable (different win vs linux/mac)
+ // 3) openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
+
+ std::unique_ptr<CloudCacheTokenProvider> TokenProvider;
+ if (!Options.AccessToken.empty())
+ {
+ TokenProvider = CloudCacheTokenProvider::CreateFromCallback([AccessToken = Options.AccessToken]() {
+ return CloudCacheAccessToken{.Value = AccessToken, .ExpireTime = GcClock::TimePoint::max()};
+ });
+ }
+ else
+ {
+ TokenProvider =
+ CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager, OpenIdProvider = Options.OpenIdProvider]() {
+ AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider.empty() ? "Default" : OpenIdProvider);
+ return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
+ });
+ }
+
+ Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider)));
+
+ std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<JupiterRemoteStore>(std::move(CloudClient),
+ Options.Namespace,
+ Options.Bucket,
+ Options.Key,
+ Options.ForceDisableBlocks,
+ Options.ForceDisableTempBlocks);
+ return RemoteStore;
+}
+
+} // namespace zen
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.h b/src/zenserver/projectstore/jupiterremoteprojectstore.h
new file mode 100644
index 000000000..31548af22
--- /dev/null
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.h
@@ -0,0 +1,26 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "remoteprojectstore.h"
+
+namespace zen {
+
+class AuthMgr;
+
+struct JupiterRemoteStoreOptions : RemoteStoreOptions
+{
+ std::string Url;
+ std::string Namespace;
+ std::string Bucket;
+ IoHash Key;
+ std::string OpenIdProvider;
+ std::string AccessToken;
+ AuthMgr& AuthManager;
+ bool ForceDisableBlocks;
+ bool ForceDisableTempBlocks;
+};
+
+std::unique_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options);
+
+} // namespace zen
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
new file mode 100644
index 000000000..847a79a1d
--- /dev/null
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -0,0 +1,4082 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "projectstore.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/stream.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+#include <zenhttp/httpshared.h>
+#include <zenstore/caslog.h>
+#include <zenstore/cidstore.h>
+#include <zenstore/scrubcontext.h>
+#include <zenutil/cache/rpcrecording.h>
+
+#include "fileremoteprojectstore.h"
+#include "jupiterremoteprojectstore.h"
+#include "remoteprojectstore.h"
+#include "zenremoteprojectstore.h"
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <cpr/cpr.h>
+#include <xxh3.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+#endif // ZEN_WITH_TESTS
+
+namespace zen {
+
+namespace {
+ bool PrepareDirectoryDelete(const std::filesystem::path& Dir, std::filesystem::path& OutDeleteDir)
+ {
+ int DropIndex = 0;
+ do
+ {
+ if (!std::filesystem::exists(Dir))
+ {
+ return true;
+ }
+
+ std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex);
+ std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName;
+ if (std::filesystem::exists(DroppedBucketPath))
+ {
+ DropIndex++;
+ continue;
+ }
+
+ std::error_code Ec;
+ std::filesystem::rename(Dir, DroppedBucketPath, Ec);
+ if (!Ec)
+ {
+ OutDeleteDir = DroppedBucketPath;
+ return true;
+ }
+ if (Ec && !std::filesystem::exists(DroppedBucketPath))
+ {
+ // We can't move our folder, probably because it is busy, bail..
+ return false;
+ }
+ Sleep(100);
+ } while (true);
+ }
+
+ std::pair<std::unique_ptr<RemoteProjectStore>, std::string> CreateRemoteStore(CbObjectView Params,
+ AuthMgr& AuthManager,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize)
+ {
+ using namespace std::literals;
+
+ std::unique_ptr<RemoteProjectStore> RemoteStore;
+
+ if (CbObjectView File = Params["file"sv].AsObjectView(); File)
+ {
+ std::filesystem::path FolderPath(File["path"sv].AsString());
+ if (FolderPath.empty())
+ {
+ return {nullptr, "Missing file path"};
+ }
+ std::string_view Name(File["name"sv].AsString());
+ if (Name.empty())
+ {
+ return {nullptr, "Missing file name"};
+ }
+ bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false);
+ bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false);
+
+ FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ FolderPath,
+ std::string(Name),
+ ForceDisableBlocks,
+ ForceEnableTempBlocks};
+ RemoteStore = CreateFileRemoteStore(Options);
+ }
+
+ if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud)
+ {
+ std::string_view CloudServiceUrl = Cloud["url"sv].AsString();
+ if (CloudServiceUrl.empty())
+ {
+ return {nullptr, "Missing service url"};
+ }
+
+ std::string Url = cpr::util::urlDecode(std::string(CloudServiceUrl));
+ std::string_view Namespace = Cloud["namespace"sv].AsString();
+ if (Namespace.empty())
+ {
+ return {nullptr, "Missing namespace"};
+ }
+ std::string_view Bucket = Cloud["bucket"sv].AsString();
+ if (Bucket.empty())
+ {
+ return {nullptr, "Missing bucket"};
+ }
+ std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString();
+ std::string AccessToken = std::string(Cloud["access-token"sv].AsString());
+ if (AccessToken.empty())
+ {
+ std::string_view AccessTokenEnvVariable = Cloud["access-token-env"].AsString();
+ if (!AccessTokenEnvVariable.empty())
+ {
+ AccessToken = GetEnvVariable(AccessTokenEnvVariable);
+ }
+ }
+ std::string_view KeyParam = Cloud["key"sv].AsString();
+ if (KeyParam.empty())
+ {
+ return {nullptr, "Missing key"};
+ }
+ if (KeyParam.length() != IoHash::StringLength)
+ {
+ return {nullptr, "Invalid key"};
+ }
+ IoHash Key = IoHash::FromHexString(KeyParam);
+ if (Key == IoHash::Zero)
+ {
+ return {nullptr, "Invalid key string"};
+ }
+ bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false);
+ bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false);
+
+ JupiterRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ Url,
+ std::string(Namespace),
+ std::string(Bucket),
+ Key,
+ std::string(OpenIdProvider),
+ AccessToken,
+ AuthManager,
+ ForceDisableBlocks,
+ ForceDisableTempBlocks};
+ RemoteStore = CreateJupiterRemoteStore(Options);
+ }
+
+ if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen)
+ {
+ std::string_view Url = Zen["url"sv].AsString();
+ std::string_view Project = Zen["project"sv].AsString();
+ if (Project.empty())
+ {
+ return {nullptr, "Missing project"};
+ }
+ std::string_view Oplog = Zen["oplog"sv].AsString();
+ if (Oplog.empty())
+ {
+ return {nullptr, "Missing oplog"};
+ }
+ ZenRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ std::string(Url),
+ std::string(Project),
+ std::string(Oplog)};
+ RemoteStore = CreateZenRemoteStore(Options);
+ }
+
+ if (!RemoteStore)
+ {
+ return {nullptr, "Unknown remote store type"};
+ }
+
+ return {std::move(RemoteStore), ""};
+ }
+
+ std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result)
+ {
+ if (Result.ErrorCode == 0)
+ {
+ return {HttpResponseCode::OK, Result.Text};
+ }
+ return {static_cast<HttpResponseCode>(Result.ErrorCode),
+ Result.Reason.empty() ? Result.Text
+ : Result.Text.empty() ? Result.Reason
+ : fmt::format("{}. Reason: '{}'", Result.Text, Result.Reason)};
+ }
+
+ void CSVHeader(bool Details, bool AttachmentDetails, StringBuilderBase& CSVWriter)
+ {
+ if (AttachmentDetails)
+ {
+ CSVWriter << "Project, Oplog, LSN, Key, Cid, Size";
+ }
+ else if (Details)
+ {
+ CSVWriter << "Project, Oplog, LSN, Key, Size, AttachmentCount, AttachmentsSize";
+ }
+ else
+ {
+ CSVWriter << "Project, Oplog, Key";
+ }
+ }
+
+ void CSVWriteOp(CidStore& CidStore,
+ std::string_view ProjectId,
+ std::string_view OplogId,
+ bool Details,
+ bool AttachmentDetails,
+ int LSN,
+ const Oid& Key,
+ CbObject Op,
+ StringBuilderBase& CSVWriter)
+ {
+ StringBuilder<32> KeyStringBuilder;
+ Key.ToString(KeyStringBuilder);
+ const std::string_view KeyString = KeyStringBuilder.ToView();
+
+ SharedBuffer Buffer = Op.GetBuffer();
+ if (AttachmentDetails)
+ {
+ Op.IterateAttachments([&CidStore, &CSVWriter, &ProjectId, &OplogId, LSN, &KeyString](CbFieldView FieldView) {
+ const IoHash AttachmentHash = FieldView.AsAttachment();
+ IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash);
+ CSVWriter << "\r\n"
+ << ProjectId << ", " << OplogId << ", " << LSN << ", " << KeyString << ", " << AttachmentHash.ToHexString()
+ << ", " << gsl::narrow<uint64_t>(Attachment.GetSize());
+ });
+ }
+ else if (Details)
+ {
+ uint64_t AttachmentCount = 0;
+ size_t AttachmentsSize = 0;
+ Op.IterateAttachments([&CidStore, &AttachmentCount, &AttachmentsSize](CbFieldView FieldView) {
+ const IoHash AttachmentHash = FieldView.AsAttachment();
+ AttachmentCount++;
+ IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash);
+ AttachmentsSize += Attachment.GetSize();
+ });
+ CSVWriter << "\r\n"
+ << ProjectId << ", " << OplogId << ", " << LSN << ", " << KeyString << ", " << gsl::narrow<uint64_t>(Buffer.GetSize())
+ << ", " << AttachmentCount << ", " << gsl::narrow<uint64_t>(AttachmentsSize);
+ }
+ else
+ {
+ CSVWriter << "\r\n" << ProjectId << ", " << OplogId << ", " << KeyString;
+ }
+ };
+
+ void CbWriteOp(CidStore& CidStore,
+ bool Details,
+ bool OpDetails,
+ bool AttachmentDetails,
+ int LSN,
+ const Oid& Key,
+ CbObject Op,
+ CbObjectWriter& CbWriter)
+ {
+ CbWriter.BeginObject();
+ {
+ SharedBuffer Buffer = Op.GetBuffer();
+ CbWriter.AddObjectId("key", Key);
+ if (Details)
+ {
+ CbWriter.AddInteger("lsn", LSN);
+ CbWriter.AddInteger("size", gsl::narrow<uint64_t>(Buffer.GetSize()));
+ }
+ if (AttachmentDetails)
+ {
+ CbWriter.BeginArray("attachments");
+ Op.IterateAttachments([&CidStore, &CbWriter](CbFieldView FieldView) {
+ const IoHash AttachmentHash = FieldView.AsAttachment();
+ CbWriter.BeginObject();
+ {
+ IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash);
+ CbWriter.AddString("cid", AttachmentHash.ToHexString());
+ CbWriter.AddInteger("size", gsl::narrow<uint64_t>(Attachment.GetSize()));
+ }
+ CbWriter.EndObject();
+ });
+ CbWriter.EndArray();
+ }
+ else if (Details)
+ {
+ uint64_t AttachmentCount = 0;
+ size_t AttachmentsSize = 0;
+ Op.IterateAttachments([&CidStore, &AttachmentCount, &AttachmentsSize](CbFieldView FieldView) {
+ const IoHash AttachmentHash = FieldView.AsAttachment();
+ AttachmentCount++;
+ IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash);
+ AttachmentsSize += Attachment.GetSize();
+ });
+ if (AttachmentCount > 0)
+ {
+ CbWriter.AddInteger("attachments", AttachmentCount);
+ CbWriter.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize));
+ }
+ }
+ if (OpDetails)
+ {
+ CbWriter.BeginObject("op");
+ for (const CbFieldView& Field : Op)
+ {
+ if (!Field.HasName())
+ {
+ CbWriter.AddField(Field);
+ continue;
+ }
+ std::string_view FieldName = Field.GetName();
+ CbWriter.AddField(FieldName, Field);
+ }
+ CbWriter.EndObject();
+ }
+ }
+ CbWriter.EndObject();
+ };
+
+ void CbWriteOplogOps(CidStore& CidStore,
+ ProjectStore::Oplog& Oplog,
+ bool Details,
+ bool OpDetails,
+ bool AttachmentDetails,
+ CbObjectWriter& Cbo)
+ {
+ Cbo.BeginArray("ops");
+ {
+ Oplog.IterateOplogWithKey([&Cbo, &CidStore, Details, OpDetails, AttachmentDetails](int LSN, const Oid& Key, CbObject Op) {
+ CbWriteOp(CidStore, Details, OpDetails, AttachmentDetails, LSN, Key, Op, Cbo);
+ });
+ }
+ Cbo.EndArray();
+ }
+
+ void CbWriteOplog(CidStore& CidStore,
+ ProjectStore::Oplog& Oplog,
+ bool Details,
+ bool OpDetails,
+ bool AttachmentDetails,
+ CbObjectWriter& Cbo)
+ {
+ Cbo.BeginObject();
+ {
+ Cbo.AddString("name", Oplog.OplogId());
+ CbWriteOplogOps(CidStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo);
+ }
+ Cbo.EndObject();
+ }
+
+ void CbWriteOplogs(CidStore& CidStore,
+ ProjectStore::Project& Project,
+ std::vector<std::string> OpLogs,
+ bool Details,
+ bool OpDetails,
+ bool AttachmentDetails,
+ CbObjectWriter& Cbo)
+ {
+ Cbo.BeginArray("oplogs");
+ {
+ for (const std::string& OpLogId : OpLogs)
+ {
+ ProjectStore::Oplog* Oplog = Project.OpenOplog(OpLogId);
+ if (Oplog != nullptr)
+ {
+ CbWriteOplog(CidStore, *Oplog, Details, OpDetails, AttachmentDetails, Cbo);
+ }
+ }
+ }
+ Cbo.EndArray();
+ }
+
+ void CbWriteProject(CidStore& CidStore,
+ ProjectStore::Project& Project,
+ std::vector<std::string> OpLogs,
+ bool Details,
+ bool OpDetails,
+ bool AttachmentDetails,
+ CbObjectWriter& Cbo)
+ {
+ Cbo.BeginObject();
+ {
+ Cbo.AddString("name", Project.Identifier);
+ CbWriteOplogs(CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo);
+ }
+ Cbo.EndObject();
+ }
+
+} // namespace
+
+//////////////////////////////////////////////////////////////////////////
+
+Oid
+OpKeyStringAsOId(std::string_view OpKey)
+{
+ using namespace std::literals;
+
+ CbObjectWriter Writer;
+ Writer << "key"sv << OpKey;
+
+ XXH3_128Stream KeyHasher;
+ Writer.Save()["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ XXH3_128 KeyHash = KeyHasher.GetHash();
+
+ Oid OpId;
+ memcpy(OpId.OidBits, &KeyHash, sizeof(OpId.OidBits));
+
+ return OpId;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+struct ProjectStore::OplogStorage : public RefCounted
+{
+ OplogStorage(ProjectStore::Oplog* OwnerOplog, std::filesystem::path BasePath) : m_OwnerOplog(OwnerOplog), m_OplogStoragePath(BasePath)
+ {
+ }
+
+ ~OplogStorage()
+ {
+ ZEN_INFO("closing oplog storage at {}", m_OplogStoragePath);
+ Flush();
+ }
+
+ [[nodiscard]] bool Exists() { return Exists(m_OplogStoragePath); }
+ [[nodiscard]] static bool Exists(std::filesystem::path BasePath)
+ {
+ return std::filesystem::exists(BasePath / "ops.zlog") && std::filesystem::exists(BasePath / "ops.zops");
+ }
+
+ static bool Delete(std::filesystem::path BasePath) { return DeleteDirectories(BasePath); }
+
+ uint64_t OpBlobsSize() const
+ {
+ RwLock::SharedLockScope _(m_RwLock);
+ return m_NextOpsOffset;
+ }
+
+ void Open(bool IsCreate)
+ {
+ using namespace std::literals;
+
+ ZEN_INFO("initializing oplog storage at '{}'", m_OplogStoragePath);
+
+ if (IsCreate)
+ {
+ DeleteDirectories(m_OplogStoragePath);
+ CreateDirectories(m_OplogStoragePath);
+ }
+
+ m_Oplog.Open(m_OplogStoragePath / "ops.zlog"sv, IsCreate ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
+ m_Oplog.Initialize();
+
+ m_OpBlobs.Open(m_OplogStoragePath / "ops.zops"sv, IsCreate ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite);
+
+ ZEN_ASSERT(IsPow2(m_OpsAlign));
+ ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1)));
+ }
+
+ void ReplayLog(std::function<void(CbObject, const OplogEntry&)>&& Handler)
+ {
+ ZEN_TRACE_CPU("ProjectStore::OplogStorage::ReplayLog");
+
+ // This could use memory mapping or do something clever but for now it just reads the file sequentially
+
+ ZEN_INFO("replaying log for '{}'", m_OplogStoragePath);
+
+ Stopwatch Timer;
+
+ uint64_t InvalidEntries = 0;
+
+ IoBuffer OpBuffer;
+ m_Oplog.Replay(
+ [&](const OplogEntry& LogEntry) {
+ if (LogEntry.OpCoreSize == 0)
+ {
+ ++InvalidEntries;
+
+ return;
+ }
+
+ if (OpBuffer.GetSize() < LogEntry.OpCoreSize)
+ {
+ OpBuffer = IoBuffer(LogEntry.OpCoreSize);
+ }
+
+ const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
+
+ m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
+
+ // Verify checksum, ignore op data if incorrect
+ const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), LogEntry.OpCoreSize) & 0xffffFFFF);
+
+ if (OpCoreHash != LogEntry.OpCoreHash)
+ {
+ ZEN_WARN("skipping oplog entry with bad checksum!");
+ return;
+ }
+
+ CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), LogEntry.OpCoreSize));
+
+ m_NextOpsOffset =
+ Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign));
+ m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn);
+
+ Handler(Op, LogEntry);
+ },
+ 0);
+
+ if (InvalidEntries)
+ {
+ ZEN_WARN("ignored {} zero-sized oplog entries", InvalidEntries);
+ }
+
+ ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}",
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+ m_MaxLsn,
+ m_NextOpsOffset);
+ }
+
+ void ReplayLog(const std::vector<OplogEntryAddress>& Entries, std::function<void(CbObject)>&& Handler)
+ {
+ for (const OplogEntryAddress& Entry : Entries)
+ {
+ CbObject Op = GetOp(Entry);
+ Handler(Op);
+ }
+ }
+
+ CbObject GetOp(const OplogEntryAddress& Entry)
+ {
+ IoBuffer OpBuffer(Entry.Size);
+
+ const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign;
+ m_OpBlobs.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset);
+
+ return CbObject(SharedBuffer(std::move(OpBuffer)));
+ }
+
+ OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, XXH3_128 KeyHash)
+ {
+ ZEN_TRACE_CPU("ProjectStore::OplogStorage::AppendOp");
+
+ using namespace std::literals;
+
+ uint64_t WriteSize = Buffer.GetSize();
+
+ RwLock::ExclusiveLockScope Lock(m_RwLock);
+ const uint64_t WriteOffset = m_NextOpsOffset;
+ const uint32_t OpLsn = ++m_MaxLsn;
+ m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign);
+ Lock.ReleaseNow();
+
+ ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign));
+
+ OplogEntry Entry = {.OpLsn = OpLsn,
+ .OpCoreOffset = gsl::narrow_cast<uint32_t>(WriteOffset / m_OpsAlign),
+ .OpCoreSize = uint32_t(Buffer.GetSize()),
+ .OpCoreHash = OpCoreHash,
+ .OpKeyHash = KeyHash};
+
+ m_Oplog.Append(Entry);
+ m_OpBlobs.Write(Buffer.GetData(), WriteSize, WriteOffset);
+
+ return Entry;
+ }
+
+ void Flush()
+ {
+ m_Oplog.Flush();
+ m_OpBlobs.Flush();
+ }
+
+ spdlog::logger& Log() { return m_OwnerOplog->Log(); }
+
+private:
+ ProjectStore::Oplog* m_OwnerOplog;
+ std::filesystem::path m_OplogStoragePath;
+ mutable RwLock m_RwLock;
+ TCasLogFile<OplogEntry> m_Oplog;
+ BasicFile m_OpBlobs;
+ std::atomic<uint64_t> m_NextOpsOffset{0};
+ uint64_t m_OpsAlign = 32;
+ std::atomic<uint32_t> m_MaxLsn{0};
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+ProjectStore::Oplog::Oplog(std::string_view Id,
+ Project* Project,
+ CidStore& Store,
+ std::filesystem::path BasePath,
+ const std::filesystem::path& MarkerPath)
+: m_OuterProject(Project)
+, m_CidStore(Store)
+, m_BasePath(BasePath)
+, m_MarkerPath(MarkerPath)
+, m_OplogId(Id)
+{
+ using namespace std::literals;
+
+ m_Storage = new OplogStorage(this, m_BasePath);
+ const bool StoreExists = m_Storage->Exists();
+ m_Storage->Open(/* IsCreate */ !StoreExists);
+
+ m_TempPath = m_BasePath / "temp"sv;
+
+ CleanDirectory(m_TempPath);
+}
+
+ProjectStore::Oplog::~Oplog()
+{
+ if (m_Storage)
+ {
+ Flush();
+ }
+}
+
+void
+ProjectStore::Oplog::Flush()
+{
+ ZEN_ASSERT(m_Storage);
+ m_Storage->Flush();
+}
+
+void
+ProjectStore::Oplog::Scrub(ScrubContext& Ctx) const
+{
+ ZEN_UNUSED(Ctx);
+}
+
+void
+ProjectStore::Oplog::GatherReferences(GcContext& GcCtx)
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+
+ std::vector<IoHash> Hashes;
+ Hashes.reserve(Max(m_ChunkMap.size(), m_MetaMap.size()));
+
+ for (const auto& Kv : m_ChunkMap)
+ {
+ Hashes.push_back(Kv.second);
+ }
+
+ GcCtx.AddRetainedCids(Hashes);
+
+ Hashes.clear();
+
+ for (const auto& Kv : m_MetaMap)
+ {
+ Hashes.push_back(Kv.second);
+ }
+
+ GcCtx.AddRetainedCids(Hashes);
+}
+
+uint64_t
+ProjectStore::Oplog::TotalSize() const
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+ if (m_Storage)
+ {
+ return m_Storage->OpBlobsSize();
+ }
+ return 0;
+}
+
+bool
+ProjectStore::Oplog::IsExpired() const
+{
+ if (m_MarkerPath.empty())
+ {
+ return false;
+ }
+ return !std::filesystem::exists(m_MarkerPath);
+}
+
+std::filesystem::path
+ProjectStore::Oplog::PrepareForDelete(bool MoveFolder)
+{
+ RwLock::ExclusiveLockScope _(m_OplogLock);
+ m_ChunkMap.clear();
+ m_MetaMap.clear();
+ m_FileMap.clear();
+ m_OpAddressMap.clear();
+ m_LatestOpMap.clear();
+ m_Storage = {};
+ if (!MoveFolder)
+ {
+ return {};
+ }
+ std::filesystem::path MovedDir;
+ if (PrepareDirectoryDelete(m_BasePath, MovedDir))
+ {
+ return MovedDir;
+ }
+ return {};
+}
+
+bool
+ProjectStore::Oplog::ExistsAt(std::filesystem::path BasePath)
+{
+ using namespace std::literals;
+
+ std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv;
+ return std::filesystem::is_regular_file(StateFilePath);
+}
+
+void
+ProjectStore::Oplog::Read()
+{
+ using namespace std::literals;
+
+ std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv;
+ if (std::filesystem::is_regular_file(StateFilePath))
+ {
+ ZEN_INFO("reading config for oplog '{}' in project '{}' from {}", m_OplogId, m_OuterProject->Identifier, StateFilePath);
+
+ BasicFile Blob;
+ Blob.Open(StateFilePath, BasicFile::Mode::kRead);
+
+ IoBuffer Obj = Blob.ReadAll();
+ CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All);
+
+ if (ValidationError != CbValidateError::None)
+ {
+ ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), StateFilePath);
+ return;
+ }
+
+ CbObject Cfg = LoadCompactBinaryObject(Obj);
+
+ m_MarkerPath = Cfg["gcpath"sv].AsString();
+ }
+ else
+ {
+ ZEN_INFO("config for oplog '{}' in project '{}' not found at {}. Assuming legacy store",
+ m_OplogId,
+ m_OuterProject->Identifier,
+ StateFilePath);
+ }
+ ReplayLog();
+}
+
+void
+ProjectStore::Oplog::Write()
+{
+ using namespace std::literals;
+
+ BinaryWriter Mem;
+
+ CbObjectWriter Cfg;
+
+ Cfg << "gcpath"sv << PathToUtf8(m_MarkerPath);
+
+ Cfg.Save(Mem);
+
+ std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv;
+
+ ZEN_INFO("persisting config for oplog '{}' in project '{}' to {}", m_OplogId, m_OuterProject->Identifier, StateFilePath);
+
+ BasicFile Blob;
+ Blob.Open(StateFilePath, BasicFile::Mode::kTruncate);
+ Blob.Write(Mem.Data(), Mem.Size(), 0);
+ Blob.Flush();
+}
+
+void
+ProjectStore::Oplog::ReplayLog()
+{
+ RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
+ if (!m_Storage)
+ {
+ return;
+ }
+ m_Storage->ReplayLog(
+ [&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, GetMapping(Op), OpEntry, kUpdateReplay); });
+}
+
+IoBuffer
+ProjectStore::Oplog::FindChunk(Oid ChunkId)
+{
+ RwLock::SharedLockScope OplogLock(m_OplogLock);
+ if (!m_Storage)
+ {
+ return IoBuffer{};
+ }
+
+ if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end())
+ {
+ IoHash ChunkHash = ChunkIt->second;
+ OplogLock.ReleaseNow();
+
+ IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash);
+ Chunk.SetContentType(ZenContentType::kCompressedBinary);
+
+ return Chunk;
+ }
+
+ if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end())
+ {
+ std::filesystem::path FilePath = m_OuterProject->RootDir / FileIt->second.ServerPath;
+
+ OplogLock.ReleaseNow();
+
+ IoBuffer FileChunk = IoBufferBuilder::MakeFromFile(FilePath);
+ FileChunk.SetContentType(ZenContentType::kBinary);
+
+ return FileChunk;
+ }
+
+ if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end())
+ {
+ IoHash ChunkHash = MetaIt->second;
+ OplogLock.ReleaseNow();
+
+ IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash);
+ Chunk.SetContentType(ZenContentType::kCompressedBinary);
+
+ return Chunk;
+ }
+
+ return {};
+}
+
+void
+ProjectStore::Oplog::IterateFileMap(
+ std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn)
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+ if (!m_Storage)
+ {
+ return;
+ }
+
+ for (const auto& Kv : m_FileMap)
+ {
+ Fn(Kv.first, Kv.second.ServerPath, Kv.second.ClientPath);
+ }
+}
+
+void
+ProjectStore::Oplog::IterateOplog(std::function<void(CbObject)>&& Handler)
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+ if (!m_Storage)
+ {
+ return;
+ }
+
+ std::vector<OplogEntryAddress> Entries;
+ Entries.reserve(m_LatestOpMap.size());
+
+ for (const auto& Kv : m_LatestOpMap)
+ {
+ const auto AddressEntry = m_OpAddressMap.find(Kv.second);
+ ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
+
+ Entries.push_back(AddressEntry->second);
+ }
+
+ std::sort(Entries.begin(), Entries.end(), [](const OplogEntryAddress& Lhs, const OplogEntryAddress& Rhs) {
+ return Lhs.Offset < Rhs.Offset;
+ });
+
+ m_Storage->ReplayLog(Entries, [&](CbObject Op) { Handler(Op); });
+}
+
+void
+ProjectStore::Oplog::IterateOplogWithKey(std::function<void(int, const Oid&, CbObject)>&& Handler)
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+ if (!m_Storage)
+ {
+ return;
+ }
+
+ std::vector<size_t> EntryIndexes;
+ std::vector<OplogEntryAddress> Entries;
+ std::vector<Oid> Keys;
+ std::vector<int> LSNs;
+ Entries.reserve(m_LatestOpMap.size());
+ EntryIndexes.reserve(m_LatestOpMap.size());
+ Keys.reserve(m_LatestOpMap.size());
+ LSNs.reserve(m_LatestOpMap.size());
+
+ for (const auto& Kv : m_LatestOpMap)
+ {
+ const auto AddressEntry = m_OpAddressMap.find(Kv.second);
+ ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
+
+ Entries.push_back(AddressEntry->second);
+ Keys.push_back(Kv.first);
+ LSNs.push_back(Kv.second);
+ EntryIndexes.push_back(EntryIndexes.size());
+ }
+
+ std::sort(EntryIndexes.begin(), EntryIndexes.end(), [&Entries](const size_t& Lhs, const size_t& Rhs) {
+ const OplogEntryAddress& LhsEntry = Entries[Lhs];
+ const OplogEntryAddress& RhsEntry = Entries[Rhs];
+ return LhsEntry.Offset < RhsEntry.Offset;
+ });
+ std::vector<OplogEntryAddress> SortedEntries;
+ SortedEntries.reserve(EntryIndexes.size());
+ for (size_t Index : EntryIndexes)
+ {
+ SortedEntries.push_back(Entries[Index]);
+ }
+
+ size_t EntryIndex = 0;
+ m_Storage->ReplayLog(SortedEntries, [&](CbObject Op) {
+ Handler(LSNs[EntryIndex], Keys[EntryIndex], Op);
+ EntryIndex++;
+ });
+}
+
+int
+ProjectStore::Oplog::GetOpIndexByKey(const Oid& Key)
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+ if (!m_Storage)
+ {
+ return {};
+ }
+ if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end())
+ {
+ return LatestOp->second;
+ }
+ return -1;
+}
+
+std::optional<CbObject>
+ProjectStore::Oplog::GetOpByKey(const Oid& Key)
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+ if (!m_Storage)
+ {
+ return {};
+ }
+
+ if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end())
+ {
+ const auto AddressEntry = m_OpAddressMap.find(LatestOp->second);
+ ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
+
+ return m_Storage->GetOp(AddressEntry->second);
+ }
+
+ return {};
+}
+
+std::optional<CbObject>
+ProjectStore::Oplog::GetOpByIndex(int Index)
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+ if (!m_Storage)
+ {
+ return {};
+ }
+
+ if (const auto AddressEntryIt = m_OpAddressMap.find(Index); AddressEntryIt != m_OpAddressMap.end())
+ {
+ return m_Storage->GetOp(AddressEntryIt->second);
+ }
+
+ return {};
+}
+
+void
+ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&,
+ Oid FileId,
+ IoHash Hash,
+ std::string_view ServerPath,
+ std::string_view ClientPath)
+{
+ if (Hash != IoHash::Zero)
+ {
+ m_ChunkMap.insert_or_assign(FileId, Hash);
+ }
+
+ FileMapEntry Entry;
+ Entry.ServerPath = ServerPath;
+ Entry.ClientPath = ClientPath;
+
+ m_FileMap[FileId] = std::move(Entry);
+
+ if (Hash != IoHash::Zero)
+ {
+ m_ChunkMap.insert_or_assign(FileId, Hash);
+ }
+}
+
+void
+ProjectStore::Oplog::AddChunkMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash)
+{
+ m_ChunkMap.insert_or_assign(ChunkId, Hash);
+}
+
+void
+ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash)
+{
+ m_MetaMap.insert_or_assign(ChunkId, Hash);
+}
+
+ProjectStore::Oplog::OplogEntryMapping
+ProjectStore::Oplog::GetMapping(CbObject Core)
+{
+ using namespace std::literals;
+
+ OplogEntryMapping Result;
+
+ // Update chunk id maps
+ CbObjectView PackageObj = Core["package"sv].AsObjectView();
+ CbArrayView BulkDataArray = Core["bulkdata"sv].AsArrayView();
+ CbArrayView PackageDataArray = Core["packagedata"sv].AsArrayView();
+ Result.Chunks.reserve(PackageObj ? 1 : 0 + BulkDataArray.Num() + PackageDataArray.Num());
+
+ if (PackageObj)
+ {
+ Oid Id = PackageObj["id"sv].AsObjectId();
+ IoHash Hash = PackageObj["data"sv].AsBinaryAttachment();
+ Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
+ ZEN_DEBUG("package data {} -> {}", Id, Hash);
+ }
+
+ for (CbFieldView& Entry : PackageDataArray)
+ {
+ CbObjectView PackageDataObj = Entry.AsObjectView();
+ Oid Id = PackageDataObj["id"sv].AsObjectId();
+ IoHash Hash = PackageDataObj["data"sv].AsBinaryAttachment();
+ Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
+ ZEN_DEBUG("package {} -> {}", Id, Hash);
+ }
+
+ for (CbFieldView& Entry : BulkDataArray)
+ {
+ CbObjectView BulkObj = Entry.AsObjectView();
+ Oid Id = BulkObj["id"sv].AsObjectId();
+ IoHash Hash = BulkObj["data"sv].AsBinaryAttachment();
+ Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
+ ZEN_DEBUG("bulkdata {} -> {}", Id, Hash);
+ }
+
+ CbArrayView FilesArray = Core["files"sv].AsArrayView();
+ Result.Files.reserve(FilesArray.Num());
+ for (CbFieldView& Entry : FilesArray)
+ {
+ CbObjectView FileObj = Entry.AsObjectView();
+
+ std::string_view ServerPath = FileObj["serverpath"sv].AsString();
+ std::string_view ClientPath = FileObj["clientpath"sv].AsString();
+ if (ServerPath.empty() || ClientPath.empty())
+ {
+ ZEN_WARN("invalid file");
+ continue;
+ }
+
+ Oid Id = FileObj["id"sv].AsObjectId();
+ IoHash Hash = FileObj["data"sv].AsBinaryAttachment();
+ Result.Files.emplace_back(
+ OplogEntryMapping::FileMapping{OplogEntryMapping::Mapping{Id, Hash}, std::string(ServerPath), std::string(ClientPath)});
+ ZEN_DEBUG("file {} -> {}, ServerPath: {}, ClientPath: {}", Id, Hash, ServerPath, ClientPath);
+ }
+
+ CbArrayView MetaArray = Core["meta"sv].AsArrayView();
+ Result.Meta.reserve(MetaArray.Num());
+ for (CbFieldView& Entry : MetaArray)
+ {
+ CbObjectView MetaObj = Entry.AsObjectView();
+ Oid Id = MetaObj["id"sv].AsObjectId();
+ IoHash Hash = MetaObj["data"sv].AsBinaryAttachment();
+ Result.Meta.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
+ auto NameString = MetaObj["name"sv].AsString();
+ ZEN_DEBUG("meta data ({}) {} -> {}", NameString, Id, Hash);
+ }
+
+ return Result;
+}
+
+uint32_t
+ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock,
+ const OplogEntryMapping& OpMapping,
+ const OplogEntry& OpEntry,
+ UpdateType TypeOfUpdate)
+{
+ ZEN_TRACE_CPU("ProjectStore::Oplog::RegisterOplogEntry");
+
+ ZEN_UNUSED(TypeOfUpdate);
+
+ // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing
+ // too many problems. Longer term we'll probably want to ensure we can do concurrent updates however
+
+ using namespace std::literals;
+
+ // Update chunk id maps
+ for (const OplogEntryMapping::Mapping& Chunk : OpMapping.Chunks)
+ {
+ AddChunkMapping(OplogLock, Chunk.Id, Chunk.Hash);
+ }
+
+ for (const OplogEntryMapping::FileMapping& File : OpMapping.Files)
+ {
+ AddFileMapping(OplogLock, File.Id, File.Hash, File.ServerPath, File.ClientPath);
+ }
+
+ for (const OplogEntryMapping::Mapping& Meta : OpMapping.Meta)
+ {
+ AddMetaMapping(OplogLock, Meta.Id, Meta.Hash);
+ }
+
+ m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize});
+ m_LatestOpMap[OpEntry.OpKeyAsOId()] = OpEntry.OpLsn;
+
+ return OpEntry.OpLsn;
+}
+
+uint32_t
+ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
+{
+ ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry");
+
+ const CbObject& Core = OpPackage.GetObject();
+ const uint32_t EntryId = AppendNewOplogEntry(Core);
+ if (EntryId == 0xffffffffu)
+ {
+ // The oplog has been deleted so just drop this
+ return EntryId;
+ }
+
+ // Persist attachments after oplog entry so GC won't find attachments without references
+
+ uint64_t AttachmentBytes = 0;
+ uint64_t NewAttachmentBytes = 0;
+
+ auto Attachments = OpPackage.GetAttachments();
+
+ for (const auto& Attach : Attachments)
+ {
+ ZEN_ASSERT(Attach.IsCompressedBinary());
+
+ CompressedBuffer AttachmentData = Attach.AsCompressedBinary();
+ const uint64_t AttachmentSize = AttachmentData.DecodeRawSize();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData.GetCompressed().Flatten().AsIoBuffer(), Attach.GetHash());
+
+ if (InsertResult.New)
+ {
+ NewAttachmentBytes += AttachmentSize;
+ }
+ AttachmentBytes += AttachmentSize;
+ }
+
+ ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes));
+
+ return EntryId;
+}
+
+uint32_t
+ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core)
+{
+ ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry");
+
+ using namespace std::literals;
+
+ OplogEntryMapping Mapping = GetMapping(Core);
+
+ SharedBuffer Buffer = Core.GetBuffer();
+ const uint64_t WriteSize = Buffer.GetSize();
+ const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF);
+
+ ZEN_ASSERT(WriteSize != 0);
+
+ XXH3_128Stream KeyHasher;
+ Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ XXH3_128 KeyHash = KeyHasher.GetHash();
+
+ RefPtr<OplogStorage> Storage;
+ {
+ RwLock::SharedLockScope _(m_OplogLock);
+ Storage = m_Storage;
+ }
+ if (!m_Storage)
+ {
+ return 0xffffffffu;
+ }
+ const OplogEntry OpEntry = m_Storage->AppendOp(Buffer, OpCoreHash, KeyHash);
+
+ RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
+ const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry, kUpdateNewEntry);
+
+ return EntryId;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ProjectStore::Project::Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath)
+: m_ProjectStore(PrjStore)
+, m_CidStore(Store)
+, m_OplogStoragePath(BasePath)
+{
+}
+
+ProjectStore::Project::~Project()
+{
+}
+
+bool
+ProjectStore::Project::Exists(std::filesystem::path BasePath)
+{
+ return std::filesystem::exists(BasePath / "Project.zcb");
+}
+
+void
+ProjectStore::Project::Read()
+{
+ using namespace std::literals;
+
+ std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv;
+
+ ZEN_INFO("reading config for project '{}' from {}", Identifier, ProjectStateFilePath);
+
+ BasicFile Blob;
+ Blob.Open(ProjectStateFilePath, BasicFile::Mode::kRead);
+
+ IoBuffer Obj = Blob.ReadAll();
+ CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All);
+
+ if (ValidationError == CbValidateError::None)
+ {
+ CbObject Cfg = LoadCompactBinaryObject(Obj);
+
+ Identifier = Cfg["id"sv].AsString();
+ RootDir = Cfg["root"sv].AsString();
+ ProjectRootDir = Cfg["project"sv].AsString();
+ EngineRootDir = Cfg["engine"sv].AsString();
+ ProjectFilePath = Cfg["projectfile"sv].AsString();
+ }
+ else
+ {
+ ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), ProjectStateFilePath);
+ }
+}
+
+void
+ProjectStore::Project::Write()
+{
+ using namespace std::literals;
+
+ BinaryWriter Mem;
+
+ CbObjectWriter Cfg;
+ Cfg << "id"sv << Identifier;
+ Cfg << "root"sv << PathToUtf8(RootDir);
+ Cfg << "project"sv << ProjectRootDir;
+ Cfg << "engine"sv << EngineRootDir;
+ Cfg << "projectfile"sv << ProjectFilePath;
+
+ Cfg.Save(Mem);
+
+ CreateDirectories(m_OplogStoragePath);
+
+ std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv;
+
+ ZEN_INFO("persisting config for project '{}' to {}", Identifier, ProjectStateFilePath);
+
+ BasicFile Blob;
+ Blob.Open(ProjectStateFilePath, BasicFile::Mode::kTruncate);
+ Blob.Write(Mem.Data(), Mem.Size(), 0);
+ Blob.Flush();
+}
+
+spdlog::logger&
+ProjectStore::Project::Log()
+{
+ return m_ProjectStore->Log();
+}
+
+std::filesystem::path
+ProjectStore::Project::BasePathForOplog(std::string_view OplogId)
+{
+ return m_OplogStoragePath / OplogId;
+}
+
+ProjectStore::Oplog*
+ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath)
+{
+ RwLock::ExclusiveLockScope _(m_ProjectLock);
+
+ std::filesystem::path OplogBasePath = BasePathForOplog(OplogId);
+
+ try
+ {
+ Oplog* Log = m_Oplogs
+ .try_emplace(std::string{OplogId},
+ std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, MarkerPath))
+ .first->second.get();
+
+ Log->Write();
+ return Log;
+ }
+ catch (std::exception&)
+ {
+ // In case of failure we need to ensure there's no half constructed entry around
+ //
+ // (This is probably already ensured by the try_emplace implementation?)
+
+ m_Oplogs.erase(std::string{OplogId});
+
+ return nullptr;
+ }
+}
+
+ProjectStore::Oplog*
+ProjectStore::Project::OpenOplog(std::string_view OplogId)
+{
+ {
+ RwLock::SharedLockScope _(m_ProjectLock);
+
+ auto OplogIt = m_Oplogs.find(std::string(OplogId));
+
+ if (OplogIt != m_Oplogs.end())
+ {
+ return OplogIt->second.get();
+ }
+ }
+
+ RwLock::ExclusiveLockScope _(m_ProjectLock);
+
+ std::filesystem::path OplogBasePath = BasePathForOplog(OplogId);
+
+ if (Oplog::ExistsAt(OplogBasePath))
+ {
+ // Do open of existing oplog
+
+ try
+ {
+ Oplog* Log =
+ m_Oplogs
+ .try_emplace(std::string{OplogId},
+ std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, std::filesystem::path{}))
+ .first->second.get();
+ Log->Read();
+
+ return Log;
+ }
+ catch (std::exception& ex)
+ {
+ ZEN_WARN("failed to open oplog '{}' @ '{}': {}", OplogId, OplogBasePath, ex.what());
+
+ m_Oplogs.erase(std::string{OplogId});
+ }
+ }
+
+ return nullptr;
+}
+
+void
+ProjectStore::Project::DeleteOplog(std::string_view OplogId)
+{
+ std::filesystem::path DeletePath;
+ {
+ RwLock::ExclusiveLockScope _(m_ProjectLock);
+
+ auto OplogIt = m_Oplogs.find(std::string(OplogId));
+
+ if (OplogIt != m_Oplogs.end())
+ {
+ std::unique_ptr<Oplog>& Oplog = OplogIt->second;
+ DeletePath = Oplog->PrepareForDelete(true);
+ m_DeletedOplogs.emplace_back(std::move(Oplog));
+ m_Oplogs.erase(OplogIt);
+ }
+ }
+
+ // Erase content on disk
+ if (!DeletePath.empty())
+ {
+ OplogStorage::Delete(DeletePath);
+ }
+}
+
+std::vector<std::string>
+ProjectStore::Project::ScanForOplogs() const
+{
+ DirectoryContent DirContent;
+ GetDirectoryContent(m_OplogStoragePath, DirectoryContent::IncludeDirsFlag, DirContent);
+ std::vector<std::string> Oplogs;
+ Oplogs.reserve(DirContent.Directories.size());
+ for (const std::filesystem::path& DirPath : DirContent.Directories)
+ {
+ Oplogs.push_back(DirPath.filename().string());
+ }
+ return Oplogs;
+}
+
+void
+ProjectStore::Project::IterateOplogs(std::function<void(const Oplog&)>&& Fn) const
+{
+ RwLock::SharedLockScope _(m_ProjectLock);
+
+ for (auto& Kv : m_Oplogs)
+ {
+ Fn(*Kv.second);
+ }
+}
+
+void
+ProjectStore::Project::IterateOplogs(std::function<void(Oplog&)>&& Fn)
+{
+ RwLock::SharedLockScope _(m_ProjectLock);
+
+ for (auto& Kv : m_Oplogs)
+ {
+ Fn(*Kv.second);
+ }
+}
+
+void
+ProjectStore::Project::Flush()
+{
+ // We only need to flush oplogs that we have already loaded
+ IterateOplogs([&](Oplog& Ops) { Ops.Flush(); });
+}
+
+void
+ProjectStore::Project::Scrub(ScrubContext& Ctx)
+{
+ // Scrubbing needs to check all existing oplogs
+ std::vector<std::string> OpLogs = ScanForOplogs();
+ for (const std::string& OpLogId : OpLogs)
+ {
+ OpenOplog(OpLogId);
+ }
+ IterateOplogs([&](const Oplog& Ops) {
+ if (!Ops.IsExpired())
+ {
+ Ops.Scrub(Ctx);
+ }
+ });
+}
+
+void
+ProjectStore::Project::GatherReferences(GcContext& GcCtx)
+{
+ ZEN_TRACE_CPU("ProjectStore::Project::GatherReferences");
+
+ Stopwatch Timer;
+ const auto Guard = MakeGuard([&] {
+ ZEN_DEBUG("gathered references from project store project {} in {}", Identifier, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ // GatherReferences needs to check all existing oplogs
+ std::vector<std::string> OpLogs = ScanForOplogs();
+ for (const std::string& OpLogId : OpLogs)
+ {
+ OpenOplog(OpLogId);
+ }
+ IterateOplogs([&](Oplog& Ops) {
+ if (!Ops.IsExpired())
+ {
+ Ops.GatherReferences(GcCtx);
+ }
+ });
+}
+
+uint64_t
+ProjectStore::Project::TotalSize() const
+{
+ uint64_t Result = 0;
+ {
+ RwLock::SharedLockScope _(m_ProjectLock);
+ for (const auto& It : m_Oplogs)
+ {
+ Result += It.second->TotalSize();
+ }
+ }
+ return Result;
+}
+
+bool
+ProjectStore::Project::PrepareForDelete(std::filesystem::path& OutDeletePath)
+{
+ RwLock::ExclusiveLockScope _(m_ProjectLock);
+
+ for (auto& It : m_Oplogs)
+ {
+ // We don't care about the moved folder
+ It.second->PrepareForDelete(false);
+ m_DeletedOplogs.emplace_back(std::move(It.second));
+ }
+
+ m_Oplogs.clear();
+
+ bool Success = PrepareDirectoryDelete(m_OplogStoragePath, OutDeletePath);
+ if (!Success)
+ {
+ return false;
+ }
+ m_OplogStoragePath.clear();
+ return true;
+}
+
+bool
+ProjectStore::Project::IsExpired() const
+{
+ if (ProjectFilePath.empty())
+ {
+ return false;
+ }
+ return !std::filesystem::exists(ProjectFilePath);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc)
+: GcStorage(Gc)
+, GcContributor(Gc)
+, m_Log(logging::Get("project"))
+, m_CidStore(Store)
+, m_ProjectBasePath(BasePath)
+{
+ ZEN_INFO("initializing project store at '{}'", BasePath);
+ // m_Log.set_level(spdlog::level::debug);
+}
+
+ProjectStore::~ProjectStore()
+{
+ ZEN_INFO("closing project store ('{}')", m_ProjectBasePath);
+}
+
+std::filesystem::path
+ProjectStore::BasePathForProject(std::string_view ProjectId)
+{
+ return m_ProjectBasePath / ProjectId;
+}
+
+void
+ProjectStore::DiscoverProjects()
+{
+ if (!std::filesystem::exists(m_ProjectBasePath))
+ {
+ return;
+ }
+
+ DirectoryContent DirContent;
+ GetDirectoryContent(m_ProjectBasePath, DirectoryContent::IncludeDirsFlag, DirContent);
+
+ for (const std::filesystem::path& DirPath : DirContent.Directories)
+ {
+ std::string DirName = PathToUtf8(DirPath.filename());
+ OpenProject(DirName);
+ }
+}
+
+void
+ProjectStore::IterateProjects(std::function<void(Project& Prj)>&& Fn)
+{
+ RwLock::SharedLockScope _(m_ProjectsLock);
+
+ for (auto& Kv : m_Projects)
+ {
+ Fn(*Kv.second.Get());
+ }
+}
+
+void
+ProjectStore::Flush()
+{
+ std::vector<Ref<Project>> Projects;
+ {
+ RwLock::SharedLockScope _(m_ProjectsLock);
+ Projects.reserve(m_Projects.size());
+
+ for (auto& Kv : m_Projects)
+ {
+ Projects.push_back(Kv.second);
+ }
+ }
+ for (const Ref<Project>& Project : Projects)
+ {
+ Project->Flush();
+ }
+}
+
+void
+ProjectStore::Scrub(ScrubContext& Ctx)
+{
+ DiscoverProjects();
+
+ std::vector<Ref<Project>> Projects;
+ {
+ RwLock::SharedLockScope _(m_ProjectsLock);
+ Projects.reserve(m_Projects.size());
+
+ for (auto& Kv : m_Projects)
+ {
+ if (Kv.second->IsExpired())
+ {
+ continue;
+ }
+ Projects.push_back(Kv.second);
+ }
+ }
+ for (const Ref<Project>& Project : Projects)
+ {
+ Project->Scrub(Ctx);
+ }
+}
+
+void
+ProjectStore::GatherReferences(GcContext& GcCtx)
+{
+ ZEN_TRACE_CPU("ProjectStore::GatherReferences");
+
+ size_t ProjectCount = 0;
+ size_t ExpiredProjectCount = 0;
+ Stopwatch Timer;
+ const auto Guard = MakeGuard([&] {
+ ZEN_DEBUG("gathered references from '{}' in {}, found {} active projects and {} expired projects",
+ m_ProjectBasePath.string(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+ ProjectCount,
+ ExpiredProjectCount);
+ });
+
+ DiscoverProjects();
+
+ std::vector<Ref<Project>> Projects;
+ {
+ RwLock::SharedLockScope _(m_ProjectsLock);
+ Projects.reserve(m_Projects.size());
+
+ for (auto& Kv : m_Projects)
+ {
+ if (Kv.second->IsExpired())
+ {
+ ExpiredProjectCount++;
+ continue;
+ }
+ Projects.push_back(Kv.second);
+ }
+ }
+ ProjectCount = Projects.size();
+ for (const Ref<Project>& Project : Projects)
+ {
+ Project->GatherReferences(GcCtx);
+ }
+}
+
+void
+ProjectStore::CollectGarbage(GcContext& GcCtx)
+{
+ ZEN_TRACE_CPU("ProjectStore::CollectGarbage");
+
+ size_t ProjectCount = 0;
+ size_t ExpiredProjectCount = 0;
+
+ Stopwatch Timer;
+ const auto Guard = MakeGuard([&] {
+ ZEN_DEBUG("garbage collect from '{}' DONE after {}, found {} active projects and {} expired projects",
+ m_ProjectBasePath.string(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+ ProjectCount,
+ ExpiredProjectCount);
+ });
+ std::vector<Ref<Project>> ExpiredProjects;
+ std::vector<Ref<Project>> Projects;
+
+ {
+ RwLock::SharedLockScope _(m_ProjectsLock);
+ for (auto& Kv : m_Projects)
+ {
+ if (Kv.second->IsExpired())
+ {
+ ExpiredProjects.push_back(Kv.second);
+ ExpiredProjectCount++;
+ continue;
+ }
+ Projects.push_back(Kv.second);
+ ProjectCount++;
+ }
+ }
+
+ if (!GcCtx.IsDeletionMode())
+ {
+ ZEN_DEBUG("garbage collect DISABLED, for '{}' ", m_ProjectBasePath.string());
+ return;
+ }
+
+ for (const Ref<Project>& Project : Projects)
+ {
+ std::vector<std::string> ExpiredOplogs;
+ {
+ RwLock::ExclusiveLockScope _(m_ProjectsLock);
+ Project->IterateOplogs([&ExpiredOplogs](ProjectStore::Oplog& Oplog) {
+ if (Oplog.IsExpired())
+ {
+ ExpiredOplogs.push_back(Oplog.OplogId());
+ }
+ });
+ }
+ for (const std::string& OplogId : ExpiredOplogs)
+ {
+ ZEN_DEBUG("ProjectStore::CollectGarbage garbage collected oplog '{}' in project '{}'. Removing storage on disk",
+ OplogId,
+ Project->Identifier);
+ Project->DeleteOplog(OplogId);
+ }
+ }
+
+ if (ExpiredProjects.empty())
+ {
+ ZEN_DEBUG("garbage collect for '{}', no expired projects found", m_ProjectBasePath.string());
+ return;
+ }
+
+ for (const Ref<Project>& Project : ExpiredProjects)
+ {
+ std::filesystem::path PathToRemove;
+ std::string ProjectId;
+ {
+ RwLock::ExclusiveLockScope _(m_ProjectsLock);
+ if (!Project->IsExpired())
+ {
+ ZEN_DEBUG("ProjectStore::CollectGarbage skipped garbage collect of project '{}'. Project no longer expired.", ProjectId);
+ continue;
+ }
+ bool Success = Project->PrepareForDelete(PathToRemove);
+ if (!Success)
+ {
+ ZEN_DEBUG("ProjectStore::CollectGarbage skipped garbage collect of project '{}'. Project folder is locked.", ProjectId);
+ continue;
+ }
+ m_Projects.erase(Project->Identifier);
+ ProjectId = Project->Identifier;
+ }
+
+ ZEN_DEBUG("ProjectStore::CollectGarbage garbage collected project '{}'. Removing storage on disk", ProjectId);
+ if (PathToRemove.empty())
+ {
+ continue;
+ }
+
+ DeleteDirectories(PathToRemove);
+ }
+}
+
+GcStorageSize
+ProjectStore::StorageSize() const
+{
+ GcStorageSize Result;
+ {
+ RwLock::SharedLockScope _(m_ProjectsLock);
+ for (auto& Kv : m_Projects)
+ {
+ const Ref<Project>& Project = Kv.second;
+ Result.DiskSize += Project->TotalSize();
+ }
+ }
+ return Result;
+}
+
+Ref<ProjectStore::Project>
+ProjectStore::OpenProject(std::string_view ProjectId)
+{
+ {
+ RwLock::SharedLockScope _(m_ProjectsLock);
+
+ auto ProjIt = m_Projects.find(std::string{ProjectId});
+
+ if (ProjIt != m_Projects.end())
+ {
+ return ProjIt->second;
+ }
+ }
+
+ RwLock::ExclusiveLockScope _(m_ProjectsLock);
+
+ std::filesystem::path BasePath = BasePathForProject(ProjectId);
+
+ if (Project::Exists(BasePath))
+ {
+ try
+ {
+ ZEN_INFO("opening project {} @ {}", ProjectId, BasePath);
+
+ Ref<Project>& Prj =
+ m_Projects
+ .try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath)))
+ .first->second;
+ Prj->Identifier = ProjectId;
+ Prj->Read();
+ return Prj;
+ }
+ catch (std::exception& e)
+ {
+ ZEN_WARN("failed to open {} @ {} ({})", ProjectId, BasePath, e.what());
+ m_Projects.erase(std::string{ProjectId});
+ }
+ }
+
+ return {};
+}
+
+Ref<ProjectStore::Project>
+ProjectStore::NewProject(std::filesystem::path BasePath,
+ std::string_view ProjectId,
+ std::string_view RootDir,
+ std::string_view EngineRootDir,
+ std::string_view ProjectRootDir,
+ std::string_view ProjectFilePath)
+{
+ RwLock::ExclusiveLockScope _(m_ProjectsLock);
+
+ Ref<Project>& Prj =
+ m_Projects.try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath)))
+ .first->second;
+ Prj->Identifier = ProjectId;
+ Prj->RootDir = RootDir;
+ Prj->EngineRootDir = EngineRootDir;
+ Prj->ProjectRootDir = ProjectRootDir;
+ Prj->ProjectFilePath = ProjectFilePath;
+ Prj->Write();
+
+ return Prj;
+}
+
+bool
+ProjectStore::DeleteProject(std::string_view ProjectId)
+{
+ ZEN_INFO("deleting project {}", ProjectId);
+
+ RwLock::ExclusiveLockScope ProjectsLock(m_ProjectsLock);
+
+ auto ProjIt = m_Projects.find(std::string{ProjectId});
+
+ if (ProjIt == m_Projects.end())
+ {
+ return true;
+ }
+
+ std::filesystem::path DeletePath;
+ bool Success = ProjIt->second->PrepareForDelete(DeletePath);
+
+ if (!Success)
+ {
+ return false;
+ }
+ m_Projects.erase(ProjIt);
+ ProjectsLock.ReleaseNow();
+
+ if (!DeletePath.empty())
+ {
+ DeleteDirectories(DeletePath);
+ }
+ return true;
+}
+
+bool
+ProjectStore::Exists(std::string_view ProjectId)
+{
+ return Project::Exists(BasePathForProject(ProjectId));
+}
+
+CbArray
+ProjectStore::GetProjectsList()
+{
+ using namespace std::literals;
+
+ DiscoverProjects();
+
+ CbWriter Response;
+ Response.BeginArray();
+
+ IterateProjects([&Response](ProjectStore::Project& Prj) {
+ Response.BeginObject();
+ Response << "Id"sv << Prj.Identifier;
+ Response << "RootDir"sv << Prj.RootDir.string();
+ Response << "ProjectRootDir"sv << Prj.ProjectRootDir;
+ Response << "EngineRootDir"sv << Prj.EngineRootDir;
+ Response << "ProjectFilePath"sv << Prj.ProjectFilePath;
+ Response.EndObject();
+ });
+ Response.EndArray();
+ return Response.Save().AsArray();
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::GetProjectFiles(const std::string_view ProjectId, const std::string_view OplogId, bool FilterClient, CbObject& OutPayload)
+{
+ using namespace std::literals;
+
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Project files request for unknown project '{}'", ProjectId)};
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Project files for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+
+ CbObjectWriter Response;
+ Response.BeginArray("files"sv);
+
+ FoundLog->IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) {
+ Response.BeginObject();
+ Response << "id"sv << Id;
+ Response << "clientpath"sv << ClientPath;
+ if (!FilterClient)
+ {
+ Response << "serverpath"sv << ServerPath;
+ }
+ Response.EndObject();
+ });
+
+ Response.EndArray();
+ OutPayload = Response.Save();
+ return {HttpResponseCode::OK, {}};
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::GetChunkInfo(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view ChunkId,
+ CbObject& OutPayload)
+{
+ using namespace std::literals;
+
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown project '{}'", ProjectId)};
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+ if (ChunkId.size() != 2 * sizeof(Oid::OidBits))
+ {
+ return {HttpResponseCode::BadRequest,
+ fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)};
+ }
+
+ const Oid Obj = Oid::FromHexString(ChunkId);
+
+ IoBuffer Chunk = FoundLog->FindChunk(Obj);
+ if (!Chunk)
+ {
+ return {HttpResponseCode::NotFound, {}};
+ }
+
+ uint64_t ChunkSize = Chunk.GetSize();
+ if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize);
+ ZEN_ASSERT(IsCompressed);
+ ChunkSize = RawSize;
+ }
+
+ CbObjectWriter Response;
+ Response << "size"sv << ChunkSize;
+ OutPayload = Response.Save();
+ return {HttpResponseCode::OK, {}};
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::GetChunkRange(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view ChunkId,
+ uint64_t Offset,
+ uint64_t Size,
+ ZenContentType AcceptType,
+ IoBuffer& OutChunk)
+{
+ bool IsOffset = Offset != 0 || Size != ~(0ull);
+
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)};
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+
+ if (ChunkId.size() != 2 * sizeof(Oid::OidBits))
+ {
+ return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)};
+ }
+
+ const Oid Obj = Oid::FromHexString(ChunkId);
+
+ IoBuffer Chunk = FoundLog->FindChunk(Obj);
+ if (!Chunk)
+ {
+ return {HttpResponseCode::NotFound, {}};
+ }
+
+ OutChunk = Chunk;
+ HttpContentType ContentType = Chunk.GetContentType();
+
+ if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize);
+ ZEN_ASSERT(!Compressed.IsNull());
+
+ if (IsOffset)
+ {
+ if ((Offset + Size) > RawSize)
+ {
+ Size = RawSize - Offset;
+ }
+
+ if (AcceptType == HttpContentType::kBinary)
+ {
+ OutChunk = Compressed.Decompress(Offset, Size).AsIoBuffer();
+ OutChunk.SetContentType(HttpContentType::kBinary);
+ }
+ else
+ {
+ // Value will be a range of compressed blocks that covers the requested range
+ // The client will have to compensate for any offsets that do not land on an even block size multiple
+ OutChunk = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer();
+ OutChunk.SetContentType(HttpContentType::kCompressedBinary);
+ }
+ }
+ else
+ {
+ if (AcceptType == HttpContentType::kBinary)
+ {
+ OutChunk = Compressed.Decompress().AsIoBuffer();
+ OutChunk.SetContentType(HttpContentType::kBinary);
+ }
+ else
+ {
+ OutChunk = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ OutChunk.SetContentType(HttpContentType::kCompressedBinary);
+ }
+ }
+ }
+ else if (IsOffset)
+ {
+ if ((Offset + Size) > Chunk.GetSize())
+ {
+ Size = Chunk.GetSize() - Offset;
+ }
+ OutChunk = IoBuffer(std::move(Chunk), Offset, Size);
+ OutChunk.SetContentType(ContentType);
+ }
+
+ return {HttpResponseCode::OK, {}};
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::GetChunk(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view Cid,
+ ZenContentType AcceptType,
+ IoBuffer& OutChunk)
+{
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)};
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+
+ if (Cid.length() != IoHash::StringLength)
+ {
+ return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, Cid)};
+ }
+
+ const IoHash Hash = IoHash::FromHexString(Cid);
+ OutChunk = m_CidStore.FindChunkByCid(Hash);
+
+ if (!OutChunk)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("chunk - '{}' MISSING", Cid)};
+ }
+
+ if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(OutChunk));
+ OutChunk = Compressed.Decompress().AsIoBuffer();
+ OutChunk.SetContentType(ZenContentType::kBinary);
+ }
+ else
+ {
+ OutChunk.SetContentType(ZenContentType::kCompressedBinary);
+ }
+ return {HttpResponseCode::OK, {}};
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::PutChunk(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view Cid,
+ ZenContentType ContentType,
+ IoBuffer&& Chunk)
+{
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown project '{}'", ProjectId)};
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+
+ if (Cid.length() != IoHash::StringLength)
+ {
+ return {HttpResponseCode::BadRequest, fmt::format("Chunk put request for invalid chunk hash '{}'", Cid)};
+ }
+
+ const IoHash Hash = IoHash::FromHexString(Cid);
+
+ if (ContentType != HttpContentType::kCompressedBinary)
+ {
+ return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid content type for chunk '{}'", Cid)};
+ }
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize);
+ if (RawHash != Hash)
+ {
+ return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid payload format for chunk '{}'", Cid)};
+ }
+
+ CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk, Hash);
+ return {Result.New ? HttpResponseCode::Created : HttpResponseCode::OK, {}};
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload, CbObject& OutResponse)
+{
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown project '{}'", ProjectId)};
+ }
+
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
+
+ if (!Oplog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+
+ CbObject ContainerObject = LoadCompactBinaryObject(Payload);
+ if (!ContainerObject)
+ {
+ return {HttpResponseCode::BadRequest, "Invalid payload format"};
+ }
+
+ CidStore& ChunkStore = m_CidStore;
+ RwLock AttachmentsLock;
+ std::unordered_set<IoHash, IoHash::Hasher> Attachments;
+
+ auto HasAttachment = [&ChunkStore](const IoHash& RawHash) { return ChunkStore.ContainsChunk(RawHash); };
+ auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector<IoHash>&& ChunkHashes) {
+ RwLock::ExclusiveLockScope _(AttachmentsLock);
+ if (BlockHash != IoHash::Zero)
+ {
+ Attachments.insert(BlockHash);
+ }
+ else
+ {
+ Attachments.insert(ChunkHashes.begin(), ChunkHashes.end());
+ }
+ };
+ auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) {
+ RwLock::ExclusiveLockScope _(AttachmentsLock);
+ Attachments.insert(RawHash);
+ };
+
+ RemoteProjectStore::Result RemoteResult = SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment);
+
+ if (RemoteResult.ErrorCode)
+ {
+ return ConvertResult(RemoteResult);
+ }
+
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("need");
+ {
+ for (const IoHash& Hash : Attachments)
+ {
+ ZEN_DEBUG("Need attachment {}", Hash);
+ Cbo << Hash;
+ }
+ }
+ Cbo.EndArray(); // "need"
+
+ OutResponse = Cbo.Save();
+ return {HttpResponseCode::OK, {}};
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::ReadOplog(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const HttpServerRequest::QueryParams& Params,
+ CbObject& OutResponse)
+{
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown project '{}'", ProjectId)};
+ }
+
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
+
+ if (!Oplog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+
+ size_t MaxBlockSize = 128u * 1024u * 1024u;
+ if (auto Param = Params.GetValue("maxblocksize"); Param.empty() == false)
+ {
+ if (auto Value = ParseInt<size_t>(Param))
+ {
+ MaxBlockSize = Value.value();
+ }
+ }
+ size_t MaxChunkEmbedSize = 1024u * 1024u;
+ if (auto Param = Params.GetValue("maxchunkembedsize"); Param.empty() == false)
+ {
+ if (auto Value = ParseInt<size_t>(Param))
+ {
+ MaxChunkEmbedSize = Value.value();
+ }
+ }
+
+ CidStore& ChunkStore = m_CidStore;
+
+ RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer(
+ ChunkStore,
+ *Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ false,
+ [](CompressedBuffer&&, const IoHash) {},
+ [](const IoHash&) {},
+ [](const std::unordered_set<IoHash, IoHash::Hasher>) {});
+
+ OutResponse = std::move(ContainerResult.ContainerObject);
+ return ConvertResult(ContainerResult);
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::WriteBlock(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload)
+{
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown project '{}'", ProjectId)};
+ }
+
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
+
+ if (!Oplog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+
+ if (!IterateBlock(std::move(Payload), [this](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
+ IoBuffer Compressed = Chunk.GetCompressed().Flatten().AsIoBuffer();
+ m_CidStore.AddChunk(Compressed, AttachmentRawHash);
+ ZEN_DEBUG("Saved attachment {} from block, size {}", AttachmentRawHash, Compressed.GetSize());
+ }))
+ {
+ return {HttpResponseCode::BadRequest, "Invalid chunk in block"};
+ }
+
+ return {HttpResponseCode::OK, {}};
+}
+
+void
+ProjectStore::Rpc(HttpServerRequest& HttpReq,
+ const std::string_view ProjectId,
+ const std::string_view OplogId,
+ IoBuffer&& Payload,
+ AuthMgr& AuthManager)
+{
+ using namespace std::literals;
+ HttpContentType PayloadContentType = HttpReq.RequestContentType();
+ CbPackage Package;
+ CbObject Cb;
+ switch (PayloadContentType)
+ {
+ case HttpContentType::kJSON:
+ case HttpContentType::kUnknownContentType:
+ case HttpContentType::kText:
+ {
+ std::string JsonText(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize());
+ Cb = LoadCompactBinaryFromJson(JsonText).AsObject();
+ if (!Cb)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Content format not supported, expected JSON format");
+ }
+ }
+ break;
+ case HttpContentType::kCbObject:
+ Cb = LoadCompactBinaryObject(Payload);
+ if (!Cb)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Content format not supported, expected compact binary format");
+ }
+ break;
+ case HttpContentType::kCbPackage:
+ Package = ParsePackageMessage(Payload);
+ Cb = Package.GetObject();
+ if (!Cb)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Content format not supported, expected package message format");
+ }
+ break;
+ default:
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type");
+ }
+
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Rpc oplog request for unknown project '{}'", ProjectId));
+ }
+
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
+
+ if (!Oplog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId));
+ }
+
+ std::string_view Method = Cb["method"sv].AsString();
+
+ if (Method == "import")
+ {
+ std::pair<HttpResponseCode, std::string> Result = Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
+ if (Result.second.empty())
+ {
+ return HttpReq.WriteResponse(Result.first);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ }
+ else if (Method == "export")
+ {
+ std::pair<HttpResponseCode, std::string> Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
+ if (Result.second.empty())
+ {
+ return HttpReq.WriteResponse(Result.first);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ }
+ else if (Method == "getchunks")
+ {
+ CbPackage ResponsePackage;
+ {
+ CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView();
+ CbObjectWriter ResponseWriter;
+ ResponseWriter.BeginArray("chunks"sv);
+ for (CbFieldView FieldView : ChunksArray)
+ {
+ IoHash RawHash = FieldView.AsHash();
+ IoBuffer ChunkBuffer = m_CidStore.FindChunkByCid(RawHash);
+ if (ChunkBuffer)
+ {
+ ResponseWriter.AddHash(RawHash);
+ ResponsePackage.AddAttachment(
+ CbAttachment(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)), RawHash));
+ }
+ }
+ ResponseWriter.EndArray();
+ ResponsePackage.SetObject(ResponseWriter.Save());
+ }
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault);
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ else if (Method == "putchunks")
+ {
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ IoHash RawHash = Attachment.GetHash();
+ CompressedBuffer Compressed = Attachment.AsCompressedBinary();
+ m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), RawHash, CidStore::InsertMode::kCopyOnly);
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::OK);
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method));
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager)
+{
+ using namespace std::literals;
+
+ size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u);
+ size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u);
+ bool Force = Params["force"sv].AsBool(false);
+
+ std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult =
+ CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize);
+
+ if (RemoteStoreResult.first == nullptr)
+ {
+ return {HttpResponseCode::BadRequest, RemoteStoreResult.second};
+ }
+ std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first);
+ RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+
+ ZEN_INFO("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}",
+ Project.Identifier,
+ Oplog.OplogId(),
+ StoreInfo.Description,
+ NiceBytes(MaxBlockSize),
+ NiceBytes(MaxChunkEmbedSize));
+
+ RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
+ *RemoteStore,
+ Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ StoreInfo.CreateBlocks,
+ StoreInfo.UseTempBlockFiles,
+ Force);
+
+ return ConvertResult(Result);
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager)
+{
+ using namespace std::literals;
+
+ size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u);
+ size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u);
+ bool Force = Params["force"sv].AsBool(false);
+
+ std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult =
+ CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize);
+
+ if (RemoteStoreResult.first == nullptr)
+ {
+ return {HttpResponseCode::BadRequest, RemoteStoreResult.second};
+ }
+ std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first);
+ RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+
+ ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description);
+ RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *RemoteStore, Oplog, Force);
+ return ConvertResult(Result);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, HttpStatsService& StatsService, AuthMgr& AuthMgr)
+: m_Log(logging::Get("project"))
+, m_CidStore(Store)
+, m_ProjectStore(Projects)
+, m_StatsService(StatsService)
+, m_AuthMgr(AuthMgr)
+{
+ using namespace std::literals;
+
+ m_StatsService.RegisterHandler("prj", *this);
+
+ m_Router.AddPattern("project", "([[:alnum:]_.]+)");
+ m_Router.AddPattern("log", "([[:alnum:]_.]+)");
+ m_Router.AddPattern("op", "([[:digit:]]+?)");
+ m_Router.AddPattern("chunk", "([[:xdigit:]]{24})");
+ m_Router.AddPattern("hash", "([[:xdigit:]]{40})");
+
+ m_Router.RegisterRoute(
+ "",
+ [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_ProjectStore->GetProjectsList()); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "list",
+ [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_ProjectStore->GetProjectsList()); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/batch",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ // Parse Request
+
+ IoBuffer Payload = HttpReq.ReadPayload();
+ BinaryReader Reader(Payload);
+
+ struct RequestHeader
+ {
+ enum
+ {
+ kMagic = 0xAAAA'77AC
+ };
+ uint32_t Magic;
+ uint32_t ChunkCount;
+ uint32_t Reserved1;
+ uint32_t Reserved2;
+ };
+
+ struct RequestChunkEntry
+ {
+ Oid ChunkId;
+ uint32_t CorrelationId;
+ uint64_t Offset;
+ uint64_t RequestBytes;
+ };
+
+ if (Payload.Size() <= sizeof(RequestHeader))
+ {
+ HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ RequestHeader RequestHdr;
+ Reader.Read(&RequestHdr, sizeof RequestHdr);
+
+ if (RequestHdr.Magic != RequestHeader::kMagic)
+ {
+ HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ std::vector<RequestChunkEntry> RequestedChunks;
+ RequestedChunks.resize(RequestHdr.ChunkCount);
+ Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount);
+
+ // Make Response
+
+ struct ResponseHeader
+ {
+ uint32_t Magic = 0xbada'b00f;
+ uint32_t ChunkCount;
+ uint32_t Reserved1 = 0;
+ uint32_t Reserved2 = 0;
+ };
+
+ struct ResponseChunkEntry
+ {
+ uint32_t CorrelationId;
+ uint32_t Flags = 0;
+ uint64_t ChunkSize;
+ };
+
+ std::vector<IoBuffer> OutBlobs;
+ OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry));
+ for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex)
+ {
+ const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex];
+ IoBuffer FoundChunk = FoundLog->FindChunk(RequestedChunk.ChunkId);
+ if (FoundChunk)
+ {
+ if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1))
+ {
+ uint64_t Offset = RequestedChunk.Offset;
+ if (Offset > FoundChunk.Size())
+ {
+ Offset = FoundChunk.Size();
+ }
+ uint64_t Size = RequestedChunk.RequestBytes;
+ if ((Offset + Size) > FoundChunk.Size())
+ {
+ Size = FoundChunk.Size() - Offset;
+ }
+ FoundChunk = IoBuffer(FoundChunk, Offset, Size);
+ }
+ }
+ OutBlobs.emplace_back(std::move(FoundChunk));
+ }
+ uint8_t* ResponsePtr = reinterpret_cast<uint8_t*>(OutBlobs[0].MutableData());
+ ResponseHeader ResponseHdr;
+ ResponseHdr.ChunkCount = RequestHdr.ChunkCount;
+ memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr));
+ ResponsePtr += sizeof(ResponseHdr);
+ for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex)
+ {
+ // const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex];
+ const IoBuffer& FoundChunk(OutBlobs[ChunkIndex + 1]);
+ ResponseChunkEntry ResponseChunk;
+ ResponseChunk.CorrelationId = ChunkIndex;
+ if (FoundChunk)
+ {
+ ResponseChunk.ChunkSize = FoundChunk.Size();
+ }
+ else
+ {
+ ResponseChunk.ChunkSize = uint64_t(-1);
+ }
+ memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk));
+ ResponsePtr += sizeof(ResponseChunk);
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs);
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/files",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ // File manifest fetch, returns the client file list
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+
+ const bool FilterClient = Params.GetValue("filter"sv) == "client"sv;
+
+ CbObject ResponsePayload;
+ std::pair<HttpResponseCode, std::string> Result =
+ m_ProjectStore->GetProjectFiles(ProjectId, OplogId, FilterClient, ResponsePayload);
+ if (Result.first == HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload);
+ }
+ else
+ {
+ ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
+ ToString(HttpReq.RequestVerb()),
+ HttpReq.QueryString(),
+ static_cast<int>(Result.first),
+ Result.second);
+ }
+ if (Result.second.empty())
+ {
+ return HttpReq.WriteResponse(Result.first);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/{chunk}/info",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ const auto& ChunkId = Req.GetCapture(3);
+
+ CbObject ResponsePayload;
+ std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->GetChunkInfo(ProjectId, OplogId, ChunkId, ResponsePayload);
+ if (Result.first == HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload);
+ }
+ else if (Result.first == HttpResponseCode::NotFound)
+ {
+ ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId);
+ }
+ else
+ {
+ ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
+ ToString(HttpReq.RequestVerb()),
+ HttpReq.QueryString(),
+ static_cast<int>(Result.first),
+ Result.second);
+ }
+ if (Result.second.empty())
+ {
+ return HttpReq.WriteResponse(Result.first);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/{chunk}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ const auto& ChunkId = Req.GetCapture(3);
+
+ uint64_t Offset = 0;
+ uint64_t Size = ~(0ull);
+
+ auto QueryParms = Req.ServerRequest().GetQueryParams();
+
+ if (auto OffsetParm = QueryParms.GetValue("offset"); OffsetParm.empty() == false)
+ {
+ if (auto OffsetVal = ParseInt<uint64_t>(OffsetParm))
+ {
+ Offset = OffsetVal.value();
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ }
+
+ if (auto SizeParm = QueryParms.GetValue("size"); SizeParm.empty() == false)
+ {
+ if (auto SizeVal = ParseInt<uint64_t>(SizeParm))
+ {
+ Size = SizeVal.value();
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ }
+
+ HttpContentType AcceptType = HttpReq.AcceptContentType();
+
+ IoBuffer Chunk;
+ std::pair<HttpResponseCode, std::string> Result =
+ m_ProjectStore->GetChunkRange(ProjectId, OplogId, ChunkId, Offset, Size, AcceptType, Chunk);
+ if (Result.first == HttpResponseCode::OK)
+ {
+ ZEN_DEBUG("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(Chunk.GetContentType()));
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Chunk.GetContentType(), Chunk);
+ }
+ else if (Result.first == HttpResponseCode::NotFound)
+ {
+ ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId);
+ }
+ else
+ {
+ ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
+ ToString(HttpReq.RequestVerb()),
+ HttpReq.QueryString(),
+ static_cast<int>(Result.first),
+ Result.second);
+ }
+ if (Result.second.empty())
+ {
+ return HttpReq.WriteResponse(Result.first);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ },
+ HttpVerb::kGet | HttpVerb::kHead);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/{hash}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ const auto& Cid = Req.GetCapture(3);
+ HttpContentType AcceptType = HttpReq.AcceptContentType();
+ HttpContentType RequestType = HttpReq.RequestContentType();
+
+ switch (Req.ServerRequest().RequestVerb())
+ {
+ case HttpVerb::kGet:
+ {
+ IoBuffer Value;
+ std::pair<HttpResponseCode, std::string> Result =
+ m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value);
+
+ if (Result.first == HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value);
+ }
+ else if (Result.first == HttpResponseCode::NotFound)
+ {
+ ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, Cid);
+ }
+ else
+ {
+ ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
+ ToString(HttpReq.RequestVerb()),
+ HttpReq.QueryString(),
+ static_cast<int>(Result.first),
+ Result.second);
+ }
+ if (Result.second.empty())
+ {
+ return HttpReq.WriteResponse(Result.first);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ }
+ case HttpVerb::kPost:
+ {
+ std::pair<HttpResponseCode, std::string> Result =
+ m_ProjectStore->PutChunk(ProjectId, OplogId, Cid, RequestType, HttpReq.ReadPayload());
+ if (Result.first == HttpResponseCode::OK || Result.first == HttpResponseCode::Created)
+ {
+ return HttpReq.WriteResponse(Result.first);
+ }
+ else
+ {
+ ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
+ ToString(HttpReq.RequestVerb()),
+ HttpReq.QueryString(),
+ static_cast<int>(Result.first),
+ Result.second);
+ }
+ if (Result.second.empty())
+ {
+ return HttpReq.WriteResponse(Result.first);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ }
+ break;
+ }
+ },
+ HttpVerb::kGet | HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/prep",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ // This operation takes a list of referenced hashes and decides which
+ // chunks are not present on this server. This list is then returned in
+ // the "need" list in the response
+
+ IoBuffer Payload = HttpReq.ReadPayload();
+ CbObject RequestObject = LoadCompactBinaryObject(Payload);
+
+ std::vector<IoHash> NeedList;
+
+ for (auto Entry : RequestObject["have"sv])
+ {
+ const IoHash FileHash = Entry.AsHash();
+
+ if (!m_CidStore.ContainsChunk(FileHash))
+ {
+ ZEN_DEBUG("prep - NEED: {}", FileHash);
+
+ NeedList.push_back(FileHash);
+ }
+ }
+
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("need");
+
+ for (const IoHash& Hash : NeedList)
+ {
+ Cbo << Hash;
+ }
+
+ Cbo.EndArray();
+ CbObject Response = Cbo.Save();
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Response);
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/new",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+
+ bool IsUsingSalt = false;
+ IoHash SaltHash = IoHash::Zero;
+
+ if (std::string_view SaltParam = Params.GetValue("salt"); SaltParam.empty() == false)
+ {
+ const uint32_t Salt = std::stoi(std::string(SaltParam));
+ SaltHash = IoHash::HashBuffer(&Salt, sizeof Salt);
+ IsUsingSalt = true;
+ }
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ ProjectStore::Oplog& Oplog = *FoundLog;
+
+ IoBuffer Payload = HttpReq.ReadPayload();
+
+ // This will attempt to open files which may not exist for the case where
+ // the prep step rejected the chunk. This should be fixed since there's
+ // a performance cost associated with any file system activity
+
+ bool IsValid = true;
+ std::vector<IoHash> MissingChunks;
+
+ CbPackage::AttachmentResolver Resolver = [&](const IoHash& Hash) -> SharedBuffer {
+ if (m_CidStore.ContainsChunk(Hash))
+ {
+ // Return null attachment as we already have it, no point in reading it and storing it again
+ return {};
+ }
+
+ IoHash AttachmentId;
+ if (IsUsingSalt)
+ {
+ IoHash AttachmentSpec[]{SaltHash, Hash};
+ AttachmentId = IoHash::HashBuffer(MakeMemoryView(AttachmentSpec));
+ }
+ else
+ {
+ AttachmentId = Hash;
+ }
+
+ std::filesystem::path AttachmentPath = Oplog.TempPath() / AttachmentId.ToHexString();
+ if (IoBuffer Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath))
+ {
+ return SharedBuffer(std::move(Data));
+ }
+ else
+ {
+ IsValid = false;
+ MissingChunks.push_back(Hash);
+
+ return {};
+ }
+ };
+
+ CbPackage Package;
+
+ if (!legacy::TryLoadCbPackage(Package, Payload, &UniqueBuffer::Alloc, &Resolver))
+ {
+ std::filesystem::path BadPackagePath =
+ Oplog.TempPath() / "bad_packages"sv / fmt::format("session{}_request{}"sv, HttpReq.SessionId(), HttpReq.RequestId());
+
+ ZEN_WARN("Received malformed package! Saving payload to '{}'", BadPackagePath);
+
+ WriteFile(BadPackagePath, Payload);
+
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package");
+ }
+
+ if (!IsValid)
+ {
+ // TODO: emit diagnostics identifying missing chunks
+
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing chunk reference");
+ }
+
+ CbObject Core = Package.GetObject();
+
+ if (!Core["key"sv])
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "No oplog entry key specified");
+ }
+
+ // Write core to oplog
+
+ const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Package);
+
+ if (OpLsn == ProjectStore::Oplog::kInvalidOp)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn, NiceBytes(Payload.Size()), Core["key"sv].AsString());
+
+ HttpReq.WriteResponse(HttpResponseCode::Created);
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/{op}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const std::string& ProjectId = Req.GetCapture(1);
+ const std::string& OplogId = Req.GetCapture(2);
+ const std::string& OpIdString = Req.GetCapture(3);
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ ProjectStore::Oplog& Oplog = *FoundLog;
+
+ if (const std::optional<int32_t> OpId = ParseInt<uint32_t>(OpIdString))
+ {
+ if (std::optional<CbObject> MaybeOp = Oplog.GetOpByIndex(OpId.value()))
+ {
+ CbObject& Op = MaybeOp.value();
+ if (Req.ServerRequest().AcceptContentType() == ZenContentType::kCbPackage)
+ {
+ CbPackage Package;
+ Package.SetObject(Op);
+
+ Op.IterateAttachments([&](CbFieldView FieldView) {
+ const IoHash AttachmentHash = FieldView.AsAttachment();
+ IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash);
+
+ // We force this for now as content type is not consistently tracked (will
+ // be fixed in CidStore refactor)
+ Payload.SetContentType(ZenContentType::kCompressedBinary);
+
+ if (Payload)
+ {
+ switch (Payload.GetContentType())
+ {
+ case ZenContentType::kCbObject:
+ if (CbObject Object = LoadCompactBinaryObject(Payload))
+ {
+ Package.AddAttachment(CbAttachment(Object));
+ }
+ else
+ {
+ // Error - malformed object
+
+ ZEN_WARN("malformed object returned for {}", AttachmentHash);
+ }
+ break;
+
+ case ZenContentType::kCompressedBinary:
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload)))
+ {
+ Package.AddAttachment(CbAttachment(Compressed, AttachmentHash));
+ }
+ else
+ {
+ // Error - not compressed!
+
+ ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash);
+ }
+ break;
+
+ default:
+ Package.AddAttachment(CbAttachment(SharedBuffer(Payload)));
+ break;
+ }
+ }
+ });
+
+ return HttpReq.WriteResponse(HttpResponseCode::Accepted, Package);
+ }
+ else
+ {
+ // Client cannot accept a package, so we only send the core object
+ return HttpReq.WriteResponse(HttpResponseCode::Accepted, Op);
+ }
+ }
+ }
+
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}",
+ [this](HttpRouterRequest& Req) {
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+
+ if (!Project)
+ {
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("project {} not found", ProjectId));
+ }
+
+ switch (Req.ServerRequest().RequestVerb())
+ {
+ case HttpVerb::kGet:
+ {
+ ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId);
+
+ if (!OplogIt)
+ {
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("oplog {} not found in project {}", OplogId, ProjectId));
+ }
+
+ ProjectStore::Oplog& Log = *OplogIt;
+
+ CbObjectWriter Cb;
+ Cb << "id"sv << Log.OplogId() << "project"sv << Project->Identifier << "tempdir"sv << Log.TempPath().c_str()
+ << "markerpath"sv << Log.MarkerPath().c_str() << "totalsize"sv << Log.TotalSize() << "opcount"
+ << Log.OplogCount() << "expired"sv << Log.IsExpired();
+
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Cb.Save());
+ }
+ break;
+
+ case HttpVerb::kPost:
+ {
+ std::filesystem::path OplogMarkerPath;
+ if (CbObject Params = Req.ServerRequest().ReadPayloadObject())
+ {
+ OplogMarkerPath = Params["gcpath"sv].AsString();
+ }
+
+ ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId);
+
+ if (!OplogIt)
+ {
+ if (!Project->NewOplog(OplogId, OplogMarkerPath))
+ {
+ // TODO: indicate why the operation failed!
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError);
+ }
+
+ ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath);
+
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::Created);
+ }
+
+ // I guess this should ultimately be used to execute RPCs but for now, it
+ // does absolutely nothing
+
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest);
+ }
+ break;
+
+ case HttpVerb::kDelete:
+ {
+ ZEN_INFO("deleting oplog '{}/{}'", ProjectId, OplogId);
+
+ Project->DeleteOplog(OplogId);
+
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::OK);
+ }
+ break;
+
+ default:
+ break;
+ }
+ },
+ HttpVerb::kPost | HttpVerb::kGet | HttpVerb::kDelete);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/entries",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ CbObjectWriter Response;
+
+ if (FoundLog->OplogCount() > 0)
+ {
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+
+ if (auto OpKey = Params.GetValue("opkey"); !OpKey.empty())
+ {
+ Oid OpKeyId = OpKeyStringAsOId(OpKey);
+ std::optional<CbObject> Op = FoundLog->GetOpByKey(OpKeyId);
+
+ if (Op.has_value())
+ {
+ Response << "entry"sv << Op.value();
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ }
+ else
+ {
+ Response.BeginArray("entries"sv);
+
+ FoundLog->IterateOplog([&Response](CbObject Op) { Response << Op; });
+
+ Response.EndArray();
+ }
+ }
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{project}",
+ [this](HttpRouterRequest& Req) {
+ const std::string ProjectId = Req.GetCapture(1);
+
+ switch (Req.ServerRequest().RequestVerb())
+ {
+ case HttpVerb::kPost:
+ {
+ IoBuffer Payload = Req.ServerRequest().ReadPayload();
+ CbObject Params = LoadCompactBinaryObject(Payload);
+ std::string_view Id = Params["id"sv].AsString();
+ std::string_view Root = Params["root"sv].AsString();
+ std::string_view EngineRoot = Params["engine"sv].AsString();
+ std::string_view ProjectRoot = Params["project"sv].AsString();
+ std::string_view ProjectFilePath = Params["projectfile"sv].AsString();
+
+ const std::filesystem::path BasePath = m_ProjectStore->BasePath() / ProjectId;
+ m_ProjectStore->NewProject(BasePath, ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath);
+
+ ZEN_INFO("established project - {} (id: '{}', roots: '{}', '{}', '{}', '{}'{})",
+ ProjectId,
+ Id,
+ Root,
+ EngineRoot,
+ ProjectRoot,
+ ProjectFilePath,
+ ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : "");
+
+ Req.ServerRequest().WriteResponse(HttpResponseCode::Created);
+ }
+ break;
+
+ case HttpVerb::kGet:
+ {
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+
+ if (!Project)
+ {
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("project {} not found", ProjectId));
+ }
+
+ std::vector<std::string> OpLogs = Project->ScanForOplogs();
+
+ CbObjectWriter Response;
+ Response << "id"sv << Project->Identifier;
+ Response << "root"sv << PathToUtf8(Project->RootDir);
+ Response << "engine"sv << PathToUtf8(Project->EngineRootDir);
+ Response << "project"sv << PathToUtf8(Project->ProjectRootDir);
+ Response << "projectfile"sv << PathToUtf8(Project->ProjectFilePath);
+
+ Response.BeginArray("oplogs"sv);
+ for (const std::string& OplogId : OpLogs)
+ {
+ Response.BeginObject();
+ Response << "id"sv << OplogId;
+ Response.EndObject();
+ }
+ Response.EndArray(); // oplogs
+
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save());
+ }
+ break;
+
+ case HttpVerb::kDelete:
+ {
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+
+ if (!Project)
+ {
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("project {} not found", ProjectId));
+ }
+
+ ZEN_INFO("deleting project '{}'", ProjectId);
+ if (!m_ProjectStore->DeleteProject(ProjectId))
+ {
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::Locked,
+ HttpContentType::kText,
+ fmt::format("project {} is in use", ProjectId));
+ }
+
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::NoContent);
+ }
+ break;
+
+ default:
+ break;
+ }
+ },
+ HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kDelete);
+
+ // Push a oplog container
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/save",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ if (HttpReq.RequestContentType() != HttpContentType::kCbObject)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid content type");
+ }
+ IoBuffer Payload = Req.ServerRequest().ReadPayload();
+
+ CbObject Response;
+ std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->WriteOplog(ProjectId, OplogId, std::move(Payload), Response);
+ if (Result.first == HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Response);
+ }
+ if (Result.second.empty())
+ {
+ return HttpReq.WriteResponse(Result.first);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ },
+ HttpVerb::kPost);
+
+ // Pull a oplog container
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/load",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ if (HttpReq.AcceptContentType() != HttpContentType::kCbObject)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid accept content type");
+ }
+ IoBuffer Payload = Req.ServerRequest().ReadPayload();
+
+ CbObject Response;
+ std::pair<HttpResponseCode, std::string> Result =
+ m_ProjectStore->ReadOplog(ProjectId, OplogId, Req.ServerRequest().GetQueryParams(), Response);
+ if (Result.first == HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Response);
+ }
+ if (Result.second.empty())
+ {
+ return HttpReq.WriteResponse(Result.first);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ },
+ HttpVerb::kGet);
+
+ // Do an rpc style operation on project/oplog
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/rpc",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ IoBuffer Payload = Req.ServerRequest().ReadPayload();
+
+ m_ProjectStore->Rpc(HttpReq, ProjectId, OplogId, std::move(Payload), m_AuthMgr);
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "details\\$",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+ bool CSV = Params.GetValue("csv") == "true";
+ bool Details = Params.GetValue("details") == "true";
+ bool OpDetails = Params.GetValue("opdetails") == "true";
+ bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true";
+
+ if (CSV)
+ {
+ ExtendableStringBuilder<4096> CSVWriter;
+ CSVHeader(Details, AttachmentDetails, CSVWriter);
+
+ m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) {
+ Project.IterateOplogs([&](ProjectStore::Oplog& Oplog) {
+ Oplog.IterateOplogWithKey(
+ [this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](int LSN, const Oid& Key, CbObject Op) {
+ CSVWriteOp(m_CidStore,
+ Project.Identifier,
+ Oplog.OplogId(),
+ Details,
+ AttachmentDetails,
+ LSN,
+ Key,
+ Op,
+ CSVWriter);
+ });
+ });
+ });
+
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
+ }
+ else
+ {
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("projects");
+ {
+ m_ProjectStore->DiscoverProjects();
+
+ m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) {
+ std::vector<std::string> OpLogs = Project.ScanForOplogs();
+ CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo);
+ });
+ }
+ Cbo.EndArray();
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "details\\$/{project}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+ bool CSV = Params.GetValue("csv") == "true";
+ bool Details = Params.GetValue("details") == "true";
+ bool OpDetails = Params.GetValue("opdetails") == "true";
+ bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true";
+
+ Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId);
+ if (!FoundProject)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ ProjectStore::Project& Project = *FoundProject.Get();
+ if (CSV)
+ {
+ ExtendableStringBuilder<4096> CSVWriter;
+ CSVHeader(Details, AttachmentDetails, CSVWriter);
+
+ FoundProject->IterateOplogs([&](ProjectStore::Oplog& Oplog) {
+ Oplog.IterateOplogWithKey([this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](int LSN,
+ const Oid& Key,
+ CbObject Op) {
+ CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter);
+ });
+ });
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
+ }
+ else
+ {
+ CbObjectWriter Cbo;
+ std::vector<std::string> OpLogs = FoundProject->ScanForOplogs();
+ Cbo.BeginArray("projects");
+ {
+ CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo);
+ }
+ Cbo.EndArray();
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "details\\$/{project}/{log}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+ bool CSV = Params.GetValue("csv") == "true";
+ bool Details = Params.GetValue("details") == "true";
+ bool OpDetails = Params.GetValue("opdetails") == "true";
+ bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true";
+
+ Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId);
+ if (!FoundProject)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ ProjectStore::Oplog* FoundLog = FoundProject->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ ProjectStore::Project& Project = *FoundProject.Get();
+ ProjectStore::Oplog& Oplog = *FoundLog;
+ if (CSV)
+ {
+ ExtendableStringBuilder<4096> CSVWriter;
+ CSVHeader(Details, AttachmentDetails, CSVWriter);
+
+ Oplog.IterateOplogWithKey(
+ [this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](int LSN, const Oid& Key, CbObject Op) {
+ CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter);
+ });
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
+ }
+ else
+ {
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("oplogs");
+ {
+ CbWriteOplog(m_CidStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo);
+ }
+ Cbo.EndArray();
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "details\\$/{project}/{log}/{chunk}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ const auto& ChunkId = Req.GetCapture(3);
+
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+ bool CSV = Params.GetValue("csv") == "true";
+ bool Details = Params.GetValue("details") == "true";
+ bool OpDetails = Params.GetValue("opdetails") == "true";
+ bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true";
+
+ Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId);
+ if (!FoundProject)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ ProjectStore::Oplog* FoundLog = FoundProject->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ if (ChunkId.size() != 2 * sizeof(Oid::OidBits))
+ {
+ return HttpReq.WriteResponse(
+ HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId));
+ }
+
+ const Oid ObjId = Oid::FromHexString(ChunkId);
+ ProjectStore::Project& Project = *FoundProject.Get();
+ ProjectStore::Oplog& Oplog = *FoundLog;
+
+ int LSN = Oplog.GetOpIndexByKey(ObjId);
+ if (LSN == -1)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ std::optional<CbObject> Op = Oplog.GetOpByIndex(LSN);
+ if (!Op.has_value())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ if (CSV)
+ {
+ ExtendableStringBuilder<4096> CSVWriter;
+ CSVHeader(Details, AttachmentDetails, CSVWriter);
+
+ CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, ObjId, Op.value(), CSVWriter);
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
+ }
+ else
+ {
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("ops");
+ {
+ CbWriteOp(m_CidStore, Details, OpDetails, AttachmentDetails, LSN, ObjId, Op.value(), Cbo);
+ }
+ Cbo.EndArray();
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+ },
+ HttpVerb::kGet);
+}
+
+HttpProjectService::~HttpProjectService()
+{
+ m_StatsService.UnregisterHandler("prj", *this);
+}
+
+const char*
+HttpProjectService::BaseUri() const
+{
+ return "/prj/";
+}
+
+void
+HttpProjectService::HandleRequest(HttpServerRequest& Request)
+{
+ if (m_Router.HandleRequest(Request) == false)
+ {
+ ZEN_WARN("No route found for {0}", Request.RelativeUri());
+ }
+}
+
+void
+HttpProjectService::HandleStatsRequest(HttpServerRequest& HttpReq)
+{
+ const GcStorageSize StoreSize = m_ProjectStore->StorageSize();
+ const CidStoreSize CidSize = m_CidStore.TotalSize();
+
+ CbObjectWriter Cbo;
+ Cbo.BeginObject("store");
+ {
+ Cbo.BeginObject("size");
+ {
+ Cbo << "disk" << StoreSize.DiskSize;
+ Cbo << "memory" << StoreSize.MemorySize;
+ }
+ Cbo.EndObject();
+ }
+ Cbo.EndObject();
+
+ Cbo.BeginObject("cid");
+ {
+ Cbo.BeginObject("size");
+ {
+ Cbo << "tiny" << CidSize.TinySize;
+ Cbo << "small" << CidSize.SmallSize;
+ Cbo << "large" << CidSize.LargeSize;
+ Cbo << "total" << CidSize.TotalSize;
+ }
+ Cbo.EndObject();
+ }
+ Cbo.EndObject();
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+namespace testutils {
+ using namespace std::literals;
+
+ std::string OidAsString(const Oid& Id)
+ {
+ StringBuilder<25> OidStringBuilder;
+ Id.ToString(OidStringBuilder);
+ return OidStringBuilder.ToString();
+ }
+
+ CbPackage CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
+ {
+ CbPackage Package;
+ CbObjectWriter Object;
+ Object << "key"sv << OidAsString(Id);
+ if (!Attachments.empty())
+ {
+ Object.BeginArray("bulkdata");
+ for (const auto& Attachment : Attachments)
+ {
+ CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash());
+ Object.BeginObject();
+ Object << "id"sv << Attachment.first;
+ Object << "type"sv
+ << "Standard"sv;
+ Object << "data"sv << Attach;
+ Object.EndObject();
+
+ Package.AddAttachment(Attach);
+ }
+ Object.EndArray();
+ }
+ Package.SetObject(Object.Save());
+ return Package;
+ };
+
+ std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(const std::span<const size_t>& Sizes)
+ {
+ std::vector<std::pair<Oid, CompressedBuffer>> Result;
+ Result.reserve(Sizes.size());
+ for (size_t Size : Sizes)
+ {
+ std::vector<uint8_t> Data;
+ Data.resize(Size);
+ uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data());
+ for (size_t Idx = 0; Idx < Size / 2; ++Idx)
+ {
+ DataPtr[Idx] = static_cast<uint16_t>(Idx % 0xffffu);
+ }
+ if (Size & 1)
+ {
+ Data[Size - 1] = static_cast<uint8_t>((Size - 1) & 0xff);
+ }
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()));
+ Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed));
+ }
+ return Result;
+ }
+
+ uint64 GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset)
+ {
+ if (RawOffset > 0)
+ {
+ uint64 BlockSize = 0;
+ OodleCompressor Compressor;
+ OodleCompressionLevel CompressionLevel;
+ if (!Buffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
+ {
+ return 0;
+ }
+ return BlockSize > 0 ? RawOffset % BlockSize : 0;
+ }
+ return 0;
+ }
+
+} // namespace testutils
+
+TEST_CASE("project.store.create")
+{
+ using namespace std::literals;
+
+ ScopedTemporaryDirectory TempDir;
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ std::string_view ProjectName("proj1"sv);
+ std::filesystem::path BasePath = TempDir.Path() / "projectstore";
+ ProjectStore ProjectStore(CidStore, BasePath, Gc);
+ std::filesystem::path RootDir = TempDir.Path() / "root";
+ std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
+ std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
+ std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";
+
+ Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / ProjectName,
+ ProjectName,
+ RootDir.string(),
+ EngineRootDir.string(),
+ ProjectRootDir.string(),
+ ProjectFilePath.string()));
+ CHECK(ProjectStore.DeleteProject(ProjectName));
+ CHECK(!Project->Exists(BasePath));
+}
+
+TEST_CASE("project.store.lifetimes")
+{
+ using namespace std::literals;
+
+ ScopedTemporaryDirectory TempDir;
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ std::filesystem::path BasePath = TempDir.Path() / "projectstore";
+ ProjectStore ProjectStore(CidStore, BasePath, Gc);
+ std::filesystem::path RootDir = TempDir.Path() / "root";
+ std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
+ std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
+ std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";
+
+ Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ ProjectRootDir.string(),
+ ProjectFilePath.string()));
+ ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1", {});
+ CHECK(Oplog != nullptr);
+
+ std::filesystem::path DeletePath;
+ CHECK(Project->PrepareForDelete(DeletePath));
+ CHECK(!DeletePath.empty());
+ CHECK(Project->OpenOplog("oplog1") == nullptr);
+ // Oplog is now invalid, but pointer can still be accessed since we store old oplog pointers
+ CHECK(Oplog->OplogCount() == 0);
+ // Project is still valid since we have a Ref to it
+ CHECK(Project->Identifier == "proj1"sv);
+}
+
+TEST_CASE("project.store.gc")
+{
+ using namespace std::literals;
+ using namespace testutils;
+
+ ScopedTemporaryDirectory TempDir;
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ std::filesystem::path BasePath = TempDir.Path() / "projectstore";
+ ProjectStore ProjectStore(CidStore, BasePath, Gc);
+ std::filesystem::path RootDir = TempDir.Path() / "root";
+ std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
+
+ std::filesystem::path Project1RootDir = TempDir.Path() / "game1";
+ std::filesystem::path Project1FilePath = TempDir.Path() / "game1" / "game.uproject";
+ {
+ CreateDirectories(Project1FilePath.parent_path());
+ BasicFile ProjectFile;
+ ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate);
+ }
+
+ std::filesystem::path Project2RootDir = TempDir.Path() / "game2";
+ std::filesystem::path Project2FilePath = TempDir.Path() / "game2" / "game.uproject";
+ {
+ CreateDirectories(Project2FilePath.parent_path());
+ BasicFile ProjectFile;
+ ProjectFile.Open(Project2FilePath, BasicFile::Mode::kTruncate);
+ }
+
+ {
+ Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ Project1RootDir.string(),
+ Project1FilePath.string()));
+ ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1", {});
+ CHECK(Oplog != nullptr);
+
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {}));
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77})));
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99})));
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122})));
+ }
+
+ {
+ Ref<ProjectStore::Project> Project2(ProjectStore.NewProject(BasePath / "proj2"sv,
+ "proj2"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ Project2RootDir.string(),
+ Project2FilePath.string()));
+ ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog1", {});
+ CHECK(Oplog != nullptr);
+
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {}));
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{177})));
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9123, 383, 590, 96})));
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{535, 221})));
+ }
+
+ {
+ GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
+ ProjectStore.GatherReferences(GcCtx);
+ size_t RefCount = 0;
+ GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; });
+ CHECK(RefCount == 14);
+ ProjectStore.CollectGarbage(GcCtx);
+ CHECK(ProjectStore.OpenProject("proj1"sv));
+ CHECK(ProjectStore.OpenProject("proj2"sv));
+ }
+
+ std::filesystem::remove(Project1FilePath);
+
+ {
+ GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
+ ProjectStore.GatherReferences(GcCtx);
+ size_t RefCount = 0;
+ GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; });
+ CHECK(RefCount == 7);
+ ProjectStore.CollectGarbage(GcCtx);
+ CHECK(!ProjectStore.OpenProject("proj1"sv));
+ CHECK(ProjectStore.OpenProject("proj2"sv));
+ }
+}
+
+TEST_CASE("project.store.partial.read")
+{
+ using namespace std::literals;
+ using namespace testutils;
+
+ ScopedTemporaryDirectory TempDir;
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
+ ProjectStore ProjectStore(CidStore, BasePath, Gc);
+ std::filesystem::path RootDir = TempDir.Path() / "root"sv;
+ std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;
+
+ std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv;
+ std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv;
+ {
+ CreateDirectories(Project1FilePath.parent_path());
+ BasicFile ProjectFile;
+ ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate);
+ }
+
+ std::vector<Oid> OpIds;
+ OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()});
+ std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments;
+ {
+ Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ Project1RootDir.string(),
+ Project1FilePath.string()));
+ ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, {});
+ CHECK(Oplog != nullptr);
+ Attachments[OpIds[0]] = {};
+ Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77});
+ Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{7123, 9583, 690, 99});
+ Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{55, 122});
+ for (auto It : Attachments)
+ {
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(It.first, It.second));
+ }
+ }
+ {
+ IoBuffer Chunk;
+ CHECK(ProjectStore
+ .GetChunk("proj1"sv,
+ "oplog1"sv,
+ Attachments[OpIds[1]][0].second.DecodeRawHash().ToHexString(),
+ HttpContentType::kCompressedBinary,
+ Chunk)
+ .first == HttpResponseCode::OK);
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Attachment = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize);
+ CHECK(RawSize == Attachments[OpIds[1]][0].second.DecodeRawSize());
+ }
+
+ IoBuffer ChunkResult;
+ CHECK(ProjectStore
+ .GetChunkRange("proj1"sv,
+ "oplog1"sv,
+ OidAsString(Attachments[OpIds[2]][1].first),
+ 0,
+ ~0ull,
+ HttpContentType::kCompressedBinary,
+ ChunkResult)
+ .first == HttpResponseCode::OK);
+ CHECK(ChunkResult);
+ CHECK(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult)).DecodeRawSize() ==
+ Attachments[OpIds[2]][1].second.DecodeRawSize());
+
+ IoBuffer PartialChunkResult;
+ CHECK(ProjectStore
+ .GetChunkRange("proj1"sv,
+ "oplog1"sv,
+ OidAsString(Attachments[OpIds[2]][1].first),
+ 5,
+ 1773,
+ HttpContentType::kCompressedBinary,
+ PartialChunkResult)
+ .first == HttpResponseCode::OK);
+ CHECK(PartialChunkResult);
+ IoHash PartialRawHash;
+ uint64_t PartialRawSize;
+ CompressedBuffer PartialCompressedResult =
+ CompressedBuffer::FromCompressed(SharedBuffer(PartialChunkResult), PartialRawHash, PartialRawSize);
+ CHECK(PartialRawSize >= 1773);
+
+ uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5);
+ SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed);
+ SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress();
+ const uint8_t* FullDataPtr = &(reinterpret_cast<const uint8_t*>(FullDecompressed.GetView().GetData())[5]);
+ const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData());
+ CHECK(FullDataPtr[0] == PartialDataPtr[0]);
+}
+
+TEST_CASE("project.store.block")
+{
+ using namespace std::literals;
+ using namespace testutils;
+
+ std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269, 2257, 3685, 3489,
+ 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 4024, 1582, 5251,
+ 491, 5464, 4607, 8135, 3767, 4045, 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225});
+
+ std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes);
+ std::vector<SharedBuffer> Chunks;
+ Chunks.reserve(AttachmentSizes.size());
+ for (const auto& It : AttachmentsWithId)
+ {
+ Chunks.push_back(It.second.GetCompressed().Flatten());
+ }
+ CompressedBuffer Block = GenerateBlock(std::move(Chunks));
+ IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer();
+ CHECK(IterateBlock(std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {}));
+}
+
+#endif
+
+void
+prj_forcelink()
+{
+}
+
+} // namespace zen
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
new file mode 100644
index 000000000..e4f664b85
--- /dev/null
+++ b/src/zenserver/projectstore/projectstore.h
@@ -0,0 +1,372 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/uid.h>
+#include <zencore/xxhash.h>
+#include <zenhttp/httpserver.h>
+#include <zenstore/gc.h>
+
+#include "monitoring/httpstats.h"
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+class CbPackage;
+class CidStore;
+class AuthMgr;
+class ScrubContext;
+
+struct OplogEntry
+{
+ uint32_t OpLsn;
+ uint32_t OpCoreOffset; // note: Multiple of alignment!
+ uint32_t OpCoreSize;
+ uint32_t OpCoreHash; // Used as checksum
+ XXH3_128 OpKeyHash; // XXH128_canonical_t
+
+ inline Oid OpKeyAsOId() const
+ {
+ Oid Id;
+ memcpy(Id.OidBits, &OpKeyHash, sizeof Id.OidBits);
+ return Id;
+ }
+};
+
+struct OplogEntryAddress
+{
+ uint64_t Offset;
+ uint64_t Size;
+};
+
+static_assert(IsPow2(sizeof(OplogEntry)));
+
+/** Project Store
+
+ A project store consists of a number of Projects.
+
+ Each project contains a number of oplogs (short for "operation log"). UE uses
+ one oplog per target platform to store the output of the cook process.
+
+ An oplog consists of a sequence of "op" entries. Each entry is a structured object
+ containing references to attachments. Attachments are typically the serialized
+ package data split into separate chunks for bulk data, exports and header
+ information.
+ */
+class ProjectStore : public RefCounted, public GcStorage, public GcContributor
+{
+ struct OplogStorage;
+
+public:
+ ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc);
+ ~ProjectStore();
+
+ struct Project;
+
+ struct Oplog
+ {
+ Oplog(std::string_view Id,
+ Project* Project,
+ CidStore& Store,
+ std::filesystem::path BasePath,
+ const std::filesystem::path& MarkerPath);
+ ~Oplog();
+
+ [[nodiscard]] static bool ExistsAt(std::filesystem::path BasePath);
+
+ void Read();
+ void Write();
+
+ void IterateFileMap(std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn);
+ void IterateOplog(std::function<void(CbObject)>&& Fn);
+ void IterateOplogWithKey(std::function<void(int, const Oid&, CbObject)>&& Fn);
+ std::optional<CbObject> GetOpByKey(const Oid& Key);
+ std::optional<CbObject> GetOpByIndex(int Index);
+ int GetOpIndexByKey(const Oid& Key);
+
+ IoBuffer FindChunk(Oid ChunkId);
+
+ inline static const uint32_t kInvalidOp = ~0u;
+
+ /** Persist a new oplog entry
+ *
+ * Returns the oplog LSN assigned to the new entry, or kInvalidOp if the entry is rejected
+ */
+ uint32_t AppendNewOplogEntry(CbPackage Op);
+
+ uint32_t AppendNewOplogEntry(CbObject Core);
+
+ enum UpdateType
+ {
+ kUpdateNewEntry,
+ kUpdateReplay
+ };
+
+ const std::string& OplogId() const { return m_OplogId; }
+
+ const std::filesystem::path& TempPath() const { return m_TempPath; }
+ const std::filesystem::path& MarkerPath() const { return m_MarkerPath; }
+
+ spdlog::logger& Log() { return m_OuterProject->Log(); }
+ void Flush();
+ void Scrub(ScrubContext& Ctx) const;
+ void GatherReferences(GcContext& GcCtx);
+ uint64_t TotalSize() const;
+
+ std::size_t OplogCount() const
+ {
+ RwLock::SharedLockScope _(m_OplogLock);
+ return m_LatestOpMap.size();
+ }
+
+ bool IsExpired() const;
+ std::filesystem::path PrepareForDelete(bool MoveFolder);
+
+ private:
+ struct FileMapEntry
+ {
+ std::string ServerPath;
+ std::string ClientPath;
+ };
+
+ template<class V>
+ using OidMap = tsl::robin_map<Oid, V, Oid::Hasher>;
+
+ Project* m_OuterProject = nullptr;
+ CidStore& m_CidStore;
+ std::filesystem::path m_BasePath;
+ std::filesystem::path m_MarkerPath;
+ std::filesystem::path m_TempPath;
+
+ mutable RwLock m_OplogLock;
+ OidMap<IoHash> m_ChunkMap; // output data chunk id -> CAS address
+ OidMap<IoHash> m_MetaMap; // meta chunk id -> CAS address
+ OidMap<FileMapEntry> m_FileMap; // file id -> file map entry
+ int32_t m_ManifestVersion; // File system manifest version
+ tsl::robin_map<int, OplogEntryAddress> m_OpAddressMap; // Index LSN -> op data in ops blob file
+ OidMap<int> m_LatestOpMap; // op key -> latest op LSN for key
+
+ RefPtr<OplogStorage> m_Storage;
+ std::string m_OplogId;
+
+ /** Scan oplog and register each entry, thus updating the in-memory tracking tables
+ */
+ void ReplayLog();
+
+ struct OplogEntryMapping
+ {
+ struct Mapping
+ {
+ Oid Id;
+ IoHash Hash;
+ };
+ struct FileMapping : public Mapping
+ {
+ std::string ServerPath;
+ std::string ClientPath;
+ };
+ std::vector<Mapping> Chunks;
+ std::vector<Mapping> Meta;
+ std::vector<FileMapping> Files;
+ };
+
+ OplogEntryMapping GetMapping(CbObject Core);
+
+ /** Update tracking metadata for a new oplog entry
+ *
+ * This is used during replay (and gets called as part of new op append)
+ *
+ * Returns the oplog LSN assigned to the new entry, or kInvalidOp if the entry is rejected
+ */
+ uint32_t RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock,
+ const OplogEntryMapping& OpMapping,
+ const OplogEntry& OpEntry,
+ UpdateType TypeOfUpdate);
+
+ void AddFileMapping(const RwLock::ExclusiveLockScope& OplogLock,
+ Oid FileId,
+ IoHash Hash,
+ std::string_view ServerPath,
+ std::string_view ClientPath);
+ void AddChunkMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid ChunkId, IoHash Hash);
+ void AddMetaMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid ChunkId, IoHash Hash);
+ };
+
+ struct Project : public RefCounted
+ {
+ std::string Identifier;
+ std::filesystem::path RootDir;
+ std::string EngineRootDir;
+ std::string ProjectRootDir;
+ std::string ProjectFilePath;
+
+ Oplog* NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath);
+ Oplog* OpenOplog(std::string_view OplogId);
+ void DeleteOplog(std::string_view OplogId);
+ void IterateOplogs(std::function<void(const Oplog&)>&& Fn) const;
+ void IterateOplogs(std::function<void(Oplog&)>&& Fn);
+ std::vector<std::string> ScanForOplogs() const;
+ bool IsExpired() const;
+
+ Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath);
+ virtual ~Project();
+
+ void Read();
+ void Write();
+ [[nodiscard]] static bool Exists(std::filesystem::path BasePath);
+ void Flush();
+ void Scrub(ScrubContext& Ctx);
+ spdlog::logger& Log();
+ void GatherReferences(GcContext& GcCtx);
+ uint64_t TotalSize() const;
+ bool PrepareForDelete(std::filesystem::path& OutDeletePath);
+
+ private:
+ ProjectStore* m_ProjectStore;
+ CidStore& m_CidStore;
+ mutable RwLock m_ProjectLock;
+ std::map<std::string, std::unique_ptr<Oplog>> m_Oplogs;
+ std::vector<std::unique_ptr<Oplog>> m_DeletedOplogs;
+ std::filesystem::path m_OplogStoragePath;
+
+ std::filesystem::path BasePathForOplog(std::string_view OplogId);
+ };
+
+ // Oplog* OpenProjectOplog(std::string_view ProjectId, std::string_view OplogId);
+
+ Ref<Project> OpenProject(std::string_view ProjectId);
+ Ref<Project> NewProject(std::filesystem::path BasePath,
+ std::string_view ProjectId,
+ std::string_view RootDir,
+ std::string_view EngineRootDir,
+ std::string_view ProjectRootDir,
+ std::string_view ProjectFilePath);
+ bool DeleteProject(std::string_view ProjectId);
+ bool Exists(std::string_view ProjectId);
+ void Flush();
+ void Scrub(ScrubContext& Ctx);
+ void DiscoverProjects();
+ void IterateProjects(std::function<void(Project& Prj)>&& Fn);
+
+ spdlog::logger& Log() { return m_Log; }
+ const std::filesystem::path& BasePath() const { return m_ProjectBasePath; }
+
+ virtual void GatherReferences(GcContext& GcCtx) override;
+ virtual void CollectGarbage(GcContext& GcCtx) override;
+ virtual GcStorageSize StorageSize() const override;
+
+ CbArray GetProjectsList();
+ std::pair<HttpResponseCode, std::string> GetProjectFiles(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ bool FilterClient,
+ CbObject& OutPayload);
+ std::pair<HttpResponseCode, std::string> GetChunkInfo(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view ChunkId,
+ CbObject& OutPayload);
+ std::pair<HttpResponseCode, std::string> GetChunkRange(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view ChunkId,
+ uint64_t Offset,
+ uint64_t Size,
+ ZenContentType AcceptType,
+ IoBuffer& OutChunk);
+ std::pair<HttpResponseCode, std::string> GetChunk(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view Cid,
+ ZenContentType AcceptType,
+ IoBuffer& OutChunk);
+
+ std::pair<HttpResponseCode, std::string> PutChunk(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view Cid,
+ ZenContentType ContentType,
+ IoBuffer&& Chunk);
+
+ std::pair<HttpResponseCode, std::string> WriteOplog(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ IoBuffer&& Payload,
+ CbObject& OutResponse);
+
+ std::pair<HttpResponseCode, std::string> ReadOplog(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const HttpServerRequest::QueryParams& Params,
+ CbObject& OutResponse);
+
+ std::pair<HttpResponseCode, std::string> WriteBlock(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ IoBuffer&& Payload);
+
+ void Rpc(HttpServerRequest& HttpReq,
+ const std::string_view ProjectId,
+ const std::string_view OplogId,
+ IoBuffer&& Payload,
+ AuthMgr& AuthManager);
+
+ std::pair<HttpResponseCode, std::string> Export(ProjectStore::Project& Project,
+ ProjectStore::Oplog& Oplog,
+ CbObjectView&& Params,
+ AuthMgr& AuthManager);
+
+ std::pair<HttpResponseCode, std::string> Import(ProjectStore::Project& Project,
+ ProjectStore::Oplog& Oplog,
+ CbObjectView&& Params,
+ AuthMgr& AuthManager);
+
+private:
+ spdlog::logger& m_Log;
+ CidStore& m_CidStore;
+ std::filesystem::path m_ProjectBasePath;
+ mutable RwLock m_ProjectsLock;
+ std::map<std::string, Ref<Project>> m_Projects;
+
+ std::filesystem::path BasePathForProject(std::string_view ProjectId);
+};
+
+//////////////////////////////////////////////////////////////////////////
+//
+// {project} a project identifier
+// {target} a variation of the project, typically a build target
+// {lsn} oplog entry sequence number
+//
+// /prj/{project}
+// /prj/{project}/oplog/{target}
+// /prj/{project}/oplog/{target}/{lsn}
+//
+// oplog entry
+//
+// id: {id}
+// key: {}
+// meta: {}
+// data: []
+// refs:
+//
+
+class HttpProjectService : public HttpService, public IHttpStatsProvider
+{
+public:
+ HttpProjectService(CidStore& Store, ProjectStore* InProjectStore, HttpStatsService& StatsService, AuthMgr& AuthMgr);
+ ~HttpProjectService();
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(HttpServerRequest& Request) override;
+
+ virtual void HandleStatsRequest(HttpServerRequest& Request) override;
+
+private:
+ inline spdlog::logger& Log() { return m_Log; }
+
+ spdlog::logger& m_Log;
+ CidStore& m_CidStore;
+ HttpRequestRouter m_Router;
+ Ref<ProjectStore> m_ProjectStore;
+ HttpStatsService& m_StatsService;
+ AuthMgr& m_AuthMgr;
+};
+
+void prj_forcelink();
+
+} // namespace zen
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
new file mode 100644
index 000000000..1e6ca51a1
--- /dev/null
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -0,0 +1,1036 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "remoteprojectstore.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compress.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/stream.h>
+#include <zencore/timer.h>
+#include <zencore/workthreadpool.h>
+#include <zenstore/cidstore.h>
+
+namespace zen {
+
+/*
+ OplogContainer
+ Binary("ops") // Compressed CompactBinary object to hide attachment references, also makes the oplog smaller
+ {
+ CbArray("ops")
+ {
+ CbObject Op
+ (CbFieldType::BinaryAttachment Attachments[])
+ (OpData)
+ }
+ }
+ CbArray("blocks")
+ CbObject
+ CbFieldType::BinaryAttachment "rawhash" // Optional, only if we are creating blocks (Jupiter/File)
+ CbArray("chunks")
+ CbFieldType::Hash // Chunk hashes
+ CbArray("chunks") // Optional, only if we are not creating blocks (Zen)
+ CbFieldType::BinaryAttachment // Chunk attachment hashes
+
+ CompressedBinary ChunkBlock
+ {
+ VarUInt ChunkCount
+ VarUInt ChunkSizes[ChunkCount]
+ uint8_t[chunksize])[ChunkCount]
+ }
+*/
+
+////////////////////////////// AsyncRemoteResult
+
+struct AsyncRemoteResult
+{
+ void SetError(int32_t ErrorCode, const std::string& ErrorReason, const std::string ErrorText)
+ {
+ int32_t Expected = 0;
+ if (m_ErrorCode.compare_exchange_weak(Expected, ErrorCode ? ErrorCode : -1))
+ {
+ m_ErrorReason = ErrorReason;
+ m_ErrorText = ErrorText;
+ }
+ }
+ bool IsError() const { return m_ErrorCode.load() != 0; }
+ int GetError() const { return m_ErrorCode.load(); };
+ const std::string& GetErrorReason() const { return m_ErrorReason; };
+ const std::string& GetErrorText() const { return m_ErrorText; };
+ RemoteProjectStore::Result ConvertResult(double ElapsedSeconds = 0.0) const
+ {
+ return RemoteProjectStore::Result{m_ErrorCode, ElapsedSeconds, m_ErrorReason, m_ErrorText};
+ }
+
+private:
+ std::atomic<int32_t> m_ErrorCode = 0;
+ std::string m_ErrorReason;
+ std::string m_ErrorText;
+};
+
+bool
+IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
+{
+ IoBuffer BlockPayload = CompressedBuffer::FromCompressedNoValidate(std::move(CompressedBlock)).Decompress().AsIoBuffer();
+
+ MemoryView BlockView = BlockPayload.GetView();
+ const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData());
+ uint32_t NumberSize;
+ uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize);
+ ReadPtr += NumberSize;
+ std::vector<uint64_t> ChunkSizes;
+ ChunkSizes.reserve(ChunkCount);
+ while (ChunkCount--)
+ {
+ ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize));
+ ReadPtr += NumberSize;
+ }
+ ptrdiff_t TempBufferLength = std::distance(reinterpret_cast<const uint8_t*>(BlockView.GetData()), ReadPtr);
+ ZEN_ASSERT(TempBufferLength > 0);
+ for (uint64_t ChunkSize : ChunkSizes)
+ {
+ IoBuffer Chunk(IoBuffer::Wrap, ReadPtr, ChunkSize);
+ IoHash AttachmentRawHash;
+ uint64_t AttachmentRawSize;
+ CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize);
+
+ if (!CompressedChunk)
+ {
+ ZEN_ERROR("Invalid chunk in block");
+ return false;
+ }
+ Visitor(std::move(CompressedChunk), AttachmentRawHash);
+ ReadPtr += ChunkSize;
+ ZEN_ASSERT(ReadPtr <= BlockView.GetDataEnd());
+ }
+ return true;
+};
+
+CompressedBuffer
+GenerateBlock(std::vector<SharedBuffer>&& Chunks)
+{
+ size_t ChunkCount = Chunks.size();
+ SharedBuffer SizeBuffer;
+ {
+ IoBuffer TempBuffer(ChunkCount * 9);
+ MutableMemoryView View = TempBuffer.GetMutableView();
+ uint8_t* BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData());
+ uint8_t* BufferEndPtr = BufferStartPtr;
+ BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr);
+ auto It = Chunks.begin();
+ while (It != Chunks.end())
+ {
+ BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(It->GetSize()), BufferEndPtr);
+ It++;
+ }
+ ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd());
+ ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr);
+ SizeBuffer = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength)));
+ }
+ CompositeBuffer AllBuffers(std::move(SizeBuffer), CompositeBuffer(std::move(Chunks)));
+
+ CompressedBuffer CompressedBlock =
+ CompressedBuffer::Compress(std::move(AllBuffers), OodleCompressor::Mermaid, OodleCompressionLevel::None);
+
+ return CompressedBlock;
+}
+
+struct Block
+{
+ IoHash BlockHash;
+ std::vector<IoHash> ChunksInBlock;
+};
+
+void
+CreateBlock(WorkerThreadPool& WorkerPool,
+ Latch& OpSectionsLatch,
+ std::vector<SharedBuffer>&& ChunksInBlock,
+ RwLock& SectionsLock,
+ std::vector<Block>& Blocks,
+ size_t BlockIndex,
+ const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
+ AsyncRemoteResult& RemoteResult)
+{
+ OpSectionsLatch.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&Blocks, &SectionsLock, &OpSectionsLatch, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, &RemoteResult]() mutable {
+ auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ if (!Chunks.empty())
+ {
+ CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); // Move to callback and return IoHash
+ IoHash BlockHash = CompressedBlock.DecodeRawHash();
+ AsyncOnBlock(std::move(CompressedBlock), BlockHash);
+ {
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope __(SectionsLock);
+ Blocks[BlockIndex].BlockHash = BlockHash;
+ }
+ }
+ });
+}
+
+size_t
+AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks)
+{
+ size_t BlockIndex;
+ {
+ RwLock::ExclusiveLockScope _(BlocksLock);
+ BlockIndex = Blocks.size();
+ Blocks.resize(BlockIndex + 1);
+ }
+ return BlockIndex;
+}
+
+CbObject
+BuildContainer(CidStore& ChunkStore,
+ ProjectStore::Oplog& Oplog,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ bool BuildBlocks,
+ WorkerThreadPool& WorkerPool,
+ const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
+ const std::function<void(const IoHash&)>& OnLargeAttachment,
+ const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
+ AsyncRemoteResult& RemoteResult)
+{
+ using namespace std::literals;
+
+ std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
+ CbObjectWriter SectionOpsWriter;
+ SectionOpsWriter.BeginArray("ops"sv);
+
+ size_t OpCount = 0;
+
+ CbObject OplogContainerObject;
+ {
+ RwLock BlocksLock;
+ std::vector<Block> Blocks;
+ CompressedBuffer OpsBuffer;
+
+ Latch BlockCreateLatch(1);
+
+ std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes;
+
+ size_t BlockSize = 0;
+ std::vector<SharedBuffer> ChunksInBlock;
+
+ std::unordered_set<IoHash, IoHash::Hasher> Attachments;
+ Oplog.IterateOplog([&Attachments, &SectionOpsWriter, &OpCount](CbObject Op) {
+ Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert(FieldView.AsAttachment()); });
+ (SectionOpsWriter) << Op;
+ OpCount++;
+ });
+
+ for (const IoHash& AttachmentHash : Attachments)
+ {
+ IoBuffer Payload = ChunkStore.FindChunkByCid(AttachmentHash);
+ if (!Payload)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to find attachment {} for op", AttachmentHash),
+ {});
+ ZEN_ERROR("Failed to build container ({}). Reason: '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason());
+ return {};
+ }
+ uint64_t PayloadSize = Payload.GetSize();
+ if (PayloadSize > MaxChunkEmbedSize)
+ {
+ if (LargeChunkHashes.insert(AttachmentHash).second)
+ {
+ OnLargeAttachment(AttachmentHash);
+ }
+ continue;
+ }
+
+ if (!BlockAttachmentHashes.insert(AttachmentHash).second)
+ {
+ continue;
+ }
+
+ BlockSize += PayloadSize;
+ if (BuildBlocks)
+ {
+ ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload)));
+ }
+ else
+ {
+ Payload = {};
+ }
+
+ if (BlockSize >= MaxBlockSize)
+ {
+ size_t BlockIndex = AddBlock(BlocksLock, Blocks);
+ if (BuildBlocks)
+ {
+ CreateBlock(WorkerPool,
+ BlockCreateLatch,
+ std::move(ChunksInBlock),
+ BlocksLock,
+ Blocks,
+ BlockIndex,
+ AsyncOnBlock,
+ RemoteResult);
+ }
+ else
+ {
+ OnBlockChunks(BlockAttachmentHashes);
+ }
+ {
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope _(BlocksLock);
+ Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(),
+ BlockAttachmentHashes.begin(),
+ BlockAttachmentHashes.end());
+ }
+ BlockAttachmentHashes.clear();
+ ChunksInBlock.clear();
+ BlockSize = 0;
+ }
+ }
+ if (BlockSize > 0)
+ {
+ size_t BlockIndex = AddBlock(BlocksLock, Blocks);
+ if (BuildBlocks)
+ {
+ CreateBlock(WorkerPool,
+ BlockCreateLatch,
+ std::move(ChunksInBlock),
+ BlocksLock,
+ Blocks,
+ BlockIndex,
+ AsyncOnBlock,
+ RemoteResult);
+ }
+ else
+ {
+ OnBlockChunks(BlockAttachmentHashes);
+ }
+ {
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope _(BlocksLock);
+ Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(),
+ BlockAttachmentHashes.begin(),
+ BlockAttachmentHashes.end());
+ }
+ BlockAttachmentHashes.clear();
+ ChunksInBlock.clear();
+ BlockSize = 0;
+ }
+ SectionOpsWriter.EndArray(); // "ops"
+
+ CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer());
+ ZEN_DEBUG("Added oplog section {}, {}", CompressedOpsSection.DecodeRawHash(), NiceBytes(CompressedOpsSection.GetCompressedSize()));
+
+ BlockCreateLatch.CountDown();
+ while (!BlockCreateLatch.Wait(1000))
+ {
+ ZEN_INFO("Creating blocks, {} remaining...", BlockCreateLatch.Remaining());
+ }
+
+ if (!RemoteResult.IsError())
+ {
+ CbObjectWriter OplogContinerWriter;
+ RwLock::SharedLockScope _(BlocksLock);
+ OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer());
+
+ OplogContinerWriter.BeginArray("blocks"sv);
+ {
+ for (const Block& B : Blocks)
+ {
+ ZEN_ASSERT(!B.ChunksInBlock.empty());
+ if (BuildBlocks)
+ {
+ ZEN_ASSERT(B.BlockHash != IoHash::Zero);
+
+ OplogContinerWriter.BeginObject();
+ {
+ OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash);
+ OplogContinerWriter.BeginArray("chunks"sv);
+ {
+ for (const IoHash& RawHash : B.ChunksInBlock)
+ {
+ OplogContinerWriter.AddHash(RawHash);
+ }
+ }
+ OplogContinerWriter.EndArray(); // "chunks"
+ }
+ OplogContinerWriter.EndObject();
+ continue;
+ }
+
+ ZEN_ASSERT(B.BlockHash == IoHash::Zero);
+ OplogContinerWriter.BeginObject();
+ {
+ OplogContinerWriter.BeginArray("chunks"sv);
+ {
+ for (const IoHash& RawHash : B.ChunksInBlock)
+ {
+ OplogContinerWriter.AddBinaryAttachment(RawHash);
+ }
+ }
+ OplogContinerWriter.EndArray();
+ }
+ OplogContinerWriter.EndObject();
+ }
+ }
+ OplogContinerWriter.EndArray(); // "blocks"sv
+
+ OplogContinerWriter.BeginArray("chunks"sv);
+ {
+ for (const IoHash& AttachmentHash : LargeChunkHashes)
+ {
+ OplogContinerWriter.AddBinaryAttachment(AttachmentHash);
+ }
+ }
+ OplogContinerWriter.EndArray(); // "chunks"
+
+ OplogContainerObject = OplogContinerWriter.Save();
+ }
+ }
+ return OplogContainerObject;
+}
+
+RemoteProjectStore::LoadContainerResult
+BuildContainer(CidStore& ChunkStore,
+ ProjectStore::Oplog& Oplog,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ bool BuildBlocks,
+ const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
+ const std::function<void(const IoHash&)>& OnLargeAttachment,
+ const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks)
+{
+ // We are creating a worker thread pool here since we are uploading a lot of attachments in one go and we dont want to keep a
+ // WorkerThreadPool alive
+ size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u);
+ WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount));
+
+ AsyncRemoteResult RemoteResult;
+ CbObject ContainerObject = BuildContainer(ChunkStore,
+ Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ BuildBlocks,
+ WorkerPool,
+ AsyncOnBlock,
+ OnLargeAttachment,
+ OnBlockChunks,
+ RemoteResult);
+ return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject};
+}
+
+RemoteProjectStore::Result
+SaveOplog(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ ProjectStore::Oplog& Oplog,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ bool BuildBlocks,
+ bool UseTempBlocks,
+ bool ForceUpload)
+{
+ using namespace std::literals;
+
+ Stopwatch Timer;
+
+ // We are creating a worker thread pool here since we are uploading a lot of attachments in one go
+ // Doing upload is a rare and transient occation so we don't want to keep a WorkerThreadPool alive.
+ size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u);
+ WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount));
+
+ std::filesystem::path AttachmentTempPath;
+ if (UseTempBlocks)
+ {
+ AttachmentTempPath = Oplog.TempPath();
+ AttachmentTempPath.append(".pending");
+ CreateDirectories(AttachmentTempPath);
+ }
+
+ AsyncRemoteResult RemoteResult;
+ RwLock AttachmentsLock;
+ std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments;
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks;
+
+ auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock,
+ const IoHash& BlockHash) {
+ std::filesystem::path BlockPath = AttachmentTempPath;
+ BlockPath.append(BlockHash.ToHexString());
+ if (!std::filesystem::exists(BlockPath))
+ {
+ IoBuffer BlockBuffer;
+ try
+ {
+ BasicFile BlockFile;
+ BlockFile.Open(BlockPath, BasicFile::Mode::kTruncateDelete);
+ uint64_t Offset = 0;
+ for (const SharedBuffer& Buffer : CompressedBlock.GetCompressed().GetSegments())
+ {
+ BlockFile.Write(Buffer.GetView(), Offset);
+ Offset += Buffer.GetSize();
+ }
+ void* FileHandle = BlockFile.Detach();
+ BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset);
+ }
+ catch (std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ Ex.what(),
+ "Unable to create temp block file");
+ return;
+ }
+
+ BlockBuffer.MarkAsDeleteOnClose();
+ {
+ RwLock::ExclusiveLockScope __(AttachmentsLock);
+ CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)});
+ }
+ ZEN_DEBUG("Saved temp block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize()));
+ }
+ };
+
+ auto UploadBlock = [&RemoteStore, &RemoteResult](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) {
+ RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ZEN_ERROR("Failed to save attachment ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError());
+ return;
+ }
+ ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize()));
+ };
+
+ std::vector<std::vector<IoHash>> BlockChunks;
+ auto OnBlockChunks = [&BlockChunks](const std::unordered_set<IoHash, IoHash::Hasher>& Chunks) {
+ BlockChunks.push_back({Chunks.begin(), Chunks.end()});
+ ZEN_DEBUG("Found {} block chunks", Chunks.size());
+ };
+
+ auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments](const IoHash& AttachmentHash) {
+ {
+ RwLock::ExclusiveLockScope _(AttachmentsLock);
+ LargeAttachments.insert(AttachmentHash);
+ }
+ ZEN_DEBUG("Found attachment {}", AttachmentHash);
+ };
+
+ std::function<void(CompressedBuffer&&, const IoHash&)> OnBlock;
+ if (UseTempBlocks)
+ {
+ OnBlock = MakeTempBlock;
+ }
+ else
+ {
+ OnBlock = UploadBlock;
+ }
+
+ CbObject OplogContainerObject = BuildContainer(ChunkStore,
+ Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ BuildBlocks,
+ WorkerPool,
+ OnBlock,
+ OnLargeAttachment,
+ OnBlockChunks,
+ RemoteResult);
+
+ if (!RemoteResult.IsError())
+ {
+ uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num();
+ uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num();
+ ZEN_INFO("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount);
+ RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer());
+ if (ContainerSaveResult.ErrorCode)
+ {
+ RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container");
+ ZEN_ERROR("Failed to save oplog container ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError());
+ }
+ ZEN_DEBUG("Saved container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000)));
+ if (!ContainerSaveResult.Needs.empty())
+ {
+ ZEN_INFO("Filtering needed attachments...");
+ std::vector<IoHash> NeededLargeAttachments;
+ std::unordered_set<IoHash, IoHash::Hasher> NeededOtherAttachments;
+ NeededLargeAttachments.reserve(LargeAttachments.size());
+ NeededOtherAttachments.reserve(CreatedBlocks.size());
+ if (ForceUpload)
+ {
+ NeededLargeAttachments.insert(NeededLargeAttachments.end(), LargeAttachments.begin(), LargeAttachments.end());
+ }
+ else
+ {
+ for (const IoHash& RawHash : ContainerSaveResult.Needs)
+ {
+ if (LargeAttachments.contains(RawHash))
+ {
+ NeededLargeAttachments.push_back(RawHash);
+ continue;
+ }
+ NeededOtherAttachments.insert(RawHash);
+ }
+ }
+
+ Latch SaveAttachmentsLatch(1);
+ if (!NeededLargeAttachments.empty())
+ {
+ ZEN_INFO("Saving large attachments...");
+ for (const IoHash& RawHash : NeededLargeAttachments)
+ {
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+ SaveAttachmentsLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+
+ IoBuffer Payload;
+ if (auto It = CreatedBlocks.find(RawHash); It != CreatedBlocks.end())
+ {
+ Payload = std::move(It->second);
+ }
+ else
+ {
+ Payload = ChunkStore.FindChunkByCid(RawHash);
+ }
+ if (!Payload)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to find attachment {}", RawHash),
+ {});
+ ZEN_ERROR("Failed to build container ({}). Reason: '{}'",
+ RemoteResult.GetErrorReason(),
+ RemoteResult.GetError());
+ return;
+ }
+
+ RemoteProjectStore::SaveAttachmentResult Result =
+ RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+ ZEN_DEBUG("Saved attachment {}, {} in {}",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ return;
+ });
+ }
+ }
+
+ if (!CreatedBlocks.empty())
+ {
+ ZEN_INFO("Saving created block attachments...");
+ for (auto& It : CreatedBlocks)
+ {
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+ const IoHash& RawHash = It.first;
+ if (ForceUpload || NeededOtherAttachments.contains(RawHash))
+ {
+ IoBuffer Payload = It.second;
+ ZEN_ASSERT(Payload);
+ SaveAttachmentsLatch.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+
+ RemoteProjectStore::SaveAttachmentResult Result =
+ RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+
+ ZEN_DEBUG("Saved attachment {}, {} in {}",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ return;
+ });
+ }
+ It.second = {};
+ }
+ }
+
+ if (!BlockChunks.empty())
+ {
+ ZEN_INFO("Saving chunk block attachments...");
+ for (const std::vector<IoHash>& Chunks : BlockChunks)
+ {
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+ std::vector<IoHash> NeededChunks;
+ if (ForceUpload)
+ {
+ NeededChunks = Chunks;
+ }
+ else
+ {
+ NeededChunks.reserve(Chunks.size());
+ for (const IoHash& Chunk : Chunks)
+ {
+ if (NeededOtherAttachments.contains(Chunk))
+ {
+ NeededChunks.push_back(Chunk);
+ }
+ }
+ if (NeededChunks.empty())
+ {
+ continue;
+ }
+ }
+ SaveAttachmentsLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&RemoteStore,
+ &ChunkStore,
+ &SaveAttachmentsLatch,
+ &RemoteResult,
+ &Chunks,
+ NeededChunks = std::move(NeededChunks),
+ ForceUpload]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ std::vector<SharedBuffer> ChunkBuffers;
+ ChunkBuffers.reserve(NeededChunks.size());
+ for (const IoHash& Chunk : NeededChunks)
+ {
+ IoBuffer ChunkPayload = ChunkStore.FindChunkByCid(Chunk);
+ if (!ChunkPayload)
+ {
+ RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
+ fmt::format("Missing chunk {}"sv, Chunk),
+ fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
+ ChunkBuffers.clear();
+ break;
+ }
+ ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload)));
+ }
+ RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ZEN_ERROR("Failed to save attachments with {} chunks ({}). Reason: '{}'",
+ Chunks.size(),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+ ZEN_DEBUG("Saved {} bulk attachments in {}",
+ Chunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ });
+ }
+ }
+ SaveAttachmentsLatch.CountDown();
+ while (!SaveAttachmentsLatch.Wait(1000))
+ {
+ ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining());
+ }
+ SaveAttachmentsLatch.Wait();
+ }
+
+ if (!RemoteResult.IsError())
+ {
+ ZEN_INFO("Finalizing oplog container...");
+ RemoteProjectStore::Result ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash);
+ if (ContainerFinalizeResult.ErrorCode)
+ {
+ RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text);
+ ZEN_ERROR("Failed to finalize oplog container {} ({}). Reason: '{}'",
+ ContainerSaveResult.RawHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ }
+ ZEN_DEBUG("Finalized container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000)));
+ }
+ }
+
+ RemoteProjectStore::Result Result = RemoteResult.ConvertResult();
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ ZEN_INFO("Saved oplog {} in {}",
+ RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ return Result;
+};
+
+RemoteProjectStore::Result
+SaveOplogContainer(ProjectStore::Oplog& Oplog,
+ const CbObject& ContainerObject,
+ const std::function<bool(const IoHash& RawHash)>& HasAttachment,
+ const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment)
+{
+ using namespace std::literals;
+
+ Stopwatch Timer;
+
+ CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
+ for (CbFieldView LargeChunksField : LargeChunksArray)
+ {
+ IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
+ if (HasAttachment(AttachmentHash))
+ {
+ continue;
+ }
+ OnNeedAttachment(AttachmentHash);
+ };
+
+ CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
+ for (CbFieldView BlockField : BlocksArray)
+ {
+ CbObjectView BlockView = BlockField.AsObjectView();
+ IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
+
+ CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView();
+ if (BlockHash == IoHash::Zero)
+ {
+ std::vector<IoHash> NeededChunks;
+ NeededChunks.reserve(ChunksArray.GetSize());
+ for (CbFieldView ChunkField : ChunksArray)
+ {
+ IoHash ChunkHash = ChunkField.AsBinaryAttachment();
+ if (HasAttachment(ChunkHash))
+ {
+ continue;
+ }
+ NeededChunks.emplace_back(ChunkHash);
+ }
+
+ if (!NeededChunks.empty())
+ {
+ OnNeedBlock(IoHash::Zero, std::move(NeededChunks));
+ }
+ continue;
+ }
+
+ for (CbFieldView ChunkField : ChunksArray)
+ {
+ IoHash ChunkHash = ChunkField.AsHash();
+ if (HasAttachment(ChunkHash))
+ {
+ continue;
+ }
+
+ OnNeedBlock(BlockHash, {});
+ break;
+ }
+ };
+
+ MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
+ IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
+ IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer();
+
+ CbObject SectionObject = LoadCompactBinaryObject(SectionPayload);
+ if (!SectionObject)
+ {
+ ZEN_ERROR("Failed to save oplog container. Reason: '{}'", "Section has unexpected data type");
+ return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest),
+ Timer.GetElapsedTimeMs() / 1000.500,
+ "Section has unexpected data type",
+ "Failed to save oplog container"};
+ }
+
+ CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
+ for (CbFieldView OpEntry : OpsArray)
+ {
+ CbObjectView Core = OpEntry.AsObjectView();
+ BinaryWriter Writer;
+ Core.CopyTo(Writer);
+ MemoryView OpView = Writer.GetView();
+ IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize());
+ CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType);
+ const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Op);
+ if (OpLsn == ProjectStore::Oplog::kInvalidOp)
+ {
+ return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest),
+ Timer.GetElapsedTimeMs() / 1000.500,
+ "Failed saving op",
+ "Failed to save oplog container"};
+ }
+ ZEN_DEBUG("oplog entry #{}", OpLsn);
+ }
+ return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500};
+}
+
+RemoteProjectStore::Result
+LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload)
+{
+ using namespace std::literals;
+
+ Stopwatch Timer;
+
+ // We are creating a worker thread pool here since we are download a lot of attachments in one go and we dont want to keep a
+ // WorkerThreadPool alive
+ size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u);
+ WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount));
+
+ std::unordered_set<IoHash, IoHash::Hasher> Attachments;
+ std::vector<std::vector<IoHash>> ChunksInBlocks;
+
+ RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer();
+ if (LoadContainerResult.ErrorCode)
+ {
+ ZEN_WARN("Failed to load oplog container, reason: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode);
+ return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode,
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500,
+ .Reason = LoadContainerResult.Reason,
+ .Text = LoadContainerResult.Text};
+ }
+ ZEN_DEBUG("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)));
+
+ AsyncRemoteResult RemoteResult;
+ Latch AttachmentsWorkLatch(1);
+
+ auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) {
+ return !ForceDownload && ChunkStore.ContainsChunk(RawHash);
+ };
+ auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &RemoteResult](
+ const IoHash& BlockHash,
+ std::vector<IoHash>&& Chunks) {
+ if (BlockHash == IoHash::Zero)
+ {
+ AttachmentsWorkLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks)]() {
+ auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+
+ RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ZEN_ERROR("Failed to attachments with {} chunks ({}). Reason: '{}'",
+ Chunks.size(),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+ ZEN_DEBUG("Loaded {} bulk attachments in {}",
+ Chunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ for (const auto& It : Result.Chunks)
+ {
+ ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly);
+ }
+ });
+ return;
+ }
+ AttachmentsWorkLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult]() {
+ auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
+ if (BlockResult.ErrorCode)
+ {
+ RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
+ ZEN_ERROR("Failed to load oplog container, missing attachment {} ({}). Reason: '{}'",
+ BlockHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+ ZEN_DEBUG("Loaded block attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)));
+
+ if (!IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
+ ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
+ }))
+ {
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Invalid format for block {}", BlockHash),
+ {});
+ ZEN_ERROR("Failed to load oplog container, attachment {} has invalid format ({}). Reason: '{}'",
+ BlockHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+ });
+ };
+
+ auto OnNeedAttachment =
+ [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments](const IoHash& RawHash) {
+ if (!Attachments.insert(RawHash).second)
+ {
+ return;
+ }
+
+ AttachmentsWorkLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() {
+ auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
+ if (AttachmentResult.ErrorCode)
+ {
+ RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
+ ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}",
+ RawHash,
+ AttachmentResult.Reason,
+ AttachmentResult.ErrorCode);
+ return;
+ }
+ ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)));
+ ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash);
+ });
+ };
+
+ RemoteProjectStore::Result Result =
+ SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment);
+
+ AttachmentsWorkLatch.CountDown();
+ while (!AttachmentsWorkLatch.Wait(1000))
+ {
+ ZEN_INFO("Loading attachments, {} remaining...", AttachmentsWorkLatch.Remaining());
+ }
+ AttachmentsWorkLatch.Wait();
+ if (Result.ErrorCode == 0)
+ {
+ Result = RemoteResult.ConvertResult();
+ }
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+
+ ZEN_INFO("Loaded oplog {} in {}",
+ RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)));
+
+ return Result;
+}
+
+} // namespace zen
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
new file mode 100644
index 000000000..dcabaedd4
--- /dev/null
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -0,0 +1,111 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "projectstore.h"
+
+#include <unordered_set>
+
+namespace zen {
+
+class CidStore;
+class WorkerThreadPool;
+
+class RemoteProjectStore
+{
+public:
+ struct Result
+ {
+ int32_t ErrorCode{};
+ double ElapsedSeconds{};
+ std::string Reason;
+ std::string Text;
+ };
+
+ struct SaveResult : public Result
+ {
+ std::unordered_set<IoHash, IoHash::Hasher> Needs;
+ IoHash RawHash;
+ };
+
+ struct SaveAttachmentResult : public Result
+ {
+ };
+
+ struct SaveAttachmentsResult : public Result
+ {
+ };
+
+ struct LoadAttachmentResult : public Result
+ {
+ IoBuffer Bytes;
+ };
+
+ struct LoadContainerResult : public Result
+ {
+ CbObject ContainerObject;
+ };
+
+ struct LoadAttachmentsResult : public Result
+ {
+ std::vector<std::pair<IoHash, CompressedBuffer>> Chunks;
+ };
+
+ struct RemoteStoreInfo
+ {
+ bool CreateBlocks;
+ bool UseTempBlockFiles;
+ std::string Description;
+ };
+
+ virtual ~RemoteProjectStore() {}
+
+ virtual RemoteStoreInfo GetInfo() const = 0;
+
+ virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0;
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) = 0;
+ virtual Result FinalizeContainer(const IoHash& RawHash) = 0;
+ virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0;
+
+ virtual LoadContainerResult LoadContainer() = 0;
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) = 0;
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) = 0;
+};
+
+struct RemoteStoreOptions
+{
+ size_t MaxBlockSize = 128u * 1024u * 1024u;
+ size_t MaxChunkEmbedSize = 1024u * 1024u;
+};
+
+RemoteProjectStore::LoadContainerResult BuildContainer(
+ CidStore& ChunkStore,
+ ProjectStore::Oplog& Oplog,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ bool BuildBlocks,
+ const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
+ const std::function<void(const IoHash&)>& OnLargeAttachment,
+ const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks);
+
+RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog,
+ const CbObject& ContainerObject,
+ const std::function<bool(const IoHash& RawHash)>& HasAttachment,
+ const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment);
+
+RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ ProjectStore::Oplog& Oplog,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ bool BuildBlocks,
+ bool UseTempBlocks,
+ bool ForceUpload);
+
+RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload);
+
+CompressedBuffer GenerateBlock(std::vector<SharedBuffer>&& Chunks);
+bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor);
+
+} // namespace zen
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp
new file mode 100644
index 000000000..6ff471ae5
--- /dev/null
+++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp
@@ -0,0 +1,341 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zenremoteprojectstore.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compositebuffer.h>
+#include <zencore/fmtutils.h>
+#include <zencore/scopeguard.h>
+#include <zencore/stream.h>
+#include <zencore/timer.h>
+#include <zenhttp/httpshared.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <cpr/cpr.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+using namespace std::literals;
+
+class ZenRemoteStore : public RemoteProjectStore
+{
+public:
+ ZenRemoteStore(std::string_view HostAddress,
+ std::string_view Project,
+ std::string_view Oplog,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize)
+ : m_HostAddress(HostAddress)
+ , m_ProjectStoreUrl(fmt::format("{}/prj"sv, m_HostAddress))
+ , m_Project(Project)
+ , m_Oplog(Oplog)
+ , m_MaxBlockSize(MaxBlockSize)
+ , m_MaxChunkEmbedSize(MaxChunkEmbedSize)
+ {
+ }
+
+ virtual RemoteStoreInfo GetInfo() const override
+ {
+ return {.CreateBlocks = false, .UseTempBlockFiles = false, .Description = fmt::format("[zen] {}"sv, m_HostAddress)};
+ }
+
+ virtual SaveResult SaveContainer(const IoBuffer& Payload) override
+ {
+ Stopwatch Timer;
+
+ std::unique_ptr<cpr::Session> Session(AllocateSession());
+ auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
+
+ std::string SaveRequest = fmt::format("{}/{}/oplog/{}/save"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
+ Session->SetUrl({SaveRequest});
+ Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}});
+ MemoryView Data(Payload.GetView());
+ Session->SetBody({reinterpret_cast<const char*>(Data.GetData()), Data.GetSize()});
+ cpr::Response Response = Session->Post();
+ SaveResult Result = SaveResult{ConvertResult(Response)};
+
+ if (Result.ErrorCode)
+ {
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+ IoBuffer ResponsePayload(IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ CbObject ResponseObject = LoadCompactBinaryObject(ResponsePayload);
+ if (!ResponseObject)
+ {
+ Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv,
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+ CbArrayView NeedsArray = ResponseObject["need"sv].AsArrayView();
+ for (CbFieldView FieldView : NeedsArray)
+ {
+ IoHash ChunkHash = FieldView.AsHash();
+ Result.Needs.insert(ChunkHash);
+ }
+
+ Result.RawHash = IoHash::HashBuffer(Payload);
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override
+ {
+ Stopwatch Timer;
+
+ std::unique_ptr<cpr::Session> Session(AllocateSession());
+ auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
+
+ std::string SaveRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash);
+ Session->SetUrl({SaveRequest});
+ Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}});
+ uint64_t SizeLeft = Payload.GetSize();
+ CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
+ auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, SizeLeft);
+ MutableMemoryView Data(buffer, size);
+ Payload.CopyTo(Data, BufferIt);
+ SizeLeft -= size;
+ return true;
+ };
+ Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback));
+ cpr::Response Response = Session->Post();
+ SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)};
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
+ {
+ Stopwatch Timer;
+
+ CbPackage RequestPackage;
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.AddString("method"sv, "putchunks"sv);
+ RequestWriter.BeginArray("chunks"sv);
+ {
+ for (const SharedBuffer& Chunk : Chunks)
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, RawHash, RawSize);
+ RequestWriter.AddHash(RawHash);
+ RequestPackage.AddAttachment(CbAttachment(Compressed, RawHash));
+ }
+ }
+ RequestWriter.EndArray(); // "chunks"
+ RequestPackage.SetObject(RequestWriter.Save());
+ }
+ CompositeBuffer Payload = FormatPackageMessageBuffer(RequestPackage, FormatFlags::kDefault);
+
+ std::unique_ptr<cpr::Session> Session(AllocateSession());
+ auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
+ std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
+ Session->SetUrl({SaveRequest});
+ Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}});
+
+ uint64_t SizeLeft = Payload.GetSize();
+ CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
+ auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, SizeLeft);
+ MutableMemoryView Data(buffer, size);
+ Payload.CopyTo(Data, BufferIt);
+ SizeLeft -= size;
+ return true;
+ };
+ Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback));
+ cpr::Response Response = Session->Post();
+ SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)};
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
+ {
+ Stopwatch Timer;
+
+ std::unique_ptr<cpr::Session> Session(AllocateSession());
+ auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
+ std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
+
+ CbObject Request;
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.AddString("method"sv, "getchunks"sv);
+ RequestWriter.BeginArray("chunks"sv);
+ {
+ for (const IoHash& RawHash : RawHashes)
+ {
+ RequestWriter.AddHash(RawHash);
+ }
+ }
+ RequestWriter.EndArray(); // "chunks"
+ Request = RequestWriter.Save();
+ }
+ IoBuffer Payload = Request.GetBuffer().AsIoBuffer();
+ Session->SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()});
+ Session->SetUrl(SaveRequest);
+ Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))},
+ {"Accept", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}});
+
+ cpr::Response Response = Session->Post();
+ LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)};
+ if (!Result.ErrorCode)
+ {
+ CbPackage Package = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size()));
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+ Result.Chunks.reserve(Attachments.size());
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ Result.Chunks.emplace_back(
+ std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()});
+ }
+ }
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ };
+
+ virtual Result FinalizeContainer(const IoHash&) override
+ {
+ Stopwatch Timer;
+
+ RwLock::ExclusiveLockScope _(SessionsLock);
+ Sessions.clear();
+ return {.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500};
+ }
+
+ virtual LoadContainerResult LoadContainer() override
+ {
+ Stopwatch Timer;
+
+ std::unique_ptr<cpr::Session> Session(AllocateSession());
+ auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
+ std::string SaveRequest = fmt::format("{}/{}/oplog/{}/load"sv, m_ProjectStoreUrl, m_Project, m_Oplog);
+ Session->SetUrl(SaveRequest);
+ Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCbObject))}});
+ Session->SetParameters(
+ {{"maxblocksize", fmt::format("{}", m_MaxBlockSize)}, {"maxchunkembedsize", fmt::format("{}", m_MaxChunkEmbedSize)}});
+ cpr::Response Response = Session->Get();
+
+ LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)};
+ if (!Result.ErrorCode)
+ {
+ Result.ContainerObject = LoadCompactBinaryObject(IoBuffer(IoBuffer::Clone, Response.text.data(), Response.text.size()));
+ if (!Result.ContainerObject)
+ {
+ Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv,
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+ }
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
+ {
+ Stopwatch Timer;
+
+ std::unique_ptr<cpr::Session> Session(AllocateSession());
+ auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); });
+
+ std::string LoadRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash);
+ Session->SetUrl({LoadRequest});
+ Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}});
+ cpr::Response Response = Session->Get();
+ LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)};
+ if (!Result.ErrorCode)
+ {
+ Result.Bytes = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
+ }
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500;
+ return Result;
+ }
+
+private:
+ std::unique_ptr<cpr::Session> AllocateSession()
+ {
+ RwLock::ExclusiveLockScope _(SessionsLock);
+ if (Sessions.empty())
+ {
+ Sessions.emplace_back(std::make_unique<cpr::Session>());
+ }
+ std::unique_ptr<cpr::Session> Session = std::move(Sessions.back());
+ Sessions.pop_back();
+ return Session;
+ }
+
+ void ReleaseSession(std::unique_ptr<cpr::Session>&& Session)
+ {
+ RwLock::ExclusiveLockScope _(SessionsLock);
+ Sessions.emplace_back(std::move(Session));
+ }
+
+ static Result ConvertResult(const cpr::Response& Response)
+ {
+ std::string Text;
+ std::string Reason = Response.reason;
+ int32_t ErrorCode = 0;
+ if (Response.error.code != cpr::ErrorCode::OK)
+ {
+ ErrorCode = static_cast<int32_t>(Response.error.code);
+ if (!Response.error.message.empty())
+ {
+ Reason = Response.error.message;
+ }
+ }
+ else if (!IsHttpSuccessCode(Response.status_code))
+ {
+ ErrorCode = static_cast<int32_t>(Response.status_code);
+
+ if (auto It = Response.header.find("Content-Type"); It != Response.header.end())
+ {
+ zen::HttpContentType ContentType = zen::ParseContentType(It->second);
+ if (ContentType == zen::HttpContentType::kText)
+ {
+ Text = Response.text;
+ }
+ }
+
+ Reason = fmt::format("{}"sv, Response.status_code);
+ }
+ return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.elapsed, .Reason = Reason, .Text = Text};
+ }
+
+ RwLock SessionsLock;
+ std::vector<std::unique_ptr<cpr::Session>> Sessions;
+
+ const std::string m_HostAddress;
+ const std::string m_ProjectStoreUrl;
+ const std::string m_Project;
+ const std::string m_Oplog;
+ const size_t m_MaxBlockSize;
+ const size_t m_MaxChunkEmbedSize;
+};
+
+std::unique_ptr<RemoteProjectStore>
+CreateZenRemoteStore(const ZenRemoteStoreOptions& Options)
+{
+ std::string Url = Options.Url;
+ if (Url.find("://"sv) == std::string::npos)
+ {
+ // Assume https URL
+ Url = fmt::format("http://{}"sv, Url);
+ }
+ std::unique_ptr<RemoteProjectStore> RemoteStore =
+ std::make_unique<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize);
+ return RemoteStore;
+}
+
+} // namespace zen
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.h b/src/zenserver/projectstore/zenremoteprojectstore.h
new file mode 100644
index 000000000..ef9dcad8c
--- /dev/null
+++ b/src/zenserver/projectstore/zenremoteprojectstore.h
@@ -0,0 +1,18 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "remoteprojectstore.h"
+
+namespace zen {
+
+struct ZenRemoteStoreOptions : RemoteStoreOptions
+{
+ std::string Url;
+ std::string ProjectId;
+ std::string OplogId;
+};
+
+std::unique_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options);
+
+} // namespace zen