diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /zenserver/projectstore | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'zenserver/projectstore')
| -rw-r--r-- | zenserver/projectstore/fileremoteprojectstore.cpp | 235 | ||||
| -rw-r--r-- | zenserver/projectstore/fileremoteprojectstore.h | 19 | ||||
| -rw-r--r-- | zenserver/projectstore/jupiterremoteprojectstore.cpp | 244 | ||||
| -rw-r--r-- | zenserver/projectstore/jupiterremoteprojectstore.h | 26 | ||||
| -rw-r--r-- | zenserver/projectstore/projectstore.cpp | 4082 | ||||
| -rw-r--r-- | zenserver/projectstore/projectstore.h | 372 | ||||
| -rw-r--r-- | zenserver/projectstore/remoteprojectstore.cpp | 1036 | ||||
| -rw-r--r-- | zenserver/projectstore/remoteprojectstore.h | 111 | ||||
| -rw-r--r-- | zenserver/projectstore/zenremoteprojectstore.cpp | 341 | ||||
| -rw-r--r-- | zenserver/projectstore/zenremoteprojectstore.h | 18 |
10 files changed, 0 insertions, 6484 deletions
diff --git a/zenserver/projectstore/fileremoteprojectstore.cpp b/zenserver/projectstore/fileremoteprojectstore.cpp deleted file mode 100644 index d7a34a6c2..000000000 --- a/zenserver/projectstore/fileremoteprojectstore.cpp +++ /dev/null @@ -1,235 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "fileremoteprojectstore.h" - -#include <zencore/compress.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/timer.h> - -namespace zen { - -using namespace std::literals; - -class LocalExportProjectStore : public RemoteProjectStore -{ -public: - LocalExportProjectStore(std::string_view Name, - const std::filesystem::path& FolderPath, - bool ForceDisableBlocks, - bool ForceEnableTempBlocks) - : m_Name(Name) - , m_OutputPath(FolderPath) - { - if (ForceDisableBlocks) - { - m_EnableBlocks = false; - } - if (ForceEnableTempBlocks) - { - m_UseTempBlocks = true; - } - } - - virtual RemoteStoreInfo GetInfo() const override - { - return {.CreateBlocks = m_EnableBlocks, - .UseTempBlockFiles = m_UseTempBlocks, - .Description = fmt::format("[file] {}"sv, m_OutputPath)}; - } - - virtual SaveResult SaveContainer(const IoBuffer& Payload) override - { - Stopwatch Timer; - SaveResult Result; - - { - CbObject ContainerObject = LoadCompactBinaryObject(Payload); - - ContainerObject.IterateAttachments([&](CbFieldView FieldView) { - IoHash AttachmentHash = FieldView.AsBinaryAttachment(); - std::filesystem::path AttachmentPath = GetAttachmentPath(AttachmentHash); - if (!std::filesystem::exists(AttachmentPath)) - { - Result.Needs.insert(AttachmentHash); - } - }); - } - - std::filesystem::path ContainerPath = m_OutputPath; - ContainerPath.append(m_Name); - - CreateDirectories(m_OutputPath); - BasicFile ContainerFile; - ContainerFile.Open(ContainerPath, BasicFile::Mode::kTruncate); - std::error_code Ec; - ContainerFile.WriteAll(Payload, Ec); - if (Ec) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = Ec.message(); - } - Result.RawHash = IoHash::HashBuffer(Payload); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return Result; - } - - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override - { - Stopwatch Timer; - SaveAttachmentResult Result; - std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); - if (!std::filesystem::exists(ChunkPath)) - { - try - { - CreateDirectories(ChunkPath.parent_path()); - - BasicFile ChunkFile; - ChunkFile.Open(ChunkPath, BasicFile::Mode::kTruncate); - size_t Offset = 0; - for (const SharedBuffer& Segment : Payload.GetSegments()) - { - ChunkFile.Write(Segment.GetView(), Offset); - Offset += Segment.GetSize(); - } - } - catch (std::exception& Ex) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = Ex.what(); - } - } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return Result; - } - - virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override - { - Stopwatch Timer; - - for (const SharedBuffer& Chunk : Chunks) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); - SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash()); - if (ChunkResult.ErrorCode) - { - ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return SaveAttachmentsResult{ChunkResult}; - } - } - SaveAttachmentsResult Result; - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return Result; - } - - virtual Result FinalizeContainer(const IoHash&) override { return {}; } - - virtual LoadContainerResult LoadContainer() override - { - Stopwatch Timer; - LoadContainerResult Result; - std::filesystem::path ContainerPath = m_OutputPath; - ContainerPath.append(m_Name); - if (!std::filesystem::is_regular_file(ContainerPath)) - { - Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); - Result.Reason = fmt::format("The file {} does not exist"sv, ContainerPath.string()); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return Result; - } - IoBuffer ContainerPayload; - { - BasicFile ContainerFile; - ContainerFile.Open(ContainerPath, BasicFile::Mode::kRead); - ContainerPayload = ContainerFile.ReadAll(); - } - Result.ContainerObject = LoadCompactBinaryObject(ContainerPayload); - if (!Result.ContainerObject) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The file {} is not formatted as a compact binary object"sv, ContainerPath.string()); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return Result; - } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return Result; - } - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override - { - Stopwatch Timer; - LoadAttachmentResult Result; - std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); - if (!std::filesystem::is_regular_file(ChunkPath)) - { - Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); - Result.Reason = fmt::format("The file {} does not exist"sv, ChunkPath.string()); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return Result; - } - { - BasicFile ChunkFile; - ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead); - Result.Bytes = ChunkFile.ReadAll(); - } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return Result; - } - - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override - { - Stopwatch Timer; - LoadAttachmentsResult Result; - for (const IoHash& Hash : RawHashes) - { - LoadAttachmentResult ChunkResult = LoadAttachment(Hash); - if (ChunkResult.ErrorCode) - { - ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return LoadAttachmentsResult{ChunkResult}; - } - ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000))); - Result.Chunks.emplace_back( - std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); - } - return Result; - } - -private: - std::filesystem::path GetAttachmentPath(const IoHash& RawHash) const - { - ExtendablePathBuilder<128> ShardedPath; - ShardedPath.Append(m_OutputPath.c_str()); - ExtendableStringBuilder<64> HashString; - RawHash.ToHexString(HashString); - const char* str = HashString.c_str(); - ShardedPath.AppendSeparator(); - ShardedPath.AppendAsciiRange(str, str + 3); - - ShardedPath.AppendSeparator(); - ShardedPath.AppendAsciiRange(str + 3, str + 5); - - ShardedPath.AppendSeparator(); - ShardedPath.AppendAsciiRange(str + 5, str + 40); - - return ShardedPath.ToPath(); - } - - const std::string m_Name; - const std::filesystem::path m_OutputPath; - bool m_EnableBlocks = true; - bool m_UseTempBlocks = false; -}; - -std::unique_ptr<RemoteProjectStore> -CreateFileRemoteStore(const FileRemoteStoreOptions& Options) -{ - std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<LocalExportProjectStore>(Options.Name, - std::filesystem::path(Options.FolderPath), - Options.ForceDisableBlocks, - Options.ForceEnableTempBlocks); - return RemoteStore; -} - -} // namespace zen diff --git a/zenserver/projectstore/fileremoteprojectstore.h b/zenserver/projectstore/fileremoteprojectstore.h deleted file mode 100644 index 68d1eb71e..000000000 --- a/zenserver/projectstore/fileremoteprojectstore.h +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include "remoteprojectstore.h" - -namespace zen { - -struct FileRemoteStoreOptions : RemoteStoreOptions -{ - std::filesystem::path FolderPath; - std::string Name; - bool ForceDisableBlocks; - bool ForceEnableTempBlocks; -}; - -std::unique_ptr<RemoteProjectStore> CreateFileRemoteStore(const FileRemoteStoreOptions& Options); - -} // namespace zen diff --git a/zenserver/projectstore/jupiterremoteprojectstore.cpp b/zenserver/projectstore/jupiterremoteprojectstore.cpp deleted file mode 100644 index 66cf3c4f8..000000000 --- a/zenserver/projectstore/jupiterremoteprojectstore.cpp +++ /dev/null @@ -1,244 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "jupiterremoteprojectstore.h" - -#include <zencore/compress.h> -#include <zencore/fmtutils.h> - -#include <auth/authmgr.h> -#include <upstream/jupiter.h> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <cpr/cpr.h> -ZEN_THIRD_PARTY_INCLUDES_END - -namespace zen { - -using namespace std::literals; - -class JupiterRemoteStore : public RemoteProjectStore -{ -public: - JupiterRemoteStore(Ref<CloudCacheClient>&& CloudClient, - std::string_view Namespace, - std::string_view Bucket, - const IoHash& Key, - bool ForceDisableBlocks, - bool ForceDisableTempBlocks) - : m_CloudClient(CloudClient) - , m_Namespace(Namespace) - , m_Bucket(Bucket) - , m_Key(Key) - { - if (ForceDisableBlocks) - { - m_EnableBlocks = false; - } - if (ForceDisableTempBlocks) - { - m_UseTempBlocks = false; - } - } - - virtual RemoteStoreInfo GetInfo() const override - { - return {.CreateBlocks = m_EnableBlocks, - .UseTempBlockFiles = m_UseTempBlocks, - .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_CloudClient->ServiceUrl(), m_Namespace, m_Bucket, m_Key)}; - } - - virtual SaveResult SaveContainer(const IoBuffer& Payload) override - { - const int32_t MaxAttempts = 3; - PutRefResult Result; - { - CloudCacheSession Session(m_CloudClient.Get()); - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject); - } - } - - return SaveResult{ConvertResult(Result), {Result.Needs.begin(), Result.Needs.end()} /*, {}*/, IoHash::HashBuffer(Payload)}; - } - - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override - { - const int32_t MaxAttempts = 3; - CloudCacheResult Result; - { - CloudCacheSession Session(m_CloudClient.Get()); - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); - } - } - - return SaveAttachmentResult{ConvertResult(Result)}; - } - - virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override - { - SaveAttachmentsResult Result; - for (const SharedBuffer& Chunk : Chunks) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); - SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash()); - if (ChunkResult.ErrorCode) - { - return SaveAttachmentsResult{ChunkResult}; - } - } - return Result; - } - - virtual Result FinalizeContainer(const IoHash& RawHash) override - { - const int32_t MaxAttempts = 3; - CloudCacheResult Result; - { - CloudCacheSession Session(m_CloudClient.Get()); - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash); - } - } - return ConvertResult(Result); - } - - virtual LoadContainerResult LoadContainer() override - { - const int32_t MaxAttempts = 3; - CloudCacheResult Result; - { - CloudCacheSession Session(m_CloudClient.Get()); - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.GetRef(m_Namespace, m_Bucket, m_Key, ZenContentType::kCbObject); - } - } - - if (Result.ErrorCode || !Result.Success) - { - return LoadContainerResult{ConvertResult(Result)}; - } - - CbObject ContainerObject = LoadCompactBinaryObject(Result.Response); - if (!ContainerObject) - { - return LoadContainerResult{ - RemoteProjectStore::Result{ - .ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - .ElapsedSeconds = Result.ElapsedSeconds, - .Reason = fmt::format("The ref {}/{}/{} is not formatted as a compact binary object"sv, m_Namespace, m_Bucket, m_Key)}, - std::move(ContainerObject)}; - } - - return LoadContainerResult{ConvertResult(Result), std::move(ContainerObject)}; - } - - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override - { - const int32_t MaxAttempts = 3; - CloudCacheResult Result; - { - CloudCacheSession Session(m_CloudClient.Get()); - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.GetCompressedBlob(m_Namespace, RawHash); - } - } - return LoadAttachmentResult{ConvertResult(Result), std::move(Result.Response)}; - } - - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override - { - LoadAttachmentsResult Result; - for (const IoHash& Hash : RawHashes) - { - LoadAttachmentResult ChunkResult = LoadAttachment(Hash); - if (ChunkResult.ErrorCode) - { - return LoadAttachmentsResult{ChunkResult}; - } - ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000))); - Result.Chunks.emplace_back( - std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); - } - return Result; - } - -private: - static Result ConvertResult(const CloudCacheResult& Response) - { - std::string Text; - int32_t ErrorCode = 0; - if (Response.ErrorCode != 0) - { - ErrorCode = Response.ErrorCode; - } - else if (!Response.Success) - { - ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - if (Response.Response.GetContentType() == ZenContentType::kText) - { - Text = - std::string(reinterpret_cast<const std::string::value_type*>(Response.Response.GetData()), Response.Response.GetSize()); - } - } - return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text}; - } - - Ref<CloudCacheClient> m_CloudClient; - const std::string m_Namespace; - const std::string m_Bucket; - const IoHash m_Key; - bool m_EnableBlocks = true; - bool m_UseTempBlocks = true; -}; - -std::unique_ptr<RemoteProjectStore> -CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options) -{ - std::string Url = Options.Url; - if (Url.find("://"sv) == std::string::npos) - { - // Assume https URL - Url = fmt::format("https://{}"sv, Url); - } - CloudCacheClientOptions ClientOptions{.Name = "Remote store"sv, - .ServiceUrl = Url, - .ConnectTimeout = std::chrono::milliseconds(2000), - .Timeout = std::chrono::milliseconds(60000)}; - // 1) Access token as parameter in request - // 2) Environment variable (different win vs linux/mac) - // 3) openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider - - std::unique_ptr<CloudCacheTokenProvider> TokenProvider; - if (!Options.AccessToken.empty()) - { - TokenProvider = CloudCacheTokenProvider::CreateFromCallback([AccessToken = Options.AccessToken]() { - return CloudCacheAccessToken{.Value = AccessToken, .ExpireTime = GcClock::TimePoint::max()}; - }); - } - else - { - TokenProvider = - CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager, OpenIdProvider = Options.OpenIdProvider]() { - AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider.empty() ? "Default" : OpenIdProvider); - return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; - }); - } - - Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider))); - - std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<JupiterRemoteStore>(std::move(CloudClient), - Options.Namespace, - Options.Bucket, - Options.Key, - Options.ForceDisableBlocks, - Options.ForceDisableTempBlocks); - return RemoteStore; -} - -} // namespace zen diff --git a/zenserver/projectstore/jupiterremoteprojectstore.h b/zenserver/projectstore/jupiterremoteprojectstore.h deleted file mode 100644 index 31548af22..000000000 --- a/zenserver/projectstore/jupiterremoteprojectstore.h +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include "remoteprojectstore.h" - -namespace zen { - -class AuthMgr; - -struct JupiterRemoteStoreOptions : RemoteStoreOptions -{ - std::string Url; - std::string Namespace; - std::string Bucket; - IoHash Key; - std::string OpenIdProvider; - std::string AccessToken; - AuthMgr& AuthManager; - bool ForceDisableBlocks; - bool ForceDisableTempBlocks; -}; - -std::unique_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options); - -} // namespace zen diff --git a/zenserver/projectstore/projectstore.cpp b/zenserver/projectstore/projectstore.cpp deleted file mode 100644 index 847a79a1d..000000000 --- a/zenserver/projectstore/projectstore.cpp +++ /dev/null @@ -1,4082 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "projectstore.h" - -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinarypackage.h> -#include <zencore/compactbinaryvalidation.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/scopeguard.h> -#include <zencore/stream.h> -#include <zencore/timer.h> -#include <zencore/trace.h> -#include <zenhttp/httpshared.h> -#include <zenstore/caslog.h> -#include <zenstore/cidstore.h> -#include <zenstore/scrubcontext.h> -#include <zenutil/cache/rpcrecording.h> - -#include "fileremoteprojectstore.h" -#include "jupiterremoteprojectstore.h" -#include "remoteprojectstore.h" -#include "zenremoteprojectstore.h" - -ZEN_THIRD_PARTY_INCLUDES_START -#include <cpr/cpr.h> -#include <xxh3.h> -ZEN_THIRD_PARTY_INCLUDES_END - -#if ZEN_WITH_TESTS -# include <zencore/testing.h> -# include <zencore/testutils.h> -#endif // ZEN_WITH_TESTS - -namespace zen { - -namespace { - bool PrepareDirectoryDelete(const std::filesystem::path& Dir, std::filesystem::path& OutDeleteDir) - { - int DropIndex = 0; - do - { - if (!std::filesystem::exists(Dir)) - { - return true; - } - - std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); - std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; - if (std::filesystem::exists(DroppedBucketPath)) - { - DropIndex++; - continue; - } - - std::error_code Ec; - std::filesystem::rename(Dir, DroppedBucketPath, Ec); - if (!Ec) - { - OutDeleteDir = DroppedBucketPath; - return true; - } - if (Ec && !std::filesystem::exists(DroppedBucketPath)) - { - // We can't move our folder, probably because it is busy, bail.. - return false; - } - Sleep(100); - } while (true); - } - - std::pair<std::unique_ptr<RemoteProjectStore>, std::string> CreateRemoteStore(CbObjectView Params, - AuthMgr& AuthManager, - size_t MaxBlockSize, - size_t MaxChunkEmbedSize) - { - using namespace std::literals; - - std::unique_ptr<RemoteProjectStore> RemoteStore; - - if (CbObjectView File = Params["file"sv].AsObjectView(); File) - { - std::filesystem::path FolderPath(File["path"sv].AsString()); - if (FolderPath.empty()) - { - return {nullptr, "Missing file path"}; - } - std::string_view Name(File["name"sv].AsString()); - if (Name.empty()) - { - return {nullptr, "Missing file name"}; - } - bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false); - bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false); - - FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize}, - FolderPath, - std::string(Name), - ForceDisableBlocks, - ForceEnableTempBlocks}; - RemoteStore = CreateFileRemoteStore(Options); - } - - if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud) - { - std::string_view CloudServiceUrl = Cloud["url"sv].AsString(); - if (CloudServiceUrl.empty()) - { - return {nullptr, "Missing service url"}; - } - - std::string Url = cpr::util::urlDecode(std::string(CloudServiceUrl)); - std::string_view Namespace = Cloud["namespace"sv].AsString(); - if (Namespace.empty()) - { - return {nullptr, "Missing namespace"}; - } - std::string_view Bucket = Cloud["bucket"sv].AsString(); - if (Bucket.empty()) - { - return {nullptr, "Missing bucket"}; - } - std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString(); - std::string AccessToken = std::string(Cloud["access-token"sv].AsString()); - if (AccessToken.empty()) - { - std::string_view AccessTokenEnvVariable = Cloud["access-token-env"].AsString(); - if (!AccessTokenEnvVariable.empty()) - { - AccessToken = GetEnvVariable(AccessTokenEnvVariable); - } - } - std::string_view KeyParam = Cloud["key"sv].AsString(); - if (KeyParam.empty()) - { - return {nullptr, "Missing key"}; - } - if (KeyParam.length() != IoHash::StringLength) - { - return {nullptr, "Invalid key"}; - } - IoHash Key = IoHash::FromHexString(KeyParam); - if (Key == IoHash::Zero) - { - return {nullptr, "Invalid key string"}; - } - bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false); - bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false); - - JupiterRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize}, - Url, - std::string(Namespace), - std::string(Bucket), - Key, - std::string(OpenIdProvider), - AccessToken, - AuthManager, - ForceDisableBlocks, - ForceDisableTempBlocks}; - RemoteStore = CreateJupiterRemoteStore(Options); - } - - if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen) - { - std::string_view Url = Zen["url"sv].AsString(); - std::string_view Project = Zen["project"sv].AsString(); - if (Project.empty()) - { - return {nullptr, "Missing project"}; - } - std::string_view Oplog = Zen["oplog"sv].AsString(); - if (Oplog.empty()) - { - return {nullptr, "Missing oplog"}; - } - ZenRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize}, - std::string(Url), - std::string(Project), - std::string(Oplog)}; - RemoteStore = CreateZenRemoteStore(Options); - } - - if (!RemoteStore) - { - return {nullptr, "Unknown remote store type"}; - } - - return {std::move(RemoteStore), ""}; - } - - std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result) - { - if (Result.ErrorCode == 0) - { - return {HttpResponseCode::OK, Result.Text}; - } - return {static_cast<HttpResponseCode>(Result.ErrorCode), - Result.Reason.empty() ? Result.Text - : Result.Text.empty() ? Result.Reason - : fmt::format("{}. Reason: '{}'", Result.Text, Result.Reason)}; - } - - void CSVHeader(bool Details, bool AttachmentDetails, StringBuilderBase& CSVWriter) - { - if (AttachmentDetails) - { - CSVWriter << "Project, Oplog, LSN, Key, Cid, Size"; - } - else if (Details) - { - CSVWriter << "Project, Oplog, LSN, Key, Size, AttachmentCount, AttachmentsSize"; - } - else - { - CSVWriter << "Project, Oplog, Key"; - } - } - - void CSVWriteOp(CidStore& CidStore, - std::string_view ProjectId, - std::string_view OplogId, - bool Details, - bool AttachmentDetails, - int LSN, - const Oid& Key, - CbObject Op, - StringBuilderBase& CSVWriter) - { - StringBuilder<32> KeyStringBuilder; - Key.ToString(KeyStringBuilder); - const std::string_view KeyString = KeyStringBuilder.ToView(); - - SharedBuffer Buffer = Op.GetBuffer(); - if (AttachmentDetails) - { - Op.IterateAttachments([&CidStore, &CSVWriter, &ProjectId, &OplogId, LSN, &KeyString](CbFieldView FieldView) { - const IoHash AttachmentHash = FieldView.AsAttachment(); - IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); - CSVWriter << "\r\n" - << ProjectId << ", " << OplogId << ", " << LSN << ", " << KeyString << ", " << AttachmentHash.ToHexString() - << ", " << gsl::narrow<uint64_t>(Attachment.GetSize()); - }); - } - else if (Details) - { - uint64_t AttachmentCount = 0; - size_t AttachmentsSize = 0; - Op.IterateAttachments([&CidStore, &AttachmentCount, &AttachmentsSize](CbFieldView FieldView) { - const IoHash AttachmentHash = FieldView.AsAttachment(); - AttachmentCount++; - IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); - AttachmentsSize += Attachment.GetSize(); - }); - CSVWriter << "\r\n" - << ProjectId << ", " << OplogId << ", " << LSN << ", " << KeyString << ", " << gsl::narrow<uint64_t>(Buffer.GetSize()) - << ", " << AttachmentCount << ", " << gsl::narrow<uint64_t>(AttachmentsSize); - } - else - { - CSVWriter << "\r\n" << ProjectId << ", " << OplogId << ", " << KeyString; - } - }; - - void CbWriteOp(CidStore& CidStore, - bool Details, - bool OpDetails, - bool AttachmentDetails, - int LSN, - const Oid& Key, - CbObject Op, - CbObjectWriter& CbWriter) - { - CbWriter.BeginObject(); - { - SharedBuffer Buffer = Op.GetBuffer(); - CbWriter.AddObjectId("key", Key); - if (Details) - { - CbWriter.AddInteger("lsn", LSN); - CbWriter.AddInteger("size", gsl::narrow<uint64_t>(Buffer.GetSize())); - } - if (AttachmentDetails) - { - CbWriter.BeginArray("attachments"); - Op.IterateAttachments([&CidStore, &CbWriter](CbFieldView FieldView) { - const IoHash AttachmentHash = FieldView.AsAttachment(); - CbWriter.BeginObject(); - { - IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); - CbWriter.AddString("cid", AttachmentHash.ToHexString()); - CbWriter.AddInteger("size", gsl::narrow<uint64_t>(Attachment.GetSize())); - } - CbWriter.EndObject(); - }); - CbWriter.EndArray(); - } - else if (Details) - { - uint64_t AttachmentCount = 0; - size_t AttachmentsSize = 0; - Op.IterateAttachments([&CidStore, &AttachmentCount, &AttachmentsSize](CbFieldView FieldView) { - const IoHash AttachmentHash = FieldView.AsAttachment(); - AttachmentCount++; - IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); - AttachmentsSize += Attachment.GetSize(); - }); - if (AttachmentCount > 0) - { - CbWriter.AddInteger("attachments", AttachmentCount); - CbWriter.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize)); - } - } - if (OpDetails) - { - CbWriter.BeginObject("op"); - for (const CbFieldView& Field : Op) - { - if (!Field.HasName()) - { - CbWriter.AddField(Field); - continue; - } - std::string_view FieldName = Field.GetName(); - CbWriter.AddField(FieldName, Field); - } - CbWriter.EndObject(); - } - } - CbWriter.EndObject(); - }; - - void CbWriteOplogOps(CidStore& CidStore, - ProjectStore::Oplog& Oplog, - bool Details, - bool OpDetails, - bool AttachmentDetails, - CbObjectWriter& Cbo) - { - Cbo.BeginArray("ops"); - { - Oplog.IterateOplogWithKey([&Cbo, &CidStore, Details, OpDetails, AttachmentDetails](int LSN, const Oid& Key, CbObject Op) { - CbWriteOp(CidStore, Details, OpDetails, AttachmentDetails, LSN, Key, Op, Cbo); - }); - } - Cbo.EndArray(); - } - - void CbWriteOplog(CidStore& CidStore, - ProjectStore::Oplog& Oplog, - bool Details, - bool OpDetails, - bool AttachmentDetails, - CbObjectWriter& Cbo) - { - Cbo.BeginObject(); - { - Cbo.AddString("name", Oplog.OplogId()); - CbWriteOplogOps(CidStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo); - } - Cbo.EndObject(); - } - - void CbWriteOplogs(CidStore& CidStore, - ProjectStore::Project& Project, - std::vector<std::string> OpLogs, - bool Details, - bool OpDetails, - bool AttachmentDetails, - CbObjectWriter& Cbo) - { - Cbo.BeginArray("oplogs"); - { - for (const std::string& OpLogId : OpLogs) - { - ProjectStore::Oplog* Oplog = Project.OpenOplog(OpLogId); - if (Oplog != nullptr) - { - CbWriteOplog(CidStore, *Oplog, Details, OpDetails, AttachmentDetails, Cbo); - } - } - } - Cbo.EndArray(); - } - - void CbWriteProject(CidStore& CidStore, - ProjectStore::Project& Project, - std::vector<std::string> OpLogs, - bool Details, - bool OpDetails, - bool AttachmentDetails, - CbObjectWriter& Cbo) - { - Cbo.BeginObject(); - { - Cbo.AddString("name", Project.Identifier); - CbWriteOplogs(CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); - } - Cbo.EndObject(); - } - -} // namespace - -////////////////////////////////////////////////////////////////////////// - -Oid -OpKeyStringAsOId(std::string_view OpKey) -{ - using namespace std::literals; - - CbObjectWriter Writer; - Writer << "key"sv << OpKey; - - XXH3_128Stream KeyHasher; - Writer.Save()["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); - XXH3_128 KeyHash = KeyHasher.GetHash(); - - Oid OpId; - memcpy(OpId.OidBits, &KeyHash, sizeof(OpId.OidBits)); - - return OpId; -} - -////////////////////////////////////////////////////////////////////////// - -struct ProjectStore::OplogStorage : public RefCounted -{ - OplogStorage(ProjectStore::Oplog* OwnerOplog, std::filesystem::path BasePath) : m_OwnerOplog(OwnerOplog), m_OplogStoragePath(BasePath) - { - } - - ~OplogStorage() - { - ZEN_INFO("closing oplog storage at {}", m_OplogStoragePath); - Flush(); - } - - [[nodiscard]] bool Exists() { return Exists(m_OplogStoragePath); } - [[nodiscard]] static bool Exists(std::filesystem::path BasePath) - { - return std::filesystem::exists(BasePath / "ops.zlog") && std::filesystem::exists(BasePath / "ops.zops"); - } - - static bool Delete(std::filesystem::path BasePath) { return DeleteDirectories(BasePath); } - - uint64_t OpBlobsSize() const - { - RwLock::SharedLockScope _(m_RwLock); - return m_NextOpsOffset; - } - - void Open(bool IsCreate) - { - using namespace std::literals; - - ZEN_INFO("initializing oplog storage at '{}'", m_OplogStoragePath); - - if (IsCreate) - { - DeleteDirectories(m_OplogStoragePath); - CreateDirectories(m_OplogStoragePath); - } - - m_Oplog.Open(m_OplogStoragePath / "ops.zlog"sv, IsCreate ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); - m_Oplog.Initialize(); - - m_OpBlobs.Open(m_OplogStoragePath / "ops.zops"sv, IsCreate ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); - - ZEN_ASSERT(IsPow2(m_OpsAlign)); - ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1))); - } - - void ReplayLog(std::function<void(CbObject, const OplogEntry&)>&& Handler) - { - ZEN_TRACE_CPU("ProjectStore::OplogStorage::ReplayLog"); - - // This could use memory mapping or do something clever but for now it just reads the file sequentially - - ZEN_INFO("replaying log for '{}'", m_OplogStoragePath); - - Stopwatch Timer; - - uint64_t InvalidEntries = 0; - - IoBuffer OpBuffer; - m_Oplog.Replay( - [&](const OplogEntry& LogEntry) { - if (LogEntry.OpCoreSize == 0) - { - ++InvalidEntries; - - return; - } - - if (OpBuffer.GetSize() < LogEntry.OpCoreSize) - { - OpBuffer = IoBuffer(LogEntry.OpCoreSize); - } - - const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; - - m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); - - // Verify checksum, ignore op data if incorrect - const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), LogEntry.OpCoreSize) & 0xffffFFFF); - - if (OpCoreHash != LogEntry.OpCoreHash) - { - ZEN_WARN("skipping oplog entry with bad checksum!"); - return; - } - - CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), LogEntry.OpCoreSize)); - - m_NextOpsOffset = - Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign)); - m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn); - - Handler(Op, LogEntry); - }, - 0); - - if (InvalidEntries) - { - ZEN_WARN("ignored {} zero-sized oplog entries", InvalidEntries); - } - - ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}", - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - m_MaxLsn, - m_NextOpsOffset); - } - - void ReplayLog(const std::vector<OplogEntryAddress>& Entries, std::function<void(CbObject)>&& Handler) - { - for (const OplogEntryAddress& Entry : Entries) - { - CbObject Op = GetOp(Entry); - Handler(Op); - } - } - - CbObject GetOp(const OplogEntryAddress& Entry) - { - IoBuffer OpBuffer(Entry.Size); - - const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign; - m_OpBlobs.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset); - - return CbObject(SharedBuffer(std::move(OpBuffer))); - } - - OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, XXH3_128 KeyHash) - { - ZEN_TRACE_CPU("ProjectStore::OplogStorage::AppendOp"); - - using namespace std::literals; - - uint64_t WriteSize = Buffer.GetSize(); - - RwLock::ExclusiveLockScope Lock(m_RwLock); - const uint64_t WriteOffset = m_NextOpsOffset; - const uint32_t OpLsn = ++m_MaxLsn; - m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign); - Lock.ReleaseNow(); - - ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign)); - - OplogEntry Entry = {.OpLsn = OpLsn, - .OpCoreOffset = gsl::narrow_cast<uint32_t>(WriteOffset / m_OpsAlign), - .OpCoreSize = uint32_t(Buffer.GetSize()), - .OpCoreHash = OpCoreHash, - .OpKeyHash = KeyHash}; - - m_Oplog.Append(Entry); - m_OpBlobs.Write(Buffer.GetData(), WriteSize, WriteOffset); - - return Entry; - } - - void Flush() - { - m_Oplog.Flush(); - m_OpBlobs.Flush(); - } - - spdlog::logger& Log() { return m_OwnerOplog->Log(); } - -private: - ProjectStore::Oplog* m_OwnerOplog; - std::filesystem::path m_OplogStoragePath; - mutable RwLock m_RwLock; - TCasLogFile<OplogEntry> m_Oplog; - BasicFile m_OpBlobs; - std::atomic<uint64_t> m_NextOpsOffset{0}; - uint64_t m_OpsAlign = 32; - std::atomic<uint32_t> m_MaxLsn{0}; -}; - -////////////////////////////////////////////////////////////////////////// - -ProjectStore::Oplog::Oplog(std::string_view Id, - Project* Project, - CidStore& Store, - std::filesystem::path BasePath, - const std::filesystem::path& MarkerPath) -: m_OuterProject(Project) -, m_CidStore(Store) -, m_BasePath(BasePath) -, m_MarkerPath(MarkerPath) -, m_OplogId(Id) -{ - using namespace std::literals; - - m_Storage = new OplogStorage(this, m_BasePath); - const bool StoreExists = m_Storage->Exists(); - m_Storage->Open(/* IsCreate */ !StoreExists); - - m_TempPath = m_BasePath / "temp"sv; - - CleanDirectory(m_TempPath); -} - -ProjectStore::Oplog::~Oplog() -{ - if (m_Storage) - { - Flush(); - } -} - -void -ProjectStore::Oplog::Flush() -{ - ZEN_ASSERT(m_Storage); - m_Storage->Flush(); -} - -void -ProjectStore::Oplog::Scrub(ScrubContext& Ctx) const -{ - ZEN_UNUSED(Ctx); -} - -void -ProjectStore::Oplog::GatherReferences(GcContext& GcCtx) -{ - RwLock::SharedLockScope _(m_OplogLock); - - std::vector<IoHash> Hashes; - Hashes.reserve(Max(m_ChunkMap.size(), m_MetaMap.size())); - - for (const auto& Kv : m_ChunkMap) - { - Hashes.push_back(Kv.second); - } - - GcCtx.AddRetainedCids(Hashes); - - Hashes.clear(); - - for (const auto& Kv : m_MetaMap) - { - Hashes.push_back(Kv.second); - } - - GcCtx.AddRetainedCids(Hashes); -} - -uint64_t -ProjectStore::Oplog::TotalSize() const -{ - RwLock::SharedLockScope _(m_OplogLock); - if (m_Storage) - { - return m_Storage->OpBlobsSize(); - } - return 0; -} - -bool -ProjectStore::Oplog::IsExpired() const -{ - if (m_MarkerPath.empty()) - { - return false; - } - return !std::filesystem::exists(m_MarkerPath); -} - -std::filesystem::path -ProjectStore::Oplog::PrepareForDelete(bool MoveFolder) -{ - RwLock::ExclusiveLockScope _(m_OplogLock); - m_ChunkMap.clear(); - m_MetaMap.clear(); - m_FileMap.clear(); - m_OpAddressMap.clear(); - m_LatestOpMap.clear(); - m_Storage = {}; - if (!MoveFolder) - { - return {}; - } - std::filesystem::path MovedDir; - if (PrepareDirectoryDelete(m_BasePath, MovedDir)) - { - return MovedDir; - } - return {}; -} - -bool -ProjectStore::Oplog::ExistsAt(std::filesystem::path BasePath) -{ - using namespace std::literals; - - std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; - return std::filesystem::is_regular_file(StateFilePath); -} - -void -ProjectStore::Oplog::Read() -{ - using namespace std::literals; - - std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv; - if (std::filesystem::is_regular_file(StateFilePath)) - { - ZEN_INFO("reading config for oplog '{}' in project '{}' from {}", m_OplogId, m_OuterProject->Identifier, StateFilePath); - - BasicFile Blob; - Blob.Open(StateFilePath, BasicFile::Mode::kRead); - - IoBuffer Obj = Blob.ReadAll(); - CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); - - if (ValidationError != CbValidateError::None) - { - ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), StateFilePath); - return; - } - - CbObject Cfg = LoadCompactBinaryObject(Obj); - - m_MarkerPath = Cfg["gcpath"sv].AsString(); - } - else - { - ZEN_INFO("config for oplog '{}' in project '{}' not found at {}. Assuming legacy store", - m_OplogId, - m_OuterProject->Identifier, - StateFilePath); - } - ReplayLog(); -} - -void -ProjectStore::Oplog::Write() -{ - using namespace std::literals; - - BinaryWriter Mem; - - CbObjectWriter Cfg; - - Cfg << "gcpath"sv << PathToUtf8(m_MarkerPath); - - Cfg.Save(Mem); - - std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv; - - ZEN_INFO("persisting config for oplog '{}' in project '{}' to {}", m_OplogId, m_OuterProject->Identifier, StateFilePath); - - BasicFile Blob; - Blob.Open(StateFilePath, BasicFile::Mode::kTruncate); - Blob.Write(Mem.Data(), Mem.Size(), 0); - Blob.Flush(); -} - -void -ProjectStore::Oplog::ReplayLog() -{ - RwLock::ExclusiveLockScope OplogLock(m_OplogLock); - if (!m_Storage) - { - return; - } - m_Storage->ReplayLog( - [&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, GetMapping(Op), OpEntry, kUpdateReplay); }); -} - -IoBuffer -ProjectStore::Oplog::FindChunk(Oid ChunkId) -{ - RwLock::SharedLockScope OplogLock(m_OplogLock); - if (!m_Storage) - { - return IoBuffer{}; - } - - if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end()) - { - IoHash ChunkHash = ChunkIt->second; - OplogLock.ReleaseNow(); - - IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash); - Chunk.SetContentType(ZenContentType::kCompressedBinary); - - return Chunk; - } - - if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) - { - std::filesystem::path FilePath = m_OuterProject->RootDir / FileIt->second.ServerPath; - - OplogLock.ReleaseNow(); - - IoBuffer FileChunk = IoBufferBuilder::MakeFromFile(FilePath); - FileChunk.SetContentType(ZenContentType::kBinary); - - return FileChunk; - } - - if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end()) - { - IoHash ChunkHash = MetaIt->second; - OplogLock.ReleaseNow(); - - IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash); - Chunk.SetContentType(ZenContentType::kCompressedBinary); - - return Chunk; - } - - return {}; -} - -void -ProjectStore::Oplog::IterateFileMap( - std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn) -{ - RwLock::SharedLockScope _(m_OplogLock); - if (!m_Storage) - { - return; - } - - for (const auto& Kv : m_FileMap) - { - Fn(Kv.first, Kv.second.ServerPath, Kv.second.ClientPath); - } -} - -void -ProjectStore::Oplog::IterateOplog(std::function<void(CbObject)>&& Handler) -{ - RwLock::SharedLockScope _(m_OplogLock); - if (!m_Storage) - { - return; - } - - std::vector<OplogEntryAddress> Entries; - Entries.reserve(m_LatestOpMap.size()); - - for (const auto& Kv : m_LatestOpMap) - { - const auto AddressEntry = m_OpAddressMap.find(Kv.second); - ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); - - Entries.push_back(AddressEntry->second); - } - - std::sort(Entries.begin(), Entries.end(), [](const OplogEntryAddress& Lhs, const OplogEntryAddress& Rhs) { - return Lhs.Offset < Rhs.Offset; - }); - - m_Storage->ReplayLog(Entries, [&](CbObject Op) { Handler(Op); }); -} - -void -ProjectStore::Oplog::IterateOplogWithKey(std::function<void(int, const Oid&, CbObject)>&& Handler) -{ - RwLock::SharedLockScope _(m_OplogLock); - if (!m_Storage) - { - return; - } - - std::vector<size_t> EntryIndexes; - std::vector<OplogEntryAddress> Entries; - std::vector<Oid> Keys; - std::vector<int> LSNs; - Entries.reserve(m_LatestOpMap.size()); - EntryIndexes.reserve(m_LatestOpMap.size()); - Keys.reserve(m_LatestOpMap.size()); - LSNs.reserve(m_LatestOpMap.size()); - - for (const auto& Kv : m_LatestOpMap) - { - const auto AddressEntry = m_OpAddressMap.find(Kv.second); - ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); - - Entries.push_back(AddressEntry->second); - Keys.push_back(Kv.first); - LSNs.push_back(Kv.second); - EntryIndexes.push_back(EntryIndexes.size()); - } - - std::sort(EntryIndexes.begin(), EntryIndexes.end(), [&Entries](const size_t& Lhs, const size_t& Rhs) { - const OplogEntryAddress& LhsEntry = Entries[Lhs]; - const OplogEntryAddress& RhsEntry = Entries[Rhs]; - return LhsEntry.Offset < RhsEntry.Offset; - }); - std::vector<OplogEntryAddress> SortedEntries; - SortedEntries.reserve(EntryIndexes.size()); - for (size_t Index : EntryIndexes) - { - SortedEntries.push_back(Entries[Index]); - } - - size_t EntryIndex = 0; - m_Storage->ReplayLog(SortedEntries, [&](CbObject Op) { - Handler(LSNs[EntryIndex], Keys[EntryIndex], Op); - EntryIndex++; - }); -} - -int -ProjectStore::Oplog::GetOpIndexByKey(const Oid& Key) -{ - RwLock::SharedLockScope _(m_OplogLock); - if (!m_Storage) - { - return {}; - } - if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end()) - { - return LatestOp->second; - } - return -1; -} - -std::optional<CbObject> -ProjectStore::Oplog::GetOpByKey(const Oid& Key) -{ - RwLock::SharedLockScope _(m_OplogLock); - if (!m_Storage) - { - return {}; - } - - if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end()) - { - const auto AddressEntry = m_OpAddressMap.find(LatestOp->second); - ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); - - return m_Storage->GetOp(AddressEntry->second); - } - - return {}; -} - -std::optional<CbObject> -ProjectStore::Oplog::GetOpByIndex(int Index) -{ - RwLock::SharedLockScope _(m_OplogLock); - if (!m_Storage) - { - return {}; - } - - if (const auto AddressEntryIt = m_OpAddressMap.find(Index); AddressEntryIt != m_OpAddressMap.end()) - { - return m_Storage->GetOp(AddressEntryIt->second); - } - - return {}; -} - -void -ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, - Oid FileId, - IoHash Hash, - std::string_view ServerPath, - std::string_view ClientPath) -{ - if (Hash != IoHash::Zero) - { - m_ChunkMap.insert_or_assign(FileId, Hash); - } - - FileMapEntry Entry; - Entry.ServerPath = ServerPath; - Entry.ClientPath = ClientPath; - - m_FileMap[FileId] = std::move(Entry); - - if (Hash != IoHash::Zero) - { - m_ChunkMap.insert_or_assign(FileId, Hash); - } -} - -void -ProjectStore::Oplog::AddChunkMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash) -{ - m_ChunkMap.insert_or_assign(ChunkId, Hash); -} - -void -ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash) -{ - m_MetaMap.insert_or_assign(ChunkId, Hash); -} - -ProjectStore::Oplog::OplogEntryMapping -ProjectStore::Oplog::GetMapping(CbObject Core) -{ - using namespace std::literals; - - OplogEntryMapping Result; - - // Update chunk id maps - CbObjectView PackageObj = Core["package"sv].AsObjectView(); - CbArrayView BulkDataArray = Core["bulkdata"sv].AsArrayView(); - CbArrayView PackageDataArray = Core["packagedata"sv].AsArrayView(); - Result.Chunks.reserve(PackageObj ? 1 : 0 + BulkDataArray.Num() + PackageDataArray.Num()); - - if (PackageObj) - { - Oid Id = PackageObj["id"sv].AsObjectId(); - IoHash Hash = PackageObj["data"sv].AsBinaryAttachment(); - Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); - ZEN_DEBUG("package data {} -> {}", Id, Hash); - } - - for (CbFieldView& Entry : PackageDataArray) - { - CbObjectView PackageDataObj = Entry.AsObjectView(); - Oid Id = PackageDataObj["id"sv].AsObjectId(); - IoHash Hash = PackageDataObj["data"sv].AsBinaryAttachment(); - Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); - ZEN_DEBUG("package {} -> {}", Id, Hash); - } - - for (CbFieldView& Entry : BulkDataArray) - { - CbObjectView BulkObj = Entry.AsObjectView(); - Oid Id = BulkObj["id"sv].AsObjectId(); - IoHash Hash = BulkObj["data"sv].AsBinaryAttachment(); - Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); - ZEN_DEBUG("bulkdata {} -> {}", Id, Hash); - } - - CbArrayView FilesArray = Core["files"sv].AsArrayView(); - Result.Files.reserve(FilesArray.Num()); - for (CbFieldView& Entry : FilesArray) - { - CbObjectView FileObj = Entry.AsObjectView(); - - std::string_view ServerPath = FileObj["serverpath"sv].AsString(); - std::string_view ClientPath = FileObj["clientpath"sv].AsString(); - if (ServerPath.empty() || ClientPath.empty()) - { - ZEN_WARN("invalid file"); - continue; - } - - Oid Id = FileObj["id"sv].AsObjectId(); - IoHash Hash = FileObj["data"sv].AsBinaryAttachment(); - Result.Files.emplace_back( - OplogEntryMapping::FileMapping{OplogEntryMapping::Mapping{Id, Hash}, std::string(ServerPath), std::string(ClientPath)}); - ZEN_DEBUG("file {} -> {}, ServerPath: {}, ClientPath: {}", Id, Hash, ServerPath, ClientPath); - } - - CbArrayView MetaArray = Core["meta"sv].AsArrayView(); - Result.Meta.reserve(MetaArray.Num()); - for (CbFieldView& Entry : MetaArray) - { - CbObjectView MetaObj = Entry.AsObjectView(); - Oid Id = MetaObj["id"sv].AsObjectId(); - IoHash Hash = MetaObj["data"sv].AsBinaryAttachment(); - Result.Meta.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); - auto NameString = MetaObj["name"sv].AsString(); - ZEN_DEBUG("meta data ({}) {} -> {}", NameString, Id, Hash); - } - - return Result; -} - -uint32_t -ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, - const OplogEntryMapping& OpMapping, - const OplogEntry& OpEntry, - UpdateType TypeOfUpdate) -{ - ZEN_TRACE_CPU("ProjectStore::Oplog::RegisterOplogEntry"); - - ZEN_UNUSED(TypeOfUpdate); - - // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing - // too many problems. Longer term we'll probably want to ensure we can do concurrent updates however - - using namespace std::literals; - - // Update chunk id maps - for (const OplogEntryMapping::Mapping& Chunk : OpMapping.Chunks) - { - AddChunkMapping(OplogLock, Chunk.Id, Chunk.Hash); - } - - for (const OplogEntryMapping::FileMapping& File : OpMapping.Files) - { - AddFileMapping(OplogLock, File.Id, File.Hash, File.ServerPath, File.ClientPath); - } - - for (const OplogEntryMapping::Mapping& Meta : OpMapping.Meta) - { - AddMetaMapping(OplogLock, Meta.Id, Meta.Hash); - } - - m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize}); - m_LatestOpMap[OpEntry.OpKeyAsOId()] = OpEntry.OpLsn; - - return OpEntry.OpLsn; -} - -uint32_t -ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) -{ - ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); - - const CbObject& Core = OpPackage.GetObject(); - const uint32_t EntryId = AppendNewOplogEntry(Core); - if (EntryId == 0xffffffffu) - { - // The oplog has been deleted so just drop this - return EntryId; - } - - // Persist attachments after oplog entry so GC won't find attachments without references - - uint64_t AttachmentBytes = 0; - uint64_t NewAttachmentBytes = 0; - - auto Attachments = OpPackage.GetAttachments(); - - for (const auto& Attach : Attachments) - { - ZEN_ASSERT(Attach.IsCompressedBinary()); - - CompressedBuffer AttachmentData = Attach.AsCompressedBinary(); - const uint64_t AttachmentSize = AttachmentData.DecodeRawSize(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData.GetCompressed().Flatten().AsIoBuffer(), Attach.GetHash()); - - if (InsertResult.New) - { - NewAttachmentBytes += AttachmentSize; - } - AttachmentBytes += AttachmentSize; - } - - ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes)); - - return EntryId; -} - -uint32_t -ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) -{ - ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); - - using namespace std::literals; - - OplogEntryMapping Mapping = GetMapping(Core); - - SharedBuffer Buffer = Core.GetBuffer(); - const uint64_t WriteSize = Buffer.GetSize(); - const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF); - - ZEN_ASSERT(WriteSize != 0); - - XXH3_128Stream KeyHasher; - Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); - XXH3_128 KeyHash = KeyHasher.GetHash(); - - RefPtr<OplogStorage> Storage; - { - RwLock::SharedLockScope _(m_OplogLock); - Storage = m_Storage; - } - if (!m_Storage) - { - return 0xffffffffu; - } - const OplogEntry OpEntry = m_Storage->AppendOp(Buffer, OpCoreHash, KeyHash); - - RwLock::ExclusiveLockScope OplogLock(m_OplogLock); - const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry, kUpdateNewEntry); - - return EntryId; -} - -////////////////////////////////////////////////////////////////////////// - -ProjectStore::Project::Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath) -: m_ProjectStore(PrjStore) -, m_CidStore(Store) -, m_OplogStoragePath(BasePath) -{ -} - -ProjectStore::Project::~Project() -{ -} - -bool -ProjectStore::Project::Exists(std::filesystem::path BasePath) -{ - return std::filesystem::exists(BasePath / "Project.zcb"); -} - -void -ProjectStore::Project::Read() -{ - using namespace std::literals; - - std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv; - - ZEN_INFO("reading config for project '{}' from {}", Identifier, ProjectStateFilePath); - - BasicFile Blob; - Blob.Open(ProjectStateFilePath, BasicFile::Mode::kRead); - - IoBuffer Obj = Blob.ReadAll(); - CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); - - if (ValidationError == CbValidateError::None) - { - CbObject Cfg = LoadCompactBinaryObject(Obj); - - Identifier = Cfg["id"sv].AsString(); - RootDir = Cfg["root"sv].AsString(); - ProjectRootDir = Cfg["project"sv].AsString(); - EngineRootDir = Cfg["engine"sv].AsString(); - ProjectFilePath = Cfg["projectfile"sv].AsString(); - } - else - { - ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), ProjectStateFilePath); - } -} - -void -ProjectStore::Project::Write() -{ - using namespace std::literals; - - BinaryWriter Mem; - - CbObjectWriter Cfg; - Cfg << "id"sv << Identifier; - Cfg << "root"sv << PathToUtf8(RootDir); - Cfg << "project"sv << ProjectRootDir; - Cfg << "engine"sv << EngineRootDir; - Cfg << "projectfile"sv << ProjectFilePath; - - Cfg.Save(Mem); - - CreateDirectories(m_OplogStoragePath); - - std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv; - - ZEN_INFO("persisting config for project '{}' to {}", Identifier, ProjectStateFilePath); - - BasicFile Blob; - Blob.Open(ProjectStateFilePath, BasicFile::Mode::kTruncate); - Blob.Write(Mem.Data(), Mem.Size(), 0); - Blob.Flush(); -} - -spdlog::logger& -ProjectStore::Project::Log() -{ - return m_ProjectStore->Log(); -} - -std::filesystem::path -ProjectStore::Project::BasePathForOplog(std::string_view OplogId) -{ - return m_OplogStoragePath / OplogId; -} - -ProjectStore::Oplog* -ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath) -{ - RwLock::ExclusiveLockScope _(m_ProjectLock); - - std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); - - try - { - Oplog* Log = m_Oplogs - .try_emplace(std::string{OplogId}, - std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, MarkerPath)) - .first->second.get(); - - Log->Write(); - return Log; - } - catch (std::exception&) - { - // In case of failure we need to ensure there's no half constructed entry around - // - // (This is probably already ensured by the try_emplace implementation?) - - m_Oplogs.erase(std::string{OplogId}); - - return nullptr; - } -} - -ProjectStore::Oplog* -ProjectStore::Project::OpenOplog(std::string_view OplogId) -{ - { - RwLock::SharedLockScope _(m_ProjectLock); - - auto OplogIt = m_Oplogs.find(std::string(OplogId)); - - if (OplogIt != m_Oplogs.end()) - { - return OplogIt->second.get(); - } - } - - RwLock::ExclusiveLockScope _(m_ProjectLock); - - std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); - - if (Oplog::ExistsAt(OplogBasePath)) - { - // Do open of existing oplog - - try - { - Oplog* Log = - m_Oplogs - .try_emplace(std::string{OplogId}, - std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, std::filesystem::path{})) - .first->second.get(); - Log->Read(); - - return Log; - } - catch (std::exception& ex) - { - ZEN_WARN("failed to open oplog '{}' @ '{}': {}", OplogId, OplogBasePath, ex.what()); - - m_Oplogs.erase(std::string{OplogId}); - } - } - - return nullptr; -} - -void -ProjectStore::Project::DeleteOplog(std::string_view OplogId) -{ - std::filesystem::path DeletePath; - { - RwLock::ExclusiveLockScope _(m_ProjectLock); - - auto OplogIt = m_Oplogs.find(std::string(OplogId)); - - if (OplogIt != m_Oplogs.end()) - { - std::unique_ptr<Oplog>& Oplog = OplogIt->second; - DeletePath = Oplog->PrepareForDelete(true); - m_DeletedOplogs.emplace_back(std::move(Oplog)); - m_Oplogs.erase(OplogIt); - } - } - - // Erase content on disk - if (!DeletePath.empty()) - { - OplogStorage::Delete(DeletePath); - } -} - -std::vector<std::string> -ProjectStore::Project::ScanForOplogs() const -{ - DirectoryContent DirContent; - GetDirectoryContent(m_OplogStoragePath, DirectoryContent::IncludeDirsFlag, DirContent); - std::vector<std::string> Oplogs; - Oplogs.reserve(DirContent.Directories.size()); - for (const std::filesystem::path& DirPath : DirContent.Directories) - { - Oplogs.push_back(DirPath.filename().string()); - } - return Oplogs; -} - -void -ProjectStore::Project::IterateOplogs(std::function<void(const Oplog&)>&& Fn) const -{ - RwLock::SharedLockScope _(m_ProjectLock); - - for (auto& Kv : m_Oplogs) - { - Fn(*Kv.second); - } -} - -void -ProjectStore::Project::IterateOplogs(std::function<void(Oplog&)>&& Fn) -{ - RwLock::SharedLockScope _(m_ProjectLock); - - for (auto& Kv : m_Oplogs) - { - Fn(*Kv.second); - } -} - -void -ProjectStore::Project::Flush() -{ - // We only need to flush oplogs that we have already loaded - IterateOplogs([&](Oplog& Ops) { Ops.Flush(); }); -} - -void -ProjectStore::Project::Scrub(ScrubContext& Ctx) -{ - // Scrubbing needs to check all existing oplogs - std::vector<std::string> OpLogs = ScanForOplogs(); - for (const std::string& OpLogId : OpLogs) - { - OpenOplog(OpLogId); - } - IterateOplogs([&](const Oplog& Ops) { - if (!Ops.IsExpired()) - { - Ops.Scrub(Ctx); - } - }); -} - -void -ProjectStore::Project::GatherReferences(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("ProjectStore::Project::GatherReferences"); - - Stopwatch Timer; - const auto Guard = MakeGuard([&] { - ZEN_DEBUG("gathered references from project store project {} in {}", Identifier, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - // GatherReferences needs to check all existing oplogs - std::vector<std::string> OpLogs = ScanForOplogs(); - for (const std::string& OpLogId : OpLogs) - { - OpenOplog(OpLogId); - } - IterateOplogs([&](Oplog& Ops) { - if (!Ops.IsExpired()) - { - Ops.GatherReferences(GcCtx); - } - }); -} - -uint64_t -ProjectStore::Project::TotalSize() const -{ - uint64_t Result = 0; - { - RwLock::SharedLockScope _(m_ProjectLock); - for (const auto& It : m_Oplogs) - { - Result += It.second->TotalSize(); - } - } - return Result; -} - -bool -ProjectStore::Project::PrepareForDelete(std::filesystem::path& OutDeletePath) -{ - RwLock::ExclusiveLockScope _(m_ProjectLock); - - for (auto& It : m_Oplogs) - { - // We don't care about the moved folder - It.second->PrepareForDelete(false); - m_DeletedOplogs.emplace_back(std::move(It.second)); - } - - m_Oplogs.clear(); - - bool Success = PrepareDirectoryDelete(m_OplogStoragePath, OutDeletePath); - if (!Success) - { - return false; - } - m_OplogStoragePath.clear(); - return true; -} - -bool -ProjectStore::Project::IsExpired() const -{ - if (ProjectFilePath.empty()) - { - return false; - } - return !std::filesystem::exists(ProjectFilePath); -} - -////////////////////////////////////////////////////////////////////////// - -ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc) -: GcStorage(Gc) -, GcContributor(Gc) -, m_Log(logging::Get("project")) -, m_CidStore(Store) -, m_ProjectBasePath(BasePath) -{ - ZEN_INFO("initializing project store at '{}'", BasePath); - // m_Log.set_level(spdlog::level::debug); -} - -ProjectStore::~ProjectStore() -{ - ZEN_INFO("closing project store ('{}')", m_ProjectBasePath); -} - -std::filesystem::path -ProjectStore::BasePathForProject(std::string_view ProjectId) -{ - return m_ProjectBasePath / ProjectId; -} - -void -ProjectStore::DiscoverProjects() -{ - if (!std::filesystem::exists(m_ProjectBasePath)) - { - return; - } - - DirectoryContent DirContent; - GetDirectoryContent(m_ProjectBasePath, DirectoryContent::IncludeDirsFlag, DirContent); - - for (const std::filesystem::path& DirPath : DirContent.Directories) - { - std::string DirName = PathToUtf8(DirPath.filename()); - OpenProject(DirName); - } -} - -void -ProjectStore::IterateProjects(std::function<void(Project& Prj)>&& Fn) -{ - RwLock::SharedLockScope _(m_ProjectsLock); - - for (auto& Kv : m_Projects) - { - Fn(*Kv.second.Get()); - } -} - -void -ProjectStore::Flush() -{ - std::vector<Ref<Project>> Projects; - { - RwLock::SharedLockScope _(m_ProjectsLock); - Projects.reserve(m_Projects.size()); - - for (auto& Kv : m_Projects) - { - Projects.push_back(Kv.second); - } - } - for (const Ref<Project>& Project : Projects) - { - Project->Flush(); - } -} - -void -ProjectStore::Scrub(ScrubContext& Ctx) -{ - DiscoverProjects(); - - std::vector<Ref<Project>> Projects; - { - RwLock::SharedLockScope _(m_ProjectsLock); - Projects.reserve(m_Projects.size()); - - for (auto& Kv : m_Projects) - { - if (Kv.second->IsExpired()) - { - continue; - } - Projects.push_back(Kv.second); - } - } - for (const Ref<Project>& Project : Projects) - { - Project->Scrub(Ctx); - } -} - -void -ProjectStore::GatherReferences(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("ProjectStore::GatherReferences"); - - size_t ProjectCount = 0; - size_t ExpiredProjectCount = 0; - Stopwatch Timer; - const auto Guard = MakeGuard([&] { - ZEN_DEBUG("gathered references from '{}' in {}, found {} active projects and {} expired projects", - m_ProjectBasePath.string(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - ProjectCount, - ExpiredProjectCount); - }); - - DiscoverProjects(); - - std::vector<Ref<Project>> Projects; - { - RwLock::SharedLockScope _(m_ProjectsLock); - Projects.reserve(m_Projects.size()); - - for (auto& Kv : m_Projects) - { - if (Kv.second->IsExpired()) - { - ExpiredProjectCount++; - continue; - } - Projects.push_back(Kv.second); - } - } - ProjectCount = Projects.size(); - for (const Ref<Project>& Project : Projects) - { - Project->GatherReferences(GcCtx); - } -} - -void -ProjectStore::CollectGarbage(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("ProjectStore::CollectGarbage"); - - size_t ProjectCount = 0; - size_t ExpiredProjectCount = 0; - - Stopwatch Timer; - const auto Guard = MakeGuard([&] { - ZEN_DEBUG("garbage collect from '{}' DONE after {}, found {} active projects and {} expired projects", - m_ProjectBasePath.string(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - ProjectCount, - ExpiredProjectCount); - }); - std::vector<Ref<Project>> ExpiredProjects; - std::vector<Ref<Project>> Projects; - - { - RwLock::SharedLockScope _(m_ProjectsLock); - for (auto& Kv : m_Projects) - { - if (Kv.second->IsExpired()) - { - ExpiredProjects.push_back(Kv.second); - ExpiredProjectCount++; - continue; - } - Projects.push_back(Kv.second); - ProjectCount++; - } - } - - if (!GcCtx.IsDeletionMode()) - { - ZEN_DEBUG("garbage collect DISABLED, for '{}' ", m_ProjectBasePath.string()); - return; - } - - for (const Ref<Project>& Project : Projects) - { - std::vector<std::string> ExpiredOplogs; - { - RwLock::ExclusiveLockScope _(m_ProjectsLock); - Project->IterateOplogs([&ExpiredOplogs](ProjectStore::Oplog& Oplog) { - if (Oplog.IsExpired()) - { - ExpiredOplogs.push_back(Oplog.OplogId()); - } - }); - } - for (const std::string& OplogId : ExpiredOplogs) - { - ZEN_DEBUG("ProjectStore::CollectGarbage garbage collected oplog '{}' in project '{}'. Removing storage on disk", - OplogId, - Project->Identifier); - Project->DeleteOplog(OplogId); - } - } - - if (ExpiredProjects.empty()) - { - ZEN_DEBUG("garbage collect for '{}', no expired projects found", m_ProjectBasePath.string()); - return; - } - - for (const Ref<Project>& Project : ExpiredProjects) - { - std::filesystem::path PathToRemove; - std::string ProjectId; - { - RwLock::ExclusiveLockScope _(m_ProjectsLock); - if (!Project->IsExpired()) - { - ZEN_DEBUG("ProjectStore::CollectGarbage skipped garbage collect of project '{}'. Project no longer expired.", ProjectId); - continue; - } - bool Success = Project->PrepareForDelete(PathToRemove); - if (!Success) - { - ZEN_DEBUG("ProjectStore::CollectGarbage skipped garbage collect of project '{}'. Project folder is locked.", ProjectId); - continue; - } - m_Projects.erase(Project->Identifier); - ProjectId = Project->Identifier; - } - - ZEN_DEBUG("ProjectStore::CollectGarbage garbage collected project '{}'. Removing storage on disk", ProjectId); - if (PathToRemove.empty()) - { - continue; - } - - DeleteDirectories(PathToRemove); - } -} - -GcStorageSize -ProjectStore::StorageSize() const -{ - GcStorageSize Result; - { - RwLock::SharedLockScope _(m_ProjectsLock); - for (auto& Kv : m_Projects) - { - const Ref<Project>& Project = Kv.second; - Result.DiskSize += Project->TotalSize(); - } - } - return Result; -} - -Ref<ProjectStore::Project> -ProjectStore::OpenProject(std::string_view ProjectId) -{ - { - RwLock::SharedLockScope _(m_ProjectsLock); - - auto ProjIt = m_Projects.find(std::string{ProjectId}); - - if (ProjIt != m_Projects.end()) - { - return ProjIt->second; - } - } - - RwLock::ExclusiveLockScope _(m_ProjectsLock); - - std::filesystem::path BasePath = BasePathForProject(ProjectId); - - if (Project::Exists(BasePath)) - { - try - { - ZEN_INFO("opening project {} @ {}", ProjectId, BasePath); - - Ref<Project>& Prj = - m_Projects - .try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath))) - .first->second; - Prj->Identifier = ProjectId; - Prj->Read(); - return Prj; - } - catch (std::exception& e) - { - ZEN_WARN("failed to open {} @ {} ({})", ProjectId, BasePath, e.what()); - m_Projects.erase(std::string{ProjectId}); - } - } - - return {}; -} - -Ref<ProjectStore::Project> -ProjectStore::NewProject(std::filesystem::path BasePath, - std::string_view ProjectId, - std::string_view RootDir, - std::string_view EngineRootDir, - std::string_view ProjectRootDir, - std::string_view ProjectFilePath) -{ - RwLock::ExclusiveLockScope _(m_ProjectsLock); - - Ref<Project>& Prj = - m_Projects.try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath))) - .first->second; - Prj->Identifier = ProjectId; - Prj->RootDir = RootDir; - Prj->EngineRootDir = EngineRootDir; - Prj->ProjectRootDir = ProjectRootDir; - Prj->ProjectFilePath = ProjectFilePath; - Prj->Write(); - - return Prj; -} - -bool -ProjectStore::DeleteProject(std::string_view ProjectId) -{ - ZEN_INFO("deleting project {}", ProjectId); - - RwLock::ExclusiveLockScope ProjectsLock(m_ProjectsLock); - - auto ProjIt = m_Projects.find(std::string{ProjectId}); - - if (ProjIt == m_Projects.end()) - { - return true; - } - - std::filesystem::path DeletePath; - bool Success = ProjIt->second->PrepareForDelete(DeletePath); - - if (!Success) - { - return false; - } - m_Projects.erase(ProjIt); - ProjectsLock.ReleaseNow(); - - if (!DeletePath.empty()) - { - DeleteDirectories(DeletePath); - } - return true; -} - -bool -ProjectStore::Exists(std::string_view ProjectId) -{ - return Project::Exists(BasePathForProject(ProjectId)); -} - -CbArray -ProjectStore::GetProjectsList() -{ - using namespace std::literals; - - DiscoverProjects(); - - CbWriter Response; - Response.BeginArray(); - - IterateProjects([&Response](ProjectStore::Project& Prj) { - Response.BeginObject(); - Response << "Id"sv << Prj.Identifier; - Response << "RootDir"sv << Prj.RootDir.string(); - Response << "ProjectRootDir"sv << Prj.ProjectRootDir; - Response << "EngineRootDir"sv << Prj.EngineRootDir; - Response << "ProjectFilePath"sv << Prj.ProjectFilePath; - Response.EndObject(); - }); - Response.EndArray(); - return Response.Save().AsArray(); -} - -std::pair<HttpResponseCode, std::string> -ProjectStore::GetProjectFiles(const std::string_view ProjectId, const std::string_view OplogId, bool FilterClient, CbObject& OutPayload) -{ - using namespace std::literals; - - Ref<ProjectStore::Project> Project = OpenProject(ProjectId); - if (!Project) - { - return {HttpResponseCode::NotFound, fmt::format("Project files request for unknown project '{}'", ProjectId)}; - } - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); - - if (!FoundLog) - { - return {HttpResponseCode::NotFound, fmt::format("Project files for unknown oplog '{}/{}'", ProjectId, OplogId)}; - } - - CbObjectWriter Response; - Response.BeginArray("files"sv); - - FoundLog->IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) { - Response.BeginObject(); - Response << "id"sv << Id; - Response << "clientpath"sv << ClientPath; - if (!FilterClient) - { - Response << "serverpath"sv << ServerPath; - } - Response.EndObject(); - }); - - Response.EndArray(); - OutPayload = Response.Save(); - return {HttpResponseCode::OK, {}}; -} - -std::pair<HttpResponseCode, std::string> -ProjectStore::GetChunkInfo(const std::string_view ProjectId, - const std::string_view OplogId, - const std::string_view ChunkId, - CbObject& OutPayload) -{ - using namespace std::literals; - - Ref<ProjectStore::Project> Project = OpenProject(ProjectId); - if (!Project) - { - return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown project '{}'", ProjectId)}; - } - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); - - if (!FoundLog) - { - return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId)}; - } - if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) - { - return {HttpResponseCode::BadRequest, - fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)}; - } - - const Oid Obj = Oid::FromHexString(ChunkId); - - IoBuffer Chunk = FoundLog->FindChunk(Obj); - if (!Chunk) - { - return {HttpResponseCode::NotFound, {}}; - } - - uint64_t ChunkSize = Chunk.GetSize(); - if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) - { - IoHash RawHash; - uint64_t RawSize; - bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize); - ZEN_ASSERT(IsCompressed); - ChunkSize = RawSize; - } - - CbObjectWriter Response; - Response << "size"sv << ChunkSize; - OutPayload = Response.Save(); - return {HttpResponseCode::OK, {}}; -} - -std::pair<HttpResponseCode, std::string> -ProjectStore::GetChunkRange(const std::string_view ProjectId, - const std::string_view OplogId, - const std::string_view ChunkId, - uint64_t Offset, - uint64_t Size, - ZenContentType AcceptType, - IoBuffer& OutChunk) -{ - bool IsOffset = Offset != 0 || Size != ~(0ull); - - Ref<ProjectStore::Project> Project = OpenProject(ProjectId); - if (!Project) - { - return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)}; - } - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); - - if (!FoundLog) - { - return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)}; - } - - if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) - { - return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)}; - } - - const Oid Obj = Oid::FromHexString(ChunkId); - - IoBuffer Chunk = FoundLog->FindChunk(Obj); - if (!Chunk) - { - return {HttpResponseCode::NotFound, {}}; - } - - OutChunk = Chunk; - HttpContentType ContentType = Chunk.GetContentType(); - - if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) - { - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize); - ZEN_ASSERT(!Compressed.IsNull()); - - if (IsOffset) - { - if ((Offset + Size) > RawSize) - { - Size = RawSize - Offset; - } - - if (AcceptType == HttpContentType::kBinary) - { - OutChunk = Compressed.Decompress(Offset, Size).AsIoBuffer(); - OutChunk.SetContentType(HttpContentType::kBinary); - } - else - { - // Value will be a range of compressed blocks that covers the requested range - // The client will have to compensate for any offsets that do not land on an even block size multiple - OutChunk = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer(); - OutChunk.SetContentType(HttpContentType::kCompressedBinary); - } - } - else - { - if (AcceptType == HttpContentType::kBinary) - { - OutChunk = Compressed.Decompress().AsIoBuffer(); - OutChunk.SetContentType(HttpContentType::kBinary); - } - else - { - OutChunk = Compressed.GetCompressed().Flatten().AsIoBuffer(); - OutChunk.SetContentType(HttpContentType::kCompressedBinary); - } - } - } - else if (IsOffset) - { - if ((Offset + Size) > Chunk.GetSize()) - { - Size = Chunk.GetSize() - Offset; - } - OutChunk = IoBuffer(std::move(Chunk), Offset, Size); - OutChunk.SetContentType(ContentType); - } - - return {HttpResponseCode::OK, {}}; -} - -std::pair<HttpResponseCode, std::string> -ProjectStore::GetChunk(const std::string_view ProjectId, - const std::string_view OplogId, - const std::string_view Cid, - ZenContentType AcceptType, - IoBuffer& OutChunk) -{ - Ref<ProjectStore::Project> Project = OpenProject(ProjectId); - if (!Project) - { - return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)}; - } - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); - - if (!FoundLog) - { - return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)}; - } - - if (Cid.length() != IoHash::StringLength) - { - return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, Cid)}; - } - - const IoHash Hash = IoHash::FromHexString(Cid); - OutChunk = m_CidStore.FindChunkByCid(Hash); - - if (!OutChunk) - { - return {HttpResponseCode::NotFound, fmt::format("chunk - '{}' MISSING", Cid)}; - } - - if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(OutChunk)); - OutChunk = Compressed.Decompress().AsIoBuffer(); - OutChunk.SetContentType(ZenContentType::kBinary); - } - else - { - OutChunk.SetContentType(ZenContentType::kCompressedBinary); - } - return {HttpResponseCode::OK, {}}; -} - -std::pair<HttpResponseCode, std::string> -ProjectStore::PutChunk(const std::string_view ProjectId, - const std::string_view OplogId, - const std::string_view Cid, - ZenContentType ContentType, - IoBuffer&& Chunk) -{ - Ref<ProjectStore::Project> Project = OpenProject(ProjectId); - if (!Project) - { - return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown project '{}'", ProjectId)}; - } - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); - - if (!FoundLog) - { - return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown oplog '{}/{}'", ProjectId, OplogId)}; - } - - if (Cid.length() != IoHash::StringLength) - { - return {HttpResponseCode::BadRequest, fmt::format("Chunk put request for invalid chunk hash '{}'", Cid)}; - } - - const IoHash Hash = IoHash::FromHexString(Cid); - - if (ContentType != HttpContentType::kCompressedBinary) - { - return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid content type for chunk '{}'", Cid)}; - } - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); - if (RawHash != Hash) - { - return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid payload format for chunk '{}'", Cid)}; - } - - CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk, Hash); - return {Result.New ? HttpResponseCode::Created : HttpResponseCode::OK, {}}; -} - -std::pair<HttpResponseCode, std::string> -ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload, CbObject& OutResponse) -{ - Ref<ProjectStore::Project> Project = OpenProject(ProjectId); - if (!Project) - { - return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown project '{}'", ProjectId)}; - } - - ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); - - if (!Oplog) - { - return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)}; - } - - CbObject ContainerObject = LoadCompactBinaryObject(Payload); - if (!ContainerObject) - { - return {HttpResponseCode::BadRequest, "Invalid payload format"}; - } - - CidStore& ChunkStore = m_CidStore; - RwLock AttachmentsLock; - std::unordered_set<IoHash, IoHash::Hasher> Attachments; - - auto HasAttachment = [&ChunkStore](const IoHash& RawHash) { return ChunkStore.ContainsChunk(RawHash); }; - auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector<IoHash>&& ChunkHashes) { - RwLock::ExclusiveLockScope _(AttachmentsLock); - if (BlockHash != IoHash::Zero) - { - Attachments.insert(BlockHash); - } - else - { - Attachments.insert(ChunkHashes.begin(), ChunkHashes.end()); - } - }; - auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) { - RwLock::ExclusiveLockScope _(AttachmentsLock); - Attachments.insert(RawHash); - }; - - RemoteProjectStore::Result RemoteResult = SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment); - - if (RemoteResult.ErrorCode) - { - return ConvertResult(RemoteResult); - } - - CbObjectWriter Cbo; - Cbo.BeginArray("need"); - { - for (const IoHash& Hash : Attachments) - { - ZEN_DEBUG("Need attachment {}", Hash); - Cbo << Hash; - } - } - Cbo.EndArray(); // "need" - - OutResponse = Cbo.Save(); - return {HttpResponseCode::OK, {}}; -} - -std::pair<HttpResponseCode, std::string> -ProjectStore::ReadOplog(const std::string_view ProjectId, - const std::string_view OplogId, - const HttpServerRequest::QueryParams& Params, - CbObject& OutResponse) -{ - Ref<ProjectStore::Project> Project = OpenProject(ProjectId); - if (!Project) - { - return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown project '{}'", ProjectId)}; - } - - ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); - - if (!Oplog) - { - return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)}; - } - - size_t MaxBlockSize = 128u * 1024u * 1024u; - if (auto Param = Params.GetValue("maxblocksize"); Param.empty() == false) - { - if (auto Value = ParseInt<size_t>(Param)) - { - MaxBlockSize = Value.value(); - } - } - size_t MaxChunkEmbedSize = 1024u * 1024u; - if (auto Param = Params.GetValue("maxchunkembedsize"); Param.empty() == false) - { - if (auto Value = ParseInt<size_t>(Param)) - { - MaxChunkEmbedSize = Value.value(); - } - } - - CidStore& ChunkStore = m_CidStore; - - RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer( - ChunkStore, - *Oplog, - MaxBlockSize, - MaxChunkEmbedSize, - false, - [](CompressedBuffer&&, const IoHash) {}, - [](const IoHash&) {}, - [](const std::unordered_set<IoHash, IoHash::Hasher>) {}); - - OutResponse = std::move(ContainerResult.ContainerObject); - return ConvertResult(ContainerResult); -} - -std::pair<HttpResponseCode, std::string> -ProjectStore::WriteBlock(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload) -{ - Ref<ProjectStore::Project> Project = OpenProject(ProjectId); - if (!Project) - { - return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown project '{}'", ProjectId)}; - } - - ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); - - if (!Oplog) - { - return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown oplog '{}/{}'", ProjectId, OplogId)}; - } - - if (!IterateBlock(std::move(Payload), [this](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { - IoBuffer Compressed = Chunk.GetCompressed().Flatten().AsIoBuffer(); - m_CidStore.AddChunk(Compressed, AttachmentRawHash); - ZEN_DEBUG("Saved attachment {} from block, size {}", AttachmentRawHash, Compressed.GetSize()); - })) - { - return {HttpResponseCode::BadRequest, "Invalid chunk in block"}; - } - - return {HttpResponseCode::OK, {}}; -} - -void -ProjectStore::Rpc(HttpServerRequest& HttpReq, - const std::string_view ProjectId, - const std::string_view OplogId, - IoBuffer&& Payload, - AuthMgr& AuthManager) -{ - using namespace std::literals; - HttpContentType PayloadContentType = HttpReq.RequestContentType(); - CbPackage Package; - CbObject Cb; - switch (PayloadContentType) - { - case HttpContentType::kJSON: - case HttpContentType::kUnknownContentType: - case HttpContentType::kText: - { - std::string JsonText(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize()); - Cb = LoadCompactBinaryFromJson(JsonText).AsObject(); - if (!Cb) - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Content format not supported, expected JSON format"); - } - } - break; - case HttpContentType::kCbObject: - Cb = LoadCompactBinaryObject(Payload); - if (!Cb) - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Content format not supported, expected compact binary format"); - } - break; - case HttpContentType::kCbPackage: - Package = ParsePackageMessage(Payload); - Cb = Package.GetObject(); - if (!Cb) - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Content format not supported, expected package message format"); - } - break; - default: - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type"); - } - - Ref<ProjectStore::Project> Project = OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Rpc oplog request for unknown project '{}'", ProjectId)); - } - - ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); - - if (!Oplog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)); - } - - std::string_view Method = Cb["method"sv].AsString(); - - if (Method == "import") - { - std::pair<HttpResponseCode, std::string> Result = Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); - if (Result.second.empty()) - { - return HttpReq.WriteResponse(Result.first); - } - return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); - } - else if (Method == "export") - { - std::pair<HttpResponseCode, std::string> Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); - if (Result.second.empty()) - { - return HttpReq.WriteResponse(Result.first); - } - return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); - } - else if (Method == "getchunks") - { - CbPackage ResponsePackage; - { - CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView(); - CbObjectWriter ResponseWriter; - ResponseWriter.BeginArray("chunks"sv); - for (CbFieldView FieldView : ChunksArray) - { - IoHash RawHash = FieldView.AsHash(); - IoBuffer ChunkBuffer = m_CidStore.FindChunkByCid(RawHash); - if (ChunkBuffer) - { - ResponseWriter.AddHash(RawHash); - ResponsePackage.AddAttachment( - CbAttachment(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)), RawHash)); - } - } - ResponseWriter.EndArray(); - ResponsePackage.SetObject(ResponseWriter.Save()); - } - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault); - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); - } - else if (Method == "putchunks") - { - std::span<const CbAttachment> Attachments = Package.GetAttachments(); - for (const CbAttachment& Attachment : Attachments) - { - IoHash RawHash = Attachment.GetHash(); - CompressedBuffer Compressed = Attachment.AsCompressedBinary(); - m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), RawHash, CidStore::InsertMode::kCopyOnly); - } - return HttpReq.WriteResponse(HttpResponseCode::OK); - } - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method)); -} - -std::pair<HttpResponseCode, std::string> -ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager) -{ - using namespace std::literals; - - size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u); - size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); - bool Force = Params["force"sv].AsBool(false); - - std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult = - CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize); - - if (RemoteStoreResult.first == nullptr) - { - return {HttpResponseCode::BadRequest, RemoteStoreResult.second}; - } - std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first); - RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); - - ZEN_INFO("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}", - Project.Identifier, - Oplog.OplogId(), - StoreInfo.Description, - NiceBytes(MaxBlockSize), - NiceBytes(MaxChunkEmbedSize)); - - RemoteProjectStore::Result Result = SaveOplog(m_CidStore, - *RemoteStore, - Oplog, - MaxBlockSize, - MaxChunkEmbedSize, - StoreInfo.CreateBlocks, - StoreInfo.UseTempBlockFiles, - Force); - - return ConvertResult(Result); -} - -std::pair<HttpResponseCode, std::string> -ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager) -{ - using namespace std::literals; - - size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u); - size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); - bool Force = Params["force"sv].AsBool(false); - - std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult = - CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize); - - if (RemoteStoreResult.first == nullptr) - { - return {HttpResponseCode::BadRequest, RemoteStoreResult.second}; - } - std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first); - RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); - - ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description); - RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *RemoteStore, Oplog, Force); - return ConvertResult(Result); -} - -////////////////////////////////////////////////////////////////////////// - -HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, HttpStatsService& StatsService, AuthMgr& AuthMgr) -: m_Log(logging::Get("project")) -, m_CidStore(Store) -, m_ProjectStore(Projects) -, m_StatsService(StatsService) -, m_AuthMgr(AuthMgr) -{ - using namespace std::literals; - - m_StatsService.RegisterHandler("prj", *this); - - m_Router.AddPattern("project", "([[:alnum:]_.]+)"); - m_Router.AddPattern("log", "([[:alnum:]_.]+)"); - m_Router.AddPattern("op", "([[:digit:]]+?)"); - m_Router.AddPattern("chunk", "([[:xdigit:]]{24})"); - m_Router.AddPattern("hash", "([[:xdigit:]]{40})"); - - m_Router.RegisterRoute( - "", - [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_ProjectStore->GetProjectsList()); }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "list", - [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_ProjectStore->GetProjectsList()); }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/batch", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); - - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - // Parse Request - - IoBuffer Payload = HttpReq.ReadPayload(); - BinaryReader Reader(Payload); - - struct RequestHeader - { - enum - { - kMagic = 0xAAAA'77AC - }; - uint32_t Magic; - uint32_t ChunkCount; - uint32_t Reserved1; - uint32_t Reserved2; - }; - - struct RequestChunkEntry - { - Oid ChunkId; - uint32_t CorrelationId; - uint64_t Offset; - uint64_t RequestBytes; - }; - - if (Payload.Size() <= sizeof(RequestHeader)) - { - HttpReq.WriteResponse(HttpResponseCode::BadRequest); - } - - RequestHeader RequestHdr; - Reader.Read(&RequestHdr, sizeof RequestHdr); - - if (RequestHdr.Magic != RequestHeader::kMagic) - { - HttpReq.WriteResponse(HttpResponseCode::BadRequest); - } - - std::vector<RequestChunkEntry> RequestedChunks; - RequestedChunks.resize(RequestHdr.ChunkCount); - Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount); - - // Make Response - - struct ResponseHeader - { - uint32_t Magic = 0xbada'b00f; - uint32_t ChunkCount; - uint32_t Reserved1 = 0; - uint32_t Reserved2 = 0; - }; - - struct ResponseChunkEntry - { - uint32_t CorrelationId; - uint32_t Flags = 0; - uint64_t ChunkSize; - }; - - std::vector<IoBuffer> OutBlobs; - OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry)); - for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) - { - const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; - IoBuffer FoundChunk = FoundLog->FindChunk(RequestedChunk.ChunkId); - if (FoundChunk) - { - if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1)) - { - uint64_t Offset = RequestedChunk.Offset; - if (Offset > FoundChunk.Size()) - { - Offset = FoundChunk.Size(); - } - uint64_t Size = RequestedChunk.RequestBytes; - if ((Offset + Size) > FoundChunk.Size()) - { - Size = FoundChunk.Size() - Offset; - } - FoundChunk = IoBuffer(FoundChunk, Offset, Size); - } - } - OutBlobs.emplace_back(std::move(FoundChunk)); - } - uint8_t* ResponsePtr = reinterpret_cast<uint8_t*>(OutBlobs[0].MutableData()); - ResponseHeader ResponseHdr; - ResponseHdr.ChunkCount = RequestHdr.ChunkCount; - memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr)); - ResponsePtr += sizeof(ResponseHdr); - for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) - { - // const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; - const IoBuffer& FoundChunk(OutBlobs[ChunkIndex + 1]); - ResponseChunkEntry ResponseChunk; - ResponseChunk.CorrelationId = ChunkIndex; - if (FoundChunk) - { - ResponseChunk.ChunkSize = FoundChunk.Size(); - } - else - { - ResponseChunk.ChunkSize = uint64_t(-1); - } - memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk)); - ResponsePtr += sizeof(ResponseChunk); - } - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs); - }, - HttpVerb::kPost); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/files", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - // File manifest fetch, returns the client file list - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - - const bool FilterClient = Params.GetValue("filter"sv) == "client"sv; - - CbObject ResponsePayload; - std::pair<HttpResponseCode, std::string> Result = - m_ProjectStore->GetProjectFiles(ProjectId, OplogId, FilterClient, ResponsePayload); - if (Result.first == HttpResponseCode::OK) - { - return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); - } - else - { - ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", - ToString(HttpReq.RequestVerb()), - HttpReq.QueryString(), - static_cast<int>(Result.first), - Result.second); - } - if (Result.second.empty()) - { - return HttpReq.WriteResponse(Result.first); - } - return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/{chunk}/info", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - const auto& ChunkId = Req.GetCapture(3); - - CbObject ResponsePayload; - std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->GetChunkInfo(ProjectId, OplogId, ChunkId, ResponsePayload); - if (Result.first == HttpResponseCode::OK) - { - return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); - } - else if (Result.first == HttpResponseCode::NotFound) - { - ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); - } - else - { - ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", - ToString(HttpReq.RequestVerb()), - HttpReq.QueryString(), - static_cast<int>(Result.first), - Result.second); - } - if (Result.second.empty()) - { - return HttpReq.WriteResponse(Result.first); - } - return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/{chunk}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - const auto& ChunkId = Req.GetCapture(3); - - uint64_t Offset = 0; - uint64_t Size = ~(0ull); - - auto QueryParms = Req.ServerRequest().GetQueryParams(); - - if (auto OffsetParm = QueryParms.GetValue("offset"); OffsetParm.empty() == false) - { - if (auto OffsetVal = ParseInt<uint64_t>(OffsetParm)) - { - Offset = OffsetVal.value(); - } - else - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest); - } - } - - if (auto SizeParm = QueryParms.GetValue("size"); SizeParm.empty() == false) - { - if (auto SizeVal = ParseInt<uint64_t>(SizeParm)) - { - Size = SizeVal.value(); - } - else - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest); - } - } - - HttpContentType AcceptType = HttpReq.AcceptContentType(); - - IoBuffer Chunk; - std::pair<HttpResponseCode, std::string> Result = - m_ProjectStore->GetChunkRange(ProjectId, OplogId, ChunkId, Offset, Size, AcceptType, Chunk); - if (Result.first == HttpResponseCode::OK) - { - ZEN_DEBUG("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(Chunk.GetContentType())); - return HttpReq.WriteResponse(HttpResponseCode::OK, Chunk.GetContentType(), Chunk); - } - else if (Result.first == HttpResponseCode::NotFound) - { - ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); - } - else - { - ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", - ToString(HttpReq.RequestVerb()), - HttpReq.QueryString(), - static_cast<int>(Result.first), - Result.second); - } - if (Result.second.empty()) - { - return HttpReq.WriteResponse(Result.first); - } - return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); - }, - HttpVerb::kGet | HttpVerb::kHead); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/{hash}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - const auto& Cid = Req.GetCapture(3); - HttpContentType AcceptType = HttpReq.AcceptContentType(); - HttpContentType RequestType = HttpReq.RequestContentType(); - - switch (Req.ServerRequest().RequestVerb()) - { - case HttpVerb::kGet: - { - IoBuffer Value; - std::pair<HttpResponseCode, std::string> Result = - m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value); - - if (Result.first == HttpResponseCode::OK) - { - return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value); - } - else if (Result.first == HttpResponseCode::NotFound) - { - ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, Cid); - } - else - { - ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", - ToString(HttpReq.RequestVerb()), - HttpReq.QueryString(), - static_cast<int>(Result.first), - Result.second); - } - if (Result.second.empty()) - { - return HttpReq.WriteResponse(Result.first); - } - return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); - } - case HttpVerb::kPost: - { - std::pair<HttpResponseCode, std::string> Result = - m_ProjectStore->PutChunk(ProjectId, OplogId, Cid, RequestType, HttpReq.ReadPayload()); - if (Result.first == HttpResponseCode::OK || Result.first == HttpResponseCode::Created) - { - return HttpReq.WriteResponse(Result.first); - } - else - { - ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", - ToString(HttpReq.RequestVerb()), - HttpReq.QueryString(), - static_cast<int>(Result.first), - Result.second); - } - if (Result.second.empty()) - { - return HttpReq.WriteResponse(Result.first); - } - return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); - } - break; - } - }, - HttpVerb::kGet | HttpVerb::kPost); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/prep", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); - - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - // This operation takes a list of referenced hashes and decides which - // chunks are not present on this server. This list is then returned in - // the "need" list in the response - - IoBuffer Payload = HttpReq.ReadPayload(); - CbObject RequestObject = LoadCompactBinaryObject(Payload); - - std::vector<IoHash> NeedList; - - for (auto Entry : RequestObject["have"sv]) - { - const IoHash FileHash = Entry.AsHash(); - - if (!m_CidStore.ContainsChunk(FileHash)) - { - ZEN_DEBUG("prep - NEED: {}", FileHash); - - NeedList.push_back(FileHash); - } - } - - CbObjectWriter Cbo; - Cbo.BeginArray("need"); - - for (const IoHash& Hash : NeedList) - { - Cbo << Hash; - } - - Cbo.EndArray(); - CbObject Response = Cbo.Save(); - - return HttpReq.WriteResponse(HttpResponseCode::OK, Response); - }, - HttpVerb::kPost); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/new", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - - bool IsUsingSalt = false; - IoHash SaltHash = IoHash::Zero; - - if (std::string_view SaltParam = Params.GetValue("salt"); SaltParam.empty() == false) - { - const uint32_t Salt = std::stoi(std::string(SaltParam)); - SaltHash = IoHash::HashBuffer(&Salt, sizeof Salt); - IsUsingSalt = true; - } - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); - - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog& Oplog = *FoundLog; - - IoBuffer Payload = HttpReq.ReadPayload(); - - // This will attempt to open files which may not exist for the case where - // the prep step rejected the chunk. This should be fixed since there's - // a performance cost associated with any file system activity - - bool IsValid = true; - std::vector<IoHash> MissingChunks; - - CbPackage::AttachmentResolver Resolver = [&](const IoHash& Hash) -> SharedBuffer { - if (m_CidStore.ContainsChunk(Hash)) - { - // Return null attachment as we already have it, no point in reading it and storing it again - return {}; - } - - IoHash AttachmentId; - if (IsUsingSalt) - { - IoHash AttachmentSpec[]{SaltHash, Hash}; - AttachmentId = IoHash::HashBuffer(MakeMemoryView(AttachmentSpec)); - } - else - { - AttachmentId = Hash; - } - - std::filesystem::path AttachmentPath = Oplog.TempPath() / AttachmentId.ToHexString(); - if (IoBuffer Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath)) - { - return SharedBuffer(std::move(Data)); - } - else - { - IsValid = false; - MissingChunks.push_back(Hash); - - return {}; - } - }; - - CbPackage Package; - - if (!legacy::TryLoadCbPackage(Package, Payload, &UniqueBuffer::Alloc, &Resolver)) - { - std::filesystem::path BadPackagePath = - Oplog.TempPath() / "bad_packages"sv / fmt::format("session{}_request{}"sv, HttpReq.SessionId(), HttpReq.RequestId()); - - ZEN_WARN("Received malformed package! Saving payload to '{}'", BadPackagePath); - - WriteFile(BadPackagePath, Payload); - - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"); - } - - if (!IsValid) - { - // TODO: emit diagnostics identifying missing chunks - - return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing chunk reference"); - } - - CbObject Core = Package.GetObject(); - - if (!Core["key"sv]) - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "No oplog entry key specified"); - } - - // Write core to oplog - - const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Package); - - if (OpLsn == ProjectStore::Oplog::kInvalidOp) - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest); - } - - ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn, NiceBytes(Payload.Size()), Core["key"sv].AsString()); - - HttpReq.WriteResponse(HttpResponseCode::Created); - }, - HttpVerb::kPost); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/{op}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const std::string& ProjectId = Req.GetCapture(1); - const std::string& OplogId = Req.GetCapture(2); - const std::string& OpIdString = Req.GetCapture(3); - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); - - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog& Oplog = *FoundLog; - - if (const std::optional<int32_t> OpId = ParseInt<uint32_t>(OpIdString)) - { - if (std::optional<CbObject> MaybeOp = Oplog.GetOpByIndex(OpId.value())) - { - CbObject& Op = MaybeOp.value(); - if (Req.ServerRequest().AcceptContentType() == ZenContentType::kCbPackage) - { - CbPackage Package; - Package.SetObject(Op); - - Op.IterateAttachments([&](CbFieldView FieldView) { - const IoHash AttachmentHash = FieldView.AsAttachment(); - IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash); - - // We force this for now as content type is not consistently tracked (will - // be fixed in CidStore refactor) - Payload.SetContentType(ZenContentType::kCompressedBinary); - - if (Payload) - { - switch (Payload.GetContentType()) - { - case ZenContentType::kCbObject: - if (CbObject Object = LoadCompactBinaryObject(Payload)) - { - Package.AddAttachment(CbAttachment(Object)); - } - else - { - // Error - malformed object - - ZEN_WARN("malformed object returned for {}", AttachmentHash); - } - break; - - case ZenContentType::kCompressedBinary: - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload))) - { - Package.AddAttachment(CbAttachment(Compressed, AttachmentHash)); - } - else - { - // Error - not compressed! - - ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash); - } - break; - - default: - Package.AddAttachment(CbAttachment(SharedBuffer(Payload))); - break; - } - } - }); - - return HttpReq.WriteResponse(HttpResponseCode::Accepted, Package); - } - else - { - // Client cannot accept a package, so we only send the core object - return HttpReq.WriteResponse(HttpResponseCode::Accepted, Op); - } - } - } - - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "{project}/oplog/{log}", - [this](HttpRouterRequest& Req) { - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - - if (!Project) - { - return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("project {} not found", ProjectId)); - } - - switch (Req.ServerRequest().RequestVerb()) - { - case HttpVerb::kGet: - { - ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId); - - if (!OplogIt) - { - return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("oplog {} not found in project {}", OplogId, ProjectId)); - } - - ProjectStore::Oplog& Log = *OplogIt; - - CbObjectWriter Cb; - Cb << "id"sv << Log.OplogId() << "project"sv << Project->Identifier << "tempdir"sv << Log.TempPath().c_str() - << "markerpath"sv << Log.MarkerPath().c_str() << "totalsize"sv << Log.TotalSize() << "opcount" - << Log.OplogCount() << "expired"sv << Log.IsExpired(); - - Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Cb.Save()); - } - break; - - case HttpVerb::kPost: - { - std::filesystem::path OplogMarkerPath; - if (CbObject Params = Req.ServerRequest().ReadPayloadObject()) - { - OplogMarkerPath = Params["gcpath"sv].AsString(); - } - - ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId); - - if (!OplogIt) - { - if (!Project->NewOplog(OplogId, OplogMarkerPath)) - { - // TODO: indicate why the operation failed! - return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError); - } - - ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath); - - return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); - } - - // I guess this should ultimately be used to execute RPCs but for now, it - // does absolutely nothing - - return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); - } - break; - - case HttpVerb::kDelete: - { - ZEN_INFO("deleting oplog '{}/{}'", ProjectId, OplogId); - - Project->DeleteOplog(OplogId); - - return Req.ServerRequest().WriteResponse(HttpResponseCode::OK); - } - break; - - default: - break; - } - }, - HttpVerb::kPost | HttpVerb::kGet | HttpVerb::kDelete); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/entries", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); - - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - CbObjectWriter Response; - - if (FoundLog->OplogCount() > 0) - { - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - - if (auto OpKey = Params.GetValue("opkey"); !OpKey.empty()) - { - Oid OpKeyId = OpKeyStringAsOId(OpKey); - std::optional<CbObject> Op = FoundLog->GetOpByKey(OpKeyId); - - if (Op.has_value()) - { - Response << "entry"sv << Op.value(); - } - else - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - } - else - { - Response.BeginArray("entries"sv); - - FoundLog->IterateOplog([&Response](CbObject Op) { Response << Op; }); - - Response.EndArray(); - } - } - - return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "{project}", - [this](HttpRouterRequest& Req) { - const std::string ProjectId = Req.GetCapture(1); - - switch (Req.ServerRequest().RequestVerb()) - { - case HttpVerb::kPost: - { - IoBuffer Payload = Req.ServerRequest().ReadPayload(); - CbObject Params = LoadCompactBinaryObject(Payload); - std::string_view Id = Params["id"sv].AsString(); - std::string_view Root = Params["root"sv].AsString(); - std::string_view EngineRoot = Params["engine"sv].AsString(); - std::string_view ProjectRoot = Params["project"sv].AsString(); - std::string_view ProjectFilePath = Params["projectfile"sv].AsString(); - - const std::filesystem::path BasePath = m_ProjectStore->BasePath() / ProjectId; - m_ProjectStore->NewProject(BasePath, ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath); - - ZEN_INFO("established project - {} (id: '{}', roots: '{}', '{}', '{}', '{}'{})", - ProjectId, - Id, - Root, - EngineRoot, - ProjectRoot, - ProjectFilePath, - ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : ""); - - Req.ServerRequest().WriteResponse(HttpResponseCode::Created); - } - break; - - case HttpVerb::kGet: - { - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - - if (!Project) - { - return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("project {} not found", ProjectId)); - } - - std::vector<std::string> OpLogs = Project->ScanForOplogs(); - - CbObjectWriter Response; - Response << "id"sv << Project->Identifier; - Response << "root"sv << PathToUtf8(Project->RootDir); - Response << "engine"sv << PathToUtf8(Project->EngineRootDir); - Response << "project"sv << PathToUtf8(Project->ProjectRootDir); - Response << "projectfile"sv << PathToUtf8(Project->ProjectFilePath); - - Response.BeginArray("oplogs"sv); - for (const std::string& OplogId : OpLogs) - { - Response.BeginObject(); - Response << "id"sv << OplogId; - Response.EndObject(); - } - Response.EndArray(); // oplogs - - Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save()); - } - break; - - case HttpVerb::kDelete: - { - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - - if (!Project) - { - return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("project {} not found", ProjectId)); - } - - ZEN_INFO("deleting project '{}'", ProjectId); - if (!m_ProjectStore->DeleteProject(ProjectId)) - { - return Req.ServerRequest().WriteResponse(HttpResponseCode::Locked, - HttpContentType::kText, - fmt::format("project {} is in use", ProjectId)); - } - - return Req.ServerRequest().WriteResponse(HttpResponseCode::NoContent); - } - break; - - default: - break; - } - }, - HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kDelete); - - // Push a oplog container - m_Router.RegisterRoute( - "{project}/oplog/{log}/save", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - if (HttpReq.RequestContentType() != HttpContentType::kCbObject) - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid content type"); - } - IoBuffer Payload = Req.ServerRequest().ReadPayload(); - - CbObject Response; - std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->WriteOplog(ProjectId, OplogId, std::move(Payload), Response); - if (Result.first == HttpResponseCode::OK) - { - return HttpReq.WriteResponse(HttpResponseCode::OK, Response); - } - if (Result.second.empty()) - { - return HttpReq.WriteResponse(Result.first); - } - return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); - }, - HttpVerb::kPost); - - // Pull a oplog container - m_Router.RegisterRoute( - "{project}/oplog/{log}/load", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - if (HttpReq.AcceptContentType() != HttpContentType::kCbObject) - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid accept content type"); - } - IoBuffer Payload = Req.ServerRequest().ReadPayload(); - - CbObject Response; - std::pair<HttpResponseCode, std::string> Result = - m_ProjectStore->ReadOplog(ProjectId, OplogId, Req.ServerRequest().GetQueryParams(), Response); - if (Result.first == HttpResponseCode::OK) - { - return HttpReq.WriteResponse(HttpResponseCode::OK, Response); - } - if (Result.second.empty()) - { - return HttpReq.WriteResponse(Result.first); - } - return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); - }, - HttpVerb::kGet); - - // Do an rpc style operation on project/oplog - m_Router.RegisterRoute( - "{project}/oplog/{log}/rpc", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - IoBuffer Payload = Req.ServerRequest().ReadPayload(); - - m_ProjectStore->Rpc(HttpReq, ProjectId, OplogId, std::move(Payload), m_AuthMgr); - }, - HttpVerb::kPost); - - m_Router.RegisterRoute( - "details\\$", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - bool CSV = Params.GetValue("csv") == "true"; - bool Details = Params.GetValue("details") == "true"; - bool OpDetails = Params.GetValue("opdetails") == "true"; - bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true"; - - if (CSV) - { - ExtendableStringBuilder<4096> CSVWriter; - CSVHeader(Details, AttachmentDetails, CSVWriter); - - m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) { - Project.IterateOplogs([&](ProjectStore::Oplog& Oplog) { - Oplog.IterateOplogWithKey( - [this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](int LSN, const Oid& Key, CbObject Op) { - CSVWriteOp(m_CidStore, - Project.Identifier, - Oplog.OplogId(), - Details, - AttachmentDetails, - LSN, - Key, - Op, - CSVWriter); - }); - }); - }); - - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); - } - else - { - CbObjectWriter Cbo; - Cbo.BeginArray("projects"); - { - m_ProjectStore->DiscoverProjects(); - - m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) { - std::vector<std::string> OpLogs = Project.ScanForOplogs(); - CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); - }); - } - Cbo.EndArray(); - HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); - } - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "details\\$/{project}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto& ProjectId = Req.GetCapture(1); - - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - bool CSV = Params.GetValue("csv") == "true"; - bool Details = Params.GetValue("details") == "true"; - bool OpDetails = Params.GetValue("opdetails") == "true"; - bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true"; - - Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId); - if (!FoundProject) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - ProjectStore::Project& Project = *FoundProject.Get(); - if (CSV) - { - ExtendableStringBuilder<4096> CSVWriter; - CSVHeader(Details, AttachmentDetails, CSVWriter); - - FoundProject->IterateOplogs([&](ProjectStore::Oplog& Oplog) { - Oplog.IterateOplogWithKey([this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](int LSN, - const Oid& Key, - CbObject Op) { - CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); - }); - }); - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); - } - else - { - CbObjectWriter Cbo; - std::vector<std::string> OpLogs = FoundProject->ScanForOplogs(); - Cbo.BeginArray("projects"); - { - CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); - } - Cbo.EndArray(); - HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); - } - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "details\\$/{project}/{log}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - bool CSV = Params.GetValue("csv") == "true"; - bool Details = Params.GetValue("details") == "true"; - bool OpDetails = Params.GetValue("opdetails") == "true"; - bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true"; - - Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId); - if (!FoundProject) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - ProjectStore::Oplog* FoundLog = FoundProject->OpenOplog(OplogId); - - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Project& Project = *FoundProject.Get(); - ProjectStore::Oplog& Oplog = *FoundLog; - if (CSV) - { - ExtendableStringBuilder<4096> CSVWriter; - CSVHeader(Details, AttachmentDetails, CSVWriter); - - Oplog.IterateOplogWithKey( - [this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](int LSN, const Oid& Key, CbObject Op) { - CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); - }); - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); - } - else - { - CbObjectWriter Cbo; - Cbo.BeginArray("oplogs"); - { - CbWriteOplog(m_CidStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo); - } - Cbo.EndArray(); - HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); - } - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "details\\$/{project}/{log}/{chunk}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - const auto& ChunkId = Req.GetCapture(3); - - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - bool CSV = Params.GetValue("csv") == "true"; - bool Details = Params.GetValue("details") == "true"; - bool OpDetails = Params.GetValue("opdetails") == "true"; - bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true"; - - Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId); - if (!FoundProject) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - ProjectStore::Oplog* FoundLog = FoundProject->OpenOplog(OplogId); - - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) - { - return HttpReq.WriteResponse( - HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)); - } - - const Oid ObjId = Oid::FromHexString(ChunkId); - ProjectStore::Project& Project = *FoundProject.Get(); - ProjectStore::Oplog& Oplog = *FoundLog; - - int LSN = Oplog.GetOpIndexByKey(ObjId); - if (LSN == -1) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - std::optional<CbObject> Op = Oplog.GetOpByIndex(LSN); - if (!Op.has_value()) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - if (CSV) - { - ExtendableStringBuilder<4096> CSVWriter; - CSVHeader(Details, AttachmentDetails, CSVWriter); - - CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, ObjId, Op.value(), CSVWriter); - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); - } - else - { - CbObjectWriter Cbo; - Cbo.BeginArray("ops"); - { - CbWriteOp(m_CidStore, Details, OpDetails, AttachmentDetails, LSN, ObjId, Op.value(), Cbo); - } - Cbo.EndArray(); - HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); - } - }, - HttpVerb::kGet); -} - -HttpProjectService::~HttpProjectService() -{ - m_StatsService.UnregisterHandler("prj", *this); -} - -const char* -HttpProjectService::BaseUri() const -{ - return "/prj/"; -} - -void -HttpProjectService::HandleRequest(HttpServerRequest& Request) -{ - if (m_Router.HandleRequest(Request) == false) - { - ZEN_WARN("No route found for {0}", Request.RelativeUri()); - } -} - -void -HttpProjectService::HandleStatsRequest(HttpServerRequest& HttpReq) -{ - const GcStorageSize StoreSize = m_ProjectStore->StorageSize(); - const CidStoreSize CidSize = m_CidStore.TotalSize(); - - CbObjectWriter Cbo; - Cbo.BeginObject("store"); - { - Cbo.BeginObject("size"); - { - Cbo << "disk" << StoreSize.DiskSize; - Cbo << "memory" << StoreSize.MemorySize; - } - Cbo.EndObject(); - } - Cbo.EndObject(); - - Cbo.BeginObject("cid"); - { - Cbo.BeginObject("size"); - { - Cbo << "tiny" << CidSize.TinySize; - Cbo << "small" << CidSize.SmallSize; - Cbo << "large" << CidSize.LargeSize; - Cbo << "total" << CidSize.TotalSize; - } - Cbo.EndObject(); - } - Cbo.EndObject(); - - return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); -} - -////////////////////////////////////////////////////////////////////////// - -#if ZEN_WITH_TESTS - -namespace testutils { - using namespace std::literals; - - std::string OidAsString(const Oid& Id) - { - StringBuilder<25> OidStringBuilder; - Id.ToString(OidStringBuilder); - return OidStringBuilder.ToString(); - } - - CbPackage CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) - { - CbPackage Package; - CbObjectWriter Object; - Object << "key"sv << OidAsString(Id); - if (!Attachments.empty()) - { - Object.BeginArray("bulkdata"); - for (const auto& Attachment : Attachments) - { - CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash()); - Object.BeginObject(); - Object << "id"sv << Attachment.first; - Object << "type"sv - << "Standard"sv; - Object << "data"sv << Attach; - Object.EndObject(); - - Package.AddAttachment(Attach); - } - Object.EndArray(); - } - Package.SetObject(Object.Save()); - return Package; - }; - - std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(const std::span<const size_t>& Sizes) - { - std::vector<std::pair<Oid, CompressedBuffer>> Result; - Result.reserve(Sizes.size()); - for (size_t Size : Sizes) - { - std::vector<uint8_t> Data; - Data.resize(Size); - uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data()); - for (size_t Idx = 0; Idx < Size / 2; ++Idx) - { - DataPtr[Idx] = static_cast<uint16_t>(Idx % 0xffffu); - } - if (Size & 1) - { - Data[Size - 1] = static_cast<uint8_t>((Size - 1) & 0xff); - } - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())); - Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed)); - } - return Result; - } - - uint64 GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset) - { - if (RawOffset > 0) - { - uint64 BlockSize = 0; - OodleCompressor Compressor; - OodleCompressionLevel CompressionLevel; - if (!Buffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) - { - return 0; - } - return BlockSize > 0 ? RawOffset % BlockSize : 0; - } - return 0; - } - -} // namespace testutils - -TEST_CASE("project.store.create") -{ - using namespace std::literals; - - ScopedTemporaryDirectory TempDir; - - GcManager Gc; - CidStore CidStore(Gc); - CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); - - std::string_view ProjectName("proj1"sv); - std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc); - std::filesystem::path RootDir = TempDir.Path() / "root"; - std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; - std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; - std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; - - Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / ProjectName, - ProjectName, - RootDir.string(), - EngineRootDir.string(), - ProjectRootDir.string(), - ProjectFilePath.string())); - CHECK(ProjectStore.DeleteProject(ProjectName)); - CHECK(!Project->Exists(BasePath)); -} - -TEST_CASE("project.store.lifetimes") -{ - using namespace std::literals; - - ScopedTemporaryDirectory TempDir; - - GcManager Gc; - CidStore CidStore(Gc); - CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); - - std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc); - std::filesystem::path RootDir = TempDir.Path() / "root"; - std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; - std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; - std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; - - Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / "proj1"sv, - "proj1"sv, - RootDir.string(), - EngineRootDir.string(), - ProjectRootDir.string(), - ProjectFilePath.string())); - ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1", {}); - CHECK(Oplog != nullptr); - - std::filesystem::path DeletePath; - CHECK(Project->PrepareForDelete(DeletePath)); - CHECK(!DeletePath.empty()); - CHECK(Project->OpenOplog("oplog1") == nullptr); - // Oplog is now invalid, but pointer can still be accessed since we store old oplog pointers - CHECK(Oplog->OplogCount() == 0); - // Project is still valid since we have a Ref to it - CHECK(Project->Identifier == "proj1"sv); -} - -TEST_CASE("project.store.gc") -{ - using namespace std::literals; - using namespace testutils; - - ScopedTemporaryDirectory TempDir; - - GcManager Gc; - CidStore CidStore(Gc); - CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); - - std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc); - std::filesystem::path RootDir = TempDir.Path() / "root"; - std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; - - std::filesystem::path Project1RootDir = TempDir.Path() / "game1"; - std::filesystem::path Project1FilePath = TempDir.Path() / "game1" / "game.uproject"; - { - CreateDirectories(Project1FilePath.parent_path()); - BasicFile ProjectFile; - ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); - } - - std::filesystem::path Project2RootDir = TempDir.Path() / "game2"; - std::filesystem::path Project2FilePath = TempDir.Path() / "game2" / "game.uproject"; - { - CreateDirectories(Project2FilePath.parent_path()); - BasicFile ProjectFile; - ProjectFile.Open(Project2FilePath, BasicFile::Mode::kTruncate); - } - - { - Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv, - "proj1"sv, - RootDir.string(), - EngineRootDir.string(), - Project1RootDir.string(), - Project1FilePath.string())); - ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1", {}); - CHECK(Oplog != nullptr); - - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77}))); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99}))); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122}))); - } - - { - Ref<ProjectStore::Project> Project2(ProjectStore.NewProject(BasePath / "proj2"sv, - "proj2"sv, - RootDir.string(), - EngineRootDir.string(), - Project2RootDir.string(), - Project2FilePath.string())); - ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog1", {}); - CHECK(Oplog != nullptr); - - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{177}))); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9123, 383, 590, 96}))); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{535, 221}))); - } - - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - ProjectStore.GatherReferences(GcCtx); - size_t RefCount = 0; - GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; }); - CHECK(RefCount == 14); - ProjectStore.CollectGarbage(GcCtx); - CHECK(ProjectStore.OpenProject("proj1"sv)); - CHECK(ProjectStore.OpenProject("proj2"sv)); - } - - std::filesystem::remove(Project1FilePath); - - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - ProjectStore.GatherReferences(GcCtx); - size_t RefCount = 0; - GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; }); - CHECK(RefCount == 7); - ProjectStore.CollectGarbage(GcCtx); - CHECK(!ProjectStore.OpenProject("proj1"sv)); - CHECK(ProjectStore.OpenProject("proj2"sv)); - } -} - -TEST_CASE("project.store.partial.read") -{ - using namespace std::literals; - using namespace testutils; - - ScopedTemporaryDirectory TempDir; - - GcManager Gc; - CidStore CidStore(Gc); - CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); - - std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; - ProjectStore ProjectStore(CidStore, BasePath, Gc); - std::filesystem::path RootDir = TempDir.Path() / "root"sv; - std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; - - std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv; - std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv; - { - CreateDirectories(Project1FilePath.parent_path()); - BasicFile ProjectFile; - ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); - } - - std::vector<Oid> OpIds; - OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()}); - std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments; - { - Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv, - "proj1"sv, - RootDir.string(), - EngineRootDir.string(), - Project1RootDir.string(), - Project1FilePath.string())); - ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, {}); - CHECK(Oplog != nullptr); - Attachments[OpIds[0]] = {}; - Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77}); - Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{7123, 9583, 690, 99}); - Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{55, 122}); - for (auto It : Attachments) - { - Oplog->AppendNewOplogEntry(CreateOplogPackage(It.first, It.second)); - } - } - { - IoBuffer Chunk; - CHECK(ProjectStore - .GetChunk("proj1"sv, - "oplog1"sv, - Attachments[OpIds[1]][0].second.DecodeRawHash().ToHexString(), - HttpContentType::kCompressedBinary, - Chunk) - .first == HttpResponseCode::OK); - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Attachment = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); - CHECK(RawSize == Attachments[OpIds[1]][0].second.DecodeRawSize()); - } - - IoBuffer ChunkResult; - CHECK(ProjectStore - .GetChunkRange("proj1"sv, - "oplog1"sv, - OidAsString(Attachments[OpIds[2]][1].first), - 0, - ~0ull, - HttpContentType::kCompressedBinary, - ChunkResult) - .first == HttpResponseCode::OK); - CHECK(ChunkResult); - CHECK(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult)).DecodeRawSize() == - Attachments[OpIds[2]][1].second.DecodeRawSize()); - - IoBuffer PartialChunkResult; - CHECK(ProjectStore - .GetChunkRange("proj1"sv, - "oplog1"sv, - OidAsString(Attachments[OpIds[2]][1].first), - 5, - 1773, - HttpContentType::kCompressedBinary, - PartialChunkResult) - .first == HttpResponseCode::OK); - CHECK(PartialChunkResult); - IoHash PartialRawHash; - uint64_t PartialRawSize; - CompressedBuffer PartialCompressedResult = - CompressedBuffer::FromCompressed(SharedBuffer(PartialChunkResult), PartialRawHash, PartialRawSize); - CHECK(PartialRawSize >= 1773); - - uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5); - SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed); - SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress(); - const uint8_t* FullDataPtr = &(reinterpret_cast<const uint8_t*>(FullDecompressed.GetView().GetData())[5]); - const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData()); - CHECK(FullDataPtr[0] == PartialDataPtr[0]); -} - -TEST_CASE("project.store.block") -{ - using namespace std::literals; - using namespace testutils; - - std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269, 2257, 3685, 3489, - 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 4024, 1582, 5251, - 491, 5464, 4607, 8135, 3767, 4045, 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225}); - - std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes); - std::vector<SharedBuffer> Chunks; - Chunks.reserve(AttachmentSizes.size()); - for (const auto& It : AttachmentsWithId) - { - Chunks.push_back(It.second.GetCompressed().Flatten()); - } - CompressedBuffer Block = GenerateBlock(std::move(Chunks)); - IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer(); - CHECK(IterateBlock(std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {})); -} - -#endif - -void -prj_forcelink() -{ -} - -} // namespace zen diff --git a/zenserver/projectstore/projectstore.h b/zenserver/projectstore/projectstore.h deleted file mode 100644 index e4f664b85..000000000 --- a/zenserver/projectstore/projectstore.h +++ /dev/null @@ -1,372 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/uid.h> -#include <zencore/xxhash.h> -#include <zenhttp/httpserver.h> -#include <zenstore/gc.h> - -#include "monitoring/httpstats.h" - -ZEN_THIRD_PARTY_INCLUDES_START -#include <tsl/robin_map.h> -ZEN_THIRD_PARTY_INCLUDES_END - -namespace zen { - -class CbPackage; -class CidStore; -class AuthMgr; -class ScrubContext; - -struct OplogEntry -{ - uint32_t OpLsn; - uint32_t OpCoreOffset; // note: Multiple of alignment! - uint32_t OpCoreSize; - uint32_t OpCoreHash; // Used as checksum - XXH3_128 OpKeyHash; // XXH128_canonical_t - - inline Oid OpKeyAsOId() const - { - Oid Id; - memcpy(Id.OidBits, &OpKeyHash, sizeof Id.OidBits); - return Id; - } -}; - -struct OplogEntryAddress -{ - uint64_t Offset; - uint64_t Size; -}; - -static_assert(IsPow2(sizeof(OplogEntry))); - -/** Project Store - - A project store consists of a number of Projects. - - Each project contains a number of oplogs (short for "operation log"). UE uses - one oplog per target platform to store the output of the cook process. - - An oplog consists of a sequence of "op" entries. Each entry is a structured object - containing references to attachments. Attachments are typically the serialized - package data split into separate chunks for bulk data, exports and header - information. - */ -class ProjectStore : public RefCounted, public GcStorage, public GcContributor -{ - struct OplogStorage; - -public: - ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc); - ~ProjectStore(); - - struct Project; - - struct Oplog - { - Oplog(std::string_view Id, - Project* Project, - CidStore& Store, - std::filesystem::path BasePath, - const std::filesystem::path& MarkerPath); - ~Oplog(); - - [[nodiscard]] static bool ExistsAt(std::filesystem::path BasePath); - - void Read(); - void Write(); - - void IterateFileMap(std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn); - void IterateOplog(std::function<void(CbObject)>&& Fn); - void IterateOplogWithKey(std::function<void(int, const Oid&, CbObject)>&& Fn); - std::optional<CbObject> GetOpByKey(const Oid& Key); - std::optional<CbObject> GetOpByIndex(int Index); - int GetOpIndexByKey(const Oid& Key); - - IoBuffer FindChunk(Oid ChunkId); - - inline static const uint32_t kInvalidOp = ~0u; - - /** Persist a new oplog entry - * - * Returns the oplog LSN assigned to the new entry, or kInvalidOp if the entry is rejected - */ - uint32_t AppendNewOplogEntry(CbPackage Op); - - uint32_t AppendNewOplogEntry(CbObject Core); - - enum UpdateType - { - kUpdateNewEntry, - kUpdateReplay - }; - - const std::string& OplogId() const { return m_OplogId; } - - const std::filesystem::path& TempPath() const { return m_TempPath; } - const std::filesystem::path& MarkerPath() const { return m_MarkerPath; } - - spdlog::logger& Log() { return m_OuterProject->Log(); } - void Flush(); - void Scrub(ScrubContext& Ctx) const; - void GatherReferences(GcContext& GcCtx); - uint64_t TotalSize() const; - - std::size_t OplogCount() const - { - RwLock::SharedLockScope _(m_OplogLock); - return m_LatestOpMap.size(); - } - - bool IsExpired() const; - std::filesystem::path PrepareForDelete(bool MoveFolder); - - private: - struct FileMapEntry - { - std::string ServerPath; - std::string ClientPath; - }; - - template<class V> - using OidMap = tsl::robin_map<Oid, V, Oid::Hasher>; - - Project* m_OuterProject = nullptr; - CidStore& m_CidStore; - std::filesystem::path m_BasePath; - std::filesystem::path m_MarkerPath; - std::filesystem::path m_TempPath; - - mutable RwLock m_OplogLock; - OidMap<IoHash> m_ChunkMap; // output data chunk id -> CAS address - OidMap<IoHash> m_MetaMap; // meta chunk id -> CAS address - OidMap<FileMapEntry> m_FileMap; // file id -> file map entry - int32_t m_ManifestVersion; // File system manifest version - tsl::robin_map<int, OplogEntryAddress> m_OpAddressMap; // Index LSN -> op data in ops blob file - OidMap<int> m_LatestOpMap; // op key -> latest op LSN for key - - RefPtr<OplogStorage> m_Storage; - std::string m_OplogId; - - /** Scan oplog and register each entry, thus updating the in-memory tracking tables - */ - void ReplayLog(); - - struct OplogEntryMapping - { - struct Mapping - { - Oid Id; - IoHash Hash; - }; - struct FileMapping : public Mapping - { - std::string ServerPath; - std::string ClientPath; - }; - std::vector<Mapping> Chunks; - std::vector<Mapping> Meta; - std::vector<FileMapping> Files; - }; - - OplogEntryMapping GetMapping(CbObject Core); - - /** Update tracking metadata for a new oplog entry - * - * This is used during replay (and gets called as part of new op append) - * - * Returns the oplog LSN assigned to the new entry, or kInvalidOp if the entry is rejected - */ - uint32_t RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, - const OplogEntryMapping& OpMapping, - const OplogEntry& OpEntry, - UpdateType TypeOfUpdate); - - void AddFileMapping(const RwLock::ExclusiveLockScope& OplogLock, - Oid FileId, - IoHash Hash, - std::string_view ServerPath, - std::string_view ClientPath); - void AddChunkMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid ChunkId, IoHash Hash); - void AddMetaMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid ChunkId, IoHash Hash); - }; - - struct Project : public RefCounted - { - std::string Identifier; - std::filesystem::path RootDir; - std::string EngineRootDir; - std::string ProjectRootDir; - std::string ProjectFilePath; - - Oplog* NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath); - Oplog* OpenOplog(std::string_view OplogId); - void DeleteOplog(std::string_view OplogId); - void IterateOplogs(std::function<void(const Oplog&)>&& Fn) const; - void IterateOplogs(std::function<void(Oplog&)>&& Fn); - std::vector<std::string> ScanForOplogs() const; - bool IsExpired() const; - - Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath); - virtual ~Project(); - - void Read(); - void Write(); - [[nodiscard]] static bool Exists(std::filesystem::path BasePath); - void Flush(); - void Scrub(ScrubContext& Ctx); - spdlog::logger& Log(); - void GatherReferences(GcContext& GcCtx); - uint64_t TotalSize() const; - bool PrepareForDelete(std::filesystem::path& OutDeletePath); - - private: - ProjectStore* m_ProjectStore; - CidStore& m_CidStore; - mutable RwLock m_ProjectLock; - std::map<std::string, std::unique_ptr<Oplog>> m_Oplogs; - std::vector<std::unique_ptr<Oplog>> m_DeletedOplogs; - std::filesystem::path m_OplogStoragePath; - - std::filesystem::path BasePathForOplog(std::string_view OplogId); - }; - - // Oplog* OpenProjectOplog(std::string_view ProjectId, std::string_view OplogId); - - Ref<Project> OpenProject(std::string_view ProjectId); - Ref<Project> NewProject(std::filesystem::path BasePath, - std::string_view ProjectId, - std::string_view RootDir, - std::string_view EngineRootDir, - std::string_view ProjectRootDir, - std::string_view ProjectFilePath); - bool DeleteProject(std::string_view ProjectId); - bool Exists(std::string_view ProjectId); - void Flush(); - void Scrub(ScrubContext& Ctx); - void DiscoverProjects(); - void IterateProjects(std::function<void(Project& Prj)>&& Fn); - - spdlog::logger& Log() { return m_Log; } - const std::filesystem::path& BasePath() const { return m_ProjectBasePath; } - - virtual void GatherReferences(GcContext& GcCtx) override; - virtual void CollectGarbage(GcContext& GcCtx) override; - virtual GcStorageSize StorageSize() const override; - - CbArray GetProjectsList(); - std::pair<HttpResponseCode, std::string> GetProjectFiles(const std::string_view ProjectId, - const std::string_view OplogId, - bool FilterClient, - CbObject& OutPayload); - std::pair<HttpResponseCode, std::string> GetChunkInfo(const std::string_view ProjectId, - const std::string_view OplogId, - const std::string_view ChunkId, - CbObject& OutPayload); - std::pair<HttpResponseCode, std::string> GetChunkRange(const std::string_view ProjectId, - const std::string_view OplogId, - const std::string_view ChunkId, - uint64_t Offset, - uint64_t Size, - ZenContentType AcceptType, - IoBuffer& OutChunk); - std::pair<HttpResponseCode, std::string> GetChunk(const std::string_view ProjectId, - const std::string_view OplogId, - const std::string_view Cid, - ZenContentType AcceptType, - IoBuffer& OutChunk); - - std::pair<HttpResponseCode, std::string> PutChunk(const std::string_view ProjectId, - const std::string_view OplogId, - const std::string_view Cid, - ZenContentType ContentType, - IoBuffer&& Chunk); - - std::pair<HttpResponseCode, std::string> WriteOplog(const std::string_view ProjectId, - const std::string_view OplogId, - IoBuffer&& Payload, - CbObject& OutResponse); - - std::pair<HttpResponseCode, std::string> ReadOplog(const std::string_view ProjectId, - const std::string_view OplogId, - const HttpServerRequest::QueryParams& Params, - CbObject& OutResponse); - - std::pair<HttpResponseCode, std::string> WriteBlock(const std::string_view ProjectId, - const std::string_view OplogId, - IoBuffer&& Payload); - - void Rpc(HttpServerRequest& HttpReq, - const std::string_view ProjectId, - const std::string_view OplogId, - IoBuffer&& Payload, - AuthMgr& AuthManager); - - std::pair<HttpResponseCode, std::string> Export(ProjectStore::Project& Project, - ProjectStore::Oplog& Oplog, - CbObjectView&& Params, - AuthMgr& AuthManager); - - std::pair<HttpResponseCode, std::string> Import(ProjectStore::Project& Project, - ProjectStore::Oplog& Oplog, - CbObjectView&& Params, - AuthMgr& AuthManager); - -private: - spdlog::logger& m_Log; - CidStore& m_CidStore; - std::filesystem::path m_ProjectBasePath; - mutable RwLock m_ProjectsLock; - std::map<std::string, Ref<Project>> m_Projects; - - std::filesystem::path BasePathForProject(std::string_view ProjectId); -}; - -////////////////////////////////////////////////////////////////////////// -// -// {project} a project identifier -// {target} a variation of the project, typically a build target -// {lsn} oplog entry sequence number -// -// /prj/{project} -// /prj/{project}/oplog/{target} -// /prj/{project}/oplog/{target}/{lsn} -// -// oplog entry -// -// id: {id} -// key: {} -// meta: {} -// data: [] -// refs: -// - -class HttpProjectService : public HttpService, public IHttpStatsProvider -{ -public: - HttpProjectService(CidStore& Store, ProjectStore* InProjectStore, HttpStatsService& StatsService, AuthMgr& AuthMgr); - ~HttpProjectService(); - - virtual const char* BaseUri() const override; - virtual void HandleRequest(HttpServerRequest& Request) override; - - virtual void HandleStatsRequest(HttpServerRequest& Request) override; - -private: - inline spdlog::logger& Log() { return m_Log; } - - spdlog::logger& m_Log; - CidStore& m_CidStore; - HttpRequestRouter m_Router; - Ref<ProjectStore> m_ProjectStore; - HttpStatsService& m_StatsService; - AuthMgr& m_AuthMgr; -}; - -void prj_forcelink(); - -} // namespace zen diff --git a/zenserver/projectstore/remoteprojectstore.cpp b/zenserver/projectstore/remoteprojectstore.cpp deleted file mode 100644 index 1e6ca51a1..000000000 --- a/zenserver/projectstore/remoteprojectstore.cpp +++ /dev/null @@ -1,1036 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "remoteprojectstore.h" - -#include <zencore/compactbinarybuilder.h> -#include <zencore/compress.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/scopeguard.h> -#include <zencore/stream.h> -#include <zencore/timer.h> -#include <zencore/workthreadpool.h> -#include <zenstore/cidstore.h> - -namespace zen { - -/* - OplogContainer - Binary("ops") // Compressed CompactBinary object to hide attachment references, also makes the oplog smaller - { - CbArray("ops") - { - CbObject Op - (CbFieldType::BinaryAttachment Attachments[]) - (OpData) - } - } - CbArray("blocks") - CbObject - CbFieldType::BinaryAttachment "rawhash" // Optional, only if we are creating blocks (Jupiter/File) - CbArray("chunks") - CbFieldType::Hash // Chunk hashes - CbArray("chunks") // Optional, only if we are not creating blocks (Zen) - CbFieldType::BinaryAttachment // Chunk attachment hashes - - CompressedBinary ChunkBlock - { - VarUInt ChunkCount - VarUInt ChunkSizes[ChunkCount] - uint8_t[chunksize])[ChunkCount] - } -*/ - -////////////////////////////// AsyncRemoteResult - -struct AsyncRemoteResult -{ - void SetError(int32_t ErrorCode, const std::string& ErrorReason, const std::string ErrorText) - { - int32_t Expected = 0; - if (m_ErrorCode.compare_exchange_weak(Expected, ErrorCode ? ErrorCode : -1)) - { - m_ErrorReason = ErrorReason; - m_ErrorText = ErrorText; - } - } - bool IsError() const { return m_ErrorCode.load() != 0; } - int GetError() const { return m_ErrorCode.load(); }; - const std::string& GetErrorReason() const { return m_ErrorReason; }; - const std::string& GetErrorText() const { return m_ErrorText; }; - RemoteProjectStore::Result ConvertResult(double ElapsedSeconds = 0.0) const - { - return RemoteProjectStore::Result{m_ErrorCode, ElapsedSeconds, m_ErrorReason, m_ErrorText}; - } - -private: - std::atomic<int32_t> m_ErrorCode = 0; - std::string m_ErrorReason; - std::string m_ErrorText; -}; - -bool -IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) -{ - IoBuffer BlockPayload = CompressedBuffer::FromCompressedNoValidate(std::move(CompressedBlock)).Decompress().AsIoBuffer(); - - MemoryView BlockView = BlockPayload.GetView(); - const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData()); - uint32_t NumberSize; - uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize); - ReadPtr += NumberSize; - std::vector<uint64_t> ChunkSizes; - ChunkSizes.reserve(ChunkCount); - while (ChunkCount--) - { - ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize)); - ReadPtr += NumberSize; - } - ptrdiff_t TempBufferLength = std::distance(reinterpret_cast<const uint8_t*>(BlockView.GetData()), ReadPtr); - ZEN_ASSERT(TempBufferLength > 0); - for (uint64_t ChunkSize : ChunkSizes) - { - IoBuffer Chunk(IoBuffer::Wrap, ReadPtr, ChunkSize); - IoHash AttachmentRawHash; - uint64_t AttachmentRawSize; - CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize); - - if (!CompressedChunk) - { - ZEN_ERROR("Invalid chunk in block"); - return false; - } - Visitor(std::move(CompressedChunk), AttachmentRawHash); - ReadPtr += ChunkSize; - ZEN_ASSERT(ReadPtr <= BlockView.GetDataEnd()); - } - return true; -}; - -CompressedBuffer -GenerateBlock(std::vector<SharedBuffer>&& Chunks) -{ - size_t ChunkCount = Chunks.size(); - SharedBuffer SizeBuffer; - { - IoBuffer TempBuffer(ChunkCount * 9); - MutableMemoryView View = TempBuffer.GetMutableView(); - uint8_t* BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData()); - uint8_t* BufferEndPtr = BufferStartPtr; - BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr); - auto It = Chunks.begin(); - while (It != Chunks.end()) - { - BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(It->GetSize()), BufferEndPtr); - It++; - } - ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd()); - ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr); - SizeBuffer = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength))); - } - CompositeBuffer AllBuffers(std::move(SizeBuffer), CompositeBuffer(std::move(Chunks))); - - CompressedBuffer CompressedBlock = - CompressedBuffer::Compress(std::move(AllBuffers), OodleCompressor::Mermaid, OodleCompressionLevel::None); - - return CompressedBlock; -} - -struct Block -{ - IoHash BlockHash; - std::vector<IoHash> ChunksInBlock; -}; - -void -CreateBlock(WorkerThreadPool& WorkerPool, - Latch& OpSectionsLatch, - std::vector<SharedBuffer>&& ChunksInBlock, - RwLock& SectionsLock, - std::vector<Block>& Blocks, - size_t BlockIndex, - const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, - AsyncRemoteResult& RemoteResult) -{ - OpSectionsLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&Blocks, &SectionsLock, &OpSectionsLatch, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, &RemoteResult]() mutable { - auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - if (!Chunks.empty()) - { - CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); // Move to callback and return IoHash - IoHash BlockHash = CompressedBlock.DecodeRawHash(); - AsyncOnBlock(std::move(CompressedBlock), BlockHash); - { - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index - RwLock::SharedLockScope __(SectionsLock); - Blocks[BlockIndex].BlockHash = BlockHash; - } - } - }); -} - -size_t -AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks) -{ - size_t BlockIndex; - { - RwLock::ExclusiveLockScope _(BlocksLock); - BlockIndex = Blocks.size(); - Blocks.resize(BlockIndex + 1); - } - return BlockIndex; -} - -CbObject -BuildContainer(CidStore& ChunkStore, - ProjectStore::Oplog& Oplog, - size_t MaxBlockSize, - size_t MaxChunkEmbedSize, - bool BuildBlocks, - WorkerThreadPool& WorkerPool, - const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, - const std::function<void(const IoHash&)>& OnLargeAttachment, - const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, - AsyncRemoteResult& RemoteResult) -{ - using namespace std::literals; - - std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; - CbObjectWriter SectionOpsWriter; - SectionOpsWriter.BeginArray("ops"sv); - - size_t OpCount = 0; - - CbObject OplogContainerObject; - { - RwLock BlocksLock; - std::vector<Block> Blocks; - CompressedBuffer OpsBuffer; - - Latch BlockCreateLatch(1); - - std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes; - - size_t BlockSize = 0; - std::vector<SharedBuffer> ChunksInBlock; - - std::unordered_set<IoHash, IoHash::Hasher> Attachments; - Oplog.IterateOplog([&Attachments, &SectionOpsWriter, &OpCount](CbObject Op) { - Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert(FieldView.AsAttachment()); }); - (SectionOpsWriter) << Op; - OpCount++; - }); - - for (const IoHash& AttachmentHash : Attachments) - { - IoBuffer Payload = ChunkStore.FindChunkByCid(AttachmentHash); - if (!Payload) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to find attachment {} for op", AttachmentHash), - {}); - ZEN_ERROR("Failed to build container ({}). Reason: '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason()); - return {}; - } - uint64_t PayloadSize = Payload.GetSize(); - if (PayloadSize > MaxChunkEmbedSize) - { - if (LargeChunkHashes.insert(AttachmentHash).second) - { - OnLargeAttachment(AttachmentHash); - } - continue; - } - - if (!BlockAttachmentHashes.insert(AttachmentHash).second) - { - continue; - } - - BlockSize += PayloadSize; - if (BuildBlocks) - { - ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload))); - } - else - { - Payload = {}; - } - - if (BlockSize >= MaxBlockSize) - { - size_t BlockIndex = AddBlock(BlocksLock, Blocks); - if (BuildBlocks) - { - CreateBlock(WorkerPool, - BlockCreateLatch, - std::move(ChunksInBlock), - BlocksLock, - Blocks, - BlockIndex, - AsyncOnBlock, - RemoteResult); - } - else - { - OnBlockChunks(BlockAttachmentHashes); - } - { - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index - RwLock::SharedLockScope _(BlocksLock); - Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(), - BlockAttachmentHashes.begin(), - BlockAttachmentHashes.end()); - } - BlockAttachmentHashes.clear(); - ChunksInBlock.clear(); - BlockSize = 0; - } - } - if (BlockSize > 0) - { - size_t BlockIndex = AddBlock(BlocksLock, Blocks); - if (BuildBlocks) - { - CreateBlock(WorkerPool, - BlockCreateLatch, - std::move(ChunksInBlock), - BlocksLock, - Blocks, - BlockIndex, - AsyncOnBlock, - RemoteResult); - } - else - { - OnBlockChunks(BlockAttachmentHashes); - } - { - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index - RwLock::SharedLockScope _(BlocksLock); - Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(), - BlockAttachmentHashes.begin(), - BlockAttachmentHashes.end()); - } - BlockAttachmentHashes.clear(); - ChunksInBlock.clear(); - BlockSize = 0; - } - SectionOpsWriter.EndArray(); // "ops" - - CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer()); - ZEN_DEBUG("Added oplog section {}, {}", CompressedOpsSection.DecodeRawHash(), NiceBytes(CompressedOpsSection.GetCompressedSize())); - - BlockCreateLatch.CountDown(); - while (!BlockCreateLatch.Wait(1000)) - { - ZEN_INFO("Creating blocks, {} remaining...", BlockCreateLatch.Remaining()); - } - - if (!RemoteResult.IsError()) - { - CbObjectWriter OplogContinerWriter; - RwLock::SharedLockScope _(BlocksLock); - OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer()); - - OplogContinerWriter.BeginArray("blocks"sv); - { - for (const Block& B : Blocks) - { - ZEN_ASSERT(!B.ChunksInBlock.empty()); - if (BuildBlocks) - { - ZEN_ASSERT(B.BlockHash != IoHash::Zero); - - OplogContinerWriter.BeginObject(); - { - OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash); - OplogContinerWriter.BeginArray("chunks"sv); - { - for (const IoHash& RawHash : B.ChunksInBlock) - { - OplogContinerWriter.AddHash(RawHash); - } - } - OplogContinerWriter.EndArray(); // "chunks" - } - OplogContinerWriter.EndObject(); - continue; - } - - ZEN_ASSERT(B.BlockHash == IoHash::Zero); - OplogContinerWriter.BeginObject(); - { - OplogContinerWriter.BeginArray("chunks"sv); - { - for (const IoHash& RawHash : B.ChunksInBlock) - { - OplogContinerWriter.AddBinaryAttachment(RawHash); - } - } - OplogContinerWriter.EndArray(); - } - OplogContinerWriter.EndObject(); - } - } - OplogContinerWriter.EndArray(); // "blocks"sv - - OplogContinerWriter.BeginArray("chunks"sv); - { - for (const IoHash& AttachmentHash : LargeChunkHashes) - { - OplogContinerWriter.AddBinaryAttachment(AttachmentHash); - } - } - OplogContinerWriter.EndArray(); // "chunks" - - OplogContainerObject = OplogContinerWriter.Save(); - } - } - return OplogContainerObject; -} - -RemoteProjectStore::LoadContainerResult -BuildContainer(CidStore& ChunkStore, - ProjectStore::Oplog& Oplog, - size_t MaxBlockSize, - size_t MaxChunkEmbedSize, - bool BuildBlocks, - const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, - const std::function<void(const IoHash&)>& OnLargeAttachment, - const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks) -{ - // We are creating a worker thread pool here since we are uploading a lot of attachments in one go and we dont want to keep a - // WorkerThreadPool alive - size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u); - WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount)); - - AsyncRemoteResult RemoteResult; - CbObject ContainerObject = BuildContainer(ChunkStore, - Oplog, - MaxBlockSize, - MaxChunkEmbedSize, - BuildBlocks, - WorkerPool, - AsyncOnBlock, - OnLargeAttachment, - OnBlockChunks, - RemoteResult); - return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject}; -} - -RemoteProjectStore::Result -SaveOplog(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - ProjectStore::Oplog& Oplog, - size_t MaxBlockSize, - size_t MaxChunkEmbedSize, - bool BuildBlocks, - bool UseTempBlocks, - bool ForceUpload) -{ - using namespace std::literals; - - Stopwatch Timer; - - // We are creating a worker thread pool here since we are uploading a lot of attachments in one go - // Doing upload is a rare and transient occation so we don't want to keep a WorkerThreadPool alive. - size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u); - WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount)); - - std::filesystem::path AttachmentTempPath; - if (UseTempBlocks) - { - AttachmentTempPath = Oplog.TempPath(); - AttachmentTempPath.append(".pending"); - CreateDirectories(AttachmentTempPath); - } - - AsyncRemoteResult RemoteResult; - RwLock AttachmentsLock; - std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments; - std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks; - - auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, - const IoHash& BlockHash) { - std::filesystem::path BlockPath = AttachmentTempPath; - BlockPath.append(BlockHash.ToHexString()); - if (!std::filesystem::exists(BlockPath)) - { - IoBuffer BlockBuffer; - try - { - BasicFile BlockFile; - BlockFile.Open(BlockPath, BasicFile::Mode::kTruncateDelete); - uint64_t Offset = 0; - for (const SharedBuffer& Buffer : CompressedBlock.GetCompressed().GetSegments()) - { - BlockFile.Write(Buffer.GetView(), Offset); - Offset += Buffer.GetSize(); - } - void* FileHandle = BlockFile.Detach(); - BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset); - } - catch (std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - Ex.what(), - "Unable to create temp block file"); - return; - } - - BlockBuffer.MarkAsDeleteOnClose(); - { - RwLock::ExclusiveLockScope __(AttachmentsLock); - CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)}); - } - ZEN_DEBUG("Saved temp block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize())); - } - }; - - auto UploadBlock = [&RemoteStore, &RemoteResult](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) { - RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_ERROR("Failed to save attachment ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError()); - return; - } - ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize())); - }; - - std::vector<std::vector<IoHash>> BlockChunks; - auto OnBlockChunks = [&BlockChunks](const std::unordered_set<IoHash, IoHash::Hasher>& Chunks) { - BlockChunks.push_back({Chunks.begin(), Chunks.end()}); - ZEN_DEBUG("Found {} block chunks", Chunks.size()); - }; - - auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments](const IoHash& AttachmentHash) { - { - RwLock::ExclusiveLockScope _(AttachmentsLock); - LargeAttachments.insert(AttachmentHash); - } - ZEN_DEBUG("Found attachment {}", AttachmentHash); - }; - - std::function<void(CompressedBuffer&&, const IoHash&)> OnBlock; - if (UseTempBlocks) - { - OnBlock = MakeTempBlock; - } - else - { - OnBlock = UploadBlock; - } - - CbObject OplogContainerObject = BuildContainer(ChunkStore, - Oplog, - MaxBlockSize, - MaxChunkEmbedSize, - BuildBlocks, - WorkerPool, - OnBlock, - OnLargeAttachment, - OnBlockChunks, - RemoteResult); - - if (!RemoteResult.IsError()) - { - uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num(); - uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num(); - ZEN_INFO("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount); - RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer()); - if (ContainerSaveResult.ErrorCode) - { - RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container"); - ZEN_ERROR("Failed to save oplog container ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError()); - } - ZEN_DEBUG("Saved container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000))); - if (!ContainerSaveResult.Needs.empty()) - { - ZEN_INFO("Filtering needed attachments..."); - std::vector<IoHash> NeededLargeAttachments; - std::unordered_set<IoHash, IoHash::Hasher> NeededOtherAttachments; - NeededLargeAttachments.reserve(LargeAttachments.size()); - NeededOtherAttachments.reserve(CreatedBlocks.size()); - if (ForceUpload) - { - NeededLargeAttachments.insert(NeededLargeAttachments.end(), LargeAttachments.begin(), LargeAttachments.end()); - } - else - { - for (const IoHash& RawHash : ContainerSaveResult.Needs) - { - if (LargeAttachments.contains(RawHash)) - { - NeededLargeAttachments.push_back(RawHash); - continue; - } - NeededOtherAttachments.insert(RawHash); - } - } - - Latch SaveAttachmentsLatch(1); - if (!NeededLargeAttachments.empty()) - { - ZEN_INFO("Saving large attachments..."); - for (const IoHash& RawHash : NeededLargeAttachments) - { - if (RemoteResult.IsError()) - { - break; - } - SaveAttachmentsLatch.AddCount(1); - WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - - IoBuffer Payload; - if (auto It = CreatedBlocks.find(RawHash); It != CreatedBlocks.end()) - { - Payload = std::move(It->second); - } - else - { - Payload = ChunkStore.FindChunkByCid(RawHash); - } - if (!Payload) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to find attachment {}", RawHash), - {}); - ZEN_ERROR("Failed to build container ({}). Reason: '{}'", - RemoteResult.GetErrorReason(), - RemoteResult.GetError()); - return; - } - - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'", - RawHash, - NiceBytes(Payload.GetSize()), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - ZEN_DEBUG("Saved attachment {}, {} in {}", - RawHash, - NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - return; - }); - } - } - - if (!CreatedBlocks.empty()) - { - ZEN_INFO("Saving created block attachments..."); - for (auto& It : CreatedBlocks) - { - if (RemoteResult.IsError()) - { - break; - } - const IoHash& RawHash = It.first; - if (ForceUpload || NeededOtherAttachments.contains(RawHash)) - { - IoBuffer Payload = It.second; - ZEN_ASSERT(Payload); - SaveAttachmentsLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'", - RawHash, - NiceBytes(Payload.GetSize()), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - - ZEN_DEBUG("Saved attachment {}, {} in {}", - RawHash, - NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - return; - }); - } - It.second = {}; - } - } - - if (!BlockChunks.empty()) - { - ZEN_INFO("Saving chunk block attachments..."); - for (const std::vector<IoHash>& Chunks : BlockChunks) - { - if (RemoteResult.IsError()) - { - break; - } - std::vector<IoHash> NeededChunks; - if (ForceUpload) - { - NeededChunks = Chunks; - } - else - { - NeededChunks.reserve(Chunks.size()); - for (const IoHash& Chunk : Chunks) - { - if (NeededOtherAttachments.contains(Chunk)) - { - NeededChunks.push_back(Chunk); - } - } - if (NeededChunks.empty()) - { - continue; - } - } - SaveAttachmentsLatch.AddCount(1); - WorkerPool.ScheduleWork([&RemoteStore, - &ChunkStore, - &SaveAttachmentsLatch, - &RemoteResult, - &Chunks, - NeededChunks = std::move(NeededChunks), - ForceUpload]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - std::vector<SharedBuffer> ChunkBuffers; - ChunkBuffers.reserve(NeededChunks.size()); - for (const IoHash& Chunk : NeededChunks) - { - IoBuffer ChunkPayload = ChunkStore.FindChunkByCid(Chunk); - if (!ChunkPayload) - { - RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), - fmt::format("Missing chunk {}"sv, Chunk), - fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); - ChunkBuffers.clear(); - break; - } - ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload))); - } - RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_ERROR("Failed to save attachments with {} chunks ({}). Reason: '{}'", - Chunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - ZEN_DEBUG("Saved {} bulk attachments in {}", - Chunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - }); - } - } - SaveAttachmentsLatch.CountDown(); - while (!SaveAttachmentsLatch.Wait(1000)) - { - ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining()); - } - SaveAttachmentsLatch.Wait(); - } - - if (!RemoteResult.IsError()) - { - ZEN_INFO("Finalizing oplog container..."); - RemoteProjectStore::Result ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash); - if (ContainerFinalizeResult.ErrorCode) - { - RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text); - ZEN_ERROR("Failed to finalize oplog container {} ({}). Reason: '{}'", - ContainerSaveResult.RawHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - } - ZEN_DEBUG("Finalized container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000))); - } - } - - RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - ZEN_INFO("Saved oplog {} in {}", - RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - return Result; -}; - -RemoteProjectStore::Result -SaveOplogContainer(ProjectStore::Oplog& Oplog, - const CbObject& ContainerObject, - const std::function<bool(const IoHash& RawHash)>& HasAttachment, - const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, - const std::function<void(const IoHash& RawHash)>& OnNeedAttachment) -{ - using namespace std::literals; - - Stopwatch Timer; - - CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView(); - for (CbFieldView LargeChunksField : LargeChunksArray) - { - IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); - if (HasAttachment(AttachmentHash)) - { - continue; - } - OnNeedAttachment(AttachmentHash); - }; - - CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); - for (CbFieldView BlockField : BlocksArray) - { - CbObjectView BlockView = BlockField.AsObjectView(); - IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); - - CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView(); - if (BlockHash == IoHash::Zero) - { - std::vector<IoHash> NeededChunks; - NeededChunks.reserve(ChunksArray.GetSize()); - for (CbFieldView ChunkField : ChunksArray) - { - IoHash ChunkHash = ChunkField.AsBinaryAttachment(); - if (HasAttachment(ChunkHash)) - { - continue; - } - NeededChunks.emplace_back(ChunkHash); - } - - if (!NeededChunks.empty()) - { - OnNeedBlock(IoHash::Zero, std::move(NeededChunks)); - } - continue; - } - - for (CbFieldView ChunkField : ChunksArray) - { - IoHash ChunkHash = ChunkField.AsHash(); - if (HasAttachment(ChunkHash)) - { - continue; - } - - OnNeedBlock(BlockHash, {}); - break; - } - }; - - MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); - IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); - IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); - - CbObject SectionObject = LoadCompactBinaryObject(SectionPayload); - if (!SectionObject) - { - ZEN_ERROR("Failed to save oplog container. Reason: '{}'", "Section has unexpected data type"); - return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), - Timer.GetElapsedTimeMs() / 1000.500, - "Section has unexpected data type", - "Failed to save oplog container"}; - } - - CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); - for (CbFieldView OpEntry : OpsArray) - { - CbObjectView Core = OpEntry.AsObjectView(); - BinaryWriter Writer; - Core.CopyTo(Writer); - MemoryView OpView = Writer.GetView(); - IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize()); - CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); - const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Op); - if (OpLsn == ProjectStore::Oplog::kInvalidOp) - { - return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), - Timer.GetElapsedTimeMs() / 1000.500, - "Failed saving op", - "Failed to save oplog container"}; - } - ZEN_DEBUG("oplog entry #{}", OpLsn); - } - return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500}; -} - -RemoteProjectStore::Result -LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload) -{ - using namespace std::literals; - - Stopwatch Timer; - - // We are creating a worker thread pool here since we are download a lot of attachments in one go and we dont want to keep a - // WorkerThreadPool alive - size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u); - WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount)); - - std::unordered_set<IoHash, IoHash::Hasher> Attachments; - std::vector<std::vector<IoHash>> ChunksInBlocks; - - RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer(); - if (LoadContainerResult.ErrorCode) - { - ZEN_WARN("Failed to load oplog container, reason: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode); - return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode, - .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, - .Reason = LoadContainerResult.Reason, - .Text = LoadContainerResult.Text}; - } - ZEN_DEBUG("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000))); - - AsyncRemoteResult RemoteResult; - Latch AttachmentsWorkLatch(1); - - auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) { - return !ForceDownload && ChunkStore.ContainsChunk(RawHash); - }; - auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &RemoteResult]( - const IoHash& BlockHash, - std::vector<IoHash>&& Chunks) { - if (BlockHash == IoHash::Zero) - { - AttachmentsWorkLatch.AddCount(1); - WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks)]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - - RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_ERROR("Failed to attachments with {} chunks ({}). Reason: '{}'", - Chunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - ZEN_DEBUG("Loaded {} bulk attachments in {}", - Chunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - for (const auto& It : Result.Chunks) - { - ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly); - } - }); - return; - } - AttachmentsWorkLatch.AddCount(1); - WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); - if (BlockResult.ErrorCode) - { - RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); - ZEN_ERROR("Failed to load oplog container, missing attachment {} ({}). Reason: '{}'", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - ZEN_DEBUG("Loaded block attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000))); - - if (!IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { - ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); - })) - { - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Invalid format for block {}", BlockHash), - {}); - ZEN_ERROR("Failed to load oplog container, attachment {} has invalid format ({}). Reason: '{}'", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - }); - }; - - auto OnNeedAttachment = - [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments](const IoHash& RawHash) { - if (!Attachments.insert(RawHash).second) - { - return; - } - - AttachmentsWorkLatch.AddCount(1); - WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() { - auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); - if (AttachmentResult.ErrorCode) - { - RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); - ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}", - RawHash, - AttachmentResult.Reason, - AttachmentResult.ErrorCode); - return; - } - ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); - ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); - }); - }; - - RemoteProjectStore::Result Result = - SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment); - - AttachmentsWorkLatch.CountDown(); - while (!AttachmentsWorkLatch.Wait(1000)) - { - ZEN_INFO("Loading attachments, {} remaining...", AttachmentsWorkLatch.Remaining()); - } - AttachmentsWorkLatch.Wait(); - if (Result.ErrorCode == 0) - { - Result = RemoteResult.ConvertResult(); - } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - - ZEN_INFO("Loaded oplog {} in {}", - RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0))); - - return Result; -} - -} // namespace zen diff --git a/zenserver/projectstore/remoteprojectstore.h b/zenserver/projectstore/remoteprojectstore.h deleted file mode 100644 index dcabaedd4..000000000 --- a/zenserver/projectstore/remoteprojectstore.h +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include "projectstore.h" - -#include <unordered_set> - -namespace zen { - -class CidStore; -class WorkerThreadPool; - -class RemoteProjectStore -{ -public: - struct Result - { - int32_t ErrorCode{}; - double ElapsedSeconds{}; - std::string Reason; - std::string Text; - }; - - struct SaveResult : public Result - { - std::unordered_set<IoHash, IoHash::Hasher> Needs; - IoHash RawHash; - }; - - struct SaveAttachmentResult : public Result - { - }; - - struct SaveAttachmentsResult : public Result - { - }; - - struct LoadAttachmentResult : public Result - { - IoBuffer Bytes; - }; - - struct LoadContainerResult : public Result - { - CbObject ContainerObject; - }; - - struct LoadAttachmentsResult : public Result - { - std::vector<std::pair<IoHash, CompressedBuffer>> Chunks; - }; - - struct RemoteStoreInfo - { - bool CreateBlocks; - bool UseTempBlockFiles; - std::string Description; - }; - - virtual ~RemoteProjectStore() {} - - virtual RemoteStoreInfo GetInfo() const = 0; - - virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0; - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) = 0; - virtual Result FinalizeContainer(const IoHash& RawHash) = 0; - virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0; - - virtual LoadContainerResult LoadContainer() = 0; - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) = 0; - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) = 0; -}; - -struct RemoteStoreOptions -{ - size_t MaxBlockSize = 128u * 1024u * 1024u; - size_t MaxChunkEmbedSize = 1024u * 1024u; -}; - -RemoteProjectStore::LoadContainerResult BuildContainer( - CidStore& ChunkStore, - ProjectStore::Oplog& Oplog, - size_t MaxBlockSize, - size_t MaxChunkEmbedSize, - bool BuildBlocks, - const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, - const std::function<void(const IoHash&)>& OnLargeAttachment, - const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks); - -RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, - const CbObject& ContainerObject, - const std::function<bool(const IoHash& RawHash)>& HasAttachment, - const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, - const std::function<void(const IoHash& RawHash)>& OnNeedAttachment); - -RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - ProjectStore::Oplog& Oplog, - size_t MaxBlockSize, - size_t MaxChunkEmbedSize, - bool BuildBlocks, - bool UseTempBlocks, - bool ForceUpload); - -RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload); - -CompressedBuffer GenerateBlock(std::vector<SharedBuffer>&& Chunks); -bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); - -} // namespace zen diff --git a/zenserver/projectstore/zenremoteprojectstore.cpp b/zenserver/projectstore/zenremoteprojectstore.cpp deleted file mode 100644 index 6ff471ae5..000000000 --- a/zenserver/projectstore/zenremoteprojectstore.cpp +++ /dev/null @@ -1,341 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "zenremoteprojectstore.h" - -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinarypackage.h> -#include <zencore/compositebuffer.h> -#include <zencore/fmtutils.h> -#include <zencore/scopeguard.h> -#include <zencore/stream.h> -#include <zencore/timer.h> -#include <zenhttp/httpshared.h> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <cpr/cpr.h> -ZEN_THIRD_PARTY_INCLUDES_END - -namespace zen { - -using namespace std::literals; - -class ZenRemoteStore : public RemoteProjectStore -{ -public: - ZenRemoteStore(std::string_view HostAddress, - std::string_view Project, - std::string_view Oplog, - size_t MaxBlockSize, - size_t MaxChunkEmbedSize) - : m_HostAddress(HostAddress) - , m_ProjectStoreUrl(fmt::format("{}/prj"sv, m_HostAddress)) - , m_Project(Project) - , m_Oplog(Oplog) - , m_MaxBlockSize(MaxBlockSize) - , m_MaxChunkEmbedSize(MaxChunkEmbedSize) - { - } - - virtual RemoteStoreInfo GetInfo() const override - { - return {.CreateBlocks = false, .UseTempBlockFiles = false, .Description = fmt::format("[zen] {}"sv, m_HostAddress)}; - } - - virtual SaveResult SaveContainer(const IoBuffer& Payload) override - { - Stopwatch Timer; - - std::unique_ptr<cpr::Session> Session(AllocateSession()); - auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); - - std::string SaveRequest = fmt::format("{}/{}/oplog/{}/save"sv, m_ProjectStoreUrl, m_Project, m_Oplog); - Session->SetUrl({SaveRequest}); - Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}}); - MemoryView Data(Payload.GetView()); - Session->SetBody({reinterpret_cast<const char*>(Data.GetData()), Data.GetSize()}); - cpr::Response Response = Session->Post(); - SaveResult Result = SaveResult{ConvertResult(Response)}; - - if (Result.ErrorCode) - { - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return Result; - } - IoBuffer ResponsePayload(IoBuffer::Wrap, Response.text.data(), Response.text.size()); - CbObject ResponseObject = LoadCompactBinaryObject(ResponsePayload); - if (!ResponseObject) - { - Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, - m_ProjectStoreUrl, - m_Project, - m_Oplog); - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return Result; - } - CbArrayView NeedsArray = ResponseObject["need"sv].AsArrayView(); - for (CbFieldView FieldView : NeedsArray) - { - IoHash ChunkHash = FieldView.AsHash(); - Result.Needs.insert(ChunkHash); - } - - Result.RawHash = IoHash::HashBuffer(Payload); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return Result; - } - - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override - { - Stopwatch Timer; - - std::unique_ptr<cpr::Session> Session(AllocateSession()); - auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); - - std::string SaveRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash); - Session->SetUrl({SaveRequest}); - Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}}); - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); - cpr::Response Response = Session->Post(); - SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)}; - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return Result; - } - - virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override - { - Stopwatch Timer; - - CbPackage RequestPackage; - { - CbObjectWriter RequestWriter; - RequestWriter.AddString("method"sv, "putchunks"sv); - RequestWriter.BeginArray("chunks"sv); - { - for (const SharedBuffer& Chunk : Chunks) - { - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, RawHash, RawSize); - RequestWriter.AddHash(RawHash); - RequestPackage.AddAttachment(CbAttachment(Compressed, RawHash)); - } - } - RequestWriter.EndArray(); // "chunks" - RequestPackage.SetObject(RequestWriter.Save()); - } - CompositeBuffer Payload = FormatPackageMessageBuffer(RequestPackage, FormatFlags::kDefault); - - std::unique_ptr<cpr::Session> Session(AllocateSession()); - auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); - std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog); - Session->SetUrl({SaveRequest}); - Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}}); - - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); - cpr::Response Response = Session->Post(); - SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)}; - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return Result; - } - - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override - { - Stopwatch Timer; - - std::unique_ptr<cpr::Session> Session(AllocateSession()); - auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); - std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog); - - CbObject Request; - { - CbObjectWriter RequestWriter; - RequestWriter.AddString("method"sv, "getchunks"sv); - RequestWriter.BeginArray("chunks"sv); - { - for (const IoHash& RawHash : RawHashes) - { - RequestWriter.AddHash(RawHash); - } - } - RequestWriter.EndArray(); // "chunks" - Request = RequestWriter.Save(); - } - IoBuffer Payload = Request.GetBuffer().AsIoBuffer(); - Session->SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()}); - Session->SetUrl(SaveRequest); - Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}, - {"Accept", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}}); - - cpr::Response Response = Session->Post(); - LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)}; - if (!Result.ErrorCode) - { - CbPackage Package = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size())); - std::span<const CbAttachment> Attachments = Package.GetAttachments(); - Result.Chunks.reserve(Attachments.size()); - for (const CbAttachment& Attachment : Attachments) - { - Result.Chunks.emplace_back( - std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()}); - } - } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return Result; - }; - - virtual Result FinalizeContainer(const IoHash&) override - { - Stopwatch Timer; - - RwLock::ExclusiveLockScope _(SessionsLock); - Sessions.clear(); - return {.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500}; - } - - virtual LoadContainerResult LoadContainer() override - { - Stopwatch Timer; - - std::unique_ptr<cpr::Session> Session(AllocateSession()); - auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); - std::string SaveRequest = fmt::format("{}/{}/oplog/{}/load"sv, m_ProjectStoreUrl, m_Project, m_Oplog); - Session->SetUrl(SaveRequest); - Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCbObject))}}); - Session->SetParameters( - {{"maxblocksize", fmt::format("{}", m_MaxBlockSize)}, {"maxchunkembedsize", fmt::format("{}", m_MaxChunkEmbedSize)}}); - cpr::Response Response = Session->Get(); - - LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)}; - if (!Result.ErrorCode) - { - Result.ContainerObject = LoadCompactBinaryObject(IoBuffer(IoBuffer::Clone, Response.text.data(), Response.text.size())); - if (!Result.ContainerObject) - { - Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, - m_ProjectStoreUrl, - m_Project, - m_Oplog); - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return Result; - } - } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return Result; - } - - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override - { - Stopwatch Timer; - - std::unique_ptr<cpr::Session> Session(AllocateSession()); - auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); - - std::string LoadRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash); - Session->SetUrl({LoadRequest}); - Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}}); - cpr::Response Response = Session->Get(); - LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)}; - if (!Result.ErrorCode) - { - Result.Bytes = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); - } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; - return Result; - } - -private: - std::unique_ptr<cpr::Session> AllocateSession() - { - RwLock::ExclusiveLockScope _(SessionsLock); - if (Sessions.empty()) - { - Sessions.emplace_back(std::make_unique<cpr::Session>()); - } - std::unique_ptr<cpr::Session> Session = std::move(Sessions.back()); - Sessions.pop_back(); - return Session; - } - - void ReleaseSession(std::unique_ptr<cpr::Session>&& Session) - { - RwLock::ExclusiveLockScope _(SessionsLock); - Sessions.emplace_back(std::move(Session)); - } - - static Result ConvertResult(const cpr::Response& Response) - { - std::string Text; - std::string Reason = Response.reason; - int32_t ErrorCode = 0; - if (Response.error.code != cpr::ErrorCode::OK) - { - ErrorCode = static_cast<int32_t>(Response.error.code); - if (!Response.error.message.empty()) - { - Reason = Response.error.message; - } - } - else if (!IsHttpSuccessCode(Response.status_code)) - { - ErrorCode = static_cast<int32_t>(Response.status_code); - - if (auto It = Response.header.find("Content-Type"); It != Response.header.end()) - { - zen::HttpContentType ContentType = zen::ParseContentType(It->second); - if (ContentType == zen::HttpContentType::kText) - { - Text = Response.text; - } - } - - Reason = fmt::format("{}"sv, Response.status_code); - } - return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.elapsed, .Reason = Reason, .Text = Text}; - } - - RwLock SessionsLock; - std::vector<std::unique_ptr<cpr::Session>> Sessions; - - const std::string m_HostAddress; - const std::string m_ProjectStoreUrl; - const std::string m_Project; - const std::string m_Oplog; - const size_t m_MaxBlockSize; - const size_t m_MaxChunkEmbedSize; -}; - -std::unique_ptr<RemoteProjectStore> -CreateZenRemoteStore(const ZenRemoteStoreOptions& Options) -{ - std::string Url = Options.Url; - if (Url.find("://"sv) == std::string::npos) - { - // Assume https URL - Url = fmt::format("http://{}"sv, Url); - } - std::unique_ptr<RemoteProjectStore> RemoteStore = - std::make_unique<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize); - return RemoteStore; -} - -} // namespace zen diff --git a/zenserver/projectstore/zenremoteprojectstore.h b/zenserver/projectstore/zenremoteprojectstore.h deleted file mode 100644 index ef9dcad8c..000000000 --- a/zenserver/projectstore/zenremoteprojectstore.h +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include "remoteprojectstore.h" - -namespace zen { - -struct ZenRemoteStoreOptions : RemoteStoreOptions -{ - std::string Url; - std::string ProjectId; - std::string OplogId; -}; - -std::unique_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options); - -} // namespace zen |