aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-02 15:56:00 +0200
committerGitHub Enterprise <[email protected]>2025-10-02 15:56:00 +0200
commitadbec59ee0b169ba0ac9d167eac4ed72edefe1ca (patch)
treec05d4e5624d98272969f6509fe5cf8a75d6ebf9d /src
parentZs/OIDC exe path handling (#538) (diff)
downloadzen-adbec59ee0b169ba0ac9d167eac4ed72edefe1ca.tar.xz
zen-adbec59ee0b169ba0ac9d167eac4ed72edefe1ca.zip
projectstore refactor phase 2 (#539)
Refactor projectstore/httpprojectservice to prepare for move of projectstore to zenstore
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/projectstore/fileremoteprojectstore.cpp1
-rw-r--r--src/zenserver/projectstore/httpprojectstore.cpp895
-rw-r--r--src/zenserver/projectstore/httpprojectstore.h8
-rw-r--r--src/zenserver/projectstore/projectstore.cpp2025
-rw-r--r--src/zenserver/projectstore/projectstore.h62
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp197
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h2
-rw-r--r--src/zenserver/zenserver.cpp15
8 files changed, 1524 insertions, 1681 deletions
diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp
index e550222fd..348f5fb8a 100644
--- a/src/zenserver/projectstore/fileremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp
@@ -8,6 +8,7 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/timer.h>
+#include <zenhttp/httpcommon.h>
namespace zen {
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp
index 2a1bf9a5e..175d131e1 100644
--- a/src/zenserver/projectstore/httpprojectstore.cpp
+++ b/src/zenserver/projectstore/httpprojectstore.cpp
@@ -14,11 +14,20 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/memory/llm.h>
+#include <zencore/scopeguard.h>
#include <zencore/stream.h>
#include <zencore/trace.h>
+#include <zenhttp/packageformat.h>
#include <zenstore/zenstore.h>
+#include <zenutil/openprocesscache.h>
#include <zenutil/workerpools.h>
+#include "buildsremoteprojectstore.h"
+#include "fileremoteprojectstore.h"
+#include "jupiterremoteprojectstore.h"
+#include "remoteprojectstore.h"
+#include "zenremoteprojectstore.h"
+
namespace zen {
const FLLMTag&
@@ -233,6 +242,264 @@ namespace {
Cbo.EndObject();
}
+ struct CreateRemoteStoreResult
+ {
+ std::shared_ptr<RemoteProjectStore> Store;
+ std::string Description;
+ };
+
+ CreateRemoteStoreResult CreateRemoteStore(CbObjectView Params,
+ AuthMgr& AuthManager,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ const std::filesystem::path& TempFilePath)
+ {
+ ZEN_MEMSCOPE(GetProjectHttpTag());
+
+ using namespace std::literals;
+
+ std::shared_ptr<RemoteProjectStore> RemoteStore;
+
+ if (CbObjectView File = Params["file"sv].AsObjectView(); File)
+ {
+ std::filesystem::path FolderPath(File["path"sv].AsString());
+ if (FolderPath.empty())
+ {
+ return {nullptr, "Missing file path"};
+ }
+ std::string_view Name(File["name"sv].AsString());
+ if (Name.empty())
+ {
+ return {nullptr, "Missing file name"};
+ }
+ std::string_view OptionalBaseName(File["basename"sv].AsString());
+ bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false);
+ bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false);
+
+ FileRemoteStoreOptions Options = {
+ RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ FolderPath,
+ std::string(Name),
+ std::string(OptionalBaseName),
+ ForceDisableBlocks,
+ ForceEnableTempBlocks};
+ RemoteStore = CreateFileRemoteStore(Options);
+ }
+
+ if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud)
+ {
+ std::string_view CloudServiceUrl = Cloud["url"sv].AsString();
+ if (CloudServiceUrl.empty())
+ {
+ return {nullptr, "Missing service url"};
+ }
+
+ std::string Url = UrlDecode(CloudServiceUrl);
+ std::string_view Namespace = Cloud["namespace"sv].AsString();
+ if (Namespace.empty())
+ {
+ return {nullptr, "Missing namespace"};
+ }
+ std::string_view Bucket = Cloud["bucket"sv].AsString();
+ if (Bucket.empty())
+ {
+ return {nullptr, "Missing bucket"};
+ }
+ std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString();
+ std::string AccessToken = std::string(Cloud["access-token"sv].AsString());
+ if (AccessToken.empty())
+ {
+ std::string_view AccessTokenEnvVariable = Cloud["access-token-env"].AsString();
+ if (!AccessTokenEnvVariable.empty())
+ {
+ AccessToken = GetEnvVariable(AccessTokenEnvVariable);
+ }
+ }
+ std::filesystem::path OidcExePath;
+ if (std::string_view OidcExePathString = Cloud["oidc-exe-path"].AsString(); !OidcExePathString.empty())
+ {
+ std::filesystem::path OidcExePathMaybe(OidcExePathString);
+ if (IsFile(OidcExePathMaybe))
+ {
+ OidcExePath = std::move(OidcExePathMaybe);
+ }
+ else
+ {
+ ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString);
+ }
+ }
+ std::string_view KeyParam = Cloud["key"sv].AsString();
+ if (KeyParam.empty())
+ {
+ return {nullptr, "Missing key"};
+ }
+ if (KeyParam.length() != IoHash::StringLength)
+ {
+ return {nullptr, "Invalid key"};
+ }
+ IoHash Key = IoHash::FromHexString(KeyParam);
+ if (Key == IoHash::Zero)
+ {
+ return {nullptr, "Invalid key string"};
+ }
+ IoHash BaseKey = IoHash::Zero;
+ std::string_view BaseKeyParam = Cloud["basekey"sv].AsString();
+ if (!BaseKeyParam.empty())
+ {
+ if (BaseKeyParam.length() != IoHash::StringLength)
+ {
+ return {nullptr, "Invalid base key"};
+ }
+ BaseKey = IoHash::FromHexString(BaseKeyParam);
+ if (BaseKey == IoHash::Zero)
+ {
+ return {nullptr, "Invalid base key string"};
+ }
+ }
+
+ bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false);
+ bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false);
+ bool AssumeHttp2 = Cloud["assumehttp2"sv].AsBool(false);
+
+ JupiterRemoteStoreOptions Options = {
+ RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ Url,
+ std::string(Namespace),
+ std::string(Bucket),
+ Key,
+ BaseKey,
+ std::string(OpenIdProvider),
+ AccessToken,
+ AuthManager,
+ OidcExePath,
+ ForceDisableBlocks,
+ ForceDisableTempBlocks,
+ AssumeHttp2};
+ RemoteStore = CreateJupiterRemoteStore(Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false);
+ }
+
+ if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen)
+ {
+ std::string_view Url = Zen["url"sv].AsString();
+ std::string_view Project = Zen["project"sv].AsString();
+ if (Project.empty())
+ {
+ return {nullptr, "Missing project"};
+ }
+ std::string_view Oplog = Zen["oplog"sv].AsString();
+ if (Oplog.empty())
+ {
+ return {nullptr, "Missing oplog"};
+ }
+ ZenRemoteStoreOptions Options = {
+ RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ std::string(Url),
+ std::string(Project),
+ std::string(Oplog)};
+ RemoteStore = CreateZenRemoteStore(Options, TempFilePath);
+ }
+
+ if (CbObjectView Builds = Params["builds"sv].AsObjectView(); Builds)
+ {
+ std::string_view BuildsServiceUrl = Builds["url"sv].AsString();
+ if (BuildsServiceUrl.empty())
+ {
+ return {nullptr, "Missing service url"};
+ }
+
+ std::string Url = UrlDecode(BuildsServiceUrl);
+ std::string_view Namespace = Builds["namespace"sv].AsString();
+ if (Namespace.empty())
+ {
+ return {nullptr, "Missing namespace"};
+ }
+ std::string_view Bucket = Builds["bucket"sv].AsString();
+ if (Bucket.empty())
+ {
+ return {nullptr, "Missing bucket"};
+ }
+ std::string_view OpenIdProvider = Builds["openid-provider"sv].AsString();
+ std::string AccessToken = std::string(Builds["access-token"sv].AsString());
+ if (AccessToken.empty())
+ {
+ std::string_view AccessTokenEnvVariable = Builds["access-token-env"].AsString();
+ if (!AccessTokenEnvVariable.empty())
+ {
+ AccessToken = GetEnvVariable(AccessTokenEnvVariable);
+ }
+ }
+ std::filesystem::path OidcExePath;
+ if (std::string_view OidcExePathString = Builds["oidc-exe-path"].AsString(); !OidcExePathString.empty())
+ {
+ std::filesystem::path OidcExePathMaybe(OidcExePathString);
+ if (IsFile(OidcExePathMaybe))
+ {
+ OidcExePath = std::move(OidcExePathMaybe);
+ }
+ else
+ {
+ ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString);
+ }
+ }
+ std::string_view BuildIdParam = Builds["buildsid"sv].AsString();
+ if (BuildIdParam.empty())
+ {
+ return {nullptr, "Missing build id"};
+ }
+ if (BuildIdParam.length() != Oid::StringLength)
+ {
+ return {nullptr, "Invalid build id"};
+ }
+ Oid BuildId = Oid::FromHexString(BuildIdParam);
+ if (BuildId == Oid::Zero)
+ {
+ return {nullptr, "Invalid build id string"};
+ }
+
+ bool ForceDisableBlocks = Builds["disableblocks"sv].AsBool(false);
+ bool ForceDisableTempBlocks = Builds["disabletempblocks"sv].AsBool(false);
+ bool AssumeHttp2 = Builds["assumehttp2"sv].AsBool(false);
+
+ MemoryView MetaDataSection = Builds["metadata"sv].AsBinaryView();
+ IoBuffer MetaData(IoBuffer::Wrap, MetaDataSection.GetData(), MetaDataSection.GetSize());
+
+ BuildsRemoteStoreOptions Options = {
+ RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ Url,
+ std::string(Namespace),
+ std::string(Bucket),
+ BuildId,
+ std::string(OpenIdProvider),
+ AccessToken,
+ AuthManager,
+ OidcExePath,
+ ForceDisableBlocks,
+ ForceDisableTempBlocks,
+ AssumeHttp2,
+ MetaData};
+ RemoteStore = CreateJupiterBuildsRemoteStore(Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false);
+ }
+
+ if (!RemoteStore)
+ {
+ return {nullptr, "Unknown remote store type"};
+ }
+
+ return {std::move(RemoteStore), ""};
+ }
+
+ std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result)
+ {
+ if (Result.ErrorCode == 0)
+ {
+ return {HttpResponseCode::OK, Result.Text};
+ }
+ return {static_cast<HttpResponseCode>(Result.ErrorCode),
+ Result.Reason.empty() ? Result.Text
+ : Result.Text.empty() ? Result.Reason
+ : fmt::format("{}: {}", Result.Reason, Result.Text)};
+ }
+
} // namespace
//////////////////////////////////////////////////////////////////////////
@@ -241,13 +508,17 @@ HttpProjectService::HttpProjectService(CidStore& Store,
ProjectStore* Projects,
HttpStatusService& StatusService,
HttpStatsService& StatsService,
- AuthMgr& AuthMgr)
+ AuthMgr& AuthMgr,
+ OpenProcessCache& InOpenProcessCache,
+ JobQueue& InJobQueue)
: m_Log(logging::Get("project"))
, m_CidStore(Store)
, m_ProjectStore(Projects)
, m_StatusService(StatusService)
, m_StatsService(StatsService)
, m_AuthMgr(AuthMgr)
+, m_OpenProcessCache(InOpenProcessCache)
+, m_JobQueue(InJobQueue)
{
ZEN_MEMSCOPE(GetProjectHttpTag());
@@ -2044,19 +2315,55 @@ HttpProjectService::HandleOplogSaveRequest(HttpRouterRequest& Req)
if (CbObject ContainerObject = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult);
ValidateResult == CbValidateError::None && ContainerObject)
{
- ProjectStore::WriteOplogResult Result = m_ProjectStore->WriteOplog(*Project, *Oplog, ContainerObject);
+ RwLock AttachmentsLock;
+ tsl::robin_set<IoHash, IoHash::Hasher> Attachments;
+
+ auto HasAttachment = [this](const IoHash& RawHash) { return m_CidStore.ContainsChunk(RawHash); };
+ auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector<IoHash>&& ChunkHashes) {
+ RwLock::ExclusiveLockScope _(AttachmentsLock);
+ if (BlockHash != IoHash::Zero)
+ {
+ Attachments.insert(BlockHash);
+ }
+ else
+ {
+ Attachments.insert(ChunkHashes.begin(), ChunkHashes.end());
+ }
+ };
+ auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) {
+ RwLock::ExclusiveLockScope _(AttachmentsLock);
+ Attachments.insert(RawHash);
+ };
+
+ auto OnChunkedAttachment = [](const ChunkedInfo&) {};
+
+ auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog->CaptureAddedAttachments(RawHashes); };
+
+ // Make sure we retain any attachments we download before writing the oplog
+ Oplog->EnableUpdateCapture();
+ auto _ = MakeGuard([&Oplog]() { Oplog->DisableUpdateCapture(); });
+
+ RemoteProjectStore::Result Result = SaveOplogContainer(*Oplog,
+ ContainerObject,
+ OnReferencedAttachments,
+ HasAttachment,
+ OnNeedBlock,
+ OnNeedAttachment,
+ OnChunkedAttachment,
+ nullptr);
+
if (Result.ErrorCode == 0)
{
- if (Result.Need.empty())
+ if (Attachments.empty())
{
HttpReq.WriteResponse(HttpResponseCode::OK);
}
else
{
- CbObjectWriter Cbo(1 + 1 + 5 + Result.Need.size() * (1 + sizeof(IoHash::Hash)) + 1);
+ CbObjectWriter Cbo(1 + 1 + 5 + Attachments.size() * (1 + sizeof(IoHash::Hash)) + 1);
Cbo.BeginArray("need");
{
- for (const IoHash& Hash : Result.Need)
+ for (const IoHash& Hash : Attachments)
{
ZEN_DEBUG("Need attachment {}", Hash);
Cbo << Hash;
@@ -2074,15 +2381,15 @@ HttpProjectService::HandleOplogSaveRequest(HttpRouterRequest& Req)
ToString(HttpReq.RequestVerb()),
HttpReq.QueryString(),
Result.ErrorCode,
- Result.ErrorDescription);
+ Result.Reason);
- if (Result.ErrorDescription.empty())
+ if (Result.Reason.empty())
{
return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode));
}
else
{
- return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode), HttpContentType::kText, Result.ErrorDescription);
+ return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode), HttpContentType::kText, Result.Reason);
}
}
}
@@ -2164,27 +2471,41 @@ HttpProjectService::HandleOplogLoadRequest(HttpRouterRequest& Req)
}
}
- ProjectStore::ReadOplogResult Result =
- m_ProjectStore->ReadOplog(*Project, *Oplog, MaxBlockSize, MaxChunkEmbedSize, MaxChunksPerBlock, ChunkFileSizeLimit);
- if (Result.ErrorCode == 0)
+ RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer(
+ m_CidStore,
+ *Project,
+ *Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ MaxChunksPerBlock,
+ ChunkFileSizeLimit,
+ /* BuildBlocks */ false,
+ /* IgnoreMissingAttachments */ false,
+ /* AllowChunking*/ false,
+ [](CompressedBuffer&&, ChunkBlockDescription&&) {},
+ [](const IoHash&, TGetAttachmentBufferFunc&&) {},
+ [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {},
+ /* EmbedLooseFiles*/ false);
+
+ if (ContainerResult.ErrorCode == 0)
{
- return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ContainerObject);
+ return HttpReq.WriteResponse(HttpResponseCode::OK, ContainerResult.ContainerObject);
}
else
{
ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
ToString(HttpReq.RequestVerb()),
HttpReq.QueryString(),
- Result.ErrorCode,
- Result.ErrorDescription);
+ ContainerResult.ErrorCode,
+ ContainerResult.Reason);
- if (Result.ErrorDescription.empty())
+ if (ContainerResult.Reason.empty())
{
- return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode));
+ return HttpReq.WriteResponse(HttpResponseCode(ContainerResult.ErrorCode));
}
else
{
- return HttpReq.WriteResponse(HttpResponseCode(Result.ErrorCode), HttpContentType::kText, Result.ErrorDescription);
+ return HttpReq.WriteResponse(HttpResponseCode(ContainerResult.ErrorCode), HttpContentType::kText, ContainerResult.Reason);
}
}
}
@@ -2193,6 +2514,7 @@ void
HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req)
{
ZEN_TRACE_CPU("ProjectService::Rpc");
+ using namespace std::literals;
HttpServerRequest& HttpReq = Req.ServerRequest();
@@ -2200,10 +2522,543 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req)
const auto& OplogId = Req.GetCapture(2);
IoBuffer Payload = HttpReq.ReadPayload();
- bool OkRequest = m_ProjectStore->Rpc(HttpReq, ProjectId, OplogId, std::move(Payload), m_AuthMgr);
- if (!OkRequest)
+ HttpContentType PayloadContentType = HttpReq.RequestContentType();
+ CbPackage Package;
+ CbObject Cb;
+ switch (PayloadContentType)
{
- m_ProjectStats.BadRequestCount++;
+ case HttpContentType::kJSON:
+ case HttpContentType::kUnknownContentType:
+ case HttpContentType::kText:
+ {
+ std::string JsonText(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize());
+ Cb = LoadCompactBinaryFromJson(JsonText).AsObject();
+ if (!Cb)
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Content format not supported, expected JSON format");
+ }
+ }
+ break;
+ case HttpContentType::kCbObject:
+ {
+ CbValidateError ValidateResult;
+ if (Cb = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult);
+ ValidateResult != CbValidateError::None || !Cb)
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(
+ HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Content format not supported, expected compact binary format ('{}')", ToString(ValidateResult)));
+ }
+ break;
+ }
+ case HttpContentType::kCbPackage:
+ try
+ {
+ Package = ParsePackageMessage(Payload);
+ Cb = Package.GetObject();
+ }
+ catch (const std::invalid_argument& ex)
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Failed to parse package request, reason: '{}'", ex.what()));
+ }
+ if (!Cb)
+ {
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Content format not supported, expected package message format");
+ }
+ break;
+ default:
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type");
+ }
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Rpc oplog request for unknown project '{}'", ProjectId));
+ }
+ Project->TouchProject();
+
+ std::string_view Method = Cb["method"sv].AsString();
+
+ bool VerifyPathOnDisk = Method != "getchunks"sv;
+
+ Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, VerifyPathOnDisk);
+ if (!Oplog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId));
+ }
+ Project->TouchOplog(OplogId);
+
+ uint32_t MethodHash = HashStringDjb2(Method);
+
+ switch (MethodHash)
+ {
+ case HashStringDjb2("import"sv):
+ {
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ CbObjectView Params = Cb["params"sv].AsObjectView();
+ size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize);
+ size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize);
+ bool Force = Params["force"sv].AsBool(false);
+ bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false);
+ bool CleanOplog = Params["clean"].AsBool(false);
+
+ CreateRemoteStoreResult RemoteStoreResult =
+ CreateRemoteStore(Params, m_AuthMgr, MaxBlockSize, MaxChunkEmbedSize, Oplog->TempPath());
+
+ if (RemoteStoreResult.Store == nullptr)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, RemoteStoreResult.Description);
+ }
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store);
+ RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+
+ JobId JobId = m_JobQueue.QueueJob(
+ fmt::format("Import oplog '{}/{}'", Project->Identifier, Oplog->OplogId()),
+ [this,
+ ChunkStore = &m_CidStore,
+ ActualRemoteStore = std::move(RemoteStore),
+ Oplog,
+ Force,
+ IgnoreMissingAttachments,
+ CleanOplog](JobContext& Context) {
+ Context.ReportMessage(fmt::format("Loading oplog '{}/{}' from {}",
+ Oplog->GetOuterProjectIdentifier(),
+ Oplog->OplogId(),
+ ActualRemoteStore->GetInfo().Description));
+
+ RemoteProjectStore::Result Result =
+ LoadOplog(m_CidStore, *ActualRemoteStore, *Oplog, Force, IgnoreMissingAttachments, CleanOplog, &Context);
+ auto Response = ConvertResult(Result);
+ ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second);
+ if (!IsHttpSuccessCode(Response.first))
+ {
+ throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second,
+ (int)Response.first);
+ }
+ });
+
+ return HttpReq.WriteResponse(HttpResponseCode::Accepted, HttpContentType::kText, fmt::format("{}", JobId.Id));
+ }
+ case HashStringDjb2("export"sv):
+ {
+ CbObjectView Params = Cb["params"sv].AsObjectView();
+ size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize);
+ size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize);
+ size_t MaxChunksPerBlock = Params["maxchunksperblock"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunksPerBlock);
+ size_t ChunkFileSizeLimit = Params["chunkfilesizelimit"sv].AsUInt64(RemoteStoreOptions::DefaultChunkFileSizeLimit);
+ bool Force = Params["force"sv].AsBool(false);
+ bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false);
+ bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false);
+
+ CreateRemoteStoreResult RemoteStoreResult =
+ CreateRemoteStore(Params, m_AuthMgr, MaxBlockSize, MaxChunkEmbedSize, Oplog->TempPath());
+
+ if (RemoteStoreResult.Store == nullptr)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, RemoteStoreResult.Description);
+ }
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store);
+ RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+
+ JobId JobId = m_JobQueue.QueueJob(
+ fmt::format("Export oplog '{}/{}'", Project->Identifier, Oplog->OplogId()),
+ [this,
+ ActualRemoteStore = std::move(RemoteStore),
+ Project,
+ Oplog,
+ MaxBlockSize,
+ MaxChunksPerBlock,
+ MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
+ EmbedLooseFile,
+ Force,
+ IgnoreMissingAttachments](JobContext& Context) {
+ Context.ReportMessage(fmt::format("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}",
+ Project->Identifier,
+ Oplog->OplogId(),
+ ActualRemoteStore->GetInfo().Description,
+ NiceBytes(MaxBlockSize),
+ NiceBytes(MaxChunkEmbedSize)));
+
+ RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
+ *ActualRemoteStore,
+ *Project,
+ *Oplog,
+ MaxBlockSize,
+ MaxChunksPerBlock,
+ MaxChunkEmbedSize,
+ ChunkFileSizeLimit,
+ EmbedLooseFile,
+ Force,
+ IgnoreMissingAttachments,
+ &Context);
+ auto Response = ConvertResult(Result);
+ ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second);
+ if (!IsHttpSuccessCode(Response.first))
+ {
+ throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second,
+ (int)Response.first);
+ }
+ });
+
+ return HttpReq.WriteResponse(HttpResponseCode::Accepted, HttpContentType::kText, fmt::format("{}", JobId.Id));
+ }
+ case HashStringDjb2("getchunks"sv):
+ {
+ RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u));
+ int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0);
+
+ std::vector<ProjectStore::ChunkRequest> Requests = m_ProjectStore->ParseChunksRequests(*Project, *Oplog, Cb);
+ std::vector<ProjectStore::ChunkResult> Results =
+ Requests.empty() ? std::vector<ProjectStore::ChunkResult>{} : m_ProjectStore->GetChunks(*Project, *Oplog, Requests);
+ CbPackage Response = m_ProjectStore->WriteChunksRequestResponse(*Project, *Oplog, std::move(Requests), std::move(Results));
+
+ void* TargetProcessHandle = nullptr;
+ FormatFlags Flags = FormatFlags::kDefault;
+ if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ {
+ Flags |= FormatFlags::kAllowLocalReferences;
+ if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ {
+ Flags |= FormatFlags::kDenyPartialLocalReferences;
+ }
+ TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId);
+ }
+
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(Response, Flags, TargetProcessHandle);
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ case HashStringDjb2("putchunks"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::putchunks");
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ CbObject Object = Package.GetObject();
+ const bool UsingTempFiles = Object["usingtmpfiles"].AsBool(false);
+
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+ if (!Attachments.empty())
+ {
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+
+ WriteAttachmentBuffers.reserve(Attachments.size());
+ WriteRawHashes.reserve(Attachments.size());
+
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ IoHash RawHash = Attachment.GetHash();
+ const CompressedBuffer& Compressed = Attachment.AsCompressedBinary();
+ IoBuffer AttachmentPayload = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ if (UsingTempFiles)
+ {
+ AttachmentPayload.SetDeleteOnClose(true);
+ }
+ WriteAttachmentBuffers.push_back(std::move(AttachmentPayload));
+ WriteRawHashes.push_back(RawHash);
+ }
+
+ Oplog->CaptureAddedAttachments(WriteRawHashes);
+ m_CidStore.AddChunks(WriteAttachmentBuffers,
+ WriteRawHashes,
+ UsingTempFiles ? CidStore::InsertMode::kMayBeMovedInPlace : CidStore::InsertMode::kCopyOnly);
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::OK);
+ }
+ case HashStringDjb2("snapshot"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::snapshot");
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ // Snapshot all referenced files. This brings the content of all
+ // files into the CID store
+
+ uint32_t OpCount = 0;
+ uint64_t InlinedBytes = 0;
+ uint64_t InlinedFiles = 0;
+ uint64_t TotalBytes = 0;
+ uint64_t TotalFiles = 0;
+
+ std::vector<CbObject> NewOps;
+ struct AddedChunk
+ {
+ IoBuffer Buffer;
+ uint64_t RawSize = 0;
+ };
+ tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks;
+
+ Oplog->IterateOplog(
+ [&](CbObjectView Op) {
+ bool OpRewritten = false;
+ bool AllOk = true;
+
+ CbWriter FilesArrayWriter;
+ FilesArrayWriter.BeginArray("files"sv);
+
+ for (CbFieldView& Field : Op["files"sv])
+ {
+ bool CopyField = true;
+
+ if (CbObjectView View = Field.AsObjectView())
+ {
+ const IoHash DataHash = View["data"sv].AsHash();
+
+ if (DataHash == IoHash::Zero)
+ {
+ std::string_view ServerPath = View["serverpath"sv].AsString();
+ std::filesystem::path FilePath = Project->RootDir / ServerPath;
+ BasicFile DataFile;
+ std::error_code Ec;
+ DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec);
+
+ if (Ec)
+ {
+ // Error...
+
+ ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message());
+
+ AllOk = false;
+ }
+ else
+ {
+ // Read file contents into memory, compress and keep in map of chunks to add to Cid store
+ IoBuffer FileIoBuffer = DataFile.ReadAll();
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer)));
+ const uint64_t RawSize = Compressed.DecodeRawSize();
+ const IoHash RawHash = Compressed.DecodeRawHash();
+ if (!AddedChunks.contains(RawHash))
+ {
+ const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString();
+ BasicFile ChunkTempFile;
+ ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete);
+ ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec);
+ if (Ec)
+ {
+ Oid ChunkId = View["id"sv].AsObjectId();
+ ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}",
+ FilePath,
+ ChunkId,
+ Ec.message());
+ AllOk = false;
+ }
+ else
+ {
+ void* FileHandle = ChunkTempFile.Detach();
+ IoBuffer ChunkBuffer(IoBuffer::File,
+ FileHandle,
+ 0,
+ Compressed.GetCompressed().GetSize(),
+ /*IsWholeFile*/ true);
+ ChunkBuffer.SetDeleteOnClose(true);
+ AddedChunks.insert_or_assign(
+ RawHash,
+ AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize});
+ }
+ }
+
+ TotalBytes += RawSize;
+ ++TotalFiles;
+
+ // Rewrite file array entry with new data reference
+ CbObjectWriter Writer(View.GetSize());
+ RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
+ if (Field.GetName() == "data"sv)
+ {
+ // omit this field as we will write it explicitly ourselves
+ return true;
+ }
+ return false;
+ });
+ Writer.AddBinaryAttachment("data"sv, RawHash);
+
+ CbObject RewrittenOp = Writer.Save();
+ FilesArrayWriter.AddObject(std::move(RewrittenOp));
+ CopyField = false;
+ }
+ }
+ }
+
+ if (CopyField)
+ {
+ FilesArrayWriter.AddField(Field);
+ }
+ else
+ {
+ OpRewritten = true;
+ }
+ }
+
+ if (OpRewritten && AllOk)
+ {
+ FilesArrayWriter.EndArray();
+ CbArray FilesArray = FilesArrayWriter.Save().AsArray();
+
+ CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool {
+ if (Field.GetName() == "files"sv)
+ {
+ NewWriter.AddArray("files"sv, FilesArray);
+
+ return true;
+ }
+
+ return false;
+ });
+
+ NewOps.push_back(std::move(RewrittenOp));
+ }
+
+ OpCount++;
+ },
+ ProjectStore::Oplog::Paging{});
+
+ CbObjectWriter ResponseObj;
+
+ // Persist rewritten oplog entries
+ if (!NewOps.empty())
+ {
+ ResponseObj.BeginArray("rewritten_ops");
+
+ for (CbObject& NewOp : NewOps)
+ {
+ ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp));
+
+ ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number);
+
+ ResponseObj.AddInteger(NewLsn.Number);
+ }
+
+ ResponseObj.EndArray();
+ }
+
+ // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the
+ // new chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have
+ // unreferenced chunks.
+ for (auto It : AddedChunks)
+ {
+ const IoHash& RawHash = It.first;
+ AddedChunk& Chunk = It.second;
+ CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash);
+ if (Result.New)
+ {
+ InlinedBytes += Chunk.RawSize;
+ ++InlinedFiles;
+ }
+ }
+
+ ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles;
+ ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles;
+
+ ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount);
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save());
+ }
+ case HashStringDjb2("appendops"sv):
+ {
+ ZEN_TRACE_CPU("Store::Rpc::appendops");
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ CbArrayView OpsArray = Cb["ops"sv].AsArrayView();
+
+ size_t OpsBufferSize = 0;
+ for (CbFieldView OpView : OpsArray)
+ {
+ OpsBufferSize += OpView.GetSize();
+ }
+ UniqueBuffer OpsBuffers = UniqueBuffer::Alloc(OpsBufferSize);
+ MutableMemoryView OpsBuffersMemory = OpsBuffers.GetMutableView();
+
+ std::vector<CbObjectView> Ops;
+ Ops.reserve(OpsArray.Num());
+ for (CbFieldView OpView : OpsArray)
+ {
+ OpView.CopyTo(OpsBuffersMemory);
+ Ops.push_back(CbObjectView(OpsBuffersMemory.GetData()));
+ OpsBuffersMemory.MidInline(OpView.GetSize());
+ }
+
+ std::vector<ProjectStore::LogSequenceNumber> LSNs = Oplog->AppendNewOplogEntries(Ops);
+ ZEN_ASSERT(LSNs.size() == Ops.size());
+
+ std::vector<IoHash> MissingAttachments;
+ for (size_t OpIndex = 0; OpIndex < Ops.size(); OpIndex++)
+ {
+ if (LSNs[OpIndex])
+ {
+ CbObjectView Op = Ops[OpIndex];
+ Op.IterateAttachments([this, &MissingAttachments](CbFieldView AttachmentView) {
+ const IoHash Cid = AttachmentView.AsAttachment();
+ if (!m_CidStore.ContainsChunk(Cid))
+ {
+ MissingAttachments.push_back(Cid);
+ }
+ });
+ }
+ }
+
+ CbPackage ResponsePackage;
+
+ {
+ CbObjectWriter ResponseObj;
+ ResponseObj.BeginArray("written_ops");
+
+ for (ProjectStore::LogSequenceNumber NewLsn : LSNs)
+ {
+ ZEN_DEBUG("appended written op at LSN: {}", NewLsn.Number);
+ ResponseObj.AddInteger(NewLsn.Number);
+ }
+ ResponseObj.EndArray();
+
+ if (!MissingAttachments.empty())
+ {
+ ResponseObj.BeginArray("need");
+
+ for (const IoHash& Cid : MissingAttachments)
+ {
+ ResponseObj.AddHash(Cid);
+ }
+ ResponseObj.EndArray();
+ }
+ ResponsePackage.SetObject(ResponseObj.Save());
+ }
+
+ std::vector<IoBuffer> ResponseBuffers = FormatPackageMessage(ResponsePackage);
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponseBuffers);
+ }
+ default:
+ m_ProjectStats.BadRequestCount++;
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Unknown rpc method '{}'", Method));
}
}
void
diff --git a/src/zenserver/projectstore/httpprojectstore.h b/src/zenserver/projectstore/httpprojectstore.h
index 295defa5c..f0a0bcfa1 100644
--- a/src/zenserver/projectstore/httpprojectstore.h
+++ b/src/zenserver/projectstore/httpprojectstore.h
@@ -11,6 +11,8 @@
namespace zen {
class AuthMgr;
+class JobQueue;
+class OpenProcessCache;
class ProjectStore;
//////////////////////////////////////////////////////////////////////////
@@ -39,7 +41,9 @@ public:
ProjectStore* InProjectStore,
HttpStatusService& StatusService,
HttpStatsService& StatsService,
- AuthMgr& AuthMgr);
+ AuthMgr& AuthMgr,
+ OpenProcessCache& InOpenProcessCache,
+ JobQueue& InJobQueue);
~HttpProjectService();
virtual const char* BaseUri() const override;
@@ -98,6 +102,8 @@ private:
HttpStatusService& m_StatusService;
HttpStatsService& m_StatsService;
AuthMgr& m_AuthMgr;
+ OpenProcessCache& m_OpenProcessCache;
+ JobQueue& m_JobQueue;
ProjectStats m_ProjectStats;
metrics::OperationTiming m_HttpRequests;
};
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 181177653..5034a250a 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -3,36 +3,24 @@
#include "projectstore.h"
#include <zencore/assertfmt.h>
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryutil.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
-#include <zencore/jobqueue.h>
#include <zencore/logging.h>
#include <zencore/memory/llm.h>
#include <zencore/scopeguard.h>
#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
-#include <zenhttp/packageformat.h>
#include <zenstore/caslog.h>
#include <zenstore/cidstore.h>
#include <zenstore/scrubcontext.h>
-#include <zenutil/cache/rpcrecording.h>
-#include <zenutil/openprocesscache.h>
#include <zenutil/parallelwork.h>
#include <zenutil/referencemetadata.h>
#include <zenutil/workerpools.h>
-#include "buildsremoteprojectstore.h"
-#include "fileremoteprojectstore.h"
-#include "jupiterremoteprojectstore.h"
-#include "remoteprojectstore.h"
-#include "zenremoteprojectstore.h"
-
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_set.h>
#include <xxh3.h>
@@ -41,6 +29,9 @@ ZEN_THIRD_PARTY_INCLUDES_END
#if ZEN_WITH_TESTS
# include <zencore/testing.h>
# include <zencore/testutils.h>
+# include <zenutil/chunkblock.h>
+
+# include <unordered_map>
#endif // ZEN_WITH_TESTS
namespace zen {
@@ -140,263 +131,6 @@ namespace {
return CheckWriteTime < ReferenceWriteTime;
}
- struct CreateRemoteStoreResult
- {
- std::shared_ptr<RemoteProjectStore> Store;
- std::string Description;
- };
- CreateRemoteStoreResult CreateRemoteStore(CbObjectView Params,
- AuthMgr& AuthManager,
- size_t MaxBlockSize,
- size_t MaxChunkEmbedSize,
- const std::filesystem::path& TempFilePath)
- {
- ZEN_MEMSCOPE(GetProjectstoreTag());
-
- using namespace std::literals;
-
- std::shared_ptr<RemoteProjectStore> RemoteStore;
-
- if (CbObjectView File = Params["file"sv].AsObjectView(); File)
- {
- std::filesystem::path FolderPath(File["path"sv].AsString());
- if (FolderPath.empty())
- {
- return {nullptr, "Missing file path"};
- }
- std::string_view Name(File["name"sv].AsString());
- if (Name.empty())
- {
- return {nullptr, "Missing file name"};
- }
- std::string_view OptionalBaseName(File["basename"sv].AsString());
- bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false);
- bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false);
-
- FileRemoteStoreOptions Options = {
- RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
- FolderPath,
- std::string(Name),
- std::string(OptionalBaseName),
- ForceDisableBlocks,
- ForceEnableTempBlocks};
- RemoteStore = CreateFileRemoteStore(Options);
- }
-
- if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud)
- {
- std::string_view CloudServiceUrl = Cloud["url"sv].AsString();
- if (CloudServiceUrl.empty())
- {
- return {nullptr, "Missing service url"};
- }
-
- std::string Url = UrlDecode(CloudServiceUrl);
- std::string_view Namespace = Cloud["namespace"sv].AsString();
- if (Namespace.empty())
- {
- return {nullptr, "Missing namespace"};
- }
- std::string_view Bucket = Cloud["bucket"sv].AsString();
- if (Bucket.empty())
- {
- return {nullptr, "Missing bucket"};
- }
- std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString();
- std::string AccessToken = std::string(Cloud["access-token"sv].AsString());
- if (AccessToken.empty())
- {
- std::string_view AccessTokenEnvVariable = Cloud["access-token-env"].AsString();
- if (!AccessTokenEnvVariable.empty())
- {
- AccessToken = GetEnvVariable(AccessTokenEnvVariable);
- }
- }
- std::filesystem::path OidcExePath;
- if (std::string_view OidcExePathString = Cloud["oidc-exe-path"].AsString(); !OidcExePathString.empty())
- {
- std::filesystem::path OidcExePathMaybe(OidcExePathString);
- if (IsFile(OidcExePathMaybe))
- {
- OidcExePath = std::move(OidcExePathMaybe);
- }
- else
- {
- ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString);
- }
- }
- std::string_view KeyParam = Cloud["key"sv].AsString();
- if (KeyParam.empty())
- {
- return {nullptr, "Missing key"};
- }
- if (KeyParam.length() != IoHash::StringLength)
- {
- return {nullptr, "Invalid key"};
- }
- IoHash Key = IoHash::FromHexString(KeyParam);
- if (Key == IoHash::Zero)
- {
- return {nullptr, "Invalid key string"};
- }
- IoHash BaseKey = IoHash::Zero;
- std::string_view BaseKeyParam = Cloud["basekey"sv].AsString();
- if (!BaseKeyParam.empty())
- {
- if (BaseKeyParam.length() != IoHash::StringLength)
- {
- return {nullptr, "Invalid base key"};
- }
- BaseKey = IoHash::FromHexString(BaseKeyParam);
- if (BaseKey == IoHash::Zero)
- {
- return {nullptr, "Invalid base key string"};
- }
- }
-
- bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false);
- bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false);
- bool AssumeHttp2 = Cloud["assumehttp2"sv].AsBool(false);
-
- JupiterRemoteStoreOptions Options = {
- RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
- Url,
- std::string(Namespace),
- std::string(Bucket),
- Key,
- BaseKey,
- std::string(OpenIdProvider),
- AccessToken,
- AuthManager,
- OidcExePath,
- ForceDisableBlocks,
- ForceDisableTempBlocks,
- AssumeHttp2};
- RemoteStore = CreateJupiterRemoteStore(Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false);
- }
-
- if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen)
- {
- std::string_view Url = Zen["url"sv].AsString();
- std::string_view Project = Zen["project"sv].AsString();
- if (Project.empty())
- {
- return {nullptr, "Missing project"};
- }
- std::string_view Oplog = Zen["oplog"sv].AsString();
- if (Oplog.empty())
- {
- return {nullptr, "Missing oplog"};
- }
- ZenRemoteStoreOptions Options = {
- RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
- std::string(Url),
- std::string(Project),
- std::string(Oplog)};
- RemoteStore = CreateZenRemoteStore(Options, TempFilePath);
- }
-
- if (CbObjectView Builds = Params["builds"sv].AsObjectView(); Builds)
- {
- std::string_view BuildsServiceUrl = Builds["url"sv].AsString();
- if (BuildsServiceUrl.empty())
- {
- return {nullptr, "Missing service url"};
- }
-
- std::string Url = UrlDecode(BuildsServiceUrl);
- std::string_view Namespace = Builds["namespace"sv].AsString();
- if (Namespace.empty())
- {
- return {nullptr, "Missing namespace"};
- }
- std::string_view Bucket = Builds["bucket"sv].AsString();
- if (Bucket.empty())
- {
- return {nullptr, "Missing bucket"};
- }
- std::string_view OpenIdProvider = Builds["openid-provider"sv].AsString();
- std::string AccessToken = std::string(Builds["access-token"sv].AsString());
- if (AccessToken.empty())
- {
- std::string_view AccessTokenEnvVariable = Builds["access-token-env"].AsString();
- if (!AccessTokenEnvVariable.empty())
- {
- AccessToken = GetEnvVariable(AccessTokenEnvVariable);
- }
- }
- std::filesystem::path OidcExePath;
- if (std::string_view OidcExePathString = Builds["oidc-exe-path"].AsString(); !OidcExePathString.empty())
- {
- std::filesystem::path OidcExePathMaybe(OidcExePathString);
- if (IsFile(OidcExePathMaybe))
- {
- OidcExePath = std::move(OidcExePathMaybe);
- }
- else
- {
- ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString);
- }
- }
- std::string_view BuildIdParam = Builds["buildsid"sv].AsString();
- if (BuildIdParam.empty())
- {
- return {nullptr, "Missing build id"};
- }
- if (BuildIdParam.length() != Oid::StringLength)
- {
- return {nullptr, "Invalid build id"};
- }
- Oid BuildId = Oid::FromHexString(BuildIdParam);
- if (BuildId == Oid::Zero)
- {
- return {nullptr, "Invalid build id string"};
- }
-
- bool ForceDisableBlocks = Builds["disableblocks"sv].AsBool(false);
- bool ForceDisableTempBlocks = Builds["disabletempblocks"sv].AsBool(false);
- bool AssumeHttp2 = Builds["assumehttp2"sv].AsBool(false);
-
- MemoryView MetaDataSection = Builds["metadata"sv].AsBinaryView();
- IoBuffer MetaData(IoBuffer::Wrap, MetaDataSection.GetData(), MetaDataSection.GetSize());
-
- BuildsRemoteStoreOptions Options = {
- RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
- Url,
- std::string(Namespace),
- std::string(Bucket),
- BuildId,
- std::string(OpenIdProvider),
- AccessToken,
- AuthManager,
- OidcExePath,
- ForceDisableBlocks,
- ForceDisableTempBlocks,
- AssumeHttp2,
- MetaData};
- RemoteStore = CreateJupiterBuildsRemoteStore(Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false);
- }
-
- if (!RemoteStore)
- {
- return {nullptr, "Unknown remote store type"};
- }
-
- return {std::move(RemoteStore), ""};
- }
-
- std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result)
- {
- if (Result.ErrorCode == 0)
- {
- return {HttpResponseCode::OK, Result.Text};
- }
- return {static_cast<HttpResponseCode>(Result.ErrorCode),
- Result.Reason.empty() ? Result.Text
- : Result.Text.empty() ? Result.Reason
- : fmt::format("{}: {}", Result.Reason, Result.Text)};
- }
-
std::pair<int32_t, int32_t> GetPagedRange(int32_t TotalSize, const ProjectStore::Oplog::Paging& EntryPaging)
{
int32_t Start = std::clamp(EntryPaging.Start, 0, TotalSize);
@@ -4292,17 +4026,10 @@ ProjectStore::Project::LastOplogAccessTime(std::string_view Oplog) const
//////////////////////////////////////////////////////////////////////////
-ProjectStore::ProjectStore(CidStore& Store,
- std::filesystem::path BasePath,
- GcManager& Gc,
- JobQueue& JobQueue,
- OpenProcessCache& InOpenProcessCache,
- const Configuration& Config)
+ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, const Configuration& Config)
: m_Log(logging::Get("project"))
, m_Gc(Gc)
, m_CidStore(Store)
-, m_JobQueue(JobQueue)
-, m_OpenProcessCache(InOpenProcessCache)
, m_ProjectBasePath(BasePath)
, m_Config(Config)
, m_DiskWriteBlocker(Gc.GetDiskWriteBlocker())
@@ -4990,7 +4717,7 @@ ProjectStore::GetChunkInfo(LoggerRef InLog, Project& Project, Oplog& Oplog, cons
}
uint64_t ChunkSize = Chunk.GetSize();
- if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
+ if (Chunk.GetContentType() == ZenContentType::kCompressedBinary)
{
IoHash RawHash;
uint64_t RawSize;
@@ -5165,6 +4892,7 @@ IoBuffer
ProjectStore::GetChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("ProjectStore::GetChunk");
ZEN_UNUSED(Project, Oplog);
IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash);
@@ -5182,6 +4910,7 @@ IoBuffer
ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const Oid& ChunkId)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("ProjectStore::GetChunk");
Ref<Project> Project = OpenProject(ProjectId);
if (!Project)
@@ -5200,6 +4929,7 @@ IoBuffer
ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const IoHash& Cid)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("ProjectStore::GetChunk");
Ref<Project> Project = OpenProject(ProjectId);
if (!Project)
@@ -5218,6 +4948,8 @@ bool
ProjectStore::PutChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash, IoBuffer&& Chunk)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("ProjectStore::PutChunk");
+
IoHash RawHash;
uint64_t RawSize;
CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize);
@@ -5232,1041 +4964,340 @@ ProjectStore::PutChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash,
return Result.New;
}
-std::pair<HttpResponseCode, std::string>
-ProjectStore::GetChunks(const std::string_view ProjectId,
- const std::string_view OplogId,
- const CbObject& RequestObject,
- CbPackage& OutResponsePackage)
+std::vector<ProjectStore::ChunkResult>
+ProjectStore::GetChunks(Project& Project, Oplog& Oplog, std::span<const ChunkRequest> Requests)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
- ZEN_TRACE_CPU("Store::GetChunks");
+ ZEN_TRACE_CPU("ProjectStore::GetChunks");
- using namespace std::literals;
+ ZEN_ASSERT(!Requests.empty());
- Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
- if (!Project)
- {
- return {HttpResponseCode::NotFound, fmt::format("getchunks rpc request for unknown project '{}'", ProjectId)};
- }
- Project->TouchProject();
+ std::vector<ProjectStore::ChunkResult> Results;
+ size_t RequestCount = Requests.size();
- Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false);
- if (!FoundLog)
- {
- return {HttpResponseCode::NotFound, fmt::format("getchunks rpc request for unknown oplog '{}/{}'", ProjectId, OplogId)};
- }
- Project->TouchOplog(OplogId);
+ Results.resize(RequestCount);
- if (RequestObject["chunks"sv].IsArray())
+ if (RequestCount > 1)
{
- // Legacy full chunks only by rawhash
+ std::vector<IoHash> ChunkRawHashes;
+ std::vector<size_t> ChunkRawHashesRequestIndex;
+ std::vector<Oid> ChunkIds;
+ std::vector<size_t> ChunkIdsRequestIndex;
- CbArrayView ChunksArray = RequestObject["chunks"sv].AsArrayView();
+ ChunkRawHashes.reserve(RequestCount);
+ ChunkRawHashesRequestIndex.reserve(RequestCount);
+ ChunkIds.reserve(RequestCount);
+ ChunkIdsRequestIndex.reserve(RequestCount);
- CbObjectWriter ResponseWriter;
- ResponseWriter.BeginArray("chunks"sv);
- for (CbFieldView FieldView : ChunksArray)
+ for (size_t RequestIndex = 0; RequestIndex < Requests.size(); RequestIndex++)
{
- IoHash RawHash = FieldView.AsHash();
- IoBuffer ChunkBuffer = m_CidStore.FindChunkByCid(RawHash);
- if (ChunkBuffer)
+ const ChunkRequest& Request = Requests[RequestIndex];
+ if (Request.Id.index() == 0)
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer));
- if (Compressed)
+ ChunkRawHashes.push_back(std::get<IoHash>(Request.Id));
+ ChunkRawHashesRequestIndex.push_back(RequestIndex);
+ }
+ else
+ {
+ ChunkIds.push_back(std::get<Oid>(Request.Id));
+ ChunkIdsRequestIndex.push_back(RequestIndex);
+ }
+ }
+
+ WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();
+ if (!ChunkRawHashes.empty())
+ {
+ Oplog.IterateChunks(
+ ChunkRawHashes,
+ true,
+ [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool {
+ if (Payload)
+ {
+ size_t RequestIndex = ChunkRawHashesRequestIndex[Index];
+ const ChunkRequest& Request = Requests[RequestIndex];
+ ChunkResult& Result = Results[RequestIndex];
+ Result.Exists = true;
+ if (!Request.SkipData)
+ {
+ Result.ChunkBuffer = std::move(Payload);
+ Result.ChunkBuffer.MakeOwned();
+ }
+ Result.ModTag = ModTag;
+ }
+ return true;
+ },
+ &WorkerPool,
+ 8u * 1024);
+ }
+ if (!ChunkIdsRequestIndex.empty())
+ {
+ Oplog.IterateChunks(
+ Project.RootDir,
+ ChunkIds,
+ true,
+ [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool {
+ if (Payload)
+ {
+ size_t RequestIndex = ChunkIdsRequestIndex[Index];
+ const ChunkRequest& Request = Requests[RequestIndex];
+ ChunkResult& Result = Results[RequestIndex];
+ Result.Exists = true;
+ if (!Request.SkipData)
+ {
+ Result.ChunkBuffer = std::move(Payload);
+ Result.ChunkBuffer.MakeOwned();
+ }
+ Result.ModTag = ModTag;
+ }
+ return true;
+ },
+ &WorkerPool,
+ 8u * 1024);
+ }
+ }
+ else
+ {
+ const ChunkRequest& Request = Requests.front();
+ ChunkResult& Result = Results.front();
+
+ if (Request.Id.index() == 0)
+ {
+ const IoHash& ChunkHash = std::get<IoHash>(Request.Id);
+ IoBuffer Payload = m_CidStore.FindChunkByCid(ChunkHash);
+ if (Payload)
+ {
+ Result.Exists = true;
+ Result.ModTag = GetModificationTagFromRawHash(ChunkHash);
+ if (!Request.SkipData)
{
- ResponseWriter.AddHash(RawHash);
- OutResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash));
+ Result.ChunkBuffer = std::move(Payload);
+ Result.ChunkBuffer.MakeOwned();
}
- else
+ }
+ }
+ else
+ {
+ const Oid& ChunkId = std::get<Oid>(Request.Id);
+ uint64_t ModTag = 0;
+ IoBuffer Payload = Oplog.FindChunk(Project.RootDir, ChunkId, &ModTag);
+ if (Payload)
+ {
+ Result.Exists = true;
+ Result.ModTag = ModTag;
+ if (!Request.SkipData)
{
- ZEN_WARN("oplog '{}/{}': invalid compressed binary in cas store for {}", ProjectId, OplogId, RawHash);
+ Result.ChunkBuffer = std::move(Payload);
+ Result.ChunkBuffer.MakeOwned();
}
}
}
- ResponseWriter.EndArray();
- OutResponsePackage.SetObject(ResponseWriter.Save());
- return {HttpResponseCode::OK, {}};
}
- else if (auto RequestFieldView = RequestObject["Request"sv]; RequestFieldView.IsObject())
+ return Results;
+}
+
+std::vector<ProjectStore::ChunkRequest>
+ProjectStore::ParseChunksRequests(Project& Project, Oplog& Oplog, const CbObject& Cb)
+{
+ ZEN_TRACE_CPU("Store::Rpc::getchunks");
+
+ using namespace std::literals;
+
+ std::vector<ChunkRequest> Requests;
+
+ if (auto RequestFieldView = Cb["Request"sv]; RequestFieldView.IsObject())
{
CbObjectView RequestView = RequestFieldView.AsObjectView();
bool SkipData = RequestView["SkipData"].AsBool(false);
CbArrayView ChunksArray = RequestView["Chunks"sv].AsArrayView();
- struct Request
- {
- struct InputData
- {
- uint64_t Offset = 0;
- uint64_t Size = (uint64_t)-1;
- std::variant<IoHash, Oid> Id;
- std::optional<uint64_t> ModTag;
- } Input;
- struct OutputData
- {
- bool Exists = false;
- IoBuffer ChunkBuffer;
- uint64_t ModTag = 0;
- } Output;
- };
-
- std::vector<Request> Requests;
- size_t RequestCount = ChunksArray.Num();
+ size_t RequestCount = ChunksArray.Num();
if (RequestCount > 0)
{
Requests.reserve(RequestCount);
- std::vector<IoHash> ChunkRawHashes;
- std::vector<size_t> ChunkRawHashesRequestIndex;
- std::vector<Oid> ChunkIds;
- std::vector<size_t> ChunkIdsRequestIndex;
- bool DoBatch = RequestCount > 1;
- if (DoBatch)
- {
- ChunkRawHashes.reserve(RequestCount);
- ChunkRawHashesRequestIndex.reserve(RequestCount);
- ChunkIds.reserve(RequestCount);
- ChunkIdsRequestIndex.reserve(RequestCount);
- }
for (CbFieldView FieldView : ChunksArray)
{
CbObjectView ChunkObject = FieldView.AsObjectView();
- Request ChunkRequest = {
- .Input{.Offset = ChunkObject["Offset"sv].AsUInt64(0), .Size = ChunkObject["Size"sv].AsUInt64((uint64_t)-1)}};
+ ChunkRequest ChunkRequest = {.Offset = ChunkObject["Offset"sv].AsUInt64(0),
+ .Size = ChunkObject["Size"sv].AsUInt64((uint64_t)-1),
+ .SkipData = SkipData};
if (CbFieldView InputModificationTagView = ChunkObject.FindView("ModTag"); InputModificationTagView.IsInteger())
{
- ChunkRequest.Input.ModTag = InputModificationTagView.AsUInt64();
+ ChunkRequest.ModTag = InputModificationTagView.AsUInt64();
}
if (CbFieldView RawHashView = ChunkObject.FindView("RawHash"sv); RawHashView.IsHash())
{
const IoHash ChunkHash = RawHashView.AsHash();
- ChunkRequest.Input.Id = ChunkHash;
- if (DoBatch)
- {
- ChunkRawHashes.push_back(ChunkHash);
- ChunkRawHashesRequestIndex.push_back(Requests.size());
- }
+ ChunkRequest.Id = ChunkHash;
}
else if (CbFieldView IdView = ChunkObject.FindView("Oid"sv); IdView.IsObjectId())
{
- const Oid ChunkId = IdView.AsObjectId();
- ChunkRequest.Input.Id = ChunkId;
- if (DoBatch)
- {
- ChunkIds.push_back(ChunkId);
- ChunkIdsRequestIndex.push_back(Requests.size());
- }
+ const Oid ChunkId = IdView.AsObjectId();
+ ChunkRequest.Id = ChunkId;
}
else
{
- return {HttpResponseCode::BadRequest,
- fmt::format("oplog '{}/{}': malformed getchunks rpc request object, chunk request has no identifier",
- ProjectId,
- OplogId)};
+ throw std::runtime_error(
+ fmt::format("oplog '{}/{}': malformed getchunks rpc request object, chunk request has no identifier",
+ Project.Identifier,
+ Oplog.OplogId()));
}
Requests.emplace_back(std::move(ChunkRequest));
}
-
- if (DoBatch)
- {
- WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();
- if (!ChunkRawHashes.empty())
- {
- FoundLog->IterateChunks(
- ChunkRawHashes,
- true,
- [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool {
- if (Payload)
- {
- size_t RequestIndex = ChunkRawHashesRequestIndex[Index];
- Requests[RequestIndex].Output.Exists = true;
- if (!SkipData)
- {
- Requests[RequestIndex].Output.ChunkBuffer = Payload;
- Requests[RequestIndex].Output.ChunkBuffer.MakeOwned();
- }
- Requests[RequestIndex].Output.ModTag = ModTag;
- }
- return true;
- },
- &WorkerPool,
- 8u * 1024);
- }
- if (!ChunkIdsRequestIndex.empty())
- {
- FoundLog->IterateChunks(
- Project->RootDir,
- ChunkIds,
- true,
- [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool {
- if (Payload)
- {
- size_t RequestIndex = ChunkIdsRequestIndex[Index];
- Requests[RequestIndex].Output.Exists = true;
- if (!SkipData)
- {
- Requests[RequestIndex].Output.ChunkBuffer = Payload;
- Requests[RequestIndex].Output.ChunkBuffer.MakeOwned();
- }
- Requests[RequestIndex].Output.ModTag = ModTag;
- }
- return true;
- },
- &WorkerPool,
- 8u * 1024);
- }
- }
- else
- {
- Request& ChunkRequest = Requests.front();
- if (ChunkRequest.Input.Id.index() == 0)
- {
- const IoHash& ChunkHash = std::get<IoHash>(ChunkRequest.Input.Id);
- IoBuffer Payload = m_CidStore.FindChunkByCid(ChunkHash);
- if (Payload)
- {
- ChunkRequest.Output.Exists = true;
- ChunkRequest.Output.ModTag = GetModificationTagFromRawHash(ChunkHash);
- if (!SkipData)
- {
- ChunkRequest.Output.ChunkBuffer = Payload;
- }
- }
- }
- else
- {
- const Oid& ChunkId = std::get<Oid>(ChunkRequest.Input.Id);
- uint64_t ModTag = 0;
- IoBuffer Payload = FoundLog->FindChunk(Project->RootDir, ChunkId, &ModTag);
- if (Payload)
- {
- ChunkRequest.Output.Exists = true;
- ChunkRequest.Output.ModTag = ModTag;
- if (!SkipData)
- {
- ChunkRequest.Output.ChunkBuffer = Payload;
- }
- }
- }
- }
- }
-
- CbObjectWriter ResponseWriter(32 + Requests.size() * 64u);
- ResponseWriter.BeginArray("Chunks"sv);
- {
- for (Request& ChunkRequest : Requests)
- {
- if (ChunkRequest.Output.Exists)
- {
- ResponseWriter.BeginObject();
- {
- if (ChunkRequest.Input.Id.index() == 0)
- {
- const IoHash& RawHash = std::get<IoHash>(ChunkRequest.Input.Id);
- ResponseWriter.AddHash("Id", RawHash);
- }
- else
- {
- const Oid& Id = std::get<Oid>(ChunkRequest.Input.Id);
- ResponseWriter.AddObjectId("Id", Id);
- }
- if (!ChunkRequest.Input.ModTag.has_value() || ChunkRequest.Input.ModTag.value() != ChunkRequest.Output.ModTag)
- {
- ResponseWriter.AddInteger("ModTag", ChunkRequest.Output.ModTag);
- if (!SkipData)
- {
- auto ExtractRangeResult = ExtractRange(std::move(ChunkRequest.Output.ChunkBuffer),
- ChunkRequest.Input.Offset,
- ChunkRequest.Input.Size,
- ZenContentType::kCompressedBinary);
- if (ExtractRangeResult.Error == GetChunkRangeResult::EError::Ok)
- {
- if (ExtractRangeResult.ContentType == ZenContentType::kCompressedBinary)
- {
- ZEN_ASSERT(ExtractRangeResult.RawHash != IoHash::Zero);
- CompressedBuffer CompressedValue =
- CompressedBuffer::FromCompressedNoValidate(std::move(ExtractRangeResult.Chunk));
- ZEN_ASSERT(CompressedValue);
-
- if (ExtractRangeResult.RawSize != 0)
- {
- // This really could use some thought so we don't send the same data if we get a request for
- // multiple ranges from the same chunk block
-
- uint64_t FragmentRawOffset = 0;
- OodleCompressor Compressor;
- OodleCompressionLevel CompressionLevel;
- uint64_t BlockSize = 0;
- if (CompressedValue.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
- {
- if (BlockSize > 0)
- {
- FragmentRawOffset = (ChunkRequest.Input.Offset / BlockSize) * BlockSize;
- }
- else
- {
- FragmentRawOffset = ChunkRequest.Input.Offset;
- }
- uint64_t FragmentRawLength = CompressedValue.DecodeRawSize();
-
- IoHashStream FragmentHashStream;
- FragmentHashStream.Append(ExtractRangeResult.RawHash.Hash,
- sizeof(ExtractRangeResult.RawHash.Hash));
- FragmentHashStream.Append(&FragmentRawOffset, sizeof(FragmentRawOffset));
- FragmentHashStream.Append(&FragmentRawLength, sizeof(FragmentRawLength));
- IoHash FragmentHash = FragmentHashStream.GetHash();
-
- ResponseWriter.AddHash("FragmentHash", FragmentHash);
- ResponseWriter.AddInteger("FragmentOffset", FragmentRawOffset);
- ResponseWriter.AddInteger("RawSize", ExtractRangeResult.RawSize);
- OutResponsePackage.AddAttachment(CbAttachment(CompressedValue, FragmentHash));
- }
- else
- {
- std::string ErrorString =
- "Failed to get compression parameters from partial compressed buffer";
- ResponseWriter.AddString("Error", ErrorString);
- ZEN_WARN("oplog '{}/{}': {}", ProjectId, OplogId, ErrorString);
- }
- }
- else
- {
- ResponseWriter.AddHash("RawHash"sv, ExtractRangeResult.RawHash);
- OutResponsePackage.AddAttachment(
- CbAttachment(std::move(CompressedValue), ExtractRangeResult.RawHash));
- }
- }
- else
- {
- IoHashStream HashStream;
- ZEN_ASSERT(ChunkRequest.Input.Id.index() == 1);
- const Oid& Id = std::get<Oid>(ChunkRequest.Input.Id);
- HashStream.Append(Id.OidBits, sizeof(Id.OidBits));
- HashStream.Append(&ChunkRequest.Input.Offset, sizeof(ChunkRequest.Input.Offset));
- HashStream.Append(&ChunkRequest.Input.Size, sizeof(ChunkRequest.Input.Size));
- IoHash Hash = HashStream.GetHash();
-
- ResponseWriter.AddHash("Hash"sv, Hash);
- if (ExtractRangeResult.RawSize != 0)
- {
- ResponseWriter.AddInteger("Size", ExtractRangeResult.RawSize);
- }
- OutResponsePackage.AddAttachment(CbAttachment(std::move(ExtractRangeResult.Chunk), Hash));
- }
- }
- else
- {
- std::string ErrorString =
- fmt::format("Failed fetching chunk range ({})", ExtractRangeResult.ErrorDescription);
- ResponseWriter.AddString("Error", ErrorString);
- ZEN_WARN("oplog '{}/{}': {}", ProjectId, OplogId, ErrorString);
- }
- }
- }
- }
- ResponseWriter.EndObject();
- }
- }
}
- ResponseWriter.EndArray();
- OutResponsePackage.SetObject(ResponseWriter.Save());
- return {HttpResponseCode::OK, {}};
}
- else
+ else if (CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView(); ChunksArray)
{
- return {HttpResponseCode::BadRequest, fmt::format("oplog '{}/{}': malformed getchunks rpc request object", ProjectId, OplogId)};
- }
-}
-
-ProjectStore::WriteOplogResult
-ProjectStore::WriteOplog(Project& Project, Oplog& Oplog, const CbObject& ContainerObject)
-{
- ZEN_MEMSCOPE(GetProjectstoreTag());
- ZEN_TRACE_CPU("Store::WriteOplog");
- ZEN_UNUSED(Project);
-
- CidStore& ChunkStore = m_CidStore;
- RwLock AttachmentsLock;
- tsl::robin_set<IoHash, IoHash::Hasher> Attachments;
-
- auto HasAttachment = [&ChunkStore](const IoHash& RawHash) { return ChunkStore.ContainsChunk(RawHash); };
- auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector<IoHash>&& ChunkHashes) {
- RwLock::ExclusiveLockScope _(AttachmentsLock);
- if (BlockHash != IoHash::Zero)
- {
- Attachments.insert(BlockHash);
- }
- else
- {
- Attachments.insert(ChunkHashes.begin(), ChunkHashes.end());
- }
- };
- auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) {
- RwLock::ExclusiveLockScope _(AttachmentsLock);
- Attachments.insert(RawHash);
- };
-
- auto OnChunkedAttachment = [](const ChunkedInfo&) {};
-
- auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog.CaptureAddedAttachments(RawHashes); };
+ // Legacy full chunks only by rawhash
- // Make sure we retain any attachments we download before writing the oplog
- Oplog.EnableUpdateCapture();
- auto _ = MakeGuard([&Oplog]() { Oplog.DisableUpdateCapture(); });
+ size_t RequestCount = ChunksArray.Num();
- RemoteProjectStore::Result RemoteResult = SaveOplogContainer(Oplog,
- ContainerObject,
- OnReferencedAttachments,
- HasAttachment,
- OnNeedBlock,
- OnNeedAttachment,
- OnChunkedAttachment,
- nullptr);
+ Requests.reserve(RequestCount);
- if (RemoteResult.ErrorCode)
- {
- return ProjectStore::WriteOplogResult{.ErrorCode = RemoteResult.ErrorCode, .ErrorDescription = RemoteResult.Reason};
+ std::vector<IoHash> Cids;
+ Cids.reserve(ChunksArray.Num());
+ for (CbFieldView FieldView : ChunksArray)
+ {
+ Requests.push_back(ProjectStore::ChunkRequest{.Id = FieldView.AsHash()});
+ }
}
-
- return ProjectStore::WriteOplogResult{.Need = std::vector<IoHash>(Attachments.begin(), Attachments.end())};
-}
-
-ProjectStore::ReadOplogResult
-ProjectStore::ReadOplog(Project& Project,
- Oplog& Oplog,
- size_t MaxBlockSize,
- size_t MaxChunkEmbedSize,
- size_t MaxChunksPerBlock,
- size_t ChunkFileSizeLimit)
-{
- ZEN_MEMSCOPE(GetProjectstoreTag());
- ZEN_TRACE_CPU("Store::ReadOplog");
-
- CidStore& ChunkStore = m_CidStore;
-
- RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer(
- ChunkStore,
- Project,
- Oplog,
- MaxBlockSize,
- MaxChunkEmbedSize,
- MaxChunksPerBlock,
- ChunkFileSizeLimit,
- /* BuildBlocks */ false,
- /* IgnoreMissingAttachments */ false,
- /* AllowChunking*/ false,
- [](CompressedBuffer&&, ChunkBlockDescription&&) {},
- [](const IoHash&, TGetAttachmentBufferFunc&&) {},
- [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {},
- /* EmbedLooseFiles*/ false);
-
- if (ContainerResult.ErrorCode)
+ else
{
- return ProjectStore::ReadOplogResult{.ErrorCode = ContainerResult.ErrorCode, .ErrorDescription = ContainerResult.Reason};
+ throw std::runtime_error(fmt::format("oplog '{}/{}': malformed getchunks rpc request object", Project.Identifier, Oplog.OplogId()));
}
-
- return ProjectStore::ReadOplogResult{.ContainerObject = std::move(ContainerResult.ContainerObject)};
+ return Requests;
}
-bool
-ProjectStore::Rpc(HttpServerRequest& HttpReq,
- const std::string_view ProjectId,
- const std::string_view OplogId,
- IoBuffer&& Payload,
- AuthMgr& AuthManager)
+CbPackage
+ProjectStore::WriteChunksRequestResponse(Project& Project,
+ Oplog& Oplog,
+ std::vector<ChunkRequest>&& Requests,
+ std::vector<ChunkResult>&& Results)
{
- ZEN_MEMSCOPE(GetProjectstoreTag());
- ZEN_TRACE_CPU("Store::Rpc");
-
using namespace std::literals;
- HttpContentType PayloadContentType = HttpReq.RequestContentType();
- CbPackage Package;
- CbObject Cb;
- switch (PayloadContentType)
- {
- case HttpContentType::kJSON:
- case HttpContentType::kUnknownContentType:
- case HttpContentType::kText:
- {
- std::string JsonText(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize());
- Cb = LoadCompactBinaryFromJson(JsonText).AsObject();
- if (!Cb)
- {
- HttpReq.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Content format not supported, expected JSON format");
- return false;
- }
- }
- break;
- case HttpContentType::kCbObject:
- {
- CbValidateError ValidateResult;
- if (Cb = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult);
- ValidateResult != CbValidateError::None || !Cb)
- {
- HttpReq.WriteResponse(
- HttpResponseCode::BadRequest,
- HttpContentType::kText,
- fmt::format("Content format not supported, expected compact binary format ('{}')", ToString(ValidateResult)));
- return false;
- }
- break;
- }
- case HttpContentType::kCbPackage:
- try
- {
- Package = ParsePackageMessage(Payload);
- Cb = Package.GetObject();
- }
- catch (const std::invalid_argument& ex)
- {
- HttpReq.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- fmt::format("Failed to parse package request, reason: '{}'", ex.what()));
- return false;
- }
- if (!Cb)
- {
- HttpReq.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Content format not supported, expected package message format");
- return false;
- }
- break;
- default:
- HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type");
- return false;
- }
-
- Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
- if (!Project)
- {
- HttpReq.WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("Rpc oplog request for unknown project '{}'", ProjectId));
- return true;
- }
- Project->TouchProject();
-
- std::string_view Method = Cb["method"sv].AsString();
- bool VerifyPathOnDisk = Method != "getchunks"sv;
+ CbPackage ResponsePackage;
- Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, VerifyPathOnDisk);
- if (!Oplog)
- {
- HttpReq.WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId));
- return true;
- }
- Project->TouchOplog(OplogId);
-
- uint32_t MethodHash = HashStringDjb2(Method);
-
- switch (MethodHash)
+ CbObjectWriter ResponseWriter(32 + Requests.size() * 64u);
+ ResponseWriter.BeginArray("Chunks"sv);
{
- case HashStringDjb2("import"sv):
- {
- if (!AreDiskWritesAllowed())
- {
- HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
- return true;
- }
- std::pair<HttpResponseCode, std::string> Result =
- Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
- if (Result.second.empty())
- {
- HttpReq.WriteResponse(Result.first);
- return Result.first != HttpResponseCode::BadRequest;
- }
- HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- return true;
- }
- case HashStringDjb2("export"sv):
- {
- std::pair<HttpResponseCode, std::string> Result = Export(Project, *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
- HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- return true;
- }
- case HashStringDjb2("getchunks"sv):
+ for (size_t Index = 0; Index < Requests.size(); Index++)
+ {
+ const ChunkRequest& Request = Requests[Index];
+ ChunkResult& Result = Results[Index];
+ if (Result.Exists)
{
- ZEN_TRACE_CPU("Store::Rpc::getchunks");
-
- RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u));
- int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0);
-
- CbPackage ResponsePackage;
- std::pair<HttpResponseCode, std::string> Result = GetChunks(ProjectId, OplogId, Cb, ResponsePackage);
- if (Result.first == HttpResponseCode::OK)
+ ResponseWriter.BeginObject();
{
- void* TargetProcessHandle = nullptr;
- FormatFlags Flags = FormatFlags::kDefault;
- if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ if (Request.Id.index() == 0)
{
- Flags |= FormatFlags::kAllowLocalReferences;
- if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
- {
- Flags |= FormatFlags::kDenyPartialLocalReferences;
- }
- TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId);
+ const IoHash& RawHash = std::get<IoHash>(Request.Id);
+ ResponseWriter.AddHash("Id", RawHash);
}
-
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, Flags, TargetProcessHandle);
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- }
- return true;
- }
- case HashStringDjb2("putchunks"sv):
- {
- ZEN_TRACE_CPU("Store::Rpc::putchunks");
- if (!AreDiskWritesAllowed())
- {
- HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
- return true;
- }
-
- CbObject Object = Package.GetObject();
- const bool UsingTempFiles = Object["usingtmpfiles"].AsBool(false);
-
- std::span<const CbAttachment> Attachments = Package.GetAttachments();
- if (!Attachments.empty())
- {
- std::vector<IoBuffer> WriteAttachmentBuffers;
- std::vector<IoHash> WriteRawHashes;
-
- WriteAttachmentBuffers.reserve(Attachments.size());
- WriteRawHashes.reserve(Attachments.size());
-
- for (const CbAttachment& Attachment : Attachments)
+ else
{
- IoHash RawHash = Attachment.GetHash();
- const CompressedBuffer& Compressed = Attachment.AsCompressedBinary();
- IoBuffer AttachmentPayload = Compressed.GetCompressed().Flatten().AsIoBuffer();
- if (UsingTempFiles)
- {
- AttachmentPayload.SetDeleteOnClose(true);
- }
- WriteAttachmentBuffers.push_back(std::move(AttachmentPayload));
- WriteRawHashes.push_back(RawHash);
+ const Oid& Id = std::get<Oid>(Request.Id);
+ ResponseWriter.AddObjectId("Id", Id);
}
-
- Oplog->CaptureAddedAttachments(WriteRawHashes);
- m_CidStore.AddChunks(WriteAttachmentBuffers,
- WriteRawHashes,
- UsingTempFiles ? CidStore::InsertMode::kMayBeMovedInPlace : CidStore::InsertMode::kCopyOnly);
- }
- HttpReq.WriteResponse(HttpResponseCode::OK);
- return true;
- }
- case HashStringDjb2("snapshot"sv):
- {
- ZEN_TRACE_CPU("Store::Rpc::snapshot");
- if (!AreDiskWritesAllowed())
- {
- HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
- return true;
- }
-
- // Snapshot all referenced files. This brings the content of all
- // files into the CID store
-
- uint32_t OpCount = 0;
- uint64_t InlinedBytes = 0;
- uint64_t InlinedFiles = 0;
- uint64_t TotalBytes = 0;
- uint64_t TotalFiles = 0;
-
- std::vector<CbObject> NewOps;
- struct AddedChunk
- {
- IoBuffer Buffer;
- uint64_t RawSize = 0;
- };
- tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks;
-
- Oplog->IterateOplog(
- [&](CbObjectView Op) {
- bool OpRewritten = false;
- bool AllOk = true;
-
- CbWriter FilesArrayWriter;
- FilesArrayWriter.BeginArray("files"sv);
-
- for (CbFieldView& Field : Op["files"sv])
+ if (!Request.ModTag.has_value() || Request.ModTag.value() != Result.ModTag)
+ {
+ ResponseWriter.AddInteger("ModTag", Result.ModTag);
+ if (!Request.SkipData)
{
- bool CopyField = true;
-
- if (CbObjectView View = Field.AsObjectView())
+ auto ExtractRangeResult = ExtractRange(std::move(Result.ChunkBuffer),
+ Request.Offset,
+ Request.Size,
+ ZenContentType::kCompressedBinary);
+ if (ExtractRangeResult.Error == GetChunkRangeResult::EError::Ok)
{
- const IoHash DataHash = View["data"sv].AsHash();
-
- if (DataHash == IoHash::Zero)
+ if (ExtractRangeResult.ContentType == ZenContentType::kCompressedBinary)
{
- std::string_view ServerPath = View["serverpath"sv].AsString();
- std::filesystem::path FilePath = Project->RootDir / ServerPath;
- BasicFile DataFile;
- std::error_code Ec;
- DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec);
+ ZEN_ASSERT(ExtractRangeResult.RawHash != IoHash::Zero);
+ CompressedBuffer CompressedValue =
+ CompressedBuffer::FromCompressedNoValidate(std::move(ExtractRangeResult.Chunk));
+ ZEN_ASSERT(CompressedValue);
- if (Ec)
+ if (ExtractRangeResult.RawSize != 0)
{
- // Error...
-
- ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message());
-
- AllOk = false;
- }
- else
- {
- // Read file contents into memory, compress and keep in map of chunks to add to Cid store
- IoBuffer FileIoBuffer = DataFile.ReadAll();
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer)));
- const uint64_t RawSize = Compressed.DecodeRawSize();
- const IoHash RawHash = Compressed.DecodeRawHash();
- if (!AddedChunks.contains(RawHash))
+ // This really could use some thought so we don't send the same data if we get a request for
+ // multiple ranges from the same chunk block
+
+ uint64_t FragmentRawOffset = 0;
+ OodleCompressor Compressor;
+ OodleCompressionLevel CompressionLevel;
+ uint64_t BlockSize = 0;
+ if (CompressedValue.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
{
- const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString();
- BasicFile ChunkTempFile;
- ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete);
- ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec);
- if (Ec)
+ if (BlockSize > 0)
{
- Oid ChunkId = View["id"sv].AsObjectId();
- ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}",
- FilePath,
- ChunkId,
- Ec.message());
- AllOk = false;
+ FragmentRawOffset = (Request.Offset / BlockSize) * BlockSize;
}
else
{
- void* FileHandle = ChunkTempFile.Detach();
- IoBuffer ChunkBuffer(IoBuffer::File,
- FileHandle,
- 0,
- Compressed.GetCompressed().GetSize(),
- /*IsWholeFile*/ true);
- ChunkBuffer.SetDeleteOnClose(true);
- AddedChunks.insert_or_assign(
- RawHash,
- AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize});
+ FragmentRawOffset = Request.Offset;
}
+ uint64_t FragmentRawLength = CompressedValue.DecodeRawSize();
+
+ IoHashStream FragmentHashStream;
+ FragmentHashStream.Append(ExtractRangeResult.RawHash.Hash,
+ sizeof(ExtractRangeResult.RawHash.Hash));
+ FragmentHashStream.Append(&FragmentRawOffset, sizeof(FragmentRawOffset));
+ FragmentHashStream.Append(&FragmentRawLength, sizeof(FragmentRawLength));
+ IoHash FragmentHash = FragmentHashStream.GetHash();
+
+ ResponseWriter.AddHash("FragmentHash", FragmentHash);
+ ResponseWriter.AddInteger("FragmentOffset", FragmentRawOffset);
+ ResponseWriter.AddInteger("RawSize", ExtractRangeResult.RawSize);
+ ResponsePackage.AddAttachment(CbAttachment(CompressedValue, FragmentHash));
+ }
+ else
+ {
+ std::string ErrorString = "Failed to get compression parameters from partial compressed buffer";
+ ResponseWriter.AddString("Error", ErrorString);
+ ZEN_WARN("oplog '{}/{}': {}", Project.Identifier, Oplog.OplogId(), ErrorString);
}
-
- TotalBytes += RawSize;
- ++TotalFiles;
-
- // Rewrite file array entry with new data reference
- CbObjectWriter Writer(View.GetSize());
- RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
- if (Field.GetName() == "data"sv)
- {
- // omit this field as we will write it explicitly ourselves
- return true;
- }
- return false;
- });
- Writer.AddBinaryAttachment("data"sv, RawHash);
-
- CbObject RewrittenOp = Writer.Save();
- FilesArrayWriter.AddObject(std::move(RewrittenOp));
- CopyField = false;
+ }
+ else
+ {
+ ResponseWriter.AddHash("RawHash"sv, ExtractRangeResult.RawHash);
+ ResponsePackage.AddAttachment(CbAttachment(std::move(CompressedValue), ExtractRangeResult.RawHash));
}
}
- }
-
- if (CopyField)
- {
- FilesArrayWriter.AddField(Field);
- }
- else
- {
- OpRewritten = true;
- }
- }
-
- if (OpRewritten && AllOk)
- {
- FilesArrayWriter.EndArray();
- CbArray FilesArray = FilesArrayWriter.Save().AsArray();
-
- CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool {
- if (Field.GetName() == "files"sv)
+ else
{
- NewWriter.AddArray("files"sv, FilesArray);
-
- return true;
+ IoHashStream HashStream;
+ ZEN_ASSERT(Request.Id.index() == 1);
+ const Oid& Id = std::get<Oid>(Request.Id);
+ HashStream.Append(Id.OidBits, sizeof(Id.OidBits));
+ HashStream.Append(&Request.Offset, sizeof(Request.Offset));
+ HashStream.Append(&Request.Size, sizeof(Request.Size));
+ IoHash Hash = HashStream.GetHash();
+
+ ResponseWriter.AddHash("Hash"sv, Hash);
+ if (ExtractRangeResult.RawSize != 0)
+ {
+ ResponseWriter.AddInteger("Size", ExtractRangeResult.RawSize);
+ }
+ ResponsePackage.AddAttachment(CbAttachment(std::move(ExtractRangeResult.Chunk), Hash));
}
-
- return false;
- });
-
- NewOps.push_back(std::move(RewrittenOp));
- }
-
- OpCount++;
- },
- Oplog::Paging{});
-
- CbObjectWriter ResponseObj;
-
- // Persist rewritten oplog entries
- if (!NewOps.empty())
- {
- ResponseObj.BeginArray("rewritten_ops");
-
- for (CbObject& NewOp : NewOps)
- {
- ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp));
-
- ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number);
-
- ResponseObj.AddInteger(NewLsn.Number);
- }
-
- ResponseObj.EndArray();
- }
-
- // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the
- // new chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have
- // unreferenced chunks.
- for (auto It : AddedChunks)
- {
- const IoHash& RawHash = It.first;
- AddedChunk& Chunk = It.second;
- CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash);
- if (Result.New)
- {
- InlinedBytes += Chunk.RawSize;
- ++InlinedFiles;
- }
- }
-
- ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles;
- ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles;
-
- ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount);
-
- HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save());
- return true;
- }
- case HashStringDjb2("addopattachments"sv):
- {
- ZEN_TRACE_CPU("Store::Rpc::addopattachments");
- if (!AreDiskWritesAllowed())
- {
- HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
- return true;
- }
- }
- case HashStringDjb2("appendops"sv):
- {
- ZEN_TRACE_CPU("Store::Rpc::appendops");
- if (!AreDiskWritesAllowed())
- {
- HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
- return true;
- }
-
- CbArrayView OpsArray = Cb["ops"sv].AsArrayView();
-
- size_t OpsBufferSize = 0;
- for (CbFieldView OpView : OpsArray)
- {
- OpsBufferSize += OpView.GetSize();
- }
- UniqueBuffer OpsBuffers = UniqueBuffer::Alloc(OpsBufferSize);
- MutableMemoryView OpsBuffersMemory = OpsBuffers.GetMutableView();
-
- std::vector<CbObjectView> Ops;
- Ops.reserve(OpsArray.Num());
- for (CbFieldView OpView : OpsArray)
- {
- OpView.CopyTo(OpsBuffersMemory);
- Ops.push_back(CbObjectView(OpsBuffersMemory.GetData()));
- OpsBuffersMemory.MidInline(OpView.GetSize());
- }
-
- std::vector<ProjectStore::LogSequenceNumber> LSNs = Oplog->AppendNewOplogEntries(Ops);
- ZEN_ASSERT(LSNs.size() == Ops.size());
-
- std::vector<IoHash> MissingAttachments;
- for (size_t OpIndex = 0; OpIndex < Ops.size(); OpIndex++)
- {
- if (LSNs[OpIndex])
- {
- CbObjectView Op = Ops[OpIndex];
- Op.IterateAttachments([this, &MissingAttachments](CbFieldView AttachmentView) {
- const IoHash Cid = AttachmentView.AsAttachment();
- if (!m_CidStore.ContainsChunk(Cid))
+ }
+ else
{
- MissingAttachments.push_back(Cid);
+ std::string ErrorString =
+ fmt::format("Failed fetching chunk range ({})", ExtractRangeResult.ErrorDescription);
+ ResponseWriter.AddString("Error", ErrorString);
+ ZEN_WARN("oplog '{}/{}': {}", Project.Identifier, Oplog.OplogId(), ErrorString);
}
- });
- }
- }
-
- CbPackage ResponsePackage;
-
- {
- CbObjectWriter ResponseObj;
- ResponseObj.BeginArray("written_ops");
-
- for (ProjectStore::LogSequenceNumber NewLsn : LSNs)
- {
- ZEN_DEBUG("appended written op at LSN: {}", NewLsn.Number);
- ResponseObj.AddInteger(NewLsn.Number);
- }
- ResponseObj.EndArray();
-
- if (!MissingAttachments.empty())
- {
- ResponseObj.BeginArray("need");
-
- for (const IoHash& Cid : MissingAttachments)
- {
- ResponseObj.AddHash(Cid);
}
- ResponseObj.EndArray();
}
- ResponsePackage.SetObject(ResponseObj.Save());
}
-
- std::vector<IoBuffer> ResponseBuffers = FormatPackageMessage(ResponsePackage);
-
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponseBuffers);
- return true;
+ ResponseWriter.EndObject();
}
- default:
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method));
- return false;
- }
- return true;
-}
-
-std::pair<HttpResponseCode, std::string>
-ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager)
-{
- ZEN_MEMSCOPE(GetProjectstoreTag());
- ZEN_TRACE_CPU("Store::Export");
-
- using namespace std::literals;
-
- size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize);
- size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize);
- size_t MaxChunksPerBlock = Params["maxchunksperblock"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunksPerBlock);
- size_t ChunkFileSizeLimit = Params["chunkfilesizelimit"sv].AsUInt64(RemoteStoreOptions::DefaultChunkFileSizeLimit);
- bool Force = Params["force"sv].AsBool(false);
- bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false);
- bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false);
-
- CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
-
- if (RemoteStoreResult.Store == nullptr)
- {
- return {HttpResponseCode::BadRequest, RemoteStoreResult.Description};
- }
- std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store);
- RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
-
- JobId JobId = m_JobQueue.QueueJob(
- fmt::format("Export oplog '{}/{}'", Project->Identifier, Oplog.OplogId()),
- [this,
- ActualRemoteStore = std::move(RemoteStore),
- Project,
- OplogPtr = &Oplog,
- MaxBlockSize,
- MaxChunksPerBlock,
- MaxChunkEmbedSize,
- ChunkFileSizeLimit,
- EmbedLooseFile,
- Force,
- IgnoreMissingAttachments](JobContext& Context) {
- Context.ReportMessage(fmt::format("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}",
- Project->Identifier,
- OplogPtr->OplogId(),
- ActualRemoteStore->GetInfo().Description,
- NiceBytes(MaxBlockSize),
- NiceBytes(MaxChunkEmbedSize)));
-
- RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
- *ActualRemoteStore,
- *Project.Get(),
- *OplogPtr,
- MaxBlockSize,
- MaxChunksPerBlock,
- MaxChunkEmbedSize,
- ChunkFileSizeLimit,
- EmbedLooseFile,
- Force,
- IgnoreMissingAttachments,
- &Context);
- auto Response = ConvertResult(Result);
- ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second);
- if (!IsHttpSuccessCode(Response.first))
- {
- throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second,
- (int)Response.first);
- }
- });
-
- return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)};
-}
-
-std::pair<HttpResponseCode, std::string>
-ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager)
-{
- ZEN_MEMSCOPE(GetProjectstoreTag());
- ZEN_TRACE_CPU("Store::Import");
-
- using namespace std::literals;
-
- size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize);
- size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize);
- bool Force = Params["force"sv].AsBool(false);
- bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false);
- bool CleanOplog = Params["clean"].AsBool(false);
-
- CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
-
- if (RemoteStoreResult.Store == nullptr)
- {
- return {HttpResponseCode::BadRequest, RemoteStoreResult.Description};
+ }
}
- std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store);
- RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+ ResponseWriter.EndArray();
+ ResponsePackage.SetObject(ResponseWriter.Save());
- JobId JobId = m_JobQueue.QueueJob(
- fmt::format("Import oplog '{}/{}'", Project.Identifier, Oplog.OplogId()),
- [this,
- ChunkStore = &m_CidStore,
- ActualRemoteStore = std::move(RemoteStore),
- OplogPtr = &Oplog,
- Force,
- IgnoreMissingAttachments,
- CleanOplog](JobContext& Context) {
- Context.ReportMessage(fmt::format("Loading oplog '{}/{}' from {}",
- OplogPtr->GetOuterProjectIdentifier(),
- OplogPtr->OplogId(),
- ActualRemoteStore->GetInfo().Description));
-
- RemoteProjectStore::Result Result =
- LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, CleanOplog, &Context);
- auto Response = ConvertResult(Result);
- ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second);
- if (!IsHttpSuccessCode(Response.first))
- {
- throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second,
- (int)Response.first);
- }
- });
-
- return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)};
+ return ResponsePackage;
}
bool
@@ -7209,14 +6240,14 @@ OpKeyStringAsOid(std::string_view OpKey)
namespace testutils {
using namespace std::literals;
- std::string OidAsString(const Oid& Id)
+ static std::string OidAsString(const Oid& Id)
{
StringBuilder<25> OidStringBuilder;
Id.ToString(OidStringBuilder);
return OidStringBuilder.ToString();
}
- CbPackage CreateBulkDataOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
+ static CbPackage CreateBulkDataOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
{
CbPackage Package;
CbObjectWriter Object;
@@ -7242,9 +6273,9 @@ namespace testutils {
return Package;
};
- CbPackage CreateFilesOplogPackage(const Oid& Id,
- const std::filesystem::path ProjectRootDir,
- const std::span<const std::pair<Oid, std::filesystem::path>>& Attachments)
+ static CbPackage CreateFilesOplogPackage(const Oid& Id,
+ const std::filesystem::path ProjectRootDir,
+ const std::span<const std::pair<Oid, std::filesystem::path>>& Attachments)
{
CbPackage Package;
CbObjectWriter Object;
@@ -7268,7 +6299,7 @@ namespace testutils {
return Package;
};
- std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(
+ static std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(
const std::span<const size_t>& Sizes,
OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast,
uint64_t BlockSize = 0)
@@ -7284,7 +6315,7 @@ namespace testutils {
return Result;
}
- uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64_t RawOffset)
+ static uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64_t RawOffset)
{
if (RawOffset > 0)
{
@@ -7416,8 +6447,6 @@ TEST_CASE("project.store.create")
ScopedTemporaryDirectory TempDir;
- auto JobQueue = MakeJobQueue(1, ""sv);
- OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
@@ -7425,7 +6454,7 @@ TEST_CASE("project.store.create")
std::string_view ProjectName("proj1"sv);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
@@ -7447,15 +6476,13 @@ TEST_CASE("project.store.lifetimes")
ScopedTemporaryDirectory TempDir;
- auto JobQueue = MakeJobQueue(1, ""sv);
- OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
@@ -7480,135 +6507,6 @@ TEST_CASE("project.store.lifetimes")
CHECK(Project->Identifier == "proj1"sv);
}
-struct ExportForceDisableBlocksTrue_ForceTempBlocksFalse
-{
- static const bool ForceDisableBlocks = true;
- static const bool ForceEnableTempBlocks = false;
-};
-
-struct ExportForceDisableBlocksFalse_ForceTempBlocksFalse
-{
- static const bool ForceDisableBlocks = false;
- static const bool ForceEnableTempBlocks = false;
-};
-
-struct ExportForceDisableBlocksFalse_ForceTempBlocksTrue
-{
- static const bool ForceDisableBlocks = false;
- static const bool ForceEnableTempBlocks = true;
-};
-
-TEST_CASE_TEMPLATE("project.store.export",
- Settings,
- ExportForceDisableBlocksTrue_ForceTempBlocksFalse,
- ExportForceDisableBlocksFalse_ForceTempBlocksFalse,
- ExportForceDisableBlocksFalse_ForceTempBlocksTrue)
-{
- using namespace std::literals;
- using namespace testutils;
-
- ScopedTemporaryDirectory TempDir;
- ScopedTemporaryDirectory ExportDir;
-
- auto JobQueue = MakeJobQueue(1, ""sv);
- OpenProcessCache ProcessCache;
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
- CidStore.Initialize(CidConfig);
-
- std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
- std::filesystem::path RootDir = TempDir.Path() / "root";
- std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
- std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
- std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";
-
- Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / "proj1"sv,
- "proj1"sv,
- RootDir.string(),
- EngineRootDir.string(),
- ProjectRootDir.string(),
- ProjectFilePath.string()));
- Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("oplog1", {});
- CHECK(Oplog);
-
- Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {}));
- Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77})));
- Oplog->AppendNewOplogEntry(
- CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99})));
- Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122})));
- Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(
- Oid::NewOid(),
- CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None)));
-
- FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 64u * 1024,
- .MaxChunksPerBlock = 1000,
- .MaxChunkEmbedSize = 32 * 1024u,
- .ChunkFileSizeLimit = 64u * 1024u},
- /*.FolderPath = */ ExportDir.Path(),
- /*.Name = */ std::string("oplog1"),
- /*OptionalBaseName = */ std::string(),
- /*.ForceDisableBlocks = */ Settings::ForceDisableBlocks,
- /*.ForceEnableTempBlocks = */ Settings::ForceEnableTempBlocks};
- std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Options);
- RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
-
- RemoteProjectStore::Result ExportResult = SaveOplog(CidStore,
- *RemoteStore,
- *Project.Get(),
- *Oplog,
- Options.MaxBlockSize,
- Options.MaxChunksPerBlock,
- Options.MaxChunkEmbedSize,
- Options.ChunkFileSizeLimit,
- true,
- false,
- false,
- nullptr);
-
- CHECK(ExportResult.ErrorCode == 0);
-
- Ref<ProjectStore::Oplog> OplogImport = Project->NewOplog("oplog2", {});
- CHECK(OplogImport);
-
- RemoteProjectStore::Result ImportResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- /*Force*/ false,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ false,
- nullptr);
- CHECK(ImportResult.ErrorCode == 0);
-
- RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- /*Force*/ true,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ false,
- nullptr);
- CHECK(ImportForceResult.ErrorCode == 0);
-
- RemoteProjectStore::Result ImportCleanResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- /*Force*/ false,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ true,
- nullptr);
- CHECK(ImportCleanResult.ErrorCode == 0);
-
- RemoteProjectStore::Result ImportForceCleanResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- /*Force*/ true,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ true,
- nullptr);
- CHECK(ImportForceCleanResult.ErrorCode == 0);
-}
-
TEST_CASE("project.store.gc")
{
using namespace std::literals;
@@ -7616,15 +6514,13 @@ TEST_CASE("project.store.gc")
ScopedTemporaryDirectory TempDir;
- auto JobQueue = MakeJobQueue(1, ""sv);
- OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
@@ -7817,15 +6713,13 @@ TEST_CASE("project.store.gc.prep")
ScopedTemporaryDirectory TempDir;
- auto JobQueue = MakeJobQueue(1, ""sv);
- OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
@@ -8033,15 +6927,13 @@ TEST_CASE("project.store.rpc.getchunks")
ScopedTemporaryDirectory TempDir;
- auto JobQueue = MakeJobQueue(1, ""sv);
- OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root"sv;
std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;
@@ -8085,24 +6977,30 @@ TEST_CASE("project.store.rpc.getchunks")
Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(FilesOpId, RootDir, FilesOpIdAttachments));
}
+ auto GetChunks = [](zen::ProjectStore& ProjectStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, const CbObject& Cb) {
+ std::vector<ProjectStore::ChunkRequest> Requests = ProjectStore.ParseChunksRequests(Project, Oplog, Cb);
+ std::vector<ProjectStore::ChunkResult> Results =
+ Requests.empty() ? std::vector<ProjectStore::ChunkResult>{} : ProjectStore.GetChunks(Project, Oplog, Requests);
+ return ProjectStore.WriteChunksRequestResponse(Project, Oplog, std::move(Requests), std::move(Results));
+ };
+
+ Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv);
+ CHECK(Project1);
+ Ref<ProjectStore::Oplog> Oplog1 = Project1->OpenOplog("oplog1"sv, false, true);
+ CHECK(Oplog1);
// Invalid request
{
CbObjectWriter Request;
Request.BeginObject("WrongName"sv);
Request.EndObject();
CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv, "oplog1"sv, Request.Save(), Response);
- CHECK_EQ(HttpResponseCode::BadRequest, Result.first);
+ CHECK_THROWS(GetChunks(ProjectStore, *Project1, *Oplog1, Request.Save()));
}
// Empty request
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, std::vector<IoHash>{}, {}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, std::vector<IoHash>{}, {}, {}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(0, Chunks.Num());
@@ -8110,23 +7008,15 @@ TEST_CASE("project.store.rpc.getchunks")
// Single non-existing chunk by RawHash
IoHash NotFoundIoHash = IoHash::Max;
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundIoHash}, {}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundIoHash}, {}, {}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(0, Chunks.Num());
}
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundIoHash}, {}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundIoHash}, {}, {}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(0, Chunks.Num());
@@ -8134,21 +7024,15 @@ TEST_CASE("project.store.rpc.getchunks")
// Single non-existing chunk by Id
Oid NotFoundId = Oid::NewOid();
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundId}, {}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundId}, {}, {}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(0, Chunks.Num());
}
{
- CbPackage Response;
- auto Result =
- ProjectStore.GetChunks("proj1"sv, "oplog1"sv, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundId}, {}, {}), Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundId}, {}, {}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(0, Chunks.Num());
@@ -8159,13 +7043,11 @@ TEST_CASE("project.store.rpc.getchunks")
IoHash FirstAttachmentHash = Attachments[OpIds[2]][1].second.DecodeRawHash();
uint64_t ResponseModTag = 0;
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {}),
- Response);
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {}));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(1, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8185,13 +7067,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Fetch with matching ModTag
{
- CbPackage Response;
- auto Result =
- ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {ResponseModTag}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {ResponseModTag}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8206,14 +7086,12 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Fetch with mismatching ModTag
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks(
- "proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)}),
- Response);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)}));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(1, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8233,12 +7111,10 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Fresh modtime query
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8253,13 +7129,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Modtime query with matching ModTag
{
- CbPackage Response;
- auto Result =
- ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {ResponseModTag}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {ResponseModTag}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8274,13 +7148,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Modtime query with mismatching ModTag
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks(
- "proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8300,12 +7172,8 @@ TEST_CASE("project.store.rpc.getchunks")
uint64_t ResponseModTag = 0;
{
// Full chunk request
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {}));
CHECK_EQ(1, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8325,13 +7193,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Partial chunk request
- CbPackage Response;
- auto Result =
- ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{130 * 1024, 8100}}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{130 * 1024, 8100}}, {}));
CHECK_EQ(1, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8356,13 +7222,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Fetch with matching ModTag
- CbPackage Response;
- auto Result =
- ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8377,13 +7241,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Fetch with mismatching ModTag
- CbPackage Response;
- auto Result = ProjectStore.GetChunks(
- "proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}));
CHECK_EQ(1, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8403,12 +7265,8 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Fresh modtime query
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8423,13 +7281,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Modtime query with matching ModTag
{
- CbPackage Response;
- auto Result =
- ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8444,13 +7300,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Modtime query with mismatching ModTag
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks(
- "proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8471,12 +7325,8 @@ TEST_CASE("project.store.rpc.getchunks")
uint64_t ResponseModTag = 0;
{
// Full chunk request
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {}));
CHECK_EQ(1, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8496,13 +7346,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Partial chunk request
- CbPackage Response;
- auto Result =
- ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{81823, 5434}}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{81823, 5434}}, {}));
CHECK_EQ(1, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8524,13 +7372,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Fetch with matching ModTag
- CbPackage Response;
- auto Result =
- ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8545,13 +7391,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Fetch with mismatching ModTag
- CbPackage Response;
- auto Result = ProjectStore.GetChunks(
- "proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}));
CHECK_EQ(1, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8571,12 +7415,8 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Fresh modtime query
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8591,13 +7431,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Modtime query with matching ModTag
{
- CbPackage Response;
- auto Result =
- ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8612,13 +7450,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Modtime query with mismatching ModTag
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks(
- "proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8644,13 +7480,9 @@ TEST_CASE("project.store.rpc.getchunks")
std::vector<uint64_t> ResponseModTags(3, 0);
{
// Fresh fetch
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, {}),
- Response);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, {}));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(3, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8677,13 +7509,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Fetch with matching ModTag
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, ResponseModTags),
- Response);
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, ResponseModTags));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8704,13 +7534,9 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Fresh modtime query
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, {}),
- Response);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, {}));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8731,13 +7557,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Modtime query with matching ModTags
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, ResponseModTags),
- Response);
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, ResponseModTags));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8763,13 +7587,12 @@ TEST_CASE("project.store.rpc.getchunks")
{
Tag++;
}
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, MismatchingModTags),
- Response);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, MismatchingModTags));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8801,13 +7624,9 @@ TEST_CASE("project.store.rpc.getchunks")
std::vector<uint64_t> ResponseModTags(3, 0);
{
// Fresh fetch
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, {}),
- Response);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, {}));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(3, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8834,13 +7653,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Fetch with matching ModTag
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, ResponseModTags),
- Response);
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, ResponseModTags));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8861,13 +7678,9 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Fresh modtime query
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, {}),
- Response);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, {}));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8888,13 +7701,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Modtime query with matching ModTags
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, ResponseModTags),
- Response);
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, ResponseModTags));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8920,13 +7731,11 @@ TEST_CASE("project.store.rpc.getchunks")
{
Tag++;
}
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, MismatchingModTags),
- Response);
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, MismatchingModTags));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8955,15 +7764,13 @@ TEST_CASE("project.store.partial.read")
ScopedTemporaryDirectory TempDir;
- auto JobQueue = MakeJobQueue(1, ""sv);
- OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root"sv;
std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;
@@ -9009,7 +7816,7 @@ TEST_CASE("project.store.partial.read")
Attachments[OpIds[1]][0].first,
0,
~0ull,
- HttpContentType::kCompressedBinary,
+ ZenContentType::kCompressedBinary,
&ModificationTag);
CHECK_EQ(ProjectStore::GetChunkRangeResult::EError::Ok, Result.Error);
@@ -9025,7 +7832,7 @@ TEST_CASE("project.store.partial.read")
Attachments[OpIds[1]][0].first,
0,
~0ull,
- HttpContentType::kCompressedBinary,
+ ZenContentType::kCompressedBinary,
&ModificationTag);
CHECK_EQ(ProjectStore::GetChunkRangeResult::EError::NotModified, Result2.Error);
}
@@ -9039,7 +7846,7 @@ TEST_CASE("project.store.partial.read")
Attachments[OpIds[2]][1].first,
0,
~0ull,
- HttpContentType::kCompressedBinary,
+ ZenContentType::kCompressedBinary,
&FullChunkModificationTag);
CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::Ok);
CHECK(Result.Chunk);
@@ -9053,7 +7860,7 @@ TEST_CASE("project.store.partial.read")
Attachments[OpIds[2]][1].first,
0,
~0ull,
- HttpContentType::kCompressedBinary,
+ ZenContentType::kCompressedBinary,
&FullChunkModificationTag);
CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::NotModified);
}
@@ -9067,7 +7874,7 @@ TEST_CASE("project.store.partial.read")
Attachments[OpIds[2]][1].first,
5,
1773,
- HttpContentType::kCompressedBinary,
+ ZenContentType::kCompressedBinary,
&PartialChunkModificationTag);
CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::Ok);
@@ -9091,7 +7898,7 @@ TEST_CASE("project.store.partial.read")
Attachments[OpIds[2]][1].first,
0,
1773,
- HttpContentType::kCompressedBinary,
+ ZenContentType::kCompressedBinary,
&PartialChunkModificationTag);
CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::NotModified);
}
@@ -9133,15 +7940,13 @@ TEST_CASE("project.store.iterateoplog")
ScopedTemporaryDirectory TempDir;
- auto JobQueue = MakeJobQueue(1, ""sv);
- OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root"sv;
std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index 77aa32681..258be5930 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -3,10 +3,10 @@
#pragma once
#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
#include <zencore/compositebuffer.h>
#include <zencore/uid.h>
#include <zencore/xxhash.h>
-#include <zenhttp/httpserver.h>
#include <zenstore/gc.h>
ZEN_THIRD_PARTY_INCLUDES_START
@@ -14,16 +14,13 @@ ZEN_THIRD_PARTY_INCLUDES_START
ZEN_THIRD_PARTY_INCLUDES_END
#include <map>
-#include <unordered_map>
+#include <variant>
namespace zen {
-class CbPackage;
class CidStore;
class AuthMgr;
class ScrubContext;
-class JobQueue;
-class OpenProcessCache;
/** Project Store
@@ -46,12 +43,7 @@ public:
{
};
- ProjectStore(CidStore& Store,
- std::filesystem::path BasePath,
- GcManager& Gc,
- JobQueue& JobQueue,
- OpenProcessCache& InOpenProcessCache,
- const Configuration& Config);
+ ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, const Configuration& Config);
~ProjectStore();
struct LogSequenceNumber
@@ -518,41 +510,27 @@ public:
bool PutChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash, IoBuffer&& Chunk);
- struct WriteOplogResult
+ struct ChunkRequest
{
- int32_t ErrorCode = 0;
- std::string ErrorDescription;
- std::vector<IoHash> Need;
+ uint64_t Offset = 0;
+ uint64_t Size = (uint64_t)-1;
+ std::variant<IoHash, Oid> Id;
+ std::optional<uint64_t> ModTag;
+ bool SkipData = false;
};
- WriteOplogResult WriteOplog(Project& Project, Oplog& Oplog, const CbObject& ContainerObject);
-
- struct ReadOplogResult
+ struct ChunkResult
{
- int32_t ErrorCode = 0;
- std::string ErrorDescription;
- CbObject ContainerObject;
+ bool Exists = false;
+ IoBuffer ChunkBuffer;
+ uint64_t ModTag = 0;
};
- ReadOplogResult ReadOplog(Project& Project,
- Oplog& Oplog,
- size_t MaxBlockSize,
- size_t MaxChunkEmbedSize,
- size_t MaxChunksPerBlock,
- size_t ChunkFileSizeLimit);
-
- std::pair<HttpResponseCode, std::string> GetChunks(const std::string_view ProjectId,
- const std::string_view OplogId,
- const CbObject& RequestObject,
- CbPackage& OutResponsePackage);
-
- bool Rpc(HttpServerRequest& HttpReq,
- const std::string_view ProjectId,
- const std::string_view OplogId,
- IoBuffer&& Payload,
- AuthMgr& AuthManager);
-
- std::pair<HttpResponseCode, std::string> Export(Ref<Project> Project, Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager);
+ std::vector<ChunkResult> GetChunks(Project& Project, Oplog& Oplog, std::span<const ChunkRequest> Requests);
- std::pair<HttpResponseCode, std::string> Import(Project& Project, Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager);
+ std::vector<ProjectStore::ChunkRequest> ParseChunksRequests(Project& Project, Oplog& Oplog, const CbObject& Cb);
+ CbPackage WriteChunksRequestResponse(Project& Project,
+ Oplog& Oplog,
+ std::vector<ChunkRequest>&& Requests,
+ std::vector<ChunkResult>&& Results);
bool AreDiskWritesAllowed() const;
@@ -564,8 +542,6 @@ private:
LoggerRef m_Log;
GcManager& m_Gc;
CidStore& m_CidStore;
- JobQueue& m_JobQueue;
- OpenProcessCache& m_OpenProcessCache;
std::filesystem::path m_ProjectBasePath;
const Configuration m_Config;
mutable RwLock m_ProjectsLock;
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 3d66670dd..c31c2297e 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -12,12 +12,19 @@
#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zencore/workthreadpool.h>
+#include <zenhttp/httpcommon.h>
#include <zenstore/cidstore.h>
#include <zenutil/chunkedfile.h>
#include <zenutil/workerpools.h>
#include <unordered_map>
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include "fileremoteprojectstore.h"
+#endif // ZEN_WITH_TESTS
+
namespace zen {
/*
@@ -3319,4 +3326,194 @@ RemoteProjectStore::~RemoteProjectStore()
{
}
+#if ZEN_WITH_TESTS
+
+namespace testutils {
+ using namespace std::literals;
+
+ static std::string OidAsString(const Oid& Id)
+ {
+ StringBuilder<25> OidStringBuilder;
+ Id.ToString(OidStringBuilder);
+ return OidStringBuilder.ToString();
+ }
+
+ static CbPackage CreateBulkDataOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
+ {
+ CbPackage Package;
+ CbObjectWriter Object;
+ Object << "key"sv << OidAsString(Id);
+ if (!Attachments.empty())
+ {
+ Object.BeginArray("bulkdata");
+ for (const auto& Attachment : Attachments)
+ {
+ CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash());
+ Object.BeginObject();
+ Object << "id"sv << Attachment.first;
+ Object << "type"sv
+ << "Standard"sv;
+ Object << "data"sv << Attach;
+ Object.EndObject();
+
+ Package.AddAttachment(Attach);
+ }
+ Object.EndArray();
+ }
+ Package.SetObject(Object.Save());
+ return Package;
+ };
+
+ static std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(
+ const std::span<const size_t>& Sizes,
+ OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast,
+ uint64_t BlockSize = 0)
+ {
+ std::vector<std::pair<Oid, CompressedBuffer>> Result;
+ Result.reserve(Sizes.size());
+ for (size_t Size : Sizes)
+ {
+ CompressedBuffer Compressed =
+ CompressedBuffer::Compress(SharedBuffer(CreateSemiRandomBlob(Size)), OodleCompressor::Mermaid, CompressionLevel, BlockSize);
+ Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed));
+ }
+ return Result;
+ }
+
+} // namespace testutils
+
+struct ExportForceDisableBlocksTrue_ForceTempBlocksFalse
+{
+ static const bool ForceDisableBlocks = true;
+ static const bool ForceEnableTempBlocks = false;
+};
+
+struct ExportForceDisableBlocksFalse_ForceTempBlocksFalse
+{
+ static const bool ForceDisableBlocks = false;
+ static const bool ForceEnableTempBlocks = false;
+};
+
+struct ExportForceDisableBlocksFalse_ForceTempBlocksTrue
+{
+ static const bool ForceDisableBlocks = false;
+ static const bool ForceEnableTempBlocks = true;
+};
+
+TEST_CASE_TEMPLATE("project.store.export",
+ Settings,
+ ExportForceDisableBlocksTrue_ForceTempBlocksFalse,
+ ExportForceDisableBlocksFalse_ForceTempBlocksFalse,
+ ExportForceDisableBlocksFalse_ForceTempBlocksTrue)
+{
+ using namespace std::literals;
+ using namespace testutils;
+
+ ScopedTemporaryDirectory TempDir;
+ ScopedTemporaryDirectory ExportDir;
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ std::filesystem::path BasePath = TempDir.Path() / "projectstore";
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
+ std::filesystem::path RootDir = TempDir.Path() / "root";
+ std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
+ std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
+ std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";
+
+ Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ ProjectRootDir.string(),
+ ProjectFilePath.string()));
+ Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("oplog1", {});
+ CHECK(Oplog);
+
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {}));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77})));
+ Oplog->AppendNewOplogEntry(
+ CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99})));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122})));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(
+ Oid::NewOid(),
+ CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None)));
+
+ FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 64u * 1024,
+ .MaxChunksPerBlock = 1000,
+ .MaxChunkEmbedSize = 32 * 1024u,
+ .ChunkFileSizeLimit = 64u * 1024u},
+ /*.FolderPath = */ ExportDir.Path(),
+ /*.Name = */ std::string("oplog1"),
+ /*OptionalBaseName = */ std::string(),
+ /*.ForceDisableBlocks = */ Settings::ForceDisableBlocks,
+ /*.ForceEnableTempBlocks = */ Settings::ForceEnableTempBlocks};
+ std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Options);
+ RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+
+ RemoteProjectStore::Result ExportResult = SaveOplog(CidStore,
+ *RemoteStore,
+ *Project.Get(),
+ *Oplog,
+ Options.MaxBlockSize,
+ Options.MaxChunksPerBlock,
+ Options.MaxChunkEmbedSize,
+ Options.ChunkFileSizeLimit,
+ true,
+ false,
+ false,
+ nullptr);
+
+ CHECK(ExportResult.ErrorCode == 0);
+
+ Ref<ProjectStore::Oplog> OplogImport = Project->NewOplog("oplog2", {});
+ CHECK(OplogImport);
+
+ RemoteProjectStore::Result ImportResult = LoadOplog(CidStore,
+ *RemoteStore,
+ *OplogImport,
+ /*Force*/ false,
+ /*IgnoreMissingAttachments*/ false,
+ /*CleanOplog*/ false,
+ nullptr);
+ CHECK(ImportResult.ErrorCode == 0);
+
+ RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore,
+ *RemoteStore,
+ *OplogImport,
+ /*Force*/ true,
+ /*IgnoreMissingAttachments*/ false,
+ /*CleanOplog*/ false,
+ nullptr);
+ CHECK(ImportForceResult.ErrorCode == 0);
+
+ RemoteProjectStore::Result ImportCleanResult = LoadOplog(CidStore,
+ *RemoteStore,
+ *OplogImport,
+ /*Force*/ false,
+ /*IgnoreMissingAttachments*/ false,
+ /*CleanOplog*/ true,
+ nullptr);
+ CHECK(ImportCleanResult.ErrorCode == 0);
+
+ RemoteProjectStore::Result ImportForceCleanResult = LoadOplog(CidStore,
+ *RemoteStore,
+ *OplogImport,
+ /*Force*/ true,
+ /*IgnoreMissingAttachments*/ false,
+ /*CleanOplog*/ true,
+ nullptr);
+ CHECK(ImportForceCleanResult.ErrorCode == 0);
+}
+
+#endif // ZEN_WITH_TESTS
+
+void
+remoteprojectstore_forcelink()
+{
+}
+
} // namespace zen
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index 8e65bbd58..488656d8b 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -173,4 +173,6 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject);
std::vector<ThinChunkBlockDescription> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes);
+void remoteprojectstore_forcelink();
+
} // namespace zen
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 643f0a45a..b4a4279ae 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -30,6 +30,10 @@
#include <zenutil/workerpools.h>
#include <zenutil/zenserverprocess.h>
+#if ZEN_WITH_TESTS
+# include "projectstore/remoteprojectstore.h"
+#endif // ZEN_WITH_TESTS
+
#if ZEN_PLATFORM_WINDOWS
# include <zencore/windows.h>
#endif
@@ -255,13 +259,9 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
ZEN_INFO("instantiating project service");
- m_ProjectStore = new ProjectStore(*m_CidStore,
- m_DataRoot / "projects",
- m_GcManager,
- *m_JobQueue,
- *m_OpenProcessCache,
- ProjectStore::Configuration{});
- m_HttpProjectService.reset(new HttpProjectService{*m_CidStore, m_ProjectStore, m_StatusService, m_StatsService, *m_AuthMgr});
+ m_ProjectStore = new ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, ProjectStore::Configuration{});
+ m_HttpProjectService.reset(
+ new HttpProjectService{*m_CidStore, m_ProjectStore, m_StatusService, m_StatsService, *m_AuthMgr, *m_OpenProcessCache, *m_JobQueue});
if (ServerOptions.WorksSpacesConfig.Enabled)
{
@@ -1129,6 +1129,7 @@ void
zenserver_forcelinktests()
{
zen::prj_forcelink();
+ zen::remoteprojectstore_forcelink();
}
#endif