diff options
| author | Stefan Boberg <[email protected]> | 2023-05-16 21:35:39 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-16 21:35:39 +0200 |
| commit | 81b2757f917e34bb338fad7965ae8a74e160bee4 (patch) | |
| tree | 931ba100471a2369c62a6e41a1b4a7937ed31f6f /src | |
| parent | added benchmark utility command `bench` (#298) (diff) | |
| download | zen-81b2757f917e34bb338fad7965ae8a74e160bee4.tar.xz zen-81b2757f917e34bb338fad7965ae8a74e160bee4.zip | |
Content scrubbing (#271)
Added zen scrub command which may be triggered via the zen CLI helper. This traverses storage and validates contents by content hash and/or by structure. If unexpected data is encountered, it is invalidated.
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/admin/admin.cpp | 16 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.cpp | 425 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 4 | ||||
| -rw-r--r-- | src/zenstore/cas.cpp | 6 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 139 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 142 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/gc.h | 40 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/scrubcontext.h | 45 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/zenstore.h | 2 | ||||
| -rw-r--r-- | src/zenstore/scrubcontext.cpp | 45 |
10 files changed, 625 insertions, 239 deletions
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp index c37622cb6..575a10d83 100644 --- a/src/zenserver/admin/admin.cpp +++ b/src/zenserver/admin/admin.cpp @@ -79,6 +79,22 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler) : m_GcScheduler(Sched HttpVerb::kPost); m_Router.RegisterRoute( + "scrub", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + + GcScheduler::TriggerScrubParams ScrubParams; + ScrubParams.MaxTimeslice = std::chrono::seconds(100); + m_GcScheduler.TriggerScrub(ScrubParams); + + CbObjectWriter Response; + Response << "ok"sv << true; + HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( "", [](HttpRouterRequest& Req) { CbObject Payload = Req.ServerRequest().ReadPayloadObject(); diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp index 3a6e5cbc3..440da3074 100644 --- a/src/zenserver/cache/structuredcachestore.cpp +++ b/src/zenserver/cache/structuredcachestore.cpp @@ -17,11 +17,13 @@ #include <zencore/thread.h> #include <zencore/timer.h> #include <zencore/trace.h> +#include <zencore/workthreadpool.h> #include <zenstore/cidstore.h> #include <zenstore/scrubcontext.h> #include <xxhash.h> +#include <future> #include <limits> #if ZEN_PLATFORM_WINDOWS @@ -96,13 +98,18 @@ namespace { return BucketDir / (BucketName + LogExtension); } - bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason) + bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) { OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); return false; } + if (Entry.Location.Reserved != 0) + { + OutReason = fmt::format("Reserved field non-zero ({}) for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); + return false; + } if 
(Entry.Location.GetFlags() & ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) { @@ -283,6 +290,8 @@ ZenCacheNamespace::ScrubStorage(ScrubContext& Ctx) return; } + ZEN_INFO("scrubbing '{}'", m_RootDir); + m_LastScrubTime = Ctx.ScrubTimestamp(); m_DiskLayer.ScrubStorage(Ctx); @@ -665,6 +674,12 @@ ZenCacheMemoryLayer::CacheBucket::EntryCount() const ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)), m_BucketId(Oid::Zero) { + if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap"))) + { + // This is pretty ad hoc but in order to avoid too many individual files + // it makes sense to have a different strategy for legacy values + m_LargeObjectThreshold = 16 * 1024 * 1024; + } } ZenCacheDiskLayer::CacheBucket::~CacheBucket() @@ -676,6 +691,10 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo { using namespace std::literals; + ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate"); + + ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir); + m_BlocksBasePath = BucketDir / "blocks"; m_BucketDir = BucketDir; @@ -694,12 +713,12 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo { return false; } - uint32_t Version = Manifest["Version"sv].AsUInt32(0); - if (Version != CurrentDiskBucketVersion) - { - ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion); - IsNew = true; - } + // uint32_t Version = Manifest["Version"sv].AsUInt32(0); + // if (Version != CurrentDiskBucketVersion) + //{ + // ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion); + // IsNew = true; + // } } else if (AllowCreate) { @@ -745,8 +764,17 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo { size_t EntryIndex = It.value(); 
ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - m_Payloads[EntryIndex].RawHash = Obj["RawHash"sv].AsHash(); - m_Payloads[EntryIndex].RawSize = Obj["RawSize"sv].AsUInt64(); + + const IoHash RawHash = Obj["RawHash"sv].AsHash(); + const uint64_t RawSize = Obj["RawSize"sv].AsUInt64(); + + if (RawHash == IoHash::Zero || RawSize == 0) + { + ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex); + } + + m_Payloads[EntryIndex].RawHash = RawHash; + m_Payloads[EntryIndex].RawSize = RawSize; } } } @@ -757,18 +785,20 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo void ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() { + ZEN_TRACE_CPU("Z$::Bucket::MakeIndexSnapshot"); + uint64_t LogCount = m_SlogFile.GetLogCount(); if (m_LogFlushPosition == LogCount) { return; } - ZEN_DEBUG("write store snapshot for '{}'", m_BucketDir / m_BucketName); + ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir); uint64_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", - m_BucketDir / m_BucketName, + m_BucketDir, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); @@ -792,10 +822,9 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() { // Write the current state of the location map to a new index state std::vector<DiskIndexEntry> Entries; + Entries.resize(m_Index.size()); { - Entries.resize(m_Index.size()); - uint64_t EntryIndex = 0; for (auto& Entry : m_Index) { @@ -841,6 +870,8 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() uint64_t ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion) { + ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile"); + if (std::filesystem::is_regular_file(IndexPath)) { BasicFile ObjectIndexFile; @@ -886,7 +917,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index std::string InvalidEntryReason; for (const DiskIndexEntry& Entry : 
Entries) { - if (!ValidateEntry(Entry, InvalidEntryReason)) + if (!ValidateCacheBucketIndexEntry(Entry, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); continue; @@ -914,6 +945,8 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index uint64_t ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { + ZEN_TRACE_CPU("Z$::Bucket::ReadLog"); + if (std::filesystem::is_regular_file(LogPath)) { uint64_t LogEntryCount = 0; @@ -942,7 +975,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui m_Index.erase(Record.Key); return; } - if (!ValidateEntry(Record, InvalidEntryReason)) + if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); ++InvalidEntryCount; @@ -956,7 +989,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui SkipEntryCount); if (InvalidEntryCount) { - ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName); + ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); } return LogEntryCount; } @@ -967,6 +1000,8 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui void ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) { + ZEN_TRACE_CPU("Z$::Bucket::OpenLog"); + m_TotalStandaloneSize = 0; m_Index.clear(); @@ -1111,6 +1146,8 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) con IoBuffer ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const { + ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue"); + ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); @@ -1194,6 +1231,8 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& bool 
ZenCacheDiskLayer::CacheBucket::Drop() { + ZEN_TRACE_CPU("Z$::Bucket::Drop"); + RwLock::ExclusiveLockScope _(m_IndexLock); std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks; @@ -1216,6 +1255,8 @@ ZenCacheDiskLayer::CacheBucket::Drop() void ZenCacheDiskLayer::CacheBucket::Flush() { + ZEN_TRACE_CPU("Z$::Bucket::Flush"); + m_BlockStore.Flush(); RwLock::SharedLockScope _(m_IndexLock); @@ -1229,6 +1270,8 @@ ZenCacheDiskLayer::CacheBucket::SaveManifest() { using namespace std::literals; + ZEN_TRACE_CPU("Z$::Bucket::SaveManifest"); + CbObjectWriter Writer; Writer << "BucketId"sv << m_BucketId; Writer << "Version"sv << CurrentDiskBucketVersion; @@ -1277,145 +1320,238 @@ ZenCacheDiskLayer::CacheBucket::SaveManifest() } } -void -ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) +IoHash +HashBuffer(const CompositeBuffer& Buffer) { - std::vector<IoHash> BadKeys; - uint64_t ChunkCount{0}, ChunkBytes{0}; - std::vector<BlockStoreLocation> ChunkLocations; - std::vector<IoHash> ChunkIndexToChunkHash; + IoHashStream Hasher; - auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) { - if (ContentType == ZenContentType::kCbObject) + for (const SharedBuffer& Segment : Buffer.GetSegments()) + { + Hasher.Append(Segment.GetView()); + } + + return Hasher.GetHash(); +} + +bool +ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer) +{ + ZEN_ASSERT_SLOW(Buffer.GetContentType() == ContentType); + + if (ContentType == ZenContentType::kCbObject) + { + CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); + + if (Error == CbValidateError::None) { - CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); - return Error == CbValidateError::None; + return true; } - if (ContentType == ZenContentType::kCompressedBinary) + + ZEN_SCOPED_ERROR("compact binary validation failed: '{}'", ToString(Error)); + + return false; + } + else if (ContentType == 
ZenContentType::kCompressedBinary) + { + IoBuffer MemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Buffer); + + IoHash HeaderRawHash; + uint64_t RawSize = 0; + if (!CompressedBuffer::ValidateCompressedHeader(MemoryBuffer, /* out */ HeaderRawHash, /* out */ RawSize)) { - IoHash RawHash; - uint64_t RawSize; - if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) - { - return false; - } - if (RawHash != Hash) - { - return false; - } + ZEN_SCOPED_ERROR("compressed buffer header validation failed"); + + return false; } - return true; - }; - RwLock::SharedLockScope _(m_IndexLock); + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(MemoryBuffer), /* out */ HeaderRawHash, /* out */ RawSize); + CompositeBuffer Decompressed = Compressed.DecompressToComposite(); + IoHash DecompressedHash = HashBuffer(Decompressed); - const size_t BlockChunkInitialCount = m_Index.size() / 4; - ChunkLocations.reserve(BlockChunkInitialCount); - ChunkIndexToChunkHash.reserve(BlockChunkInitialCount); + if (HeaderRawHash != DecompressedHash) + { + ZEN_SCOPED_ERROR("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash); - for (auto& Kv : m_Index) + return false; + } + } + else { - const IoHash& HashKey = Kv.first; - const BucketPayload& Payload = m_Payloads[Kv.second]; - const DiskLocation& Loc = Payload.Location; + // No way to verify this kind of content (what is it exactly?) 
+ + static int Once = [&] { + ZEN_WARN("ValidateCacheBucketEntryValue called with unknown content type ({})", ToString(ContentType)); + return 42; + }(); + } + + return true; +}; + +void +ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) +{ + ZEN_TRACE_CPU("Z$::Bucket::Scrub"); + + ZEN_INFO("scrubbing '{}'", m_BucketDir); + + Stopwatch Timer; + uint64_t ChunkCount = 0; + uint64_t VerifiedChunkBytes = 0; + + auto LogStats = MakeGuard([&] { + const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs()); + + ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})", + m_BucketName, + NiceBytes(VerifiedChunkBytes), + NiceTimeSpanMs(DurationMs), + ChunkCount, + NiceRate(VerifiedChunkBytes, DurationMs)); + }); - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + std::vector<IoHash> BadKeys; + auto ReportBadKey = [&](const IoHash& Key) { BadKeys.push_back(Key); }; + + try + { + std::vector<BlockStoreLocation> ChunkLocations; + std::vector<IoHash> ChunkIndexToChunkHash; + + RwLock::SharedLockScope _(m_IndexLock); + + const size_t BlockChunkInitialCount = m_Index.size() / 4; + ChunkLocations.reserve(BlockChunkInitialCount); + ChunkIndexToChunkHash.reserve(BlockChunkInitialCount); + + // Do a pass over the index and verify any standalone file values straight away + // all other storage classes are gathered and verified in bulk in order to enable + // more efficient I/O scheduling + + for (auto& Kv : m_Index) { - ++ChunkCount; - ChunkBytes += Loc.Size(); - if (Loc.GetContentType() == ZenContentType::kBinary) + const IoHash& HashKey = Kv.first; + const BucketPayload& Payload = m_Payloads[Kv.second]; + const DiskLocation& Loc = Payload.Location; + + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - ExtendablePathBuilder<256> DataFilePath; - BuildPath(DataFilePath, HashKey); + Ctx.ThrowIfDeadlineExpired(); - RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); + ++ChunkCount; + VerifiedChunkBytes += Loc.Size(); - std::error_code 
Ec; - uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec); - if (Ec) + if (Loc.GetContentType() == ZenContentType::kBinary) { - BadKeys.push_back(HashKey); + // Blob cache value, not much we can do about data integrity checking + // here since there's no hash available + ExtendablePathBuilder<256> DataFilePath; + BuildPath(DataFilePath, HashKey); + + RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); + + std::error_code Ec; + uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec); + if (Ec) + { + ReportBadKey(HashKey); + } + if (size != Loc.Size()) + { + ReportBadKey(HashKey); + } + continue; } - if (size != Loc.Size()) + else { - BadKeys.push_back(HashKey); + // Structured cache value + IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey); + if (!Buffer) + { + ReportBadKey(HashKey); + continue; + } + if (!ValidateCacheBucketEntryValue(Loc.GetContentType(), Buffer)) + { + ReportBadKey(HashKey); + continue; + } } + } + else + { + ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment)); + ChunkIndexToChunkHash.push_back(HashKey); continue; } - IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey); + } + + const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void { + ++ChunkCount; + VerifiedChunkBytes += Size; + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + if (!Data) + { + // ChunkLocation out of range of stored blocks + ReportBadKey(Hash); + return; + } + if (!Size) + { + ReportBadKey(Hash); + return; + } + IoBuffer Buffer(IoBuffer::Wrap, Data, Size); if (!Buffer) { - BadKeys.push_back(HashKey); - continue; + ReportBadKey(Hash); + return; } - if (!ValidateEntry(HashKey, Loc.GetContentType(), Buffer)) + const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; + ZenContentType ContentType = Payload.Location.GetContentType(); + Buffer.SetContentType(ContentType); + if (!ValidateCacheBucketEntryValue(ContentType, Buffer)) { - BadKeys.push_back(HashKey); - 
continue; + ReportBadKey(Hash); + return; } - } - else - { - ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment)); - ChunkIndexToChunkHash.push_back(HashKey); - continue; - } - } - - const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - ++ChunkCount; - ChunkBytes += Size; - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - if (!Data) - { - // ChunkLocation out of range of stored blocks - BadKeys.push_back(Hash); - return; - } - IoBuffer Buffer(IoBuffer::Wrap, Data, Size); - if (!Buffer) - { - BadKeys.push_back(Hash); - return; - } - const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; - ZenContentType ContentType = Payload.Location.GetContentType(); - if (!ValidateEntry(Hash, ContentType, Buffer)) - { - BadKeys.push_back(Hash); - return; - } - }; + }; - const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - ++ChunkCount; - ChunkBytes += Size; - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - // TODO: Add API to verify compressed buffer and possible structure data without having to memorymap the whole file - IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); - if (!Buffer) - { - BadKeys.push_back(Hash); - return; - } - const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; - ZenContentType ContentType = Payload.Location.GetContentType(); - if (!ValidateEntry(Hash, ContentType, Buffer)) - { - BadKeys.push_back(Hash); - return; - } - }; + const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void { + Ctx.ThrowIfDeadlineExpired(); - m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); + ++ChunkCount; + VerifiedChunkBytes += Size; + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); + if (!Buffer) + { + 
ReportBadKey(Hash); + return; + } + const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; + ZenContentType ContentType = Payload.Location.GetContentType(); + Buffer.SetContentType(ContentType); + if (!ValidateCacheBucketEntryValue(ContentType, Buffer)) + { + ReportBadKey(Hash); + return; + } + }; - _.ReleaseNow(); + m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); + } + catch (ScrubDeadlineExpiredException&) + { + ZEN_INFO("Scrubbing deadline expired, operation incomplete"); + } - Ctx.ReportScrubbed(ChunkCount, ChunkBytes); + Ctx.ReportScrubbed(ChunkCount, VerifiedChunkBytes); if (!BadKeys.empty()) { - ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName); + ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir); if (Ctx.RunRecovery()) { @@ -1486,9 +1622,10 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) // Let whomever it concerns know about the bad chunks. This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do - Ctx.ReportBadCidChunks(BadKeys); - - ZEN_INFO("cache bucket scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); + if (!BadKeys.empty()) + { + Ctx.ReportBadCidChunks(BadKeys); + } } void @@ -1504,7 +1641,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) Stopwatch TotalTimer; const auto _ = MakeGuard([&] { ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})", - m_BucketDir / m_BucketName, + m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), NiceLatencyNs(WriteBlockTimeUs), NiceLatencyNs(WriteBlockLongestTimeUs), @@ -1598,7 +1735,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage"); - ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir / m_BucketName); + ZEN_DEBUG("collecting garbage from '{}'", 
m_BucketDir); Stopwatch TotalTimer; uint64_t WriteBlockTimeUs = 0; @@ -1618,7 +1755,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) "{} " "of {} " "entires ({}).", - m_BucketDir / m_BucketName, + m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), NiceLatencyNs(WriteBlockTimeUs), NiceLatencyNs(WriteBlockLongestTimeUs), @@ -1647,7 +1784,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) }); if (DeleteCacheKeys.empty()) { - ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName); + ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir); return; } @@ -1700,7 +1837,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) }); if (m_Index.empty()) { - ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName); + ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir); return; } BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); @@ -1851,7 +1988,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); uint64_t CurrentTotalSize = TotalSize(); ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} {}", - m_BucketDir / m_BucketName, + m_BucketDir, DeleteCount, TotalChunkCount, NiceBytes(CurrentTotalSize)); @@ -2010,6 +2147,8 @@ ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& Ac void ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) { + ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue"); + uint64_t NewFileSize = Value.Value.Size(); TemporaryFile DataFile; @@ -2398,10 +2537,25 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) { RwLock::SharedLockScope _(m_Lock); - for (auto& Kv : m_Buckets) { - CacheBucket& Bucket = *Kv.second; - 
Bucket.ScrubStorage(Ctx); + std::vector<std::future<void>> Results; + Results.reserve(m_Buckets.size()); + + for (auto& Kv : m_Buckets) + { +#if 1 + Results.push_back( + Ctx.ThreadPool().EnqueueTask(std::packaged_task<void()>{[Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }})); +#else + CacheBucket& Bucket = *Kv.second; + Bucket.ScrubStorage(Ctx); +#endif + } + + for (auto& Result : Results) + { + Result.get(); + } } } @@ -3736,7 +3890,8 @@ TEST_CASE("z$.scrub") std::vector<IoHash> StructuredCids{CreateKey(1), CreateKey(2), CreateKey(3)}; CreateRecords(true, "mybucket"sv, StructuredCids, AttachmentSizes); - ScrubContext ScrubCtx; + WorkerThreadPool ThreadPool{1}; + ScrubContext ScrubCtx{ThreadPool}; Zcs.ScrubStorage(ScrubCtx); CidStore.ScrubStorage(ScrubCtx); CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size()); diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 827d5d2db..30c9e3937 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -14,6 +14,7 @@ #include <zencore/thread.h> #include <zencore/timer.h> #include <zencore/trace.h> +#include <zencore/workthreadpool.h> #include <zenhttp/httpserver.h> #include <zenhttp/websocket.h> #include <zenstore/cidstore.h> @@ -622,7 +623,8 @@ public: Stopwatch Timer; ZEN_INFO("Storage validation STARTING"); - ScrubContext Ctx; + WorkerThreadPool ThreadPool{1}; + ScrubContext Ctx{ThreadPool}; m_CidStore->ScrubStorage(Ctx); m_ProjectStore->ScrubStorage(Ctx); m_StructuredCacheService->ScrubStorage(Ctx); diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index ab05e3e7c..b98f01385 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -18,6 +18,7 @@ #include <zencore/thread.h> #include <zencore/trace.h> #include <zencore/uid.h> +#include <zencore/workthreadpool.h> #include <zenstore/cidstore.h> #include <zenstore/gc.h> #include <zenstore/scrubcontext.h> @@ -74,6 +75,8 @@ 
private: void UpdateManifest(); }; +////////////////////////////////////////////////////////////////////////// + CasImpl::CasImpl(GcManager& Gc) : m_TinyStrategy(Gc), m_SmallStrategy(Gc), m_LargeStrategy(Gc) { } @@ -323,7 +326,8 @@ TEST_CASE("CasStore") std::unique_ptr<CasStore> Store = CreateCasStore(Gc); Store->Initialize(config); - ScrubContext Ctx; + WorkerThreadPool ThreadPool{1}; + ScrubContext Ctx{ThreadPool}; Store->ScrubStorage(Ctx); IoBuffer Value1{16}; diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index a8a4dc102..e9037b16c 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -244,93 +244,100 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) std::vector<BlockStoreLocation> ChunkLocations; std::vector<IoHash> ChunkIndexToChunkHash; - RwLock::SharedLockScope _(m_LocationMapLock); - - uint64_t TotalChunkCount = m_LocationMap.size(); - ChunkLocations.reserve(TotalChunkCount); - ChunkIndexToChunkHash.reserve(TotalChunkCount); + try { - for (const auto& Entry : m_LocationMap) + RwLock::SharedLockScope _(m_LocationMapLock); + + uint64_t TotalChunkCount = m_LocationMap.size(); + ChunkLocations.reserve(TotalChunkCount); + ChunkIndexToChunkHash.reserve(TotalChunkCount); { - const IoHash& ChunkHash = Entry.first; - const BlockStoreDiskLocation& DiskLocation = Entry.second; - BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); + for (const auto& Entry : m_LocationMap) + { + const IoHash& ChunkHash = Entry.first; + const BlockStoreDiskLocation& DiskLocation = Entry.second; + BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); - ChunkLocations.push_back(Location); - ChunkIndexToChunkHash.push_back(ChunkHash); + ChunkLocations.push_back(Location); + ChunkIndexToChunkHash.push_back(ChunkHash); + } } - } - const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - ++ChunkCount; - ChunkBytes += Size; + const auto ValidateSmallChunk = [&](size_t 
ChunkIndex, const void* Data, uint64_t Size) { + ++ChunkCount; + ChunkBytes += Size; - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - if (!Data) - { - // ChunkLocation out of range of stored blocks - BadKeys.push_back(Hash); - return; - } - - IoBuffer Buffer(IoBuffer::Wrap, Data, Size); - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) - { - if (RawHash != Hash) + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + if (!Data) { - // Hash mismatch + // ChunkLocation out of range of stored blocks BadKeys.push_back(Hash); return; } - return; - } + + IoBuffer Buffer(IoBuffer::Wrap, Data, Size); + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) + { + if (RawHash != Hash) + { + // Hash mismatch + BadKeys.push_back(Hash); + return; + } + return; + } #if ZEN_WITH_TESTS - IoHash ComputedHash = IoHash::HashBuffer(Data, Size); - if (ComputedHash == Hash) - { - return; - } + IoHash ComputedHash = IoHash::HashBuffer(Data, Size); + if (ComputedHash == Hash) + { + return; + } #endif - BadKeys.push_back(Hash); - }; + BadKeys.push_back(Hash); + }; - const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - ++ChunkCount; - ChunkBytes += Size; + const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + Ctx.ThrowIfDeadlineExpired(); - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); + ++ChunkCount; + ChunkBytes += Size; - IoHash RawHash; - uint64_t RawSize; - // TODO: Add API to verify compressed buffer without having to memorymap the whole file - if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) - { - if (RawHash != Hash) + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + IoBuffer 
Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); + + IoHash RawHash; + uint64_t RawSize; + // TODO: Add API to verify compressed buffer without having to memorymap the whole file + if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) { - // Hash mismatch - BadKeys.push_back(Hash); + if (RawHash != Hash) + { + // Hash mismatch + BadKeys.push_back(Hash); + return; + } return; } - return; - } #if ZEN_WITH_TESTS - IoHashStream Hasher; - File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); - IoHash ComputedHash = Hasher.GetHash(); - if (ComputedHash == Hash) - { - return; - } + IoHashStream Hasher; + File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); + IoHash ComputedHash = Hasher.GetHash(); + if (ComputedHash == Hash) + { + return; + } #endif - BadKeys.push_back(Hash); - }; - - m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); + BadKeys.push_back(Hash); + }; - _.ReleaseNow(); + m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); + } + catch (ScrubDeadlineExpiredException&) + { + ZEN_INFO("Scrubbing deadline expired, operation incomplete"); + } Ctx.ReportScrubbed(ChunkCount, ChunkBytes); diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index dc19a9a35..516a08f14 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -15,7 +15,9 @@ #include <zencore/testutils.h> #include <zencore/timer.h> #include <zencore/trace.h> +#include <zencore/workthreadpool.h> #include <zenstore/cidstore.h> +#include <zenstore/scrubcontext.h> #include "cas.h" @@ -378,6 +380,17 @@ GcManager::RemoveGcStorage(GcStorage* Storage) } void +GcManager::ScrubStorage(ScrubContext& GcCtx) +{ + RwLock::SharedLockScope _(m_Lock); + + for (GcStorage* Storage : m_GcStorage) + { + Storage->ScrubStorage(GcCtx); + } +} + +void GcManager::CollectGarbage(GcContext& GcCtx) { 
ZEN_TRACE_CPU("Gc::CollectGarbage"); @@ -435,6 +448,7 @@ GcManager::TotalStorageSize() const } ////////////////////////////////////////////////////////////////////////// + void DiskUsageWindow::KeepRange(GcClock::Tick StartTick, GcClock::Tick EndTick) { @@ -660,7 +674,9 @@ GcScheduler::TriggerGc(const GcScheduler::TriggerGcParams& Params) { m_TriggerGcParams = Params; uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle); - if (m_Status.compare_exchange_strong(IdleState, static_cast<uint32_t>(GcSchedulerStatus::kRunning))) + + if (m_Status.compare_exchange_strong(/* expected */ IdleState, + /* desired */ static_cast<uint32_t>(GcSchedulerStatus::kRunning))) { m_GcSignal.notify_one(); return true; @@ -671,6 +687,27 @@ GcScheduler::TriggerGc(const GcScheduler::TriggerGcParams& Params) return false; } +bool +GcScheduler::TriggerScrub(const TriggerScrubParams& Params) +{ + std::unique_lock Lock(m_GcMutex); + + if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status) + { + m_TriggerScrubParams = Params; + uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle); + + if (m_Status.compare_exchange_strong(/* expected */ IdleState, /* desired */ static_cast<uint32_t>(GcSchedulerStatus::kRunning))) + { + m_GcSignal.notify_one(); + + return true; + } + } + + return false; +} + void GcScheduler::CheckDiskSpace(const DiskSpace& Space) { @@ -697,6 +734,8 @@ GcScheduler::CheckDiskSpace(const DiskSpace& Space) void GcScheduler::SchedulerThread() { + SetCurrentThreadName("GcScheduler"); + std::chrono::seconds WaitTime{0}; for (;;) @@ -713,7 +752,7 @@ GcScheduler::SchedulerThread() break; } - if (!m_Config.Enabled) + if (!m_Config.Enabled && !m_TriggerScrubParams) { WaitTime = std::chrono::seconds::max(); continue; @@ -724,18 +763,23 @@ GcScheduler::SchedulerThread() continue; } - bool Delete = true; + bool DoGc = m_Config.Enabled; + bool DoScrubbing = false; + std::chrono::seconds ScrubTimeslice = std::chrono::seconds::max(); + bool DoDelete = 
true; bool CollectSmallObjects = m_Config.CollectSmallObjects; std::chrono::seconds MaxCacheDuration = m_Config.MaxCacheDuration; std::chrono::seconds MaxProjectStoreDuration = m_Config.MaxProjectStoreDuration; uint64_t DiskSizeSoftLimit = m_Config.DiskSizeSoftLimit; GcClock::TimePoint Now = GcClock::Now(); + if (m_TriggerGcParams) { const auto TriggerParams = m_TriggerGcParams.value(); m_TriggerGcParams.reset(); CollectSmallObjects = TriggerParams.CollectSmallObjects; + if (TriggerParams.MaxCacheDuration != std::chrono::seconds::max()) { MaxCacheDuration = TriggerParams.MaxCacheDuration; @@ -750,6 +794,29 @@ GcScheduler::SchedulerThread() } } + if (m_TriggerScrubParams) + { + DoScrubbing = true; + + if (m_TriggerScrubParams->SkipGc) + { + DoGc = false; + } + + ScrubTimeslice = m_TriggerScrubParams->MaxTimeslice; + } + + if (DoScrubbing) + { + ScrubStorage(DoDelete, ScrubTimeslice); + m_TriggerScrubParams.reset(); + } + + if (!DoGc) + { + continue; + } + GcClock::TimePoint CacheExpireTime = MaxCacheDuration == GcClock::Duration::max() ? 
GcClock::TimePoint::min() : Now - MaxCacheDuration; GcClock::TimePoint ProjectStoreExpireTime = @@ -775,14 +842,15 @@ GcScheduler::SchedulerThread() const std::chrono::duration LoadGraphTime = PressureGraphLength * m_Config.MonitorInterval; std::vector<uint64_t> DiskDeltas; uint64_t MaxLoad = 0; + { const GcClock::Tick EpochTickCount = GcClock::Now().time_since_epoch().count(); std::unique_lock Lock(m_GcMutex); m_DiskUsageWindow.Append({.SampleTime = EpochTickCount, .DiskUsage = TotalSize.DiskSize}); m_DiskUsageLog.Append({.SampleTime = EpochTickCount, .DiskUsage = TotalSize.DiskSize}); const GcClock::TimePoint LoadGraphStartTime = Now - LoadGraphTime; - GcClock::Tick Start = LoadGraphStartTime.time_since_epoch().count(); - GcClock::Tick End = Now.time_since_epoch().count(); + const GcClock::Tick Start = LoadGraphStartTime.time_since_epoch().count(); + const GcClock::Tick End = Now.time_since_epoch().count(); DiskDeltas = m_DiskUsageWindow.GetDiskDeltas(Start, End, Max(1, (End - Start + PressureGraphLength - 1) / PressureGraphLength), @@ -818,7 +886,7 @@ GcScheduler::SchedulerThread() } } - bool DiskSpaceGCTriggered = GcDiskSpaceGoal > 0; + const bool DiskSpaceGCTriggered = GcDiskSpaceGoal > 0; std::chrono::seconds RemaingTime = std::chrono::duration_cast<std::chrono::seconds>(m_NextGcTime - GcClock::Now()); @@ -858,7 +926,7 @@ GcScheduler::SchedulerThread() } } - CollectGarbage(CacheExpireTime, ProjectStoreExpireTime, Delete, CollectSmallObjects); + CollectGarbage(CacheExpireTime, ProjectStoreExpireTime, DoDelete, CollectSmallObjects); uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning); if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle))) @@ -885,6 +953,36 @@ GcScheduler::NextGcTime(GcClock::TimePoint CurrentTime) } void +GcScheduler::ScrubStorage(bool DoDelete, std::chrono::seconds TimeSlice) +{ + const std::chrono::steady_clock::time_point TimeNow = std::chrono::steady_clock::now(); + 
std::chrono::steady_clock::time_point Deadline = TimeNow + TimeSlice; + // there really should be a saturating add in std::chrono + if (Deadline < TimeNow) + { + Deadline = std::chrono::steady_clock::time_point::max(); + } + + Stopwatch Timer; + ZEN_INFO("scrubbing STARTING (delete mode => {})", DoDelete); + + WorkerThreadPool ThreadPool{4, "scrubber"}; + ScrubContext Ctx{ThreadPool, Deadline}; + + try + { + Ctx.SetShouldDelete(DoDelete); + m_GcManager.ScrubStorage(Ctx); + } + catch (ScrubDeadlineExpiredException&) + { + ZEN_INFO("scrubbing deadline expired (top level), operation incomplete!"); + } + + ZEN_INFO("scrubbing DONE (in {})", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); +} + +void GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, const GcClock::TimePoint& ProjectStoreExpireTime, bool Delete, @@ -1354,6 +1452,36 @@ TEST_CASE("gc.diskusagewindow") CHECK(Stats.FindTimepointThatRemoves(100000u, 1000)); } } + +TEST_CASE("scrub.basic") +{ + using namespace gc::impl; + + ScopedTemporaryDirectory TempDir; + + CidStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path() / "cas"; + + GcManager Gc; + CidStore CidStore(Gc); + + CidStore.Initialize(CasConfig); + + IoBuffer Chunk = CreateChunk(128); + auto CompressedChunk = Compress(Chunk); + + const auto InsertResult = CidStore.AddChunk(CompressedChunk.GetCompressed().Flatten().AsIoBuffer(), CompressedChunk.DecodeRawHash()); + CHECK(InsertResult.New); + + GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24)); + GcCtx.CollectSmallObjects(true); + + CidStore.Flush(); + Gc.CollectGarbage(GcCtx); + + CHECK(!CidStore.ContainsChunk(CompressedChunk.DecodeRawHash())); +} + #endif void diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h index 881936d0f..22b9bc284 100644 --- a/src/zenstore/include/zenstore/gc.h +++ b/src/zenstore/include/zenstore/gc.h @@ -22,9 +22,10 @@ class logger; namespace zen { -class HashKeySet; 
-class GcManager; class CidStore; +class GcManager; +class HashKeySet; +class ScrubContext; struct IoHash; struct DiskSpace; @@ -146,6 +147,7 @@ public: void RemoveGcStorage(GcStorage* Contributor); void CollectGarbage(GcContext& GcCtx); + void ScrubStorage(ScrubContext& GcCtx); GcStorageSize TotalStorageSize() const; @@ -226,29 +228,39 @@ public: bool TriggerGc(const TriggerGcParams& Params); + struct TriggerScrubParams + { + bool SkipGc = false; + std::chrono::seconds MaxTimeslice = std::chrono::seconds::max(); + }; + + bool TriggerScrub(const TriggerScrubParams& Params); + private: void SchedulerThread(); void CollectGarbage(const GcClock::TimePoint& CacheExpireTime, const GcClock::TimePoint& ProjectStoreExpireTime, bool Delete, bool CollectSmallObjects); + void ScrubStorage(bool DoDelete, std::chrono::seconds TimeSlice); GcClock::TimePoint NextGcTime(GcClock::TimePoint CurrentTime); spdlog::logger& Log() { return m_Log; } virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); } void CheckDiskSpace(const DiskSpace& Space); - spdlog::logger& m_Log; - GcManager& m_GcManager; - GcSchedulerConfig m_Config; - GcClock::TimePoint m_LastGcTime{}; - GcClock::TimePoint m_LastGcExpireTime{}; - GcClock::TimePoint m_NextGcTime{}; - std::atomic_uint32_t m_Status{}; - std::thread m_GcThread; - std::mutex m_GcMutex; - std::condition_variable m_GcSignal; - std::optional<TriggerGcParams> m_TriggerGcParams; - std::atomic_bool m_AreDiskWritesBlocked = false; + spdlog::logger& m_Log; + GcManager& m_GcManager; + GcSchedulerConfig m_Config; + GcClock::TimePoint m_LastGcTime{}; + GcClock::TimePoint m_LastGcExpireTime{}; + GcClock::TimePoint m_NextGcTime{}; + std::atomic_uint32_t m_Status{}; + std::thread m_GcThread; + std::mutex m_GcMutex; + std::condition_variable m_GcSignal; + std::optional<TriggerGcParams> m_TriggerGcParams; + std::optional<TriggerScrubParams> m_TriggerScrubParams; + std::atomic_bool m_AreDiskWritesBlocked = false; 
TCasLogFile<DiskUsageWindow::DiskUsageEntry> m_DiskUsageLog; DiskUsageWindow m_DiskUsageWindow; diff --git a/src/zenstore/include/zenstore/scrubcontext.h b/src/zenstore/include/zenstore/scrubcontext.h index 8b8ebac3d..cefaf0888 100644 --- a/src/zenstore/include/zenstore/scrubcontext.h +++ b/src/zenstore/include/zenstore/scrubcontext.h @@ -7,38 +7,59 @@ namespace zen { +class WorkerThreadPool; + /** Context object for data scrubbing - * - * Data scrubbing is when we traverse stored data to validate it and - * optionally correct/recover + + Data scrubbing is when we traverse stored data to validate it and + optionally correct/recover */ class ScrubContext { public: - ScrubContext(); + ScrubContext(WorkerThreadPool& InWorkerThreadPool, + std::chrono::steady_clock::time_point Deadline = std::chrono::steady_clock::time_point::max()); ~ScrubContext(); - virtual void ReportBadCidChunks(std::span<IoHash> BadCasChunks) { m_BadCid.AddHashesToSet(BadCasChunks); } + void ReportBadCidChunks(std::span<IoHash> BadCasChunks); inline uint64_t ScrubTimestamp() const { return m_ScrubTime; } - inline bool RunRecovery() const { return m_Recover; } void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes) { m_ChunkCount.fetch_add(ChunkCount); m_ByteCount.fetch_add(ChunkBytes); } + std::chrono::steady_clock::time_point GetDeadline() const { return m_Deadline; } + bool IsWithinDeadline() const; + void ThrowIfDeadlineExpired() const; + inline uint64_t ScrubbedChunks() const { return m_ChunkCount; } inline uint64_t ScrubbedBytes() const { return m_ByteCount; } - const HashKeySet BadCids() const { return m_BadCid; } + HashKeySet BadCids() const; + + inline bool RunRecovery() const { return m_Recover; } + inline void SetShouldDelete(bool DoDelete) { m_Recover = DoDelete; } + + inline WorkerThreadPool& ThreadPool() { return m_WorkerThreadPool; } private: - uint64_t m_ScrubTime = GetHifreqTimerValue(); - bool m_Recover = true; - std::atomic<uint64_t> m_ChunkCount{0}; - std::atomic<uint64_t> 
m_ByteCount{0}; - HashKeySet m_BadCid; + uint64_t m_ScrubTime = GetHifreqTimerValue(); + bool m_Recover = true; + std::atomic<uint64_t> m_ChunkCount{0}; + std::atomic<uint64_t> m_ByteCount{0}; + mutable RwLock m_Lock; + HashKeySet m_BadCid; + WorkerThreadPool& m_WorkerThreadPool; + std::chrono::steady_clock::time_point m_Deadline{}; +}; + +class ScrubDeadlineExpiredException : public std::runtime_error +{ +public: + ScrubDeadlineExpiredException(); + ~ScrubDeadlineExpiredException(); }; } // namespace zen diff --git a/src/zenstore/include/zenstore/zenstore.h b/src/zenstore/include/zenstore/zenstore.h index 46d62029d..29f3d2639 100644 --- a/src/zenstore/include/zenstore/zenstore.h +++ b/src/zenstore/include/zenstore/zenstore.h @@ -10,4 +10,4 @@ namespace zen { ZENSTORE_API void zenstore_forcelinktests(); -} +} // namespace zen diff --git a/src/zenstore/scrubcontext.cpp b/src/zenstore/scrubcontext.cpp index f35178de6..f5a3784c3 100644 --- a/src/zenstore/scrubcontext.cpp +++ b/src/zenstore/scrubcontext.cpp @@ -6,7 +6,19 @@ namespace zen { -ScrubContext::ScrubContext() +ScrubDeadlineExpiredException::ScrubDeadlineExpiredException() : std::runtime_error("scrubbing deadline expired") +{ +} + +ScrubDeadlineExpiredException::~ScrubDeadlineExpiredException() +{ +} + +////////////////////////////////////////////////////////////////////////// + +ScrubContext::ScrubContext(WorkerThreadPool& InWorkerThreadPool, std::chrono::steady_clock::time_point Deadline) +: m_WorkerThreadPool(InWorkerThreadPool) +, m_Deadline(Deadline) { } @@ -14,4 +26,33 @@ ScrubContext::~ScrubContext() { } -} // namespace zen
\ No newline at end of file +HashKeySet +ScrubContext::BadCids() const +{ + RwLock::SharedLockScope _(m_Lock); + return m_BadCid; +} + +void +ScrubContext::ReportBadCidChunks(std::span<IoHash> BadCasChunks) +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_BadCid.AddHashesToSet(BadCasChunks); +} + +bool +ScrubContext::IsWithinDeadline() const +{ + return std::chrono::steady_clock::now() < m_Deadline; +} + +void +ScrubContext::ThrowIfDeadlineExpired() const +{ + if (IsWithinDeadline()) + return; + + throw ScrubDeadlineExpiredException(); +} + +} // namespace zen |