aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/storage/projectstore/httpprojectstore.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2025-10-14 11:32:16 +0200
committerGitHub Enterprise <[email protected]>2025-10-14 11:32:16 +0200
commitca09abbeef5b1788f4a52b61eedd2f3dd07f81f2 (patch)
tree005a50adfddf6982bab3a06bb93d4c50da1a11fd /src/zenserver/storage/projectstore/httpprojectstore.cpp
parentmake asiohttp work without IPv6 (#562) (diff)
downloadzen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.tar.xz
zen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.zip
move all storage-related services into storage tree (#571)
* move all storage-related services into storage tree * move config into config/ * also move admin service into storage since it mostly has storage related functionality * header consolidation
Diffstat (limited to 'src/zenserver/storage/projectstore/httpprojectstore.cpp')
-rw-r--r--src/zenserver/storage/projectstore/httpprojectstore.cpp3307
1 files changed, 3307 insertions, 0 deletions
diff --git a/src/zenserver/storage/projectstore/httpprojectstore.cpp b/src/zenserver/storage/projectstore/httpprojectstore.cpp
new file mode 100644
index 000000000..1c6b5d6b0
--- /dev/null
+++ b/src/zenserver/storage/projectstore/httpprojectstore.cpp
@@ -0,0 +1,3307 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "httpprojectstore.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryutil.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/memory/llm.h>
+#include <zencore/scopeguard.h>
+#include <zencore/stream.h>
+#include <zencore/trace.h>
+#include <zenhttp/packageformat.h>
+#include <zenremotestore/projectstore/buildsremoteprojectstore.h>
+#include <zenremotestore/projectstore/fileremoteprojectstore.h>
+#include <zenremotestore/projectstore/jupiterremoteprojectstore.h>
+#include <zenremotestore/projectstore/remoteprojectstore.h>
+#include <zenremotestore/projectstore/zenremoteprojectstore.h>
+#include <zenstore/oplogreferencedset.h>
+#include <zenstore/projectstore.h>
+#include <zenstore/zenstore.h>
+#include <zenutil/openprocesscache.h>
+#include <zenutil/workerpools.h>
+
+namespace zen {
+
+const FLLMTag&
+GetProjectHttpTag()
+{
+ static FLLMTag _("http", FLLMTag("project"));
+
+ return _;
+}
+
+void
+CSVHeader(bool Details, bool AttachmentDetails, StringBuilderBase& CSVWriter)
+{
+ if (AttachmentDetails)
+ {
+ CSVWriter << "Project, Oplog, LSN, Key, Cid, Size";
+ }
+ else if (Details)
+ {
+ CSVWriter << "Project, Oplog, LSN, Key, Size, AttachmentCount, AttachmentsSize";
+ }
+ else
+ {
+ CSVWriter << "Project, Oplog, Key";
+ }
+}
+
+void
+CSVWriteOp(CidStore& CidStore,
+ std::string_view ProjectId,
+ std::string_view OplogId,
+ bool Details,
+ bool AttachmentDetails,
+ ProjectStore::LogSequenceNumber LSN,
+ const Oid& Key,
+ CbObjectView Op,
+ StringBuilderBase& CSVWriter)
+{
+ StringBuilder<32> KeyStringBuilder;
+ Key.ToString(KeyStringBuilder);
+ const std::string_view KeyString = KeyStringBuilder.ToView();
+
+ if (AttachmentDetails)
+ {
+ Op.IterateAttachments([&CidStore, &CSVWriter, &ProjectId, &OplogId, LSN, &KeyString](CbFieldView FieldView) {
+ const IoHash AttachmentHash = FieldView.AsAttachment();
+ IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash);
+ CSVWriter << "\r\n"
+ << ProjectId << ", " << OplogId << ", " << LSN.Number << ", " << KeyString << ", " << AttachmentHash.ToHexString()
+ << ", " << gsl::narrow<uint64_t>(Attachment.GetSize());
+ });
+ }
+ else if (Details)
+ {
+ uint64_t AttachmentCount = 0;
+ size_t AttachmentsSize = 0;
+ Op.IterateAttachments([&CidStore, &AttachmentCount, &AttachmentsSize](CbFieldView FieldView) {
+ const IoHash AttachmentHash = FieldView.AsAttachment();
+ AttachmentCount++;
+ IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash);
+ AttachmentsSize += Attachment.GetSize();
+ });
+ CSVWriter << "\r\n"
+ << ProjectId << ", " << OplogId << ", " << LSN.Number << ", " << KeyString << ", " << gsl::narrow<uint64_t>(Op.GetSize())
+ << ", " << AttachmentCount << ", " << gsl::narrow<uint64_t>(AttachmentsSize);
+ }
+ else
+ {
+ CSVWriter << "\r\n" << ProjectId << ", " << OplogId << ", " << KeyString;
+ }
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace {
+
+ void CbWriteOp(CidStore& CidStore,
+ bool Details,
+ bool OpDetails,
+ bool AttachmentDetails,
+ ProjectStore::LogSequenceNumber LSN,
+ const Oid& Key,
+ CbObjectView Op,
+ CbObjectWriter& CbWriter)
+ {
+ CbWriter.BeginObject();
+ {
+ CbWriter.AddObjectId("key", Key);
+ if (Details)
+ {
+ CbWriter.AddInteger("lsn", LSN.Number);
+ CbWriter.AddInteger("size", gsl::narrow<uint64_t>(Op.GetSize()));
+ }
+ if (AttachmentDetails)
+ {
+ CbWriter.BeginArray("attachments");
+ Op.IterateAttachments([&CidStore, &CbWriter](CbFieldView FieldView) {
+ const IoHash AttachmentHash = FieldView.AsAttachment();
+ CbWriter.BeginObject();
+ {
+ IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash);
+ CbWriter.AddString("cid", AttachmentHash.ToHexString());
+ CbWriter.AddInteger("size", gsl::narrow<uint64_t>(Attachment.GetSize()));
+ }
+ CbWriter.EndObject();
+ });
+ CbWriter.EndArray();
+ }
+ else if (Details)
+ {
+ uint64_t AttachmentCount = 0;
+ size_t AttachmentsSize = 0;
+ Op.IterateAttachments([&CidStore, &AttachmentCount, &AttachmentsSize](CbFieldView FieldView) {
+ const IoHash AttachmentHash = FieldView.AsAttachment();
+ AttachmentCount++;
+ IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash);
+ AttachmentsSize += Attachment.GetSize();
+ });
+ if (AttachmentCount > 0)
+ {
+ CbWriter.AddInteger("attachments", AttachmentCount);
+ CbWriter.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize));
+ }
+ }
+ if (OpDetails)
+ {
+ CbWriter.BeginObject("op");
+ for (const CbFieldView& Field : Op)
+ {
+ if (!Field.HasName())
+ {
+ CbWriter.AddField(Field);
+ continue;
+ }
+ std::string_view FieldName = Field.GetName();
+ CbWriter.AddField(FieldName, Field);
+ }
+ CbWriter.EndObject();
+ }
+ }
+ CbWriter.EndObject();
+ };
+
+ void CbWriteOplogOps(CidStore& CidStore,
+ ProjectStore::Oplog& Oplog,
+ bool Details,
+ bool OpDetails,
+ bool AttachmentDetails,
+ CbObjectWriter& Cbo)
+ {
+ Cbo.BeginArray("ops");
+ {
+ Oplog.IterateOplogWithKey([&Cbo, &CidStore, Details, OpDetails, AttachmentDetails](ProjectStore::LogSequenceNumber LSN,
+ const Oid& Key,
+ CbObjectView Op) {
+ CbWriteOp(CidStore, Details, OpDetails, AttachmentDetails, LSN, Key, Op, Cbo);
+ });
+ }
+ Cbo.EndArray();
+ }
+
+ void CbWriteOplog(CidStore& CidStore,
+ ProjectStore::Oplog& Oplog,
+ bool Details,
+ bool OpDetails,
+ bool AttachmentDetails,
+ CbObjectWriter& Cbo)
+ {
+ Cbo.BeginObject();
+ {
+ Cbo.AddString("name", Oplog.OplogId());
+ CbWriteOplogOps(CidStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo);
+ }
+ Cbo.EndObject();
+ }
+
+ void CbWriteOplogs(CidStore& CidStore,
+ ProjectStore::Project& Project,
+ std::vector<std::string> OpLogs,
+ bool Details,
+ bool OpDetails,
+ bool AttachmentDetails,
+ CbObjectWriter& Cbo)
+ {
+ Cbo.BeginArray("oplogs");
+ {
+ for (const std::string& OpLogId : OpLogs)
+ {
+ Ref<ProjectStore::Oplog> Oplog = Project.OpenOplog(OpLogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true);
+ if (Oplog)
+ {
+ CbWriteOplog(CidStore, *Oplog, Details, OpDetails, AttachmentDetails, Cbo);
+ }
+ }
+ }
+ Cbo.EndArray();
+ }
+
+ void CbWriteProject(CidStore& CidStore,
+ ProjectStore::Project& Project,
+ std::vector<std::string> OpLogs,
+ bool Details,
+ bool OpDetails,
+ bool AttachmentDetails,
+ CbObjectWriter& Cbo)
+ {
+ Cbo.BeginObject();
+ {
+ Cbo.AddString("name", Project.Identifier);
+ CbWriteOplogs(CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo);
+ }
+ Cbo.EndObject();
+ }
+
+ struct CreateRemoteStoreResult
+ {
+ std::shared_ptr<RemoteProjectStore> Store;
+ std::string Description;
+ };
+
+ CreateRemoteStoreResult CreateRemoteStore(CbObjectView Params,
+ AuthMgr& AuthManager,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ const std::filesystem::path& TempFilePath)
+ {
+ ZEN_MEMSCOPE(GetProjectHttpTag());
+
+ using namespace std::literals;
+
+ std::shared_ptr<RemoteProjectStore> RemoteStore;
+
+ if (CbObjectView File = Params["file"sv].AsObjectView(); File)
+ {
+ std::filesystem::path FolderPath(File["path"sv].AsString());
+ if (FolderPath.empty())
+ {
+ return {nullptr, "Missing file path"};
+ }
+ std::string_view Name(File["name"sv].AsString());
+ if (Name.empty())
+ {
+ return {nullptr, "Missing file name"};
+ }
+ std::string_view OptionalBaseName(File["basename"sv].AsString());
+ bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false);
+ bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false);
+
+ FileRemoteStoreOptions Options = {
+ RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ FolderPath,
+ std::string(Name),
+ std::string(OptionalBaseName),
+ ForceDisableBlocks,
+ ForceEnableTempBlocks};
+ RemoteStore = CreateFileRemoteStore(Options);
+ }
+
+ if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud)
+ {
+ std::string_view CloudServiceUrl = Cloud["url"sv].AsString();
+ if (CloudServiceUrl.empty())
+ {
+ return {nullptr, "Missing service url"};
+ }
+
+ std::string Url = UrlDecode(CloudServiceUrl);
+ std::string_view Namespace = Cloud["namespace"sv].AsString();
+ if (Namespace.empty())
+ {
+ return {nullptr, "Missing namespace"};
+ }
+ std::string_view Bucket = Cloud["bucket"sv].AsString();
+ if (Bucket.empty())
+ {
+ return {nullptr, "Missing bucket"};
+ }
+ std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString();
+ std::string AccessToken = std::string(Cloud["access-token"sv].AsString());
+ if (AccessToken.empty())
+ {
+ std::string_view AccessTokenEnvVariable = Cloud["access-token-env"].AsString();
+ if (!AccessTokenEnvVariable.empty())
+ {
+ AccessToken = GetEnvVariable(AccessTokenEnvVariable);
+ }
+ }
+ std::filesystem::path OidcExePath;
+ if (std::string_view OidcExePathString = Cloud["oidc-exe-path"].AsString(); !OidcExePathString.empty())
+ {
+ std::filesystem::path OidcExePathMaybe(OidcExePathString);
+ if (IsFile(OidcExePathMaybe))
+ {
+ OidcExePath = std::move(OidcExePathMaybe);
+ }
+ else
+ {
+ ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString);
+ }
+ }
+ std::string_view KeyParam = Cloud["key"sv].AsString();
+ if (KeyParam.empty())
+ {
+ return {nullptr, "Missing key"};
+ }
+ if (KeyParam.length() != IoHash::StringLength)
+ {
+ return {nullptr, "Invalid key"};
+ }
+ IoHash Key = IoHash::FromHexString(KeyParam);
+ if (Key == IoHash::Zero)
+ {
+ return {nullptr, "Invalid key string"};
+ }
+ IoHash BaseKey = IoHash::Zero;
+ std::string_view BaseKeyParam = Cloud["basekey"sv].AsString();
+ if (!BaseKeyParam.empty())
+ {
+ if (BaseKeyParam.length() != IoHash::StringLength)
+ {
+ return {nullptr, "Invalid base key"};
+ }
+ BaseKey = IoHash::FromHexString(BaseKeyParam);
+ if (BaseKey == IoHash::Zero)
+ {
+ return {nullptr, "Invalid base key string"};
+ }
+ }
+
+ bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false);
+ bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false);
+ bool AssumeHttp2 = Cloud["assumehttp2"sv].AsBool(false);
+
+ JupiterRemoteStoreOptions Options = {
+ RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ Url,
+ std::string(Namespace),
+ std::string(Bucket),
+ Key,
+ BaseKey,
+ std::string(OpenIdProvider),
+ AccessToken,
+ AuthManager,
+ OidcExePath,
+ ForceDisableBlocks,
+ ForceDisableTempBlocks,
+ AssumeHttp2};
+ RemoteStore = CreateJupiterRemoteStore(Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false, /*Hidden*/ true);
+ }
+
+ if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen)
+ {
+ std::string_view Url = Zen["url"sv].AsString();
+ std::string_view Project = Zen["project"sv].AsString();
+ if (Project.empty())
+ {
+ return {nullptr, "Missing project"};
+ }
+ std::string_view Oplog = Zen["oplog"sv].AsString();
+ if (Oplog.empty())
+ {
+ return {nullptr, "Missing oplog"};
+ }
+ ZenRemoteStoreOptions Options = {
+ RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ std::string(Url),
+ std::string(Project),
+ std::string(Oplog)};
+ RemoteStore = CreateZenRemoteStore(Options, TempFilePath);
+ }
+
+ if (CbObjectView Builds = Params["builds"sv].AsObjectView(); Builds)
+ {
+ std::string_view BuildsServiceUrl = Builds["url"sv].AsString();
+ if (BuildsServiceUrl.empty())
+ {
+ return {nullptr, "Missing service url"};
+ }
+
+ std::string Url = UrlDecode(BuildsServiceUrl);
+ std::string_view Namespace = Builds["namespace"sv].AsString();
+ if (Namespace.empty())
+ {
+ return {nullptr, "Missing namespace"};
+ }
+ std::string_view Bucket = Builds["bucket"sv].AsString();
+ if (Bucket.empty())
+ {
+ return {nullptr, "Missing bucket"};
+ }
+ std::string_view OpenIdProvider = Builds["openid-provider"sv].AsString();
+ std::string AccessToken = std::string(Builds["access-token"sv].AsString());
+ if (AccessToken.empty())
+ {
+ std::string_view AccessTokenEnvVariable = Builds["access-token-env"].AsString();
+ if (!AccessTokenEnvVariable.empty())
+ {
+ AccessToken = GetEnvVariable(AccessTokenEnvVariable);
+ }
+ }
+ std::filesystem::path OidcExePath;
+ if (std::string_view OidcExePathString = Builds["oidc-exe-path"].AsString(); !OidcExePathString.empty())
+ {
+ std::filesystem::path OidcExePathMaybe(OidcExePathString);
+ if (IsFile(OidcExePathMaybe))
+ {
+ OidcExePath = std::move(OidcExePathMaybe);
+ }
+ else
+ {
+ ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString);
+ }
+ }
+ std::string_view BuildIdParam = Builds["buildsid"sv].AsString();
+ if (BuildIdParam.empty())
+ {
+ return {nullptr, "Missing build id"};
+ }
+ if (BuildIdParam.length() != Oid::StringLength)
+ {
+ return {nullptr, "Invalid build id"};
+ }
+ Oid BuildId = Oid::FromHexString(BuildIdParam);
+ if (BuildId == Oid::Zero)
+ {
+ return {nullptr, "Invalid build id string"};
+ }
+
+ bool ForceDisableBlocks = Builds["disableblocks"sv].AsBool(false);
+ bool ForceDisableTempBlocks = Builds["disabletempblocks"sv].AsBool(false);
+ bool AssumeHttp2 = Builds["assumehttp2"sv].AsBool(false);
+
+ MemoryView MetaDataSection = Builds["metadata"sv].AsBinaryView();
+ IoBuffer MetaData(IoBuffer::Wrap, MetaDataSection.GetData(), MetaDataSection.GetSize());
+
+ BuildsRemoteStoreOptions Options = {
+ RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ Url,
+ std::string(Namespace),
+ std::string(Bucket),
+ BuildId,
+ std::string(OpenIdProvider),
+ AccessToken,
+ AuthManager,
+ OidcExePath,
+ ForceDisableBlocks,
+ ForceDisableTempBlocks,
+ AssumeHttp2,
+ MetaData};
+ RemoteStore = CreateJupiterBuildsRemoteStore(Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false, /*Hidden*/ true);
+ }
+
+ if (!RemoteStore)
+ {
+ return {nullptr, "Unknown remote store type"};
+ }
+
+ return {std::move(RemoteStore), ""};
+ }
+
+ std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result)
+ {
+ if (Result.ErrorCode == 0)
+ {
+ return {HttpResponseCode::OK, Result.Text};
+ }
+ return {static_cast<HttpResponseCode>(Result.ErrorCode),
+ Result.Reason.empty() ? Result.Text
+ : Result.Text.empty() ? Result.Reason
+ : fmt::format("{}: {}", Result.Reason, Result.Text)};
+ }
+
+} // namespace
+
+//////////////////////////////////////////////////////////////////////////
+
+HttpProjectService::HttpProjectService(CidStore& Store,
+ ProjectStore* Projects,
+ HttpStatusService& StatusService,
+ HttpStatsService& StatsService,
+ AuthMgr& AuthMgr,
+ OpenProcessCache& InOpenProcessCache,
+ JobQueue& InJobQueue)
+: m_Log(logging::Get("project"))
+, m_CidStore(Store)
+, m_ProjectStore(Projects)
+, m_StatusService(StatusService)
+, m_StatsService(StatsService)
+, m_AuthMgr(AuthMgr)
+, m_OpenProcessCache(InOpenProcessCache)
+, m_JobQueue(InJobQueue)
+{
+ ZEN_MEMSCOPE(GetProjectHttpTag());
+
+ using namespace std::literals;
+
+ m_Router.AddPattern("project", "([[:alnum:]_.]+)");
+ m_Router.AddPattern("log", "([[:alnum:]_.]+)");
+ m_Router.AddPattern("op", "([[:digit:]]+?)");
+ m_Router.AddPattern("chunk", "([[:xdigit:]]{24})");
+ m_Router.AddPattern("hash", "([[:xdigit:]]{40})");
+
+ m_Router.RegisterRoute(
+ "",
+ [this](HttpRouterRequest& Req) { HandleProjectListRequest(Req); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "list",
+ [this](HttpRouterRequest& Req) { HandleProjectListRequest(Req); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/batch",
+ [this](HttpRouterRequest& Req) { HandleChunkBatchRequest(Req); },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/files",
+ [this](HttpRouterRequest& Req) { HandleFilesRequest(Req); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/chunkinfos",
+ [this](HttpRouterRequest& Req) { HandleChunkInfosRequest(Req); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/{chunk}/info",
+ [this](HttpRouterRequest& Req) { HandleChunkInfoRequest(Req); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/{chunk}",
+ [this](HttpRouterRequest& Req) { HandleChunkByIdRequest(Req); },
+ HttpVerb::kGet | HttpVerb::kHead);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/{hash}",
+ [this](HttpRouterRequest& Req) { HandleChunkByCidRequest(Req); },
+ HttpVerb::kGet | HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/prep",
+ [this](HttpRouterRequest& Req) { HandleOplogOpPrepRequest(Req); },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/new",
+ [this](HttpRouterRequest& Req) { HandleOplogOpNewRequest(Req); },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/validate",
+ [this](HttpRouterRequest& Req) { HandleOplogValidateRequest(Req); },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/{op}",
+ [this](HttpRouterRequest& Req) { HandleOpLogOpRequest(Req); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}",
+ [this](HttpRouterRequest& Req) { HandleOpLogRequest(Req); },
+ HttpVerb::kGet | HttpVerb::kPut | HttpVerb::kPost | HttpVerb::kDelete);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/entries",
+ [this](HttpRouterRequest& Req) { HandleOpLogEntriesRequest(Req); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{project}",
+ [this](HttpRouterRequest& Req) { HandleProjectRequest(Req); },
+ HttpVerb::kGet | HttpVerb::kPut | HttpVerb::kPost | HttpVerb::kDelete);
+
+ // Push a oplog container
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/save",
+ [this](HttpRouterRequest& Req) { HandleOplogSaveRequest(Req); },
+ HttpVerb::kPost);
+
+ // Pull a oplog container
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/load",
+ [this](HttpRouterRequest& Req) { HandleOplogLoadRequest(Req); },
+ HttpVerb::kGet);
+
+ // Do an rpc style operation on project/oplog
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/rpc",
+ [this](HttpRouterRequest& Req) { HandleRpcRequest(Req); },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "details\\$",
+ [this](HttpRouterRequest& Req) { HandleDetailsRequest(Req); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "details\\$/{project}",
+ [this](HttpRouterRequest& Req) { HandleProjectDetailsRequest(Req); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "details\\$/{project}/{log}",
+ [this](HttpRouterRequest& Req) { HandleOplogDetailsRequest(Req); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "details\\$/{project}/{log}/{chunk}",
+ [this](HttpRouterRequest& Req) { HandleOplogOpDetailsRequest(Req); },
+ HttpVerb::kGet);
+
+ m_StatusService.RegisterHandler("prj", *this);
+ m_StatsService.RegisterHandler("prj", *this);
+}
+
+HttpProjectService::~HttpProjectService()
+{
+ m_StatsService.UnregisterHandler("prj", *this);
+ m_StatusService.UnregisterHandler("prj", *this);
+}
+
+const char*
+HttpProjectService::BaseUri() const
+{
+ return "/prj/";
+}
+
+void
+HttpProjectService::HandleRequest(HttpServerRequest& Request)
+{
+ m_ProjectStats.RequestCount++;
+
+ ZEN_MEMSCOPE(GetProjectHttpTag());
+
+ metrics::OperationTiming::Scope $(m_HttpRequests);
+
+ if (m_Router.HandleRequest(Request) == false)
+ {
+ m_ProjectStats.BadRequestCount++;
+ ZEN_WARN("No route found for {0}", Request.RelativeUri());
+ }
+}
+
+void
+HttpProjectService::HandleStatsRequest(HttpServerRequest& HttpReq)
+{
+ ZEN_TRACE_CPU("ProjectService::Stats");
+
+ const GcStorageSize StoreSize = m_ProjectStore->StorageSize();
+ const CidStoreSize CidSize = m_CidStore.TotalSize();
+
+ CbObjectWriter Cbo;
+
+ EmitSnapshot("requests", m_HttpRequests, Cbo);
+
+ Cbo.BeginObject("store");
+ {
+ Cbo.BeginObject("size");
+ {
+ Cbo << "disk" << StoreSize.DiskSize;
+ Cbo << "memory" << StoreSize.MemorySize;
+ }
+ Cbo.EndObject();
+
+ Cbo.BeginObject("project");
+ {
+ Cbo << "readcount" << m_ProjectStats.ProjectReadCount << "writecount" << m_ProjectStats.ProjectWriteCount << "deletecount"
+ << m_ProjectStats.ProjectDeleteCount;
+ }
+ Cbo.EndObject();
+
+ Cbo.BeginObject("oplog");
+ {
+ Cbo << "readcount" << m_ProjectStats.OpLogReadCount << "writecount" << m_ProjectStats.OpLogWriteCount << "deletecount"
+ << m_ProjectStats.OpLogDeleteCount;
+ }
+ Cbo.EndObject();
+
+ Cbo.BeginObject("op");
+ {
+ Cbo << "hitcount" << m_ProjectStats.OpHitCount << "misscount" << m_ProjectStats.OpMissCount << "writecount"
+ << m_ProjectStats.OpWriteCount;
+ }
+ Cbo.EndObject();
+
+ Cbo.BeginObject("chunk");
+ {
+ Cbo << "hitcount" << m_ProjectStats.ChunkHitCount << "misscount" << m_ProjectStats.ChunkMissCount << "writecount"
+ << m_ProjectStats.ChunkWriteCount;
+ }
+ Cbo.EndObject();
+
+ Cbo << "requestcount" << m_ProjectStats.RequestCount;
+ Cbo << "badrequestcount" << m_ProjectStats.BadRequestCount;
+ }
+ Cbo.EndObject();
+
+ Cbo.BeginObject("cid");
+ {
+ Cbo.BeginObject("size");
+ {
+ Cbo << "tiny" << CidSize.TinySize;
+ Cbo << "small" << CidSize.SmallSize;
+ Cbo << "large" << CidSize.LargeSize;
+ Cbo << "total" << CidSize.TotalSize;
+ }
+ Cbo.EndObject();
+ }
+ Cbo.EndObject();
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+void
+HttpProjectService::HandleStatusRequest(HttpServerRequest& Request)
+{
+ ZEN_TRACE_CPU("HttpProjectService::Status");
+ CbObjectWriter Cbo;
+ Cbo << "ok" << true;
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+void
+HttpProjectService::HandleProjectListRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::ProjectList");
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ CbArray ProjectsList = m_ProjectStore->GetProjectsList();
+ HttpReq.WriteResponse(HttpResponseCode::OK, ProjectsList);
+}
+
+void
+HttpProjectService::HandleChunkBatchRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::ChunkBatch");
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ Project->TouchProject();
+
+ Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false);
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ Project->TouchOplog(OplogId);
+
+ // Parse Request
+
+ IoBuffer Payload = HttpReq.ReadPayload();
+ BinaryReader Reader(Payload);
+
+ struct RequestHeader
+ {
+ enum
+ {
+ kMagic = 0xAAAA'77AC
+ };
+ uint32_t Magic;
+ uint32_t ChunkCount;
+ uint32_t Reserved1;
+ uint32_t Reserved2;
+ };
+
+ struct RequestChunkEntry
+ {
+ Oid ChunkId;
+ uint32_t CorrelationId;
+ uint64_t Offset;
+ uint64_t RequestBytes;
+ };
+
+ if (Payload.Size() <= sizeof(RequestHeader))
+ {
+ m_ProjectStats.BadRequestCount++;
+ HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ RequestHeader RequestHdr;
+ Reader.Read(&RequestHdr, sizeof RequestHdr);
+
+ if (RequestHdr.Magic != RequestHeader::kMagic)
+ {
+ m_ProjectStats.BadRequestCount++;
+ HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ std::vector<RequestChunkEntry> RequestedChunks;
+ RequestedChunks.resize(RequestHdr.ChunkCount);
+ Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount);
+
+ // Make Response
+
+ struct ResponseHeader
+ {
+ uint32_t Magic = 0xbada'b00f;
+ uint32_t ChunkCount;
+ uint32_t Reserved1 = 0;
+ uint32_t Reserved2 = 0;
+ };
+
+ struct ResponseChunkEntry
+ {
+ uint32_t CorrelationId;
+ uint32_t Flags = 0;
+ uint64_t ChunkSize;
+ };
+
+ std::vector<IoBuffer> OutBlobs;
+ OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry));
+ for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex)
+ {
+ const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex];
+ IoBuffer FoundChunk = FoundLog->FindChunk(Project->RootDir, RequestedChunk.ChunkId, nullptr);
+ if (FoundChunk)
+ {
+ if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1))
+ {
+ uint64_t Offset = RequestedChunk.Offset;
+ if (Offset > FoundChunk.Size())
+ {
+ Offset = FoundChunk.Size();
+ }
+ uint64_t Size = RequestedChunk.RequestBytes;
+ if ((Offset + Size) > FoundChunk.Size())
+ {
+ Size = FoundChunk.Size() - Offset;
+ }
+ FoundChunk = IoBuffer(FoundChunk, Offset, Size);
+ }
+ }
+ OutBlobs.emplace_back(std::move(FoundChunk));
+ }
+ uint8_t* ResponsePtr = reinterpret_cast<uint8_t*>(OutBlobs[0].MutableData());
+ ResponseHeader ResponseHdr;
+ ResponseHdr.ChunkCount = RequestHdr.ChunkCount;
+ memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr));
+ ResponsePtr += sizeof(ResponseHdr);
+ for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex)
+ {
+ const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex];
+ const IoBuffer& FoundChunk(OutBlobs[ChunkIndex + 1]);
+ ResponseChunkEntry ResponseChunk;
+ ResponseChunk.CorrelationId = RequestedChunk.CorrelationId;
+ if (FoundChunk)
+ {
+ ResponseChunk.ChunkSize = FoundChunk.Size();
+ m_ProjectStats.ChunkHitCount++;
+ }
+ else
+ {
+ ResponseChunk.ChunkSize = uint64_t(-1);
+ m_ProjectStats.ChunkMissCount++;
+ }
+ memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk));
+ ResponsePtr += sizeof(ResponseChunk);
+ }
+ std::erase_if(OutBlobs, [](IoBuffer Buffer) -> bool { return !Buffer; });
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs);
+}
+
+void
+HttpProjectService::HandleFilesRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::Files");
+
+ using namespace std::literals;
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ // File manifest fetch, returns the client file list
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+
+ std::unordered_set<std::string> WantedFieldNames;
+ if (auto FieldFilter = HttpServerRequest::Decode(Params.GetValue("fieldnames")); !FieldFilter.empty())
+ {
+ if (FieldFilter != "*") // Get all - empty FieldFilter equal getting all fields
+ {
+ ForEachStrTok(FieldFilter, ',', [&](std::string_view FieldName) {
+ WantedFieldNames.insert(std::string(FieldName));
+ return true;
+ });
+ }
+ }
+ else
+ {
+ const bool FilterClient = Params.GetValue("filter"sv) == "client"sv;
+ WantedFieldNames.insert("id");
+ WantedFieldNames.insert("clientpath");
+ if (!FilterClient)
+ {
+ WantedFieldNames.insert("serverpath");
+ }
+ }
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Project files request for unknown project '{}'", ProjectId));
+ }
+ Project->TouchProject();
+
+ Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true);
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Project files for unknown oplog '{}/{}'", ProjectId, OplogId));
+ }
+ Project->TouchOplog(OplogId);
+
+ CbObject ResponsePayload = ProjectStore::GetProjectFiles(Log(), *Project, *FoundLog, WantedFieldNames);
+
+ if (HttpReq.AcceptContentType() == HttpContentType::kCompressedBinary)
+ {
+ CompositeBuffer Payload = CompressedBuffer::Compress(ResponsePayload.GetBuffer()).GetCompressed();
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCompressedBinary, Payload);
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload);
+ }
+}
+
+void
+HttpProjectService::HandleChunkInfosRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::ChunkInfos");
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+
+ std::unordered_set<std::string> WantedFieldNames;
+ if (auto FieldFilter = HttpServerRequest::Decode(Params.GetValue("fieldnames")); !FieldFilter.empty())
+ {
+ if (FieldFilter != "*") // Get all - empty FieldFilter equal getting all fields
+ {
+ ForEachStrTok(FieldFilter, ',', [&](std::string_view FieldName) {
+ WantedFieldNames.insert(std::string(FieldName));
+ return true;
+ });
+ }
+ }
+ else
+ {
+ WantedFieldNames.insert("id");
+ WantedFieldNames.insert("rawhash");
+ WantedFieldNames.insert("rawsize");
+ }
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Chunk infos request for unknown project '{}'", ProjectId));
+ }
+ Project->TouchProject();
+
+ Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true);
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Chunk infos for unknown oplog '{}/{}'", ProjectId, OplogId));
+ }
+ Project->TouchOplog(OplogId);
+
+ CbObject ResponsePayload = ProjectStore::GetProjectChunkInfos(Log(), *Project, *FoundLog, WantedFieldNames);
+ if (HttpReq.AcceptContentType() == HttpContentType::kCompressedBinary)
+ {
+ CompositeBuffer Payload = CompressedBuffer::Compress(ResponsePayload.GetBuffer()).GetCompressed();
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCompressedBinary, Payload);
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload);
+ }
+}
+
+void
+HttpProjectService::HandleChunkInfoRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::ChunkInfo");
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ const auto& ChunkId = Req.GetCapture(3);
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Chunk info request for unknown project '{}'", ProjectId));
+ }
+ Project->TouchProject();
+
+ Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true);
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Chunk info for unknown oplog '{}/{}'", ProjectId, OplogId));
+ }
+ Project->TouchOplog(OplogId);
+
+ if (ChunkId.size() != 2 * sizeof(Oid::OidBits))
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId));
+ }
+
+ const Oid Obj = Oid::FromHexString(ChunkId);
+
+ CbObject ResponsePayload = ProjectStore::GetChunkInfo(Log(), *Project, *FoundLog, Obj);
+ if (ResponsePayload)
+ {
+ m_ProjectStats.ChunkHitCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload);
+ }
+ else
+ {
+ m_ProjectStats.ChunkMissCount++;
+ ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId);
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Chunk info for unknown chunk '{}/{}/{}'", ProjectId, OplogId, ChunkId));
+ }
+}
+
+void
+HttpProjectService::HandleChunkByIdRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::ChunkById");
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ const auto& ChunkId = Req.GetCapture(3);
+
+ uint64_t Offset = 0;
+ uint64_t Size = ~(0ull);
+
+ auto QueryParms = HttpReq.GetQueryParams();
+
+ if (auto OffsetParm = QueryParms.GetValue("offset"); OffsetParm.empty() == false)
+ {
+ if (auto OffsetVal = ParseInt<uint64_t>(OffsetParm))
+ {
+ Offset = OffsetVal.value();
+ }
+ else
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ }
+
+ if (auto SizeParm = QueryParms.GetValue("size"); SizeParm.empty() == false)
+ {
+ if (auto SizeVal = ParseInt<uint64_t>(SizeParm))
+ {
+ Size = SizeVal.value();
+ }
+ else
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ }
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Chunk request for unknown project '{}'", ProjectId));
+ }
+ Project->TouchProject();
+
+ Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false);
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId));
+ }
+ Project->TouchOplog(OplogId);
+
+ if (ChunkId.size() != 2 * sizeof(Oid::OidBits))
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Chunk request for invalid chunk id '{}/{}/{}'", ProjectId, OplogId, ChunkId));
+ }
+
+ const Oid Obj = Oid::FromHexString(ChunkId);
+
+ HttpContentType AcceptType = HttpReq.AcceptContentType();
+
+ ProjectStore::GetChunkRangeResult Result =
+ ProjectStore::GetChunkRange(Log(), *Project, *FoundLog, Obj, Offset, Size, AcceptType, /*OptionalInOutModificationTag*/ nullptr);
+
+ switch (Result.Error)
+ {
+ case ProjectStore::GetChunkRangeResult::EError::Ok:
+ m_ProjectStats.ChunkHitCount++;
+ ZEN_DEBUG("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(Result.ContentType));
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ContentType, Result.Chunk);
+ case ProjectStore::GetChunkRangeResult::EError::NotFound:
+ m_ProjectStats.ChunkMissCount++;
+ ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId);
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, Result.ContentType, Result.Chunk);
+ case ProjectStore::GetChunkRangeResult::EError::MalformedContent:
+ return HttpReq.WriteResponse(
+ HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Get chunk {}/{}/{} failed. Reason: {}", ProjectId, OplogId, ChunkId, Result.ErrorDescription));
+ case ProjectStore::GetChunkRangeResult::EError::OutOfRange:
+ m_ProjectStats.ChunkMissCount++;
+ ZEN_DEBUG("chunk - '{}/{}/{}' OUT OF RANGE", ProjectId, OplogId, ChunkId);
+ return HttpReq.WriteResponse(
+ HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Get chunk {}/{}/{} failed. Reason: {}", ProjectId, OplogId, ChunkId, Result.ErrorDescription));
+ default:
+ ZEN_ASSERT(false);
+ break;
+ }
+}
+
+void
+HttpProjectService::HandleChunkByCidRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::ChunkByCid");
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ const auto& Cid = Req.GetCapture(3);
+ HttpContentType AcceptType = HttpReq.AcceptContentType();
+ HttpContentType RequestType = HttpReq.RequestContentType();
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Chunk request for unknown project '{}'", ProjectId));
+ }
+ Project->TouchProject();
+
+ Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false);
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId));
+ }
+ Project->TouchOplog(OplogId);
+
+ if (Cid.length() != IoHash::StringLength)
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Chunk request for invalid chunk id '{}/{}/{}'", ProjectId, OplogId, Cid));
+ }
+
+ const IoHash Hash = IoHash::FromHexString(Cid);
+
+ switch (HttpReq.RequestVerb())
+ {
+ case HttpVerb::kGet:
+ {
+ IoBuffer Value = m_ProjectStore->GetChunk(*Project, *FoundLog, Hash);
+ if (Value)
+ {
+ if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary ||
+ AcceptType == ZenContentType::kJSON || AcceptType == ZenContentType::kYAML ||
+ AcceptType == ZenContentType::kCbObject)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Value));
+ IoBuffer DecompressedBuffer = Compressed.Decompress().AsIoBuffer();
+
+ if (DecompressedBuffer)
+ {
+ if (AcceptType == ZenContentType::kJSON || AcceptType == ZenContentType::kYAML ||
+ AcceptType == ZenContentType::kCbObject)
+ {
+ CbValidateError CbErr = ValidateCompactBinary(DecompressedBuffer.GetView(), CbValidateMode::Default);
+ if (!!CbErr)
+ {
+ m_ProjectStats.BadRequestCount++;
+ ZEN_DEBUG(
+ "chunk - '{}/{}/{}' WRONGTYPE. Reason: `Requested {} format, but could not convert to object`",
+ ProjectId,
+ OplogId,
+ Cid,
+ ToString(AcceptType));
+ return HttpReq.WriteResponse(
+ HttpResponseCode::NotAcceptable,
+ HttpContentType::kText,
+ fmt::format("Content format not supported, requested {} format, but could not convert to object",
+ ToString(AcceptType)));
+ }
+
+ m_ProjectStats.ChunkHitCount++;
+ CbObject ContainerObject = LoadCompactBinaryObject(DecompressedBuffer);
+ return HttpReq.WriteResponse(HttpResponseCode::OK, ContainerObject);
+ }
+ else
+ {
+ Value = DecompressedBuffer;
+ Value.SetContentType(ZenContentType::kBinary);
+ }
+ }
+ else
+ {
+ m_ProjectStats.BadRequestCount++;
+ ZEN_DEBUG("chunk - '{}/{}/{}' WRONGTYPE. Reason: `Requested {} format, but could not decompress stored data`",
+ ProjectId,
+ OplogId,
+ Cid,
+ ToString(AcceptType));
+ return HttpReq.WriteResponse(
+ HttpResponseCode::NotAcceptable,
+ HttpContentType::kText,
+ fmt::format("Content format not supported, requested {} format, but could not decompress stored data",
+ ToString(AcceptType)));
+ }
+ }
+ m_ProjectStats.ChunkHitCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value);
+ }
+ else
+ {
+ m_ProjectStats.ChunkMissCount++;
+ ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, Cid);
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ }
+ case HttpVerb::kPost:
+ {
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+ if (RequestType != HttpContentType::kCompressedBinary)
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Chunk request for chunk id '{}/{}'/'{}' as unexpected content type: '{}'",
+ ProjectId,
+ OplogId,
+ Cid,
+ ToString(RequestType)));
+ }
+ IoBuffer Payload = HttpReq.ReadPayload();
+ Payload.SetContentType(RequestType);
+ bool IsNew = m_ProjectStore->PutChunk(*Project, *FoundLog, Hash, std::move(Payload));
+
+ m_ProjectStats.ChunkWriteCount++;
+ return HttpReq.WriteResponse(IsNew ? HttpResponseCode::Created : HttpResponseCode::OK);
+ }
+ break;
+ }
+}
+
+void
+HttpProjectService::HandleOplogOpPrepRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::OplogOpPrep");
+
+ using namespace std::literals;
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ Project->TouchProject();
+
+ Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false);
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ Project->TouchOplog(OplogId);
+
+ // This operation takes a list of referenced hashes and decides which
+ // chunks are not present on this server. This list is then returned in
+ // the "need" list in the response
+
+ CbValidateError ValidateResult;
+ if (CbObject RequestObject = ValidateAndReadCompactBinaryObject(HttpReq.ReadPayload(), ValidateResult);
+ ValidateResult == CbValidateError::None)
+ {
+ std::vector<IoHash> NeedList;
+
+ {
+ eastl::fixed_vector<IoHash, 16> ChunkList;
+ CbArrayView HaveList = RequestObject["have"sv].AsArrayView();
+ ChunkList.reserve(HaveList.Num());
+ for (auto& Entry : HaveList)
+ {
+ ChunkList.push_back(Entry.AsHash());
+ }
+
+ NeedList = FoundLog->CheckPendingChunkReferences(std::span(begin(ChunkList), end(ChunkList)), std::chrono::minutes(2));
+ }
+
+ CbObjectWriter Cbo(1 + 1 + 5 + NeedList.size() * (1 + sizeof(IoHash::Hash)) + 1);
+ Cbo.BeginArray("need");
+ {
+ for (const IoHash& Hash : NeedList)
+ {
+ ZEN_DEBUG("prep - NEED: {}", Hash);
+ Cbo << Hash;
+ }
+ }
+ Cbo.EndArray();
+ CbObject Response = Cbo.Save();
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Response);
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Invalid compact binary format: '{}'", ToString(ValidateResult)));
+ }
+}
+
+void
+HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::OplogOpNew");
+
+ using namespace std::literals;
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+
+ bool IsUsingSalt = false;
+ IoHash SaltHash = IoHash::Zero;
+
+ if (std::string_view SaltParam = Params.GetValue("salt"sv); SaltParam.empty() == false)
+ {
+ const uint32_t Salt = std::stoi(std::string(SaltParam));
+ SaltHash = IoHash::HashBuffer(&Salt, sizeof Salt);
+ IsUsingSalt = true;
+ }
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ Project->TouchProject();
+
+ Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false);
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ Project->TouchOplog(OplogId);
+
+ ProjectStore::Oplog& Oplog = *FoundLog;
+
+ IoBuffer Payload = HttpReq.ReadPayload();
+
+ // This will attempt to open files which may not exist for the case where
+ // the prep step rejected the chunk. This should be fixed since there's
+ // a performance cost associated with any file system activity
+
+ bool IsValid = true;
+ std::vector<IoHash> MissingChunks;
+
+ CbPackage::AttachmentResolver Resolver = [&](const IoHash& Hash) -> SharedBuffer {
+ if (m_CidStore.ContainsChunk(Hash))
+ {
+ // Return null attachment as we already have it, no point in reading it and storing it again
+ return {};
+ }
+
+ IoHash AttachmentId;
+ if (IsUsingSalt)
+ {
+ IoHash AttachmentSpec[]{SaltHash, Hash};
+ AttachmentId = IoHash::HashBuffer(MakeMemoryView(AttachmentSpec));
+ }
+ else
+ {
+ AttachmentId = Hash;
+ }
+
+ std::filesystem::path AttachmentPath = Oplog.TempPath() / AttachmentId.ToHexString();
+ if (IoBuffer Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath))
+ {
+ Data.SetDeleteOnClose(true);
+ return SharedBuffer(std::move(Data));
+ }
+ else
+ {
+ IsValid = false;
+ MissingChunks.push_back(Hash);
+
+ return {};
+ }
+ };
+
+ CbPackage Package;
+
+ if (!legacy::TryLoadCbPackage(Package, Payload, &UniqueBuffer::Alloc, &Resolver))
+ {
+ CbValidateError ValidateResult;
+ if (CbObject Core = ValidateAndReadCompactBinaryObject(IoBuffer(Payload), ValidateResult);
+ ValidateResult == CbValidateError::None && Core)
+ {
+ Package.SetObject(Core);
+ }
+ else
+ {
+ std::filesystem::path BadPackagePath =
+ Oplog.TempPath() / "bad_packages"sv / fmt::format("session{}_request{}"sv, HttpReq.SessionId(), HttpReq.RequestId());
+
+ ZEN_WARN("Received malformed package ('{}')! Saving payload to '{}'", ToString(ValidateResult), BadPackagePath);
+
+ WriteFile(BadPackagePath, Payload);
+
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ u8"request body must be a compact binary object or package in legacy format");
+ }
+ }
+
+ m_ProjectStats.ChunkMissCount += MissingChunks.size();
+
+ if (!IsValid)
+ {
+ ExtendableStringBuilder<256> ResponseText;
+ ResponseText.Append("Missing chunk references: ");
+
+ bool IsFirst = true;
+ for (const auto& Hash : MissingChunks)
+ {
+ if (IsFirst)
+ {
+ IsFirst = false;
+ }
+ else
+ {
+ ResponseText.Append(", ");
+ }
+ Hash.ToHexString(ResponseText);
+ }
+
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, ResponseText);
+ }
+
+ CbObject Core = Package.GetObject();
+
+ if (!Core["key"sv])
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "No oplog entry key specified");
+ }
+
+ eastl::fixed_vector<IoHash, 16> ReferencedChunks;
+ Core.IterateAttachments([&ReferencedChunks](CbFieldView View) { ReferencedChunks.push_back(View.AsAttachment()); });
+
+ // Write core to oplog
+
+ size_t AttachmentCount = Package.GetAttachments().size();
+ const ProjectStore::LogSequenceNumber OpLsn = Oplog.AppendNewOplogEntry(Package);
+ if (!OpLsn)
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ m_ProjectStats.ChunkWriteCount += AttachmentCount;
+
+ // Once we stored the op, we no longer need to retain any chunks this op references
+ if (!ReferencedChunks.empty())
+ {
+ FoundLog->RemovePendingChunkReferences(std::span(begin(ReferencedChunks), end(ReferencedChunks)));
+ }
+
+ m_ProjectStats.OpWriteCount++;
+ ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn.Number, NiceBytes(Payload.Size()), Core["key"sv].AsString());
+ HttpReq.WriteResponse(HttpResponseCode::Created);
+}
+
+void
+HttpProjectService::HandleOplogValidateRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::OplogOpValidate");
+
+ using namespace std::literals;
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, ZenContentType::kText, fmt::format("Project '{}' not found", ProjectId));
+ }
+ Project->TouchProject();
+
+ Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true);
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ ZenContentType::kText,
+ fmt::format("Oplog '{}' not found in project '{}'", OplogId, ProjectId));
+ }
+ Project->TouchOplog(OplogId);
+
+ ProjectStore::Oplog& Oplog = *FoundLog;
+
+ std::atomic_bool CancelFlag = false;
+ ProjectStore::Oplog::ValidationResult Result = Oplog.Validate(Project->RootDir, CancelFlag, &GetSmallWorkerPool(EWorkloadType::Burst));
+ tsl::robin_map<Oid, std::string, Oid::Hasher> KeyNameLookup;
+ KeyNameLookup.reserve(Result.OpKeys.size());
+ for (const auto& It : Result.OpKeys)
+ {
+ KeyNameLookup.insert_or_assign(It.first, It.second);
+ }
+ CbObjectWriter Writer;
+ Writer << "HasMissingData" << !Result.IsEmpty();
+ Writer << "OpCount" << Result.OpCount;
+ Writer << "LSNLow" << Result.LSNLow.Number;
+ Writer << "LSNHigh" << Result.LSNHigh.Number;
+ if (!Result.MissingFiles.empty())
+ {
+ Writer.BeginArray("MissingFiles");
+ for (const auto& MissingFile : Result.MissingFiles)
+ {
+ Writer.BeginObject();
+ {
+ Writer << "Key" << MissingFile.first;
+ Writer << "KeyName" << KeyNameLookup[MissingFile.first];
+ Writer << "Id" << MissingFile.second.Id;
+ Writer << "Hash" << MissingFile.second.Hash;
+ Writer << "ServerPath" << MissingFile.second.ServerPath;
+ Writer << "ClientPath" << MissingFile.second.ClientPath;
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ }
+ if (!Result.MissingChunks.empty())
+ {
+ Writer.BeginArray("MissingChunks");
+ for (const auto& MissingChunk : Result.MissingChunks)
+ {
+ Writer.BeginObject();
+ {
+ Writer << "Key" << MissingChunk.first;
+ Writer << "KeyName" << KeyNameLookup[MissingChunk.first];
+ Writer << "Id" << MissingChunk.second.Id;
+ Writer << "Hash" << MissingChunk.second.Hash;
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ }
+ if (!Result.MissingMetas.empty())
+ {
+ Writer.BeginArray("MissingMetas");
+ for (const auto& MissingMeta : Result.MissingMetas)
+ {
+ Writer.BeginObject();
+ {
+ Writer << "Key" << MissingMeta.first;
+ Writer << "KeyName" << KeyNameLookup[MissingMeta.first];
+ Writer << "Id" << MissingMeta.second.Id;
+ Writer << "Hash" << MissingMeta.second.Hash;
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ }
+ if (!Result.MissingAttachments.empty())
+ {
+ Writer.BeginArray("MissingAttachments");
+ for (const auto& MissingMeta : Result.MissingAttachments)
+ {
+ Writer.BeginObject();
+ {
+ Writer << "Key" << MissingMeta.first;
+ Writer << "KeyName" << KeyNameLookup[MissingMeta.first];
+ Writer << "Hash" << MissingMeta.second;
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray();
+ }
+ CbObject Response = Writer.Save();
+ HttpReq.WriteResponse(HttpResponseCode::OK, Response);
+}
+
+void
+HttpProjectService::HandleOpLogOpRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::OplogOp");
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const std::string_view ProjectId = Req.GetCapture(1);
+ const std::string_view OplogId = Req.GetCapture(2);
+ const std::string_view OpIdString = Req.GetCapture(3);
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ Project->TouchProject();
+
+ Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false);
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ Project->TouchOplog(OplogId);
+
+ ProjectStore::Oplog& Oplog = *FoundLog;
+
+ if (const std::optional<int32_t> OpId = ParseInt<uint32_t>(OpIdString))
+ {
+ if (std::optional<CbObject> MaybeOp = Oplog.GetOpByIndex(ProjectStore::LogSequenceNumber(OpId.value())))
+ {
+ CbObject& Op = MaybeOp.value();
+ if (HttpReq.AcceptContentType() == ZenContentType::kCbPackage)
+ {
+ CbPackage Package;
+ Package.SetObject(Op);
+
+ Op.IterateAttachments([&](CbFieldView FieldView) {
+ const IoHash AttachmentHash = FieldView.AsAttachment();
+ IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash);
+ if (Payload)
+ {
+ switch (Payload.GetContentType())
+ {
+ case ZenContentType::kCbObject:
+ {
+ CbValidateError ValidateResult;
+ if (CbObject Object = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult);
+ ValidateResult == CbValidateError::None && Object)
+ {
+ Package.AddAttachment(CbAttachment(Object));
+ }
+ else
+ {
+ // Error - malformed object
+ ZEN_WARN("malformed object returned for {} ('{}')", AttachmentHash, ToString(ValidateResult));
+ }
+ }
+ break;
+
+ case ZenContentType::kCompressedBinary:
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload)))
+ {
+ Package.AddAttachment(CbAttachment(Compressed, AttachmentHash));
+ }
+ else
+ {
+ // Error - not compressed!
+
+ ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash);
+ }
+ break;
+
+ default:
+ Package.AddAttachment(CbAttachment(SharedBuffer(Payload)));
+ break;
+ }
+ }
+ });
+ m_ProjectStats.OpHitCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::Accepted, Package);
+ }
+ else
+ {
+ // Client cannot accept a package, so we only send the core object
+ m_ProjectStats.OpHitCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::Accepted, Op);
+ }
+ }
+ }
+ m_ProjectStats.OpMissCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+}
+
+void
+HttpProjectService::HandleOpLogRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::Oplog");
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ using namespace std::literals;
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("project {} not found", ProjectId));
+ }
+ Project->TouchProject();
+
+ switch (HttpReq.RequestVerb())
+ {
+ case HttpVerb::kGet:
+ {
+ Ref<ProjectStore::Oplog> OplogIt = Project->ReadOplog(OplogId);
+ if (!OplogIt)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("oplog {} not found in project {}", OplogId, ProjectId));
+ }
+
+ ProjectStore::Oplog& Log = *OplogIt;
+
+ CbObjectWriter Cb;
+ Cb << "id"sv << Log.OplogId() << "project"sv << Project->Identifier << "tempdir"sv << Log.TempPath().c_str()
+ << "markerpath"sv << Log.MarkerPath().c_str() << "totalsize"sv << Log.TotalSize() << "opcount" << Log.OplogCount()
+ << "expired"sv << Project->IsExpired(GcClock::TimePoint::min(), Log);
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cb.Save());
+
+ m_ProjectStats.OpLogReadCount++;
+ }
+ break;
+
+ case HttpVerb::kPost:
+ {
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+ std::filesystem::path OplogMarkerPath;
+ if (CbObject Params = HttpReq.ReadPayloadObject())
+ {
+ OplogMarkerPath = Params["gcpath"sv].AsString();
+ }
+
+ Ref<ProjectStore::Oplog> OplogIt = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true);
+ if (!OplogIt)
+ {
+ if (!Project->NewOplog(OplogId, OplogMarkerPath))
+ {
+ // TODO: indicate why the operation failed!
+ return HttpReq.WriteResponse(HttpResponseCode::InternalServerError);
+ }
+ Project->TouchOplog(OplogId);
+
+ m_ProjectStats.OpLogWriteCount++;
+ ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath);
+
+ return HttpReq.WriteResponse(HttpResponseCode::Created);
+ }
+
+ // I guess this should ultimately be used to execute RPCs but for now, it
+ // does absolutely nothing
+
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ break;
+
+ case HttpVerb::kPut:
+ {
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ std::filesystem::path OplogMarkerPath;
+ if (CbObject Params = HttpReq.ReadPayloadObject())
+ {
+ OplogMarkerPath = Params["gcpath"sv].AsString();
+ }
+
+ Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true);
+ if (!FoundLog)
+ {
+ if (!Project->NewOplog(OplogId, OplogMarkerPath))
+ {
+ // TODO: indicate why the operation failed!
+ return HttpReq.WriteResponse(HttpResponseCode::InternalServerError);
+ }
+ Project->TouchOplog(OplogId);
+
+ m_ProjectStats.OpLogWriteCount++;
+ ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath);
+
+ return HttpReq.WriteResponse(HttpResponseCode::Created);
+ }
+ Project->TouchOplog(OplogId);
+
+ FoundLog->Update(OplogMarkerPath);
+
+ m_ProjectStats.OpLogWriteCount++;
+ ZEN_INFO("updated oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath);
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK);
+ }
+ break;
+
+ case HttpVerb::kDelete:
+ {
+ ZEN_INFO("deleting oplog '{}/{}'", ProjectId, OplogId);
+
+ if (Project->DeleteOplog(OplogId))
+ {
+ m_ProjectStats.OpLogDeleteCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::Locked,
+ HttpContentType::kText,
+ fmt::format("oplog {}/{} is in use", ProjectId, OplogId));
+ }
+ }
+ break;
+
+ default:
+ break;
+ }
+}
+
+std::optional<OplogReferencedSet>
+LoadReferencedSet(ProjectStore::Project& Project, ProjectStore::Oplog& Log)
+{
+ using namespace std::literals;
+
+ Oid ReferencedSetOplogId = OpKeyStringAsOid(OplogReferencedSet::ReferencedSetOplogKey);
+ std::optional<CbObject> ReferencedSetOp = Log.GetOpByKey(ReferencedSetOplogId);
+ if (!ReferencedSetOp)
+ {
+ return std::optional<OplogReferencedSet>();
+ }
+ // We expect only a single file in the "files" array; get the chunk for the first file
+ CbFieldView FileField = *(*ReferencedSetOp)["files"sv].AsArrayView().CreateViewIterator();
+ Oid ChunkId = FileField.AsObjectView()["id"sv].AsObjectId();
+ if (ChunkId == Oid::Zero)
+ {
+ return std::optional<OplogReferencedSet>();
+ }
+
+ return OplogReferencedSet::LoadFromChunk(Log.FindChunk(Project.RootDir, ChunkId, nullptr));
+}
+
+void
+HttpProjectService::HandleOpLogEntriesRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::OplogEntries");
+
+ using namespace std::literals;
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ Project->TouchProject();
+
+ Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true);
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ Project->TouchOplog(OplogId);
+
+ CbObjectWriter Response;
+
+ if (FoundLog->OplogCount() > 0)
+ {
+ std::unordered_set<std::string> FieldNamesFilter;
+ auto FilterObject = [&FieldNamesFilter](CbObjectView& Object) -> CbObject {
+ CbObject RewrittenOp = RewriteCbObject(Object, [&FieldNamesFilter](CbObjectWriter&, CbFieldView Field) -> bool {
+ if (FieldNamesFilter.contains(std::string(Field.GetName())))
+ {
+ return false;
+ }
+
+ return true;
+ });
+
+ return RewrittenOp;
+ };
+
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+ if (auto FieldFilter = HttpServerRequest::Decode(Params.GetValue("fieldfilter")); !FieldFilter.empty())
+ {
+ ForEachStrTok(FieldFilter, ',', [&](std::string_view FieldName) {
+ FieldNamesFilter.insert(std::string(FieldName));
+ return true;
+ });
+ }
+
+ if (auto OpKey = Params.GetValue("opkey"); !OpKey.empty())
+ {
+ Oid OpKeyId = OpKeyStringAsOid(OpKey);
+ std::optional<CbObject> Op = FoundLog->GetOpByKey(OpKeyId);
+
+ if (Op.has_value())
+ {
+ if (FieldNamesFilter.empty())
+ {
+ Response << "entry"sv << Op.value();
+ }
+ else
+ {
+ Response << "entry"sv << FilterObject(Op.value());
+ }
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ }
+ else
+ {
+ ProjectStore::Oplog::Paging EntryPaging;
+ if (std::string_view Param = Params.GetValue("start"); !Param.empty())
+ {
+ if (auto Value = ParseInt<int32>(Param))
+ {
+ EntryPaging.Start = *Value;
+ }
+ }
+ if (std::string_view Param = Params.GetValue("count"); !Param.empty())
+ {
+ if (auto Value = ParseInt<int32>(Param))
+ {
+ EntryPaging.Count = *Value;
+ }
+ }
+
+ std::optional<OplogReferencedSet> MaybeReferencedSet;
+ if (auto TrimString = Params.GetValue("trim_by_referencedset"); TrimString == "true")
+ {
+ MaybeReferencedSet = LoadReferencedSet(*Project, *FoundLog);
+ }
+ Response.BeginArray("entries"sv);
+
+ bool ShouldFilterFields = !FieldNamesFilter.empty();
+
+ if (MaybeReferencedSet)
+ {
+ const OplogReferencedSet& ReferencedSet = MaybeReferencedSet.value();
+ FoundLog->IterateOplogWithKey(
+ [this, &Response, &FilterObject, ShouldFilterFields, &ReferencedSet](ProjectStore::LogSequenceNumber /* LSN */,
+ const Oid& Key,
+ CbObjectView Op) {
+ if (!ReferencedSet.Contains(Key))
+ {
+ if (!OplogReferencedSet::IsNonPackage(Op["key"].AsString()))
+ {
+ return;
+ }
+ }
+
+ if (ShouldFilterFields)
+ {
+ Response << FilterObject(Op);
+ }
+ else
+ {
+ Response << Op;
+ }
+ },
+ EntryPaging);
+ }
+ else
+ {
+ FoundLog->IterateOplog(
+ [this, &Response, &FilterObject, ShouldFilterFields](CbObjectView Op) {
+ if (ShouldFilterFields)
+ {
+ Response << FilterObject(Op);
+ }
+ else
+ {
+ Response << Op;
+ }
+ },
+ EntryPaging);
+ }
+
+ Response.EndArray();
+ }
+ }
+ if (HttpReq.AcceptContentType() == HttpContentType::kCompressedBinary)
+ {
+ CompositeBuffer Payload = CompressedBuffer::Compress(Response.Save().GetBuffer()).GetCompressed();
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCompressedBinary, Payload);
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save());
+ }
+}
+
+void
+HttpProjectService::HandleProjectRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::Project");
+
+ using namespace std::literals;
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const std::string_view ProjectId = Req.GetCapture(1);
+
+ switch (HttpReq.RequestVerb())
+ {
+ case HttpVerb::kPost:
+ {
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ CbValidateError ValidateResult;
+ if (CbObject Params = ValidateAndReadCompactBinaryObject(HttpReq.ReadPayload(), ValidateResult);
+ ValidateResult == CbValidateError::None)
+ {
+ std::filesystem::path Root = Params["root"sv].AsU8String(); // Workspace root (i.e `D:/UE5/`)
+ std::filesystem::path EngineRoot = Params["engine"sv].AsU8String(); // Engine root (i.e `D:/UE5/Engine`)
+ std::filesystem::path ProjectRoot =
+ Params["project"sv].AsU8String(); // Project root directory (i.e `D:/UE5/Samples/Games/Lyra`)
+ std::filesystem::path ProjectFilePath =
+ Params["projectfile"sv].AsU8String(); // Project file path (i.e `D:/UE5/Samples/Games/Lyra/Lyra.uproject`)
+
+ const std::filesystem::path BasePath = m_ProjectStore->BasePath() / ProjectId;
+ m_ProjectStore->NewProject(BasePath, ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath);
+
+ ZEN_INFO("established project (id: '{}', roots: '{}', '{}', '{}', '{}'{})",
+ ProjectId,
+ Root,
+ EngineRoot,
+ ProjectRoot,
+ ProjectFilePath,
+ ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : "");
+
+ m_ProjectStats.ProjectWriteCount++;
+ HttpReq.WriteResponse(HttpResponseCode::Created);
+ }
+ else
+ {
+ HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Malformed compact binary object: '{}'", ToString(ValidateResult)));
+ }
+ }
+ break;
+
+ case HttpVerb::kPut:
+ {
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+ CbValidateError ValidateResult;
+ if (CbObject Params = ValidateAndReadCompactBinaryObject(HttpReq.ReadPayload(), ValidateResult);
+ ValidateResult == CbValidateError::None)
+ {
+ std::filesystem::path Root = Params["root"sv].AsU8String(); // Workspace root (i.e `D:/UE5/`)
+ std::filesystem::path EngineRoot = Params["engine"sv].AsU8String(); // Engine root (i.e `D:/UE5/Engine`)
+ std::filesystem::path ProjectRoot =
+ Params["project"sv].AsU8String(); // Project root directory (i.e `D:/UE5/Samples/Games/Lyra`)
+ std::filesystem::path ProjectFilePath =
+ Params["projectfile"sv].AsU8String(); // Project file path (i.e `D:/UE5/Samples/Games/Lyra/Lyra.uproject`)
+
+ if (m_ProjectStore->UpdateProject(ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath))
+ {
+ m_ProjectStats.ProjectWriteCount++;
+ ZEN_INFO("updated project (id: '{}', roots: '{}', '{}', '{}', '{}'{})",
+ ProjectId,
+ Root,
+ EngineRoot,
+ ProjectRoot,
+ ProjectFilePath,
+ ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : "");
+
+ HttpReq.WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ const std::filesystem::path BasePath = m_ProjectStore->BasePath() / ProjectId;
+ m_ProjectStore->NewProject(BasePath, ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath);
+
+ m_ProjectStats.ProjectWriteCount++;
+ ZEN_INFO("established project (id: '{}', roots: '{}', '{}', '{}', '{}'{})",
+ ProjectId,
+ Root,
+ EngineRoot,
+ ProjectRoot,
+ ProjectFilePath,
+ ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : "");
+
+ HttpReq.WriteResponse(HttpResponseCode::Created);
+ }
+ }
+ else
+ {
+ HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Malformed compact binary object: '{}'", ToString(ValidateResult)));
+ }
+ }
+ break;
+
+ case HttpVerb::kGet:
+ {
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("project {} not found", ProjectId));
+ }
+ Project->TouchProject();
+
+ std::vector<std::string> OpLogs = Project->ScanForOplogs();
+
+ CbObjectWriter Response;
+ Response << "id"sv << Project->Identifier;
+ Response << "root"sv << PathToUtf8(Project->RootDir);
+ Response << "engine"sv << PathToUtf8(Project->EngineRootDir);
+ Response << "project"sv << PathToUtf8(Project->ProjectRootDir);
+ Response << "projectfile"sv << PathToUtf8(Project->ProjectFilePath);
+
+ Response.BeginArray("oplogs"sv);
+ for (const std::string& OplogId : OpLogs)
+ {
+ Response.BeginObject();
+ Response << "id"sv << OplogId;
+ Response.EndObject();
+ }
+ Response.EndArray(); // oplogs
+
+ HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save());
+
+ m_ProjectStats.ProjectReadCount++;
+ }
+ break;
+
+ case HttpVerb::kDelete:
+ {
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("project {} not found", ProjectId));
+ }
+
+ ZEN_INFO("deleting project '{}'", ProjectId);
+ if (!m_ProjectStore->DeleteProject(ProjectId))
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::Locked,
+ HttpContentType::kText,
+ fmt::format("project {} is in use", ProjectId));
+ }
+
+ m_ProjectStats.ProjectDeleteCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::NoContent);
+ }
+ break;
+
+ default:
+ break;
+ }
+}
+
+void
+HttpProjectService::HandleOplogSaveRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::OplogSave");
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ if (HttpReq.RequestContentType() != HttpContentType::kCbObject)
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid content type");
+ }
+ IoBuffer Payload = HttpReq.ReadPayload();
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Write oplog request for unknown project '{}'", ProjectId));
+ }
+ Project->TouchProject();
+
+ Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false);
+ if (!Oplog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Write oplog request for unknown oplog '{}/{}'", ProjectId, OplogId));
+ }
+ Project->TouchOplog(OplogId);
+
+ CbValidateError ValidateResult;
+ if (CbObject ContainerObject = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult);
+ ValidateResult == CbValidateError::None && ContainerObject)
+ {
+ RwLock AttachmentsLock;
+ tsl::robin_set<IoHash, IoHash::Hasher> Attachments;
+
+ auto HasAttachment = [this](const IoHash& RawHash) { return m_CidStore.ContainsChunk(RawHash); };
+ auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector<IoHash>&& ChunkHashes) {
+ RwLock::ExclusiveLockScope _(AttachmentsLock);
+ if (BlockHash != IoHash::Zero)
+ {
+ Attachments.insert(BlockHash);
+ }
+ else
+ {
+ Attachments.insert(ChunkHashes.begin(), ChunkHashes.end());
+ }
+ };
+ auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) {
+ RwLock::ExclusiveLockScope _(AttachmentsLock);
+ Attachments.insert(RawHash);
+ };
+
+ auto OnChunkedAttachment = [](const ChunkedInfo&) {};
+
+ auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog->CaptureAddedAttachments(RawHashes); };
+
+ // Make sure we retain any attachments we download before writing the oplog
+ Oplog->EnableUpdateCapture();
+ auto _ = MakeGuard([&Oplog]() { Oplog->DisableUpdateCapture(); });
+
+ RemoteProjectStore::Result Result = SaveOplogContainer(*Oplog,
+ ContainerObject,
+ OnReferencedAttachments,
+ HasAttachment,
+ OnNeedBlock,
+ OnNeedAttachment,
+ OnChunkedAttachment,
+ nullptr);
+
+ if (Result.ErrorCode == 0)
+ {
+ if (Attachments.empty())
+ {
+ HttpReq.WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ CbObjectWriter Cbo(1 + 1 + 5 + Attachments.size() * (1 + sizeof(IoHash::Hash)) + 1);
+ Cbo.BeginArray("need");
+ {
+ for (const IoHash& Hash : Attachments)
+ {
+ ZEN_DEBUG("Need attachment {}", Hash);
+ Cbo << Hash;
+ }
+ }
+ Cbo.EndArray(); // "need"
+
+ CbObject ResponsePayload = Cbo.Save();
+ return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload);
+ }
+ }
+ else
+ {
+ ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
+ ToString(HttpReq.RequestVerb()),
+ HttpReq.QueryString(),
+ Result.ErrorCode,
+ Result.Reason);
+
+ if (Result.Reason.empty())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode));
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode), HttpContentType::kText, Result.Reason);
+ }
+ }
+ }
+ else
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Invalid payload: '{}'", ToString(ValidateResult)));
+ }
+}
+
+void
+HttpProjectService::HandleOplogLoadRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::OplogLoad");
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ const HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+
+ if (HttpReq.AcceptContentType() != HttpContentType::kCbObject)
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid accept content type");
+ }
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Read oplog request for unknown project '{}'", ProjectId));
+ }
+ Project->TouchProject();
+
+ Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true);
+ if (!Oplog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Read oplog request for unknown oplog '{}/{}'", ProjectId, OplogId));
+ }
+ Project->TouchOplog(OplogId);
+
+ size_t MaxBlockSize = RemoteStoreOptions::DefaultMaxBlockSize;
+ if (auto Param = Params.GetValue("maxblocksize"); Param.empty() == false)
+ {
+ if (auto Value = ParseInt<size_t>(Param))
+ {
+ MaxBlockSize = Value.value();
+ }
+ }
+ size_t MaxChunkEmbedSize = RemoteStoreOptions::DefaultMaxChunkEmbedSize;
+ if (auto Param = Params.GetValue("maxchunkembedsize"); Param.empty() == false)
+ {
+ if (auto Value = ParseInt<size_t>(Param))
+ {
+ MaxChunkEmbedSize = Value.value();
+ }
+ }
+ size_t MaxChunksPerBlock = RemoteStoreOptions::DefaultMaxChunksPerBlock;
+ if (auto Param = Params.GetValue("maxchunksperblock"); Param.empty() == false)
+ {
+ if (auto Value = ParseInt<size_t>(Param))
+ {
+ MaxChunksPerBlock = Value.value();
+ }
+ }
+
+ size_t ChunkFileSizeLimit = RemoteStoreOptions::DefaultChunkFileSizeLimit;
+ if (auto Param = Params.GetValue("chunkfilesizelimit"); Param.empty() == false)
+ {
+ if (auto Value = ParseInt<size_t>(Param))
+ {
+ ChunkFileSizeLimit = Value.value();
+ }
+ }
+
+ WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background);
+
+ RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer(
+ m_CidStore,
+ *Project,
+ *Oplog,
+ WorkerPool,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ MaxChunksPerBlock,
+ ChunkFileSizeLimit,
+ /* BuildBlocks */ false,
+ /* IgnoreMissingAttachments */ false,
+ /* AllowChunking*/ false,
+ [](CompressedBuffer&&, ChunkBlockDescription&&) {},
+ [](const IoHash&, TGetAttachmentBufferFunc&&) {},
+ [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {},
+ /* EmbedLooseFiles*/ false);
+
+ if (ContainerResult.ErrorCode == 0)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, ContainerResult.ContainerObject);
+ }
+ else
+ {
+ ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
+ ToString(HttpReq.RequestVerb()),
+ HttpReq.QueryString(),
+ ContainerResult.ErrorCode,
+ ContainerResult.Reason);
+
+ if (ContainerResult.Reason.empty())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode(ContainerResult.ErrorCode));
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponseCode(ContainerResult.ErrorCode), HttpContentType::kText, ContainerResult.Reason);
+ }
+ }
+}
+
+void
+HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::Rpc");
+ using namespace std::literals;
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ IoBuffer Payload = HttpReq.ReadPayload();
+
+ HttpContentType PayloadContentType = HttpReq.RequestContentType();
+ CbPackage Package;
+ CbObject Cb;
+ switch (PayloadContentType)
+ {
+ case HttpContentType::kJSON:
+ case HttpContentType::kUnknownContentType:
+ case HttpContentType::kText:
+ {
+ std::string JsonText(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize());
+ Cb = LoadCompactBinaryFromJson(JsonText).AsObject();
+ if (!Cb)
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Content format not supported, expected JSON format");
+ }
+ }
+ break;
+ case HttpContentType::kCbObject:
+ {
+ CbValidateError ValidateResult;
+ if (Cb = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult);
+ ValidateResult != CbValidateError::None || !Cb)
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(
+ HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Content format not supported, expected compact binary format ('{}')", ToString(ValidateResult)));
+ }
+ break;
+ }
+ case HttpContentType::kCbPackage:
+ try
+ {
+ Package = ParsePackageMessage(Payload);
+ Cb = Package.GetObject();
+ }
+ catch (const std::invalid_argument& ex)
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Failed to parse package request, reason: '{}'", ex.what()));
+ }
+ if (!Cb)
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Content format not supported, expected package message format");
+ }
+ break;
+ default:
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type");
+ }
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Rpc oplog request for unknown project '{}'", ProjectId));
+ }
+ Project->TouchProject();
+
+ std::string_view Method = Cb["method"sv].AsString();
+
+ bool VerifyPathOnDisk = Method != "getchunks"sv;
+
+ Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, VerifyPathOnDisk);
+ if (!Oplog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId));
+ }
+ Project->TouchOplog(OplogId);
+
+ uint32_t MethodHash = HashStringDjb2(Method);
+
+ switch (MethodHash)
+ {
+ case HashStringDjb2("import"sv):
+ {
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ CbObjectView Params = Cb["params"sv].AsObjectView();
+ size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize);
+ size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize);
+ bool Force = Params["force"sv].AsBool(false);
+ bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false);
+ bool CleanOplog = Params["clean"].AsBool(false);
+
+ CreateRemoteStoreResult RemoteStoreResult =
+ CreateRemoteStore(Params, m_AuthMgr, MaxBlockSize, MaxChunkEmbedSize, Oplog->TempPath());
+
+ if (RemoteStoreResult.Store == nullptr)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, RemoteStoreResult.Description);
+ }
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store);
+ RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+
+ JobId JobId = m_JobQueue.QueueJob(
+ fmt::format("Import oplog '{}/{}'", Project->Identifier, Oplog->OplogId()),
+ [this,
+ ChunkStore = &m_CidStore,
+ ActualRemoteStore = std::move(RemoteStore),
+ Oplog,
+ Force,
+ IgnoreMissingAttachments,
+ CleanOplog](JobContext& Context) {
+ Context.ReportMessage(fmt::format("Loading oplog '{}/{}' from {}",
+ Oplog->GetOuterProjectIdentifier(),
+ Oplog->OplogId(),
+ ActualRemoteStore->GetInfo().Description));
+
+ WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background);
+ WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(EWorkloadType::Background);
+
+ RemoteProjectStore::Result Result = LoadOplog(m_CidStore,
+ *ActualRemoteStore,
+ *Oplog,
+ NetworkWorkerPool,
+ WorkerPool,
+ Force,
+ IgnoreMissingAttachments,
+ CleanOplog,
+ &Context);
+ auto Response = ConvertResult(Result);
+ ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second);
+ if (!IsHttpSuccessCode(Response.first))
+ {
+ throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second,
+ (int)Response.first);
+ }
+ });
+
+ return HttpReq.WriteResponse(HttpResponseCode::Accepted, HttpContentType::kText, fmt::format("{}", JobId.Id));
+ }
+ case HashStringDjb2("export"sv):
+ {
+ CbObjectView Params = Cb["params"sv].AsObjectView();
+ size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize);
+ size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize);
+ size_t MaxChunksPerBlock = Params["maxchunksperblock"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunksPerBlock);
+ size_t ChunkFileSizeLimit = Params["chunkfilesizelimit"sv].AsUInt64(RemoteStoreOptions::DefaultChunkFileSizeLimit);
+ bool Force = Params["force"sv].AsBool(false);
+ bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false);
+ bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false);
+
+ CreateRemoteStoreResult RemoteStoreResult =
+ CreateRemoteStore(Params, m_AuthMgr, MaxBlockSize, MaxChunkEmbedSize, Oplog->TempPath());
+
+ if (RemoteStoreResult.Store == nullptr)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, RemoteStoreResult.Description);
+ }
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store);
+ RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+
+ JobId JobId = m_JobQueue.QueueJob(
+ fmt::format("Export oplog '{}/{}'", Project->Identifier, Oplog->OplogId()),
+ [this,
+ ActualRemoteStore = std::move(RemoteStore),
+ Project,
+ Oplog,
+ MaxBlockSize,
+ MaxChunksPerBlock,
+ MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
+ EmbedLooseFile,
+ Force,
+ IgnoreMissingAttachments](JobContext& Context) {
+ Context.ReportMessage(fmt::format("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}",
+ Project->Identifier,
+ Oplog->OplogId(),
+ ActualRemoteStore->GetInfo().Description,
+ NiceBytes(MaxBlockSize),
+ NiceBytes(MaxChunkEmbedSize)));
+
+ WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background);
+ WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(EWorkloadType::Background);
+
+ RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
+ *ActualRemoteStore,
+ *Project,
+ *Oplog,
+ NetworkWorkerPool,
+ WorkerPool,
+ MaxBlockSize,
+ MaxChunksPerBlock,
+ MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
+ EmbedLooseFile,
+ Force,
+ IgnoreMissingAttachments,
+ &Context);
+ auto Response = ConvertResult(Result);
+ ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second);
+ if (!IsHttpSuccessCode(Response.first))
+ {
+ throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second,
+ (int)Response.first);
+ }
+ });
+
+ return HttpReq.WriteResponse(HttpResponseCode::Accepted, HttpContentType::kText, fmt::format("{}", JobId.Id));
+ }
+ case HashStringDjb2("getchunks"sv):
+ {
+ RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u));
+ int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0);
+
+ std::vector<ProjectStore::ChunkRequest> Requests = m_ProjectStore->ParseChunksRequests(*Project, *Oplog, Cb);
+ std::vector<ProjectStore::ChunkResult> Results =
+ Requests.empty() ? std::vector<ProjectStore::ChunkResult>{} : m_ProjectStore->GetChunks(*Project, *Oplog, Requests);
+ CbPackage Response = m_ProjectStore->WriteChunksRequestResponse(*Project, *Oplog, std::move(Requests), std::move(Results));
+
+ void* TargetProcessHandle = nullptr;
+ FormatFlags Flags = FormatFlags::kDefault;
+ if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ {
+ Flags |= FormatFlags::kAllowLocalReferences;
+ if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ {
+ Flags |= FormatFlags::kDenyPartialLocalReferences;
+ }
+ TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId);
+ }
+
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(Response, Flags, TargetProcessHandle);
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ case HashStringDjb2("putchunks"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::putchunks");
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ CbObject Object = Package.GetObject();
+ const bool UsingTempFiles = Object["usingtmpfiles"].AsBool(false);
+
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+ if (!Attachments.empty())
+ {
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+
+ WriteAttachmentBuffers.reserve(Attachments.size());
+ WriteRawHashes.reserve(Attachments.size());
+
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ IoHash RawHash = Attachment.GetHash();
+ const CompressedBuffer& Compressed = Attachment.AsCompressedBinary();
+ IoBuffer AttachmentPayload = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ if (UsingTempFiles)
+ {
+ AttachmentPayload.SetDeleteOnClose(true);
+ }
+ WriteAttachmentBuffers.push_back(std::move(AttachmentPayload));
+ WriteRawHashes.push_back(RawHash);
+ }
+
+ Oplog->CaptureAddedAttachments(WriteRawHashes);
+ m_CidStore.AddChunks(WriteAttachmentBuffers,
+ WriteRawHashes,
+ UsingTempFiles ? CidStore::InsertMode::kMayBeMovedInPlace : CidStore::InsertMode::kCopyOnly);
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::OK);
+ }
+ case HashStringDjb2("snapshot"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::snapshot");
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ // Snapshot all referenced files. This brings the content of all
+ // files into the CID store
+
+ uint32_t OpCount = 0;
+ uint64_t InlinedBytes = 0;
+ uint64_t InlinedFiles = 0;
+ uint64_t TotalBytes = 0;
+ uint64_t TotalFiles = 0;
+
+ std::vector<CbObject> NewOps;
+ struct AddedChunk
+ {
+ IoBuffer Buffer;
+ uint64_t RawSize = 0;
+ };
+ tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks;
+
+ Oplog->IterateOplog(
+ [&](CbObjectView Op) {
+ bool OpRewritten = false;
+ bool AllOk = true;
+
+ CbWriter FilesArrayWriter;
+ FilesArrayWriter.BeginArray("files"sv);
+
+ for (CbFieldView& Field : Op["files"sv])
+ {
+ bool CopyField = true;
+
+ if (CbObjectView View = Field.AsObjectView())
+ {
+ const IoHash DataHash = View["data"sv].AsHash();
+
+ if (DataHash == IoHash::Zero)
+ {
+ std::string_view ServerPath = View["serverpath"sv].AsString();
+ std::filesystem::path FilePath = Project->RootDir / ServerPath;
+ BasicFile DataFile;
+ std::error_code Ec;
+ DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec);
+
+ if (Ec)
+ {
+ // Error...
+
+ ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message());
+
+ AllOk = false;
+ }
+ else
+ {
+ // Read file contents into memory, compress and keep in map of chunks to add to Cid store
+ IoBuffer FileIoBuffer = DataFile.ReadAll();
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer)));
+ const uint64_t RawSize = Compressed.DecodeRawSize();
+ const IoHash RawHash = Compressed.DecodeRawHash();
+ if (!AddedChunks.contains(RawHash))
+ {
+ const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString();
+ BasicFile ChunkTempFile;
+ ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete);
+ ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec);
+ if (Ec)
+ {
+ Oid ChunkId = View["id"sv].AsObjectId();
+ ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}",
+ FilePath,
+ ChunkId,
+ Ec.message());
+ AllOk = false;
+ }
+ else
+ {
+ void* FileHandle = ChunkTempFile.Detach();
+ IoBuffer ChunkBuffer(IoBuffer::File,
+ FileHandle,
+ 0,
+ Compressed.GetCompressed().GetSize(),
+ /*IsWholeFile*/ true);
+ ChunkBuffer.SetDeleteOnClose(true);
+ AddedChunks.insert_or_assign(
+ RawHash,
+ AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize});
+ }
+ }
+
+ TotalBytes += RawSize;
+ ++TotalFiles;
+
+ // Rewrite file array entry with new data reference
+ CbObjectWriter Writer(View.GetSize());
+ RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
+ if (Field.GetName() == "data"sv)
+ {
+ // omit this field as we will write it explicitly ourselves
+ return true;
+ }
+ return false;
+ });
+ Writer.AddBinaryAttachment("data"sv, RawHash);
+
+ CbObject RewrittenOp = Writer.Save();
+ FilesArrayWriter.AddObject(std::move(RewrittenOp));
+ CopyField = false;
+ }
+ }
+ }
+
+ if (CopyField)
+ {
+ FilesArrayWriter.AddField(Field);
+ }
+ else
+ {
+ OpRewritten = true;
+ }
+ }
+
+ if (OpRewritten && AllOk)
+ {
+ FilesArrayWriter.EndArray();
+ CbArray FilesArray = FilesArrayWriter.Save().AsArray();
+
+ CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool {
+ if (Field.GetName() == "files"sv)
+ {
+ NewWriter.AddArray("files"sv, FilesArray);
+
+ return true;
+ }
+
+ return false;
+ });
+
+ NewOps.push_back(std::move(RewrittenOp));
+ }
+
+ OpCount++;
+ },
+ ProjectStore::Oplog::Paging{});
+
+ CbObjectWriter ResponseObj;
+
+ // Persist rewritten oplog entries
+ if (!NewOps.empty())
+ {
+ ResponseObj.BeginArray("rewritten_ops");
+
+ for (CbObject& NewOp : NewOps)
+ {
+ ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp));
+
+ ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number);
+
+ ResponseObj.AddInteger(NewLsn.Number);
+ }
+
+ ResponseObj.EndArray();
+ }
+
+ // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the
+ // new chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have
+ // unreferenced chunks.
+ for (auto It : AddedChunks)
+ {
+ const IoHash& RawHash = It.first;
+ AddedChunk& Chunk = It.second;
+ CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash);
+ if (Result.New)
+ {
+ InlinedBytes += Chunk.RawSize;
+ ++InlinedFiles;
+ }
+ }
+
+ ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles;
+ ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles;
+
+ ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount);
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save());
+ }
+ case HashStringDjb2("appendops"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::appendops");
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ CbArrayView OpsArray = Cb["ops"sv].AsArrayView();
+
+ size_t OpsBufferSize = 0;
+ for (CbFieldView OpView : OpsArray)
+ {
+ OpsBufferSize += OpView.GetSize();
+ }
+ UniqueBuffer OpsBuffers = UniqueBuffer::Alloc(OpsBufferSize);
+ MutableMemoryView OpsBuffersMemory = OpsBuffers.GetMutableView();
+
+ std::vector<CbObjectView> Ops;
+ Ops.reserve(OpsArray.Num());
+ for (CbFieldView OpView : OpsArray)
+ {
+ OpView.CopyTo(OpsBuffersMemory);
+ Ops.push_back(CbObjectView(OpsBuffersMemory.GetData()));
+ OpsBuffersMemory.MidInline(OpView.GetSize());
+ }
+
+ std::vector<ProjectStore::LogSequenceNumber> LSNs = Oplog->AppendNewOplogEntries(Ops);
+ ZEN_ASSERT(LSNs.size() == Ops.size());
+
+ std::vector<IoHash> MissingAttachments;
+ for (size_t OpIndex = 0; OpIndex < Ops.size(); OpIndex++)
+ {
+ if (LSNs[OpIndex])
+ {
+ CbObjectView Op = Ops[OpIndex];
+ Op.IterateAttachments([this, &MissingAttachments](CbFieldView AttachmentView) {
+ const IoHash Cid = AttachmentView.AsAttachment();
+ if (!m_CidStore.ContainsChunk(Cid))
+ {
+ MissingAttachments.push_back(Cid);
+ }
+ });
+ }
+ }
+
+ CbPackage ResponsePackage;
+
+ {
+ CbObjectWriter ResponseObj;
+ ResponseObj.BeginArray("written_ops");
+
+ for (ProjectStore::LogSequenceNumber NewLsn : LSNs)
+ {
+ ZEN_DEBUG("appended written op at LSN: {}", NewLsn.Number);
+ ResponseObj.AddInteger(NewLsn.Number);
+ }
+ ResponseObj.EndArray();
+
+ if (!MissingAttachments.empty())
+ {
+ ResponseObj.BeginArray("need");
+
+ for (const IoHash& Cid : MissingAttachments)
+ {
+ ResponseObj.AddHash(Cid);
+ }
+ ResponseObj.EndArray();
+ }
+ ResponsePackage.SetObject(ResponseObj.Save());
+ }
+
+ std::vector<IoBuffer> ResponseBuffers = FormatPackageMessage(ResponsePackage);
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponseBuffers);
+ }
+ default:
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Unknown rpc method '{}'", Method));
+ }
+}
+void
+HttpProjectService::HandleDetailsRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::Details");
+
+ using namespace std::literals;
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+ bool CSV = Params.GetValue("csv"sv) == "true"sv;
+ bool Details = Params.GetValue("details"sv) == "true"sv;
+ bool OpDetails = Params.GetValue("opdetails"sv) == "true"sv;
+ bool AttachmentDetails = Params.GetValue("attachmentdetails"sv) == "true"sv;
+
+ if (CSV)
+ {
+ ExtendableStringBuilder<4096> CSVWriter;
+ CSVHeader(Details, AttachmentDetails, CSVWriter);
+
+ m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) {
+ Project.IterateOplogs([&](const RwLock::SharedLockScope&, ProjectStore::Oplog& Oplog) {
+ Oplog.IterateOplogWithKey(
+ [this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](ProjectStore::LogSequenceNumber LSN,
+ const Oid& Key,
+ CbObjectView Op) {
+ CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter);
+ });
+ });
+ });
+
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
+ }
+ else
+ {
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("projects");
+ {
+ m_ProjectStore->DiscoverProjects();
+
+ m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) {
+ std::vector<std::string> OpLogs = Project.ScanForOplogs();
+ CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo);
+ });
+ }
+ Cbo.EndArray();
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+}
+
+void
+HttpProjectService::HandleProjectDetailsRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::ProjectDetails");
+
+ using namespace std::literals;
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+ bool CSV = Params.GetValue("csv"sv) == "true"sv;
+ bool Details = Params.GetValue("details"sv) == "true"sv;
+ bool OpDetails = Params.GetValue("opdetails"sv) == "true"sv;
+ bool AttachmentDetails = Params.GetValue("attachmentdetails"sv) == "true"sv;
+
+ Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId);
+ if (!FoundProject)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ ProjectStore::Project& Project = *FoundProject.Get();
+
+ if (CSV)
+ {
+ ExtendableStringBuilder<4096> CSVWriter;
+ CSVHeader(Details, AttachmentDetails, CSVWriter);
+
+ FoundProject->IterateOplogs([&](const RwLock::SharedLockScope&, ProjectStore::Oplog& Oplog) {
+ Oplog.IterateOplogWithKey([this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](ProjectStore::LogSequenceNumber LSN,
+ const Oid& Key,
+ CbObjectView Op) {
+ CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter);
+ });
+ });
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
+ }
+ else
+ {
+ CbObjectWriter Cbo;
+ std::vector<std::string> OpLogs = FoundProject->ScanForOplogs();
+ Cbo.BeginArray("projects");
+ {
+ CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo);
+ }
+ Cbo.EndArray();
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+}
+
+void
+HttpProjectService::HandleOplogDetailsRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::OplogDetails");
+
+ using namespace std::literals;
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+ bool CSV = Params.GetValue("csv"sv) == "true"sv;
+ bool Details = Params.GetValue("details"sv) == "true"sv;
+ bool OpDetails = Params.GetValue("opdetails"sv) == "true"sv;
+ bool AttachmentDetails = Params.GetValue("attachmentdetails"sv) == "true"sv;
+
+ Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId);
+ if (!FoundProject)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ Ref<ProjectStore::Oplog> FoundLog = FoundProject->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true);
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ ProjectStore::Project& Project = *FoundProject.Get();
+ ProjectStore::Oplog& Oplog = *FoundLog;
+ if (CSV)
+ {
+ ExtendableStringBuilder<4096> CSVWriter;
+ CSVHeader(Details, AttachmentDetails, CSVWriter);
+
+ Oplog.IterateOplogWithKey([this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](ProjectStore::LogSequenceNumber LSN,
+ const Oid& Key,
+ CbObjectView Op) {
+ CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter);
+ });
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
+ }
+ else
+ {
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("oplogs");
+ {
+ CbWriteOplog(m_CidStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo);
+ }
+ Cbo.EndArray();
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+}
+
+void
+HttpProjectService::HandleOplogOpDetailsRequest(HttpRouterRequest& Req)
+{
+ ZEN_TRACE_CPU("ProjectService::OplogOpDetails");
+
+ using namespace std::literals;
+
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ const auto& OpId = Req.GetCapture(3);
+
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+ bool CSV = Params.GetValue("csv"sv) == "true"sv;
+ bool Details = Params.GetValue("details"sv) == "true"sv;
+ bool OpDetails = Params.GetValue("opdetails"sv) == "true"sv;
+ bool AttachmentDetails = Params.GetValue("attachmentdetails"sv) == "true"sv;
+
+ Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId);
+ if (!FoundProject)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ Ref<ProjectStore::Oplog> FoundLog = FoundProject->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true);
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ if (OpId.size() != 2 * sizeof(Oid::OidBits))
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, OpId));
+ }
+
+ const Oid ObjId = Oid::FromHexString(OpId);
+ ProjectStore::Project& Project = *FoundProject.Get();
+ ProjectStore::Oplog& Oplog = *FoundLog;
+
+ std::optional<CbObject> Op = Oplog.GetOpByKey(ObjId);
+ if (!Op.has_value())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+ ProjectStore::LogSequenceNumber LSN = Oplog.GetOpIndexByKey(ObjId);
+ if (!LSN)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ if (CSV)
+ {
+ ExtendableStringBuilder<4096> CSVWriter;
+ CSVHeader(Details, AttachmentDetails, CSVWriter);
+
+ CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, ObjId, Op.value(), CSVWriter);
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
+ }
+ else
+ {
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("ops");
+ {
+ CbWriteOp(m_CidStore, Details, OpDetails, AttachmentDetails, LSN, ObjId, Op.value(), Cbo);
+ }
+ Cbo.EndArray();
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+}
+
+} // namespace zen