diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-02 15:56:00 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-02 15:56:00 +0200 |
| commit | adbec59ee0b169ba0ac9d167eac4ed72edefe1ca (patch) | |
| tree | c05d4e5624d98272969f6509fe5cf8a75d6ebf9d /src/zenserver/projectstore/projectstore.cpp | |
| parent | Zs/OIDC exe path handling (#538) (diff) | |
| download | zen-adbec59ee0b169ba0ac9d167eac4ed72edefe1ca.tar.xz zen-adbec59ee0b169ba0ac9d167eac4ed72edefe1ca.zip | |
projectstore refactor phase 2 (#539)
Refactor projectstore/httpprojectservice to prepare for move of projectstore to zenstore
Diffstat (limited to 'src/zenserver/projectstore/projectstore.cpp')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 2025 |
1 files changed, 415 insertions, 1610 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 181177653..5034a250a 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -3,36 +3,24 @@ #include "projectstore.h" #include <zencore/assertfmt.h> -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryutil.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> -#include <zencore/jobqueue.h> #include <zencore/logging.h> #include <zencore/memory/llm.h> #include <zencore/scopeguard.h> #include <zencore/stream.h> #include <zencore/timer.h> #include <zencore/trace.h> -#include <zenhttp/packageformat.h> #include <zenstore/caslog.h> #include <zenstore/cidstore.h> #include <zenstore/scrubcontext.h> -#include <zenutil/cache/rpcrecording.h> -#include <zenutil/openprocesscache.h> #include <zenutil/parallelwork.h> #include <zenutil/referencemetadata.h> #include <zenutil/workerpools.h> -#include "buildsremoteprojectstore.h" -#include "fileremoteprojectstore.h" -#include "jupiterremoteprojectstore.h" -#include "remoteprojectstore.h" -#include "zenremoteprojectstore.h" - ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_set.h> #include <xxh3.h> @@ -41,6 +29,9 @@ ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # include <zencore/testing.h> # include <zencore/testutils.h> +# include <zenutil/chunkblock.h> + +# include <unordered_map> #endif // ZEN_WITH_TESTS namespace zen { @@ -140,263 +131,6 @@ namespace { return CheckWriteTime < ReferenceWriteTime; } - struct CreateRemoteStoreResult - { - std::shared_ptr<RemoteProjectStore> Store; - std::string Description; - }; - CreateRemoteStoreResult CreateRemoteStore(CbObjectView Params, - AuthMgr& AuthManager, - size_t MaxBlockSize, - size_t MaxChunkEmbedSize, - const std::filesystem::path& TempFilePath) - { - ZEN_MEMSCOPE(GetProjectstoreTag()); - - using namespace std::literals; - - std::shared_ptr<RemoteProjectStore> RemoteStore; - - if (CbObjectView File = Params["file"sv].AsObjectView(); File) - { - std::filesystem::path FolderPath(File["path"sv].AsString()); - if (FolderPath.empty()) - { - return {nullptr, "Missing file path"}; - } - std::string_view Name(File["name"sv].AsString()); - if (Name.empty()) - { - return {nullptr, "Missing file name"}; - } - std::string_view OptionalBaseName(File["basename"sv].AsString()); - bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false); - bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false); - - FileRemoteStoreOptions Options = { - RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, - FolderPath, - std::string(Name), - std::string(OptionalBaseName), - ForceDisableBlocks, - ForceEnableTempBlocks}; - RemoteStore = CreateFileRemoteStore(Options); - } - - if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud) - { - std::string_view CloudServiceUrl = Cloud["url"sv].AsString(); - if (CloudServiceUrl.empty()) - { - return {nullptr, "Missing service url"}; - } - - std::string Url = UrlDecode(CloudServiceUrl); - std::string_view Namespace = Cloud["namespace"sv].AsString(); - if (Namespace.empty()) - { - return {nullptr, "Missing namespace"}; - } - std::string_view Bucket = Cloud["bucket"sv].AsString(); - if (Bucket.empty()) - { - return {nullptr, "Missing bucket"}; - } - std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString(); - std::string AccessToken = std::string(Cloud["access-token"sv].AsString()); - if (AccessToken.empty()) - { - std::string_view AccessTokenEnvVariable = Cloud["access-token-env"].AsString(); - if (!AccessTokenEnvVariable.empty()) - { - AccessToken = GetEnvVariable(AccessTokenEnvVariable); - } - } - std::filesystem::path OidcExePath; - if (std::string_view OidcExePathString = Cloud["oidc-exe-path"].AsString(); !OidcExePathString.empty()) - { - std::filesystem::path OidcExePathMaybe(OidcExePathString); - if (IsFile(OidcExePathMaybe)) - { - OidcExePath = std::move(OidcExePathMaybe); - } - else - { - ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString); - } - } - std::string_view KeyParam = Cloud["key"sv].AsString(); - if (KeyParam.empty()) - { - return {nullptr, "Missing key"}; - } - if (KeyParam.length() != IoHash::StringLength) - { - return {nullptr, "Invalid key"}; - } - IoHash Key = IoHash::FromHexString(KeyParam); - if (Key == IoHash::Zero) - { - return {nullptr, "Invalid key string"}; - } - IoHash BaseKey = IoHash::Zero; - std::string_view BaseKeyParam = Cloud["basekey"sv].AsString(); - if (!BaseKeyParam.empty()) - { - if (BaseKeyParam.length() != IoHash::StringLength) - { - return {nullptr, "Invalid base key"}; - } - BaseKey = IoHash::FromHexString(BaseKeyParam); - if (BaseKey == IoHash::Zero) - { - return {nullptr, "Invalid base key string"}; - } - } - - bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false); - bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false); - bool AssumeHttp2 = Cloud["assumehttp2"sv].AsBool(false); - - JupiterRemoteStoreOptions Options = { - RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, - Url, - std::string(Namespace), - std::string(Bucket), - Key, - BaseKey, - std::string(OpenIdProvider), - AccessToken, - AuthManager, - OidcExePath, - ForceDisableBlocks, - ForceDisableTempBlocks, - AssumeHttp2}; - RemoteStore = CreateJupiterRemoteStore(Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false); - } - - if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen) - { - std::string_view Url = Zen["url"sv].AsString(); - std::string_view Project = Zen["project"sv].AsString(); - if (Project.empty()) - { - return {nullptr, "Missing project"}; - } - std::string_view Oplog = Zen["oplog"sv].AsString(); - if (Oplog.empty()) - { - return {nullptr, "Missing oplog"}; - } - ZenRemoteStoreOptions Options = { - RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, - std::string(Url), - std::string(Project), - std::string(Oplog)}; - RemoteStore = CreateZenRemoteStore(Options, TempFilePath); - } - - if (CbObjectView Builds = Params["builds"sv].AsObjectView(); Builds) - { - std::string_view BuildsServiceUrl = Builds["url"sv].AsString(); - if (BuildsServiceUrl.empty()) - { - return {nullptr, "Missing service url"}; - } - - std::string Url = UrlDecode(BuildsServiceUrl); - std::string_view Namespace = Builds["namespace"sv].AsString(); - if (Namespace.empty()) - { - return {nullptr, "Missing namespace"}; - } - std::string_view Bucket = Builds["bucket"sv].AsString(); - if (Bucket.empty()) - { - return {nullptr, "Missing bucket"}; - } - std::string_view OpenIdProvider = Builds["openid-provider"sv].AsString(); - std::string AccessToken = std::string(Builds["access-token"sv].AsString()); - if (AccessToken.empty()) - { - std::string_view AccessTokenEnvVariable = Builds["access-token-env"].AsString(); - if (!AccessTokenEnvVariable.empty()) - { - AccessToken = GetEnvVariable(AccessTokenEnvVariable); - } - } - std::filesystem::path OidcExePath; - if (std::string_view OidcExePathString = Builds["oidc-exe-path"].AsString(); !OidcExePathString.empty()) - { - std::filesystem::path OidcExePathMaybe(OidcExePathString); - if (IsFile(OidcExePathMaybe)) - { - OidcExePath = std::move(OidcExePathMaybe); - } - else - { - ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString); - } - } - std::string_view BuildIdParam = Builds["buildsid"sv].AsString(); - if (BuildIdParam.empty()) - { - return {nullptr, "Missing build id"}; - } - if (BuildIdParam.length() != Oid::StringLength) - { - return {nullptr, "Invalid build id"}; - } - Oid BuildId = Oid::FromHexString(BuildIdParam); - if (BuildId == Oid::Zero) - { - return {nullptr, "Invalid build id string"}; - } - - bool ForceDisableBlocks = Builds["disableblocks"sv].AsBool(false); - bool ForceDisableTempBlocks = Builds["disabletempblocks"sv].AsBool(false); - bool AssumeHttp2 = Builds["assumehttp2"sv].AsBool(false); - - MemoryView MetaDataSection = Builds["metadata"sv].AsBinaryView(); - IoBuffer MetaData(IoBuffer::Wrap, MetaDataSection.GetData(), MetaDataSection.GetSize()); - - BuildsRemoteStoreOptions Options = { - RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, - Url, - std::string(Namespace), - std::string(Bucket), - BuildId, - std::string(OpenIdProvider), - AccessToken, - AuthManager, - OidcExePath, - ForceDisableBlocks, - ForceDisableTempBlocks, - AssumeHttp2, - MetaData}; - RemoteStore = CreateJupiterBuildsRemoteStore(Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false); - } - - if (!RemoteStore) - { - return {nullptr, "Unknown remote store type"}; - } - - return {std::move(RemoteStore), ""}; - } - - std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result) - { - if (Result.ErrorCode == 0) - { - return {HttpResponseCode::OK, Result.Text}; - } - return {static_cast<HttpResponseCode>(Result.ErrorCode), - Result.Reason.empty() ? Result.Text - : Result.Text.empty() ? Result.Reason - : fmt::format("{}: {}", Result.Reason, Result.Text)}; - } - std::pair<int32_t, int32_t> GetPagedRange(int32_t TotalSize, const ProjectStore::Oplog::Paging& EntryPaging) { int32_t Start = std::clamp(EntryPaging.Start, 0, TotalSize); @@ -4292,17 +4026,10 @@ ProjectStore::Project::LastOplogAccessTime(std::string_view Oplog) const ////////////////////////////////////////////////////////////////////////// -ProjectStore::ProjectStore(CidStore& Store, - std::filesystem::path BasePath, - GcManager& Gc, - JobQueue& JobQueue, - OpenProcessCache& InOpenProcessCache, - const Configuration& Config) +ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, const Configuration& Config) : m_Log(logging::Get("project")) , m_Gc(Gc) , m_CidStore(Store) -, m_JobQueue(JobQueue) -, m_OpenProcessCache(InOpenProcessCache) , m_ProjectBasePath(BasePath) , m_Config(Config) , m_DiskWriteBlocker(Gc.GetDiskWriteBlocker()) @@ -4990,7 +4717,7 @@ ProjectStore::GetChunkInfo(LoggerRef InLog, Project& Project, Oplog& Oplog, cons } uint64_t ChunkSize = Chunk.GetSize(); - if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) + if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) { IoHash RawHash; uint64_t RawSize; @@ -5165,6 +4892,7 @@ IoBuffer ProjectStore::GetChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash) { ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("ProjectStore::GetChunk"); ZEN_UNUSED(Project, Oplog); IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash); @@ -5182,6 +4910,7 @@ IoBuffer ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const Oid& ChunkId) { ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("ProjectStore::GetChunk"); Ref<Project> Project = OpenProject(ProjectId); if (!Project) @@ -5200,6 +4929,7 @@ IoBuffer ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const IoHash& Cid) { ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("ProjectStore::GetChunk"); Ref<Project> Project = OpenProject(ProjectId); if (!Project) @@ -5218,6 +4948,8 @@ bool ProjectStore::PutChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash, IoBuffer&& Chunk) { ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("ProjectStore::PutChunk"); + IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); @@ -5232,1041 +4964,340 @@ ProjectStore::PutChunk(Project& Project, Oplog& Oplog, const IoHash& ChunkHash, return Result.New; } -std::pair<HttpResponseCode, std::string> -ProjectStore::GetChunks(const std::string_view ProjectId, - const std::string_view OplogId, - const CbObject& RequestObject, - CbPackage& OutResponsePackage) +std::vector<ProjectStore::ChunkResult> +ProjectStore::GetChunks(Project& Project, Oplog& Oplog, std::span<const ChunkRequest> Requests) { ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::GetChunks"); + ZEN_TRACE_CPU("ProjectStore::GetChunks"); - using namespace std::literals; + ZEN_ASSERT(!Requests.empty()); - Ref<ProjectStore::Project> Project = OpenProject(ProjectId); - if (!Project) - { - return {HttpResponseCode::NotFound, fmt::format("getchunks rpc request for unknown project '{}'", ProjectId)}; - } - Project->TouchProject(); + std::vector<ProjectStore::ChunkResult> Results; + size_t RequestCount = Requests.size(); - Ref<ProjectStore::Oplog> FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false); - if (!FoundLog) - { - return {HttpResponseCode::NotFound, fmt::format("getchunks rpc request for unknown oplog '{}/{}'", ProjectId, OplogId)}; - } - Project->TouchOplog(OplogId); + Results.resize(RequestCount); - if (RequestObject["chunks"sv].IsArray()) + if (RequestCount > 1) { - // Legacy full chunks only by rawhash + std::vector<IoHash> ChunkRawHashes; + std::vector<size_t> ChunkRawHashesRequestIndex; + std::vector<Oid> ChunkIds; + std::vector<size_t> ChunkIdsRequestIndex; - CbArrayView ChunksArray = RequestObject["chunks"sv].AsArrayView(); + ChunkRawHashes.reserve(RequestCount); + ChunkRawHashesRequestIndex.reserve(RequestCount); + ChunkIds.reserve(RequestCount); + ChunkIdsRequestIndex.reserve(RequestCount); - CbObjectWriter ResponseWriter; - ResponseWriter.BeginArray("chunks"sv); - for (CbFieldView FieldView : ChunksArray) + for (size_t RequestIndex = 0; RequestIndex < Requests.size(); RequestIndex++) { - IoHash RawHash = FieldView.AsHash(); - IoBuffer ChunkBuffer = m_CidStore.FindChunkByCid(RawHash); - if (ChunkBuffer) + const ChunkRequest& Request = Requests[RequestIndex]; + if (Request.Id.index() == 0) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)); - if (Compressed) + ChunkRawHashes.push_back(std::get<IoHash>(Request.Id)); + ChunkRawHashesRequestIndex.push_back(RequestIndex); + } + else + { + ChunkIds.push_back(std::get<Oid>(Request.Id)); + ChunkIdsRequestIndex.push_back(RequestIndex); + } + } + + WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); + if (!ChunkRawHashes.empty()) + { + Oplog.IterateChunks( + ChunkRawHashes, + true, + [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool { + if (Payload) + { + size_t RequestIndex = ChunkRawHashesRequestIndex[Index]; + const ChunkRequest& Request = Requests[RequestIndex]; + ChunkResult& Result = Results[RequestIndex]; + Result.Exists = true; + if (!Request.SkipData) + { + Result.ChunkBuffer = std::move(Payload); + Result.ChunkBuffer.MakeOwned(); + } + Result.ModTag = ModTag; + } + return true; + }, + &WorkerPool, + 8u * 1024); + } + if (!ChunkIdsRequestIndex.empty()) + { + Oplog.IterateChunks( + Project.RootDir, + ChunkIds, + true, + [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool { + if (Payload) + { + size_t RequestIndex = ChunkIdsRequestIndex[Index]; + const ChunkRequest& Request = Requests[RequestIndex]; + ChunkResult& Result = Results[RequestIndex]; + Result.Exists = true; + if (!Request.SkipData) + { + Result.ChunkBuffer = std::move(Payload); + Result.ChunkBuffer.MakeOwned(); + } + Result.ModTag = ModTag; + } + return true; + }, + &WorkerPool, + 8u * 1024); + } + } + else + { + const ChunkRequest& Request = Requests.front(); + ChunkResult& Result = Results.front(); + + if (Request.Id.index() == 0) + { + const IoHash& ChunkHash = std::get<IoHash>(Request.Id); + IoBuffer Payload = m_CidStore.FindChunkByCid(ChunkHash); + if (Payload) + { + Result.Exists = true; + Result.ModTag = GetModificationTagFromRawHash(ChunkHash); + if (!Request.SkipData) { - ResponseWriter.AddHash(RawHash); - OutResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash)); + Result.ChunkBuffer = std::move(Payload); + Result.ChunkBuffer.MakeOwned(); } - else + } + } + else + { + const Oid& ChunkId = std::get<Oid>(Request.Id); + uint64_t ModTag = 0; + IoBuffer Payload = Oplog.FindChunk(Project.RootDir, ChunkId, &ModTag); + if (Payload) + { + Result.Exists = true; + Result.ModTag = ModTag; + if (!Request.SkipData) { - ZEN_WARN("oplog '{}/{}': invalid compressed binary in cas store for {}", ProjectId, OplogId, RawHash); + Result.ChunkBuffer = std::move(Payload); + Result.ChunkBuffer.MakeOwned(); } } } - ResponseWriter.EndArray(); - OutResponsePackage.SetObject(ResponseWriter.Save()); - return {HttpResponseCode::OK, {}}; } - else if (auto RequestFieldView = RequestObject["Request"sv]; RequestFieldView.IsObject()) + return Results; +} + +std::vector<ProjectStore::ChunkRequest> +ProjectStore::ParseChunksRequests(Project& Project, Oplog& Oplog, const CbObject& Cb) +{ + ZEN_TRACE_CPU("Store::Rpc::getchunks"); + + using namespace std::literals; + + std::vector<ChunkRequest> Requests; + + if (auto RequestFieldView = Cb["Request"sv]; RequestFieldView.IsObject()) { CbObjectView RequestView = RequestFieldView.AsObjectView(); bool SkipData = RequestView["SkipData"].AsBool(false); CbArrayView ChunksArray = RequestView["Chunks"sv].AsArrayView(); - struct Request - { - struct InputData - { - uint64_t Offset = 0; - uint64_t Size = (uint64_t)-1; - std::variant<IoHash, Oid> Id; - std::optional<uint64_t> ModTag; - } Input; - struct OutputData - { - bool Exists = false; - IoBuffer ChunkBuffer; - uint64_t ModTag = 0; - } Output; - }; - - std::vector<Request> Requests; - size_t RequestCount = ChunksArray.Num(); + size_t RequestCount = ChunksArray.Num(); if (RequestCount > 0) { Requests.reserve(RequestCount); - std::vector<IoHash> ChunkRawHashes; - std::vector<size_t> ChunkRawHashesRequestIndex; - std::vector<Oid> ChunkIds; - std::vector<size_t> ChunkIdsRequestIndex; - bool DoBatch = RequestCount > 1; - if (DoBatch) - { - ChunkRawHashes.reserve(RequestCount); - ChunkRawHashesRequestIndex.reserve(RequestCount); - ChunkIds.reserve(RequestCount); - ChunkIdsRequestIndex.reserve(RequestCount); - } for (CbFieldView FieldView : ChunksArray) { CbObjectView ChunkObject = FieldView.AsObjectView(); - Request ChunkRequest = { - .Input{.Offset = ChunkObject["Offset"sv].AsUInt64(0), .Size = ChunkObject["Size"sv].AsUInt64((uint64_t)-1)}}; + ChunkRequest ChunkRequest = {.Offset = ChunkObject["Offset"sv].AsUInt64(0), + .Size = ChunkObject["Size"sv].AsUInt64((uint64_t)-1), + .SkipData = SkipData}; if (CbFieldView InputModificationTagView = ChunkObject.FindView("ModTag"); InputModificationTagView.IsInteger()) { - ChunkRequest.Input.ModTag = InputModificationTagView.AsUInt64(); + ChunkRequest.ModTag = InputModificationTagView.AsUInt64(); } if (CbFieldView RawHashView = ChunkObject.FindView("RawHash"sv); RawHashView.IsHash()) { const IoHash ChunkHash = RawHashView.AsHash(); - ChunkRequest.Input.Id = ChunkHash; - if (DoBatch) - { - ChunkRawHashes.push_back(ChunkHash); - ChunkRawHashesRequestIndex.push_back(Requests.size()); - } + ChunkRequest.Id = ChunkHash; } else if (CbFieldView IdView = ChunkObject.FindView("Oid"sv); IdView.IsObjectId()) { - const Oid ChunkId = IdView.AsObjectId(); - ChunkRequest.Input.Id = ChunkId; - if (DoBatch) - { - ChunkIds.push_back(ChunkId); - ChunkIdsRequestIndex.push_back(Requests.size()); - } + const Oid ChunkId = IdView.AsObjectId(); + ChunkRequest.Id = ChunkId; } else { - return {HttpResponseCode::BadRequest, - fmt::format("oplog '{}/{}': malformed getchunks rpc request object, chunk request has no identifier", - ProjectId, - OplogId)}; + throw std::runtime_error( + fmt::format("oplog '{}/{}': malformed getchunks rpc request object, chunk request has no identifier", + Project.Identifier, + Oplog.OplogId())); } Requests.emplace_back(std::move(ChunkRequest)); } - - if (DoBatch) - { - WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); - if (!ChunkRawHashes.empty()) - { - FoundLog->IterateChunks( - ChunkRawHashes, - true, - [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool { - if (Payload) - { - size_t RequestIndex = ChunkRawHashesRequestIndex[Index]; - Requests[RequestIndex].Output.Exists = true; - if (!SkipData) - { - Requests[RequestIndex].Output.ChunkBuffer = Payload; - Requests[RequestIndex].Output.ChunkBuffer.MakeOwned(); - } - Requests[RequestIndex].Output.ModTag = ModTag; - } - return true; - }, - &WorkerPool, - 8u * 1024); - } - if (!ChunkIdsRequestIndex.empty()) - { - FoundLog->IterateChunks( - Project->RootDir, - ChunkIds, - true, - [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool { - if (Payload) - { - size_t RequestIndex = ChunkIdsRequestIndex[Index]; - Requests[RequestIndex].Output.Exists = true; - if (!SkipData) - { - Requests[RequestIndex].Output.ChunkBuffer = Payload; - Requests[RequestIndex].Output.ChunkBuffer.MakeOwned(); - } - Requests[RequestIndex].Output.ModTag = ModTag; - } - return true; - }, - &WorkerPool, - 8u * 1024); - } - } - else - { - Request& ChunkRequest = Requests.front(); - if (ChunkRequest.Input.Id.index() == 0) - { - const IoHash& ChunkHash = std::get<IoHash>(ChunkRequest.Input.Id); - IoBuffer Payload = m_CidStore.FindChunkByCid(ChunkHash); - if (Payload) - { - ChunkRequest.Output.Exists = true; - ChunkRequest.Output.ModTag = GetModificationTagFromRawHash(ChunkHash); - if (!SkipData) - { - ChunkRequest.Output.ChunkBuffer = Payload; - } - } - } - else - { - const Oid& ChunkId = std::get<Oid>(ChunkRequest.Input.Id); - uint64_t ModTag = 0; - IoBuffer Payload = FoundLog->FindChunk(Project->RootDir, ChunkId, &ModTag); - if (Payload) - { - ChunkRequest.Output.Exists = true; - ChunkRequest.Output.ModTag = ModTag; - if (!SkipData) - { - ChunkRequest.Output.ChunkBuffer = Payload; - } - } - } - } - } - - CbObjectWriter ResponseWriter(32 + Requests.size() * 64u); - ResponseWriter.BeginArray("Chunks"sv); - { - for (Request& ChunkRequest : Requests) - { - if (ChunkRequest.Output.Exists) - { - ResponseWriter.BeginObject(); - { - if (ChunkRequest.Input.Id.index() == 0) - { - const IoHash& RawHash = std::get<IoHash>(ChunkRequest.Input.Id); - ResponseWriter.AddHash("Id", RawHash); - } - else - { - const Oid& Id = std::get<Oid>(ChunkRequest.Input.Id); - ResponseWriter.AddObjectId("Id", Id); - } - if (!ChunkRequest.Input.ModTag.has_value() || ChunkRequest.Input.ModTag.value() != ChunkRequest.Output.ModTag) - { - ResponseWriter.AddInteger("ModTag", ChunkRequest.Output.ModTag); - if (!SkipData) - { - auto ExtractRangeResult = ExtractRange(std::move(ChunkRequest.Output.ChunkBuffer), - ChunkRequest.Input.Offset, - ChunkRequest.Input.Size, - ZenContentType::kCompressedBinary); - if (ExtractRangeResult.Error == GetChunkRangeResult::EError::Ok) - { - if (ExtractRangeResult.ContentType == ZenContentType::kCompressedBinary) - { - ZEN_ASSERT(ExtractRangeResult.RawHash != IoHash::Zero); - CompressedBuffer CompressedValue = - CompressedBuffer::FromCompressedNoValidate(std::move(ExtractRangeResult.Chunk)); - ZEN_ASSERT(CompressedValue); - - if (ExtractRangeResult.RawSize != 0) - { - // This really could use some thought so we don't send the same data if we get a request for - // multiple ranges from the same chunk block - - uint64_t FragmentRawOffset = 0; - OodleCompressor Compressor; - OodleCompressionLevel CompressionLevel; - uint64_t BlockSize = 0; - if (CompressedValue.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) - { - if (BlockSize > 0) - { - FragmentRawOffset = (ChunkRequest.Input.Offset / BlockSize) * BlockSize; - } - else - { - FragmentRawOffset = ChunkRequest.Input.Offset; - } - uint64_t FragmentRawLength = CompressedValue.DecodeRawSize(); - - IoHashStream FragmentHashStream; - FragmentHashStream.Append(ExtractRangeResult.RawHash.Hash, - sizeof(ExtractRangeResult.RawHash.Hash)); - FragmentHashStream.Append(&FragmentRawOffset, sizeof(FragmentRawOffset)); - FragmentHashStream.Append(&FragmentRawLength, sizeof(FragmentRawLength)); - IoHash FragmentHash = FragmentHashStream.GetHash(); - - ResponseWriter.AddHash("FragmentHash", FragmentHash); - ResponseWriter.AddInteger("FragmentOffset", FragmentRawOffset); - ResponseWriter.AddInteger("RawSize", ExtractRangeResult.RawSize); - OutResponsePackage.AddAttachment(CbAttachment(CompressedValue, FragmentHash)); - } - else - { - std::string ErrorString = - "Failed to get compression parameters from partial compressed buffer"; - ResponseWriter.AddString("Error", ErrorString); - ZEN_WARN("oplog '{}/{}': {}", ProjectId, OplogId, ErrorString); - } - } - else - { - ResponseWriter.AddHash("RawHash"sv, ExtractRangeResult.RawHash); - OutResponsePackage.AddAttachment( - CbAttachment(std::move(CompressedValue), ExtractRangeResult.RawHash)); - } - } - else - { - IoHashStream HashStream; - ZEN_ASSERT(ChunkRequest.Input.Id.index() == 1); - const Oid& Id = std::get<Oid>(ChunkRequest.Input.Id); - HashStream.Append(Id.OidBits, sizeof(Id.OidBits)); - HashStream.Append(&ChunkRequest.Input.Offset, sizeof(ChunkRequest.Input.Offset)); - HashStream.Append(&ChunkRequest.Input.Size, sizeof(ChunkRequest.Input.Size)); - IoHash Hash = HashStream.GetHash(); - - ResponseWriter.AddHash("Hash"sv, Hash); - if (ExtractRangeResult.RawSize != 0) - { - ResponseWriter.AddInteger("Size", ExtractRangeResult.RawSize); - } - OutResponsePackage.AddAttachment(CbAttachment(std::move(ExtractRangeResult.Chunk), Hash)); - } - } - else - { - std::string ErrorString = - fmt::format("Failed fetching chunk range ({})", ExtractRangeResult.ErrorDescription); - ResponseWriter.AddString("Error", ErrorString); - ZEN_WARN("oplog '{}/{}': {}", ProjectId, OplogId, ErrorString); - } - } - } - } - ResponseWriter.EndObject(); - } - } } - ResponseWriter.EndArray(); - OutResponsePackage.SetObject(ResponseWriter.Save()); - return {HttpResponseCode::OK, {}}; } - else + else if (CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView(); ChunksArray) { - return {HttpResponseCode::BadRequest, fmt::format("oplog '{}/{}': malformed getchunks rpc request object", ProjectId, OplogId)}; - } -} - -ProjectStore::WriteOplogResult -ProjectStore::WriteOplog(Project& Project, Oplog& Oplog, const CbObject& ContainerObject) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::WriteOplog"); - ZEN_UNUSED(Project); - - CidStore& ChunkStore = m_CidStore; - RwLock AttachmentsLock; - tsl::robin_set<IoHash, IoHash::Hasher> Attachments; - - auto HasAttachment = [&ChunkStore](const IoHash& RawHash) { return ChunkStore.ContainsChunk(RawHash); }; - auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector<IoHash>&& ChunkHashes) { - RwLock::ExclusiveLockScope _(AttachmentsLock); - if (BlockHash != IoHash::Zero) - { - Attachments.insert(BlockHash); - } - else - { - Attachments.insert(ChunkHashes.begin(), ChunkHashes.end()); - } - }; - auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) { - RwLock::ExclusiveLockScope _(AttachmentsLock); - Attachments.insert(RawHash); - }; - - auto OnChunkedAttachment = [](const ChunkedInfo&) {}; - - auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog.CaptureAddedAttachments(RawHashes); }; + // Legacy full chunks only by rawhash - // Make sure we retain any attachments we download before writing the oplog - Oplog.EnableUpdateCapture(); - auto _ = MakeGuard([&Oplog]() { Oplog.DisableUpdateCapture(); }); + size_t RequestCount = ChunksArray.Num(); - RemoteProjectStore::Result RemoteResult = SaveOplogContainer(Oplog, - ContainerObject, - OnReferencedAttachments, - HasAttachment, - OnNeedBlock, - OnNeedAttachment, - OnChunkedAttachment, - nullptr); + Requests.reserve(RequestCount); - if (RemoteResult.ErrorCode) - { - return ProjectStore::WriteOplogResult{.ErrorCode = RemoteResult.ErrorCode, .ErrorDescription = RemoteResult.Reason}; + std::vector<IoHash> Cids; + Cids.reserve(ChunksArray.Num()); + for (CbFieldView FieldView : ChunksArray) + { + Requests.push_back(ProjectStore::ChunkRequest{.Id = FieldView.AsHash()}); + } } - - return ProjectStore::WriteOplogResult{.Need = std::vector<IoHash>(Attachments.begin(), Attachments.end())}; -} - -ProjectStore::ReadOplogResult -ProjectStore::ReadOplog(Project& Project, - Oplog& Oplog, - size_t MaxBlockSize, - size_t MaxChunkEmbedSize, - size_t MaxChunksPerBlock, - size_t ChunkFileSizeLimit) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::ReadOplog"); - - CidStore& ChunkStore = m_CidStore; - - RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer( - ChunkStore, - Project, - Oplog, - MaxBlockSize, - MaxChunkEmbedSize, - MaxChunksPerBlock, - ChunkFileSizeLimit, - /* BuildBlocks */ false, - /* IgnoreMissingAttachments */ false, - /* AllowChunking*/ false, - [](CompressedBuffer&&, ChunkBlockDescription&&) {}, - [](const IoHash&, TGetAttachmentBufferFunc&&) {}, - [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, - /* EmbedLooseFiles*/ false); - - if (ContainerResult.ErrorCode) + else { - return ProjectStore::ReadOplogResult{.ErrorCode = ContainerResult.ErrorCode, .ErrorDescription = ContainerResult.Reason}; + throw std::runtime_error(fmt::format("oplog '{}/{}': malformed getchunks rpc request object", Project.Identifier, Oplog.OplogId())); } - - return ProjectStore::ReadOplogResult{.ContainerObject = std::move(ContainerResult.ContainerObject)}; + return Requests; } -bool -ProjectStore::Rpc(HttpServerRequest& HttpReq, - const std::string_view ProjectId, - const std::string_view OplogId, - IoBuffer&& Payload, - AuthMgr& AuthManager) +CbPackage +ProjectStore::WriteChunksRequestResponse(Project& Project, + Oplog& Oplog, + std::vector<ChunkRequest>&& Requests, + std::vector<ChunkResult>&& Results) { - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::Rpc"); - using namespace std::literals; - HttpContentType PayloadContentType = HttpReq.RequestContentType(); - CbPackage Package; - CbObject Cb; - switch (PayloadContentType) - { - case HttpContentType::kJSON: - case HttpContentType::kUnknownContentType: - case HttpContentType::kText: - { - std::string JsonText(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize()); - Cb = LoadCompactBinaryFromJson(JsonText).AsObject(); - if (!Cb) - { - HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Content format not supported, expected JSON format"); - return false; - } - } - break; - case HttpContentType::kCbObject: - { - CbValidateError ValidateResult; - if (Cb = ValidateAndReadCompactBinaryObject(std::move(Payload), ValidateResult); - ValidateResult != CbValidateError::None || !Cb) - { - HttpReq.WriteResponse( - HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Content format not supported, expected compact binary format ('{}')", ToString(ValidateResult))); - return false; - } - break; - } - case HttpContentType::kCbPackage: - try - { - Package = ParsePackageMessage(Payload); - Cb = Package.GetObject(); - } - catch (const std::invalid_argument& ex) - { - HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - fmt::format("Failed to parse package request, reason: '{}'", ex.what())); - return false; - } - if (!Cb) - { - HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Content format not supported, expected package message format"); - return false; - } - break; - default: - HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type"); - return false; - } - - Ref<ProjectStore::Project> Project = OpenProject(ProjectId); - if (!Project) - { - HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Rpc oplog request for unknown project '{}'", ProjectId)); - return true; - } - Project->TouchProject(); - - std::string_view Method = Cb["method"sv].AsString(); - bool VerifyPathOnDisk = Method != "getchunks"sv; + CbPackage ResponsePackage; - Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, VerifyPathOnDisk); - if (!Oplog) - { - HttpReq.WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)); - return true; - } - Project->TouchOplog(OplogId); - - uint32_t MethodHash = HashStringDjb2(Method); - - switch (MethodHash) + CbObjectWriter ResponseWriter(32 + Requests.size() * 64u); + ResponseWriter.BeginArray("Chunks"sv); { - case HashStringDjb2("import"sv): - { - if (!AreDiskWritesAllowed()) - { - HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - return true; - } - std::pair<HttpResponseCode, std::string> Result = - Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); - if (Result.second.empty()) - { - HttpReq.WriteResponse(Result.first); - return Result.first != HttpResponseCode::BadRequest; - } - HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); - return true; - } - case HashStringDjb2("export"sv): - { - std::pair<HttpResponseCode, std::string> Result = Export(Project, *Oplog, Cb["params"sv].AsObjectView(), AuthManager); - HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); - return true; - } - case HashStringDjb2("getchunks"sv): + for (size_t Index = 0; Index < Requests.size(); Index++) + { + const ChunkRequest& Request = Requests[Index]; + ChunkResult& Result = Results[Index]; + if (Result.Exists) { - ZEN_TRACE_CPU("Store::Rpc::getchunks"); - - RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u)); - int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0); - - CbPackage ResponsePackage; - std::pair<HttpResponseCode, std::string> Result = GetChunks(ProjectId, OplogId, Cb, ResponsePackage); - if (Result.first == HttpResponseCode::OK) + ResponseWriter.BeginObject(); { - void* TargetProcessHandle = nullptr; - FormatFlags Flags = FormatFlags::kDefault; - if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + if (Request.Id.index() == 0) { - Flags |= FormatFlags::kAllowLocalReferences; - if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) - { - Flags |= FormatFlags::kDenyPartialLocalReferences; - } - TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId); + const IoHash& RawHash = std::get<IoHash>(Request.Id); + ResponseWriter.AddHash("Id", RawHash); } - - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, Flags, TargetProcessHandle); - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); - } - else - { - HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); - } - return true; - } - case HashStringDjb2("putchunks"sv): - { - ZEN_TRACE_CPU("Store::Rpc::putchunks"); - if (!AreDiskWritesAllowed()) - { - HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - return true; - } - - CbObject Object = Package.GetObject(); - const bool UsingTempFiles = Object["usingtmpfiles"].AsBool(false); - - std::span<const CbAttachment> Attachments = Package.GetAttachments(); - if (!Attachments.empty()) - { - std::vector<IoBuffer> WriteAttachmentBuffers; - std::vector<IoHash> WriteRawHashes; - - WriteAttachmentBuffers.reserve(Attachments.size()); - WriteRawHashes.reserve(Attachments.size()); - - for (const CbAttachment& Attachment : Attachments) + else { - IoHash RawHash = Attachment.GetHash(); - const CompressedBuffer& Compressed = Attachment.AsCompressedBinary(); - IoBuffer AttachmentPayload = Compressed.GetCompressed().Flatten().AsIoBuffer(); - if (UsingTempFiles) - { - AttachmentPayload.SetDeleteOnClose(true); - } - WriteAttachmentBuffers.push_back(std::move(AttachmentPayload)); - WriteRawHashes.push_back(RawHash); + const Oid& Id = std::get<Oid>(Request.Id); + ResponseWriter.AddObjectId("Id", Id); } - - Oplog->CaptureAddedAttachments(WriteRawHashes); - m_CidStore.AddChunks(WriteAttachmentBuffers, - WriteRawHashes, - UsingTempFiles ? CidStore::InsertMode::kMayBeMovedInPlace : CidStore::InsertMode::kCopyOnly); - } - HttpReq.WriteResponse(HttpResponseCode::OK); - return true; - } - case HashStringDjb2("snapshot"sv): - { - ZEN_TRACE_CPU("Store::Rpc::snapshot"); - if (!AreDiskWritesAllowed()) - { - HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - return true; - } - - // Snapshot all referenced files. This brings the content of all - // files into the CID store - - uint32_t OpCount = 0; - uint64_t InlinedBytes = 0; - uint64_t InlinedFiles = 0; - uint64_t TotalBytes = 0; - uint64_t TotalFiles = 0; - - std::vector<CbObject> NewOps; - struct AddedChunk - { - IoBuffer Buffer; - uint64_t RawSize = 0; - }; - tsl::robin_map<IoHash, AddedChunk, IoHash::Hasher> AddedChunks; - - Oplog->IterateOplog( - [&](CbObjectView Op) { - bool OpRewritten = false; - bool AllOk = true; - - CbWriter FilesArrayWriter; - FilesArrayWriter.BeginArray("files"sv); - - for (CbFieldView& Field : Op["files"sv]) + if (!Request.ModTag.has_value() || Request.ModTag.value() != Result.ModTag) + { + ResponseWriter.AddInteger("ModTag", Result.ModTag); + if (!Request.SkipData) { - bool CopyField = true; - - if (CbObjectView View = Field.AsObjectView()) + auto ExtractRangeResult = ExtractRange(std::move(Result.ChunkBuffer), + Request.Offset, + Request.Size, + ZenContentType::kCompressedBinary); + if (ExtractRangeResult.Error == GetChunkRangeResult::EError::Ok) { - const IoHash DataHash = View["data"sv].AsHash(); - - if (DataHash == IoHash::Zero) + if (ExtractRangeResult.ContentType == ZenContentType::kCompressedBinary) { - std::string_view ServerPath = View["serverpath"sv].AsString(); - std::filesystem::path FilePath = Project->RootDir / ServerPath; - BasicFile DataFile; - std::error_code Ec; - DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec); + ZEN_ASSERT(ExtractRangeResult.RawHash != IoHash::Zero); + CompressedBuffer CompressedValue = + CompressedBuffer::FromCompressedNoValidate(std::move(ExtractRangeResult.Chunk)); + ZEN_ASSERT(CompressedValue); - if (Ec) + if (ExtractRangeResult.RawSize != 0) { - // Error... - - ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message()); - - AllOk = false; - } - else - { - // Read file contents into memory, compress and keep in map of chunks to add to Cid store - IoBuffer FileIoBuffer = DataFile.ReadAll(); - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer))); - const uint64_t RawSize = Compressed.DecodeRawSize(); - const IoHash RawHash = Compressed.DecodeRawHash(); - if (!AddedChunks.contains(RawHash)) + // This really could use some thought so we don't send the same data if we get a request for + // multiple ranges from the same chunk block + + uint64_t FragmentRawOffset = 0; + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + uint64_t BlockSize = 0; + if (CompressedValue.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) { - const std::filesystem::path TempChunkPath = Oplog->TempPath() / RawHash.ToHexString(); - BasicFile ChunkTempFile; - ChunkTempFile.Open(TempChunkPath, BasicFile::Mode::kTruncateDelete); - ChunkTempFile.Write(Compressed.GetCompressed(), 0, Ec); - if (Ec) + if (BlockSize > 0) { - Oid ChunkId = View["id"sv].AsObjectId(); - ZEN_ERROR("unable to write external file as compressed chunk '{}', id {}: {}", - FilePath, - ChunkId, - Ec.message()); - AllOk = false; + FragmentRawOffset = (Request.Offset / BlockSize) * BlockSize; } else { - void* FileHandle = ChunkTempFile.Detach(); - IoBuffer ChunkBuffer(IoBuffer::File, - FileHandle, - 0, - Compressed.GetCompressed().GetSize(), - /*IsWholeFile*/ true); - ChunkBuffer.SetDeleteOnClose(true); - AddedChunks.insert_or_assign( - RawHash, - AddedChunk{.Buffer = std::move(ChunkBuffer), .RawSize = RawSize}); + FragmentRawOffset = Request.Offset; } + uint64_t FragmentRawLength = CompressedValue.DecodeRawSize(); + + IoHashStream FragmentHashStream; + FragmentHashStream.Append(ExtractRangeResult.RawHash.Hash, + sizeof(ExtractRangeResult.RawHash.Hash)); + FragmentHashStream.Append(&FragmentRawOffset, sizeof(FragmentRawOffset)); + FragmentHashStream.Append(&FragmentRawLength, sizeof(FragmentRawLength)); + IoHash FragmentHash = FragmentHashStream.GetHash(); + + ResponseWriter.AddHash("FragmentHash", FragmentHash); + ResponseWriter.AddInteger("FragmentOffset", FragmentRawOffset); + ResponseWriter.AddInteger("RawSize", ExtractRangeResult.RawSize); + ResponsePackage.AddAttachment(CbAttachment(CompressedValue, FragmentHash)); + } + else + { + std::string ErrorString = "Failed to get compression parameters from partial compressed buffer"; + ResponseWriter.AddString("Error", ErrorString); + ZEN_WARN("oplog '{}/{}': {}", Project.Identifier, Oplog.OplogId(), ErrorString); } - - TotalBytes += RawSize; - ++TotalFiles; - - // Rewrite file array entry with new data reference - CbObjectWriter Writer(View.GetSize()); - RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { - if (Field.GetName() == "data"sv) - { - // omit this field as we will write it explicitly ourselves - return true; - } - return false; - }); - Writer.AddBinaryAttachment("data"sv, RawHash); - - CbObject RewrittenOp = Writer.Save(); - FilesArrayWriter.AddObject(std::move(RewrittenOp)); - CopyField = false; + } + else + { + ResponseWriter.AddHash("RawHash"sv, ExtractRangeResult.RawHash); + ResponsePackage.AddAttachment(CbAttachment(std::move(CompressedValue), ExtractRangeResult.RawHash)); } } - } - - if (CopyField) - { - FilesArrayWriter.AddField(Field); - } - else - { - OpRewritten = true; - } - } - - if (OpRewritten && AllOk) - { - FilesArrayWriter.EndArray(); - CbArray FilesArray = FilesArrayWriter.Save().AsArray(); - - CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { - if (Field.GetName() == "files"sv) + else { - NewWriter.AddArray("files"sv, FilesArray); - - return true; + IoHashStream HashStream; + ZEN_ASSERT(Request.Id.index() == 1); + const Oid& Id = std::get<Oid>(Request.Id); + HashStream.Append(Id.OidBits, sizeof(Id.OidBits)); + HashStream.Append(&Request.Offset, sizeof(Request.Offset)); + HashStream.Append(&Request.Size, sizeof(Request.Size)); + IoHash Hash = HashStream.GetHash(); + + ResponseWriter.AddHash("Hash"sv, Hash); + if (ExtractRangeResult.RawSize != 0) + { + ResponseWriter.AddInteger("Size", ExtractRangeResult.RawSize); + } + ResponsePackage.AddAttachment(CbAttachment(std::move(ExtractRangeResult.Chunk), Hash)); } - - return false; - }); - - NewOps.push_back(std::move(RewrittenOp)); - } - - OpCount++; - }, - Oplog::Paging{}); - - CbObjectWriter ResponseObj; - - // Persist rewritten oplog entries - if (!NewOps.empty()) - { - ResponseObj.BeginArray("rewritten_ops"); - - for (CbObject& NewOp : NewOps) - { - ProjectStore::LogSequenceNumber NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp)); - - ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn.Number); - - ResponseObj.AddInteger(NewLsn.Number); - } - - ResponseObj.EndArray(); - } - - // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the - // new chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have - // unreferenced chunks. - for (auto It : AddedChunks) - { - const IoHash& RawHash = It.first; - AddedChunk& Chunk = It.second; - CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash); - if (Result.New) - { - InlinedBytes += Chunk.RawSize; - ++InlinedFiles; - } - } - - ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles; - ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles; - - ZEN_INFO("oplog '{}/{}': rewrote {} oplog entries (out of {})", ProjectId, OplogId, NewOps.size(), OpCount); - - HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save()); - return true; - } - case HashStringDjb2("addopattachments"sv): - { - ZEN_TRACE_CPU("Store::Rpc::addopattachments"); - if (!AreDiskWritesAllowed()) - { - HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - return true; - } - } - case HashStringDjb2("appendops"sv): - { - ZEN_TRACE_CPU("Store::Rpc::appendops"); - if (!AreDiskWritesAllowed()) - { - HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); - return true; - } - - CbArrayView OpsArray = Cb["ops"sv].AsArrayView(); - - size_t OpsBufferSize = 0; - for (CbFieldView OpView : OpsArray) - { - OpsBufferSize += OpView.GetSize(); - } - UniqueBuffer OpsBuffers = UniqueBuffer::Alloc(OpsBufferSize); - MutableMemoryView OpsBuffersMemory = OpsBuffers.GetMutableView(); - - std::vector<CbObjectView> Ops; - Ops.reserve(OpsArray.Num()); - for (CbFieldView OpView : OpsArray) - { - OpView.CopyTo(OpsBuffersMemory); - Ops.push_back(CbObjectView(OpsBuffersMemory.GetData())); - OpsBuffersMemory.MidInline(OpView.GetSize()); - } - - std::vector<ProjectStore::LogSequenceNumber> LSNs = Oplog->AppendNewOplogEntries(Ops); - ZEN_ASSERT(LSNs.size() == Ops.size()); - - std::vector<IoHash> MissingAttachments; - for (size_t OpIndex = 0; OpIndex < Ops.size(); OpIndex++) - { - if (LSNs[OpIndex]) - { - CbObjectView Op = Ops[OpIndex]; - Op.IterateAttachments([this, &MissingAttachments](CbFieldView AttachmentView) { - const IoHash Cid = AttachmentView.AsAttachment(); - if (!m_CidStore.ContainsChunk(Cid)) + } + else { - MissingAttachments.push_back(Cid); + std::string ErrorString = + fmt::format("Failed fetching chunk range ({})", ExtractRangeResult.ErrorDescription); + ResponseWriter.AddString("Error", ErrorString); + ZEN_WARN("oplog '{}/{}': {}", Project.Identifier, Oplog.OplogId(), ErrorString); } - }); - } - } - - CbPackage ResponsePackage; - - { - CbObjectWriter ResponseObj; - ResponseObj.BeginArray("written_ops"); - - for (ProjectStore::LogSequenceNumber NewLsn : LSNs) - { - ZEN_DEBUG("appended written op at LSN: {}", NewLsn.Number); - ResponseObj.AddInteger(NewLsn.Number); - } - ResponseObj.EndArray(); - - if (!MissingAttachments.empty()) - { - ResponseObj.BeginArray("need"); - - for (const IoHash& Cid : MissingAttachments) - { - ResponseObj.AddHash(Cid); } - ResponseObj.EndArray(); } - ResponsePackage.SetObject(ResponseObj.Save()); } - - std::vector<IoBuffer> ResponseBuffers = FormatPackageMessage(ResponsePackage); - - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponseBuffers); - return true; + ResponseWriter.EndObject(); } - default: - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method)); - return false; - } - return true; -} - -std::pair<HttpResponseCode, std::string> -ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::Export"); - - using namespace std::literals; - - size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize); - size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize); - size_t MaxChunksPerBlock = Params["maxchunksperblock"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunksPerBlock); - size_t ChunkFileSizeLimit = Params["chunkfilesizelimit"sv].AsUInt64(RemoteStoreOptions::DefaultChunkFileSizeLimit); - bool Force = Params["force"sv].AsBool(false); - bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); - bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false); - - CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); - - if (RemoteStoreResult.Store == nullptr) - { - return {HttpResponseCode::BadRequest, RemoteStoreResult.Description}; - } - std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store); - RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); - - JobId JobId = m_JobQueue.QueueJob( - fmt::format("Export oplog '{}/{}'", Project->Identifier, Oplog.OplogId()), - [this, - ActualRemoteStore = std::move(RemoteStore), - Project, - OplogPtr = &Oplog, - MaxBlockSize, - MaxChunksPerBlock, - MaxChunkEmbedSize, - ChunkFileSizeLimit, - EmbedLooseFile, - Force, - IgnoreMissingAttachments](JobContext& Context) { - Context.ReportMessage(fmt::format("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}", - Project->Identifier, - OplogPtr->OplogId(), - ActualRemoteStore->GetInfo().Description, - NiceBytes(MaxBlockSize), - NiceBytes(MaxChunkEmbedSize))); - - RemoteProjectStore::Result Result = SaveOplog(m_CidStore, - *ActualRemoteStore, - *Project.Get(), - *OplogPtr, - MaxBlockSize, - MaxChunksPerBlock, - MaxChunkEmbedSize, - ChunkFileSizeLimit, - EmbedLooseFile, - Force, - IgnoreMissingAttachments, - &Context); - auto Response = ConvertResult(Result); - ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second); - if (!IsHttpSuccessCode(Response.first)) - { - throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second, - (int)Response.first); - } - }); - - return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)}; -} - -std::pair<HttpResponseCode, std::string> -ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager) -{ - ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_TRACE_CPU("Store::Import"); - - using namespace std::literals; - - size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxBlockSize); - size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize); - bool Force = Params["force"sv].AsBool(false); - bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false); - bool CleanOplog = Params["clean"].AsBool(false); - - CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); - - if (RemoteStoreResult.Store == nullptr) - { - return {HttpResponseCode::BadRequest, RemoteStoreResult.Description}; + } } - std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store); - RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); + ResponseWriter.EndArray(); + ResponsePackage.SetObject(ResponseWriter.Save()); - JobId JobId = m_JobQueue.QueueJob( - fmt::format("Import oplog '{}/{}'", Project.Identifier, Oplog.OplogId()), - [this, - ChunkStore = &m_CidStore, - ActualRemoteStore = std::move(RemoteStore), - OplogPtr = &Oplog, - Force, - IgnoreMissingAttachments, - CleanOplog](JobContext& Context) { - Context.ReportMessage(fmt::format("Loading oplog '{}/{}' from {}", - OplogPtr->GetOuterProjectIdentifier(), - OplogPtr->OplogId(), - ActualRemoteStore->GetInfo().Description)); - - RemoteProjectStore::Result Result = - LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, CleanOplog, &Context); - auto Response = ConvertResult(Result); - ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); - if (!IsHttpSuccessCode(Response.first)) - { - throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second, - (int)Response.first); - } - }); - - return {HttpResponseCode::Accepted, fmt::format("{}", JobId.Id)}; + return ResponsePackage; } bool @@ -7209,14 +6240,14 @@ OpKeyStringAsOid(std::string_view OpKey) namespace testutils { using namespace std::literals; - std::string OidAsString(const Oid& Id) + static std::string OidAsString(const Oid& Id) { StringBuilder<25> OidStringBuilder; Id.ToString(OidStringBuilder); return OidStringBuilder.ToString(); } - CbPackage CreateBulkDataOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) + static CbPackage CreateBulkDataOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) { CbPackage Package; CbObjectWriter Object; @@ -7242,9 +6273,9 @@ namespace testutils { return Package; }; - CbPackage CreateFilesOplogPackage(const Oid& Id, - const std::filesystem::path ProjectRootDir, - const std::span<const std::pair<Oid, std::filesystem::path>>& Attachments) + static CbPackage CreateFilesOplogPackage(const Oid& Id, + const std::filesystem::path ProjectRootDir, + const std::span<const std::pair<Oid, std::filesystem::path>>& Attachments) { CbPackage Package; CbObjectWriter Object; @@ -7268,7 +6299,7 @@ namespace testutils { return Package; }; - std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments( + static std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments( const std::span<const size_t>& Sizes, OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast, uint64_t BlockSize = 0) @@ -7284,7 +6315,7 @@ namespace testutils { return Result; } - uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64_t RawOffset) + static uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64_t RawOffset) { if (RawOffset > 0) { @@ -7416,8 +6447,6 @@ TEST_CASE("project.store.create") ScopedTemporaryDirectory TempDir; - auto JobQueue = MakeJobQueue(1, ""sv); - OpenProcessCache ProcessCache; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; @@ -7425,7 +6454,7 @@ TEST_CASE("project.store.create") std::string_view ProjectName("proj1"sv); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); + ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; @@ -7447,15 +6476,13 @@ TEST_CASE("project.store.lifetimes") ScopedTemporaryDirectory TempDir; - auto JobQueue = MakeJobQueue(1, ""sv); - OpenProcessCache ProcessCache; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); + ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; @@ -7480,135 +6507,6 @@ TEST_CASE("project.store.lifetimes") CHECK(Project->Identifier == "proj1"sv); } -struct ExportForceDisableBlocksTrue_ForceTempBlocksFalse -{ - static const bool ForceDisableBlocks = true; - static const bool ForceEnableTempBlocks = false; -}; - -struct ExportForceDisableBlocksFalse_ForceTempBlocksFalse -{ - static const bool ForceDisableBlocks = false; - static const bool ForceEnableTempBlocks = false; -}; - -struct ExportForceDisableBlocksFalse_ForceTempBlocksTrue -{ - static const bool ForceDisableBlocks = false; - static const bool ForceEnableTempBlocks = true; -}; - -TEST_CASE_TEMPLATE("project.store.export", - Settings, - ExportForceDisableBlocksTrue_ForceTempBlocksFalse, - ExportForceDisableBlocksFalse_ForceTempBlocksFalse, - ExportForceDisableBlocksFalse_ForceTempBlocksTrue) -{ - using namespace std::literals; - using namespace testutils; - - ScopedTemporaryDirectory TempDir; - ScopedTemporaryDirectory ExportDir; - - auto JobQueue = MakeJobQueue(1, ""sv); - OpenProcessCache ProcessCache; - GcManager Gc; - CidStore CidStore(Gc); - CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); - - std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); - std::filesystem::path RootDir = TempDir.Path() / "root"; - std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; - std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; - std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; - - Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / "proj1"sv, - "proj1"sv, - RootDir.string(), - EngineRootDir.string(), - ProjectRootDir.string(), - ProjectFilePath.string())); - Ref<ProjectStore::Oplog> Oplog = Project->NewOplog("oplog1", {}); - CHECK(Oplog); - - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77}))); - Oplog->AppendNewOplogEntry( - CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99}))); - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122}))); - Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage( - Oid::NewOid(), - CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None))); - - FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 64u * 1024, - .MaxChunksPerBlock = 1000, - .MaxChunkEmbedSize = 32 * 1024u, - .ChunkFileSizeLimit = 64u * 1024u}, - /*.FolderPath = */ ExportDir.Path(), - /*.Name = */ std::string("oplog1"), - /*OptionalBaseName = */ std::string(), - /*.ForceDisableBlocks = */ Settings::ForceDisableBlocks, - /*.ForceEnableTempBlocks = */ Settings::ForceEnableTempBlocks}; - std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Options); - RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); - - RemoteProjectStore::Result ExportResult = SaveOplog(CidStore, - *RemoteStore, - *Project.Get(), - *Oplog, - Options.MaxBlockSize, - Options.MaxChunksPerBlock, - Options.MaxChunkEmbedSize, - Options.ChunkFileSizeLimit, - true, - false, - false, - nullptr); - - CHECK(ExportResult.ErrorCode == 0); - - Ref<ProjectStore::Oplog> OplogImport = Project->NewOplog("oplog2", {}); - CHECK(OplogImport); - - RemoteProjectStore::Result ImportResult = LoadOplog(CidStore, - *RemoteStore, - *OplogImport, - /*Force*/ false, - /*IgnoreMissingAttachments*/ false, - /*CleanOplog*/ false, - nullptr); - CHECK(ImportResult.ErrorCode == 0); - - RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore, - *RemoteStore, - *OplogImport, - /*Force*/ true, - /*IgnoreMissingAttachments*/ false, - /*CleanOplog*/ false, - nullptr); - CHECK(ImportForceResult.ErrorCode == 0); - - RemoteProjectStore::Result ImportCleanResult = LoadOplog(CidStore, - *RemoteStore, - *OplogImport, - /*Force*/ false, - /*IgnoreMissingAttachments*/ false, - /*CleanOplog*/ true, - nullptr); - CHECK(ImportCleanResult.ErrorCode == 0); - - RemoteProjectStore::Result ImportForceCleanResult = LoadOplog(CidStore, - *RemoteStore, - *OplogImport, - /*Force*/ true, - /*IgnoreMissingAttachments*/ false, - /*CleanOplog*/ true, - nullptr); - CHECK(ImportForceCleanResult.ErrorCode == 0); -} - TEST_CASE("project.store.gc") { using namespace std::literals; @@ -7616,15 +6514,13 @@ TEST_CASE("project.store.gc") ScopedTemporaryDirectory TempDir; - auto JobQueue = MakeJobQueue(1, ""sv); - OpenProcessCache ProcessCache; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); + ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; @@ -7817,15 +6713,13 @@ TEST_CASE("project.store.gc.prep") ScopedTemporaryDirectory TempDir; - auto JobQueue = MakeJobQueue(1, ""sv); - OpenProcessCache ProcessCache; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); + ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; @@ -8033,15 +6927,13 @@ TEST_CASE("project.store.rpc.getchunks") ScopedTemporaryDirectory TempDir; - auto JobQueue = MakeJobQueue(1, ""sv); - OpenProcessCache ProcessCache; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); + ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"sv; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; @@ -8085,24 +6977,30 @@ TEST_CASE("project.store.rpc.getchunks") Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(FilesOpId, RootDir, FilesOpIdAttachments)); } + auto GetChunks = [](zen::ProjectStore& ProjectStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, const CbObject& Cb) { + std::vector<ProjectStore::ChunkRequest> Requests = ProjectStore.ParseChunksRequests(Project, Oplog, Cb); + std::vector<ProjectStore::ChunkResult> Results = + Requests.empty() ? std::vector<ProjectStore::ChunkResult>{} : ProjectStore.GetChunks(Project, Oplog, Requests); + return ProjectStore.WriteChunksRequestResponse(Project, Oplog, std::move(Requests), std::move(Results)); + }; + + Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); + CHECK(Project1); + Ref<ProjectStore::Oplog> Oplog1 = Project1->OpenOplog("oplog1"sv, false, true); + CHECK(Oplog1); // Invalid request { CbObjectWriter Request; Request.BeginObject("WrongName"sv); Request.EndObject(); CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, "oplog1"sv, Request.Save(), Response); - CHECK_EQ(HttpResponseCode::BadRequest, Result.first); + CHECK_THROWS(GetChunks(ProjectStore, *Project1, *Oplog1, Request.Save())); } // Empty request { - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ false, std::vector<IoHash>{}, {}, {}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, std::vector<IoHash>{}, {}, {})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(0, Chunks.Num()); @@ -8110,23 +7008,15 @@ TEST_CASE("project.store.rpc.getchunks") // Single non-existing chunk by RawHash IoHash NotFoundIoHash = IoHash::Max; { - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundIoHash}, {}, {}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundIoHash}, {}, {})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(0, Chunks.Num()); } { - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundIoHash}, {}, {}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundIoHash}, {}, {})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(0, Chunks.Num()); @@ -8134,21 +7024,15 @@ TEST_CASE("project.store.rpc.getchunks") // Single non-existing chunk by Id Oid NotFoundId = Oid::NewOid(); { - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundId}, {}, {}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundId}, {}, {})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(0, Chunks.Num()); } { - CbPackage Response; - auto Result = - ProjectStore.GetChunks("proj1"sv, "oplog1"sv, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundId}, {}, {}), Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundId}, {}, {})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(0, Chunks.Num()); @@ -8159,13 +7043,11 @@ TEST_CASE("project.store.rpc.getchunks") IoHash FirstAttachmentHash = Attachments[OpIds[2]][1].second.DecodeRawHash(); uint64_t ResponseModTag = 0; { - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {}), - Response); + const CbPackage& Response = GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {})); - CHECK_EQ(HttpResponseCode::OK, Result.first); CHECK_EQ(1, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8185,13 +7067,11 @@ TEST_CASE("project.store.rpc.getchunks") } // Fetch with matching ModTag { - CbPackage Response; - auto Result = - ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {ResponseModTag}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {ResponseModTag})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8206,14 +7086,12 @@ TEST_CASE("project.store.rpc.getchunks") } // Fetch with mismatching ModTag { - CbPackage Response; - auto Result = ProjectStore.GetChunks( - "proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)}), - Response); + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)})); - CHECK_EQ(HttpResponseCode::OK, Result.first); CHECK_EQ(1, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8233,12 +7111,10 @@ TEST_CASE("project.store.rpc.getchunks") } // Fresh modtime query { - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8253,13 +7129,11 @@ TEST_CASE("project.store.rpc.getchunks") } // Modtime query with matching ModTag { - CbPackage Response; - auto Result = - ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {ResponseModTag}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {ResponseModTag})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8274,13 +7148,11 @@ TEST_CASE("project.store.rpc.getchunks") } // Modtime query with mismatching ModTag { - CbPackage Response; - auto Result = ProjectStore.GetChunks( - "proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8300,12 +7172,8 @@ TEST_CASE("project.store.rpc.getchunks") uint64_t ResponseModTag = 0; { // Full chunk request - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {})); CHECK_EQ(1, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8325,13 +7193,11 @@ TEST_CASE("project.store.rpc.getchunks") } { // Partial chunk request - CbPackage Response; - auto Result = - ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{130 * 1024, 8100}}, {}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{130 * 1024, 8100}}, {})); CHECK_EQ(1, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8356,13 +7222,11 @@ TEST_CASE("project.store.rpc.getchunks") } { // Fetch with matching ModTag - CbPackage Response; - auto Result = - ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8377,13 +7241,11 @@ TEST_CASE("project.store.rpc.getchunks") } { // Fetch with mismatching ModTag - CbPackage Response; - auto Result = ProjectStore.GetChunks( - "proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)})); CHECK_EQ(1, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8403,12 +7265,8 @@ TEST_CASE("project.store.rpc.getchunks") } // Fresh modtime query { - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8423,13 +7281,11 @@ TEST_CASE("project.store.rpc.getchunks") } // Modtime query with matching ModTag { - CbPackage Response; - auto Result = - ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8444,13 +7300,11 @@ TEST_CASE("project.store.rpc.getchunks") } // Modtime query with mismatching ModTag { - CbPackage Response; - auto Result = ProjectStore.GetChunks( - "proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8471,12 +7325,8 @@ TEST_CASE("project.store.rpc.getchunks") uint64_t ResponseModTag = 0; { // Full chunk request - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {})); CHECK_EQ(1, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8496,13 +7346,11 @@ TEST_CASE("project.store.rpc.getchunks") } { // Partial chunk request - CbPackage Response; - auto Result = - ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{81823, 5434}}, {}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{81823, 5434}}, {})); CHECK_EQ(1, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8524,13 +7372,11 @@ TEST_CASE("project.store.rpc.getchunks") } { // Fetch with matching ModTag - CbPackage Response; - auto Result = - ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8545,13 +7391,11 @@ TEST_CASE("project.store.rpc.getchunks") } { // Fetch with mismatching ModTag - CbPackage Response; - auto Result = ProjectStore.GetChunks( - "proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)})); CHECK_EQ(1, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8571,12 +7415,8 @@ TEST_CASE("project.store.rpc.getchunks") } // Fresh modtime query { - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8591,13 +7431,11 @@ TEST_CASE("project.store.rpc.getchunks") } // Modtime query with matching ModTag { - CbPackage Response; - auto Result = - ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8612,13 +7450,11 @@ TEST_CASE("project.store.rpc.getchunks") } // Modtime query with mismatching ModTag { - CbPackage Response; - auto Result = ProjectStore.GetChunks( - "proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}), - Response); - CHECK_EQ(HttpResponseCode::OK, Result.first); + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)})); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(1, Chunks.Num()); @@ -8644,13 +7480,9 @@ TEST_CASE("project.store.rpc.getchunks") std::vector<uint64_t> ResponseModTags(3, 0); { // Fresh fetch - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, {}), - Response); + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, {})); - CHECK_EQ(HttpResponseCode::OK, Result.first); CHECK_EQ(3, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); @@ -8677,13 +7509,11 @@ TEST_CASE("project.store.rpc.getchunks") } { // Fetch with matching ModTag - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, ResponseModTags), - Response); + const CbPackage& Response = GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, ResponseModTags)); - CHECK_EQ(HttpResponseCode::OK, Result.first); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); @@ -8704,13 +7534,9 @@ TEST_CASE("project.store.rpc.getchunks") } { // Fresh modtime query - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, {}), - Response); + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, {})); - CHECK_EQ(HttpResponseCode::OK, Result.first); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); @@ -8731,13 +7557,11 @@ TEST_CASE("project.store.rpc.getchunks") } { // Modtime query with matching ModTags - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, ResponseModTags), - Response); + const CbPackage& Response = GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, ResponseModTags)); - CHECK_EQ(HttpResponseCode::OK, Result.first); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); @@ -8763,13 +7587,12 @@ TEST_CASE("project.store.rpc.getchunks") { Tag++; } - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, MismatchingModTags), - Response); + const CbPackage& Response = + GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, MismatchingModTags)); - CHECK_EQ(HttpResponseCode::OK, Result.first); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); @@ -8801,13 +7624,9 @@ TEST_CASE("project.store.rpc.getchunks") std::vector<uint64_t> ResponseModTags(3, 0); { // Fresh fetch - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, {}), - Response); + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, {})); - CHECK_EQ(HttpResponseCode::OK, Result.first); CHECK_EQ(3, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); @@ -8834,13 +7653,11 @@ TEST_CASE("project.store.rpc.getchunks") } { // Fetch with matching ModTag - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, ResponseModTags), - Response); + const CbPackage& Response = GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, ResponseModTags)); - CHECK_EQ(HttpResponseCode::OK, Result.first); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); @@ -8861,13 +7678,9 @@ TEST_CASE("project.store.rpc.getchunks") } { // Fresh modtime query - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, {}), - Response); + const CbPackage& Response = + GetChunks(ProjectStore, *Project1, *Oplog1, testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, {})); - CHECK_EQ(HttpResponseCode::OK, Result.first); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); @@ -8888,13 +7701,11 @@ TEST_CASE("project.store.rpc.getchunks") } { // Modtime query with matching ModTags - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, ResponseModTags), - Response); + const CbPackage& Response = GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, ResponseModTags)); - CHECK_EQ(HttpResponseCode::OK, Result.first); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); @@ -8920,13 +7731,11 @@ TEST_CASE("project.store.rpc.getchunks") { Tag++; } - CbPackage Response; - auto Result = ProjectStore.GetChunks("proj1"sv, - "oplog1"sv, - testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, MismatchingModTags), - Response); + const CbPackage& Response = GetChunks(ProjectStore, + *Project1, + *Oplog1, + testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, MismatchingModTags)); - CHECK_EQ(HttpResponseCode::OK, Result.first); CHECK_EQ(0, Response.GetAttachments().size()); CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); CHECK_EQ(3, Chunks.Num()); @@ -8955,15 +7764,13 @@ TEST_CASE("project.store.partial.read") ScopedTemporaryDirectory TempDir; - auto JobQueue = MakeJobQueue(1, ""sv); - OpenProcessCache ProcessCache; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); + ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"sv; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; @@ -9009,7 +7816,7 @@ TEST_CASE("project.store.partial.read") Attachments[OpIds[1]][0].first, 0, ~0ull, - HttpContentType::kCompressedBinary, + ZenContentType::kCompressedBinary, &ModificationTag); CHECK_EQ(ProjectStore::GetChunkRangeResult::EError::Ok, Result.Error); @@ -9025,7 +7832,7 @@ TEST_CASE("project.store.partial.read") Attachments[OpIds[1]][0].first, 0, ~0ull, - HttpContentType::kCompressedBinary, + ZenContentType::kCompressedBinary, &ModificationTag); CHECK_EQ(ProjectStore::GetChunkRangeResult::EError::NotModified, Result2.Error); } @@ -9039,7 +7846,7 @@ TEST_CASE("project.store.partial.read") Attachments[OpIds[2]][1].first, 0, ~0ull, - HttpContentType::kCompressedBinary, + ZenContentType::kCompressedBinary, &FullChunkModificationTag); CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::Ok); CHECK(Result.Chunk); @@ -9053,7 +7860,7 @@ TEST_CASE("project.store.partial.read") Attachments[OpIds[2]][1].first, 0, ~0ull, - HttpContentType::kCompressedBinary, + ZenContentType::kCompressedBinary, &FullChunkModificationTag); CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::NotModified); } @@ -9067,7 +7874,7 @@ TEST_CASE("project.store.partial.read") Attachments[OpIds[2]][1].first, 5, 1773, - HttpContentType::kCompressedBinary, + ZenContentType::kCompressedBinary, &PartialChunkModificationTag); CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::Ok); @@ -9091,7 +7898,7 @@ TEST_CASE("project.store.partial.read") Attachments[OpIds[2]][1].first, 0, 1773, - HttpContentType::kCompressedBinary, + ZenContentType::kCompressedBinary, &PartialChunkModificationTag); CHECK_EQ(Result.Error, ProjectStore::GetChunkRangeResult::EError::NotModified); } @@ -9133,15 +7940,13 @@ TEST_CASE("project.store.iterateoplog") ScopedTemporaryDirectory TempDir; - auto JobQueue = MakeJobQueue(1, ""sv); - OpenProcessCache ProcessCache; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); + ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"sv; std::filesystem::path EngineRootDir = TempDir.Path() / "enginesv"; |