diff options
| author | Zousar Shaker <[email protected]> | 2024-12-06 21:47:11 -0700 |
|---|---|---|
| committer | Zousar Shaker <[email protected]> | 2024-12-06 21:47:11 -0700 |
| commit | 5688fa8f46fe3cd32f283adcc590e447174c58a8 (patch) | |
| tree | dccaef1a87cf454639f45e330732221ce4eb1712 | |
| parent | changelog (diff) | |
| parent | 5.5.16-pre0 (diff) | |
| download | zen-zs/xrepo.tar.xz zen-zs/xrepo.zip | |
Merge branch 'main' into zs/xrepozs/xrepo
25 files changed, 2056 insertions, 440 deletions
diff --git a/.gitignore b/.gitignore index 803809da9..b4d5d8417 100644 --- a/.gitignore +++ b/.gitignore @@ -14,9 +14,7 @@ # Build results [Dd]ebug/ -[Dd]ebugPublic/ [Rr]elease/ -[Rr]eleases/ x64/ x86/ build/ @@ -37,34 +35,6 @@ vs20*/ # Uncomment if you have tasks that create the project's static files in wwwroot #wwwroot/ -# Visual Studio 2017 auto generated files -Generated\ Files/ - -# MSTest test Results -[Tt]est[Rr]esult*/ -[Bb]uild[Ll]og.* - -# NUNIT -*.VisualState.xml -TestResult.xml - -# Build Results of an ATL Project -[Dd]ebugPS/ -[Rr]eleasePS/ -dlldata.c - -# Benchmark Results -BenchmarkDotNet.Artifacts/ - -# .NET Core -project.lock.json -project.fragment.lock.json -artifacts/ -**/Properties/launchSettings.json - -# StyleCop -StyleCopReport.xml - # Files built by Visual Studio *_i.c *_p.c @@ -93,9 +63,6 @@ StyleCopReport.xml *.svclog *.scc -# Chutzpah Test files -_Chutzpah* - # Visual C++ cache files ipch/ *.aps @@ -116,83 +83,15 @@ ipch/ # Visual Studio Trace Files *.e2e -# TFS 2012 Local Workspace -$tf/ - -# Guidance Automation Toolkit -*.gpState - -# ReSharper is a .NET coding add-in -_ReSharper*/ -*.[Rr]e[Ss]harper -*.DotSettings.user - -# JustCode is a .NET coding add-in -.JustCode - -# TeamCity is a build add-in -_TeamCity* - -# DotCover is a Code Coverage Tool -*.dotCover - -# AxoCover is a Code Coverage Tool -.axoCover/* -!.axoCover/settings.json - -# Visual Studio code coverage results -*.coverage -*.coveragexml - -# NCrunch -_NCrunch_* -.*crunch*.local.xml -nCrunchTemp_* - -# MightyMoose -*.mm.* -AutoTest.Net/ - # Web workbench (sass) .sass-cache/ -# Installshield output folder -[Ee]xpress/ - - - - -# Windows Store app package directories and files -AppPackages/ -BundleArtifacts/ -Package.StoreAssociation.xml -_pkginfo.txt -*.appx - # Visual Studio cache files # files ending in .cache can be ignored *.[Cc]ache # but keep track of directories ending in .cache !*.[Cc]ache/ - - - -# Backup & report files from converting an old project file -# to a 
newer Visual Studio version. Backup files are not needed, -# because we have git ;-) -_UpgradeReport_Files/ -Backup*/ -UpgradeLog*.XML -UpgradeLog*.htm -ServiceFabricBackup/ -*.rptproj.bak - - -# Python Tools for Visual Studio (PTVS) -__pycache__/ -*.pyc - /vcpkg_installed .data/ .minio_data/ @@ -214,14 +113,10 @@ makefile /*.xcodeproj/ CMakefiles/ CMake* -!cmake-deps.patch # ClangD cache directory .cache .cache/ -# Ue tool chain temp directory, or any other temporary directory -.tmp*/ - -# Mac -.DS_Store
\ No newline at end of file +# Ue tool chain temp directory +.tmp-ue-toolchain/ diff --git a/CHANGELOG.md b/CHANGELOG.md index b87da2acd..e40840f65 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,66 @@ ## -- Bugfix: Fix for c4core dependency with xmake repo configuration +- Feature: Project store "getchunks" rpc call `/prj/{project}/oplog/{log}/rpc` extended to accept both CAS (RawHash) and Id (Oid) identifiers as well as partial ranges + - Legacy call still has and `chunks` array in the request body with IoHash entries providing CAS data for whole chunks only + - New call has a top level `Request` object in request body with the following elements: + - `SkipData`: bool, optional, default `false` + - If `SkipData` is set to true we will not include the payload of the chunk, just the information requested for the chunk + - `Chunks`: array of objects for the requests + - `RawHash`: IoHash + - Indicates we want a Cid chunk which is stored in CAS and identified by the RawHash of the chunk. + - Mutually exclusive with `Oid`. + - The value of the field will be used as the `Id` field in the chunk response. + - `Oid`: IoChunkId + - Indicates we want a data chunk via its Id. May be stored in CAS or be a reference to a loose file on disk. + - Mutually exclusive with `RawHash`. + - The value of the field will be used as the `Id` field in the chunk response. + - `Offset`: uint64, optional + - Partial request offset into the requested chunk. Default is from start of chunk (0). + - `Size`: uint64, optional + - Partial request size of the requested chunk. Default is to end of chunk (-1). + - `ModTag`: uint64, optional + - A tag indicating the modification version of the chunk. + - If `SkipData` is set to false and the `ModTag` matches the current modification version in zenserver the chunk response will only contain an `Id` field. 
+ - If `SkipData` is set to false and the `ModTag` does not match the current modification version in zenserver the response will contain an `Id` field, current `ModTag` and the chunk payload. + - If `SkipData` is set to true and the `ModTag` matches the current modification version in zenserver the chunk response will only contain an `Id` field. + - If `SkipData` is set to true and the `ModTag` does not match the current modification version in zenserver the response will contain an `Id` field and the current `ModTag`. + - New call responds with a zen Http CbPackage (with the header magic `0xaa77aacc`) with the following elements in the included package object: + - `Chunks`: array of objects for the response + - `Id`: IoHash or IoChunkId + - This is the identifier used for the chunk - if the requested chunk can not be found by zenserver the chunk will be omitted from the response `Chunks` array. + - `Error`: string + - If the chunk was found but there was an error processing the chunk to fulfill the request the reason will be added as a descriptive string. + - Mutually exclusive with `ModTag`, `Hash`, `RawHash` and `FragmentHash`. + - `ModTag`: uint64 + - This field is set if the `ModTag` for the chunk was not present in the request or the current `ModTag` for the chunk does not match the `ModTag` in the request. + - Mutually exclusive with `Error`. + - `Hash`: IoHash + - If the response payload was found and `SkipData` is false and `ModTag` in the request does not match the current `ModTag` this is the identifier for the attachment data. + - A `Hash` field indicates that the response for the chunk is attached as uncompressed data. + - If a range is given for the chunk the attached data is limited to the requested range. + - Mutually exclusive with `RawHash`, `FragmentHash` and `Error`. 
+ - `RawHash`: IoHash + - If the response payload was found and `SkipData` is false and `ModTag` in the request does not match the current `ModTag` this is the identifier for the attachment data. + - A `RawHash` field indicates that the response for the chunk is attached as compressed data. + - If a range is given for the chunk this field will be replaced with a `FragmentHash` field. + - Mutually exclusive with `Hash`, `FragmentHash` and `Error`. + - `FragmentHash`: IoHash + - If the response payload was found and `SkipData` is false and `ModTag` in the request does not match the current `ModTag` this is the identifier for the attachment data. + - A `FragmentHash` field indicates that the response for the chunk is a partial chunk as compressed data. Compressed data ranges will always be sent as full compressed blocks covering the requested range. + - If `FragmentHash` is present, a `FragmentOffset` field will also be present. + - Mutually exclusive with `Hash`, `RawHash` and `Error`. + - `FragmentOffset`: uint64 + - Indicates at which offset the partial chunk of compressed data starts. Compressed data is compressed into blocks and ranges for compressed data will always send full compressed blocks. + - `FragmentOffset` will be less than or equal to the requested `Offset`. + - `FragmentOffset` plus the decompressed attachment size will always be greater than or equal to the requested `Offset` + `Size`. + - Only present if `FragmentHash` is present. + - `Size`: uint64 + - If the uncompressed chunk response is not the full chunk the `Size` field will contain the chunk's full size + - Mutually exclusive with `RawHash` and `Error`. + - `RawSize`: uint64 + - If the compressed chunk response is not the full chunk the `RawSize` field will contain the chunk's full decompressed size + - Mutually exclusive with `RawHash`, `Hash` and `Error`. 
+- Improvement: Build release binaries with LTO on Windows/Mac +- Improvement: Release binaries now build with "faster" instead of "smaller" optimization flags ## 5.5.15 - Bugfix: Fix returned content type when requesting a project store chunk with non-compressed accept type diff --git a/VERSION.txt b/VERSION.txt index 6094e9165..7b6a98777 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -1 +1 @@ -5.5.15 +5.5.16-pre0 diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index 36147c5a9..52f2c4adc 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -1435,12 +1435,34 @@ FileSizeFromHandle(void* NativeHandle) int Fd = int(intptr_t(NativeHandle)); struct stat Stat; fstat(Fd, &Stat); - FileSize = size_t(Stat.st_size); + FileSize = size_t(Stat.st_size); #endif return FileSize; } +uint64_t +GetModificationTickFromHandle(void* NativeHandle, std::error_code& Ec) +{ +#if ZEN_PLATFORM_WINDOWS + FILETIME LastWriteTime; + BOOL OK = GetFileTime((HANDLE)NativeHandle, NULL, NULL, &LastWriteTime); + if (OK) + { + return ((uint64_t(LastWriteTime.dwHighDateTime) << 32) | LastWriteTime.dwLowDateTime); + } +#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + int Fd = int(uintptr_t(NativeHandle)); + struct stat Stat; + if (0 == fstat(Fd, &Stat)) + { + return gsl::narrow<uint64_t>(Stat.st_mtime); + } +#endif + Ec = MakeErrorCodeFromLastError(); + return 0; +} + std::filesystem::path GetRunningExecutablePath() { diff --git a/src/zencore/include/zencore/compactbinarypackage.h b/src/zencore/include/zencore/compactbinarypackage.h index 064481f83..12fcc41b7 100644 --- a/src/zencore/include/zencore/compactbinarypackage.h +++ b/src/zencore/include/zencore/compactbinarypackage.h @@ -64,6 +64,9 @@ public: ZENCORE_API CbAttachment(const CompressedBuffer& InValue, const IoHash& Hash); ZENCORE_API CbAttachment(CompressedBuffer&& InValue, const IoHash& Hash); + /** Construct a binary attachment. Value is cloned if not owned. 
*/ + ZENCORE_API CbAttachment(CompositeBuffer&& InValue, const IoHash& Hash); + /** Reset this to a null attachment. */ inline void Reset() { *this = CbAttachment(); } @@ -130,7 +133,6 @@ public: private: ZENCORE_API CbAttachment(const CbObject& Value, const IoHash* Hash); ZENCORE_API explicit CbAttachment(CompositeBuffer&& InValue); - ZENCORE_API CbAttachment(CompositeBuffer&& InValue, const IoHash& Hash); IoHash Hash; std::variant<std::nullptr_t, CbObject, CompositeBuffer, CompressedBuffer> Value; diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h index 2cd663afb..dba4981f0 100644 --- a/src/zencore/include/zencore/filesystem.h +++ b/src/zencore/include/zencore/filesystem.h @@ -45,6 +45,10 @@ ZENCORE_API std::filesystem::path CanonicalPath(std::filesystem::path InPath, st */ ZENCORE_API uint64_t FileSizeFromHandle(void* NativeHandle); +/** Get a native time tick of last modification time + */ +ZENCORE_API uint64_t GetModificationTickFromHandle(void* NativeHandle, std::error_code& Ec); + ZENCORE_API std::filesystem::path GetRunningExecutablePath(); /** Set the max open file handle count to max allowed for the current process on Linux and MacOS diff --git a/src/zencore/include/zencore/memory/align.h b/src/zencore/include/zencore/memory/align.h index acf4157c4..9d4101fab 100644 --- a/src/zencore/include/zencore/memory/align.h +++ b/src/zencore/include/zencore/memory/align.h @@ -1,5 +1,7 @@ // Copyright Epic Games, Inc. All Rights Reserved. 
+#pragma once + #include <zenbase/zenbase.h> namespace zen { diff --git a/src/zencore/include/zencore/testutils.h b/src/zencore/include/zencore/testutils.h index 6a1c0184b..45fde4eda 100644 --- a/src/zencore/include/zencore/testutils.h +++ b/src/zencore/include/zencore/testutils.h @@ -33,6 +33,7 @@ struct ScopedCurrentDirectoryChange }; IoBuffer CreateRandomBlob(uint64_t Size); +IoBuffer CreateSemiRandomBlob(uint64_t Size); struct FalseType { diff --git a/src/zencore/testutils.cpp b/src/zencore/testutils.cpp index d4c8aeaef..641d5508a 100644 --- a/src/zencore/testutils.cpp +++ b/src/zencore/testutils.cpp @@ -71,6 +71,26 @@ CreateRandomBlob(uint64_t Size) return Data; }; +IoBuffer +CreateSemiRandomBlob(uint64_t Size) +{ + IoBuffer Result(Size); + const size_t PartCount = (Size / (1u * 1024u * 64)) + 1; + const size_t PartSize = Size / PartCount; + auto Part = CreateRandomBlob(PartSize); + auto Remain = Result.GetMutableView().CopyFrom(Part.GetView()); + while (Remain.GetSize() >= PartSize) + { + Remain = Remain.CopyFrom(Part.GetView()); + } + if (Remain.GetSize() > 0) + { + auto RemainBuffer = CreateRandomBlob(Remain.GetSize()); + Remain.CopyFrom(RemainBuffer.GetView()); + } + return Result; +}; + } // namespace zen #endif // ZEN_WITH_TESTS diff --git a/src/zencore/xmake.lua b/src/zencore/xmake.lua index ce8cd0996..2f82d38a3 100644 --- a/src/zencore/xmake.lua +++ b/src/zencore/xmake.lua @@ -13,6 +13,7 @@ target('zencore') end) set_configdir("include/zencore") add_files("**.cpp") + add_files("trace.cpp", {unity_ignored = true }) if has_config("zenrpmalloc") then set_languages("c17", "cxx20") diff --git a/src/zenhttp/servers/httpplugin.cpp b/src/zenhttp/servers/httpplugin.cpp index 09cd76f3e..9c2110476 100644 --- a/src/zenhttp/servers/httpplugin.cpp +++ b/src/zenhttp/servers/httpplugin.cpp @@ -27,14 +27,6 @@ # include <conio.h> # endif -# define PLUGIN_VERBOSE_TRACE 1 - -# if PLUGIN_VERBOSE_TRACE -# define ZEN_TRACE_VERBOSE ZEN_TRACE -# else -# define 
ZEN_TRACE_VERBOSE(fmtstr, ...) -# endif - namespace zen { struct HttpPluginServerImpl; diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index a6606c7ad..fd116ba8e 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -86,7 +86,8 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach HttpStatsService& StatsService, HttpStatusService& StatusService, UpstreamCache& UpstreamCache, - const DiskWriteBlocker* InDiskWriteBlocker) + const DiskWriteBlocker* InDiskWriteBlocker, + OpenProcessCache& InOpenProcessCache) : m_Log(logging::Get("cache")) , m_CacheStore(InCacheStore) , m_StatsService(StatsService) @@ -94,6 +95,7 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach , m_CidStore(InCidStore) , m_UpstreamCache(UpstreamCache) , m_DiskWriteBlocker(InDiskWriteBlocker) +, m_OpenProcessCache(InOpenProcessCache) , m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, InCidStore, InDiskWriteBlocker) { m_StatsService.RegisterHandler("z$", *this); diff --git a/src/zenserver/cache/httpstructuredcache.h b/src/zenserver/cache/httpstructuredcache.h index 812162efa..13c1d6475 100644 --- a/src/zenserver/cache/httpstructuredcache.h +++ b/src/zenserver/cache/httpstructuredcache.h @@ -75,7 +75,8 @@ public: HttpStatsService& StatsService, HttpStatusService& StatusService, UpstreamCache& UpstreamCache, - const DiskWriteBlocker* InDiskWriteBlocker); + const DiskWriteBlocker* InDiskWriteBlocker, + OpenProcessCache& InOpenProcessCache); ~HttpStructuredCacheService(); virtual const char* BaseUri() const override; @@ -122,7 +123,7 @@ private: metrics::OperationTiming m_UpstreamGetRequestTiming; CacheStats m_CacheStats; const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; - OpenProcessCache m_OpenProcessCache; + OpenProcessCache& m_OpenProcessCache; CacheRpcHandler m_RpcHandler; void ReplayRequestRecorder(const 
CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount); diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp index 22216e88d..09f4c7ee2 100644 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ b/src/zenserver/projectstore/httpprojectstore.cpp @@ -561,7 +561,7 @@ HttpProjectService::HandleChunkBatchRequest(HttpRouterRequest& Req) for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex) { const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex]; - IoBuffer FoundChunk = FoundLog->FindChunk(RequestedChunk.ChunkId); + IoBuffer FoundChunk = FoundLog->FindChunk(RequestedChunk.ChunkId, nullptr); if (FoundChunk) { if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1)) @@ -834,7 +834,7 @@ HttpProjectService::HandleChunkByIdRequest(HttpRouterRequest& Req) CompositeBuffer Chunk; HttpContentType ContentType; std::pair<HttpResponseCode, std::string> Result = - m_ProjectStore->GetChunkRange(ProjectId, OplogId, ChunkId, Offset, Size, AcceptType, Chunk, ContentType); + m_ProjectStore->GetChunkRange(ProjectId, OplogId, ChunkId, Offset, Size, AcceptType, Chunk, ContentType, nullptr); if (Result.first == HttpResponseCode::OK) { m_ProjectStats.ChunkHitCount++; @@ -883,8 +883,8 @@ HttpProjectService::HandleChunkByCidRequest(HttpRouterRequest& Req) case HttpVerb::kGet: { IoBuffer Value; - std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value); - + std::pair<HttpResponseCode, std::string> Result = + m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value, nullptr); if (Result.first == HttpResponseCode::OK) { m_ProjectStats.ChunkHitCount++; @@ -991,7 +991,7 @@ HttpProjectService::HandleOplogOpPrepRequest(HttpRouterRequest& Req) std::vector<IoHash> NeedList = FoundLog->CheckPendingChunkReferences(ChunkList, std::chrono::minutes(2)); - CbObjectWriter Cbo; + 
CbObjectWriter Cbo(1 + 1 + 5 + NeedList.size() * (1 + sizeof(IoHash::Hash)) + 1); Cbo.BeginArray("need"); { for (const IoHash& Hash : NeedList) @@ -1544,7 +1544,7 @@ LoadReferencedSet(ProjectStore::Oplog& Log) return std::optional<OplogReferencedSet>(); } - return OplogReferencedSet::LoadFromChunk(Log.FindChunk(ChunkId)); + return OplogReferencedSet::LoadFromChunk(Log.FindChunk(ChunkId, nullptr)); } void diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 415c80078..70045c13c 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -19,6 +19,7 @@ #include <zenstore/cidstore.h> #include <zenstore/scrubcontext.h> #include <zenutil/cache/rpcrecording.h> +#include <zenutil/openprocesscache.h> #include <zenutil/packageformat.h> #include <zenutil/referencemetadata.h> #include <zenutil/workerpools.h> @@ -314,6 +315,27 @@ namespace { static_assert(sizeof(OplogIndexHeader) == 64); + static std::uint64_t GetModificationTagFromRawHash(const IoHash& Hash) + { + IoHash::Hasher H; + return H(Hash); + } + + static std::uint64_t GetModificationTagFromModificationTime(IoBuffer FileBuffer) + { + IoBufferFileReference FileRef; + if (FileBuffer.GetFileReference(FileRef)) + { + std::error_code Ec; + uint64_t ModificationTick = GetModificationTickFromHandle(FileRef.FileHandle, Ec); + if (!Ec) + { + return ModificationTick; + } + } + return {}; + } + } // namespace ////////////////////////////////////////////////////////////////////////// @@ -1920,19 +1942,27 @@ ProjectStore::Oplog::GetChunkByRawHash(const IoHash& RawHash) } bool -ProjectStore::Oplog::IterateChunks(std::span<IoHash> RawHashes, - const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool, - uint64_t LargeSizeLimit) -{ - return m_CidStore.IterateChunks(RawHashes, AsyncCallback, OptionalWorkerPool, LargeSizeLimit); 
+ProjectStore::Oplog::IterateChunks(std::span<IoHash> RawHashes, + bool IncludeModTag, + const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) +{ + return m_CidStore.IterateChunks( + RawHashes, + [&](size_t Index, const IoBuffer& Payload) { + return AsyncCallback(Index, Payload, IncludeModTag ? GetModificationTagFromRawHash(RawHashes[Index]) : 0); + }, + OptionalWorkerPool, + LargeSizeLimit); } bool -ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, - const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool, - uint64_t LargeSizeLimit) +ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, + bool IncludeModTag, + const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit) { std::vector<size_t> CidChunkIndexes; std::vector<IoHash> CidChunkHashes; @@ -1960,12 +1990,6 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, } } } - m_CidStore.IterateChunks( - CidChunkHashes, - [&](size_t Index, const IoBuffer& Payload) { return AsyncCallback(CidChunkIndexes[Index], Payload); }, - OptionalWorkerPool, - LargeSizeLimit); - if (OptionalWorkerPool) { std::atomic_bool Result = true; @@ -1979,7 +2003,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, } WorkLatch.AddCount(1); OptionalWorkerPool->ScheduleWork( - [this, &WorkLatch, &ChunkIds, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]() { + [this, &WorkLatch, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); if (Result.load() == false) { @@ -1995,7 +2019,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, ZEN_WARN("Trying to fetch chunk {} using file path {} 
failed", ChunkIds[ChunkIndex], FilePath); } - if (!AsyncCallback(FileChunkIndex, Payload)) + if (!AsyncCallback(FileChunkIndex, Payload, IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0)) { Result.store(false); } @@ -2012,6 +2036,18 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, }); } + if (!CidChunkHashes.empty()) + { + m_CidStore.IterateChunks( + CidChunkHashes, + [&](size_t Index, const IoBuffer& Payload) { + size_t CidChunkIndex = CidChunkIndexes[Index]; + return AsyncCallback(CidChunkIndex, Payload, IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0); + }, + OptionalWorkerPool, + LargeSizeLimit); + } + WorkLatch.CountDown(); WorkLatch.Wait(); @@ -2019,6 +2055,18 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, } else { + if (!CidChunkHashes.empty()) + { + m_CidStore.IterateChunks( + CidChunkHashes, + [&](size_t Index, const IoBuffer& Payload) { + size_t CidChunkIndex = CidChunkIndexes[Index]; + return AsyncCallback(CidChunkIndex, Payload, IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0); + }, + OptionalWorkerPool, + LargeSizeLimit); + } + for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) { size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; @@ -2026,7 +2074,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); if (Payload) { - bool Result = AsyncCallback(FileChunkIndex, Payload); + bool Result = AsyncCallback(FileChunkIndex, Payload, IncludeModTag ? 
GetModificationTagFromModificationTime(Payload) : 0); if (!Result) { return false; @@ -2038,7 +2086,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, } IoBuffer -ProjectStore::Oplog::FindChunk(const Oid& ChunkId) +ProjectStore::Oplog::FindChunk(const Oid& ChunkId, uint64_t* OptOutModificationTag) { RwLock::SharedLockScope OplogLock(m_OplogLock); if (!m_Storage) @@ -2051,7 +2099,12 @@ ProjectStore::Oplog::FindChunk(const Oid& ChunkId) IoHash ChunkHash = ChunkIt->second; OplogLock.ReleaseNow(); - return m_CidStore.FindChunkByCid(ChunkHash); + IoBuffer Result = m_CidStore.FindChunkByCid(ChunkHash); + if (Result && OptOutModificationTag != nullptr) + { + *OptOutModificationTag = GetModificationTagFromRawHash(ChunkHash); + } + return Result; } if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) @@ -2065,6 +2118,10 @@ ProjectStore::Oplog::FindChunk(const Oid& ChunkId) { ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkId, FilePath); } + else if (OptOutModificationTag != nullptr) + { + *OptOutModificationTag = GetModificationTagFromModificationTime(Result); + } return Result; } @@ -2073,7 +2130,12 @@ ProjectStore::Oplog::FindChunk(const Oid& ChunkId) IoHash ChunkHash = MetaIt->second; OplogLock.ReleaseNow(); - return m_CidStore.FindChunkByCid(ChunkHash); + IoBuffer Result = m_CidStore.FindChunkByCid(ChunkHash); + if (Result && OptOutModificationTag != nullptr) + { + *OptOutModificationTag = GetModificationTagFromRawHash(ChunkHash); + } + return Result; } return {}; @@ -2109,7 +2171,7 @@ ProjectStore::Oplog::GetAllChunksInfo() for (ChunkInfo& Info : InfoArray) { - if (IoBuffer Chunk = FindChunk(Info.ChunkId)) + if (IoBuffer Chunk = FindChunk(Info.ChunkId, nullptr)) { Info.ChunkSize = Chunk.GetSize(); } @@ -3032,7 +3094,7 @@ ProjectStore::Project::WriteAccessTimes() { using namespace std::literals; - CbObjectWriter Writer; + CbObjectWriter Writer(32 + (m_LastAccessTimes.size() * 16)); { RwLock::SharedLockScope 
_(m_LastAccessTimesLock); @@ -3557,11 +3619,17 @@ ProjectStore::Project::LastOplogAccessTime(std::string_view Oplog) const ////////////////////////////////////////////////////////////////////////// -ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, JobQueue& JobQueue, const Configuration& Config) +ProjectStore::ProjectStore(CidStore& Store, + std::filesystem::path BasePath, + GcManager& Gc, + JobQueue& JobQueue, + OpenProcessCache& InOpenProcessCache, + const Configuration& Config) : m_Log(logging::Get("project")) , m_Gc(Gc) , m_CidStore(Store) , m_JobQueue(JobQueue) +, m_OpenProcessCache(InOpenProcessCache) , m_ProjectBasePath(BasePath) , m_Config(Config) , m_DiskWriteBlocker(Gc.GetDiskWriteBlocker()) @@ -3972,7 +4040,8 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, FoundLog->IterateChunks( Ids, - [&](size_t Index, const IoBuffer& Payload) { + false, + [&](size_t Index, const IoBuffer& Payload, uint64_t /*ModTag*/) { try { if (Payload) @@ -4033,7 +4102,8 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, 256u * 1024u); } - CbObjectWriter Response; + CbObjectWriter Response(64u + Count * ((WantsIdField ? (5 + sizeof(Oid::OidBits)) : 0) + (WantsServerPathField ? 64 : 0) + + (WantsClientPathField ? 64 : 0) + (WantsSizeField ? 16 : 0) + (WantsRawSizeField ? 
16 : 0))); Response.BeginArray("files"sv); for (size_t Index = 0; Index < Count; Index++) { @@ -4102,7 +4172,16 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId, std::vector<uint64_t> RawSizes; std::vector<uint64_t> Sizes; - size_t Count = 0; + size_t Count = 0; + size_t EstimatedCount = FoundLog->OplogCount(); + if (WantsIdField) + { + Ids.reserve(EstimatedCount); + } + if (WantsRawHashField || WantsRawSizeField || WantsSizeField) + { + Hashes.reserve(EstimatedCount); + } FoundLog->IterateChunkMap([&](const Oid& Id, const IoHash& Hash) { if (WantsIdField) { @@ -4129,7 +4208,8 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId, WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); (void)FoundLog->IterateChunks( Hashes, - [&](size_t Index, const IoBuffer& Payload) -> bool { + false, + [&](size_t Index, const IoBuffer& Payload, uint64_t /*ModTag*/) -> bool { try { if (Payload) @@ -4190,7 +4270,9 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId, 256u * 1024u); } - CbObjectWriter Response; + CbObjectWriter Response(64u + Count * ((WantsIdField ? (5 + sizeof(Oid::OidBits)) : 0) + + (WantsRawHashField ? (10 + sizeof(IoHash::Hash)) : 0) + (WantsSizeField ? 16 : 0) + + (WantsRawSizeField ? 
16 : 0))); Response.BeginArray("chunkinfos"sv); for (size_t Index = 0; Index < Count; Index++) @@ -4250,7 +4332,7 @@ ProjectStore::GetChunkInfo(const std::string_view ProjectId, const Oid Obj = Oid::FromHexString(ChunkId); - IoBuffer Chunk = FoundLog->FindChunk(Obj); + IoBuffer Chunk = FoundLog->FindChunk(Obj, nullptr); if (!Chunk) { return {HttpResponseCode::NotFound, {}}; @@ -4284,7 +4366,8 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId, uint64_t Size, ZenContentType AcceptType, CompositeBuffer& OutChunk, - ZenContentType& OutContentType) + ZenContentType& OutContentType, + uint64_t* OptionalInOutModificationTag) { if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) { @@ -4293,41 +4376,19 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId, const Oid Obj = Oid::FromHexString(ChunkId); - return GetChunkRange(ProjectId, OplogId, Obj, Offset, Size, AcceptType, OutChunk, OutContentType); + return GetChunkRange(ProjectId, OplogId, Obj, Offset, Size, AcceptType, OutChunk, OutContentType, OptionalInOutModificationTag); } -std::pair<HttpResponseCode, std::string> -ProjectStore::GetChunkRange(const std::string_view ProjectId, - const std::string_view OplogId, - Oid ChunkId, - uint64_t Offset, - uint64_t Size, - ZenContentType AcceptType, - CompositeBuffer& OutChunk, - ZenContentType& OutContentType) +static std::pair<HttpResponseCode, std::string> +ExtractRange(IoBuffer&& Chunk, + uint64_t Offset, + uint64_t Size, + ZenContentType AcceptType, + ZenContentType& OutContentType, + CompositeBuffer& OutChunk, + IoHash& OutRawHash, + uint64_t& OutRawSize) { - bool IsOffset = Offset != 0 || Size != ~(0ull); - - Ref<ProjectStore::Project> Project = OpenProject(ProjectId); - if (!Project) - { - return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)}; - } - Project->TouchProject(); - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); - if 
(!FoundLog) - { - return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)}; - } - Project->TouchOplog(OplogId); - - IoBuffer Chunk = FoundLog->FindChunk(ChunkId); - if (!Chunk) - { - return {HttpResponseCode::NotFound, {}}; - } - OutContentType = Chunk.GetContentType(); if (OutContentType == ZenContentType::kCompressedBinary) @@ -4335,26 +4396,47 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId, IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize); - ZEN_ASSERT(!Compressed.IsNull()); + if (!Compressed) + { + return {HttpResponseCode::InternalServerError, "malformed compressed buffer"}; + } + + const bool IsFullRange = (Offset == 0) && ((Size == ~(0ull)) || (Size == RawSize)); - if (IsOffset) + if (IsFullRange) + { + if (AcceptType == ZenContentType::kBinary) + { + OutChunk = Compressed.DecompressToComposite(); + OutContentType = ZenContentType::kBinary; + } + else + { + OutChunk = Compressed.GetCompressed(); + } + OutRawSize = 0; + } + else { if (Size == ~(0ull) || (Offset + Size) > RawSize) { - Size = RawSize - Offset; + if (Offset < RawSize) + { + Size = RawSize - Offset; + } + else + { + Size = 0; + } } if (Size == 0) { - return { - HttpResponseCode::NotFound, - fmt::format("Chunk request for range outside of chunk '{}/{}'. Request: Chunk: {}, Offset: {}, Size: {}, ChunkSize: {}", - ProjectId, - OplogId, - ChunkId, - Offset, - Size, - RawSize)}; + return {HttpResponseCode::NotFound, + fmt::format("Chunk request for range outside of compressed chunk. 
Offset: {}, Size: {}, ChunkSize: {}", + Offset, + Size, + RawSize)}; } if (AcceptType == ZenContentType::kBinary) @@ -4368,47 +4450,93 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId, // The client will have to compensate for any offsets that do not land on an even block size multiple OutChunk = Compressed.GetRange(Offset, Size).GetCompressed(); } + OutRawSize = RawSize; + } + OutRawHash = RawHash; + } + else + { + const uint64_t ChunkSize = Chunk.GetSize(); + + const bool IsFullRange = (Offset == 0) && ((Size == ~(0ull)) || (Size == ChunkSize)); + if (IsFullRange) + { + OutChunk = CompositeBuffer(SharedBuffer(std::move(Chunk))); + OutRawSize = 0; } else { - if (AcceptType == ZenContentType::kBinary) + if (Size == ~(0ull) || (Offset + Size) > ChunkSize) { - OutChunk = Compressed.DecompressToComposite(); - OutContentType = ZenContentType::kBinary; + if (Offset < ChunkSize) + { + Size = ChunkSize - Offset; + } + else + { + Size = 0; + } } - else + + if (Size == 0) { - OutChunk = Compressed.GetCompressed(); + return {HttpResponseCode::NotFound, + fmt::format("Chunk request for range outside of chunk. 
Offset: {}, Size: {}, ChunkSize: {}", Offset, Size, Size)}; } + + OutChunk = CompositeBuffer(SharedBuffer(IoBuffer(std::move(Chunk), Offset, Size))); + OutRawSize = ChunkSize; } } - else if (IsOffset) + return {HttpResponseCode::OK, {}}; +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::GetChunkRange(const std::string_view ProjectId, + const std::string_view OplogId, + Oid ChunkId, + uint64_t Offset, + uint64_t Size, + ZenContentType AcceptType, + CompositeBuffer& OutChunk, + ZenContentType& OutContentType, + uint64_t* OptionalInOutModificationTag) +{ + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) { - if (Size == ~(0ull) || (Offset + Size) > Chunk.GetSize()) - { - Size = Chunk.GetSize() - Offset; - } + return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)}; + } + Project->TouchProject(); - if (Size == 0) - { - return {HttpResponseCode::NotFound, - fmt::format("Chunk request for range outside of chunk '{}/{}'. Request: Chunk: {}, Offset: {}, Size: {}, ChunkSize: {}", - ProjectId, - OplogId, - ChunkId, - Offset, - Size, - Chunk.GetSize())}; - } + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false); + if (!FoundLog) + { + return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + Project->TouchOplog(OplogId); - OutChunk = CompositeBuffer(SharedBuffer(IoBuffer(std::move(Chunk), Offset, Size))); + uint64_t OldTag = OptionalInOutModificationTag == nullptr ? 
0 : *OptionalInOutModificationTag; + IoBuffer Chunk = FoundLog->FindChunk(ChunkId, OptionalInOutModificationTag); + if (!Chunk) + { + return {HttpResponseCode::NotFound, {}}; } - else + if (OptionalInOutModificationTag != nullptr && OldTag == *OptionalInOutModificationTag) { - OutChunk = CompositeBuffer(SharedBuffer(std::move(Chunk))); + return {HttpResponseCode::NotModified, {}}; } - return {HttpResponseCode::OK, {}}; + IoHash _; + uint64_t __; + std::pair<HttpResponseCode, std::string> Result = + ExtractRange(std::move(Chunk), Offset, Size, AcceptType, OutContentType, OutChunk, /*OutRawHash*/ _, /*OutRawSize*/ __); + if (Result.first != HttpResponseCode::OK) + { + return {Result.first, + fmt::format("Chunk request for chunk {} in {}/{} failed. Reason: '{}'", ChunkId, ProjectId, OplogId, Result.second)}; + } + return Result; } std::pair<HttpResponseCode, std::string> @@ -4416,7 +4544,8 @@ ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const std::string_view Cid, ZenContentType AcceptType, - IoBuffer& OutChunk) + IoBuffer& OutChunk, + uint64_t* OptionalInOutModificationTag) { Ref<ProjectStore::Project> Project = OpenProject(ProjectId); if (!Project) @@ -4445,6 +4574,16 @@ ProjectStore::GetChunk(const std::string_view ProjectId, return {HttpResponseCode::NotFound, fmt::format("chunk - '{}' MISSING", Cid)}; } + if (OptionalInOutModificationTag != nullptr) + { + uint64_t OldTag = *OptionalInOutModificationTag; + *OptionalInOutModificationTag = GetModificationTagFromRawHash(Hash); + if (*OptionalInOutModificationTag == OldTag) + { + return {HttpResponseCode::NotModified, {}}; + } + } + if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(OutChunk)); @@ -4504,6 +4643,354 @@ ProjectStore::PutChunk(const std::string_view ProjectId, } std::pair<HttpResponseCode, std::string> +ProjectStore::GetChunks(const 
std::string_view ProjectId, + const std::string_view OplogId, + const CbObject& RequestObject, + CbPackage& OutResponsePackage) +{ + ZEN_TRACE_CPU("Store::GetChunks"); + + using namespace std::literals; + + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("getchunks rpc request for unknown project '{}'", ProjectId)}; + } + Project->TouchProject(); + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false); + if (!FoundLog) + { + return {HttpResponseCode::NotFound, fmt::format("getchunks rpc request for unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + Project->TouchOplog(OplogId); + + if (RequestObject["chunks"sv].IsArray()) + { + // Legacy full chunks only by rawhash + + CbArrayView ChunksArray = RequestObject["chunks"sv].AsArrayView(); + + CbObjectWriter ResponseWriter; + ResponseWriter.BeginArray("chunks"sv); + for (CbFieldView FieldView : ChunksArray) + { + IoHash RawHash = FieldView.AsHash(); + IoBuffer ChunkBuffer = m_CidStore.FindChunkByCid(RawHash); + if (ChunkBuffer) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)); + if (Compressed) + { + ResponseWriter.AddHash(RawHash); + OutResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash)); + } + else + { + ZEN_WARN("oplog '{}/{}': invalid compressed binary in cas store for {}", ProjectId, OplogId, RawHash); + } + } + } + ResponseWriter.EndArray(); + OutResponsePackage.SetObject(ResponseWriter.Save()); + return {HttpResponseCode::OK, {}}; + } + else if (auto RequestFieldView = RequestObject["Request"sv]; RequestFieldView.IsObject()) + { + CbObjectView RequestView = RequestFieldView.AsObjectView(); + bool SkipData = RequestView["SkipData"].AsBool(false); + CbArrayView ChunksArray = RequestView["Chunks"sv].AsArrayView(); + + struct Request + { + struct InputData + { + uint64_t Offset = 0; + uint64_t Size = 
(uint64_t)-1; + std::variant<IoHash, Oid> Id; + std::optional<uint64_t> ModTag; + } Input; + struct OutputData + { + bool Exists = false; + IoBuffer ChunkBuffer; + uint64_t ModTag = 0; + } Output; + }; + + std::vector<Request> Requests; + size_t RequestCount = ChunksArray.Num(); + if (RequestCount > 0) + { + Requests.reserve(RequestCount); + std::vector<IoHash> ChunkRawHashes; + std::vector<size_t> ChunkRawHashesRequestIndex; + std::vector<Oid> ChunkIds; + std::vector<size_t> ChunkIdsRequestIndex; + bool DoBatch = RequestCount > 1; + if (DoBatch) + { + ChunkRawHashes.reserve(RequestCount); + ChunkRawHashesRequestIndex.reserve(RequestCount); + ChunkIds.reserve(RequestCount); + ChunkIdsRequestIndex.reserve(RequestCount); + } + for (CbFieldView FieldView : ChunksArray) + { + CbObjectView ChunkObject = FieldView.AsObjectView(); + Request ChunkRequest = { + .Input{.Offset = ChunkObject["Offset"sv].AsUInt64(0), .Size = ChunkObject["Size"sv].AsUInt64((uint64_t)-1)}}; + if (CbFieldView InputModificationTagView = ChunkObject.FindView("ModTag"); InputModificationTagView.IsInteger()) + { + ChunkRequest.Input.ModTag = InputModificationTagView.AsUInt64(); + } + if (CbFieldView RawHashView = ChunkObject.FindView("RawHash"sv); RawHashView.IsHash()) + { + const IoHash ChunkHash = RawHashView.AsHash(); + ChunkRequest.Input.Id = ChunkHash; + if (DoBatch) + { + ChunkRawHashes.push_back(ChunkHash); + ChunkRawHashesRequestIndex.push_back(Requests.size()); + } + } + else if (CbFieldView IdView = ChunkObject.FindView("Oid"sv); IdView.IsObjectId()) + { + const Oid ChunkId = IdView.AsObjectId(); + ChunkRequest.Input.Id = ChunkId; + if (DoBatch) + { + ChunkIds.push_back(ChunkId); + ChunkIdsRequestIndex.push_back(Requests.size()); + } + } + else + { + return {HttpResponseCode::BadRequest, + fmt::format("oplog '{}/{}': malformed getchunks rpc request object, chunk request has no identifier", + ProjectId, + OplogId)}; + } + Requests.emplace_back(std::move(ChunkRequest)); + } + + if (DoBatch) + 
{ + WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); + if (!ChunkRawHashes.empty()) + { + FoundLog->IterateChunks( + ChunkRawHashes, + true, + [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool { + if (Payload) + { + size_t RequestIndex = ChunkRawHashesRequestIndex[Index]; + Requests[RequestIndex].Output.Exists = true; + if (!SkipData) + { + Requests[RequestIndex].Output.ChunkBuffer = Payload; + Requests[RequestIndex].Output.ChunkBuffer.MakeOwned(); + } + Requests[RequestIndex].Output.ModTag = ModTag; + } + return true; + }, + &WorkerPool, + 8u * 1024); + } + if (!ChunkIdsRequestIndex.empty()) + { + FoundLog->IterateChunks( + ChunkIds, + true, + [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool { + if (Payload) + { + size_t RequestIndex = ChunkIdsRequestIndex[Index]; + Requests[RequestIndex].Output.Exists = true; + if (!SkipData) + { + Requests[RequestIndex].Output.ChunkBuffer = Payload; + Requests[RequestIndex].Output.ChunkBuffer.MakeOwned(); + } + Requests[RequestIndex].Output.ModTag = ModTag; + } + return true; + }, + &WorkerPool, + 8u * 1024); + } + } + else + { + Request& ChunkRequest = Requests.front(); + if (ChunkRequest.Input.Id.index() == 0) + { + const IoHash& ChunkHash = std::get<IoHash>(ChunkRequest.Input.Id); + IoBuffer Payload = m_CidStore.FindChunkByCid(ChunkHash); + if (Payload) + { + ChunkRequest.Output.Exists = true; + ChunkRequest.Output.ModTag = GetModificationTagFromRawHash(ChunkHash); + if (!SkipData) + { + ChunkRequest.Output.ChunkBuffer = Payload; + } + } + } + else + { + const Oid& ChunkId = std::get<Oid>(ChunkRequest.Input.Id); + uint64_t ModTag = 0; + IoBuffer Payload = FoundLog->FindChunk(ChunkId, &ModTag); + if (Payload) + { + ChunkRequest.Output.Exists = true; + ChunkRequest.Output.ModTag = ModTag; + if (!SkipData) + { + ChunkRequest.Output.ChunkBuffer = Payload; + } + } + } + } + } + + CbObjectWriter ResponseWriter(32 + Requests.size() * 64u); + 
ResponseWriter.BeginArray("Chunks"sv); + { + for (Request& ChunkRequest : Requests) + { + if (ChunkRequest.Output.Exists) + { + ResponseWriter.BeginObject(); + { + if (ChunkRequest.Input.Id.index() == 0) + { + const IoHash& RawHash = std::get<IoHash>(ChunkRequest.Input.Id); + ResponseWriter.AddHash("Id", RawHash); + } + else + { + const Oid& Id = std::get<Oid>(ChunkRequest.Input.Id); + ResponseWriter.AddObjectId("Id", Id); + } + if (!ChunkRequest.Input.ModTag.has_value() || ChunkRequest.Input.ModTag.value() != ChunkRequest.Output.ModTag) + { + ResponseWriter.AddInteger("ModTag", ChunkRequest.Output.ModTag); + if (!SkipData) + { + CompositeBuffer ChunkRange; + ZenContentType ContentType; + IoHash FullChunkRawHash; + uint64_t FullChunkSize = 0; + auto ExtractRangeResult = ExtractRange(std::move(ChunkRequest.Output.ChunkBuffer), + ChunkRequest.Input.Offset, + ChunkRequest.Input.Size, + ZenContentType::kCompressedBinary, + ContentType, + ChunkRange, + FullChunkRawHash, + FullChunkSize); + if (ExtractRangeResult.first == HttpResponseCode::OK) + { + if (ContentType == ZenContentType::kCompressedBinary) + { + ZEN_ASSERT(FullChunkRawHash != IoHash::Zero); + CompressedBuffer CompressedValue = + CompressedBuffer::FromCompressedNoValidate(std::move(ChunkRange)); + ZEN_ASSERT(CompressedValue); + + if (FullChunkSize != 0) + { + // This really could use some thought so we don't send the same data if we get a request for + // multiple ranges from the same chunk block + + uint64_t FragmentRawOffset = 0; + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + uint64_t BlockSize = 0; + if (CompressedValue.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) + { + if (BlockSize > 0) + { + FragmentRawOffset = (ChunkRequest.Input.Offset / BlockSize) * BlockSize; + } + else + { + FragmentRawOffset = ChunkRequest.Input.Offset; + } + uint64_t FragmentRawLength = CompressedValue.DecodeRawSize(); + + IoHashStream FragmentHashStream; + 
FragmentHashStream.Append(FullChunkRawHash.Hash, sizeof(FullChunkRawHash.Hash)); + FragmentHashStream.Append(&FragmentRawOffset, sizeof(FragmentRawOffset)); + FragmentHashStream.Append(&FragmentRawLength, sizeof(FragmentRawLength)); + IoHash FragmentHash = FragmentHashStream.GetHash(); + + ResponseWriter.AddHash("FragmentHash", FragmentHash); + ResponseWriter.AddInteger("FragmentOffset", FragmentRawOffset); + ResponseWriter.AddInteger("RawSize", FullChunkSize); + OutResponsePackage.AddAttachment(CbAttachment(CompressedValue, FragmentHash)); + } + else + { + std::string ErrorString = + "Failed to get compression parameters from partial compressed buffer"; + ResponseWriter.AddString("Error", ErrorString); + ZEN_WARN("oplog '{}/{}': {}", ProjectId, OplogId, ErrorString); + } + } + else + { + ResponseWriter.AddHash("RawHash"sv, FullChunkRawHash); + OutResponsePackage.AddAttachment(CbAttachment(std::move(CompressedValue), FullChunkRawHash)); + } + } + else + { + IoHashStream HashStream; + ZEN_ASSERT(ChunkRequest.Input.Id.index() == 1); + const Oid& Id = std::get<Oid>(ChunkRequest.Input.Id); + HashStream.Append(Id.OidBits, sizeof(Id.OidBits)); + HashStream.Append(&ChunkRequest.Input.Offset, sizeof(ChunkRequest.Input.Offset)); + HashStream.Append(&ChunkRequest.Input.Size, sizeof(ChunkRequest.Input.Size)); + IoHash Hash = HashStream.GetHash(); + + ResponseWriter.AddHash("Hash"sv, Hash); + if (FullChunkSize != 0) + { + ResponseWriter.AddInteger("Size", FullChunkSize); + } + OutResponsePackage.AddAttachment(CbAttachment(std::move(ChunkRange), Hash)); + } + } + else + { + std::string ErrorString = fmt::format("Failed fetchiong chunk range ({})", ExtractRangeResult.second); + ResponseWriter.AddString("Error", ErrorString); + ZEN_WARN("oplog '{}/{}': {}", ProjectId, OplogId, ErrorString); + } + } + } + } + ResponseWriter.EndObject(); + } + } + } + ResponseWriter.EndArray(); + OutResponsePackage.SetObject(ResponseWriter.Save()); + return {HttpResponseCode::OK, {}}; + } + else + 
{ + return {HttpResponseCode::BadRequest, fmt::format("oplog '{}/{}': malformed getchunks rpc request object", ProjectId, OplogId)}; + } +} + +std::pair<HttpResponseCode, std::string> ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload, CbObject& OutResponse) { ZEN_TRACE_CPU("Store::WriteOplog"); @@ -4570,7 +5057,7 @@ ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_vie return ConvertResult(RemoteResult); } - CbObjectWriter Cbo; + CbObjectWriter Cbo(1 + 1 + 5 + Attachments.size() * (1 + sizeof(IoHash::Hash)) + 1); Cbo.BeginArray("need"); { for (const IoHash& Hash : Attachments) @@ -4767,34 +5254,33 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, else if (Method == "getchunks"sv) { ZEN_TRACE_CPU("Store::Rpc::getchunks"); - CbPackage ResponsePackage; + + RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u)); + int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0); + + CbPackage ResponsePackage; + std::pair<HttpResponseCode, std::string> Result = GetChunks(ProjectId, OplogId, Cb, ResponsePackage); + if (Result.first == HttpResponseCode::OK) { - CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView(); - CbObjectWriter ResponseWriter; - ResponseWriter.BeginArray("chunks"sv); - for (CbFieldView FieldView : ChunksArray) + void* TargetProcessHandle = nullptr; + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) { - IoHash RawHash = FieldView.AsHash(); - IoBuffer ChunkBuffer = m_CidStore.FindChunkByCid(RawHash); - if (ChunkBuffer) + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)); - if (Compressed) - { - ResponseWriter.AddHash(RawHash); - ResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), 
RawHash)); - } - else - { - ZEN_WARN("oplog '{}/{}': invalid compressed binary in cas store for {}", ProjectId, OplogId, RawHash); - } + Flags |= FormatFlags::kDenyPartialLocalReferences; } + TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId); } - ResponseWriter.EndArray(); - ResponsePackage.SetObject(ResponseWriter.Save()); + + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, Flags, TargetProcessHandle); + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); + } + else + { + HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); } - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault); - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); return true; } else if (Method == "putchunks"sv) @@ -4927,7 +5413,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, ++TotalFiles; // Rewrite file array entry with new data reference - CbObjectWriter Writer; + CbObjectWriter Writer(View.GetSize()); RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { if (Field.GetName() == "data"sv) { @@ -6012,7 +6498,7 @@ namespace testutils { return OidStringBuilder.ToString(); } - CbPackage CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) + CbPackage CreateBulkDataOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) { CbPackage Package; CbObjectWriter Object; @@ -6038,16 +6524,43 @@ namespace testutils { return Package; }; + CbPackage CreateFilesOplogPackage(const Oid& Id, + const std::filesystem::path ProjectRootDir, + const std::span<const std::pair<Oid, std::filesystem::path>>& Attachments) + { + CbPackage Package; + CbObjectWriter Object; + Object << "key"sv << OidAsString(Id); + if (!Attachments.empty()) + { + Object.BeginArray("files"); + 
for (const auto& Attachment : Attachments) + { + std::filesystem::path ServerPath = std::filesystem::relative(Attachment.second, ProjectRootDir); + std::filesystem::path ClientPath = ServerPath; // dummy + Object.BeginObject(); + Object << "id"sv << Attachment.first; + Object << "serverpath"sv << ServerPath.string(); + Object << "clientpath"sv << ClientPath.string(); + Object.EndObject(); + } + Object.EndArray(); + } + Package.SetObject(Object.Save()); + return Package; + }; + std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments( const std::span<const size_t>& Sizes, - OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast) + OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast, + uint64_t BlockSize = 0) { std::vector<std::pair<Oid, CompressedBuffer>> Result; Result.reserve(Sizes.size()); for (size_t Size : Sizes) { CompressedBuffer Compressed = - CompressedBuffer::Compress(SharedBuffer(CreateRandomBlob(Size)), OodleCompressor::Mermaid, CompressionLevel); + CompressedBuffer::Compress(SharedBuffer(CreateSemiRandomBlob(Size)), OodleCompressor::Mermaid, CompressionLevel, BlockSize); Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed)); } return Result; @@ -6069,6 +6582,63 @@ namespace testutils { return 0; } + template<typename ChunkType> + CbObject BuildChunksRequest(bool SkipData, + std::string_view IdName, + const std::vector<ChunkType>& Chunks, + const std::vector<std::pair<size_t, size_t>>& Ranges, + const std::vector<uint64_t>& ModTags) + { + CbObjectWriter Request; + Request.BeginObject("Request"sv); + { + if (SkipData) + { + Request.AddBool("SkipData"sv, true); + } + if (!Chunks.empty()) + { + Request.BeginArray("Chunks"); + for (size_t Index = 0; Index < Chunks.size(); Index++) + { + Request.BeginObject(); + { + Request << IdName << Chunks[Index]; + if (!ModTags.empty()) + { + Request << "ModTag" << ModTags[Index]; + } + if (!Ranges.empty()) + { + Request << "Offset" << 
Ranges[Index].first; + Request << "Size" << Ranges[Index].second; + } + } + Request.EndObject(); + } + Request.EndArray(); + } + } + Request.EndObject(); + return Request.Save(); + }; + + CbObject BuildChunksRequest(bool SkipData, + const std::vector<Oid>& Chunks, + const std::vector<std::pair<size_t, size_t>>& Ranges, + const std::vector<uint64_t>& ModTags) + { + return BuildChunksRequest<Oid>(SkipData, "Oid", Chunks, Ranges, ModTags); + } + + CbObject BuildChunksRequest(bool SkipData, + const std::vector<IoHash>& Chunks, + const std::vector<std::pair<size_t, size_t>>& Ranges, + const std::vector<uint64_t>& ModTags) + { + return BuildChunksRequest<IoHash>(SkipData, "RawHash", Chunks, Ranges, ModTags); + } + } // namespace testutils TEST_CASE("project.store.create") @@ -6078,6 +6648,7 @@ TEST_CASE("project.store.create") ScopedTemporaryDirectory TempDir; auto JobQueue = MakeJobQueue(1, ""sv); + OpenProcessCache ProcessCache; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; @@ -6085,7 +6656,7 @@ TEST_CASE("project.store.create") std::string_view ProjectName("proj1"sv); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProjectStore::Configuration{}); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; @@ -6108,13 +6679,14 @@ TEST_CASE("project.store.lifetimes") ScopedTemporaryDirectory TempDir; auto JobQueue = MakeJobQueue(1, ""sv); + OpenProcessCache ProcessCache; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; 
CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProjectStore::Configuration{}); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; @@ -6170,13 +6742,14 @@ TEST_CASE_TEMPLATE("project.store.export", ScopedTemporaryDirectory ExportDir; auto JobQueue = MakeJobQueue(1, ""sv); + OpenProcessCache ProcessCache; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProjectStore::Configuration{}); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; @@ -6191,13 +6764,14 @@ TEST_CASE_TEMPLATE("project.store.export", ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1", {}); CHECK(Oplog != nullptr); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77}))); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99}))); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122}))); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); 
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77}))); Oplog->AppendNewOplogEntry( - CreateOplogPackage(Oid::NewOid(), - CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None))); + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99}))); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122}))); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage( + Oid::NewOid(), + CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None))); FileRemoteStoreOptions Options = { RemoteStoreOptions{.MaxBlockSize = 64u * 1024, .MaxChunkEmbedSize = 32 * 1024u, .ChunkFileSizeLimit = 64u * 1024u}, @@ -6271,13 +6845,14 @@ TEST_CASE("project.store.gc") ScopedTemporaryDirectory TempDir; auto JobQueue = MakeJobQueue(1, ""sv); + OpenProcessCache ProcessCache; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProjectStore::Configuration{}); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; @@ -6325,10 +6900,11 @@ TEST_CASE("project.store.gc") ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1", Project1OplogPath); CHECK(Oplog != nullptr); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77}))); - 
Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99}))); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122}))); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77}))); + Oplog->AppendNewOplogEntry( + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99}))); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122}))); } { @@ -6342,21 +6918,23 @@ TEST_CASE("project.store.gc") ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog2", Project2Oplog1Path); CHECK(Oplog != nullptr); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{177}))); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{177}))); Oplog->AppendNewOplogEntry( - CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9123, 383, 590, 96}))); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{535, 221}))); + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9123, 383, 590, 96}))); + Oplog->AppendNewOplogEntry( + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{535, 221}))); } { ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog3", Project2Oplog2Path); CHECK(Oplog != nullptr); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {})); - 
Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{137}))); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{137}))); + Oplog->AppendNewOplogEntry( + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9723, 683, 594, 98}))); Oplog->AppendNewOplogEntry( - CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9723, 683, 594, 98}))); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{531, 271}))); + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{531, 271}))); } } @@ -6468,13 +7046,14 @@ TEST_CASE("project.store.gc.prep") ScopedTemporaryDirectory TempDir; auto JobQueue = MakeJobQueue(1, ""sv); + OpenProcessCache ProcessCache; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProjectStore::Configuration{}); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; @@ -6507,7 +7086,7 @@ TEST_CASE("project.store.gc.prep") Project1RootDir.string(), Project1FilePath.string())); ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), OpAttachments)); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); } { Ref<ProjectStore::Project> Project1 
= ProjectStore.OpenProject("proj1"sv); @@ -6538,7 +7117,7 @@ TEST_CASE("project.store.gc.prep") // Make sure the chunks are stored but not the referencing op Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), OpAttachments)); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); Project1->DeleteOplog("oplog1"sv); } { @@ -6585,7 +7164,7 @@ TEST_CASE("project.store.gc.prep") { Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); ProjectStore::Oplog* Oplog = Project1->OpenOplog("oplog1"sv, true, true); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), OpAttachments)); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); Oplog->RemovePendingChunkReferences(OpChunkHashes); CHECK(Oplog->GetPendingChunkReferencesLocked().size() == 0); } @@ -6619,7 +7198,7 @@ TEST_CASE("project.store.gc.prep") // Make sure the chunks are stored but not the referencing op Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath); - Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), OpAttachments)); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments)); Project1->DeleteOplog("oplog1"sv); } @@ -6674,6 +7253,928 @@ TEST_CASE("project.store.gc.prep") } } +TEST_CASE("project.store.rpc.getchunks") +{ + using namespace std::literals; + using namespace testutils; + + ScopedTemporaryDirectory TempDir; + + auto JobQueue = MakeJobQueue(1, ""sv); + OpenProcessCache ProcessCache; + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + 
std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); + std::filesystem::path RootDir = TempDir.Path() / "root"sv; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; + + std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv; + std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv; + { + CreateDirectories(Project1FilePath.parent_path()); + BasicFile ProjectFile; + ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); + } + + std::vector<Oid> OpIds; + OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()}); + std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments; + Oid FilesOpId = Oid::NewOid(); + std::vector<std::pair<Oid, std::filesystem::path>> FilesOpIdAttachments; + { + Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + Project1RootDir.string(), + Project1FilePath.string())); + ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, {}); + CHECK(Oplog != nullptr); + Attachments[OpIds[0]] = {}; + Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77}); + Attachments[OpIds[2]] = + CreateAttachments(std::initializer_list<size_t>{200 * 1024, 314 * 1024, 690, 99}, OodleCompressionLevel::VeryFast, 128 * 1024); + Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{55, 122}); + for (auto It : Attachments) + { + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(It.first, It.second)); + } + + std::filesystem::path UncompressedFilePath = RootDir / "content" / "uncompressed_file"; + CreateDirectories(UncompressedFilePath.parent_path()); + IoBuffer FileBlob = CreateRandomBlob(81823 * 2); + WriteFile(UncompressedFilePath, FileBlob); + 
FilesOpIdAttachments.push_back({Oid::NewOid(), UncompressedFilePath}); + Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(FilesOpId, RootDir, FilesOpIdAttachments)); + } + + // Invalid request + { + CbObjectWriter Request; + Request.BeginObject("WrongName"sv); + Request.EndObject(); + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, "oplog1"sv, Request.Save(), Response); + CHECK_EQ(HttpResponseCode::BadRequest, Result.first); + } + + // Empty request + { + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ false, std::vector<IoHash>{}, {}, {}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(0, Chunks.Num()); + } + // Single non-existing chunk by RawHash + IoHash NotFoundIoHash = IoHash::Max; + { + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundIoHash}, {}, {}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(0, Chunks.Num()); + } + { + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundIoHash}, {}, {}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(0, Chunks.Num()); + } + // Single non-existing chunk by Id + Oid NotFoundId = Oid::NewOid(); + { + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundId}, {}, {}), + Response); + CHECK_EQ(HttpResponseCode::OK, 
Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(0, Chunks.Num()); + } + { + CbPackage Response; + auto Result = + ProjectStore.GetChunks("proj1"sv, "oplog1"sv, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundId}, {}, {}), Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(0, Chunks.Num()); + } + // Single existing chunk by RawHash + { + // Fresh fetch + IoHash FirstAttachmentHash = Attachments[OpIds[2]][1].second.DecodeRawHash(); + uint64_t ResponseModTag = 0; + { + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {}), + Response); + + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(1, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + IoHash Id = Chunk["Id"].AsHash(); + CHECK_EQ(FirstAttachmentHash, Id); + ResponseModTag = Chunk["ModTag"].AsUInt64(); + CHECK_NE(0, ResponseModTag); + IoHash AttachmentHash = Chunk["RawHash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + CompressedBuffer Buffer = Attachment->AsCompressedBinary(); + CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Fetch with matching ModTag + { + CbPackage Response; + auto Result = + ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {ResponseModTag}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView 
Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + IoHash Id = Chunk["Id"].AsHash(); + CHECK_EQ(FirstAttachmentHash, Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Fetch with mismatching ModTag + { + CbPackage Response; + auto Result = ProjectStore.GetChunks( + "proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)}), + Response); + + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(1, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + IoHash Id = Chunk["Id"].AsHash(); + CHECK_EQ(FirstAttachmentHash, Id); + ResponseModTag = Chunk["ModTag"].AsUInt64(); + CHECK_NE(0, ResponseModTag); + IoHash AttachmentHash = Chunk["RawHash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + CompressedBuffer Buffer = Attachment->AsCompressedBinary(); + CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Fresh modtime query + { + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + IoHash Id = Chunk["Id"].AsHash(); + CHECK_EQ(FirstAttachmentHash, Id); + uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); + CHECK_EQ(ResponseModTag, 
ResponseModTag2); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Modtime query with matching ModTag + { + CbPackage Response; + auto Result = + ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {ResponseModTag}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + IoHash Id = Chunk["Id"].AsHash(); + CHECK_EQ(FirstAttachmentHash, Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Modtime query with mismatching ModTag + { + CbPackage Response; + auto Result = ProjectStore.GetChunks( + "proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + IoHash Id = Chunk["Id"].AsHash(); + CHECK_EQ(FirstAttachmentHash, Id); + uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); + CHECK_EQ(ResponseModTag, ResponseModTag2); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + // Single existing CID chunk by Id + { + Oid FirstAttachmentId = Attachments[OpIds[2]][1].first; + uint64_t ResponseModTag = 0; + { + // Full chunk request + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(1, 
Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + ResponseModTag = Chunk["ModTag"].AsUInt64(); + CHECK_NE(0, ResponseModTag); + IoHash AttachmentHash = Chunk["RawHash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + CompressedBuffer Buffer = Attachment->AsCompressedBinary(); + CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + { + // Partial chunk request + CbPackage Response; + auto Result = + ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{130 * 1024, 8100}}, {}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(1, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + ResponseModTag = Chunk["ModTag"].AsUInt64(); + CHECK_NE(0, ResponseModTag); + IoHash AttachmentHash = Chunk["FragmentHash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + std::uint64_t FragmentStart = Chunk["FragmentOffset"].AsUInt64(); + CompressedBuffer Buffer = Attachment->AsCompressedBinary(); + CHECK(FragmentStart <= 130 * 1024); + CHECK(FragmentStart + Buffer.DecodeRawSize() >= 130 * 1024 + 8100); + auto ResponseDecompressedBuffer = Buffer.Decompress(130 * 1024 - FragmentStart, 8100); + auto ExpectedDecompressedBuffer = Attachments[OpIds[2]][1].second.Decompress(130 * 1024, 8100); + 
CHECK(ResponseDecompressedBuffer.AsIoBuffer().GetView().EqualBytes(ExpectedDecompressedBuffer.AsIoBuffer().GetView())); + CHECK_EQ(Chunk["RawSize"sv].AsUInt64(), Attachments[OpIds[2]][1].second.DecodeRawSize()); + CHECK(!Chunk.FindView("Size")); + } + { + // Fetch with matching ModTag + CbPackage Response; + auto Result = + ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + { + // Fetch with mismatching ModTag + CbPackage Response; + auto Result = ProjectStore.GetChunks( + "proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(1, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + uint64_t ResponseModTag3 = Chunk["ModTag"].AsUInt64(); + CHECK_EQ(ResponseModTag, ResponseModTag3); + IoHash AttachmentHash = Chunk["RawHash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + CompressedBuffer Buffer = Attachment->AsCompressedBinary(); + CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Fresh 
modtime query + { + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); + CHECK_EQ(ResponseModTag, ResponseModTag2); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Modtime query with matching ModTag + { + CbPackage Response; + auto Result = + ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Modtime query with mismatching ModTag + { + CbPackage Response; + auto Result = ProjectStore.GetChunks( + "proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + 
uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); + CHECK_EQ(ResponseModTag, ResponseModTag2); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + + // Single existing file chunk by Id + { + Oid FirstAttachmentId = FilesOpIdAttachments[0].first; + uint64_t ResponseModTag = 0; + { + // Full chunk request + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(1, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + ResponseModTag = Chunk["ModTag"].AsUInt64(); + CHECK_NE(0, ResponseModTag); + IoHash AttachmentHash = Chunk["Hash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + CompositeBuffer Buffer = Attachment->AsCompositeBinary(); + CHECK_EQ(IoHash::HashBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten()), IoHash::HashBuffer(Buffer)); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + { + // Partial chunk request + CbPackage Response; + auto Result = + ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{81823, 5434}}, {}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(1, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + ResponseModTag = Chunk["ModTag"].AsUInt64(); + CHECK_NE(0, ResponseModTag); + + IoHash AttachmentHash = 
Chunk["Hash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + CompositeBuffer Buffer = Attachment->AsCompositeBinary(); + CHECK_EQ(IoHash::HashBuffer(IoBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten(), 81823, 5434)), + IoHash::HashBuffer(Buffer)); + CHECK_EQ(Chunk["Size"sv].AsUInt64(), std::filesystem::file_size(FilesOpIdAttachments[0].second)); + CHECK(!Chunk.FindView("RawSize")); + } + { + // Fetch with matching ModTag + CbPackage Response; + auto Result = + ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("Hash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + { + // Fetch with mismatching ModTag + CbPackage Response; + auto Result = ProjectStore.GetChunks( + "proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(1, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + uint64_t ResponseModTag3 = Chunk["ModTag"].AsUInt64(); + CHECK_EQ(ResponseModTag, ResponseModTag3); + IoHash AttachmentHash = Chunk["Hash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, 
Attachment); + CompositeBuffer Buffer = Attachment->AsCompositeBinary(); + CHECK_EQ(IoHash::HashBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten()), IoHash::HashBuffer(Buffer)); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Fresh modtime query + { + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); + CHECK_EQ(ResponseModTag, ResponseModTag2); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Modtime query with matching ModTag + { + CbPackage Response; + auto Result = + ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("Hash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + // Modtime query with mismatching ModTag + { + CbPackage Response; + auto Result = ProjectStore.GetChunks( + "proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}), + Response); + CHECK_EQ(HttpResponseCode::OK, Result.first); + 
CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(1, Chunks.Num()); + CbObjectView Chunk = (*begin(Chunks)).AsObjectView(); + + Oid Id = Chunk["Id"].AsObjectId(); + CHECK_EQ(FirstAttachmentId, Id); + uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64(); + CHECK_EQ(ResponseModTag, ResponseModTag2); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + + // Multi RawHash Request + { + std::vector<CompressedBuffer> AttachmentBuffers{Attachments[OpIds[1]][0].second, + Attachments[OpIds[2]][0].second, + Attachments[OpIds[2]][1].second}; + std::vector<IoHash> AttachmentHashes{AttachmentBuffers[0].DecodeRawHash(), + AttachmentBuffers[1].DecodeRawHash(), + AttachmentBuffers[2].DecodeRawHash()}; + std::vector<uint64_t> ResponseModTags(3, 0); + { + // Fresh fetch + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, {}), + Response); + + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(3, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + IoHash Id = Chunk["Id"].AsHash(); + + auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); + CHECK(It != AttachmentHashes.end()); + ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); + CHECK_EQ(AttachmentHashes[Index], Id); + ResponseModTags[Index] = Chunk["ModTag"].AsUInt64(); + CHECK_NE(0, ResponseModTags[Index]); + IoHash AttachmentHash = Chunk["RawHash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + CompressedBuffer Buffer = Attachment->AsCompressedBinary(); + CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); + 
CHECK(AttachmentBuffers[Index].GetCompressed().Flatten().GetView().EqualBytes(Buffer.GetCompressed().Flatten().GetView())); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + { + // Fetch with matching ModTag + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, ResponseModTags), + Response); + + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + IoHash Id = Chunk["Id"].AsHash(); + + auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); + CHECK(It != AttachmentHashes.end()); + ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); + CHECK_EQ(AttachmentHashes[Index], Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + { + // Fresh modtime query + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, {}), + Response); + + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + IoHash Id = Chunk["Id"].AsHash(); + + auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); + CHECK(It != AttachmentHashes.end()); + ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); + CHECK_EQ(AttachmentHashes[Index], Id); + CHECK_EQ(ResponseModTags[Index], Chunk["ModTag"].AsUInt64()); + CHECK(!Chunk.FindView("RawHash")); + 
CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + { + // Modtime query with matching ModTags + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, ResponseModTags), + Response); + + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + IoHash Id = Chunk["Id"].AsHash(); + + auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); + CHECK(It != AttachmentHashes.end()); + ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); + CHECK_EQ(AttachmentHashes[Index], Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + { + // Modtime query with mismatching ModTags + std::vector<uint64_t> MismatchingModTags(ResponseModTags); + for (uint64_t& Tag : MismatchingModTags) + { + Tag++; + } + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, MismatchingModTags), + Response); + + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + IoHash Id = Chunk["Id"].AsHash(); + + auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id); + CHECK(It != AttachmentHashes.end()); + ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It); + CHECK_EQ(AttachmentHashes[Index], Id); + CHECK(Chunk["ModTag"].AsUInt64() == ResponseModTags[Index]); + 
CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + } + // Multi Id Request + { + std::vector<CompressedBuffer> AttachmentBuffers{Attachments[OpIds[1]][0].second, + Attachments[OpIds[2]][0].second, + Attachments[OpIds[2]][1].second}; + std::vector<IoHash> AttachmentHashes{AttachmentBuffers[0].DecodeRawHash(), + AttachmentBuffers[1].DecodeRawHash(), + AttachmentBuffers[2].DecodeRawHash()}; + std::vector<Oid> AttachedIds{Attachments[OpIds[1]][0].first, Attachments[OpIds[2]][0].first, Attachments[OpIds[2]][1].first}; + std::vector<uint64_t> ResponseModTags(3, 0); + { + // Fresh fetch + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, {}), + Response); + + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(3, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + Oid Id = Chunk["Id"].AsObjectId(); + + auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); + CHECK(It != AttachedIds.end()); + ptrdiff_t Index = std::distance(AttachedIds.begin(), It); + CHECK_EQ(AttachedIds[Index], Id); + ResponseModTags[Index] = Chunk["ModTag"].AsUInt64(); + CHECK_NE(0, ResponseModTags[Index]); + IoHash AttachmentHash = Chunk["RawHash"].AsHash(); + const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash); + CHECK_NE(nullptr, Attachment); + CompressedBuffer Buffer = Attachment->AsCompressedBinary(); + CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash()); + CHECK(AttachmentBuffers[Index].GetCompressed().Flatten().GetView().EqualBytes(Buffer.GetCompressed().Flatten().GetView())); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + { + // Fetch with matching ModTag + CbPackage Response; + auto Result = 
ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, ResponseModTags), + Response); + + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + Oid Id = Chunk["Id"].AsObjectId(); + + auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); + CHECK(It != AttachedIds.end()); + ptrdiff_t Index = std::distance(AttachedIds.begin(), It); + CHECK_EQ(AttachedIds[Index], Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + { + // Fresh modtime query + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, {}), + Response); + + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + Oid Id = Chunk["Id"].AsObjectId(); + + auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); + CHECK(It != AttachedIds.end()); + ptrdiff_t Index = std::distance(AttachedIds.begin(), It); + CHECK_EQ(AttachedIds[Index], Id); + CHECK_EQ(ResponseModTags[Index], Chunk["ModTag"].AsUInt64()); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + { + // Modtime query with matching ModTags + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, ResponseModTags), + Response); + + CHECK_EQ(HttpResponseCode::OK, 
Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + Oid Id = Chunk["Id"].AsObjectId(); + + auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); + CHECK(It != AttachedIds.end()); + ptrdiff_t Index = std::distance(AttachedIds.begin(), It); + CHECK_EQ(AttachedIds[Index], Id); + CHECK(!Chunk.FindView("ModTag")); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + { + // Modtime query with mismatching ModTags + std::vector<uint64_t> MismatchingModTags(ResponseModTags); + for (uint64_t& Tag : MismatchingModTags) + { + Tag++; + } + CbPackage Response; + auto Result = ProjectStore.GetChunks("proj1"sv, + "oplog1"sv, + testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, MismatchingModTags), + Response); + + CHECK_EQ(HttpResponseCode::OK, Result.first); + CHECK_EQ(0, Response.GetAttachments().size()); + CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView(); + CHECK_EQ(3, Chunks.Num()); + for (CbFieldView ChunkView : Chunks) + { + CbObjectView Chunk = ChunkView.AsObjectView(); + Oid Id = Chunk["Id"].AsObjectId(); + + auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id); + CHECK(It != AttachedIds.end()); + ptrdiff_t Index = std::distance(AttachedIds.begin(), It); + CHECK_EQ(AttachedIds[Index], Id); + CHECK(Chunk["ModTag"].AsUInt64() == ResponseModTags[Index]); + CHECK(!Chunk.FindView("RawHash")); + CHECK(!Chunk.FindView("Size")); + CHECK(!Chunk.FindView("RawSize")); + } + } + } +} + TEST_CASE("project.store.partial.read") { using namespace std::literals; @@ -6682,13 +8183,14 @@ TEST_CASE("project.store.partial.read") ScopedTemporaryDirectory TempDir; auto JobQueue = MakeJobQueue(1, ""sv); + OpenProcessCache ProcessCache; GcManager Gc; CidStore CidStore(Gc); 
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProjectStore::Configuration{}); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"sv; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; @@ -6718,63 +8220,119 @@ TEST_CASE("project.store.partial.read") Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{55, 122}); for (auto It : Attachments) { - Oplog->AppendNewOplogEntry(CreateOplogPackage(It.first, It.second)); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(It.first, It.second)); } } { + uint64_t ModificationTag = 0; IoBuffer Chunk; CHECK(ProjectStore .GetChunk("proj1"sv, "oplog1"sv, Attachments[OpIds[1]][0].second.DecodeRawHash().ToHexString(), HttpContentType::kCompressedBinary, - Chunk) + Chunk, + &ModificationTag) .first == HttpResponseCode::OK); IoHash RawHash; uint64_t RawSize; CompressedBuffer Attachment = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); CHECK(RawSize == Attachments[OpIds[1]][0].second.DecodeRawSize()); - } - CompositeBuffer ChunkResult; - HttpContentType ContentType; - CHECK(ProjectStore - .GetChunkRange("proj1"sv, - "oplog1"sv, - OidAsString(Attachments[OpIds[2]][1].first), - 0, - ~0ull, - HttpContentType::kCompressedBinary, - ChunkResult, - ContentType) - .first == HttpResponseCode::OK); - CHECK(ChunkResult); - CHECK(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult)).DecodeRawSize() == - Attachments[OpIds[2]][1].second.DecodeRawSize()); - - CompositeBuffer PartialChunkResult; - CHECK(ProjectStore - .GetChunkRange("proj1"sv, - "oplog1"sv, - OidAsString(Attachments[OpIds[2]][1].first), - 5, - 1773, - 
HttpContentType::kCompressedBinary, - PartialChunkResult, - ContentType) - .first == HttpResponseCode::OK); - CHECK(PartialChunkResult); - IoHash PartialRawHash; - uint64_t PartialRawSize; - CompressedBuffer PartialCompressedResult = CompressedBuffer::FromCompressed(PartialChunkResult, PartialRawHash, PartialRawSize); - CHECK(PartialRawSize >= 1773); - - uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5); - SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed); - SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress(); - const uint8_t* FullDataPtr = &(reinterpret_cast<const uint8_t*>(FullDecompressed.GetView().GetData())[5]); - const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData()); - CHECK(FullDataPtr[0] == PartialDataPtr[0]); + CHECK(ModificationTag != 0); + CHECK(ProjectStore + .GetChunk("proj1"sv, + "oplog1"sv, + Attachments[OpIds[1]][0].second.DecodeRawHash().ToHexString(), + HttpContentType::kCompressedBinary, + Chunk, + &ModificationTag) + .first == HttpResponseCode::NotModified); + } + + { + uint64_t FullChunkModificationTag = 0; + { + CompositeBuffer ChunkResult; + HttpContentType ContentType; + CHECK(ProjectStore + .GetChunkRange("proj1"sv, + "oplog1"sv, + OidAsString(Attachments[OpIds[2]][1].first), + 0, + ~0ull, + HttpContentType::kCompressedBinary, + ChunkResult, + ContentType, + &FullChunkModificationTag) + .first == HttpResponseCode::OK); + CHECK(ChunkResult); + CHECK(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult)).DecodeRawSize() == + Attachments[OpIds[2]][1].second.DecodeRawSize()); + } + { + CompositeBuffer ChunkResult; + HttpContentType ContentType; + CHECK(ProjectStore + .GetChunkRange("proj1"sv, + "oplog1"sv, + OidAsString(Attachments[OpIds[2]][1].first), + 0, + ~0ull, + HttpContentType::kCompressedBinary, + ChunkResult, + ContentType, + &FullChunkModificationTag) + .first 
== HttpResponseCode::NotModified); + } + } + { + CompositeBuffer PartialChunkResult; + uint64_t PartialChunkModificationTag = 0; + { + CompositeBuffer ChunkResult; + HttpContentType ContentType; + CHECK(ProjectStore + .GetChunkRange("proj1"sv, + "oplog1"sv, + OidAsString(Attachments[OpIds[2]][1].first), + 5, + 1773, + HttpContentType::kCompressedBinary, + PartialChunkResult, + ContentType, + &PartialChunkModificationTag) + .first == HttpResponseCode::OK); + CHECK(PartialChunkResult); + IoHash PartialRawHash; + uint64_t PartialRawSize; + CompressedBuffer PartialCompressedResult = CompressedBuffer::FromCompressed(PartialChunkResult, PartialRawHash, PartialRawSize); + CHECK(PartialRawSize >= 1773); + + uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5); + SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed); + SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress(); + const uint8_t* FullDataPtr = &(reinterpret_cast<const uint8_t*>(FullDecompressed.GetView().GetData())[5]); + const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData()); + CHECK(FullDataPtr[0] == PartialDataPtr[0]); + } + + { + CompositeBuffer ChunkResult; + HttpContentType ContentType; + CHECK(ProjectStore + .GetChunkRange("proj1"sv, + "oplog1"sv, + OidAsString(Attachments[OpIds[2]][1].first), + 5, + 1773, + HttpContentType::kCompressedBinary, + PartialChunkResult, + ContentType, + &PartialChunkModificationTag) + .first == HttpResponseCode::NotModified); + } + } } TEST_CASE("project.store.block") @@ -6808,13 +8366,14 @@ TEST_CASE("project.store.iterateoplog") ScopedTemporaryDirectory TempDir; auto JobQueue = MakeJobQueue(1, ""sv); + OpenProcessCache ProcessCache; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; 
CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProjectStore::Configuration{}); + ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"sv; std::filesystem::path EngineRootDir = TempDir.Path() / "enginesv"; @@ -6851,7 +8410,7 @@ TEST_CASE("project.store.iterateoplog") TestOidData TestOids[NumTestOids]; for (const TestOidData& TestOid : TestOids) { - Oplog->AppendNewOplogEntry(CreateOplogPackage(TestOid.KeyAsOidNotOplogId, {})); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(TestOid.KeyAsOidNotOplogId, {})); } int Count = 0; diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 860f2c17d..8f2d3ce0d 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -22,6 +22,7 @@ class CidStore; class AuthMgr; class ScrubContext; class JobQueue; +class OpenProcessCache; enum class HttpResponseCode; @@ -67,7 +68,12 @@ public: { }; - ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, JobQueue& JobQueue, const Configuration& Config); + ProjectStore(CidStore& Store, + std::filesystem::path BasePath, + GcManager& Gc, + JobQueue& JobQueue, + OpenProcessCache& InOpenProcessCache, + const Configuration& Config); ~ProjectStore(); struct Project; @@ -114,16 +120,18 @@ public: std::optional<CbObject> GetOpByIndex(uint32_t Index); std::optional<uint32_t> GetOpIndexByKey(const Oid& Key); - IoBuffer FindChunk(const Oid& ChunkId); + IoBuffer FindChunk(const Oid& ChunkId, uint64_t* OptOutModificationTag); IoBuffer GetChunkByRawHash(const IoHash& RawHash); - bool IterateChunks(std::span<IoHash> RawHashes, - const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool, - uint64_t 
LargeSizeLimit); - bool IterateChunks(std::span<Oid> ChunkIds, - const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, - WorkerThreadPool* OptionalWorkerPool, - uint64_t LargeSizeLimit); + bool IterateChunks(std::span<IoHash> RawHashes, + bool IncludeModTag, + const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit); + bool IterateChunks(std::span<Oid> ChunkIds, + bool IncludeModTag, + const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback, + WorkerThreadPool* OptionalWorkerPool, + uint64_t LargeSizeLimit); inline static const uint32_t kInvalidOp = ~0u; /** Persist a new oplog entry @@ -427,7 +435,8 @@ public: uint64_t Size, ZenContentType AcceptType, CompositeBuffer& OutChunk, - ZenContentType& OutContentType); + ZenContentType& OutContentType, + uint64_t* OptionalInOutModificationTag); std::pair<HttpResponseCode, std::string> GetChunkRange(const std::string_view ProjectId, const std::string_view OplogId, const std::string_view ChunkId, @@ -435,12 +444,14 @@ public: uint64_t Size, ZenContentType AcceptType, CompositeBuffer& OutChunk, - ZenContentType& OutContentType); + ZenContentType& OutContentType, + uint64_t* OptionalInOutModificationTag); std::pair<HttpResponseCode, std::string> GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const std::string_view Cid, ZenContentType AcceptType, - IoBuffer& OutChunk); + IoBuffer& OutChunk, + uint64_t* OptionalInOutModificationTag); std::pair<HttpResponseCode, std::string> PutChunk(const std::string_view ProjectId, const std::string_view OplogId, @@ -458,6 +469,11 @@ public: const HttpServerRequest::QueryParams& Params, CbObject& OutResponse); + std::pair<HttpResponseCode, std::string> GetChunks(const std::string_view ProjectId, + const std::string_view OplogId, + const CbObject& RequestObject, + CbPackage& 
OutResponsePackage); + bool Rpc(HttpServerRequest& HttpReq, const std::string_view ProjectId, const std::string_view OplogId, @@ -485,6 +501,7 @@ private: GcManager& m_Gc; CidStore& m_CidStore; JobQueue& m_JobQueue; + OpenProcessCache& m_OpenProcessCache; std::filesystem::path m_ProjectBasePath; const Configuration m_Config; mutable RwLock m_ProjectsLock; diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp index c0082b746..6b05442b3 100644 --- a/src/zenserver/projectstore/zenremoteprojectstore.cpp +++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp @@ -152,14 +152,22 @@ public: { CbObjectWriter RequestWriter; RequestWriter.AddString("method"sv, "getchunks"sv); - RequestWriter.BeginArray("chunks"sv); + RequestWriter.BeginObject("Request"sv); { - for (const IoHash& RawHash : RawHashes) + RequestWriter.BeginArray("Chunks"sv); { - RequestWriter.AddHash(RawHash); + for (const IoHash& RawHash : RawHashes) + { + RequestWriter.BeginObject(); + { + RequestWriter.AddHash("RawHash", RawHash); + } + RequestWriter.EndObject(); + } } + RequestWriter.EndArray(); // "chunks" } - RequestWriter.EndArray(); // "chunks" + RequestWriter.EndObject(); Request = RequestWriter.Save(); } diff --git a/src/zenserver/vfs/vfsimpl.cpp b/src/zenserver/vfs/vfsimpl.cpp index 6e14b7632..2bac6b756 100644 --- a/src/zenserver/vfs/vfsimpl.cpp +++ b/src/zenserver/vfs/vfsimpl.cpp @@ -47,7 +47,8 @@ VfsOplogDataSource::ReadChunkData(const Oid& ChunkId, void* Buffer, uint64_t Byt ~0ull, ZenContentType::kCompressedBinary, /* out */ ChunkBuffer, - /* out */ ContentType); + /* out */ ContentType, + /* OptionalInOutModificationTag */ nullptr); if (Result.first == HttpResponseCode::OK) { diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 3714dfaeb..66b6cb858 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -243,7 +243,12 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, 
ZenServerState::Zen ZEN_INFO("instantiating project service"); - m_ProjectStore = new ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, *m_JobQueue, ProjectStore::Configuration{}); + m_ProjectStore = new ProjectStore(*m_CidStore, + m_DataRoot / "projects", + m_GcManager, + *m_JobQueue, + *m_OpenProcessCache, + ProjectStore::Configuration{}); m_HttpProjectService.reset(new HttpProjectService{*m_CidStore, m_ProjectStore, m_StatsService, *m_AuthMgr}); if (ServerOptions.WorksSpacesConfig.Enabled) @@ -534,7 +539,8 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LargeObjectThreshold = 128 * 1024 * 1024; } - m_CacheStore = new ZenCacheStore(m_GcManager, *m_JobQueue, m_DataRoot / "cache", Config, m_GcManager.GetDiskWriteBlocker()); + m_CacheStore = new ZenCacheStore(m_GcManager, *m_JobQueue, m_DataRoot / "cache", Config, m_GcManager.GetDiskWriteBlocker()); + m_OpenProcessCache = std::make_unique<OpenProcessCache>(); const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; @@ -616,7 +622,8 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) m_StatsService, m_StatusService, *m_UpstreamCache, - m_GcManager.GetDiskWriteBlocker()); + m_GcManager.GetDiskWriteBlocker(), + *m_OpenProcessCache); m_Http->RegisterService(*m_StructuredCacheService); m_Http->RegisterService(*m_UpstreamService); @@ -797,6 +804,7 @@ ZenServer::Cleanup() m_UpstreamService.reset(); m_UpstreamCache.reset(); m_CacheStore = {}; + m_OpenProcessCache.reset(); m_HttpWorkspacesService.reset(); m_Workspaces.reset(); diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h index b9d12689d..80054dc35 100644 --- a/src/zenserver/zenserver.h +++ b/src/zenserver/zenserver.h @@ -115,17 +115,18 @@ private: inline void SetNewState(ServerState NewState) { m_CurrentState = NewState; } static std::string_view ToString(ServerState Value); - StatsReporter m_StatsReporter; 
- Ref<HttpServer> m_Http; - std::unique_ptr<AuthMgr> m_AuthMgr; - std::unique_ptr<HttpAuthService> m_AuthService; - HttpStatusService m_StatusService; - HttpStatsService m_StatsService; - GcManager m_GcManager; - GcScheduler m_GcScheduler{m_GcManager}; - std::unique_ptr<CidStore> m_CidStore; - Ref<ZenCacheStore> m_CacheStore; - HttpTestService m_TestService; + StatsReporter m_StatsReporter; + Ref<HttpServer> m_Http; + std::unique_ptr<AuthMgr> m_AuthMgr; + std::unique_ptr<HttpAuthService> m_AuthService; + HttpStatusService m_StatusService; + HttpStatsService m_StatsService; + GcManager m_GcManager; + GcScheduler m_GcScheduler{m_GcManager}; + std::unique_ptr<CidStore> m_CidStore; + Ref<ZenCacheStore> m_CacheStore; + std::unique_ptr<OpenProcessCache> m_OpenProcessCache; + HttpTestService m_TestService; #if ZEN_WITH_TESTS HttpTestingService m_TestingService; #endif diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index a4f9fe78b..851b1d125 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -34,7 +34,7 @@ GetCacheDiskTag() return _; } -namespace { +namespace cache::impl { #pragma pack(push) #pragma pack(1) @@ -224,11 +224,15 @@ namespace { zen::Sleep(100); } while (true); } -} // namespace +} // namespace cache::impl namespace fs = std::filesystem; using namespace std::literals; +} // namespace zen + +namespace zen::cache::impl { + class BucketManifestSerializer { using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex; @@ -571,7 +575,8 @@ BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& B if (Header.EntryCount > ExpectedEntryCount) { ZEN_WARN( - "Failed to read sidecar file '{}'. File is not large enough to hold expected entry count. Header count: {}, file size count: " + "Failed to read sidecar file '{}'. File is not large enough to hold expected entry count. 
Header count: {}, file size " + "count: " "{}", SidecarPath, Header.EntryCount, @@ -695,6 +700,12 @@ BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&, static const float IndexMinLoadFactor = 0.2f; static const float IndexMaxLoadFactor = 0.7f; +} // namespace zen::cache::impl + +////////////////////////////////////////////////////////////////////////// + +namespace zen { + ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, @@ -705,8 +716,8 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, , m_Configuration(Config) , m_BucketId(Oid::Zero) { - m_Index.min_load_factor(IndexMinLoadFactor); - m_Index.max_load_factor(IndexMaxLoadFactor); + m_Index.min_load_factor(cache::impl::IndexMinLoadFactor); + m_Index.max_load_factor(cache::impl::IndexMaxLoadFactor); if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap"))) { @@ -750,11 +761,11 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo CreateDirectories(m_BucketDir); - std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); + std::filesystem::path ManifestPath = cache::impl::GetManifestPath(m_BucketDir, m_BucketName); bool IsNew = false; - BucketManifestSerializer ManifestReader; + cache::impl::BucketManifestSerializer ManifestReader; if (ManifestReader.Open(ManifestPath)) { @@ -770,7 +781,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, - BucketManifestSerializer::CurrentDiskBucketVersion); + cache::impl::BucketManifestSerializer::CurrentDiskBucketVersion); IsNew = true; } } @@ -824,11 +835,11 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition, namespace fs = std::filesystem; - fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + fs::path 
IndexPath = cache::impl::GetIndexPath(m_BucketDir, m_BucketName); try { - const uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry); + const uint64_t IndexSize = sizeof(cache::impl::CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry); std::error_code Error; DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); if (Error) @@ -862,14 +873,14 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition, // all data is written to the file BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024); - CacheBucketIndexHeader Header = {.EntryCount = EntryCount, - .LogPosition = LogCount, - .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)}; + cache::impl::CacheBucketIndexHeader Header = {.EntryCount = EntryCount, + .LogPosition = LogCount, + .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)}; - Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); - IndexWriter.Write(&Header, sizeof(CacheBucketIndexHeader), 0); + Header.Checksum = cache::impl::CacheBucketIndexHeader::ComputeChecksum(Header); + IndexWriter.Write(&Header, sizeof(cache::impl::CacheBucketIndexHeader), 0); - uint64_t IndexWriteOffset = sizeof(CacheBucketIndexHeader); + uint64_t IndexWriteOffset = sizeof(cache::impl::CacheBucketIndexHeader); for (auto& Entry : m_Index) { @@ -896,7 +907,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition, // We must only update the log flush position once the snapshot write succeeds if (FlushLockPosition) { - std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); + std::filesystem::path LogPath = cache::impl::GetLogPath(m_BucketDir, m_BucketName); if (std::filesystem::is_regular_file(LogPath)) { @@ -938,12 +949,12 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); uint64_t FileSize = 
ObjectIndexFile.FileSize(); - if (FileSize < sizeof(CacheBucketIndexHeader)) + if (FileSize < sizeof(cache::impl::CacheBucketIndexHeader)) { return 0; } - CacheBucketIndexHeader Header; + cache::impl::CacheBucketIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); if (!Header.IsValid()) @@ -951,12 +962,12 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const return 0; } - if (Header.Version != CacheBucketIndexHeader::Version2) + if (Header.Version != cache::impl::CacheBucketIndexHeader::Version2) { return 0; } - const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); + const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(cache::impl::CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); if (Header.EntryCount > ExpectedEntryCount) { return 0; @@ -977,7 +988,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const BasicFileBuffer FileBuffer(ObjectIndexFile, 128 * 1024); - uint64_t CurrentReadOffset = sizeof(CacheBucketIndexHeader); + uint64_t CurrentReadOffset = sizeof(cache::impl::CacheBucketIndexHeader); uint64_t RemainingEntryCount = Header.EntryCount; std::string InvalidEntryReason; @@ -986,7 +997,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const const DiskIndexEntry* Entry = FileBuffer.MakeView<DiskIndexEntry>(CurrentReadOffset); CurrentReadOffset += sizeof(DiskIndexEntry); - if (!ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason)) + if (!cache::impl::ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); continue; @@ -1003,7 +1014,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const m_AccessTimes.resize(EntryCount, AccessTime(GcClock::TickCount())); - OutVersion = CacheBucketIndexHeader::Version2; + OutVersion = 
cache::impl::CacheBucketIndexHeader::Version2; return Header.LogPosition; } @@ -1050,7 +1061,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std:: return; } - if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) + if (!cache::impl::ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); ++InvalidEntryCount; @@ -1087,8 +1098,8 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco m_MemCachedPayloads.clear(); m_FreeMemCachedPayloads.clear(); - std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); - std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + std::filesystem::path LogPath = cache::impl::GetLogPath(m_BucketDir, m_BucketName); + std::filesystem::path IndexPath = cache::impl::GetIndexPath(m_BucketDir, m_BucketName); if (IsNew) { @@ -1920,7 +1931,7 @@ ZenCacheDiskLayer::CacheBucket::Drop() m_BlockStore.Close(); m_SlogFile.Close(); - bool Deleted = MoveAndDeleteDirectory(m_BucketDir); + const bool Deleted = cache::impl::MoveAndDeleteDirectory(m_BucketDir); m_Index.clear(); m_Payloads.clear(); @@ -1970,8 +1981,8 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl { bool UseLegacyScheme = false; - IoBuffer Buffer; - BucketManifestSerializer ManifestWriter; + IoBuffer Buffer; + cache::impl::BucketManifestSerializer ManifestWriter; if (UseLegacyScheme) { @@ -2051,7 +2062,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl } ManifestWriter.WriteSidecarFile(IndexLock, - GetMetaPath(m_BucketDir, m_BucketName), + cache::impl::GetMetaPath(m_BucketDir, m_BucketName), m_LogFlushPosition, m_Index, m_AccessTimes, @@ -2059,7 +2070,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl m_MetaDatas); } - std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, 
m_BucketName); + const std::filesystem::path ManifestPath = cache::impl::GetManifestPath(m_BucketDir, m_BucketName); TemporaryFile::SafeWriteFile(ManifestPath, Buffer.GetView()); } catch (const std::exception& Err) @@ -2733,7 +2744,7 @@ public: Stopwatch Timer; const auto _ = MakeGuard([&] { - Reset(m_ExpiredStandaloneKeys); + cache::impl::Reset(m_ExpiredStandaloneKeys); if (!Ctx.Settings.Verbose) { return; @@ -3101,7 +3112,7 @@ ZenCacheDiskLayer::CacheBucket::ReadAttachmentsFromMetaData(uint32_t BlockI ZEN_TRACE_CPU("Z$::Bucket::GetAttachmentsFromMetaData"); return GetAttachmentsFromMetaData<IoHash, IoHash>( MetaDataPayload, - BlockMetaDataExpectedMagic, + cache::impl::BlockMetaDataExpectedMagic, [&](std::span<const IoHash> Keys, std::span<const uint32_t> AttachmentCounts, std::span<const IoHash> Attachments) { auto AttachmentReadIt = Attachments.begin(); OutReferences.resize(OutReferences.size() + Attachments.size()); @@ -3278,7 +3289,7 @@ ZenCacheDiskLayer::CacheBucket::GetReferences(const LoggerRef& Logger, ZEN_ASSERT(Keys.size() == AttachmentCounts.size()); IoBuffer MetaDataPayload = BuildReferenceMetaData<IoHash>( - BlockMetaDataExpectedMagic, + cache::impl::BlockMetaDataExpectedMagic, Keys, AttachmentCounts, std::span<const IoHash>(OutReferences) @@ -3485,8 +3496,8 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, Payloads.reserve(EntryCount); AccessTimes.reserve(EntryCount); Index.reserve(EntryCount); - Index.min_load_factor(IndexMinLoadFactor); - Index.max_load_factor(IndexMaxLoadFactor); + Index.min_load_factor(cache::impl::IndexMinLoadFactor); + Index.max_load_factor(cache::impl::IndexMaxLoadFactor); for (auto It : m_Index) { PayloadIndex EntryIndex = PayloadIndex(Payloads.size()); @@ -3510,9 +3521,9 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, m_Payloads.swap(Payloads); m_AccessTimes.swap(AccessTimes); m_MetaDatas.swap(MetaDatas); - Reset(m_FreeMetaDatas); + cache::impl::Reset(m_FreeMetaDatas); 
m_MemCachedPayloads.swap(MemCachedPayloads); - Reset(m_FreeMemCachedPayloads); + cache::impl::Reset(m_FreeMemCachedPayloads); } RwLock::SharedLockScope @@ -3963,7 +3974,7 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket) // Make sure we remove the folder even if we don't know about the bucket std::filesystem::path BucketPath = m_RootDir; BucketPath /= std::string(InBucket); - return MoveAndDeleteDirectory(BucketPath); + return cache::impl::MoveAndDeleteDirectory(BucketPath); } bool @@ -3986,7 +3997,7 @@ ZenCacheDiskLayer::Drop() return false; } } - return MoveAndDeleteDirectory(m_RootDir); + return cache::impl::MoveAndDeleteDirectory(m_RootDir); } void diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index f85b19264..50af7246e 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -65,7 +65,7 @@ struct CasDiskIndexHeader static_assert(sizeof(CasDiskIndexHeader) == 32); -namespace { +namespace cas::impl { const char* IndexExtension = ".uidx"; const char* LogExtension = ".ulog"; @@ -124,17 +124,17 @@ namespace { return true; } -} // namespace +} // namespace cas::impl ////////////////////////////////////////////////////////////////////////// -static const float IndexMinLoadFactor = 0.2f; -static const float IndexMaxLoadFactor = 0.7f; - CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : m_Log(logging::Get("containercas")), m_Gc(Gc) { ZEN_MEMSCOPE(GetCasContainerTag()); + const float IndexMinLoadFactor = 0.2f; + const float IndexMaxLoadFactor = 0.7f; + m_LocationMap.min_load_factor(IndexMinLoadFactor); m_LocationMap.max_load_factor(IndexMaxLoadFactor); @@ -165,7 +165,7 @@ CasContainerStrategy::Initialize(const std::filesystem::path& RootDirectory, m_ContainerBaseName = ContainerBaseName; m_PayloadAlignment = Alignment; m_MaxBlockSize = MaxBlockSize; - m_BlocksBasePath = GetBlocksBasePath(m_RootDirectory, m_ContainerBaseName); + m_BlocksBasePath = cas::impl::GetBlocksBasePath(m_RootDirectory, 
m_ContainerBaseName); OpenContainer(IsNewStore); @@ -329,31 +329,33 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, { ZEN_MEMSCOPE(GetCasContainerTag()); - const size_t ChunkCount = ChunkHashes.size(); - if (ChunkCount < 3) + const size_t ChunkCount = ChunkHashes.size(); + std::vector<size_t> FoundChunkIndexes; + std::vector<BlockStoreLocation> FoundChunkLocations; + FoundChunkIndexes.reserve(ChunkCount); + FoundChunkLocations.reserve(ChunkCount); { + RwLock::SharedLockScope _(m_LocationMapLock); for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++) { - IoBuffer Chunk = FindChunk(ChunkHashes[ChunkIndex]); - if (!AsyncCallback(ChunkIndex, Chunk)) + if (auto KeyIt = m_LocationMap.find(ChunkHashes[ChunkIndex]); KeyIt != m_LocationMap.end()) { - return false; + FoundChunkIndexes.push_back(ChunkIndex); + FoundChunkLocations.push_back(m_Locations[KeyIt->second].Get(m_PayloadAlignment)); } } - return true; } - std::vector<size_t> FoundChunkIndexes; - std::vector<BlockStoreLocation> FoundChunkLocations; - FoundChunkIndexes.reserve(ChunkCount); - FoundChunkLocations.reserve(ChunkCount); - RwLock::SharedLockScope _(m_LocationMapLock); - for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++) + if (FoundChunkLocations.size() < 3) { - if (auto KeyIt = m_LocationMap.find(ChunkHashes[ChunkIndex]); KeyIt != m_LocationMap.end()) + for (size_t ChunkIndex : FoundChunkIndexes) { - FoundChunkIndexes.push_back(ChunkIndex); - FoundChunkLocations.push_back(m_Locations[KeyIt->second].Get(m_PayloadAlignment)); + IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]); + if (!AsyncCallback(ChunkIndex, Chunk)) + { + return false; + } } + return true; } auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) { @@ -919,8 +921,8 @@ CasContainerStrategy::MakeIndexSnapshot() namespace fs = std::filesystem; - fs::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName); - fs::path TempIndexPath = 
GetTempIndexPath(m_RootDirectory, m_ContainerBaseName); + fs::path IndexPath = cas::impl::GetIndexPath(m_RootDirectory, m_ContainerBaseName); + fs::path TempIndexPath = cas::impl::GetTempIndexPath(m_RootDirectory, m_ContainerBaseName); // Move index away, we keep it if something goes wrong if (fs::is_regular_file(TempIndexPath)) @@ -1054,7 +1056,7 @@ CasContainerStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint std::string InvalidEntryReason; for (const CasDiskIndexEntry& Entry : Entries) { - if (!ValidateEntry(Entry, InvalidEntryReason)) + if (!cas::impl::ValidateEntry(Entry, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); continue; @@ -1121,7 +1123,7 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski m_LocationMap.erase(Record.Key); return; } - if (!ValidateEntry(Record, InvalidEntryReason)) + if (!cas::impl::ValidateEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); return; @@ -1147,7 +1149,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) m_LocationMap.clear(); m_Locations.clear(); - std::filesystem::path BasePath = GetBasePath(m_RootDirectory, m_ContainerBaseName); + std::filesystem::path BasePath = cas::impl::GetBasePath(m_RootDirectory, m_ContainerBaseName); if (IsNewStore) { @@ -1158,8 +1160,8 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); - std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName); - std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName); + std::filesystem::path LogPath = cas::impl::GetLogPath(m_RootDirectory, m_ContainerBaseName); + std::filesystem::path IndexPath = cas::impl::GetIndexPath(m_RootDirectory, m_ContainerBaseName); if (std::filesystem::is_regular_file(IndexPath)) { diff 
--git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 339e5de0a..82dbe3551 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -57,16 +57,14 @@ GetFileCasTag() return _; } -namespace { +namespace filecas::impl { template<typename T> void Reset(T& V) { T Tmp; V.swap(Tmp); } -} // namespace -namespace filecas::impl { const char* IndexExtension = ".uidx"; const char* LogExtension = ".ulog"; @@ -141,11 +139,11 @@ FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& Roo ////////////////////////////////////////////////////////////////////////// -static const float IndexMinLoadFactor = 0.2f; -static const float IndexMaxLoadFactor = 0.7f; - FileCasStrategy::FileCasStrategy(GcManager& Gc) : m_Log(logging::Get("filecas")), m_Gc(Gc) { + static const float IndexMinLoadFactor = 0.2f; + static const float IndexMaxLoadFactor = 0.7f; + m_Index.min_load_factor(IndexMinLoadFactor); m_Index.max_load_factor(IndexMaxLoadFactor); @@ -1237,7 +1235,7 @@ public: Stopwatch Timer; const auto _ = MakeGuard([&] { - Reset(m_ReferencesToClean); + filecas::impl::Reset(m_ReferencesToClean); if (!Ctx.Settings.Verbose) { return; @@ -1353,7 +1351,7 @@ public: } } - Reset(m_ReferencesToClean); + filecas::impl::Reset(m_ReferencesToClean); } virtual std::string GetGcName(GcCtx& Ctx) override { return m_FileCasStrategy.GetGcName(Ctx); } diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index 711b96c8f..b0b4f22cb 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -17,6 +17,10 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { +namespace cache::impl { + class BucketManifestSerializer; +} + class IoBuffer; class JobQueue; @@ -446,7 +450,7 @@ public: friend class DiskBucketReferenceChecker; friend class DiskBucketStoreCompactor; - friend class BucketManifestSerializer; + friend class 
cache::impl::BucketManifestSerializer; }; private: @@ -65,7 +65,11 @@ add_rules("mode.debug", "mode.release") --add_rules("c++.unity_build") if is_mode("release") then - set_optimize("smallest") + -- LTO does not appear to work with the current UE toolchain + if not is_plat("linux") then + set_policy("build.optimization.lto", true) + end + set_optimize("fastest") end if is_mode("debug") then |