diff options
Diffstat (limited to 'src/zenstore')
| -rw-r--r-- | src/zenstore/buildstore/buildstore.cpp | 6 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 14 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 12 | ||||
| -rw-r--r-- | src/zenstore/cidstore.cpp | 12 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 19 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 6 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 10 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cachepolicy.h | 10 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cidstore.h | 22 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/memorycidstore.h | 68 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/projectstore.h | 13 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/zenstore.h | 37 | ||||
| -rw-r--r-- | src/zenstore/memorycidstore.cpp | 143 | ||||
| -rw-r--r-- | src/zenstore/projectstore.cpp | 87 | ||||
| -rw-r--r-- | src/zenstore/workspaces.cpp | 23 | ||||
| -rw-r--r-- | src/zenstore/zenstore.cpp | 20 |
16 files changed, 387 insertions, 115 deletions
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp index dff1c3c61..a08741d31 100644 --- a/src/zenstore/buildstore/buildstore.cpp +++ b/src/zenstore/buildstore/buildstore.cpp @@ -1052,7 +1052,7 @@ public: { ZEN_TRACE_CPU("Builds::PreCache"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -1107,7 +1107,7 @@ public: ZEN_TRACE_CPU("Builds::GetUnusedReferences"); ZEN_MEMSCOPE(GetBuildstoreTag()); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); size_t InitialCount = IoCids.size(); size_t UsedCount = InitialCount; @@ -1152,7 +1152,7 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) ZEN_TRACE_CPU("Builds::RemoveExpiredData"); ZEN_MEMSCOPE(GetBuildstoreTag()); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; const auto _ = MakeGuard([&] { diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 4640309d9..45a4b6456 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -3083,7 +3083,7 @@ public: { ZEN_TRACE_CPU("Z$::Bucket::CompactStore"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3338,7 +3338,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) { ZEN_TRACE_CPU("Z$::Bucket::RemoveExpiredData"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); size_t TotalEntries = 0; @@ -3502,7 +3502,7 @@ ZenCacheDiskLayer::CacheBucket::GetReferences(const LoggerRef& Logger, { ZEN_TRACE_CPU("Z$::Bucket::GetReferencesLocked"); - auto Log = [&Logger]() { return Logger; }; + ZEN_SCOPED_LOG(Logger); auto GetAttachments = [&](const IoHash& RawHash, MemoryView Data) -> bool { if (CbValidateError Error = ValidateCompactBinary(Data, CbValidateMode::Default); Error == CbValidateError::None) @@ -3718,7 +3718,7 @@ public: { ZEN_TRACE_CPU("Z$::Bucket::PreCache"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3753,7 +3753,7 @@ public: { ZEN_TRACE_CPU("Z$::Bucket::UpdateLockedState"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3784,7 +3784,7 @@ public: { ZEN_TRACE_CPU("Z$::Bucket::GetUnusedReferences"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); const size_t InitialCount = IoCids.size(); size_t UsedCount = InitialCount; @@ -3818,7 +3818,7 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) { ZEN_TRACE_CPU("Z$::Bucket::CreateReferenceCheckers"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; const auto _ = MakeGuard([&] { diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index cff0e9a35..97b793083 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -468,7 +468,7 @@ ZenCacheStore::LogWorker() LoggerRef ZCacheLog(logging::Get("z$")); - auto Log = [&ZCacheLog]() -> LoggerRef { return ZCacheLog; }; + ZEN_SCOPED_LOG(ZCacheLog); std::vector<AccessLogItem> Items; while (true) @@ -1086,11 +1086,9 @@ ZenCacheStore::GetBucketInfo(std::string_view NamespaceName, std::string_view Bu std::vector<RwLock::SharedLockScope> ZenCacheStore::LockState(GcCtx& Ctx) { + ZEN_UNUSED(Ctx); ZEN_TRACE_CPU("CacheStore::LockState"); - auto Log = [&Ctx]() { return Ctx.Logger; }; - ZEN_UNUSED(Log); - std::vector<RwLock::SharedLockScope> Locks; Locks.emplace_back(RwLock::SharedLockScope(m_NamespacesLock)); for (auto& NamespaceIt : m_Namespaces) @@ -1211,7 +1209,7 @@ public: { ZEN_TRACE_CPU("Z$::UpdateLockedState"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; @@ -1276,7 +1274,7 @@ public: { ZEN_TRACE_CPU("Z$::GetUnusedReferences"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); const size_t InitialCount = IoCids.size(); size_t UsedCount = InitialCount; @@ -1309,7 +1307,7 @@ ZenCacheStore::CreateReferenceCheckers(GcCtx& Ctx) { ZEN_TRACE_CPU("CacheStore::CreateReferenceCheckers"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; const auto _ = MakeGuard([&] { diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp index b20d8f565..ac8a75a58 100644 --- a/src/zenstore/cidstore.cpp +++ b/src/zenstore/cidstore.cpp @@ -188,12 +188,24 @@ CidStore::Initialize(const CidStoreConfiguration& Config) } CidStore::InsertResult +CidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) +{ + return m_Impl->AddChunk(ChunkData, RawHash, InsertMode::kMayBeMovedInPlace); +} + +CidStore::InsertResult CidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode) { return m_Impl->AddChunk(ChunkData, RawHash, Mode); } std::vector<CidStore::InsertResult> +CidStore::AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) +{ + return m_Impl->AddChunks(ChunkDatas, RawHashes, InsertMode::kMayBeMovedInPlace); +} + +std::vector<CidStore::InsertResult> CidStore::AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes, InsertMode Mode) { return m_Impl->AddChunks(ChunkDatas, RawHashes, Mode); diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 43dc389e2..6f1e1d701 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -698,7 +698,7 @@ public: ZEN_TRACE_CPU("CasContainer::CompactStore"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -875,7 +875,7 @@ public: ZEN_MEMSCOPE(GetCasContainerTag()); ZEN_TRACE_CPU("CasContainer::RemoveUnreferencedData"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -958,7 +958,7 @@ CasContainerStrategy::CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats&) ZEN_MEMSCOPE(GetCasContainerTag()); ZEN_TRACE_CPU("CasContainer::CreateReferencePruner"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -1391,7 +1391,7 @@ TEST_CASE("compactcas.compact.gc") { ScopedTemporaryDirectory TempDir; - const int kIterationCount = 1000; + const int kIterationCount = 200; std::vector<IoHash> Keys(kIterationCount); @@ -1504,7 +1504,7 @@ TEST_CASE("compactcas.threadedinsert") ScopedTemporaryDirectory TempDir; const uint64_t kChunkSize = 1048; - const int32_t kChunkCount = 2048; + const int32_t kChunkCount = 512; uint64_t ExpectedSize = 0; tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> Chunks; @@ -1803,7 +1803,7 @@ TEST_CASE("compactcas.restart") } const uint64_t kChunkSize = 1048 + 395; - const size_t kChunkCount = 7167; + const size_t kChunkCount = 2000; std::vector<IoHash> Hashes; Hashes.reserve(kChunkCount); @@ -1984,9 +1984,8 @@ TEST_CASE("compactcas.iteratechunks") WorkerThreadPool ThreadPool(Max(GetHardwareConcurrency() - 1u, 2u), "put"); const uint64_t kChunkSize = 1048 + 395; - const size_t kChunkCount = 63840; + const size_t kChunkCount = 10000; - for (uint32_t N = 0; N < 2; N++) { GcManager Gc; CasContainerStrategy Cas(Gc); @@ -2017,7 +2016,7 @@ TEST_CASE("compactcas.iteratechunks") size_t BatchCount = Min<size_t>(kChunkCount - Offset, 512u); WorkLatch.AddCount(1); ThreadPool.ScheduleWork( - [N, &WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount]() { + [&WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); std::vector<IoBuffer> BatchBlobs; @@ -2028,7 +2027,7 @@ TEST_CASE("compactcas.iteratechunks") while (BatchBlobs.size() < BatchCount) { IoBuffer Chunk = CreateRandomBlob( - N + kChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377)); + kChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377)); IoHash Hash = IoHash::HashBuffer(Chunk); { RwLock::ExclusiveLockScope __(InsertLock); diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 0088afe6e..3a7a72ee3 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -1231,7 +1231,7 @@ public: ZEN_MEMSCOPE(GetFileCasTag()); ZEN_TRACE_CPU("FileCas::CompactStore"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -1375,7 +1375,7 @@ public: ZEN_MEMSCOPE(GetFileCasTag()); ZEN_TRACE_CPU("FileCas::RemoveUnreferencedData"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -1458,7 +1458,7 @@ FileCasStrategy::CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats&) ZEN_TRACE_CPU("FileCas::CreateReferencePruner"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; const auto _ = MakeGuard([&] { diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index f3edf804d..928fc3f08 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -546,7 +546,7 @@ FilterReferences(GcCtx& Ctx, std::string_view Context, std::vector<IoHash>& InOu return false; } - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); const bool Filter = Ctx.Settings.AttachmentRangeMax != IoHash::Max || Ctx.Settings.AttachmentRangeMin != IoHash::Zero; @@ -2063,6 +2063,14 @@ GcScheduler::GetState() const { { std::unique_lock Lock(m_GcMutex); + + if (m_TriggerGcParams || m_TriggerScrubParams) + { + // If a trigger is pending, treat it as running + Result.Status = GcSchedulerStatus::kRunning; + return Result; + } + Result.LastFullGcTime = m_LastGcTime; Result.LastFullGCDiff = m_LastFullGCDiff; Result.LastFullGcDuration = m_LastFullGcDuration; diff --git a/src/zenstore/include/zenstore/cache/cachepolicy.h b/src/zenstore/include/zenstore/cache/cachepolicy.h index 7773cd3d1..4a062a0c2 100644 --- a/src/zenstore/include/zenstore/cache/cachepolicy.h +++ b/src/zenstore/include/zenstore/cache/cachepolicy.h @@ -163,9 +163,9 @@ private: friend class CacheRecordPolicyBuilder; friend class OptionalCacheRecordPolicy; - CachePolicy RecordPolicy = CachePolicy::Default; - CachePolicy DefaultValuePolicy = CachePolicy::Default; - RefPtr<const Private::ICacheRecordPolicyShared> Shared; + CachePolicy RecordPolicy = CachePolicy::Default; + CachePolicy DefaultValuePolicy = CachePolicy::Default; + Ref<const Private::ICacheRecordPolicyShared> Shared; }; /** A cache record policy builder is used to construct a cache record policy. */ @@ -186,8 +186,8 @@ public: CacheRecordPolicy Build(); private: - CachePolicy BasePolicy = CachePolicy::Default; - RefPtr<Private::ICacheRecordPolicyShared> Shared; + CachePolicy BasePolicy = CachePolicy::Default; + Ref<Private::ICacheRecordPolicyShared> Shared; }; /** diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h index d54062476..c00e0449f 100644 --- a/src/zenstore/include/zenstore/cidstore.h +++ b/src/zenstore/include/zenstore/cidstore.h @@ -58,16 +58,14 @@ struct CidStoreConfiguration * */ -class CidStore final : public ChunkResolver, public StatsProvider +class CidStore final : public ChunkStore, public StatsProvider { public: CidStore(GcManager& Gc); ~CidStore(); - struct InsertResult - { - bool New = false; - }; + using InsertResult = ChunkStore::InsertResult; + enum class InsertMode { kCopyOnly, @@ -75,17 +73,17 @@ public: }; void Initialize(const CidStoreConfiguration& Config); - InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace); - std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, - std::span<IoHash> RawHashes, - InsertMode Mode = InsertMode::kMayBeMovedInPlace); - virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; + InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) override; + InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode); + std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) override; + std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes, InsertMode Mode); + IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; bool IterateChunks(std::span<IoHash> DecompressedIds, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit); - bool ContainsChunk(const IoHash& DecompressedId); - void FilterChunks(HashKeySet& InOutChunks); + bool ContainsChunk(const IoHash& DecompressedId) override; + void FilterChunks(HashKeySet& InOutChunks) override; void Flush(); CidStoreSize TotalSize() const; CidStoreStats Stats() const; diff --git a/src/zenstore/include/zenstore/memorycidstore.h b/src/zenstore/include/zenstore/memorycidstore.h new file mode 100644 index 000000000..0311274d5 --- /dev/null +++ b/src/zenstore/include/zenstore/memorycidstore.h @@ -0,0 +1,68 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "cidstore.h" +#include "zenstore.h" + +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/thread.h> + +#include <deque> +#include <span> +#include <thread> +#include <unordered_map> + +namespace zen { + +class HashKeySet; + +/** Memory-backed chunk store. + * + * Stores chunks in an in-memory hash map, optionally layered over a + * standard CidStore for write-through and read fallback. When a backing + * store is provided: + * + * - AddChunk writes to memory and asynchronously to the backing store. + * - FindChunkByCid checks memory first, then falls back to the backing store. + * - ContainsChunk and FilterChunks check memory first, then the backing store. + * + * The memory store does NOT cache read-through results from the backing store. + * Only chunks explicitly added via AddChunk/AddChunks are held in memory. + */ + +class MemoryCidStore : public ChunkStore +{ +public: + explicit MemoryCidStore(CidStore* BackingStore = nullptr); + ~MemoryCidStore(); + + InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) override; + std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) override; + IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; + bool ContainsChunk(const IoHash& DecompressedId) override; + void FilterChunks(HashKeySet& InOutChunks) override; + +private: + RwLock m_Lock; + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> m_Chunks; + CidStore* m_BackingStore = nullptr; + + // Async write-through to backing store + struct PendingWrite + { + IoBuffer Data; + IoHash Hash; + }; + + std::mutex m_FlushLock; + std::vector<PendingWrite> m_FlushQueue; + Event m_FlushEvent; + std::thread m_FlushThread; + std::atomic<bool> m_FlushThreadEnabled{false}; + + void FlushThreadFunction(); +}; + +} // namespace zen diff --git a/src/zenstore/include/zenstore/projectstore.h b/src/zenstore/include/zenstore/projectstore.h index 100a82907..d05261967 100644 --- a/src/zenstore/include/zenstore/projectstore.h +++ b/src/zenstore/include/zenstore/projectstore.h @@ -305,11 +305,11 @@ public: std::unordered_set<IoHash, IoHash::Hasher> m_PendingPrepOpAttachments; GcClock::TimePoint m_PendingPrepOpAttachmentsRetainEnd; - RefPtr<OplogStorage> m_Storage; - uint64_t m_LogFlushPosition = 0; - bool m_IsLegacySnapshot = false; + Ref<OplogStorage> m_Storage; + uint64_t m_LogFlushPosition = 0; + bool m_IsLegacySnapshot = false; - RefPtr<OplogStorage> GetStorage(); + Ref<OplogStorage> GetStorage(); /** Scan oplog and register each entry, thus updating the in-memory tracking tables */ @@ -484,7 +484,7 @@ public: Project& Project, Oplog& Oplog, const std::unordered_set<std::string>& WantedFieldNames); - static CbObject GetChunkInfo(LoggerRef InLog, Project& Project, Oplog& Oplog, const Oid& ChunkId); + static CbObject GetChunkInfo(Project& Project, Oplog& Oplog, const Oid& ChunkId); struct GetChunkRangeResult { enum class EError : uint8_t @@ -502,8 +502,7 @@ public: uint64_t RawSize = 0; ZenContentType ContentType = ZenContentType::kUnknownContentType; }; - static GetChunkRangeResult GetChunkRange(LoggerRef InLog, - Project& Project, + static GetChunkRangeResult GetChunkRange(Project& Project, Oplog& Oplog, const Oid& ChunkId, uint64_t Offset, diff --git a/src/zenstore/include/zenstore/zenstore.h b/src/zenstore/include/zenstore/zenstore.h index bed219b4b..95ae33a4a 100644 --- a/src/zenstore/include/zenstore/zenstore.h +++ b/src/zenstore/include/zenstore/zenstore.h @@ -4,19 +4,56 @@ #include <zencore/zencore.h> +#include <span> +#include <vector> + #define ZENSTORE_API namespace zen { +class HashKeySet; class IoBuffer; struct IoHash; class ChunkResolver { public: + virtual ~ChunkResolver() = default; virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) = 0; }; +/** Abstract chunk store interface. + * + * Extends ChunkResolver with write and query operations. Both CidStore + * (disk-backed) and MemoryCidStore (in-memory) implement this interface, + * allowing callers to be agnostic about the storage backend. + */ +class ChunkStore : public ChunkResolver +{ +public: + struct InsertResult + { + bool New = false; + }; + + virtual InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) = 0; + virtual std::vector<InsertResult> AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) = 0; + virtual bool ContainsChunk(const IoHash& DecompressedId) = 0; + virtual void FilterChunks(HashKeySet& InOutChunks) = 0; +}; + +/** Composite resolver that tries a primary store first, then a fallback. */ +class FallbackChunkResolver : public ChunkResolver +{ +public: + FallbackChunkResolver(ChunkResolver& Primary, ChunkResolver& Fallback); + IoBuffer FindChunkByCid(const IoHash& DecompressedId) override; + +private: + ChunkResolver& m_Primary; + ChunkResolver& m_Fallback; +}; + ZENSTORE_API void zenstore_forcelinktests(); } // namespace zen diff --git a/src/zenstore/memorycidstore.cpp b/src/zenstore/memorycidstore.cpp new file mode 100644 index 000000000..b4832029b --- /dev/null +++ b/src/zenstore/memorycidstore.cpp @@ -0,0 +1,143 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenstore/hashkeyset.h> +#include <zenstore/memorycidstore.h> + +namespace zen { + +MemoryCidStore::MemoryCidStore(CidStore* BackingStore) : m_BackingStore(BackingStore) +{ + if (m_BackingStore) + { + m_FlushThreadEnabled = true; + m_FlushThread = std::thread(&MemoryCidStore::FlushThreadFunction, this); + } +} + +MemoryCidStore::~MemoryCidStore() +{ + m_FlushThreadEnabled = false; + m_FlushEvent.Set(); + if (m_FlushThread.joinable()) + { + m_FlushThread.join(); + } +} + +MemoryCidStore::InsertResult +MemoryCidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash) +{ + bool IsNew = false; + + m_Lock.WithExclusiveLock([&] { + auto [It, Inserted] = m_Chunks.try_emplace(RawHash, ChunkData); + IsNew = Inserted; + }); + + if (m_BackingStore) + { + std::lock_guard<std::mutex> Lock(m_FlushLock); + m_FlushQueue.push_back({.Data = ChunkData, .Hash = RawHash}); + m_FlushEvent.Set(); + } + + return {.New = IsNew}; +} + +std::vector<MemoryCidStore::InsertResult> +MemoryCidStore::AddChunks(std::span<IoBuffer> ChunkDatas, std::span<IoHash> RawHashes) +{ + std::vector<MemoryCidStore::InsertResult> Results; + Results.reserve(ChunkDatas.size()); + + for (size_t i = 0; i < ChunkDatas.size(); ++i) + { + Results.push_back(AddChunk(ChunkDatas[i], RawHashes[i])); + } + + return Results; +} + +IoBuffer +MemoryCidStore::FindChunkByCid(const IoHash& DecompressedId) +{ + IoBuffer Result; + + m_Lock.WithSharedLock([&] { + auto It = m_Chunks.find(DecompressedId); + if (It != m_Chunks.end()) + { + Result = It->second; + } + }); + + if (!Result && m_BackingStore) + { + Result = m_BackingStore->FindChunkByCid(DecompressedId); + } + + return Result; +} + +bool +MemoryCidStore::ContainsChunk(const IoHash& DecompressedId) +{ + bool Found = false; + + m_Lock.WithSharedLock([&] { Found = m_Chunks.find(DecompressedId) != m_Chunks.end(); }); + + if (!Found && m_BackingStore) + { + Found = m_BackingStore->ContainsChunk(DecompressedId); + } + + return Found; +} + +void +MemoryCidStore::FilterChunks(HashKeySet& InOutChunks) +{ + // Remove hashes that are present in our memory store + m_Lock.WithSharedLock([&] { InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return m_Chunks.find(Hash) != m_Chunks.end(); }); }); + + // Delegate remainder to backing store + if (m_BackingStore && !InOutChunks.IsEmpty()) + { + m_BackingStore->FilterChunks(InOutChunks); + } +} + +void +MemoryCidStore::FlushThreadFunction() +{ + SetCurrentThreadName("MemCidFlush"); + + while (m_FlushThreadEnabled) + { + m_FlushEvent.Wait(); + + std::vector<PendingWrite> Batch; + { + std::lock_guard<std::mutex> Lock(m_FlushLock); + Batch.swap(m_FlushQueue); + } + + for (PendingWrite& Write : Batch) + { + m_BackingStore->AddChunk(Write.Data, Write.Hash); + } + } + + // Drain remaining writes on shutdown + std::vector<PendingWrite> Remaining; + { + std::lock_guard<std::mutex> Lock(m_FlushLock); + Remaining.swap(m_FlushQueue); + } + for (PendingWrite& Write : Remaining) + { + m_BackingStore->AddChunk(Write.Data, Write.Hash); + } +} + +} // namespace zen diff --git a/src/zenstore/projectstore.cpp b/src/zenstore/projectstore.cpp index 13674da4d..8159f9f83 100644 --- a/src/zenstore/projectstore.cpp +++ b/src/zenstore/projectstore.cpp @@ -3180,6 +3180,7 @@ ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, } else { + m_ChunkMap.erase(FileId); Entry.ServerPath = ServerPath; } @@ -3402,11 +3403,11 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) return EntryId; } -RefPtr<ProjectStore::OplogStorage> +Ref<ProjectStore::OplogStorage> ProjectStore::Oplog::GetStorage() { ZEN_MEMSCOPE(GetProjectstoreTag()); - RefPtr<OplogStorage> Storage; + Ref<OplogStorage> Storage; { RwLock::SharedLockScope _(m_OplogLock); Storage = m_Storage; @@ -3424,7 +3425,7 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core) using namespace std::literals; - RefPtr<OplogStorage> Storage = GetStorage(); + Ref<OplogStorage> Storage = GetStorage(); if (!Storage) { return {}; @@ -3456,7 +3457,7 @@ ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores) using namespace std::literals; - RefPtr<OplogStorage> Storage = GetStorage(); + Ref<OplogStorage> Storage = GetStorage(); if (!Storage) { return std::vector<ProjectStore::LogSequenceNumber>(Cores.size(), LogSequenceNumber{}); @@ -3515,7 +3516,18 @@ ProjectStore::Project::~Project() // Only write access times if we have not been explicitly deleted if (!m_OplogStoragePath.empty()) { - WriteAccessTimes(); + try + { + WriteAccessTimes(); + } + catch (const std::exception& Ex) + { + // RefCounted::Release() is noexcept, so a destructor that propagates an exception + // terminates the program. WriteAccessTimes() already catches I/O failures, but lock + // acquisition and allocations ahead of the internal try/catch can still throw, so + // we defend here as well. + ZEN_ERROR("project '{}': ~Project threw exception: '{}'", Identifier, Ex.what()); + } } } @@ -4738,7 +4750,7 @@ ProjectStore::GetProjectsList() CbObject ProjectStore::GetProjectFiles(LoggerRef InLog, Project& Project, Oplog& Oplog, const std::unordered_set<std::string>& WantedFieldNames) { - auto Log = [&InLog]() { return InLog; }; + ZEN_SCOPED_LOG(InLog); using namespace std::literals; @@ -4893,7 +4905,7 @@ ProjectStore::GetProjectChunkInfos(LoggerRef InLog, Project& Project, Oplog& Opl ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("ProjectStore::GetProjectChunkInfos"); - auto Log = [&InLog]() { return InLog; }; + ZEN_SCOPED_LOG(InLog); using namespace std::literals; @@ -5050,16 +5062,13 @@ ProjectStore::GetProjectChunkInfos(LoggerRef InLog, Project& Project, Oplog& Opl } CbObject -ProjectStore::GetChunkInfo(LoggerRef InLog, Project& Project, Oplog& Oplog, const Oid& ChunkId) +ProjectStore::GetChunkInfo(Project& Project, Oplog& Oplog, const Oid& ChunkId) { ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("ProjectStore::GetChunkInfo"); using namespace std::literals; - auto Log = [&InLog]() { return InLog; }; - ZEN_UNUSED(Log); - IoBuffer Chunk = Oplog.FindChunk(Project.RootDir, ChunkId, nullptr); if (!Chunk) { @@ -5168,7 +5177,10 @@ ExtractRange(IoBuffer&& Chunk, uint64_t Offset, uint64_t Size, ZenContentType Ac const bool IsFullRange = (Offset == 0) && ((Size == ~(0ull)) || (Size == ChunkSize)); if (IsFullRange) { - Result.Chunk = CompositeBuffer(SharedBuffer(std::move(Chunk))); + if (ChunkSize > 0) + { + Result.Chunk = CompositeBuffer(SharedBuffer(std::move(Chunk))); + } Result.RawSize = 0; } else @@ -5205,8 +5217,7 @@ ExtractRange(IoBuffer&& Chunk, uint64_t Offset, uint64_t Size, ZenContentType Ac } ProjectStore::GetChunkRangeResult -ProjectStore::GetChunkRange(LoggerRef InLog, - Project& Project, +ProjectStore::GetChunkRange(Project& Project, Oplog& Oplog, const Oid& ChunkId, uint64_t Offset, @@ -5218,9 +5229,6 @@ ProjectStore::GetChunkRange(LoggerRef InLog, ZEN_TRACE_CPU("ProjectStore::GetChunkRange"); - auto Log = [&InLog]() { return InLog; }; - ZEN_UNUSED(Log); - uint64_t OldTag = OptionalInOutModificationTag == nullptr ? 0 : *OptionalInOutModificationTag; IoBuffer Chunk = Oplog.FindChunk(Project.RootDir, ChunkId, OptionalInOutModificationTag); if (!Chunk) @@ -5727,7 +5735,7 @@ public: ZEN_TRACE_CPU("Store::CompactStore"); ZEN_MEMSCOPE(GetProjectstoreTag()); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -5863,7 +5871,7 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) ZEN_TRACE_CPU("Store::RemoveExpiredData"); ZEN_MEMSCOPE(GetProjectstoreTag()); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -6016,7 +6024,7 @@ public: { ZEN_TRACE_CPU("Store::UpdateLockedState"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; @@ -6093,7 +6101,7 @@ public: { ZEN_TRACE_CPU("Store::GetUnusedReferences"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); size_t InitialCount = IoCids.size(); size_t UsedCount = InitialCount; @@ -6117,6 +6125,7 @@ public: } private: + LoggerRef Log() { return m_ProjectStore.Log(); } ProjectStore& m_ProjectStore; std::vector<IoHash> m_References; }; @@ -6161,7 +6170,7 @@ public: { ZEN_TRACE_CPU("Store::Oplog::PreCache"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -6279,7 +6288,7 @@ public: { ZEN_TRACE_CPU("Store::Oplog::UpdateLockedState"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -6387,7 +6396,7 @@ public: { ZEN_TRACE_CPU("Store::Oplog::GetUnusedReferences"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); const size_t InitialCount = IoCids.size(); size_t UsedCount = InitialCount; @@ -6413,6 +6422,7 @@ public: return UnusedReferences; } + LoggerRef Log() { return m_Project->Log(); } ProjectStore& m_ProjectStore; Ref<ProjectStore::Project> m_Project; std::string m_OplogId; @@ -6428,7 +6438,7 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx) { ZEN_TRACE_CPU("Store::CreateReferenceCheckers"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); size_t ProjectCount = 0; size_t OplogCount = 0; @@ -6490,11 +6500,9 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx) std::vector<RwLock::SharedLockScope> ProjectStore::LockState(GcCtx& Ctx) { + ZEN_UNUSED(Ctx); ZEN_TRACE_CPU("Store::LockState"); - auto Log = [&Ctx]() { return Ctx.Logger; }; - ZEN_UNUSED(Log); - std::vector<RwLock::SharedLockScope> Locks; Locks.emplace_back(RwLock::SharedLockScope(m_ProjectsLock)); for (auto& ProjectIt : m_Projects) @@ -6526,7 +6534,7 @@ public: { ZEN_TRACE_CPU("Store::Validate"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + ZEN_SCOPED_LOG(Ctx.Logger); ProjectStore::Oplog::ValidationResult Result; @@ -6625,9 +6633,6 @@ ProjectStore::CreateReferenceValidators(GcCtx& Ctx) return {}; } - auto Log = [&Ctx]() { return Ctx.Logger; }; - ZEN_UNUSED(Log); - DiscoverProjects(); std::vector<std::pair<std::string, std::string>> Oplogs; @@ -8242,8 +8247,7 @@ TEST_CASE("project.store.partial.read") { uint64_t ModificationTag = 0; - auto Result = ProjectStore.GetChunkRange(Log(), - *Project1, + auto Result = ProjectStore.GetChunkRange(*Project1, *Oplog1, Attachments[OpIds[1]][0].first, 0, @@ -8258,8 +8262,7 @@ TEST_CASE("project.store.partial.read") CompressedBuffer Attachment = CompressedBuffer::FromCompressed(Result.Chunk, RawHash, RawSize); CHECK(RawSize == Attachments[OpIds[1]][0].second.DecodeRawSize()); - auto Result2 = ProjectStore.GetChunkRange(Log(), - *Project1, + auto Result2 = ProjectStore.GetChunkRange(*Project1, *Oplog1, Attachments[OpIds[1]][0].first, 0, @@ -8272,8 +8275,7 @@ TEST_CASE("project.store.partial.read") { uint64_t FullChunkModificationTag = 0; { - auto Result = ProjectStore.GetChunkRange(Log(), - *Project1, + auto Result = ProjectStore.GetChunkRange(*Project1, *Oplog1, Attachments[OpIds[2]][1].first, 0, @@ -8286,8 +8288,7 @@ TEST_CASE("project.store.partial.read") Attachments[OpIds[2]][1].second.DecodeRawSize()); } { - auto Result = ProjectStore.GetChunkRange(Log(), - *Project1, + auto Result = ProjectStore.GetChunkRange(*Project1, *Oplog1, Attachments[OpIds[2]][1].first, 0, @@ -8300,8 +8301,7 @@ TEST_CASE("project.store.partial.read") { uint64_t PartialChunkModificationTag = 0; { - auto Result = ProjectStore.GetChunkRange(Log(), - *Project1, + auto Result = ProjectStore.GetChunkRange(*Project1, *Oplog1, Attachments[OpIds[2]][1].first, 5, @@ -8324,8 +8324,7 @@ TEST_CASE("project.store.partial.read") } { - auto Result = ProjectStore.GetChunkRange(Log(), - *Project1, + auto Result = ProjectStore.GetChunkRange(*Project1, *Oplog1, Attachments[OpIds[2]][1].first, 0, diff --git a/src/zenstore/workspaces.cpp b/src/zenstore/workspaces.cpp index ad21bbc68..cfdcd294c 100644 --- a/src/zenstore/workspaces.cpp +++ b/src/zenstore/workspaces.cpp @@ -331,9 +331,6 @@ ScanFolder(LoggerRef InLog, const std::filesystem::path& Path, WorkerThreadPool& { ZEN_TRACE_CPU("workspaces::ScanFolderImpl"); - auto Log = [&InLog]() { return InLog; }; - ZEN_UNUSED(Log); - FolderScanner Data(InLog, WorkerPool, Path); Data.Traverse(); return std::make_unique<FolderStructure>(std::move(Data.FoundFiles), std::move(Data.FoundFileIds)); @@ -811,7 +808,7 @@ Workspaces::GetShareAlias(std::string_view Alias) const std::vector<Workspaces::WorkspaceConfiguration> Workspaces::ReadConfig(const LoggerRef& InLog, const std::filesystem::path& WorkspaceStatePath, std::string& OutError) { - auto Log = [&InLog]() { return InLog; }; + ZEN_SCOPED_LOG(InLog); using namespace std::literals; @@ -835,7 +832,7 @@ Workspaces::WriteConfig(const LoggerRef& InLog, const std::filesystem::path& WorkspaceStatePath, const std::vector<WorkspaceConfiguration>& WorkspaceConfigurations) { - auto Log = [&InLog]() { return InLog; }; + ZEN_SCOPED_LOG(InLog); using namespace std::literals; @@ -850,7 +847,7 @@ Workspaces::WriteConfig(const LoggerRef& InLog, std::vector<Workspaces::WorkspaceShareConfiguration> Workspaces::ReadWorkspaceConfig(const LoggerRef& InLog, const std::filesystem::path& WorkspaceRoot, std::string& OutError) { - auto Log = [&InLog]() { return InLog; }; + ZEN_SCOPED_LOG(InLog); using namespace std::literals; @@ -874,7 +871,7 @@ Workspaces::WriteWorkspaceConfig(const LoggerRef& InLog, const std::filesystem::path& WorkspaceRoot, const std::vector<WorkspaceShareConfiguration>& WorkspaceShareConfigurations) { - auto Log = [&InLog]() { return InLog; }; + ZEN_SCOPED_LOG(InLog); using namespace std::literals; @@ -1049,9 +1046,6 @@ Workspaces::RemoveWorkspaceShare(const LoggerRef& Log, const std::filesystem::pa Workspaces::WorkspaceConfiguration Workspaces::FindWorkspace(const LoggerRef& InLog, const std::filesystem::path& WorkspaceStatePath, const Oid& WorkspaceId) { - auto Log = [&InLog]() { return InLog; }; - ZEN_UNUSED(Log); - std::string Error; std::vector<WorkspaceConfiguration> Workspaces = ReadConfig(InLog, WorkspaceStatePath, Error); if (!Error.empty()) @@ -1075,9 +1069,6 @@ Workspaces::FindWorkspace(const LoggerRef& InLog, const std::filesystem::path& WorkspaceStatePath, const std::filesystem::path& WorkspaceRoot) { - auto Log = [&InLog]() { return InLog; }; - ZEN_UNUSED(Log); - std::string Error; std::vector<WorkspaceConfiguration> Workspaces = ReadConfig(InLog, WorkspaceStatePath, Error); if (!Error.empty()) @@ -1102,7 +1093,7 @@ Workspaces::FindWorkspaceShare(const LoggerRef& InLog, std::string_view ShareAlias, WorkspaceConfiguration& OutWorkspace) { - auto Log = [&InLog]() { return InLog; }; + ZEN_SCOPED_LOG(InLog); std::string Error; std::vector<WorkspaceConfiguration> Workspaces = ReadConfig(InLog, WorkspaceStatePath, Error); @@ -1151,7 +1142,7 @@ Workspaces::FindWorkspaceShare(const LoggerRef& InLog, Workspaces::WorkspaceShareConfiguration Workspaces::FindWorkspaceShare(const LoggerRef& InLog, const std::filesystem::path& WorkspaceRoot, const Oid& WorkspaceShareId) { - auto Log = [&InLog]() { return InLog; }; + ZEN_SCOPED_LOG(InLog); std::string Error; std::vector<WorkspaceShareConfiguration> Shares = ReadWorkspaceConfig(InLog, WorkspaceRoot, Error); if (!Error.empty()) @@ -1174,7 +1165,7 @@ Workspaces::FindWorkspaceShare(const LoggerRef& InLog, const std::filesystem::pa Workspaces::WorkspaceShareConfiguration Workspaces::FindWorkspaceShare(const LoggerRef& InLog, const std::filesystem::path& WorkspaceRoot, const std::filesystem::path& SharePath) { - auto Log = [&InLog]() { return InLog; }; + ZEN_SCOPED_LOG(InLog); std::string Error; std::vector<WorkspaceShareConfiguration> Shares = ReadWorkspaceConfig(InLog, WorkspaceRoot, Error); if (!Error.empty()) diff --git a/src/zenstore/zenstore.cpp b/src/zenstore/zenstore.cpp index c563cc202..bf0c71211 100644 --- a/src/zenstore/zenstore.cpp +++ b/src/zenstore/zenstore.cpp @@ -2,6 +2,26 @@ #include "zenstore/zenstore.h" +#include <zencore/iobuffer.h> + +namespace zen { + +FallbackChunkResolver::FallbackChunkResolver(ChunkResolver& Primary, ChunkResolver& Fallback) : m_Primary(Primary), m_Fallback(Fallback) +{ +} + +IoBuffer +FallbackChunkResolver::FindChunkByCid(const IoHash& DecompressedId) +{ + if (IoBuffer Result = m_Primary.FindChunkByCid(DecompressedId)) + { + return Result; + } + return m_Fallback.FindChunkByCid(DecompressedId); +} + +} // namespace zen + #if ZEN_WITH_TESTS # include <zenstore/blockstore.h> |