diff options
| author | Stefan Boberg <[email protected]> | 2025-10-14 11:32:16 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-14 11:32:16 +0200 |
| commit | ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2 (patch) | |
| tree | 005a50adfddf6982bab3a06bb93d4c50da1a11fd /src/zenserver/storage/projectstore/httpprojectstore.cpp | |
| parent | make asiohttp work without IPv6 (#562) (diff) | |
| download | zen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.tar.xz zen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.zip | |
move all storage-related services into storage tree (#571)
* move all storage-related services into storage tree
* move config into config/
* also move admin service into storage since it mostly has storage related functionality
* header consolidation
Diffstat (limited to 'src/zenserver/storage/projectstore/httpprojectstore.cpp')
| -rw-r--r-- | src/zenserver/storage/projectstore/httpprojectstore.cpp | 3307 |
1 files changed, 3307 insertions, 0 deletions
diff --git a/src/zenserver/storage/projectstore/httpprojectstore.cpp b/src/zenserver/storage/projectstore/httpprojectstore.cpp new file mode 100644 index 000000000..1c6b5d6b0 --- /dev/null +++ b/src/zenserver/storage/projectstore/httpprojectstore.cpp @@ -0,0 +1,3307 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "httpprojectstore.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryutil.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/memory/llm.h> +#include <zencore/scopeguard.h> +#include <zencore/stream.h> +#include <zencore/trace.h> +#include <zenhttp/packageformat.h> +#include <zenremotestore/projectstore/buildsremoteprojectstore.h> +#include <zenremotestore/projectstore/fileremoteprojectstore.h> +#include <zenremotestore/projectstore/jupiterremoteprojectstore.h> +#include <zenremotestore/projectstore/remoteprojectstore.h> +#include <zenremotestore/projectstore/zenremoteprojectstore.h> +#include <zenstore/oplogreferencedset.h> +#include <zenstore/projectstore.h> +#include <zenstore/zenstore.h> +#include <zenutil/openprocesscache.h> +#include <zenutil/workerpools.h> + +namespace zen { + +const FLLMTag& +GetProjectHttpTag() +{ + static FLLMTag _("http", FLLMTag("project")); + + return _; +} + +void +CSVHeader(bool Details, bool AttachmentDetails, StringBuilderBase& CSVWriter) +{ + if (AttachmentDetails) + { + CSVWriter << "Project, Oplog, LSN, Key, Cid, Size"; + } + else if (Details) + { + CSVWriter << "Project, Oplog, LSN, Key, Size, AttachmentCount, AttachmentsSize"; + } + else + { + CSVWriter << "Project, Oplog, Key"; + } +} + +void +CSVWriteOp(CidStore& CidStore, + std::string_view ProjectId, + std::string_view OplogId, + bool Details, + bool AttachmentDetails, + ProjectStore::LogSequenceNumber LSN, + const Oid& Key, + CbObjectView Op, + StringBuilderBase& CSVWriter) +{ + StringBuilder<32> KeyStringBuilder; + Key.ToString(KeyStringBuilder); + const std::string_view KeyString = KeyStringBuilder.ToView(); + + if (AttachmentDetails) + { + Op.IterateAttachments([&CidStore, &CSVWriter, &ProjectId, &OplogId, LSN, &KeyString](CbFieldView FieldView) { + const IoHash AttachmentHash = FieldView.AsAttachment(); + IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); + CSVWriter << "\r\n" + << ProjectId << ", " << OplogId << ", " << LSN.Number << ", " << KeyString << ", " << AttachmentHash.ToHexString() + << ", " << gsl::narrow<uint64_t>(Attachment.GetSize()); + }); + } + else if (Details) + { + uint64_t AttachmentCount = 0; + size_t AttachmentsSize = 0; + Op.IterateAttachments([&CidStore, &AttachmentCount, &AttachmentsSize](CbFieldView FieldView) { + const IoHash AttachmentHash = FieldView.AsAttachment(); + AttachmentCount++; + IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); + AttachmentsSize += Attachment.GetSize(); + }); + CSVWriter << "\r\n" + << ProjectId << ", " << OplogId << ", " << LSN.Number << ", " << KeyString << ", " << gsl::narrow<uint64_t>(Op.GetSize()) + << ", " << AttachmentCount << ", " << gsl::narrow<uint64_t>(AttachmentsSize); + } + else + { + CSVWriter << "\r\n" << ProjectId << ", " << OplogId << ", " << KeyString; + } +}; + +////////////////////////////////////////////////////////////////////////// + +namespace { + + void CbWriteOp(CidStore& CidStore, + bool Details, + bool OpDetails, + bool AttachmentDetails, + ProjectStore::LogSequenceNumber LSN, + const Oid& Key, + CbObjectView Op, + CbObjectWriter& CbWriter) + { + CbWriter.BeginObject(); + { + CbWriter.AddObjectId("key", Key); + if (Details) + { + CbWriter.AddInteger("lsn", LSN.Number); + CbWriter.AddInteger("size", gsl::narrow<uint64_t>(Op.GetSize())); + } + if (AttachmentDetails) + { + CbWriter.BeginArray("attachments"); + Op.IterateAttachments([&CidStore, &CbWriter](CbFieldView FieldView) { + const IoHash AttachmentHash = FieldView.AsAttachment(); + CbWriter.BeginObject(); + { + IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); + CbWriter.AddString("cid", AttachmentHash.ToHexString()); + CbWriter.AddInteger("size", gsl::narrow<uint64_t>(Attachment.GetSize())); + } + CbWriter.EndObject(); + }); + CbWriter.EndArray(); + } + else if (Details) + { + uint64_t AttachmentCount = 0; + size_t AttachmentsSize = 0; + Op.IterateAttachments([&CidStore, &AttachmentCount, &AttachmentsSize](CbFieldView FieldView) { + const IoHash AttachmentHash = FieldView.AsAttachment(); + AttachmentCount++; + IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); + AttachmentsSize += Attachment.GetSize(); + }); + if (AttachmentCount > 0) + { + CbWriter.AddInteger("attachments", AttachmentCount); + CbWriter.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize)); + } + } + if (OpDetails) + { + CbWriter.BeginObject("op"); + for (const CbFieldView& Field : Op) + { + if (!Field.HasName()) + { + CbWriter.AddField(Field); + continue; + } + std::string_view FieldName = Field.GetName(); + CbWriter.AddField(FieldName, Field); + } + CbWriter.EndObject(); + } + } + CbWriter.EndObject(); + }; + + void CbWriteOplogOps(CidStore& CidStore, + ProjectStore::Oplog& Oplog, + bool Details, + bool OpDetails, + bool AttachmentDetails, + CbObjectWriter& Cbo) + { + Cbo.BeginArray("ops"); + { + Oplog.IterateOplogWithKey([&Cbo, &CidStore, Details, OpDetails, AttachmentDetails](ProjectStore::LogSequenceNumber LSN, + const Oid& Key, + CbObjectView Op) { + CbWriteOp(CidStore, Details, OpDetails, AttachmentDetails, LSN, Key, Op, Cbo); + }); + } + Cbo.EndArray(); + } + + void CbWriteOplog(CidStore& CidStore, + ProjectStore::Oplog& Oplog, + bool Details, + bool OpDetails, + bool AttachmentDetails, + CbObjectWriter& Cbo) + { + Cbo.BeginObject(); + { + Cbo.AddString("name", Oplog.OplogId()); + CbWriteOplogOps(CidStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo); + } + Cbo.EndObject(); + } + + void CbWriteOplogs(CidStore& CidStore, + ProjectStore::Project& Project, + std::vector<std::string> OpLogs, + bool Details, + bool OpDetails, + bool AttachmentDetails, + CbObjectWriter& Cbo) + { + Cbo.BeginArray("oplogs"); + { + for (const std::string& OpLogId : OpLogs) + { + Ref<ProjectStore::Oplog> Oplog = Project.OpenOplog(OpLogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); + if (Oplog) + { + CbWriteOplog(CidStore, *Oplog, Details, OpDetails, AttachmentDetails, Cbo); + } + } + } + Cbo.EndArray(); + } + + void CbWriteProject(CidStore& CidStore, + ProjectStore::Project& Project, + std::vector<std::string> OpLogs, + bool Details, + bool OpDetails, + bool AttachmentDetails, + CbObjectWriter& Cbo) + { + Cbo.BeginObject(); + { + Cbo.AddString("name", Project.Identifier); + CbWriteOplogs(CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); + } + Cbo.EndObject(); + } + + struct CreateRemoteStoreResult + { + std::shared_ptr<RemoteProjectStore> Store; + std::string Description; + }; + + CreateRemoteStoreResult CreateRemoteStore(CbObjectView Params, + AuthMgr& AuthManager, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + const std::filesystem::path& TempFilePath) + { + ZEN_MEMSCOPE(GetProjectHttpTag()); + + using namespace std::literals; + + std::shared_ptr<RemoteProjectStore> RemoteStore; + + if (CbObjectView File = Params["file"sv].AsObjectView(); File) + { + std::filesystem::path FolderPath(File["path"sv].AsString()); + if (FolderPath.empty()) + { + return {nullptr, "Missing file path"}; + } + std::string_view Name(File["name"sv].AsString()); + if (Name.empty()) + { + return {nullptr, "Missing file name"}; + } + std::string_view OptionalBaseName(File["basename"sv].AsString()); + bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false); + bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false); + + FileRemoteStoreOptions Options = { + RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + FolderPath, + std::string(Name), + std::string(OptionalBaseName), + ForceDisableBlocks, + ForceEnableTempBlocks}; + RemoteStore = CreateFileRemoteStore(Options); + } + + if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud) + { + std::string_view CloudServiceUrl = Cloud["url"sv].AsString(); + if (CloudServiceUrl.empty()) + { + return {nullptr, "Missing service url"}; + } + + std::string Url = UrlDecode(CloudServiceUrl); + std::string_view Namespace = Cloud["namespace"sv].AsString(); + if (Namespace.empty()) + { + return {nullptr, "Missing namespace"}; + } + std::string_view Bucket = Cloud["bucket"sv].AsString(); + if (Bucket.empty()) + { + return {nullptr, "Missing bucket"}; + } + std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString(); + std::string AccessToken = std::string(Cloud["access-token"sv].AsString()); + if (AccessToken.empty()) + { + std::string_view AccessTokenEnvVariable = Cloud["access-token-env"].AsString(); + if (!AccessTokenEnvVariable.empty()) + { + AccessToken = GetEnvVariable(AccessTokenEnvVariable); + } + } + std::filesystem::path OidcExePath; + if (std::string_view OidcExePathString = Cloud["oidc-exe-path"].AsString(); !OidcExePathString.empty()) + { + std::filesystem::path OidcExePathMaybe(OidcExePathString); + if (IsFile(OidcExePathMaybe)) + { + OidcExePath = std::move(OidcExePathMaybe); + } + else + { + ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString); + } + } + std::string_view KeyParam = Cloud["key"sv].AsString(); + if (KeyParam.empty()) + { + return {nullptr, "Missing key"}; + } + if (KeyParam.length() != IoHash::StringLength) + { + return {nullptr, "Invalid key"}; + } + IoHash Key = IoHash::FromHexString(KeyParam); + if (Key == IoHash::Zero) + { + return {nullptr, "Invalid key string"}; + } + IoHash BaseKey = IoHash::Zero; + std::string_view BaseKeyParam = Cloud["basekey"sv].AsString(); + if (!BaseKeyParam.empty()) + { + if (BaseKeyParam.length() != IoHash::StringLength) + { + return {nullptr, "Invalid base key"}; + } + BaseKey = IoHash::FromHexString(BaseKeyParam); + if (BaseKey == IoHash::Zero) + { + return {nullptr, "Invalid base key string"}; + } + } + + bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false); + bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false); + bool AssumeHttp2 = Cloud["assumehttp2"sv].AsBool(false); + + JupiterRemoteStoreOptions Options = { + RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + Url, + std::string(Namespace), + std::string(Bucket), + Key, + BaseKey, + std::string(OpenIdProvider), + AccessToken, + AuthManager, + OidcExePath, + ForceDisableBlocks, + ForceDisableTempBlocks, + AssumeHttp2}; + RemoteStore = CreateJupiterRemoteStore(Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false, /*Hidden*/ true); + } + + if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen) + { + std::string_view Url = Zen["url"sv].AsString(); + std::string_view Project = Zen["project"sv].AsString(); + if (Project.empty()) + { + return {nullptr, "Missing project"}; + } + std::string_view Oplog = Zen["oplog"sv].AsString(); + if (Oplog.empty()) + { + return {nullptr, "Missing oplog"}; + } + ZenRemoteStoreOptions Options = { + RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + std::string(Url), + std::string(Project), + std::string(Oplog)}; + RemoteStore = CreateZenRemoteStore(Options, TempFilePath); + } + + if (CbObjectView Builds = Params["builds"sv].AsObjectView(); Builds) + { + std::string_view BuildsServiceUrl = Builds["url"sv].AsString(); + if (BuildsServiceUrl.empty()) + { + return {nullptr, "Missing service url"}; + } + + std::string Url = UrlDecode(BuildsServiceUrl); + std::string_view Namespace = Builds["namespace"sv].AsString(); + if (Namespace.empty()) + { + return {nullptr, "Missing namespace"}; + } + std::string_view Bucket = Builds["bucket"sv].AsString(); + if (Bucket.empty()) + { + return {nullptr, "Missing bucket"}; + } + std::string_view OpenIdProvider = Builds["openid-provider"sv].AsString(); + std::string AccessToken = std::string(Builds["access-token"sv].AsString()); + if (AccessToken.empty()) + { + std::string_view AccessTokenEnvVariable = Builds["access-token-env"].AsString(); + if (!AccessTokenEnvVariable.empty()) + { + AccessToken = GetEnvVariable(AccessTokenEnvVariable); + } + } + std::filesystem::path OidcExePath; + if (std::string_view OidcExePathString = Builds["oidc-exe-path"].AsString(); !OidcExePathString.empty()) + { + std::filesystem::path OidcExePathMaybe(OidcExePathString); + if (IsFile(OidcExePathMaybe)) + { + OidcExePath = std::move(OidcExePathMaybe); + } + else + { + ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString); + } + } + std::string_view BuildIdParam = Builds["buildsid"sv].AsString(); + if (BuildIdParam.empty()) + { + return {nullptr, "Missing build id"}; + } + if (BuildIdParam.length() != Oid::StringLength) + { + return {nullptr, "Invalid build id"}; + } + Oid BuildId = Oid::FromHexString(BuildIdParam); + if (BuildId == Oid::Zero) + { + return {nullptr, "Invalid build id string"}; + } + + bool ForceDisableBlocks = Builds["disableblocks"sv].AsBool(false); + bool ForceDisableTempBlocks = Builds["disabletempblocks"sv].AsBool(false); + bool AssumeHttp2 = Builds["assumehttp2"sv].AsBool(false); + + MemoryView MetaDataSection = Builds["metadata"sv].AsBinaryView(); + IoBuffer MetaData(IoBuffer::Wrap, MetaDataSection.GetData(), MetaDataSection.GetSize()); + + BuildsRemoteStoreOptions Options = { + RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + Url, + std::string(Namespace), + std::string(Bucket), + BuildId, + std::string(OpenIdProvider), + AccessToken, + AuthManager, + OidcExePath, + ForceDisableBlocks, + ForceDisableTempBlocks, + AssumeHttp2, + MetaData}; + RemoteStore = CreateJupiterBuildsRemoteStore(Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false, /*Hidden*/ true); + } + + if (!RemoteStore) + { + return {nullptr, "Unknown remote store type"}; + } + + return {std::move(RemoteStore), ""}; + } + + std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result) + { + if (Result.ErrorCode == 0) + { + return {HttpResponseCode::OK, Result.Text}; + } + return {static_cast<HttpResponseCode>(Result.ErrorCode), + Result.Reason.empty() ? Result.Text + : Result.Text.empty() ? Result.Reason + : fmt::format("{}: {}", Result.Reason, Result.Text)}; + } + +} // namespace + +////////////////////////////////////////////////////////////////////////// + +HttpProjectService::HttpProjectService(CidStore& Store, + ProjectStore* Projects, + HttpStatusService& StatusService, + HttpStatsService& StatsService, + AuthMgr& AuthMgr, + OpenProcessCache& InOpenProcessCache, + JobQueue& InJobQueue) +: m_Log(logging::Get("project")) +, m_CidStore(Store) +, m_ProjectStore(Projects) +, m_StatusService(StatusService) +, m_StatsService(StatsService) +, m_AuthMgr(AuthMgr) +, m_OpenProcessCache(InOpenProcessCache) +, m_JobQueue(InJobQueue) +{ + ZEN_MEMSCOPE(GetProjectHttpTag()); + + using namespace std::literals; + + m_Router.AddPattern("project", "([[:alnum:]_.]+)"); + m_Router.AddPattern("log", "([[:alnum:]_.]+)"); + m_Router.AddPattern("op", "([[:digit:]]+?)"); + m_Router.AddPattern("chunk", "([[:xdigit:]]{24})"); + m_Router.AddPattern("hash", "([[:xdigit:]]{40})"); + + m_Router.RegisterRoute( + "", + [this](HttpRouterRequest& Req) { HandleProjectListRequest(Req); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "list", + [this](HttpRouterRequest& Req) { HandleProjectListRequest(Req); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/batch", + [this](HttpRouterRequest& Req) { HandleChunkBatchRequest(Req); }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/files", + [this](HttpRouterRequest& Req) { HandleFilesRequest(Req); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/chunkinfos", + [this](HttpRouterRequest& Req) { HandleChunkInfosRequest(Req); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/{chunk}/info", + [this](HttpRouterRequest& Req) { HandleChunkInfoRequest(Req); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/{chunk}", + [this](HttpRouterRequest& Req) { HandleChunkByIdRequest(Req); }, + HttpVerb::kGet | HttpVerb::kHead); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/{hash}", + [this](HttpRouterRequest& Req) { HandleChunkByCidRequest(Req); }, + HttpVerb::kGet | HttpVerb::kPost); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/prep", + [this](HttpRouterRequest& Req) { HandleOplogOpPrepRequest(Req); }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/new", + [this](HttpRouterRequest& Req) { HandleOplogOpNewRequest(Req); }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/validate", + [this](HttpRouterRequest& Req) { HandleOplogValidateRequest(Req); }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/{op}", + [this](HttpRouterRequest& Req) { HandleOpLogOpRequest(Req); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{project}/oplog/{log}", + [this](HttpRouterRequest& Req) { HandleOpLogRequest(Req); }, + HttpVerb::kGet | HttpVerb::kPut | HttpVerb::kPost | HttpVerb::kDelete); + + m_Router.RegisterRoute( + "{project}/oplog/{log}/entries", + [this](HttpRouterRequest& Req) { HandleOpLogEntriesRequest(Req); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{project}", + [this](HttpRouterRequest& Req) { HandleProjectRequest(Req); }, + HttpVerb::kGet | HttpVerb::kPut | HttpVerb::kPost | HttpVerb::kDelete); + + // Push a oplog container + m_Router.RegisterRoute( + "{project}/oplog/{log}/save", + [this](HttpRouterRequest& Req) { HandleOplogSaveRequest(Req); }, + HttpVerb::kPost); + + // Pull a oplog container + m_Router.RegisterRoute( + "{project}/oplog/{log}/load", + [this](HttpRouterRequest& Req) { HandleOplogLoadRequest(Req); }, + HttpVerb::kGet); + + // Do an rpc style operation on project/oplog + m_Router.RegisterRoute( + "{project}/oplog/{log}/rpc", + [this](HttpRouterRequest& Req) { HandleRpcRequest(Req); }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "details\\$", + [this](HttpRouterRequest& Req) { HandleDetailsRequest(Req); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "details\\$/{project}", + [this](HttpRouterRequest& Req) { HandleProjectDetailsRequest(Req); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "details\\$/{project}/{log}", + [this](HttpRouterRequest& Req) { HandleOplogDetailsRequest(Req); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "details\\$/{project}/{log}/{chunk}", + [this](HttpRouterRequest& Req) { HandleOplogOpDetailsRequest(Req); }, + HttpVerb::kGet); + + m_StatusService.RegisterHandler("prj", *this); + m_StatsService.RegisterHandler("prj", *this); +} + +HttpProjectService::~HttpProjectService() +{ + m_StatsService.UnregisterHandler("prj", *this); + m_StatusService.UnregisterHandler("prj", *this); +} + +const char* +HttpProjectService::BaseUri() const +{ + return "/prj/"; +} + +void +HttpProjectService::HandleRequest(HttpServerRequest& Request) +{ + m_ProjectStats.RequestCount++; + + ZEN_MEMSCOPE(GetProjectHttpTag()); + + metrics::OperationTiming::Scope $(m_HttpRequests); + + if (m_Router.HandleRequest(Request) == false) + { + m_ProjectStats.BadRequestCount++; + ZEN_WARN("No route found for {0}", Request.RelativeUri()); + } +} + +void +HttpProjectService::HandleStatsRequest(HttpServerRequest& HttpReq) +{ + ZEN_TRACE_CPU("ProjectService::Stats"); + + const GcStorageSize StoreSize = m_ProjectStore->StorageSize(); + const CidStoreSize CidSize = m_CidStore.TotalSize(); + + CbObjectWriter Cbo; + + EmitSnapshot("requests", m_HttpRequests, Cbo); + + Cbo.BeginObject("store"); + { + Cbo.BeginObject("size"); + { + Cbo << "disk" << StoreSize.DiskSize; + Cbo << "memory" << StoreSize.MemorySize; + } + Cbo.EndObject(); + + Cbo.BeginObject("project"); + { + Cbo << "readcount" << m_ProjectStats.ProjectReadCount << "writecount" << m_ProjectStats.ProjectWriteCount << "deletecount" + << m_ProjectStats.ProjectDeleteCount; + } + Cbo.EndObject(); + + Cbo.BeginObject("oplog"); + { + Cbo << "readcount" << m_ProjectStats.OpLogReadCount << "writecount" << m_ProjectStats.OpLogWriteCount << "deletecount" + << m_ProjectStats.OpLogDeleteCount; + } + Cbo.EndObject(); + + Cbo.BeginObject("op"); + { + Cbo << "hitcount" << m_ProjectStats.OpHitCount << "misscount" << m_ProjectStats.OpMissCount << "writecount" + << m_ProjectStats.OpWriteCount; + } + Cbo.EndObject(); + + Cbo.BeginObject("chunk"); + { + Cbo << "hitcount" << m_ProjectStats.ChunkHitCount << "misscount" << m_ProjectStats.ChunkMissCount << "writecount" + << m_ProjectStats.ChunkWriteCount; + } + Cbo.EndObject(); + + Cbo << "requestcount" << m_ProjectStats.RequestCount; + Cbo << "badrequestcount" << m_ProjectStats.BadRequestCount; + } + Cbo.EndObject(); + + Cbo.BeginObject("cid"); + { + Cbo.BeginObject("size"); + { + Cbo << "tiny" << CidSize.TinySize; + Cbo << "small" << CidSize.SmallSize; + Cbo << "large" << CidSize.LargeSize; + Cbo << "total" << CidSize.TotalSize; + } + Cbo.EndObject(); + } + Cbo.EndObject(); + + return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +void +HttpProjectService::HandleStatusRequest(HttpServerRequest& Request) +{ + ZEN_TRACE_CPU("HttpProjectService::Status"); + CbObjectWriter Cbo; + Cbo << "ok" << true; + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +void +HttpProjectService::HandleProjectListRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::ProjectList"); + + HttpServerRequest& HttpReq = Req.ServerRequest(); + CbArray ProjectsList = m_ProjectStore->GetProjectsList(); + HttpReq.WriteResponse(HttpResponseCode::OK, ProjectsList); +} + +void +HttpProjectService::HandleChunkBatchRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::ChunkBatch"); + + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + Project->TouchProject(); + + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + Project->TouchOplog(OplogId); + + // Parse Request + + IoBuffer Payload = HttpReq.ReadPayload(); + BinaryReader Reader(Payload); + + struct RequestHeader + { + enum + { + kMagic = 0xAAAA'77AC + }; + uint32_t Magic; + uint32_t ChunkCount; + uint32_t Reserved1; + uint32_t Reserved2; + }; + + struct RequestChunkEntry + { + Oid ChunkId; + uint32_t CorrelationId; + uint64_t Offset; + uint64_t RequestBytes; + }; + + if (Payload.Size() <= sizeof(RequestHeader)) + { + m_ProjectStats.BadRequestCount++; + HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + RequestHeader RequestHdr; + Reader.Read(&RequestHdr, sizeof RequestHdr); + + if (RequestHdr.Magic != RequestHeader::kMagic) + { + m_ProjectStats.BadRequestCount++; + HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + std::vector<RequestChunkEntry> RequestedChunks; + RequestedChunks.resize(RequestHdr.ChunkCount); + Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount); + + // Make Response + + struct ResponseHeader + { + uint32_t Magic = 0xbada'b00f; + uint32_t ChunkCount; + uint32_t Reserved1 = 0; + uint32_t Reserved2 = 0; + }; + + struct ResponseChunkEntry + { + uint32_t CorrelationId; + uint32_t Flags = 0; + uint64_t ChunkSize; + }; + + std::vector<IoBuffer> OutBlobs; + OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry)); + for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) + { + const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; + IoBuffer FoundChunk = FoundLog->FindChunk(Project->RootDir, RequestedChunk.ChunkId, nullptr); + if (FoundChunk) + { + if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1)) + { + uint64_t Offset = RequestedChunk.Offset; + if (Offset > FoundChunk.Size()) + { + Offset = FoundChunk.Size(); + } + uint64_t Size = RequestedChunk.RequestBytes; + if ((Offset + Size) > FoundChunk.Size()) + { + Size = FoundChunk.Size() - Offset; + } + FoundChunk = IoBuffer(FoundChunk, Offset, Size); + } + } + OutBlobs.emplace_back(std::move(FoundChunk)); + } + uint8_t* ResponsePtr = reinterpret_cast<uint8_t*>(OutBlobs[0].MutableData()); + ResponseHeader ResponseHdr; + ResponseHdr.ChunkCount = RequestHdr.ChunkCount; + memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr)); + ResponsePtr += sizeof(ResponseHdr); + for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) + { + const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; + const IoBuffer& FoundChunk(OutBlobs[ChunkIndex + 1]); + ResponseChunkEntry ResponseChunk; + ResponseChunk.CorrelationId = RequestedChunk.CorrelationId; + if (FoundChunk) + { + ResponseChunk.ChunkSize = FoundChunk.Size(); + m_ProjectStats.ChunkHitCount++; + } + else + { + ResponseChunk.ChunkSize = uint64_t(-1); + m_ProjectStats.ChunkMissCount++; + } + memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk)); + ResponsePtr += sizeof(ResponseChunk); + } + std::erase_if(OutBlobs, [](IoBuffer Buffer) -> bool { return !Buffer; }); + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs); +} + +void +HttpProjectService::HandleFilesRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::Files"); + + using namespace std::literals; + + HttpServerRequest& HttpReq = Req.ServerRequest(); + + // File manifest fetch, returns the client file list + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + + std::unordered_set<std::string> WantedFieldNames; + if (auto FieldFilter = HttpServerRequest::Decode(Params.GetValue("fieldnames")); !FieldFilter.empty()) + { + if (FieldFilter != "*") // Get all - empty FieldFilter equal getting all fields + { + ForEachStrTok(FieldFilter, ',', [&](std::string_view FieldName) { + WantedFieldNames.insert(std::string(FieldName)); + return true; + }); + } + } + else + { + const bool FilterClient = Params.GetValue("filter"sv) == "client"sv; + WantedFieldNames.insert("id"); + WantedFieldNames.insert("clientpath"); + if (!FilterClient) + { + WantedFieldNames.insert("serverpath"); + } + } + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Project files request for unknown project '{}'", ProjectId)); + } + Project->TouchProject(); + + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Project files for unknown oplog '{}/{}'", ProjectId, OplogId)); + } + Project->TouchOplog(OplogId); + + CbObject ResponsePayload = ProjectStore::GetProjectFiles(Log(), *Project, *FoundLog, WantedFieldNames); + + if (HttpReq.AcceptContentType() == HttpContentType::kCompressedBinary) + { + CompositeBuffer Payload = CompressedBuffer::Compress(ResponsePayload.GetBuffer()).GetCompressed(); + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCompressedBinary, Payload); + } + else + { + return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); + } +} + +void +HttpProjectService::HandleChunkInfosRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::ChunkInfos"); + + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + + std::unordered_set<std::string> WantedFieldNames; + if (auto FieldFilter = HttpServerRequest::Decode(Params.GetValue("fieldnames")); !FieldFilter.empty()) + { + if (FieldFilter != "*") // Get all - empty FieldFilter equal getting all fields + { + ForEachStrTok(FieldFilter, ',', [&](std::string_view FieldName) { + WantedFieldNames.insert(std::string(FieldName)); + return true; + }); + } + } + else + { + WantedFieldNames.insert("id"); + WantedFieldNames.insert("rawhash"); + WantedFieldNames.insert("rawsize"); + } + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Chunk infos request for unknown project '{}'", ProjectId)); + } + Project->TouchProject(); + + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Chunk infos for unknown oplog '{}/{}'", ProjectId, OplogId)); + } + Project->TouchOplog(OplogId); + + CbObject ResponsePayload = ProjectStore::GetProjectChunkInfos(Log(), *Project, *FoundLog, WantedFieldNames); + if (HttpReq.AcceptContentType() == HttpContentType::kCompressedBinary) + { + CompositeBuffer Payload = CompressedBuffer::Compress(ResponsePayload.GetBuffer()).GetCompressed(); + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCompressedBinary, Payload); + } + else + { + return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); + } +} + +void +HttpProjectService::HandleChunkInfoRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::ChunkInfo"); + + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + const auto& ChunkId = Req.GetCapture(3); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Chunk info request for unknown project '{}'", ProjectId)); + } + Project->TouchProject(); + + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Chunk info for unknown oplog '{}/{}'", ProjectId, OplogId)); + } + Project->TouchOplog(OplogId); + + if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)); + } + + const Oid Obj = Oid::FromHexString(ChunkId); + + CbObject ResponsePayload = ProjectStore::GetChunkInfo(Log(), *Project, *FoundLog, Obj); + if (ResponsePayload) + { + m_ProjectStats.ChunkHitCount++; + return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); + } + else + { + m_ProjectStats.ChunkMissCount++; + ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Chunk info for unknown chunk '{}/{}/{}'", ProjectId, OplogId, ChunkId)); + } +} + +void +HttpProjectService::HandleChunkByIdRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::ChunkById"); + + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + const auto& ChunkId = Req.GetCapture(3); + + uint64_t Offset = 0; + uint64_t Size = ~(0ull); + + auto QueryParms = HttpReq.GetQueryParams(); + + if (auto OffsetParm = QueryParms.GetValue("offset"); OffsetParm.empty() == false) + { + if (auto OffsetVal = ParseInt<uint64_t>(OffsetParm)) + { + Offset = OffsetVal.value(); + } + else + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + } + + if (auto SizeParm = QueryParms.GetValue("size"); SizeParm.empty() == false) + { + if (auto SizeVal = ParseInt<uint64_t>(SizeParm)) + { + Size = SizeVal.value(); + } + else + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + } + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Chunk request for unknown project '{}'", ProjectId)); + } + Project->TouchProject(); + + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)); + } + Project->TouchOplog(OplogId); + + if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Chunk request for invalid chunk id '{}/{}/{}'", ProjectId, OplogId, ChunkId)); + } + + const Oid Obj = Oid::FromHexString(ChunkId); + + HttpContentType AcceptType = HttpReq.AcceptContentType(); + + ProjectStore::GetChunkRangeResult Result = + ProjectStore::GetChunkRange(Log(), *Project, *FoundLog, Obj, Offset, Size, AcceptType, /*OptionalInOutModificationTag*/ nullptr); + + switch (Result.Error) + { + case ProjectStore::GetChunkRangeResult::EError::Ok: + m_ProjectStats.ChunkHitCount++; + ZEN_DEBUG("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(Result.ContentType)); + return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ContentType, Result.Chunk); + case ProjectStore::GetChunkRangeResult::EError::NotFound: + m_ProjectStats.ChunkMissCount++; + ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); + return HttpReq.WriteResponse(HttpResponseCode::NotFound, Result.ContentType, Result.Chunk); + case ProjectStore::GetChunkRangeResult::EError::MalformedContent: + return HttpReq.WriteResponse( + HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Get chunk {}/{}/{} failed. Reason: {}", ProjectId, OplogId, ChunkId, Result.ErrorDescription)); + case ProjectStore::GetChunkRangeResult::EError::OutOfRange: + m_ProjectStats.ChunkMissCount++; + ZEN_DEBUG("chunk - '{}/{}/{}' OUT OF RANGE", ProjectId, OplogId, ChunkId); + return HttpReq.WriteResponse( + HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Get chunk {}/{}/{} failed. Reason: {}", ProjectId, OplogId, ChunkId, Result.ErrorDescription)); + default: + ZEN_ASSERT(false); + break; + } +} + +void +HttpProjectService::HandleChunkByCidRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::ChunkByCid"); + + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + const auto& Cid = Req.GetCapture(3); + HttpContentType AcceptType = HttpReq.AcceptContentType(); + HttpContentType RequestType = HttpReq.RequestContentType(); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Chunk request for unknown project '{}'", ProjectId)); + } + Project->TouchProject(); + + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)); + } + Project->TouchOplog(OplogId); + + if (Cid.length() != IoHash::StringLength) + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Chunk request for invalid chunk id '{}/{}/{}'", ProjectId, OplogId, Cid)); + } + + const IoHash Hash = IoHash::FromHexString(Cid); + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + { + IoBuffer Value = m_ProjectStore->GetChunk(*Project, *FoundLog, Hash); + if (Value) + { + if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary || + AcceptType == ZenContentType::kJSON || AcceptType == ZenContentType::kYAML || + AcceptType == ZenContentType::kCbObject) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Value)); + IoBuffer DecompressedBuffer = Compressed.Decompress().AsIoBuffer(); + + if (DecompressedBuffer) + { + if (AcceptType == ZenContentType::kJSON || AcceptType == ZenContentType::kYAML || + AcceptType == ZenContentType::kCbObject) + { + CbValidateError CbErr = ValidateCompactBinary(DecompressedBuffer.GetView(), CbValidateMode::Default); + if (!!CbErr) + { + m_ProjectStats.BadRequestCount++; + ZEN_DEBUG( + "chunk - '{}/{}/{}' WRONGTYPE. Reason: `Requested {} format, but could not convert to object`", + ProjectId, + OplogId, + Cid, + ToString(AcceptType)); + return HttpReq.WriteResponse( + HttpResponseCode::NotAcceptable, + HttpContentType::kText, + fmt::format("Content format not supported, requested {} format, but could not convert to object", + ToString(AcceptType))); + } + + m_ProjectStats.ChunkHitCount++; + CbObject ContainerObject = LoadCompactBinaryObject(DecompressedBuffer); + return HttpReq.WriteResponse(HttpResponseCode::OK, ContainerObject); + } + else + { + Value = DecompressedBuffer; + Value.SetContentType(ZenContentType::kBinary); + } + } + else + { + m_ProjectStats.BadRequestCount++; + ZEN_DEBUG("chunk - '{}/{}/{}' WRONGTYPE. Reason: `Requested {} format, but could not decompress stored data`", + ProjectId, + OplogId, + Cid, + ToString(AcceptType)); + return HttpReq.WriteResponse( + HttpResponseCode::NotAcceptable, + HttpContentType::kText, + fmt::format("Content format not supported, requested {} format, but could not decompress stored data", + ToString(AcceptType))); + } + } + m_ProjectStats.ChunkHitCount++; + return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value); + } + else + { + m_ProjectStats.ChunkMissCount++; + ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, Cid); + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + } + case HttpVerb::kPost: + { + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + if (RequestType != HttpContentType::kCompressedBinary) + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Chunk request for chunk id '{}/{}'/'{}' as unexpected content type: '{}'", + ProjectId, + OplogId, + Cid, + ToString(RequestType))); + } + IoBuffer Payload = HttpReq.ReadPayload(); + Payload.SetContentType(RequestType); + bool IsNew = m_ProjectStore->PutChunk(*Project, *FoundLog, Hash, std::move(Payload)); + + m_ProjectStats.ChunkWriteCount++; + return HttpReq.WriteResponse(IsNew ? HttpResponseCode::Created : HttpResponseCode::OK); + } + break; + } +} + +void +HttpProjectService::HandleOplogOpPrepRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::OplogOpPrep"); + + using namespace std::literals; + + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + Project->TouchProject(); + + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + Project->TouchOplog(OplogId); + + // This operation takes a list of referenced hashes and decides which + // chunks are not present on this server. This list is then returned in + // the "need" list in the response + + CbValidateError ValidateResult; + if (CbObject RequestObject = ValidateAndReadCompactBinaryObject(HttpReq.ReadPayload(), ValidateResult); + ValidateResult == CbValidateError::None) + { + std::vector<IoHash> NeedList; + + { + eastl::fixed_vector<IoHash, 16> ChunkList; + CbArrayView HaveList = RequestObject["have"sv].AsArrayView(); + ChunkList.reserve(HaveList.Num()); + for (auto& Entry : HaveList) + { + ChunkList.push_back(Entry.AsHash()); + } + + NeedList = FoundLog->CheckPendingChunkReferences(std::span(begin(ChunkList), end(ChunkList)), std::chrono::minutes(2)); + } + + CbObjectWriter Cbo(1 + 1 + 5 + NeedList.size() * (1 + sizeof(IoHash::Hash)) + 1); + Cbo.BeginArray("need"); + { + for (const IoHash& Hash : NeedList) + { + ZEN_DEBUG("prep - NEED: {}", Hash); + Cbo << Hash; + } + } + Cbo.EndArray(); + CbObject Response = Cbo.Save(); + + return HttpReq.WriteResponse(HttpResponseCode::OK, Response); + } + else + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid compact binary format: '{}'", ToString(ValidateResult))); + } +} + +void +HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::OplogOpNew"); + + using namespace std::literals; + + HttpServerRequest& HttpReq = Req.ServerRequest(); + + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + + bool IsUsingSalt = false; + IoHash SaltHash = IoHash::Zero; + + if (std::string_view SaltParam = Params.GetValue("salt"sv); SaltParam.empty() == false) + { + const uint32_t Salt = std::stoi(std::string(SaltParam)); + SaltHash = IoHash::HashBuffer(&Salt, sizeof Salt); + IsUsingSalt = true; + } + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + Project->TouchProject(); + + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + Project->TouchOplog(OplogId); + + ProjectStore::Oplog& Oplog = *FoundLog; + + IoBuffer Payload = HttpReq.ReadPayload(); + + // This will attempt to open files which may not exist for the case where + // the prep step rejected the chunk. This should be fixed since there's + // a performance cost associated with any file system activity + + bool IsValid = true; + std::vector<IoHash> MissingChunks; + + CbPackage::AttachmentResolver Resolver = [&](const IoHash& Hash) -> SharedBuffer { + if (m_CidStore.ContainsChunk(Hash)) + { + // Return null attachment as we already have it, no point in reading it and storing it again + return {}; + } + + IoHash AttachmentId; + if (IsUsingSalt) + { + IoHash AttachmentSpec[]{SaltHash, Hash}; + AttachmentId = IoHash::HashBuffer(MakeMemoryView(AttachmentSpec)); + } + else + { + AttachmentId = Hash; + } + + std::filesystem::path AttachmentPath = Oplog.TempPath() / AttachmentId.ToHexString(); + if (IoBuffer Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath)) + { + Data.SetDeleteOnClose(true); + return SharedBuffer(std::move(Data)); + } + else + { + IsValid = false; + MissingChunks.push_back(Hash); + + return {}; + } + }; + + CbPackage Package; + + if (!legacy::TryLoadCbPackage(Package, Payload, &UniqueBuffer::Alloc, &Resolver)) + { + CbValidateError ValidateResult; + if (CbObject Core = ValidateAndReadCompactBinaryObject(IoBuffer(Payload), ValidateResult); + ValidateResult == CbValidateError::None && Core) + { + Package.SetObject(Core); + } + else + { + std::filesystem::path BadPackagePath = + Oplog.TempPath() / "bad_packages"sv / fmt::format("session{}_request{}"sv, HttpReq.SessionId(), HttpReq.RequestId()); + + ZEN_WARN("Received malformed package ('{}')! Saving payload to '{}'", ToString(ValidateResult), BadPackagePath); + + WriteFile(BadPackagePath, Payload); + + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + u8"request body must be a compact binary object or package in legacy format"); + } + } + + m_ProjectStats.ChunkMissCount += MissingChunks.size(); + + if (!IsValid) + { + ExtendableStringBuilder<256> ResponseText; + ResponseText.Append("Missing chunk references: "); + + bool IsFirst = true; + for (const auto& Hash : MissingChunks) + { + if (IsFirst) + { + IsFirst = false; + } + else + { + ResponseText.Append(", "); + } + Hash.ToHexString(ResponseText); + } + + return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, ResponseText); + } + + CbObject Core = Package.GetObject(); + + if (!Core["key"sv]) + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "No oplog entry key specified"); + } + + eastl::fixed_vector<IoHash, 16> ReferencedChunks; + Core.IterateAttachments([&ReferencedChunks](CbFieldView View) { ReferencedChunks.push_back(View.AsAttachment()); }); + + // Write core to oplog + + size_t AttachmentCount = Package.GetAttachments().size(); + const ProjectStore::LogSequenceNumber OpLsn = Oplog.AppendNewOplogEntry(Package); + if (!OpLsn) + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + m_ProjectStats.ChunkWriteCount += AttachmentCount; + + // Once we stored the op, we no longer need to retain any chunks this op references + if (!ReferencedChunks.empty()) + { + FoundLog->RemovePendingChunkReferences(std::span(begin(ReferencedChunks), end(ReferencedChunks))); + } + + m_ProjectStats.OpWriteCount++; + ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn.Number, NiceBytes(Payload.Size()), Core["key"sv].AsString()); + HttpReq.WriteResponse(HttpResponseCode::Created); +} + +void +HttpProjectService::HandleOplogValidateRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::OplogOpValidate"); + + using namespace std::literals; + + HttpServerRequest& HttpReq = Req.ServerRequest(); + + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, ZenContentType::kText, fmt::format("Project '{}' not found", ProjectId)); + } + Project->TouchProject(); + + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + ZenContentType::kText, + fmt::format("Oplog '{}' not found in project '{}'", OplogId, ProjectId)); + } + Project->TouchOplog(OplogId); + + ProjectStore::Oplog& Oplog = *FoundLog; + + std::atomic_bool CancelFlag = false; + ProjectStore::Oplog::ValidationResult Result = Oplog.Validate(Project->RootDir, CancelFlag, &GetSmallWorkerPool(EWorkloadType::Burst)); + tsl::robin_map<Oid, std::string, Oid::Hasher> KeyNameLookup; + KeyNameLookup.reserve(Result.OpKeys.size()); + for (const auto& It : Result.OpKeys) + { + KeyNameLookup.insert_or_assign(It.first, It.second); + } + CbObjectWriter Writer; + Writer << "HasMissingData" << !Result.IsEmpty(); + Writer << "OpCount" << Result.OpCount; + Writer << "LSNLow" << Result.LSNLow.Number; + Writer << "LSNHigh" << Result.LSNHigh.Number; + if (!Result.MissingFiles.empty()) + { + Writer.BeginArray("MissingFiles"); + for (const auto& MissingFile : Result.MissingFiles) + { + Writer.BeginObject(); + { + Writer << "Key" << MissingFile.first; + Writer << "KeyName" << KeyNameLookup[MissingFile.first]; + Writer << "Id" << MissingFile.second.Id; + Writer << "Hash" << MissingFile.second.Hash; + Writer << "ServerPath" << MissingFile.second.ServerPath; + Writer << "ClientPath" << MissingFile.second.ClientPath; + } + Writer.EndObject(); + } + Writer.EndArray(); + } + if (!Result.MissingChunks.empty()) + { + Writer.BeginArray("MissingChunks"); + for (const auto& MissingChunk : Result.MissingChunks) + { + Writer.BeginObject(); + { + Writer << "Key" << MissingChunk.first; + Writer << "KeyName" << KeyNameLookup[MissingChunk.first]; + Writer << "Id" << MissingChunk.second.Id; + Writer << "Hash" << MissingChunk.second.Hash; + } + Writer.EndObject(); + } + Writer.EndArray(); + } + if (!Result.MissingMetas.empty()) + { + Writer.BeginArray("MissingMetas"); + for (const auto& MissingMeta : Result.MissingMetas) + { + Writer.BeginObject(); + { + Writer << "Key" << MissingMeta.first; + Writer << "KeyName" << KeyNameLookup[MissingMeta.first]; + Writer << "Id" << MissingMeta.second.Id; + Writer << "Hash" << MissingMeta.second.Hash; + } + Writer.EndObject(); + } + Writer.EndArray(); + } + if (!Result.MissingAttachments.empty()) + { + Writer.BeginArray("MissingAttachments"); + for (const auto& MissingMeta : Result.MissingAttachments) + { + Writer.BeginObject(); + { + Writer << "Key" << MissingMeta.first; + Writer << "KeyName" << KeyNameLookup[MissingMeta.first]; + Writer << "Hash" << MissingMeta.second; + } + Writer.EndObject(); + } + Writer.EndArray(); + } + CbObject Response = Writer.Save(); + HttpReq.WriteResponse(HttpResponseCode::OK, Response); +} + +void +HttpProjectService::HandleOpLogOpRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::OplogOp"); + + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const std::string_view ProjectId = Req.GetCapture(1); + const std::string_view OplogId = Req.GetCapture(2); + const std::string_view OpIdString = Req.GetCapture(3); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + Project->TouchProject(); + + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + Project->TouchOplog(OplogId); + + ProjectStore::Oplog& Oplog = *FoundLog; + + if (const std::optional<int32_t> OpId = ParseInt<uint32_t>(OpIdString)) + { + if (std::optional<CbObject> MaybeOp = Oplog.GetOpByIndex(ProjectStore::LogSequenceNumber(OpId.value()))) + { + CbObject& Op = MaybeOp.value(); + if (HttpReq.AcceptContentType() == ZenContentType::kCbPackage) + { + CbPackage Package; + Package.SetObject(Op); + + Op.IterateAttachments([&](CbFieldView FieldView) { + const IoHash AttachmentHash = FieldView.AsAttachment(); + IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash); + if (Payload) + { + switch (Payload.GetContentType()) + { + case ZenContentType::kCbObject: + { + CbValidateError ValidateResult; + if (CbObject Object = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult); + ValidateResult == CbValidateError::None && Object) + { + Package.AddAttachment(CbAttachment(Object)); + } + else + { + // Error - malformed object + ZEN_WARN("malformed object returned for {} ('{}')", AttachmentHash, ToString(ValidateResult)); + } + } + break; + + case ZenContentType::kCompressedBinary: + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload))) + { + Package.AddAttachment(CbAttachment(Compressed, AttachmentHash)); + } + else + { + // Error - not compressed! + + ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash); + } + break; + + default: + Package.AddAttachment(CbAttachment(SharedBuffer(Payload))); + break; + } + } + }); + m_ProjectStats.OpHitCount++; + return HttpReq.WriteResponse(HttpResponseCode::Accepted, Package); + } + else + { + // Client cannot accept a package, so we only send the core object + m_ProjectStats.OpHitCount++; + return HttpReq.WriteResponse(HttpResponseCode::Accepted, Op); + } + } + } + m_ProjectStats.OpMissCount++; + return HttpReq.WriteResponse(HttpResponseCode::NotFound); +} + +void +HttpProjectService::HandleOpLogRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::Oplog"); + + HttpServerRequest& HttpReq = Req.ServerRequest(); + + using namespace std::literals; + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("project {} not found", ProjectId)); + } + Project->TouchProject(); + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + { + Ref<ProjectStore::Oplog> OplogIt = Project->ReadOplog(OplogId); + if (!OplogIt) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("oplog {} not found in project {}", OplogId, ProjectId)); + } + + ProjectStore::Oplog& Log = *OplogIt; + + CbObjectWriter Cb; + Cb << "id"sv << Log.OplogId() << "project"sv << Project->Identifier << "tempdir"sv << Log.TempPath().c_str() + << "markerpath"sv << Log.MarkerPath().c_str() << "totalsize"sv << Log.TotalSize() << "opcount" << Log.OplogCount() + << "expired"sv << Project->IsExpired(GcClock::TimePoint::min(), Log); + HttpReq.WriteResponse(HttpResponseCode::OK, Cb.Save()); + + m_ProjectStats.OpLogReadCount++; + } + break; + + case HttpVerb::kPost: + { + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + std::filesystem::path OplogMarkerPath; + if (CbObject Params = HttpReq.ReadPayloadObject()) + { + OplogMarkerPath = Params["gcpath"sv].AsString(); + } + + Ref<ProjectStore::Oplog> OplogIt = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); + if (!OplogIt) + { + if (!Project->NewOplog(OplogId, OplogMarkerPath)) + { + // TODO: indicate why the operation failed! + return HttpReq.WriteResponse(HttpResponseCode::InternalServerError); + } + Project->TouchOplog(OplogId); + + m_ProjectStats.OpLogWriteCount++; + ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath); + + return HttpReq.WriteResponse(HttpResponseCode::Created); + } + + // I guess this should ultimately be used to execute RPCs but for now, it + // does absolutely nothing + + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + break; + + case HttpVerb::kPut: + { + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + std::filesystem::path OplogMarkerPath; + if (CbObject Params = HttpReq.ReadPayloadObject()) + { + OplogMarkerPath = Params["gcpath"sv].AsString(); + } + + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); + if (!FoundLog) + { + if (!Project->NewOplog(OplogId, OplogMarkerPath)) + { + // TODO: indicate why the operation failed! + return HttpReq.WriteResponse(HttpResponseCode::InternalServerError); + } + Project->TouchOplog(OplogId); + + m_ProjectStats.OpLogWriteCount++; + ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath); + + return HttpReq.WriteResponse(HttpResponseCode::Created); + } + Project->TouchOplog(OplogId); + + FoundLog->Update(OplogMarkerPath); + + m_ProjectStats.OpLogWriteCount++; + ZEN_INFO("updated oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath); + + return HttpReq.WriteResponse(HttpResponseCode::OK); + } + break; + + case HttpVerb::kDelete: + { + ZEN_INFO("deleting oplog '{}/{}'", ProjectId, OplogId); + + if (Project->DeleteOplog(OplogId)) + { + m_ProjectStats.OpLogDeleteCount++; + return HttpReq.WriteResponse(HttpResponseCode::OK); + } + else + { + return HttpReq.WriteResponse(HttpResponseCode::Locked, + HttpContentType::kText, + fmt::format("oplog {}/{} is in use", ProjectId, OplogId)); + } + } + break; + + default: + break; + } +} + +std::optional<OplogReferencedSet> +LoadReferencedSet(ProjectStore::Project& Project, ProjectStore::Oplog& Log) +{ + using namespace std::literals; + + Oid ReferencedSetOplogId = OpKeyStringAsOid(OplogReferencedSet::ReferencedSetOplogKey); + std::optional<CbObject> ReferencedSetOp = Log.GetOpByKey(ReferencedSetOplogId); + if (!ReferencedSetOp) + { + return std::optional<OplogReferencedSet>(); + } + // We expect only a single file in the "files" array; get the chunk for the first file + CbFieldView FileField = *(*ReferencedSetOp)["files"sv].AsArrayView().CreateViewIterator(); + Oid ChunkId = FileField.AsObjectView()["id"sv].AsObjectId(); + if (ChunkId == Oid::Zero) + { + return std::optional<OplogReferencedSet>(); + } + + return OplogReferencedSet::LoadFromChunk(Log.FindChunk(Project.RootDir, ChunkId, nullptr)); +} + +void +HttpProjectService::HandleOpLogEntriesRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::OplogEntries"); + + using namespace std::literals; + + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + Project->TouchProject(); + + Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + Project->TouchOplog(OplogId); + + CbObjectWriter Response; + + if (FoundLog->OplogCount() > 0) + { + std::unordered_set<std::string> FieldNamesFilter; + auto FilterObject = [&FieldNamesFilter](CbObjectView& Object) -> CbObject { + CbObject RewrittenOp = RewriteCbObject(Object, [&FieldNamesFilter](CbObjectWriter&, CbFieldView Field) -> bool { + if (FieldNamesFilter.contains(std::string(Field.GetName()))) + { + return false; + } + + return true; + }); + + return RewrittenOp; + }; + + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + if (auto FieldFilter = HttpServerRequest::Decode(Params.GetValue("fieldfilter")); !FieldFilter.empty()) + { + ForEachStrTok(FieldFilter, ',', [&](std::string_view FieldName) { + FieldNamesFilter.insert(std::string(FieldName)); + return true; + }); + } + + if (auto OpKey = Params.GetValue("opkey"); !OpKey.empty()) + { + Oid OpKeyId = OpKeyStringAsOid(OpKey); + std::optional<CbObject> Op = FoundLog->GetOpByKey(OpKeyId); + + if (Op.has_value()) + { + if (FieldNamesFilter.empty()) + { + Response << "entry"sv << Op.value(); + } + else + { + Response << "entry"sv << FilterObject(Op.value()); + } + } + else + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + } + else + { + ProjectStore::Oplog::Paging EntryPaging; + if (std::string_view Param = Params.GetValue("start"); !Param.empty()) + { + if (auto Value = ParseInt<int32>(Param)) + { + EntryPaging.Start = *Value; + } + } + if (std::string_view Param = Params.GetValue("count"); !Param.empty()) + { + if (auto Value = ParseInt<int32>(Param)) + { + EntryPaging.Count = *Value; + } + } + + std::optional<OplogReferencedSet> MaybeReferencedSet; + if (auto TrimString = Params.GetValue("trim_by_referencedset"); TrimString == "true") + { + MaybeReferencedSet = LoadReferencedSet(*Project, *FoundLog); + } + Response.BeginArray("entries"sv); + + bool ShouldFilterFields = !FieldNamesFilter.empty(); + + if (MaybeReferencedSet) + { + const OplogReferencedSet& ReferencedSet = MaybeReferencedSet.value(); + FoundLog->IterateOplogWithKey( + [this, &Response, &FilterObject, ShouldFilterFields, &ReferencedSet](ProjectStore::LogSequenceNumber /* LSN */, + const Oid& Key, + CbObjectView Op) { + if (!ReferencedSet.Contains(Key)) + { + if (!OplogReferencedSet::IsNonPackage(Op["key"].AsString())) + { + return; + } + } + + if (ShouldFilterFields) + { + Response << FilterObject(Op); + } + else + { + Response << Op; + } + }, + EntryPaging); + } + else + { + FoundLog->IterateOplog( + [this, &Response, &FilterObject, ShouldFilterFields](CbObjectView Op) { + if (ShouldFilterFields) + { + Response << FilterObject(Op); + } + else + { + Response << Op; + } + }, + EntryPaging); + } + + Response.EndArray(); + } + } + if (HttpReq.AcceptContentType() == HttpContentType::kCompressedBinary) + { + CompositeBuffer Payload = CompressedBuffer::Compress(Response.Save().GetBuffer()).GetCompressed(); + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCompressedBinary, Payload); + } + else + { + return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); + } +} + +void +HttpProjectService::HandleProjectRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::Project"); + + using namespace std::literals; + + HttpServerRequest& HttpReq = Req.ServerRequest(); + const std::string_view ProjectId = Req.GetCapture(1); + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kPost: + { + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + CbValidateError ValidateResult; + if (CbObject Params = ValidateAndReadCompactBinaryObject(HttpReq.ReadPayload(), ValidateResult); + ValidateResult == CbValidateError::None) + { + std::filesystem::path Root = Params["root"sv].AsU8String(); // Workspace root (i.e `D:/UE5/`) + std::filesystem::path EngineRoot = Params["engine"sv].AsU8String(); // Engine root (i.e `D:/UE5/Engine`) + std::filesystem::path ProjectRoot = + Params["project"sv].AsU8String(); // Project root directory (i.e `D:/UE5/Samples/Games/Lyra`) + std::filesystem::path ProjectFilePath = + Params["projectfile"sv].AsU8String(); // Project file path (i.e `D:/UE5/Samples/Games/Lyra/Lyra.uproject`) + + const std::filesystem::path BasePath = m_ProjectStore->BasePath() / ProjectId; + m_ProjectStore->NewProject(BasePath, ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath); + + ZEN_INFO("established project (id: '{}', roots: '{}', '{}', '{}', '{}'{})", + ProjectId, + Root, + EngineRoot, + ProjectRoot, + ProjectFilePath, + ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : ""); + + m_ProjectStats.ProjectWriteCount++; + HttpReq.WriteResponse(HttpResponseCode::Created); + } + else + { + HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Malformed compact binary object: '{}'", ToString(ValidateResult))); + } + } + break; + + case HttpVerb::kPut: + { + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + CbValidateError ValidateResult; + if (CbObject Params = ValidateAndReadCompactBinaryObject(HttpReq.ReadPayload(), ValidateResult); + ValidateResult == CbValidateError::None) + { + std::filesystem::path Root = Params["root"sv].AsU8String(); // Workspace root (i.e `D:/UE5/`) + std::filesystem::path EngineRoot = Params["engine"sv].AsU8String(); // Engine root (i.e `D:/UE5/Engine`) + std::filesystem::path ProjectRoot = + Params["project"sv].AsU8String(); // Project root directory (i.e `D:/UE5/Samples/Games/Lyra`) + std::filesystem::path ProjectFilePath = + Params["projectfile"sv].AsU8String(); // Project file path (i.e `D:/UE5/Samples/Games/Lyra/Lyra.uproject`) + + if (m_ProjectStore->UpdateProject(ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath)) + { + m_ProjectStats.ProjectWriteCount++; + ZEN_INFO("updated project (id: '{}', roots: '{}', '{}', '{}', '{}'{})", + ProjectId, + Root, + EngineRoot, + ProjectRoot, + ProjectFilePath, + ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : ""); + + HttpReq.WriteResponse(HttpResponseCode::OK); + } + else + { + const std::filesystem::path BasePath = m_ProjectStore->BasePath() / ProjectId; + m_ProjectStore->NewProject(BasePath, ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath); + + m_ProjectStats.ProjectWriteCount++; + ZEN_INFO("established project (id: '{}', roots: '{}', '{}', '{}', '{}'{})", + ProjectId, + Root, + EngineRoot, + ProjectRoot, + ProjectFilePath, + ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : ""); + + HttpReq.WriteResponse(HttpResponseCode::Created); + } + } + else + { + HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Malformed compact binary object: '{}'", ToString(ValidateResult))); + } + } + break; + + case HttpVerb::kGet: + { + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("project {} not found", ProjectId)); + } + Project->TouchProject(); + + std::vector<std::string> OpLogs = Project->ScanForOplogs(); + + CbObjectWriter Response; + Response << "id"sv << Project->Identifier; + Response << "root"sv << PathToUtf8(Project->RootDir); + Response << "engine"sv << PathToUtf8(Project->EngineRootDir); + Response << "project"sv << PathToUtf8(Project->ProjectRootDir); + Response << "projectfile"sv << PathToUtf8(Project->ProjectFilePath); + + Response.BeginArray("oplogs"sv); + for (const std::string& OplogId : OpLogs) + { + Response.BeginObject(); + Response << "id"sv << OplogId; + Response.EndObject(); + } + Response.EndArray(); // oplogs + + HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); + + m_ProjectStats.ProjectReadCount++; + } + break; + + case HttpVerb::kDelete: + { + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("project {} not found", ProjectId)); + } + + ZEN_INFO("deleting project '{}'", ProjectId); + if (!m_ProjectStore->DeleteProject(ProjectId)) + { + return HttpReq.WriteResponse(HttpResponseCode::Locked, + HttpContentType::kText, + fmt::format("project {} is in use", ProjectId)); + } + + m_ProjectStats.ProjectDeleteCount++; + return HttpReq.WriteResponse(HttpResponseCode::NoContent); + } + break; + + default: + break; + } +} + +void +HttpProjectService::HandleOplogSaveRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::OplogSave"); + + HttpServerRequest& HttpReq = Req.ServerRequest(); + + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + if (HttpReq.RequestContentType() != HttpContentType::kCbObject) + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid content type"); + } + IoBuffer Payload = HttpReq.ReadPayload(); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Write oplog request for unknown project '{}'", ProjectId)); + } + Project->TouchProject(); + + Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false); + if (!Oplog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Write oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)); + } + Project->TouchOplog(OplogId); + + CbValidateError ValidateResult; + if (CbObject ContainerObject = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult); + ValidateResult == CbValidateError::None && ContainerObject) + { + RwLock AttachmentsLock; + tsl::robin_set<IoHash, IoHash::Hasher> Attachments; + + auto HasAttachment = [this](const IoHash& RawHash) { return m_CidStore.ContainsChunk(RawHash); }; + auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector<IoHash>&& ChunkHashes) { + RwLock::ExclusiveLockScope _(AttachmentsLock); + if (BlockHash != IoHash::Zero) + { + Attachments.insert(BlockHash); + } + else + { + Attachments.insert(ChunkHashes.begin(), ChunkHashes.end()); + } + }; + auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) { + RwLock::ExclusiveLockScope _(AttachmentsLock); + Attachments.insert(RawHash); + }; + + auto OnChunkedAttachment = [](const ChunkedInfo&) {}; + + auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog->CaptureAddedAttachments(RawHashes); }; + + // Make sure we retain any attachments we download before writing the oplog + Oplog->EnableUpdateCapture(); + auto _ = MakeGuard([&Oplog]() { Oplog->DisableUpdateCapture(); }); + + RemoteProjectStore::Result Result = SaveOplogContainer(*Oplog, + ContainerObject, + OnReferencedAttachments, + HasAttachment, + OnNeedBlock, + OnNeedAttachment, + OnChunkedAttachment, + nullptr); + + if (Result.ErrorCode == 0) + { + if (Attachments.empty()) + { + HttpReq.WriteResponse(HttpResponseCode::OK); + } + else + { + CbObjectWriter Cbo(1 + 1 + 5 + Attachments.size() * (1 + sizeof(IoHash::Hash)) + 1); + Cbo.BeginArray("need"); + { + for (const IoHash& Hash : Attachments) + { + ZEN_DEBUG("Need attachment {}", Hash); + Cbo << Hash; + } + } + Cbo.EndArray(); // "need" + + CbObject ResponsePayload = Cbo.Save(); + return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); + } + } + else + { + ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", + ToString(HttpReq.RequestVerb()), + HttpReq.QueryString(), + Result.ErrorCode, + Result.Reason); + + if (Result.Reason.empty()) + { + return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode)); + } + else + { + return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode), HttpContentType::kText, Result.Reason); + } + } + } + else + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid payload: '{}'", ToString(ValidateResult))); + } +} + +void +HttpProjectService::HandleOplogLoadRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::OplogLoad"); + + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + const HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + + if (HttpReq.AcceptContentType() != HttpContentType::kCbObject) + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid accept content type"); + } + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Read oplog request for unknown project '{}'", ProjectId)); + } + Project->TouchProject(); + + Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); + if (!Oplog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Read oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)); + } + Project->TouchOplog(OplogId); + + size_t MaxBlockSize = RemoteStoreOptions::DefaultMaxBlockSize; + if (auto Param = Params.GetValue("maxblocksize"); Param.empty() == false) + { + if (auto Value = ParseInt<size_t>(Param)) + { + MaxBlockSize = Value.value(); + } + } + size_t MaxChunkEmbedSize = RemoteStoreOptions::DefaultMaxChunkEmbedSize; + if (auto Param = Params.GetValue("maxchunkembedsize"); Param.empty() == false) + { + if (auto Value = ParseInt<size_t>(Param)) + { + MaxChunkEmbedSize = Value.value(); + } + } + size_t MaxChunksPerBlock = RemoteStoreOptions::DefaultMaxChunksPerBlock; + if (auto Param = Params.GetValue("maxchunksperblock"); Param.empty() == false) + { + if (auto Value = ParseInt<size_t>(Param)) + { + MaxChunksPerBlock = Value.value(); + } + } + + size_t ChunkFileSizeLimit = RemoteStoreOptions::DefaultChunkFileSizeLimit; + if (auto Param = Params.GetValue("chunkfilesizelimit"); Param.empty() == false) + { + if (auto Value = ParseInt<size_t>(Param)) + { + ChunkFileSizeLimit = Value.value(); + } + } + + WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); + + RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer( + m_CidStore, + *Project, + *Oplog, + WorkerPool, + MaxBlockSize, + MaxChunkEmbedSize, + MaxChunksPerBlock, + ChunkFileSizeLimit, + /* BuildBlocks */ false, + /* IgnoreMissingAttachments */ false, + /* AllowChunking*/ false, + [](CompressedBuffer&&, ChunkBlockDescription&&) {}, + [](const IoHash&, TGetAttachmentBufferFunc&&) {}, + [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, + /* EmbedLooseFiles*/ false); + + if (ContainerResult.ErrorCode == 0) + { + return HttpReq.WriteResponse(HttpResponseCode::OK, ContainerResult.ContainerObject); + } + else + { + ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", + ToString(HttpReq.RequestVerb()), + HttpReq.QueryString(), + ContainerResult.ErrorCode, + ContainerResult.Reason); + + if (ContainerResult.Reason.empty()) + { + return HttpReq.WriteResponse(HttpResponseCode(ContainerResult.ErrorCode)); + } + else + { + return HttpReq.WriteResponse(HttpResponseCode(ContainerResult.ErrorCode), HttpContentType::kText, ContainerResult.Reason); + } + } +} + +void +HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::Rpc"); + using namespace std::literals; + + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + IoBuffer Payload = HttpReq.ReadPayload(); + + HttpContentType PayloadContentType = HttpReq.RequestContentType(); + CbPackage Package; + CbObject Cb; + switch (PayloadContentType) + { + case HttpContentType::kJSON: + case HttpContentType::kUnknownContentType: + case HttpContentType::kText: + { + std::string JsonText(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize()); + Cb = LoadCompactBinaryFromJson(JsonText).AsObject(); + if (!Cb) + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Content format not supported, expected JSON format"); + } + } + break; + case HttpContentType::kCbObject: + { + CbValidateError ValidateResult; + if (Cb = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult); + ValidateResult != CbValidateError::None || !Cb) + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Content format not supported, expected compact binary format ('{}')", ToString(ValidateResult))); + } + break; + } + case HttpContentType::kCbPackage: + try + { + Package = ParsePackageMessage(Payload); + Cb = Package.GetObject(); + } + catch (const std::invalid_argument& ex) + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Failed to parse package request, reason: '{}'", ex.what())); + } + if (!Cb) + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Content format not supported, expected package message format"); + } + break; + default: + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type"); + } + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Rpc oplog request for unknown project '{}'", ProjectId)); + } + Project->TouchProject(); + + std::string_view Method = Cb["method"sv].AsString(); + + bool VerifyPathOnDisk = Method != "getchunks"sv; + + Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, VerifyPathOnDisk); + if (!Oplog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)); + } + Project->TouchOplog(OplogId); + + uint32_t MethodHash = HashStringDjb2(Method); + + switch (MethodHash) + { + case HashStringDjb2("import"sv): + { + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + CbObjectView Params = Cb["params"sv].AsObjectView(); + size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize); + size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize); + bool Force = Params["force"sv].AsBool(false); + bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); + bool CleanOplog = Params["clean"].AsBool(false); + + CreateRemoteStoreResult RemoteStoreResult = + CreateRemoteStore(Params, m_AuthMgr, MaxBlockSize, MaxChunkEmbedSize, Oplog->TempPath()); + + if (RemoteStoreResult.Store == nullptr) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, RemoteStoreResult.Description); + } + std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store); + RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); + + JobId JobId = m_JobQueue.QueueJob( + fmt::format("Import oplog '{}/{}'", Project->Identifier, Oplog->OplogId()), + [this, + ChunkStore = &m_CidStore, + ActualRemoteStore = std::move(RemoteStore), + Oplog, + Force, + IgnoreMissingAttachments, + CleanOplog](JobContext& Context) { + Context.ReportMessage(fmt::format("Loading oplog '{}/{}' from {}", + Oplog->GetOuterProjectIdentifier(), + Oplog->OplogId(), + ActualRemoteStore->GetInfo().Description)); + + WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); + WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(EWorkloadType::Background); + + RemoteProjectStore::Result Result = LoadOplog(m_CidStore, + *ActualRemoteStore, + *Oplog, + NetworkWorkerPool, + WorkerPool, + Force, + IgnoreMissingAttachments, + CleanOplog, + &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second, + (int)Response.first); + } + }); + + return HttpReq.WriteResponse(HttpResponseCode::Accepted, HttpContentType::kText, fmt::format("{}", JobId.Id)); + } + case HashStringDjb2("export"sv): + { + CbObjectView Params = Cb["params"sv].AsObjectView(); + size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize); + size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize); + size_t MaxChunksPerBlock = Params["maxchunksperblock"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunksPerBlock); + size_t ChunkFileSizeLimit = Params["chunkfilesizelimit"sv].AsUInt64(RemoteStoreOptions::DefaultChunkFileSizeLimit); + bool Force = Params["force"sv].AsBool(false); + bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); + bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false); + + CreateRemoteStoreResult RemoteStoreResult = + CreateRemoteStore(Params, m_AuthMgr, MaxBlockSize, MaxChunkEmbedSize, Oplog->TempPath()); + + if (RemoteStoreResult.Store == nullptr) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, RemoteStoreResult.Description); + } + std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store); + RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); + + JobId JobId = m_JobQueue.QueueJob( + fmt::format("Export oplog '{}/{}'", Project->Identifier, Oplog->OplogId()), + [this, + ActualRemoteStore = std::move(RemoteStore), + Project, + Oplog, + MaxBlockSize, + MaxChunksPerBlock, + MaxChunkEmbedSize, + ChunkFileSizeLimit, + EmbedLooseFile, + Force, + IgnoreMissingAttachments](JobContext& Context) { + Context.ReportMessage(fmt::format("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}", + Project->Identifier, + Oplog->OplogId(), + ActualRemoteStore->GetInfo().Description, + NiceBytes(MaxBlockSize), + NiceBytes(MaxChunkEmbedSize))); + + WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); + WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(EWorkloadType::Background); + + RemoteProjectStore::Result Result = SaveOplog(m_CidStore, + *ActualRemoteStore, + *Project, + *Oplog, + NetworkWorkerPool, + WorkerPool, + MaxBlockSize, + MaxChunksPerBlock, + MaxChunkEmbedSize, + ChunkFileSizeLimit, + EmbedLooseFile, + Force, + IgnoreMissingAttachments, + &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second, + (int)Response.first); + } + }); + + return HttpReq.WriteResponse(HttpResponseCode::Accepted, HttpContentType::kText, fmt::format("{}", JobId.Id)); + } + case HashStringDjb2("getchunks"sv): + { + RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u)); + int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0); + + std::vector<ProjectStore::ChunkRequest> Requests = m_ProjectStore->ParseChunksRequests(*Project, *Oplog, Cb); + std::vector<ProjectStore::ChunkResult> Results = + Requests.empty() ? std::vector<ProjectStore::ChunkResult>{} : m_ProjectStore->GetChunks(*Project, *Oplog, Requests); + CbPackage Response = m_ProjectStore->WriteChunksRequestResponse(*Project, *Oplog, std::move(Requests), std::move(Results)); + + void* TargetProcessHandle = nullptr; + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + { + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + { + Flags |= FormatFlags::kDenyPartialLocalReferences; + } + TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId); + } + + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(Response, Flags, TargetProcessHandle); + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + } + case HashStringDjb2("putchunks"sv): + { + ZEN_TRACE_CPU("Store::Rpc::putchunks"); + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + CbObject Object = Package.GetObject(); + const bool UsingTempFiles = Object["usingtmpfiles"].AsBool(false); + + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + if (!Attachments.empty()) + { + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + + WriteAttachmentBuffers.reserve(Attachments.size()); + WriteRawHashes.reserve(Attachments.size()); + + for (const CbAttachment& Attachment : Attachments) + { + IoHash RawHash = Attachment.GetHash(); + const CompressedBuffer& Compressed = Attachment.AsCompressedBinary(); + IoBuffer AttachmentPayload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + if (UsingTempFiles) + { + AttachmentPayload.SetDeleteOnClose(true); + } + WriteAttachmentBuffers.push_back(std::move(AttachmentPayload)); + WriteRawHashes.push_back(RawHash); + } + + Oplog->CaptureAddedAttachments(WriteRawHashes); + m_CidStore.AddChunks(WriteAttachmentBuffers, + WriteRawHashes, + UsingTempFiles ? CidStore::InsertMode::kMayBeMovedInPlace : CidStore::InsertMode::kCopyOnly); + } + return HttpReq.WriteResponse(HttpResponseCode::OK); + } + case HashStringDjb2("snapshot"sv): + { + ZEN_TRACE_CPU("Store::Rpc::snapshot"); + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + // Snapshot all referenced files. This brings the content of all + // files into the CID store + + uint32_t OpCount = 0; + uint64_t InlinedBytes = 0; + uint64_t InlinedFiles = 0; + uint64_t TotalBytes = 0; + uint64_t TotalFiles = 0; + + std::vector<CbObject> NewOps; + struct AddedChunk + { + IoBuffer Buffer; + uint64_t RawSize = 0; + }; + tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks; + + Oplog->IterateOplog( + [&](CbObjectView Op) { + bool OpRewritten = false; + bool AllOk = true; + + CbWriter FilesArrayWriter; + FilesArrayWriter.BeginArray("files"sv); + + for (CbFieldView& Field : Op["files"sv]) + { + bool CopyField = true; + + if (CbObjectView View = Field.AsObjectView()) + { + const IoHash DataHash = View["data"sv].AsHash(); + + if (DataHash == IoHash::Zero) + { + std::string_view ServerPath = View["serverpath"sv].AsString(); + std::filesystem::path FilePath = Project->RootDir / ServerPath; + BasicFile DataFile; + std::error_code Ec; + DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec); + + if (Ec) + { + // Error... + + ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message()); + + AllOk = false; + } + else + { + // Read file contents into memory, compress and keep in map of chunks to add to Cid store + IoBuffer FileIoBuffer = DataFile.ReadAll(); + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer))); + const uint64_t RawSize = Compressed.DecodeRawSize(); + const IoHash RawHash = Compressed.DecodeRawHash(); + if (!AddedChunks.contains(RawHash)) + { + const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString(); + BasicFile ChunkTempFile; + ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete); + ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec); + if (Ec) + { + Oid ChunkId = View["id"sv].AsObjectId(); + ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}", + FilePath, + ChunkId, + Ec.message()); + AllOk = false; + } + else + { + void* FileHandle = ChunkTempFile.Detach(); + IoBuffer ChunkBuffer(IoBuffer::File, + FileHandle, + 0, + Compressed.GetCompressed().GetSize(), + /*IsWholeFile*/ true); + ChunkBuffer.SetDeleteOnClose(true); + AddedChunks.insert_or_assign( + RawHash, + AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize}); + } + } + + TotalBytes += RawSize; + ++TotalFiles; + + // Rewrite file array entry with new data reference + CbObjectWriter Writer(View.GetSize()); + RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { + if (Field.GetName() == "data"sv) + { + // omit this field as we will write it explicitly ourselves + return true; + } + return false; + }); + Writer.AddBinaryAttachment("data"sv, RawHash); + + CbObject RewrittenOp = Writer.Save(); + FilesArrayWriter.AddObject(std::move(RewrittenOp)); + CopyField = false; + } + } + } + + if (CopyField) + { + FilesArrayWriter.AddField(Field); + } + else + { + OpRewritten = true; + } + } + + if (OpRewritten && AllOk) + { + FilesArrayWriter.EndArray(); + CbArray FilesArray = FilesArrayWriter.Save().AsArray(); + + CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { + if (Field.GetName() == "files"sv) + { + NewWriter.AddArray("files"sv, FilesArray); + + return true; + } + + return false; + }); + + NewOps.push_back(std::move(RewrittenOp)); + } + + OpCount++; + }, + ProjectStore::Oplog::Paging{}); + + CbObjectWriter ResponseObj; + + // Persist rewritten oplog entries + if (!NewOps.empty()) + { + ResponseObj.BeginArray("rewritten_ops"); + + for (CbObject& NewOp : NewOps) + { + ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp)); + + ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number); + + ResponseObj.AddInteger(NewLsn.Number); + } + + ResponseObj.EndArray(); + } + + // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the + // new chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have + // unreferenced chunks. + for (auto It : AddedChunks) + { + const IoHash& RawHash = It.first; + AddedChunk& Chunk = It.second; + CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash); + if (Result.New) + { + InlinedBytes += Chunk.RawSize; + ++InlinedFiles; + } + } + + ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles; + ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles; + + ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount); + + return HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save()); + } + case HashStringDjb2("appendops"sv): + { + ZEN_TRACE_CPU("Store::Rpc::appendops"); + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + CbArrayView OpsArray = Cb["ops"sv].AsArrayView(); + + size_t OpsBufferSize = 0; + for (CbFieldView OpView : OpsArray) + { + OpsBufferSize += OpView.GetSize(); + } + UniqueBuffer OpsBuffers = UniqueBuffer::Alloc(OpsBufferSize); + MutableMemoryView OpsBuffersMemory = OpsBuffers.GetMutableView(); + + std::vector<CbObjectView> Ops; + Ops.reserve(OpsArray.Num()); + for (CbFieldView OpView : OpsArray) + { + OpView.CopyTo(OpsBuffersMemory); + Ops.push_back(CbObjectView(OpsBuffersMemory.GetData())); + OpsBuffersMemory.MidInline(OpView.GetSize()); + } + + std::vector<ProjectStore::LogSequenceNumber> LSNs = Oplog->AppendNewOplogEntries(Ops); + ZEN_ASSERT(LSNs.size() == Ops.size()); + + std::vector<IoHash> MissingAttachments; + for (size_t OpIndex = 0; OpIndex < Ops.size(); OpIndex++) + { + if (LSNs[OpIndex]) + { + CbObjectView Op = Ops[OpIndex]; + Op.IterateAttachments([this, &MissingAttachments](CbFieldView AttachmentView) { + const IoHash Cid = AttachmentView.AsAttachment(); + if (!m_CidStore.ContainsChunk(Cid)) + { + MissingAttachments.push_back(Cid); + } + }); + } + } + + CbPackage ResponsePackage; + + { + CbObjectWriter ResponseObj; + ResponseObj.BeginArray("written_ops"); + + for (ProjectStore::LogSequenceNumber NewLsn : LSNs) + { + ZEN_DEBUG("appended written op at LSN: {}", NewLsn.Number); + ResponseObj.AddInteger(NewLsn.Number); + } + ResponseObj.EndArray(); + + if (!MissingAttachments.empty()) + { + ResponseObj.BeginArray("need"); + + for (const IoHash& Cid : MissingAttachments) + { + ResponseObj.AddHash(Cid); + } + ResponseObj.EndArray(); + } + ResponsePackage.SetObject(ResponseObj.Save()); + } + + std::vector<IoBuffer> ResponseBuffers = FormatPackageMessage(ResponsePackage); + + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponseBuffers); + } + default: + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Unknown rpc method '{}'", Method)); + } +} +void +HttpProjectService::HandleDetailsRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::Details"); + + using namespace std::literals; + + HttpServerRequest& HttpReq = Req.ServerRequest(); + + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + bool CSV = Params.GetValue("csv"sv) == "true"sv; + bool Details = Params.GetValue("details"sv) == "true"sv; + bool OpDetails = Params.GetValue("opdetails"sv) == "true"sv; + bool AttachmentDetails = Params.GetValue("attachmentdetails"sv) == "true"sv; + + if (CSV) + { + ExtendableStringBuilder<4096> CSVWriter; + CSVHeader(Details, AttachmentDetails, CSVWriter); + + m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) { + Project.IterateOplogs([&](const RwLock::SharedLockScope&, ProjectStore::Oplog& Oplog) { + Oplog.IterateOplogWithKey( + [this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](ProjectStore::LogSequenceNumber LSN, + const Oid& Key, + CbObjectView Op) { + CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); + }); + }); + }); + + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); + } + else + { + CbObjectWriter Cbo; + Cbo.BeginArray("projects"); + { + m_ProjectStore->DiscoverProjects(); + + m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) { + std::vector<std::string> OpLogs = Project.ScanForOplogs(); + CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); + }); + } + Cbo.EndArray(); + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + } +} + +void +HttpProjectService::HandleProjectDetailsRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::ProjectDetails"); + + using namespace std::literals; + + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + bool CSV = Params.GetValue("csv"sv) == "true"sv; + bool Details = Params.GetValue("details"sv) == "true"sv; + bool OpDetails = Params.GetValue("opdetails"sv) == "true"sv; + bool AttachmentDetails = Params.GetValue("attachmentdetails"sv) == "true"sv; + + Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId); + if (!FoundProject) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + ProjectStore::Project& Project = *FoundProject.Get(); + + if (CSV) + { + ExtendableStringBuilder<4096> CSVWriter; + CSVHeader(Details, AttachmentDetails, CSVWriter); + + FoundProject->IterateOplogs([&](const RwLock::SharedLockScope&, ProjectStore::Oplog& Oplog) { + Oplog.IterateOplogWithKey([this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](ProjectStore::LogSequenceNumber LSN, + const Oid& Key, + CbObjectView Op) { + CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); + }); + }); + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); + } + else + { + CbObjectWriter Cbo; + std::vector<std::string> OpLogs = FoundProject->ScanForOplogs(); + Cbo.BeginArray("projects"); + { + CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); + } + Cbo.EndArray(); + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + } +} + +void +HttpProjectService::HandleOplogDetailsRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::OplogDetails"); + + using namespace std::literals; + + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + bool CSV = Params.GetValue("csv"sv) == "true"sv; + bool Details = Params.GetValue("details"sv) == "true"sv; + bool OpDetails = Params.GetValue("opdetails"sv) == "true"sv; + bool AttachmentDetails = Params.GetValue("attachmentdetails"sv) == "true"sv; + + Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId); + if (!FoundProject) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + Ref<ProjectStore::Oplog> FoundLog = FoundProject->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Project& Project = *FoundProject.Get(); + ProjectStore::Oplog& Oplog = *FoundLog; + if (CSV) + { + ExtendableStringBuilder<4096> CSVWriter; + CSVHeader(Details, AttachmentDetails, CSVWriter); + + Oplog.IterateOplogWithKey([this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](ProjectStore::LogSequenceNumber LSN, + const Oid& Key, + CbObjectView Op) { + CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); + }); + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); + } + else + { + CbObjectWriter Cbo; + Cbo.BeginArray("oplogs"); + { + CbWriteOplog(m_CidStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo); + } + Cbo.EndArray(); + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + } +} + +void +HttpProjectService::HandleOplogOpDetailsRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("ProjectService::OplogOpDetails"); + + using namespace std::literals; + + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + const auto& OpId = Req.GetCapture(3); + + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + bool CSV = Params.GetValue("csv"sv) == "true"sv; + bool Details = Params.GetValue("details"sv) == "true"sv; + bool OpDetails = Params.GetValue("opdetails"sv) == "true"sv; + bool AttachmentDetails = Params.GetValue("attachmentdetails"sv) == "true"sv; + + Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId); + if (!FoundProject) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + Ref<ProjectStore::Oplog> FoundLog = FoundProject->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + if (OpId.size() != 2 * sizeof(Oid::OidBits)) + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, OpId)); + } + + const Oid ObjId = Oid::FromHexString(OpId); + ProjectStore::Project& Project = *FoundProject.Get(); + ProjectStore::Oplog& Oplog = *FoundLog; + + std::optional<CbObject> Op = Oplog.GetOpByKey(ObjId); + if (!Op.has_value()) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + ProjectStore::LogSequenceNumber LSN = Oplog.GetOpIndexByKey(ObjId); + if (!LSN) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + if (CSV) + { + ExtendableStringBuilder<4096> CSVWriter; + CSVHeader(Details, AttachmentDetails, CSVWriter); + + CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, ObjId, Op.value(), CSVWriter); + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); + } + else + { + CbObjectWriter Cbo; + Cbo.BeginArray("ops"); + { + CbWriteOp(m_CidStore, Details, OpDetails, AttachmentDetails, LSN, ObjId, Op.value(), Cbo); + } + Cbo.EndArray(); + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + } +} + +} // namespace zen |