aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/projectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-02 15:56:00 +0200
committerGitHub Enterprise <[email protected]>2025-10-02 15:56:00 +0200
commitadbec59ee0b169ba0ac9d167eac4ed72edefe1ca (patch)
treec05d4e5624d98272969f6509fe5cf8a75d6ebf9d /src/zenserver/projectstore/projectstore.cpp
parentZs/OIDC exe path handling (#538) (diff)
downloadzen-adbec59ee0b169ba0ac9d167eac4ed72edefe1ca.tar.xz
zen-adbec59ee0b169ba0ac9d167eac4ed72edefe1ca.zip
projectstore refactor phase 2 (#539)
Refactor projectstore/httpprojectservice to prepare for move of projectstore to zenstore
Diffstat (limited to 'src/zenserver/projectstore/projectstore.cpp')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp2025
1 files changed, 415 insertions, 1610 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 181177653..5034a250a 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -3,36 +3,24 @@
#include "projectstore.h"
#include <zencore/assertfmt.h>
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryutil.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
-#include <zencore/jobqueue.h>
#include <zencore/logging.h>
#include <zencore/memory/llm.h>
#include <zencore/scopeguard.h>
#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
-#include <zenhttp/packageformat.h>
#include <zenstore/caslog.h>
#include <zenstore/cidstore.h>
#include <zenstore/scrubcontext.h>
-#include <zenutil/cache/rpcrecording.h>
-#include <zenutil/openprocesscache.h>
#include <zenutil/parallelwork.h>
#include <zenutil/referencemetadata.h>
#include <zenutil/workerpools.h>
-#include "buildsremoteprojectstore.h"
-#include "fileremoteprojectstore.h"
-#include "jupiterremoteprojectstore.h"
-#include "remoteprojectstore.h"
-#include "zenremoteprojectstore.h"
-
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_set.h>
#include <xxh3.h>
@@ -41,6 +29,9 @@ ZEN_THIRD_PARTY_INCLUDES_END
#if ZEN_WITH_TESTS
# include <zencore/testing.h>
# include <zencore/testutils.h>
+# include <zenutil/chunkblock.h>
+
+# include <unordered_map>
#endif // ZEN_WITH_TESTS
namespace zen {
@@ -140,263 +131,6 @@ namespace {
return CheckWriteTime < ReferenceWriteTime;
}
- struct CreateRemoteStoreResult
- {
- std::shared_ptr<RemoteProjectStore> Store;
- std::string Description;
- };
- CreateRemoteStoreResult CreateRemoteStore(CbObjectView Params,
- AuthMgr& AuthManager,
- size_t MaxBlockSize,
- size_t MaxChunkEmbedSize,
- const std::filesystem::path& TempFilePath)
- {
- ZEN_MEMSCOPE(GetProjectstoreTag());
-
- using namespace std::literals;
-
- std::shared_ptr<RemoteProjectStore> RemoteStore;
-
- if (CbObjectView File = Params["file"sv].AsObjectView(); File)
- {
- std::filesystem::path FolderPath(File["path"sv].AsString());
- if (FolderPath.empty())
- {
- return {nullptr, "Missing file path"};
- }
- std::string_view Name(File["name"sv].AsString());
- if (Name.empty())
- {
- return {nullptr, "Missing file name"};
- }
- std::string_view OptionalBaseName(File["basename"sv].AsString());
- bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false);
- bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false);
-
- FileRemoteStoreOptions Options = {
- RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
- FolderPath,
- std::string(Name),
- std::string(OptionalBaseName),
- ForceDisableBlocks,
- ForceEnableTempBlocks};
- RemoteStore = CreateFileRemoteStore(Options);
- }
-
- if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud)
- {
- std::string_view CloudServiceUrl = Cloud["url"sv].AsString();
- if (CloudServiceUrl.empty())
- {
- return {nullptr, "Missing service url"};
- }
-
- std::string Url = UrlDecode(CloudServiceUrl);
- std::string_view Namespace = Cloud["namespace"sv].AsString();
- if (Namespace.empty())
- {
- return {nullptr, "Missing namespace"};
- }
- std::string_view Bucket = Cloud["bucket"sv].AsString();
- if (Bucket.empty())
- {
- return {nullptr, "Missing bucket"};
- }
- std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString();
- std::string AccessToken = std::string(Cloud["access-token"sv].AsString());
- if (AccessToken.empty())
- {
- std::string_view AccessTokenEnvVariable = Cloud["access-token-env"].AsString();
- if (!AccessTokenEnvVariable.empty())
- {
- AccessToken = GetEnvVariable(AccessTokenEnvVariable);
- }
- }
- std::filesystem::path OidcExePath;
- if (std::string_view OidcExePathString = Cloud["oidc-exe-path"].AsString(); !OidcExePathString.empty())
- {
- std::filesystem::path OidcExePathMaybe(OidcExePathString);
- if (IsFile(OidcExePathMaybe))
- {
- OidcExePath = std::move(OidcExePathMaybe);
- }
- else
- {
- ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString);
- }
- }
- std::string_view KeyParam = Cloud["key"sv].AsString();
- if (KeyParam.empty())
- {
- return {nullptr, "Missing key"};
- }
- if (KeyParam.length() != IoHash::StringLength)
- {
- return {nullptr, "Invalid key"};
- }
- IoHash Key = IoHash::FromHexString(KeyParam);
- if (Key == IoHash::Zero)
- {
- return {nullptr, "Invalid key string"};
- }
- IoHash BaseKey = IoHash::Zero;
- std::string_view BaseKeyParam = Cloud["basekey"sv].AsString();
- if (!BaseKeyParam.empty())
- {
- if (BaseKeyParam.length() != IoHash::StringLength)
- {
- return {nullptr, "Invalid base key"};
- }
- BaseKey = IoHash::FromHexString(BaseKeyParam);
- if (BaseKey == IoHash::Zero)
- {
- return {nullptr, "Invalid base key string"};
- }
- }
-
- bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false);
- bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false);
- bool AssumeHttp2 = Cloud["assumehttp2"sv].AsBool(false);
-
- JupiterRemoteStoreOptions Options = {
- RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
- Url,
- std::string(Namespace),
- std::string(Bucket),
- Key,
- BaseKey,
- std::string(OpenIdProvider),
- AccessToken,
- AuthManager,
- OidcExePath,
- ForceDisableBlocks,
- ForceDisableTempBlocks,
- AssumeHttp2};
- RemoteStore = CreateJupiterRemoteStore(Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false);
- }
-
- if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen)
- {
- std::string_view Url = Zen["url"sv].AsString();
- std::string_view Project = Zen["project"sv].AsString();
- if (Project.empty())
- {
- return {nullptr, "Missing project"};
- }
- std::string_view Oplog = Zen["oplog"sv].AsString();
- if (Oplog.empty())
- {
- return {nullptr, "Missing oplog"};
- }
- ZenRemoteStoreOptions Options = {
- RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
- std::string(Url),
- std::string(Project),
- std::string(Oplog)};
- RemoteStore = CreateZenRemoteStore(Options, TempFilePath);
- }
-
- if (CbObjectView Builds = Params["builds"sv].AsObjectView(); Builds)
- {
- std::string_view BuildsServiceUrl = Builds["url"sv].AsString();
- if (BuildsServiceUrl.empty())
- {
- return {nullptr, "Missing service url"};
- }
-
- std::string Url = UrlDecode(BuildsServiceUrl);
- std::string_view Namespace = Builds["namespace"sv].AsString();
- if (Namespace.empty())
- {
- return {nullptr, "Missing namespace"};
- }
- std::string_view Bucket = Builds["bucket"sv].AsString();
- if (Bucket.empty())
- {
- return {nullptr, "Missing bucket"};
- }
- std::string_view OpenIdProvider = Builds["openid-provider"sv].AsString();
- std::string AccessToken = std::string(Builds["access-token"sv].AsString());
- if (AccessToken.empty())
- {
- std::string_view AccessTokenEnvVariable = Builds["access-token-env"].AsString();
- if (!AccessTokenEnvVariable.empty())
- {
- AccessToken = GetEnvVariable(AccessTokenEnvVariable);
- }
- }
- std::filesystem::path OidcExePath;
- if (std::string_view OidcExePathString = Builds["oidc-exe-path"].AsString(); !OidcExePathString.empty())
- {
- std::filesystem::path OidcExePathMaybe(OidcExePathString);
- if (IsFile(OidcExePathMaybe))
- {
- OidcExePath = std::move(OidcExePathMaybe);
- }
- else
- {
- ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString);
- }
- }
- std::string_view BuildIdParam = Builds["buildsid"sv].AsString();
- if (BuildIdParam.empty())
- {
- return {nullptr, "Missing build id"};
- }
- if (BuildIdParam.length() != Oid::StringLength)
- {
- return {nullptr, "Invalid build id"};
- }
- Oid BuildId = Oid::FromHexString(BuildIdParam);
- if (BuildId == Oid::Zero)
- {
- return {nullptr, "Invalid build id string"};
- }
-
- bool ForceDisableBlocks = Builds["disableblocks"sv].AsBool(false);
- bool ForceDisableTempBlocks = Builds["disabletempblocks"sv].AsBool(false);
- bool AssumeHttp2 = Builds["assumehttp2"sv].AsBool(false);
-
- MemoryView MetaDataSection = Builds["metadata"sv].AsBinaryView();
- IoBuffer MetaData(IoBuffer::Wrap, MetaDataSection.GetData(), MetaDataSection.GetSize());
-
- BuildsRemoteStoreOptions Options = {
- RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
- Url,
- std::string(Namespace),
- std::string(Bucket),
- BuildId,
- std::string(OpenIdProvider),
- AccessToken,
- AuthManager,
- OidcExePath,
- ForceDisableBlocks,
- ForceDisableTempBlocks,
- AssumeHttp2,
- MetaData};
- RemoteStore = CreateJupiterBuildsRemoteStore(Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false);
- }
-
- if (!RemoteStore)
- {
- return {nullptr, "Unknown remote store type"};
- }
-
- return {std::move(RemoteStore), ""};
- }
-
- std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result)
- {
- if (Result.ErrorCode == 0)
- {
- return {HttpResponseCode::OK, Result.Text};
- }
- return {static_cast<HttpResponseCode>(Result.ErrorCode),
- Result.Reason.empty() ? Result.Text
- : Result.Text.empty() ? Result.Reason
- : fmt::format("{}: {}", Result.Reason, Result.Text)};
- }
-
std::pair<int32_t, int32_t> GetPagedRange(int32_t TotalSize, const ProjectStore::Oplog::Paging& EntryPaging)
{
int32_t Start = std::clamp(EntryPaging.Start, 0, TotalSize);
@@ -4292,17 +4026,10 @@ ProjectStore::Project::LastOplogAccessTime(std::string_view Oplog) const
//////////////////////////////////////////////////////////////////////////
-ProjectStore::ProjectStore(CidStore& Store,
- std::filesystem::path BasePath,
- GcManager& Gc,
- JobQueue& JobQueue,
- OpenProcessCache& InOpenProcessCache,
- const Configuration& Config)
+ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, const Configuration& Config)
: m_Log(logging::Get("project"))
, m_Gc(Gc)
, m_CidStore(Store)
-, m_JobQueue(JobQueue)
-, m_OpenProcessCache(InOpenProcessCache)
, m_ProjectBasePath(BasePath)
, m_Config(Config)
, m_DiskWriteBlocker(Gc.GetDiskWriteBlocker())
@@ -4990,7 +4717,7 @@ ProjectStore::GetChunkInfo(LoggerRef InLog, Project& Project, Oplog& Oplog, cons
}
uint64_t ChunkSize = Chunk.GetSize();
- if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
+ if (Chunk.GetContentType() == ZenContentType::kCompressedBinary)
{
IoHash RawHash;
uint64_t RawSize;
@@ -5165,6 +4892,7 @@ IoBuffer
ProjectStore::GetChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("ProjectStore::GetChunk");
ZEN_UNUSED(Project, Oplog);
IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash);
@@ -5182,6 +4910,7 @@ IoBuffer
ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const Oid& ChunkId)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("ProjectStore::GetChunk");
Ref<Project> Project = OpenProject(ProjectId);
if (!Project)
@@ -5200,6 +4929,7 @@ IoBuffer
ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const IoHash& Cid)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("ProjectStore::GetChunk");
Ref<Project> Project = OpenProject(ProjectId);
if (!Project)
@@ -5218,6 +4948,8 @@ bool
ProjectStore::PutChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash, IoBuffer&& Chunk)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("ProjectStore::PutChunk");
+
IoHash RawHash;
uint64_t RawSize;
CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize);
@@ -5232,1041 +4964,340 @@ ProjectStore::PutChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash,
return Result.New;
}
-std::pair<HttpResponseCode, std::string>
-ProjectStore::GetChunks(const std::string_view ProjectId,
- const std::string_view OplogId,
- const CbObject& RequestObject,
- CbPackage& OutResponsePackage)
+std::vector<ProjectStore::ChunkResult>
+ProjectStore::GetChunks(Project& Project, Oplog& Oplog, std::span<const ChunkRequest> Requests)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
- ZEN_TRACE_CPU("Store::GetChunks");
+ ZEN_TRACE_CPU("ProjectStore::GetChunks");
- using namespace std::literals;
+ ZEN_ASSERT(!Requests.empty());
- Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
- if (!Project)
- {
- return {HttpResponseCode::NotFound, fmt::format("getchunks rpc request for unknown project '{}'", ProjectId)};
- }
- Project->TouchProject();
+ std::vector<ProjectStore::ChunkResult> Results;
+ size_t RequestCount = Requests.size();
- Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false);
- if (!FoundLog)
- {
- return {HttpResponseCode::NotFound, fmt::format("getchunks rpc request for unknown oplog '{}/{}'", ProjectId, OplogId)};
- }
- Project->TouchOplog(OplogId);
+ Results.resize(RequestCount);
- if (RequestObject["chunks"sv].IsArray())
+ if (RequestCount > 1)
{
- // Legacy full chunks only by rawhash
+ std::vector<IoHash> ChunkRawHashes;
+ std::vector<size_t> ChunkRawHashesRequestIndex;
+ std::vector<Oid> ChunkIds;
+ std::vector<size_t> ChunkIdsRequestIndex;
- CbArrayView ChunksArray = RequestObject["chunks"sv].AsArrayView();
+ ChunkRawHashes.reserve(RequestCount);
+ ChunkRawHashesRequestIndex.reserve(RequestCount);
+ ChunkIds.reserve(RequestCount);
+ ChunkIdsRequestIndex.reserve(RequestCount);
- CbObjectWriter ResponseWriter;
- ResponseWriter.BeginArray("chunks"sv);
- for (CbFieldView FieldView : ChunksArray)
+ for (size_t RequestIndex = 0; RequestIndex < Requests.size(); RequestIndex++)
{
- IoHash RawHash = FieldView.AsHash();
- IoBuffer ChunkBuffer = m_CidStore.FindChunkByCid(RawHash);
- if (ChunkBuffer)
+ const ChunkRequest& Request = Requests[RequestIndex];
+ if (Request.Id.index() == 0)
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer));
- if (Compressed)
+ ChunkRawHashes.push_back(std::get<IoHash>(Request.Id));
+ ChunkRawHashesRequestIndex.push_back(RequestIndex);
+ }
+ else
+ {
+ ChunkIds.push_back(std::get<Oid>(Request.Id));
+ ChunkIdsRequestIndex.push_back(RequestIndex);
+ }
+ }
+
+ WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();
+ if (!ChunkRawHashes.empty())
+ {
+ Oplog.IterateChunks(
+ ChunkRawHashes,
+ true,
+ [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool {
+ if (Payload)
+ {
+ size_t RequestIndex = ChunkRawHashesRequestIndex[Index];
+ const ChunkRequest& Request = Requests[RequestIndex];
+ ChunkResult& Result = Results[RequestIndex];
+ Result.Exists = true;
+ if (!Request.SkipData)
+ {
+ Result.ChunkBuffer = std::move(Payload);
+ Result.ChunkBuffer.MakeOwned();
+ }
+ Result.ModTag = ModTag;
+ }
+ return true;
+ },
+ &WorkerPool,
+ 8u * 1024);
+ }
+ if (!ChunkIdsRequestIndex.empty())
+ {
+ Oplog.IterateChunks(
+ Project.RootDir,
+ ChunkIds,
+ true,
+ [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool {
+ if (Payload)
+ {
+ size_t RequestIndex = ChunkIdsRequestIndex[Index];
+ const ChunkRequest& Request = Requests[RequestIndex];
+ ChunkResult& Result = Results[RequestIndex];
+ Result.Exists = true;
+ if (!Request.SkipData)
+ {
+ Result.ChunkBuffer = std::move(Payload);
+ Result.ChunkBuffer.MakeOwned();
+ }
+ Result.ModTag = ModTag;
+ }
+ return true;
+ },
+ &WorkerPool,
+ 8u * 1024);
+ }
+ }
+ else
+ {
+ const ChunkRequest& Request = Requests.front();
+ ChunkResult& Result = Results.front();
+
+ if (Request.Id.index() == 0)
+ {
+ const IoHash& ChunkHash = std::get<IoHash>(Request.Id);
+ IoBuffer Payload = m_CidStore.FindChunkByCid(ChunkHash);
+ if (Payload)
+ {
+ Result.Exists = true;
+ Result.ModTag = GetModificationTagFromRawHash(ChunkHash);
+ if (!Request.SkipData)
{
- ResponseWriter.AddHash(RawHash);
- OutResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash));
+ Result.ChunkBuffer = std::move(Payload);
+ Result.ChunkBuffer.MakeOwned();
}
- else
+ }
+ }
+ else
+ {
+ const Oid& ChunkId = std::get<Oid>(Request.Id);
+ uint64_t ModTag = 0;
+ IoBuffer Payload = Oplog.FindChunk(Project.RootDir, ChunkId, &ModTag);
+ if (Payload)
+ {
+ Result.Exists = true;
+ Result.ModTag = ModTag;
+ if (!Request.SkipData)
{
- ZEN_WARN("oplog '{}/{}': invalid compressed binary in cas store for {}", ProjectId, OplogId, RawHash);
+ Result.ChunkBuffer = std::move(Payload);
+ Result.ChunkBuffer.MakeOwned();
}
}
}
- ResponseWriter.EndArray();
- OutResponsePackage.SetObject(ResponseWriter.Save());
- return {HttpResponseCode::OK, {}};
}
- else if (auto RequestFieldView = RequestObject["Request"sv]; RequestFieldView.IsObject())
+ return Results;
+}
+
+std::vector<ProjectStore::ChunkRequest>
+ProjectStore::ParseChunksRequests(Project& Project, Oplog& Oplog, const CbObject& Cb)
+{
+ ZEN_TRACE_CPU("Store::Rpc::getchunks");
+
+ using namespace std::literals;
+
+ std::vector<ChunkRequest> Requests;
+
+ if (auto RequestFieldView = Cb["Request"sv]; RequestFieldView.IsObject())
{
CbObjectView RequestView = RequestFieldView.AsObjectView();
bool SkipData = RequestView["SkipData"].AsBool(false);
CbArrayView ChunksArray = RequestView["Chunks"sv].AsArrayView();
- struct Request
- {
- struct InputData
- {
- uint64_t Offset = 0;
- uint64_t Size = (uint64_t)-1;
- std::variant<IoHash, Oid> Id;
- std::optional<uint64_t> ModTag;
- } Input;
- struct OutputData
- {
- bool Exists = false;
- IoBuffer ChunkBuffer;
- uint64_t ModTag = 0;
- } Output;
- };
-
- std::vector<Request> Requests;
- size_t RequestCount = ChunksArray.Num();
+ size_t RequestCount = ChunksArray.Num();
if (RequestCount > 0)
{
Requests.reserve(RequestCount);
- std::vector<IoHash> ChunkRawHashes;
- std::vector<size_t> ChunkRawHashesRequestIndex;
- std::vector<Oid> ChunkIds;
- std::vector<size_t> ChunkIdsRequestIndex;
- bool DoBatch = RequestCount > 1;
- if (DoBatch)
- {
- ChunkRawHashes.reserve(RequestCount);
- ChunkRawHashesRequestIndex.reserve(RequestCount);
- ChunkIds.reserve(RequestCount);
- ChunkIdsRequestIndex.reserve(RequestCount);
- }
for (CbFieldView FieldView : ChunksArray)
{
CbObjectView ChunkObject = FieldView.AsObjectView();
- Request ChunkRequest = {
- .Input{.Offset = ChunkObject["Offset"sv].AsUInt64(0), .Size = ChunkObject["Size"sv].AsUInt64((uint64_t)-1)}};
+ ChunkRequest ChunkRequest = {.Offset = ChunkObject["Offset"sv].AsUInt64(0),
+ .Size = ChunkObject["Size"sv].AsUInt64((uint64_t)-1),
+ .SkipData = SkipData};
if (CbFieldView InputModificationTagView = ChunkObject.FindView("ModTag"); InputModificationTagView.IsInteger())
{
- ChunkRequest.Input.ModTag = InputModificationTagView.AsUInt64();
+ ChunkRequest.ModTag = InputModificationTagView.AsUInt64();
}
if (CbFieldView RawHashView = ChunkObject.FindView("RawHash"sv); RawHashView.IsHash())
{
const IoHash ChunkHash = RawHashView.AsHash();
- ChunkRequest.Input.Id = ChunkHash;
- if (DoBatch)
- {
- ChunkRawHashes.push_back(ChunkHash);
- ChunkRawHashesRequestIndex.push_back(Requests.size());
- }
+ ChunkRequest.Id = ChunkHash;
}
else if (CbFieldView IdView = ChunkObject.FindView("Oid"sv); IdView.IsObjectId())
{
- const Oid ChunkId = IdView.AsObjectId();
- ChunkRequest.Input.Id = ChunkId;
- if (DoBatch)
- {
- ChunkIds.push_back(ChunkId);
- ChunkIdsRequestIndex.push_back(Requests.size());
- }
+ const Oid ChunkId = IdView.AsObjectId();
+ ChunkRequest.Id = ChunkId;
}
else
{
- return {HttpResponseCode::BadRequest,
- fmt::format("oplog '{}/{}': malformed getchunks rpc request object, chunk request has no identifier",
- ProjectId,
- OplogId)};
+ throw std::runtime_error(
+ fmt::format("oplog '{}/{}': malformed getchunks rpc request object, chunk request has no identifier",
+ Project.Identifier,
+ Oplog.OplogId()));
}
Requests.emplace_back(std::move(ChunkRequest));
}
-
- if (DoBatch)
- {
- WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();
- if (!ChunkRawHashes.empty())
- {
- FoundLog->IterateChunks(
- ChunkRawHashes,
- true,
- [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool {
- if (Payload)
- {
- size_t RequestIndex = ChunkRawHashesRequestIndex[Index];
- Requests[RequestIndex].Output.Exists = true;
- if (!SkipData)
- {
- Requests[RequestIndex].Output.ChunkBuffer = Payload;
- Requests[RequestIndex].Output.ChunkBuffer.MakeOwned();
- }
- Requests[RequestIndex].Output.ModTag = ModTag;
- }
- return true;
- },
- &WorkerPool,
- 8u * 1024);
- }
- if (!ChunkIdsRequestIndex.empty())
- {
- FoundLog->IterateChunks(
- Project->RootDir,
- ChunkIds,
- true,
- [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool {
- if (Payload)
- {
- size_t RequestIndex = ChunkIdsRequestIndex[Index];
- Requests[RequestIndex].Output.Exists = true;
- if (!SkipData)
- {
- Requests[RequestIndex].Output.ChunkBuffer = Payload;
- Requests[RequestIndex].Output.ChunkBuffer.MakeOwned();
- }
- Requests[RequestIndex].Output.ModTag = ModTag;
- }
- return true;
- },
- &WorkerPool,
- 8u * 1024);
- }
- }
- else
- {
- Request& ChunkRequest = Requests.front();
- if (ChunkRequest.Input.Id.index() == 0)
- {
- const IoHash& ChunkHash = std::get<IoHash>(ChunkRequest.Input.Id);
- IoBuffer Payload = m_CidStore.FindChunkByCid(ChunkHash);
- if (Payload)
- {
- ChunkRequest.Output.Exists = true;
- ChunkRequest.Output.ModTag = GetModificationTagFromRawHash(ChunkHash);
- if (!SkipData)
- {
- ChunkRequest.Output.ChunkBuffer = Payload;
- }
- }
- }
- else
- {
- const Oid& ChunkId = std::get<Oid>(ChunkRequest.Input.Id);
- uint64_t ModTag = 0;
- IoBuffer Payload = FoundLog->FindChunk(Project->RootDir, ChunkId, &ModTag);
- if (Payload)
- {
- ChunkRequest.Output.Exists = true;
- ChunkRequest.Output.ModTag = ModTag;
- if (!SkipData)
- {
- ChunkRequest.Output.ChunkBuffer = Payload;
- }
- }
- }
- }
- }
-
- CbObjectWriter ResponseWriter(32 + Requests.size() * 64u);
- ResponseWriter.BeginArray("Chunks"sv);
- {
- for (Request& ChunkRequest : Requests)
- {
- if (ChunkRequest.Output.Exists)
- {
- ResponseWriter.BeginObject();
- {
- if (ChunkRequest.Input.Id.index() == 0)
- {
- const IoHash& RawHash = std::get<IoHash>(ChunkRequest.Input.Id);
- ResponseWriter.AddHash("Id", RawHash);
- }
- else
- {
- const Oid& Id = std::get<Oid>(ChunkRequest.Input.Id);
- ResponseWriter.AddObjectId("Id", Id);
- }
- if (!ChunkRequest.Input.ModTag.has_value() || ChunkRequest.Input.ModTag.value() != ChunkRequest.Output.ModTag)
- {
- ResponseWriter.AddInteger("ModTag", ChunkRequest.Output.ModTag);
- if (!SkipData)
- {
- auto ExtractRangeResult = ExtractRange(std::move(ChunkRequest.Output.ChunkBuffer),
- ChunkRequest.Input.Offset,
- ChunkRequest.Input.Size,
- ZenContentType::kCompressedBinary);
- if (ExtractRangeResult.Error == GetChunkRangeResult::EError::Ok)
- {
- if (ExtractRangeResult.ContentType == ZenContentType::kCompressedBinary)
- {
- ZEN_ASSERT(ExtractRangeResult.RawHash != IoHash::Zero);
- CompressedBuffer CompressedValue =
- CompressedBuffer::FromCompressedNoValidate(std::move(ExtractRangeResult.Chunk));
- ZEN_ASSERT(CompressedValue);
-
- if (ExtractRangeResult.RawSize != 0)
- {
- // This really could use some thought so we don't send the same data if we get a request for
- // multiple ranges from the same chunk block
-
- uint64_t FragmentRawOffset = 0;
- OodleCompressor Compressor;
- OodleCompressionLevel CompressionLevel;
- uint64_t BlockSize = 0;
- if (CompressedValue.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
- {
- if (BlockSize > 0)
- {
- FragmentRawOffset = (ChunkRequest.Input.Offset / BlockSize) * BlockSize;
- }
- else
- {
- FragmentRawOffset = ChunkRequest.Input.Offset;
- }
- uint64_t FragmentRawLength = CompressedValue.DecodeRawSize();
-
- IoHashStream FragmentHashStream;
- FragmentHashStream.Append(ExtractRangeResult.RawHash.Hash,
- sizeof(ExtractRangeResult.RawHash.Hash));
- FragmentHashStream.Append(&FragmentRawOffset, sizeof(FragmentRawOffset));
- FragmentHashStream.Append(&FragmentRawLength, sizeof(FragmentRawLength));
- IoHash FragmentHash = FragmentHashStream.GetHash();
-
- ResponseWriter.AddHash("FragmentHash", FragmentHash);
- ResponseWriter.AddInteger("FragmentOffset", FragmentRawOffset);
- ResponseWriter.AddInteger("RawSize", ExtractRangeResult.RawSize);
- OutResponsePackage.AddAttachment(CbAttachment(CompressedValue, FragmentHash));
- }
- else
- {
- std::string ErrorString =
- "Failed to get compression parameters from partial compressed buffer";
- ResponseWriter.AddString("Error", ErrorString);
- ZEN_WARN("oplog '{}/{}': {}", ProjectId, OplogId, ErrorString);
- }
- }
- else
- {
- ResponseWriter.AddHash("RawHash"sv, ExtractRangeResult.RawHash);
- OutResponsePackage.AddAttachment(
- CbAttachment(std::move(CompressedValue), ExtractRangeResult.RawHash));
- }
- }
- else
- {
- IoHashStream HashStream;
- ZEN_ASSERT(ChunkRequest.Input.Id.index() == 1);
- const Oid& Id = std::get<Oid>(ChunkRequest.Input.Id);
- HashStream.Append(Id.OidBits, sizeof(Id.OidBits));
- HashStream.Append(&ChunkRequest.Input.Offset, sizeof(ChunkRequest.Input.Offset));
- HashStream.Append(&ChunkRequest.Input.Size, sizeof(ChunkRequest.Input.Size));
- IoHash Hash = HashStream.GetHash();
-
- ResponseWriter.AddHash("Hash"sv, Hash);
- if (ExtractRangeResult.RawSize != 0)
- {
- ResponseWriter.AddInteger("Size", ExtractRangeResult.RawSize);
- }
- OutResponsePackage.AddAttachment(CbAttachment(std::move(ExtractRangeResult.Chunk), Hash));
- }
- }
- else
- {
- std::string ErrorString =
- fmt::format("Failed fetching chunk range ({})", ExtractRangeResult.ErrorDescription);
- ResponseWriter.AddString("Error", ErrorString);
- ZEN_WARN("oplog '{}/{}': {}", ProjectId, OplogId, ErrorString);
- }
- }
- }
- }
- ResponseWriter.EndObject();
- }
- }
}
- ResponseWriter.EndArray();
- OutResponsePackage.SetObject(ResponseWriter.Save());
- return {HttpResponseCode::OK, {}};
}
- else
+ else if (CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView(); ChunksArray)
{
- return {HttpResponseCode::BadRequest, fmt::format("oplog '{}/{}': malformed getchunks rpc request object", ProjectId, OplogId)};
- }
-}
-
-ProjectStore::WriteOplogResult
-ProjectStore::WriteOplog(Project& Project, Oplog& Oplog, const CbObject& ContainerObject)
-{
- ZEN_MEMSCOPE(GetProjectstoreTag());
- ZEN_TRACE_CPU("Store::WriteOplog");
- ZEN_UNUSED(Project);
-
- CidStore& ChunkStore = m_CidStore;
- RwLock AttachmentsLock;
- tsl::robin_set<IoHash, IoHash::Hasher> Attachments;
-
- auto HasAttachment = [&ChunkStore](const IoHash& RawHash) { return ChunkStore.ContainsChunk(RawHash); };
- auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector<IoHash>&& ChunkHashes) {
- RwLock::ExclusiveLockScope _(AttachmentsLock);
- if (BlockHash != IoHash::Zero)
- {
- Attachments.insert(BlockHash);
- }
- else
- {
- Attachments.insert(ChunkHashes.begin(), ChunkHashes.end());
- }
- };
- auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) {
- RwLock::ExclusiveLockScope _(AttachmentsLock);
- Attachments.insert(RawHash);
- };
-
- auto OnChunkedAttachment = [](const ChunkedInfo&) {};
-
- auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog.CaptureAddedAttachments(RawHashes); };
+ // Legacy full chunks only by rawhash
- // Make sure we retain any attachments we download before writing the oplog
- Oplog.EnableUpdateCapture();
- auto _ = MakeGuard([&Oplog]() { Oplog.DisableUpdateCapture(); });
+ size_t RequestCount = ChunksArray.Num();
- RemoteProjectStore::Result RemoteResult = SaveOplogContainer(Oplog,
- ContainerObject,
- OnReferencedAttachments,
- HasAttachment,
- OnNeedBlock,
- OnNeedAttachment,
- OnChunkedAttachment,
- nullptr);
+ Requests.reserve(RequestCount);
- if (RemoteResult.ErrorCode)
- {
- return ProjectStore::WriteOplogResult{.ErrorCode = RemoteResult.ErrorCode, .ErrorDescription = RemoteResult.Reason};
+ std::vector<IoHash> Cids;
+ Cids.reserve(ChunksArray.Num());
+ for (CbFieldView FieldView : ChunksArray)
+ {
+ Requests.push_back(ProjectStore::ChunkRequest{.Id = FieldView.AsHash()});
+ }
}
-
- return ProjectStore::WriteOplogResult{.Need = std::vector<IoHash>(Attachments.begin(), Attachments.end())};
-}
-
-ProjectStore::ReadOplogResult
-ProjectStore::ReadOplog(Project& Project,
- Oplog& Oplog,
- size_t MaxBlockSize,
- size_t MaxChunkEmbedSize,
- size_t MaxChunksPerBlock,
- size_t ChunkFileSizeLimit)
-{
- ZEN_MEMSCOPE(GetProjectstoreTag());
- ZEN_TRACE_CPU("Store::ReadOplog");
-
- CidStore& ChunkStore = m_CidStore;
-
- RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer(
- ChunkStore,
- Project,
- Oplog,
- MaxBlockSize,
- MaxChunkEmbedSize,
- MaxChunksPerBlock,
- ChunkFileSizeLimit,
- /* BuildBlocks */ false,
- /* IgnoreMissingAttachments */ false,
- /* AllowChunking*/ false,
- [](CompressedBuffer&&, ChunkBlockDescription&&) {},
- [](const IoHash&, TGetAttachmentBufferFunc&&) {},
- [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {},
- /* EmbedLooseFiles*/ false);
-
- if (ContainerResult.ErrorCode)
+ else
{
- return ProjectStore::ReadOplogResult{.ErrorCode = ContainerResult.ErrorCode, .ErrorDescription = ContainerResult.Reason};
+ throw std::runtime_error(fmt::format("oplog '{}/{}': malformed getchunks rpc request object", Project.Identifier, Oplog.OplogId()));
}
-
- return ProjectStore::ReadOplogResult{.ContainerObject = std::move(ContainerResult.ContainerObject)};
+ return Requests;
}
-bool
-ProjectStore::Rpc(HttpServerRequest& HttpReq,
- const std::string_view ProjectId,
- const std::string_view OplogId,
- IoBuffer&& Payload,
- AuthMgr& AuthManager)
+CbPackage
+ProjectStore::WriteChunksRequestResponse(Project& Project,
+ Oplog& Oplog,
+ std::vector<ChunkRequest>&& Requests,
+ std::vector<ChunkResult>&& Results)
{
- ZEN_MEMSCOPE(GetProjectstoreTag());
- ZEN_TRACE_CPU("Store::Rpc");
-
using namespace std::literals;
- HttpContentType PayloadContentType = HttpReq.RequestContentType();
- CbPackage Package;
- CbObject Cb;
- switch (PayloadContentType)
- {
- case HttpContentType::kJSON:
- case HttpContentType::kUnknownContentType:
- case HttpContentType::kText:
- {
- std::string JsonText(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize());
- Cb = LoadCompactBinaryFromJson(JsonText).AsObject();
- if (!Cb)
- {
- HttpReq.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Content format not supported, expected JSON format");
- return false;
- }
- }
- break;
- case HttpContentType::kCbObject:
- {
- CbValidateError ValidateResult;
- if (Cb = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult);
- ValidateResult != CbValidateError::None || !Cb)
- {
- HttpReq.WriteResponse(
- HttpResponseCode::BadRequest,
- HttpContentType::kText,
- fmt::format("Content format not supported, expected compact binary format ('{}')", ToString(ValidateResult)));
- return false;
- }
- break;
- }
- case HttpContentType::kCbPackage:
- try
- {
- Package = ParsePackageMessage(Payload);
- Cb = Package.GetObject();
- }
- catch (const std::invalid_argument& ex)
- {
- HttpReq.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- fmt::format("Failed to parse package request, reason: '{}'", ex.what()));
- return false;
- }
- if (!Cb)
- {
- HttpReq.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Content format not supported, expected package message format");
- return false;
- }
- break;
- default:
- HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type");
- return false;
- }
-
- Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
- if (!Project)
- {
- HttpReq.WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("Rpc oplog request for unknown project '{}'", ProjectId));
- return true;
- }
- Project->TouchProject();
-
- std::string_view Method = Cb["method"sv].AsString();
- bool VerifyPathOnDisk = Method != "getchunks"sv;
+ CbPackage ResponsePackage;
- Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, VerifyPathOnDisk);
- if (!Oplog)
- {
- HttpReq.WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId));
- return true;
- }
- Project->TouchOplog(OplogId);
-
- uint32_t MethodHash = HashStringDjb2(Method);
-
- switch (MethodHash)
+ CbObjectWriter ResponseWriter(32 + Requests.size() * 64u);
+ ResponseWriter.BeginArray("Chunks"sv);
{
- case HashStringDjb2("import"sv):
- {
- if (!AreDiskWritesAllowed())
- {
- HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
- return true;
- }
- std::pair<HttpResponseCode, std::string> Result =
- Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
- if (Result.second.empty())
- {
- HttpReq.WriteResponse(Result.first);
- return Result.first != HttpResponseCode::BadRequest;
- }
- HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- return true;
- }
- case HashStringDjb2("export"sv):
- {
- std::pair<HttpResponseCode, std::string> Result = Export(Project, *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
- HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- return true;
- }
- case HashStringDjb2("getchunks"sv):
+ for (size_t Index = 0; Index < Requests.size(); Index++)
+ {
+ const ChunkRequest& Request = Requests[Index];
+ ChunkResult& Result = Results[Index];
+ if (Result.Exists)
{
- ZEN_TRACE_CPU("Store::Rpc::getchunks");
-
- RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u));
- int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0);
-
- CbPackage ResponsePackage;
- std::pair<HttpResponseCode, std::string> Result = GetChunks(ProjectId, OplogId, Cb, ResponsePackage);
- if (Result.first == HttpResponseCode::OK)
+ ResponseWriter.BeginObject();
{
- void* TargetProcessHandle = nullptr;
- FormatFlags Flags = FormatFlags::kDefault;
- if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ if (Request.Id.index() == 0)
{
- Flags |= FormatFlags::kAllowLocalReferences;
- if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
- {
- Flags |= FormatFlags::kDenyPartialLocalReferences;
- }
- TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId);
+ const IoHash& RawHash = std::get<IoHash>(Request.Id);
+ ResponseWriter.AddHash("Id", RawHash);
}
-
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, Flags, TargetProcessHandle);
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- }
- return true;
- }
- case HashStringDjb2("putchunks"sv):
- {
- ZEN_TRACE_CPU("Store::Rpc::putchunks");
- if (!AreDiskWritesAllowed())
- {
- HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
- return true;
- }
-
- CbObject Object = Package.GetObject();
- const bool UsingTempFiles = Object["usingtmpfiles"].AsBool(false);
-
- std::span<const CbAttachment> Attachments = Package.GetAttachments();
- if (!Attachments.empty())
- {
- std::vector<IoBuffer> WriteAttachmentBuffers;
- std::vector<IoHash> WriteRawHashes;
-
- WriteAttachmentBuffers.reserve(Attachments.size());
- WriteRawHashes.reserve(Attachments.size());
-
- for (const CbAttachment& Attachment : Attachments)
+ else
{
- IoHash RawHash = Attachment.GetHash();
- const CompressedBuffer& Compressed = Attachment.AsCompressedBinary();
- IoBuffer AttachmentPayload = Compressed.GetCompressed().Flatten().AsIoBuffer();
- if (UsingTempFiles)
- {
- AttachmentPayload.SetDeleteOnClose(true);
- }
- WriteAttachmentBuffers.push_back(std::move(AttachmentPayload));
- WriteRawHashes.push_back(RawHash);
+ const Oid& Id = std::get<Oid>(Request.Id);
+ ResponseWriter.AddObjectId("Id", Id);
}
-
- Oplog->CaptureAddedAttachments(WriteRawHashes);
- m_CidStore.AddChunks(WriteAttachmentBuffers,
- WriteRawHashes,
- UsingTempFiles ? CidStore::InsertMode::kMayBeMovedInPlace : CidStore::InsertMode::kCopyOnly);
- }
- HttpReq.WriteResponse(HttpResponseCode::OK);
- return true;
- }
- case HashStringDjb2("snapshot"sv):
- {
- ZEN_TRACE_CPU("Store::Rpc::snapshot");
- if (!AreDiskWritesAllowed())
- {
- HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
- return true;
- }
-
- // Snapshot all referenced files. This brings the content of all
- // files into the CID store
-
- uint32_t OpCount = 0;
- uint64_t InlinedBytes = 0;
- uint64_t InlinedFiles = 0;
- uint64_t TotalBytes = 0;
- uint64_t TotalFiles = 0;
-
- std::vector<CbObject> NewOps;
- struct AddedChunk
- {
- IoBuffer Buffer;
- uint64_t RawSize = 0;
- };
- tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks;
-
- Oplog->IterateOplog(
- [&](CbObjectView Op) {
- bool OpRewritten = false;
- bool AllOk = true;
-
- CbWriter FilesArrayWriter;
- FilesArrayWriter.BeginArray("files"sv);
-
- for (CbFieldView& Field : Op["files"sv])
+ if (!Request.ModTag.has_value() || Request.ModTag.value() != Result.ModTag)
+ {
+ ResponseWriter.AddInteger("ModTag", Result.ModTag);
+ if (!Request.SkipData)
{
- bool CopyField = true;
-
- if (CbObjectView View = Field.AsObjectView())
+ auto ExtractRangeResult = ExtractRange(std::move(Result.ChunkBuffer),
+ Request.Offset,
+ Request.Size,
+ ZenContentType::kCompressedBinary);
+ if (ExtractRangeResult.Error == GetChunkRangeResult::EError::Ok)
{
- const IoHash DataHash = View["data"sv].AsHash();
-
- if (DataHash == IoHash::Zero)
+ if (ExtractRangeResult.ContentType == ZenContentType::kCompressedBinary)
{
- std::string_view ServerPath = View["serverpath"sv].AsString();
- std::filesystem::path FilePath = Project->RootDir / ServerPath;
- BasicFile DataFile;
- std::error_code Ec;
- DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec);
+ ZEN_ASSERT(ExtractRangeResult.RawHash != IoHash::Zero);
+ CompressedBuffer CompressedValue =
+ CompressedBuffer::FromCompressedNoValidate(std::move(ExtractRangeResult.Chunk));
+ ZEN_ASSERT(CompressedValue);
- if (Ec)
+ if (ExtractRangeResult.RawSize != 0)
{
- // Error...
-
- ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message());
-
- AllOk = false;
- }
- else
- {
- // Read file contents into memory, compress and keep in map of chunks to add to Cid store
- IoBuffer FileIoBuffer = DataFile.ReadAll();
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer)));
- const uint64_t RawSize = Compressed.DecodeRawSize();
- const IoHash RawHash = Compressed.DecodeRawHash();
- if (!AddedChunks.contains(RawHash))
+ // This really could use some thought so we don't send the same data if we get a request for
+ // multiple ranges from the same chunk block
+
+ uint64_t FragmentRawOffset = 0;
+ OodleCompressor Compressor;
+ OodleCompressionLevel CompressionLevel;
+ uint64_t BlockSize = 0;
+ if (CompressedValue.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
{
- const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString();
- BasicFile ChunkTempFile;
- ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete);
- ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec);
- if (Ec)
+ if (BlockSize > 0)
{
- Oid ChunkId = View["id"sv].AsObjectId();
- ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}",
- FilePath,
- ChunkId,
- Ec.message());
- AllOk = false;
+ FragmentRawOffset = (Request.Offset / BlockSize) * BlockSize;
}
else
{
- void* FileHandle = ChunkTempFile.Detach();
- IoBuffer ChunkBuffer(IoBuffer::File,
- FileHandle,
- 0,
- Compressed.GetCompressed().GetSize(),
- /*IsWholeFile*/ true);
- ChunkBuffer.SetDeleteOnClose(true);
- AddedChunks.insert_or_assign(
- RawHash,
- AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize});
+ FragmentRawOffset = Request.Offset;
}
+ uint64_t FragmentRawLength = CompressedValue.DecodeRawSize();
+
+ IoHashStream FragmentHashStream;
+ FragmentHashStream.Append(ExtractRangeResult.RawHash.Hash,
+ sizeof(ExtractRangeResult.RawHash.Hash));
+ FragmentHashStream.Append(&FragmentRawOffset, sizeof(FragmentRawOffset));
+ FragmentHashStream.Append(&FragmentRawLength, sizeof(FragmentRawLength));
+ IoHash FragmentHash = FragmentHashStream.GetHash();
+
+ ResponseWriter.AddHash("FragmentHash", FragmentHash);
+ ResponseWriter.AddInteger("FragmentOffset", FragmentRawOffset);
+ ResponseWriter.AddInteger("RawSize", ExtractRangeResult.RawSize);
+ ResponsePackage.AddAttachment(CbAttachment(CompressedValue, FragmentHash));
+ }
+ else
+ {
+ std::string ErrorString = "Failed to get compression parameters from partial compressed buffer";
+ ResponseWriter.AddString("Error", ErrorString);
+ ZEN_WARN("oplog '{}/{}': {}", Project.Identifier, Oplog.OplogId(), ErrorString);
}
-
- TotalBytes += RawSize;
- ++TotalFiles;
-
- // Rewrite file array entry with new data reference
- CbObjectWriter Writer(View.GetSize());
- RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
- if (Field.GetName() == "data"sv)
- {
- // omit this field as we will write it explicitly ourselves
- return true;
- }
- return false;
- });
- Writer.AddBinaryAttachment("data"sv, RawHash);
-
- CbObject RewrittenOp = Writer.Save();
- FilesArrayWriter.AddObject(std::move(RewrittenOp));
- CopyField = false;
+ }
+ else
+ {
+ ResponseWriter.AddHash("RawHash"sv, ExtractRangeResult.RawHash);
+ ResponsePackage.AddAttachment(CbAttachment(std::move(CompressedValue), ExtractRangeResult.RawHash));
}
}
- }
-
- if (CopyField)
- {
- FilesArrayWriter.AddField(Field);
- }
- else
- {
- OpRewritten = true;
- }
- }
-
- if (OpRewritten && AllOk)
- {
- FilesArrayWriter.EndArray();
- CbArray FilesArray = FilesArrayWriter.Save().AsArray();
-
- CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool {
- if (Field.GetName() == "files"sv)
+ else
{
- NewWriter.AddArray("files"sv, FilesArray);
-
- return true;
+ IoHashStream HashStream;
+ ZEN_ASSERT(Request.Id.index() == 1);
+ const Oid& Id = std::get<Oid>(Request.Id);
+ HashStream.Append(Id.OidBits, sizeof(Id.OidBits));
+ HashStream.Append(&Request.Offset, sizeof(Request.Offset));
+ HashStream.Append(&Request.Size, sizeof(Request.Size));
+ IoHash Hash = HashStream.GetHash();
+
+ ResponseWriter.AddHash("Hash"sv, Hash);
+ if (ExtractRangeResult.RawSize != 0)
+ {
+ ResponseWriter.AddInteger("Size", ExtractRangeResult.RawSize);
+ }
+ ResponsePackage.AddAttachment(CbAttachment(std::move(ExtractRangeResult.Chunk), Hash));
}
-
- return false;
- });
-
- NewOps.push_back(std::move(RewrittenOp));
- }
-
- OpCount++;
- },
- Oplog::Paging{});
-
- CbObjectWriter ResponseObj;
-
- // Persist rewritten oplog entries
- if (!NewOps.empty())
- {
- ResponseObj.BeginArray("rewritten_ops");
-
- for (CbObject& NewOp : NewOps)
- {
- ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp));
-
- ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number);
-
- ResponseObj.AddInteger(NewLsn.Number);
- }
-
- ResponseObj.EndArray();
- }
-
- // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the
- // new chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have
- // unreferenced chunks.
- for (auto It : AddedChunks)
- {
- const IoHash& RawHash = It.first;
- AddedChunk& Chunk = It.second;
- CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash);
- if (Result.New)
- {
- InlinedBytes += Chunk.RawSize;
- ++InlinedFiles;
- }
- }
-
- ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles;
- ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles;
-
- ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount);
-
- HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save());
- return true;
- }
- case HashStringDjb2("addopattachments"sv):
- {
- ZEN_TRACE_CPU("Store::Rpc::addopattachments");
- if (!AreDiskWritesAllowed())
- {
- HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
- return true;
- }
- }
- case HashStringDjb2("appendops"sv):
- {
- ZEN_TRACE_CPU("Store::Rpc::appendops");
- if (!AreDiskWritesAllowed())
- {
- HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
- return true;
- }
-
- CbArrayView OpsArray = Cb["ops"sv].AsArrayView();
-
- size_t OpsBufferSize = 0;
- for (CbFieldView OpView : OpsArray)
- {
- OpsBufferSize += OpView.GetSize();
- }
- UniqueBuffer OpsBuffers = UniqueBuffer::Alloc(OpsBufferSize);
- MutableMemoryView OpsBuffersMemory = OpsBuffers.GetMutableView();
-
- std::vector<CbObjectView> Ops;
- Ops.reserve(OpsArray.Num());
- for (CbFieldView OpView : OpsArray)
- {
- OpView.CopyTo(OpsBuffersMemory);
- Ops.push_back(CbObjectView(OpsBuffersMemory.GetData()));
- OpsBuffersMemory.MidInline(OpView.GetSize());
- }
-
- std::vector<ProjectStore::LogSequenceNumber> LSNs = Oplog->AppendNewOplogEntries(Ops);
- ZEN_ASSERT(LSNs.size() == Ops.size());
-
- std::vector<IoHash> MissingAttachments;
- for (size_t OpIndex = 0; OpIndex < Ops.size(); OpIndex++)
- {
- if (LSNs[OpIndex])
- {
- CbObjectView Op = Ops[OpIndex];
- Op.IterateAttachments([this, &MissingAttachments](CbFieldView AttachmentView) {
- const IoHash Cid = AttachmentView.AsAttachment();
- if (!m_CidStore.ContainsChunk(Cid))
+ }
+ else
{
- MissingAttachments.push_back(Cid);
+ std::string ErrorString =
+ fmt::format("Failed fetching chunk range ({})", ExtractRangeResult.ErrorDescription);
+ ResponseWriter.AddString("Error", ErrorString);
+ ZEN_WARN("oplog '{}/{}': {}", Project.Identifier, Oplog.OplogId(), ErrorString);
}
- });
- }
- }
-
- CbPackage ResponsePackage;
-
- {
- CbObjectWriter ResponseObj;
- ResponseObj.BeginArray("written_ops");
-
- for (ProjectStore::LogSequenceNumber NewLsn : LSNs)
- {
- ZEN_DEBUG("appended written op at LSN: {}", NewLsn.Number);
- ResponseObj.AddInteger(NewLsn.Number);
- }
- ResponseObj.EndArray();
-
- if (!MissingAttachments.empty())
- {
- ResponseObj.BeginArray("need");
-
- for (const IoHash& Cid : MissingAttachments)
- {
- ResponseObj.AddHash(Cid);
}
- ResponseObj.EndArray();
}
- ResponsePackage.SetObject(ResponseObj.Save());
}
-
- std::vector<IoBuffer> ResponseBuffers = FormatPackageMessage(ResponsePackage);
-
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponseBuffers);
- return true;
+ ResponseWriter.EndObject();
}
- default:
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method));
- return false;
- }
- return true;
-}
-
-std::pair<HttpResponseCode, std::string>
-ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager)
-{
- ZEN_MEMSCOPE(GetProjectstoreTag());
- ZEN_TRACE_CPU("Store::Export");
-
- using namespace std::literals;
-
- size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize);
- size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize);
- size_t MaxChunksPerBlock = Params["maxchunksperblock"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunksPerBlock);
- size_t ChunkFileSizeLimit = Params["chunkfilesizelimit"sv].AsUInt64(RemoteStoreOptions::DefaultChunkFileSizeLimit);
- bool Force = Params["force"sv].AsBool(false);
- bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false);
- bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false);
-
- CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
-
- if (RemoteStoreResult.Store == nullptr)
- {
- return {HttpResponseCode::BadRequest, RemoteStoreResult.Description};
- }
- std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store);
- RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
-
- JobId JobId = m_JobQueue.QueueJob(
- fmt::format("Export oplog '{}/{}'", Project->Identifier, Oplog.OplogId()),
- [this,
- ActualRemoteStore = std::move(RemoteStore),
- Project,
- OplogPtr = &Oplog,
- MaxBlockSize,
- MaxChunksPerBlock,
- MaxChunkEmbedSize,
- ChunkFileSizeLimit,
- EmbedLooseFile,
- Force,
- IgnoreMissingAttachments](JobContext& Context) {
- Context.ReportMessage(fmt::format("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}",
- Project->Identifier,
- OplogPtr->OplogId(),
- ActualRemoteStore->GetInfo().Description,
- NiceBytes(MaxBlockSize),
- NiceBytes(MaxChunkEmbedSize)));
-
- RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
- *ActualRemoteStore,
- *Project.Get(),
- *OplogPtr,
- MaxBlockSize,
- MaxChunksPerBlock,
- MaxChunkEmbedSize,
- ChunkFileSizeLimit,
- EmbedLooseFile,
- Force,
- IgnoreMissingAttachments,
- &Context);
- auto Response = ConvertResult(Result);
- ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second);
- if (!IsHttpSuccessCode(Response.first))
- {
- throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second,
- (int)Response.first);
- }
- });
-
- return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)};
-}
-
-std::pair<HttpResponseCode, std::string>
-ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager)
-{
- ZEN_MEMSCOPE(GetProjectstoreTag());
- ZEN_TRACE_CPU("Store::Import");
-
- using namespace std::literals;
-
- size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize);
- size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize);
- bool Force = Params["force"sv].AsBool(false);
- bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false);
- bool CleanOplog = Params["clean"].AsBool(false);
-
- CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
-
- if (RemoteStoreResult.Store == nullptr)
- {
- return {HttpResponseCode::BadRequest, RemoteStoreResult.Description};
+ }
}
- std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store);
- RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+ ResponseWriter.EndArray();
+ ResponsePackage.SetObject(ResponseWriter.Save());
- JobId JobId = m_JobQueue.QueueJob(
- fmt::format("Import oplog '{}/{}'", Project.Identifier, Oplog.OplogId()),
- [this,
- ChunkStore = &m_CidStore,
- ActualRemoteStore = std::move(RemoteStore),
- OplogPtr = &Oplog,
- Force,
- IgnoreMissingAttachments,
- CleanOplog](JobContext& Context) {
- Context.ReportMessage(fmt::format("Loading oplog '{}/{}' from {}",
- OplogPtr->GetOuterProjectIdentifier(),
- OplogPtr->OplogId(),
- ActualRemoteStore->GetInfo().Description));
-
- RemoteProjectStore::Result Result =
- LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, CleanOplog, &Context);
- auto Response = ConvertResult(Result);
- ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second);
- if (!IsHttpSuccessCode(Response.first))
- {
- throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second,
- (int)Response.first);
- }
- });
-
- return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)};
+ return ResponsePackage;
}
bool
@@ -7209,14 +6240,14 @@ OpKeyStringAsOid(std::string_view OpKey)
namespace testutils {
using namespace std::literals;
- std::string OidAsString(const Oid& Id)
+ static std::string OidAsString(const Oid& Id)
{
StringBuilder<25> OidStringBuilder;
Id.ToString(OidStringBuilder);
return OidStringBuilder.ToString();
}
- CbPackage CreateBulkDataOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
+ static CbPackage CreateBulkDataOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
{
CbPackage Package;
CbObjectWriter Object;
@@ -7242,9 +6273,9 @@ namespace testutils {
return Package;
};
- CbPackage CreateFilesOplogPackage(const Oid& Id,
- const std::filesystem::path ProjectRootDir,
- const std::span<const std::pair<Oid, std::filesystem::path>>& Attachments)
+ static CbPackage CreateFilesOplogPackage(const Oid& Id,
+ const std::filesystem::path ProjectRootDir,
+ const std::span<const std::pair<Oid, std::filesystem::path>>& Attachments)
{
CbPackage Package;
CbObjectWriter Object;
@@ -7268,7 +6299,7 @@ namespace testutils {
return Package;
};
- std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(
+ static std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(
const std::span<const size_t>& Sizes,
OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast,
uint64_t BlockSize = 0)
@@ -7284,7 +6315,7 @@ namespace testutils {
return Result;
}
- uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64_t RawOffset)
+ static uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64_t RawOffset)
{
if (RawOffset > 0)
{
@@ -7416,8 +6447,6 @@ TEST_CASE("project.store.create")
ScopedTemporaryDirectory TempDir;
- auto JobQueue = MakeJobQueue(1, ""sv);
- OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
@@ -7425,7 +6454,7 @@ TEST_CASE("project.store.create")
std::string_view ProjectName("proj1"sv);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
@@ -7447,15 +6476,13 @@ TEST_CASE("project.store.lifetimes")
ScopedTemporaryDirectory TempDir;
- auto JobQueue = MakeJobQueue(1, ""sv);
- OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
@@ -7480,135 +6507,6 @@ TEST_CASE("project.store.lifetimes")
CHECK(Project->Identifier == "proj1"sv);
}
-struct ExportForceDisableBlocksTrue_ForceTempBlocksFalse
-{
- static const bool ForceDisableBlocks = true;
- static const bool ForceEnableTempBlocks = false;
-};
-
-struct ExportForceDisableBlocksFalse_ForceTempBlocksFalse
-{
- static const bool ForceDisableBlocks = false;
- static const bool ForceEnableTempBlocks = false;
-};
-
-struct ExportForceDisableBlocksFalse_ForceTempBlocksTrue
-{
- static const bool ForceDisableBlocks = false;
- static const bool ForceEnableTempBlocks = true;
-};
-
-TEST_CASE_TEMPLATE("project.store.export",
- Settings,
- ExportForceDisableBlocksTrue_ForceTempBlocksFalse,
- ExportForceDisableBlocksFalse_ForceTempBlocksFalse,
- ExportForceDisableBlocksFalse_ForceTempBlocksTrue)
-{
- using namespace std::literals;
- using namespace testutils;
-
- ScopedTemporaryDirectory TempDir;
- ScopedTemporaryDirectory ExportDir;
-
- auto JobQueue = MakeJobQueue(1, ""sv);
- OpenProcessCache ProcessCache;
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
- CidStore.Initialize(CidConfig);
-
- std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
- std::filesystem::path RootDir = TempDir.Path() / "root";
- std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
- std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
- std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";
-
- Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / "proj1"sv,
- "proj1"sv,
- RootDir.string(),
- EngineRootDir.string(),
- ProjectRootDir.string(),
- ProjectFilePath.string()));
- Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("oplog1", {});
- CHECK(Oplog);
-
- Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {}));
- Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77})));
- Oplog->AppendNewOplogEntry(
- CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99})));
- Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122})));
- Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(
- Oid::NewOid(),
- CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None)));
-
- FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 64u * 1024,
- .MaxChunksPerBlock = 1000,
- .MaxChunkEmbedSize = 32 * 1024u,
- .ChunkFileSizeLimit = 64u * 1024u},
- /*.FolderPath = */ ExportDir.Path(),
- /*.Name = */ std::string("oplog1"),
- /*OptionalBaseName = */ std::string(),
- /*.ForceDisableBlocks = */ Settings::ForceDisableBlocks,
- /*.ForceEnableTempBlocks = */ Settings::ForceEnableTempBlocks};
- std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Options);
- RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
-
- RemoteProjectStore::Result ExportResult = SaveOplog(CidStore,
- *RemoteStore,
- *Project.Get(),
- *Oplog,
- Options.MaxBlockSize,
- Options.MaxChunksPerBlock,
- Options.MaxChunkEmbedSize,
- Options.ChunkFileSizeLimit,
- true,
- false,
- false,
- nullptr);
-
- CHECK(ExportResult.ErrorCode == 0);
-
- Ref<ProjectStore::Oplog> OplogImport = Project->NewOplog("oplog2", {});
- CHECK(OplogImport);
-
- RemoteProjectStore::Result ImportResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- /*Force*/ false,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ false,
- nullptr);
- CHECK(ImportResult.ErrorCode == 0);
-
- RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- /*Force*/ true,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ false,
- nullptr);
- CHECK(ImportForceResult.ErrorCode == 0);
-
- RemoteProjectStore::Result ImportCleanResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- /*Force*/ false,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ true,
- nullptr);
- CHECK(ImportCleanResult.ErrorCode == 0);
-
- RemoteProjectStore::Result ImportForceCleanResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- /*Force*/ true,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ true,
- nullptr);
- CHECK(ImportForceCleanResult.ErrorCode == 0);
-}
-
TEST_CASE("project.store.gc")
{
using namespace std::literals;
@@ -7616,15 +6514,13 @@ TEST_CASE("project.store.gc")
ScopedTemporaryDirectory TempDir;
- auto JobQueue = MakeJobQueue(1, ""sv);
- OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
@@ -7817,15 +6713,13 @@ TEST_CASE("project.store.gc.prep")
ScopedTemporaryDirectory TempDir;
- auto JobQueue = MakeJobQueue(1, ""sv);
- OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
@@ -8033,15 +6927,13 @@ TEST_CASE("project.store.rpc.getchunks")
ScopedTemporaryDirectory TempDir;
- auto JobQueue = MakeJobQueue(1, ""sv);
- OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root"sv;
std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;
@@ -8085,24 +6977,30 @@ TEST_CASE("project.store.rpc.getchunks")
Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(FilesOpId, RootDir, FilesOpIdAttachments));
}
+ auto GetChunks = [](zen::ProjectStore& ProjectStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, const CbObject& Cb) {
+ std::vector<ProjectStore::ChunkRequest> Requests = ProjectStore.ParseChunksRequests(Project, Oplog, Cb);
+ std::vector<ProjectStore::ChunkResult> Results =
+ Requests.empty() ? std::vector<ProjectStore::ChunkResult>{} : ProjectStore.GetChunks(Project, Oplog, Requests);
+ return ProjectStore.WriteChunksRequestResponse(Project, Oplog, std::move(Requests), std::move(Results));
+ };
+
+ Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv);
+ CHECK(Project1);
+ Ref<ProjectStore::Oplog> Oplog1 = Project1->OpenOplog("oplog1"sv, false, true);
+ CHECK(Oplog1);
// Invalid request
{
CbObjectWriter Request;
Request.BeginObject("WrongName"sv);
Request.EndObject();
CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv, "oplog1"sv, Request.Save(), Response);
- CHECK_EQ(HttpResponseCode::BadRequest, Result.first);
+ CHECK_THROWS(GetChunks(ProjectStore, *Project1, *Oplog1, Request.Save()));
}
// Empty request
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, std::vector<IoHash>{}, {}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, std::vector<IoHash>{}, {}, {}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(0, Chunks.Num());
@@ -8110,23 +7008,15 @@ TEST_CASE("project.store.rpc.getchunks")
// Single non-existing chunk by RawHash
IoHash NotFoundIoHash = IoHash::Max;
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundIoHash}, {}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundIoHash}, {}, {}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(0, Chunks.Num());
}
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundIoHash}, {}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundIoHash}, {}, {}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(0, Chunks.Num());
@@ -8134,21 +7024,15 @@ TEST_CASE("project.store.rpc.getchunks")
// Single non-existing chunk by Id
Oid NotFoundId = Oid::NewOid();
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundId}, {}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundId}, {}, {}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(0, Chunks.Num());
}
{
- CbPackage Response;
- auto Result =
- ProjectStore.GetChunks("proj1"sv, "oplog1"sv, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundId}, {}, {}), Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundId}, {}, {}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(0, Chunks.Num());
@@ -8159,13 +7043,11 @@ TEST_CASE("project.store.rpc.getchunks")
IoHash FirstAttachmentHash = Attachments[OpIds[2]][1].second.DecodeRawHash();
uint64_t ResponseModTag = 0;
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {}),
- Response);
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {}));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(1, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8185,13 +7067,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Fetch with matching ModTag
{
- CbPackage Response;
- auto Result =
- ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {ResponseModTag}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {ResponseModTag}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8206,14 +7086,12 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Fetch with mismatching ModTag
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks(
- "proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)}),
- Response);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)}));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(1, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8233,12 +7111,10 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Fresh modtime query
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8253,13 +7129,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Modtime query with matching ModTag
{
- CbPackage Response;
- auto Result =
- ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {ResponseModTag}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {ResponseModTag}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8274,13 +7148,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Modtime query with mismatching ModTag
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks(
- "proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8300,12 +7172,8 @@ TEST_CASE("project.store.rpc.getchunks")
uint64_t ResponseModTag = 0;
{
// Full chunk request
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {}));
CHECK_EQ(1, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8325,13 +7193,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Partial chunk request
- CbPackage Response;
- auto Result =
- ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{130 * 1024, 8100}}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{130 * 1024, 8100}}, {}));
CHECK_EQ(1, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8356,13 +7222,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Fetch with matching ModTag
- CbPackage Response;
- auto Result =
- ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8377,13 +7241,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Fetch with mismatching ModTag
- CbPackage Response;
- auto Result = ProjectStore.GetChunks(
- "proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}));
CHECK_EQ(1, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8403,12 +7265,8 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Fresh modtime query
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8423,13 +7281,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Modtime query with matching ModTag
{
- CbPackage Response;
- auto Result =
- ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8444,13 +7300,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Modtime query with mismatching ModTag
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks(
- "proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8471,12 +7325,8 @@ TEST_CASE("project.store.rpc.getchunks")
uint64_t ResponseModTag = 0;
{
// Full chunk request
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {}));
CHECK_EQ(1, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8496,13 +7346,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Partial chunk request
- CbPackage Response;
- auto Result =
- ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{81823, 5434}}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{81823, 5434}}, {}));
CHECK_EQ(1, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8524,13 +7372,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Fetch with matching ModTag
- CbPackage Response;
- auto Result =
- ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8545,13 +7391,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Fetch with mismatching ModTag
- CbPackage Response;
- auto Result = ProjectStore.GetChunks(
- "proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}));
CHECK_EQ(1, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8571,12 +7415,8 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Fresh modtime query
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8591,13 +7431,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Modtime query with matching ModTag
{
- CbPackage Response;
- auto Result =
- ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8612,13 +7450,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
// Modtime query with mismatching ModTag
{
- CbPackage Response;
- auto Result = ProjectStore.GetChunks(
- "proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}),
- Response);
- CHECK_EQ(HttpResponseCode::OK, Result.first);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}));
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(1, Chunks.Num());
@@ -8644,13 +7480,9 @@ TEST_CASE("project.store.rpc.getchunks")
std::vector<uint64_t> ResponseModTags(3, 0);
{
// Fresh fetch
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, {}),
- Response);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, {}));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(3, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8677,13 +7509,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Fetch with matching ModTag
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, ResponseModTags),
- Response);
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, ResponseModTags));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8704,13 +7534,9 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Fresh modtime query
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, {}),
- Response);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, {}));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8731,13 +7557,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Modtime query with matching ModTags
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, ResponseModTags),
- Response);
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, ResponseModTags));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8763,13 +7587,12 @@ TEST_CASE("project.store.rpc.getchunks")
{
Tag++;
}
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, MismatchingModTags),
- Response);
+ const CbPackage& Response =
+ GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, MismatchingModTags));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8801,13 +7624,9 @@ TEST_CASE("project.store.rpc.getchunks")
std::vector<uint64_t> ResponseModTags(3, 0);
{
// Fresh fetch
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, {}),
- Response);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, {}));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(3, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8834,13 +7653,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Fetch with matching ModTag
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, ResponseModTags),
- Response);
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, ResponseModTags));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8861,13 +7678,9 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Fresh modtime query
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, {}),
- Response);
+ const CbPackage& Response =
+ GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, {}));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8888,13 +7701,11 @@ TEST_CASE("project.store.rpc.getchunks")
}
{
// Modtime query with matching ModTags
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, ResponseModTags),
- Response);
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, ResponseModTags));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8920,13 +7731,11 @@ TEST_CASE("project.store.rpc.getchunks")
{
Tag++;
}
- CbPackage Response;
- auto Result = ProjectStore.GetChunks("proj1"sv,
- "oplog1"sv,
- testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, MismatchingModTags),
- Response);
+ const CbPackage& Response = GetChunks(ProjectStore,
+ *Project1,
+ *Oplog1,
+ testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, MismatchingModTags));
- CHECK_EQ(HttpResponseCode::OK, Result.first);
CHECK_EQ(0, Response.GetAttachments().size());
CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
CHECK_EQ(3, Chunks.Num());
@@ -8955,15 +7764,13 @@ TEST_CASE("project.store.partial.read")
ScopedTemporaryDirectory TempDir;
- auto JobQueue = MakeJobQueue(1, ""sv);
- OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root"sv;
std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;
@@ -9009,7 +7816,7 @@ TEST_CASE("project.store.partial.read")
Attachments[OpIds[1]][0].first,
0,
~0ull,
- HttpContentType::kCompressedBinary,
+ ZenContentType::kCompressedBinary,
&ModificationTag);
CHECK_EQ(ProjectStore::GetChunkRangeResult::EError::Ok, Result.Error);
@@ -9025,7 +7832,7 @@ TEST_CASE("project.store.partial.read")
Attachments[OpIds[1]][0].first,
0,
~0ull,
- HttpContentType::kCompressedBinary,
+ ZenContentType::kCompressedBinary,
&ModificationTag);
CHECK_EQ(ProjectStore::GetChunkRangeResult::EError::NotModified, Result2.Error);
}
@@ -9039,7 +7846,7 @@ TEST_CASE("project.store.partial.read")
Attachments[OpIds[2]][1].first,
0,
~0ull,
- HttpContentType::kCompressedBinary,
+ ZenContentType::kCompressedBinary,
&FullChunkModificationTag);
CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::Ok);
CHECK(Result.Chunk);
@@ -9053,7 +7860,7 @@ TEST_CASE("project.store.partial.read")
Attachments[OpIds[2]][1].first,
0,
~0ull,
- HttpContentType::kCompressedBinary,
+ ZenContentType::kCompressedBinary,
&FullChunkModificationTag);
CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::NotModified);
}
@@ -9067,7 +7874,7 @@ TEST_CASE("project.store.partial.read")
Attachments[OpIds[2]][1].first,
5,
1773,
- HttpContentType::kCompressedBinary,
+ ZenContentType::kCompressedBinary,
&PartialChunkModificationTag);
CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::Ok);
@@ -9091,7 +7898,7 @@ TEST_CASE("project.store.partial.read")
Attachments[OpIds[2]][1].first,
0,
1773,
- HttpContentType::kCompressedBinary,
+ ZenContentType::kCompressedBinary,
&PartialChunkModificationTag);
CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::NotModified);
}
@@ -9133,15 +7940,13 @@ TEST_CASE("project.store.iterateoplog")
ScopedTemporaryDirectory TempDir;
- auto JobQueue = MakeJobQueue(1, ""sv);
- OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root"sv;
std::filesystem::path EngineRootDir = TempDir.Path() / "enginesv";