diff options
| author | Stefan Boberg <[email protected]> | 2023-05-16 21:35:39 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-16 21:35:39 +0200 |
| commit | 81b2757f917e34bb338fad7965ae8a74e160bee4 (patch) | |
| tree | 931ba100471a2369c62a6e41a1b4a7937ed31f6f /src/zenstore | |
| parent | added benchmark utility command `bench` (#298) (diff) | |
| download | zen-81b2757f917e34bb338fad7965ae8a74e160bee4.tar.xz zen-81b2757f917e34bb338fad7965ae8a74e160bee4.zip | |
Content scrubbing (#271)
Added zen scrub command which may be triggered via the zen CLI helper. This traverses storage and validates contents either by content hash and/or by structure. If unexpected data is encountered it is invalidated.
Diffstat (limited to 'src/zenstore')
| -rw-r--r-- | src/zenstore/cas.cpp | 6 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 139 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 142 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/gc.h | 40 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/scrubcontext.h | 45 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/zenstore.h | 2 | ||||
| -rw-r--r-- | src/zenstore/scrubcontext.cpp | 45 |
7 files changed, 316 insertions, 103 deletions
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index ab05e3e7c..b98f01385 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -18,6 +18,7 @@ #include <zencore/thread.h> #include <zencore/trace.h> #include <zencore/uid.h> +#include <zencore/workthreadpool.h> #include <zenstore/cidstore.h> #include <zenstore/gc.h> #include <zenstore/scrubcontext.h> @@ -74,6 +75,8 @@ private: void UpdateManifest(); }; +////////////////////////////////////////////////////////////////////////// + CasImpl::CasImpl(GcManager& Gc) : m_TinyStrategy(Gc), m_SmallStrategy(Gc), m_LargeStrategy(Gc) { } @@ -323,7 +326,8 @@ TEST_CASE("CasStore") std::unique_ptr<CasStore> Store = CreateCasStore(Gc); Store->Initialize(config); - ScrubContext Ctx; + WorkerThreadPool ThreadPool{1}; + ScrubContext Ctx{ThreadPool}; Store->ScrubStorage(Ctx); IoBuffer Value1{16}; diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index a8a4dc102..e9037b16c 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -244,93 +244,100 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) std::vector<BlockStoreLocation> ChunkLocations; std::vector<IoHash> ChunkIndexToChunkHash; - RwLock::SharedLockScope _(m_LocationMapLock); - - uint64_t TotalChunkCount = m_LocationMap.size(); - ChunkLocations.reserve(TotalChunkCount); - ChunkIndexToChunkHash.reserve(TotalChunkCount); + try { - for (const auto& Entry : m_LocationMap) + RwLock::SharedLockScope _(m_LocationMapLock); + + uint64_t TotalChunkCount = m_LocationMap.size(); + ChunkLocations.reserve(TotalChunkCount); + ChunkIndexToChunkHash.reserve(TotalChunkCount); { - const IoHash& ChunkHash = Entry.first; - const BlockStoreDiskLocation& DiskLocation = Entry.second; - BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); + for (const auto& Entry : m_LocationMap) + { + const IoHash& ChunkHash = Entry.first; + const BlockStoreDiskLocation& DiskLocation = Entry.second; + BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); - ChunkLocations.push_back(Location); - ChunkIndexToChunkHash.push_back(ChunkHash); + ChunkLocations.push_back(Location); + ChunkIndexToChunkHash.push_back(ChunkHash); + } } - } - const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - ++ChunkCount; - ChunkBytes += Size; + const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + ++ChunkCount; + ChunkBytes += Size; - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - if (!Data) - { - // ChunkLocation out of range of stored blocks - BadKeys.push_back(Hash); - return; - } - - IoBuffer Buffer(IoBuffer::Wrap, Data, Size); - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) - { - if (RawHash != Hash) + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + if (!Data) { - // Hash mismatch + // ChunkLocation out of range of stored blocks BadKeys.push_back(Hash); return; } - return; - } + + IoBuffer Buffer(IoBuffer::Wrap, Data, Size); + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) + { + if (RawHash != Hash) + { + // Hash mismatch + BadKeys.push_back(Hash); + return; + } + return; + } #if ZEN_WITH_TESTS - IoHash ComputedHash = IoHash::HashBuffer(Data, Size); - if (ComputedHash == Hash) - { - return; - } + IoHash ComputedHash = IoHash::HashBuffer(Data, Size); + if (ComputedHash == Hash) + { + return; + } #endif - BadKeys.push_back(Hash); - }; + BadKeys.push_back(Hash); + }; - const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - ++ChunkCount; - ChunkBytes += Size; + const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + Ctx.ThrowIfDeadlineExpired(); - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); + ++ChunkCount; + ChunkBytes += Size; - IoHash RawHash; - uint64_t RawSize; - // TODO: Add API to verify compressed buffer without having to memorymap the whole file - if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) - { - if (RawHash != Hash) + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); + + IoHash RawHash; + uint64_t RawSize; + // TODO: Add API to verify compressed buffer without having to memorymap the whole file + if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) { - // Hash mismatch - BadKeys.push_back(Hash); + if (RawHash != Hash) + { + // Hash mismatch + BadKeys.push_back(Hash); + return; + } return; } - return; - } #if ZEN_WITH_TESTS - IoHashStream Hasher; - File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); - IoHash ComputedHash = Hasher.GetHash(); - if (ComputedHash == Hash) - { - return; - } + IoHashStream Hasher; + File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); + IoHash ComputedHash = Hasher.GetHash(); + if (ComputedHash == Hash) + { + return; + } #endif - BadKeys.push_back(Hash); - }; - - m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); + BadKeys.push_back(Hash); + }; - _.ReleaseNow(); + m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); + } + catch (ScrubDeadlineExpiredException&) + { + ZEN_INFO("Scrubbing deadline expired, operation incomplete"); + } Ctx.ReportScrubbed(ChunkCount, ChunkBytes); diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index dc19a9a35..516a08f14 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -15,7 +15,9 @@ #include <zencore/testutils.h> #include <zencore/timer.h> #include <zencore/trace.h> +#include <zencore/workthreadpool.h> #include <zenstore/cidstore.h> +#include <zenstore/scrubcontext.h> #include "cas.h" @@ -378,6 +380,17 @@ GcManager::RemoveGcStorage(GcStorage* Storage) } void +GcManager::ScrubStorage(ScrubContext& GcCtx) +{ + RwLock::SharedLockScope _(m_Lock); + + for (GcStorage* Storage : m_GcStorage) + { + Storage->ScrubStorage(GcCtx); + } +} + +void GcManager::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Gc::CollectGarbage"); @@ -435,6 +448,7 @@ GcManager::TotalStorageSize() const } ////////////////////////////////////////////////////////////////////////// + void DiskUsageWindow::KeepRange(GcClock::Tick StartTick, GcClock::Tick EndTick) { @@ -660,7 +674,9 @@ GcScheduler::TriggerGc(const GcScheduler::TriggerGcParams& Params) { m_TriggerGcParams = Params; uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle); - if (m_Status.compare_exchange_strong(IdleState, static_cast<uint32_t>(GcSchedulerStatus::kRunning))) + + if (m_Status.compare_exchange_strong(/* expected */ IdleState, + /* desired */ static_cast<uint32_t>(GcSchedulerStatus::kRunning))) { m_GcSignal.notify_one(); return true; @@ -671,6 +687,27 @@ GcScheduler::TriggerGc(const GcScheduler::TriggerGcParams& Params) return false; } +bool +GcScheduler::TriggerScrub(const TriggerScrubParams& Params) +{ + std::unique_lock Lock(m_GcMutex); + + if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status) + { + m_TriggerScrubParams = Params; + uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle); + + if (m_Status.compare_exchange_strong(/* expected */ IdleState, /* desired */ static_cast<uint32_t>(GcSchedulerStatus::kRunning))) + { + m_GcSignal.notify_one(); + + return true; + } + } + + return false; +} + void GcScheduler::CheckDiskSpace(const DiskSpace& Space) { @@ -697,6 +734,8 @@ GcScheduler::CheckDiskSpace(const DiskSpace& Space) void GcScheduler::SchedulerThread() { + SetCurrentThreadName("GcScheduler"); + std::chrono::seconds WaitTime{0}; for (;;) @@ -713,7 +752,7 @@ GcScheduler::SchedulerThread() break; } - if (!m_Config.Enabled) + if (!m_Config.Enabled && !m_TriggerScrubParams) { WaitTime = std::chrono::seconds::max(); continue; @@ -724,18 +763,23 @@ GcScheduler::SchedulerThread() continue; } - bool Delete = true; + bool DoGc = m_Config.Enabled; + bool DoScrubbing = false; + std::chrono::seconds ScrubTimeslice = std::chrono::seconds::max(); + bool DoDelete = true; bool CollectSmallObjects = m_Config.CollectSmallObjects; std::chrono::seconds MaxCacheDuration = m_Config.MaxCacheDuration; std::chrono::seconds MaxProjectStoreDuration = m_Config.MaxProjectStoreDuration; uint64_t DiskSizeSoftLimit = m_Config.DiskSizeSoftLimit; GcClock::TimePoint Now = GcClock::Now(); + if (m_TriggerGcParams) { const auto TriggerParams = m_TriggerGcParams.value(); m_TriggerGcParams.reset(); CollectSmallObjects = TriggerParams.CollectSmallObjects; + if (TriggerParams.MaxCacheDuration != std::chrono::seconds::max()) { MaxCacheDuration = TriggerParams.MaxCacheDuration; @@ -750,6 +794,29 @@ GcScheduler::SchedulerThread() } } + if (m_TriggerScrubParams) + { + DoScrubbing = true; + + if (m_TriggerScrubParams->SkipGc) + { + DoGc = false; + } + + ScrubTimeslice = m_TriggerScrubParams->MaxTimeslice; + } + + if (DoScrubbing) + { + ScrubStorage(DoDelete, ScrubTimeslice); + m_TriggerScrubParams.reset(); + } + + if (!DoGc) + { + continue; + } + GcClock::TimePoint CacheExpireTime = MaxCacheDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxCacheDuration; GcClock::TimePoint ProjectStoreExpireTime = @@ -775,14 +842,15 @@ GcScheduler::SchedulerThread() const std::chrono::duration LoadGraphTime = PressureGraphLength * m_Config.MonitorInterval; std::vector<uint64_t> DiskDeltas; uint64_t MaxLoad = 0; + { const GcClock::Tick EpochTickCount = GcClock::Now().time_since_epoch().count(); std::unique_lock Lock(m_GcMutex); m_DiskUsageWindow.Append({.SampleTime = EpochTickCount, .DiskUsage = TotalSize.DiskSize}); m_DiskUsageLog.Append({.SampleTime = EpochTickCount, .DiskUsage = TotalSize.DiskSize}); const GcClock::TimePoint LoadGraphStartTime = Now - LoadGraphTime; - GcClock::Tick Start = LoadGraphStartTime.time_since_epoch().count(); - GcClock::Tick End = Now.time_since_epoch().count(); + const GcClock::Tick Start = LoadGraphStartTime.time_since_epoch().count(); + const GcClock::Tick End = Now.time_since_epoch().count(); DiskDeltas = m_DiskUsageWindow.GetDiskDeltas(Start, End, Max(1, (End - Start + PressureGraphLength - 1) / PressureGraphLength), @@ -818,7 +886,7 @@ GcScheduler::SchedulerThread() } } - bool DiskSpaceGCTriggered = GcDiskSpaceGoal > 0; + const bool DiskSpaceGCTriggered = GcDiskSpaceGoal > 0; std::chrono::seconds RemaingTime = std::chrono::duration_cast<std::chrono::seconds>(m_NextGcTime - GcClock::Now()); @@ -858,7 +926,7 @@ GcScheduler::SchedulerThread() } } - CollectGarbage(CacheExpireTime, ProjectStoreExpireTime, Delete, CollectSmallObjects); + CollectGarbage(CacheExpireTime, ProjectStoreExpireTime, DoDelete, CollectSmallObjects); uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning); if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle))) @@ -885,6 +953,36 @@ GcScheduler::NextGcTime(GcClock::TimePoint CurrentTime) } void +GcScheduler::ScrubStorage(bool DoDelete, std::chrono::seconds TimeSlice) +{ + const std::chrono::steady_clock::time_point TimeNow = std::chrono::steady_clock::now(); + std::chrono::steady_clock::time_point Deadline = TimeNow + TimeSlice; + // there really should be a saturating add in std::chrono + if (Deadline < TimeNow) + { + Deadline = std::chrono::steady_clock::time_point::max(); + } + + Stopwatch Timer; + ZEN_INFO("scrubbing STARTING (delete mode => {})", DoDelete); + + WorkerThreadPool ThreadPool{4, "scrubber"}; + ScrubContext Ctx{ThreadPool, Deadline}; + + try + { + Ctx.SetShouldDelete(DoDelete); + m_GcManager.ScrubStorage(Ctx); + } + catch (ScrubDeadlineExpiredException&) + { + ZEN_INFO("scrubbing deadline expired (top level), operation incomplete!"); + } + + ZEN_INFO("scrubbing DONE (in {})", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); +} + +void GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, const GcClock::TimePoint& ProjectStoreExpireTime, bool Delete, @@ -1354,6 +1452,36 @@ TEST_CASE("gc.diskusagewindow") CHECK(Stats.FindTimepointThatRemoves(100000u, 1000)); } } + +TEST_CASE("scrub.basic") +{ + using namespace gc::impl; + + ScopedTemporaryDirectory TempDir; + + CidStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path() / "cas"; + + GcManager Gc; + CidStore CidStore(Gc); + + CidStore.Initialize(CasConfig); + + IoBuffer Chunk = CreateChunk(128); + auto CompressedChunk = Compress(Chunk); + + const auto InsertResult = CidStore.AddChunk(CompressedChunk.GetCompressed().Flatten().AsIoBuffer(), CompressedChunk.DecodeRawHash()); + CHECK(InsertResult.New); + + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + + CidStore.Flush(); + Gc.CollectGarbage(GcCtx); + + CHECK(!CidStore.ContainsChunk(CompressedChunk.DecodeRawHash())); +} + #endif void diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h index 881936d0f..22b9bc284 100644 --- a/src/zenstore/include/zenstore/gc.h +++ b/src/zenstore/include/zenstore/gc.h @@ -22,9 +22,10 @@ class logger; namespace zen { -class HashKeySet; -class GcManager; class CidStore; +class GcManager; +class HashKeySet; +class ScrubContext; struct IoHash; struct DiskSpace; @@ -146,6 +147,7 @@ public: void RemoveGcStorage(GcStorage* Contributor); void CollectGarbage(GcContext& GcCtx); + void ScrubStorage(ScrubContext& GcCtx); GcStorageSize TotalStorageSize() const; @@ -226,29 +228,39 @@ public: bool TriggerGc(const TriggerGcParams& Params); + struct TriggerScrubParams + { + bool SkipGc = false; + std::chrono::seconds MaxTimeslice = std::chrono::seconds::max(); + }; + + bool TriggerScrub(const TriggerScrubParams& Params); + private: void SchedulerThread(); void CollectGarbage(const GcClock::TimePoint& CacheExpireTime, const GcClock::TimePoint& ProjectStoreExpireTime, bool Delete, bool CollectSmallObjects); + void ScrubStorage(bool DoDelete, std::chrono::seconds TimeSlice); GcClock::TimePoint NextGcTime(GcClock::TimePoint CurrentTime); spdlog::logger& Log() { return m_Log; } virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); } void CheckDiskSpace(const DiskSpace& Space); - spdlog::logger& m_Log; - GcManager& m_GcManager; - GcSchedulerConfig m_Config; - GcClock::TimePoint m_LastGcTime{}; - GcClock::TimePoint m_LastGcExpireTime{}; - GcClock::TimePoint m_NextGcTime{}; - std::atomic_uint32_t m_Status{}; - std::thread m_GcThread; - std::mutex m_GcMutex; - std::condition_variable m_GcSignal; - std::optional<TriggerGcParams> m_TriggerGcParams; - std::atomic_bool m_AreDiskWritesBlocked = false; + spdlog::logger& m_Log; + GcManager& m_GcManager; + GcSchedulerConfig m_Config; + GcClock::TimePoint m_LastGcTime{}; + GcClock::TimePoint m_LastGcExpireTime{}; + GcClock::TimePoint m_NextGcTime{}; + std::atomic_uint32_t m_Status{}; + std::thread m_GcThread; + std::mutex m_GcMutex; + std::condition_variable m_GcSignal; + std::optional<TriggerGcParams> m_TriggerGcParams; + std::optional<TriggerScrubParams> m_TriggerScrubParams; + std::atomic_bool m_AreDiskWritesBlocked = false; TCasLogFile<DiskUsageWindow::DiskUsageEntry> m_DiskUsageLog; DiskUsageWindow m_DiskUsageWindow; diff --git a/src/zenstore/include/zenstore/scrubcontext.h b/src/zenstore/include/zenstore/scrubcontext.h index 8b8ebac3d..cefaf0888 100644 --- a/src/zenstore/include/zenstore/scrubcontext.h +++ b/src/zenstore/include/zenstore/scrubcontext.h @@ -7,38 +7,59 @@ namespace zen { +class WorkerThreadPool; + /** Context object for data scrubbing - * - * Data scrubbing is when we traverse stored data to validate it and - * optionally correct/recover + + Data scrubbing is when we traverse stored data to validate it and + optionally correct/recover */ class ScrubContext { public: - ScrubContext(); + ScrubContext(WorkerThreadPool& InWorkerThreadPool, + std::chrono::steady_clock::time_point Deadline = std::chrono::steady_clock::time_point::max()); ~ScrubContext(); - virtual void ReportBadCidChunks(std::span<IoHash> BadCasChunks) { m_BadCid.AddHashesToSet(BadCasChunks); } + void ReportBadCidChunks(std::span<IoHash> BadCasChunks); inline uint64_t ScrubTimestamp() const { return m_ScrubTime; } - inline bool RunRecovery() const { return m_Recover; } void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes) { m_ChunkCount.fetch_add(ChunkCount); m_ByteCount.fetch_add(ChunkBytes); } + std::chrono::steady_clock::time_point GetDeadline() const { return m_Deadline; } + bool IsWithinDeadline() const; + void ThrowIfDeadlineExpired() const; + inline uint64_t ScrubbedChunks() const { return m_ChunkCount; } inline uint64_t ScrubbedBytes() const { return m_ByteCount; } - const HashKeySet BadCids() const { return m_BadCid; } + HashKeySet BadCids() const; + + inline bool RunRecovery() const { return m_Recover; } + inline void SetShouldDelete(bool DoDelete) { m_Recover = DoDelete; } + + inline WorkerThreadPool& ThreadPool() { return m_WorkerThreadPool; } private: - uint64_t m_ScrubTime = GetHifreqTimerValue(); - bool m_Recover = true; - std::atomic<uint64_t> m_ChunkCount{0}; - std::atomic<uint64_t> m_ByteCount{0}; - HashKeySet m_BadCid; + uint64_t m_ScrubTime = GetHifreqTimerValue(); + bool m_Recover = true; + std::atomic<uint64_t> m_ChunkCount{0}; + std::atomic<uint64_t> m_ByteCount{0}; + mutable RwLock m_Lock; + HashKeySet m_BadCid; + WorkerThreadPool& m_WorkerThreadPool; + std::chrono::steady_clock::time_point m_Deadline{}; +}; + +class ScrubDeadlineExpiredException : public std::runtime_error +{ +public: + ScrubDeadlineExpiredException(); + ~ScrubDeadlineExpiredException(); }; } // namespace zen diff --git a/src/zenstore/include/zenstore/zenstore.h b/src/zenstore/include/zenstore/zenstore.h index 46d62029d..29f3d2639 100644 --- a/src/zenstore/include/zenstore/zenstore.h +++ b/src/zenstore/include/zenstore/zenstore.h @@ -10,4 +10,4 @@ namespace zen { ZENSTORE_API void zenstore_forcelinktests(); -} +} // namespace zen diff --git a/src/zenstore/scrubcontext.cpp b/src/zenstore/scrubcontext.cpp index f35178de6..f5a3784c3 100644 --- a/src/zenstore/scrubcontext.cpp +++ b/src/zenstore/scrubcontext.cpp @@ -6,7 +6,19 @@ namespace zen { -ScrubContext::ScrubContext() +ScrubDeadlineExpiredException::ScrubDeadlineExpiredException() : std::runtime_error("scrubbing deadline expired") +{ +} + +ScrubDeadlineExpiredException::~ScrubDeadlineExpiredException() +{ +} + +////////////////////////////////////////////////////////////////////////// + +ScrubContext::ScrubContext(WorkerThreadPool& InWorkerThreadPool, std::chrono::steady_clock::time_point Deadline) +: m_WorkerThreadPool(InWorkerThreadPool) +, m_Deadline(Deadline) { } @@ -14,4 +26,33 @@ ScrubContext::~ScrubContext() { } -} // namespace zen
\ No newline at end of file +HashKeySet +ScrubContext::BadCids() const +{ + RwLock::SharedLockScope _(m_Lock); + return m_BadCid; +} + +void +ScrubContext::ReportBadCidChunks(std::span<IoHash> BadCasChunks) +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_BadCid.AddHashesToSet(BadCasChunks); +} + +bool +ScrubContext::IsWithinDeadline() const +{ + return std::chrono::steady_clock::now() < m_Deadline; +} + +void +ScrubContext::ThrowIfDeadlineExpired() const +{ + if (IsWithinDeadline()) + return; + + throw ScrubDeadlineExpiredException(); +} + +} // namespace zen |