diff options
| author | Dan Engelbrecht <[email protected]> | 2023-02-09 16:49:51 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-02-09 07:49:51 -0800 |
| commit | 2f872e432d4a77d1c2dd082cb97a0cbfddb3cc97 (patch) | |
| tree | d631da0746b78cad7140784de4e637bcfb4e1cac /zenserver | |
| parent | Update README.md (diff) | |
| download | zen-2f872e432d4a77d1c2dd082cb97a0cbfddb3cc97.tar.xz zen-2f872e432d4a77d1c2dd082cb97a0cbfddb3cc97.zip | |
oplog upload/download (#214)
- Feature: Zen server endpoint `prj/{project}/oplog/{log}/chunks` to post multiple attachments in one request.
- Feature: Zen server endpoint `prj/{project}/oplog/{log}/save` to save an oplog container. Accepts `CbObject` containing a compressed oplog and attachment references organized in blocks.
- Feature: Zen server endpoint `prj/{project}/oplog/{log}/load` to request an oplog container. Responds with an `CbObject` containing a compressed oplog and attachment references organized in blocks.
- Feature: Zen server endpoint `{project}/oplog/{log}/rpc` to initiate an import to or export from an external location and other operations. Use either JSon or CbPackage as payload.
- CbObject/JSon RPC format for `import` and `export` methods:
- CbObject RPC format for `getchunks` method, returns CbPackage with the found chunks, if all chunks are found the number of attachments matches number of chunks requested.
- Feature: Zen server `{project}/oplog/{log}/{hash}` now accepts `HttpVerb::kPost` as well as `HttpVerb::kGet`.
- Feature: Zen command line tool `oplog-export` to export an oplog to an external target using the zenserver oplog export endpoint.
- Feature: Zen command line tool `oplog-import` to import an oplog from an external source using the zenserver oplog import endpoint.
Diffstat (limited to 'zenserver')
| -rw-r--r-- | zenserver/auth/oidc.cpp | 2 | ||||
| -rw-r--r-- | zenserver/projectstore/fileremoteprojectstore.cpp | 235 | ||||
| -rw-r--r-- | zenserver/projectstore/fileremoteprojectstore.h | 19 | ||||
| -rw-r--r-- | zenserver/projectstore/jupiterremoteprojectstore.cpp | 244 | ||||
| -rw-r--r-- | zenserver/projectstore/jupiterremoteprojectstore.h | 26 | ||||
| -rw-r--r-- | zenserver/projectstore/projectstore.cpp | 721 | ||||
| -rw-r--r-- | zenserver/projectstore/projectstore.h | 63 | ||||
| -rw-r--r-- | zenserver/projectstore/remoteprojectstore.cpp | 1036 | ||||
| -rw-r--r-- | zenserver/projectstore/remoteprojectstore.h | 111 | ||||
| -rw-r--r-- | zenserver/projectstore/zenremoteprojectstore.cpp | 341 | ||||
| -rw-r--r-- | zenserver/projectstore/zenremoteprojectstore.h | 18 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 42 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 1 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 2 |
14 files changed, 2798 insertions, 63 deletions
diff --git a/zenserver/auth/oidc.cpp b/zenserver/auth/oidc.cpp index 17b5bac08..d2265c22f 100644 --- a/zenserver/auth/oidc.cpp +++ b/zenserver/auth/oidc.cpp @@ -104,7 +104,7 @@ OidcClient::RefreshToken(std::string_view RefreshToken) if (Response.status_code != 200) { - return {.Reason = std::move(Response.reason)}; + return {.Reason = fmt::format("{} ({})", Response.reason, Response.text)}; } std::string JsonError; diff --git a/zenserver/projectstore/fileremoteprojectstore.cpp b/zenserver/projectstore/fileremoteprojectstore.cpp new file mode 100644 index 000000000..d7a34a6c2 --- /dev/null +++ b/zenserver/projectstore/fileremoteprojectstore.cpp @@ -0,0 +1,235 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "fileremoteprojectstore.h" + +#include <zencore/compress.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/timer.h> + +namespace zen { + +using namespace std::literals; + +class LocalExportProjectStore : public RemoteProjectStore +{ +public: + LocalExportProjectStore(std::string_view Name, + const std::filesystem::path& FolderPath, + bool ForceDisableBlocks, + bool ForceEnableTempBlocks) + : m_Name(Name) + , m_OutputPath(FolderPath) + { + if (ForceDisableBlocks) + { + m_EnableBlocks = false; + } + if (ForceEnableTempBlocks) + { + m_UseTempBlocks = true; + } + } + + virtual RemoteStoreInfo GetInfo() const override + { + return {.CreateBlocks = m_EnableBlocks, + .UseTempBlockFiles = m_UseTempBlocks, + .Description = fmt::format("[file] {}"sv, m_OutputPath)}; + } + + virtual SaveResult SaveContainer(const IoBuffer& Payload) override + { + Stopwatch Timer; + SaveResult Result; + + { + CbObject ContainerObject = LoadCompactBinaryObject(Payload); + + ContainerObject.IterateAttachments([&](CbFieldView FieldView) { + IoHash AttachmentHash = FieldView.AsBinaryAttachment(); + std::filesystem::path AttachmentPath = GetAttachmentPath(AttachmentHash); + if (!std::filesystem::exists(AttachmentPath)) + { + Result.Needs.insert(AttachmentHash); + } + }); + } + + std::filesystem::path ContainerPath = m_OutputPath; + ContainerPath.append(m_Name); + + CreateDirectories(m_OutputPath); + BasicFile ContainerFile; + ContainerFile.Open(ContainerPath, BasicFile::Mode::kTruncate); + std::error_code Ec; + ContainerFile.WriteAll(Payload, Ec); + if (Ec) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = Ec.message(); + } + Result.RawHash = IoHash::HashBuffer(Payload); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override + { + Stopwatch Timer; + SaveAttachmentResult Result; + std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); + if (!std::filesystem::exists(ChunkPath)) + { + try + { + CreateDirectories(ChunkPath.parent_path()); + + BasicFile ChunkFile; + ChunkFile.Open(ChunkPath, BasicFile::Mode::kTruncate); + size_t Offset = 0; + for (const SharedBuffer& Segment : Payload.GetSegments()) + { + ChunkFile.Write(Segment.GetView(), Offset); + Offset += Segment.GetSize(); + } + } + catch (std::exception& Ex) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = Ex.what(); + } + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override + { + Stopwatch Timer; + + for (const SharedBuffer& Chunk : Chunks) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); + SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash()); + if (ChunkResult.ErrorCode) + { + ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return SaveAttachmentsResult{ChunkResult}; + } + } + SaveAttachmentsResult Result; + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual Result FinalizeContainer(const IoHash&) override { return {}; } + + virtual LoadContainerResult LoadContainer() override + { + Stopwatch Timer; + LoadContainerResult Result; + std::filesystem::path ContainerPath = m_OutputPath; + ContainerPath.append(m_Name); + if (!std::filesystem::is_regular_file(ContainerPath)) + { + Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); + Result.Reason = fmt::format("The file {} does not exist"sv, ContainerPath.string()); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + IoBuffer ContainerPayload; + { + BasicFile ContainerFile; + ContainerFile.Open(ContainerPath, BasicFile::Mode::kRead); + ContainerPayload = ContainerFile.ReadAll(); + } + Result.ContainerObject = LoadCompactBinaryObject(ContainerPayload); + if (!Result.ContainerObject) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("The file {} is not formatted as a compact binary object"sv, ContainerPath.string()); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + { + Stopwatch Timer; + LoadAttachmentResult Result; + std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); + if (!std::filesystem::is_regular_file(ChunkPath)) + { + Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); + Result.Reason = fmt::format("The file {} does not exist"sv, ChunkPath.string()); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + { + BasicFile ChunkFile; + ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead); + Result.Bytes = ChunkFile.ReadAll(); + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override + { + Stopwatch Timer; + LoadAttachmentsResult Result; + for (const IoHash& Hash : RawHashes) + { + LoadAttachmentResult ChunkResult = LoadAttachment(Hash); + if (ChunkResult.ErrorCode) + { + ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return LoadAttachmentsResult{ChunkResult}; + } + ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000))); + Result.Chunks.emplace_back( + std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); + } + return Result; + } + +private: + std::filesystem::path GetAttachmentPath(const IoHash& RawHash) const + { + ExtendablePathBuilder<128> ShardedPath; + ShardedPath.Append(m_OutputPath.c_str()); + ExtendableStringBuilder<64> HashString; + RawHash.ToHexString(HashString); + const char* str = HashString.c_str(); + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str, str + 3); + + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str + 3, str + 5); + + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str + 5, str + 40); + + return ShardedPath.ToPath(); + } + + const std::string m_Name; + const std::filesystem::path m_OutputPath; + bool m_EnableBlocks = true; + bool m_UseTempBlocks = false; +}; + +std::unique_ptr<RemoteProjectStore> +CreateFileRemoteStore(const FileRemoteStoreOptions& Options) +{ + std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<LocalExportProjectStore>(Options.Name, + std::filesystem::path(Options.FolderPath), + Options.ForceDisableBlocks, + Options.ForceEnableTempBlocks); + return RemoteStore; +} + +} // namespace zen diff --git a/zenserver/projectstore/fileremoteprojectstore.h b/zenserver/projectstore/fileremoteprojectstore.h new file mode 100644 index 000000000..68d1eb71e --- /dev/null +++ b/zenserver/projectstore/fileremoteprojectstore.h @@ -0,0 +1,19 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "remoteprojectstore.h" + +namespace zen { + +struct FileRemoteStoreOptions : RemoteStoreOptions +{ + std::filesystem::path FolderPath; + std::string Name; + bool ForceDisableBlocks; + bool ForceEnableTempBlocks; +}; + +std::unique_ptr<RemoteProjectStore> CreateFileRemoteStore(const FileRemoteStoreOptions& Options); + +} // namespace zen diff --git a/zenserver/projectstore/jupiterremoteprojectstore.cpp b/zenserver/projectstore/jupiterremoteprojectstore.cpp new file mode 100644 index 000000000..66cf3c4f8 --- /dev/null +++ b/zenserver/projectstore/jupiterremoteprojectstore.cpp @@ -0,0 +1,244 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "jupiterremoteprojectstore.h" + +#include <zencore/compress.h> +#include <zencore/fmtutils.h> + +#include <auth/authmgr.h> +#include <upstream/jupiter.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cpr/cpr.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +using namespace std::literals; + +class JupiterRemoteStore : public RemoteProjectStore +{ +public: + JupiterRemoteStore(Ref<CloudCacheClient>&& CloudClient, + std::string_view Namespace, + std::string_view Bucket, + const IoHash& Key, + bool ForceDisableBlocks, + bool ForceDisableTempBlocks) + : m_CloudClient(CloudClient) + , m_Namespace(Namespace) + , m_Bucket(Bucket) + , m_Key(Key) + { + if (ForceDisableBlocks) + { + m_EnableBlocks = false; + } + if (ForceDisableTempBlocks) + { + m_UseTempBlocks = false; + } + } + + virtual RemoteStoreInfo GetInfo() const override + { + return {.CreateBlocks = m_EnableBlocks, + .UseTempBlockFiles = m_UseTempBlocks, + .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_CloudClient->ServiceUrl(), m_Namespace, m_Bucket, m_Key)}; + } + + virtual SaveResult SaveContainer(const IoBuffer& Payload) override + { + const int32_t MaxAttempts = 3; + PutRefResult Result; + { + CloudCacheSession Session(m_CloudClient.Get()); + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject); + } + } + + return SaveResult{ConvertResult(Result), {Result.Needs.begin(), Result.Needs.end()} /*, {}*/, IoHash::HashBuffer(Payload)}; + } + + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override + { + const int32_t MaxAttempts = 3; + CloudCacheResult Result; + { + CloudCacheSession Session(m_CloudClient.Get()); + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); + } + } + + return SaveAttachmentResult{ConvertResult(Result)}; + } + + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override + { + SaveAttachmentsResult Result; + for (const SharedBuffer& Chunk : Chunks) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); + SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash()); + if (ChunkResult.ErrorCode) + { + return SaveAttachmentsResult{ChunkResult}; + } + } + return Result; + } + + virtual Result FinalizeContainer(const IoHash& RawHash) override + { + const int32_t MaxAttempts = 3; + CloudCacheResult Result; + { + CloudCacheSession Session(m_CloudClient.Get()); + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash); + } + } + return ConvertResult(Result); + } + + virtual LoadContainerResult LoadContainer() override + { + const int32_t MaxAttempts = 3; + CloudCacheResult Result; + { + CloudCacheSession Session(m_CloudClient.Get()); + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.GetRef(m_Namespace, m_Bucket, m_Key, ZenContentType::kCbObject); + } + } + + if (Result.ErrorCode || !Result.Success) + { + return LoadContainerResult{ConvertResult(Result)}; + } + + CbObject ContainerObject = LoadCompactBinaryObject(Result.Response); + if (!ContainerObject) + { + return LoadContainerResult{ + RemoteProjectStore::Result{ + .ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + .ElapsedSeconds = Result.ElapsedSeconds, + .Reason = fmt::format("The ref {}/{}/{} is not formatted as a compact binary object"sv, m_Namespace, m_Bucket, m_Key)}, + std::move(ContainerObject)}; + } + + return LoadContainerResult{ConvertResult(Result), std::move(ContainerObject)}; + } + + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + { + const int32_t MaxAttempts = 3; + CloudCacheResult Result; + { + CloudCacheSession Session(m_CloudClient.Get()); + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.GetCompressedBlob(m_Namespace, RawHash); + } + } + return LoadAttachmentResult{ConvertResult(Result), std::move(Result.Response)}; + } + + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override + { + LoadAttachmentsResult Result; + for (const IoHash& Hash : RawHashes) + { + LoadAttachmentResult ChunkResult = LoadAttachment(Hash); + if (ChunkResult.ErrorCode) + { + return LoadAttachmentsResult{ChunkResult}; + } + ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000))); + Result.Chunks.emplace_back( + std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); + } + return Result; + } + +private: + static Result ConvertResult(const CloudCacheResult& Response) + { + std::string Text; + int32_t ErrorCode = 0; + if (Response.ErrorCode != 0) + { + ErrorCode = Response.ErrorCode; + } + else if (!Response.Success) + { + ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + if (Response.Response.GetContentType() == ZenContentType::kText) + { + Text = + std::string(reinterpret_cast<const std::string::value_type*>(Response.Response.GetData()), Response.Response.GetSize()); + } + } + return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text}; + } + + Ref<CloudCacheClient> m_CloudClient; + const std::string m_Namespace; + const std::string m_Bucket; + const IoHash m_Key; + bool m_EnableBlocks = true; + bool m_UseTempBlocks = true; +}; + +std::unique_ptr<RemoteProjectStore> +CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options) +{ + std::string Url = Options.Url; + if (Url.find("://"sv) == std::string::npos) + { + // Assume https URL + Url = fmt::format("https://{}"sv, Url); + } + CloudCacheClientOptions ClientOptions{.Name = "Remote store"sv, + .ServiceUrl = Url, + .ConnectTimeout = std::chrono::milliseconds(2000), + .Timeout = std::chrono::milliseconds(60000)}; + // 1) Access token as parameter in request + // 2) Environment variable (different win vs linux/mac) + // 3) openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider + + std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + if (!Options.AccessToken.empty()) + { + TokenProvider = CloudCacheTokenProvider::CreateFromCallback([AccessToken = Options.AccessToken]() { + return CloudCacheAccessToken{.Value = AccessToken, .ExpireTime = GcClock::TimePoint::max()}; + }); + } + else + { + TokenProvider = + CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager, OpenIdProvider = Options.OpenIdProvider]() { + AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider.empty() ? "Default" : OpenIdProvider); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); + } + + Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider))); + + std::unique_ptr<RemoteProjectStore> RemoteStore = std::make_unique<JupiterRemoteStore>(std::move(CloudClient), + Options.Namespace, + Options.Bucket, + Options.Key, + Options.ForceDisableBlocks, + Options.ForceDisableTempBlocks); + return RemoteStore; +} + +} // namespace zen diff --git a/zenserver/projectstore/jupiterremoteprojectstore.h b/zenserver/projectstore/jupiterremoteprojectstore.h new file mode 100644 index 000000000..31548af22 --- /dev/null +++ b/zenserver/projectstore/jupiterremoteprojectstore.h @@ -0,0 +1,26 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "remoteprojectstore.h" + +namespace zen { + +class AuthMgr; + +struct JupiterRemoteStoreOptions : RemoteStoreOptions +{ + std::string Url; + std::string Namespace; + std::string Bucket; + IoHash Key; + std::string OpenIdProvider; + std::string AccessToken; + AuthMgr& AuthManager; + bool ForceDisableBlocks; + bool ForceDisableTempBlocks; +}; + +std::unique_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options); + +} // namespace zen diff --git a/zenserver/projectstore/projectstore.cpp b/zenserver/projectstore/projectstore.cpp index a07698d50..82aac1605 100644 --- a/zenserver/projectstore/projectstore.cpp +++ b/zenserver/projectstore/projectstore.cpp @@ -5,32 +5,31 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> -#include <zencore/compactbinaryvalue.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/stream.h> -#include <zencore/string.h> -#include <zencore/testing.h> -#include <zencore/testutils.h> #include <zencore/timer.h> #include <zencore/trace.h> -#include <zencore/workthreadpool.h> #include <zenhttp/httpshared.h> #include <zenstore/caslog.h> +#include <zenstore/cidstore.h> #include <zenstore/scrubcontext.h> -#include <zenutil/basicfile.h> -#include "config.h" - -#include <latch> +#include "fileremoteprojectstore.h" +#include "jupiterremoteprojectstore.h" +#include "remoteprojectstore.h" +#include "zenremoteprojectstore.h" ZEN_THIRD_PARTY_INCLUDES_START +#include <cpr/cpr.h> #include <xxh3.h> ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS +# include <zencore/testing.h> +# include <zencore/testutils.h> #endif // ZEN_WITH_TESTS namespace zen { @@ -69,6 +68,149 @@ namespace { Sleep(100); } while (true); } + + std::pair<std::unique_ptr<RemoteProjectStore>, std::string> CreateRemoteStore(CbObjectView Params, + AuthMgr& AuthManager, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize) + { + using namespace std::literals; + + std::unique_ptr<RemoteProjectStore> RemoteStore; + + if (CbObjectView File = Params["file"sv].AsObjectView(); File) + { + std::filesystem::path FolderPath(File["path"sv].AsString()); + if (FolderPath.empty()) + { + return {nullptr, "Missing file path"}; + } + std::string_view Name(File["name"sv].AsString()); + if (Name.empty()) + { + return {nullptr, "Missing file name"}; + } + bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false); + bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false); + + FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + FolderPath, + std::string(Name), + ForceDisableBlocks, + ForceEnableTempBlocks}; + RemoteStore = CreateFileRemoteStore(Options); + } + + if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud) + { + std::string_view CloudServiceUrl = Cloud["url"sv].AsString(); + if (CloudServiceUrl.empty()) + { + return {nullptr, "Missing service url"}; + } + + std::string Url = cpr::util::urlDecode(std::string(CloudServiceUrl)); + std::string_view Namespace = Cloud["namespace"sv].AsString(); + if (Namespace.empty()) + { + return {nullptr, "Missing namespace"}; + } + std::string_view Bucket = Cloud["bucket"sv].AsString(); + if (Bucket.empty()) + { + return {nullptr, "Missing bucket"}; + } + std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString(); + std::string AccessToken = std::string(Cloud["access-token"sv].AsString()); + if (AccessToken.empty()) + { +#if PLATFORM_WINDOWS + + CHAR EnvVariableBuffer[1023 + 1]; + DWORD RESULT = GetEnvironmentVariableA("UE-CloudDataCacheAccessToken", EnvVariableBuffer, sizeof(EnvVariableBuffer)); + if (RESULT > 0 && RESULT < sizeof(EnvVariableBuffer)) + { + AccessToken = std::string(EnvVariableBuffer); + } +#endif +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + char* EnvVariable = getenv("UE_CloudDataCacheAccessToken"); + if (EnvVariable) + { + AccessToken = std::string(EnvVariable); + } +#endif + } + std::string_view KeyParam = Cloud["key"sv].AsString(); + if (KeyParam.empty()) + { + return {nullptr, "Missing key"}; + } + if (KeyParam.length() != IoHash::StringLength) + { + return {nullptr, "Invalid key"}; + } + IoHash Key = IoHash::FromHexString(KeyParam); + if (Key == IoHash::Zero) + { + return {nullptr, "Invalid key string"}; + } + bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false); + bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false); + + JupiterRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + Url, + std::string(Namespace), + std::string(Bucket), + Key, + std::string(OpenIdProvider), + AccessToken, + AuthManager, + ForceDisableBlocks, + ForceDisableTempBlocks}; + RemoteStore = CreateJupiterRemoteStore(Options); + } + + if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen) + { + std::string_view Url = Zen["url"sv].AsString(); + std::string_view Project = Zen["project"sv].AsString(); + if (Project.empty()) + { + return {nullptr, "Missing project"}; + } + std::string_view Oplog = Zen["oplog"sv].AsString(); + if (Oplog.empty()) + { + return {nullptr, "Missing oplog"}; + } + ZenRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + std::string(Url), + std::string(Project), + std::string(Oplog)}; + RemoteStore = CreateZenRemoteStore(Options); + } + + if (!RemoteStore) + { + return {nullptr, "Unknown remote store type"}; + } + + return {std::move(RemoteStore), ""}; + } + + std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result) + { + if (Result.ErrorCode == 0) + { + return {HttpResponseCode::OK, Result.Text}; + } + return {static_cast<HttpResponseCode>(Result.ErrorCode), + Result.Reason.empty() ? Result.Text + : Result.Text.empty() ? Result.Reason + : fmt::format("{}. Reason: '{}'", Result.Text, Result.Reason)}; + } + } // namespace ////////////////////////////////////////////////////////////////////////// @@ -794,7 +936,6 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) XXH3_128 KeyHash = KeyHasher.GetHash(); RefPtr<OplogStorage> Storage; - { RwLock::SharedLockScope _(m_OplogLock); Storage = m_Storage; @@ -1613,7 +1754,7 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId, if (!FoundLog) { - return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)}; } if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) @@ -1636,7 +1777,7 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId, { IoHash RawHash; uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize); ZEN_ASSERT(!Compressed.IsNull()); if (IsOffset) @@ -1679,7 +1820,7 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId, { Size = Chunk.GetSize() - Offset; } - OutChunk = IoBuffer(Chunk, Offset, Size); + OutChunk = IoBuffer(std::move(Chunk), Offset, Size); OutChunk.SetContentType(ContentType); } @@ -1693,8 +1834,6 @@ ProjectStore::GetChunk(const std::string_view ProjectId, ZenContentType AcceptType, IoBuffer& OutChunk) { - using namespace std::literals; - Ref<ProjectStore::Project> Project = OpenProject(ProjectId); if (!Project) { @@ -1705,7 +1844,7 @@ ProjectStore::GetChunk(const std::string_view ProjectId, if (!FoundLog) { - return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)}; } if (Cid.length() != IoHash::StringLength) @@ -1721,25 +1860,401 @@ ProjectStore::GetChunk(const std::string_view ProjectId, return {HttpResponseCode::NotFound, fmt::format("chunk - '{}' MISSING", Cid)}; } - if (AcceptType == HttpContentType::kBinary) + if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(OutChunk)); OutChunk = Compressed.Decompress().AsIoBuffer(); - OutChunk.SetContentType(HttpContentType::kBinary); + OutChunk.SetContentType(ZenContentType::kBinary); } else { - OutChunk.SetContentType(HttpContentType::kCompressedBinary); + OutChunk.SetContentType(ZenContentType::kCompressedBinary); } return {HttpResponseCode::OK, {}}; } +std::pair<HttpResponseCode, std::string> +ProjectStore::PutChunk(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view Cid, + ZenContentType ContentType, + IoBuffer&& Chunk) +{ + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + + if (Cid.length() != IoHash::StringLength) + { + return {HttpResponseCode::BadRequest, fmt::format("Chunk put request for invalid chunk hash '{}'", Cid)}; + } + + const IoHash Hash = IoHash::FromHexString(Cid); + + if (ContentType != HttpContentType::kCompressedBinary) + { + return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid content type for chunk '{}'", Cid)}; + } + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); + if (RawHash != Hash) + { + return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid payload format for chunk '{}'", Cid)}; + } + + CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk, Hash); + return {Result.New ? HttpResponseCode::Created : HttpResponseCode::OK, {}}; +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload, CbObject& OutResponse) +{ + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); + + if (!Oplog) + { + return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + + CbObject ContainerObject = LoadCompactBinaryObject(Payload); + if (!ContainerObject) + { + return {HttpResponseCode::BadRequest, "Invalid payload format"}; + } + + CidStore& ChunkStore = m_CidStore; + RwLock AttachmentsLock; + std::unordered_set<IoHash, IoHash::Hasher> Attachments; + + auto HasAttachment = [&ChunkStore](const IoHash& RawHash) { return ChunkStore.ContainsChunk(RawHash); }; + auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector<IoHash>&& ChunkHashes) { + RwLock::ExclusiveLockScope _(AttachmentsLock); + if (BlockHash != IoHash::Zero) + { + Attachments.insert(BlockHash); + } + else + { + Attachments.insert(ChunkHashes.begin(), ChunkHashes.end()); + } + }; + auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) { + RwLock::ExclusiveLockScope _(AttachmentsLock); + Attachments.insert(RawHash); + }; + + RemoteProjectStore::Result RemoteResult = SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment); + + if (RemoteResult.ErrorCode) + { + return ConvertResult(RemoteResult); + } + + CbObjectWriter Cbo; + Cbo.BeginArray("need"); + { + for (const IoHash& Hash : Attachments) + { + ZEN_DEBUG("Need attachment {}", Hash); + Cbo << Hash; + } + } + Cbo.EndArray(); // "need" + + OutResponse = Cbo.Save(); + return {HttpResponseCode::OK, {}}; +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::ReadOplog(const std::string_view ProjectId, + const std::string_view OplogId, + const HttpServerRequest::QueryParams& Params, + CbObject& OutResponse) +{ + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); + + if (!Oplog) + { + return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + + size_t MaxBlockSize = 128u * 1024u * 1024u; + if (auto Param = Params.GetValue("maxblocksize"); Param.empty() == false) + { + if (auto Value = ParseInt<size_t>(Param)) + { + MaxBlockSize = Value.value(); + } + } + size_t MaxChunkEmbedSize = 1024u * 1024u; + if (auto Param = Params.GetValue("maxchunkembedsize"); Param.empty() == false) + { + if (auto Value = ParseInt<size_t>(Param)) + { + MaxChunkEmbedSize = Value.value(); + } + } + + CidStore& ChunkStore = m_CidStore; + + RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer( + ChunkStore, + *Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + false, + [](CompressedBuffer&&, const IoHash) {}, + [](const IoHash&) {}, + [](const std::unordered_set<IoHash, IoHash::Hasher>) {}); + + OutResponse = std::move(ContainerResult.ContainerObject); + return ConvertResult(ContainerResult); +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::WriteBlock(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload) +{ + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown project '{}'", ProjectId)}; + } + + ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); + + if (!Oplog) + { + return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + + if (!IterateBlock(std::move(Payload), [this](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { + IoBuffer Compressed = Chunk.GetCompressed().Flatten().AsIoBuffer(); + m_CidStore.AddChunk(Compressed, AttachmentRawHash); + ZEN_DEBUG("Saved attachment {} from block, size {}", AttachmentRawHash, Compressed.GetSize()); + })) + { + return {HttpResponseCode::BadRequest, "Invalid chunk in block"}; + } + + return {HttpResponseCode::OK, {}}; +} + +void +ProjectStore::Rpc(HttpServerRequest& HttpReq, + const std::string_view ProjectId, + const std::string_view OplogId, + IoBuffer&& Payload, + AuthMgr& AuthManager) +{ + using namespace std::literals; + HttpContentType PayloadContentType = HttpReq.RequestContentType(); + CbPackage Package; + CbObject Cb; + switch (PayloadContentType) + { + case HttpContentType::kJSON: + case HttpContentType::kUnknownContentType: + case HttpContentType::kText: + { + std::string JsonText(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize()); + Cb = LoadCompactBinaryFromJson(JsonText).AsObject(); + if (!Cb) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Content format not supported, expected JSON format"); + } + } + break; + case HttpContentType::kCbObject: + Cb = LoadCompactBinaryObject(Payload); + if (!Cb) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Content format not supported, expected compact binary format"); + } + break; + case HttpContentType::kCbPackage: + Package = ParsePackageMessage(Payload); + Cb = Package.GetObject(); + if (!Cb) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Content format not supported, expected package message format"); + } + break; + default: + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type"); + } + + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Rpc oplog request for unknown project '{}'", ProjectId)); + } + + ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId); + + if (!Oplog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)); + } + + std::string_view Method = Cb["method"sv].AsString(); + + if (Method == "import") + { + std::pair<HttpResponseCode, std::string> Result = Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); + if (Result.second.empty()) + { + return HttpReq.WriteResponse(Result.first); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + } + else if (Method == "export") + { + std::pair<HttpResponseCode, std::string> Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); + if (Result.second.empty()) + { + return HttpReq.WriteResponse(Result.first); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + } + else if (Method == "getchunks") + { + CbPackage ResponsePackage; + { + CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView(); + CbObjectWriter ResponseWriter; + ResponseWriter.BeginArray("chunks"sv); + for (CbFieldView FieldView : ChunksArray) + { + IoHash RawHash = FieldView.AsHash(); + IoBuffer ChunkBuffer = m_CidStore.FindChunkByCid(RawHash); + if (ChunkBuffer) + { + ResponseWriter.AddHash(RawHash); + ResponsePackage.AddAttachment( + CbAttachment(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)), RawHash)); + } + } + ResponseWriter.EndArray(); + ResponsePackage.SetObject(ResponseWriter.Save()); + } + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault); + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + } + else if (Method == "putchunks") + { + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + for (const CbAttachment& Attachment : Attachments) + { + IoHash RawHash = Attachment.GetHash(); + CompressedBuffer Compressed = Attachment.AsCompressedBinary(); + m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), RawHash, CidStore::InsertMode::kCopyOnly); + } + return HttpReq.WriteResponse(HttpResponseCode::OK); + } + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method)); +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager) +{ + using namespace std::literals; + + size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u); + size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); + bool Force = Params["force"sv].AsBool(false); + + std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult = + CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize); + + if (RemoteStoreResult.first == nullptr) + { + return {HttpResponseCode::BadRequest, RemoteStoreResult.second}; + } + std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first); + RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); + + ZEN_INFO("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}", + Project.Identifier, + Oplog.OplogId(), + StoreInfo.Description, + NiceBytes(MaxBlockSize), + NiceBytes(MaxChunkEmbedSize)); + + RemoteProjectStore::Result Result = SaveOplog(m_CidStore, + *RemoteStore, + Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + StoreInfo.CreateBlocks, + StoreInfo.UseTempBlockFiles, + Force); + + return ConvertResult(Result); +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager) +{ + using namespace std::literals; + + size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u); + size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); + bool Force = Params["force"sv].AsBool(false); + + std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult = + CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize); + + if (RemoteStoreResult.first == nullptr) + { + return {HttpResponseCode::BadRequest, RemoteStoreResult.second}; + } + std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first); + RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); + + ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description); + RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *RemoteStore, Oplog, Force); + return ConvertResult(Result); +} + ////////////////////////////////////////////////////////////////////////// -HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) +HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, AuthMgr& AuthMgr) : m_Log(logging::Get("project")) , m_CidStore(Store) , m_ProjectStore(Projects) +, m_AuthMgr(AuthMgr) { using namespace std::literals; @@ -1918,7 +2433,6 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) static_cast<int>(Result.first), Result.second); } - return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); }, HttpVerb::kGet); @@ -2023,33 +2537,60 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - const auto& Cid = Req.GetCapture(3); - HttpContentType AcceptType = HttpReq.AcceptContentType(); - - IoBuffer Value; - std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + const auto& Cid = Req.GetCapture(3); + HttpContentType AcceptType = HttpReq.AcceptContentType(); + HttpContentType RequestType = HttpReq.RequestContentType(); - if (Result.first == HttpResponseCode::OK) - { - return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value); - } - else if (Result.first == HttpResponseCode::NotFound) - { - ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, Cid); - } - else + switch (Req.ServerRequest().RequestVerb()) { - ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", - ToString(HttpReq.RequestVerb()), - HttpReq.QueryString(), - static_cast<int>(Result.first), - Result.second); + case HttpVerb::kGet: + { + IoBuffer Value; + std::pair<HttpResponseCode, std::string> Result = + m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value); + + if (Result.first == HttpResponseCode::OK) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value); + } + else if (Result.first == HttpResponseCode::NotFound) + { + ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, Cid); + } + else + { + ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", + ToString(HttpReq.RequestVerb()), + HttpReq.QueryString(), + static_cast<int>(Result.first), + Result.second); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + } + case HttpVerb::kPost: + { + std::pair<HttpResponseCode, std::string> Result = + m_ProjectStore->PutChunk(ProjectId, OplogId, Cid, RequestType, HttpReq.ReadPayload()); + if (Result.first == HttpResponseCode::OK || Result.first == HttpResponseCode::Created) + { + return HttpReq.WriteResponse(Result.first); + } + else + { + ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", + ToString(HttpReq.RequestVerb()), + HttpReq.QueryString(), + static_cast<int>(Result.first), + Result.second); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + } + break; } - return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); }, - HttpVerb::kGet); + HttpVerb::kGet | HttpVerb::kPost); m_Router.RegisterRoute( "{project}/oplog/{log}/prep", @@ -2556,6 +3097,67 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) } }, HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kDelete); + + // Push a oplog container + m_Router.RegisterRoute( + "{project}/oplog/{log}/save", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + if (HttpReq.RequestContentType() != HttpContentType::kCbObject) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid content type"); + } + IoBuffer Payload = Req.ServerRequest().ReadPayload(); + + CbObject Response; + std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->WriteOplog(ProjectId, OplogId, std::move(Payload), Response); + if (Result.first == HttpResponseCode::OK) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, Response); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + }, + HttpVerb::kPost); + + // Pull a oplog container + m_Router.RegisterRoute( + "{project}/oplog/{log}/load", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + if (HttpReq.AcceptContentType() != HttpContentType::kCbObject) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid accept content type"); + } + IoBuffer Payload = Req.ServerRequest().ReadPayload(); + + CbObject Response; + std::pair<HttpResponseCode, std::string> Result = + m_ProjectStore->ReadOplog(ProjectId, OplogId, Req.ServerRequest().GetQueryParams(), Response); + if (Result.first == HttpResponseCode::OK) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, Response); + } + return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); + }, + HttpVerb::kGet); + + // Do an rpc style operation on project/oplog + m_Router.RegisterRoute( + "{project}/oplog/{log}/rpc", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + IoBuffer Payload = Req.ServerRequest().ReadPayload(); + + m_ProjectStore->Rpc(HttpReq, ProjectId, OplogId, std::move(Payload), m_AuthMgr); + }, + HttpVerb::kPost); } HttpProjectService::~HttpProjectService() @@ -2625,9 +3227,14 @@ namespace testutils { { std::vector<uint8_t> Data; Data.resize(Size); - for (size_t Idx = 0; Idx < Size; ++Idx) + uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data()); + for (size_t Idx = 0; Idx < Size / 2; ++Idx) { - Data[Idx] = Idx % 255; + DataPtr[Idx] = static_cast<uint16_t>(Idx % 0xffffu); + } + if (Size & 1) + { + Data[Size - 1] = static_cast<uint8_t>((Size - 1) & 0xff); } CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())); Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed)); @@ -2908,6 +3515,28 @@ TEST_CASE("project.store.partial.read") const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData()); CHECK(FullDataPtr[0] == PartialDataPtr[0]); } + +TEST_CASE("project.store.block") +{ + using namespace std::literals; + using namespace testutils; + + std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269, 2257, 3685, 3489, + 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 4024, 1582, 5251, + 491, 5464, 4607, 8135, 3767, 4045, 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225}); + + std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes); + std::vector<SharedBuffer> Chunks; + Chunks.reserve(AttachmentSizes.size()); + for (const auto& It : AttachmentsWithId) + { + Chunks.push_back(It.second.GetCompressed().Flatten()); + } + CompressedBuffer Block = GenerateBlock(std::move(Chunks)); + IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer(); + CHECK(IterateBlock(std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {})); +} + #endif void diff --git a/zenserver/projectstore/projectstore.h b/zenserver/projectstore/projectstore.h index 6b214d5a2..928a74f59 100644 --- a/zenserver/projectstore/projectstore.h +++ b/zenserver/projectstore/projectstore.h @@ -2,18 +2,11 @@ #pragma once -#include <zencore/logging.h> #include <zencore/uid.h> #include <zencore/xxhash.h> #include <zenhttp/httpserver.h> -#include <zenstore/cidstore.h> #include <zenstore/gc.h> -#include <filesystem> -#include <map> -#include <optional> -#include <string> - ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> ZEN_THIRD_PARTY_INCLUDES_END @@ -21,6 +14,9 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { class CbPackage; +class CidStore; +class AuthMgr; +class ScrubContext; struct OplogEntry { @@ -141,13 +137,13 @@ public: std::filesystem::path m_MarkerPath; std::filesystem::path m_TempPath; - mutable RwLock m_OplogLock; - OidMap<IoHash> m_ChunkMap; // output data chunk id -> CAS address - OidMap<IoHash> m_MetaMap; // meta chunk id -> CAS address - OidMap<FileMapEntry> m_FileMap; // file id -> file map entry - int32_t m_ManifestVersion; // File system manifest version - std::map<int, OplogEntryAddress> m_OpAddressMap; // Index LSN -> op data in ops blob file - OidMap<int> m_LatestOpMap; // op key -> latest op LSN for key + mutable RwLock m_OplogLock; + OidMap<IoHash> m_ChunkMap; // output data chunk id -> CAS address + OidMap<IoHash> m_MetaMap; // meta chunk id -> CAS address + OidMap<FileMapEntry> m_FileMap; // file id -> file map entry + int32_t m_ManifestVersion; // File system manifest version + tsl::robin_map<int, OplogEntryAddress> m_OpAddressMap; // Index LSN -> op data in ops blob file + OidMap<int> m_LatestOpMap; // op key -> latest op LSN for key RefPtr<OplogStorage> m_Storage; std::string m_OplogId; @@ -280,6 +276,42 @@ public: ZenContentType AcceptType, IoBuffer& OutChunk); + std::pair<HttpResponseCode, std::string> PutChunk(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view Cid, + ZenContentType ContentType, + IoBuffer&& Chunk); + + std::pair<HttpResponseCode, std::string> WriteOplog(const std::string_view ProjectId, + const std::string_view OplogId, + IoBuffer&& Payload, + CbObject& OutResponse); + + std::pair<HttpResponseCode, std::string> ReadOplog(const std::string_view ProjectId, + const std::string_view OplogId, + const HttpServerRequest::QueryParams& Params, + CbObject& OutResponse); + + std::pair<HttpResponseCode, std::string> WriteBlock(const std::string_view ProjectId, + const std::string_view OplogId, + IoBuffer&& Payload); + + void Rpc(HttpServerRequest& HttpReq, + const std::string_view ProjectId, + const std::string_view OplogId, + IoBuffer&& Payload, + AuthMgr& AuthManager); + + std::pair<HttpResponseCode, std::string> Export(ProjectStore::Project& Project, + ProjectStore::Oplog& Oplog, + CbObjectView&& Params, + AuthMgr& AuthManager); + + std::pair<HttpResponseCode, std::string> Import(ProjectStore::Project& Project, + ProjectStore::Oplog& Oplog, + CbObjectView&& Params, + AuthMgr& AuthManager); + private: spdlog::logger& m_Log; CidStore& m_CidStore; @@ -312,7 +344,7 @@ private: class HttpProjectService : public HttpService { public: - HttpProjectService(CidStore& Store, ProjectStore* InProjectStore); + HttpProjectService(CidStore& Store, ProjectStore* InProjectStore, AuthMgr& AuthMgr); ~HttpProjectService(); virtual const char* BaseUri() const override; @@ -325,6 +357,7 @@ private: CidStore& m_CidStore; HttpRequestRouter m_Router; Ref<ProjectStore> m_ProjectStore; + AuthMgr& m_AuthMgr; }; void prj_forcelink(); diff --git a/zenserver/projectstore/remoteprojectstore.cpp b/zenserver/projectstore/remoteprojectstore.cpp new file mode 100644 index 000000000..1e6ca51a1 --- /dev/null +++ b/zenserver/projectstore/remoteprojectstore.cpp @@ -0,0 +1,1036 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "remoteprojectstore.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compress.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/stream.h> +#include <zencore/timer.h> +#include <zencore/workthreadpool.h> +#include <zenstore/cidstore.h> + +namespace zen { + +/* + OplogContainer + Binary("ops") // Compressed CompactBinary object to hide attachment references, also makes the oplog smaller + { + CbArray("ops") + { + CbObject Op + (CbFieldType::BinaryAttachment Attachments[]) + (OpData) + } + } + CbArray("blocks") + CbObject + CbFieldType::BinaryAttachment "rawhash" // Optional, only if we are creating blocks (Jupiter/File) + CbArray("chunks") + CbFieldType::Hash // Chunk hashes + CbArray("chunks") // Optional, only if we are not creating blocks (Zen) + CbFieldType::BinaryAttachment // Chunk attachment hashes + + CompressedBinary ChunkBlock + { + VarUInt ChunkCount + VarUInt ChunkSizes[ChunkCount] + uint8_t[chunksize])[ChunkCount] + } +*/ + +////////////////////////////// AsyncRemoteResult + +struct AsyncRemoteResult +{ + void SetError(int32_t ErrorCode, const std::string& ErrorReason, const std::string ErrorText) + { + int32_t Expected = 0; + if (m_ErrorCode.compare_exchange_weak(Expected, ErrorCode ? ErrorCode : -1)) + { + m_ErrorReason = ErrorReason; + m_ErrorText = ErrorText; + } + } + bool IsError() const { return m_ErrorCode.load() != 0; } + int GetError() const { return m_ErrorCode.load(); }; + const std::string& GetErrorReason() const { return m_ErrorReason; }; + const std::string& GetErrorText() const { return m_ErrorText; }; + RemoteProjectStore::Result ConvertResult(double ElapsedSeconds = 0.0) const + { + return RemoteProjectStore::Result{m_ErrorCode, ElapsedSeconds, m_ErrorReason, m_ErrorText}; + } + +private: + std::atomic<int32_t> m_ErrorCode = 0; + std::string m_ErrorReason; + std::string m_ErrorText; +}; + +bool +IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) +{ + IoBuffer BlockPayload = CompressedBuffer::FromCompressedNoValidate(std::move(CompressedBlock)).Decompress().AsIoBuffer(); + + MemoryView BlockView = BlockPayload.GetView(); + const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData()); + uint32_t NumberSize; + uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize); + ReadPtr += NumberSize; + std::vector<uint64_t> ChunkSizes; + ChunkSizes.reserve(ChunkCount); + while (ChunkCount--) + { + ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize)); + ReadPtr += NumberSize; + } + ptrdiff_t TempBufferLength = std::distance(reinterpret_cast<const uint8_t*>(BlockView.GetData()), ReadPtr); + ZEN_ASSERT(TempBufferLength > 0); + for (uint64_t ChunkSize : ChunkSizes) + { + IoBuffer Chunk(IoBuffer::Wrap, ReadPtr, ChunkSize); + IoHash AttachmentRawHash; + uint64_t AttachmentRawSize; + CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize); + + if (!CompressedChunk) + { + ZEN_ERROR("Invalid chunk in block"); + return false; + } + Visitor(std::move(CompressedChunk), AttachmentRawHash); + ReadPtr += ChunkSize; + ZEN_ASSERT(ReadPtr <= BlockView.GetDataEnd()); + } + return true; +}; + +CompressedBuffer +GenerateBlock(std::vector<SharedBuffer>&& Chunks) +{ + size_t ChunkCount = Chunks.size(); + SharedBuffer SizeBuffer; + { + IoBuffer TempBuffer(ChunkCount * 9); + MutableMemoryView View = TempBuffer.GetMutableView(); + uint8_t* BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData()); + uint8_t* BufferEndPtr = BufferStartPtr; + BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr); + auto It = Chunks.begin(); + while (It != Chunks.end()) + { + BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(It->GetSize()), BufferEndPtr); + It++; + } + ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd()); + ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr); + SizeBuffer = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength))); + } + CompositeBuffer AllBuffers(std::move(SizeBuffer), CompositeBuffer(std::move(Chunks))); + + CompressedBuffer CompressedBlock = + CompressedBuffer::Compress(std::move(AllBuffers), OodleCompressor::Mermaid, OodleCompressionLevel::None); + + return CompressedBlock; +} + +struct Block +{ + IoHash BlockHash; + std::vector<IoHash> ChunksInBlock; +}; + +void +CreateBlock(WorkerThreadPool& WorkerPool, + Latch& OpSectionsLatch, + std::vector<SharedBuffer>&& ChunksInBlock, + RwLock& SectionsLock, + std::vector<Block>& Blocks, + size_t BlockIndex, + const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, + AsyncRemoteResult& RemoteResult) +{ + OpSectionsLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&Blocks, &SectionsLock, &OpSectionsLatch, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, &RemoteResult]() mutable { + auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + if (!Chunks.empty()) + { + CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); // Move to callback and return IoHash + IoHash BlockHash = CompressedBlock.DecodeRawHash(); + AsyncOnBlock(std::move(CompressedBlock), BlockHash); + { + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope __(SectionsLock); + Blocks[BlockIndex].BlockHash = BlockHash; + } + } + }); +} + +size_t +AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks) +{ + size_t BlockIndex; + { + RwLock::ExclusiveLockScope _(BlocksLock); + BlockIndex = Blocks.size(); + Blocks.resize(BlockIndex + 1); + } + return BlockIndex; +} + +CbObject +BuildContainer(CidStore& ChunkStore, + ProjectStore::Oplog& Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + bool BuildBlocks, + WorkerThreadPool& WorkerPool, + const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, + const std::function<void(const IoHash&)>& OnLargeAttachment, + const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, + AsyncRemoteResult& RemoteResult) +{ + using namespace std::literals; + + std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; + CbObjectWriter SectionOpsWriter; + SectionOpsWriter.BeginArray("ops"sv); + + size_t OpCount = 0; + + CbObject OplogContainerObject; + { + RwLock BlocksLock; + std::vector<Block> Blocks; + CompressedBuffer OpsBuffer; + + Latch BlockCreateLatch(1); + + std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes; + + size_t BlockSize = 0; + std::vector<SharedBuffer> ChunksInBlock; + + std::unordered_set<IoHash, IoHash::Hasher> Attachments; + Oplog.IterateOplog([&Attachments, &SectionOpsWriter, &OpCount](CbObject Op) { + Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert(FieldView.AsAttachment()); }); + (SectionOpsWriter) << Op; + OpCount++; + }); + + for (const IoHash& AttachmentHash : Attachments) + { + IoBuffer Payload = ChunkStore.FindChunkByCid(AttachmentHash); + if (!Payload) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to find attachment {} for op", AttachmentHash), + {}); + ZEN_ERROR("Failed to build container ({}). Reason: '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason()); + return {}; + } + uint64_t PayloadSize = Payload.GetSize(); + if (PayloadSize > MaxChunkEmbedSize) + { + if (LargeChunkHashes.insert(AttachmentHash).second) + { + OnLargeAttachment(AttachmentHash); + } + continue; + } + + if (!BlockAttachmentHashes.insert(AttachmentHash).second) + { + continue; + } + + BlockSize += PayloadSize; + if (BuildBlocks) + { + ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload))); + } + else + { + Payload = {}; + } + + if (BlockSize >= MaxBlockSize) + { + size_t BlockIndex = AddBlock(BlocksLock, Blocks); + if (BuildBlocks) + { + CreateBlock(WorkerPool, + BlockCreateLatch, + std::move(ChunksInBlock), + BlocksLock, + Blocks, + BlockIndex, + AsyncOnBlock, + RemoteResult); + } + else + { + OnBlockChunks(BlockAttachmentHashes); + } + { + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope _(BlocksLock); + Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(), + BlockAttachmentHashes.begin(), + BlockAttachmentHashes.end()); + } + BlockAttachmentHashes.clear(); + ChunksInBlock.clear(); + BlockSize = 0; + } + } + if (BlockSize > 0) + { + size_t BlockIndex = AddBlock(BlocksLock, Blocks); + if (BuildBlocks) + { + CreateBlock(WorkerPool, + BlockCreateLatch, + std::move(ChunksInBlock), + BlocksLock, + Blocks, + BlockIndex, + AsyncOnBlock, + RemoteResult); + } + else + { + OnBlockChunks(BlockAttachmentHashes); + } + { + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope _(BlocksLock); + Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(), + BlockAttachmentHashes.begin(), + BlockAttachmentHashes.end()); + } + BlockAttachmentHashes.clear(); + ChunksInBlock.clear(); + BlockSize = 0; + } + SectionOpsWriter.EndArray(); // "ops" + + CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer()); + ZEN_DEBUG("Added oplog section {}, {}", CompressedOpsSection.DecodeRawHash(), NiceBytes(CompressedOpsSection.GetCompressedSize())); + + BlockCreateLatch.CountDown(); + while (!BlockCreateLatch.Wait(1000)) + { + ZEN_INFO("Creating blocks, {} remaining...", BlockCreateLatch.Remaining()); + } + + if (!RemoteResult.IsError()) + { + CbObjectWriter OplogContinerWriter; + RwLock::SharedLockScope _(BlocksLock); + OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer()); + + OplogContinerWriter.BeginArray("blocks"sv); + { + for (const Block& B : Blocks) + { + ZEN_ASSERT(!B.ChunksInBlock.empty()); + if (BuildBlocks) + { + ZEN_ASSERT(B.BlockHash != IoHash::Zero); + + OplogContinerWriter.BeginObject(); + { + OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash); + OplogContinerWriter.BeginArray("chunks"sv); + { + for (const IoHash& RawHash : B.ChunksInBlock) + { + OplogContinerWriter.AddHash(RawHash); + } + } + OplogContinerWriter.EndArray(); // "chunks" + } + OplogContinerWriter.EndObject(); + continue; + } + + ZEN_ASSERT(B.BlockHash == IoHash::Zero); + OplogContinerWriter.BeginObject(); + { + OplogContinerWriter.BeginArray("chunks"sv); + { + for (const IoHash& RawHash : B.ChunksInBlock) + { + OplogContinerWriter.AddBinaryAttachment(RawHash); + } + } + OplogContinerWriter.EndArray(); + } + OplogContinerWriter.EndObject(); + } + } + OplogContinerWriter.EndArray(); // "blocks"sv + + OplogContinerWriter.BeginArray("chunks"sv); + { + for (const IoHash& AttachmentHash : LargeChunkHashes) + { + OplogContinerWriter.AddBinaryAttachment(AttachmentHash); + } + } + OplogContinerWriter.EndArray(); // "chunks" + + OplogContainerObject = OplogContinerWriter.Save(); + } + } + return OplogContainerObject; +} + +RemoteProjectStore::LoadContainerResult +BuildContainer(CidStore& ChunkStore, + ProjectStore::Oplog& Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + bool BuildBlocks, + const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, + const std::function<void(const IoHash&)>& OnLargeAttachment, + const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks) +{ + // We are creating a worker thread pool here since we are uploading a lot of attachments in one go and we dont want to keep a + // WorkerThreadPool alive + size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u); + WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount)); + + AsyncRemoteResult RemoteResult; + CbObject ContainerObject = BuildContainer(ChunkStore, + Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + BuildBlocks, + WorkerPool, + AsyncOnBlock, + OnLargeAttachment, + OnBlockChunks, + RemoteResult); + return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject}; +} + +RemoteProjectStore::Result +SaveOplog(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + ProjectStore::Oplog& Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + bool BuildBlocks, + bool UseTempBlocks, + bool ForceUpload) +{ + using namespace std::literals; + + Stopwatch Timer; + + // We are creating a worker thread pool here since we are uploading a lot of attachments in one go + // Doing upload is a rare and transient occation so we don't want to keep a WorkerThreadPool alive. + size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u); + WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount)); + + std::filesystem::path AttachmentTempPath; + if (UseTempBlocks) + { + AttachmentTempPath = Oplog.TempPath(); + AttachmentTempPath.append(".pending"); + CreateDirectories(AttachmentTempPath); + } + + AsyncRemoteResult RemoteResult; + RwLock AttachmentsLock; + std::unordered_set<IoHash, IoHash::Hasher> LargeAttachments; + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks; + + auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, + const IoHash& BlockHash) { + std::filesystem::path BlockPath = AttachmentTempPath; + BlockPath.append(BlockHash.ToHexString()); + if (!std::filesystem::exists(BlockPath)) + { + IoBuffer BlockBuffer; + try + { + BasicFile BlockFile; + BlockFile.Open(BlockPath, BasicFile::Mode::kTruncateDelete); + uint64_t Offset = 0; + for (const SharedBuffer& Buffer : CompressedBlock.GetCompressed().GetSegments()) + { + BlockFile.Write(Buffer.GetView(), Offset); + Offset += Buffer.GetSize(); + } + void* FileHandle = BlockFile.Detach(); + BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset); + } + catch (std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + Ex.what(), + "Unable to create temp block file"); + return; + } + + BlockBuffer.MarkAsDeleteOnClose(); + { + RwLock::ExclusiveLockScope __(AttachmentsLock); + CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)}); + } + ZEN_DEBUG("Saved temp block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize())); + } + }; + + auto UploadBlock = [&RemoteStore, &RemoteResult](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) { + RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to save attachment ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError()); + return; + } + ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize())); + }; + + std::vector<std::vector<IoHash>> BlockChunks; + auto OnBlockChunks = [&BlockChunks](const std::unordered_set<IoHash, IoHash::Hasher>& Chunks) { + BlockChunks.push_back({Chunks.begin(), Chunks.end()}); + ZEN_DEBUG("Found {} block chunks", Chunks.size()); + }; + + auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments](const IoHash& AttachmentHash) { + { + RwLock::ExclusiveLockScope _(AttachmentsLock); + LargeAttachments.insert(AttachmentHash); + } + ZEN_DEBUG("Found attachment {}", AttachmentHash); + }; + + std::function<void(CompressedBuffer&&, const IoHash&)> OnBlock; + if (UseTempBlocks) + { + OnBlock = MakeTempBlock; + } + else + { + OnBlock = UploadBlock; + } + + CbObject OplogContainerObject = BuildContainer(ChunkStore, + Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + BuildBlocks, + WorkerPool, + OnBlock, + OnLargeAttachment, + OnBlockChunks, + RemoteResult); + + if (!RemoteResult.IsError()) + { + uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num(); + uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num(); + ZEN_INFO("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount); + RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer()); + if (ContainerSaveResult.ErrorCode) + { + RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container"); + ZEN_ERROR("Failed to save oplog container ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError()); + } + ZEN_DEBUG("Saved container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000))); + if (!ContainerSaveResult.Needs.empty()) + { + ZEN_INFO("Filtering needed attachments..."); + std::vector<IoHash> NeededLargeAttachments; + std::unordered_set<IoHash, IoHash::Hasher> NeededOtherAttachments; + NeededLargeAttachments.reserve(LargeAttachments.size()); + NeededOtherAttachments.reserve(CreatedBlocks.size()); + if (ForceUpload) + { + NeededLargeAttachments.insert(NeededLargeAttachments.end(), LargeAttachments.begin(), LargeAttachments.end()); + } + else + { + for (const IoHash& RawHash : ContainerSaveResult.Needs) + { + if (LargeAttachments.contains(RawHash)) + { + NeededLargeAttachments.push_back(RawHash); + continue; + } + NeededOtherAttachments.insert(RawHash); + } + } + + Latch SaveAttachmentsLatch(1); + if (!NeededLargeAttachments.empty()) + { + ZEN_INFO("Saving large attachments..."); + for (const IoHash& RawHash : NeededLargeAttachments) + { + if (RemoteResult.IsError()) + { + break; + } + SaveAttachmentsLatch.AddCount(1); + WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + + IoBuffer Payload; + if (auto It = CreatedBlocks.find(RawHash); It != CreatedBlocks.end()) + { + Payload = std::move(It->second); + } + else + { + Payload = ChunkStore.FindChunkByCid(RawHash); + } + if (!Payload) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to find attachment {}", RawHash), + {}); + ZEN_ERROR("Failed to build container ({}). Reason: '{}'", + RemoteResult.GetErrorReason(), + RemoteResult.GetError()); + return; + } + + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'", + RawHash, + NiceBytes(Payload.GetSize()), + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + ZEN_DEBUG("Saved attachment {}, {} in {}", + RawHash, + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return; + }); + } + } + + if (!CreatedBlocks.empty()) + { + ZEN_INFO("Saving created block attachments..."); + for (auto& It : CreatedBlocks) + { + if (RemoteResult.IsError()) + { + break; + } + const IoHash& RawHash = It.first; + if (ForceUpload || NeededOtherAttachments.contains(RawHash)) + { + IoBuffer Payload = It.second; + ZEN_ASSERT(Payload); + SaveAttachmentsLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'", + RawHash, + NiceBytes(Payload.GetSize()), + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + + ZEN_DEBUG("Saved attachment {}, {} in {}", + RawHash, + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return; + }); + } + It.second = {}; + } + } + + if (!BlockChunks.empty()) + { + ZEN_INFO("Saving chunk block attachments..."); + for (const std::vector<IoHash>& Chunks : BlockChunks) + { + if (RemoteResult.IsError()) + { + break; + } + std::vector<IoHash> NeededChunks; + if (ForceUpload) + { + NeededChunks = Chunks; + } + else + { + NeededChunks.reserve(Chunks.size()); + for (const IoHash& Chunk : Chunks) + { + if (NeededOtherAttachments.contains(Chunk)) + { + NeededChunks.push_back(Chunk); + } + } + if (NeededChunks.empty()) + { + continue; + } + } + SaveAttachmentsLatch.AddCount(1); + WorkerPool.ScheduleWork([&RemoteStore, + &ChunkStore, + &SaveAttachmentsLatch, + &RemoteResult, + &Chunks, + NeededChunks = std::move(NeededChunks), + ForceUpload]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + std::vector<SharedBuffer> ChunkBuffers; + ChunkBuffers.reserve(NeededChunks.size()); + for (const IoHash& Chunk : NeededChunks) + { + IoBuffer ChunkPayload = ChunkStore.FindChunkByCid(Chunk); + if (!ChunkPayload) + { + RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), + fmt::format("Missing chunk {}"sv, Chunk), + fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); + ChunkBuffers.clear(); + break; + } + ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload))); + } + RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to save attachments with {} chunks ({}). Reason: '{}'", + Chunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + ZEN_DEBUG("Saved {} bulk attachments in {}", + Chunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + }); + } + } + SaveAttachmentsLatch.CountDown(); + while (!SaveAttachmentsLatch.Wait(1000)) + { + ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining()); + } + SaveAttachmentsLatch.Wait(); + } + + if (!RemoteResult.IsError()) + { + ZEN_INFO("Finalizing oplog container..."); + RemoteProjectStore::Result ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash); + if (ContainerFinalizeResult.ErrorCode) + { + RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text); + ZEN_ERROR("Failed to finalize oplog container {} ({}). Reason: '{}'", + ContainerSaveResult.RawHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + } + ZEN_DEBUG("Finalized container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000))); + } + } + + RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + ZEN_INFO("Saved oplog {} in {}", + RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return Result; +}; + +RemoteProjectStore::Result +SaveOplogContainer(ProjectStore::Oplog& Oplog, + const CbObject& ContainerObject, + const std::function<bool(const IoHash& RawHash)>& HasAttachment, + const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment) +{ + using namespace std::literals; + + Stopwatch Timer; + + CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView(); + for (CbFieldView LargeChunksField : LargeChunksArray) + { + IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); + if (HasAttachment(AttachmentHash)) + { + continue; + } + OnNeedAttachment(AttachmentHash); + }; + + CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); + for (CbFieldView BlockField : BlocksArray) + { + CbObjectView BlockView = BlockField.AsObjectView(); + IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); + + CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView(); + if (BlockHash == IoHash::Zero) + { + std::vector<IoHash> NeededChunks; + NeededChunks.reserve(ChunksArray.GetSize()); + for (CbFieldView ChunkField : ChunksArray) + { + IoHash ChunkHash = ChunkField.AsBinaryAttachment(); + if (HasAttachment(ChunkHash)) + { + continue; + } + NeededChunks.emplace_back(ChunkHash); + } + + if (!NeededChunks.empty()) + { + OnNeedBlock(IoHash::Zero, std::move(NeededChunks)); + } + continue; + } + + for (CbFieldView ChunkField : ChunksArray) + { + IoHash ChunkHash = ChunkField.AsHash(); + if (HasAttachment(ChunkHash)) + { + continue; + } + + OnNeedBlock(BlockHash, {}); + break; + } + }; + + MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); + IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); + IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); + + CbObject SectionObject = LoadCompactBinaryObject(SectionPayload); + if (!SectionObject) + { + ZEN_ERROR("Failed to save oplog container. Reason: '{}'", "Section has unexpected data type"); + return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), + Timer.GetElapsedTimeMs() / 1000.500, + "Section has unexpected data type", + "Failed to save oplog container"}; + } + + CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); + for (CbFieldView OpEntry : OpsArray) + { + CbObjectView Core = OpEntry.AsObjectView(); + BinaryWriter Writer; + Core.CopyTo(Writer); + MemoryView OpView = Writer.GetView(); + IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize()); + CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); + const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Op); + if (OpLsn == ProjectStore::Oplog::kInvalidOp) + { + return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest), + Timer.GetElapsedTimeMs() / 1000.500, + "Failed saving op", + "Failed to save oplog container"}; + } + ZEN_DEBUG("oplog entry #{}", OpLsn); + } + return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500}; +} + +RemoteProjectStore::Result +LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload) +{ + using namespace std::literals; + + Stopwatch Timer; + + // We are creating a worker thread pool here since we are download a lot of attachments in one go and we dont want to keep a + // WorkerThreadPool alive + size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u); + WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount)); + + std::unordered_set<IoHash, IoHash::Hasher> Attachments; + std::vector<std::vector<IoHash>> ChunksInBlocks; + + RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer(); + if (LoadContainerResult.ErrorCode) + { + ZEN_WARN("Failed to load oplog container, reason: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode); + return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode, + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, + .Reason = LoadContainerResult.Reason, + .Text = LoadContainerResult.Text}; + } + ZEN_DEBUG("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000))); + + AsyncRemoteResult RemoteResult; + Latch AttachmentsWorkLatch(1); + + auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) { + return !ForceDownload && ChunkStore.ContainsChunk(RawHash); + }; + auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &RemoteResult]( + const IoHash& BlockHash, + std::vector<IoHash>&& Chunks) { + if (BlockHash == IoHash::Zero) + { + AttachmentsWorkLatch.AddCount(1); + WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks)]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + + RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to attachments with {} chunks ({}). Reason: '{}'", + Chunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + ZEN_DEBUG("Loaded {} bulk attachments in {}", + Chunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + for (const auto& It : Result.Chunks) + { + ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly); + } + }); + return; + } + AttachmentsWorkLatch.AddCount(1); + WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); + if (BlockResult.ErrorCode) + { + RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); + ZEN_ERROR("Failed to load oplog container, missing attachment {} ({}). Reason: '{}'", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + ZEN_DEBUG("Loaded block attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000))); + + if (!IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { + ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); + })) + { + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Invalid format for block {}", BlockHash), + {}); + ZEN_ERROR("Failed to load oplog container, attachment {} has invalid format ({}). Reason: '{}'", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + }); + }; + + auto OnNeedAttachment = + [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments](const IoHash& RawHash) { + if (!Attachments.insert(RawHash).second) + { + return; + } + + AttachmentsWorkLatch.AddCount(1); + WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() { + auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); + if (AttachmentResult.ErrorCode) + { + RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}", + RawHash, + AttachmentResult.Reason, + AttachmentResult.ErrorCode); + return; + } + ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); + ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); + }); + }; + + RemoteProjectStore::Result Result = + SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment); + + AttachmentsWorkLatch.CountDown(); + while (!AttachmentsWorkLatch.Wait(1000)) + { + ZEN_INFO("Loading attachments, {} remaining...", AttachmentsWorkLatch.Remaining()); + } + AttachmentsWorkLatch.Wait(); + if (Result.ErrorCode == 0) + { + Result = RemoteResult.ConvertResult(); + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + + ZEN_INFO("Loaded oplog {} in {}", + RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0))); + + return Result; +} + +} // namespace zen diff --git a/zenserver/projectstore/remoteprojectstore.h b/zenserver/projectstore/remoteprojectstore.h new file mode 100644 index 000000000..dcabaedd4 --- /dev/null +++ b/zenserver/projectstore/remoteprojectstore.h @@ -0,0 +1,111 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "projectstore.h" + +#include <unordered_set> + +namespace zen { + +class CidStore; +class WorkerThreadPool; + +class RemoteProjectStore +{ +public: + struct Result + { + int32_t ErrorCode{}; + double ElapsedSeconds{}; + std::string Reason; + std::string Text; + }; + + struct SaveResult : public Result + { + std::unordered_set<IoHash, IoHash::Hasher> Needs; + IoHash RawHash; + }; + + struct SaveAttachmentResult : public Result + { + }; + + struct SaveAttachmentsResult : public Result + { + }; + + struct LoadAttachmentResult : public Result + { + IoBuffer Bytes; + }; + + struct LoadContainerResult : public Result + { + CbObject ContainerObject; + }; + + struct LoadAttachmentsResult : public Result + { + std::vector<std::pair<IoHash, CompressedBuffer>> Chunks; + }; + + struct RemoteStoreInfo + { + bool CreateBlocks; + bool UseTempBlockFiles; + std::string Description; + }; + + virtual ~RemoteProjectStore() {} + + virtual RemoteStoreInfo GetInfo() const = 0; + + virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0; + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) = 0; + virtual Result FinalizeContainer(const IoHash& RawHash) = 0; + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0; + + virtual LoadContainerResult LoadContainer() = 0; + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) = 0; + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) = 0; +}; + +struct RemoteStoreOptions +{ + size_t MaxBlockSize = 128u * 1024u * 1024u; + size_t MaxChunkEmbedSize = 1024u * 1024u; +}; + +RemoteProjectStore::LoadContainerResult BuildContainer( + CidStore& ChunkStore, + ProjectStore::Oplog& Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + bool BuildBlocks, + const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, + const std::function<void(const IoHash&)>& OnLargeAttachment, + const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks); + +RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, + const CbObject& ContainerObject, + const std::function<bool(const IoHash& RawHash)>& HasAttachment, + const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, + const std::function<void(const IoHash& RawHash)>& OnNeedAttachment); + +RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + ProjectStore::Oplog& Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + bool BuildBlocks, + bool UseTempBlocks, + bool ForceUpload); + +RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload); + +CompressedBuffer GenerateBlock(std::vector<SharedBuffer>&& Chunks); +bool IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); + +} // namespace zen diff --git a/zenserver/projectstore/zenremoteprojectstore.cpp b/zenserver/projectstore/zenremoteprojectstore.cpp new file mode 100644 index 000000000..6ff471ae5 --- /dev/null +++ b/zenserver/projectstore/zenremoteprojectstore.cpp @@ -0,0 +1,341 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zenremoteprojectstore.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compositebuffer.h> +#include <zencore/fmtutils.h> +#include <zencore/scopeguard.h> +#include <zencore/stream.h> +#include <zencore/timer.h> +#include <zenhttp/httpshared.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cpr/cpr.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +using namespace std::literals; + +class ZenRemoteStore : public RemoteProjectStore +{ +public: + ZenRemoteStore(std::string_view HostAddress, + std::string_view Project, + std::string_view Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize) + : m_HostAddress(HostAddress) + , m_ProjectStoreUrl(fmt::format("{}/prj"sv, m_HostAddress)) + , m_Project(Project) + , m_Oplog(Oplog) + , m_MaxBlockSize(MaxBlockSize) + , m_MaxChunkEmbedSize(MaxChunkEmbedSize) + { + } + + virtual RemoteStoreInfo GetInfo() const override + { + return {.CreateBlocks = false, .UseTempBlockFiles = false, .Description = fmt::format("[zen] {}"sv, m_HostAddress)}; + } + + virtual SaveResult SaveContainer(const IoBuffer& Payload) override + { + Stopwatch Timer; + + std::unique_ptr<cpr::Session> Session(AllocateSession()); + auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); + + std::string SaveRequest = fmt::format("{}/{}/oplog/{}/save"sv, m_ProjectStoreUrl, m_Project, m_Oplog); + Session->SetUrl({SaveRequest}); + Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}}); + MemoryView Data(Payload.GetView()); + Session->SetBody({reinterpret_cast<const char*>(Data.GetData()), Data.GetSize()}); + cpr::Response Response = Session->Post(); + SaveResult Result = SaveResult{ConvertResult(Response)}; + + if (Result.ErrorCode) + { + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + IoBuffer ResponsePayload(IoBuffer::Wrap, Response.text.data(), Response.text.size()); + CbObject ResponseObject = LoadCompactBinaryObject(ResponsePayload); + if (!ResponseObject) + { + Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, + m_ProjectStoreUrl, + m_Project, + m_Oplog); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + CbArrayView NeedsArray = ResponseObject["need"sv].AsArrayView(); + for (CbFieldView FieldView : NeedsArray) + { + IoHash ChunkHash = FieldView.AsHash(); + Result.Needs.insert(ChunkHash); + } + + Result.RawHash = IoHash::HashBuffer(Payload); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override + { + Stopwatch Timer; + + std::unique_ptr<cpr::Session> Session(AllocateSession()); + auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); + + std::string SaveRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash); + Session->SetUrl({SaveRequest}); + Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}}); + uint64_t SizeLeft = Payload.GetSize(); + CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); + auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, SizeLeft); + MutableMemoryView Data(buffer, size); + Payload.CopyTo(Data, BufferIt); + SizeLeft -= size; + return true; + }; + Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); + cpr::Response Response = Session->Post(); + SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)}; + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override + { + Stopwatch Timer; + + CbPackage RequestPackage; + { + CbObjectWriter RequestWriter; + RequestWriter.AddString("method"sv, "putchunks"sv); + RequestWriter.BeginArray("chunks"sv); + { + for (const SharedBuffer& Chunk : Chunks) + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, RawHash, RawSize); + RequestWriter.AddHash(RawHash); + RequestPackage.AddAttachment(CbAttachment(Compressed, RawHash)); + } + } + RequestWriter.EndArray(); // "chunks" + RequestPackage.SetObject(RequestWriter.Save()); + } + CompositeBuffer Payload = FormatPackageMessageBuffer(RequestPackage, FormatFlags::kDefault); + + std::unique_ptr<cpr::Session> Session(AllocateSession()); + auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); + std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog); + Session->SetUrl({SaveRequest}); + Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}}); + + uint64_t SizeLeft = Payload.GetSize(); + CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); + auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, SizeLeft); + MutableMemoryView Data(buffer, size); + Payload.CopyTo(Data, BufferIt); + SizeLeft -= size; + return true; + }; + Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); + cpr::Response Response = Session->Post(); + SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)}; + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override + { + Stopwatch Timer; + + std::unique_ptr<cpr::Session> Session(AllocateSession()); + auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); + std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog); + + CbObject Request; + { + CbObjectWriter RequestWriter; + RequestWriter.AddString("method"sv, "getchunks"sv); + RequestWriter.BeginArray("chunks"sv); + { + for (const IoHash& RawHash : RawHashes) + { + RequestWriter.AddHash(RawHash); + } + } + RequestWriter.EndArray(); // "chunks" + Request = RequestWriter.Save(); + } + IoBuffer Payload = Request.GetBuffer().AsIoBuffer(); + Session->SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()}); + Session->SetUrl(SaveRequest); + Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}, + {"Accept", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}}); + + cpr::Response Response = Session->Post(); + LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)}; + if (!Result.ErrorCode) + { + CbPackage Package = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size())); + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + Result.Chunks.reserve(Attachments.size()); + for (const CbAttachment& Attachment : Attachments) + { + Result.Chunks.emplace_back( + std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()}); + } + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + }; + + virtual Result FinalizeContainer(const IoHash&) override + { + Stopwatch Timer; + + RwLock::ExclusiveLockScope _(SessionsLock); + Sessions.clear(); + return {.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500}; + } + + virtual LoadContainerResult LoadContainer() override + { + Stopwatch Timer; + + std::unique_ptr<cpr::Session> Session(AllocateSession()); + auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); + std::string SaveRequest = fmt::format("{}/{}/oplog/{}/load"sv, m_ProjectStoreUrl, m_Project, m_Oplog); + Session->SetUrl(SaveRequest); + Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCbObject))}}); + Session->SetParameters( + {{"maxblocksize", fmt::format("{}", m_MaxBlockSize)}, {"maxchunkembedsize", fmt::format("{}", m_MaxChunkEmbedSize)}}); + cpr::Response Response = Session->Get(); + + LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)}; + if (!Result.ErrorCode) + { + Result.ContainerObject = LoadCompactBinaryObject(IoBuffer(IoBuffer::Clone, Response.text.data(), Response.text.size())); + if (!Result.ContainerObject) + { + Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, + m_ProjectStoreUrl, + m_Project, + m_Oplog); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + { + Stopwatch Timer; + + std::unique_ptr<cpr::Session> Session(AllocateSession()); + auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); + + std::string LoadRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash); + Session->SetUrl({LoadRequest}); + Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}}); + cpr::Response Response = Session->Get(); + LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)}; + if (!Result.ErrorCode) + { + Result.Bytes = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); + } + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; + return Result; + } + +private: + std::unique_ptr<cpr::Session> AllocateSession() + { + RwLock::ExclusiveLockScope _(SessionsLock); + if (Sessions.empty()) + { + Sessions.emplace_back(std::make_unique<cpr::Session>()); + } + std::unique_ptr<cpr::Session> Session = std::move(Sessions.back()); + Sessions.pop_back(); + return Session; + } + + void ReleaseSession(std::unique_ptr<cpr::Session>&& Session) + { + RwLock::ExclusiveLockScope _(SessionsLock); + Sessions.emplace_back(std::move(Session)); + } + + static Result ConvertResult(const cpr::Response& Response) + { + std::string Text; + std::string Reason = Response.reason; + int32_t ErrorCode = 0; + if (Response.error.code != cpr::ErrorCode::OK) + { + ErrorCode = static_cast<int32_t>(Response.error.code); + if (!Response.error.message.empty()) + { + Reason = Response.error.message; + } + } + else if (!IsHttpSuccessCode(Response.status_code)) + { + ErrorCode = static_cast<int32_t>(Response.status_code); + + if (auto It = Response.header.find("Content-Type"); It != Response.header.end()) + { + zen::HttpContentType ContentType = zen::ParseContentType(It->second); + if (ContentType == zen::HttpContentType::kText) + { + Text = Response.text; + } + } + + Reason = fmt::format("{}"sv, Response.status_code); + } + return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.elapsed, .Reason = Reason, .Text = Text}; + } + + RwLock SessionsLock; + std::vector<std::unique_ptr<cpr::Session>> Sessions; + + const std::string m_HostAddress; + const std::string m_ProjectStoreUrl; + const std::string m_Project; + const std::string m_Oplog; + const size_t m_MaxBlockSize; + const size_t m_MaxChunkEmbedSize; +}; + +std::unique_ptr<RemoteProjectStore> +CreateZenRemoteStore(const ZenRemoteStoreOptions& Options) +{ + std::string Url = Options.Url; + if (Url.find("://"sv) == std::string::npos) + { + // Assume https URL + Url = fmt::format("http://{}"sv, Url); + } + std::unique_ptr<RemoteProjectStore> RemoteStore = + std::make_unique<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize); + return RemoteStore; +} + +} // namespace zen diff --git a/zenserver/projectstore/zenremoteprojectstore.h b/zenserver/projectstore/zenremoteprojectstore.h new file mode 100644 index 000000000..ef9dcad8c --- /dev/null +++ b/zenserver/projectstore/zenremoteprojectstore.h @@ -0,0 +1,18 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "remoteprojectstore.h" + +namespace zen { + +struct ZenRemoteStoreOptions : RemoteStoreOptions +{ + std::string Url; + std::string ProjectId; + std::string OplogId; +}; + +std::unique_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options); + +} // namespace zen diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 260b83355..dbb185bec 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -6,6 +6,7 @@ #include "diag/logging.h" #include <zencore/compactbinary.h> +#include <zencore/compositebuffer.h> #include <zencore/iobuffer.h> #include <zencore/iohash.h> #include <zencore/string.h> @@ -437,6 +438,47 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K } CloudCacheResult +CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload) +{ + ZEN_TRACE_CPU("HordeClient::PutCompressedBlob"); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}}); + uint64_t SizeLeft = Payload.GetSize(); + CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); + auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, SizeLeft); + MutableMemoryView Data(buffer, size); + Payload.CopyTo(Data, BufferIt); + SizeLeft -= size; + return true; + }; + Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); + + cpr::Response Response = Session.Put(); + ZEN_DEBUG("PUT {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + return {.Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Success = (Response.status_code == 200 || Response.status_code == 201)}; +} + +CloudCacheResult CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) { ZEN_TRACE_CPU("HordeClient::PutObject"); diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 88ab77247..99e5c530f 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -104,6 +104,7 @@ public: PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); CloudCacheResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); + CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob); CloudCacheResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object); FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index cf771c6de..9eae2761d 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -262,7 +262,7 @@ public: ZEN_INFO("instantiating project service"); m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager); - m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore}); + m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore, *m_AuthMgr}); #if ZEN_WITH_COMPUTE_SERVICES if (ServerOptions.ComputeServiceEnabled) |