diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-02 15:56:00 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-02 15:56:00 +0200 |
| commit | adbec59ee0b169ba0ac9d167eac4ed72edefe1ca (patch) | |
| tree | c05d4e5624d98272969f6509fe5cf8a75d6ebf9d /src/zenserver/projectstore/httpprojectstore.cpp | |
| parent | Zs/OIDC exe path handling (#538) (diff) | |
| download | zen-adbec59ee0b169ba0ac9d167eac4ed72edefe1ca.tar.xz zen-adbec59ee0b169ba0ac9d167eac4ed72edefe1ca.zip | |
projectstore refactor phase 2 (#539)
Refactor projectstore/httpprojectservice to prepare for move of projectstore to zenstore
Diffstat (limited to 'src/zenserver/projectstore/httpprojectstore.cpp')
| -rw-r--r-- | src/zenserver/projectstore/httpprojectstore.cpp | 895 |
1 file changed, 875 insertions, 20 deletions
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp index 2a1bf9a5e..175d131e1 100644 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ b/src/zenserver/projectstore/httpprojectstore.cpp @@ -14,11 +14,20 @@ #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/memory/llm.h> +#include <zencore/scopeguard.h> #include <zencore/stream.h> #include <zencore/trace.h> +#include <zenhttp/packageformat.h> #include <zenstore/zenstore.h> +#include <zenutil/openprocesscache.h> #include <zenutil/workerpools.h> +#include "buildsremoteprojectstore.h" +#include "fileremoteprojectstore.h" +#include "jupiterremoteprojectstore.h" +#include "remoteprojectstore.h" +#include "zenremoteprojectstore.h" + namespace zen { const FLLMTag& @@ -233,6 +242,264 @@ namespace { Cbo.EndObject(); } + struct CreateRemoteStoreResult + { + std::shared_ptr<RemoteProjectStore> Store; + std::string Description; + }; + + CreateRemoteStoreResult CreateRemoteStore(CbObjectView Params, + AuthMgr& AuthManager, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + const std::filesystem::path& TempFilePath) + { + ZEN_MEMSCOPE(GetProjectHttpTag()); + + using namespace std::literals; + + std::shared_ptr<RemoteProjectStore> RemoteStore; + + if (CbObjectView File = Params["file"sv].AsObjectView(); File) + { + std::filesystem::path FolderPath(File["path"sv].AsString()); + if (FolderPath.empty()) + { + return {nullptr, "Missing file path"}; + } + std::string_view Name(File["name"sv].AsString()); + if (Name.empty()) + { + return {nullptr, "Missing file name"}; + } + std::string_view OptionalBaseName(File["basename"sv].AsString()); + bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false); + bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false); + + FileRemoteStoreOptions Options = { + RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + 
FolderPath, + std::string(Name), + std::string(OptionalBaseName), + ForceDisableBlocks, + ForceEnableTempBlocks}; + RemoteStore = CreateFileRemoteStore(Options); + } + + if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud) + { + std::string_view CloudServiceUrl = Cloud["url"sv].AsString(); + if (CloudServiceUrl.empty()) + { + return {nullptr, "Missing service url"}; + } + + std::string Url = UrlDecode(CloudServiceUrl); + std::string_view Namespace = Cloud["namespace"sv].AsString(); + if (Namespace.empty()) + { + return {nullptr, "Missing namespace"}; + } + std::string_view Bucket = Cloud["bucket"sv].AsString(); + if (Bucket.empty()) + { + return {nullptr, "Missing bucket"}; + } + std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString(); + std::string AccessToken = std::string(Cloud["access-token"sv].AsString()); + if (AccessToken.empty()) + { + std::string_view AccessTokenEnvVariable = Cloud["access-token-env"].AsString(); + if (!AccessTokenEnvVariable.empty()) + { + AccessToken = GetEnvVariable(AccessTokenEnvVariable); + } + } + std::filesystem::path OidcExePath; + if (std::string_view OidcExePathString = Cloud["oidc-exe-path"].AsString(); !OidcExePathString.empty()) + { + std::filesystem::path OidcExePathMaybe(OidcExePathString); + if (IsFile(OidcExePathMaybe)) + { + OidcExePath = std::move(OidcExePathMaybe); + } + else + { + ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString); + } + } + std::string_view KeyParam = Cloud["key"sv].AsString(); + if (KeyParam.empty()) + { + return {nullptr, "Missing key"}; + } + if (KeyParam.length() != IoHash::StringLength) + { + return {nullptr, "Invalid key"}; + } + IoHash Key = IoHash::FromHexString(KeyParam); + if (Key == IoHash::Zero) + { + return {nullptr, "Invalid key string"}; + } + IoHash BaseKey = IoHash::Zero; + std::string_view BaseKeyParam = Cloud["basekey"sv].AsString(); + if (!BaseKeyParam.empty()) + { + if (BaseKeyParam.length() != 
IoHash::StringLength) + { + return {nullptr, "Invalid base key"}; + } + BaseKey = IoHash::FromHexString(BaseKeyParam); + if (BaseKey == IoHash::Zero) + { + return {nullptr, "Invalid base key string"}; + } + } + + bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false); + bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false); + bool AssumeHttp2 = Cloud["assumehttp2"sv].AsBool(false); + + JupiterRemoteStoreOptions Options = { + RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + Url, + std::string(Namespace), + std::string(Bucket), + Key, + BaseKey, + std::string(OpenIdProvider), + AccessToken, + AuthManager, + OidcExePath, + ForceDisableBlocks, + ForceDisableTempBlocks, + AssumeHttp2}; + RemoteStore = CreateJupiterRemoteStore(Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false); + } + + if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen) + { + std::string_view Url = Zen["url"sv].AsString(); + std::string_view Project = Zen["project"sv].AsString(); + if (Project.empty()) + { + return {nullptr, "Missing project"}; + } + std::string_view Oplog = Zen["oplog"sv].AsString(); + if (Oplog.empty()) + { + return {nullptr, "Missing oplog"}; + } + ZenRemoteStoreOptions Options = { + RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + std::string(Url), + std::string(Project), + std::string(Oplog)}; + RemoteStore = CreateZenRemoteStore(Options, TempFilePath); + } + + if (CbObjectView Builds = Params["builds"sv].AsObjectView(); Builds) + { + std::string_view BuildsServiceUrl = Builds["url"sv].AsString(); + if (BuildsServiceUrl.empty()) + { + return {nullptr, "Missing service url"}; + } + + std::string Url = UrlDecode(BuildsServiceUrl); + std::string_view Namespace = Builds["namespace"sv].AsString(); + if (Namespace.empty()) + { + return {nullptr, "Missing namespace"}; + } + std::string_view Bucket = 
Builds["bucket"sv].AsString(); + if (Bucket.empty()) + { + return {nullptr, "Missing bucket"}; + } + std::string_view OpenIdProvider = Builds["openid-provider"sv].AsString(); + std::string AccessToken = std::string(Builds["access-token"sv].AsString()); + if (AccessToken.empty()) + { + std::string_view AccessTokenEnvVariable = Builds["access-token-env"].AsString(); + if (!AccessTokenEnvVariable.empty()) + { + AccessToken = GetEnvVariable(AccessTokenEnvVariable); + } + } + std::filesystem::path OidcExePath; + if (std::string_view OidcExePathString = Builds["oidc-exe-path"].AsString(); !OidcExePathString.empty()) + { + std::filesystem::path OidcExePathMaybe(OidcExePathString); + if (IsFile(OidcExePathMaybe)) + { + OidcExePath = std::move(OidcExePathMaybe); + } + else + { + ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString); + } + } + std::string_view BuildIdParam = Builds["buildsid"sv].AsString(); + if (BuildIdParam.empty()) + { + return {nullptr, "Missing build id"}; + } + if (BuildIdParam.length() != Oid::StringLength) + { + return {nullptr, "Invalid build id"}; + } + Oid BuildId = Oid::FromHexString(BuildIdParam); + if (BuildId == Oid::Zero) + { + return {nullptr, "Invalid build id string"}; + } + + bool ForceDisableBlocks = Builds["disableblocks"sv].AsBool(false); + bool ForceDisableTempBlocks = Builds["disabletempblocks"sv].AsBool(false); + bool AssumeHttp2 = Builds["assumehttp2"sv].AsBool(false); + + MemoryView MetaDataSection = Builds["metadata"sv].AsBinaryView(); + IoBuffer MetaData(IoBuffer::Wrap, MetaDataSection.GetData(), MetaDataSection.GetSize()); + + BuildsRemoteStoreOptions Options = { + RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + Url, + std::string(Namespace), + std::string(Bucket), + BuildId, + std::string(OpenIdProvider), + AccessToken, + AuthManager, + OidcExePath, + ForceDisableBlocks, + ForceDisableTempBlocks, + AssumeHttp2, + 
MetaData}; + RemoteStore = CreateJupiterBuildsRemoteStore(Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false); + } + + if (!RemoteStore) + { + return {nullptr, "Unknown remote store type"}; + } + + return {std::move(RemoteStore), ""}; + } + + std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result) + { + if (Result.ErrorCode == 0) + { + return {HttpResponseCode::OK, Result.Text}; + } + return {static_cast<HttpResponseCode>(Result.ErrorCode), + Result.Reason.empty() ? Result.Text + : Result.Text.empty() ? Result.Reason + : fmt::format("{}: {}", Result.Reason, Result.Text)}; + } + } // namespace ////////////////////////////////////////////////////////////////////////// @@ -241,13 +508,17 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, HttpStatusService& StatusService, HttpStatsService& StatsService, - AuthMgr& AuthMgr) + AuthMgr& AuthMgr, + OpenProcessCache& InOpenProcessCache, + JobQueue& InJobQueue) : m_Log(logging::Get("project")) , m_CidStore(Store) , m_ProjectStore(Projects) , m_StatusService(StatusService) , m_StatsService(StatsService) , m_AuthMgr(AuthMgr) +, m_OpenProcessCache(InOpenProcessCache) +, m_JobQueue(InJobQueue) { ZEN_MEMSCOPE(GetProjectHttpTag()); @@ -2044,19 +2315,55 @@ HttpProjectService::HandleOplogSaveRequest(HttpRouterRequest& Req) if (CbObject ContainerObject = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult); ValidateResult == CbValidateError::None && ContainerObject) { - ProjectStore::WriteOplogResult Result = m_ProjectStore->WriteOplog(*Project, *Oplog, ContainerObject); + RwLock AttachmentsLock; + tsl::robin_set<IoHash, IoHash::Hasher> Attachments; + + auto HasAttachment = [this](const IoHash& RawHash) { return m_CidStore.ContainsChunk(RawHash); }; + auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector<IoHash>&& ChunkHashes) { + RwLock::ExclusiveLockScope _(AttachmentsLock); + if (BlockHash 
!= IoHash::Zero) + { + Attachments.insert(BlockHash); + } + else + { + Attachments.insert(ChunkHashes.begin(), ChunkHashes.end()); + } + }; + auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) { + RwLock::ExclusiveLockScope _(AttachmentsLock); + Attachments.insert(RawHash); + }; + + auto OnChunkedAttachment = [](const ChunkedInfo&) {}; + + auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog->CaptureAddedAttachments(RawHashes); }; + + // Make sure we retain any attachments we download before writing the oplog + Oplog->EnableUpdateCapture(); + auto _ = MakeGuard([&Oplog]() { Oplog->DisableUpdateCapture(); }); + + RemoteProjectStore::Result Result = SaveOplogContainer(*Oplog, + ContainerObject, + OnReferencedAttachments, + HasAttachment, + OnNeedBlock, + OnNeedAttachment, + OnChunkedAttachment, + nullptr); + if (Result.ErrorCode == 0) { - if (Result.Need.empty()) + if (Attachments.empty()) { HttpReq.WriteResponse(HttpResponseCode::OK); } else { - CbObjectWriter Cbo(1 + 1 + 5 + Result.Need.size() * (1 + sizeof(IoHash::Hash)) + 1); + CbObjectWriter Cbo(1 + 1 + 5 + Attachments.size() * (1 + sizeof(IoHash::Hash)) + 1); Cbo.BeginArray("need"); { - for (const IoHash& Hash : Result.Need) + for (const IoHash& Hash : Attachments) { ZEN_DEBUG("Need attachment {}", Hash); Cbo << Hash; @@ -2074,15 +2381,15 @@ HttpProjectService::HandleOplogSaveRequest(HttpRouterRequest& Req) ToString(HttpReq.RequestVerb()), HttpReq.QueryString(), Result.ErrorCode, - Result.ErrorDescription); + Result.Reason); - if (Result.ErrorDescription.empty()) + if (Result.Reason.empty()) { return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode)); } else { - return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode), HttpContentType::kText, Result.ErrorDescription); + return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode), HttpContentType::kText, Result.Reason); } } } @@ -2164,27 +2471,41 @@ 
HttpProjectService::HandleOplogLoadRequest(HttpRouterRequest& Req) } } - ProjectStore::ReadOplogResult Result = - m_ProjectStore->ReadOplog(*Project, *Oplog, MaxBlockSize, MaxChunkEmbedSize, MaxChunksPerBlock, ChunkFileSizeLimit); - if (Result.ErrorCode == 0) + RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer( + m_CidStore, + *Project, + *Oplog, + MaxBlockSize, + MaxChunkEmbedSize, + MaxChunksPerBlock, + ChunkFileSizeLimit, + /* BuildBlocks */ false, + /* IgnoreMissingAttachments */ false, + /* AllowChunking*/ false, + [](CompressedBuffer&&, ChunkBlockDescription&&) {}, + [](const IoHash&, TGetAttachmentBufferFunc&&) {}, + [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, + /* EmbedLooseFiles*/ false); + + if (ContainerResult.ErrorCode == 0) { - return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ContainerObject); + return HttpReq.WriteResponse(HttpResponseCode::OK, ContainerResult.ContainerObject); } else { ZEN_DEBUG("Request {}: '{}' failed with {}. 
Reason: `{}`", ToString(HttpReq.RequestVerb()), HttpReq.QueryString(), - Result.ErrorCode, - Result.ErrorDescription); + ContainerResult.ErrorCode, + ContainerResult.Reason); - if (Result.ErrorDescription.empty()) + if (ContainerResult.Reason.empty()) { - return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode)); + return HttpReq.WriteResponse(HttpResponseCode(ContainerResult.ErrorCode)); } else { - return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode), HttpContentType::kText, Result.ErrorDescription); + return HttpReq.WriteResponse(HttpResponseCode(ContainerResult.ErrorCode), HttpContentType::kText, ContainerResult.Reason); } } } @@ -2193,6 +2514,7 @@ void HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::Rpc"); + using namespace std::literals; HttpServerRequest& HttpReq = Req.ServerRequest(); @@ -2200,10 +2522,543 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req) const auto& OplogId = Req.GetCapture(2); IoBuffer Payload = HttpReq.ReadPayload(); - bool OkRequest = m_ProjectStore->Rpc(HttpReq, ProjectId, OplogId, std::move(Payload), m_AuthMgr); - if (!OkRequest) + HttpContentType PayloadContentType = HttpReq.RequestContentType(); + CbPackage Package; + CbObject Cb; + switch (PayloadContentType) { - m_ProjectStats.BadRequestCount++; + case HttpContentType::kJSON: + case HttpContentType::kUnknownContentType: + case HttpContentType::kText: + { + std::string JsonText(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize()); + Cb = LoadCompactBinaryFromJson(JsonText).AsObject(); + if (!Cb) + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Content format not supported, expected JSON format"); + } + } + break; + case HttpContentType::kCbObject: + { + CbValidateError ValidateResult; + if (Cb = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult); + ValidateResult != CbValidateError::None || 
!Cb) + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Content format not supported, expected compact binary format ('{}')", ToString(ValidateResult))); + } + break; + } + case HttpContentType::kCbPackage: + try + { + Package = ParsePackageMessage(Payload); + Cb = Package.GetObject(); + } + catch (const std::invalid_argument& ex) + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Failed to parse package request, reason: '{}'", ex.what())); + } + if (!Cb) + { + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Content format not supported, expected package message format"); + } + break; + default: + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type"); + } + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Rpc oplog request for unknown project '{}'", ProjectId)); + } + Project->TouchProject(); + + std::string_view Method = Cb["method"sv].AsString(); + + bool VerifyPathOnDisk = Method != "getchunks"sv; + + Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, VerifyPathOnDisk); + if (!Oplog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)); + } + Project->TouchOplog(OplogId); + + uint32_t MethodHash = HashStringDjb2(Method); + + switch (MethodHash) + { + case HashStringDjb2("import"sv): + { + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + 
CbObjectView Params = Cb["params"sv].AsObjectView(); + size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize); + size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize); + bool Force = Params["force"sv].AsBool(false); + bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); + bool CleanOplog = Params["clean"].AsBool(false); + + CreateRemoteStoreResult RemoteStoreResult = + CreateRemoteStore(Params, m_AuthMgr, MaxBlockSize, MaxChunkEmbedSize, Oplog->TempPath()); + + if (RemoteStoreResult.Store == nullptr) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, RemoteStoreResult.Description); + } + std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store); + RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); + + JobId JobId = m_JobQueue.QueueJob( + fmt::format("Import oplog '{}/{}'", Project->Identifier, Oplog->OplogId()), + [this, + ChunkStore = &m_CidStore, + ActualRemoteStore = std::move(RemoteStore), + Oplog, + Force, + IgnoreMissingAttachments, + CleanOplog](JobContext& Context) { + Context.ReportMessage(fmt::format("Loading oplog '{}/{}' from {}", + Oplog->GetOuterProjectIdentifier(), + Oplog->OplogId(), + ActualRemoteStore->GetInfo().Description)); + + RemoteProjectStore::Result Result = + LoadOplog(m_CidStore, *ActualRemoteStore, *Oplog, Force, IgnoreMissingAttachments, CleanOplog, &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw JobError(Response.second.empty() ? 
fmt::format("Status: {}", ToString(Response.first)) : Response.second, + (int)Response.first); + } + }); + + return HttpReq.WriteResponse(HttpResponseCode::Accepted, HttpContentType::kText, fmt::format("{}", JobId.Id)); + } + case HashStringDjb2("export"sv): + { + CbObjectView Params = Cb["params"sv].AsObjectView(); + size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize); + size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize); + size_t MaxChunksPerBlock = Params["maxchunksperblock"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunksPerBlock); + size_t ChunkFileSizeLimit = Params["chunkfilesizelimit"sv].AsUInt64(RemoteStoreOptions::DefaultChunkFileSizeLimit); + bool Force = Params["force"sv].AsBool(false); + bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); + bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false); + + CreateRemoteStoreResult RemoteStoreResult = + CreateRemoteStore(Params, m_AuthMgr, MaxBlockSize, MaxChunkEmbedSize, Oplog->TempPath()); + + if (RemoteStoreResult.Store == nullptr) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, RemoteStoreResult.Description); + } + std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store); + RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); + + JobId JobId = m_JobQueue.QueueJob( + fmt::format("Export oplog '{}/{}'", Project->Identifier, Oplog->OplogId()), + [this, + ActualRemoteStore = std::move(RemoteStore), + Project, + Oplog, + MaxBlockSize, + MaxChunksPerBlock, + MaxChunkEmbedSize, + ChunkFileSizeLimit, + EmbedLooseFile, + Force, + IgnoreMissingAttachments](JobContext& Context) { + Context.ReportMessage(fmt::format("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}", + Project->Identifier, + Oplog->OplogId(), + ActualRemoteStore->GetInfo().Description, + 
NiceBytes(MaxBlockSize), + NiceBytes(MaxChunkEmbedSize))); + + RemoteProjectStore::Result Result = SaveOplog(m_CidStore, + *ActualRemoteStore, + *Project, + *Oplog, + MaxBlockSize, + MaxChunksPerBlock, + MaxChunkEmbedSize, + ChunkFileSizeLimit, + EmbedLooseFile, + Force, + IgnoreMissingAttachments, + &Context); + auto Response = ConvertResult(Result); + ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); + if (!IsHttpSuccessCode(Response.first)) + { + throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second, + (int)Response.first); + } + }); + + return HttpReq.WriteResponse(HttpResponseCode::Accepted, HttpContentType::kText, fmt::format("{}", JobId.Id)); + } + case HashStringDjb2("getchunks"sv): + { + RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u)); + int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0); + + std::vector<ProjectStore::ChunkRequest> Requests = m_ProjectStore->ParseChunksRequests(*Project, *Oplog, Cb); + std::vector<ProjectStore::ChunkResult> Results = + Requests.empty() ? 
std::vector<ProjectStore::ChunkResult>{} : m_ProjectStore->GetChunks(*Project, *Oplog, Requests); + CbPackage Response = m_ProjectStore->WriteChunksRequestResponse(*Project, *Oplog, std::move(Requests), std::move(Results)); + + void* TargetProcessHandle = nullptr; + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + { + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + { + Flags |= FormatFlags::kDenyPartialLocalReferences; + } + TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId); + } + + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(Response, Flags, TargetProcessHandle); + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + } + case HashStringDjb2("putchunks"sv): + { + ZEN_TRACE_CPU("Store::Rpc::putchunks"); + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + CbObject Object = Package.GetObject(); + const bool UsingTempFiles = Object["usingtmpfiles"].AsBool(false); + + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + if (!Attachments.empty()) + { + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + + WriteAttachmentBuffers.reserve(Attachments.size()); + WriteRawHashes.reserve(Attachments.size()); + + for (const CbAttachment& Attachment : Attachments) + { + IoHash RawHash = Attachment.GetHash(); + const CompressedBuffer& Compressed = Attachment.AsCompressedBinary(); + IoBuffer AttachmentPayload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + if (UsingTempFiles) + { + AttachmentPayload.SetDeleteOnClose(true); + } + WriteAttachmentBuffers.push_back(std::move(AttachmentPayload)); + WriteRawHashes.push_back(RawHash); + } + + 
Oplog->CaptureAddedAttachments(WriteRawHashes); + m_CidStore.AddChunks(WriteAttachmentBuffers, + WriteRawHashes, + UsingTempFiles ? CidStore::InsertMode::kMayBeMovedInPlace : CidStore::InsertMode::kCopyOnly); + } + return HttpReq.WriteResponse(HttpResponseCode::OK); + } + case HashStringDjb2("snapshot"sv): + { + ZEN_TRACE_CPU("Store::Rpc::snapshot"); + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + // Snapshot all referenced files. This brings the content of all + // files into the CID store + + uint32_t OpCount = 0; + uint64_t InlinedBytes = 0; + uint64_t InlinedFiles = 0; + uint64_t TotalBytes = 0; + uint64_t TotalFiles = 0; + + std::vector<CbObject> NewOps; + struct AddedChunk + { + IoBuffer Buffer; + uint64_t RawSize = 0; + }; + tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks; + + Oplog->IterateOplog( + [&](CbObjectView Op) { + bool OpRewritten = false; + bool AllOk = true; + + CbWriter FilesArrayWriter; + FilesArrayWriter.BeginArray("files"sv); + + for (CbFieldView& Field : Op["files"sv]) + { + bool CopyField = true; + + if (CbObjectView View = Field.AsObjectView()) + { + const IoHash DataHash = View["data"sv].AsHash(); + + if (DataHash == IoHash::Zero) + { + std::string_view ServerPath = View["serverpath"sv].AsString(); + std::filesystem::path FilePath = Project->RootDir / ServerPath; + BasicFile DataFile; + std::error_code Ec; + DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec); + + if (Ec) + { + // Error... 
+ + ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message()); + + AllOk = false; + } + else + { + // Read file contents into memory, compress and keep in map of chunks to add to Cid store + IoBuffer FileIoBuffer = DataFile.ReadAll(); + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer))); + const uint64_t RawSize = Compressed.DecodeRawSize(); + const IoHash RawHash = Compressed.DecodeRawHash(); + if (!AddedChunks.contains(RawHash)) + { + const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString(); + BasicFile ChunkTempFile; + ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete); + ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec); + if (Ec) + { + Oid ChunkId = View["id"sv].AsObjectId(); + ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}", + FilePath, + ChunkId, + Ec.message()); + AllOk = false; + } + else + { + void* FileHandle = ChunkTempFile.Detach(); + IoBuffer ChunkBuffer(IoBuffer::File, + FileHandle, + 0, + Compressed.GetCompressed().GetSize(), + /*IsWholeFile*/ true); + ChunkBuffer.SetDeleteOnClose(true); + AddedChunks.insert_or_assign( + RawHash, + AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize}); + } + } + + TotalBytes += RawSize; + ++TotalFiles; + + // Rewrite file array entry with new data reference + CbObjectWriter Writer(View.GetSize()); + RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { + if (Field.GetName() == "data"sv) + { + // omit this field as we will write it explicitly ourselves + return true; + } + return false; + }); + Writer.AddBinaryAttachment("data"sv, RawHash); + + CbObject RewrittenOp = Writer.Save(); + FilesArrayWriter.AddObject(std::move(RewrittenOp)); + CopyField = false; + } + } + } + + if (CopyField) + { + FilesArrayWriter.AddField(Field); + } + else + { + OpRewritten = true; + } + } + + if (OpRewritten && AllOk) + { + FilesArrayWriter.EndArray(); + 
CbArray FilesArray = FilesArrayWriter.Save().AsArray(); + + CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { + if (Field.GetName() == "files"sv) + { + NewWriter.AddArray("files"sv, FilesArray); + + return true; + } + + return false; + }); + + NewOps.push_back(std::move(RewrittenOp)); + } + + OpCount++; + }, + ProjectStore::Oplog::Paging{}); + + CbObjectWriter ResponseObj; + + // Persist rewritten oplog entries + if (!NewOps.empty()) + { + ResponseObj.BeginArray("rewritten_ops"); + + for (CbObject& NewOp : NewOps) + { + ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp)); + + ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number); + + ResponseObj.AddInteger(NewLsn.Number); + } + + ResponseObj.EndArray(); + } + + // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the + // new chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have + // unreferenced chunks. 
+ for (auto It : AddedChunks) + { + const IoHash& RawHash = It.first; + AddedChunk& Chunk = It.second; + CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash); + if (Result.New) + { + InlinedBytes += Chunk.RawSize; + ++InlinedFiles; + } + } + + ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles; + ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles; + + ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount); + + return HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save()); + } + case HashStringDjb2("appendops"sv): + { + ZEN_TRACE_CPU("Store::Rpc::appendops"); + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + CbArrayView OpsArray = Cb["ops"sv].AsArrayView(); + + size_t OpsBufferSize = 0; + for (CbFieldView OpView : OpsArray) + { + OpsBufferSize += OpView.GetSize(); + } + UniqueBuffer OpsBuffers = UniqueBuffer::Alloc(OpsBufferSize); + MutableMemoryView OpsBuffersMemory = OpsBuffers.GetMutableView(); + + std::vector<CbObjectView> Ops; + Ops.reserve(OpsArray.Num()); + for (CbFieldView OpView : OpsArray) + { + OpView.CopyTo(OpsBuffersMemory); + Ops.push_back(CbObjectView(OpsBuffersMemory.GetData())); + OpsBuffersMemory.MidInline(OpView.GetSize()); + } + + std::vector<ProjectStore::LogSequenceNumber> LSNs = Oplog->AppendNewOplogEntries(Ops); + ZEN_ASSERT(LSNs.size() == Ops.size()); + + std::vector<IoHash> MissingAttachments; + for (size_t OpIndex = 0; OpIndex < Ops.size(); OpIndex++) + { + if (LSNs[OpIndex]) + { + CbObjectView Op = Ops[OpIndex]; + Op.IterateAttachments([this, &MissingAttachments](CbFieldView AttachmentView) { + const IoHash Cid = AttachmentView.AsAttachment(); + if (!m_CidStore.ContainsChunk(Cid)) + { + MissingAttachments.push_back(Cid); + } + }); + } + } + + CbPackage ResponsePackage; + + { + CbObjectWriter ResponseObj; + 
ResponseObj.BeginArray("written_ops"); + + for (ProjectStore::LogSequenceNumber NewLsn : LSNs) + { + ZEN_DEBUG("appended written op at LSN: {}", NewLsn.Number); + ResponseObj.AddInteger(NewLsn.Number); + } + ResponseObj.EndArray(); + + if (!MissingAttachments.empty()) + { + ResponseObj.BeginArray("need"); + + for (const IoHash& Cid : MissingAttachments) + { + ResponseObj.AddHash(Cid); + } + ResponseObj.EndArray(); + } + ResponsePackage.SetObject(ResponseObj.Save()); + } + + std::vector<IoBuffer> ResponseBuffers = FormatPackageMessage(ResponsePackage); + + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponseBuffers); + } + default: + m_ProjectStats.BadRequestCount++; + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Unknown rpc method '{}'", Method)); } } void |