diff options
| author | Stefan Boberg <[email protected]> | 2025-10-14 11:32:16 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-14 11:32:16 +0200 |
| commit | ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2 (patch) | |
| tree | 005a50adfddf6982bab3a06bb93d4c50da1a11fd /src/zenserver/projectstore/httpprojectstore.cpp | |
| parent | make asiohttp work without IPv6 (#562) (diff) | |
| download | zen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.tar.xz zen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.zip | |
move all storage-related services into storage tree (#571)
* move all storage-related services into storage tree
* move config into config/
* also move admin service into storage since it mostly has storage related functionality
* header consolidation
Diffstat (limited to 'src/zenserver/projectstore/httpprojectstore.cpp')
| -rw-r--r-- | src/zenserver/projectstore/httpprojectstore.cpp | 3307 |
1 files changed, 0 insertions, 3307 deletions
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp deleted file mode 100644 index 1c6b5d6b0..000000000 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ /dev/null @@ -1,3307 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "httpprojectstore.h" - -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinarypackage.h> -#include <zencore/compactbinaryutil.h> -#include <zencore/compactbinaryvalidation.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/memory/llm.h> -#include <zencore/scopeguard.h> -#include <zencore/stream.h> -#include <zencore/trace.h> -#include <zenhttp/packageformat.h> -#include <zenremotestore/projectstore/buildsremoteprojectstore.h> -#include <zenremotestore/projectstore/fileremoteprojectstore.h> -#include <zenremotestore/projectstore/jupiterremoteprojectstore.h> -#include <zenremotestore/projectstore/remoteprojectstore.h> -#include <zenremotestore/projectstore/zenremoteprojectstore.h> -#include <zenstore/oplogreferencedset.h> -#include <zenstore/projectstore.h> -#include <zenstore/zenstore.h> -#include <zenutil/openprocesscache.h> -#include <zenutil/workerpools.h> - -namespace zen { - -const FLLMTag& -GetProjectHttpTag() -{ - static FLLMTag _("http", FLLMTag("project")); - - return _; -} - -void -CSVHeader(bool Details, bool AttachmentDetails, StringBuilderBase& CSVWriter) -{ - if (AttachmentDetails) - { - CSVWriter << "Project, Oplog, LSN, Key, Cid, Size"; - } - else if (Details) - { - CSVWriter << "Project, Oplog, LSN, Key, Size, AttachmentCount, AttachmentsSize"; - } - else - { - CSVWriter << "Project, Oplog, Key"; - } -} - -void -CSVWriteOp(CidStore& CidStore, - std::string_view ProjectId, - std::string_view OplogId, - bool Details, - bool AttachmentDetails, - ProjectStore::LogSequenceNumber LSN, - const Oid& Key, - CbObjectView Op, - StringBuilderBase& CSVWriter) -{ - StringBuilder<32> KeyStringBuilder; - Key.ToString(KeyStringBuilder); - const std::string_view KeyString = KeyStringBuilder.ToView(); - - if (AttachmentDetails) - { - Op.IterateAttachments([&CidStore, &CSVWriter, &ProjectId, &OplogId, LSN, &KeyString](CbFieldView FieldView) { - const IoHash AttachmentHash = FieldView.AsAttachment(); - IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); - CSVWriter << "\r\n" - << ProjectId << ", " << OplogId << ", " << LSN.Number << ", " << KeyString << ", " << AttachmentHash.ToHexString() - << ", " << gsl::narrow<uint64_t>(Attachment.GetSize()); - }); - } - else if (Details) - { - uint64_t AttachmentCount = 0; - size_t AttachmentsSize = 0; - Op.IterateAttachments([&CidStore, &AttachmentCount, &AttachmentsSize](CbFieldView FieldView) { - const IoHash AttachmentHash = FieldView.AsAttachment(); - AttachmentCount++; - IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); - AttachmentsSize += Attachment.GetSize(); - }); - CSVWriter << "\r\n" - << ProjectId << ", " << OplogId << ", " << LSN.Number << ", " << KeyString << ", " << gsl::narrow<uint64_t>(Op.GetSize()) - << ", " << AttachmentCount << ", " << gsl::narrow<uint64_t>(AttachmentsSize); - } - else - { - CSVWriter << "\r\n" << ProjectId << ", " << OplogId << ", " << KeyString; - } -}; - -////////////////////////////////////////////////////////////////////////// - -namespace { - - void CbWriteOp(CidStore& CidStore, - bool Details, - bool OpDetails, - bool AttachmentDetails, - ProjectStore::LogSequenceNumber LSN, - const Oid& Key, - CbObjectView Op, - CbObjectWriter& CbWriter) - { - CbWriter.BeginObject(); - { - CbWriter.AddObjectId("key", Key); - if (Details) - { - CbWriter.AddInteger("lsn", LSN.Number); - CbWriter.AddInteger("size", gsl::narrow<uint64_t>(Op.GetSize())); - } - if (AttachmentDetails) - { - CbWriter.BeginArray("attachments"); - Op.IterateAttachments([&CidStore, &CbWriter](CbFieldView FieldView) { - const IoHash AttachmentHash = FieldView.AsAttachment(); - CbWriter.BeginObject(); - { - IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); - CbWriter.AddString("cid", AttachmentHash.ToHexString()); - CbWriter.AddInteger("size", gsl::narrow<uint64_t>(Attachment.GetSize())); - } - CbWriter.EndObject(); - }); - CbWriter.EndArray(); - } - else if (Details) - { - uint64_t AttachmentCount = 0; - size_t AttachmentsSize = 0; - Op.IterateAttachments([&CidStore, &AttachmentCount, &AttachmentsSize](CbFieldView FieldView) { - const IoHash AttachmentHash = FieldView.AsAttachment(); - AttachmentCount++; - IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); - AttachmentsSize += Attachment.GetSize(); - }); - if (AttachmentCount > 0) - { - CbWriter.AddInteger("attachments", AttachmentCount); - CbWriter.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize)); - } - } - if (OpDetails) - { - CbWriter.BeginObject("op"); - for (const CbFieldView& Field : Op) - { - if (!Field.HasName()) - { - CbWriter.AddField(Field); - continue; - } - std::string_view FieldName = Field.GetName(); - CbWriter.AddField(FieldName, Field); - } - CbWriter.EndObject(); - } - } - CbWriter.EndObject(); - }; - - void CbWriteOplogOps(CidStore& CidStore, - ProjectStore::Oplog& Oplog, - bool Details, - bool OpDetails, - bool AttachmentDetails, - CbObjectWriter& Cbo) - { - Cbo.BeginArray("ops"); - { - Oplog.IterateOplogWithKey([&Cbo, &CidStore, Details, OpDetails, AttachmentDetails](ProjectStore::LogSequenceNumber LSN, - const Oid& Key, - CbObjectView Op) { - CbWriteOp(CidStore, Details, OpDetails, AttachmentDetails, LSN, Key, Op, Cbo); - }); - } - Cbo.EndArray(); - } - - void CbWriteOplog(CidStore& CidStore, - ProjectStore::Oplog& Oplog, - bool Details, - bool OpDetails, - bool AttachmentDetails, - CbObjectWriter& Cbo) - { - Cbo.BeginObject(); - { - Cbo.AddString("name", Oplog.OplogId()); - CbWriteOplogOps(CidStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo); - } - Cbo.EndObject(); - } - - void CbWriteOplogs(CidStore& CidStore, - ProjectStore::Project& Project, - std::vector<std::string> OpLogs, - bool Details, - bool OpDetails, - bool AttachmentDetails, - CbObjectWriter& Cbo) - { - Cbo.BeginArray("oplogs"); - { - for (const std::string& OpLogId : OpLogs) - { - Ref<ProjectStore::Oplog> Oplog = Project.OpenOplog(OpLogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); - if (Oplog) - { - CbWriteOplog(CidStore, *Oplog, Details, OpDetails, AttachmentDetails, Cbo); - } - } - } - Cbo.EndArray(); - } - - void CbWriteProject(CidStore& CidStore, - ProjectStore::Project& Project, - std::vector<std::string> OpLogs, - bool Details, - bool OpDetails, - bool AttachmentDetails, - CbObjectWriter& Cbo) - { - Cbo.BeginObject(); - { - Cbo.AddString("name", Project.Identifier); - CbWriteOplogs(CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); - } - Cbo.EndObject(); - } - - struct CreateRemoteStoreResult - { - std::shared_ptr<RemoteProjectStore> Store; - std::string Description; - }; - - CreateRemoteStoreResult CreateRemoteStore(CbObjectView Params, - AuthMgr& AuthManager, - size_t MaxBlockSize, - size_t MaxChunkEmbedSize, - const std::filesystem::path& TempFilePath) - { - ZEN_MEMSCOPE(GetProjectHttpTag()); - - using namespace std::literals; - - std::shared_ptr<RemoteProjectStore> RemoteStore; - - if (CbObjectView File = Params["file"sv].AsObjectView(); File) - { - std::filesystem::path FolderPath(File["path"sv].AsString()); - if (FolderPath.empty()) - { - return {nullptr, "Missing file path"}; - } - std::string_view Name(File["name"sv].AsString()); - if (Name.empty()) - { - return {nullptr, "Missing file name"}; - } - std::string_view OptionalBaseName(File["basename"sv].AsString()); - bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false); - bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false); - - FileRemoteStoreOptions Options = { - RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, - FolderPath, - std::string(Name), - std::string(OptionalBaseName), - ForceDisableBlocks, - ForceEnableTempBlocks}; - RemoteStore = CreateFileRemoteStore(Options); - } - - if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud) - { - std::string_view CloudServiceUrl = Cloud["url"sv].AsString(); - if (CloudServiceUrl.empty()) - { - return {nullptr, "Missing service url"}; - } - - std::string Url = UrlDecode(CloudServiceUrl); - std::string_view Namespace = Cloud["namespace"sv].AsString(); - if (Namespace.empty()) - { - return {nullptr, "Missing namespace"}; - } - std::string_view Bucket = Cloud["bucket"sv].AsString(); - if (Bucket.empty()) - { - return {nullptr, "Missing bucket"}; - } - std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString(); - std::string AccessToken = std::string(Cloud["access-token"sv].AsString()); - if (AccessToken.empty()) - { - std::string_view AccessTokenEnvVariable = Cloud["access-token-env"].AsString(); - if (!AccessTokenEnvVariable.empty()) - { - AccessToken = GetEnvVariable(AccessTokenEnvVariable); - } - } - std::filesystem::path OidcExePath; - if (std::string_view OidcExePathString = Cloud["oidc-exe-path"].AsString(); !OidcExePathString.empty()) - { - std::filesystem::path OidcExePathMaybe(OidcExePathString); - if (IsFile(OidcExePathMaybe)) - { - OidcExePath = std::move(OidcExePathMaybe); - } - else - { - ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString); - } - } - std::string_view KeyParam = Cloud["key"sv].AsString(); - if (KeyParam.empty()) - { - return {nullptr, "Missing key"}; - } - if (KeyParam.length() != IoHash::StringLength) - { - return {nullptr, "Invalid key"}; - } - IoHash Key = IoHash::FromHexString(KeyParam); - if (Key == IoHash::Zero) - { - return {nullptr, "Invalid key string"}; - } - IoHash BaseKey = IoHash::Zero; - std::string_view BaseKeyParam = Cloud["basekey"sv].AsString(); - if (!BaseKeyParam.empty()) - { - if (BaseKeyParam.length() != IoHash::StringLength) - { - return {nullptr, "Invalid base key"}; - } - BaseKey = IoHash::FromHexString(BaseKeyParam); - if (BaseKey == IoHash::Zero) - { - return {nullptr, "Invalid base key string"}; - } - } - - bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false); - bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false); - bool AssumeHttp2 = Cloud["assumehttp2"sv].AsBool(false); - - JupiterRemoteStoreOptions Options = { - RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, - Url, - std::string(Namespace), - std::string(Bucket), - Key, - BaseKey, - std::string(OpenIdProvider), - AccessToken, - AuthManager, - OidcExePath, - ForceDisableBlocks, - ForceDisableTempBlocks, - AssumeHttp2}; - RemoteStore = CreateJupiterRemoteStore(Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false, /*Hidden*/ true); - } - - if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen) - { - std::string_view Url = Zen["url"sv].AsString(); - std::string_view Project = Zen["project"sv].AsString(); - if (Project.empty()) - { - return {nullptr, "Missing project"}; - } - std::string_view Oplog = Zen["oplog"sv].AsString(); - if (Oplog.empty()) - { - return {nullptr, "Missing oplog"}; - } - ZenRemoteStoreOptions Options = { - RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, - std::string(Url), - std::string(Project), - std::string(Oplog)}; - RemoteStore = CreateZenRemoteStore(Options, TempFilePath); - } - - if (CbObjectView Builds = Params["builds"sv].AsObjectView(); Builds) - { - std::string_view BuildsServiceUrl = Builds["url"sv].AsString(); - if (BuildsServiceUrl.empty()) - { - return {nullptr, "Missing service url"}; - } - - std::string Url = UrlDecode(BuildsServiceUrl); - std::string_view Namespace = Builds["namespace"sv].AsString(); - if (Namespace.empty()) - { - return {nullptr, "Missing namespace"}; - } - std::string_view Bucket = Builds["bucket"sv].AsString(); - if (Bucket.empty()) - { - return {nullptr, "Missing bucket"}; - } - std::string_view OpenIdProvider = Builds["openid-provider"sv].AsString(); - std::string AccessToken = std::string(Builds["access-token"sv].AsString()); - if (AccessToken.empty()) - { - std::string_view AccessTokenEnvVariable = Builds["access-token-env"].AsString(); - if (!AccessTokenEnvVariable.empty()) - { - AccessToken = GetEnvVariable(AccessTokenEnvVariable); - } - } - std::filesystem::path OidcExePath; - if (std::string_view OidcExePathString = Builds["oidc-exe-path"].AsString(); !OidcExePathString.empty()) - { - std::filesystem::path OidcExePathMaybe(OidcExePathString); - if (IsFile(OidcExePathMaybe)) - { - OidcExePath = std::move(OidcExePathMaybe); - } - else - { - ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString); - } - } - std::string_view BuildIdParam = Builds["buildsid"sv].AsString(); - if (BuildIdParam.empty()) - { - return {nullptr, "Missing build id"}; - } - if (BuildIdParam.length() != Oid::StringLength) - { - return {nullptr, "Invalid build id"}; - } - Oid BuildId = Oid::FromHexString(BuildIdParam); - if (BuildId == Oid::Zero) - { - return {nullptr, "Invalid build id string"}; - } - - bool ForceDisableBlocks = Builds["disableblocks"sv].AsBool(false); - bool ForceDisableTempBlocks = Builds["disabletempblocks"sv].AsBool(false); - bool AssumeHttp2 = Builds["assumehttp2"sv].AsBool(false); - - MemoryView MetaDataSection = Builds["metadata"sv].AsBinaryView(); - IoBuffer MetaData(IoBuffer::Wrap, MetaDataSection.GetData(), MetaDataSection.GetSize()); - - BuildsRemoteStoreOptions Options = { - RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, - Url, - std::string(Namespace), - std::string(Bucket), - BuildId, - std::string(OpenIdProvider), - AccessToken, - AuthManager, - OidcExePath, - ForceDisableBlocks, - ForceDisableTempBlocks, - AssumeHttp2, - MetaData}; - RemoteStore = CreateJupiterBuildsRemoteStore(Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false, /*Hidden*/ true); - } - - if (!RemoteStore) - { - return {nullptr, "Unknown remote store type"}; - } - - return {std::move(RemoteStore), ""}; - } - - std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result) - { - if (Result.ErrorCode == 0) - { - return {HttpResponseCode::OK, Result.Text}; - } - return {static_cast<HttpResponseCode>(Result.ErrorCode), - Result.Reason.empty() ? Result.Text - : Result.Text.empty() ? Result.Reason - : fmt::format("{}: {}", Result.Reason, Result.Text)}; - } - -} // namespace - -////////////////////////////////////////////////////////////////////////// - -HttpProjectService::HttpProjectService(CidStore& Store, - ProjectStore* Projects, - HttpStatusService& StatusService, - HttpStatsService& StatsService, - AuthMgr& AuthMgr, - OpenProcessCache& InOpenProcessCache, - JobQueue& InJobQueue) -: m_Log(logging::Get("project")) -, m_CidStore(Store) -, m_ProjectStore(Projects) -, m_StatusService(StatusService) -, m_StatsService(StatsService) -, m_AuthMgr(AuthMgr) -, m_OpenProcessCache(InOpenProcessCache) -, m_JobQueue(InJobQueue) -{ - ZEN_MEMSCOPE(GetProjectHttpTag()); - - using namespace std::literals; - - m_Router.AddPattern("project", "([[:alnum:]_.]+)"); - m_Router.AddPattern("log", "([[:alnum:]_.]+)"); - m_Router.AddPattern("op", "([[:digit:]]+?)"); - m_Router.AddPattern("chunk", "([[:xdigit:]]{24})"); - m_Router.AddPattern("hash", "([[:xdigit:]]{40})"); - - m_Router.RegisterRoute( - "", - [this](HttpRouterRequest& Req) { HandleProjectListRequest(Req); }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "list", - [this](HttpRouterRequest& Req) { HandleProjectListRequest(Req); }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/batch", - [this](HttpRouterRequest& Req) { HandleChunkBatchRequest(Req); }, - HttpVerb::kPost); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/files", - [this](HttpRouterRequest& Req) { HandleFilesRequest(Req); }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/chunkinfos", - [this](HttpRouterRequest& Req) { HandleChunkInfosRequest(Req); }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/{chunk}/info", - [this](HttpRouterRequest& Req) { HandleChunkInfoRequest(Req); }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/{chunk}", - [this](HttpRouterRequest& Req) { HandleChunkByIdRequest(Req); }, - HttpVerb::kGet | HttpVerb::kHead); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/{hash}", - [this](HttpRouterRequest& Req) { HandleChunkByCidRequest(Req); }, - HttpVerb::kGet | HttpVerb::kPost); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/prep", - [this](HttpRouterRequest& Req) { HandleOplogOpPrepRequest(Req); }, - HttpVerb::kPost); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/new", - [this](HttpRouterRequest& Req) { HandleOplogOpNewRequest(Req); }, - HttpVerb::kPost); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/validate", - [this](HttpRouterRequest& Req) { HandleOplogValidateRequest(Req); }, - HttpVerb::kPost); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/{op}", - [this](HttpRouterRequest& Req) { HandleOpLogOpRequest(Req); }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "{project}/oplog/{log}", - [this](HttpRouterRequest& Req) { HandleOpLogRequest(Req); }, - HttpVerb::kGet | HttpVerb::kPut | HttpVerb::kPost | HttpVerb::kDelete); - - m_Router.RegisterRoute( - "{project}/oplog/{log}/entries", - [this](HttpRouterRequest& Req) { HandleOpLogEntriesRequest(Req); }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "{project}", - [this](HttpRouterRequest& Req) { HandleProjectRequest(Req); }, - HttpVerb::kGet | HttpVerb::kPut | HttpVerb::kPost | HttpVerb::kDelete); - - // Push a oplog container - m_Router.RegisterRoute( - "{project}/oplog/{log}/save", - [this](HttpRouterRequest& Req) { HandleOplogSaveRequest(Req); }, - HttpVerb::kPost); - - // Pull a oplog container - m_Router.RegisterRoute( - "{project}/oplog/{log}/load", - [this](HttpRouterRequest& Req) { HandleOplogLoadRequest(Req); }, - HttpVerb::kGet); - - // Do an rpc style operation on project/oplog - m_Router.RegisterRoute( - "{project}/oplog/{log}/rpc", - [this](HttpRouterRequest& Req) { HandleRpcRequest(Req); }, - HttpVerb::kPost); - - m_Router.RegisterRoute( - "details\\$", - [this](HttpRouterRequest& Req) { HandleDetailsRequest(Req); }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "details\\$/{project}", - [this](HttpRouterRequest& Req) { HandleProjectDetailsRequest(Req); }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "details\\$/{project}/{log}", - [this](HttpRouterRequest& Req) { HandleOplogDetailsRequest(Req); }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "details\\$/{project}/{log}/{chunk}", - [this](HttpRouterRequest& Req) { HandleOplogOpDetailsRequest(Req); }, - HttpVerb::kGet); - - m_StatusService.RegisterHandler("prj", *this); - m_StatsService.RegisterHandler("prj", *this); -} - -HttpProjectService::~HttpProjectService() -{ - m_StatsService.UnregisterHandler("prj", *this); - m_StatusService.UnregisterHandler("prj", *this); -} - -const char* -HttpProjectService::BaseUri() const -{ - return "/prj/"; -} - -void -HttpProjectService::HandleRequest(HttpServerRequest& Request) -{ - m_ProjectStats.RequestCount++; - - ZEN_MEMSCOPE(GetProjectHttpTag()); - - metrics::OperationTiming::Scope $(m_HttpRequests); - - if (m_Router.HandleRequest(Request) == false) - { - m_ProjectStats.BadRequestCount++; - ZEN_WARN("No route found for {0}", Request.RelativeUri()); - } -} - -void -HttpProjectService::HandleStatsRequest(HttpServerRequest& HttpReq) -{ - ZEN_TRACE_CPU("ProjectService::Stats"); - - const GcStorageSize StoreSize = m_ProjectStore->StorageSize(); - const CidStoreSize CidSize = m_CidStore.TotalSize(); - - CbObjectWriter Cbo; - - EmitSnapshot("requests", m_HttpRequests, Cbo); - - Cbo.BeginObject("store"); - { - Cbo.BeginObject("size"); - { - Cbo << "disk" << StoreSize.DiskSize; - Cbo << "memory" << StoreSize.MemorySize; - } - Cbo.EndObject(); - - Cbo.BeginObject("project"); - { - Cbo << "readcount" << m_ProjectStats.ProjectReadCount << "writecount" << m_ProjectStats.ProjectWriteCount << "deletecount" - << m_ProjectStats.ProjectDeleteCount; - } - Cbo.EndObject(); - - Cbo.BeginObject("oplog"); - { - Cbo << "readcount" << m_ProjectStats.OpLogReadCount << "writecount" << m_ProjectStats.OpLogWriteCount << "deletecount" - << m_ProjectStats.OpLogDeleteCount; - } - Cbo.EndObject(); - - Cbo.BeginObject("op"); - { - Cbo << "hitcount" << m_ProjectStats.OpHitCount << "misscount" << m_ProjectStats.OpMissCount << "writecount" - << m_ProjectStats.OpWriteCount; - } - Cbo.EndObject(); - - Cbo.BeginObject("chunk"); - { - Cbo << "hitcount" << m_ProjectStats.ChunkHitCount << "misscount" << m_ProjectStats.ChunkMissCount << "writecount" - << m_ProjectStats.ChunkWriteCount; - } - Cbo.EndObject(); - - Cbo << "requestcount" << m_ProjectStats.RequestCount; - Cbo << "badrequestcount" << m_ProjectStats.BadRequestCount; - } - Cbo.EndObject(); - - Cbo.BeginObject("cid"); - { - Cbo.BeginObject("size"); - { - Cbo << "tiny" << CidSize.TinySize; - Cbo << "small" << CidSize.SmallSize; - Cbo << "large" << CidSize.LargeSize; - Cbo << "total" << CidSize.TotalSize; - } - Cbo.EndObject(); - } - Cbo.EndObject(); - - return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); -} - -void -HttpProjectService::HandleStatusRequest(HttpServerRequest& Request) -{ - ZEN_TRACE_CPU("HttpProjectService::Status"); - CbObjectWriter Cbo; - Cbo << "ok" << true; - Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); -} - -void -HttpProjectService::HandleProjectListRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::ProjectList"); - - HttpServerRequest& HttpReq = Req.ServerRequest(); - CbArray ProjectsList = m_ProjectStore->GetProjectsList(); - HttpReq.WriteResponse(HttpResponseCode::OK, ProjectsList); -} - -void -HttpProjectService::HandleChunkBatchRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::ChunkBatch"); - - HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - Project->TouchProject(); - - Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - Project->TouchOplog(OplogId); - - // Parse Request - - IoBuffer Payload = HttpReq.ReadPayload(); - BinaryReader Reader(Payload); - - struct RequestHeader - { - enum - { - kMagic = 0xAAAA'77AC - }; - uint32_t Magic; - uint32_t ChunkCount; - uint32_t Reserved1; - uint32_t Reserved2; - }; - - struct RequestChunkEntry - { - Oid ChunkId; - uint32_t CorrelationId; - uint64_t Offset; - uint64_t RequestBytes; - }; - - if (Payload.Size() <= sizeof(RequestHeader)) - { - m_ProjectStats.BadRequestCount++; - HttpReq.WriteResponse(HttpResponseCode::BadRequest); - } - - RequestHeader RequestHdr; - Reader.Read(&RequestHdr, sizeof RequestHdr); - - if (RequestHdr.Magic != RequestHeader::kMagic) - { - m_ProjectStats.BadRequestCount++; - HttpReq.WriteResponse(HttpResponseCode::BadRequest); - } - - std::vector<RequestChunkEntry> RequestedChunks; - RequestedChunks.resize(RequestHdr.ChunkCount); - Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount); - - // Make Response - - struct ResponseHeader - { - uint32_t Magic = 0xbada'b00f; - uint32_t ChunkCount; - uint32_t Reserved1 = 0; - uint32_t Reserved2 = 0; - }; - - struct ResponseChunkEntry - { - uint32_t CorrelationId; - uint32_t Flags = 0; - uint64_t ChunkSize; - }; - - std::vector<IoBuffer> OutBlobs; - OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry)); - for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) - { - const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; - IoBuffer FoundChunk = FoundLog->FindChunk(Project->RootDir, RequestedChunk.ChunkId, nullptr); - if (FoundChunk) - { - if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1)) - { - uint64_t Offset = RequestedChunk.Offset; - if (Offset > FoundChunk.Size()) - { - Offset = FoundChunk.Size(); - } - uint64_t Size = RequestedChunk.RequestBytes; - if ((Offset + Size) > FoundChunk.Size()) - { - Size = FoundChunk.Size() - Offset; - } - FoundChunk = IoBuffer(FoundChunk, Offset, Size); - } - } - OutBlobs.emplace_back(std::move(FoundChunk)); - } - uint8_t* ResponsePtr = reinterpret_cast<uint8_t*>(OutBlobs[0].MutableData()); - ResponseHeader ResponseHdr; - ResponseHdr.ChunkCount = RequestHdr.ChunkCount; - memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr)); - ResponsePtr += sizeof(ResponseHdr); - for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) - { - const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; - const IoBuffer& FoundChunk(OutBlobs[ChunkIndex + 1]); - ResponseChunkEntry ResponseChunk; - ResponseChunk.CorrelationId = RequestedChunk.CorrelationId; - if (FoundChunk) - { - ResponseChunk.ChunkSize = FoundChunk.Size(); - m_ProjectStats.ChunkHitCount++; - } - else - { - ResponseChunk.ChunkSize = uint64_t(-1); - m_ProjectStats.ChunkMissCount++; - } - memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk)); - ResponsePtr += sizeof(ResponseChunk); - } - std::erase_if(OutBlobs, [](IoBuffer Buffer) -> bool { return !Buffer; }); - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs); -} - -void -HttpProjectService::HandleFilesRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::Files"); - - using namespace std::literals; - - HttpServerRequest& HttpReq = Req.ServerRequest(); - - // File manifest fetch, returns the client file list - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - - std::unordered_set<std::string> WantedFieldNames; - if (auto FieldFilter = HttpServerRequest::Decode(Params.GetValue("fieldnames")); !FieldFilter.empty()) - { - if (FieldFilter != "*") // Get all - empty FieldFilter equal getting all fields - { - ForEachStrTok(FieldFilter, ',', [&](std::string_view FieldName) { - WantedFieldNames.insert(std::string(FieldName)); - return true; - }); - } - } - else - { - const bool FilterClient = Params.GetValue("filter"sv) == "client"sv; - WantedFieldNames.insert("id"); - WantedFieldNames.insert("clientpath"); - if (!FilterClient) - { - WantedFieldNames.insert("serverpath"); - } - } - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Project files request for unknown project '{}'", ProjectId)); - } - Project->TouchProject(); - - Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Project files for unknown oplog '{}/{}'", ProjectId, OplogId)); - } - Project->TouchOplog(OplogId); - - CbObject ResponsePayload = ProjectStore::GetProjectFiles(Log(), *Project, *FoundLog, WantedFieldNames); - - if (HttpReq.AcceptContentType() == HttpContentType::kCompressedBinary) - { - CompositeBuffer Payload = CompressedBuffer::Compress(ResponsePayload.GetBuffer()).GetCompressed(); - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCompressedBinary, Payload); - } - else - { - return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); - } -} - -void -HttpProjectService::HandleChunkInfosRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::ChunkInfos"); - - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - - std::unordered_set<std::string> WantedFieldNames; - if (auto FieldFilter = HttpServerRequest::Decode(Params.GetValue("fieldnames")); !FieldFilter.empty()) - { - if (FieldFilter != "*") // Get all - empty FieldFilter equal getting all fields - { - ForEachStrTok(FieldFilter, ',', [&](std::string_view FieldName) { - WantedFieldNames.insert(std::string(FieldName)); - return true; - }); - } - } - else - { - WantedFieldNames.insert("id"); - WantedFieldNames.insert("rawhash"); - WantedFieldNames.insert("rawsize"); - } - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Chunk infos request for unknown project '{}'", ProjectId)); - } - Project->TouchProject(); - - Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Chunk infos for unknown oplog '{}/{}'", ProjectId, OplogId)); - } - Project->TouchOplog(OplogId); - - CbObject ResponsePayload = ProjectStore::GetProjectChunkInfos(Log(), *Project, *FoundLog, WantedFieldNames); - if (HttpReq.AcceptContentType() == HttpContentType::kCompressedBinary) - { - CompositeBuffer Payload = CompressedBuffer::Compress(ResponsePayload.GetBuffer()).GetCompressed(); - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCompressedBinary, Payload); - } - else - { - return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); - } -} - -void -HttpProjectService::HandleChunkInfoRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::ChunkInfo"); - - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - const auto& ChunkId = Req.GetCapture(3); - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Chunk info request for unknown project '{}'", ProjectId)); - } - Project->TouchProject(); - - Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Chunk info for unknown oplog '{}/{}'", ProjectId, OplogId)); - } - Project->TouchOplog(OplogId); - - if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) - { - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)); - } - - const Oid Obj = Oid::FromHexString(ChunkId); - - CbObject ResponsePayload = ProjectStore::GetChunkInfo(Log(), *Project, *FoundLog, Obj); - if (ResponsePayload) - { - m_ProjectStats.ChunkHitCount++; - return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); - } - else - { - m_ProjectStats.ChunkMissCount++; - ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Chunk info for unknown chunk '{}/{}/{}'", ProjectId, OplogId, ChunkId)); - } -} - -void -HttpProjectService::HandleChunkByIdRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::ChunkById"); - - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - const auto& ChunkId = Req.GetCapture(3); - - uint64_t Offset = 0; - uint64_t Size = ~(0ull); - - auto QueryParms = HttpReq.GetQueryParams(); - - if (auto OffsetParm = QueryParms.GetValue("offset"); OffsetParm.empty() == false) - { - if (auto OffsetVal = ParseInt<uint64_t>(OffsetParm)) - { - Offset = OffsetVal.value(); - } - else - { - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest); - } - } - - if (auto SizeParm = QueryParms.GetValue("size"); SizeParm.empty() == false) - { - if (auto SizeVal = ParseInt<uint64_t>(SizeParm)) - { - Size = SizeVal.value(); - } - else - { - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest); - } - } - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Chunk request for unknown project '{}'", ProjectId)); - } - Project->TouchProject(); - - Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)); - } - Project->TouchOplog(OplogId); - - if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) - { - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Chunk request for invalid chunk id '{}/{}/{}'", ProjectId, OplogId, ChunkId)); - } - - const Oid Obj = Oid::FromHexString(ChunkId); - - HttpContentType AcceptType = HttpReq.AcceptContentType(); - - ProjectStore::GetChunkRangeResult Result = - ProjectStore::GetChunkRange(Log(), *Project, *FoundLog, Obj, Offset, Size, AcceptType, /*OptionalInOutModificationTag*/ nullptr); - - switch (Result.Error) - { - case ProjectStore::GetChunkRangeResult::EError::Ok: - m_ProjectStats.ChunkHitCount++; - ZEN_DEBUG("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(Result.ContentType)); - return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ContentType, Result.Chunk); - case ProjectStore::GetChunkRangeResult::EError::NotFound: - m_ProjectStats.ChunkMissCount++; - ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); - return HttpReq.WriteResponse(HttpResponseCode::NotFound, Result.ContentType, Result.Chunk); - case ProjectStore::GetChunkRangeResult::EError::MalformedContent: - return HttpReq.WriteResponse( - HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Get chunk {}/{}/{} failed. Reason: {}", ProjectId, OplogId, ChunkId, Result.ErrorDescription)); - case ProjectStore::GetChunkRangeResult::EError::OutOfRange: - m_ProjectStats.ChunkMissCount++; - ZEN_DEBUG("chunk - '{}/{}/{}' OUT OF RANGE", ProjectId, OplogId, ChunkId); - return HttpReq.WriteResponse( - HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Get chunk {}/{}/{} failed. Reason: {}", ProjectId, OplogId, ChunkId, Result.ErrorDescription)); - default: - ZEN_ASSERT(false); - break; - } -} - -void -HttpProjectService::HandleChunkByCidRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::ChunkByCid"); - - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - const auto& Cid = Req.GetCapture(3); - HttpContentType AcceptType = HttpReq.AcceptContentType(); - HttpContentType RequestType = HttpReq.RequestContentType(); - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Chunk request for unknown project '{}'", ProjectId)); - } - Project->TouchProject(); - - Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)); - } - Project->TouchOplog(OplogId); - - if (Cid.length() != IoHash::StringLength) - { - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Chunk request for invalid chunk id '{}/{}/{}'", ProjectId, OplogId, Cid)); - } - - const IoHash Hash = IoHash::FromHexString(Cid); - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - { - IoBuffer Value = m_ProjectStore->GetChunk(*Project, *FoundLog, Hash); - if (Value) - { - if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary || - AcceptType == ZenContentType::kJSON || AcceptType == ZenContentType::kYAML || - AcceptType == ZenContentType::kCbObject) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Value)); - IoBuffer DecompressedBuffer = Compressed.Decompress().AsIoBuffer(); - - if (DecompressedBuffer) - { - if (AcceptType == ZenContentType::kJSON || AcceptType == ZenContentType::kYAML || - AcceptType == ZenContentType::kCbObject) - { - CbValidateError CbErr = ValidateCompactBinary(DecompressedBuffer.GetView(), CbValidateMode::Default); - if (!!CbErr) - { - m_ProjectStats.BadRequestCount++; - ZEN_DEBUG( - "chunk - '{}/{}/{}' WRONGTYPE. Reason: `Requested {} format, but could not convert to object`", - ProjectId, - OplogId, - Cid, - ToString(AcceptType)); - return HttpReq.WriteResponse( - HttpResponseCode::NotAcceptable, - HttpContentType::kText, - fmt::format("Content format not supported, requested {} format, but could not convert to object", - ToString(AcceptType))); - } - - m_ProjectStats.ChunkHitCount++; - CbObject ContainerObject = LoadCompactBinaryObject(DecompressedBuffer); - return HttpReq.WriteResponse(HttpResponseCode::OK, ContainerObject); - } - else - { - Value = DecompressedBuffer; - Value.SetContentType(ZenContentType::kBinary); - } - } - else - { - m_ProjectStats.BadRequestCount++; - ZEN_DEBUG("chunk - '{}/{}/{}' WRONGTYPE. Reason: `Requested {} format, but could not decompress stored data`", - ProjectId, - OplogId, - Cid, - ToString(AcceptType)); - return HttpReq.WriteResponse( - HttpResponseCode::NotAcceptable, - HttpContentType::kText, - fmt::format("Content format not supported, requested {} format, but could not decompress stored data", - ToString(AcceptType))); - } - } - m_ProjectStats.ChunkHitCount++; - return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value); - } - else - { - m_ProjectStats.ChunkMissCount++; - ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, Cid); - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - } - case HttpVerb::kPost: - { - if (!m_ProjectStore->AreDiskWritesAllowed()) - { - return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - } - if (RequestType != HttpContentType::kCompressedBinary) - { - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Chunk request for chunk id '{}/{}'/'{}' as unexpected content type: '{}'", - ProjectId, - OplogId, - Cid, - ToString(RequestType))); - } - IoBuffer Payload = HttpReq.ReadPayload(); - Payload.SetContentType(RequestType); - bool IsNew = m_ProjectStore->PutChunk(*Project, *FoundLog, Hash, std::move(Payload)); - - m_ProjectStats.ChunkWriteCount++; - return HttpReq.WriteResponse(IsNew ? HttpResponseCode::Created : HttpResponseCode::OK); - } - break; - } -} - -void -HttpProjectService::HandleOplogOpPrepRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::OplogOpPrep"); - - using namespace std::literals; - - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - Project->TouchProject(); - - Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - Project->TouchOplog(OplogId); - - // This operation takes a list of referenced hashes and decides which - // chunks are not present on this server. This list is then returned in - // the "need" list in the response - - CbValidateError ValidateResult; - if (CbObject RequestObject = ValidateAndReadCompactBinaryObject(HttpReq.ReadPayload(), ValidateResult); - ValidateResult == CbValidateError::None) - { - std::vector<IoHash> NeedList; - - { - eastl::fixed_vector<IoHash, 16> ChunkList; - CbArrayView HaveList = RequestObject["have"sv].AsArrayView(); - ChunkList.reserve(HaveList.Num()); - for (auto& Entry : HaveList) - { - ChunkList.push_back(Entry.AsHash()); - } - - NeedList = FoundLog->CheckPendingChunkReferences(std::span(begin(ChunkList), end(ChunkList)), std::chrono::minutes(2)); - } - - CbObjectWriter Cbo(1 + 1 + 5 + NeedList.size() * (1 + sizeof(IoHash::Hash)) + 1); - Cbo.BeginArray("need"); - { - for (const IoHash& Hash : NeedList) - { - ZEN_DEBUG("prep - NEED: {}", Hash); - Cbo << Hash; - } - } - Cbo.EndArray(); - CbObject Response = Cbo.Save(); - - return HttpReq.WriteResponse(HttpResponseCode::OK, Response); - } - else - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Invalid compact binary format: '{}'", ToString(ValidateResult))); - } -} - -void -HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::OplogOpNew"); - - using namespace std::literals; - - HttpServerRequest& HttpReq = Req.ServerRequest(); - - if (!m_ProjectStore->AreDiskWritesAllowed()) - { - return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - } - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - - bool IsUsingSalt = false; - IoHash SaltHash = IoHash::Zero; - - if (std::string_view SaltParam = Params.GetValue("salt"sv); SaltParam.empty() == false) - { - const uint32_t Salt = std::stoi(std::string(SaltParam)); - SaltHash = IoHash::HashBuffer(&Salt, sizeof Salt); - IsUsingSalt = true; - } - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - Project->TouchProject(); - - Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - Project->TouchOplog(OplogId); - - ProjectStore::Oplog& Oplog = *FoundLog; - - IoBuffer Payload = HttpReq.ReadPayload(); - - // This will attempt to open files which may not exist for the case where - // the prep step rejected the chunk. This should be fixed since there's - // a performance cost associated with any file system activity - - bool IsValid = true; - std::vector<IoHash> MissingChunks; - - CbPackage::AttachmentResolver Resolver = [&](const IoHash& Hash) -> SharedBuffer { - if (m_CidStore.ContainsChunk(Hash)) - { - // Return null attachment as we already have it, no point in reading it and storing it again - return {}; - } - - IoHash AttachmentId; - if (IsUsingSalt) - { - IoHash AttachmentSpec[]{SaltHash, Hash}; - AttachmentId = IoHash::HashBuffer(MakeMemoryView(AttachmentSpec)); - } - else - { - AttachmentId = Hash; - } - - std::filesystem::path AttachmentPath = Oplog.TempPath() / AttachmentId.ToHexString(); - if (IoBuffer Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath)) - { - Data.SetDeleteOnClose(true); - return SharedBuffer(std::move(Data)); - } - else - { - IsValid = false; - MissingChunks.push_back(Hash); - - return {}; - } - }; - - CbPackage Package; - - if (!legacy::TryLoadCbPackage(Package, Payload, &UniqueBuffer::Alloc, &Resolver)) - { - CbValidateError ValidateResult; - if (CbObject Core = ValidateAndReadCompactBinaryObject(IoBuffer(Payload), ValidateResult); - ValidateResult == CbValidateError::None && Core) - { - Package.SetObject(Core); - } - else - { - std::filesystem::path BadPackagePath = - Oplog.TempPath() / "bad_packages"sv / fmt::format("session{}_request{}"sv, HttpReq.SessionId(), HttpReq.RequestId()); - - ZEN_WARN("Received malformed package ('{}')! Saving payload to '{}'", ToString(ValidateResult), BadPackagePath); - - WriteFile(BadPackagePath, Payload); - - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - u8"request body must be a compact binary object or package in legacy format"); - } - } - - m_ProjectStats.ChunkMissCount += MissingChunks.size(); - - if (!IsValid) - { - ExtendableStringBuilder<256> ResponseText; - ResponseText.Append("Missing chunk references: "); - - bool IsFirst = true; - for (const auto& Hash : MissingChunks) - { - if (IsFirst) - { - IsFirst = false; - } - else - { - ResponseText.Append(", "); - } - Hash.ToHexString(ResponseText); - } - - return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, ResponseText); - } - - CbObject Core = Package.GetObject(); - - if (!Core["key"sv]) - { - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "No oplog entry key specified"); - } - - eastl::fixed_vector<IoHash, 16> ReferencedChunks; - Core.IterateAttachments([&ReferencedChunks](CbFieldView View) { ReferencedChunks.push_back(View.AsAttachment()); }); - - // Write core to oplog - - size_t AttachmentCount = Package.GetAttachments().size(); - const ProjectStore::LogSequenceNumber OpLsn = Oplog.AppendNewOplogEntry(Package); - if (!OpLsn) - { - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest); - } - m_ProjectStats.ChunkWriteCount += AttachmentCount; - - // Once we stored the op, we no longer need to retain any chunks this op references - if (!ReferencedChunks.empty()) - { - FoundLog->RemovePendingChunkReferences(std::span(begin(ReferencedChunks), end(ReferencedChunks))); - } - - m_ProjectStats.OpWriteCount++; - ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn.Number, NiceBytes(Payload.Size()), Core["key"sv].AsString()); - HttpReq.WriteResponse(HttpResponseCode::Created); -} - -void -HttpProjectService::HandleOplogValidateRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::OplogOpValidate"); - - using namespace std::literals; - - HttpServerRequest& HttpReq = Req.ServerRequest(); - - if (!m_ProjectStore->AreDiskWritesAllowed()) - { - return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - } - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, ZenContentType::kText, fmt::format("Project '{}' not found", ProjectId)); - } - Project->TouchProject(); - - Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - ZenContentType::kText, - fmt::format("Oplog '{}' not found in project '{}'", OplogId, ProjectId)); - } - Project->TouchOplog(OplogId); - - ProjectStore::Oplog& Oplog = *FoundLog; - - std::atomic_bool CancelFlag = false; - ProjectStore::Oplog::ValidationResult Result = Oplog.Validate(Project->RootDir, CancelFlag, &GetSmallWorkerPool(EWorkloadType::Burst)); - tsl::robin_map<Oid, std::string, Oid::Hasher> KeyNameLookup; - KeyNameLookup.reserve(Result.OpKeys.size()); - for (const auto& It : Result.OpKeys) - { - KeyNameLookup.insert_or_assign(It.first, It.second); - } - CbObjectWriter Writer; - Writer << "HasMissingData" << !Result.IsEmpty(); - Writer << "OpCount" << Result.OpCount; - Writer << "LSNLow" << Result.LSNLow.Number; - Writer << "LSNHigh" << Result.LSNHigh.Number; - if (!Result.MissingFiles.empty()) - { - Writer.BeginArray("MissingFiles"); - for (const auto& MissingFile : Result.MissingFiles) - { - Writer.BeginObject(); - { - Writer << "Key" << MissingFile.first; - Writer << "KeyName" << KeyNameLookup[MissingFile.first]; - Writer << "Id" << MissingFile.second.Id; - Writer << "Hash" << MissingFile.second.Hash; - Writer << "ServerPath" << MissingFile.second.ServerPath; - Writer << "ClientPath" << MissingFile.second.ClientPath; - } - Writer.EndObject(); - } - Writer.EndArray(); - } - if (!Result.MissingChunks.empty()) - { - Writer.BeginArray("MissingChunks"); - for (const auto& MissingChunk : Result.MissingChunks) - { - Writer.BeginObject(); - { - Writer << "Key" << MissingChunk.first; - Writer << "KeyName" << KeyNameLookup[MissingChunk.first]; - Writer << "Id" << MissingChunk.second.Id; - Writer << "Hash" << MissingChunk.second.Hash; - } - Writer.EndObject(); - } - Writer.EndArray(); - } - if (!Result.MissingMetas.empty()) - { - Writer.BeginArray("MissingMetas"); - for (const auto& MissingMeta : Result.MissingMetas) - { - Writer.BeginObject(); - { - Writer << "Key" << MissingMeta.first; - Writer << "KeyName" << KeyNameLookup[MissingMeta.first]; - Writer << "Id" << MissingMeta.second.Id; - Writer << "Hash" << MissingMeta.second.Hash; - } - Writer.EndObject(); - } - Writer.EndArray(); - } - if (!Result.MissingAttachments.empty()) - { - Writer.BeginArray("MissingAttachments"); - for (const auto& MissingMeta : Result.MissingAttachments) - { - Writer.BeginObject(); - { - Writer << "Key" << MissingMeta.first; - Writer << "KeyName" << KeyNameLookup[MissingMeta.first]; - Writer << "Hash" << MissingMeta.second; - } - Writer.EndObject(); - } - Writer.EndArray(); - } - CbObject Response = Writer.Save(); - HttpReq.WriteResponse(HttpResponseCode::OK, Response); -} - -void -HttpProjectService::HandleOpLogOpRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::OplogOp"); - - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const std::string_view ProjectId = Req.GetCapture(1); - const std::string_view OplogId = Req.GetCapture(2); - const std::string_view OpIdString = Req.GetCapture(3); - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - Project->TouchProject(); - - Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - Project->TouchOplog(OplogId); - - ProjectStore::Oplog& Oplog = *FoundLog; - - if (const std::optional<int32_t> OpId = ParseInt<uint32_t>(OpIdString)) - { - if (std::optional<CbObject> MaybeOp = Oplog.GetOpByIndex(ProjectStore::LogSequenceNumber(OpId.value()))) - { - CbObject& Op = MaybeOp.value(); - if (HttpReq.AcceptContentType() == ZenContentType::kCbPackage) - { - CbPackage Package; - Package.SetObject(Op); - - Op.IterateAttachments([&](CbFieldView FieldView) { - const IoHash AttachmentHash = FieldView.AsAttachment(); - IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash); - if (Payload) - { - switch (Payload.GetContentType()) - { - case ZenContentType::kCbObject: - { - CbValidateError ValidateResult; - if (CbObject Object = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult); - ValidateResult == CbValidateError::None && Object) - { - Package.AddAttachment(CbAttachment(Object)); - } - else - { - // Error - malformed object - ZEN_WARN("malformed object returned for {} ('{}')", AttachmentHash, ToString(ValidateResult)); - } - } - break; - - case ZenContentType::kCompressedBinary: - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload))) - { - Package.AddAttachment(CbAttachment(Compressed, AttachmentHash)); - } - else - { - // Error - not compressed! - - ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash); - } - break; - - default: - Package.AddAttachment(CbAttachment(SharedBuffer(Payload))); - break; - } - } - }); - m_ProjectStats.OpHitCount++; - return HttpReq.WriteResponse(HttpResponseCode::Accepted, Package); - } - else - { - // Client cannot accept a package, so we only send the core object - m_ProjectStats.OpHitCount++; - return HttpReq.WriteResponse(HttpResponseCode::Accepted, Op); - } - } - } - m_ProjectStats.OpMissCount++; - return HttpReq.WriteResponse(HttpResponseCode::NotFound); -} - -void -HttpProjectService::HandleOpLogRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::Oplog"); - - HttpServerRequest& HttpReq = Req.ServerRequest(); - - using namespace std::literals; - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("project {} not found", ProjectId)); - } - Project->TouchProject(); - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - { - Ref<ProjectStore::Oplog> OplogIt = Project->ReadOplog(OplogId); - if (!OplogIt) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("oplog {} not found in project {}", OplogId, ProjectId)); - } - - ProjectStore::Oplog& Log = *OplogIt; - - CbObjectWriter Cb; - Cb << "id"sv << Log.OplogId() << "project"sv << Project->Identifier << "tempdir"sv << Log.TempPath().c_str() - << "markerpath"sv << Log.MarkerPath().c_str() << "totalsize"sv << Log.TotalSize() << "opcount" << Log.OplogCount() - << "expired"sv << Project->IsExpired(GcClock::TimePoint::min(), Log); - HttpReq.WriteResponse(HttpResponseCode::OK, Cb.Save()); - - m_ProjectStats.OpLogReadCount++; - } - break; - - case HttpVerb::kPost: - { - if (!m_ProjectStore->AreDiskWritesAllowed()) - { - return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - } - std::filesystem::path OplogMarkerPath; - if (CbObject Params = HttpReq.ReadPayloadObject()) - { - OplogMarkerPath = Params["gcpath"sv].AsString(); - } - - Ref<ProjectStore::Oplog> OplogIt = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); - if (!OplogIt) - { - if (!Project->NewOplog(OplogId, OplogMarkerPath)) - { - // TODO: indicate why the operation failed! - return HttpReq.WriteResponse(HttpResponseCode::InternalServerError); - } - Project->TouchOplog(OplogId); - - m_ProjectStats.OpLogWriteCount++; - ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath); - - return HttpReq.WriteResponse(HttpResponseCode::Created); - } - - // I guess this should ultimately be used to execute RPCs but for now, it - // does absolutely nothing - - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest); - } - break; - - case HttpVerb::kPut: - { - if (!m_ProjectStore->AreDiskWritesAllowed()) - { - return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - } - - std::filesystem::path OplogMarkerPath; - if (CbObject Params = HttpReq.ReadPayloadObject()) - { - OplogMarkerPath = Params["gcpath"sv].AsString(); - } - - Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); - if (!FoundLog) - { - if (!Project->NewOplog(OplogId, OplogMarkerPath)) - { - // TODO: indicate why the operation failed! - return HttpReq.WriteResponse(HttpResponseCode::InternalServerError); - } - Project->TouchOplog(OplogId); - - m_ProjectStats.OpLogWriteCount++; - ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath); - - return HttpReq.WriteResponse(HttpResponseCode::Created); - } - Project->TouchOplog(OplogId); - - FoundLog->Update(OplogMarkerPath); - - m_ProjectStats.OpLogWriteCount++; - ZEN_INFO("updated oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath); - - return HttpReq.WriteResponse(HttpResponseCode::OK); - } - break; - - case HttpVerb::kDelete: - { - ZEN_INFO("deleting oplog '{}/{}'", ProjectId, OplogId); - - if (Project->DeleteOplog(OplogId)) - { - m_ProjectStats.OpLogDeleteCount++; - return HttpReq.WriteResponse(HttpResponseCode::OK); - } - else - { - return HttpReq.WriteResponse(HttpResponseCode::Locked, - HttpContentType::kText, - fmt::format("oplog {}/{} is in use", ProjectId, OplogId)); - } - } - break; - - default: - break; - } -} - -std::optional<OplogReferencedSet> -LoadReferencedSet(ProjectStore::Project& Project, ProjectStore::Oplog& Log) -{ - using namespace std::literals; - - Oid ReferencedSetOplogId = OpKeyStringAsOid(OplogReferencedSet::ReferencedSetOplogKey); - std::optional<CbObject> ReferencedSetOp = Log.GetOpByKey(ReferencedSetOplogId); - if (!ReferencedSetOp) - { - return std::optional<OplogReferencedSet>(); - } - // We expect only a single file in the "files" array; get the chunk for the first file - CbFieldView FileField = *(*ReferencedSetOp)["files"sv].AsArrayView().CreateViewIterator(); - Oid ChunkId = FileField.AsObjectView()["id"sv].AsObjectId(); - if (ChunkId == Oid::Zero) - { - return std::optional<OplogReferencedSet>(); - } - - return OplogReferencedSet::LoadFromChunk(Log.FindChunk(Project.RootDir, ChunkId, nullptr)); -} - -void -HttpProjectService::HandleOpLogEntriesRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::OplogEntries"); - - using namespace std::literals; - - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - Project->TouchProject(); - - Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - Project->TouchOplog(OplogId); - - CbObjectWriter Response; - - if (FoundLog->OplogCount() > 0) - { - std::unordered_set<std::string> FieldNamesFilter; - auto FilterObject = [&FieldNamesFilter](CbObjectView& Object) -> CbObject { - CbObject RewrittenOp = RewriteCbObject(Object, [&FieldNamesFilter](CbObjectWriter&, CbFieldView Field) -> bool { - if (FieldNamesFilter.contains(std::string(Field.GetName()))) - { - return false; - } - - return true; - }); - - return RewrittenOp; - }; - - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - if (auto FieldFilter = HttpServerRequest::Decode(Params.GetValue("fieldfilter")); !FieldFilter.empty()) - { - ForEachStrTok(FieldFilter, ',', [&](std::string_view FieldName) { - FieldNamesFilter.insert(std::string(FieldName)); - return true; - }); - } - - if (auto OpKey = Params.GetValue("opkey"); !OpKey.empty()) - { - Oid OpKeyId = OpKeyStringAsOid(OpKey); - std::optional<CbObject> Op = FoundLog->GetOpByKey(OpKeyId); - - if (Op.has_value()) - { - if (FieldNamesFilter.empty()) - { - Response << "entry"sv << Op.value(); - } - else - { - Response << "entry"sv << FilterObject(Op.value()); - } - } - else - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - } - else - { - ProjectStore::Oplog::Paging EntryPaging; - if (std::string_view Param = Params.GetValue("start"); !Param.empty()) - { - if (auto Value = ParseInt<int32>(Param)) - { - EntryPaging.Start = *Value; - } - } - if (std::string_view Param = Params.GetValue("count"); !Param.empty()) - { - if (auto Value = ParseInt<int32>(Param)) - { - EntryPaging.Count = *Value; - } - } - - std::optional<OplogReferencedSet> MaybeReferencedSet; - if (auto TrimString = Params.GetValue("trim_by_referencedset"); TrimString == "true") - { - MaybeReferencedSet = LoadReferencedSet(*Project, *FoundLog); - } - Response.BeginArray("entries"sv); - - bool ShouldFilterFields = !FieldNamesFilter.empty(); - - if (MaybeReferencedSet) - { - const OplogReferencedSet& ReferencedSet = MaybeReferencedSet.value(); - FoundLog->IterateOplogWithKey( - [this, &Response, &FilterObject, ShouldFilterFields, &ReferencedSet](ProjectStore::LogSequenceNumber /* LSN */, - const Oid& Key, - CbObjectView Op) { - if (!ReferencedSet.Contains(Key)) - { - if (!OplogReferencedSet::IsNonPackage(Op["key"].AsString())) - { - return; - } - } - - if (ShouldFilterFields) - { - Response << FilterObject(Op); - } - else - { - Response << Op; - } - }, - EntryPaging); - } - else - { - FoundLog->IterateOplog( - [this, &Response, &FilterObject, ShouldFilterFields](CbObjectView Op) { - if (ShouldFilterFields) - { - Response << FilterObject(Op); - } - else - { - Response << Op; - } - }, - EntryPaging); - } - - Response.EndArray(); - } - } - if (HttpReq.AcceptContentType() == HttpContentType::kCompressedBinary) - { - CompositeBuffer Payload = CompressedBuffer::Compress(Response.Save().GetBuffer()).GetCompressed(); - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCompressedBinary, Payload); - } - else - { - return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); - } -} - -void -HttpProjectService::HandleProjectRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::Project"); - - using namespace std::literals; - - HttpServerRequest& HttpReq = Req.ServerRequest(); - const std::string_view ProjectId = Req.GetCapture(1); - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kPost: - { - if (!m_ProjectStore->AreDiskWritesAllowed()) - { - return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - } - - CbValidateError ValidateResult; - if (CbObject Params = ValidateAndReadCompactBinaryObject(HttpReq.ReadPayload(), ValidateResult); - ValidateResult == CbValidateError::None) - { - std::filesystem::path Root = Params["root"sv].AsU8String(); // Workspace root (i.e `D:/UE5/`) - std::filesystem::path EngineRoot = Params["engine"sv].AsU8String(); // Engine root (i.e `D:/UE5/Engine`) - std::filesystem::path ProjectRoot = - Params["project"sv].AsU8String(); // Project root directory (i.e `D:/UE5/Samples/Games/Lyra`) - std::filesystem::path ProjectFilePath = - Params["projectfile"sv].AsU8String(); // Project file path (i.e `D:/UE5/Samples/Games/Lyra/Lyra.uproject`) - - const std::filesystem::path BasePath = m_ProjectStore->BasePath() / ProjectId; - m_ProjectStore->NewProject(BasePath, ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath); - - ZEN_INFO("established project (id: '{}', roots: '{}', '{}', '{}', '{}'{})", - ProjectId, - Root, - EngineRoot, - ProjectRoot, - ProjectFilePath, - ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : ""); - - m_ProjectStats.ProjectWriteCount++; - HttpReq.WriteResponse(HttpResponseCode::Created); - } - else - { - HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Malformed compact binary object: '{}'", ToString(ValidateResult))); - } - } - break; - - case HttpVerb::kPut: - { - if (!m_ProjectStore->AreDiskWritesAllowed()) - { - return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - } - CbValidateError ValidateResult; - if (CbObject Params = ValidateAndReadCompactBinaryObject(HttpReq.ReadPayload(), ValidateResult); - ValidateResult == CbValidateError::None) - { - std::filesystem::path Root = Params["root"sv].AsU8String(); // Workspace root (i.e `D:/UE5/`) - std::filesystem::path EngineRoot = Params["engine"sv].AsU8String(); // Engine root (i.e `D:/UE5/Engine`) - std::filesystem::path ProjectRoot = - Params["project"sv].AsU8String(); // Project root directory (i.e `D:/UE5/Samples/Games/Lyra`) - std::filesystem::path ProjectFilePath = - Params["projectfile"sv].AsU8String(); // Project file path (i.e `D:/UE5/Samples/Games/Lyra/Lyra.uproject`) - - if (m_ProjectStore->UpdateProject(ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath)) - { - m_ProjectStats.ProjectWriteCount++; - ZEN_INFO("updated project (id: '{}', roots: '{}', '{}', '{}', '{}'{})", - ProjectId, - Root, - EngineRoot, - ProjectRoot, - ProjectFilePath, - ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : ""); - - HttpReq.WriteResponse(HttpResponseCode::OK); - } - else - { - const std::filesystem::path BasePath = m_ProjectStore->BasePath() / ProjectId; - m_ProjectStore->NewProject(BasePath, ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath); - - m_ProjectStats.ProjectWriteCount++; - ZEN_INFO("established project (id: '{}', roots: '{}', '{}', '{}', '{}'{})", - ProjectId, - Root, - EngineRoot, - ProjectRoot, - ProjectFilePath, - ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : ""); - - HttpReq.WriteResponse(HttpResponseCode::Created); - } - } - else - { - HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Malformed compact binary object: '{}'", ToString(ValidateResult))); - } - } - break; - - case HttpVerb::kGet: - { - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("project {} not found", ProjectId)); - } - Project->TouchProject(); - - std::vector<std::string> OpLogs = Project->ScanForOplogs(); - - CbObjectWriter Response; - Response << "id"sv << Project->Identifier; - Response << "root"sv << PathToUtf8(Project->RootDir); - Response << "engine"sv << PathToUtf8(Project->EngineRootDir); - Response << "project"sv << PathToUtf8(Project->ProjectRootDir); - Response << "projectfile"sv << PathToUtf8(Project->ProjectFilePath); - - Response.BeginArray("oplogs"sv); - for (const std::string& OplogId : OpLogs) - { - Response.BeginObject(); - Response << "id"sv << OplogId; - Response.EndObject(); - } - Response.EndArray(); // oplogs - - HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); - - m_ProjectStats.ProjectReadCount++; - } - break; - - case HttpVerb::kDelete: - { - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("project {} not found", ProjectId)); - } - - ZEN_INFO("deleting project '{}'", ProjectId); - if (!m_ProjectStore->DeleteProject(ProjectId)) - { - return HttpReq.WriteResponse(HttpResponseCode::Locked, - HttpContentType::kText, - fmt::format("project {} is in use", ProjectId)); - } - - m_ProjectStats.ProjectDeleteCount++; - return HttpReq.WriteResponse(HttpResponseCode::NoContent); - } - break; - - default: - break; - } -} - -void -HttpProjectService::HandleOplogSaveRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::OplogSave"); - - HttpServerRequest& HttpReq = Req.ServerRequest(); - - if (!m_ProjectStore->AreDiskWritesAllowed()) - { - return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - } - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - if (HttpReq.RequestContentType() != HttpContentType::kCbObject) - { - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid content type"); - } - IoBuffer Payload = HttpReq.ReadPayload(); - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Write oplog request for unknown project '{}'", ProjectId)); - } - Project->TouchProject(); - - Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false); - if (!Oplog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Write oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)); - } - Project->TouchOplog(OplogId); - - CbValidateError ValidateResult; - if (CbObject ContainerObject = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult); - ValidateResult == CbValidateError::None && ContainerObject) - { - RwLock AttachmentsLock; - tsl::robin_set<IoHash, IoHash::Hasher> Attachments; - - auto HasAttachment = [this](const IoHash& RawHash) { return m_CidStore.ContainsChunk(RawHash); }; - auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector<IoHash>&& ChunkHashes) { - RwLock::ExclusiveLockScope _(AttachmentsLock); - if (BlockHash != IoHash::Zero) - { - Attachments.insert(BlockHash); - } - else - { - Attachments.insert(ChunkHashes.begin(), ChunkHashes.end()); - } - }; - auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) { - RwLock::ExclusiveLockScope _(AttachmentsLock); - Attachments.insert(RawHash); - }; - - auto OnChunkedAttachment = [](const ChunkedInfo&) {}; - - auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog->CaptureAddedAttachments(RawHashes); }; - - // Make sure we retain any attachments we download before writing the oplog - Oplog->EnableUpdateCapture(); - auto _ = MakeGuard([&Oplog]() { Oplog->DisableUpdateCapture(); }); - - RemoteProjectStore::Result Result = SaveOplogContainer(*Oplog, - ContainerObject, - OnReferencedAttachments, - HasAttachment, - OnNeedBlock, - OnNeedAttachment, - OnChunkedAttachment, - nullptr); - - if (Result.ErrorCode == 0) - { - if (Attachments.empty()) - { - HttpReq.WriteResponse(HttpResponseCode::OK); - } - else - { - CbObjectWriter Cbo(1 + 1 + 5 + Attachments.size() * (1 + sizeof(IoHash::Hash)) + 1); - Cbo.BeginArray("need"); - { - for (const IoHash& Hash : Attachments) - { - ZEN_DEBUG("Need attachment {}", Hash); - Cbo << Hash; - } - } - Cbo.EndArray(); // "need" - - CbObject ResponsePayload = Cbo.Save(); - return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); - } - } - else - { - ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", - ToString(HttpReq.RequestVerb()), - HttpReq.QueryString(), - Result.ErrorCode, - Result.Reason); - - if (Result.Reason.empty()) - { - return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode)); - } - else - { - return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode), HttpContentType::kText, Result.Reason); - } - } - } - else - { - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Invalid payload: '{}'", ToString(ValidateResult))); - } -} - -void -HttpProjectService::HandleOplogLoadRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::OplogLoad"); - - HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - const HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - - if (HttpReq.AcceptContentType() != HttpContentType::kCbObject) - { - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid accept content type"); - } - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Read oplog request for unknown project '{}'", ProjectId)); - } - Project->TouchProject(); - - Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); - if (!Oplog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Read oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)); - } - Project->TouchOplog(OplogId); - - size_t MaxBlockSize = RemoteStoreOptions::DefaultMaxBlockSize; - if (auto Param = Params.GetValue("maxblocksize"); Param.empty() == false) - { - if (auto Value = ParseInt<size_t>(Param)) - { - MaxBlockSize = Value.value(); - } - } - size_t MaxChunkEmbedSize = RemoteStoreOptions::DefaultMaxChunkEmbedSize; - if (auto Param = Params.GetValue("maxchunkembedsize"); Param.empty() == false) - { - if (auto Value = ParseInt<size_t>(Param)) - { - MaxChunkEmbedSize = Value.value(); - } - } - size_t MaxChunksPerBlock = RemoteStoreOptions::DefaultMaxChunksPerBlock; - if (auto Param = Params.GetValue("maxchunksperblock"); Param.empty() == false) - { - if (auto Value = ParseInt<size_t>(Param)) - { - MaxChunksPerBlock = Value.value(); - } - } - - size_t ChunkFileSizeLimit = RemoteStoreOptions::DefaultChunkFileSizeLimit; - if (auto Param = Params.GetValue("chunkfilesizelimit"); Param.empty() == false) - { - if (auto Value = ParseInt<size_t>(Param)) - { - ChunkFileSizeLimit = Value.value(); - } - } - - WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); - - RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer( - m_CidStore, - *Project, - *Oplog, - WorkerPool, - MaxBlockSize, - MaxChunkEmbedSize, - MaxChunksPerBlock, - ChunkFileSizeLimit, - /* BuildBlocks */ false, - /* IgnoreMissingAttachments */ false, - /* AllowChunking*/ false, - [](CompressedBuffer&&, ChunkBlockDescription&&) {}, - [](const IoHash&, TGetAttachmentBufferFunc&&) {}, - [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, - /* EmbedLooseFiles*/ false); - - if (ContainerResult.ErrorCode == 0) - { - return HttpReq.WriteResponse(HttpResponseCode::OK, ContainerResult.ContainerObject); - } - else - { - ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", - ToString(HttpReq.RequestVerb()), - HttpReq.QueryString(), - ContainerResult.ErrorCode, - ContainerResult.Reason); - - if (ContainerResult.Reason.empty()) - { - return HttpReq.WriteResponse(HttpResponseCode(ContainerResult.ErrorCode)); - } - else - { - return HttpReq.WriteResponse(HttpResponseCode(ContainerResult.ErrorCode), HttpContentType::kText, ContainerResult.Reason); - } - } -} - -void -HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::Rpc"); - using namespace std::literals; - - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - IoBuffer Payload = HttpReq.ReadPayload(); - - HttpContentType PayloadContentType = HttpReq.RequestContentType(); - CbPackage Package; - CbObject Cb; - switch (PayloadContentType) - { - case HttpContentType::kJSON: - case HttpContentType::kUnknownContentType: - case HttpContentType::kText: - { - std::string JsonText(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize()); - Cb = LoadCompactBinaryFromJson(JsonText).AsObject(); - if (!Cb) - { - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Content format not supported, expected JSON format"); - } - } - break; - case HttpContentType::kCbObject: - { - CbValidateError ValidateResult; - if (Cb = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult); - ValidateResult != CbValidateError::None || !Cb) - { - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse( - HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Content format not supported, expected compact binary format ('{}')", ToString(ValidateResult))); - } - break; - } - case HttpContentType::kCbPackage: - try - { - Package = ParsePackageMessage(Payload); - Cb = Package.GetObject(); - } - catch (const std::invalid_argument& ex) - { - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Failed to parse package request, reason: '{}'", ex.what())); - } - if (!Cb) - { - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Content format not supported, expected package message format"); - } - break; - default: - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type"); - } - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Rpc oplog request for unknown project '{}'", ProjectId)); - } - Project->TouchProject(); - - std::string_view Method = Cb["method"sv].AsString(); - - bool VerifyPathOnDisk = Method != "getchunks"sv; - - Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, VerifyPathOnDisk); - if (!Oplog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)); - } - Project->TouchOplog(OplogId); - - uint32_t MethodHash = HashStringDjb2(Method); - - switch (MethodHash) - { - case HashStringDjb2("import"sv): - { - if (!m_ProjectStore->AreDiskWritesAllowed()) - { - return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - } - - CbObjectView Params = Cb["params"sv].AsObjectView(); - size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize); - size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize); - bool Force = Params["force"sv].AsBool(false); - bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); - bool CleanOplog = Params["clean"].AsBool(false); - - CreateRemoteStoreResult RemoteStoreResult = - CreateRemoteStore(Params, m_AuthMgr, MaxBlockSize, MaxChunkEmbedSize, Oplog->TempPath()); - - if (RemoteStoreResult.Store == nullptr) - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, RemoteStoreResult.Description); - } - std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store); - RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); - - JobId JobId = m_JobQueue.QueueJob( - fmt::format("Import oplog '{}/{}'", Project->Identifier, Oplog->OplogId()), - [this, - ChunkStore = &m_CidStore, - ActualRemoteStore = std::move(RemoteStore), - Oplog, - Force, - IgnoreMissingAttachments, - CleanOplog](JobContext& Context) { - Context.ReportMessage(fmt::format("Loading oplog '{}/{}' from {}", - Oplog->GetOuterProjectIdentifier(), - Oplog->OplogId(), - ActualRemoteStore->GetInfo().Description)); - - WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); - WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(EWorkloadType::Background); - - RemoteProjectStore::Result Result = LoadOplog(m_CidStore, - *ActualRemoteStore, - *Oplog, - NetworkWorkerPool, - WorkerPool, - Force, - IgnoreMissingAttachments, - CleanOplog, - &Context); - auto Response = ConvertResult(Result); - ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); - if (!IsHttpSuccessCode(Response.first)) - { - throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second, - (int)Response.first); - } - }); - - return HttpReq.WriteResponse(HttpResponseCode::Accepted, HttpContentType::kText, fmt::format("{}", JobId.Id)); - } - case HashStringDjb2("export"sv): - { - CbObjectView Params = Cb["params"sv].AsObjectView(); - size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize); - size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize); - size_t MaxChunksPerBlock = Params["maxchunksperblock"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunksPerBlock); - size_t ChunkFileSizeLimit = Params["chunkfilesizelimit"sv].AsUInt64(RemoteStoreOptions::DefaultChunkFileSizeLimit); - bool Force = Params["force"sv].AsBool(false); - bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); - bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false); - - CreateRemoteStoreResult RemoteStoreResult = - CreateRemoteStore(Params, m_AuthMgr, MaxBlockSize, MaxChunkEmbedSize, Oplog->TempPath()); - - if (RemoteStoreResult.Store == nullptr) - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, RemoteStoreResult.Description); - } - std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store); - RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); - - JobId JobId = m_JobQueue.QueueJob( - fmt::format("Export oplog '{}/{}'", Project->Identifier, Oplog->OplogId()), - [this, - ActualRemoteStore = std::move(RemoteStore), - Project, - Oplog, - MaxBlockSize, - MaxChunksPerBlock, - MaxChunkEmbedSize, - ChunkFileSizeLimit, - EmbedLooseFile, - Force, - IgnoreMissingAttachments](JobContext& Context) { - Context.ReportMessage(fmt::format("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}", - Project->Identifier, - Oplog->OplogId(), - ActualRemoteStore->GetInfo().Description, - NiceBytes(MaxBlockSize), - NiceBytes(MaxChunkEmbedSize))); - - WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); - WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool(EWorkloadType::Background); - - RemoteProjectStore::Result Result = SaveOplog(m_CidStore, - *ActualRemoteStore, - *Project, - *Oplog, - NetworkWorkerPool, - WorkerPool, - MaxBlockSize, - MaxChunksPerBlock, - MaxChunkEmbedSize, - ChunkFileSizeLimit, - EmbedLooseFile, - Force, - IgnoreMissingAttachments, - &Context); - auto Response = ConvertResult(Result); - ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); - if (!IsHttpSuccessCode(Response.first)) - { - throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second, - (int)Response.first); - } - }); - - return HttpReq.WriteResponse(HttpResponseCode::Accepted, HttpContentType::kText, fmt::format("{}", JobId.Id)); - } - case HashStringDjb2("getchunks"sv): - { - RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u)); - int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0); - - std::vector<ProjectStore::ChunkRequest> Requests = m_ProjectStore->ParseChunksRequests(*Project, *Oplog, Cb); - std::vector<ProjectStore::ChunkResult> Results = - Requests.empty() ? std::vector<ProjectStore::ChunkResult>{} : m_ProjectStore->GetChunks(*Project, *Oplog, Requests); - CbPackage Response = m_ProjectStore->WriteChunksRequestResponse(*Project, *Oplog, std::move(Requests), std::move(Results)); - - void* TargetProcessHandle = nullptr; - FormatFlags Flags = FormatFlags::kDefault; - if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) - { - Flags |= FormatFlags::kAllowLocalReferences; - if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) - { - Flags |= FormatFlags::kDenyPartialLocalReferences; - } - TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId); - } - - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(Response, Flags, TargetProcessHandle); - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); - } - case HashStringDjb2("putchunks"sv): - { - ZEN_TRACE_CPU("Store::Rpc::putchunks"); - if (!m_ProjectStore->AreDiskWritesAllowed()) - { - return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - } - - CbObject Object = Package.GetObject(); - const bool UsingTempFiles = Object["usingtmpfiles"].AsBool(false); - - std::span<const CbAttachment> Attachments = Package.GetAttachments(); - if (!Attachments.empty()) - { - std::vector<IoBuffer> WriteAttachmentBuffers; - std::vector<IoHash> WriteRawHashes; - - WriteAttachmentBuffers.reserve(Attachments.size()); - WriteRawHashes.reserve(Attachments.size()); - - for (const CbAttachment& Attachment : Attachments) - { - IoHash RawHash = Attachment.GetHash(); - const CompressedBuffer& Compressed = Attachment.AsCompressedBinary(); - IoBuffer AttachmentPayload = Compressed.GetCompressed().Flatten().AsIoBuffer(); - if (UsingTempFiles) - { - AttachmentPayload.SetDeleteOnClose(true); - } - WriteAttachmentBuffers.push_back(std::move(AttachmentPayload)); - WriteRawHashes.push_back(RawHash); - } - - Oplog->CaptureAddedAttachments(WriteRawHashes); - m_CidStore.AddChunks(WriteAttachmentBuffers, - WriteRawHashes, - UsingTempFiles ? CidStore::InsertMode::kMayBeMovedInPlace : CidStore::InsertMode::kCopyOnly); - } - return HttpReq.WriteResponse(HttpResponseCode::OK); - } - case HashStringDjb2("snapshot"sv): - { - ZEN_TRACE_CPU("Store::Rpc::snapshot"); - if (!m_ProjectStore->AreDiskWritesAllowed()) - { - return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - } - - // Snapshot all referenced files. This brings the content of all - // files into the CID store - - uint32_t OpCount = 0; - uint64_t InlinedBytes = 0; - uint64_t InlinedFiles = 0; - uint64_t TotalBytes = 0; - uint64_t TotalFiles = 0; - - std::vector<CbObject> NewOps; - struct AddedChunk - { - IoBuffer Buffer; - uint64_t RawSize = 0; - }; - tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks; - - Oplog->IterateOplog( - [&](CbObjectView Op) { - bool OpRewritten = false; - bool AllOk = true; - - CbWriter FilesArrayWriter; - FilesArrayWriter.BeginArray("files"sv); - - for (CbFieldView& Field : Op["files"sv]) - { - bool CopyField = true; - - if (CbObjectView View = Field.AsObjectView()) - { - const IoHash DataHash = View["data"sv].AsHash(); - - if (DataHash == IoHash::Zero) - { - std::string_view ServerPath = View["serverpath"sv].AsString(); - std::filesystem::path FilePath = Project->RootDir / ServerPath; - BasicFile DataFile; - std::error_code Ec; - DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec); - - if (Ec) - { - // Error... - - ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message()); - - AllOk = false; - } - else - { - // Read file contents into memory, compress and keep in map of chunks to add to Cid store - IoBuffer FileIoBuffer = DataFile.ReadAll(); - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer))); - const uint64_t RawSize = Compressed.DecodeRawSize(); - const IoHash RawHash = Compressed.DecodeRawHash(); - if (!AddedChunks.contains(RawHash)) - { - const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString(); - BasicFile ChunkTempFile; - ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete); - ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec); - if (Ec) - { - Oid ChunkId = View["id"sv].AsObjectId(); - ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}", - FilePath, - ChunkId, - Ec.message()); - AllOk = false; - } - else - { - void* FileHandle = ChunkTempFile.Detach(); - IoBuffer ChunkBuffer(IoBuffer::File, - FileHandle, - 0, - Compressed.GetCompressed().GetSize(), - /*IsWholeFile*/ true); - ChunkBuffer.SetDeleteOnClose(true); - AddedChunks.insert_or_assign( - RawHash, - AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize}); - } - } - - TotalBytes += RawSize; - ++TotalFiles; - - // Rewrite file array entry with new data reference - CbObjectWriter Writer(View.GetSize()); - RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { - if (Field.GetName() == "data"sv) - { - // omit this field as we will write it explicitly ourselves - return true; - } - return false; - }); - Writer.AddBinaryAttachment("data"sv, RawHash); - - CbObject RewrittenOp = Writer.Save(); - FilesArrayWriter.AddObject(std::move(RewrittenOp)); - CopyField = false; - } - } - } - - if (CopyField) - { - FilesArrayWriter.AddField(Field); - } - else - { - OpRewritten = true; - } - } - - if (OpRewritten && AllOk) - { - FilesArrayWriter.EndArray(); - CbArray FilesArray = FilesArrayWriter.Save().AsArray(); - - CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { - if (Field.GetName() == "files"sv) - { - NewWriter.AddArray("files"sv, FilesArray); - - return true; - } - - return false; - }); - - NewOps.push_back(std::move(RewrittenOp)); - } - - OpCount++; - }, - ProjectStore::Oplog::Paging{}); - - CbObjectWriter ResponseObj; - - // Persist rewritten oplog entries - if (!NewOps.empty()) - { - ResponseObj.BeginArray("rewritten_ops"); - - for (CbObject& NewOp : NewOps) - { - ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp)); - - ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number); - - ResponseObj.AddInteger(NewLsn.Number); - } - - ResponseObj.EndArray(); - } - - // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the - // new chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have - // unreferenced chunks. - for (auto It : AddedChunks) - { - const IoHash& RawHash = It.first; - AddedChunk& Chunk = It.second; - CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash); - if (Result.New) - { - InlinedBytes += Chunk.RawSize; - ++InlinedFiles; - } - } - - ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles; - ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles; - - ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount); - - return HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save()); - } - case HashStringDjb2("appendops"sv): - { - ZEN_TRACE_CPU("Store::Rpc::appendops"); - if (!m_ProjectStore->AreDiskWritesAllowed()) - { - return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - } - - CbArrayView OpsArray = Cb["ops"sv].AsArrayView(); - - size_t OpsBufferSize = 0; - for (CbFieldView OpView : OpsArray) - { - OpsBufferSize += OpView.GetSize(); - } - UniqueBuffer OpsBuffers = UniqueBuffer::Alloc(OpsBufferSize); - MutableMemoryView OpsBuffersMemory = OpsBuffers.GetMutableView(); - - std::vector<CbObjectView> Ops; - Ops.reserve(OpsArray.Num()); - for (CbFieldView OpView : OpsArray) - { - OpView.CopyTo(OpsBuffersMemory); - Ops.push_back(CbObjectView(OpsBuffersMemory.GetData())); - OpsBuffersMemory.MidInline(OpView.GetSize()); - } - - std::vector<ProjectStore::LogSequenceNumber> LSNs = Oplog->AppendNewOplogEntries(Ops); - ZEN_ASSERT(LSNs.size() == Ops.size()); - - std::vector<IoHash> MissingAttachments; - for (size_t OpIndex = 0; OpIndex < Ops.size(); OpIndex++) - { - if (LSNs[OpIndex]) - { - CbObjectView Op = Ops[OpIndex]; - Op.IterateAttachments([this, &MissingAttachments](CbFieldView AttachmentView) { - const IoHash Cid = AttachmentView.AsAttachment(); - if (!m_CidStore.ContainsChunk(Cid)) - { - MissingAttachments.push_back(Cid); - } - }); - } - } - - CbPackage ResponsePackage; - - { - CbObjectWriter ResponseObj; - ResponseObj.BeginArray("written_ops"); - - for (ProjectStore::LogSequenceNumber NewLsn : LSNs) - { - ZEN_DEBUG("appended written op at LSN: {}", NewLsn.Number); - ResponseObj.AddInteger(NewLsn.Number); - } - ResponseObj.EndArray(); - - if (!MissingAttachments.empty()) - { - ResponseObj.BeginArray("need"); - - for (const IoHash& Cid : MissingAttachments) - { - ResponseObj.AddHash(Cid); - } - ResponseObj.EndArray(); - } - ResponsePackage.SetObject(ResponseObj.Save()); - } - - std::vector<IoBuffer> ResponseBuffers = FormatPackageMessage(ResponsePackage); - - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponseBuffers); - } - default: - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Unknown rpc method '{}'", Method)); - } -} -void -HttpProjectService::HandleDetailsRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::Details"); - - using namespace std::literals; - - HttpServerRequest& HttpReq = Req.ServerRequest(); - - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - bool CSV = Params.GetValue("csv"sv) == "true"sv; - bool Details = Params.GetValue("details"sv) == "true"sv; - bool OpDetails = Params.GetValue("opdetails"sv) == "true"sv; - bool AttachmentDetails = Params.GetValue("attachmentdetails"sv) == "true"sv; - - if (CSV) - { - ExtendableStringBuilder<4096> CSVWriter; - CSVHeader(Details, AttachmentDetails, CSVWriter); - - m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) { - Project.IterateOplogs([&](const RwLock::SharedLockScope&, ProjectStore::Oplog& Oplog) { - Oplog.IterateOplogWithKey( - [this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](ProjectStore::LogSequenceNumber LSN, - const Oid& Key, - CbObjectView Op) { - CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); - }); - }); - }); - - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); - } - else - { - CbObjectWriter Cbo; - Cbo.BeginArray("projects"); - { - m_ProjectStore->DiscoverProjects(); - - m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) { - std::vector<std::string> OpLogs = Project.ScanForOplogs(); - CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); - }); - } - Cbo.EndArray(); - HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); - } -} - -void -HttpProjectService::HandleProjectDetailsRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::ProjectDetails"); - - using namespace std::literals; - - HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto& ProjectId = Req.GetCapture(1); - - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - bool CSV = Params.GetValue("csv"sv) == "true"sv; - bool Details = Params.GetValue("details"sv) == "true"sv; - bool OpDetails = Params.GetValue("opdetails"sv) == "true"sv; - bool AttachmentDetails = Params.GetValue("attachmentdetails"sv) == "true"sv; - - Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId); - if (!FoundProject) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - ProjectStore::Project& Project = *FoundProject.Get(); - - if (CSV) - { - ExtendableStringBuilder<4096> CSVWriter; - CSVHeader(Details, AttachmentDetails, CSVWriter); - - FoundProject->IterateOplogs([&](const RwLock::SharedLockScope&, ProjectStore::Oplog& Oplog) { - Oplog.IterateOplogWithKey([this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](ProjectStore::LogSequenceNumber LSN, - const Oid& Key, - CbObjectView Op) { - CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); - }); - }); - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); - } - else - { - CbObjectWriter Cbo; - std::vector<std::string> OpLogs = FoundProject->ScanForOplogs(); - Cbo.BeginArray("projects"); - { - CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); - } - Cbo.EndArray(); - HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); - } -} - -void -HttpProjectService::HandleOplogDetailsRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::OplogDetails"); - - using namespace std::literals; - - HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - bool CSV = Params.GetValue("csv"sv) == "true"sv; - bool Details = Params.GetValue("details"sv) == "true"sv; - bool OpDetails = Params.GetValue("opdetails"sv) == "true"sv; - bool AttachmentDetails = Params.GetValue("attachmentdetails"sv) == "true"sv; - - Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId); - if (!FoundProject) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - Ref<ProjectStore::Oplog> FoundLog = FoundProject->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Project& Project = *FoundProject.Get(); - ProjectStore::Oplog& Oplog = *FoundLog; - if (CSV) - { - ExtendableStringBuilder<4096> CSVWriter; - CSVHeader(Details, AttachmentDetails, CSVWriter); - - Oplog.IterateOplogWithKey([this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](ProjectStore::LogSequenceNumber LSN, - const Oid& Key, - CbObjectView Op) { - CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); - }); - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); - } - else - { - CbObjectWriter Cbo; - Cbo.BeginArray("oplogs"); - { - CbWriteOplog(m_CidStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo); - } - Cbo.EndArray(); - HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); - } -} - -void -HttpProjectService::HandleOplogOpDetailsRequest(HttpRouterRequest& Req) -{ - ZEN_TRACE_CPU("ProjectService::OplogOpDetails"); - - using namespace std::literals; - - HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - const auto& OpId = Req.GetCapture(3); - - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); - bool CSV = Params.GetValue("csv"sv) == "true"sv; - bool Details = Params.GetValue("details"sv) == "true"sv; - bool OpDetails = Params.GetValue("opdetails"sv) == "true"sv; - bool AttachmentDetails = Params.GetValue("attachmentdetails"sv) == "true"sv; - - Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId); - if (!FoundProject) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - Ref<ProjectStore::Oplog> FoundLog = FoundProject->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - if (OpId.size() != 2 * sizeof(Oid::OidBits)) - { - m_ProjectStats.BadRequestCount++; - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, OpId)); - } - - const Oid ObjId = Oid::FromHexString(OpId); - ProjectStore::Project& Project = *FoundProject.Get(); - ProjectStore::Oplog& Oplog = *FoundLog; - - std::optional<CbObject> Op = Oplog.GetOpByKey(ObjId); - if (!Op.has_value()) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - ProjectStore::LogSequenceNumber LSN = Oplog.GetOpIndexByKey(ObjId); - if (!LSN) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - if (CSV) - { - ExtendableStringBuilder<4096> CSVWriter; - CSVHeader(Details, AttachmentDetails, CSVWriter); - - CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, ObjId, Op.value(), CSVWriter); - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); - } - else - { - CbObjectWriter Cbo; - Cbo.BeginArray("ops"); - { - CbWriteOp(m_CidStore, Details, OpDetails, AttachmentDetails, LSN, ObjId, Op.value(), Cbo); - } - Cbo.EndArray(); - HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); - } -} - -} // namespace zen |