diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/projectstore | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'src/zenserver/projectstore')
| -rw-r--r-- | src/zenserver/projectstore/fileremoteprojectstore.cpp | 235 | ||||
| -rw-r--r-- | src/zenserver/projectstore/fileremoteprojectstore.h | 19 | ||||
| -rw-r--r-- | src/zenserver/projectstore/jupiterremoteprojectstore.cpp | 244 | ||||
| -rw-r--r-- | src/zenserver/projectstore/jupiterremoteprojectstore.h | 26 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 4082 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 372 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 1036 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 111 | ||||
| -rw-r--r-- | src/zenserver/projectstore/zenremoteprojectstore.cpp | 341 | ||||
| -rw-r--r-- | src/zenserver/projectstore/zenremoteprojectstore.h | 18 |
10 files changed, 6484 insertions, 0 deletions
diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp new file mode 100644 index 000000000..d7a34a6c2 --- /dev/null +++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp @@ -0,0 +1,235 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "fileremoteprojectstore.h" + +#include <zencore/compress.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/timer.h> + +namespace zen { + +using namespace std::literals; + +class LocalExportProjectStore : public RemoteProjectStore +{ +public: + LocalExportProjectStore(std::string_view Name, + const std::filesystem::path& FolderPath, + bool ForceDisableBlocks, + bool ForceEnableTempBlocks) + : m_Name(Name) + , m_OutputPath(FolderPath) + { + if (ForceDisableBlocks) + { + m_EnableBlocks = false; + } + if (ForceEnableTempBlocks) + { + m_UseTempBlocks = true; + } + } + + virtual RemoteStoreInfo GetInfo() const override + { + return {.CreateBlocks = m_EnableBlocks, + .UseTempBlockFiles = m_UseTempBlocks, + .Description = fmt::format("[file] {}"sv, m_OutputPath)}; + } + + virtual SaveResult SaveContainer(const IoBuffer& Payload) override + { + Stopwatch Timer; + SaveResult Result; + + { + CbObject ContainerObject = LoadCompactBinaryObject(Payload); + + ContainerObject.IterateAttachments([&](CbFieldView FieldView) { + IoHash AttachmentHash = FieldView.AsBinaryAttachment(); + std::filesystem::path AttachmentPath = GetAttachmentPath(AttachmentHash); + if (!std::filesystem::exists(AttachmentPath)) + { + Result.Needs.insert(AttachmentHash); + } + }); + } + + std::filesystem::path ContainerPath = m_OutputPath; + ContainerPath.append(m_Name); + + CreateDirectories(m_OutputPath); + BasicFile ContainerFile; + ContainerFile.Open(ContainerPath, BasicFile::Mode::kTruncate); + std::error_code Ec; + ContainerFile.WriteAll(Payload, Ec); + if (Ec) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = Ec.message(); + } + Result.RawHash = IoHash::HashBuffer(Payload); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override + { + Stopwatch Timer; + SaveAttachmentResult Result; + std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); + if (!std::filesystem::exists(ChunkPath)) + { + try + { + CreateDirectories(ChunkPath.parent_path()); + + BasicFile ChunkFile; + ChunkFile.Open(ChunkPath, BasicFile::Mode::kTruncate); + size_t Offset = 0; + for (const SharedBuffer& Segment : Payload.GetSegments()) + { + ChunkFile.Write(Segment.GetView(), Offset); + Offset += Segment.GetSize(); + } + } + catch (std::exception& Ex) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = Ex.what(); + } + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override + { + Stopwatch Timer; + + for (const SharedBuffer& Chunk : Chunks) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); + SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash()); + if (ChunkResult.ErrorCode) + { + ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return SaveAttachmentsResult{ChunkResult}; + } + } + SaveAttachmentsResult Result; + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual Result FinalizeContainer(const IoHash&) override { return {}; } + + virtual LoadContainerResult LoadContainer() override + { + Stopwatch Timer; + LoadContainerResult Result; + std::filesystem::path ContainerPath = m_OutputPath; + ContainerPath.append(m_Name); + if (!std::filesystem::is_regular_file(ContainerPath)) + { + Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); + Result.Reason = fmt::format("The file {} does not exist"sv, ContainerPath.string()); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + IoBuffer ContainerPayload; + { + BasicFile ContainerFile; + ContainerFile.Open(ContainerPath, BasicFile::Mode::kRead); + ContainerPayload = ContainerFile.ReadAll(); + } + Result.ContainerObject = LoadCompactBinaryObject(ContainerPayload); + if (!Result.ContainerObject) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("The file {} is not formatted as a compact binary object"sv, ContainerPath.string()); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + { + Stopwatch Timer; + LoadAttachmentResult Result; + std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); + if (!std::filesystem::is_regular_file(ChunkPath)) + { + Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); + Result.Reason = fmt::format("The file {} does not exist"sv, ChunkPath.string()); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + { + BasicFile ChunkFile; + ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead); + Result.Bytes = ChunkFile.ReadAll(); + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override + { + Stopwatch Timer; + LoadAttachmentsResult Result; + for (const IoHash& Hash : RawHashes) + { + LoadAttachmentResult ChunkResult = LoadAttachment(Hash); + if (ChunkResult.ErrorCode) + { + ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return LoadAttachmentsResult{ChunkResult}; + } + ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000))); + Result.Chunks.emplace_back( + std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); + } + return Result; + } + +private: + std::filesystem::path GetAttachmentPath(const IoHash& RawHash) const + { + ExtendablePathBuilder<128> ShardedPath; + ShardedPath.Append(m_OutputPath.c_str()); + ExtendableStringBuilder<64> HashString; + RawHash.ToHexString(HashString); + const char* str = HashString.c_str(); + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str, str + 3); + + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str + 3, str + 5); + + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str + 5, str + 40); + + return ShardedPath.ToPath(); + } + + const std::string m_Name; + const std::filesystem::path m_OutputPath; + bool m_EnableBlocks = true; + bool m_UseTempBlocks = false; +}; + +std::unique_ptr<RemoteProjectStore> +CreateFileRemoteStore(const FileRemoteStoreOptions& Options) +{ + std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<LocalExportProjectStore>(Options.Name, + std::filesystem::path(Options.FolderPath), + Options.ForceDisableBlocks, + Options.ForceEnableTempBlocks); + return RemoteStore; +} + +} // namespace zen diff --git a/src/zenserver/projectstore/fileremoteprojectstore.h b/src/zenserver/projectstore/fileremoteprojectstore.h new file mode 100644 index 000000000..68d1eb71e --- /dev/null +++ b/src/zenserver/projectstore/fileremoteprojectstore.h @@ -0,0 +1,19 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "remoteprojectstore.h" + +namespace zen { + +struct FileRemoteStoreOptions : RemoteStoreOptions +{ + std::filesystem::path FolderPath; + std::string Name; + bool ForceDisableBlocks; + bool ForceEnableTempBlocks; +}; + +std::unique_ptr<RemoteProjectStore> CreateFileRemoteStore(const FileRemoteStoreOptions& Options); + +} // namespace zen diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp new file mode 100644 index 000000000..66cf3c4f8 --- /dev/null +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp @@ -0,0 +1,244 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "jupiterremoteprojectstore.h" + +#include <zencore/compress.h> +#include <zencore/fmtutils.h> + +#include <auth/authmgr.h> +#include <upstream/jupiter.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cpr/cpr.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +using namespace std::literals; + +class JupiterRemoteStore : public RemoteProjectStore +{ +public: + JupiterRemoteStore(Ref<CloudCacheClient>&& CloudClient, + std::string_view Namespace, + std::string_view Bucket, + const IoHash& Key, + bool ForceDisableBlocks, + bool ForceDisableTempBlocks) + : m_CloudClient(CloudClient) + , m_Namespace(Namespace) + , m_Bucket(Bucket) + , m_Key(Key) + { + if (ForceDisableBlocks) + { + m_EnableBlocks = false; + } + if (ForceDisableTempBlocks) + { + m_UseTempBlocks = false; + } + } + + virtual RemoteStoreInfo GetInfo() const override + { + return {.CreateBlocks = m_EnableBlocks, + .UseTempBlockFiles = m_UseTempBlocks, + .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_CloudClient->ServiceUrl(), m_Namespace, m_Bucket, m_Key)}; + } + + virtual SaveResult SaveContainer(const IoBuffer& Payload) override + { + const int32_t MaxAttempts = 3; + PutRefResult Result; + { + CloudCacheSession Session(m_CloudClient.Get()); + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject); + } + } + + return SaveResult{ConvertResult(Result), {Result.Needs.begin(), Result.Needs.end()} /*, {}*/, IoHash::HashBuffer(Payload)}; + } + + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override + { + const int32_t MaxAttempts = 3; + CloudCacheResult Result; + { + CloudCacheSession Session(m_CloudClient.Get()); + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); + } + } + + return SaveAttachmentResult{ConvertResult(Result)}; + } + + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override + { + SaveAttachmentsResult Result; + for (const SharedBuffer& Chunk : Chunks) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); + SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash()); + if (ChunkResult.ErrorCode) + { + return SaveAttachmentsResult{ChunkResult}; + } + } + return Result; + } + + virtual Result FinalizeContainer(const IoHash& RawHash) override + { + const int32_t MaxAttempts = 3; + CloudCacheResult Result; + { + CloudCacheSession Session(m_CloudClient.Get()); + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash); + } + } + return ConvertResult(Result); + } + + virtual LoadContainerResult LoadContainer() override + { + const int32_t MaxAttempts = 3; + CloudCacheResult Result; + { + CloudCacheSession Session(m_CloudClient.Get()); + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.GetRef(m_Namespace, m_Bucket, m_Key, ZenContentType::kCbObject); + } + } + + if (Result.ErrorCode || !Result.Success) + { + return LoadContainerResult{ConvertResult(Result)}; + } + + CbObject ContainerObject = LoadCompactBinaryObject(Result.Response); + if (!ContainerObject) + { + return LoadContainerResult{ + RemoteProjectStore::Result{ + .ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + .ElapsedSeconds = Result.ElapsedSeconds, + .Reason = fmt::format("The ref {}/{}/{} is not formatted as a compact binary object"sv, m_Namespace, m_Bucket, m_Key)}, + std::move(ContainerObject)}; + } + + return LoadContainerResult{ConvertResult(Result), std::move(ContainerObject)}; + } + + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + { + const int32_t MaxAttempts = 3; + CloudCacheResult Result; + { + CloudCacheSession Session(m_CloudClient.Get()); + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.GetCompressedBlob(m_Namespace, RawHash); + } + } + return LoadAttachmentResult{ConvertResult(Result), std::move(Result.Response)}; + } + + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override + { + LoadAttachmentsResult Result; + for (const IoHash& Hash : RawHashes) + { + LoadAttachmentResult ChunkResult = LoadAttachment(Hash); + if (ChunkResult.ErrorCode) + { + return LoadAttachmentsResult{ChunkResult}; + } + ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000))); + Result.Chunks.emplace_back( + std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); + } + return Result; + } + +private: + static Result ConvertResult(const CloudCacheResult& Response) + { + std::string Text; + int32_t ErrorCode = 0; + if (Response.ErrorCode != 0) + { + ErrorCode = Response.ErrorCode; + } + else if (!Response.Success) + { + ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + if (Response.Response.GetContentType() == ZenContentType::kText) + { + Text = + std::string(reinterpret_cast<const std::string::value_type*>(Response.Response.GetData()), Response.Response.GetSize()); + } + } + return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text}; + } + + Ref<CloudCacheClient> m_CloudClient; + const std::string m_Namespace; + const std::string m_Bucket; + const IoHash m_Key; + bool m_EnableBlocks = true; + bool m_UseTempBlocks = true; +}; + +std::unique_ptr<RemoteProjectStore> +CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options) +{ + std::string Url = Options.Url; + if (Url.find("://"sv) == std::string::npos) + { + // Assume https URL + Url = fmt::format("https://{}"sv, Url); + } + CloudCacheClientOptions ClientOptions{.Name = "Remote store"sv, + .ServiceUrl = Url, + .ConnectTimeout = std::chrono::milliseconds(2000), + .Timeout = std::chrono::milliseconds(60000)}; + // 1) Access token as parameter in request + // 2) Environment variable (different win vs linux/mac) + // 3) openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider + + std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + if (!Options.AccessToken.empty()) + { + TokenProvider = CloudCacheTokenProvider::CreateFromCallback([AccessToken = Options.AccessToken]() { + return CloudCacheAccessToken{.Value = AccessToken, .ExpireTime = GcClock::TimePoint::max()}; + }); + } + else + { + TokenProvider = + CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager, OpenIdProvider = Options.OpenIdProvider]() { + AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider.empty() ? "Default" : OpenIdProvider); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); + } + + Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider))); + + std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<JupiterRemoteStore>(std::move(CloudClient), + Options.Namespace, + Options.Bucket, + Options.Key, + Options.ForceDisableBlocks, + Options.ForceDisableTempBlocks); + return RemoteStore; +} + +} // namespace zen diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.h b/src/zenserver/projectstore/jupiterremoteprojectstore.h new file mode 100644 index 000000000..31548af22 --- /dev/null +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.h @@ -0,0 +1,26 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "remoteprojectstore.h" + +namespace zen { + +class AuthMgr; + +struct JupiterRemoteStoreOptions : RemoteStoreOptions +{ + std::string Url; + std::string Namespace; + std::string Bucket; + IoHash Key; + std::string OpenIdProvider; + std::string AccessToken; + AuthMgr& AuthManager; + bool ForceDisableBlocks; + bool ForceDisableTempBlocks; +}; + +std::unique_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options); + +} // namespace zen diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp new file mode 100644 index 000000000..847a79a1d --- /dev/null +++ b/src/zenserver/projectstore/projectstore.cpp @@ -0,0 +1,4082 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "projectstore.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/stream.h> +#include <zencore/timer.h> +#include <zencore/trace.h> +#include <zenhttp/httpshared.h> +#include <zenstore/caslog.h> +#include <zenstore/cidstore.h> +#include <zenstore/scrubcontext.h> +#include <zenutil/cache/rpcrecording.h> + +#include "fileremoteprojectstore.h" +#include "jupiterremoteprojectstore.h" +#include "remoteprojectstore.h" +#include "zenremoteprojectstore.h" + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cpr/cpr.h> +#include <xxh3.h> +ZEN_THIRD_PARTY_INCLUDES_END + +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +# include <zencore/testutils.h> +#endif // ZEN_WITH_TESTS + +namespace zen { + +namespace { + bool PrepareDirectoryDelete(const std::filesystem::path& Dir, std::filesystem::path& OutDeleteDir) + { + int DropIndex = 0; + do + { + if (!std::filesystem::exists(Dir)) + { + return true; + } + + std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); + std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; + if (std::filesystem::exists(DroppedBucketPath)) + { + DropIndex++; + continue; + } + + std::error_code Ec; + std::filesystem::rename(Dir, DroppedBucketPath, Ec); + if (!Ec) + { + OutDeleteDir = DroppedBucketPath; + return true; + } + if (Ec && !std::filesystem::exists(DroppedBucketPath)) + { + // We can't move our folder, probably because it is busy, bail.. + return false; + } + Sleep(100); + } while (true); + } + + std::pair<std::unique_ptr<RemoteProjectStore>, std::string> CreateRemoteStore(CbObjectView Params, + AuthMgr& AuthManager, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize) + { + using namespace std::literals; + + std::unique_ptr<RemoteProjectStore> RemoteStore; + + if (CbObjectView File = Params["file"sv].AsObjectView(); File) + { + std::filesystem::path FolderPath(File["path"sv].AsString()); + if (FolderPath.empty()) + { + return {nullptr, "Missing file path"}; + } + std::string_view Name(File["name"sv].AsString()); + if (Name.empty()) + { + return {nullptr, "Missing file name"}; + } + bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false); + bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false); + + FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + FolderPath, + std::string(Name), + ForceDisableBlocks, + ForceEnableTempBlocks}; + RemoteStore = CreateFileRemoteStore(Options); + } + + if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud) + { + std::string_view CloudServiceUrl = Cloud["url"sv].AsString(); + if (CloudServiceUrl.empty()) + { + return {nullptr, "Missing service url"}; + } + + std::string Url = cpr::util::urlDecode(std::string(CloudServiceUrl)); + std::string_view Namespace = Cloud["namespace"sv].AsString(); + if (Namespace.empty()) + { + return {nullptr, "Missing namespace"}; + } + std::string_view Bucket = Cloud["bucket"sv].AsString(); + if (Bucket.empty()) + { + return {nullptr, "Missing bucket"}; + } + std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString(); + std::string AccessToken = std::string(Cloud["access-token"sv].AsString()); + if (AccessToken.empty()) + { + std::string_view AccessTokenEnvVariable = Cloud["access-token-env"].AsString(); + if (!AccessTokenEnvVariable.empty()) + { + AccessToken = GetEnvVariable(AccessTokenEnvVariable); + } + } + std::string_view KeyParam = Cloud["key"sv].AsString(); + if (KeyParam.empty()) + { + return {nullptr, "Missing key"}; + } + if (KeyParam.length() != IoHash::StringLength) + { + return {nullptr, "Invalid key"}; + } + IoHash Key = IoHash::FromHexString(KeyParam); + if (Key == IoHash::Zero) + { + return {nullptr, "Invalid key string"}; + } + bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false); + bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false); + + JupiterRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + Url, + std::string(Namespace), + std::string(Bucket), + Key, + std::string(OpenIdProvider), + AccessToken, + AuthManager, + ForceDisableBlocks, + ForceDisableTempBlocks}; + RemoteStore = CreateJupiterRemoteStore(Options); + } + + if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen) + { + std::string_view Url = Zen["url"sv].AsString(); + std::string_view Project = Zen["project"sv].AsString(); + if (Project.empty()) + { + return {nullptr, "Missing project"}; + } + std::string_view Oplog = Zen["oplog"sv].AsString(); + if (Oplog.empty()) + { + return {nullptr, "Missing oplog"}; + } + ZenRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + std::string(Url), + std::string(Project), + std::string(Oplog)}; + RemoteStore = CreateZenRemoteStore(Options); + } + + if (!RemoteStore) + { + return {nullptr, "Unknown remote store type"}; + } + + return {std::move(RemoteStore), ""}; + } + + std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result) + { + if (Result.ErrorCode == 0) + { + return {HttpResponseCode::OK, Result.Text}; + } + return {static_cast<HttpResponseCode>(Result.ErrorCode), + Result.Reason.empty() ? Result.Text + : Result.Text.empty() ? Result.Reason + : fmt::format("{}. Reason: '{}'", Result.Text, Result.Reason)}; + } + + void CSVHeader(bool Details, bool AttachmentDetails, StringBuilderBase& CSVWriter) + { + if (AttachmentDetails) + { + CSVWriter << "Project, Oplog, LSN, Key, Cid, Size"; + } + else if (Details) + { + CSVWriter << "Project, Oplog, LSN, Key, Size, AttachmentCount, AttachmentsSize"; + } + else + { + CSVWriter << "Project, Oplog, Key"; + } + } + + void CSVWriteOp(CidStore& CidStore, + std::string_view ProjectId, + std::string_view OplogId, + bool Details, + bool AttachmentDetails, + int LSN, + const Oid& Key, + CbObject Op, + StringBuilderBase& CSVWriter) + { + StringBuilder<32> KeyStringBuilder; + Key.ToString(KeyStringBuilder); + const std::string_view KeyString = KeyStringBuilder.ToView(); + + SharedBuffer Buffer = Op.GetBuffer(); + if (AttachmentDetails) + { + Op.IterateAttachments([&CidStore, &CSVWriter, &ProjectId, &OplogId, LSN, &KeyString](CbFieldView FieldView) { + const IoHash AttachmentHash = FieldView.AsAttachment(); + IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); + CSVWriter << "\r\n" + << ProjectId << ", " << OplogId << ", " << LSN << ", " << KeyString << ", " << AttachmentHash.ToHexString() + << ", " << gsl::narrow<uint64_t>(Attachment.GetSize()); + }); + } + else if (Details) + { + uint64_t AttachmentCount = 0; + size_t AttachmentsSize = 0; + Op.IterateAttachments([&CidStore, &AttachmentCount, &AttachmentsSize](CbFieldView FieldView) { + const IoHash AttachmentHash = FieldView.AsAttachment(); + AttachmentCount++; + IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); + AttachmentsSize += Attachment.GetSize(); + }); + CSVWriter << "\r\n" + << ProjectId << ", " << OplogId << ", " << LSN << ", " << KeyString << ", " << gsl::narrow<uint64_t>(Buffer.GetSize()) + << ", " << AttachmentCount << ", " << gsl::narrow<uint64_t>(AttachmentsSize); + } + else + { + CSVWriter << "\r\n" << ProjectId << ", " << OplogId << ", " << KeyString; + } + }; + + void CbWriteOp(CidStore& CidStore, + bool Details, + bool OpDetails, + bool AttachmentDetails, + int LSN, + const Oid& Key, + CbObject Op, + CbObjectWriter& CbWriter) + { + CbWriter.BeginObject(); + { + SharedBuffer Buffer = Op.GetBuffer(); + CbWriter.AddObjectId("key", Key); + if (Details) + { + CbWriter.AddInteger("lsn", LSN); + CbWriter.AddInteger("size", gsl::narrow<uint64_t>(Buffer.GetSize())); + } + if (AttachmentDetails) + { + CbWriter.BeginArray("attachments"); + Op.IterateAttachments([&CidStore, &CbWriter](CbFieldView FieldView) { + const IoHash AttachmentHash = FieldView.AsAttachment(); + CbWriter.BeginObject(); + { + IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); + CbWriter.AddString("cid", AttachmentHash.ToHexString()); + CbWriter.AddInteger("size", gsl::narrow<uint64_t>(Attachment.GetSize())); + } + CbWriter.EndObject(); + }); + CbWriter.EndArray(); + } + else if (Details) + { + uint64_t AttachmentCount = 0; + size_t AttachmentsSize = 0; + Op.IterateAttachments([&CidStore, &AttachmentCount, &AttachmentsSize](CbFieldView FieldView) { + const IoHash AttachmentHash = FieldView.AsAttachment(); + AttachmentCount++; + IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); + AttachmentsSize += Attachment.GetSize(); + }); + if (AttachmentCount > 0) + { + CbWriter.AddInteger("attachments", AttachmentCount); + CbWriter.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize)); + } + } + if (OpDetails) + { + CbWriter.BeginObject("op"); + for (const CbFieldView& Field : Op) + { + if (!Field.HasName()) + { + CbWriter.AddField(Field); + continue; + } + std::string_view FieldName = Field.GetName(); + CbWriter.AddField(FieldName, Field); + } + CbWriter.EndObject(); + } + } + CbWriter.EndObject(); + }; + + void CbWriteOplogOps(CidStore& CidStore, + ProjectStore::Oplog& Oplog, + bool Details, + bool OpDetails, + bool AttachmentDetails, + CbObjectWriter& Cbo) + { + Cbo.BeginArray("ops"); + { + Oplog.IterateOplogWithKey([&Cbo, &CidStore, Details, OpDetails, AttachmentDetails](int LSN, const Oid& Key, CbObject Op) { + CbWriteOp(CidStore, Details, OpDetails, AttachmentDetails, LSN, Key, Op, Cbo); + }); + } + Cbo.EndArray(); + } + + void CbWriteOplog(CidStore& CidStore, + ProjectStore::Oplog& Oplog, + bool Details, + bool OpDetails, + bool AttachmentDetails, + CbObjectWriter& Cbo) + { + Cbo.BeginObject(); + { + Cbo.AddString("name", Oplog.OplogId()); + CbWriteOplogOps(CidStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo); + } + Cbo.EndObject(); + } + + void CbWriteOplogs(CidStore& CidStore, + ProjectStore::Project& Project, + std::vector<std::string> OpLogs, + bool Details, + bool OpDetails, + bool AttachmentDetails, + CbObjectWriter& Cbo) + { + Cbo.BeginArray("oplogs"); + { + for (const std::string& OpLogId : OpLogs) + { + ProjectStore::Oplog* Oplog = Project.OpenOplog(OpLogId); + if (Oplog != nullptr) + { + CbWriteOplog(CidStore, *Oplog, Details, OpDetails, AttachmentDetails, Cbo); + } + } + } + Cbo.EndArray(); + } + + void CbWriteProject(CidStore& CidStore, + ProjectStore::Project& Project, + std::vector<std::string> OpLogs, + bool Details, + bool OpDetails, + bool AttachmentDetails, + CbObjectWriter& Cbo) + { + Cbo.BeginObject(); + { + Cbo.AddString("name", Project.Identifier); + CbWriteOplogs(CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); + } + Cbo.EndObject(); + } + +} // namespace + +////////////////////////////////////////////////////////////////////////// + +Oid +OpKeyStringAsOId(std::string_view OpKey) +{ + using namespace std::literals; + + CbObjectWriter Writer; + Writer << "key"sv << OpKey; + + XXH3_128Stream KeyHasher; + Writer.Save()["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + XXH3_128 KeyHash = KeyHasher.GetHash(); + + Oid OpId; + memcpy(OpId.OidBits, &KeyHash, sizeof(OpId.OidBits)); + + return OpId; +} + +////////////////////////////////////////////////////////////////////////// + +struct ProjectStore::OplogStorage : public RefCounted +{ + OplogStorage(ProjectStore::Oplog* OwnerOplog, std::filesystem::path BasePath) : m_OwnerOplog(OwnerOplog), m_OplogStoragePath(BasePath) + { + } + + ~OplogStorage() + { + ZEN_INFO("closing oplog storage at {}", m_OplogStoragePath); + Flush(); + } + + [[nodiscard]] bool Exists() { return Exists(m_OplogStoragePath); } + [[nodiscard]] static bool Exists(std::filesystem::path BasePath) + { + return std::filesystem::exists(BasePath / "ops.zlog") && std::filesystem::exists(BasePath / "ops.zops"); + } + + static bool Delete(std::filesystem::path BasePath) { return DeleteDirectories(BasePath); } + + uint64_t OpBlobsSize() const + { + RwLock::SharedLockScope _(m_RwLock); + return m_NextOpsOffset; + } + + void Open(bool IsCreate) + { + using namespace std::literals; + + ZEN_INFO("initializing oplog storage at '{}'", m_OplogStoragePath); + + if (IsCreate) + { + DeleteDirectories(m_OplogStoragePath); + CreateDirectories(m_OplogStoragePath); + } + + m_Oplog.Open(m_OplogStoragePath / "ops.zlog"sv, IsCreate ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); + m_Oplog.Initialize(); + + m_OpBlobs.Open(m_OplogStoragePath / "ops.zops"sv, IsCreate ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); + + ZEN_ASSERT(IsPow2(m_OpsAlign)); + ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1))); + } + + void ReplayLog(std::function<void(CbObject, const OplogEntry&)>&& Handler) + { + ZEN_TRACE_CPU("ProjectStore::OplogStorage::ReplayLog"); + + // This could use memory mapping or do something clever but for now it just reads the file sequentially + + ZEN_INFO("replaying log for '{}'", m_OplogStoragePath); + + Stopwatch Timer; + + uint64_t InvalidEntries = 0; + + IoBuffer OpBuffer; + m_Oplog.Replay( + [&](const OplogEntry& LogEntry) { + if (LogEntry.OpCoreSize == 0) + { + ++InvalidEntries; + + return; + } + + if (OpBuffer.GetSize() < LogEntry.OpCoreSize) + { + OpBuffer = IoBuffer(LogEntry.OpCoreSize); + } + + const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; + + m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); + + // Verify checksum, ignore op data if incorrect + const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), LogEntry.OpCoreSize) & 0xffffFFFF); + + if (OpCoreHash != LogEntry.OpCoreHash) + { + ZEN_WARN("skipping oplog entry with bad checksum!"); + return; + } + + CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), LogEntry.OpCoreSize)); + + m_NextOpsOffset = + Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign)); + m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn); + + Handler(Op, LogEntry); + }, + 0); + + if (InvalidEntries) + { + ZEN_WARN("ignored {} zero-sized oplog entries", InvalidEntries); + } + + ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}", + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + m_MaxLsn, + m_NextOpsOffset); + } + + void ReplayLog(const std::vector<OplogEntryAddress>& Entries, std::function<void(CbObject)>&& Handler) + { + for (const OplogEntryAddress& Entry : Entries) + { + CbObject Op = GetOp(Entry); + Handler(Op); + } + } + + CbObject GetOp(const OplogEntryAddress& Entry) + { + IoBuffer OpBuffer(Entry.Size); + + const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign; + m_OpBlobs.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset); + + return CbObject(SharedBuffer(std::move(OpBuffer))); + } + + OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, XXH3_128 KeyHash) + { + ZEN_TRACE_CPU("ProjectStore::OplogStorage::AppendOp"); + + using namespace std::literals; + + uint64_t WriteSize = Buffer.GetSize(); + + RwLock::ExclusiveLockScope Lock(m_RwLock); + const uint64_t WriteOffset = m_NextOpsOffset; + const uint32_t OpLsn = ++m_MaxLsn; + m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign); + Lock.ReleaseNow(); + + ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign)); + + OplogEntry Entry = {.OpLsn = OpLsn, + .OpCoreOffset = gsl::narrow_cast<uint32_t>(WriteOffset / m_OpsAlign), + .OpCoreSize = uint32_t(Buffer.GetSize()), + .OpCoreHash = OpCoreHash, + .OpKeyHash = KeyHash}; + + m_Oplog.Append(Entry); + m_OpBlobs.Write(Buffer.GetData(), WriteSize, WriteOffset); + + return Entry; + } + + void Flush() + { + m_Oplog.Flush(); + m_OpBlobs.Flush(); + } + + spdlog::logger& Log() { return m_OwnerOplog->Log(); } + +private: + ProjectStore::Oplog* m_OwnerOplog; + std::filesystem::path m_OplogStoragePath; + mutable RwLock m_RwLock; + TCasLogFile<OplogEntry> m_Oplog; + BasicFile m_OpBlobs; + std::atomic<uint64_t> m_NextOpsOffset{0}; + uint64_t m_OpsAlign = 32; + std::atomic<uint32_t> m_MaxLsn{0}; +}; + +////////////////////////////////////////////////////////////////////////// + +ProjectStore::Oplog::Oplog(std::string_view Id, + Project* Project, + CidStore& Store, + std::filesystem::path BasePath, + const std::filesystem::path& MarkerPath) +: m_OuterProject(Project) +, m_CidStore(Store) +, m_BasePath(BasePath) +, m_MarkerPath(MarkerPath) +, m_OplogId(Id) +{ + using namespace std::literals; + + m_Storage = new OplogStorage(this, m_BasePath); + const bool StoreExists = m_Storage->Exists(); + m_Storage->Open(/* IsCreate */ !StoreExists); + + m_TempPath = m_BasePath / "temp"sv; + + CleanDirectory(m_TempPath); +} + +ProjectStore::Oplog::~Oplog() +{ + if (m_Storage) + { + Flush(); + } +} + +void +ProjectStore::Oplog::Flush() +{ + ZEN_ASSERT(m_Storage); + m_Storage->Flush(); +} + +void +ProjectStore::Oplog::Scrub(ScrubContext& Ctx) const +{ + ZEN_UNUSED(Ctx); +} + +void +ProjectStore::Oplog::GatherReferences(GcContext& GcCtx) +{ + RwLock::SharedLockScope _(m_OplogLock); + + std::vector<IoHash> Hashes; + Hashes.reserve(Max(m_ChunkMap.size(), m_MetaMap.size())); + + for (const auto& Kv : m_ChunkMap) + { + Hashes.push_back(Kv.second); + } + + GcCtx.AddRetainedCids(Hashes); + + Hashes.clear(); + + for (const auto& Kv : m_MetaMap) + { + Hashes.push_back(Kv.second); + } + + GcCtx.AddRetainedCids(Hashes); +} + +uint64_t +ProjectStore::Oplog::TotalSize() const +{ + RwLock::SharedLockScope _(m_OplogLock); + if (m_Storage) + { + return m_Storage->OpBlobsSize(); + } + return 0; +} + +bool +ProjectStore::Oplog::IsExpired() const +{ + if (m_MarkerPath.empty()) + { + return false; + } + return !std::filesystem::exists(m_MarkerPath); +} + +std::filesystem::path +ProjectStore::Oplog::PrepareForDelete(bool MoveFolder) +{ + RwLock::ExclusiveLockScope _(m_OplogLock); + m_ChunkMap.clear(); + m_MetaMap.clear(); + m_FileMap.clear(); + m_OpAddressMap.clear(); + m_LatestOpMap.clear(); + m_Storage = {}; + if (!MoveFolder) + { + return {}; + } + std::filesystem::path MovedDir; + if (PrepareDirectoryDelete(m_BasePath, MovedDir)) + { + return MovedDir; + } + return {}; +} + +bool +ProjectStore::Oplog::ExistsAt(std::filesystem::path BasePath) +{ + using namespace std::literals; + + std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; + return std::filesystem::is_regular_file(StateFilePath); +} + +void +ProjectStore::Oplog::Read() +{ + using namespace std::literals; + + std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv; + if (std::filesystem::is_regular_file(StateFilePath)) + { + ZEN_INFO("reading config for oplog '{}' in project '{}' from {}", m_OplogId, m_OuterProject->Identifier, StateFilePath); + + BasicFile Blob; + Blob.Open(StateFilePath, BasicFile::Mode::kRead); + + IoBuffer Obj = Blob.ReadAll(); + CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); + + if (ValidationError != CbValidateError::None) + { + ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), StateFilePath); + return; + } + + CbObject Cfg = LoadCompactBinaryObject(Obj); + + m_MarkerPath = Cfg["gcpath"sv].AsString(); + } + else + { + ZEN_INFO("config for oplog '{}' in project '{}' not found at {}. Assuming legacy store", + m_OplogId, + m_OuterProject->Identifier, + StateFilePath); + } + ReplayLog(); +} + +void +ProjectStore::Oplog::Write() +{ + using namespace std::literals; + + BinaryWriter Mem; + + CbObjectWriter Cfg; + + Cfg << "gcpath"sv << PathToUtf8(m_MarkerPath); + + Cfg.Save(Mem); + + std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv; + + ZEN_INFO("persisting config for oplog '{}' in project '{}' to {}", m_OplogId, m_OuterProject->Identifier, StateFilePath); + + BasicFile Blob; + Blob.Open(StateFilePath, BasicFile::Mode::kTruncate); + Blob.Write(Mem.Data(), Mem.Size(), 0); + Blob.Flush(); +} + +void +ProjectStore::Oplog::ReplayLog() +{ + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + if (!m_Storage) + { + return; + } + m_Storage->ReplayLog( + [&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, GetMapping(Op), OpEntry, kUpdateReplay); }); +} + +IoBuffer +ProjectStore::Oplog::FindChunk(Oid ChunkId) +{ + RwLock::SharedLockScope OplogLock(m_OplogLock); + if (!m_Storage) + { + return IoBuffer{}; + } + + if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end()) + { + IoHash ChunkHash = ChunkIt->second; + OplogLock.ReleaseNow(); + + IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash); + Chunk.SetContentType(ZenContentType::kCompressedBinary); + + return Chunk; + } + + if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) + { + std::filesystem::path FilePath = m_OuterProject->RootDir / FileIt->second.ServerPath; + + OplogLock.ReleaseNow(); + + IoBuffer FileChunk = IoBufferBuilder::MakeFromFile(FilePath); + FileChunk.SetContentType(ZenContentType::kBinary); + + return FileChunk; + } + + if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end()) + { + IoHash ChunkHash = MetaIt->second; + OplogLock.ReleaseNow(); + + IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash); + Chunk.SetContentType(ZenContentType::kCompressedBinary); + + return Chunk; + } + + return {}; +} + +void +ProjectStore::Oplog::IterateFileMap( + std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn) +{ + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return; + } + + for (const auto& Kv : m_FileMap) + { + Fn(Kv.first, Kv.second.ServerPath, Kv.second.ClientPath); + } +} + +void +ProjectStore::Oplog::IterateOplog(std::function<void(CbObject)>&& Handler) +{ + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return; + } + + std::vector<OplogEntryAddress> Entries; + Entries.reserve(m_LatestOpMap.size()); + + for (const auto& Kv : m_LatestOpMap) + { + const auto AddressEntry = m_OpAddressMap.find(Kv.second); + ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); + + Entries.push_back(AddressEntry->second); + } + + std::sort(Entries.begin(), Entries.end(), [](const OplogEntryAddress& Lhs, const OplogEntryAddress& Rhs) { + return Lhs.Offset < Rhs.Offset; + }); + + m_Storage->ReplayLog(Entries, [&](CbObject Op) { Handler(Op); }); +} + +void +ProjectStore::Oplog::IterateOplogWithKey(std::function<void(int, const Oid&, CbObject)>&& Handler) +{ + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return; + } + + std::vector<size_t> EntryIndexes; + std::vector<OplogEntryAddress> Entries; + std::vector<Oid> Keys; + std::vector<int> LSNs; + Entries.reserve(m_LatestOpMap.size()); + EntryIndexes.reserve(m_LatestOpMap.size()); + Keys.reserve(m_LatestOpMap.size()); + LSNs.reserve(m_LatestOpMap.size()); + + for (const auto& Kv : m_LatestOpMap) + { + const auto AddressEntry = m_OpAddressMap.find(Kv.second); + ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); + + Entries.push_back(AddressEntry->second); + Keys.push_back(Kv.first); + LSNs.push_back(Kv.second); + EntryIndexes.push_back(EntryIndexes.size()); + } + + std::sort(EntryIndexes.begin(), EntryIndexes.end(), [&Entries](const size_t& Lhs, const size_t& Rhs) { + const OplogEntryAddress& LhsEntry = Entries[Lhs]; + const OplogEntryAddress& RhsEntry = Entries[Rhs]; + return LhsEntry.Offset < RhsEntry.Offset; + }); + std::vector<OplogEntryAddress> SortedEntries; + SortedEntries.reserve(EntryIndexes.size()); + for (size_t Index : EntryIndexes) + { + SortedEntries.push_back(Entries[Index]); + } + + size_t EntryIndex = 0; + m_Storage->ReplayLog(SortedEntries, [&](CbObject Op) { + Handler(LSNs[EntryIndex], Keys[EntryIndex], Op); + EntryIndex++; + }); +} + +int +ProjectStore::Oplog::GetOpIndexByKey(const Oid& Key) +{ + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return {}; + } + if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end()) + { + return LatestOp->second; + } + return -1; +} + +std::optional<CbObject> +ProjectStore::Oplog::GetOpByKey(const Oid& Key) +{ + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return {}; + } + + if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end()) + { + const auto AddressEntry = m_OpAddressMap.find(LatestOp->second); + ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); + + return m_Storage->GetOp(AddressEntry->second); + } + + return {}; +} + +std::optional<CbObject> +ProjectStore::Oplog::GetOpByIndex(int Index) +{ + RwLock::SharedLockScope _(m_OplogLock); + if (!m_Storage) + { + return {}; + } + + if (const auto AddressEntryIt = m_OpAddressMap.find(Index); AddressEntryIt != m_OpAddressMap.end()) + { + return m_Storage->GetOp(AddressEntryIt->second); + } + + return {}; +} + +void +ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, + Oid FileId, + IoHash Hash, + std::string_view ServerPath, + std::string_view ClientPath) +{ + if (Hash != IoHash::Zero) + { + m_ChunkMap.insert_or_assign(FileId, Hash); + } + + FileMapEntry Entry; + Entry.ServerPath = ServerPath; + Entry.ClientPath = ClientPath; + + m_FileMap[FileId] = std::move(Entry); + + if (Hash != IoHash::Zero) + { + m_ChunkMap.insert_or_assign(FileId, Hash); + } +} + +void +ProjectStore::Oplog::AddChunkMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash) +{ + m_ChunkMap.insert_or_assign(ChunkId, Hash); +} + +void +ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash) +{ + m_MetaMap.insert_or_assign(ChunkId, Hash); +} + +ProjectStore::Oplog::OplogEntryMapping +ProjectStore::Oplog::GetMapping(CbObject Core) +{ + using namespace std::literals; + + OplogEntryMapping Result; + + // Update chunk id maps + CbObjectView PackageObj = Core["package"sv].AsObjectView(); + CbArrayView BulkDataArray = Core["bulkdata"sv].AsArrayView(); + CbArrayView PackageDataArray = Core["packagedata"sv].AsArrayView(); + Result.Chunks.reserve(PackageObj ? 1 : 0 + BulkDataArray.Num() + PackageDataArray.Num()); + + if (PackageObj) + { + Oid Id = PackageObj["id"sv].AsObjectId(); + IoHash Hash = PackageObj["data"sv].AsBinaryAttachment(); + Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); + ZEN_DEBUG("package data {} -> {}", Id, Hash); + } + + for (CbFieldView& Entry : PackageDataArray) + { + CbObjectView PackageDataObj = Entry.AsObjectView(); + Oid Id = PackageDataObj["id"sv].AsObjectId(); + IoHash Hash = PackageDataObj["data"sv].AsBinaryAttachment(); + Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); + ZEN_DEBUG("package {} -> {}", Id, Hash); + } + + for (CbFieldView& Entry : BulkDataArray) + { + CbObjectView BulkObj = Entry.AsObjectView(); + Oid Id = BulkObj["id"sv].AsObjectId(); + IoHash Hash = BulkObj["data"sv].AsBinaryAttachment(); + Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); + ZEN_DEBUG("bulkdata {} -> {}", Id, Hash); + } + + CbArrayView FilesArray = Core["files"sv].AsArrayView(); + Result.Files.reserve(FilesArray.Num()); + for (CbFieldView& Entry : FilesArray) + { + CbObjectView FileObj = Entry.AsObjectView(); + + std::string_view ServerPath = FileObj["serverpath"sv].AsString(); + std::string_view ClientPath = FileObj["clientpath"sv].AsString(); + if (ServerPath.empty() || ClientPath.empty()) + { + ZEN_WARN("invalid file"); + continue; + } + + Oid Id = FileObj["id"sv].AsObjectId(); + IoHash Hash = FileObj["data"sv].AsBinaryAttachment(); + Result.Files.emplace_back( + OplogEntryMapping::FileMapping{OplogEntryMapping::Mapping{Id, Hash}, std::string(ServerPath), std::string(ClientPath)}); + ZEN_DEBUG("file {} -> {}, ServerPath: {}, ClientPath: {}", Id, Hash, ServerPath, ClientPath); + } + + CbArrayView MetaArray = Core["meta"sv].AsArrayView(); + Result.Meta.reserve(MetaArray.Num()); + for (CbFieldView& Entry : MetaArray) + { + CbObjectView MetaObj = Entry.AsObjectView(); + Oid Id = MetaObj["id"sv].AsObjectId(); + IoHash Hash = MetaObj["data"sv].AsBinaryAttachment(); + Result.Meta.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); + auto NameString = MetaObj["name"sv].AsString(); + ZEN_DEBUG("meta data ({}) {} -> {}", NameString, Id, Hash); + } + + return Result; +} + +uint32_t +ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, + const OplogEntryMapping& OpMapping, + const OplogEntry& OpEntry, + UpdateType TypeOfUpdate) +{ + ZEN_TRACE_CPU("ProjectStore::Oplog::RegisterOplogEntry"); + + ZEN_UNUSED(TypeOfUpdate); + + // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing + // too many problems. Longer term we'll probably want to ensure we can do concurrent updates however + + using namespace std::literals; + + // Update chunk id maps + for (const OplogEntryMapping::Mapping& Chunk : OpMapping.Chunks) + { + AddChunkMapping(OplogLock, Chunk.Id, Chunk.Hash); + } + + for (const OplogEntryMapping::FileMapping& File : OpMapping.Files) + { + AddFileMapping(OplogLock, File.Id, File.Hash, File.ServerPath, File.ClientPath); + } + + for (const OplogEntryMapping::Mapping& Meta : OpMapping.Meta) + { + AddMetaMapping(OplogLock, Meta.Id, Meta.Hash); + } + + m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize}); + m_LatestOpMap[OpEntry.OpKeyAsOId()] = OpEntry.OpLsn; + + return OpEntry.OpLsn; +} + +uint32_t +ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) +{ + ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); + + const CbObject& Core = OpPackage.GetObject(); + const uint32_t EntryId = AppendNewOplogEntry(Core); + if (EntryId == 0xffffffffu) + { + // The oplog has been deleted so just drop this + return EntryId; + } + + // Persist attachments after oplog entry so GC won't find attachments without references + + uint64_t AttachmentBytes = 0; + uint64_t NewAttachmentBytes = 0; + + auto Attachments = OpPackage.GetAttachments(); + + for (const auto& Attach : Attachments) + { + ZEN_ASSERT(Attach.IsCompressedBinary()); + + CompressedBuffer AttachmentData = Attach.AsCompressedBinary(); + const uint64_t AttachmentSize = AttachmentData.DecodeRawSize(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData.GetCompressed().Flatten().AsIoBuffer(), Attach.GetHash()); + + if (InsertResult.New) + { + NewAttachmentBytes += AttachmentSize; + } + AttachmentBytes += AttachmentSize; + } + + ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes)); + + return EntryId; +} + +uint32_t +ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) +{ + ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); + + using namespace std::literals; + + OplogEntryMapping Mapping = GetMapping(Core); + + SharedBuffer Buffer = Core.GetBuffer(); + const uint64_t WriteSize = Buffer.GetSize(); + const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF); + + ZEN_ASSERT(WriteSize != 0); + + XXH3_128Stream KeyHasher; + Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + XXH3_128 KeyHash = KeyHasher.GetHash(); + + RefPtr<OplogStorage> Storage; + { + RwLock::SharedLockScope _(m_OplogLock); + Storage = m_Storage; + } + if (!m_Storage) + { + return 0xffffffffu; + } + const OplogEntry OpEntry = m_Storage->AppendOp(Buffer, OpCoreHash, KeyHash); + + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry, kUpdateNewEntry); + + return EntryId; +} + +////////////////////////////////////////////////////////////////////////// + +ProjectStore::Project::Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath) +: m_ProjectStore(PrjStore) +, m_CidStore(Store) +, m_OplogStoragePath(BasePath) +{ +} + +ProjectStore::Project::~Project() +{ +} + +bool +ProjectStore::Project::Exists(std::filesystem::path BasePath) +{ + return std::filesystem::exists(BasePath / "Project.zcb"); +} + +void +ProjectStore::Project::Read() +{ + using namespace std::literals; + + std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv; + + ZEN_INFO("reading config for project '{}' from {}", Identifier, ProjectStateFilePath); + + BasicFile Blob; + Blob.Open(ProjectStateFilePath, BasicFile::Mode::kRead); + + IoBuffer Obj = Blob.ReadAll(); + CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); + + if (ValidationError == CbValidateError::None) + { + CbObject Cfg = LoadCompactBinaryObject(Obj); + + Identifier = Cfg["id"sv].AsString(); + RootDir = Cfg["root"sv].AsString(); + ProjectRootDir = Cfg["project"sv].AsString(); + EngineRootDir = Cfg["engine"sv].AsString(); + ProjectFilePath = Cfg["projectfile"sv].AsString(); + } + else + { + ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), ProjectStateFilePath); + } +} + +void +ProjectStore::Project::Write() +{ + using namespace std::literals; + + BinaryWriter Mem; + + CbObjectWriter Cfg; + Cfg << "id"sv << Identifier; + Cfg << "root"sv << PathToUtf8(RootDir); + Cfg << "project"sv << ProjectRootDir; + Cfg << "engine"sv << EngineRootDir; + Cfg << "projectfile"sv << ProjectFilePath; + + Cfg.Save(Mem); + + CreateDirectories(m_OplogStoragePath); + + std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv; + + ZEN_INFO("persisting config for project '{}' to {}", Identifier, ProjectStateFilePath); + + BasicFile Blob; + Blob.Open(ProjectStateFilePath, BasicFile::Mode::kTruncate); + Blob.Write(Mem.Data(), Mem.Size(), 0); + Blob.Flush(); +} + +spdlog::logger& +ProjectStore::Project::Log() +{ + return m_ProjectStore->Log(); +} + +std::filesystem::path +ProjectStore::Project::BasePathForOplog(std::string_view OplogId) +{ + return m_OplogStoragePath / OplogId; +} + +ProjectStore::Oplog* +ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath) +{ + RwLock::ExclusiveLockScope _(m_ProjectLock); + + std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); + + try + { + Oplog* Log = m_Oplogs + .try_emplace(std::string{OplogId}, + std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, MarkerPath)) + .first->second.get(); + + Log->Write(); + return Log; + } + catch (std::exception&) + { + // In case of failure we need to ensure there's no half constructed entry around + // + // (This is probably already ensured by the try_emplace implementation?) + + m_Oplogs.erase(std::string{OplogId}); + + return nullptr; + } +} + +ProjectStore::Oplog* +ProjectStore::Project::OpenOplog(std::string_view OplogId) +{ + { + RwLock::SharedLockScope _(m_ProjectLock); + + auto OplogIt = m_Oplogs.find(std::string(OplogId)); + + if (OplogIt != m_Oplogs.end()) + { + return OplogIt->second.get(); + } + } + + RwLock::ExclusiveLockScope _(m_ProjectLock); + + std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); + + if (Oplog::ExistsAt(OplogBasePath)) + { + // Do open of existing oplog + + try + { + Oplog* Log = + m_Oplogs + .try_emplace(std::string{OplogId}, + std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, std::filesystem::path{})) + .first->second.get(); + Log->Read(); + + return Log; + } + catch (std::exception& ex) + { + ZEN_WARN("failed to open oplog '{}' @ '{}': {}", OplogId, OplogBasePath, ex.what()); + + m_Oplogs.erase(std::string{OplogId}); + } + } + + return nullptr; +} + +void +ProjectStore::Project::DeleteOplog(std::string_view OplogId) +{ + std::filesystem::path DeletePath; + { + RwLock::ExclusiveLockScope _(m_ProjectLock); + + auto OplogIt = m_Oplogs.find(std::string(OplogId)); + + if (OplogIt != m_Oplogs.end()) + { + std::unique_ptr<Oplog>& Oplog = OplogIt->second; + DeletePath = Oplog->PrepareForDelete(true); + m_DeletedOplogs.emplace_back(std::move(Oplog)); + m_Oplogs.erase(OplogIt); + } + } + + // Erase content on disk + if (!DeletePath.empty()) + { + OplogStorage::Delete(DeletePath); + } +} + +std::vector<std::string> +ProjectStore::Project::ScanForOplogs() const +{ + DirectoryContent DirContent; + GetDirectoryContent(m_OplogStoragePath, DirectoryContent::IncludeDirsFlag, DirContent); + std::vector<std::string> Oplogs; + Oplogs.reserve(DirContent.Directories.size()); + for (const std::filesystem::path& DirPath : DirContent.Directories) + { + Oplogs.push_back(DirPath.filename().string()); + } + return Oplogs; +} + +void +ProjectStore::Project::IterateOplogs(std::function<void(const Oplog&)>&& Fn) const +{ + RwLock::SharedLockScope _(m_ProjectLock); + + for (auto& Kv : m_Oplogs) + { + Fn(*Kv.second); + } +} + +void +ProjectStore::Project::IterateOplogs(std::function<void(Oplog&)>&& Fn) +{ + RwLock::SharedLockScope _(m_ProjectLock); + + for (auto& Kv : m_Oplogs) + { + Fn(*Kv.second); + } +} + +void +ProjectStore::Project::Flush() +{ + // We only need to flush oplogs that we have already loaded + IterateOplogs([&](Oplog& Ops) { Ops.Flush(); }); +} + +void +ProjectStore::Project::Scrub(ScrubContext& Ctx) +{ + // Scrubbing needs to check all existing oplogs + std::vector<std::string> OpLogs = ScanForOplogs(); + for (const std::string& OpLogId : OpLogs) + { + OpenOplog(OpLogId); + } + IterateOplogs([&](const Oplog& Ops) { + if (!Ops.IsExpired()) + { + Ops.Scrub(Ctx); + } + }); +} + +void +ProjectStore::Project::GatherReferences(GcContext& GcCtx) +{ + ZEN_TRACE_CPU("ProjectStore::Project::GatherReferences"); + + Stopwatch Timer; + const auto Guard = MakeGuard([&] { + ZEN_DEBUG("gathered references from project store project {} in {}", Identifier, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + // GatherReferences needs to check all existing oplogs + std::vector<std::string> OpLogs = ScanForOplogs(); + for (const std::string& OpLogId : OpLogs) + { + OpenOplog(OpLogId); + } + IterateOplogs([&](Oplog& Ops) { + if (!Ops.IsExpired()) + { + Ops.GatherReferences(GcCtx); + } + }); +} + +uint64_t +ProjectStore::Project::TotalSize() const +{ + uint64_t Result = 0; + { + RwLock::SharedLockScope _(m_ProjectLock); + for (const auto& It : m_Oplogs) + { + Result += It.second->TotalSize(); + } + } + return Result; +} + +bool +ProjectStore::Project::PrepareForDelete(std::filesystem::path& OutDeletePath) +{ + RwLock::ExclusiveLockScope _(m_ProjectLock); + + for (auto& It : m_Oplogs) + { + // We don't care about the moved folder + It.second->PrepareForDelete(false); + m_DeletedOplogs.emplace_back(std::move(It.second)); + } + + m_Oplogs.clear(); + + bool Success = PrepareDirectoryDelete(m_OplogStoragePath, OutDeletePath); + if (!Success) + { + return false; + } + m_OplogStoragePath.clear(); + return true; +} + +bool +ProjectStore::Project::IsExpired() const +{ + if (ProjectFilePath.empty()) + { + return false; + } + return !std::filesystem::exists(ProjectFilePath); +} + +////////////////////////////////////////////////////////////////////////// + +ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc) +: GcStorage(Gc) +, GcContributor(Gc) +, m_Log(logging::Get("project")) +, m_CidStore(Store) +, m_ProjectBasePath(BasePath) +{ + ZEN_INFO("initializing project store at '{}'", BasePath); + // m_Log.set_level(spdlog::level::debug); +} + +ProjectStore::~ProjectStore() +{ + ZEN_INFO("closing project store ('{}')", m_ProjectBasePath); +} + +std::filesystem::path +ProjectStore::BasePathForProject(std::string_view ProjectId) +{ + return m_ProjectBasePath / ProjectId; +} + +void +ProjectStore::DiscoverProjects() +{ + if (!std::filesystem::exists(m_ProjectBasePath)) + { + return; + } + + DirectoryContent DirContent; + GetDirectoryContent(m_ProjectBasePath, DirectoryContent::IncludeDirsFlag, DirContent); + + for (const std::filesystem::path& DirPath : DirContent.Directories) + { + std::string DirName = PathToUtf8(DirPath.filename()); + OpenProject(DirName); + } +} + +void +ProjectStore::IterateProjects(std::function<void(Project& Prj)>&& Fn) +{ + RwLock::SharedLockScope _(m_ProjectsLock); + + for (auto& Kv : m_Projects) + { + Fn(*Kv.second.Get()); + } +} + +void +ProjectStore::Flush() +{ + std::vector<Ref<Project>> Projects; + { + RwLock::SharedLockScope _(m_ProjectsLock); + Projects.reserve(m_Projects.size()); + + for (auto& Kv : m_Projects) + { + Projects.push_back(Kv.second); + } + } + for (const Ref<Project>& Project : Projects) + { + Project->Flush(); + } +} + +void +ProjectStore::Scrub(ScrubContext& Ctx) +{ + DiscoverProjects(); + + std::vector<Ref<Project>> Projects; + { + RwLock::SharedLockScope _(m_ProjectsLock); + Projects.reserve(m_Projects.size()); + + for (auto& Kv : m_Projects) + { + if (Kv.second->IsExpired()) + { + continue; + } + Projects.push_back(Kv.second); + } + } + for (const Ref<Project>& Project : Projects) + { + Project->Scrub(Ctx); + } +} + +void +ProjectStore::GatherReferences(GcContext& GcCtx) +{ + ZEN_TRACE_CPU("ProjectStore::GatherReferences"); + + size_t ProjectCount = 0; + size_t ExpiredProjectCount = 0; + Stopwatch Timer; + const auto Guard = MakeGuard([&] { + ZEN_DEBUG("gathered references from '{}' in {}, found {} active projects and {} expired projects", + m_ProjectBasePath.string(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + ProjectCount, + ExpiredProjectCount); + }); + + DiscoverProjects(); + + std::vector<Ref<Project>> Projects; + { + RwLock::SharedLockScope _(m_ProjectsLock); + Projects.reserve(m_Projects.size()); + + for (auto& Kv : m_Projects) + { + if (Kv.second->IsExpired()) + { + ExpiredProjectCount++; + continue; + } + Projects.push_back(Kv.second); + } + } + ProjectCount = Projects.size(); + for (const Ref<Project>& Project : Projects) + { + Project->GatherReferences(GcCtx); + } +} + +void +ProjectStore::CollectGarbage(GcContext& GcCtx) +{ + ZEN_TRACE_CPU("ProjectStore::CollectGarbage"); + + size_t ProjectCount = 0; + size_t ExpiredProjectCount = 0; + + Stopwatch Timer; + const auto Guard = MakeGuard([&] { + ZEN_DEBUG("garbage collect from '{}' DONE after {}, found {} active projects and {} expired projects", + m_ProjectBasePath.string(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + ProjectCount, + ExpiredProjectCount); + }); + std::vector<Ref<Project>> ExpiredProjects; + std::vector<Ref<Project>> Projects; + + { + RwLock::SharedLockScope _(m_ProjectsLock); + for (auto& Kv : m_Projects) + { + if (Kv.second->IsExpired()) + { + ExpiredProjects.push_back(Kv.second); + ExpiredProjectCount++; + continue; + } + Projects.push_back(Kv.second); + ProjectCount++; + } + } + + if (!GcCtx.IsDeletionMode()) + { + ZEN_DEBUG("garbage collect DISABLED, for '{}' ", m_ProjectBasePath.string()); + return; + } + + for (const Ref<Project>& Project : Projects) + { + std::vector<std::string> ExpiredOplogs; + { + RwLock::ExclusiveLockScope _(m_ProjectsLock); + Project->IterateOplogs([&ExpiredOplogs](ProjectStore::Oplog& Oplog) { + if (Oplog.IsExpired()) + { + ExpiredOplogs.push_back(Oplog.OplogId()); + } + }); + } + for (const std::string& OplogId : ExpiredOplogs) + { + ZEN_DEBUG("ProjectStore::CollectGarbage garbage collected oplog '{}' in project '{}'. Removing storage on disk", + OplogId, + Project->Identifier); + Project->DeleteOplog(OplogId); + } + } + + if (ExpiredProjects.empty()) + { + ZEN_DEBUG("garbage collect for '{}', no expired projects found", m_ProjectBasePath.string()); + return; + } + + for (const Ref<Project>& Project : ExpiredProjects) + { + std::filesystem::path PathToRemove; + std::string ProjectId; + { + RwLock::ExclusiveLockScope _(m_ProjectsLock); + if (!Project->IsExpired()) + { + ZEN_DEBUG("ProjectStore::CollectGarbage skipped garbage collect of project '{}'. Project no longer expired.", ProjectId); + continue; + } + bool Success = Project->PrepareForDelete(PathToRemove); + if (!Success) + { + ZEN_DEBUG("ProjectStore::CollectGarbage skipped garbage collect of project '{}'. Project folder is locked.", ProjectId); + continue; + } + m_Projects.erase(Project->Identifier); + ProjectId = Project->Identifier; + } + + ZEN_DEBUG("ProjectStore::CollectGarbage garbage collected project '{}'. Removing storage on disk", ProjectId); + if (PathToRemove.empty()) + { + continue; + } + + DeleteDirectories(PathToRemove); + } +} + +GcStorageSize +ProjectStore::StorageSize() const +{ + GcStorageSize Result; + { + RwLock::SharedLockScope _(m_ProjectsLock); + for (auto& Kv : m_Projects) + { + const Ref<Project>& Project = Kv.second; + Result.DiskSize += Project->TotalSize(); + } + } + return Result; +} + +Ref<ProjectStore::Project> +ProjectStore::OpenProject(std::string_view ProjectId) +{ + { + RwLock::SharedLockScope _(m_ProjectsLock); + + auto ProjIt = m_Projects.find(std::string{ProjectId}); + + if (ProjIt != m_Projects.end()) + { + return ProjIt->second; + } + } + + RwLock::ExclusiveLockScope _(m_ProjectsLock); + + std::filesystem::path BasePath = BasePathForProject(ProjectId); + + if (Project::Exists(BasePath)) + { + try + { + ZEN_INFO("opening project {} @ {}", ProjectId, BasePath); + + Ref<Project>& Prj = + m_Projects + .try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath))) + .first->second; + Prj->Identifier = ProjectId; + Prj->Read(); + return Prj; + } + catch (std::exception& e) + { + ZEN_WARN("failed to open {} @ {} ({})", ProjectId, BasePath, e.what()); + m_Projects.erase(std::string{ProjectId}); + } + } + + return {}; +} + +Ref<ProjectStore::Project> +ProjectStore::NewProject(std::filesystem::path BasePath, + std::string_view ProjectId, + std::string_view RootDir, + std::string_view EngineRootDir, + std::string_view ProjectRootDir, + std::string_view ProjectFilePath) +{ + RwLock::ExclusiveLockScope _(m_ProjectsLock); + + Ref<Project>& Prj = + m_Projects.try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath))) + .first->second; + Prj->Identifier = ProjectId; + Prj->RootDir = RootDir; + Prj->EngineRootDir = EngineRootDir; + Prj->ProjectRootDir = ProjectRootDir; + Prj->ProjectFilePath = ProjectFilePath; + Prj->Write(); + + return Prj; +} + +bool +ProjectStore::DeleteProject(std::string_view ProjectId) +{ + ZEN_INFO("deleting project {}", ProjectId); + + RwLock::ExclusiveLockScope ProjectsLock(m_ProjectsLock); + + auto ProjIt = m_Projects.find(std::string{ProjectId}); + + if (ProjIt == m_Projects.end()) + { + return true; + } + + std::filesystem::path DeletePath; + bool Success = ProjIt->second->PrepareForDelete(DeletePath); + + if (!Success) + { + return false; + } + m_Projects.erase(ProjIt); + ProjectsLock.ReleaseNow(); + + if (!DeletePath.empty()) + { + DeleteDirectories(DeletePath); + } + return true; +} + +bool +ProjectStore::Exists(std::string_view ProjectId) +{ + return Project::Exists(BasePathForProject(ProjectId)); +} + +CbArray +ProjectStore::GetProjectsList() +{ + using namespace std::literals; + + DiscoverProjects(); + + CbWriter Response; + Response.BeginArray(); + + IterateProjects([&Response](ProjectStore::Project& Prj) { + Response.BeginObject(); + Response << "Id"sv << Prj.Identifier; + Response << "RootDir"sv << Prj.RootDir.string(); + Response << "ProjectRootDir"sv << Prj.ProjectRootDir; + Response << "EngineRootDir"sv << Prj.EngineRootDir; + Response << "ProjectFilePath"sv << Prj.ProjectFilePath; + Response.EndObject(); + }); + Response.EndArray(); + return Response.Save().AsArray(); +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::GetProjectFiles(const std::string_view ProjectId, const std::string_view OplogId, bool FilterClient, CbObject& OutPayload) +{ + using namespace std::literals; + + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Project files request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return {HttpResponseCode::NotFound, fmt::format("Project files for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + + CbObjectWriter Response; + Response.BeginArray("files"sv); + + FoundLog->IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) { + Response.BeginObject(); + Response << "id"sv << Id; + Response << "clientpath"sv << ClientPath; + if (!FilterClient) + { + Response << "serverpath"sv << ServerPath; + } + Response.EndObject(); + }); + + Response.EndArray(); + OutPayload = Response.Save(); + return {HttpResponseCode::OK, {}}; +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::GetChunkInfo(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view ChunkId, + CbObject& OutPayload) +{ + using namespace std::literals; + + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) + { + return {HttpResponseCode::BadRequest, + fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)}; + } + + const Oid Obj = Oid::FromHexString(ChunkId); + + IoBuffer Chunk = FoundLog->FindChunk(Obj); + if (!Chunk) + { + return {HttpResponseCode::NotFound, {}}; + } + + uint64_t ChunkSize = Chunk.GetSize(); + if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) + { + IoHash RawHash; + uint64_t RawSize; + bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize); + ZEN_ASSERT(IsCompressed); + ChunkSize = RawSize; + } + + CbObjectWriter Response; + Response << "size"sv << ChunkSize; + OutPayload = Response.Save(); + return {HttpResponseCode::OK, {}}; +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::GetChunkRange(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view ChunkId, + uint64_t Offset, + uint64_t Size, + ZenContentType AcceptType, + IoBuffer& OutChunk) +{ + bool IsOffset = Offset != 0 || Size != ~(0ull); + + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + + if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) + { + return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)}; + } + + const Oid Obj = Oid::FromHexString(ChunkId); + + IoBuffer Chunk = FoundLog->FindChunk(Obj); + if (!Chunk) + { + return {HttpResponseCode::NotFound, {}}; + } + + OutChunk = Chunk; + HttpContentType ContentType = Chunk.GetContentType(); + + if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize); + ZEN_ASSERT(!Compressed.IsNull()); + + if (IsOffset) + { + if ((Offset + Size) > RawSize) + { + Size = RawSize - Offset; + } + + if (AcceptType == HttpContentType::kBinary) + { + OutChunk = Compressed.Decompress(Offset, Size).AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kBinary); + } + else + { + // Value will be a range of compressed blocks that covers the requested range + // The client will have to compensate for any offsets that do not land on an even block size multiple + OutChunk = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kCompressedBinary); + } + } + else + { + if (AcceptType == HttpContentType::kBinary) + { + OutChunk = Compressed.Decompress().AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kBinary); + } + else + { + OutChunk = Compressed.GetCompressed().Flatten().AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kCompressedBinary); + } + } + } + else if (IsOffset) + { + if ((Offset + Size) > Chunk.GetSize()) + { + Size = Chunk.GetSize() - Offset; + } + OutChunk = IoBuffer(std::move(Chunk), Offset, Size); + OutChunk.SetContentType(ContentType); + } + + return {HttpResponseCode::OK, {}}; +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::GetChunk(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view Cid, + ZenContentType AcceptType, + IoBuffer& OutChunk) +{ + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + + if (Cid.length() != IoHash::StringLength) + { + return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, Cid)}; + } + + const IoHash Hash = IoHash::FromHexString(Cid); + OutChunk = m_CidStore.FindChunkByCid(Hash); + + if (!OutChunk) + { + return {HttpResponseCode::NotFound, fmt::format("chunk - '{}' MISSING", Cid)}; + } + + if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(OutChunk)); + OutChunk = Compressed.Decompress().AsIoBuffer(); + OutChunk.SetContentType(ZenContentType::kBinary); + } + else + { + OutChunk.SetContentType(ZenContentType::kCompressedBinary); + } + return {HttpResponseCode::OK, {}}; +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::PutChunk(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view Cid, + ZenContentType ContentType, + IoBuffer&& Chunk) +{ + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + + if (Cid.length() != IoHash::StringLength) + { + return {HttpResponseCode::BadRequest, fmt::format("Chunk put request for invalid chunk hash '{}'", Cid)}; + } + + const IoHash Hash = IoHash::FromHexString(Cid); + + if (ContentType != HttpContentType::kCompressedBinary) + { + return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid content type for chunk '{}'", Cid)}; + } + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); + if (RawHash != Hash) + { + return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid payload format for chunk '{}'", Cid)}; + } + + CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk, Hash); + return {Result.New ? HttpResponseCode::Created : HttpResponseCode::OK, {}}; +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload, CbObject& OutResponse) +{ + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); + + if (!Oplog) + { + return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + + CbObject ContainerObject = LoadCompactBinaryObject(Payload); + if (!ContainerObject) + { + return {HttpResponseCode::BadRequest, "Invalid payload format"}; + } + + CidStore& ChunkStore = m_CidStore; + RwLock AttachmentsLock; + std::unordered_set<IoHash, IoHash::Hasher> Attachments; + + auto HasAttachment = [&ChunkStore](const IoHash& RawHash) { return ChunkStore.ContainsChunk(RawHash); }; + auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector<IoHash>&& ChunkHashes) { + RwLock::ExclusiveLockScope _(AttachmentsLock); + if (BlockHash != IoHash::Zero) + { + Attachments.insert(BlockHash); + } + else + { + Attachments.insert(ChunkHashes.begin(), ChunkHashes.end()); + } + }; + auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) { + RwLock::ExclusiveLockScope _(AttachmentsLock); + Attachments.insert(RawHash); + }; + + RemoteProjectStore::Result RemoteResult = SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment); + + if (RemoteResult.ErrorCode) + { + return ConvertResult(RemoteResult); + } + + CbObjectWriter Cbo; + Cbo.BeginArray("need"); + { + for (const IoHash& Hash : Attachments) + { + ZEN_DEBUG("Need attachment {}", Hash); + Cbo << Hash; + } + } + Cbo.EndArray(); // "need" + + OutResponse = Cbo.Save(); + return {HttpResponseCode::OK, {}}; +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::ReadOplog(const std::string_view ProjectId, + const std::string_view OplogId, + const HttpServerRequest::QueryParams& Params, + CbObject& OutResponse) +{ + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); + + if (!Oplog) + { + return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + + size_t MaxBlockSize = 128u * 1024u * 1024u; + if (auto Param = Params.GetValue("maxblocksize"); Param.empty() == false) + { + if (auto Value = ParseInt<size_t>(Param)) + { + MaxBlockSize = Value.value(); + } + } + size_t MaxChunkEmbedSize = 1024u * 1024u; + if (auto Param = Params.GetValue("maxchunkembedsize"); Param.empty() == false) + { + if (auto Value = ParseInt<size_t>(Param)) + { + MaxChunkEmbedSize = Value.value(); + } + } + + CidStore& ChunkStore = m_CidStore; + + RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer( + ChunkStore, + *Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + false, + [](CompressedBuffer&&, const IoHash) {}, + [](const IoHash&) {}, + [](const std::unordered_set<IoHash, IoHash::Hasher>) {}); + + OutResponse = std::move(ContainerResult.ContainerObject); + return ConvertResult(ContainerResult); +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::WriteBlock(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload) +{ + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); + + if (!Oplog) + { + return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + + if (!IterateBlock(std::move(Payload), [this](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { + IoBuffer Compressed = Chunk.GetCompressed().Flatten().AsIoBuffer(); + m_CidStore.AddChunk(Compressed, AttachmentRawHash); + ZEN_DEBUG("Saved attachment {} from block, size {}", AttachmentRawHash, Compressed.GetSize()); + })) + { + return {HttpResponseCode::BadRequest, "Invalid chunk in block"}; + } + + return {HttpResponseCode::OK, {}}; +} + +void +ProjectStore::Rpc(HttpServerRequest& HttpReq, + const std::string_view ProjectId, + const std::string_view OplogId, + IoBuffer&& Payload, + AuthMgr& AuthManager) +{ + using namespace std::literals; + HttpContentType PayloadContentType = HttpReq.RequestContentType(); + CbPackage Package; + CbObject Cb; + switch (PayloadContentType) + { + case HttpContentType::kJSON: + case HttpContentType::kUnknownContentType: + case HttpContentType::kText: + { + std::string JsonText(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize()); + Cb = LoadCompactBinaryFromJson(JsonText).AsObject(); + if (!Cb) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Content format not supported, expected JSON format"); + } + } + break; + case HttpContentType::kCbObject: + Cb = LoadCompactBinaryObject(Payload); + if (!Cb) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Content format not supported, expected compact binary format"); + } + break; + case HttpContentType::kCbPackage: + Package = ParsePackageMessage(Payload); + Cb = Package.GetObject(); + if (!Cb) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Content format not supported, expected package message format"); + } + break; + default: + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type"); + } + + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Rpc oplog request for unknown project '{}'", ProjectId)); + } + + ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); + + if (!Oplog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)); + } + + std::string_view Method = Cb["method"sv].AsString(); + + if (Method == "import") + { + std::pair<HttpResponseCode, std::string> Result = Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); + if (Result.second.empty()) + { + return HttpReq.WriteResponse(Result.first); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + } + else if (Method == "export") + { + std::pair<HttpResponseCode, std::string> Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); + if (Result.second.empty()) + { + return HttpReq.WriteResponse(Result.first); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + } + else if (Method == "getchunks") + { + CbPackage ResponsePackage; + { + CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView(); + CbObjectWriter ResponseWriter; + ResponseWriter.BeginArray("chunks"sv); + for (CbFieldView FieldView : ChunksArray) + { + IoHash RawHash = FieldView.AsHash(); + IoBuffer ChunkBuffer = m_CidStore.FindChunkByCid(RawHash); + if (ChunkBuffer) + { + ResponseWriter.AddHash(RawHash); + ResponsePackage.AddAttachment( + CbAttachment(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)), RawHash)); + } + } + ResponseWriter.EndArray(); + ResponsePackage.SetObject(ResponseWriter.Save()); + } + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault); + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + } + else if (Method == "putchunks") + { + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + for (const CbAttachment& Attachment : Attachments) + { + IoHash RawHash = Attachment.GetHash(); + CompressedBuffer Compressed = Attachment.AsCompressedBinary(); + m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), RawHash, CidStore::InsertMode::kCopyOnly); + } + return HttpReq.WriteResponse(HttpResponseCode::OK); + } + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method)); +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager) +{ + using namespace std::literals; + + size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u); + size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); + bool Force = Params["force"sv].AsBool(false); + + std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult = + CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize); + + if (RemoteStoreResult.first == nullptr) + { + return {HttpResponseCode::BadRequest, RemoteStoreResult.second}; + } + std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first); + RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); + + ZEN_INFO("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}", + Project.Identifier, + Oplog.OplogId(), + StoreInfo.Description, + NiceBytes(MaxBlockSize), + NiceBytes(MaxChunkEmbedSize)); + + RemoteProjectStore::Result Result = SaveOplog(m_CidStore, + *RemoteStore, + Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + StoreInfo.CreateBlocks, + StoreInfo.UseTempBlockFiles, + Force); + + return ConvertResult(Result); +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager) +{ + using namespace std::literals; + + size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u); + size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); + bool Force = Params["force"sv].AsBool(false); + + std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult = + CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize); + + if (RemoteStoreResult.first == nullptr) + { + return {HttpResponseCode::BadRequest, RemoteStoreResult.second}; + } + std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first); + RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); + + ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description); + RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *RemoteStore, Oplog, Force); + return ConvertResult(Result); +} + +////////////////////////////////////////////////////////////////////////// + +HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, HttpStatsService& StatsService, AuthMgr& AuthMgr) +: m_Log(logging::Get("project")) +, m_CidStore(Store) +, m_ProjectStore(Projects) +, m_StatsService(StatsService) +, m_AuthMgr(AuthMgr) +{ + using namespace std::literals; + + m_StatsService.RegisterHandler("prj", *this); + + m_Router.AddPattern("project", "([[:alnum:]_.]+)"); + m_Router.AddPattern("log", "([[:alnum:]_.]+)"); + m_Router.AddPattern("op", "([[:digit:]]+?)"); + m_Router.AddPattern("chunk", "([[:xdigit:]]{24})"); + m_Router.AddPattern("hash", "([[:xdigit:]]{40})"); + + m_Router.RegisterRoute( + "", + [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_ProjectStore->GetProjectsList()); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "list", + [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_ProjectStore->GetProjectsList()); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/batch", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + // Parse Request + + IoBuffer Payload = HttpReq.ReadPayload(); + BinaryReader Reader(Payload); + + struct RequestHeader + { + enum + { + kMagic = 0xAAAA'77AC + }; + uint32_t Magic; + uint32_t ChunkCount; + uint32_t Reserved1; + uint32_t Reserved2; + }; + + struct RequestChunkEntry + { + Oid ChunkId; + uint32_t CorrelationId; + uint64_t Offset; + uint64_t RequestBytes; + }; + + if (Payload.Size() <= sizeof(RequestHeader)) + { + HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + RequestHeader RequestHdr; + Reader.Read(&RequestHdr, sizeof RequestHdr); + + if (RequestHdr.Magic != RequestHeader::kMagic) + { + HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + std::vector<RequestChunkEntry> RequestedChunks; + RequestedChunks.resize(RequestHdr.ChunkCount); + Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount); + + // Make Response + + struct ResponseHeader + { + uint32_t Magic = 0xbada'b00f; + uint32_t ChunkCount; + uint32_t Reserved1 = 0; + uint32_t Reserved2 = 0; + }; + + struct ResponseChunkEntry + { + uint32_t CorrelationId; + uint32_t Flags = 0; + uint64_t ChunkSize; + }; + + std::vector<IoBuffer> OutBlobs; + OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry)); + for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) + { + const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; + IoBuffer FoundChunk = FoundLog->FindChunk(RequestedChunk.ChunkId); + if (FoundChunk) + { + if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1)) + { + uint64_t Offset = RequestedChunk.Offset; + if (Offset > FoundChunk.Size()) + { + Offset = FoundChunk.Size(); + } + uint64_t Size = RequestedChunk.RequestBytes; + if ((Offset + Size) > FoundChunk.Size()) + { + Size = FoundChunk.Size() - Offset; + } + FoundChunk = IoBuffer(FoundChunk, Offset, Size); + } + } + OutBlobs.emplace_back(std::move(FoundChunk)); + } + uint8_t* ResponsePtr = reinterpret_cast<uint8_t*>(OutBlobs[0].MutableData()); + ResponseHeader ResponseHdr; + ResponseHdr.ChunkCount = RequestHdr.ChunkCount; + memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr)); + ResponsePtr += sizeof(ResponseHdr); + for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) + { + // const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; + const IoBuffer& FoundChunk(OutBlobs[ChunkIndex + 1]); + ResponseChunkEntry ResponseChunk; + ResponseChunk.CorrelationId = ChunkIndex; + if (FoundChunk) + { + ResponseChunk.ChunkSize = FoundChunk.Size(); + } + else + { + ResponseChunk.ChunkSize = uint64_t(-1); + } + memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk)); + ResponsePtr += sizeof(ResponseChunk); + } + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/files", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + // File manifest fetch, returns the client file list + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + + const bool FilterClient = Params.GetValue("filter"sv) == "client"sv; + + CbObject ResponsePayload; + std::pair<HttpResponseCode, std::string> Result = + m_ProjectStore->GetProjectFiles(ProjectId, OplogId, FilterClient, ResponsePayload); + if (Result.first == HttpResponseCode::OK) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); + } + else + { + ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", + ToString(HttpReq.RequestVerb()), + HttpReq.QueryString(), + static_cast<int>(Result.first), + Result.second); + } + if (Result.second.empty()) + { + return HttpReq.WriteResponse(Result.first); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/{chunk}/info", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + const auto& ChunkId = Req.GetCapture(3); + + CbObject ResponsePayload; + std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->GetChunkInfo(ProjectId, OplogId, ChunkId, ResponsePayload); + if (Result.first == HttpResponseCode::OK) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); + } + else if (Result.first == HttpResponseCode::NotFound) + { + ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); + } + else + { + ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", + ToString(HttpReq.RequestVerb()), + HttpReq.QueryString(), + static_cast<int>(Result.first), + Result.second); + } + if (Result.second.empty()) + { + return HttpReq.WriteResponse(Result.first); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/{chunk}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + const auto& ChunkId = Req.GetCapture(3); + + uint64_t Offset = 0; + uint64_t Size = ~(0ull); + + auto QueryParms = Req.ServerRequest().GetQueryParams(); + + if (auto OffsetParm = QueryParms.GetValue("offset"); OffsetParm.empty() == false) + { + if (auto OffsetVal = ParseInt<uint64_t>(OffsetParm)) + { + Offset = OffsetVal.value(); + } + else + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + } + + if (auto SizeParm = QueryParms.GetValue("size"); SizeParm.empty() == false) + { + if (auto SizeVal = ParseInt<uint64_t>(SizeParm)) + { + Size = SizeVal.value(); + } + else + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + } + + HttpContentType AcceptType = HttpReq.AcceptContentType(); + + IoBuffer Chunk; + std::pair<HttpResponseCode, std::string> Result = + m_ProjectStore->GetChunkRange(ProjectId, OplogId, ChunkId, Offset, Size, AcceptType, Chunk); + if (Result.first == HttpResponseCode::OK) + { + ZEN_DEBUG("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(Chunk.GetContentType())); + return HttpReq.WriteResponse(HttpResponseCode::OK, Chunk.GetContentType(), Chunk); + } + else if (Result.first == HttpResponseCode::NotFound) + { + ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); + } + else + { + ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", + ToString(HttpReq.RequestVerb()), + HttpReq.QueryString(), + static_cast<int>(Result.first), + Result.second); + } + if (Result.second.empty()) + { + return HttpReq.WriteResponse(Result.first); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + }, + HttpVerb::kGet | HttpVerb::kHead); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/{hash}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + const auto& Cid = Req.GetCapture(3); + HttpContentType AcceptType = HttpReq.AcceptContentType(); + HttpContentType RequestType = HttpReq.RequestContentType(); + + switch (Req.ServerRequest().RequestVerb()) + { + case HttpVerb::kGet: + { + IoBuffer Value; + std::pair<HttpResponseCode, std::string> Result = + m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value); + + if (Result.first == HttpResponseCode::OK) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value); + } + else if (Result.first == HttpResponseCode::NotFound) + { + ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, Cid); + } + else + { + ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", + ToString(HttpReq.RequestVerb()), + HttpReq.QueryString(), + static_cast<int>(Result.first), + Result.second); + } + if (Result.second.empty()) + { + return HttpReq.WriteResponse(Result.first); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + } + case HttpVerb::kPost: + { + std::pair<HttpResponseCode, std::string> Result = + m_ProjectStore->PutChunk(ProjectId, OplogId, Cid, RequestType, HttpReq.ReadPayload()); + if (Result.first == HttpResponseCode::OK || Result.first == HttpResponseCode::Created) + { + return HttpReq.WriteResponse(Result.first); + } + else + { + ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", + ToString(HttpReq.RequestVerb()), + HttpReq.QueryString(), + static_cast<int>(Result.first), + Result.second); + } + if (Result.second.empty()) + { + return HttpReq.WriteResponse(Result.first); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + } + break; + } + }, + HttpVerb::kGet | HttpVerb::kPost); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/prep", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + // This operation takes a list of referenced hashes and decides which + // chunks are not present on this server. This list is then returned in + // the "need" list in the response + + IoBuffer Payload = HttpReq.ReadPayload(); + CbObject RequestObject = LoadCompactBinaryObject(Payload); + + std::vector<IoHash> NeedList; + + for (auto Entry : RequestObject["have"sv]) + { + const IoHash FileHash = Entry.AsHash(); + + if (!m_CidStore.ContainsChunk(FileHash)) + { + ZEN_DEBUG("prep - NEED: {}", FileHash); + + NeedList.push_back(FileHash); + } + } + + CbObjectWriter Cbo; + Cbo.BeginArray("need"); + + for (const IoHash& Hash : NeedList) + { + Cbo << Hash; + } + + Cbo.EndArray(); + CbObject Response = Cbo.Save(); + + return HttpReq.WriteResponse(HttpResponseCode::OK, Response); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/new", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + + bool IsUsingSalt = false; + IoHash SaltHash = IoHash::Zero; + + if (std::string_view SaltParam = Params.GetValue("salt"); SaltParam.empty() == false) + { + const uint32_t Salt = std::stoi(std::string(SaltParam)); + SaltHash = IoHash::HashBuffer(&Salt, sizeof Salt); + IsUsingSalt = true; + } + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Oplog& Oplog = *FoundLog; + + IoBuffer Payload = HttpReq.ReadPayload(); + + // This will attempt to open files which may not exist for the case where + // the prep step rejected the chunk. This should be fixed since there's + // a performance cost associated with any file system activity + + bool IsValid = true; + std::vector<IoHash> MissingChunks; + + CbPackage::AttachmentResolver Resolver = [&](const IoHash& Hash) -> SharedBuffer { + if (m_CidStore.ContainsChunk(Hash)) + { + // Return null attachment as we already have it, no point in reading it and storing it again + return {}; + } + + IoHash AttachmentId; + if (IsUsingSalt) + { + IoHash AttachmentSpec[]{SaltHash, Hash}; + AttachmentId = IoHash::HashBuffer(MakeMemoryView(AttachmentSpec)); + } + else + { + AttachmentId = Hash; + } + + std::filesystem::path AttachmentPath = Oplog.TempPath() / AttachmentId.ToHexString(); + if (IoBuffer Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath)) + { + return SharedBuffer(std::move(Data)); + } + else + { + IsValid = false; + MissingChunks.push_back(Hash); + + return {}; + } + }; + + CbPackage Package; + + if (!legacy::TryLoadCbPackage(Package, Payload, &UniqueBuffer::Alloc, &Resolver)) + { + std::filesystem::path BadPackagePath = + Oplog.TempPath() / "bad_packages"sv / fmt::format("session{}_request{}"sv, HttpReq.SessionId(), HttpReq.RequestId()); + + ZEN_WARN("Received malformed package! Saving payload to '{}'", BadPackagePath); + + WriteFile(BadPackagePath, Payload); + + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"); + } + + if (!IsValid) + { + // TODO: emit diagnostics identifying missing chunks + + return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing chunk reference"); + } + + CbObject Core = Package.GetObject(); + + if (!Core["key"sv]) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "No oplog entry key specified"); + } + + // Write core to oplog + + const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Package); + + if (OpLsn == ProjectStore::Oplog::kInvalidOp) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn, NiceBytes(Payload.Size()), Core["key"sv].AsString()); + + HttpReq.WriteResponse(HttpResponseCode::Created); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/{op}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const std::string& ProjectId = Req.GetCapture(1); + const std::string& OplogId = Req.GetCapture(2); + const std::string& OpIdString = Req.GetCapture(3); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Oplog& Oplog = *FoundLog; + + if (const std::optional<int32_t> OpId = ParseInt<uint32_t>(OpIdString)) + { + if (std::optional<CbObject> MaybeOp = Oplog.GetOpByIndex(OpId.value())) + { + CbObject& Op = MaybeOp.value(); + if (Req.ServerRequest().AcceptContentType() == ZenContentType::kCbPackage) + { + CbPackage Package; + Package.SetObject(Op); + + Op.IterateAttachments([&](CbFieldView FieldView) { + const IoHash AttachmentHash = FieldView.AsAttachment(); + IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash); + + // We force this for now as content type is not consistently tracked (will + // be fixed in CidStore refactor) + Payload.SetContentType(ZenContentType::kCompressedBinary); + + if (Payload) + { + switch (Payload.GetContentType()) + { + case ZenContentType::kCbObject: + if (CbObject Object = LoadCompactBinaryObject(Payload)) + { + Package.AddAttachment(CbAttachment(Object)); + } + else + { + // Error - malformed object + + ZEN_WARN("malformed object returned for {}", AttachmentHash); + } + break; + + case ZenContentType::kCompressedBinary: + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload))) + { + Package.AddAttachment(CbAttachment(Compressed, AttachmentHash)); + } + else + { + // Error - not compressed! + + ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash); + } + break; + + default: + Package.AddAttachment(CbAttachment(SharedBuffer(Payload))); + break; + } + } + }); + + return HttpReq.WriteResponse(HttpResponseCode::Accepted, Package); + } + else + { + // Client cannot accept a package, so we only send the core object + return HttpReq.WriteResponse(HttpResponseCode::Accepted, Op); + } + } + } + + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{project}/oplog/{log}", + [this](HttpRouterRequest& Req) { + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + + if (!Project) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("project {} not found", ProjectId)); + } + + switch (Req.ServerRequest().RequestVerb()) + { + case HttpVerb::kGet: + { + ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId); + + if (!OplogIt) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("oplog {} not found in project {}", OplogId, ProjectId)); + } + + ProjectStore::Oplog& Log = *OplogIt; + + CbObjectWriter Cb; + Cb << "id"sv << Log.OplogId() << "project"sv << Project->Identifier << "tempdir"sv << Log.TempPath().c_str() + << "markerpath"sv << Log.MarkerPath().c_str() << "totalsize"sv << Log.TotalSize() << "opcount" + << Log.OplogCount() << "expired"sv << Log.IsExpired(); + + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Cb.Save()); + } + break; + + case HttpVerb::kPost: + { + std::filesystem::path OplogMarkerPath; + if (CbObject Params = Req.ServerRequest().ReadPayloadObject()) + { + OplogMarkerPath = Params["gcpath"sv].AsString(); + } + + ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId); + + if (!OplogIt) + { + if (!Project->NewOplog(OplogId, OplogMarkerPath)) + { + // TODO: indicate why the operation failed! + return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError); + } + + ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath); + + return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); + } + + // I guess this should ultimately be used to execute RPCs but for now, it + // does absolutely nothing + + return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); + } + break; + + case HttpVerb::kDelete: + { + ZEN_INFO("deleting oplog '{}/{}'", ProjectId, OplogId); + + Project->DeleteOplog(OplogId); + + return Req.ServerRequest().WriteResponse(HttpResponseCode::OK); + } + break; + + default: + break; + } + }, + HttpVerb::kPost | HttpVerb::kGet | HttpVerb::kDelete); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/entries", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + CbObjectWriter Response; + + if (FoundLog->OplogCount() > 0) + { + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + + if (auto OpKey = Params.GetValue("opkey"); !OpKey.empty()) + { + Oid OpKeyId = OpKeyStringAsOId(OpKey); + std::optional<CbObject> Op = FoundLog->GetOpByKey(OpKeyId); + + if (Op.has_value()) + { + Response << "entry"sv << Op.value(); + } + else + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + } + else + { + Response.BeginArray("entries"sv); + + FoundLog->IterateOplog([&Response](CbObject Op) { Response << Op; }); + + Response.EndArray(); + } + } + + return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{project}", + [this](HttpRouterRequest& Req) { + const std::string ProjectId = Req.GetCapture(1); + + switch (Req.ServerRequest().RequestVerb()) + { + case HttpVerb::kPost: + { + IoBuffer Payload = Req.ServerRequest().ReadPayload(); + CbObject Params = LoadCompactBinaryObject(Payload); + std::string_view Id = Params["id"sv].AsString(); + std::string_view Root = Params["root"sv].AsString(); + std::string_view EngineRoot = Params["engine"sv].AsString(); + std::string_view ProjectRoot = Params["project"sv].AsString(); + std::string_view ProjectFilePath = Params["projectfile"sv].AsString(); + + const std::filesystem::path BasePath = m_ProjectStore->BasePath() / ProjectId; + m_ProjectStore->NewProject(BasePath, ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath); + + ZEN_INFO("established project - {} (id: '{}', roots: '{}', '{}', '{}', '{}'{})", + ProjectId, + Id, + Root, + EngineRoot, + ProjectRoot, + ProjectFilePath, + ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : ""); + + Req.ServerRequest().WriteResponse(HttpResponseCode::Created); + } + break; + + case HttpVerb::kGet: + { + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + + if (!Project) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("project {} not found", ProjectId)); + } + + std::vector<std::string> OpLogs = Project->ScanForOplogs(); + + CbObjectWriter Response; + Response << "id"sv << Project->Identifier; + Response << "root"sv << PathToUtf8(Project->RootDir); + Response << "engine"sv << PathToUtf8(Project->EngineRootDir); + Response << "project"sv << PathToUtf8(Project->ProjectRootDir); + Response << "projectfile"sv << PathToUtf8(Project->ProjectFilePath); + + Response.BeginArray("oplogs"sv); + for (const std::string& OplogId : OpLogs) + { + Response.BeginObject(); + Response << "id"sv << OplogId; + Response.EndObject(); + } + Response.EndArray(); // oplogs + + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save()); + } + break; + + case HttpVerb::kDelete: + { + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + + if (!Project) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("project {} not found", ProjectId)); + } + + ZEN_INFO("deleting project '{}'", ProjectId); + if (!m_ProjectStore->DeleteProject(ProjectId)) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::Locked, + HttpContentType::kText, + fmt::format("project {} is in use", ProjectId)); + } + + return Req.ServerRequest().WriteResponse(HttpResponseCode::NoContent); + } + break; + + default: + break; + } + }, + HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kDelete); + + // Push a oplog container + m_Router.RegisterRoute( + "{project}/oplog/{log}/save", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + if (HttpReq.RequestContentType() != HttpContentType::kCbObject) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid content type"); + } + IoBuffer Payload = Req.ServerRequest().ReadPayload(); + + CbObject Response; + std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->WriteOplog(ProjectId, OplogId, std::move(Payload), Response); + if (Result.first == HttpResponseCode::OK) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, Response); + } + if (Result.second.empty()) + { + return HttpReq.WriteResponse(Result.first); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + }, + HttpVerb::kPost); + + // Pull a oplog container + m_Router.RegisterRoute( + "{project}/oplog/{log}/load", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + if (HttpReq.AcceptContentType() != HttpContentType::kCbObject) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid accept content type"); + } + IoBuffer Payload = Req.ServerRequest().ReadPayload(); + + CbObject Response; + std::pair<HttpResponseCode, std::string> Result = + m_ProjectStore->ReadOplog(ProjectId, OplogId, Req.ServerRequest().GetQueryParams(), Response); + if (Result.first == HttpResponseCode::OK) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, Response); + } + if (Result.second.empty()) + { + return HttpReq.WriteResponse(Result.first); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + }, + HttpVerb::kGet); + + // Do an rpc style operation on project/oplog + m_Router.RegisterRoute( + "{project}/oplog/{log}/rpc", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + IoBuffer Payload = Req.ServerRequest().ReadPayload(); + + m_ProjectStore->Rpc(HttpReq, ProjectId, OplogId, std::move(Payload), m_AuthMgr); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "details\\$", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + bool CSV = Params.GetValue("csv") == "true"; + bool Details = Params.GetValue("details") == "true"; + bool OpDetails = Params.GetValue("opdetails") == "true"; + bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true"; + + if (CSV) + { + ExtendableStringBuilder<4096> CSVWriter; + CSVHeader(Details, AttachmentDetails, CSVWriter); + + m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) { + Project.IterateOplogs([&](ProjectStore::Oplog& Oplog) { + Oplog.IterateOplogWithKey( + [this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](int LSN, const Oid& Key, CbObject Op) { + CSVWriteOp(m_CidStore, + Project.Identifier, + Oplog.OplogId(), + Details, + AttachmentDetails, + LSN, + Key, + Op, + CSVWriter); + }); + }); + }); + + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); + } + else + { + CbObjectWriter Cbo; + Cbo.BeginArray("projects"); + { + m_ProjectStore->DiscoverProjects(); + + m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) { + std::vector<std::string> OpLogs = Project.ScanForOplogs(); + CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); + }); + } + Cbo.EndArray(); + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + } + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "details\\$/{project}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + bool CSV = Params.GetValue("csv") == "true"; + bool Details = Params.GetValue("details") == "true"; + bool OpDetails = Params.GetValue("opdetails") == "true"; + bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true"; + + Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId); + if (!FoundProject) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + ProjectStore::Project& Project = *FoundProject.Get(); + if (CSV) + { + ExtendableStringBuilder<4096> CSVWriter; + CSVHeader(Details, AttachmentDetails, CSVWriter); + + FoundProject->IterateOplogs([&](ProjectStore::Oplog& Oplog) { + Oplog.IterateOplogWithKey([this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](int LSN, + const Oid& Key, + CbObject Op) { + CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); + }); + }); + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); + } + else + { + CbObjectWriter Cbo; + std::vector<std::string> OpLogs = FoundProject->ScanForOplogs(); + Cbo.BeginArray("projects"); + { + CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); + } + Cbo.EndArray(); + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + } + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "details\\$/{project}/{log}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + bool CSV = Params.GetValue("csv") == "true"; + bool Details = Params.GetValue("details") == "true"; + bool OpDetails = Params.GetValue("opdetails") == "true"; + bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true"; + + Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId); + if (!FoundProject) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + ProjectStore::Oplog* FoundLog = FoundProject->OpenOplog(OplogId); + + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Project& Project = *FoundProject.Get(); + ProjectStore::Oplog& Oplog = *FoundLog; + if (CSV) + { + ExtendableStringBuilder<4096> CSVWriter; + CSVHeader(Details, AttachmentDetails, CSVWriter); + + Oplog.IterateOplogWithKey( + [this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](int LSN, const Oid& Key, CbObject Op) { + CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); + }); + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); + } + else + { + CbObjectWriter Cbo; + Cbo.BeginArray("oplogs"); + { + CbWriteOplog(m_CidStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo); + } + Cbo.EndArray(); + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + } + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "details\\$/{project}/{log}/{chunk}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + const auto& ChunkId = Req.GetCapture(3); + + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + bool CSV = Params.GetValue("csv") == "true"; + bool Details = Params.GetValue("details") == "true"; + bool OpDetails = Params.GetValue("opdetails") == "true"; + bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true"; + + Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId); + if (!FoundProject) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + ProjectStore::Oplog* FoundLog = FoundProject->OpenOplog(OplogId); + + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) + { + return HttpReq.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)); + } + + const Oid ObjId = Oid::FromHexString(ChunkId); + ProjectStore::Project& Project = *FoundProject.Get(); + ProjectStore::Oplog& Oplog = *FoundLog; + + int LSN = Oplog.GetOpIndexByKey(ObjId); + if (LSN == -1) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + std::optional<CbObject> Op = Oplog.GetOpByIndex(LSN); + if (!Op.has_value()) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + if (CSV) + { + ExtendableStringBuilder<4096> CSVWriter; + CSVHeader(Details, AttachmentDetails, CSVWriter); + + CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, ObjId, Op.value(), CSVWriter); + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); + } + else + { + CbObjectWriter Cbo; + Cbo.BeginArray("ops"); + { + CbWriteOp(m_CidStore, Details, OpDetails, AttachmentDetails, LSN, ObjId, Op.value(), Cbo); + } + Cbo.EndArray(); + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + } + }, + HttpVerb::kGet); +} + +HttpProjectService::~HttpProjectService() +{ + m_StatsService.UnregisterHandler("prj", *this); +} + +const char* +HttpProjectService::BaseUri() const +{ + return "/prj/"; +} + +void +HttpProjectService::HandleRequest(HttpServerRequest& Request) +{ + if (m_Router.HandleRequest(Request) == false) + { + ZEN_WARN("No route found for {0}", Request.RelativeUri()); + } +} + +void +HttpProjectService::HandleStatsRequest(HttpServerRequest& HttpReq) +{ + const GcStorageSize StoreSize = m_ProjectStore->StorageSize(); + const CidStoreSize CidSize = m_CidStore.TotalSize(); + + CbObjectWriter Cbo; + Cbo.BeginObject("store"); + { + Cbo.BeginObject("size"); + { + Cbo << "disk" << StoreSize.DiskSize; + Cbo << "memory" << StoreSize.MemorySize; + } + Cbo.EndObject(); + } + Cbo.EndObject(); + + Cbo.BeginObject("cid"); + { + Cbo.BeginObject("size"); + { + Cbo << "tiny" << CidSize.TinySize; + Cbo << "small" << CidSize.SmallSize; + Cbo << "large" << CidSize.LargeSize; + Cbo << "total" << CidSize.TotalSize; + } + Cbo.EndObject(); + } + Cbo.EndObject(); + + return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +namespace testutils { + using namespace std::literals; + + std::string OidAsString(const Oid& Id) + { + StringBuilder<25> OidStringBuilder; + Id.ToString(OidStringBuilder); + return OidStringBuilder.ToString(); + } + + CbPackage CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) + { + CbPackage Package; + CbObjectWriter Object; + Object << "key"sv << OidAsString(Id); + if (!Attachments.empty()) + { + Object.BeginArray("bulkdata"); + for (const auto& Attachment : Attachments) + { + CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash()); + Object.BeginObject(); + Object << "id"sv << Attachment.first; + Object << "type"sv + << "Standard"sv; + Object << "data"sv << Attach; + Object.EndObject(); + + Package.AddAttachment(Attach); + } + Object.EndArray(); + } + Package.SetObject(Object.Save()); + return Package; + }; + + std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(const std::span<const size_t>& Sizes) + { + std::vector<std::pair<Oid, CompressedBuffer>> Result; + Result.reserve(Sizes.size()); + for (size_t Size : Sizes) + { + std::vector<uint8_t> Data; + Data.resize(Size); + uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data()); + for (size_t Idx = 0; Idx < Size / 2; ++Idx) + { + DataPtr[Idx] = static_cast<uint16_t>(Idx % 0xffffu); + } + if (Size & 1) + { + Data[Size - 1] = static_cast<uint8_t>((Size - 1) & 0xff); + } + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())); + Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed)); + } + return Result; + } + + uint64 GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset) + { + if (RawOffset > 0) + { + uint64 BlockSize = 0; + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + if (!Buffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) + { + return 0; + } + return BlockSize > 0 ? RawOffset % BlockSize : 0; + } + return 0; + } + +} // namespace testutils + +TEST_CASE("project.store.create") +{ + using namespace std::literals; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + std::string_view ProjectName("proj1"sv); + std::filesystem::path BasePath = TempDir.Path() / "projectstore"; + ProjectStore ProjectStore(CidStore, BasePath, Gc); + std::filesystem::path RootDir = TempDir.Path() / "root"; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; + std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; + std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; + + Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / ProjectName, + ProjectName, + RootDir.string(), + EngineRootDir.string(), + ProjectRootDir.string(), + ProjectFilePath.string())); + CHECK(ProjectStore.DeleteProject(ProjectName)); + CHECK(!Project->Exists(BasePath)); +} + +TEST_CASE("project.store.lifetimes") +{ + using namespace std::literals; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + std::filesystem::path BasePath = TempDir.Path() / "projectstore"; + ProjectStore ProjectStore(CidStore, BasePath, Gc); + std::filesystem::path RootDir = TempDir.Path() / "root"; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; + std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; + std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; + + Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + ProjectRootDir.string(), + ProjectFilePath.string())); + ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1", {}); + CHECK(Oplog != nullptr); + + std::filesystem::path DeletePath; + CHECK(Project->PrepareForDelete(DeletePath)); + CHECK(!DeletePath.empty()); + CHECK(Project->OpenOplog("oplog1") == nullptr); + // Oplog is now invalid, but pointer can still be accessed since we store old oplog pointers + CHECK(Oplog->OplogCount() == 0); + // Project is still valid since we have a Ref to it + CHECK(Project->Identifier == "proj1"sv); +} + +TEST_CASE("project.store.gc") +{ + using namespace std::literals; + using namespace testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + std::filesystem::path BasePath = TempDir.Path() / "projectstore"; + ProjectStore ProjectStore(CidStore, BasePath, Gc); + std::filesystem::path RootDir = TempDir.Path() / "root"; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; + + std::filesystem::path Project1RootDir = TempDir.Path() / "game1"; + std::filesystem::path Project1FilePath = TempDir.Path() / "game1" / "game.uproject"; + { + CreateDirectories(Project1FilePath.parent_path()); + BasicFile ProjectFile; + ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); + } + + std::filesystem::path Project2RootDir = TempDir.Path() / "game2"; + std::filesystem::path Project2FilePath = TempDir.Path() / "game2" / "game.uproject"; + { + CreateDirectories(Project2FilePath.parent_path()); + BasicFile ProjectFile; + ProjectFile.Open(Project2FilePath, BasicFile::Mode::kTruncate); + } + + { + Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + Project1RootDir.string(), + Project1FilePath.string())); + ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1", {}); + CHECK(Oplog != nullptr); + + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77}))); + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99}))); + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122}))); + } + + { + Ref<ProjectStore::Project> Project2(ProjectStore.NewProject(BasePath / "proj2"sv, + "proj2"sv, + RootDir.string(), + EngineRootDir.string(), + Project2RootDir.string(), + Project2FilePath.string())); + ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog1", {}); + CHECK(Oplog != nullptr); + + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{177}))); + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9123, 383, 590, 96}))); + Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{535, 221}))); + } + + { + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + ProjectStore.GatherReferences(GcCtx); + size_t RefCount = 0; + GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; }); + CHECK(RefCount == 14); + ProjectStore.CollectGarbage(GcCtx); + CHECK(ProjectStore.OpenProject("proj1"sv)); + CHECK(ProjectStore.OpenProject("proj2"sv)); + } + + std::filesystem::remove(Project1FilePath); + + { + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); + ProjectStore.GatherReferences(GcCtx); + size_t RefCount = 0; + GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; }); + CHECK(RefCount == 7); + ProjectStore.CollectGarbage(GcCtx); + CHECK(!ProjectStore.OpenProject("proj1"sv)); + CHECK(ProjectStore.OpenProject("proj2"sv)); + } +} + +TEST_CASE("project.store.partial.read") +{ + using namespace std::literals; + using namespace testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; + ProjectStore ProjectStore(CidStore, BasePath, Gc); + std::filesystem::path RootDir = TempDir.Path() / "root"sv; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; + + std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv; + std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv; + { + CreateDirectories(Project1FilePath.parent_path()); + BasicFile ProjectFile; + ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); + } + + std::vector<Oid> OpIds; + OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()}); + std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments; + { + Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + Project1RootDir.string(), + Project1FilePath.string())); + ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, {}); + CHECK(Oplog != nullptr); + Attachments[OpIds[0]] = {}; + Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77}); + Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{7123, 9583, 690, 99}); + Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{55, 122}); + for (auto It : Attachments) + { + Oplog->AppendNewOplogEntry(CreateOplogPackage(It.first, It.second)); + } + } + { + IoBuffer Chunk; + CHECK(ProjectStore + .GetChunk("proj1"sv, + "oplog1"sv, + Attachments[OpIds[1]][0].second.DecodeRawHash().ToHexString(), + HttpContentType::kCompressedBinary, + Chunk) + .first == HttpResponseCode::OK); + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Attachment = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); + CHECK(RawSize == Attachments[OpIds[1]][0].second.DecodeRawSize()); + } + + IoBuffer ChunkResult; + CHECK(ProjectStore + .GetChunkRange("proj1"sv, + "oplog1"sv, + OidAsString(Attachments[OpIds[2]][1].first), + 0, + ~0ull, + HttpContentType::kCompressedBinary, + ChunkResult) + .first == HttpResponseCode::OK); + CHECK(ChunkResult); + CHECK(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult)).DecodeRawSize() == + Attachments[OpIds[2]][1].second.DecodeRawSize()); + + IoBuffer PartialChunkResult; + CHECK(ProjectStore + .GetChunkRange("proj1"sv, + "oplog1"sv, + OidAsString(Attachments[OpIds[2]][1].first), + 5, + 1773, + HttpContentType::kCompressedBinary, + PartialChunkResult) + .first == HttpResponseCode::OK); + CHECK(PartialChunkResult); + IoHash PartialRawHash; + uint64_t PartialRawSize; + CompressedBuffer PartialCompressedResult = + CompressedBuffer::FromCompressed(SharedBuffer(PartialChunkResult), PartialRawHash, PartialRawSize); + CHECK(PartialRawSize >= 1773); + + uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5); + SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed); + SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress(); + const uint8_t* FullDataPtr = &(reinterpret_cast<const uint8_t*>(FullDecompressed.GetView().GetData())[5]); + const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData()); + CHECK(FullDataPtr[0] == PartialDataPtr[0]); +} + +TEST_CASE("project.store.block") +{ + using namespace std::literals; + using namespace testutils; + + std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269, 2257, 3685, 3489, + 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 4024, 1582, 5251, + 491, 5464, 4607, 8135, 3767, 4045, 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225}); + + std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes); + std::vector<SharedBuffer> Chunks; + Chunks.reserve(AttachmentSizes.size()); + for (const auto& It : AttachmentsWithId) + { + Chunks.push_back(It.second.GetCompressed().Flatten()); + } + CompressedBuffer Block = GenerateBlock(std::move(Chunks)); + IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer(); + CHECK(IterateBlock(std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {})); +} + +#endif + +void +prj_forcelink() +{ +} + +} // namespace zen diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h new file mode 100644 index 000000000..e4f664b85 --- /dev/null +++ b/src/zenserver/projectstore/projectstore.h @@ -0,0 +1,372 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/uid.h> +#include <zencore/xxhash.h> +#include <zenhttp/httpserver.h> +#include <zenstore/gc.h> + +#include "monitoring/httpstats.h" + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +class CbPackage; +class CidStore; +class AuthMgr; +class ScrubContext; + +struct OplogEntry +{ + uint32_t OpLsn; + uint32_t OpCoreOffset; // note: Multiple of alignment! + uint32_t OpCoreSize; + uint32_t OpCoreHash; // Used as checksum + XXH3_128 OpKeyHash; // XXH128_canonical_t + + inline Oid OpKeyAsOId() const + { + Oid Id; + memcpy(Id.OidBits, &OpKeyHash, sizeof Id.OidBits); + return Id; + } +}; + +struct OplogEntryAddress +{ + uint64_t Offset; + uint64_t Size; +}; + +static_assert(IsPow2(sizeof(OplogEntry))); + +/** Project Store + + A project store consists of a number of Projects. + + Each project contains a number of oplogs (short for "operation log"). UE uses + one oplog per target platform to store the output of the cook process. + + An oplog consists of a sequence of "op" entries. Each entry is a structured object + containing references to attachments. Attachments are typically the serialized + package data split into separate chunks for bulk data, exports and header + information. + */ +class ProjectStore : public RefCounted, public GcStorage, public GcContributor +{ + struct OplogStorage; + +public: + ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc); + ~ProjectStore(); + + struct Project; + + struct Oplog + { + Oplog(std::string_view Id, + Project* Project, + CidStore& Store, + std::filesystem::path BasePath, + const std::filesystem::path& MarkerPath); + ~Oplog(); + + [[nodiscard]] static bool ExistsAt(std::filesystem::path BasePath); + + void Read(); + void Write(); + + void IterateFileMap(std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn); + void IterateOplog(std::function<void(CbObject)>&& Fn); + void IterateOplogWithKey(std::function<void(int, const Oid&, CbObject)>&& Fn); + std::optional<CbObject> GetOpByKey(const Oid& Key); + std::optional<CbObject> GetOpByIndex(int Index); + int GetOpIndexByKey(const Oid& Key); + + IoBuffer FindChunk(Oid ChunkId); + + inline static const uint32_t kInvalidOp = ~0u; + + /** Persist a new oplog entry + * + * Returns the oplog LSN assigned to the new entry, or kInvalidOp if the entry is rejected + */ + uint32_t AppendNewOplogEntry(CbPackage Op); + + uint32_t AppendNewOplogEntry(CbObject Core); + + enum UpdateType + { + kUpdateNewEntry, + kUpdateReplay + }; + + const std::string& OplogId() const { return m_OplogId; } + + const std::filesystem::path& TempPath() const { return m_TempPath; } + const std::filesystem::path& MarkerPath() const { return m_MarkerPath; } + + spdlog::logger& Log() { return m_OuterProject->Log(); } + void Flush(); + void Scrub(ScrubContext& Ctx) const; + void GatherReferences(GcContext& GcCtx); + uint64_t TotalSize() const; + + std::size_t OplogCount() const + { + RwLock::SharedLockScope _(m_OplogLock); + return m_LatestOpMap.size(); + } + + bool IsExpired() const; + std::filesystem::path PrepareForDelete(bool MoveFolder); + + private: + struct FileMapEntry + { + std::string ServerPath; + std::string ClientPath; + }; + + template<class V> + using OidMap = tsl::robin_map<Oid, V, Oid::Hasher>; + + Project* m_OuterProject = nullptr; + CidStore& m_CidStore; + std::filesystem::path m_BasePath; + std::filesystem::path m_MarkerPath; + std::filesystem::path m_TempPath; + + mutable RwLock m_OplogLock; + OidMap<IoHash> m_ChunkMap; // output data chunk id -> CAS address + OidMap<IoHash> m_MetaMap; // meta chunk id -> CAS address + OidMap<FileMapEntry> m_FileMap; // file id -> file map entry + int32_t m_ManifestVersion; // File system manifest version + tsl::robin_map<int, OplogEntryAddress> m_OpAddressMap; // Index LSN -> op data in ops blob file + OidMap<int> m_LatestOpMap; // op key -> latest op LSN for key + + RefPtr<OplogStorage> m_Storage; + std::string m_OplogId; + + /** Scan oplog and register each entry, thus updating the in-memory tracking tables + */ + void ReplayLog(); + + struct OplogEntryMapping + { + struct Mapping + { + Oid Id; + IoHash Hash; + }; + struct FileMapping : public Mapping + { + std::string ServerPath; + std::string ClientPath; + }; + std::vector<Mapping> Chunks; + std::vector<Mapping> Meta; + std::vector<FileMapping> Files; + }; + + OplogEntryMapping GetMapping(CbObject Core); + + /** Update tracking metadata for a new oplog entry + * + * This is used during replay (and gets called as part of new op append) + * + * Returns the oplog LSN assigned to the new entry, or kInvalidOp if the entry is rejected + */ + uint32_t RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, + const OplogEntryMapping& OpMapping, + const OplogEntry& OpEntry, + UpdateType TypeOfUpdate); + + void AddFileMapping(const RwLock::ExclusiveLockScope& OplogLock, + Oid FileId, + IoHash Hash, + std::string_view ServerPath, + std::string_view ClientPath); + void AddChunkMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid ChunkId, IoHash Hash); + void AddMetaMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid ChunkId, IoHash Hash); + }; + + struct Project : public RefCounted + { + std::string Identifier; + std::filesystem::path RootDir; + std::string EngineRootDir; + std::string ProjectRootDir; + std::string ProjectFilePath; + + Oplog* NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath); + Oplog* OpenOplog(std::string_view OplogId); + void DeleteOplog(std::string_view OplogId); + void IterateOplogs(std::function<void(const Oplog&)>&& Fn) const; + void IterateOplogs(std::function<void(Oplog&)>&& Fn); + std::vector<std::string> ScanForOplogs() const; + bool IsExpired() const; + + Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath); + virtual ~Project(); + + void Read(); + void Write(); + [[nodiscard]] static bool Exists(std::filesystem::path BasePath); + void Flush(); + void Scrub(ScrubContext& Ctx); + spdlog::logger& Log(); + void GatherReferences(GcContext& GcCtx); + uint64_t TotalSize() const; + bool PrepareForDelete(std::filesystem::path& OutDeletePath); + + private: + ProjectStore* m_ProjectStore; + CidStore& m_CidStore; + mutable RwLock m_ProjectLock; + std::map<std::string, std::unique_ptr<Oplog>> m_Oplogs; + std::vector<std::unique_ptr<Oplog>> m_DeletedOplogs; + std::filesystem::path m_OplogStoragePath; + + std::filesystem::path BasePathForOplog(std::string_view OplogId); + }; + + // Oplog* OpenProjectOplog(std::string_view ProjectId, std::string_view OplogId); + + Ref<Project> OpenProject(std::string_view ProjectId); + Ref<Project> NewProject(std::filesystem::path BasePath, + std::string_view ProjectId, + std::string_view RootDir, + std::string_view EngineRootDir, + std::string_view ProjectRootDir, + std::string_view ProjectFilePath); + bool DeleteProject(std::string_view ProjectId); + bool Exists(std::string_view ProjectId); + void Flush(); + void Scrub(ScrubContext& Ctx); + void DiscoverProjects(); + void IterateProjects(std::function<void(Project& Prj)>&& Fn); + + spdlog::logger& Log() { return m_Log; } + const std::filesystem::path& BasePath() const { return m_ProjectBasePath; } + + virtual void GatherReferences(GcContext& GcCtx) override; + virtual void CollectGarbage(GcContext& GcCtx) override; + virtual GcStorageSize StorageSize() const override; + + CbArray GetProjectsList(); + std::pair<HttpResponseCode, std::string> GetProjectFiles(const std::string_view ProjectId, + const std::string_view OplogId, + bool FilterClient, + CbObject& OutPayload); + std::pair<HttpResponseCode, std::string> GetChunkInfo(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view ChunkId, + CbObject& OutPayload); + std::pair<HttpResponseCode, std::string> GetChunkRange(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view ChunkId, + uint64_t Offset, + uint64_t Size, + ZenContentType AcceptType, + IoBuffer& OutChunk); + std::pair<HttpResponseCode, std::string> GetChunk(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view Cid, + ZenContentType AcceptType, + IoBuffer& OutChunk); + + std::pair<HttpResponseCode, std::string> PutChunk(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view Cid, + ZenContentType ContentType, + IoBuffer&& Chunk); + + std::pair<HttpResponseCode, std::string> WriteOplog(const std::string_view ProjectId, + const std::string_view OplogId, + IoBuffer&& Payload, + CbObject& OutResponse); + + std::pair<HttpResponseCode, std::string> ReadOplog(const std::string_view ProjectId, + const std::string_view OplogId, + const HttpServerRequest::QueryParams& Params, + CbObject& OutResponse); + + std::pair<HttpResponseCode, std::string> WriteBlock(const std::string_view ProjectId, + const std::string_view OplogId, + IoBuffer&& Payload); + + void Rpc(HttpServerRequest& HttpReq, + const std::string_view ProjectId, + const std::string_view OplogId, + IoBuffer&& Payload, + AuthMgr& AuthManager); + + std::pair<HttpResponseCode, std::string> Export(ProjectStore::Project& Project, + ProjectStore::Oplog& Oplog, + CbObjectView&& Params, + AuthMgr& AuthManager); + + std::pair<HttpResponseCode, std::string> Import(ProjectStore::Project& Project, + ProjectStore::Oplog& Oplog, + CbObjectView&& Params, + AuthMgr& AuthManager); + +private: + spdlog::logger& m_Log; + CidStore& m_CidStore; + std::filesystem::path m_ProjectBasePath; + mutable RwLock m_ProjectsLock; + std::map<std::string, Ref<Project>> m_Projects; + + std::filesystem::path BasePathForProject(std::string_view ProjectId); +}; + +////////////////////////////////////////////////////////////////////////// +// +// {project} a project identifier +// {target} a variation of the project, typically a build target +// {lsn} oplog entry sequence number +// +// /prj/{project} +// /prj/{project}/oplog/{target} +// /prj/{project}/oplog/{target}/{lsn} +// +// oplog entry +// +// id: {id} +// key: {} +// meta: {} +// data: [] +// refs: +// + +class HttpProjectService : public HttpService, public IHttpStatsProvider +{ +public: + HttpProjectService(CidStore& Store, ProjectStore* InProjectStore, HttpStatsService& StatsService, AuthMgr& AuthMgr); + ~HttpProjectService(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(HttpServerRequest& Request) override; + + virtual void HandleStatsRequest(HttpServerRequest& Request) override; + +private: + inline spdlog::logger& Log() { return m_Log; } + + spdlog::logger& m_Log; + CidStore& m_CidStore; + HttpRequestRouter m_Router; + Ref<ProjectStore> m_ProjectStore; + HttpStatsService& m_StatsService; + AuthMgr& m_AuthMgr; +}; + +void prj_forcelink(); + +} // namespace zen diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp new file mode 100644 index 000000000..1e6ca51a1 --- /dev/null +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -0,0 +1,1036 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "remoteprojectstore.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compress.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/stream.h> +#include <zencore/timer.h> +#include <zencore/workthreadpool.h> +#include <zenstore/cidstore.h> + +namespace zen { + +/* + OplogContainer + Binary("ops") // Compressed CompactBinary object to hide attachment references, also makes the oplog smaller + { + CbArray("ops") + { + CbObject Op + (CbFieldType::BinaryAttachment Attachments[]) + (OpData) + } + } + CbArray("blocks") + CbObject + CbFieldType::BinaryAttachment "rawhash" // Optional, only if we are creating blocks (Jupiter/File) + CbArray("chunks") + CbFieldType::Hash // Chunk hashes + CbArray("chunks") // Optional, only if we are not creating blocks (Zen) + CbFieldType::BinaryAttachment // Chunk attachment hashes + + CompressedBinary ChunkBlock + { + VarUInt ChunkCount + VarUInt ChunkSizes[ChunkCount] + uint8_t[chunksize])[ChunkCount] + } +*/ + +////////////////////////////// AsyncRemoteResult + +struct AsyncRemoteResult +{ + void SetError(int32_t ErrorCode, const std::string& ErrorReason, const std::string ErrorText) + { + int32_t Expected = 0; + if (m_ErrorCode.compare_exchange_weak(Expected, ErrorCode ? ErrorCode : -1)) + { + m_ErrorReason = ErrorReason; + m_ErrorText = ErrorText; + } + } + bool IsError() const { return m_ErrorCode.load() != 0; } + int GetError() const { return m_ErrorCode.load(); }; + const std::string& GetErrorReason() const { return m_ErrorReason; }; + const std::string& GetErrorText() const { return m_ErrorText; }; + RemoteProjectStore::Result ConvertResult(double ElapsedSeconds = 0.0) const + { + return RemoteProjectStore::Result{m_ErrorCode, ElapsedSeconds, m_ErrorReason, m_ErrorText}; + } + +private: + std::atomic<int32_t> m_ErrorCode = 0; + std::string m_ErrorReason; + std::string m_ErrorText; +}; + +bool +IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) +{ + IoBuffer BlockPayload = CompressedBuffer::FromCompressedNoValidate(std::move(CompressedBlock)).Decompress().AsIoBuffer(); + + MemoryView BlockView = BlockPayload.GetView(); + const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData()); + uint32_t NumberSize; + uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize); + ReadPtr += NumberSize; + std::vector<uint64_t> ChunkSizes; + ChunkSizes.reserve(ChunkCount); + while (ChunkCount--) + { + ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize)); + ReadPtr += NumberSize; + } + ptrdiff_t TempBufferLength = std::distance(reinterpret_cast<const uint8_t*>(BlockView.GetData()), ReadPtr); + ZEN_ASSERT(TempBufferLength > 0); + for (uint64_t ChunkSize : ChunkSizes) + { + IoBuffer Chunk(IoBuffer::Wrap, ReadPtr, ChunkSize); + IoHash AttachmentRawHash; + uint64_t AttachmentRawSize; + CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize); + + if (!CompressedChunk) + { + ZEN_ERROR("Invalid chunk in block"); + return false; + } + Visitor(std::move(CompressedChunk), AttachmentRawHash); + ReadPtr += ChunkSize; + ZEN_ASSERT(ReadPtr <= BlockView.GetDataEnd()); + } + return true; +}; + +CompressedBuffer +GenerateBlock(std::vector<SharedBuffer>&& Chunks) +{ + size_t ChunkCount = Chunks.size(); + SharedBuffer SizeBuffer; + { + IoBuffer TempBuffer(ChunkCount * 9); + MutableMemoryView View = TempBuffer.GetMutableView(); + uint8_t* BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData()); + uint8_t* BufferEndPtr = BufferStartPtr; + BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr); + auto It = Chunks.begin(); + while (It != Chunks.end()) + { + BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(It->GetSize()), BufferEndPtr); + It++; + } + ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd()); + ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr); + SizeBuffer = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength))); + } + CompositeBuffer AllBuffers(std::move(SizeBuffer), CompositeBuffer(std::move(Chunks))); + + CompressedBuffer CompressedBlock = + CompressedBuffer::Compress(std::move(AllBuffers), OodleCompressor::Mermaid, OodleCompressionLevel::None); + + return CompressedBlock; +} + +struct Block +{ + IoHash BlockHash; + std::vector<IoHash> ChunksInBlock; +}; + +void +CreateBlock(WorkerThreadPool& WorkerPool, + Latch& OpSectionsLatch, + std::vector<SharedBuffer>&& ChunksInBlock, + RwLock& SectionsLock, + std::vector<Block>& Blocks, + size_t BlockIndex, + const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, + AsyncRemoteResult& RemoteResult) +{ + OpSectionsLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&Blocks, &SectionsLock, &OpSectionsLatch, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, &RemoteResult]() mutable { + auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + if (!Chunks.empty()) + { + CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); // Move to callback and return IoHash + IoHash BlockHash = CompressedBlock.DecodeRawHash(); + AsyncOnBlock(std::move(CompressedBlock), BlockHash); + { + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope __(SectionsLock); + Blocks[BlockIndex].BlockHash = BlockHash; + } + } + }); +} + +size_t +AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks) +{ + size_t BlockIndex; + { + RwLock::ExclusiveLockScope _(BlocksLock); + BlockIndex = Blocks.size(); + Blocks.resize(BlockIndex + 1); + } + return BlockIndex; +} + +CbObject +BuildContainer(CidStore& ChunkStore, + ProjectStore::Oplog& Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + bool BuildBlocks, + WorkerThreadPool& WorkerPool, + const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, + const std::function<void(const IoHash&)>& OnLargeAttachment, + const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, + AsyncRemoteResult& RemoteResult) +{ + using namespace std::literals; + + std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; + CbObjectWriter SectionOpsWriter; + SectionOpsWriter.BeginArray("ops"sv); + + size_t OpCount = 0; + + CbObject OplogContainerObject; + { + RwLock BlocksLock; + std::vector<Block> Blocks; + CompressedBuffer OpsBuffer; + + Latch BlockCreateLatch(1); + + std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes; + + size_t BlockSize = 0; + std::vector<SharedBuffer> ChunksInBlock; + + std::unordered_set<IoHash, IoHash::Hasher> Attachments; + Oplog.IterateOplog([&Attachments, &SectionOpsWriter, &OpCount](CbObject Op) { + Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert(FieldView.AsAttachment()); }); + (SectionOpsWriter) << Op; + OpCount++; + }); + + for (const IoHash& AttachmentHash : Attachments) + { + IoBuffer Payload = ChunkStore.FindChunkByCid(AttachmentHash); + if (!Payload) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to find attachment {} for op", AttachmentHash), + {}); + ZEN_ERROR("Failed to build container ({}). Reason: '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason()); + return {}; + } + uint64_t PayloadSize = Payload.GetSize(); + if (PayloadSize > MaxChunkEmbedSize) + { + if (LargeChunkHashes.insert(AttachmentHash).second) + { + OnLargeAttachment(AttachmentHash); + } + continue; + } + + if (!BlockAttachmentHashes.insert(AttachmentHash).second) + { + continue; + } + + BlockSize += PayloadSize; + if (BuildBlocks) + { + ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload))); + } + else + { + Payload = {}; + } + + if (BlockSize >= MaxBlockSize) + { + size_t BlockIndex = AddBlock(BlocksLock, Blocks); + if (BuildBlocks) + { + CreateBlock(WorkerPool, + BlockCreateLatch, + std::move(ChunksInBlock), + BlocksLock, + Blocks, + BlockIndex, + AsyncOnBlock, + RemoteResult); + } + else + { + OnBlockChunks(BlockAttachmentHashes); + } + { + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope _(BlocksLock); + Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(), + BlockAttachmentHashes.begin(), + BlockAttachmentHashes.end()); + } + BlockAttachmentHashes.clear(); + ChunksInBlock.clear(); + BlockSize = 0; + } + } + if (BlockSize > 0) + { + size_t BlockIndex = AddBlock(BlocksLock, Blocks); + if (BuildBlocks) + { + CreateBlock(WorkerPool, + BlockCreateLatch, + std::move(ChunksInBlock), + BlocksLock, + Blocks, + BlockIndex, + AsyncOnBlock, + RemoteResult); + } + else + { + OnBlockChunks(BlockAttachmentHashes); + } + { + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope _(BlocksLock); + Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(), + BlockAttachmentHashes.begin(), + BlockAttachmentHashes.end()); + } + BlockAttachmentHashes.clear(); + ChunksInBlock.clear(); + BlockSize = 0; + } + SectionOpsWriter.EndArray(); // "ops" + + CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer()); + ZEN_DEBUG("Added oplog section {}, {}", CompressedOpsSection.DecodeRawHash(), NiceBytes(CompressedOpsSection.GetCompressedSize())); + + BlockCreateLatch.CountDown(); + while (!BlockCreateLatch.Wait(1000)) + { + ZEN_INFO("Creating blocks, {} remaining...", BlockCreateLatch.Remaining()); + } + + if (!RemoteResult.IsError()) + { + CbObjectWriter OplogContinerWriter; + RwLock::SharedLockScope _(BlocksLock); + OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer()); + + OplogContinerWriter.BeginArray("blocks"sv); + { + for (const Block& B : Blocks) + { + ZEN_ASSERT(!B.ChunksInBlock.empty()); + if (BuildBlocks) + { + ZEN_ASSERT(B.BlockHash != IoHash::Zero); + + OplogContinerWriter.BeginObject(); + { + OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash); + OplogContinerWriter.BeginArray("chunks"sv); + { + for (const IoHash& RawHash : B.ChunksInBlock) + { + OplogContinerWriter.AddHash(RawHash); + } + } + OplogContinerWriter.EndArray(); // "chunks" + } + OplogContinerWriter.EndObject(); + continue; + } + + ZEN_ASSERT(B.BlockHash == IoHash::Zero); + OplogContinerWriter.BeginObject(); + { + OplogContinerWriter.BeginArray("chunks"sv); + { + for (const IoHash& RawHash : B.ChunksInBlock) + { + OplogContinerWriter.AddBinaryAttachment(RawHash); + } + } + OplogContinerWriter.EndArray(); + } + OplogContinerWriter.EndObject(); + } + } + OplogContinerWriter.EndArray(); // "blocks"sv + + OplogContinerWriter.BeginArray("chunks"sv); + { + for (const IoHash& AttachmentHash : LargeChunkHashes) + { + OplogContinerWriter.AddBinaryAttachment(AttachmentHash); + } + } + OplogContinerWriter.EndArray(); // "chunks" + + OplogContainerObject = OplogContinerWriter.Save(); + } + } + return OplogContainerObject; +} + +RemoteProjectStore::LoadContainerResult +BuildContainer(CidStore& ChunkStore, + ProjectStore::Oplog& Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + bool BuildBlocks, + const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, + const std::function<void(const IoHash&)>& OnLargeAttachment, + const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks) +{ + // We are creating a worker thread pool here since we are uploading a lot of attachments in one go and we dont want to keep a + // WorkerThreadPool alive + size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u); + WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount)); + + AsyncRemoteResult RemoteResult; + CbObject ContainerObject = BuildContainer(ChunkStore, + Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + BuildBlocks, + WorkerPool, + AsyncOnBlock, + OnLargeAttachment, + OnBlockChunks, + RemoteResult); + return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject}; +} + +RemoteProjectStore::Result +SaveOplog(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + ProjectStore::Oplog& Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + bool BuildBlocks, + bool UseTempBlocks, + bool ForceUpload) +{ + using namespace std::literals; + + Stopwatch Timer; + + // We are creating a worker thread pool here since we are uploading a lot of attachments in one go + // Doing upload is a rare and transient occation so we don't want to keep a WorkerThreadPool alive. + size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u); + WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount)); + + std::filesystem::path AttachmentTempPath; + if (UseTempBlocks) + { + AttachmentTempPath = Oplog.TempPath(); + AttachmentTempPath.append(".pending"); + CreateDirectories(AttachmentTempPath); + } + + AsyncRemoteResult RemoteResult; + RwLock AttachmentsLock; + std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments; + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks; + + auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, + const IoHash& BlockHash) { + std::filesystem::path BlockPath = AttachmentTempPath; + BlockPath.append(BlockHash.ToHexString()); + if (!std::filesystem::exists(BlockPath)) + { + IoBuffer BlockBuffer; + try + { + BasicFile BlockFile; + BlockFile.Open(BlockPath, BasicFile::Mode::kTruncateDelete); + uint64_t Offset = 0; + for (const SharedBuffer& Buffer : CompressedBlock.GetCompressed().GetSegments()) + { + BlockFile.Write(Buffer.GetView(), Offset); + Offset += Buffer.GetSize(); + } + void* FileHandle = BlockFile.Detach(); + BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset); + } + catch (std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + Ex.what(), + "Unable to create temp block file"); + return; + } + + BlockBuffer.MarkAsDeleteOnClose(); + { + RwLock::ExclusiveLockScope __(AttachmentsLock); + CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)}); + } + ZEN_DEBUG("Saved temp block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize())); + } + }; + + auto UploadBlock = [&RemoteStore, &RemoteResult](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) { + RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to save attachment ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError()); + return; + } + ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize())); + }; + + std::vector<std::vector<IoHash>> BlockChunks; + auto OnBlockChunks = [&BlockChunks](const std::unordered_set<IoHash, IoHash::Hasher>& Chunks) { + BlockChunks.push_back({Chunks.begin(), Chunks.end()}); + ZEN_DEBUG("Found {} block chunks", Chunks.size()); + }; + + auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments](const IoHash& AttachmentHash) { + { + RwLock::ExclusiveLockScope _(AttachmentsLock); + LargeAttachments.insert(AttachmentHash); + } + ZEN_DEBUG("Found attachment {}", AttachmentHash); + }; + + std::function<void(CompressedBuffer&&, const IoHash&)> OnBlock; + if (UseTempBlocks) + { + OnBlock = MakeTempBlock; + } + else + { + OnBlock = UploadBlock; + } + + CbObject OplogContainerObject = BuildContainer(ChunkStore, + Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + BuildBlocks, + WorkerPool, + OnBlock, + OnLargeAttachment, + OnBlockChunks, + RemoteResult); + + if (!RemoteResult.IsError()) + { + uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num(); + uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num(); + ZEN_INFO("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount); + RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer()); + if (ContainerSaveResult.ErrorCode) + { + RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container"); + ZEN_ERROR("Failed to save oplog container ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError()); + } + ZEN_DEBUG("Saved container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000))); + if (!ContainerSaveResult.Needs.empty()) + { + ZEN_INFO("Filtering needed attachments..."); + std::vector<IoHash> NeededLargeAttachments; + std::unordered_set<IoHash, IoHash::Hasher> NeededOtherAttachments; + NeededLargeAttachments.reserve(LargeAttachments.size()); + NeededOtherAttachments.reserve(CreatedBlocks.size()); + if (ForceUpload) + { + NeededLargeAttachments.insert(NeededLargeAttachments.end(), LargeAttachments.begin(), LargeAttachments.end()); + } + else + { + for (const IoHash& RawHash : ContainerSaveResult.Needs) + { + if (LargeAttachments.contains(RawHash)) + { + NeededLargeAttachments.push_back(RawHash); + continue; + } + NeededOtherAttachments.insert(RawHash); + } + } + + Latch SaveAttachmentsLatch(1); + if (!NeededLargeAttachments.empty()) + { + ZEN_INFO("Saving large attachments..."); + for (const IoHash& RawHash : NeededLargeAttachments) + { + if (RemoteResult.IsError()) + { + break; + } + SaveAttachmentsLatch.AddCount(1); + WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + + IoBuffer Payload; + if (auto It = CreatedBlocks.find(RawHash); It != CreatedBlocks.end()) + { + Payload = std::move(It->second); + } + else + { + Payload = ChunkStore.FindChunkByCid(RawHash); + } + if (!Payload) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to find attachment {}", RawHash), + {}); + ZEN_ERROR("Failed to build container ({}). Reason: '{}'", + RemoteResult.GetErrorReason(), + RemoteResult.GetError()); + return; + } + + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'", + RawHash, + NiceBytes(Payload.GetSize()), + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + ZEN_DEBUG("Saved attachment {}, {} in {}", + RawHash, + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return; + }); + } + } + + if (!CreatedBlocks.empty()) + { + ZEN_INFO("Saving created block attachments..."); + for (auto& It : CreatedBlocks) + { + if (RemoteResult.IsError()) + { + break; + } + const IoHash& RawHash = It.first; + if (ForceUpload || NeededOtherAttachments.contains(RawHash)) + { + IoBuffer Payload = It.second; + ZEN_ASSERT(Payload); + SaveAttachmentsLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'", + RawHash, + NiceBytes(Payload.GetSize()), + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + + ZEN_DEBUG("Saved attachment {}, {} in {}", + RawHash, + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return; + }); + } + It.second = {}; + } + } + + if (!BlockChunks.empty()) + { + ZEN_INFO("Saving chunk block attachments..."); + for (const std::vector<IoHash>& Chunks : BlockChunks) + { + if (RemoteResult.IsError()) + { + break; + } + std::vector<IoHash> NeededChunks; + if (ForceUpload) + { + NeededChunks = Chunks; + } + else + { + NeededChunks.reserve(Chunks.size()); + for (const IoHash& Chunk : Chunks) + { + if (NeededOtherAttachments.contains(Chunk)) + { + NeededChunks.push_back(Chunk); + } + } + if (NeededChunks.empty()) + { + continue; + } + } + SaveAttachmentsLatch.AddCount(1); + WorkerPool.ScheduleWork([&RemoteStore, + &ChunkStore, + &SaveAttachmentsLatch, + &RemoteResult, + &Chunks, + NeededChunks = std::move(NeededChunks), + ForceUpload]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + std::vector<SharedBuffer> ChunkBuffers; + ChunkBuffers.reserve(NeededChunks.size()); + for (const IoHash& Chunk : NeededChunks) + { + IoBuffer ChunkPayload = ChunkStore.FindChunkByCid(Chunk); + if (!ChunkPayload) + { + RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), + fmt::format("Missing chunk {}"sv, Chunk), + fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); + ChunkBuffers.clear(); + break; + } + ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload))); + } + RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to save attachments with {} chunks ({}). Reason: '{}'", + Chunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + ZEN_DEBUG("Saved {} bulk attachments in {}", + Chunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + }); + } + } + SaveAttachmentsLatch.CountDown(); + while (!SaveAttachmentsLatch.Wait(1000)) + { + ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining()); + } + SaveAttachmentsLatch.Wait(); + } + + if (!RemoteResult.IsError()) + { + ZEN_INFO("Finalizing oplog container..."); + RemoteProjectStore::Result ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash); + if (ContainerFinalizeResult.ErrorCode) + { + RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text); + ZEN_ERROR("Failed to finalize oplog container {} ({}). Reason: '{}'", + ContainerSaveResult.RawHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + } + ZEN_DEBUG("Finalized container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000))); + } + } + + RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + ZEN_INFO("Saved oplog {} in {}", + RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return Result; +}; + +RemoteProjectStore::Result +SaveOplogContainer(ProjectStore::Oplog& Oplog, + const CbObject& ContainerObject, + const std::function<bool(const IoHash& RawHash)>& HasAttachment, + const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment) +{ + using namespace std::literals; + + Stopwatch Timer; + + CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView(); + for (CbFieldView LargeChunksField : LargeChunksArray) + { + IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); + if (HasAttachment(AttachmentHash)) + { + continue; + } + OnNeedAttachment(AttachmentHash); + }; + + CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); + for (CbFieldView BlockField : BlocksArray) + { + CbObjectView BlockView = BlockField.AsObjectView(); + IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); + + CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView(); + if (BlockHash == IoHash::Zero) + { + std::vector<IoHash> NeededChunks; + NeededChunks.reserve(ChunksArray.GetSize()); + for (CbFieldView ChunkField : ChunksArray) + { + IoHash ChunkHash = ChunkField.AsBinaryAttachment(); + if (HasAttachment(ChunkHash)) + { + continue; + } + NeededChunks.emplace_back(ChunkHash); + } + + if (!NeededChunks.empty()) + { + OnNeedBlock(IoHash::Zero, std::move(NeededChunks)); + } + continue; + } + + for (CbFieldView ChunkField : ChunksArray) + { + IoHash ChunkHash = ChunkField.AsHash(); + if (HasAttachment(ChunkHash)) + { + continue; + } + + OnNeedBlock(BlockHash, {}); + break; + } + }; + + MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); + IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); + IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); + + CbObject SectionObject = LoadCompactBinaryObject(SectionPayload); + if (!SectionObject) + { + ZEN_ERROR("Failed to save oplog container. Reason: '{}'", "Section has unexpected data type"); + return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), + Timer.GetElapsedTimeMs() / 1000.500, + "Section has unexpected data type", + "Failed to save oplog container"}; + } + + CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); + for (CbFieldView OpEntry : OpsArray) + { + CbObjectView Core = OpEntry.AsObjectView(); + BinaryWriter Writer; + Core.CopyTo(Writer); + MemoryView OpView = Writer.GetView(); + IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize()); + CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); + const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Op); + if (OpLsn == ProjectStore::Oplog::kInvalidOp) + { + return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), + Timer.GetElapsedTimeMs() / 1000.500, + "Failed saving op", + "Failed to save oplog container"}; + } + ZEN_DEBUG("oplog entry #{}", OpLsn); + } + return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500}; +} + +RemoteProjectStore::Result +LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload) +{ + using namespace std::literals; + + Stopwatch Timer; + + // We are creating a worker thread pool here since we are download a lot of attachments in one go and we dont want to keep a + // WorkerThreadPool alive + size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u); + WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount)); + + std::unordered_set<IoHash, IoHash::Hasher> Attachments; + std::vector<std::vector<IoHash>> ChunksInBlocks; + + RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer(); + if (LoadContainerResult.ErrorCode) + { + ZEN_WARN("Failed to load oplog container, reason: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode); + return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode, + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, + .Reason = LoadContainerResult.Reason, + .Text = LoadContainerResult.Text}; + } + ZEN_DEBUG("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000))); + + AsyncRemoteResult RemoteResult; + Latch AttachmentsWorkLatch(1); + + auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) { + return !ForceDownload && ChunkStore.ContainsChunk(RawHash); + }; + auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &RemoteResult]( + const IoHash& BlockHash, + std::vector<IoHash>&& Chunks) { + if (BlockHash == IoHash::Zero) + { + AttachmentsWorkLatch.AddCount(1); + WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks)]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + + RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to attachments with {} chunks ({}). Reason: '{}'", + Chunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + ZEN_DEBUG("Loaded {} bulk attachments in {}", + Chunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + for (const auto& It : Result.Chunks) + { + ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly); + } + }); + return; + } + AttachmentsWorkLatch.AddCount(1); + WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); + if (BlockResult.ErrorCode) + { + RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); + ZEN_ERROR("Failed to load oplog container, missing attachment {} ({}). Reason: '{}'", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + ZEN_DEBUG("Loaded block attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000))); + + if (!IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { + ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); + })) + { + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Invalid format for block {}", BlockHash), + {}); + ZEN_ERROR("Failed to load oplog container, attachment {} has invalid format ({}). Reason: '{}'", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + }); + }; + + auto OnNeedAttachment = + [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments](const IoHash& RawHash) { + if (!Attachments.insert(RawHash).second) + { + return; + } + + AttachmentsWorkLatch.AddCount(1); + WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); + if (AttachmentResult.ErrorCode) + { + RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}", + RawHash, + AttachmentResult.Reason, + AttachmentResult.ErrorCode); + return; + } + ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); + ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); + }); + }; + + RemoteProjectStore::Result Result = + SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment); + + AttachmentsWorkLatch.CountDown(); + while (!AttachmentsWorkLatch.Wait(1000)) + { + ZEN_INFO("Loading attachments, {} remaining...", AttachmentsWorkLatch.Remaining()); + } + AttachmentsWorkLatch.Wait(); + if (Result.ErrorCode == 0) + { + Result = RemoteResult.ConvertResult(); + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + + ZEN_INFO("Loaded oplog {} in {}", + RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0))); + + return Result; +} + +} // namespace zen diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h new file mode 100644 index 000000000..dcabaedd4 --- /dev/null +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -0,0 +1,111 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "projectstore.h" + +#include <unordered_set> + +namespace zen { + +class CidStore; +class WorkerThreadPool; + +class RemoteProjectStore +{ +public: + struct Result + { + int32_t ErrorCode{}; + double ElapsedSeconds{}; + std::string Reason; + std::string Text; + }; + + struct SaveResult : public Result + { + std::unordered_set<IoHash, IoHash::Hasher> Needs; + IoHash RawHash; + }; + + struct SaveAttachmentResult : public Result + { + }; + + struct SaveAttachmentsResult : public Result + { + }; + + struct LoadAttachmentResult : public Result + { + IoBuffer Bytes; + }; + + struct LoadContainerResult : public Result + { + CbObject ContainerObject; + }; + + struct LoadAttachmentsResult : public Result + { + std::vector<std::pair<IoHash, CompressedBuffer>> Chunks; + }; + + struct RemoteStoreInfo + { + bool CreateBlocks; + bool UseTempBlockFiles; + std::string Description; + }; + + virtual ~RemoteProjectStore() {} + + virtual RemoteStoreInfo GetInfo() const = 0; + + virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0; + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) = 0; + virtual Result FinalizeContainer(const IoHash& RawHash) = 0; + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0; + + virtual LoadContainerResult LoadContainer() = 0; + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) = 0; + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) = 0; +}; + +struct RemoteStoreOptions +{ + size_t MaxBlockSize = 128u * 1024u * 1024u; + size_t MaxChunkEmbedSize = 1024u * 1024u; +}; + +RemoteProjectStore::LoadContainerResult BuildContainer( + CidStore& ChunkStore, + ProjectStore::Oplog& Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + bool BuildBlocks, + const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, + const std::function<void(const IoHash&)>& OnLargeAttachment, + const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks); + +RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, + const CbObject& ContainerObject, + const std::function<bool(const IoHash& RawHash)>& HasAttachment, + const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment); + +RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + ProjectStore::Oplog& Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + bool BuildBlocks, + bool UseTempBlocks, + bool ForceUpload); + +RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload); + +CompressedBuffer GenerateBlock(std::vector<SharedBuffer>&& Chunks); +bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); + +} // namespace zen diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp new file mode 100644 index 000000000..6ff471ae5 --- /dev/null +++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp @@ -0,0 +1,341 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zenremoteprojectstore.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compositebuffer.h> +#include <zencore/fmtutils.h> +#include <zencore/scopeguard.h> +#include <zencore/stream.h> +#include <zencore/timer.h> +#include <zenhttp/httpshared.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cpr/cpr.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +using namespace std::literals; + +class ZenRemoteStore : public RemoteProjectStore +{ +public: + ZenRemoteStore(std::string_view HostAddress, + std::string_view Project, + std::string_view Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize) + : m_HostAddress(HostAddress) + , m_ProjectStoreUrl(fmt::format("{}/prj"sv, m_HostAddress)) + , m_Project(Project) + , m_Oplog(Oplog) + , m_MaxBlockSize(MaxBlockSize) + , m_MaxChunkEmbedSize(MaxChunkEmbedSize) + { + } + + virtual RemoteStoreInfo GetInfo() const override + { + return {.CreateBlocks = false, .UseTempBlockFiles = false, .Description = fmt::format("[zen] {}"sv, m_HostAddress)}; + } + + virtual SaveResult SaveContainer(const IoBuffer& Payload) override + { + Stopwatch Timer; + + std::unique_ptr<cpr::Session> Session(AllocateSession()); + auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); + + std::string SaveRequest = fmt::format("{}/{}/oplog/{}/save"sv, m_ProjectStoreUrl, m_Project, m_Oplog); + Session->SetUrl({SaveRequest}); + Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}}); + MemoryView Data(Payload.GetView()); + Session->SetBody({reinterpret_cast<const char*>(Data.GetData()), Data.GetSize()}); + cpr::Response Response = Session->Post(); + SaveResult Result = SaveResult{ConvertResult(Response)}; + + if (Result.ErrorCode) + { + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + IoBuffer ResponsePayload(IoBuffer::Wrap, Response.text.data(), Response.text.size()); + CbObject ResponseObject = LoadCompactBinaryObject(ResponsePayload); + if (!ResponseObject) + { + Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, + m_ProjectStoreUrl, + m_Project, + m_Oplog); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + CbArrayView NeedsArray = ResponseObject["need"sv].AsArrayView(); + for (CbFieldView FieldView : NeedsArray) + { + IoHash ChunkHash = FieldView.AsHash(); + Result.Needs.insert(ChunkHash); + } + + Result.RawHash = IoHash::HashBuffer(Payload); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override + { + Stopwatch Timer; + + std::unique_ptr<cpr::Session> Session(AllocateSession()); + auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); + + std::string SaveRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash); + Session->SetUrl({SaveRequest}); + Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}}); + uint64_t SizeLeft = Payload.GetSize(); + CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); + auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, SizeLeft); + MutableMemoryView Data(buffer, size); + Payload.CopyTo(Data, BufferIt); + SizeLeft -= size; + return true; + }; + Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); + cpr::Response Response = Session->Post(); + SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)}; + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override + { + Stopwatch Timer; + + CbPackage RequestPackage; + { + CbObjectWriter RequestWriter; + RequestWriter.AddString("method"sv, "putchunks"sv); + RequestWriter.BeginArray("chunks"sv); + { + for (const SharedBuffer& Chunk : Chunks) + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, RawHash, RawSize); + RequestWriter.AddHash(RawHash); + RequestPackage.AddAttachment(CbAttachment(Compressed, RawHash)); + } + } + RequestWriter.EndArray(); // "chunks" + RequestPackage.SetObject(RequestWriter.Save()); + } + CompositeBuffer Payload = FormatPackageMessageBuffer(RequestPackage, FormatFlags::kDefault); + + std::unique_ptr<cpr::Session> Session(AllocateSession()); + auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); + std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog); + Session->SetUrl({SaveRequest}); + Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}}); + + uint64_t SizeLeft = Payload.GetSize(); + CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); + auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, SizeLeft); + MutableMemoryView Data(buffer, size); + Payload.CopyTo(Data, BufferIt); + SizeLeft -= size; + return true; + }; + Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); + cpr::Response Response = Session->Post(); + SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)}; + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override + { + Stopwatch Timer; + + std::unique_ptr<cpr::Session> Session(AllocateSession()); + auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); + std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog); + + CbObject Request; + { + CbObjectWriter RequestWriter; + RequestWriter.AddString("method"sv, "getchunks"sv); + RequestWriter.BeginArray("chunks"sv); + { + for (const IoHash& RawHash : RawHashes) + { + RequestWriter.AddHash(RawHash); + } + } + RequestWriter.EndArray(); // "chunks" + Request = RequestWriter.Save(); + } + IoBuffer Payload = Request.GetBuffer().AsIoBuffer(); + Session->SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()}); + Session->SetUrl(SaveRequest); + Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}, + {"Accept", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}}); + + cpr::Response Response = Session->Post(); + LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)}; + if (!Result.ErrorCode) + { + CbPackage Package = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size())); + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + Result.Chunks.reserve(Attachments.size()); + for (const CbAttachment& Attachment : Attachments) + { + Result.Chunks.emplace_back( + std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()}); + } + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + }; + + virtual Result FinalizeContainer(const IoHash&) override + { + Stopwatch Timer; + + RwLock::ExclusiveLockScope _(SessionsLock); + Sessions.clear(); + return {.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500}; + } + + virtual LoadContainerResult LoadContainer() override + { + Stopwatch Timer; + + std::unique_ptr<cpr::Session> Session(AllocateSession()); + auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); + std::string SaveRequest = fmt::format("{}/{}/oplog/{}/load"sv, m_ProjectStoreUrl, m_Project, m_Oplog); + Session->SetUrl(SaveRequest); + Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCbObject))}}); + Session->SetParameters( + {{"maxblocksize", fmt::format("{}", m_MaxBlockSize)}, {"maxchunkembedsize", fmt::format("{}", m_MaxChunkEmbedSize)}}); + cpr::Response Response = Session->Get(); + + LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)}; + if (!Result.ErrorCode) + { + Result.ContainerObject = LoadCompactBinaryObject(IoBuffer(IoBuffer::Clone, Response.text.data(), Response.text.size())); + if (!Result.ContainerObject) + { + Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, + m_ProjectStoreUrl, + m_Project, + m_Oplog); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + { + Stopwatch Timer; + + std::unique_ptr<cpr::Session> Session(AllocateSession()); + auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); + + std::string LoadRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash); + Session->SetUrl({LoadRequest}); + Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}}); + cpr::Response Response = Session->Get(); + LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)}; + if (!Result.ErrorCode) + { + Result.Bytes = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + +private: + std::unique_ptr<cpr::Session> AllocateSession() + { + RwLock::ExclusiveLockScope _(SessionsLock); + if (Sessions.empty()) + { + Sessions.emplace_back(std::make_unique<cpr::Session>()); + } + std::unique_ptr<cpr::Session> Session = std::move(Sessions.back()); + Sessions.pop_back(); + return Session; + } + + void ReleaseSession(std::unique_ptr<cpr::Session>&& Session) + { + RwLock::ExclusiveLockScope _(SessionsLock); + Sessions.emplace_back(std::move(Session)); + } + + static Result ConvertResult(const cpr::Response& Response) + { + std::string Text; + std::string Reason = Response.reason; + int32_t ErrorCode = 0; + if (Response.error.code != cpr::ErrorCode::OK) + { + ErrorCode = static_cast<int32_t>(Response.error.code); + if (!Response.error.message.empty()) + { + Reason = Response.error.message; + } + } + else if (!IsHttpSuccessCode(Response.status_code)) + { + ErrorCode = static_cast<int32_t>(Response.status_code); + + if (auto It = Response.header.find("Content-Type"); It != Response.header.end()) + { + zen::HttpContentType ContentType = zen::ParseContentType(It->second); + if (ContentType == zen::HttpContentType::kText) + { + Text = Response.text; + } + } + + Reason = fmt::format("{}"sv, Response.status_code); + } + return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.elapsed, .Reason = Reason, .Text = Text}; + } + + RwLock SessionsLock; + std::vector<std::unique_ptr<cpr::Session>> Sessions; + + const std::string m_HostAddress; + const std::string m_ProjectStoreUrl; + const std::string m_Project; + const std::string m_Oplog; + const size_t m_MaxBlockSize; + const size_t m_MaxChunkEmbedSize; +}; + +std::unique_ptr<RemoteProjectStore> +CreateZenRemoteStore(const ZenRemoteStoreOptions& Options) +{ + std::string Url = Options.Url; + if (Url.find("://"sv) == std::string::npos) + { + // Assume https URL + Url = fmt::format("http://{}"sv, Url); + } + std::unique_ptr<RemoteProjectStore> RemoteStore = + std::make_unique<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize); + return RemoteStore; +} + +} // namespace zen diff --git a/src/zenserver/projectstore/zenremoteprojectstore.h b/src/zenserver/projectstore/zenremoteprojectstore.h new file mode 100644 index 000000000..ef9dcad8c --- /dev/null +++ b/src/zenserver/projectstore/zenremoteprojectstore.h @@ -0,0 +1,18 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "remoteprojectstore.h" + +namespace zen { + +struct ZenRemoteStoreOptions : RemoteStoreOptions +{ + std::string Url; + std::string ProjectId; + std::string OplogId; +}; + +std::unique_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options); + +} // namespace zen |