aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-16 21:35:39 +0200
committerGitHub <[email protected]>2023-05-16 21:35:39 +0200
commit81b2757f917e34bb338fad7965ae8a74e160bee4 (patch)
tree931ba100471a2369c62a6e41a1b4a7937ed31f6f /src/zenstore
parentadded benchmark utility command `bench` (#298) (diff)
downloadzen-81b2757f917e34bb338fad7965ae8a74e160bee4.tar.xz
zen-81b2757f917e34bb338fad7965ae8a74e160bee4.zip
Content scrubbing (#271)
Added zen scrub command which may be triggered via the zen CLI helper. This traverses storage and validates contents either by content hash and/or by structure. If unexpected data is encountered it is invalidated.
Diffstat (limited to 'src/zenstore')
-rw-r--r--src/zenstore/cas.cpp6
-rw-r--r--src/zenstore/compactcas.cpp139
-rw-r--r--src/zenstore/gc.cpp142
-rw-r--r--src/zenstore/include/zenstore/gc.h40
-rw-r--r--src/zenstore/include/zenstore/scrubcontext.h45
-rw-r--r--src/zenstore/include/zenstore/zenstore.h2
-rw-r--r--src/zenstore/scrubcontext.cpp45
7 files changed, 316 insertions, 103 deletions
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index ab05e3e7c..b98f01385 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -18,6 +18,7 @@
#include <zencore/thread.h>
#include <zencore/trace.h>
#include <zencore/uid.h>
+#include <zencore/workthreadpool.h>
#include <zenstore/cidstore.h>
#include <zenstore/gc.h>
#include <zenstore/scrubcontext.h>
@@ -74,6 +75,8 @@ private:
void UpdateManifest();
};
+//////////////////////////////////////////////////////////////////////////
+
CasImpl::CasImpl(GcManager& Gc) : m_TinyStrategy(Gc), m_SmallStrategy(Gc), m_LargeStrategy(Gc)
{
}
@@ -323,7 +326,8 @@ TEST_CASE("CasStore")
std::unique_ptr<CasStore> Store = CreateCasStore(Gc);
Store->Initialize(config);
- ScrubContext Ctx;
+ WorkerThreadPool ThreadPool{1};
+ ScrubContext Ctx{ThreadPool};
Store->ScrubStorage(Ctx);
IoBuffer Value1{16};
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index a8a4dc102..e9037b16c 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -244,93 +244,100 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
std::vector<BlockStoreLocation> ChunkLocations;
std::vector<IoHash> ChunkIndexToChunkHash;
- RwLock::SharedLockScope _(m_LocationMapLock);
-
- uint64_t TotalChunkCount = m_LocationMap.size();
- ChunkLocations.reserve(TotalChunkCount);
- ChunkIndexToChunkHash.reserve(TotalChunkCount);
+ try
{
- for (const auto& Entry : m_LocationMap)
+ RwLock::SharedLockScope _(m_LocationMapLock);
+
+ uint64_t TotalChunkCount = m_LocationMap.size();
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkIndexToChunkHash.reserve(TotalChunkCount);
{
- const IoHash& ChunkHash = Entry.first;
- const BlockStoreDiskLocation& DiskLocation = Entry.second;
- BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);
+ for (const auto& Entry : m_LocationMap)
+ {
+ const IoHash& ChunkHash = Entry.first;
+ const BlockStoreDiskLocation& DiskLocation = Entry.second;
+ BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);
- ChunkLocations.push_back(Location);
- ChunkIndexToChunkHash.push_back(ChunkHash);
+ ChunkLocations.push_back(Location);
+ ChunkIndexToChunkHash.push_back(ChunkHash);
+ }
}
- }
- const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
- ++ChunkCount;
- ChunkBytes += Size;
+ const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ ++ChunkCount;
+ ChunkBytes += Size;
- const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- if (!Data)
- {
- // ChunkLocation out of range of stored blocks
- BadKeys.push_back(Hash);
- return;
- }
-
- IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
- {
- if (RawHash != Hash)
+ const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
+ if (!Data)
{
- // Hash mismatch
+ // ChunkLocation out of range of stored blocks
BadKeys.push_back(Hash);
return;
}
- return;
- }
+
+ IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
+ {
+ if (RawHash != Hash)
+ {
+ // Hash mismatch
+ BadKeys.push_back(Hash);
+ return;
+ }
+ return;
+ }
#if ZEN_WITH_TESTS
- IoHash ComputedHash = IoHash::HashBuffer(Data, Size);
- if (ComputedHash == Hash)
- {
- return;
- }
+ IoHash ComputedHash = IoHash::HashBuffer(Data, Size);
+ if (ComputedHash == Hash)
+ {
+ return;
+ }
#endif
- BadKeys.push_back(Hash);
- };
+ BadKeys.push_back(Hash);
+ };
- const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- ++ChunkCount;
- ChunkBytes += Size;
+ const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ Ctx.ThrowIfDeadlineExpired();
- const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
+ ++ChunkCount;
+ ChunkBytes += Size;
- IoHash RawHash;
- uint64_t RawSize;
- // TODO: Add API to verify compressed buffer without having to memorymap the whole file
- if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
- {
- if (RawHash != Hash)
+ const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
+ IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
+
+ IoHash RawHash;
+ uint64_t RawSize;
+ // TODO: Add API to verify compressed buffer without having to memorymap the whole file
+ if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
{
- // Hash mismatch
- BadKeys.push_back(Hash);
+ if (RawHash != Hash)
+ {
+ // Hash mismatch
+ BadKeys.push_back(Hash);
+ return;
+ }
return;
}
- return;
- }
#if ZEN_WITH_TESTS
- IoHashStream Hasher;
- File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
- IoHash ComputedHash = Hasher.GetHash();
- if (ComputedHash == Hash)
- {
- return;
- }
+ IoHashStream Hasher;
+ File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
+ IoHash ComputedHash = Hasher.GetHash();
+ if (ComputedHash == Hash)
+ {
+ return;
+ }
#endif
- BadKeys.push_back(Hash);
- };
-
- m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
+ BadKeys.push_back(Hash);
+ };
- _.ReleaseNow();
+ m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
+ }
+ catch (ScrubDeadlineExpiredException&)
+ {
+ ZEN_INFO("Scrubbing deadline expired, operation incomplete");
+ }
Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index dc19a9a35..516a08f14 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -15,7 +15,9 @@
#include <zencore/testutils.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
#include <zenstore/cidstore.h>
+#include <zenstore/scrubcontext.h>
#include "cas.h"
@@ -378,6 +380,17 @@ GcManager::RemoveGcStorage(GcStorage* Storage)
}
void
+GcManager::ScrubStorage(ScrubContext& GcCtx)
+{
+ RwLock::SharedLockScope _(m_Lock);
+
+ for (GcStorage* Storage : m_GcStorage)
+ {
+ Storage->ScrubStorage(GcCtx);
+ }
+}
+
+void
GcManager::CollectGarbage(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Gc::CollectGarbage");
@@ -435,6 +448,7 @@ GcManager::TotalStorageSize() const
}
//////////////////////////////////////////////////////////////////////////
+
void
DiskUsageWindow::KeepRange(GcClock::Tick StartTick, GcClock::Tick EndTick)
{
@@ -660,7 +674,9 @@ GcScheduler::TriggerGc(const GcScheduler::TriggerGcParams& Params)
{
m_TriggerGcParams = Params;
uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
- if (m_Status.compare_exchange_strong(IdleState, static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
+
+ if (m_Status.compare_exchange_strong(/* expected */ IdleState,
+ /* desired */ static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
{
m_GcSignal.notify_one();
return true;
@@ -671,6 +687,27 @@ GcScheduler::TriggerGc(const GcScheduler::TriggerGcParams& Params)
return false;
}
+bool
+GcScheduler::TriggerScrub(const TriggerScrubParams& Params)
+{
+ std::unique_lock Lock(m_GcMutex);
+
+ if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status)
+ {
+ m_TriggerScrubParams = Params;
+ uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
+
+ if (m_Status.compare_exchange_strong(/* expected */ IdleState, /* desired */ static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
+ {
+ m_GcSignal.notify_one();
+
+ return true;
+ }
+ }
+
+ return false;
+}
+
void
GcScheduler::CheckDiskSpace(const DiskSpace& Space)
{
@@ -697,6 +734,8 @@ GcScheduler::CheckDiskSpace(const DiskSpace& Space)
void
GcScheduler::SchedulerThread()
{
+ SetCurrentThreadName("GcScheduler");
+
std::chrono::seconds WaitTime{0};
for (;;)
@@ -713,7 +752,7 @@ GcScheduler::SchedulerThread()
break;
}
- if (!m_Config.Enabled)
+ if (!m_Config.Enabled && !m_TriggerScrubParams)
{
WaitTime = std::chrono::seconds::max();
continue;
@@ -724,18 +763,23 @@ GcScheduler::SchedulerThread()
continue;
}
- bool Delete = true;
+ bool DoGc = m_Config.Enabled;
+ bool DoScrubbing = false;
+ std::chrono::seconds ScrubTimeslice = std::chrono::seconds::max();
+ bool DoDelete = true;
bool CollectSmallObjects = m_Config.CollectSmallObjects;
std::chrono::seconds MaxCacheDuration = m_Config.MaxCacheDuration;
std::chrono::seconds MaxProjectStoreDuration = m_Config.MaxProjectStoreDuration;
uint64_t DiskSizeSoftLimit = m_Config.DiskSizeSoftLimit;
GcClock::TimePoint Now = GcClock::Now();
+
if (m_TriggerGcParams)
{
const auto TriggerParams = m_TriggerGcParams.value();
m_TriggerGcParams.reset();
CollectSmallObjects = TriggerParams.CollectSmallObjects;
+
if (TriggerParams.MaxCacheDuration != std::chrono::seconds::max())
{
MaxCacheDuration = TriggerParams.MaxCacheDuration;
@@ -750,6 +794,29 @@ GcScheduler::SchedulerThread()
}
}
+ if (m_TriggerScrubParams)
+ {
+ DoScrubbing = true;
+
+ if (m_TriggerScrubParams->SkipGc)
+ {
+ DoGc = false;
+ }
+
+ ScrubTimeslice = m_TriggerScrubParams->MaxTimeslice;
+ }
+
+ if (DoScrubbing)
+ {
+ ScrubStorage(DoDelete, ScrubTimeslice);
+ m_TriggerScrubParams.reset();
+ }
+
+ if (!DoGc)
+ {
+ continue;
+ }
+
GcClock::TimePoint CacheExpireTime =
MaxCacheDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxCacheDuration;
GcClock::TimePoint ProjectStoreExpireTime =
@@ -775,14 +842,15 @@ GcScheduler::SchedulerThread()
const std::chrono::duration LoadGraphTime = PressureGraphLength * m_Config.MonitorInterval;
std::vector<uint64_t> DiskDeltas;
uint64_t MaxLoad = 0;
+
{
const GcClock::Tick EpochTickCount = GcClock::Now().time_since_epoch().count();
std::unique_lock Lock(m_GcMutex);
m_DiskUsageWindow.Append({.SampleTime = EpochTickCount, .DiskUsage = TotalSize.DiskSize});
m_DiskUsageLog.Append({.SampleTime = EpochTickCount, .DiskUsage = TotalSize.DiskSize});
const GcClock::TimePoint LoadGraphStartTime = Now - LoadGraphTime;
- GcClock::Tick Start = LoadGraphStartTime.time_since_epoch().count();
- GcClock::Tick End = Now.time_since_epoch().count();
+ const GcClock::Tick Start = LoadGraphStartTime.time_since_epoch().count();
+ const GcClock::Tick End = Now.time_since_epoch().count();
DiskDeltas = m_DiskUsageWindow.GetDiskDeltas(Start,
End,
Max(1, (End - Start + PressureGraphLength - 1) / PressureGraphLength),
@@ -818,7 +886,7 @@ GcScheduler::SchedulerThread()
}
}
- bool DiskSpaceGCTriggered = GcDiskSpaceGoal > 0;
+ const bool DiskSpaceGCTriggered = GcDiskSpaceGoal > 0;
std::chrono::seconds RemaingTime = std::chrono::duration_cast<std::chrono::seconds>(m_NextGcTime - GcClock::Now());
@@ -858,7 +926,7 @@ GcScheduler::SchedulerThread()
}
}
- CollectGarbage(CacheExpireTime, ProjectStoreExpireTime, Delete, CollectSmallObjects);
+ CollectGarbage(CacheExpireTime, ProjectStoreExpireTime, DoDelete, CollectSmallObjects);
uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle)))
@@ -885,6 +953,36 @@ GcScheduler::NextGcTime(GcClock::TimePoint CurrentTime)
}
void
+GcScheduler::ScrubStorage(bool DoDelete, std::chrono::seconds TimeSlice)
+{
+ const std::chrono::steady_clock::time_point TimeNow = std::chrono::steady_clock::now();
+ std::chrono::steady_clock::time_point Deadline = TimeNow + TimeSlice;
+ // there really should be a saturating add in std::chrono
+ if (Deadline < TimeNow)
+ {
+ Deadline = std::chrono::steady_clock::time_point::max();
+ }
+
+ Stopwatch Timer;
+ ZEN_INFO("scrubbing STARTING (delete mode => {})", DoDelete);
+
+ WorkerThreadPool ThreadPool{4, "scrubber"};
+ ScrubContext Ctx{ThreadPool, Deadline};
+
+ try
+ {
+ Ctx.SetShouldDelete(DoDelete);
+ m_GcManager.ScrubStorage(Ctx);
+ }
+ catch (ScrubDeadlineExpiredException&)
+ {
+ ZEN_INFO("scrubbing deadline expired (top level), operation incomplete!");
+ }
+
+ ZEN_INFO("scrubbing DONE (in {})", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+}
+
+void
GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime,
const GcClock::TimePoint& ProjectStoreExpireTime,
bool Delete,
@@ -1354,6 +1452,36 @@ TEST_CASE("gc.diskusagewindow")
CHECK(Stats.FindTimepointThatRemoves(100000u, 1000));
}
}
+
+TEST_CASE("scrub.basic")
+{
+ using namespace gc::impl;
+
+ ScopedTemporaryDirectory TempDir;
+
+ CidStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path() / "cas";
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+
+ CidStore.Initialize(CasConfig);
+
+ IoBuffer Chunk = CreateChunk(128);
+ auto CompressedChunk = Compress(Chunk);
+
+ const auto InsertResult = CidStore.AddChunk(CompressedChunk.GetCompressed().Flatten().AsIoBuffer(), CompressedChunk.DecodeRawHash());
+ CHECK(InsertResult.New);
+
+ GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
+ GcCtx.CollectSmallObjects(true);
+
+ CidStore.Flush();
+ Gc.CollectGarbage(GcCtx);
+
+ CHECK(!CidStore.ContainsChunk(CompressedChunk.DecodeRawHash()));
+}
+
#endif
void
diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h
index 881936d0f..22b9bc284 100644
--- a/src/zenstore/include/zenstore/gc.h
+++ b/src/zenstore/include/zenstore/gc.h
@@ -22,9 +22,10 @@ class logger;
namespace zen {
-class HashKeySet;
-class GcManager;
class CidStore;
+class GcManager;
+class HashKeySet;
+class ScrubContext;
struct IoHash;
struct DiskSpace;
@@ -146,6 +147,7 @@ public:
void RemoveGcStorage(GcStorage* Contributor);
void CollectGarbage(GcContext& GcCtx);
+ void ScrubStorage(ScrubContext& GcCtx);
GcStorageSize TotalStorageSize() const;
@@ -226,29 +228,39 @@ public:
bool TriggerGc(const TriggerGcParams& Params);
+ struct TriggerScrubParams
+ {
+ bool SkipGc = false;
+ std::chrono::seconds MaxTimeslice = std::chrono::seconds::max();
+ };
+
+ bool TriggerScrub(const TriggerScrubParams& Params);
+
private:
void SchedulerThread();
void CollectGarbage(const GcClock::TimePoint& CacheExpireTime,
const GcClock::TimePoint& ProjectStoreExpireTime,
bool Delete,
bool CollectSmallObjects);
+ void ScrubStorage(bool DoDelete, std::chrono::seconds TimeSlice);
GcClock::TimePoint NextGcTime(GcClock::TimePoint CurrentTime);
spdlog::logger& Log() { return m_Log; }
virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); }
void CheckDiskSpace(const DiskSpace& Space);
- spdlog::logger& m_Log;
- GcManager& m_GcManager;
- GcSchedulerConfig m_Config;
- GcClock::TimePoint m_LastGcTime{};
- GcClock::TimePoint m_LastGcExpireTime{};
- GcClock::TimePoint m_NextGcTime{};
- std::atomic_uint32_t m_Status{};
- std::thread m_GcThread;
- std::mutex m_GcMutex;
- std::condition_variable m_GcSignal;
- std::optional<TriggerGcParams> m_TriggerGcParams;
- std::atomic_bool m_AreDiskWritesBlocked = false;
+ spdlog::logger& m_Log;
+ GcManager& m_GcManager;
+ GcSchedulerConfig m_Config;
+ GcClock::TimePoint m_LastGcTime{};
+ GcClock::TimePoint m_LastGcExpireTime{};
+ GcClock::TimePoint m_NextGcTime{};
+ std::atomic_uint32_t m_Status{};
+ std::thread m_GcThread;
+ std::mutex m_GcMutex;
+ std::condition_variable m_GcSignal;
+ std::optional<TriggerGcParams> m_TriggerGcParams;
+ std::optional<TriggerScrubParams> m_TriggerScrubParams;
+ std::atomic_bool m_AreDiskWritesBlocked = false;
TCasLogFile<DiskUsageWindow::DiskUsageEntry> m_DiskUsageLog;
DiskUsageWindow m_DiskUsageWindow;
diff --git a/src/zenstore/include/zenstore/scrubcontext.h b/src/zenstore/include/zenstore/scrubcontext.h
index 8b8ebac3d..cefaf0888 100644
--- a/src/zenstore/include/zenstore/scrubcontext.h
+++ b/src/zenstore/include/zenstore/scrubcontext.h
@@ -7,38 +7,59 @@
namespace zen {
+class WorkerThreadPool;
+
/** Context object for data scrubbing
- *
- * Data scrubbing is when we traverse stored data to validate it and
- * optionally correct/recover
+
+ Data scrubbing is when we traverse stored data to validate it and
+ optionally correct/recover
*/
class ScrubContext
{
public:
- ScrubContext();
+ ScrubContext(WorkerThreadPool& InWorkerThreadPool,
+ std::chrono::steady_clock::time_point Deadline = std::chrono::steady_clock::time_point::max());
~ScrubContext();
- virtual void ReportBadCidChunks(std::span<IoHash> BadCasChunks) { m_BadCid.AddHashesToSet(BadCasChunks); }
+ void ReportBadCidChunks(std::span<IoHash> BadCasChunks);
inline uint64_t ScrubTimestamp() const { return m_ScrubTime; }
- inline bool RunRecovery() const { return m_Recover; }
void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes)
{
m_ChunkCount.fetch_add(ChunkCount);
m_ByteCount.fetch_add(ChunkBytes);
}
+ std::chrono::steady_clock::time_point GetDeadline() const { return m_Deadline; }
+ bool IsWithinDeadline() const;
+ void ThrowIfDeadlineExpired() const;
+
inline uint64_t ScrubbedChunks() const { return m_ChunkCount; }
inline uint64_t ScrubbedBytes() const { return m_ByteCount; }
- const HashKeySet BadCids() const { return m_BadCid; }
+ HashKeySet BadCids() const;
+
+ inline bool RunRecovery() const { return m_Recover; }
+ inline void SetShouldDelete(bool DoDelete) { m_Recover = DoDelete; }
+
+ inline WorkerThreadPool& ThreadPool() { return m_WorkerThreadPool; }
private:
- uint64_t m_ScrubTime = GetHifreqTimerValue();
- bool m_Recover = true;
- std::atomic<uint64_t> m_ChunkCount{0};
- std::atomic<uint64_t> m_ByteCount{0};
- HashKeySet m_BadCid;
+ uint64_t m_ScrubTime = GetHifreqTimerValue();
+ bool m_Recover = true;
+ std::atomic<uint64_t> m_ChunkCount{0};
+ std::atomic<uint64_t> m_ByteCount{0};
+ mutable RwLock m_Lock;
+ HashKeySet m_BadCid;
+ WorkerThreadPool& m_WorkerThreadPool;
+ std::chrono::steady_clock::time_point m_Deadline{};
+};
+
+class ScrubDeadlineExpiredException : public std::runtime_error
+{
+public:
+ ScrubDeadlineExpiredException();
+ ~ScrubDeadlineExpiredException();
};
} // namespace zen
diff --git a/src/zenstore/include/zenstore/zenstore.h b/src/zenstore/include/zenstore/zenstore.h
index 46d62029d..29f3d2639 100644
--- a/src/zenstore/include/zenstore/zenstore.h
+++ b/src/zenstore/include/zenstore/zenstore.h
@@ -10,4 +10,4 @@ namespace zen {
ZENSTORE_API void zenstore_forcelinktests();
-}
+} // namespace zen
diff --git a/src/zenstore/scrubcontext.cpp b/src/zenstore/scrubcontext.cpp
index f35178de6..f5a3784c3 100644
--- a/src/zenstore/scrubcontext.cpp
+++ b/src/zenstore/scrubcontext.cpp
@@ -6,7 +6,19 @@
namespace zen {
-ScrubContext::ScrubContext()
+ScrubDeadlineExpiredException::ScrubDeadlineExpiredException() : std::runtime_error("scrubbing deadline expired")
+{
+}
+
+ScrubDeadlineExpiredException::~ScrubDeadlineExpiredException()
+{
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ScrubContext::ScrubContext(WorkerThreadPool& InWorkerThreadPool, std::chrono::steady_clock::time_point Deadline)
+: m_WorkerThreadPool(InWorkerThreadPool)
+, m_Deadline(Deadline)
{
}
@@ -14,4 +26,33 @@ ScrubContext::~ScrubContext()
{
}
-} // namespace zen \ No newline at end of file
+HashKeySet
+ScrubContext::BadCids() const
+{
+ RwLock::SharedLockScope _(m_Lock);
+ return m_BadCid;
+}
+
+void
+ScrubContext::ReportBadCidChunks(std::span<IoHash> BadCasChunks)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_BadCid.AddHashesToSet(BadCasChunks);
+}
+
+bool
+ScrubContext::IsWithinDeadline() const
+{
+ return std::chrono::steady_clock::now() < m_Deadline;
+}
+
+void
+ScrubContext::ThrowIfDeadlineExpired() const
+{
+ if (IsWithinDeadline())
+ return;
+
+ throw ScrubDeadlineExpiredException();
+}
+
+} // namespace zen