aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/zencore/filesystem.cpp24
-rw-r--r--src/zencore/include/zencore/compactbinarypackage.h4
-rw-r--r--src/zencore/include/zencore/filesystem.h4
-rw-r--r--src/zencore/include/zencore/memory/align.h2
-rw-r--r--src/zencore/include/zencore/testutils.h1
-rw-r--r--src/zencore/testutils.cpp20
-rw-r--r--src/zencore/xmake.lua1
-rw-r--r--src/zenhttp/servers/httpplugin.cpp8
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp4
-rw-r--r--src/zenserver/cache/httpstructuredcache.h5
-rw-r--r--src/zenserver/projectstore/httpprojectstore.cpp12
-rw-r--r--src/zenserver/projectstore/projectstore.cpp1961
-rw-r--r--src/zenserver/projectstore/projectstore.h43
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.cpp16
-rw-r--r--src/zenserver/vfs/vfsimpl.cpp3
-rw-r--r--src/zenserver/zenserver.cpp14
-rw-r--r--src/zenserver/zenserver.h23
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp93
-rw-r--r--src/zenstore/compactcas.cpp58
-rw-r--r--src/zenstore/filecas.cpp14
-rw-r--r--src/zenstore/include/zenstore/cache/cachedisklayer.h6
21 files changed, 1986 insertions, 330 deletions
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index 36147c5a9..52f2c4adc 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -1435,12 +1435,34 @@ FileSizeFromHandle(void* NativeHandle)
int Fd = int(intptr_t(NativeHandle));
struct stat Stat;
fstat(Fd, &Stat);
- FileSize = size_t(Stat.st_size);
+ FileSize = size_t(Stat.st_size);
#endif
return FileSize;
}
+uint64_t
+GetModificationTickFromHandle(void* NativeHandle, std::error_code& Ec)
+{
+#if ZEN_PLATFORM_WINDOWS
+ FILETIME LastWriteTime;
+ BOOL OK = GetFileTime((HANDLE)NativeHandle, NULL, NULL, &LastWriteTime);
+ if (OK)
+ {
+ return ((uint64_t(LastWriteTime.dwHighDateTime) << 32) | LastWriteTime.dwLowDateTime);
+ }
+#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ int Fd = int(uintptr_t(NativeHandle));
+ struct stat Stat;
+ if (0 == fstat(Fd, &Stat))
+ {
+ return gsl::narrow<uint64_t>(Stat.st_mtime);
+ }
+#endif
+ Ec = MakeErrorCodeFromLastError();
+ return 0;
+}
+
std::filesystem::path
GetRunningExecutablePath()
{
diff --git a/src/zencore/include/zencore/compactbinarypackage.h b/src/zencore/include/zencore/compactbinarypackage.h
index 064481f83..12fcc41b7 100644
--- a/src/zencore/include/zencore/compactbinarypackage.h
+++ b/src/zencore/include/zencore/compactbinarypackage.h
@@ -64,6 +64,9 @@ public:
ZENCORE_API CbAttachment(const CompressedBuffer& InValue, const IoHash& Hash);
ZENCORE_API CbAttachment(CompressedBuffer&& InValue, const IoHash& Hash);
+ /** Construct a binary attachment. Value is cloned if not owned. */
+ ZENCORE_API CbAttachment(CompositeBuffer&& InValue, const IoHash& Hash);
+
/** Reset this to a null attachment. */
inline void Reset() { *this = CbAttachment(); }
@@ -130,7 +133,6 @@ public:
private:
ZENCORE_API CbAttachment(const CbObject& Value, const IoHash* Hash);
ZENCORE_API explicit CbAttachment(CompositeBuffer&& InValue);
- ZENCORE_API CbAttachment(CompositeBuffer&& InValue, const IoHash& Hash);
IoHash Hash;
std::variant<std::nullptr_t, CbObject, CompositeBuffer, CompressedBuffer> Value;
diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h
index 2cd663afb..dba4981f0 100644
--- a/src/zencore/include/zencore/filesystem.h
+++ b/src/zencore/include/zencore/filesystem.h
@@ -45,6 +45,10 @@ ZENCORE_API std::filesystem::path CanonicalPath(std::filesystem::path InPath, st
*/
ZENCORE_API uint64_t FileSizeFromHandle(void* NativeHandle);
+/** Get a native time tick of last modification time
+ */
+ZENCORE_API uint64_t GetModificationTickFromHandle(void* NativeHandle, std::error_code& Ec);
+
ZENCORE_API std::filesystem::path GetRunningExecutablePath();
/** Set the max open file handle count to max allowed for the current process on Linux and MacOS
diff --git a/src/zencore/include/zencore/memory/align.h b/src/zencore/include/zencore/memory/align.h
index acf4157c4..9d4101fab 100644
--- a/src/zencore/include/zencore/memory/align.h
+++ b/src/zencore/include/zencore/memory/align.h
@@ -1,5 +1,7 @@
// Copyright Epic Games, Inc. All Rights Reserved.
+#pragma once
+
#include <zenbase/zenbase.h>
namespace zen {
diff --git a/src/zencore/include/zencore/testutils.h b/src/zencore/include/zencore/testutils.h
index 6a1c0184b..45fde4eda 100644
--- a/src/zencore/include/zencore/testutils.h
+++ b/src/zencore/include/zencore/testutils.h
@@ -33,6 +33,7 @@ struct ScopedCurrentDirectoryChange
};
IoBuffer CreateRandomBlob(uint64_t Size);
+IoBuffer CreateSemiRandomBlob(uint64_t Size);
struct FalseType
{
diff --git a/src/zencore/testutils.cpp b/src/zencore/testutils.cpp
index d4c8aeaef..641d5508a 100644
--- a/src/zencore/testutils.cpp
+++ b/src/zencore/testutils.cpp
@@ -71,6 +71,26 @@ CreateRandomBlob(uint64_t Size)
return Data;
};
+IoBuffer
+CreateSemiRandomBlob(uint64_t Size)
+{
+ IoBuffer Result(Size);
+ const size_t PartCount = (Size / (1u * 1024u * 64)) + 1;
+ const size_t PartSize = Size / PartCount;
+ auto Part = CreateRandomBlob(PartSize);
+ auto Remain = Result.GetMutableView().CopyFrom(Part.GetView());
+ while (Remain.GetSize() >= PartSize)
+ {
+ Remain = Remain.CopyFrom(Part.GetView());
+ }
+ if (Remain.GetSize() > 0)
+ {
+ auto RemainBuffer = CreateRandomBlob(Remain.GetSize());
+ Remain.CopyFrom(RemainBuffer.GetView());
+ }
+ return Result;
+};
+
} // namespace zen
#endif // ZEN_WITH_TESTS
diff --git a/src/zencore/xmake.lua b/src/zencore/xmake.lua
index ce8cd0996..2f82d38a3 100644
--- a/src/zencore/xmake.lua
+++ b/src/zencore/xmake.lua
@@ -13,6 +13,7 @@ target('zencore')
end)
set_configdir("include/zencore")
add_files("**.cpp")
+ add_files("trace.cpp", {unity_ignored = true })
if has_config("zenrpmalloc") then
set_languages("c17", "cxx20")
diff --git a/src/zenhttp/servers/httpplugin.cpp b/src/zenhttp/servers/httpplugin.cpp
index 09cd76f3e..9c2110476 100644
--- a/src/zenhttp/servers/httpplugin.cpp
+++ b/src/zenhttp/servers/httpplugin.cpp
@@ -27,14 +27,6 @@
# include <conio.h>
# endif
-# define PLUGIN_VERBOSE_TRACE 1
-
-# if PLUGIN_VERBOSE_TRACE
-# define ZEN_TRACE_VERBOSE ZEN_TRACE
-# else
-# define ZEN_TRACE_VERBOSE(fmtstr, ...)
-# endif
-
namespace zen {
struct HttpPluginServerImpl;
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index a6606c7ad..fd116ba8e 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -86,7 +86,8 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach
HttpStatsService& StatsService,
HttpStatusService& StatusService,
UpstreamCache& UpstreamCache,
- const DiskWriteBlocker* InDiskWriteBlocker)
+ const DiskWriteBlocker* InDiskWriteBlocker,
+ OpenProcessCache& InOpenProcessCache)
: m_Log(logging::Get("cache"))
, m_CacheStore(InCacheStore)
, m_StatsService(StatsService)
@@ -94,6 +95,7 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach
, m_CidStore(InCidStore)
, m_UpstreamCache(UpstreamCache)
, m_DiskWriteBlocker(InDiskWriteBlocker)
+, m_OpenProcessCache(InOpenProcessCache)
, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, InCidStore, InDiskWriteBlocker)
{
m_StatsService.RegisterHandler("z$", *this);
diff --git a/src/zenserver/cache/httpstructuredcache.h b/src/zenserver/cache/httpstructuredcache.h
index 812162efa..13c1d6475 100644
--- a/src/zenserver/cache/httpstructuredcache.h
+++ b/src/zenserver/cache/httpstructuredcache.h
@@ -75,7 +75,8 @@ public:
HttpStatsService& StatsService,
HttpStatusService& StatusService,
UpstreamCache& UpstreamCache,
- const DiskWriteBlocker* InDiskWriteBlocker);
+ const DiskWriteBlocker* InDiskWriteBlocker,
+ OpenProcessCache& InOpenProcessCache);
~HttpStructuredCacheService();
virtual const char* BaseUri() const override;
@@ -122,7 +123,7 @@ private:
metrics::OperationTiming m_UpstreamGetRequestTiming;
CacheStats m_CacheStats;
const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
- OpenProcessCache m_OpenProcessCache;
+ OpenProcessCache& m_OpenProcessCache;
CacheRpcHandler m_RpcHandler;
void ReplayRequestRecorder(const CacheRequestContext& Context, cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount);
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp
index 22216e88d..09f4c7ee2 100644
--- a/src/zenserver/projectstore/httpprojectstore.cpp
+++ b/src/zenserver/projectstore/httpprojectstore.cpp
@@ -561,7 +561,7 @@ HttpProjectService::HandleChunkBatchRequest(HttpRouterRequest& Req)
for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex)
{
const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex];
- IoBuffer FoundChunk = FoundLog->FindChunk(RequestedChunk.ChunkId);
+ IoBuffer FoundChunk = FoundLog->FindChunk(RequestedChunk.ChunkId, nullptr);
if (FoundChunk)
{
if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1))
@@ -834,7 +834,7 @@ HttpProjectService::HandleChunkByIdRequest(HttpRouterRequest& Req)
CompositeBuffer Chunk;
HttpContentType ContentType;
std::pair<HttpResponseCode, std::string> Result =
- m_ProjectStore->GetChunkRange(ProjectId, OplogId, ChunkId, Offset, Size, AcceptType, Chunk, ContentType);
+ m_ProjectStore->GetChunkRange(ProjectId, OplogId, ChunkId, Offset, Size, AcceptType, Chunk, ContentType, nullptr);
if (Result.first == HttpResponseCode::OK)
{
m_ProjectStats.ChunkHitCount++;
@@ -883,8 +883,8 @@ HttpProjectService::HandleChunkByCidRequest(HttpRouterRequest& Req)
case HttpVerb::kGet:
{
IoBuffer Value;
- std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value);
-
+ std::pair<HttpResponseCode, std::string> Result =
+ m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value, nullptr);
if (Result.first == HttpResponseCode::OK)
{
m_ProjectStats.ChunkHitCount++;
@@ -991,7 +991,7 @@ HttpProjectService::HandleOplogOpPrepRequest(HttpRouterRequest& Req)
std::vector<IoHash> NeedList = FoundLog->CheckPendingChunkReferences(ChunkList, std::chrono::minutes(2));
- CbObjectWriter Cbo;
+ CbObjectWriter Cbo(1 + 1 + 5 + NeedList.size() * (1 + sizeof(IoHash::Hash)) + 1);
Cbo.BeginArray("need");
{
for (const IoHash& Hash : NeedList)
@@ -1544,7 +1544,7 @@ LoadReferencedSet(ProjectStore::Oplog& Log)
return std::optional<OplogReferencedSet>();
}
- return OplogReferencedSet::LoadFromChunk(Log.FindChunk(ChunkId));
+ return OplogReferencedSet::LoadFromChunk(Log.FindChunk(ChunkId, nullptr));
}
void
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 415c80078..70045c13c 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -19,6 +19,7 @@
#include <zenstore/cidstore.h>
#include <zenstore/scrubcontext.h>
#include <zenutil/cache/rpcrecording.h>
+#include <zenutil/openprocesscache.h>
#include <zenutil/packageformat.h>
#include <zenutil/referencemetadata.h>
#include <zenutil/workerpools.h>
@@ -314,6 +315,27 @@ namespace {
static_assert(sizeof(OplogIndexHeader) == 64);
+ static std::uint64_t GetModificationTagFromRawHash(const IoHash& Hash)
+ {
+ IoHash::Hasher H;
+ return H(Hash);
+ }
+
+ static std::uint64_t GetModificationTagFromModificationTime(IoBuffer FileBuffer)
+ {
+ IoBufferFileReference FileRef;
+ if (FileBuffer.GetFileReference(FileRef))
+ {
+ std::error_code Ec;
+ uint64_t ModificationTick = GetModificationTickFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ return ModificationTick;
+ }
+ }
+ return {};
+ }
+
} // namespace
//////////////////////////////////////////////////////////////////////////
@@ -1920,19 +1942,27 @@ ProjectStore::Oplog::GetChunkByRawHash(const IoHash& RawHash)
}
bool
-ProjectStore::Oplog::IterateChunks(std::span<IoHash> RawHashes,
- const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool,
- uint64_t LargeSizeLimit)
-{
- return m_CidStore.IterateChunks(RawHashes, AsyncCallback, OptionalWorkerPool, LargeSizeLimit);
+ProjectStore::Oplog::IterateChunks(std::span<IoHash> RawHashes,
+ bool IncludeModTag,
+ const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit)
+{
+ return m_CidStore.IterateChunks(
+ RawHashes,
+ [&](size_t Index, const IoBuffer& Payload) {
+ return AsyncCallback(Index, Payload, IncludeModTag ? GetModificationTagFromRawHash(RawHashes[Index]) : 0);
+ },
+ OptionalWorkerPool,
+ LargeSizeLimit);
}
bool
-ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
- const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool,
- uint64_t LargeSizeLimit)
+ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
+ bool IncludeModTag,
+ const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit)
{
std::vector<size_t> CidChunkIndexes;
std::vector<IoHash> CidChunkHashes;
@@ -1960,12 +1990,6 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
}
}
}
- m_CidStore.IterateChunks(
- CidChunkHashes,
- [&](size_t Index, const IoBuffer& Payload) { return AsyncCallback(CidChunkIndexes[Index], Payload); },
- OptionalWorkerPool,
- LargeSizeLimit);
-
if (OptionalWorkerPool)
{
std::atomic_bool Result = true;
@@ -1979,7 +2003,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
}
WorkLatch.AddCount(1);
OptionalWorkerPool->ScheduleWork(
- [this, &WorkLatch, &ChunkIds, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]() {
+ [this, &WorkLatch, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]() {
auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
if (Result.load() == false)
{
@@ -1995,7 +2019,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[ChunkIndex], FilePath);
}
- if (!AsyncCallback(FileChunkIndex, Payload))
+ if (!AsyncCallback(FileChunkIndex, Payload, IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0))
{
Result.store(false);
}
@@ -2012,6 +2036,18 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
});
}
+ if (!CidChunkHashes.empty())
+ {
+ m_CidStore.IterateChunks(
+ CidChunkHashes,
+ [&](size_t Index, const IoBuffer& Payload) {
+ size_t CidChunkIndex = CidChunkIndexes[Index];
+ return AsyncCallback(CidChunkIndex, Payload, IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0);
+ },
+ OptionalWorkerPool,
+ LargeSizeLimit);
+ }
+
WorkLatch.CountDown();
WorkLatch.Wait();
@@ -2019,6 +2055,18 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
}
else
{
+ if (!CidChunkHashes.empty())
+ {
+ m_CidStore.IterateChunks(
+ CidChunkHashes,
+ [&](size_t Index, const IoBuffer& Payload) {
+ size_t CidChunkIndex = CidChunkIndexes[Index];
+ return AsyncCallback(CidChunkIndex, Payload, IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0);
+ },
+ OptionalWorkerPool,
+ LargeSizeLimit);
+ }
+
for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++)
{
size_t FileChunkIndex = FileChunkIndexes[ChunkIndex];
@@ -2026,7 +2074,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath);
if (Payload)
{
- bool Result = AsyncCallback(FileChunkIndex, Payload);
+ bool Result = AsyncCallback(FileChunkIndex, Payload, IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0);
if (!Result)
{
return false;
@@ -2038,7 +2086,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
}
IoBuffer
-ProjectStore::Oplog::FindChunk(const Oid& ChunkId)
+ProjectStore::Oplog::FindChunk(const Oid& ChunkId, uint64_t* OptOutModificationTag)
{
RwLock::SharedLockScope OplogLock(m_OplogLock);
if (!m_Storage)
@@ -2051,7 +2099,12 @@ ProjectStore::Oplog::FindChunk(const Oid& ChunkId)
IoHash ChunkHash = ChunkIt->second;
OplogLock.ReleaseNow();
- return m_CidStore.FindChunkByCid(ChunkHash);
+ IoBuffer Result = m_CidStore.FindChunkByCid(ChunkHash);
+ if (Result && OptOutModificationTag != nullptr)
+ {
+ *OptOutModificationTag = GetModificationTagFromRawHash(ChunkHash);
+ }
+ return Result;
}
if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end())
@@ -2065,6 +2118,10 @@ ProjectStore::Oplog::FindChunk(const Oid& ChunkId)
{
ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkId, FilePath);
}
+ else if (OptOutModificationTag != nullptr)
+ {
+ *OptOutModificationTag = GetModificationTagFromModificationTime(Result);
+ }
return Result;
}
@@ -2073,7 +2130,12 @@ ProjectStore::Oplog::FindChunk(const Oid& ChunkId)
IoHash ChunkHash = MetaIt->second;
OplogLock.ReleaseNow();
- return m_CidStore.FindChunkByCid(ChunkHash);
+ IoBuffer Result = m_CidStore.FindChunkByCid(ChunkHash);
+ if (Result && OptOutModificationTag != nullptr)
+ {
+ *OptOutModificationTag = GetModificationTagFromRawHash(ChunkHash);
+ }
+ return Result;
}
return {};
@@ -2109,7 +2171,7 @@ ProjectStore::Oplog::GetAllChunksInfo()
for (ChunkInfo& Info : InfoArray)
{
- if (IoBuffer Chunk = FindChunk(Info.ChunkId))
+ if (IoBuffer Chunk = FindChunk(Info.ChunkId, nullptr))
{
Info.ChunkSize = Chunk.GetSize();
}
@@ -3032,7 +3094,7 @@ ProjectStore::Project::WriteAccessTimes()
{
using namespace std::literals;
- CbObjectWriter Writer;
+ CbObjectWriter Writer(32 + (m_LastAccessTimes.size() * 16));
{
RwLock::SharedLockScope _(m_LastAccessTimesLock);
@@ -3557,11 +3619,17 @@ ProjectStore::Project::LastOplogAccessTime(std::string_view Oplog) const
//////////////////////////////////////////////////////////////////////////
-ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, JobQueue& JobQueue, const Configuration& Config)
+ProjectStore::ProjectStore(CidStore& Store,
+ std::filesystem::path BasePath,
+ GcManager& Gc,
+ JobQueue& JobQueue,
+ OpenProcessCache& InOpenProcessCache,
+ const Configuration& Config)
: m_Log(logging::Get("project"))
, m_Gc(Gc)
, m_CidStore(Store)
, m_JobQueue(JobQueue)
+, m_OpenProcessCache(InOpenProcessCache)
, m_ProjectBasePath(BasePath)
, m_Config(Config)
, m_DiskWriteBlocker(Gc.GetDiskWriteBlocker())
@@ -3972,7 +4040,8 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId,
FoundLog->IterateChunks(
Ids,
- [&](size_t Index, const IoBuffer& Payload) {
+ false,
+ [&](size_t Index, const IoBuffer& Payload, uint64_t /*ModTag*/) {
try
{
if (Payload)
@@ -4033,7 +4102,8 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId,
256u * 1024u);
}
- CbObjectWriter Response;
+ CbObjectWriter Response(64u + Count * ((WantsIdField ? (5 + sizeof(Oid::OidBits)) : 0) + (WantsServerPathField ? 64 : 0) +
+ (WantsClientPathField ? 64 : 0) + (WantsSizeField ? 16 : 0) + (WantsRawSizeField ? 16 : 0)));
Response.BeginArray("files"sv);
for (size_t Index = 0; Index < Count; Index++)
{
@@ -4102,7 +4172,16 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId,
std::vector<uint64_t> RawSizes;
std::vector<uint64_t> Sizes;
- size_t Count = 0;
+ size_t Count = 0;
+ size_t EstimatedCount = FoundLog->OplogCount();
+ if (WantsIdField)
+ {
+ Ids.reserve(EstimatedCount);
+ }
+ if (WantsRawHashField || WantsRawSizeField || WantsSizeField)
+ {
+ Hashes.reserve(EstimatedCount);
+ }
FoundLog->IterateChunkMap([&](const Oid& Id, const IoHash& Hash) {
if (WantsIdField)
{
@@ -4129,7 +4208,8 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId,
WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();
(void)FoundLog->IterateChunks(
Hashes,
- [&](size_t Index, const IoBuffer& Payload) -> bool {
+ false,
+ [&](size_t Index, const IoBuffer& Payload, uint64_t /*ModTag*/) -> bool {
try
{
if (Payload)
@@ -4190,7 +4270,9 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId,
256u * 1024u);
}
- CbObjectWriter Response;
+ CbObjectWriter Response(64u + Count * ((WantsIdField ? (5 + sizeof(Oid::OidBits)) : 0) +
+ (WantsRawHashField ? (10 + sizeof(IoHash::Hash)) : 0) + (WantsSizeField ? 16 : 0) +
+ (WantsRawSizeField ? 16 : 0)));
Response.BeginArray("chunkinfos"sv);
for (size_t Index = 0; Index < Count; Index++)
@@ -4250,7 +4332,7 @@ ProjectStore::GetChunkInfo(const std::string_view ProjectId,
const Oid Obj = Oid::FromHexString(ChunkId);
- IoBuffer Chunk = FoundLog->FindChunk(Obj);
+ IoBuffer Chunk = FoundLog->FindChunk(Obj, nullptr);
if (!Chunk)
{
return {HttpResponseCode::NotFound, {}};
@@ -4284,7 +4366,8 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId,
uint64_t Size,
ZenContentType AcceptType,
CompositeBuffer& OutChunk,
- ZenContentType& OutContentType)
+ ZenContentType& OutContentType,
+ uint64_t* OptionalInOutModificationTag)
{
if (ChunkId.size() != 2 * sizeof(Oid::OidBits))
{
@@ -4293,41 +4376,19 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId,
const Oid Obj = Oid::FromHexString(ChunkId);
- return GetChunkRange(ProjectId, OplogId, Obj, Offset, Size, AcceptType, OutChunk, OutContentType);
+ return GetChunkRange(ProjectId, OplogId, Obj, Offset, Size, AcceptType, OutChunk, OutContentType, OptionalInOutModificationTag);
}
-std::pair<HttpResponseCode, std::string>
-ProjectStore::GetChunkRange(const std::string_view ProjectId,
- const std::string_view OplogId,
- Oid ChunkId,
- uint64_t Offset,
- uint64_t Size,
- ZenContentType AcceptType,
- CompositeBuffer& OutChunk,
- ZenContentType& OutContentType)
+static std::pair<HttpResponseCode, std::string>
+ExtractRange(IoBuffer&& Chunk,
+ uint64_t Offset,
+ uint64_t Size,
+ ZenContentType AcceptType,
+ ZenContentType& OutContentType,
+ CompositeBuffer& OutChunk,
+ IoHash& OutRawHash,
+ uint64_t& OutRawSize)
{
- bool IsOffset = Offset != 0 || Size != ~(0ull);
-
- Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
- if (!Project)
- {
- return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)};
- }
- Project->TouchProject();
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false);
- if (!FoundLog)
- {
- return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)};
- }
- Project->TouchOplog(OplogId);
-
- IoBuffer Chunk = FoundLog->FindChunk(ChunkId);
- if (!Chunk)
- {
- return {HttpResponseCode::NotFound, {}};
- }
-
OutContentType = Chunk.GetContentType();
if (OutContentType == ZenContentType::kCompressedBinary)
@@ -4335,26 +4396,47 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId,
IoHash RawHash;
uint64_t RawSize;
CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize);
- ZEN_ASSERT(!Compressed.IsNull());
+ if (!Compressed)
+ {
+ return {HttpResponseCode::InternalServerError, "malformed compressed buffer"};
+ }
+
+ const bool IsFullRange = (Offset == 0) && ((Size == ~(0ull)) || (Size == RawSize));
- if (IsOffset)
+ if (IsFullRange)
+ {
+ if (AcceptType == ZenContentType::kBinary)
+ {
+ OutChunk = Compressed.DecompressToComposite();
+ OutContentType = ZenContentType::kBinary;
+ }
+ else
+ {
+ OutChunk = Compressed.GetCompressed();
+ }
+ OutRawSize = 0;
+ }
+ else
{
if (Size == ~(0ull) || (Offset + Size) > RawSize)
{
- Size = RawSize - Offset;
+ if (Offset < RawSize)
+ {
+ Size = RawSize - Offset;
+ }
+ else
+ {
+ Size = 0;
+ }
}
if (Size == 0)
{
- return {
- HttpResponseCode::NotFound,
- fmt::format("Chunk request for range outside of chunk '{}/{}'. Request: Chunk: {}, Offset: {}, Size: {}, ChunkSize: {}",
- ProjectId,
- OplogId,
- ChunkId,
- Offset,
- Size,
- RawSize)};
+ return {HttpResponseCode::NotFound,
+ fmt::format("Chunk request for range outside of compressed chunk. Offset: {}, Size: {}, ChunkSize: {}",
+ Offset,
+ Size,
+ RawSize)};
}
if (AcceptType == ZenContentType::kBinary)
@@ -4368,47 +4450,93 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId,
// The client will have to compensate for any offsets that do not land on an even block size multiple
OutChunk = Compressed.GetRange(Offset, Size).GetCompressed();
}
+ OutRawSize = RawSize;
+ }
+ OutRawHash = RawHash;
+ }
+ else
+ {
+ const uint64_t ChunkSize = Chunk.GetSize();
+
+ const bool IsFullRange = (Offset == 0) && ((Size == ~(0ull)) || (Size == ChunkSize));
+ if (IsFullRange)
+ {
+ OutChunk = CompositeBuffer(SharedBuffer(std::move(Chunk)));
+ OutRawSize = 0;
}
else
{
- if (AcceptType == ZenContentType::kBinary)
+ if (Size == ~(0ull) || (Offset + Size) > ChunkSize)
{
- OutChunk = Compressed.DecompressToComposite();
- OutContentType = ZenContentType::kBinary;
+ if (Offset < ChunkSize)
+ {
+ Size = ChunkSize - Offset;
+ }
+ else
+ {
+ Size = 0;
+ }
}
- else
+
+ if (Size == 0)
{
- OutChunk = Compressed.GetCompressed();
+ return {HttpResponseCode::NotFound,
+ fmt::format("Chunk request for range outside of chunk. Offset: {}, Size: {}, ChunkSize: {}", Offset, Size, Size)};
}
+
+ OutChunk = CompositeBuffer(SharedBuffer(IoBuffer(std::move(Chunk), Offset, Size)));
+ OutRawSize = ChunkSize;
}
}
- else if (IsOffset)
+ return {HttpResponseCode::OK, {}};
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::GetChunkRange(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ Oid ChunkId,
+ uint64_t Offset,
+ uint64_t Size,
+ ZenContentType AcceptType,
+ CompositeBuffer& OutChunk,
+ ZenContentType& OutContentType,
+ uint64_t* OptionalInOutModificationTag)
+{
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
{
- if (Size == ~(0ull) || (Offset + Size) > Chunk.GetSize())
- {
- Size = Chunk.GetSize() - Offset;
- }
+ return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)};
+ }
+ Project->TouchProject();
- if (Size == 0)
- {
- return {HttpResponseCode::NotFound,
- fmt::format("Chunk request for range outside of chunk '{}/{}'. Request: Chunk: {}, Offset: {}, Size: {}, ChunkSize: {}",
- ProjectId,
- OplogId,
- ChunkId,
- Offset,
- Size,
- Chunk.GetSize())};
- }
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ false);
+ if (!FoundLog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+ Project->TouchOplog(OplogId);
- OutChunk = CompositeBuffer(SharedBuffer(IoBuffer(std::move(Chunk), Offset, Size)));
+ uint64_t OldTag = OptionalInOutModificationTag == nullptr ? 0 : *OptionalInOutModificationTag;
+ IoBuffer Chunk = FoundLog->FindChunk(ChunkId, OptionalInOutModificationTag);
+ if (!Chunk)
+ {
+ return {HttpResponseCode::NotFound, {}};
}
- else
+ if (OptionalInOutModificationTag != nullptr && OldTag == *OptionalInOutModificationTag)
{
- OutChunk = CompositeBuffer(SharedBuffer(std::move(Chunk)));
+ return {HttpResponseCode::NotModified, {}};
}
- return {HttpResponseCode::OK, {}};
+ IoHash _;
+ uint64_t __;
+ std::pair<HttpResponseCode, std::string> Result =
+ ExtractRange(std::move(Chunk), Offset, Size, AcceptType, OutContentType, OutChunk, /*OutRawHash*/ _, /*OutRawSize*/ __);
+ if (Result.first != HttpResponseCode::OK)
+ {
+ return {Result.first,
+ fmt::format("Chunk request for chunk {} in {}/{} failed. Reason: '{}'", ChunkId, ProjectId, OplogId, Result.second)};
+ }
+ return Result;
}
std::pair<HttpResponseCode, std::string>
@@ -4416,7 +4544,8 @@ ProjectStore::GetChunk(const std::string_view ProjectId,
const std::string_view OplogId,
const std::string_view Cid,
ZenContentType AcceptType,
- IoBuffer& OutChunk)
+ IoBuffer& OutChunk,
+ uint64_t* OptionalInOutModificationTag)
{
Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
if (!Project)
@@ -4445,6 +4574,16 @@ ProjectStore::GetChunk(const std::string_view ProjectId,
return {HttpResponseCode::NotFound, fmt::format("chunk - '{}' MISSING", Cid)};
}
+ if (OptionalInOutModificationTag != nullptr)
+ {
+ uint64_t OldTag = *OptionalInOutModificationTag;
+ *OptionalInOutModificationTag = GetModificationTagFromRawHash(Hash);
+ if (*OptionalInOutModificationTag == OldTag)
+ {
+ return {HttpResponseCode::NotModified, {}};
+ }
+ }
+
if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary)
{
CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(OutChunk));
@@ -4504,6 +4643,354 @@ ProjectStore::PutChunk(const std::string_view ProjectId,
}
std::pair<HttpResponseCode, std::string>
+ProjectStore::GetChunks(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const CbObject& RequestObject,
+ CbPackage& OutResponsePackage)
+{
+ ZEN_TRACE_CPU("Store::GetChunks");
+
+ using namespace std::literals;
+
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("getchunks rpc request for unknown project '{}'", ProjectId)};
+ }
+ Project->TouchProject();
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId, /*AllowCompact*/ true, /*VerifyPathOnDisk*/ false);
+ if (!FoundLog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("getchunks rpc request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+ Project->TouchOplog(OplogId);
+
+ if (RequestObject["chunks"sv].IsArray())
+ {
+ // Legacy full chunks only by rawhash
+
+ CbArrayView ChunksArray = RequestObject["chunks"sv].AsArrayView();
+
+ CbObjectWriter ResponseWriter;
+ ResponseWriter.BeginArray("chunks"sv);
+ for (CbFieldView FieldView : ChunksArray)
+ {
+ IoHash RawHash = FieldView.AsHash();
+ IoBuffer ChunkBuffer = m_CidStore.FindChunkByCid(RawHash);
+ if (ChunkBuffer)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer));
+ if (Compressed)
+ {
+ ResponseWriter.AddHash(RawHash);
+ OutResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash));
+ }
+ else
+ {
+ ZEN_WARN("oplog '{}/{}': invalid compressed binary in cas store for {}", ProjectId, OplogId, RawHash);
+ }
+ }
+ }
+ ResponseWriter.EndArray();
+ OutResponsePackage.SetObject(ResponseWriter.Save());
+ return {HttpResponseCode::OK, {}};
+ }
+ else if (auto RequestFieldView = RequestObject["Request"sv]; RequestFieldView.IsObject())
+ {
+ CbObjectView RequestView = RequestFieldView.AsObjectView();
+ bool SkipData = RequestView["SkipData"].AsBool(false);
+ CbArrayView ChunksArray = RequestView["Chunks"sv].AsArrayView();
+
+ struct Request
+ {
+ struct InputData
+ {
+ uint64_t Offset = 0;
+ uint64_t Size = (uint64_t)-1;
+ std::variant<IoHash, Oid> Id;
+ std::optional<uint64_t> ModTag;
+ } Input;
+ struct OutputData
+ {
+ bool Exists = false;
+ IoBuffer ChunkBuffer;
+ uint64_t ModTag = 0;
+ } Output;
+ };
+
+ std::vector<Request> Requests;
+ size_t RequestCount = ChunksArray.Num();
+ if (RequestCount > 0)
+ {
+ Requests.reserve(RequestCount);
+ std::vector<IoHash> ChunkRawHashes;
+ std::vector<size_t> ChunkRawHashesRequestIndex;
+ std::vector<Oid> ChunkIds;
+ std::vector<size_t> ChunkIdsRequestIndex;
+ bool DoBatch = RequestCount > 1;
+ if (DoBatch)
+ {
+ ChunkRawHashes.reserve(RequestCount);
+ ChunkRawHashesRequestIndex.reserve(RequestCount);
+ ChunkIds.reserve(RequestCount);
+ ChunkIdsRequestIndex.reserve(RequestCount);
+ }
+ for (CbFieldView FieldView : ChunksArray)
+ {
+ CbObjectView ChunkObject = FieldView.AsObjectView();
+ Request ChunkRequest = {
+ .Input{.Offset = ChunkObject["Offset"sv].AsUInt64(0), .Size = ChunkObject["Size"sv].AsUInt64((uint64_t)-1)}};
+ if (CbFieldView InputModificationTagView = ChunkObject.FindView("ModTag"); InputModificationTagView.IsInteger())
+ {
+ ChunkRequest.Input.ModTag = InputModificationTagView.AsUInt64();
+ }
+ if (CbFieldView RawHashView = ChunkObject.FindView("RawHash"sv); RawHashView.IsHash())
+ {
+ const IoHash ChunkHash = RawHashView.AsHash();
+ ChunkRequest.Input.Id = ChunkHash;
+ if (DoBatch)
+ {
+ ChunkRawHashes.push_back(ChunkHash);
+ ChunkRawHashesRequestIndex.push_back(Requests.size());
+ }
+ }
+ else if (CbFieldView IdView = ChunkObject.FindView("Oid"sv); IdView.IsObjectId())
+ {
+ const Oid ChunkId = IdView.AsObjectId();
+ ChunkRequest.Input.Id = ChunkId;
+ if (DoBatch)
+ {
+ ChunkIds.push_back(ChunkId);
+ ChunkIdsRequestIndex.push_back(Requests.size());
+ }
+ }
+ else
+ {
+ return {HttpResponseCode::BadRequest,
+ fmt::format("oplog '{}/{}': malformed getchunks rpc request object, chunk request has no identifier",
+ ProjectId,
+ OplogId)};
+ }
+ Requests.emplace_back(std::move(ChunkRequest));
+ }
+
+ if (DoBatch)
+ {
+ WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();
+ if (!ChunkRawHashes.empty())
+ {
+ FoundLog->IterateChunks(
+ ChunkRawHashes,
+ true,
+ [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool {
+ if (Payload)
+ {
+ size_t RequestIndex = ChunkRawHashesRequestIndex[Index];
+ Requests[RequestIndex].Output.Exists = true;
+ if (!SkipData)
+ {
+ Requests[RequestIndex].Output.ChunkBuffer = Payload;
+ Requests[RequestIndex].Output.ChunkBuffer.MakeOwned();
+ }
+ Requests[RequestIndex].Output.ModTag = ModTag;
+ }
+ return true;
+ },
+ &WorkerPool,
+ 8u * 1024);
+ }
+ if (!ChunkIdsRequestIndex.empty())
+ {
+ FoundLog->IterateChunks(
+ ChunkIds,
+ true,
+ [&](size_t Index, const IoBuffer& Payload, uint64_t ModTag) -> bool {
+ if (Payload)
+ {
+ size_t RequestIndex = ChunkIdsRequestIndex[Index];
+ Requests[RequestIndex].Output.Exists = true;
+ if (!SkipData)
+ {
+ Requests[RequestIndex].Output.ChunkBuffer = Payload;
+ Requests[RequestIndex].Output.ChunkBuffer.MakeOwned();
+ }
+ Requests[RequestIndex].Output.ModTag = ModTag;
+ }
+ return true;
+ },
+ &WorkerPool,
+ 8u * 1024);
+ }
+ }
+ else
+ {
+ Request& ChunkRequest = Requests.front();
+ if (ChunkRequest.Input.Id.index() == 0)
+ {
+ const IoHash& ChunkHash = std::get<IoHash>(ChunkRequest.Input.Id);
+ IoBuffer Payload = m_CidStore.FindChunkByCid(ChunkHash);
+ if (Payload)
+ {
+ ChunkRequest.Output.Exists = true;
+ ChunkRequest.Output.ModTag = GetModificationTagFromRawHash(ChunkHash);
+ if (!SkipData)
+ {
+ ChunkRequest.Output.ChunkBuffer = Payload;
+ }
+ }
+ }
+ else
+ {
+ const Oid& ChunkId = std::get<Oid>(ChunkRequest.Input.Id);
+ uint64_t ModTag = 0;
+ IoBuffer Payload = FoundLog->FindChunk(ChunkId, &ModTag);
+ if (Payload)
+ {
+ ChunkRequest.Output.Exists = true;
+ ChunkRequest.Output.ModTag = ModTag;
+ if (!SkipData)
+ {
+ ChunkRequest.Output.ChunkBuffer = Payload;
+ }
+ }
+ }
+ }
+ }
+
+ CbObjectWriter ResponseWriter(32 + Requests.size() * 64u);
+ ResponseWriter.BeginArray("Chunks"sv);
+ {
+ for (Request& ChunkRequest : Requests)
+ {
+ if (ChunkRequest.Output.Exists)
+ {
+ ResponseWriter.BeginObject();
+ {
+ if (ChunkRequest.Input.Id.index() == 0)
+ {
+ const IoHash& RawHash = std::get<IoHash>(ChunkRequest.Input.Id);
+ ResponseWriter.AddHash("Id", RawHash);
+ }
+ else
+ {
+ const Oid& Id = std::get<Oid>(ChunkRequest.Input.Id);
+ ResponseWriter.AddObjectId("Id", Id);
+ }
+ if (!ChunkRequest.Input.ModTag.has_value() || ChunkRequest.Input.ModTag.value() != ChunkRequest.Output.ModTag)
+ {
+ ResponseWriter.AddInteger("ModTag", ChunkRequest.Output.ModTag);
+ if (!SkipData)
+ {
+ CompositeBuffer ChunkRange;
+ ZenContentType ContentType;
+ IoHash FullChunkRawHash;
+ uint64_t FullChunkSize = 0;
+ auto ExtractRangeResult = ExtractRange(std::move(ChunkRequest.Output.ChunkBuffer),
+ ChunkRequest.Input.Offset,
+ ChunkRequest.Input.Size,
+ ZenContentType::kCompressedBinary,
+ ContentType,
+ ChunkRange,
+ FullChunkRawHash,
+ FullChunkSize);
+ if (ExtractRangeResult.first == HttpResponseCode::OK)
+ {
+ if (ContentType == ZenContentType::kCompressedBinary)
+ {
+ ZEN_ASSERT(FullChunkRawHash != IoHash::Zero);
+ CompressedBuffer CompressedValue =
+ CompressedBuffer::FromCompressedNoValidate(std::move(ChunkRange));
+ ZEN_ASSERT(CompressedValue);
+
+ if (FullChunkSize != 0)
+ {
+ // This really could use some thought so we don't send the same data if we get a request for
+ // multiple ranges from the same chunk block
+
+ uint64_t FragmentRawOffset = 0;
+ OodleCompressor Compressor;
+ OodleCompressionLevel CompressionLevel;
+ uint64_t BlockSize = 0;
+ if (CompressedValue.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
+ {
+ if (BlockSize > 0)
+ {
+ FragmentRawOffset = (ChunkRequest.Input.Offset / BlockSize) * BlockSize;
+ }
+ else
+ {
+ FragmentRawOffset = ChunkRequest.Input.Offset;
+ }
+ uint64_t FragmentRawLength = CompressedValue.DecodeRawSize();
+
+ IoHashStream FragmentHashStream;
+ FragmentHashStream.Append(FullChunkRawHash.Hash, sizeof(FullChunkRawHash.Hash));
+ FragmentHashStream.Append(&FragmentRawOffset, sizeof(FragmentRawOffset));
+ FragmentHashStream.Append(&FragmentRawLength, sizeof(FragmentRawLength));
+ IoHash FragmentHash = FragmentHashStream.GetHash();
+
+ ResponseWriter.AddHash("FragmentHash", FragmentHash);
+ ResponseWriter.AddInteger("FragmentOffset", FragmentRawOffset);
+ ResponseWriter.AddInteger("RawSize", FullChunkSize);
+ OutResponsePackage.AddAttachment(CbAttachment(CompressedValue, FragmentHash));
+ }
+ else
+ {
+ std::string ErrorString =
+ "Failed to get compression parameters from partial compressed buffer";
+ ResponseWriter.AddString("Error", ErrorString);
+ ZEN_WARN("oplog '{}/{}': {}", ProjectId, OplogId, ErrorString);
+ }
+ }
+ else
+ {
+ ResponseWriter.AddHash("RawHash"sv, FullChunkRawHash);
+ OutResponsePackage.AddAttachment(CbAttachment(std::move(CompressedValue), FullChunkRawHash));
+ }
+ }
+ else
+ {
+ IoHashStream HashStream;
+ ZEN_ASSERT(ChunkRequest.Input.Id.index() == 1);
+ const Oid& Id = std::get<Oid>(ChunkRequest.Input.Id);
+ HashStream.Append(Id.OidBits, sizeof(Id.OidBits));
+ HashStream.Append(&ChunkRequest.Input.Offset, sizeof(ChunkRequest.Input.Offset));
+ HashStream.Append(&ChunkRequest.Input.Size, sizeof(ChunkRequest.Input.Size));
+ IoHash Hash = HashStream.GetHash();
+
+ ResponseWriter.AddHash("Hash"sv, Hash);
+ if (FullChunkSize != 0)
+ {
+ ResponseWriter.AddInteger("Size", FullChunkSize);
+ }
+ OutResponsePackage.AddAttachment(CbAttachment(std::move(ChunkRange), Hash));
+ }
+ }
+ else
+ {
+ std::string ErrorString = fmt::format("Failed fetchiong chunk range ({})", ExtractRangeResult.second);
+ ResponseWriter.AddString("Error", ErrorString);
+ ZEN_WARN("oplog '{}/{}': {}", ProjectId, OplogId, ErrorString);
+ }
+ }
+ }
+ }
+ ResponseWriter.EndObject();
+ }
+ }
+ }
+ ResponseWriter.EndArray();
+ OutResponsePackage.SetObject(ResponseWriter.Save());
+ return {HttpResponseCode::OK, {}};
+ }
+ else
+ {
+ return {HttpResponseCode::BadRequest, fmt::format("oplog '{}/{}': malformed getchunks rpc request object", ProjectId, OplogId)};
+ }
+}
+
+std::pair<HttpResponseCode, std::string>
ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload, CbObject& OutResponse)
{
ZEN_TRACE_CPU("Store::WriteOplog");
@@ -4570,7 +5057,7 @@ ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_vie
return ConvertResult(RemoteResult);
}
- CbObjectWriter Cbo;
+ CbObjectWriter Cbo(1 + 1 + 5 + Attachments.size() * (1 + sizeof(IoHash::Hash)) + 1);
Cbo.BeginArray("need");
{
for (const IoHash& Hash : Attachments)
@@ -4767,34 +5254,33 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
else if (Method == "getchunks"sv)
{
ZEN_TRACE_CPU("Store::Rpc::getchunks");
- CbPackage ResponsePackage;
+
+ RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(Cb["AcceptFlags"sv].AsUInt16(0u));
+ int32_t TargetProcessId = Cb["Pid"sv].AsInt32(0);
+
+ CbPackage ResponsePackage;
+ std::pair<HttpResponseCode, std::string> Result = GetChunks(ProjectId, OplogId, Cb, ResponsePackage);
+ if (Result.first == HttpResponseCode::OK)
{
- CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView();
- CbObjectWriter ResponseWriter;
- ResponseWriter.BeginArray("chunks"sv);
- for (CbFieldView FieldView : ChunksArray)
+ void* TargetProcessHandle = nullptr;
+ FormatFlags Flags = FormatFlags::kDefault;
+ if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
{
- IoHash RawHash = FieldView.AsHash();
- IoBuffer ChunkBuffer = m_CidStore.FindChunkByCid(RawHash);
- if (ChunkBuffer)
+ Flags |= FormatFlags::kAllowLocalReferences;
+ if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer));
- if (Compressed)
- {
- ResponseWriter.AddHash(RawHash);
- ResponsePackage.AddAttachment(CbAttachment(std::move(Compressed), RawHash));
- }
- else
- {
- ZEN_WARN("oplog '{}/{}': invalid compressed binary in cas store for {}", ProjectId, OplogId, RawHash);
- }
+ Flags |= FormatFlags::kDenyPartialLocalReferences;
}
+ TargetProcessHandle = m_OpenProcessCache.GetProcessHandle(HttpReq.SessionId(), TargetProcessId);
}
- ResponseWriter.EndArray();
- ResponsePackage.SetObject(ResponseWriter.Save());
+
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, Flags, TargetProcessHandle);
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ else
+ {
+ HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
}
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault);
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
return true;
}
else if (Method == "putchunks"sv)
@@ -4927,7 +5413,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
++TotalFiles;
// Rewrite file array entry with new data reference
- CbObjectWriter Writer;
+ CbObjectWriter Writer(View.GetSize());
RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
if (Field.GetName() == "data"sv)
{
@@ -6012,7 +6498,7 @@ namespace testutils {
return OidStringBuilder.ToString();
}
- CbPackage CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
+ CbPackage CreateBulkDataOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
{
CbPackage Package;
CbObjectWriter Object;
@@ -6038,16 +6524,43 @@ namespace testutils {
return Package;
};
+ CbPackage CreateFilesOplogPackage(const Oid& Id,
+ const std::filesystem::path ProjectRootDir,
+ const std::span<const std::pair<Oid, std::filesystem::path>>& Attachments)
+ {
+ CbPackage Package;
+ CbObjectWriter Object;
+ Object << "key"sv << OidAsString(Id);
+ if (!Attachments.empty())
+ {
+ Object.BeginArray("files");
+ for (const auto& Attachment : Attachments)
+ {
+ std::filesystem::path ServerPath = std::filesystem::relative(Attachment.second, ProjectRootDir);
+ std::filesystem::path ClientPath = ServerPath; // dummy
+ Object.BeginObject();
+ Object << "id"sv << Attachment.first;
+ Object << "serverpath"sv << ServerPath.string();
+ Object << "clientpath"sv << ClientPath.string();
+ Object.EndObject();
+ }
+ Object.EndArray();
+ }
+ Package.SetObject(Object.Save());
+ return Package;
+ };
+
std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(
const std::span<const size_t>& Sizes,
- OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast)
+ OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast,
+ uint64_t BlockSize = 0)
{
std::vector<std::pair<Oid, CompressedBuffer>> Result;
Result.reserve(Sizes.size());
for (size_t Size : Sizes)
{
CompressedBuffer Compressed =
- CompressedBuffer::Compress(SharedBuffer(CreateRandomBlob(Size)), OodleCompressor::Mermaid, CompressionLevel);
+ CompressedBuffer::Compress(SharedBuffer(CreateSemiRandomBlob(Size)), OodleCompressor::Mermaid, CompressionLevel, BlockSize);
Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed));
}
return Result;
@@ -6069,6 +6582,63 @@ namespace testutils {
return 0;
}
+ template<typename ChunkType>
+ CbObject BuildChunksRequest(bool SkipData,
+ std::string_view IdName,
+ const std::vector<ChunkType>& Chunks,
+ const std::vector<std::pair<size_t, size_t>>& Ranges,
+ const std::vector<uint64_t>& ModTags)
+ {
+ CbObjectWriter Request;
+ Request.BeginObject("Request"sv);
+ {
+ if (SkipData)
+ {
+ Request.AddBool("SkipData"sv, true);
+ }
+ if (!Chunks.empty())
+ {
+ Request.BeginArray("Chunks");
+ for (size_t Index = 0; Index < Chunks.size(); Index++)
+ {
+ Request.BeginObject();
+ {
+ Request << IdName << Chunks[Index];
+ if (!ModTags.empty())
+ {
+ Request << "ModTag" << ModTags[Index];
+ }
+ if (!Ranges.empty())
+ {
+ Request << "Offset" << Ranges[Index].first;
+ Request << "Size" << Ranges[Index].second;
+ }
+ }
+ Request.EndObject();
+ }
+ Request.EndArray();
+ }
+ }
+ Request.EndObject();
+ return Request.Save();
+ };
+
+ CbObject BuildChunksRequest(bool SkipData,
+ const std::vector<Oid>& Chunks,
+ const std::vector<std::pair<size_t, size_t>>& Ranges,
+ const std::vector<uint64_t>& ModTags)
+ {
+ return BuildChunksRequest<Oid>(SkipData, "Oid", Chunks, Ranges, ModTags);
+ }
+
+ CbObject BuildChunksRequest(bool SkipData,
+ const std::vector<IoHash>& Chunks,
+ const std::vector<std::pair<size_t, size_t>>& Ranges,
+ const std::vector<uint64_t>& ModTags)
+ {
+ return BuildChunksRequest<IoHash>(SkipData, "RawHash", Chunks, Ranges, ModTags);
+ }
+
} // namespace testutils
TEST_CASE("project.store.create")
@@ -6078,6 +6648,7 @@ TEST_CASE("project.store.create")
ScopedTemporaryDirectory TempDir;
auto JobQueue = MakeJobQueue(1, ""sv);
+ OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
@@ -6085,7 +6656,7 @@ TEST_CASE("project.store.create")
std::string_view ProjectName("proj1"sv);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
@@ -6108,13 +6679,14 @@ TEST_CASE("project.store.lifetimes")
ScopedTemporaryDirectory TempDir;
auto JobQueue = MakeJobQueue(1, ""sv);
+ OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
@@ -6170,13 +6742,14 @@ TEST_CASE_TEMPLATE("project.store.export",
ScopedTemporaryDirectory ExportDir;
auto JobQueue = MakeJobQueue(1, ""sv);
+ OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
@@ -6191,13 +6764,14 @@ TEST_CASE_TEMPLATE("project.store.export",
ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1", {});
CHECK(Oplog != nullptr);
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {}));
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77})));
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99})));
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122})));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {}));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77})));
Oplog->AppendNewOplogEntry(
- CreateOplogPackage(Oid::NewOid(),
- CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None)));
+ CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99})));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122})));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(
+ Oid::NewOid(),
+ CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None)));
FileRemoteStoreOptions Options = {
RemoteStoreOptions{.MaxBlockSize = 64u * 1024, .MaxChunkEmbedSize = 32 * 1024u, .ChunkFileSizeLimit = 64u * 1024u},
@@ -6271,13 +6845,14 @@ TEST_CASE("project.store.gc")
ScopedTemporaryDirectory TempDir;
auto JobQueue = MakeJobQueue(1, ""sv);
+ OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
@@ -6325,10 +6900,11 @@ TEST_CASE("project.store.gc")
ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1", Project1OplogPath);
CHECK(Oplog != nullptr);
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {}));
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77})));
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99})));
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122})));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {}));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77})));
+ Oplog->AppendNewOplogEntry(
+ CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99})));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122})));
}
{
@@ -6342,21 +6918,23 @@ TEST_CASE("project.store.gc")
ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog2", Project2Oplog1Path);
CHECK(Oplog != nullptr);
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {}));
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{177})));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {}));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{177})));
Oplog->AppendNewOplogEntry(
- CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9123, 383, 590, 96})));
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{535, 221})));
+ CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9123, 383, 590, 96})));
+ Oplog->AppendNewOplogEntry(
+ CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{535, 221})));
}
{
ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog3", Project2Oplog2Path);
CHECK(Oplog != nullptr);
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {}));
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{137})));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {}));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{137})));
+ Oplog->AppendNewOplogEntry(
+ CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9723, 683, 594, 98})));
Oplog->AppendNewOplogEntry(
- CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9723, 683, 594, 98})));
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{531, 271})));
+ CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{531, 271})));
}
}
@@ -6468,13 +7046,14 @@ TEST_CASE("project.store.gc.prep")
ScopedTemporaryDirectory TempDir;
auto JobQueue = MakeJobQueue(1, ""sv);
+ OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
@@ -6507,7 +7086,7 @@ TEST_CASE("project.store.gc.prep")
Project1RootDir.string(),
Project1FilePath.string()));
ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath);
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), OpAttachments));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments));
}
{
Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv);
@@ -6538,7 +7117,7 @@ TEST_CASE("project.store.gc.prep")
// Make sure the chunks are stored but not the referencing op
Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv);
ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath);
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), OpAttachments));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments));
Project1->DeleteOplog("oplog1"sv);
}
{
@@ -6585,7 +7164,7 @@ TEST_CASE("project.store.gc.prep")
{
Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv);
ProjectStore::Oplog* Oplog = Project1->OpenOplog("oplog1"sv, true, true);
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), OpAttachments));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments));
Oplog->RemovePendingChunkReferences(OpChunkHashes);
CHECK(Oplog->GetPendingChunkReferencesLocked().size() == 0);
}
@@ -6619,7 +7198,7 @@ TEST_CASE("project.store.gc.prep")
// Make sure the chunks are stored but not the referencing op
Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv);
ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, Project1OplogPath);
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), OpAttachments));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), OpAttachments));
Project1->DeleteOplog("oplog1"sv);
}
@@ -6674,6 +7253,928 @@ TEST_CASE("project.store.gc.prep")
}
}
+TEST_CASE("project.store.rpc.getchunks")
+{
+ using namespace std::literals;
+ using namespace testutils;
+
+ ScopedTemporaryDirectory TempDir;
+
+ auto JobQueue = MakeJobQueue(1, ""sv);
+ OpenProcessCache ProcessCache;
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ std::filesystem::path RootDir = TempDir.Path() / "root"sv;
+ std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;
+
+ std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv;
+ std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv;
+ {
+ CreateDirectories(Project1FilePath.parent_path());
+ BasicFile ProjectFile;
+ ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate);
+ }
+
+ std::vector<Oid> OpIds;
+ OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()});
+ std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments;
+ Oid FilesOpId = Oid::NewOid();
+ std::vector<std::pair<Oid, std::filesystem::path>> FilesOpIdAttachments;
+ {
+ Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ Project1RootDir.string(),
+ Project1FilePath.string()));
+ ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, {});
+ CHECK(Oplog != nullptr);
+ Attachments[OpIds[0]] = {};
+ Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77});
+ Attachments[OpIds[2]] =
+ CreateAttachments(std::initializer_list<size_t>{200 * 1024, 314 * 1024, 690, 99}, OodleCompressionLevel::VeryFast, 128 * 1024);
+ Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{55, 122});
+ for (auto It : Attachments)
+ {
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(It.first, It.second));
+ }
+
+ std::filesystem::path UncompressedFilePath = RootDir / "content" / "uncompressed_file";
+ CreateDirectories(UncompressedFilePath.parent_path());
+ IoBuffer FileBlob = CreateRandomBlob(81823 * 2);
+ WriteFile(UncompressedFilePath, FileBlob);
+ FilesOpIdAttachments.push_back({Oid::NewOid(), UncompressedFilePath});
+ Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(FilesOpId, RootDir, FilesOpIdAttachments));
+ }
+
+ // Invalid request
+ {
+ CbObjectWriter Request;
+ Request.BeginObject("WrongName"sv);
+ Request.EndObject();
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv, "oplog1"sv, Request.Save(), Response);
+ CHECK_EQ(HttpResponseCode::BadRequest, Result.first);
+ }
+
+ // Empty request
+ {
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ false, std::vector<IoHash>{}, {}, {}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(0, Chunks.Num());
+ }
+ // Single non-existing chunk by RawHash
+ IoHash NotFoundIoHash = IoHash::Max;
+ {
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundIoHash}, {}, {}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(0, Chunks.Num());
+ }
+ {
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundIoHash}, {}, {}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(0, Chunks.Num());
+ }
+ // Single non-existing chunk by Id
+ Oid NotFoundId = Oid::NewOid();
+ {
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {NotFoundId}, {}, {}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(0, Chunks.Num());
+ }
+ {
+ CbPackage Response;
+ auto Result =
+ ProjectStore.GetChunks("proj1"sv, "oplog1"sv, testutils::BuildChunksRequest(/*SkipData*/ true, {NotFoundId}, {}, {}), Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(0, Chunks.Num());
+ }
+ // Single existing chunk by RawHash
+ {
+ // Fresh fetch
+ IoHash FirstAttachmentHash = Attachments[OpIds[2]][1].second.DecodeRawHash();
+ uint64_t ResponseModTag = 0;
+ {
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {}),
+ Response);
+
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(1, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ IoHash Id = Chunk["Id"].AsHash();
+ CHECK_EQ(FirstAttachmentHash, Id);
+ ResponseModTag = Chunk["ModTag"].AsUInt64();
+ CHECK_NE(0, ResponseModTag);
+ IoHash AttachmentHash = Chunk["RawHash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ CHECK_NE(nullptr, Attachment);
+ CompressedBuffer Buffer = Attachment->AsCompressedBinary();
+ CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash());
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // Fetch with matching ModTag
+ {
+ CbPackage Response;
+ auto Result =
+ ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {ResponseModTag}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ IoHash Id = Chunk["Id"].AsHash();
+ CHECK_EQ(FirstAttachmentHash, Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // Fetch with mismatching ModTag
+ {
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks(
+ "proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)}),
+ Response);
+
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(1, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ IoHash Id = Chunk["Id"].AsHash();
+ CHECK_EQ(FirstAttachmentHash, Id);
+ ResponseModTag = Chunk["ModTag"].AsUInt64();
+ CHECK_NE(0, ResponseModTag);
+ IoHash AttachmentHash = Chunk["RawHash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ CHECK_NE(nullptr, Attachment);
+ CompressedBuffer Buffer = Attachment->AsCompressedBinary();
+ CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash());
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // Fresh modtime query
+ {
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ IoHash Id = Chunk["Id"].AsHash();
+ CHECK_EQ(FirstAttachmentHash, Id);
+ uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64();
+ CHECK_EQ(ResponseModTag, ResponseModTag2);
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // Modtime query with matching ModTag
+ {
+ CbPackage Response;
+ auto Result =
+ ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {ResponseModTag}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ IoHash Id = Chunk["Id"].AsHash();
+ CHECK_EQ(FirstAttachmentHash, Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // Modtime query with mismatching ModTag
+ {
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks(
+ "proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentHash}, {}, {uint64_t(ResponseModTag + 1)}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ IoHash Id = Chunk["Id"].AsHash();
+ CHECK_EQ(FirstAttachmentHash, Id);
+ uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64();
+ CHECK_EQ(ResponseModTag, ResponseModTag2);
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ // Single existing CID chunk by Id
+ {
+ Oid FirstAttachmentId = Attachments[OpIds[2]][1].first;
+ uint64_t ResponseModTag = 0;
+ {
+ // Full chunk request
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(1, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ ResponseModTag = Chunk["ModTag"].AsUInt64();
+ CHECK_NE(0, ResponseModTag);
+ IoHash AttachmentHash = Chunk["RawHash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ CHECK_NE(nullptr, Attachment);
+ CompressedBuffer Buffer = Attachment->AsCompressedBinary();
+ CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash());
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ {
+ // Partial chunk request
+ CbPackage Response;
+ auto Result =
+ ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{130 * 1024, 8100}}, {}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(1, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ ResponseModTag = Chunk["ModTag"].AsUInt64();
+ CHECK_NE(0, ResponseModTag);
+ IoHash AttachmentHash = Chunk["FragmentHash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ CHECK_NE(nullptr, Attachment);
+ std::uint64_t FragmentStart = Chunk["FragmentOffset"].AsUInt64();
+ CompressedBuffer Buffer = Attachment->AsCompressedBinary();
+ CHECK(FragmentStart <= 130 * 1024);
+ CHECK(FragmentStart + Buffer.DecodeRawSize() >= 130 * 1024 + 8100);
+ auto ResponseDecompressedBuffer = Buffer.Decompress(130 * 1024 - FragmentStart, 8100);
+ auto ExpectedDecompressedBuffer = Attachments[OpIds[2]][1].second.Decompress(130 * 1024, 8100);
+ CHECK(ResponseDecompressedBuffer.AsIoBuffer().GetView().EqualBytes(ExpectedDecompressedBuffer.AsIoBuffer().GetView()));
+ CHECK_EQ(Chunk["RawSize"sv].AsUInt64(), Attachments[OpIds[2]][1].second.DecodeRawSize());
+ CHECK(!Chunk.FindView("Size"));
+ }
+ {
+ // Fetch with matching ModTag
+ CbPackage Response;
+ auto Result =
+ ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ {
+ // Fetch with mismatching ModTag
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks(
+ "proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(1, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ uint64_t ResponseModTag3 = Chunk["ModTag"].AsUInt64();
+ CHECK_EQ(ResponseModTag, ResponseModTag3);
+ IoHash AttachmentHash = Chunk["RawHash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ CHECK_NE(nullptr, Attachment);
+ CompressedBuffer Buffer = Attachment->AsCompressedBinary();
+ CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash());
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // Fresh modtime query
+ {
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64();
+ CHECK_EQ(ResponseModTag, ResponseModTag2);
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // Modtime query with matching ModTag
+ {
+ CbPackage Response;
+ auto Result =
+ ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // Modtime query with mismatching ModTag
+ {
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks(
+ "proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64();
+ CHECK_EQ(ResponseModTag, ResponseModTag2);
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+
+ // Single existing file chunk by Id
+ {
+ Oid FirstAttachmentId = FilesOpIdAttachments[0].first;
+ uint64_t ResponseModTag = 0;
+ {
+ // Full chunk request
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(1, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ ResponseModTag = Chunk["ModTag"].AsUInt64();
+ CHECK_NE(0, ResponseModTag);
+ IoHash AttachmentHash = Chunk["Hash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ CHECK_NE(nullptr, Attachment);
+ CompositeBuffer Buffer = Attachment->AsCompositeBinary();
+ CHECK_EQ(IoHash::HashBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten()), IoHash::HashBuffer(Buffer));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ {
+ // Partial chunk request
+ CbPackage Response;
+ auto Result =
+ ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {{81823, 5434}}, {}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(1, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ ResponseModTag = Chunk["ModTag"].AsUInt64();
+ CHECK_NE(0, ResponseModTag);
+
+ IoHash AttachmentHash = Chunk["Hash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ CHECK_NE(nullptr, Attachment);
+ CompositeBuffer Buffer = Attachment->AsCompositeBinary();
+ CHECK_EQ(IoHash::HashBuffer(IoBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten(), 81823, 5434)),
+ IoHash::HashBuffer(Buffer));
+ CHECK_EQ(Chunk["Size"sv].AsUInt64(), std::filesystem::file_size(FilesOpIdAttachments[0].second));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ {
+ // Fetch with matching ModTag
+ CbPackage Response;
+ auto Result =
+ ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {ResponseModTag}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("Hash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ {
+ // Fetch with mismatching ModTag
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks(
+ "proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ false, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(1, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ uint64_t ResponseModTag3 = Chunk["ModTag"].AsUInt64();
+ CHECK_EQ(ResponseModTag, ResponseModTag3);
+ IoHash AttachmentHash = Chunk["Hash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ CHECK_NE(nullptr, Attachment);
+ CompositeBuffer Buffer = Attachment->AsCompositeBinary();
+ CHECK_EQ(IoHash::HashBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten()), IoHash::HashBuffer(Buffer));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // Fresh modtime query
+ {
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64();
+ CHECK_EQ(ResponseModTag, ResponseModTag2);
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // Modtime query with matching ModTag
+ {
+ CbPackage Response;
+ auto Result =
+ ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {ResponseModTag}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("Hash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ // Modtime query with mismatching ModTag
+ {
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks(
+ "proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ true, {FirstAttachmentId}, {}, {uint64_t(ResponseModTag + 1)}),
+ Response);
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(1, Chunks.Num());
+ CbObjectView Chunk = (*begin(Chunks)).AsObjectView();
+
+ Oid Id = Chunk["Id"].AsObjectId();
+ CHECK_EQ(FirstAttachmentId, Id);
+ uint64_t ResponseModTag2 = Chunk["ModTag"].AsUInt64();
+ CHECK_EQ(ResponseModTag, ResponseModTag2);
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+
+ // Multi RawHash Request
+ {
+ std::vector<CompressedBuffer> AttachmentBuffers{Attachments[OpIds[1]][0].second,
+ Attachments[OpIds[2]][0].second,
+ Attachments[OpIds[2]][1].second};
+ std::vector<IoHash> AttachmentHashes{AttachmentBuffers[0].DecodeRawHash(),
+ AttachmentBuffers[1].DecodeRawHash(),
+ AttachmentBuffers[2].DecodeRawHash()};
+ std::vector<uint64_t> ResponseModTags(3, 0);
+ {
+ // Fresh fetch
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, {}),
+ Response);
+
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(3, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ IoHash Id = Chunk["Id"].AsHash();
+
+ auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id);
+ CHECK(It != AttachmentHashes.end());
+ ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It);
+ CHECK_EQ(AttachmentHashes[Index], Id);
+ ResponseModTags[Index] = Chunk["ModTag"].AsUInt64();
+ CHECK_NE(0, ResponseModTags[Index]);
+ IoHash AttachmentHash = Chunk["RawHash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ CHECK_NE(nullptr, Attachment);
+ CompressedBuffer Buffer = Attachment->AsCompressedBinary();
+ CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash());
+ CHECK(AttachmentBuffers[Index].GetCompressed().Flatten().GetView().EqualBytes(Buffer.GetCompressed().Flatten().GetView()));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ {
+ // Fetch with matching ModTag
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ false, AttachmentHashes, {}, ResponseModTags),
+ Response);
+
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ IoHash Id = Chunk["Id"].AsHash();
+
+ auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id);
+ CHECK(It != AttachmentHashes.end());
+ ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It);
+ CHECK_EQ(AttachmentHashes[Index], Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ {
+ // Fresh modtime query
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, {}),
+ Response);
+
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ IoHash Id = Chunk["Id"].AsHash();
+
+ auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id);
+ CHECK(It != AttachmentHashes.end());
+ ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It);
+ CHECK_EQ(AttachmentHashes[Index], Id);
+ CHECK_EQ(ResponseModTags[Index], Chunk["ModTag"].AsUInt64());
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ {
+ // Modtime query with matching ModTags
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, ResponseModTags),
+ Response);
+
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ IoHash Id = Chunk["Id"].AsHash();
+
+ auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id);
+ CHECK(It != AttachmentHashes.end());
+ ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It);
+ CHECK_EQ(AttachmentHashes[Index], Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ {
+ // Modtime query with mismatching ModTags
+ std::vector<uint64_t> MismatchingModTags(ResponseModTags);
+ for (uint64_t& Tag : MismatchingModTags)
+ {
+ Tag++;
+ }
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ true, AttachmentHashes, {}, MismatchingModTags),
+ Response);
+
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ IoHash Id = Chunk["Id"].AsHash();
+
+ auto It = std::find(AttachmentHashes.begin(), AttachmentHashes.end(), Id);
+ CHECK(It != AttachmentHashes.end());
+ ptrdiff_t Index = std::distance(AttachmentHashes.begin(), It);
+ CHECK_EQ(AttachmentHashes[Index], Id);
+ CHECK(Chunk["ModTag"].AsUInt64() == ResponseModTags[Index]);
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ }
+ // Multi Id Request
+ {
+ std::vector<CompressedBuffer> AttachmentBuffers{Attachments[OpIds[1]][0].second,
+ Attachments[OpIds[2]][0].second,
+ Attachments[OpIds[2]][1].second};
+ std::vector<IoHash> AttachmentHashes{AttachmentBuffers[0].DecodeRawHash(),
+ AttachmentBuffers[1].DecodeRawHash(),
+ AttachmentBuffers[2].DecodeRawHash()};
+ std::vector<Oid> AttachedIds{Attachments[OpIds[1]][0].first, Attachments[OpIds[2]][0].first, Attachments[OpIds[2]][1].first};
+ std::vector<uint64_t> ResponseModTags(3, 0);
+ {
+ // Fresh fetch
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, {}),
+ Response);
+
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(3, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ Oid Id = Chunk["Id"].AsObjectId();
+
+ auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id);
+ CHECK(It != AttachedIds.end());
+ ptrdiff_t Index = std::distance(AttachedIds.begin(), It);
+ CHECK_EQ(AttachedIds[Index], Id);
+ ResponseModTags[Index] = Chunk["ModTag"].AsUInt64();
+ CHECK_NE(0, ResponseModTags[Index]);
+ IoHash AttachmentHash = Chunk["RawHash"].AsHash();
+ const CbAttachment* Attachment = Response.FindAttachment(AttachmentHash);
+ CHECK_NE(nullptr, Attachment);
+ CompressedBuffer Buffer = Attachment->AsCompressedBinary();
+ CHECK_EQ(AttachmentHash, Buffer.DecodeRawHash());
+ CHECK(AttachmentBuffers[Index].GetCompressed().Flatten().GetView().EqualBytes(Buffer.GetCompressed().Flatten().GetView()));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ {
+ // Fetch with matching ModTag
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ false, AttachedIds, {}, ResponseModTags),
+ Response);
+
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ Oid Id = Chunk["Id"].AsObjectId();
+
+ auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id);
+ CHECK(It != AttachedIds.end());
+ ptrdiff_t Index = std::distance(AttachedIds.begin(), It);
+ CHECK_EQ(AttachedIds[Index], Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ {
+ // Fresh modtime query
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, {}),
+ Response);
+
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ Oid Id = Chunk["Id"].AsObjectId();
+
+ auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id);
+ CHECK(It != AttachedIds.end());
+ ptrdiff_t Index = std::distance(AttachedIds.begin(), It);
+ CHECK_EQ(AttachedIds[Index], Id);
+ CHECK_EQ(ResponseModTags[Index], Chunk["ModTag"].AsUInt64());
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ {
+ // Modtime query with matching ModTags
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, ResponseModTags),
+ Response);
+
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ Oid Id = Chunk["Id"].AsObjectId();
+
+ auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id);
+ CHECK(It != AttachedIds.end());
+ ptrdiff_t Index = std::distance(AttachedIds.begin(), It);
+ CHECK_EQ(AttachedIds[Index], Id);
+ CHECK(!Chunk.FindView("ModTag"));
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ {
+ // Modtime query with mismatching ModTags
+ std::vector<uint64_t> MismatchingModTags(ResponseModTags);
+ for (uint64_t& Tag : MismatchingModTags)
+ {
+ Tag++;
+ }
+ CbPackage Response;
+ auto Result = ProjectStore.GetChunks("proj1"sv,
+ "oplog1"sv,
+ testutils::BuildChunksRequest(/*SkipData*/ true, AttachedIds, {}, MismatchingModTags),
+ Response);
+
+ CHECK_EQ(HttpResponseCode::OK, Result.first);
+ CHECK_EQ(0, Response.GetAttachments().size());
+ CbArrayView Chunks = Response.GetObject()["Chunks"].AsArrayView();
+ CHECK_EQ(3, Chunks.Num());
+ for (CbFieldView ChunkView : Chunks)
+ {
+ CbObjectView Chunk = ChunkView.AsObjectView();
+ Oid Id = Chunk["Id"].AsObjectId();
+
+ auto It = std::find(AttachedIds.begin(), AttachedIds.end(), Id);
+ CHECK(It != AttachedIds.end());
+ ptrdiff_t Index = std::distance(AttachedIds.begin(), It);
+ CHECK_EQ(AttachedIds[Index], Id);
+ CHECK(Chunk["ModTag"].AsUInt64() == ResponseModTags[Index]);
+ CHECK(!Chunk.FindView("RawHash"));
+ CHECK(!Chunk.FindView("Size"));
+ CHECK(!Chunk.FindView("RawSize"));
+ }
+ }
+ }
+}
+
TEST_CASE("project.store.partial.read")
{
using namespace std::literals;
@@ -6682,13 +8183,14 @@ TEST_CASE("project.store.partial.read")
ScopedTemporaryDirectory TempDir;
auto JobQueue = MakeJobQueue(1, ""sv);
+ OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root"sv;
std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;
@@ -6718,63 +8220,119 @@ TEST_CASE("project.store.partial.read")
Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{55, 122});
for (auto It : Attachments)
{
- Oplog->AppendNewOplogEntry(CreateOplogPackage(It.first, It.second));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(It.first, It.second));
}
}
{
+ uint64_t ModificationTag = 0;
IoBuffer Chunk;
CHECK(ProjectStore
.GetChunk("proj1"sv,
"oplog1"sv,
Attachments[OpIds[1]][0].second.DecodeRawHash().ToHexString(),
HttpContentType::kCompressedBinary,
- Chunk)
+ Chunk,
+ &ModificationTag)
.first == HttpResponseCode::OK);
IoHash RawHash;
uint64_t RawSize;
CompressedBuffer Attachment = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize);
CHECK(RawSize == Attachments[OpIds[1]][0].second.DecodeRawSize());
- }
- CompositeBuffer ChunkResult;
- HttpContentType ContentType;
- CHECK(ProjectStore
- .GetChunkRange("proj1"sv,
- "oplog1"sv,
- OidAsString(Attachments[OpIds[2]][1].first),
- 0,
- ~0ull,
- HttpContentType::kCompressedBinary,
- ChunkResult,
- ContentType)
- .first == HttpResponseCode::OK);
- CHECK(ChunkResult);
- CHECK(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult)).DecodeRawSize() ==
- Attachments[OpIds[2]][1].second.DecodeRawSize());
-
- CompositeBuffer PartialChunkResult;
- CHECK(ProjectStore
- .GetChunkRange("proj1"sv,
- "oplog1"sv,
- OidAsString(Attachments[OpIds[2]][1].first),
- 5,
- 1773,
- HttpContentType::kCompressedBinary,
- PartialChunkResult,
- ContentType)
- .first == HttpResponseCode::OK);
- CHECK(PartialChunkResult);
- IoHash PartialRawHash;
- uint64_t PartialRawSize;
- CompressedBuffer PartialCompressedResult = CompressedBuffer::FromCompressed(PartialChunkResult, PartialRawHash, PartialRawSize);
- CHECK(PartialRawSize >= 1773);
-
- uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5);
- SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed);
- SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress();
- const uint8_t* FullDataPtr = &(reinterpret_cast<const uint8_t*>(FullDecompressed.GetView().GetData())[5]);
- const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData());
- CHECK(FullDataPtr[0] == PartialDataPtr[0]);
+ CHECK(ModificationTag != 0);
+ CHECK(ProjectStore
+ .GetChunk("proj1"sv,
+ "oplog1"sv,
+ Attachments[OpIds[1]][0].second.DecodeRawHash().ToHexString(),
+ HttpContentType::kCompressedBinary,
+ Chunk,
+ &ModificationTag)
+ .first == HttpResponseCode::NotModified);
+ }
+
+ {
+ uint64_t FullChunkModificationTag = 0;
+ {
+ CompositeBuffer ChunkResult;
+ HttpContentType ContentType;
+ CHECK(ProjectStore
+ .GetChunkRange("proj1"sv,
+ "oplog1"sv,
+ OidAsString(Attachments[OpIds[2]][1].first),
+ 0,
+ ~0ull,
+ HttpContentType::kCompressedBinary,
+ ChunkResult,
+ ContentType,
+ &FullChunkModificationTag)
+ .first == HttpResponseCode::OK);
+ CHECK(ChunkResult);
+ CHECK(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult)).DecodeRawSize() ==
+ Attachments[OpIds[2]][1].second.DecodeRawSize());
+ }
+ {
+ CompositeBuffer ChunkResult;
+ HttpContentType ContentType;
+ CHECK(ProjectStore
+ .GetChunkRange("proj1"sv,
+ "oplog1"sv,
+ OidAsString(Attachments[OpIds[2]][1].first),
+ 0,
+ ~0ull,
+ HttpContentType::kCompressedBinary,
+ ChunkResult,
+ ContentType,
+ &FullChunkModificationTag)
+ .first == HttpResponseCode::NotModified);
+ }
+ }
+ {
+ CompositeBuffer PartialChunkResult;
+ uint64_t PartialChunkModificationTag = 0;
+ {
+ CompositeBuffer ChunkResult;
+ HttpContentType ContentType;
+ CHECK(ProjectStore
+ .GetChunkRange("proj1"sv,
+ "oplog1"sv,
+ OidAsString(Attachments[OpIds[2]][1].first),
+ 5,
+ 1773,
+ HttpContentType::kCompressedBinary,
+ PartialChunkResult,
+ ContentType,
+ &PartialChunkModificationTag)
+ .first == HttpResponseCode::OK);
+ CHECK(PartialChunkResult);
+ IoHash PartialRawHash;
+ uint64_t PartialRawSize;
+ CompressedBuffer PartialCompressedResult = CompressedBuffer::FromCompressed(PartialChunkResult, PartialRawHash, PartialRawSize);
+ CHECK(PartialRawSize >= 1773);
+
+ uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5);
+ SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed);
+ SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress();
+ const uint8_t* FullDataPtr = &(reinterpret_cast<const uint8_t*>(FullDecompressed.GetView().GetData())[5]);
+ const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData());
+ CHECK(FullDataPtr[0] == PartialDataPtr[0]);
+ }
+
+ {
+ CompositeBuffer ChunkResult;
+ HttpContentType ContentType;
+ CHECK(ProjectStore
+ .GetChunkRange("proj1"sv,
+ "oplog1"sv,
+ OidAsString(Attachments[OpIds[2]][1].first),
+ 5,
+ 1773,
+ HttpContentType::kCompressedBinary,
+ PartialChunkResult,
+ ContentType,
+ &PartialChunkModificationTag)
+ .first == HttpResponseCode::NotModified);
+ }
+ }
}
TEST_CASE("project.store.block")
@@ -6808,13 +8366,14 @@ TEST_CASE("project.store.iterateoplog")
ScopedTemporaryDirectory TempDir;
auto JobQueue = MakeJobQueue(1, ""sv);
+ OpenProcessCache ProcessCache;
GcManager Gc;
CidStore CidStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root"sv;
std::filesystem::path EngineRootDir = TempDir.Path() / "enginesv";
@@ -6851,7 +8410,7 @@ TEST_CASE("project.store.iterateoplog")
TestOidData TestOids[NumTestOids];
for (const TestOidData& TestOid : TestOids)
{
- Oplog->AppendNewOplogEntry(CreateOplogPackage(TestOid.KeyAsOidNotOplogId, {}));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(TestOid.KeyAsOidNotOplogId, {}));
}
int Count = 0;
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index 860f2c17d..8f2d3ce0d 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -22,6 +22,7 @@ class CidStore;
class AuthMgr;
class ScrubContext;
class JobQueue;
+class OpenProcessCache;
enum class HttpResponseCode;
@@ -67,7 +68,12 @@ public:
{
};
- ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc, JobQueue& JobQueue, const Configuration& Config);
+ ProjectStore(CidStore& Store,
+ std::filesystem::path BasePath,
+ GcManager& Gc,
+ JobQueue& JobQueue,
+ OpenProcessCache& InOpenProcessCache,
+ const Configuration& Config);
~ProjectStore();
struct Project;
@@ -114,16 +120,18 @@ public:
std::optional<CbObject> GetOpByIndex(uint32_t Index);
std::optional<uint32_t> GetOpIndexByKey(const Oid& Key);
- IoBuffer FindChunk(const Oid& ChunkId);
+ IoBuffer FindChunk(const Oid& ChunkId, uint64_t* OptOutModificationTag);
IoBuffer GetChunkByRawHash(const IoHash& RawHash);
- bool IterateChunks(std::span<IoHash> RawHashes,
- const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool,
- uint64_t LargeSizeLimit);
- bool IterateChunks(std::span<Oid> ChunkIds,
- const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
- WorkerThreadPool* OptionalWorkerPool,
- uint64_t LargeSizeLimit);
+ bool IterateChunks(std::span<IoHash> RawHashes,
+ bool IncludeModTag,
+ const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit);
+ bool IterateChunks(std::span<Oid> ChunkIds,
+ bool IncludeModTag,
+ const std::function<bool(size_t Index, const IoBuffer& Payload, uint64_t ModTag)>& AsyncCallback,
+ WorkerThreadPool* OptionalWorkerPool,
+ uint64_t LargeSizeLimit);
inline static const uint32_t kInvalidOp = ~0u;
/** Persist a new oplog entry
@@ -427,7 +435,8 @@ public:
uint64_t Size,
ZenContentType AcceptType,
CompositeBuffer& OutChunk,
- ZenContentType& OutContentType);
+ ZenContentType& OutContentType,
+ uint64_t* OptionalInOutModificationTag);
std::pair<HttpResponseCode, std::string> GetChunkRange(const std::string_view ProjectId,
const std::string_view OplogId,
const std::string_view ChunkId,
@@ -435,12 +444,14 @@ public:
uint64_t Size,
ZenContentType AcceptType,
CompositeBuffer& OutChunk,
- ZenContentType& OutContentType);
+ ZenContentType& OutContentType,
+ uint64_t* OptionalInOutModificationTag);
std::pair<HttpResponseCode, std::string> GetChunk(const std::string_view ProjectId,
const std::string_view OplogId,
const std::string_view Cid,
ZenContentType AcceptType,
- IoBuffer& OutChunk);
+ IoBuffer& OutChunk,
+ uint64_t* OptionalInOutModificationTag);
std::pair<HttpResponseCode, std::string> PutChunk(const std::string_view ProjectId,
const std::string_view OplogId,
@@ -458,6 +469,11 @@ public:
const HttpServerRequest::QueryParams& Params,
CbObject& OutResponse);
+ std::pair<HttpResponseCode, std::string> GetChunks(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const CbObject& RequestObject,
+ CbPackage& OutResponsePackage);
+
bool Rpc(HttpServerRequest& HttpReq,
const std::string_view ProjectId,
const std::string_view OplogId,
@@ -485,6 +501,7 @@ private:
GcManager& m_Gc;
CidStore& m_CidStore;
JobQueue& m_JobQueue;
+ OpenProcessCache& m_OpenProcessCache;
std::filesystem::path m_ProjectBasePath;
const Configuration m_Config;
mutable RwLock m_ProjectsLock;
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp
index c0082b746..6b05442b3 100644
--- a/src/zenserver/projectstore/zenremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp
@@ -152,14 +152,22 @@ public:
{
CbObjectWriter RequestWriter;
RequestWriter.AddString("method"sv, "getchunks"sv);
- RequestWriter.BeginArray("chunks"sv);
+ RequestWriter.BeginObject("Request"sv);
{
- for (const IoHash& RawHash : RawHashes)
+ RequestWriter.BeginArray("Chunks"sv);
{
- RequestWriter.AddHash(RawHash);
+ for (const IoHash& RawHash : RawHashes)
+ {
+ RequestWriter.BeginObject();
+ {
+ RequestWriter.AddHash("RawHash", RawHash);
+ }
+ RequestWriter.EndObject();
+ }
}
+ RequestWriter.EndArray(); // "chunks"
}
- RequestWriter.EndArray(); // "chunks"
+ RequestWriter.EndObject();
Request = RequestWriter.Save();
}
diff --git a/src/zenserver/vfs/vfsimpl.cpp b/src/zenserver/vfs/vfsimpl.cpp
index 6e14b7632..2bac6b756 100644
--- a/src/zenserver/vfs/vfsimpl.cpp
+++ b/src/zenserver/vfs/vfsimpl.cpp
@@ -47,7 +47,8 @@ VfsOplogDataSource::ReadChunkData(const Oid& ChunkId, void* Buffer, uint64_t Byt
~0ull,
ZenContentType::kCompressedBinary,
/* out */ ChunkBuffer,
- /* out */ ContentType);
+ /* out */ ContentType,
+ /* OptionalInOutModificationTag */ nullptr);
if (Result.first == HttpResponseCode::OK)
{
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 3714dfaeb..66b6cb858 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -243,7 +243,12 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
ZEN_INFO("instantiating project service");
- m_ProjectStore = new ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, *m_JobQueue, ProjectStore::Configuration{});
+ m_ProjectStore = new ProjectStore(*m_CidStore,
+ m_DataRoot / "projects",
+ m_GcManager,
+ *m_JobQueue,
+ *m_OpenProcessCache,
+ ProjectStore::Configuration{});
m_HttpProjectService.reset(new HttpProjectService{*m_CidStore, m_ProjectStore, m_StatsService, *m_AuthMgr});
if (ServerOptions.WorksSpacesConfig.Enabled)
@@ -534,7 +539,8 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LargeObjectThreshold = 128 * 1024 * 1024;
}
- m_CacheStore = new ZenCacheStore(m_GcManager, *m_JobQueue, m_DataRoot / "cache", Config, m_GcManager.GetDiskWriteBlocker());
+ m_CacheStore = new ZenCacheStore(m_GcManager, *m_JobQueue, m_DataRoot / "cache", Config, m_GcManager.GetDiskWriteBlocker());
+ m_OpenProcessCache = std::make_unique<OpenProcessCache>();
const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig;
@@ -616,7 +622,8 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
m_StatsService,
m_StatusService,
*m_UpstreamCache,
- m_GcManager.GetDiskWriteBlocker());
+ m_GcManager.GetDiskWriteBlocker(),
+ *m_OpenProcessCache);
m_Http->RegisterService(*m_StructuredCacheService);
m_Http->RegisterService(*m_UpstreamService);
@@ -797,6 +804,7 @@ ZenServer::Cleanup()
m_UpstreamService.reset();
m_UpstreamCache.reset();
m_CacheStore = {};
+ m_OpenProcessCache.reset();
m_HttpWorkspacesService.reset();
m_Workspaces.reset();
diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h
index b9d12689d..80054dc35 100644
--- a/src/zenserver/zenserver.h
+++ b/src/zenserver/zenserver.h
@@ -115,17 +115,18 @@ private:
inline void SetNewState(ServerState NewState) { m_CurrentState = NewState; }
static std::string_view ToString(ServerState Value);
- StatsReporter m_StatsReporter;
- Ref<HttpServer> m_Http;
- std::unique_ptr<AuthMgr> m_AuthMgr;
- std::unique_ptr<HttpAuthService> m_AuthService;
- HttpStatusService m_StatusService;
- HttpStatsService m_StatsService;
- GcManager m_GcManager;
- GcScheduler m_GcScheduler{m_GcManager};
- std::unique_ptr<CidStore> m_CidStore;
- Ref<ZenCacheStore> m_CacheStore;
- HttpTestService m_TestService;
+ StatsReporter m_StatsReporter;
+ Ref<HttpServer> m_Http;
+ std::unique_ptr<AuthMgr> m_AuthMgr;
+ std::unique_ptr<HttpAuthService> m_AuthService;
+ HttpStatusService m_StatusService;
+ HttpStatsService m_StatsService;
+ GcManager m_GcManager;
+ GcScheduler m_GcScheduler{m_GcManager};
+ std::unique_ptr<CidStore> m_CidStore;
+ Ref<ZenCacheStore> m_CacheStore;
+ std::unique_ptr<OpenProcessCache> m_OpenProcessCache;
+ HttpTestService m_TestService;
#if ZEN_WITH_TESTS
HttpTestingService m_TestingService;
#endif
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index a4f9fe78b..851b1d125 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -34,7 +34,7 @@ GetCacheDiskTag()
return _;
}
-namespace {
+namespace cache::impl {
#pragma pack(push)
#pragma pack(1)
@@ -224,11 +224,15 @@ namespace {
zen::Sleep(100);
} while (true);
}
-} // namespace
+} // namespace cache::impl
namespace fs = std::filesystem;
using namespace std::literals;
+} // namespace zen
+
+namespace zen::cache::impl {
+
class BucketManifestSerializer
{
using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex;
@@ -571,7 +575,8 @@ BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& B
if (Header.EntryCount > ExpectedEntryCount)
{
ZEN_WARN(
- "Failed to read sidecar file '{}'. File is not large enough to hold expected entry count. Header count: {}, file size count: "
+ "Failed to read sidecar file '{}'. File is not large enough to hold expected entry count. Header count: {}, file size "
+ "count: "
"{}",
SidecarPath,
Header.EntryCount,
@@ -695,6 +700,12 @@ BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&,
static const float IndexMinLoadFactor = 0.2f;
static const float IndexMaxLoadFactor = 0.7f;
+} // namespace zen::cache::impl
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen {
+
ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
std::atomic_uint64_t& OuterCacheMemoryUsage,
std::string BucketName,
@@ -705,8 +716,8 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
, m_Configuration(Config)
, m_BucketId(Oid::Zero)
{
- m_Index.min_load_factor(IndexMinLoadFactor);
- m_Index.max_load_factor(IndexMaxLoadFactor);
+ m_Index.min_load_factor(cache::impl::IndexMinLoadFactor);
+ m_Index.max_load_factor(cache::impl::IndexMaxLoadFactor);
if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap")))
{
@@ -750,11 +761,11 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
CreateDirectories(m_BucketDir);
- std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName);
+ std::filesystem::path ManifestPath = cache::impl::GetManifestPath(m_BucketDir, m_BucketName);
bool IsNew = false;
- BucketManifestSerializer ManifestReader;
+ cache::impl::BucketManifestSerializer ManifestReader;
if (ManifestReader.Open(ManifestPath))
{
@@ -770,7 +781,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
ZEN_INFO("Wiping bucket '{}', found version {}, required version {}",
BucketDir,
Version,
- BucketManifestSerializer::CurrentDiskBucketVersion);
+ cache::impl::BucketManifestSerializer::CurrentDiskBucketVersion);
IsNew = true;
}
}
@@ -824,11 +835,11 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition,
namespace fs = std::filesystem;
- fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+ fs::path IndexPath = cache::impl::GetIndexPath(m_BucketDir, m_BucketName);
try
{
- const uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry);
+ const uint64_t IndexSize = sizeof(cache::impl::CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry);
std::error_code Error;
DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
if (Error)
@@ -862,14 +873,14 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition,
// all data is written to the file
BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024);
- CacheBucketIndexHeader Header = {.EntryCount = EntryCount,
- .LogPosition = LogCount,
- .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)};
+ cache::impl::CacheBucketIndexHeader Header = {.EntryCount = EntryCount,
+ .LogPosition = LogCount,
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)};
- Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
- IndexWriter.Write(&Header, sizeof(CacheBucketIndexHeader), 0);
+ Header.Checksum = cache::impl::CacheBucketIndexHeader::ComputeChecksum(Header);
+ IndexWriter.Write(&Header, sizeof(cache::impl::CacheBucketIndexHeader), 0);
- uint64_t IndexWriteOffset = sizeof(CacheBucketIndexHeader);
+ uint64_t IndexWriteOffset = sizeof(cache::impl::CacheBucketIndexHeader);
for (auto& Entry : m_Index)
{
@@ -896,7 +907,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition,
// We must only update the log flush position once the snapshot write succeeds
if (FlushLockPosition)
{
- std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+ std::filesystem::path LogPath = cache::impl::GetLogPath(m_BucketDir, m_BucketName);
if (std::filesystem::is_regular_file(LogPath))
{
@@ -938,12 +949,12 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const
BasicFile ObjectIndexFile;
ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
uint64_t FileSize = ObjectIndexFile.FileSize();
- if (FileSize < sizeof(CacheBucketIndexHeader))
+ if (FileSize < sizeof(cache::impl::CacheBucketIndexHeader))
{
return 0;
}
- CacheBucketIndexHeader Header;
+ cache::impl::CacheBucketIndexHeader Header;
ObjectIndexFile.Read(&Header, sizeof(Header), 0);
if (!Header.IsValid())
@@ -951,12 +962,12 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const
return 0;
}
- if (Header.Version != CacheBucketIndexHeader::Version2)
+ if (Header.Version != cache::impl::CacheBucketIndexHeader::Version2)
{
return 0;
}
- const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
+ const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(cache::impl::CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
if (Header.EntryCount > ExpectedEntryCount)
{
return 0;
@@ -977,7 +988,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const
BasicFileBuffer FileBuffer(ObjectIndexFile, 128 * 1024);
- uint64_t CurrentReadOffset = sizeof(CacheBucketIndexHeader);
+ uint64_t CurrentReadOffset = sizeof(cache::impl::CacheBucketIndexHeader);
uint64_t RemainingEntryCount = Header.EntryCount;
std::string InvalidEntryReason;
@@ -986,7 +997,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const
const DiskIndexEntry* Entry = FileBuffer.MakeView<DiskIndexEntry>(CurrentReadOffset);
CurrentReadOffset += sizeof(DiskIndexEntry);
- if (!ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason))
+ if (!cache::impl::ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason))
{
ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
continue;
@@ -1003,7 +1014,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const
m_AccessTimes.resize(EntryCount, AccessTime(GcClock::TickCount()));
- OutVersion = CacheBucketIndexHeader::Version2;
+ OutVersion = cache::impl::CacheBucketIndexHeader::Version2;
return Header.LogPosition;
}
@@ -1050,7 +1061,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::
return;
}
- if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason))
+ if (!cache::impl::ValidateCacheBucketIndexEntry(Record, InvalidEntryReason))
{
ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
++InvalidEntryCount;
@@ -1087,8 +1098,8 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco
m_MemCachedPayloads.clear();
m_FreeMemCachedPayloads.clear();
- std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
- std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+ std::filesystem::path LogPath = cache::impl::GetLogPath(m_BucketDir, m_BucketName);
+ std::filesystem::path IndexPath = cache::impl::GetIndexPath(m_BucketDir, m_BucketName);
if (IsNew)
{
@@ -1920,7 +1931,7 @@ ZenCacheDiskLayer::CacheBucket::Drop()
m_BlockStore.Close();
m_SlogFile.Close();
- bool Deleted = MoveAndDeleteDirectory(m_BucketDir);
+ const bool Deleted = cache::impl::MoveAndDeleteDirectory(m_BucketDir);
m_Index.clear();
m_Payloads.clear();
@@ -1970,8 +1981,8 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
{
bool UseLegacyScheme = false;
- IoBuffer Buffer;
- BucketManifestSerializer ManifestWriter;
+ IoBuffer Buffer;
+ cache::impl::BucketManifestSerializer ManifestWriter;
if (UseLegacyScheme)
{
@@ -2051,7 +2062,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
}
ManifestWriter.WriteSidecarFile(IndexLock,
- GetMetaPath(m_BucketDir, m_BucketName),
+ cache::impl::GetMetaPath(m_BucketDir, m_BucketName),
m_LogFlushPosition,
m_Index,
m_AccessTimes,
@@ -2059,7 +2070,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
m_MetaDatas);
}
- std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName);
+ const std::filesystem::path ManifestPath = cache::impl::GetManifestPath(m_BucketDir, m_BucketName);
TemporaryFile::SafeWriteFile(ManifestPath, Buffer.GetView());
}
catch (const std::exception& Err)
@@ -2733,7 +2744,7 @@ public:
Stopwatch Timer;
const auto _ = MakeGuard([&] {
- Reset(m_ExpiredStandaloneKeys);
+ cache::impl::Reset(m_ExpiredStandaloneKeys);
if (!Ctx.Settings.Verbose)
{
return;
@@ -3101,7 +3112,7 @@ ZenCacheDiskLayer::CacheBucket::ReadAttachmentsFromMetaData(uint32_t BlockI
ZEN_TRACE_CPU("Z$::Bucket::GetAttachmentsFromMetaData");
return GetAttachmentsFromMetaData<IoHash, IoHash>(
MetaDataPayload,
- BlockMetaDataExpectedMagic,
+ cache::impl::BlockMetaDataExpectedMagic,
[&](std::span<const IoHash> Keys, std::span<const uint32_t> AttachmentCounts, std::span<const IoHash> Attachments) {
auto AttachmentReadIt = Attachments.begin();
OutReferences.resize(OutReferences.size() + Attachments.size());
@@ -3278,7 +3289,7 @@ ZenCacheDiskLayer::CacheBucket::GetReferences(const LoggerRef& Logger,
ZEN_ASSERT(Keys.size() == AttachmentCounts.size());
IoBuffer MetaDataPayload =
BuildReferenceMetaData<IoHash>(
- BlockMetaDataExpectedMagic,
+ cache::impl::BlockMetaDataExpectedMagic,
Keys,
AttachmentCounts,
std::span<const IoHash>(OutReferences)
@@ -3485,8 +3496,8 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&,
Payloads.reserve(EntryCount);
AccessTimes.reserve(EntryCount);
Index.reserve(EntryCount);
- Index.min_load_factor(IndexMinLoadFactor);
- Index.max_load_factor(IndexMaxLoadFactor);
+ Index.min_load_factor(cache::impl::IndexMinLoadFactor);
+ Index.max_load_factor(cache::impl::IndexMaxLoadFactor);
for (auto It : m_Index)
{
PayloadIndex EntryIndex = PayloadIndex(Payloads.size());
@@ -3510,9 +3521,9 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&,
m_Payloads.swap(Payloads);
m_AccessTimes.swap(AccessTimes);
m_MetaDatas.swap(MetaDatas);
- Reset(m_FreeMetaDatas);
+ cache::impl::Reset(m_FreeMetaDatas);
m_MemCachedPayloads.swap(MemCachedPayloads);
- Reset(m_FreeMemCachedPayloads);
+ cache::impl::Reset(m_FreeMemCachedPayloads);
}
RwLock::SharedLockScope
@@ -3963,7 +3974,7 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
// Make sure we remove the folder even if we don't know about the bucket
std::filesystem::path BucketPath = m_RootDir;
BucketPath /= std::string(InBucket);
- return MoveAndDeleteDirectory(BucketPath);
+ return cache::impl::MoveAndDeleteDirectory(BucketPath);
}
bool
@@ -3986,7 +3997,7 @@ ZenCacheDiskLayer::Drop()
return false;
}
}
- return MoveAndDeleteDirectory(m_RootDir);
+ return cache::impl::MoveAndDeleteDirectory(m_RootDir);
}
void
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index f85b19264..50af7246e 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -65,7 +65,7 @@ struct CasDiskIndexHeader
static_assert(sizeof(CasDiskIndexHeader) == 32);
-namespace {
+namespace cas::impl {
const char* IndexExtension = ".uidx";
const char* LogExtension = ".ulog";
@@ -124,17 +124,17 @@ namespace {
return true;
}
-} // namespace
+} // namespace cas::impl
//////////////////////////////////////////////////////////////////////////
-static const float IndexMinLoadFactor = 0.2f;
-static const float IndexMaxLoadFactor = 0.7f;
-
CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : m_Log(logging::Get("containercas")), m_Gc(Gc)
{
ZEN_MEMSCOPE(GetCasContainerTag());
+ const float IndexMinLoadFactor = 0.2f;
+ const float IndexMaxLoadFactor = 0.7f;
+
m_LocationMap.min_load_factor(IndexMinLoadFactor);
m_LocationMap.max_load_factor(IndexMaxLoadFactor);
@@ -165,7 +165,7 @@ CasContainerStrategy::Initialize(const std::filesystem::path& RootDirectory,
m_ContainerBaseName = ContainerBaseName;
m_PayloadAlignment = Alignment;
m_MaxBlockSize = MaxBlockSize;
- m_BlocksBasePath = GetBlocksBasePath(m_RootDirectory, m_ContainerBaseName);
+ m_BlocksBasePath = cas::impl::GetBlocksBasePath(m_RootDirectory, m_ContainerBaseName);
OpenContainer(IsNewStore);
@@ -329,31 +329,33 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
{
ZEN_MEMSCOPE(GetCasContainerTag());
- const size_t ChunkCount = ChunkHashes.size();
- if (ChunkCount < 3)
+ const size_t ChunkCount = ChunkHashes.size();
+ std::vector<size_t> FoundChunkIndexes;
+ std::vector<BlockStoreLocation> FoundChunkLocations;
+ FoundChunkIndexes.reserve(ChunkCount);
+ FoundChunkLocations.reserve(ChunkCount);
{
+ RwLock::SharedLockScope _(m_LocationMapLock);
for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++)
{
- IoBuffer Chunk = FindChunk(ChunkHashes[ChunkIndex]);
- if (!AsyncCallback(ChunkIndex, Chunk))
+ if (auto KeyIt = m_LocationMap.find(ChunkHashes[ChunkIndex]); KeyIt != m_LocationMap.end())
{
- return false;
+ FoundChunkIndexes.push_back(ChunkIndex);
+ FoundChunkLocations.push_back(m_Locations[KeyIt->second].Get(m_PayloadAlignment));
}
}
- return true;
}
- std::vector<size_t> FoundChunkIndexes;
- std::vector<BlockStoreLocation> FoundChunkLocations;
- FoundChunkIndexes.reserve(ChunkCount);
- FoundChunkLocations.reserve(ChunkCount);
- RwLock::SharedLockScope _(m_LocationMapLock);
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++)
+ if (FoundChunkLocations.size() < 3)
{
- if (auto KeyIt = m_LocationMap.find(ChunkHashes[ChunkIndex]); KeyIt != m_LocationMap.end())
+ for (size_t ChunkIndex : FoundChunkIndexes)
{
- FoundChunkIndexes.push_back(ChunkIndex);
- FoundChunkLocations.push_back(m_Locations[KeyIt->second].Get(m_PayloadAlignment));
+ IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]);
+ if (!AsyncCallback(ChunkIndex, Chunk))
+ {
+ return false;
+ }
}
+ return true;
}
auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) {
@@ -919,8 +921,8 @@ CasContainerStrategy::MakeIndexSnapshot()
namespace fs = std::filesystem;
- fs::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName);
- fs::path TempIndexPath = GetTempIndexPath(m_RootDirectory, m_ContainerBaseName);
+ fs::path IndexPath = cas::impl::GetIndexPath(m_RootDirectory, m_ContainerBaseName);
+ fs::path TempIndexPath = cas::impl::GetTempIndexPath(m_RootDirectory, m_ContainerBaseName);
// Move index away, we keep it if something goes wrong
if (fs::is_regular_file(TempIndexPath))
@@ -1054,7 +1056,7 @@ CasContainerStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint
std::string InvalidEntryReason;
for (const CasDiskIndexEntry& Entry : Entries)
{
- if (!ValidateEntry(Entry, InvalidEntryReason))
+ if (!cas::impl::ValidateEntry(Entry, InvalidEntryReason))
{
ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
continue;
@@ -1121,7 +1123,7 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski
m_LocationMap.erase(Record.Key);
return;
}
- if (!ValidateEntry(Record, InvalidEntryReason))
+ if (!cas::impl::ValidateEntry(Record, InvalidEntryReason))
{
ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
return;
@@ -1147,7 +1149,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
m_LocationMap.clear();
m_Locations.clear();
- std::filesystem::path BasePath = GetBasePath(m_RootDirectory, m_ContainerBaseName);
+ std::filesystem::path BasePath = cas::impl::GetBasePath(m_RootDirectory, m_ContainerBaseName);
if (IsNewStore)
{
@@ -1158,8 +1160,8 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1);
- std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName);
- std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName);
+ std::filesystem::path LogPath = cas::impl::GetLogPath(m_RootDirectory, m_ContainerBaseName);
+ std::filesystem::path IndexPath = cas::impl::GetIndexPath(m_RootDirectory, m_ContainerBaseName);
if (std::filesystem::is_regular_file(IndexPath))
{
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 339e5de0a..82dbe3551 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -57,16 +57,14 @@ GetFileCasTag()
return _;
}
-namespace {
+namespace filecas::impl {
template<typename T>
void Reset(T& V)
{
T Tmp;
V.swap(Tmp);
}
-} // namespace
-namespace filecas::impl {
const char* IndexExtension = ".uidx";
const char* LogExtension = ".ulog";
@@ -141,11 +139,11 @@ FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& Roo
//////////////////////////////////////////////////////////////////////////
-static const float IndexMinLoadFactor = 0.2f;
-static const float IndexMaxLoadFactor = 0.7f;
-
FileCasStrategy::FileCasStrategy(GcManager& Gc) : m_Log(logging::Get("filecas")), m_Gc(Gc)
{
+ static const float IndexMinLoadFactor = 0.2f;
+ static const float IndexMaxLoadFactor = 0.7f;
+
m_Index.min_load_factor(IndexMinLoadFactor);
m_Index.max_load_factor(IndexMaxLoadFactor);
@@ -1237,7 +1235,7 @@ public:
Stopwatch Timer;
const auto _ = MakeGuard([&] {
- Reset(m_ReferencesToClean);
+ filecas::impl::Reset(m_ReferencesToClean);
if (!Ctx.Settings.Verbose)
{
return;
@@ -1353,7 +1351,7 @@ public:
}
}
- Reset(m_ReferencesToClean);
+ filecas::impl::Reset(m_ReferencesToClean);
}
virtual std::string GetGcName(GcCtx& Ctx) override { return m_FileCasStrategy.GetGcName(Ctx); }
diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h
index 711b96c8f..b0b4f22cb 100644
--- a/src/zenstore/include/zenstore/cache/cachedisklayer.h
+++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h
@@ -17,6 +17,10 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
+namespace cache::impl {
+ class BucketManifestSerializer;
+}
+
class IoBuffer;
class JobQueue;
@@ -446,7 +450,7 @@ public:
friend class DiskBucketReferenceChecker;
friend class DiskBucketStoreCompactor;
- friend class BucketManifestSerializer;
+ friend class cache::impl::BucketManifestSerializer;
};
private: