// Copyright Epic Games, Inc. All Rights Reserved. #include "httpprojectstore.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace zen { const FLLMTag& GetProjectHttpTag() { static FLLMTag _("http", FLLMTag("project")); return _; } void CSVHeader(bool Details, bool AttachmentDetails, StringBuilderBase& CSVWriter) { if (AttachmentDetails) { CSVWriter << "Project, Oplog, LSN, Key, Cid, Size"; } else if (Details) { CSVWriter << "Project, Oplog, LSN, Key, Size, AttachmentCount, AttachmentsSize"; } else { CSVWriter << "Project, Oplog, Key"; } } void CSVWriteOp(CidStore& CidStore, std::string_view ProjectId, std::string_view OplogId, bool Details, bool AttachmentDetails, ProjectStore::LogSequenceNumber LSN, const Oid& Key, CbObjectView Op, StringBuilderBase& CSVWriter) { StringBuilder<32> KeyStringBuilder; Key.ToString(KeyStringBuilder); const std::string_view KeyString = KeyStringBuilder.ToView(); if (AttachmentDetails) { Op.IterateAttachments([&CidStore, &CSVWriter, &ProjectId, &OplogId, LSN, &KeyString](CbFieldView FieldView) { const IoHash AttachmentHash = FieldView.AsAttachment(); IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); CSVWriter << "\r\n" << ProjectId << ", " << OplogId << ", " << LSN.Number << ", " << KeyString << ", " << AttachmentHash.ToHexString() << ", " << gsl::narrow(Attachment.GetSize()); }); } else if (Details) { uint64_t AttachmentCount = 0; size_t AttachmentsSize = 0; Op.IterateAttachments([&CidStore, &AttachmentCount, &AttachmentsSize](CbFieldView FieldView) { const IoHash AttachmentHash = FieldView.AsAttachment(); AttachmentCount++; IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); AttachmentsSize += Attachment.GetSize(); }); CSVWriter << "\r\n" << ProjectId << ", " << OplogId << ", " << LSN.Number << ", " << KeyString << ", " << gsl::narrow(Op.GetSize()) << ", " << AttachmentCount << ", " << gsl::narrow(AttachmentsSize); } else { CSVWriter << "\r\n" << ProjectId << ", " << OplogId << ", " << KeyString; } }; ////////////////////////////////////////////////////////////////////////// namespace { void CbWriteOp(CidStore& CidStore, bool Details, bool OpDetails, bool AttachmentDetails, ProjectStore::LogSequenceNumber LSN, const Oid& Key, CbObjectView Op, CbObjectWriter& CbWriter) { CbWriter.BeginObject(); { CbWriter.AddObjectId("key", Key); if (Details) { CbWriter.AddInteger("lsn", LSN.Number); CbWriter.AddInteger("size", gsl::narrow(Op.GetSize())); } if (AttachmentDetails) { CbWriter.BeginArray("attachments"); Op.IterateAttachments([&CidStore, &CbWriter](CbFieldView FieldView) { const IoHash AttachmentHash = FieldView.AsAttachment(); CbWriter.BeginObject(); { IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); CbWriter.AddString("cid", AttachmentHash.ToHexString()); CbWriter.AddInteger("size", gsl::narrow(Attachment.GetSize())); } CbWriter.EndObject(); }); CbWriter.EndArray(); } else if (Details) { uint64_t AttachmentCount = 0; size_t AttachmentsSize = 0; Op.IterateAttachments([&CidStore, &AttachmentCount, &AttachmentsSize](CbFieldView FieldView) { const IoHash AttachmentHash = FieldView.AsAttachment(); AttachmentCount++; IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash); AttachmentsSize += Attachment.GetSize(); }); if (AttachmentCount > 0) { CbWriter.AddInteger("attachments", AttachmentCount); CbWriter.AddInteger("attachmentssize", gsl::narrow(AttachmentsSize)); } } if (OpDetails) { CbWriter.BeginObject("op"); for (const CbFieldView& Field : Op) { if (!Field.HasName()) { CbWriter.AddField(Field); continue; } std::string_view FieldName = Field.GetName(); CbWriter.AddField(FieldName, Field); } CbWriter.EndObject(); } } CbWriter.EndObject(); }; void CbWriteOplogOps(CidStore& CidStore, ProjectStore::Oplog& Oplog, bool Details, bool OpDetails, bool AttachmentDetails, CbObjectWriter& Cbo) { Cbo.BeginArray("ops"); { Oplog.IterateOplogWithKey([&Cbo, &CidStore, Details, OpDetails, AttachmentDetails](ProjectStore::LogSequenceNumber LSN, const Oid& Key, CbObjectView Op) { CbWriteOp(CidStore, Details, OpDetails, AttachmentDetails, LSN, Key, Op, Cbo); }); } Cbo.EndArray(); } void CbWriteOplog(CidStore& CidStore, ProjectStore::Oplog& Oplog, bool Details, bool OpDetails, bool AttachmentDetails, CbObjectWriter& Cbo) { Cbo.BeginObject(); { Cbo.AddString("name", Oplog.OplogId()); CbWriteOplogOps(CidStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo); } Cbo.EndObject(); } void CbWriteOplogs(CidStore& CidStore, ProjectStore::Project& Project, std::vector OpLogs, bool Details, bool OpDetails, bool AttachmentDetails, CbObjectWriter& Cbo) { Cbo.BeginArray("oplogs"); { for (const std::string& OpLogId : OpLogs) { Ref Oplog = Project.OpenOplog(OpLogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); if (Oplog) { CbWriteOplog(CidStore, *Oplog, Details, OpDetails, AttachmentDetails, Cbo); } } } Cbo.EndArray(); } void CbWriteProject(CidStore& CidStore, ProjectStore::Project& Project, std::vector OpLogs, bool Details, bool OpDetails, bool AttachmentDetails, CbObjectWriter& Cbo) { Cbo.BeginObject(); { Cbo.AddString("name", Project.Identifier); CbWriteOplogs(CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); } Cbo.EndObject(); } struct CreateRemoteStoreResult { std::shared_ptr Store; std::string Description; }; CreateRemoteStoreResult CreateRemoteStore(LoggerRef InLog, CbObjectView Params, AuthMgr& AuthManager, size_t MaxBlockSize, size_t MaxChunkEmbedSize, size_t MaximumInMemoryDownloadSize, const std::filesystem::path& TempFilePath) { ZEN_MEMSCOPE(GetProjectHttpTag()); auto Log = [InLog]() { return InLog; }; using namespace std::literals; std::shared_ptr RemoteStore; if (CbObjectView File = Params["file"sv].AsObjectView(); File) { std::filesystem::path FolderPath(File["path"sv].AsString()); if (FolderPath.empty()) { return {nullptr, "Missing file path"}; } std::string_view Name(File["name"sv].AsString()); if (Name.empty()) { return {nullptr, "Missing file name"}; } std::string_view OptionalBaseName(File["basename"sv].AsString()); bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false); bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false); FileRemoteStoreOptions Options = { RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, FolderPath, std::string(Name), std::string(OptionalBaseName), ForceDisableBlocks, ForceEnableTempBlocks}; RemoteStore = CreateFileRemoteStore(Log(), Options); } if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud) { std::string_view CloudServiceUrl = Cloud["url"sv].AsString(); if (CloudServiceUrl.empty()) { return {nullptr, "Missing service url"}; } std::string Url = UrlDecode(CloudServiceUrl); std::string_view Namespace = Cloud["namespace"sv].AsString(); if (Namespace.empty()) { return {nullptr, "Missing namespace"}; } std::string_view Bucket = Cloud["bucket"sv].AsString(); if (Bucket.empty()) { return {nullptr, "Missing bucket"}; } std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString(); std::string AccessToken = std::string(Cloud["access-token"sv].AsString()); if (AccessToken.empty()) { std::string_view AccessTokenEnvVariable = Cloud["access-token-env"].AsString(); if (!AccessTokenEnvVariable.empty()) { AccessToken = GetEnvVariable(AccessTokenEnvVariable); } } std::filesystem::path OidcExePath; if (std::string_view OidcExePathString = Cloud["oidc-exe-path"].AsString(); !OidcExePathString.empty()) { std::filesystem::path OidcExePathMaybe(OidcExePathString); if (IsFile(OidcExePathMaybe)) { OidcExePath = std::move(OidcExePathMaybe); } else { ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString); } } std::string_view KeyParam = Cloud["key"sv].AsString(); if (KeyParam.empty()) { return {nullptr, "Missing key"}; } if (KeyParam.length() != IoHash::StringLength) { return {nullptr, "Invalid key"}; } IoHash Key = IoHash::FromHexString(KeyParam); if (Key == IoHash::Zero) { return {nullptr, "Invalid key string"}; } IoHash BaseKey = IoHash::Zero; std::string_view BaseKeyParam = Cloud["basekey"sv].AsString(); if (!BaseKeyParam.empty()) { if (BaseKeyParam.length() != IoHash::StringLength) { return {nullptr, "Invalid base key"}; } BaseKey = IoHash::FromHexString(BaseKeyParam); if (BaseKey == IoHash::Zero) { return {nullptr, "Invalid base key string"}; } } bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false); bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false); bool AssumeHttp2 = Cloud["assumehttp2"sv].AsBool(false); JupiterRemoteStoreOptions Options = { RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, Url, std::string(Namespace), std::string(Bucket), Key, BaseKey, std::string(OpenIdProvider), AccessToken, AuthManager, OidcExePath, ForceDisableBlocks, ForceDisableTempBlocks, AssumeHttp2}; RemoteStore = CreateJupiterRemoteStore(Log(), Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false, /*Hidden*/ true); } if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen) { std::string_view Url = Zen["url"sv].AsString(); std::string_view Project = Zen["project"sv].AsString(); if (Project.empty()) { return {nullptr, "Missing project"}; } std::string_view Oplog = Zen["oplog"sv].AsString(); if (Oplog.empty()) { return {nullptr, "Missing oplog"}; } ZenRemoteStoreOptions Options = { RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, std::string(Url), std::string(Project), std::string(Oplog)}; RemoteStore = CreateZenRemoteStore(Log(), Options, TempFilePath); } if (CbObjectView Builds = Params["builds"sv].AsObjectView(); Builds) { std::string_view BuildsServiceHost = Builds["url"sv].AsString(); std::string_view BuildsServiceOverrideHost = Builds["override-host"sv].AsString(); if (BuildsServiceHost.empty() && BuildsServiceOverrideHost.empty()) { return {nullptr, "Missing service url or host"}; } std::string_view BuildsZenCacheHost = Builds["zencachehost"sv].AsString(); std::string Host = UrlDecode(BuildsServiceHost); std::string OverrideHost = UrlDecode(BuildsServiceOverrideHost); std::string ZenHost = UrlDecode(BuildsZenCacheHost); std::string_view Namespace = Builds["namespace"sv].AsString(); if (Namespace.empty()) { return {nullptr, "Missing namespace"}; } std::string_view Bucket = Builds["bucket"sv].AsString(); if (Bucket.empty()) { return {nullptr, "Missing bucket"}; } std::string_view OpenIdProvider = Builds["openid-provider"sv].AsString(); std::string AccessToken = std::string(Builds["access-token"sv].AsString()); if (AccessToken.empty()) { std::string_view AccessTokenEnvVariable = Builds["access-token-env"].AsString(); if (!AccessTokenEnvVariable.empty()) { AccessToken = GetEnvVariable(AccessTokenEnvVariable); } } std::filesystem::path OidcExePath; if (std::string_view OidcExePathString = Builds["oidc-exe-path"].AsString(); !OidcExePathString.empty()) { std::filesystem::path OidcExePathMaybe(OidcExePathString); if (IsFile(OidcExePathMaybe)) { OidcExePath = std::move(OidcExePathMaybe); } else { ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString); } } std::string_view BuildIdParam = Builds["buildsid"sv].AsString(); if (BuildIdParam.empty()) { return {nullptr, "Missing build id"}; } if (BuildIdParam.length() != Oid::StringLength) { return {nullptr, "Invalid build id"}; } Oid BuildId = Oid::FromHexString(BuildIdParam); if (BuildId == Oid::Zero) { return {nullptr, "Invalid build id string"}; } bool ForceDisableBlocks = Builds["disableblocks"sv].AsBool(false); bool ForceDisableTempBlocks = Builds["disabletempblocks"sv].AsBool(false); bool AssumeHttp2 = Builds["assumehttp2"sv].AsBool(false); bool PopulateCache = Builds["populateCache"sv].AsBool(false); MemoryView MetaDataSection = Builds["metadata"sv].AsBinaryView(); IoBuffer MetaData(IoBuffer::Wrap, MetaDataSection.GetData(), MetaDataSection.GetSize()); BuildsRemoteStoreOptions Options = { RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, Host, OverrideHost, ZenHost, std::string(Namespace), std::string(Bucket), BuildId, std::string(OpenIdProvider), AccessToken, AuthManager, OidcExePath, ForceDisableBlocks, ForceDisableTempBlocks, AssumeHttp2, PopulateCache, MetaData, MaximumInMemoryDownloadSize}; RemoteStore = CreateJupiterBuildsRemoteStore(Log(), Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false, /*Hidden*/ true, GetTinyWorkerPool(EWorkloadType::Background)); } if (!RemoteStore) { return {nullptr, "Unknown remote store type"}; } return {std::move(RemoteStore), ""}; } std::pair ConvertResult(const RemoteProjectStore::Result& Result) { if (Result.ErrorCode == 0) { return {HttpResponseCode::OK, Result.Text}; } return {static_cast(Result.ErrorCode), Result.Reason.empty() ? Result.Text : Result.Text.empty() ? Result.Reason : fmt::format("{}: {}", Result.Reason, Result.Text)}; } static uint64_t GetMaxMemoryBufferSize(size_t MaxBlockSize, bool BoostWorkerMemory) { return BoostWorkerMemory ? (MaxBlockSize + 16u * 1024u) : 1024u * 1024u; } } // namespace ////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////// HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, HttpStatusService& StatusService, HttpStatsService& StatsService, AuthMgr& AuthMgr, OpenProcessCache& InOpenProcessCache, JobQueue& InJobQueue) : m_Log(logging::Get("project")) , m_CidStore(Store) , m_ProjectStore(Projects) , m_StatusService(StatusService) , m_StatsService(StatsService) , m_AuthMgr(AuthMgr) , m_OpenProcessCache(InOpenProcessCache) , m_JobQueue(InJobQueue) { ZEN_MEMSCOPE(GetProjectHttpTag()); using namespace std::literals; static constexpr AsciiSet ValidProjectCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789_.ABCDEFGHIJKLMNOPQRSTUVWXYZ"}; static constexpr AsciiSet ValidOplogCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789_.ABCDEFGHIJKLMNOPQRSTUVWXYZ"}; static constexpr AsciiSet ValidNumberCharactersSet{"0123456789"}; static constexpr AsciiSet ValidHexCharactersSet{"0123456789abcdefABCDEF"}; m_Router.AddMatcher("project", [](std::string_view Str) -> bool { return !Str.empty() && AsciiSet::HasOnly(Str, ValidProjectCharactersSet); }); m_Router.AddMatcher("log", [](std::string_view Str) -> bool { return !Str.empty() && AsciiSet::HasOnly(Str, ValidOplogCharactersSet); }); m_Router.AddMatcher("op", [](std::string_view Str) -> bool { return !Str.empty() && AsciiSet::HasOnly(Str, ValidNumberCharactersSet); }); m_Router.AddMatcher("chunk", [](std::string_view Str) -> bool { return Str.length() == Oid::StringLength && AsciiSet::HasOnly(Str, ValidHexCharactersSet); }); m_Router.AddMatcher("hash", [](std::string_view Str) -> bool { return Str.length() == IoHash::StringLength && AsciiSet::HasOnly(Str, ValidHexCharactersSet); }); m_Router.RegisterRoute( "", [this](HttpRouterRequest& Req) { HandleProjectListRequest(Req); }, HttpVerb::kGet); m_Router.RegisterRoute( "list", [this](HttpRouterRequest& Req) { HandleProjectListRequest(Req); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}/oplog/{log}/batch", [this](HttpRouterRequest& Req) { HandleChunkBatchRequest(Req); }, HttpVerb::kPost); m_Router.RegisterRoute( "{project}/oplog/{log}/files", [this](HttpRouterRequest& Req) { HandleFilesRequest(Req); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}/oplog/{log}/chunkinfos", [this](HttpRouterRequest& Req) { HandleChunkInfosRequest(Req); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}/oplog/{log}/{chunk}/info", [this](HttpRouterRequest& Req) { HandleChunkInfoRequest(Req); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}/oplog/{log}/{chunk}", [this](HttpRouterRequest& Req) { HandleChunkByIdRequest(Req); }, HttpVerb::kGet | HttpVerb::kHead); m_Router.RegisterRoute( "{project}/oplog/{log}/{hash}", [this](HttpRouterRequest& Req) { HandleChunkByCidRequest(Req); }, HttpVerb::kGet | HttpVerb::kPost); m_Router.RegisterRoute( "{project}/oplog/{log}/prep", [this](HttpRouterRequest& Req) { HandleOplogOpPrepRequest(Req); }, HttpVerb::kPost); m_Router.RegisterRoute( "{project}/oplog/{log}/new", [this](HttpRouterRequest& Req) { HandleOplogOpNewRequest(Req); }, HttpVerb::kPost); m_Router.RegisterRoute( "{project}/oplog/{log}/validate", [this](HttpRouterRequest& Req) { HandleOplogValidateRequest(Req); }, HttpVerb::kPost); m_Router.RegisterRoute( "{project}/oplog/{log}/{op}", [this](HttpRouterRequest& Req) { HandleOpLogOpRequest(Req); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}/oplog/{log}", [this](HttpRouterRequest& Req) { HandleOpLogRequest(Req); }, HttpVerb::kGet | HttpVerb::kPut | HttpVerb::kPost | HttpVerb::kDelete); m_Router.RegisterRoute( "{project}/oplog/{log}/entries", [this](HttpRouterRequest& Req) { HandleOpLogEntriesRequest(Req); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}", [this](HttpRouterRequest& Req) { HandleProjectRequest(Req); }, HttpVerb::kGet | HttpVerb::kPut | HttpVerb::kPost | HttpVerb::kDelete); // Push a oplog container m_Router.RegisterRoute( "{project}/oplog/{log}/save", [this](HttpRouterRequest& Req) { HandleOplogSaveRequest(Req); }, HttpVerb::kPost); // Pull a oplog container m_Router.RegisterRoute( "{project}/oplog/{log}/load", [this](HttpRouterRequest& Req) { HandleOplogLoadRequest(Req); }, HttpVerb::kGet); // Do an rpc style operation on project/oplog m_Router.RegisterRoute( "{project}/oplog/{log}/rpc", [this](HttpRouterRequest& Req) { HandleRpcRequest(Req); }, HttpVerb::kPost); m_Router.RegisterRoute( "details\\$", [this](HttpRouterRequest& Req) { HandleDetailsRequest(Req); }, HttpVerb::kGet); m_Router.RegisterRoute( "details\\$/{project}", [this](HttpRouterRequest& Req) { HandleProjectDetailsRequest(Req); }, HttpVerb::kGet); m_Router.RegisterRoute( "details\\$/{project}/{log}", [this](HttpRouterRequest& Req) { HandleOplogDetailsRequest(Req); }, HttpVerb::kGet); m_Router.RegisterRoute( "details\\$/{project}/{log}/{chunk}", [this](HttpRouterRequest& Req) { HandleOplogOpDetailsRequest(Req); }, HttpVerb::kGet); m_StatusService.RegisterHandler("prj", *this); m_StatsService.RegisterHandler("prj", *this); } HttpProjectService::~HttpProjectService() { m_StatsService.UnregisterHandler("prj", *this); m_StatusService.UnregisterHandler("prj", *this); } const char* HttpProjectService::BaseUri() const { return "/prj/"; } void HttpProjectService::HandleRequest(HttpServerRequest& Request) { m_ProjectStats.RequestCount++; ZEN_MEMSCOPE(GetProjectHttpTag()); metrics::OperationTiming::Scope $(m_HttpRequests); if (m_Router.HandleRequest(Request) == false) { m_ProjectStats.BadRequestCount++; ZEN_WARN("No route found for {0}", Request.RelativeUri()); } } void HttpProjectService::HandleStatsRequest(HttpServerRequest& HttpReq) { ZEN_TRACE_CPU("ProjectService::Stats"); const GcStorageSize StoreSize = m_ProjectStore->StorageSize(); const CidStoreSize CidSize = m_CidStore.TotalSize(); CbObjectWriter Cbo; EmitSnapshot("requests", m_HttpRequests, Cbo); Cbo.BeginObject("store"); { Cbo.BeginObject("size"); { Cbo << "disk" << StoreSize.DiskSize; Cbo << "memory" << StoreSize.MemorySize; } Cbo.EndObject(); Cbo.BeginObject("project"); { Cbo << "readcount" << m_ProjectStats.ProjectReadCount << "writecount" << m_ProjectStats.ProjectWriteCount << "deletecount" << m_ProjectStats.ProjectDeleteCount; } Cbo.EndObject(); Cbo.BeginObject("oplog"); { Cbo << "readcount" << m_ProjectStats.OpLogReadCount << "writecount" << m_ProjectStats.OpLogWriteCount << "deletecount" << m_ProjectStats.OpLogDeleteCount; } Cbo.EndObject(); Cbo.BeginObject("op"); { Cbo << "hitcount" << m_ProjectStats.OpHitCount << "misscount" << m_ProjectStats.OpMissCount << "writecount" << m_ProjectStats.OpWriteCount; } Cbo.EndObject(); Cbo.BeginObject("chunk"); { Cbo << "hitcount" << m_ProjectStats.ChunkHitCount << "misscount" << m_ProjectStats.ChunkMissCount << "writecount" << m_ProjectStats.ChunkWriteCount; } Cbo.EndObject(); Cbo << "requestcount" << m_ProjectStats.RequestCount; Cbo << "badrequestcount" << m_ProjectStats.BadRequestCount; } Cbo.EndObject(); Cbo.BeginObject("cid"); { Cbo.BeginObject("size"); { Cbo << "tiny" << CidSize.TinySize; Cbo << "small" << CidSize.SmallSize; Cbo << "large" << CidSize.LargeSize; Cbo << "total" << CidSize.TotalSize; } Cbo.EndObject(); } Cbo.EndObject(); return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } void HttpProjectService::HandleStatusRequest(HttpServerRequest& Request) { ZEN_TRACE_CPU("HttpProjectService::Status"); CbObjectWriter Cbo; Cbo << "ok" << true; Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } void HttpProjectService::HandleProjectListRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::ProjectList"); HttpServerRequest& HttpReq = Req.ServerRequest(); CbArray ProjectsList = m_ProjectStore->GetProjectsList(); HttpReq.WriteResponse(HttpResponseCode::OK, ProjectsList); } void HttpProjectService::HandleChunkBatchRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::ChunkBatch"); HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } Project->TouchProject(); Ref FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } Project->TouchOplog(OplogId); // Parse Request IoBuffer Payload = HttpReq.ReadPayload(); BinaryReader Reader(Payload); struct RequestHeader { enum { kMagic = 0xAAAA'77AC }; uint32_t Magic; uint32_t ChunkCount; uint32_t Reserved1; uint32_t Reserved2; }; struct RequestChunkEntry { Oid ChunkId; uint32_t CorrelationId; uint64_t Offset; uint64_t RequestBytes; }; if (Payload.Size() <= sizeof(RequestHeader)) { m_ProjectStats.BadRequestCount++; HttpReq.WriteResponse(HttpResponseCode::BadRequest); } RequestHeader RequestHdr; Reader.Read(&RequestHdr, sizeof RequestHdr); if (RequestHdr.Magic != RequestHeader::kMagic) { m_ProjectStats.BadRequestCount++; HttpReq.WriteResponse(HttpResponseCode::BadRequest); } std::vector RequestedChunks; RequestedChunks.resize(RequestHdr.ChunkCount); Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount); // Make Response struct ResponseHeader { uint32_t Magic = 0xbada'b00f; uint32_t ChunkCount; uint32_t Reserved1 = 0; uint32_t Reserved2 = 0; }; struct ResponseChunkEntry { uint32_t CorrelationId; uint32_t Flags = 0; uint64_t ChunkSize; }; std::vector OutBlobs; OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry)); for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) { const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; IoBuffer FoundChunk = FoundLog->FindChunk(Project->RootDir, RequestedChunk.ChunkId, nullptr); if (FoundChunk) { if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1)) { uint64_t Offset = RequestedChunk.Offset; if (Offset > FoundChunk.Size()) { Offset = FoundChunk.Size(); } uint64_t Size = RequestedChunk.RequestBytes; if ((Offset + Size) > FoundChunk.Size()) { Size = FoundChunk.Size() - Offset; } FoundChunk = IoBuffer(FoundChunk, Offset, Size); } } OutBlobs.emplace_back(std::move(FoundChunk)); } uint8_t* ResponsePtr = reinterpret_cast(OutBlobs[0].MutableData()); ResponseHeader ResponseHdr; ResponseHdr.ChunkCount = RequestHdr.ChunkCount; memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr)); ResponsePtr += sizeof(ResponseHdr); for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) { const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; const IoBuffer& FoundChunk(OutBlobs[ChunkIndex + 1]); ResponseChunkEntry ResponseChunk; ResponseChunk.CorrelationId = RequestedChunk.CorrelationId; if (FoundChunk) { ResponseChunk.ChunkSize = FoundChunk.Size(); m_ProjectStats.ChunkHitCount++; } else { ResponseChunk.ChunkSize = uint64_t(-1); m_ProjectStats.ChunkMissCount++; } memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk)); ResponsePtr += sizeof(ResponseChunk); } std::erase_if(OutBlobs, [](IoBuffer Buffer) -> bool { return !Buffer; }); return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs); } void HttpProjectService::HandleFilesRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::Files"); using namespace std::literals; HttpServerRequest& HttpReq = Req.ServerRequest(); // File manifest fetch, returns the client file list const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); std::unordered_set WantedFieldNames; if (auto FieldFilter = HttpServerRequest::Decode(Params.GetValue("fieldnames")); !FieldFilter.empty()) { if (FieldFilter != "*") // Get all - empty FieldFilter equal getting all fields { ForEachStrTok(FieldFilter, ',', [&](std::string_view FieldName) { WantedFieldNames.insert(std::string(FieldName)); return true; }); } } else { const bool FilterClient = Params.GetValue("filter"sv) == "client"sv; WantedFieldNames.insert("id"); WantedFieldNames.insert("clientpath"); if (!FilterClient) { WantedFieldNames.insert("serverpath"); } } Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Project files request for unknown project '{}'", ProjectId)); } Project->TouchProject(); Ref FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Project files for unknown oplog '{}/{}'", ProjectId, OplogId)); } Project->TouchOplog(OplogId); CbObject ResponsePayload = ProjectStore::GetProjectFiles(Log(), *Project, *FoundLog, WantedFieldNames); if (HttpReq.AcceptContentType() == HttpContentType::kCompressedBinary) { CompositeBuffer Payload = CompressedBuffer::Compress(ResponsePayload.GetBuffer()).GetCompressed(); return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCompressedBinary, Payload); } else { return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); } } void HttpProjectService::HandleChunkInfosRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::ChunkInfos"); HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); std::unordered_set WantedFieldNames; if (auto FieldFilter = HttpServerRequest::Decode(Params.GetValue("fieldnames")); !FieldFilter.empty()) { if (FieldFilter != "*") // Get all - empty FieldFilter equal getting all fields { ForEachStrTok(FieldFilter, ',', [&](std::string_view FieldName) { WantedFieldNames.insert(std::string(FieldName)); return true; }); } } else { WantedFieldNames.insert("id"); WantedFieldNames.insert("rawhash"); WantedFieldNames.insert("rawsize"); } Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Chunk infos request for unknown project '{}'", ProjectId)); } Project->TouchProject(); Ref FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Chunk infos for unknown oplog '{}/{}'", ProjectId, OplogId)); } Project->TouchOplog(OplogId); CbObject ResponsePayload = ProjectStore::GetProjectChunkInfos(Log(), *Project, *FoundLog, WantedFieldNames); if (HttpReq.AcceptContentType() == HttpContentType::kCompressedBinary) { CompositeBuffer Payload = CompressedBuffer::Compress(ResponsePayload.GetBuffer()).GetCompressed(); return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCompressedBinary, Payload); } else { return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); } } void HttpProjectService::HandleChunkInfoRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::ChunkInfo"); HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); const auto& ChunkId = Req.GetCapture(3); Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Chunk info request for unknown project '{}'", ProjectId)); } Project->TouchProject(); Ref FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Chunk info for unknown oplog '{}/{}'", ProjectId, OplogId)); } Project->TouchOplog(OplogId); if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) { m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)); } const Oid Obj = Oid::FromHexString(ChunkId); CbObject ResponsePayload = ProjectStore::GetChunkInfo(Log(), *Project, *FoundLog, Obj); if (ResponsePayload) { m_ProjectStats.ChunkHitCount++; return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); } else { m_ProjectStats.ChunkMissCount++; ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Chunk info for unknown chunk '{}/{}/{}'", ProjectId, OplogId, ChunkId)); } } void HttpProjectService::HandleChunkByIdRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::ChunkById"); HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); const auto& ChunkId = Req.GetCapture(3); uint64_t Offset = 0; uint64_t Size = ~(0ull); auto QueryParms = HttpReq.GetQueryParams(); if (auto OffsetParm = QueryParms.GetValue("offset"); OffsetParm.empty() == false) { if (auto OffsetVal = ParseInt(OffsetParm)) { Offset = OffsetVal.value(); } else { m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } } if (auto SizeParm = QueryParms.GetValue("size"); SizeParm.empty() == false) { if (auto SizeVal = ParseInt(SizeParm)) { Size = SizeVal.value(); } else { m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } } Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Chunk request for unknown project '{}'", ProjectId)); } Project->TouchProject(); Ref FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)); } Project->TouchOplog(OplogId); if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) { m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, fmt::format("Chunk request for invalid chunk id '{}/{}/{}'", ProjectId, OplogId, ChunkId)); } const Oid Obj = Oid::FromHexString(ChunkId); HttpContentType AcceptType = HttpReq.AcceptContentType(); ProjectStore::GetChunkRangeResult Result = ProjectStore::GetChunkRange(Log(), *Project, *FoundLog, Obj, Offset, Size, AcceptType, /*OptionalInOutModificationTag*/ nullptr); switch (Result.Error) { case ProjectStore::GetChunkRangeResult::EError::Ok: m_ProjectStats.ChunkHitCount++; ZEN_DEBUG("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(Result.ContentType)); return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ContentType, Result.Chunk); case ProjectStore::GetChunkRangeResult::EError::NotFound: m_ProjectStats.ChunkMissCount++; ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); return HttpReq.WriteResponse(HttpResponseCode::NotFound, Result.ContentType, Result.Chunk); case ProjectStore::GetChunkRangeResult::EError::MalformedContent: return HttpReq.WriteResponse( HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Get chunk {}/{}/{} failed. Reason: {}", ProjectId, OplogId, ChunkId, Result.ErrorDescription)); case ProjectStore::GetChunkRangeResult::EError::OutOfRange: m_ProjectStats.ChunkMissCount++; ZEN_DEBUG("chunk - '{}/{}/{}' OUT OF RANGE", ProjectId, OplogId, ChunkId); return HttpReq.WriteResponse( HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Get chunk {}/{}/{} failed. Reason: {}", ProjectId, OplogId, ChunkId, Result.ErrorDescription)); default: ZEN_ASSERT(false); break; } } void HttpProjectService::HandleChunkByCidRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::ChunkByCid"); HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); const auto& Cid = Req.GetCapture(3); HttpContentType AcceptType = HttpReq.AcceptContentType(); HttpContentType RequestType = HttpReq.RequestContentType(); Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Chunk request for unknown project '{}'", ProjectId)); } Project->TouchProject(); Ref FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)); } Project->TouchOplog(OplogId); if (Cid.length() != IoHash::StringLength) { m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, fmt::format("Chunk request for invalid chunk id '{}/{}/{}'", ProjectId, OplogId, Cid)); } const IoHash Hash = IoHash::FromHexString(Cid); switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: { IoBuffer Value = m_ProjectStore->GetChunk(*Project, *FoundLog, Hash); if (Value) { if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kJSON || AcceptType == ZenContentType::kYAML || AcceptType == ZenContentType::kCbObject) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Value)); IoBuffer DecompressedBuffer = Compressed.Decompress().AsIoBuffer(); if (DecompressedBuffer) { if (AcceptType == ZenContentType::kJSON || AcceptType == ZenContentType::kYAML || AcceptType == ZenContentType::kCbObject) { CbValidateError CbErr = ValidateCompactBinary(DecompressedBuffer.GetView(), CbValidateMode::Default); if (!!CbErr) { m_ProjectStats.BadRequestCount++; ZEN_DEBUG( "chunk - '{}/{}/{}' WRONGTYPE. Reason: `Requested {} format, but could not convert to object`", ProjectId, OplogId, Cid, ToString(AcceptType)); return HttpReq.WriteResponse( HttpResponseCode::NotAcceptable, HttpContentType::kText, fmt::format("Content format not supported, requested {} format, but could not convert to object", ToString(AcceptType))); } m_ProjectStats.ChunkHitCount++; CbObject ContainerObject = LoadCompactBinaryObject(DecompressedBuffer); return HttpReq.WriteResponse(HttpResponseCode::OK, ContainerObject); } else { Value = DecompressedBuffer; Value.SetContentType(ZenContentType::kBinary); } } else { m_ProjectStats.BadRequestCount++; ZEN_DEBUG("chunk - '{}/{}/{}' WRONGTYPE. Reason: `Requested {} format, but could not decompress stored data`", ProjectId, OplogId, Cid, ToString(AcceptType)); return HttpReq.WriteResponse( HttpResponseCode::NotAcceptable, HttpContentType::kText, fmt::format("Content format not supported, requested {} format, but could not decompress stored data", ToString(AcceptType))); } } m_ProjectStats.ChunkHitCount++; return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value); } else { m_ProjectStats.ChunkMissCount++; ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, Cid); return HttpReq.WriteResponse(HttpResponseCode::NotFound); } } case HttpVerb::kPost: { if (!m_ProjectStore->AreDiskWritesAllowed()) { return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); } if (RequestType != HttpContentType::kCompressedBinary) { m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, fmt::format("Chunk request for chunk id '{}/{}'/'{}' as unexpected content type: '{}'", ProjectId, OplogId, Cid, ToString(RequestType))); } IoBuffer Payload = HttpReq.ReadPayload(); Payload.SetContentType(RequestType); bool IsNew = m_ProjectStore->PutChunk(*Project, *FoundLog, Hash, std::move(Payload)); m_ProjectStats.ChunkWriteCount++; return HttpReq.WriteResponse(IsNew ? HttpResponseCode::Created : HttpResponseCode::OK); } break; } } void HttpProjectService::HandleOplogOpPrepRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::OplogOpPrep"); using namespace std::literals; HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } Project->TouchProject(); Ref FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } Project->TouchOplog(OplogId); // This operation takes a list of referenced hashes and decides which // chunks are not present on this server. This list is then returned in // the "need" list in the response CbValidateError ValidateResult; if (CbObject RequestObject = ValidateAndReadCompactBinaryObject(HttpReq.ReadPayload(), ValidateResult); ValidateResult == CbValidateError::None) { std::vector NeedList; { eastl::fixed_vector ChunkList; CbArrayView HaveList = RequestObject["have"sv].AsArrayView(); ChunkList.reserve(HaveList.Num()); for (auto& Entry : HaveList) { ChunkList.push_back(Entry.AsHash()); } NeedList = FoundLog->CheckPendingChunkReferences(std::span(begin(ChunkList), end(ChunkList)), std::chrono::minutes(2)); } CbObjectWriter Cbo(1 + 1 + 5 + NeedList.size() * (1 + sizeof(IoHash::Hash)) + 1); Cbo.BeginArray("need"); { for (const IoHash& Hash : NeedList) { ZEN_DEBUG("prep - NEED: {}", Hash); Cbo << Hash; } } Cbo.EndArray(); CbObject Response = Cbo.Save(); return HttpReq.WriteResponse(HttpResponseCode::OK, Response); } else { return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, fmt::format("Invalid compact binary format: '{}'", ToString(ValidateResult))); } } void HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::OplogOpNew"); using namespace std::literals; HttpServerRequest& HttpReq = Req.ServerRequest(); if (!m_ProjectStore->AreDiskWritesAllowed()) { return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); } const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); bool IsUsingSalt = false; IoHash SaltHash = IoHash::Zero; if (std::string_view SaltParam = Params.GetValue("salt"sv); SaltParam.empty() == false) { const uint32_t Salt = std::stoi(std::string(SaltParam)); SaltHash = IoHash::HashBuffer(&Salt, sizeof Salt); IsUsingSalt = true; } Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } Project->TouchProject(); Ref FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } Project->TouchOplog(OplogId); ProjectStore::Oplog& Oplog = *FoundLog; IoBuffer Payload = HttpReq.ReadPayload(); // This will attempt to open files which may not exist for the case where // the prep step rejected the chunk. This should be fixed since there's // a performance cost associated with any file system activity bool IsValid = true; std::vector MissingChunks; CbPackage::AttachmentResolver Resolver = [&](const IoHash& Hash) -> SharedBuffer { if (m_CidStore.ContainsChunk(Hash)) { // Return null attachment as we already have it, no point in reading it and storing it again return {}; } IoHash AttachmentId; if (IsUsingSalt) { IoHash AttachmentSpec[]{SaltHash, Hash}; AttachmentId = IoHash::HashBuffer(MakeMemoryView(AttachmentSpec)); } else { AttachmentId = Hash; } std::filesystem::path AttachmentPath = Oplog.TempPath() / AttachmentId.ToHexString(); if (IoBuffer Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath)) { Data.SetDeleteOnClose(true); return SharedBuffer(std::move(Data)); } else { IsValid = false; MissingChunks.push_back(Hash); return {}; } }; CbPackage Package; if (!legacy::TryLoadCbPackage(Package, Payload, &UniqueBuffer::Alloc, &Resolver)) { CbValidateError ValidateResult; if (CbObject Core = ValidateAndReadCompactBinaryObject(IoBuffer(Payload), ValidateResult); ValidateResult == CbValidateError::None && Core) { Package.SetObject(Core); } else { std::filesystem::path BadPackagePath = Oplog.TempPath() / "bad_packages"sv / fmt::format("session{}_request{}"sv, HttpReq.SessionId(), HttpReq.RequestId()); ZEN_WARN("Received malformed package ('{}')! Saving payload to '{}'", ToString(ValidateResult), BadPackagePath); WriteFile(BadPackagePath, Payload); m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, u8"request body must be a compact binary object or package in legacy format"); } } m_ProjectStats.ChunkMissCount += MissingChunks.size(); if (!IsValid) { ExtendableStringBuilder<256> ResponseText; ResponseText.Append("Missing chunk references: "); bool IsFirst = true; for (const auto& Hash : MissingChunks) { if (IsFirst) { IsFirst = false; } else { ResponseText.Append(", "); } Hash.ToHexString(ResponseText); } return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, ResponseText); } CbObject Core = Package.GetObject(); if (!Core["key"sv]) { m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "No oplog entry key specified"); } eastl::fixed_vector ReferencedChunks; Core.IterateAttachments([&ReferencedChunks](CbFieldView View) { ReferencedChunks.push_back(View.AsAttachment()); }); // Write core to oplog size_t AttachmentCount = Package.GetAttachments().size(); const ProjectStore::LogSequenceNumber OpLsn = Oplog.AppendNewOplogEntry(Package); if (!OpLsn) { m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } m_ProjectStats.ChunkWriteCount += AttachmentCount; // Once we stored the op, we no longer need to retain any chunks this op references if (!ReferencedChunks.empty()) { FoundLog->RemovePendingChunkReferences(std::span(begin(ReferencedChunks), end(ReferencedChunks))); } m_ProjectStats.OpWriteCount++; ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn.Number, NiceBytes(Payload.Size()), Core["key"sv].AsString()); HttpReq.WriteResponse(HttpResponseCode::Created); } void HttpProjectService::HandleOplogValidateRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::OplogOpValidate"); using namespace std::literals; HttpServerRequest& HttpReq = Req.ServerRequest(); if (!m_ProjectStore->AreDiskWritesAllowed()) { return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); } const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, ZenContentType::kText, fmt::format("Project '{}' not found", ProjectId)); } Project->TouchProject(); Ref FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, ZenContentType::kText, fmt::format("Oplog '{}' not found in project '{}'", OplogId, ProjectId)); } Project->TouchOplog(OplogId); ProjectStore::Oplog& Oplog = *FoundLog; std::atomic_bool CancelFlag = false; ProjectStore::Oplog::ValidationResult Result = Oplog.Validate(Project->RootDir, CancelFlag, &GetSmallWorkerPool(EWorkloadType::Burst)); tsl::robin_map KeyNameLookup; KeyNameLookup.reserve(Result.OpKeys.size()); for (const auto& It : Result.OpKeys) { KeyNameLookup.insert_or_assign(It.first, It.second); } CbObjectWriter Writer; Writer << "HasMissingData" << !Result.IsEmpty(); Writer << "OpCount" << Result.OpCount; Writer << "LSNLow" << Result.LSNLow.Number; Writer << "LSNHigh" << Result.LSNHigh.Number; if (!Result.MissingFiles.empty()) { Writer.BeginArray("MissingFiles"); for (const auto& MissingFile : Result.MissingFiles) { Writer.BeginObject(); { Writer << "Key" << MissingFile.first; Writer << "KeyName" << KeyNameLookup[MissingFile.first]; Writer << "Id" << MissingFile.second.Id; Writer << "Hash" << MissingFile.second.Hash; Writer << "ServerPath" << MissingFile.second.ServerPath; Writer << "ClientPath" << MissingFile.second.ClientPath; } Writer.EndObject(); } Writer.EndArray(); } if (!Result.MissingChunks.empty()) { Writer.BeginArray("MissingChunks"); for (const auto& MissingChunk : Result.MissingChunks) { Writer.BeginObject(); { Writer << "Key" << MissingChunk.first; Writer << "KeyName" << KeyNameLookup[MissingChunk.first]; Writer << "Id" << MissingChunk.second.Id; Writer << "Hash" << MissingChunk.second.Hash; } Writer.EndObject(); } Writer.EndArray(); } if (!Result.MissingMetas.empty()) { Writer.BeginArray("MissingMetas"); for (const auto& MissingMeta : Result.MissingMetas) { Writer.BeginObject(); { Writer << "Key" << MissingMeta.first; Writer << "KeyName" << KeyNameLookup[MissingMeta.first]; Writer << "Id" << MissingMeta.second.Id; Writer << "Hash" << MissingMeta.second.Hash; } Writer.EndObject(); } Writer.EndArray(); } if (!Result.MissingAttachments.empty()) { Writer.BeginArray("MissingAttachments"); for (const auto& MissingMeta : Result.MissingAttachments) { Writer.BeginObject(); { Writer << "Key" << MissingMeta.first; Writer << "KeyName" << KeyNameLookup[MissingMeta.first]; Writer << "Hash" << MissingMeta.second; } Writer.EndObject(); } Writer.EndArray(); } CbObject Response = Writer.Save(); HttpReq.WriteResponse(HttpResponseCode::OK, Response); } void HttpProjectService::HandleOpLogOpRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::OplogOp"); HttpServerRequest& HttpReq = Req.ServerRequest(); const std::string_view ProjectId = Req.GetCapture(1); const std::string_view OplogId = Req.GetCapture(2); const std::string_view OpIdString = Req.GetCapture(3); Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } Project->TouchProject(); Ref FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } Project->TouchOplog(OplogId); ProjectStore::Oplog& Oplog = *FoundLog; if (const std::optional OpId = ParseInt(OpIdString)) { if (std::optional MaybeOp = Oplog.GetOpByIndex(ProjectStore::LogSequenceNumber(OpId.value()))) { CbObject& Op = MaybeOp.value(); if (HttpReq.AcceptContentType() == ZenContentType::kCbPackage) { CbPackage Package; Package.SetObject(Op); Op.IterateAttachments([&](CbFieldView FieldView) { const IoHash AttachmentHash = FieldView.AsAttachment(); IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash); if (Payload) { switch (Payload.GetContentType()) { case ZenContentType::kCbObject: { CbValidateError ValidateResult; if (CbObject Object = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult); ValidateResult == CbValidateError::None && Object) { Package.AddAttachment(CbAttachment(Object)); } else { // Error - malformed object ZEN_WARN("malformed object returned for {} ('{}')", AttachmentHash, ToString(ValidateResult)); } } break; case ZenContentType::kCompressedBinary: if (CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload))) { Package.AddAttachment(CbAttachment(Compressed, AttachmentHash)); } else { // Error - not compressed! ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash); } break; default: Package.AddAttachment(CbAttachment(SharedBuffer(Payload))); break; } } }); m_ProjectStats.OpHitCount++; return HttpReq.WriteResponse(HttpResponseCode::Accepted, Package); } else { // Client cannot accept a package, so we only send the core object m_ProjectStats.OpHitCount++; return HttpReq.WriteResponse(HttpResponseCode::Accepted, Op); } } } m_ProjectStats.OpMissCount++; return HttpReq.WriteResponse(HttpResponseCode::NotFound); } void HttpProjectService::HandleOpLogRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::Oplog"); HttpServerRequest& HttpReq = Req.ServerRequest(); using namespace std::literals; const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("project {} not found", ProjectId)); } Project->TouchProject(); switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: { Ref OplogIt = Project->ReadOplog(OplogId); if (!OplogIt) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("oplog {} not found in project {}", OplogId, ProjectId)); } ProjectStore::Oplog& Log = *OplogIt; CbObjectWriter Cb; Cb << "id"sv << Log.OplogId() << "project"sv << Project->Identifier << "tempdir"sv << Log.TempPath().c_str() << "markerpath"sv << Log.MarkerPath().c_str() << "totalsize"sv << Log.TotalSize() << "opcount" << Log.OplogCount() << "expired"sv << Project->IsExpired(GcClock::TimePoint::min(), Log); HttpReq.WriteResponse(HttpResponseCode::OK, Cb.Save()); m_ProjectStats.OpLogReadCount++; } break; case HttpVerb::kPost: { if (!m_ProjectStore->AreDiskWritesAllowed()) { return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); } std::filesystem::path OplogMarkerPath; if (CbObject Params = HttpReq.ReadPayloadObject()) { OplogMarkerPath = Params["gcpath"sv].AsString(); } Ref OplogIt = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); if (!OplogIt) { if (!Project->NewOplog(OplogId, OplogMarkerPath)) { // TODO: indicate why the operation failed! return HttpReq.WriteResponse(HttpResponseCode::InternalServerError); } Project->TouchOplog(OplogId); m_ProjectStats.OpLogWriteCount++; ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath); return HttpReq.WriteResponse(HttpResponseCode::Created); } // I guess this should ultimately be used to execute RPCs but for now, it // does absolutely nothing m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } break; case HttpVerb::kPut: { if (!m_ProjectStore->AreDiskWritesAllowed()) { return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); } std::filesystem::path OplogMarkerPath; if (CbObject Params = HttpReq.ReadPayloadObject()) { OplogMarkerPath = Params["gcpath"sv].AsString(); } Ref FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); if (!FoundLog) { if (!Project->NewOplog(OplogId, OplogMarkerPath)) { // TODO: indicate why the operation failed! return HttpReq.WriteResponse(HttpResponseCode::InternalServerError); } Project->TouchOplog(OplogId); m_ProjectStats.OpLogWriteCount++; ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath); return HttpReq.WriteResponse(HttpResponseCode::Created); } Project->TouchOplog(OplogId); FoundLog->Update(OplogMarkerPath); m_ProjectStats.OpLogWriteCount++; ZEN_INFO("updated oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath); return HttpReq.WriteResponse(HttpResponseCode::OK); } break; case HttpVerb::kDelete: { ZEN_INFO("deleting oplog '{}/{}'", ProjectId, OplogId); if (Project->DeleteOplog(OplogId)) { m_ProjectStats.OpLogDeleteCount++; return HttpReq.WriteResponse(HttpResponseCode::OK); } else { return HttpReq.WriteResponse(HttpResponseCode::Locked, HttpContentType::kText, fmt::format("oplog {}/{} is in use", ProjectId, OplogId)); } } break; default: break; } } std::optional LoadReferencedSet(ProjectStore::Project& Project, ProjectStore::Oplog& Log) { using namespace std::literals; Oid ReferencedSetOplogId = OpKeyStringAsOid(OplogReferencedSet::ReferencedSetOplogKey); std::optional ReferencedSetOp = Log.GetOpByKey(ReferencedSetOplogId); if (!ReferencedSetOp) { return std::optional(); } // We expect only a single file in the "files" array; get the chunk for the first file CbFieldView FileField = *(*ReferencedSetOp)["files"sv].AsArrayView().CreateViewIterator(); Oid ChunkId = FileField.AsObjectView()["id"sv].AsObjectId(); if (ChunkId == Oid::Zero) { return std::optional(); } return OplogReferencedSet::LoadFromChunk(Log.FindChunk(Project.RootDir, ChunkId, nullptr)); } void HttpProjectService::HandleOpLogEntriesRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::OplogEntries"); using namespace std::literals; HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } Project->TouchProject(); Ref FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } Project->TouchOplog(OplogId); CbObjectWriter Response; if (FoundLog->OplogCount() > 0) { std::unordered_set FieldNamesFilter; auto FilterObject = [&FieldNamesFilter](CbObjectView& Object) -> CbObject { CbObject RewrittenOp = RewriteCbObject(Object, [&FieldNamesFilter](CbObjectWriter&, CbFieldView Field) -> bool { if (FieldNamesFilter.contains(std::string(Field.GetName()))) { return false; } return true; }); return RewrittenOp; }; HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); if (auto FieldFilter = HttpServerRequest::Decode(Params.GetValue("fieldfilter")); !FieldFilter.empty()) { ForEachStrTok(FieldFilter, ',', [&](std::string_view FieldName) { FieldNamesFilter.insert(std::string(FieldName)); return true; }); } if (auto OpKey = Params.GetValue("opkey"); !OpKey.empty()) { Oid OpKeyId = OpKeyStringAsOid(OpKey); std::optional Op = FoundLog->GetOpByKey(OpKeyId); if (Op.has_value()) { if (FieldNamesFilter.empty()) { Response << "entry"sv << Op.value(); } else { Response << "entry"sv << FilterObject(Op.value()); } } else { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } } else { ProjectStore::Oplog::Paging EntryPaging; if (std::string_view Param = Params.GetValue("start"); !Param.empty()) { if (auto Value = ParseInt(Param)) { EntryPaging.Start = *Value; } } if (std::string_view Param = Params.GetValue("count"); !Param.empty()) { if (auto Value = ParseInt(Param)) { EntryPaging.Count = *Value; } } std::optional MaybeReferencedSet; if (auto TrimString = Params.GetValue("trim_by_referencedset"); TrimString == "true") { MaybeReferencedSet = LoadReferencedSet(*Project, *FoundLog); } Response.BeginArray("entries"sv); bool ShouldFilterFields = !FieldNamesFilter.empty(); if (MaybeReferencedSet) { const OplogReferencedSet& ReferencedSet = MaybeReferencedSet.value(); FoundLog->IterateOplogWithKey( [this, &Response, &FilterObject, ShouldFilterFields, &ReferencedSet](ProjectStore::LogSequenceNumber /* LSN */, const Oid& Key, CbObjectView Op) { if (!ReferencedSet.Contains(Key)) { if (!OplogReferencedSet::IsNonPackage(Op["key"].AsString())) { return; } } if (ShouldFilterFields) { Response << FilterObject(Op); } else { Response << Op; } }, EntryPaging); } else { FoundLog->IterateOplog( [this, &Response, &FilterObject, ShouldFilterFields](CbObjectView Op) { if (ShouldFilterFields) { Response << FilterObject(Op); } else { Response << Op; } }, EntryPaging); } Response.EndArray(); } } if (HttpReq.AcceptContentType() == HttpContentType::kCompressedBinary) { CompositeBuffer Payload = CompressedBuffer::Compress(Response.Save().GetBuffer()).GetCompressed(); return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCompressedBinary, Payload); } else { return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); } } void HttpProjectService::HandleProjectRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::Project"); using namespace std::literals; HttpServerRequest& HttpReq = Req.ServerRequest(); const std::string_view ProjectId = Req.GetCapture(1); switch (HttpReq.RequestVerb()) { case HttpVerb::kPost: { if (!m_ProjectStore->AreDiskWritesAllowed()) { return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); } CbValidateError ValidateResult; if (CbObject Params = ValidateAndReadCompactBinaryObject(HttpReq.ReadPayload(), ValidateResult); ValidateResult == CbValidateError::None) { std::filesystem::path Root = Params["root"sv].AsU8String(); // Workspace root (i.e `D:/UE5/`) std::filesystem::path EngineRoot = Params["engine"sv].AsU8String(); // Engine root (i.e `D:/UE5/Engine`) std::filesystem::path ProjectRoot = Params["project"sv].AsU8String(); // Project root directory (i.e `D:/UE5/Samples/Games/Lyra`) std::filesystem::path ProjectFilePath = Params["projectfile"sv].AsU8String(); // Project file path (i.e `D:/UE5/Samples/Games/Lyra/Lyra.uproject`) const std::filesystem::path BasePath = m_ProjectStore->BasePath() / ProjectId; m_ProjectStore->NewProject(BasePath, ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath); ZEN_INFO("established project (id: '{}', roots: '{}', '{}', '{}', '{}'{})", ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath, ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : ""); m_ProjectStats.ProjectWriteCount++; HttpReq.WriteResponse(HttpResponseCode::Created); } else { HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, fmt::format("Malformed compact binary object: '{}'", ToString(ValidateResult))); } } break; case HttpVerb::kPut: { if (!m_ProjectStore->AreDiskWritesAllowed()) { return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); } CbValidateError ValidateResult; if (CbObject Params = ValidateAndReadCompactBinaryObject(HttpReq.ReadPayload(), ValidateResult); ValidateResult == CbValidateError::None) { std::filesystem::path Root = Params["root"sv].AsU8String(); // Workspace root (i.e `D:/UE5/`) std::filesystem::path EngineRoot = Params["engine"sv].AsU8String(); // Engine root (i.e `D:/UE5/Engine`) std::filesystem::path ProjectRoot = Params["project"sv].AsU8String(); // Project root directory (i.e `D:/UE5/Samples/Games/Lyra`) std::filesystem::path ProjectFilePath = Params["projectfile"sv].AsU8String(); // Project file path (i.e `D:/UE5/Samples/Games/Lyra/Lyra.uproject`) if (m_ProjectStore->UpdateProject(ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath)) { m_ProjectStats.ProjectWriteCount++; ZEN_INFO("updated project (id: '{}', roots: '{}', '{}', '{}', '{}'{})", ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath, ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : ""); HttpReq.WriteResponse(HttpResponseCode::OK); } else { const std::filesystem::path BasePath = m_ProjectStore->BasePath() / ProjectId; m_ProjectStore->NewProject(BasePath, ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath); m_ProjectStats.ProjectWriteCount++; ZEN_INFO("established project (id: '{}', roots: '{}', '{}', '{}', '{}'{})", ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath, ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : ""); HttpReq.WriteResponse(HttpResponseCode::Created); } } else { HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, fmt::format("Malformed compact binary object: '{}'", ToString(ValidateResult))); } } break; case HttpVerb::kGet: { Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("project {} not found", ProjectId)); } Project->TouchProject(); std::vector OpLogs = Project->ScanForOplogs(); CbObjectWriter Response; Response << "id"sv << Project->Identifier; Response << "root"sv << PathToUtf8(Project->RootDir); Response << "engine"sv << PathToUtf8(Project->EngineRootDir); Response << "project"sv << PathToUtf8(Project->ProjectRootDir); Response << "projectfile"sv << PathToUtf8(Project->ProjectFilePath); Response.BeginArray("oplogs"sv); for (const std::string& OplogId : OpLogs) { Response.BeginObject(); Response << "id"sv << OplogId; Response.EndObject(); } Response.EndArray(); // oplogs HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); m_ProjectStats.ProjectReadCount++; } break; case HttpVerb::kDelete: { Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("project {} not found", ProjectId)); } ZEN_INFO("deleting project '{}'", ProjectId); if (!m_ProjectStore->DeleteProject(ProjectId)) { return HttpReq.WriteResponse(HttpResponseCode::Locked, HttpContentType::kText, fmt::format("project {} is in use", ProjectId)); } m_ProjectStats.ProjectDeleteCount++; return HttpReq.WriteResponse(HttpResponseCode::NoContent); } break; default: break; } } void HttpProjectService::HandleOplogSaveRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::OplogSave"); HttpServerRequest& HttpReq = Req.ServerRequest(); if (!m_ProjectStore->AreDiskWritesAllowed()) { return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); } const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); if (HttpReq.RequestContentType() != HttpContentType::kCbObject) { m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid content type"); } IoBuffer Payload = HttpReq.ReadPayload(); Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Write oplog request for unknown project '{}'", ProjectId)); } Project->TouchProject(); Ref Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false); if (!Oplog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Write oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)); } Project->TouchOplog(OplogId); CbValidateError ValidateResult; if (CbObject ContainerObject = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult); ValidateResult == CbValidateError::None && ContainerObject) { RwLock AttachmentsLock; tsl::robin_set Attachments; auto HasAttachment = [this](const IoHash& RawHash) { return m_CidStore.ContainsChunk(RawHash); }; auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector&& ChunkHashes) { RwLock::ExclusiveLockScope _(AttachmentsLock); if (BlockHash != IoHash::Zero) { Attachments.insert(BlockHash); } else { Attachments.insert(ChunkHashes.begin(), ChunkHashes.end()); } }; auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) { RwLock::ExclusiveLockScope _(AttachmentsLock); Attachments.insert(RawHash); }; auto OnChunkedAttachment = [](const ChunkedInfo&) {}; auto OnReferencedAttachments = [&Oplog](std::span RawHashes) { Oplog->CaptureAddedAttachments(RawHashes); }; // Make sure we retain any attachments we download before writing the oplog Oplog->EnableUpdateCapture(); auto _ = MakeGuard([&Oplog]() { Oplog->DisableUpdateCapture(); }); RemoteProjectStore::Result Result = SaveOplogContainer(*Oplog, ContainerObject, OnReferencedAttachments, HasAttachment, OnNeedBlock, OnNeedAttachment, OnChunkedAttachment, nullptr); if (Result.ErrorCode == 0) { if (Attachments.empty()) { HttpReq.WriteResponse(HttpResponseCode::OK); } else { CbObjectWriter Cbo(1 + 1 + 5 + Attachments.size() * (1 + sizeof(IoHash::Hash)) + 1); Cbo.BeginArray("need"); { for (const IoHash& Hash : Attachments) { ZEN_DEBUG("Need attachment {}", Hash); Cbo << Hash; } } Cbo.EndArray(); // "need" CbObject ResponsePayload = Cbo.Save(); return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); } } else { ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", ToString(HttpReq.RequestVerb()), HttpReq.QueryString(), Result.ErrorCode, Result.Reason); if (Result.Reason.empty()) { return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode)); } else { return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode), HttpContentType::kText, Result.Reason); } } } else { m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, fmt::format("Invalid payload: '{}'", ToString(ValidateResult))); } } void HttpProjectService::HandleOplogLoadRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::OplogLoad"); HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); const HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); if (HttpReq.AcceptContentType() != HttpContentType::kCbObject) { m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid accept content type"); } Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Read oplog request for unknown project '{}'", ProjectId)); } Project->TouchProject(); Ref Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ true); if (!Oplog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Read oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)); } Project->TouchOplog(OplogId); size_t MaxBlockSize = RemoteStoreOptions::DefaultMaxBlockSize; if (auto Param = Params.GetValue("maxblocksize"); Param.empty() == false) { if (auto Value = ParseInt(Param)) { MaxBlockSize = Value.value(); } } size_t MaxChunkEmbedSize = RemoteStoreOptions::DefaultMaxChunkEmbedSize; if (auto Param = Params.GetValue("maxchunkembedsize"); Param.empty() == false) { if (auto Value = ParseInt(Param)) { MaxChunkEmbedSize = Value.value(); } } size_t MaxChunksPerBlock = RemoteStoreOptions::DefaultMaxChunksPerBlock; if (auto Param = Params.GetValue("maxchunksperblock"); Param.empty() == false) { if (auto Value = ParseInt(Param)) { MaxChunksPerBlock = Value.value(); } } size_t ChunkFileSizeLimit = RemoteStoreOptions::DefaultChunkFileSizeLimit; if (auto Param = Params.GetValue("chunkfilesizelimit"); Param.empty() == false) { if (auto Value = ParseInt(Param)) { ChunkFileSizeLimit = Value.value(); } } WorkerThreadPool& WorkerPool = GetLargeWorkerPool(EWorkloadType::Background); RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer( m_CidStore, *Project, *Oplog, WorkerPool, MaxBlockSize, MaxChunkEmbedSize, MaxChunksPerBlock, ChunkFileSizeLimit, /* BuildBlocks */ false, /* IgnoreMissingAttachments */ false, /* AllowChunking*/ false, [](CompressedBuffer&&, ChunkBlockDescription&&) {}, [](const IoHash&, TGetAttachmentBufferFunc&&) {}, [](std::vector>&&) {}, /* EmbedLooseFiles*/ false); if (ContainerResult.ErrorCode == 0) { return HttpReq.WriteResponse(HttpResponseCode::OK, ContainerResult.ContainerObject); } else { ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`", ToString(HttpReq.RequestVerb()), HttpReq.QueryString(), ContainerResult.ErrorCode, ContainerResult.Reason); if (ContainerResult.Reason.empty()) { return HttpReq.WriteResponse(HttpResponseCode(ContainerResult.ErrorCode)); } else { return HttpReq.WriteResponse(HttpResponseCode(ContainerResult.ErrorCode), HttpContentType::kText, ContainerResult.Reason); } } } void HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::Rpc"); using namespace std::literals; HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); IoBuffer Payload = HttpReq.ReadPayload(); HttpContentType PayloadContentType = HttpReq.RequestContentType(); CbPackage Package; CbObject Cb; switch (PayloadContentType) { case HttpContentType::kJSON: case HttpContentType::kUnknownContentType: case HttpContentType::kText: { std::string JsonText(reinterpret_cast(Payload.GetData()), Payload.GetSize()); Cb = LoadCompactBinaryFromJson(JsonText).AsObject(); if (!Cb) { m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Content format not supported, expected JSON format"); } } break; case HttpContentType::kCbObject: { CbValidateError ValidateResult; if (Cb = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult); ValidateResult != CbValidateError::None || !Cb) { m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse( HttpResponseCode::BadRequest, HttpContentType::kText, fmt::format("Content format not supported, expected compact binary format ('{}')", ToString(ValidateResult))); } break; } case HttpContentType::kCbPackage: try { Package = ParsePackageMessage(Payload); Cb = Package.GetObject(); } catch (const std::invalid_argument& ex) { m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, fmt::format("Failed to parse package request, reason: '{}'", ex.what())); } if (!Cb) { m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Content format not supported, expected package message format"); } break; default: m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type"); } Ref Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Rpc oplog request for unknown project '{}'", ProjectId)); } Project->TouchProject(); std::string_view Method = Cb["method"sv].AsString(); bool VerifyPathOnDisk = Method != "getchunks"sv; Ref Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, VerifyPathOnDisk); if (!Oplog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)); } Project->TouchOplog(OplogId); uint32_t MethodHash = HashStringDjb2(Method); switch (MethodHash) { case HashStringDjb2("import"sv): { if (!m_ProjectStore->AreDiskWritesAllowed()) { return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); } CbObjectView Params = Cb["params"sv].AsObjectView(); size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize); size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize); bool Force = Params["force"sv].AsBool(false); bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); bool CleanOplog = Params["clean"].AsBool(false); bool BoostWorkerCount = Params["boostworkercount"].AsBool(false); bool BoostWorkerMemory = Params["boostworkermemory"sv].AsBool(false); CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Log(), Params, m_AuthMgr, MaxBlockSize, MaxChunkEmbedSize, GetMaxMemoryBufferSize(MaxBlockSize, BoostWorkerMemory), Oplog->TempPath()); if (RemoteStoreResult.Store == nullptr) { return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, RemoteStoreResult.Description); } std::shared_ptr RemoteStore = std::move(RemoteStoreResult.Store); RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); JobId JobId = m_JobQueue.QueueJob( fmt::format("Import oplog '{}/{}'", Project->Identifier, Oplog->OplogId()), [this, ChunkStore = &m_CidStore, ActualRemoteStore = std::move(RemoteStore), Oplog, Force, IgnoreMissingAttachments, CleanOplog, BoostWorkerCount](JobContext& Context) { Context.ReportMessage(fmt::format("Loading oplog '{}/{}' from {}", Oplog->GetOuterProjectIdentifier(), Oplog->OplogId(), ActualRemoteStore->GetInfo().Description)); Ref Workers = GetThreadWorkers(BoostWorkerCount, /*SingleThreaded*/ false); WorkerThreadPool& WorkerPool = Workers->GetIOWorkerPool(); WorkerThreadPool& NetworkWorkerPool = Workers->GetNetworkPool(); Context.ReportMessage(fmt::format("{}", Workers->GetWorkersInfo())); RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *ActualRemoteStore, *Oplog, NetworkWorkerPool, WorkerPool, Force, IgnoreMissingAttachments, CleanOplog, &Context); auto Response = ConvertResult(Result); ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); if (!IsHttpSuccessCode(Response.first)) { throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second, (int)Response.first); } }); return HttpReq.WriteResponse(HttpResponseCode::Accepted, HttpContentType::kText, fmt::format("{}", JobId.Id)); } case HashStringDjb2("export"sv): { CbObjectView Params = Cb["params"sv].AsObjectView(); size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize); size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize); size_t MaxChunksPerBlock = Params["maxchunksperblock"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunksPerBlock); size_t ChunkFileSizeLimit = Params["chunkfilesizelimit"sv].AsUInt64(RemoteStoreOptions::DefaultChunkFileSizeLimit); bool Force = Params["force"sv].AsBool(false); bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false); bool BoostWorkerCount = Params["boostworkercount"].AsBool(false); bool BoostWorkerMemory = Params["boostworkermemory"sv].AsBool(false); CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Log(), Params, m_AuthMgr, MaxBlockSize, MaxChunkEmbedSize, GetMaxMemoryBufferSize(MaxBlockSize, BoostWorkerMemory), Oplog->TempPath()); if (RemoteStoreResult.Store == nullptr) { return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, RemoteStoreResult.Description); } std::shared_ptr RemoteStore = std::move(RemoteStoreResult.Store); RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); JobId JobId = m_JobQueue.QueueJob( fmt::format("Export oplog '{}/{}'", Project->Identifier, Oplog->OplogId()), [this, ActualRemoteStore = std::move(RemoteStore), Project, Oplog, MaxBlockSize, MaxChunksPerBlock, MaxChunkEmbedSize, ChunkFileSizeLimit, EmbedLooseFile, Force, IgnoreMissingAttachments, BoostWorkerCount](JobContext& Context) { Context.ReportMessage(fmt::format("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}", Project->Identifier, Oplog->OplogId(), ActualRemoteStore->GetInfo().Description, NiceBytes(MaxBlockSize), NiceBytes(MaxChunkEmbedSize))); Ref Workers = GetThreadWorkers(BoostWorkerCount, /*SingleThreaded*/ false); Context.ReportMessage(fmt::format("{}", Workers->GetWorkersInfo())); WorkerThreadPool& WorkerPool = Workers->GetIOWorkerPool(); WorkerThreadPool& NetworkWorkerPool = Workers->GetNetworkPool(); RemoteProjectStore::Result Result = SaveOplog(m_CidStore, *ActualRemoteStore, *Project, *Oplog, NetworkWorkerPool, WorkerPool, MaxBlockSize, MaxChunksPerBlock, MaxChunkEmbedSize, ChunkFileSizeLimit, EmbedLooseFile, Force, IgnoreMissingAttachments, &Context); auto Response = ConvertResult(Result); ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); if (!IsHttpSuccessCode(Response.first)) { throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second, (int)Response.first); } }); return HttpReq.WriteResponse(HttpResponseCode::Accepted, HttpContentType::kText, fmt::format("{}", JobId.Id)); } case HashStringDjb2("getchunks"sv): { RpcAcceptOptions AcceptFlags = static_cast(Cb["AcceptFlags"sv].AsUInt16(0u)); int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0); std::vector Requests = m_ProjectStore->ParseChunksRequests(*Project, *Oplog, Cb); std::vector Results = Requests.empty() ? std::vector{} : m_ProjectStore->GetChunks(*Project, *Oplog, Requests); CbPackage Response = m_ProjectStore->WriteChunksRequestResponse(*Project, *Oplog, std::move(Requests), std::move(Results)); void* TargetProcessHandle = nullptr; FormatFlags Flags = FormatFlags::kDefault; if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) { Flags |= FormatFlags::kAllowLocalReferences; if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) { Flags |= FormatFlags::kDenyPartialLocalReferences; } TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId); } CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(Response, Flags, TargetProcessHandle); return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } case HashStringDjb2("putchunks"sv): { ZEN_TRACE_CPU("Store::Rpc::putchunks"); if (!m_ProjectStore->AreDiskWritesAllowed()) { return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); } CbObject Object = Package.GetObject(); const bool UsingTempFiles = Object["usingtmpfiles"].AsBool(false); std::span Attachments = Package.GetAttachments(); if (!Attachments.empty()) { std::vector WriteAttachmentBuffers; std::vector WriteRawHashes; WriteAttachmentBuffers.reserve(Attachments.size()); WriteRawHashes.reserve(Attachments.size()); for (const CbAttachment& Attachment : Attachments) { IoHash RawHash = Attachment.GetHash(); const CompressedBuffer& Compressed = Attachment.AsCompressedBinary(); IoBuffer AttachmentPayload = Compressed.GetCompressed().Flatten().AsIoBuffer(); if (UsingTempFiles) { AttachmentPayload.SetDeleteOnClose(true); } WriteAttachmentBuffers.push_back(std::move(AttachmentPayload)); WriteRawHashes.push_back(RawHash); } Oplog->CaptureAddedAttachments(WriteRawHashes); m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, UsingTempFiles ? CidStore::InsertMode::kMayBeMovedInPlace : CidStore::InsertMode::kCopyOnly); } return HttpReq.WriteResponse(HttpResponseCode::OK); } case HashStringDjb2("snapshot"sv): { ZEN_TRACE_CPU("Store::Rpc::snapshot"); if (!m_ProjectStore->AreDiskWritesAllowed()) { return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); } // Snapshot all referenced files. This brings the content of all // files into the CID store uint32_t OpCount = 0; uint64_t InlinedBytes = 0; uint64_t InlinedFiles = 0; uint64_t TotalBytes = 0; uint64_t TotalFiles = 0; std::vector NewOps; struct AddedChunk { IoBuffer Buffer; uint64_t RawSize = 0; }; tsl::robin_map AddedChunks; const std::filesystem::path CanonicalRoot = std::filesystem::canonical(Project->RootDir); Oplog->IterateOplog( [&](CbObjectView Op) { bool OpRewritten = false; bool AllOk = true; CbWriter FilesArrayWriter; FilesArrayWriter.BeginArray("files"sv); for (CbFieldView& Field : Op["files"sv]) { bool CopyField = true; if (CbObjectView View = Field.AsObjectView()) { const IoHash DataHash = View["data"sv].AsHash(); if (DataHash == IoHash::Zero) { std::string_view ServerPath = View["serverpath"sv].AsString(); if (CanonicalRoot.empty()) { ZEN_WARN("Attempting to load file '{}' from project with unset project root", ServerPath); AllOk = false; continue; } std::error_code Ec; const std::filesystem::path FilePath = std::filesystem::canonical(Project->RootDir / ServerPath, Ec); if (Ec) { ZEN_WARN("Failed to find file '{}' in project root '{}' for 'snapshot'. Reason: '{}'", ServerPath, Project->RootDir, Ec.message()); AllOk = false; continue; } if (std::mismatch(CanonicalRoot.begin(), CanonicalRoot.end(), FilePath.begin()).first != CanonicalRoot.end()) { ZEN_WARN("Unable to read file '{}' outside of project root '{}'", FilePath, CanonicalRoot); AllOk = false; continue; } BasicFile DataFile; DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec); if (Ec) { // Error... ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message()); AllOk = false; } else { // Read file contents into memory, compress and keep in map of chunks to add to Cid store IoBuffer FileIoBuffer = DataFile.ReadAll(); CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer))); const uint64_t RawSize = Compressed.DecodeRawSize(); const IoHash RawHash = Compressed.DecodeRawHash(); if (!AddedChunks.contains(RawHash)) { const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString(); BasicFile ChunkTempFile; ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete); ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec); if (Ec) { Oid ChunkId = View["id"sv].AsObjectId(); ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}", FilePath, ChunkId, Ec.message()); AllOk = false; } else { void* FileHandle = ChunkTempFile.Detach(); IoBuffer ChunkBuffer(IoBuffer::File, FileHandle, 0, Compressed.GetCompressed().GetSize(), /*IsWholeFile*/ true); ChunkBuffer.SetDeleteOnClose(true); AddedChunks.insert_or_assign( RawHash, AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize}); } } TotalBytes += RawSize; ++TotalFiles; // Rewrite file array entry with new data reference CbObjectWriter Writer(View.GetSize()); RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { if (Field.GetName() == "data"sv) { // omit this field as we will write it explicitly ourselves return true; } return false; }); Writer.AddBinaryAttachment("data"sv, RawHash); CbObject RewrittenOp = Writer.Save(); FilesArrayWriter.AddObject(std::move(RewrittenOp)); CopyField = false; } } } if (CopyField) { FilesArrayWriter.AddField(Field); } else { OpRewritten = true; } } if (OpRewritten && AllOk) { FilesArrayWriter.EndArray(); CbArray FilesArray = FilesArrayWriter.Save().AsArray(); CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { if (Field.GetName() == "files"sv) { NewWriter.AddArray("files"sv, FilesArray); return true; } return false; }); NewOps.push_back(std::move(RewrittenOp)); } OpCount++; }, ProjectStore::Oplog::Paging{}); CbObjectWriter ResponseObj; // Persist rewritten oplog entries if (!NewOps.empty()) { ResponseObj.BeginArray("rewritten_ops"); for (CbObject& NewOp : NewOps) { ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp)); ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number); ResponseObj.AddInteger(NewLsn.Number); } ResponseObj.EndArray(); } // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the // new chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have // unreferenced chunks. for (auto It : AddedChunks) { const IoHash& RawHash = It.first; AddedChunk& Chunk = It.second; CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash); if (Result.New) { InlinedBytes += Chunk.RawSize; ++InlinedFiles; } } ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles; ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles; ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount); return HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save()); } case HashStringDjb2("appendops"sv): { ZEN_TRACE_CPU("Store::Rpc::appendops"); if (!m_ProjectStore->AreDiskWritesAllowed()) { return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); } CbArrayView OpsArray = Cb["ops"sv].AsArrayView(); size_t OpsBufferSize = 0; for (CbFieldView OpView : OpsArray) { OpsBufferSize += OpView.GetSize(); } UniqueBuffer OpsBuffers = UniqueBuffer::Alloc(OpsBufferSize); MutableMemoryView OpsBuffersMemory = OpsBuffers.GetMutableView(); std::vector Ops; Ops.reserve(OpsArray.Num()); for (CbFieldView OpView : OpsArray) { OpView.CopyTo(OpsBuffersMemory); Ops.push_back(CbObjectView(OpsBuffersMemory.GetData())); OpsBuffersMemory.MidInline(OpView.GetSize()); } std::vector LSNs = Oplog->AppendNewOplogEntries(Ops); ZEN_ASSERT(LSNs.size() == Ops.size()); std::vector MissingAttachments; for (size_t OpIndex = 0; OpIndex < Ops.size(); OpIndex++) { if (LSNs[OpIndex]) { CbObjectView Op = Ops[OpIndex]; Op.IterateAttachments([this, &MissingAttachments](CbFieldView AttachmentView) { const IoHash Cid = AttachmentView.AsAttachment(); if (!m_CidStore.ContainsChunk(Cid)) { MissingAttachments.push_back(Cid); } }); } } CbPackage ResponsePackage; { CbObjectWriter ResponseObj; ResponseObj.BeginArray("written_ops"); for (ProjectStore::LogSequenceNumber NewLsn : LSNs) { ZEN_DEBUG("appended written op at LSN: {}", NewLsn.Number); ResponseObj.AddInteger(NewLsn.Number); } ResponseObj.EndArray(); if (!MissingAttachments.empty()) { ResponseObj.BeginArray("need"); for (const IoHash& Cid : MissingAttachments) { ResponseObj.AddHash(Cid); } ResponseObj.EndArray(); } ResponsePackage.SetObject(ResponseObj.Save()); } std::vector ResponseBuffers = FormatPackageMessage(ResponsePackage); return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponseBuffers); } default: m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method)); } } void HttpProjectService::HandleDetailsRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::Details"); using namespace std::literals; HttpServerRequest& HttpReq = Req.ServerRequest(); HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); bool CSV = Params.GetValue("csv"sv) == "true"sv; bool Details = Params.GetValue("details"sv) == "true"sv; bool OpDetails = Params.GetValue("opdetails"sv) == "true"sv; bool AttachmentDetails = Params.GetValue("attachmentdetails"sv) == "true"sv; if (CSV) { ExtendableStringBuilder<4096> CSVWriter; CSVHeader(Details, AttachmentDetails, CSVWriter); m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) { Project.IterateOplogs([&](const RwLock::SharedLockScope&, ProjectStore::Oplog& Oplog) { Oplog.IterateOplogWithKey( [this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](ProjectStore::LogSequenceNumber LSN, const Oid& Key, CbObjectView Op) { CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); }); }); }); HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); } else { CbObjectWriter Cbo; Cbo.BeginArray("projects"); { m_ProjectStore->DiscoverProjects(); m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) { std::vector OpLogs = Project.ScanForOplogs(); CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); }); } Cbo.EndArray(); HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } } void HttpProjectService::HandleProjectDetailsRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::ProjectDetails"); using namespace std::literals; HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); bool CSV = Params.GetValue("csv"sv) == "true"sv; bool Details = Params.GetValue("details"sv) == "true"sv; bool OpDetails = Params.GetValue("opdetails"sv) == "true"sv; bool AttachmentDetails = Params.GetValue("attachmentdetails"sv) == "true"sv; Ref FoundProject = m_ProjectStore->OpenProject(ProjectId); if (!FoundProject) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } ProjectStore::Project& Project = *FoundProject.Get(); if (CSV) { ExtendableStringBuilder<4096> CSVWriter; CSVHeader(Details, AttachmentDetails, CSVWriter); FoundProject->IterateOplogs([&](const RwLock::SharedLockScope&, ProjectStore::Oplog& Oplog) { Oplog.IterateOplogWithKey([this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](ProjectStore::LogSequenceNumber LSN, const Oid& Key, CbObjectView Op) { CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); }); }); HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); } else { CbObjectWriter Cbo; std::vector OpLogs = FoundProject->ScanForOplogs(); Cbo.BeginArray("projects"); { CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); } Cbo.EndArray(); HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } } void HttpProjectService::HandleOplogDetailsRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::OplogDetails"); using namespace std::literals; HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); bool CSV = Params.GetValue("csv"sv) == "true"sv; bool Details = Params.GetValue("details"sv) == "true"sv; bool OpDetails = Params.GetValue("opdetails"sv) == "true"sv; bool AttachmentDetails = Params.GetValue("attachmentdetails"sv) == "true"sv; Ref FoundProject = m_ProjectStore->OpenProject(ProjectId); if (!FoundProject) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } Ref FoundLog = FoundProject->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } ProjectStore::Project& Project = *FoundProject.Get(); ProjectStore::Oplog& Oplog = *FoundLog; if (CSV) { ExtendableStringBuilder<4096> CSVWriter; CSVHeader(Details, AttachmentDetails, CSVWriter); Oplog.IterateOplogWithKey([this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](ProjectStore::LogSequenceNumber LSN, const Oid& Key, CbObjectView Op) { CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); }); HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); } else { CbObjectWriter Cbo; Cbo.BeginArray("oplogs"); { CbWriteOplog(m_CidStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo); } Cbo.EndArray(); HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } } void HttpProjectService::HandleOplogOpDetailsRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::OplogOpDetails"); using namespace std::literals; HttpServerRequest& HttpReq = Req.ServerRequest(); const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); const auto& OpId = Req.GetCapture(3); HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); bool CSV = Params.GetValue("csv"sv) == "true"sv; bool Details = Params.GetValue("details"sv) == "true"sv; bool OpDetails = Params.GetValue("opdetails"sv) == "true"sv; bool AttachmentDetails = Params.GetValue("attachmentdetails"sv) == "true"sv; Ref FoundProject = m_ProjectStore->OpenProject(ProjectId); if (!FoundProject) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } Ref FoundLog = FoundProject->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); if (!FoundLog) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } if (OpId.size() != 2 * sizeof(Oid::OidBits)) { m_ProjectStats.BadRequestCount++; return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, OpId)); } const Oid ObjId = Oid::FromHexString(OpId); ProjectStore::Project& Project = *FoundProject.Get(); ProjectStore::Oplog& Oplog = *FoundLog; std::optional Op = Oplog.GetOpByKey(ObjId); if (!Op.has_value()) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } ProjectStore::LogSequenceNumber LSN = Oplog.GetOpIndexByKey(ObjId); if (!LSN) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } if (CSV) { ExtendableStringBuilder<4096> CSVWriter; CSVHeader(Details, AttachmentDetails, CSVWriter); CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, ObjId, Op.value(), CSVWriter); HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); } else { CbObjectWriter Cbo; Cbo.BeginArray("ops"); { CbWriteOp(m_CidStore, Details, OpDetails, AttachmentDetails, LSN, ObjId, Op.value(), Cbo); } Cbo.EndArray(); HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } } Ref HttpProjectService::GetThreadWorkers(bool BoostWorkers, bool SingleThreaded) { RwLock::ExclusiveLockScope _(m_ThreadWorkersLock); if (m_ThreadWorkers && m_ThreadWorkers->IsBoostWorkers() == BoostWorkers && m_ThreadWorkers->IsSingleThreaded() == SingleThreaded) { return m_ThreadWorkers; } m_ThreadWorkers = new TransferThreadWorkers(BoostWorkers, SingleThreaded); return m_ThreadWorkers; } } // namespace zen