aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore')
-rw-r--r--src/zenstore/buildstore/buildstore.cpp6
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp14
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp12
-rw-r--r--src/zenstore/cidstore.cpp12
-rw-r--r--src/zenstore/compactcas.cpp19
-rw-r--r--src/zenstore/filecas.cpp6
-rw-r--r--src/zenstore/gc.cpp10
-rw-r--r--src/zenstore/include/zenstore/cache/cachepolicy.h10
-rw-r--r--src/zenstore/include/zenstore/cidstore.h22
-rw-r--r--src/zenstore/include/zenstore/memorycidstore.h68
-rw-r--r--src/zenstore/include/zenstore/projectstore.h13
-rw-r--r--src/zenstore/include/zenstore/zenstore.h37
-rw-r--r--src/zenstore/memorycidstore.cpp143
-rw-r--r--src/zenstore/projectstore.cpp87
-rw-r--r--src/zenstore/workspaces.cpp23
-rw-r--r--src/zenstore/zenstore.cpp20
16 files changed, 387 insertions, 115 deletions
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp
index dff1c3c61..a08741d31 100644
--- a/src/zenstore/buildstore/buildstore.cpp
+++ b/src/zenstore/buildstore/buildstore.cpp
@@ -1052,7 +1052,7 @@ public:
{
ZEN_TRACE_CPU("Builds::PreCache");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -1107,7 +1107,7 @@ public:
ZEN_TRACE_CPU("Builds::GetUnusedReferences");
ZEN_MEMSCOPE(GetBuildstoreTag());
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
size_t InitialCount = IoCids.size();
size_t UsedCount = InitialCount;
@@ -1152,7 +1152,7 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
ZEN_TRACE_CPU("Builds::RemoveExpiredData");
ZEN_MEMSCOPE(GetBuildstoreTag());
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 4640309d9..45a4b6456 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -3083,7 +3083,7 @@ public:
{
ZEN_TRACE_CPU("Z$::Bucket::CompactStore");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3338,7 +3338,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
{
ZEN_TRACE_CPU("Z$::Bucket::RemoveExpiredData");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
size_t TotalEntries = 0;
@@ -3502,7 +3502,7 @@ ZenCacheDiskLayer::CacheBucket::GetReferences(const LoggerRef& Logger,
{
ZEN_TRACE_CPU("Z$::Bucket::GetReferencesLocked");
- auto Log = [&Logger]() { return Logger; };
+ ZEN_SCOPED_LOG(Logger);
auto GetAttachments = [&](const IoHash& RawHash, MemoryView Data) -> bool {
if (CbValidateError Error = ValidateCompactBinary(Data, CbValidateMode::Default); Error == CbValidateError::None)
@@ -3718,7 +3718,7 @@ public:
{
ZEN_TRACE_CPU("Z$::Bucket::PreCache");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3753,7 +3753,7 @@ public:
{
ZEN_TRACE_CPU("Z$::Bucket::UpdateLockedState");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3784,7 +3784,7 @@ public:
{
ZEN_TRACE_CPU("Z$::Bucket::GetUnusedReferences");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
const size_t InitialCount = IoCids.size();
size_t UsedCount = InitialCount;
@@ -3818,7 +3818,7 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx)
{
ZEN_TRACE_CPU("Z$::Bucket::CreateReferenceCheckers");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index cff0e9a35..97b793083 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -468,7 +468,7 @@ ZenCacheStore::LogWorker()
LoggerRef ZCacheLog(logging::Get("z$"));
- auto Log = [&ZCacheLog]() -> LoggerRef { return ZCacheLog; };
+ ZEN_SCOPED_LOG(ZCacheLog);
std::vector<AccessLogItem> Items;
while (true)
@@ -1086,11 +1086,9 @@ ZenCacheStore::GetBucketInfo(std::string_view NamespaceName, std::string_view Bu
std::vector<RwLock::SharedLockScope>
ZenCacheStore::LockState(GcCtx& Ctx)
{
+ ZEN_UNUSED(Ctx);
ZEN_TRACE_CPU("CacheStore::LockState");
- auto Log = [&Ctx]() { return Ctx.Logger; };
- ZEN_UNUSED(Log);
-
std::vector<RwLock::SharedLockScope> Locks;
Locks.emplace_back(RwLock::SharedLockScope(m_NamespacesLock));
for (auto& NamespaceIt : m_Namespaces)
@@ -1211,7 +1209,7 @@ public:
{
ZEN_TRACE_CPU("Z$::UpdateLockedState");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
@@ -1276,7 +1274,7 @@ public:
{
ZEN_TRACE_CPU("Z$::GetUnusedReferences");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
const size_t InitialCount = IoCids.size();
size_t UsedCount = InitialCount;
@@ -1309,7 +1307,7 @@ ZenCacheStore::CreateReferenceCheckers(GcCtx& Ctx)
{
ZEN_TRACE_CPU("CacheStore::CreateReferenceCheckers");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp
index b20d8f565..ac8a75a58 100644
--- a/src/zenstore/cidstore.cpp
+++ b/src/zenstore/cidstore.cpp
@@ -188,12 +188,24 @@ CidStore::Initialize(const CidStoreConfiguration& Config)
}
CidStore::InsertResult
+CidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash)
+{
+ return m_Impl->AddChunk(ChunkData, RawHash, InsertMode::kMayBeMovedInPlace);
+}
+
+CidStore::InsertResult
CidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode)
{
return m_Impl->AddChunk(ChunkData, RawHash, Mode);
}
std::vector<CidStore::InsertResult>
+CidStore::AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes)
+{
+ return m_Impl->AddChunks(ChunkDatas, RawHashes, InsertMode::kMayBeMovedInPlace);
+}
+
+std::vector<CidStore::InsertResult>
CidStore::AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes, InsertMode Mode)
{
return m_Impl->AddChunks(ChunkDatas, RawHashes, Mode);
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 43dc389e2..6f1e1d701 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -698,7 +698,7 @@ public:
ZEN_TRACE_CPU("CasContainer::CompactStore");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -875,7 +875,7 @@ public:
ZEN_MEMSCOPE(GetCasContainerTag());
ZEN_TRACE_CPU("CasContainer::RemoveUnreferencedData");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -958,7 +958,7 @@ CasContainerStrategy::CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats&)
ZEN_MEMSCOPE(GetCasContainerTag());
ZEN_TRACE_CPU("CasContainer::CreateReferencePruner");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -1391,7 +1391,7 @@ TEST_CASE("compactcas.compact.gc")
{
ScopedTemporaryDirectory TempDir;
- const int kIterationCount = 1000;
+ const int kIterationCount = 200;
std::vector<IoHash> Keys(kIterationCount);
@@ -1504,7 +1504,7 @@ TEST_CASE("compactcas.threadedinsert")
ScopedTemporaryDirectory TempDir;
const uint64_t kChunkSize = 1048;
- const int32_t kChunkCount = 2048;
+ const int32_t kChunkCount = 512;
uint64_t ExpectedSize = 0;
tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> Chunks;
@@ -1803,7 +1803,7 @@ TEST_CASE("compactcas.restart")
}
const uint64_t kChunkSize = 1048 + 395;
- const size_t kChunkCount = 7167;
+ const size_t kChunkCount = 2000;
std::vector<IoHash> Hashes;
Hashes.reserve(kChunkCount);
@@ -1984,9 +1984,8 @@ TEST_CASE("compactcas.iteratechunks")
WorkerThreadPool ThreadPool(Max(GetHardwareConcurrency() - 1u, 2u), "put");
const uint64_t kChunkSize = 1048 + 395;
- const size_t kChunkCount = 63840;
+ const size_t kChunkCount = 10000;
- for (uint32_t N = 0; N < 2; N++)
{
GcManager Gc;
CasContainerStrategy Cas(Gc);
@@ -2017,7 +2016,7 @@ TEST_CASE("compactcas.iteratechunks")
size_t BatchCount = Min<size_t>(kChunkCount - Offset, 512u);
WorkLatch.AddCount(1);
ThreadPool.ScheduleWork(
- [N, &WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount]() {
+ [&WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount]() {
auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
std::vector<IoBuffer> BatchBlobs;
@@ -2028,7 +2027,7 @@ TEST_CASE("compactcas.iteratechunks")
while (BatchBlobs.size() < BatchCount)
{
IoBuffer Chunk = CreateRandomBlob(
- N + kChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377));
+ kChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377));
IoHash Hash = IoHash::HashBuffer(Chunk);
{
RwLock::ExclusiveLockScope __(InsertLock);
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 0088afe6e..3a7a72ee3 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -1231,7 +1231,7 @@ public:
ZEN_MEMSCOPE(GetFileCasTag());
ZEN_TRACE_CPU("FileCas::CompactStore");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -1375,7 +1375,7 @@ public:
ZEN_MEMSCOPE(GetFileCasTag());
ZEN_TRACE_CPU("FileCas::RemoveUnreferencedData");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -1458,7 +1458,7 @@ FileCasStrategy::CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats&)
ZEN_TRACE_CPU("FileCas::CreateReferencePruner");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index f3edf804d..928fc3f08 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -546,7 +546,7 @@ FilterReferences(GcCtx& Ctx, std::string_view Context, std::vector<IoHash>& InOu
return false;
}
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
const bool Filter = Ctx.Settings.AttachmentRangeMax != IoHash::Max || Ctx.Settings.AttachmentRangeMin != IoHash::Zero;
@@ -2063,6 +2063,14 @@ GcScheduler::GetState() const
{
{
std::unique_lock Lock(m_GcMutex);
+
+ if (m_TriggerGcParams || m_TriggerScrubParams)
+ {
+ // If a trigger is pending, treat it as running
+ Result.Status = GcSchedulerStatus::kRunning;
+ return Result;
+ }
+
Result.LastFullGcTime = m_LastGcTime;
Result.LastFullGCDiff = m_LastFullGCDiff;
Result.LastFullGcDuration = m_LastFullGcDuration;
diff --git a/src/zenstore/include/zenstore/cache/cachepolicy.h b/src/zenstore/include/zenstore/cache/cachepolicy.h
index 7773cd3d1..4a062a0c2 100644
--- a/src/zenstore/include/zenstore/cache/cachepolicy.h
+++ b/src/zenstore/include/zenstore/cache/cachepolicy.h
@@ -163,9 +163,9 @@ private:
friend class CacheRecordPolicyBuilder;
friend class OptionalCacheRecordPolicy;
- CachePolicy RecordPolicy = CachePolicy::Default;
- CachePolicy DefaultValuePolicy = CachePolicy::Default;
- RefPtr<const Private::ICacheRecordPolicyShared> Shared;
+ CachePolicy RecordPolicy = CachePolicy::Default;
+ CachePolicy DefaultValuePolicy = CachePolicy::Default;
+ Ref<const Private::ICacheRecordPolicyShared> Shared;
};
/** A cache record policy builder is used to construct a cache record policy. */
@@ -186,8 +186,8 @@ public:
CacheRecordPolicy Build();
private:
- CachePolicy BasePolicy = CachePolicy::Default;
- RefPtr<Private::ICacheRecordPolicyShared> Shared;
+ CachePolicy BasePolicy = CachePolicy::Default;
+ Ref<Private::ICacheRecordPolicyShared> Shared;
};
/**
diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h
index d54062476..c00e0449f 100644
--- a/src/zenstore/include/zenstore/cidstore.h
+++ b/src/zenstore/include/zenstore/cidstore.h
@@ -58,16 +58,14 @@ struct CidStoreConfiguration
*
*/
-class CidStore final : public ChunkResolver, public StatsProvider
+class CidStore final : public ChunkStore, public StatsProvider
{
public:
CidStore(GcManager& Gc);
~CidStore();
- struct InsertResult
- {
- bool New = false;
- };
+ using InsertResult = ChunkStore::InsertResult;
+
enum class InsertMode
{
kCopyOnly,
@@ -75,17 +73,17 @@ public:
};
void Initialize(const CidStoreConfiguration& Config);
- InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace);
- std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas,
- std::span<IoHash> RawHashes,
- InsertMode Mode = InsertMode::kMayBeMovedInPlace);
- virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
+ InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) override;
+ InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode);
+ std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) override;
+ std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes, InsertMode Mode);
+ IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
bool IterateChunks(std::span<IoHash> DecompressedIds,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
WorkerThreadPool* OptionalWorkerPool,
uint64_t LargeSizeLimit);
- bool ContainsChunk(const IoHash& DecompressedId);
- void FilterChunks(HashKeySet& InOutChunks);
+ bool ContainsChunk(const IoHash& DecompressedId) override;
+ void FilterChunks(HashKeySet& InOutChunks) override;
void Flush();
CidStoreSize TotalSize() const;
CidStoreStats Stats() const;
diff --git a/src/zenstore/include/zenstore/memorycidstore.h b/src/zenstore/include/zenstore/memorycidstore.h
new file mode 100644
index 000000000..0311274d5
--- /dev/null
+++ b/src/zenstore/include/zenstore/memorycidstore.h
@@ -0,0 +1,68 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "cidstore.h"
+#include "zenstore.h"
+
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+#include <zencore/thread.h>
+
+#include <deque>
+#include <span>
+#include <thread>
+#include <unordered_map>
+
+namespace zen {
+
+class HashKeySet;
+
+/** Memory-backed chunk store.
+ *
+ * Stores chunks in an in-memory hash map, optionally layered over a
+ * standard CidStore for write-through and read fallback. When a backing
+ * store is provided:
+ *
+ * - AddChunk writes to memory and asynchronously to the backing store.
+ * - FindChunkByCid checks memory first, then falls back to the backing store.
+ * - ContainsChunk and FilterChunks check memory first, then the backing store.
+ *
+ * The memory store does NOT cache read-through results from the backing store.
+ * Only chunks explicitly added via AddChunk/AddChunks are held in memory.
+ */
+
+class MemoryCidStore : public ChunkStore
+{
+public:
+ explicit MemoryCidStore(CidStore* BackingStore = nullptr);
+ ~MemoryCidStore();
+
+ InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) override;
+ std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) override;
+ IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
+ bool ContainsChunk(const IoHash& DecompressedId) override;
+ void FilterChunks(HashKeySet& InOutChunks) override;
+
+private:
+ RwLock m_Lock;
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> m_Chunks;
+ CidStore* m_BackingStore = nullptr;
+
+ // Async write-through to backing store
+ struct PendingWrite
+ {
+ IoBuffer Data;
+ IoHash Hash;
+ };
+
+ std::mutex m_FlushLock;
+ std::vector<PendingWrite> m_FlushQueue;
+ Event m_FlushEvent;
+ std::thread m_FlushThread;
+ std::atomic<bool> m_FlushThreadEnabled{false};
+
+ void FlushThreadFunction();
+};
+
+} // namespace zen
diff --git a/src/zenstore/include/zenstore/projectstore.h b/src/zenstore/include/zenstore/projectstore.h
index 100a82907..d05261967 100644
--- a/src/zenstore/include/zenstore/projectstore.h
+++ b/src/zenstore/include/zenstore/projectstore.h
@@ -305,11 +305,11 @@ public:
std::unordered_set<IoHash, IoHash::Hasher> m_PendingPrepOpAttachments;
GcClock::TimePoint m_PendingPrepOpAttachmentsRetainEnd;
- RefPtr<OplogStorage> m_Storage;
- uint64_t m_LogFlushPosition = 0;
- bool m_IsLegacySnapshot = false;
+ Ref<OplogStorage> m_Storage;
+ uint64_t m_LogFlushPosition = 0;
+ bool m_IsLegacySnapshot = false;
- RefPtr<OplogStorage> GetStorage();
+ Ref<OplogStorage> GetStorage();
/** Scan oplog and register each entry, thus updating the in-memory tracking tables
*/
@@ -484,7 +484,7 @@ public:
Project& Project,
Oplog& Oplog,
const std::unordered_set<std::string>& WantedFieldNames);
- static CbObject GetChunkInfo(LoggerRef InLog, Project& Project, Oplog& Oplog, const Oid& ChunkId);
+ static CbObject GetChunkInfo(Project& Project, Oplog& Oplog, const Oid& ChunkId);
struct GetChunkRangeResult
{
enum class EError : uint8_t
@@ -502,8 +502,7 @@ public:
uint64_t RawSize = 0;
ZenContentType ContentType = ZenContentType::kUnknownContentType;
};
- static GetChunkRangeResult GetChunkRange(LoggerRef InLog,
- Project& Project,
+ static GetChunkRangeResult GetChunkRange(Project& Project,
Oplog& Oplog,
const Oid& ChunkId,
uint64_t Offset,
diff --git a/src/zenstore/include/zenstore/zenstore.h b/src/zenstore/include/zenstore/zenstore.h
index bed219b4b..95ae33a4a 100644
--- a/src/zenstore/include/zenstore/zenstore.h
+++ b/src/zenstore/include/zenstore/zenstore.h
@@ -4,19 +4,56 @@
#include <zencore/zencore.h>
+#include <span>
+#include <vector>
+
#define ZENSTORE_API
namespace zen {
+class HashKeySet;
class IoBuffer;
struct IoHash;
class ChunkResolver
{
public:
+ virtual ~ChunkResolver() = default;
virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) = 0;
};
+/** Abstract chunk store interface.
+ *
+ * Extends ChunkResolver with write and query operations. Both CidStore
+ * (disk-backed) and MemoryCidStore (in-memory) implement this interface,
+ * allowing callers to be agnostic about the storage backend.
+ */
+class ChunkStore : public ChunkResolver
+{
+public:
+ struct InsertResult
+ {
+ bool New = false;
+ };
+
+ virtual InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) = 0;
+ virtual std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) = 0;
+ virtual bool ContainsChunk(const IoHash& DecompressedId) = 0;
+ virtual void FilterChunks(HashKeySet& InOutChunks) = 0;
+};
+
+/** Composite resolver that tries a primary store first, then a fallback. */
+class FallbackChunkResolver : public ChunkResolver
+{
+public:
+ FallbackChunkResolver(ChunkResolver& Primary, ChunkResolver& Fallback);
+ IoBuffer FindChunkByCid(const IoHash& DecompressedId) override;
+
+private:
+ ChunkResolver& m_Primary;
+ ChunkResolver& m_Fallback;
+};
+
ZENSTORE_API void zenstore_forcelinktests();
} // namespace zen
diff --git a/src/zenstore/memorycidstore.cpp b/src/zenstore/memorycidstore.cpp
new file mode 100644
index 000000000..b4832029b
--- /dev/null
+++ b/src/zenstore/memorycidstore.cpp
@@ -0,0 +1,143 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenstore/hashkeyset.h>
+#include <zenstore/memorycidstore.h>
+
+namespace zen {
+
+MemoryCidStore::MemoryCidStore(CidStore* BackingStore) : m_BackingStore(BackingStore)
+{
+ if (m_BackingStore)
+ {
+ m_FlushThreadEnabled = true;
+ m_FlushThread = std::thread(&MemoryCidStore::FlushThreadFunction, this);
+ }
+}
+
+MemoryCidStore::~MemoryCidStore()
+{
+ m_FlushThreadEnabled = false;
+ m_FlushEvent.Set();
+ if (m_FlushThread.joinable())
+ {
+ m_FlushThread.join();
+ }
+}
+
+MemoryCidStore::InsertResult
+MemoryCidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash)
+{
+ bool IsNew = false;
+
+ m_Lock.WithExclusiveLock([&] {
+ auto [It, Inserted] = m_Chunks.try_emplace(RawHash, ChunkData);
+ IsNew = Inserted;
+ });
+
+ if (m_BackingStore)
+ {
+ std::lock_guard<std::mutex> Lock(m_FlushLock);
+ m_FlushQueue.push_back({.Data = ChunkData, .Hash = RawHash});
+ m_FlushEvent.Set();
+ }
+
+ return {.New = IsNew};
+}
+
+std::vector<MemoryCidStore::InsertResult>
+MemoryCidStore::AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes)
+{
+ std::vector<MemoryCidStore::InsertResult> Results;
+ Results.reserve(ChunkDatas.size());
+
+ for (size_t i = 0; i < ChunkDatas.size(); ++i)
+ {
+ Results.push_back(AddChunk(ChunkDatas[i], RawHashes[i]));
+ }
+
+ return Results;
+}
+
+IoBuffer
+MemoryCidStore::FindChunkByCid(const IoHash& DecompressedId)
+{
+ IoBuffer Result;
+
+ m_Lock.WithSharedLock([&] {
+ auto It = m_Chunks.find(DecompressedId);
+ if (It != m_Chunks.end())
+ {
+ Result = It->second;
+ }
+ });
+
+ if (!Result && m_BackingStore)
+ {
+ Result = m_BackingStore->FindChunkByCid(DecompressedId);
+ }
+
+ return Result;
+}
+
+bool
+MemoryCidStore::ContainsChunk(const IoHash& DecompressedId)
+{
+ bool Found = false;
+
+ m_Lock.WithSharedLock([&] { Found = m_Chunks.find(DecompressedId) != m_Chunks.end(); });
+
+ if (!Found && m_BackingStore)
+ {
+ Found = m_BackingStore->ContainsChunk(DecompressedId);
+ }
+
+ return Found;
+}
+
+void
+MemoryCidStore::FilterChunks(HashKeySet& InOutChunks)
+{
+ // Remove hashes that are present in our memory store
+ m_Lock.WithSharedLock([&] { InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return m_Chunks.find(Hash) != m_Chunks.end(); }); });
+
+ // Delegate remainder to backing store
+ if (m_BackingStore && !InOutChunks.IsEmpty())
+ {
+ m_BackingStore->FilterChunks(InOutChunks);
+ }
+}
+
+void
+MemoryCidStore::FlushThreadFunction()
+{
+ SetCurrentThreadName("MemCidFlush");
+
+ while (m_FlushThreadEnabled)
+ {
+ m_FlushEvent.Wait();
+
+ std::vector<PendingWrite> Batch;
+ {
+ std::lock_guard<std::mutex> Lock(m_FlushLock);
+ Batch.swap(m_FlushQueue);
+ }
+
+ for (PendingWrite& Write : Batch)
+ {
+ m_BackingStore->AddChunk(Write.Data, Write.Hash);
+ }
+ }
+
+ // Drain remaining writes on shutdown
+ std::vector<PendingWrite> Remaining;
+ {
+ std::lock_guard<std::mutex> Lock(m_FlushLock);
+ Remaining.swap(m_FlushQueue);
+ }
+ for (PendingWrite& Write : Remaining)
+ {
+ m_BackingStore->AddChunk(Write.Data, Write.Hash);
+ }
+}
+
+} // namespace zen
diff --git a/src/zenstore/projectstore.cpp b/src/zenstore/projectstore.cpp
index 13674da4d..8159f9f83 100644
--- a/src/zenstore/projectstore.cpp
+++ b/src/zenstore/projectstore.cpp
@@ -3180,6 +3180,7 @@ ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&,
}
else
{
+ m_ChunkMap.erase(FileId);
Entry.ServerPath = ServerPath;
}
@@ -3402,11 +3403,11 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
return EntryId;
}
-RefPtr<ProjectStore::OplogStorage>
+Ref<ProjectStore::OplogStorage>
ProjectStore::Oplog::GetStorage()
{
ZEN_MEMSCOPE(GetProjectstoreTag());
- RefPtr<OplogStorage> Storage;
+ Ref<OplogStorage> Storage;
{
RwLock::SharedLockScope _(m_OplogLock);
Storage = m_Storage;
@@ -3424,7 +3425,7 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core)
using namespace std::literals;
- RefPtr<OplogStorage> Storage = GetStorage();
+ Ref<OplogStorage> Storage = GetStorage();
if (!Storage)
{
return {};
@@ -3456,7 +3457,7 @@ ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores)
using namespace std::literals;
- RefPtr<OplogStorage> Storage = GetStorage();
+ Ref<OplogStorage> Storage = GetStorage();
if (!Storage)
{
return std::vector<ProjectStore::LogSequenceNumber>(Cores.size(), LogSequenceNumber{});
@@ -3515,7 +3516,18 @@ ProjectStore::Project::~Project()
// Only write access times if we have not been explicitly deleted
if (!m_OplogStoragePath.empty())
{
- WriteAccessTimes();
+ try
+ {
+ WriteAccessTimes();
+ }
+ catch (const std::exception& Ex)
+ {
+ // RefCounted::Release() is noexcept, so a destructor that propagates an exception
+ // terminates the program. WriteAccessTimes() already catches I/O failures, but lock
+ // acquisition and allocations ahead of the internal try/catch can still throw, so
+ // we defend here as well.
+ ZEN_ERROR("project '{}': ~Project threw exception: '{}'", Identifier, Ex.what());
+ }
}
}
@@ -4738,7 +4750,7 @@ ProjectStore::GetProjectsList()
CbObject
ProjectStore::GetProjectFiles(LoggerRef InLog, Project& Project, Oplog& Oplog, const std::unordered_set<std::string>& WantedFieldNames)
{
- auto Log = [&InLog]() { return InLog; };
+ ZEN_SCOPED_LOG(InLog);
using namespace std::literals;
@@ -4893,7 +4905,7 @@ ProjectStore::GetProjectChunkInfos(LoggerRef InLog, Project& Project, Oplog& Opl
ZEN_MEMSCOPE(GetProjectstoreTag());
ZEN_TRACE_CPU("ProjectStore::GetProjectChunkInfos");
- auto Log = [&InLog]() { return InLog; };
+ ZEN_SCOPED_LOG(InLog);
using namespace std::literals;
@@ -5050,16 +5062,13 @@ ProjectStore::GetProjectChunkInfos(LoggerRef InLog, Project& Project, Oplog& Opl
}
CbObject
-ProjectStore::GetChunkInfo(LoggerRef InLog, Project& Project, Oplog& Oplog, const Oid& ChunkId)
+ProjectStore::GetChunkInfo(Project& Project, Oplog& Oplog, const Oid& ChunkId)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
ZEN_TRACE_CPU("ProjectStore::GetChunkInfo");
using namespace std::literals;
- auto Log = [&InLog]() { return InLog; };
- ZEN_UNUSED(Log);
-
IoBuffer Chunk = Oplog.FindChunk(Project.RootDir, ChunkId, nullptr);
if (!Chunk)
{
@@ -5168,7 +5177,10 @@ ExtractRange(IoBuffer&& Chunk, uint64_t Offset, uint64_t Size, ZenContentType Ac
const bool IsFullRange = (Offset == 0) && ((Size == ~(0ull)) || (Size == ChunkSize));
if (IsFullRange)
{
- Result.Chunk = CompositeBuffer(SharedBuffer(std::move(Chunk)));
+ if (ChunkSize > 0)
+ {
+ Result.Chunk = CompositeBuffer(SharedBuffer(std::move(Chunk)));
+ }
Result.RawSize = 0;
}
else
@@ -5205,8 +5217,7 @@ ExtractRange(IoBuffer&& Chunk, uint64_t Offset, uint64_t Size, ZenContentType Ac
}
ProjectStore::GetChunkRangeResult
-ProjectStore::GetChunkRange(LoggerRef InLog,
- Project& Project,
+ProjectStore::GetChunkRange(Project& Project,
Oplog& Oplog,
const Oid& ChunkId,
uint64_t Offset,
@@ -5218,9 +5229,6 @@ ProjectStore::GetChunkRange(LoggerRef InLog,
ZEN_TRACE_CPU("ProjectStore::GetChunkRange");
- auto Log = [&InLog]() { return InLog; };
- ZEN_UNUSED(Log);
-
uint64_t OldTag = OptionalInOutModificationTag == nullptr ? 0 : *OptionalInOutModificationTag;
IoBuffer Chunk = Oplog.FindChunk(Project.RootDir, ChunkId, OptionalInOutModificationTag);
if (!Chunk)
@@ -5727,7 +5735,7 @@ public:
ZEN_TRACE_CPU("Store::CompactStore");
ZEN_MEMSCOPE(GetProjectstoreTag());
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -5863,7 +5871,7 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
ZEN_TRACE_CPU("Store::RemoveExpiredData");
ZEN_MEMSCOPE(GetProjectstoreTag());
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -6016,7 +6024,7 @@ public:
{
ZEN_TRACE_CPU("Store::UpdateLockedState");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
@@ -6093,7 +6101,7 @@ public:
{
ZEN_TRACE_CPU("Store::GetUnusedReferences");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
size_t InitialCount = IoCids.size();
size_t UsedCount = InitialCount;
@@ -6117,6 +6125,7 @@ public:
}
private:
+ LoggerRef Log() { return m_ProjectStore.Log(); }
ProjectStore& m_ProjectStore;
std::vector<IoHash> m_References;
};
@@ -6161,7 +6170,7 @@ public:
{
ZEN_TRACE_CPU("Store::Oplog::PreCache");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -6279,7 +6288,7 @@ public:
{
ZEN_TRACE_CPU("Store::Oplog::UpdateLockedState");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -6387,7 +6396,7 @@ public:
{
ZEN_TRACE_CPU("Store::Oplog::GetUnusedReferences");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
const size_t InitialCount = IoCids.size();
size_t UsedCount = InitialCount;
@@ -6413,6 +6422,7 @@ public:
return UnusedReferences;
}
+ LoggerRef Log() { return m_Project->Log(); }
ProjectStore& m_ProjectStore;
Ref<ProjectStore::Project> m_Project;
std::string m_OplogId;
@@ -6428,7 +6438,7 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx)
{
ZEN_TRACE_CPU("Store::CreateReferenceCheckers");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
size_t ProjectCount = 0;
size_t OplogCount = 0;
@@ -6490,11 +6500,9 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx)
std::vector<RwLock::SharedLockScope>
ProjectStore::LockState(GcCtx& Ctx)
{
+ ZEN_UNUSED(Ctx);
ZEN_TRACE_CPU("Store::LockState");
- auto Log = [&Ctx]() { return Ctx.Logger; };
- ZEN_UNUSED(Log);
-
std::vector<RwLock::SharedLockScope> Locks;
Locks.emplace_back(RwLock::SharedLockScope(m_ProjectsLock));
for (auto& ProjectIt : m_Projects)
@@ -6526,7 +6534,7 @@ public:
{
ZEN_TRACE_CPU("Store::Validate");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ ZEN_SCOPED_LOG(Ctx.Logger);
ProjectStore::Oplog::ValidationResult Result;
@@ -6625,9 +6633,6 @@ ProjectStore::CreateReferenceValidators(GcCtx& Ctx)
return {};
}
- auto Log = [&Ctx]() { return Ctx.Logger; };
- ZEN_UNUSED(Log);
-
DiscoverProjects();
std::vector<std::pair<std::string, std::string>> Oplogs;
@@ -8242,8 +8247,7 @@ TEST_CASE("project.store.partial.read")
{
uint64_t ModificationTag = 0;
- auto Result = ProjectStore.GetChunkRange(Log(),
- *Project1,
+ auto Result = ProjectStore.GetChunkRange(*Project1,
*Oplog1,
Attachments[OpIds[1]][0].first,
0,
@@ -8258,8 +8262,7 @@ TEST_CASE("project.store.partial.read")
CompressedBuffer Attachment = CompressedBuffer::FromCompressed(Result.Chunk, RawHash, RawSize);
CHECK(RawSize == Attachments[OpIds[1]][0].second.DecodeRawSize());
- auto Result2 = ProjectStore.GetChunkRange(Log(),
- *Project1,
+ auto Result2 = ProjectStore.GetChunkRange(*Project1,
*Oplog1,
Attachments[OpIds[1]][0].first,
0,
@@ -8272,8 +8275,7 @@ TEST_CASE("project.store.partial.read")
{
uint64_t FullChunkModificationTag = 0;
{
- auto Result = ProjectStore.GetChunkRange(Log(),
- *Project1,
+ auto Result = ProjectStore.GetChunkRange(*Project1,
*Oplog1,
Attachments[OpIds[2]][1].first,
0,
@@ -8286,8 +8288,7 @@ TEST_CASE("project.store.partial.read")
Attachments[OpIds[2]][1].second.DecodeRawSize());
}
{
- auto Result = ProjectStore.GetChunkRange(Log(),
- *Project1,
+ auto Result = ProjectStore.GetChunkRange(*Project1,
*Oplog1,
Attachments[OpIds[2]][1].first,
0,
@@ -8300,8 +8301,7 @@ TEST_CASE("project.store.partial.read")
{
uint64_t PartialChunkModificationTag = 0;
{
- auto Result = ProjectStore.GetChunkRange(Log(),
- *Project1,
+ auto Result = ProjectStore.GetChunkRange(*Project1,
*Oplog1,
Attachments[OpIds[2]][1].first,
5,
@@ -8324,8 +8324,7 @@ TEST_CASE("project.store.partial.read")
}
{
- auto Result = ProjectStore.GetChunkRange(Log(),
- *Project1,
+ auto Result = ProjectStore.GetChunkRange(*Project1,
*Oplog1,
Attachments[OpIds[2]][1].first,
0,
diff --git a/src/zenstore/workspaces.cpp b/src/zenstore/workspaces.cpp
index ad21bbc68..cfdcd294c 100644
--- a/src/zenstore/workspaces.cpp
+++ b/src/zenstore/workspaces.cpp
@@ -331,9 +331,6 @@ ScanFolder(LoggerRef InLog, const std::filesystem::path& Path, WorkerThreadPool&
{
ZEN_TRACE_CPU("workspaces::ScanFolderImpl");
- auto Log = [&InLog]() { return InLog; };
- ZEN_UNUSED(Log);
-
FolderScanner Data(InLog, WorkerPool, Path);
Data.Traverse();
return std::make_unique<FolderStructure>(std::move(Data.FoundFiles), std::move(Data.FoundFileIds));
@@ -811,7 +808,7 @@ Workspaces::GetShareAlias(std::string_view Alias) const
std::vector<Workspaces::WorkspaceConfiguration>
Workspaces::ReadConfig(const LoggerRef& InLog, const std::filesystem::path& WorkspaceStatePath, std::string& OutError)
{
- auto Log = [&InLog]() { return InLog; };
+ ZEN_SCOPED_LOG(InLog);
using namespace std::literals;
@@ -835,7 +832,7 @@ Workspaces::WriteConfig(const LoggerRef& InLog,
const std::filesystem::path& WorkspaceStatePath,
const std::vector<WorkspaceConfiguration>& WorkspaceConfigurations)
{
- auto Log = [&InLog]() { return InLog; };
+ ZEN_SCOPED_LOG(InLog);
using namespace std::literals;
@@ -850,7 +847,7 @@ Workspaces::WriteConfig(const LoggerRef& InLog,
std::vector<Workspaces::WorkspaceShareConfiguration>
Workspaces::ReadWorkspaceConfig(const LoggerRef& InLog, const std::filesystem::path& WorkspaceRoot, std::string& OutError)
{
- auto Log = [&InLog]() { return InLog; };
+ ZEN_SCOPED_LOG(InLog);
using namespace std::literals;
@@ -874,7 +871,7 @@ Workspaces::WriteWorkspaceConfig(const LoggerRef& InLog,
const std::filesystem::path& WorkspaceRoot,
const std::vector<WorkspaceShareConfiguration>& WorkspaceShareConfigurations)
{
- auto Log = [&InLog]() { return InLog; };
+ ZEN_SCOPED_LOG(InLog);
using namespace std::literals;
@@ -1049,9 +1046,6 @@ Workspaces::RemoveWorkspaceShare(const LoggerRef& Log, const std::filesystem::pa
Workspaces::WorkspaceConfiguration
Workspaces::FindWorkspace(const LoggerRef& InLog, const std::filesystem::path& WorkspaceStatePath, const Oid& WorkspaceId)
{
- auto Log = [&InLog]() { return InLog; };
- ZEN_UNUSED(Log);
-
std::string Error;
std::vector<WorkspaceConfiguration> Workspaces = ReadConfig(InLog, WorkspaceStatePath, Error);
if (!Error.empty())
@@ -1075,9 +1069,6 @@ Workspaces::FindWorkspace(const LoggerRef& InLog,
const std::filesystem::path& WorkspaceStatePath,
const std::filesystem::path& WorkspaceRoot)
{
- auto Log = [&InLog]() { return InLog; };
- ZEN_UNUSED(Log);
-
std::string Error;
std::vector<WorkspaceConfiguration> Workspaces = ReadConfig(InLog, WorkspaceStatePath, Error);
if (!Error.empty())
@@ -1102,7 +1093,7 @@ Workspaces::FindWorkspaceShare(const LoggerRef& InLog,
std::string_view ShareAlias,
WorkspaceConfiguration& OutWorkspace)
{
- auto Log = [&InLog]() { return InLog; };
+ ZEN_SCOPED_LOG(InLog);
std::string Error;
std::vector<WorkspaceConfiguration> Workspaces = ReadConfig(InLog, WorkspaceStatePath, Error);
@@ -1151,7 +1142,7 @@ Workspaces::FindWorkspaceShare(const LoggerRef& InLog,
Workspaces::WorkspaceShareConfiguration
Workspaces::FindWorkspaceShare(const LoggerRef& InLog, const std::filesystem::path& WorkspaceRoot, const Oid& WorkspaceShareId)
{
- auto Log = [&InLog]() { return InLog; };
+ ZEN_SCOPED_LOG(InLog);
std::string Error;
std::vector<WorkspaceShareConfiguration> Shares = ReadWorkspaceConfig(InLog, WorkspaceRoot, Error);
if (!Error.empty())
@@ -1174,7 +1165,7 @@ Workspaces::FindWorkspaceShare(const LoggerRef& InLog, const std::filesystem::pa
Workspaces::WorkspaceShareConfiguration
Workspaces::FindWorkspaceShare(const LoggerRef& InLog, const std::filesystem::path& WorkspaceRoot, const std::filesystem::path& SharePath)
{
- auto Log = [&InLog]() { return InLog; };
+ ZEN_SCOPED_LOG(InLog);
std::string Error;
std::vector<WorkspaceShareConfiguration> Shares = ReadWorkspaceConfig(InLog, WorkspaceRoot, Error);
if (!Error.empty())
diff --git a/src/zenstore/zenstore.cpp b/src/zenstore/zenstore.cpp
index c563cc202..bf0c71211 100644
--- a/src/zenstore/zenstore.cpp
+++ b/src/zenstore/zenstore.cpp
@@ -2,6 +2,26 @@
#include "zenstore/zenstore.h"
+#include <zencore/iobuffer.h>
+
+namespace zen {
+
+FallbackChunkResolver::FallbackChunkResolver(ChunkResolver& Primary, ChunkResolver& Fallback) : m_Primary(Primary), m_Fallback(Fallback)
+{
+}
+
+IoBuffer
+FallbackChunkResolver::FindChunkByCid(const IoHash& DecompressedId)
+{
+ if (IoBuffer Result = m_Primary.FindChunkByCid(DecompressedId))
+ {
+ return Result;
+ }
+ return m_Fallback.FindChunkByCid(DecompressedId);
+}
+
+} // namespace zen
+
#if ZEN_WITH_TESTS
# include <zenstore/blockstore.h>