aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/httpprojectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-02 15:56:00 +0200
committerGitHub Enterprise <[email protected]>2025-10-02 15:56:00 +0200
commitadbec59ee0b169ba0ac9d167eac4ed72edefe1ca (patch)
treec05d4e5624d98272969f6509fe5cf8a75d6ebf9d /src/zenserver/projectstore/httpprojectstore.cpp
parentZs/OIDC exe path handling (#538) (diff)
downloadzen-adbec59ee0b169ba0ac9d167eac4ed72edefe1ca.tar.xz
zen-adbec59ee0b169ba0ac9d167eac4ed72edefe1ca.zip
projectstore refactor phase 2 (#539)
Refactor projectstore/httpprojectservice to prepare for move of projectstore to zenstore
Diffstat (limited to 'src/zenserver/projectstore/httpprojectstore.cpp')
-rw-r--r--src/zenserver/projectstore/httpprojectstore.cpp895
1 files changed, 875 insertions, 20 deletions
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp
index 2a1bf9a5e..175d131e1 100644
--- a/src/zenserver/projectstore/httpprojectstore.cpp
+++ b/src/zenserver/projectstore/httpprojectstore.cpp
@@ -14,11 +14,20 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/memory/llm.h>
+#include <zencore/scopeguard.h>
#include <zencore/stream.h>
#include <zencore/trace.h>
+#include <zenhttp/packageformat.h>
#include <zenstore/zenstore.h>
+#include <zenutil/openprocesscache.h>
#include <zenutil/workerpools.h>
+#include "buildsremoteprojectstore.h"
+#include "fileremoteprojectstore.h"
+#include "jupiterremoteprojectstore.h"
+#include "remoteprojectstore.h"
+#include "zenremoteprojectstore.h"
+
namespace zen {
const FLLMTag&
@@ -233,6 +242,264 @@ namespace {
Cbo.EndObject();
}
+ struct CreateRemoteStoreResult
+ {
+ std::shared_ptr<RemoteProjectStore> Store;
+ std::string Description;
+ };
+
+ CreateRemoteStoreResult CreateRemoteStore(CbObjectView Params,
+ AuthMgr& AuthManager,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ const std::filesystem::path& TempFilePath)
+ {
+ ZEN_MEMSCOPE(GetProjectHttpTag());
+
+ using namespace std::literals;
+
+ std::shared_ptr<RemoteProjectStore> RemoteStore;
+
+ if (CbObjectView File = Params["file"sv].AsObjectView(); File)
+ {
+ std::filesystem::path FolderPath(File["path"sv].AsString());
+ if (FolderPath.empty())
+ {
+ return {nullptr, "Missing file path"};
+ }
+ std::string_view Name(File["name"sv].AsString());
+ if (Name.empty())
+ {
+ return {nullptr, "Missing file name"};
+ }
+ std::string_view OptionalBaseName(File["basename"sv].AsString());
+ bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false);
+ bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false);
+
+ FileRemoteStoreOptions Options = {
+ RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ FolderPath,
+ std::string(Name),
+ std::string(OptionalBaseName),
+ ForceDisableBlocks,
+ ForceEnableTempBlocks};
+ RemoteStore = CreateFileRemoteStore(Options);
+ }
+
+ if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud)
+ {
+ std::string_view CloudServiceUrl = Cloud["url"sv].AsString();
+ if (CloudServiceUrl.empty())
+ {
+ return {nullptr, "Missing service url"};
+ }
+
+ std::string Url = UrlDecode(CloudServiceUrl);
+ std::string_view Namespace = Cloud["namespace"sv].AsString();
+ if (Namespace.empty())
+ {
+ return {nullptr, "Missing namespace"};
+ }
+ std::string_view Bucket = Cloud["bucket"sv].AsString();
+ if (Bucket.empty())
+ {
+ return {nullptr, "Missing bucket"};
+ }
+ std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString();
+ std::string AccessToken = std::string(Cloud["access-token"sv].AsString());
+ if (AccessToken.empty())
+ {
+ std::string_view AccessTokenEnvVariable = Cloud["access-token-env"].AsString();
+ if (!AccessTokenEnvVariable.empty())
+ {
+ AccessToken = GetEnvVariable(AccessTokenEnvVariable);
+ }
+ }
+ std::filesystem::path OidcExePath;
+ if (std::string_view OidcExePathString = Cloud["oidc-exe-path"].AsString(); !OidcExePathString.empty())
+ {
+ std::filesystem::path OidcExePathMaybe(OidcExePathString);
+ if (IsFile(OidcExePathMaybe))
+ {
+ OidcExePath = std::move(OidcExePathMaybe);
+ }
+ else
+ {
+ ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString);
+ }
+ }
+ std::string_view KeyParam = Cloud["key"sv].AsString();
+ if (KeyParam.empty())
+ {
+ return {nullptr, "Missing key"};
+ }
+ if (KeyParam.length() != IoHash::StringLength)
+ {
+ return {nullptr, "Invalid key"};
+ }
+ IoHash Key = IoHash::FromHexString(KeyParam);
+ if (Key == IoHash::Zero)
+ {
+ return {nullptr, "Invalid key string"};
+ }
+ IoHash BaseKey = IoHash::Zero;
+ std::string_view BaseKeyParam = Cloud["basekey"sv].AsString();
+ if (!BaseKeyParam.empty())
+ {
+ if (BaseKeyParam.length() != IoHash::StringLength)
+ {
+ return {nullptr, "Invalid base key"};
+ }
+ BaseKey = IoHash::FromHexString(BaseKeyParam);
+ if (BaseKey == IoHash::Zero)
+ {
+ return {nullptr, "Invalid base key string"};
+ }
+ }
+
+ bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false);
+ bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false);
+ bool AssumeHttp2 = Cloud["assumehttp2"sv].AsBool(false);
+
+ JupiterRemoteStoreOptions Options = {
+ RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ Url,
+ std::string(Namespace),
+ std::string(Bucket),
+ Key,
+ BaseKey,
+ std::string(OpenIdProvider),
+ AccessToken,
+ AuthManager,
+ OidcExePath,
+ ForceDisableBlocks,
+ ForceDisableTempBlocks,
+ AssumeHttp2};
+ RemoteStore = CreateJupiterRemoteStore(Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false);
+ }
+
+ if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen)
+ {
+ std::string_view Url = Zen["url"sv].AsString();
+ std::string_view Project = Zen["project"sv].AsString();
+ if (Project.empty())
+ {
+ return {nullptr, "Missing project"};
+ }
+ std::string_view Oplog = Zen["oplog"sv].AsString();
+ if (Oplog.empty())
+ {
+ return {nullptr, "Missing oplog"};
+ }
+ ZenRemoteStoreOptions Options = {
+ RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ std::string(Url),
+ std::string(Project),
+ std::string(Oplog)};
+ RemoteStore = CreateZenRemoteStore(Options, TempFilePath);
+ }
+
+ if (CbObjectView Builds = Params["builds"sv].AsObjectView(); Builds)
+ {
+ std::string_view BuildsServiceUrl = Builds["url"sv].AsString();
+ if (BuildsServiceUrl.empty())
+ {
+ return {nullptr, "Missing service url"};
+ }
+
+ std::string Url = UrlDecode(BuildsServiceUrl);
+ std::string_view Namespace = Builds["namespace"sv].AsString();
+ if (Namespace.empty())
+ {
+ return {nullptr, "Missing namespace"};
+ }
+ std::string_view Bucket = Builds["bucket"sv].AsString();
+ if (Bucket.empty())
+ {
+ return {nullptr, "Missing bucket"};
+ }
+ std::string_view OpenIdProvider = Builds["openid-provider"sv].AsString();
+ std::string AccessToken = std::string(Builds["access-token"sv].AsString());
+ if (AccessToken.empty())
+ {
+ std::string_view AccessTokenEnvVariable = Builds["access-token-env"].AsString();
+ if (!AccessTokenEnvVariable.empty())
+ {
+ AccessToken = GetEnvVariable(AccessTokenEnvVariable);
+ }
+ }
+ std::filesystem::path OidcExePath;
+ if (std::string_view OidcExePathString = Builds["oidc-exe-path"].AsString(); !OidcExePathString.empty())
+ {
+ std::filesystem::path OidcExePathMaybe(OidcExePathString);
+ if (IsFile(OidcExePathMaybe))
+ {
+ OidcExePath = std::move(OidcExePathMaybe);
+ }
+ else
+ {
+ ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString);
+ }
+ }
+ std::string_view BuildIdParam = Builds["buildsid"sv].AsString();
+ if (BuildIdParam.empty())
+ {
+ return {nullptr, "Missing build id"};
+ }
+ if (BuildIdParam.length() != Oid::StringLength)
+ {
+ return {nullptr, "Invalid build id"};
+ }
+ Oid BuildId = Oid::FromHexString(BuildIdParam);
+ if (BuildId == Oid::Zero)
+ {
+ return {nullptr, "Invalid build id string"};
+ }
+
+ bool ForceDisableBlocks = Builds["disableblocks"sv].AsBool(false);
+ bool ForceDisableTempBlocks = Builds["disabletempblocks"sv].AsBool(false);
+ bool AssumeHttp2 = Builds["assumehttp2"sv].AsBool(false);
+
+ MemoryView MetaDataSection = Builds["metadata"sv].AsBinaryView();
+ IoBuffer MetaData(IoBuffer::Wrap, MetaDataSection.GetData(), MetaDataSection.GetSize());
+
+ BuildsRemoteStoreOptions Options = {
+ RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ Url,
+ std::string(Namespace),
+ std::string(Bucket),
+ BuildId,
+ std::string(OpenIdProvider),
+ AccessToken,
+ AuthManager,
+ OidcExePath,
+ ForceDisableBlocks,
+ ForceDisableTempBlocks,
+ AssumeHttp2,
+ MetaData};
+ RemoteStore = CreateJupiterBuildsRemoteStore(Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false);
+ }
+
+ if (!RemoteStore)
+ {
+ return {nullptr, "Unknown remote store type"};
+ }
+
+ return {std::move(RemoteStore), ""};
+ }
+
+ std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result)
+ {
+ if (Result.ErrorCode == 0)
+ {
+ return {HttpResponseCode::OK, Result.Text};
+ }
+ return {static_cast<HttpResponseCode>(Result.ErrorCode),
+ Result.Reason.empty() ? Result.Text
+ : Result.Text.empty() ? Result.Reason
+ : fmt::format("{}: {}", Result.Reason, Result.Text)};
+ }
+
} // namespace
//////////////////////////////////////////////////////////////////////////
@@ -241,13 +508,17 @@ HttpProjectService::HttpProjectService(CidStore& Store,
ProjectStore* Projects,
HttpStatusService& StatusService,
HttpStatsService& StatsService,
- AuthMgr& AuthMgr)
+ AuthMgr& AuthMgr,
+ OpenProcessCache& InOpenProcessCache,
+ JobQueue& InJobQueue)
: m_Log(logging::Get("project"))
, m_CidStore(Store)
, m_ProjectStore(Projects)
, m_StatusService(StatusService)
, m_StatsService(StatsService)
, m_AuthMgr(AuthMgr)
+, m_OpenProcessCache(InOpenProcessCache)
+, m_JobQueue(InJobQueue)
{
ZEN_MEMSCOPE(GetProjectHttpTag());
@@ -2044,19 +2315,55 @@ HttpProjectService::HandleOplogSaveRequest(HttpRouterRequest& Req)
if (CbObject ContainerObject = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult);
ValidateResult == CbValidateError::None && ContainerObject)
{
- ProjectStore::WriteOplogResult Result = m_ProjectStore->WriteOplog(*Project, *Oplog, ContainerObject);
+ RwLock AttachmentsLock;
+ tsl::robin_set<IoHash, IoHash::Hasher> Attachments;
+
+ auto HasAttachment = [this](const IoHash& RawHash) { return m_CidStore.ContainsChunk(RawHash); };
+ auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector<IoHash>&& ChunkHashes) {
+ RwLock::ExclusiveLockScope _(AttachmentsLock);
+ if (BlockHash != IoHash::Zero)
+ {
+ Attachments.insert(BlockHash);
+ }
+ else
+ {
+ Attachments.insert(ChunkHashes.begin(), ChunkHashes.end());
+ }
+ };
+ auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) {
+ RwLock::ExclusiveLockScope _(AttachmentsLock);
+ Attachments.insert(RawHash);
+ };
+
+ auto OnChunkedAttachment = [](const ChunkedInfo&) {};
+
+ auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog->CaptureAddedAttachments(RawHashes); };
+
+ // Make sure we retain any attachments we download before writing the oplog
+ Oplog->EnableUpdateCapture();
+ auto _ = MakeGuard([&Oplog]() { Oplog->DisableUpdateCapture(); });
+
+ RemoteProjectStore::Result Result = SaveOplogContainer(*Oplog,
+ ContainerObject,
+ OnReferencedAttachments,
+ HasAttachment,
+ OnNeedBlock,
+ OnNeedAttachment,
+ OnChunkedAttachment,
+ nullptr);
+
if (Result.ErrorCode == 0)
{
- if (Result.Need.empty())
+ if (Attachments.empty())
{
HttpReq.WriteResponse(HttpResponseCode::OK);
}
else
{
- CbObjectWriter Cbo(1 + 1 + 5 + Result.Need.size() * (1 + sizeof(IoHash::Hash)) + 1);
+ CbObjectWriter Cbo(1 + 1 + 5 + Attachments.size() * (1 + sizeof(IoHash::Hash)) + 1);
Cbo.BeginArray("need");
{
- for (const IoHash& Hash : Result.Need)
+ for (const IoHash& Hash : Attachments)
{
ZEN_DEBUG("Need attachment {}", Hash);
Cbo << Hash;
@@ -2074,15 +2381,15 @@ HttpProjectService::HandleOplogSaveRequest(HttpRouterRequest& Req)
ToString(HttpReq.RequestVerb()),
HttpReq.QueryString(),
Result.ErrorCode,
- Result.ErrorDescription);
+ Result.Reason);
- if (Result.ErrorDescription.empty())
+ if (Result.Reason.empty())
{
return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode));
}
else
{
- return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode), HttpContentType::kText, Result.ErrorDescription);
+ return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode), HttpContentType::kText, Result.Reason);
}
}
}
@@ -2164,27 +2471,41 @@ HttpProjectService::HandleOplogLoadRequest(HttpRouterRequest& Req)
}
}
- ProjectStore::ReadOplogResult Result =
- m_ProjectStore->ReadOplog(*Project, *Oplog, MaxBlockSize, MaxChunkEmbedSize, MaxChunksPerBlock, ChunkFileSizeLimit);
- if (Result.ErrorCode == 0)
+ RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer(
+ m_CidStore,
+ *Project,
+ *Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ MaxChunksPerBlock,
+ ChunkFileSizeLimit,
+ /* BuildBlocks */ false,
+ /* IgnoreMissingAttachments */ false,
+ /* AllowChunking*/ false,
+ [](CompressedBuffer&&, ChunkBlockDescription&&) {},
+ [](const IoHash&, TGetAttachmentBufferFunc&&) {},
+ [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {},
+ /* EmbedLooseFiles*/ false);
+
+ if (ContainerResult.ErrorCode == 0)
{
- return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ContainerObject);
+ return HttpReq.WriteResponse(HttpResponseCode::OK, ContainerResult.ContainerObject);
}
else
{
ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
ToString(HttpReq.RequestVerb()),
HttpReq.QueryString(),
- Result.ErrorCode,
- Result.ErrorDescription);
+ ContainerResult.ErrorCode,
+ ContainerResult.Reason);
- if (Result.ErrorDescription.empty())
+ if (ContainerResult.Reason.empty())
{
- return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode));
+ return HttpReq.WriteResponse(HttpResponseCode(ContainerResult.ErrorCode));
}
else
{
- return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode), HttpContentType::kText, Result.ErrorDescription);
+ return HttpReq.WriteResponse(HttpResponseCode(ContainerResult.ErrorCode), HttpContentType::kText, ContainerResult.Reason);
}
}
}
@@ -2193,6 +2514,7 @@ void
HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req)
{
ZEN_TRACE_CPU("ProjectService::Rpc");
+ using namespace std::literals;
HttpServerRequest& HttpReq = Req.ServerRequest();
@@ -2200,10 +2522,543 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req)
const auto& OplogId = Req.GetCapture(2);
IoBuffer Payload = HttpReq.ReadPayload();
- bool OkRequest = m_ProjectStore->Rpc(HttpReq, ProjectId, OplogId, std::move(Payload), m_AuthMgr);
- if (!OkRequest)
+ HttpContentType PayloadContentType = HttpReq.RequestContentType();
+ CbPackage Package;
+ CbObject Cb;
+ switch (PayloadContentType)
{
- m_ProjectStats.BadRequestCount++;
+ case HttpContentType::kJSON:
+ case HttpContentType::kUnknownContentType:
+ case HttpContentType::kText:
+ {
+ std::string JsonText(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize());
+ Cb = LoadCompactBinaryFromJson(JsonText).AsObject();
+ if (!Cb)
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Content format not supported, expected JSON format");
+ }
+ }
+ break;
+ case HttpContentType::kCbObject:
+ {
+ CbValidateError ValidateResult;
+ if (Cb = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult);
+ ValidateResult != CbValidateError::None || !Cb)
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(
+ HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Content format not supported, expected compact binary format ('{}')", ToString(ValidateResult)));
+ }
+ break;
+ }
+ case HttpContentType::kCbPackage:
+ try
+ {
+ Package = ParsePackageMessage(Payload);
+ Cb = Package.GetObject();
+ }
+ catch (const std::invalid_argument& ex)
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Failed to parse package request, reason: '{}'", ex.what()));
+ }
+ if (!Cb)
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Content format not supported, expected package message format");
+ }
+ break;
+ default:
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type");
+ }
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Rpc oplog request for unknown project '{}'", ProjectId));
+ }
+ Project->TouchProject();
+
+ std::string_view Method = Cb["method"sv].AsString();
+
+ bool VerifyPathOnDisk = Method != "getchunks"sv;
+
+ Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, VerifyPathOnDisk);
+ if (!Oplog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId));
+ }
+ Project->TouchOplog(OplogId);
+
+ uint32_t MethodHash = HashStringDjb2(Method);
+
+ switch (MethodHash)
+ {
+ case HashStringDjb2("import"sv):
+ {
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ CbObjectView Params = Cb["params"sv].AsObjectView();
+ size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize);
+ size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize);
+ bool Force = Params["force"sv].AsBool(false);
+ bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false);
+ bool CleanOplog = Params["clean"].AsBool(false);
+
+ CreateRemoteStoreResult RemoteStoreResult =
+ CreateRemoteStore(Params, m_AuthMgr, MaxBlockSize, MaxChunkEmbedSize, Oplog->TempPath());
+
+ if (RemoteStoreResult.Store == nullptr)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, RemoteStoreResult.Description);
+ }
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store);
+ RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+
+ JobId JobId = m_JobQueue.QueueJob(
+ fmt::format("Import oplog '{}/{}'", Project->Identifier, Oplog->OplogId()),
+ [this,
+ ChunkStore = &m_CidStore,
+ ActualRemoteStore = std::move(RemoteStore),
+ Oplog,
+ Force,
+ IgnoreMissingAttachments,
+ CleanOplog](JobContext& Context) {
+ Context.ReportMessage(fmt::format("Loading oplog '{}/{}' from {}",
+ Oplog->GetOuterProjectIdentifier(),
+ Oplog->OplogId(),
+ ActualRemoteStore->GetInfo().Description));
+
+ RemoteProjectStore::Result Result =
+ LoadOplog(m_CidStore, *ActualRemoteStore, *Oplog, Force, IgnoreMissingAttachments, CleanOplog, &Context);
+ auto Response = ConvertResult(Result);
+ ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second);
+ if (!IsHttpSuccessCode(Response.first))
+ {
+ throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second,
+ (int)Response.first);
+ }
+ });
+
+ return HttpReq.WriteResponse(HttpResponseCode::Accepted, HttpContentType::kText, fmt::format("{}", JobId.Id));
+ }
+ case HashStringDjb2("export"sv):
+ {
+ CbObjectView Params = Cb["params"sv].AsObjectView();
+ size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize);
+ size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize);
+ size_t MaxChunksPerBlock = Params["maxchunksperblock"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunksPerBlock);
+ size_t ChunkFileSizeLimit = Params["chunkfilesizelimit"sv].AsUInt64(RemoteStoreOptions::DefaultChunkFileSizeLimit);
+ bool Force = Params["force"sv].AsBool(false);
+ bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false);
+ bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false);
+
+ CreateRemoteStoreResult RemoteStoreResult =
+ CreateRemoteStore(Params, m_AuthMgr, MaxBlockSize, MaxChunkEmbedSize, Oplog->TempPath());
+
+ if (RemoteStoreResult.Store == nullptr)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, RemoteStoreResult.Description);
+ }
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store);
+ RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+
+ JobId JobId = m_JobQueue.QueueJob(
+ fmt::format("Export oplog '{}/{}'", Project->Identifier, Oplog->OplogId()),
+ [this,
+ ActualRemoteStore = std::move(RemoteStore),
+ Project,
+ Oplog,
+ MaxBlockSize,
+ MaxChunksPerBlock,
+ MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
+ EmbedLooseFile,
+ Force,
+ IgnoreMissingAttachments](JobContext& Context) {
+ Context.ReportMessage(fmt::format("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}",
+ Project->Identifier,
+ Oplog->OplogId(),
+ ActualRemoteStore->GetInfo().Description,
+ NiceBytes(MaxBlockSize),
+ NiceBytes(MaxChunkEmbedSize)));
+
+ RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
+ *ActualRemoteStore,
+ *Project,
+ *Oplog,
+ MaxBlockSize,
+ MaxChunksPerBlock,
+ MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
+ EmbedLooseFile,
+ Force,
+ IgnoreMissingAttachments,
+ &Context);
+ auto Response = ConvertResult(Result);
+ ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second);
+ if (!IsHttpSuccessCode(Response.first))
+ {
+ throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second,
+ (int)Response.first);
+ }
+ });
+
+ return HttpReq.WriteResponse(HttpResponseCode::Accepted, HttpContentType::kText, fmt::format("{}", JobId.Id));
+ }
+ case HashStringDjb2("getchunks"sv):
+ {
+ RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u));
+ int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0);
+
+ std::vector<ProjectStore::ChunkRequest> Requests = m_ProjectStore->ParseChunksRequests(*Project, *Oplog, Cb);
+ std::vector<ProjectStore::ChunkResult> Results =
+ Requests.empty() ? std::vector<ProjectStore::ChunkResult>{} : m_ProjectStore->GetChunks(*Project, *Oplog, Requests);
+ CbPackage Response = m_ProjectStore->WriteChunksRequestResponse(*Project, *Oplog, std::move(Requests), std::move(Results));
+
+ void* TargetProcessHandle = nullptr;
+ FormatFlags Flags = FormatFlags::kDefault;
+ if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ {
+ Flags |= FormatFlags::kAllowLocalReferences;
+ if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ {
+ Flags |= FormatFlags::kDenyPartialLocalReferences;
+ }
+ TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId);
+ }
+
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(Response, Flags, TargetProcessHandle);
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ case HashStringDjb2("putchunks"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::putchunks");
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ CbObject Object = Package.GetObject();
+ const bool UsingTempFiles = Object["usingtmpfiles"].AsBool(false);
+
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+ if (!Attachments.empty())
+ {
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+
+ WriteAttachmentBuffers.reserve(Attachments.size());
+ WriteRawHashes.reserve(Attachments.size());
+
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ IoHash RawHash = Attachment.GetHash();
+ const CompressedBuffer& Compressed = Attachment.AsCompressedBinary();
+ IoBuffer AttachmentPayload = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ if (UsingTempFiles)
+ {
+ AttachmentPayload.SetDeleteOnClose(true);
+ }
+ WriteAttachmentBuffers.push_back(std::move(AttachmentPayload));
+ WriteRawHashes.push_back(RawHash);
+ }
+
+ Oplog->CaptureAddedAttachments(WriteRawHashes);
+ m_CidStore.AddChunks(WriteAttachmentBuffers,
+ WriteRawHashes,
+ UsingTempFiles ? CidStore::InsertMode::kMayBeMovedInPlace : CidStore::InsertMode::kCopyOnly);
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::OK);
+ }
+ case HashStringDjb2("snapshot"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::snapshot");
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ // Snapshot all referenced files. This brings the content of all
+ // files into the CID store
+
+ uint32_t OpCount = 0;
+ uint64_t InlinedBytes = 0;
+ uint64_t InlinedFiles = 0;
+ uint64_t TotalBytes = 0;
+ uint64_t TotalFiles = 0;
+
+ std::vector<CbObject> NewOps;
+ struct AddedChunk
+ {
+ IoBuffer Buffer;
+ uint64_t RawSize = 0;
+ };
+ tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks;
+
+ Oplog->IterateOplog(
+ [&](CbObjectView Op) {
+ bool OpRewritten = false;
+ bool AllOk = true;
+
+ CbWriter FilesArrayWriter;
+ FilesArrayWriter.BeginArray("files"sv);
+
+ for (CbFieldView& Field : Op["files"sv])
+ {
+ bool CopyField = true;
+
+ if (CbObjectView View = Field.AsObjectView())
+ {
+ const IoHash DataHash = View["data"sv].AsHash();
+
+ if (DataHash == IoHash::Zero)
+ {
+ std::string_view ServerPath = View["serverpath"sv].AsString();
+ std::filesystem::path FilePath = Project->RootDir / ServerPath;
+ BasicFile DataFile;
+ std::error_code Ec;
+ DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec);
+
+ if (Ec)
+ {
+ // Error...
+
+ ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message());
+
+ AllOk = false;
+ }
+ else
+ {
+ // Read file contents into memory, compress and keep in map of chunks to add to Cid store
+ IoBuffer FileIoBuffer = DataFile.ReadAll();
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer)));
+ const uint64_t RawSize = Compressed.DecodeRawSize();
+ const IoHash RawHash = Compressed.DecodeRawHash();
+ if (!AddedChunks.contains(RawHash))
+ {
+ const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString();
+ BasicFile ChunkTempFile;
+ ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete);
+ ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec);
+ if (Ec)
+ {
+ Oid ChunkId = View["id"sv].AsObjectId();
+ ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}",
+ FilePath,
+ ChunkId,
+ Ec.message());
+ AllOk = false;
+ }
+ else
+ {
+ void* FileHandle = ChunkTempFile.Detach();
+ IoBuffer ChunkBuffer(IoBuffer::File,
+ FileHandle,
+ 0,
+ Compressed.GetCompressed().GetSize(),
+ /*IsWholeFile*/ true);
+ ChunkBuffer.SetDeleteOnClose(true);
+ AddedChunks.insert_or_assign(
+ RawHash,
+ AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize});
+ }
+ }
+
+ TotalBytes += RawSize;
+ ++TotalFiles;
+
+ // Rewrite file array entry with new data reference
+ CbObjectWriter Writer(View.GetSize());
+ RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
+ if (Field.GetName() == "data"sv)
+ {
+ // omit this field as we will write it explicitly ourselves
+ return true;
+ }
+ return false;
+ });
+ Writer.AddBinaryAttachment("data"sv, RawHash);
+
+ CbObject RewrittenOp = Writer.Save();
+ FilesArrayWriter.AddObject(std::move(RewrittenOp));
+ CopyField = false;
+ }
+ }
+ }
+
+ if (CopyField)
+ {
+ FilesArrayWriter.AddField(Field);
+ }
+ else
+ {
+ OpRewritten = true;
+ }
+ }
+
+ if (OpRewritten && AllOk)
+ {
+ FilesArrayWriter.EndArray();
+ CbArray FilesArray = FilesArrayWriter.Save().AsArray();
+
+ CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool {
+ if (Field.GetName() == "files"sv)
+ {
+ NewWriter.AddArray("files"sv, FilesArray);
+
+ return true;
+ }
+
+ return false;
+ });
+
+ NewOps.push_back(std::move(RewrittenOp));
+ }
+
+ OpCount++;
+ },
+ ProjectStore::Oplog::Paging{});
+
+ CbObjectWriter ResponseObj;
+
+ // Persist rewritten oplog entries
+ if (!NewOps.empty())
+ {
+ ResponseObj.BeginArray("rewritten_ops");
+
+ for (CbObject& NewOp : NewOps)
+ {
+ ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp));
+
+ ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number);
+
+ ResponseObj.AddInteger(NewLsn.Number);
+ }
+
+ ResponseObj.EndArray();
+ }
+
+ // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the
+ // new chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have
+ // unreferenced chunks.
+ for (auto It : AddedChunks)
+ {
+ const IoHash& RawHash = It.first;
+ AddedChunk& Chunk = It.second;
+ CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash);
+ if (Result.New)
+ {
+ InlinedBytes += Chunk.RawSize;
+ ++InlinedFiles;
+ }
+ }
+
+ ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles;
+ ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles;
+
+ ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount);
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save());
+ }
+ case HashStringDjb2("appendops"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::appendops");
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ CbArrayView OpsArray = Cb["ops"sv].AsArrayView();
+
+ size_t OpsBufferSize = 0;
+ for (CbFieldView OpView : OpsArray)
+ {
+ OpsBufferSize += OpView.GetSize();
+ }
+ UniqueBuffer OpsBuffers = UniqueBuffer::Alloc(OpsBufferSize);
+ MutableMemoryView OpsBuffersMemory = OpsBuffers.GetMutableView();
+
+ std::vector<CbObjectView> Ops;
+ Ops.reserve(OpsArray.Num());
+ for (CbFieldView OpView : OpsArray)
+ {
+ OpView.CopyTo(OpsBuffersMemory);
+ Ops.push_back(CbObjectView(OpsBuffersMemory.GetData()));
+ OpsBuffersMemory.MidInline(OpView.GetSize());
+ }
+
+ std::vector<ProjectStore::LogSequenceNumber> LSNs = Oplog->AppendNewOplogEntries(Ops);
+ ZEN_ASSERT(LSNs.size() == Ops.size());
+
+ std::vector<IoHash> MissingAttachments;
+ for (size_t OpIndex = 0; OpIndex < Ops.size(); OpIndex++)
+ {
+ if (LSNs[OpIndex])
+ {
+ CbObjectView Op = Ops[OpIndex];
+ Op.IterateAttachments([this, &MissingAttachments](CbFieldView AttachmentView) {
+ const IoHash Cid = AttachmentView.AsAttachment();
+ if (!m_CidStore.ContainsChunk(Cid))
+ {
+ MissingAttachments.push_back(Cid);
+ }
+ });
+ }
+ }
+
+ CbPackage ResponsePackage;
+
+ {
+ CbObjectWriter ResponseObj;
+ ResponseObj.BeginArray("written_ops");
+
+ for (ProjectStore::LogSequenceNumber NewLsn : LSNs)
+ {
+ ZEN_DEBUG("appended written op at LSN: {}", NewLsn.Number);
+ ResponseObj.AddInteger(NewLsn.Number);
+ }
+ ResponseObj.EndArray();
+
+ if (!MissingAttachments.empty())
+ {
+ ResponseObj.BeginArray("need");
+
+ for (const IoHash& Cid : MissingAttachments)
+ {
+ ResponseObj.AddHash(Cid);
+ }
+ ResponseObj.EndArray();
+ }
+ ResponsePackage.SetObject(ResponseObj.Save());
+ }
+
+ std::vector<IoBuffer> ResponseBuffers = FormatPackageMessage(ResponsePackage);
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponseBuffers);
+ }
+ default:
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Unknown rpc method '{}'", Method));
}
}
void