diff options
| author | Stefan Boberg <[email protected]> | 2023-05-16 21:35:39 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-16 21:35:39 +0200 |
| commit | 81b2757f917e34bb338fad7965ae8a74e160bee4 (patch) | |
| tree | 931ba100471a2369c62a6e41a1b4a7937ed31f6f /src/zenserver/cache/structuredcachestore.cpp | |
| parent | added benchmark utility command `bench` (#298) (diff) | |
| download | zen-81b2757f917e34bb338fad7965ae8a74e160bee4.tar.xz zen-81b2757f917e34bb338fad7965ae8a74e160bee4.zip | |
Content scrubbing (#271)
Added a `zen scrub` command which may be triggered via the zen CLI helper. It traverses storage and validates contents by content hash and/or by structure. If unexpected data is encountered, it is invalidated.
Diffstat (limited to 'src/zenserver/cache/structuredcachestore.cpp')
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.cpp | 425 |
1 files changed, 290 insertions, 135 deletions
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp index 3a6e5cbc3..440da3074 100644 --- a/src/zenserver/cache/structuredcachestore.cpp +++ b/src/zenserver/cache/structuredcachestore.cpp @@ -17,11 +17,13 @@ #include <zencore/thread.h> #include <zencore/timer.h> #include <zencore/trace.h> +#include <zencore/workthreadpool.h> #include <zenstore/cidstore.h> #include <zenstore/scrubcontext.h> #include <xxhash.h> +#include <future> #include <limits> #if ZEN_PLATFORM_WINDOWS @@ -96,13 +98,18 @@ namespace { return BucketDir / (BucketName + LogExtension); } - bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason) + bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) { OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); return false; } + if (Entry.Location.Reserved != 0) + { + OutReason = fmt::format("Reserved field non-zero ({}) for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); + return false; + } if (Entry.Location.GetFlags() & ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) { @@ -283,6 +290,8 @@ ZenCacheNamespace::ScrubStorage(ScrubContext& Ctx) return; } + ZEN_INFO("scrubbing '{}'", m_RootDir); + m_LastScrubTime = Ctx.ScrubTimestamp(); m_DiskLayer.ScrubStorage(Ctx); @@ -665,6 +674,12 @@ ZenCacheMemoryLayer::CacheBucket::EntryCount() const ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)), m_BucketId(Oid::Zero) { + if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap"))) + { + // This is pretty ad hoc but in order to avoid too many individual files + // it makes sense to have a different strategy for legacy values + m_LargeObjectThreshold = 16 * 1024 * 1024; + } } ZenCacheDiskLayer::CacheBucket::~CacheBucket() @@ 
-676,6 +691,10 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo { using namespace std::literals; + ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate"); + + ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir); + m_BlocksBasePath = BucketDir / "blocks"; m_BucketDir = BucketDir; @@ -694,12 +713,12 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo { return false; } - uint32_t Version = Manifest["Version"sv].AsUInt32(0); - if (Version != CurrentDiskBucketVersion) - { - ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion); - IsNew = true; - } + // uint32_t Version = Manifest["Version"sv].AsUInt32(0); + // if (Version != CurrentDiskBucketVersion) + //{ + // ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion); + // IsNew = true; + // } } else if (AllowCreate) { @@ -745,8 +764,17 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo { size_t EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - m_Payloads[EntryIndex].RawHash = Obj["RawHash"sv].AsHash(); - m_Payloads[EntryIndex].RawSize = Obj["RawSize"sv].AsUInt64(); + + const IoHash RawHash = Obj["RawHash"sv].AsHash(); + const uint64_t RawSize = Obj["RawSize"sv].AsUInt64(); + + if (RawHash == IoHash::Zero || RawSize == 0) + { + ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex); + } + + m_Payloads[EntryIndex].RawHash = RawHash; + m_Payloads[EntryIndex].RawSize = RawSize; } } } @@ -757,18 +785,20 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo void ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() { + ZEN_TRACE_CPU("Z$::Bucket::MakeIndexSnapshot"); + uint64_t LogCount = m_SlogFile.GetLogCount(); if (m_LogFlushPosition == LogCount) { return; } - ZEN_DEBUG("write store snapshot for '{}'", m_BucketDir / m_BucketName); + 
ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir); uint64_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", - m_BucketDir / m_BucketName, + m_BucketDir, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); @@ -792,10 +822,9 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() { // Write the current state of the location map to a new index state std::vector<DiskIndexEntry> Entries; + Entries.resize(m_Index.size()); { - Entries.resize(m_Index.size()); - uint64_t EntryIndex = 0; for (auto& Entry : m_Index) { @@ -841,6 +870,8 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() uint64_t ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion) { + ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile"); + if (std::filesystem::is_regular_file(IndexPath)) { BasicFile ObjectIndexFile; @@ -886,7 +917,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index std::string InvalidEntryReason; for (const DiskIndexEntry& Entry : Entries) { - if (!ValidateEntry(Entry, InvalidEntryReason)) + if (!ValidateCacheBucketIndexEntry(Entry, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); continue; @@ -914,6 +945,8 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index uint64_t ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { + ZEN_TRACE_CPU("Z$::Bucket::ReadLog"); + if (std::filesystem::is_regular_file(LogPath)) { uint64_t LogEntryCount = 0; @@ -942,7 +975,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui m_Index.erase(Record.Key); return; } - if (!ValidateEntry(Record, InvalidEntryReason)) + if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, 
InvalidEntryReason); ++InvalidEntryCount; @@ -956,7 +989,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui SkipEntryCount); if (InvalidEntryCount) { - ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName); + ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); } return LogEntryCount; } @@ -967,6 +1000,8 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui void ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) { + ZEN_TRACE_CPU("Z$::Bucket::OpenLog"); + m_TotalStandaloneSize = 0; m_Index.clear(); @@ -1111,6 +1146,8 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) con IoBuffer ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const { + ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue"); + ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); @@ -1194,6 +1231,8 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& bool ZenCacheDiskLayer::CacheBucket::Drop() { + ZEN_TRACE_CPU("Z$::Bucket::Drop"); + RwLock::ExclusiveLockScope _(m_IndexLock); std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks; @@ -1216,6 +1255,8 @@ ZenCacheDiskLayer::CacheBucket::Drop() void ZenCacheDiskLayer::CacheBucket::Flush() { + ZEN_TRACE_CPU("Z$::Bucket::Flush"); + m_BlockStore.Flush(); RwLock::SharedLockScope _(m_IndexLock); @@ -1229,6 +1270,8 @@ ZenCacheDiskLayer::CacheBucket::SaveManifest() { using namespace std::literals; + ZEN_TRACE_CPU("Z$::Bucket::SaveManifest"); + CbObjectWriter Writer; Writer << "BucketId"sv << m_BucketId; Writer << "Version"sv << CurrentDiskBucketVersion; @@ -1277,145 +1320,238 @@ ZenCacheDiskLayer::CacheBucket::SaveManifest() } } -void -ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) +IoHash +HashBuffer(const CompositeBuffer& Buffer) { - std::vector<IoHash> BadKeys; - 
uint64_t ChunkCount{0}, ChunkBytes{0}; - std::vector<BlockStoreLocation> ChunkLocations; - std::vector<IoHash> ChunkIndexToChunkHash; + IoHashStream Hasher; - auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) { - if (ContentType == ZenContentType::kCbObject) + for (const SharedBuffer& Segment : Buffer.GetSegments()) + { + Hasher.Append(Segment.GetView()); + } + + return Hasher.GetHash(); +} + +bool +ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer) +{ + ZEN_ASSERT_SLOW(Buffer.GetContentType() == ContentType); + + if (ContentType == ZenContentType::kCbObject) + { + CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); + + if (Error == CbValidateError::None) { - CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); - return Error == CbValidateError::None; + return true; } - if (ContentType == ZenContentType::kCompressedBinary) + + ZEN_SCOPED_ERROR("compact binary validation failed: '{}'", ToString(Error)); + + return false; + } + else if (ContentType == ZenContentType::kCompressedBinary) + { + IoBuffer MemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Buffer); + + IoHash HeaderRawHash; + uint64_t RawSize = 0; + if (!CompressedBuffer::ValidateCompressedHeader(MemoryBuffer, /* out */ HeaderRawHash, /* out */ RawSize)) { - IoHash RawHash; - uint64_t RawSize; - if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) - { - return false; - } - if (RawHash != Hash) - { - return false; - } + ZEN_SCOPED_ERROR("compressed buffer header validation failed"); + + return false; } - return true; - }; - RwLock::SharedLockScope _(m_IndexLock); + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(MemoryBuffer), /* out */ HeaderRawHash, /* out */ RawSize); + CompositeBuffer Decompressed = Compressed.DecompressToComposite(); + IoHash DecompressedHash = HashBuffer(Decompressed); - const size_t BlockChunkInitialCount = m_Index.size() / 
4; - ChunkLocations.reserve(BlockChunkInitialCount); - ChunkIndexToChunkHash.reserve(BlockChunkInitialCount); + if (HeaderRawHash != DecompressedHash) + { + ZEN_SCOPED_ERROR("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash); - for (auto& Kv : m_Index) + return false; + } + } + else { - const IoHash& HashKey = Kv.first; - const BucketPayload& Payload = m_Payloads[Kv.second]; - const DiskLocation& Loc = Payload.Location; + // No way to verify this kind of content (what is it exactly?) + + static int Once = [&] { + ZEN_WARN("ValidateCacheBucketEntryValue called with unknown content type ({})", ToString(ContentType)); + return 42; + }(); + } + + return true; +}; + +void +ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) +{ + ZEN_TRACE_CPU("Z$::Bucket::Scrub"); + + ZEN_INFO("scrubbing '{}'", m_BucketDir); + + Stopwatch Timer; + uint64_t ChunkCount = 0; + uint64_t VerifiedChunkBytes = 0; + + auto LogStats = MakeGuard([&] { + const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs()); + + ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})", + m_BucketName, + NiceBytes(VerifiedChunkBytes), + NiceTimeSpanMs(DurationMs), + ChunkCount, + NiceRate(VerifiedChunkBytes, DurationMs)); + }); - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + std::vector<IoHash> BadKeys; + auto ReportBadKey = [&](const IoHash& Key) { BadKeys.push_back(Key); }; + + try + { + std::vector<BlockStoreLocation> ChunkLocations; + std::vector<IoHash> ChunkIndexToChunkHash; + + RwLock::SharedLockScope _(m_IndexLock); + + const size_t BlockChunkInitialCount = m_Index.size() / 4; + ChunkLocations.reserve(BlockChunkInitialCount); + ChunkIndexToChunkHash.reserve(BlockChunkInitialCount); + + // Do a pass over the index and verify any standalone file values straight away + // all other storage classes are gathered and verified in bulk in order to enable + // more efficient I/O scheduling + + for (auto& Kv : m_Index) { - 
++ChunkCount; - ChunkBytes += Loc.Size(); - if (Loc.GetContentType() == ZenContentType::kBinary) + const IoHash& HashKey = Kv.first; + const BucketPayload& Payload = m_Payloads[Kv.second]; + const DiskLocation& Loc = Payload.Location; + + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - ExtendablePathBuilder<256> DataFilePath; - BuildPath(DataFilePath, HashKey); + Ctx.ThrowIfDeadlineExpired(); - RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); + ++ChunkCount; + VerifiedChunkBytes += Loc.Size(); - std::error_code Ec; - uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec); - if (Ec) + if (Loc.GetContentType() == ZenContentType::kBinary) { - BadKeys.push_back(HashKey); + // Blob cache value, not much we can do about data integrity checking + // here since there's no hash available + ExtendablePathBuilder<256> DataFilePath; + BuildPath(DataFilePath, HashKey); + + RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); + + std::error_code Ec; + uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec); + if (Ec) + { + ReportBadKey(HashKey); + } + if (size != Loc.Size()) + { + ReportBadKey(HashKey); + } + continue; } - if (size != Loc.Size()) + else { - BadKeys.push_back(HashKey); + // Structured cache value + IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey); + if (!Buffer) + { + ReportBadKey(HashKey); + continue; + } + if (!ValidateCacheBucketEntryValue(Loc.GetContentType(), Buffer)) + { + ReportBadKey(HashKey); + continue; + } } + } + else + { + ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment)); + ChunkIndexToChunkHash.push_back(HashKey); continue; } - IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey); + } + + const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void { + ++ChunkCount; + VerifiedChunkBytes += Size; + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + if (!Data) + { + // ChunkLocation out of range of stored blocks + 
ReportBadKey(Hash); + return; + } + if (!Size) + { + ReportBadKey(Hash); + return; + } + IoBuffer Buffer(IoBuffer::Wrap, Data, Size); if (!Buffer) { - BadKeys.push_back(HashKey); - continue; + ReportBadKey(Hash); + return; } - if (!ValidateEntry(HashKey, Loc.GetContentType(), Buffer)) + const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; + ZenContentType ContentType = Payload.Location.GetContentType(); + Buffer.SetContentType(ContentType); + if (!ValidateCacheBucketEntryValue(ContentType, Buffer)) { - BadKeys.push_back(HashKey); - continue; + ReportBadKey(Hash); + return; } - } - else - { - ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment)); - ChunkIndexToChunkHash.push_back(HashKey); - continue; - } - } - - const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - ++ChunkCount; - ChunkBytes += Size; - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - if (!Data) - { - // ChunkLocation out of range of stored blocks - BadKeys.push_back(Hash); - return; - } - IoBuffer Buffer(IoBuffer::Wrap, Data, Size); - if (!Buffer) - { - BadKeys.push_back(Hash); - return; - } - const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; - ZenContentType ContentType = Payload.Location.GetContentType(); - if (!ValidateEntry(Hash, ContentType, Buffer)) - { - BadKeys.push_back(Hash); - return; - } - }; + }; - const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - ++ChunkCount; - ChunkBytes += Size; - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - // TODO: Add API to verify compressed buffer and possible structure data without having to memorymap the whole file - IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); - if (!Buffer) - { - BadKeys.push_back(Hash); - return; - } - const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; - ZenContentType ContentType = Payload.Location.GetContentType(); - if 
(!ValidateEntry(Hash, ContentType, Buffer)) - { - BadKeys.push_back(Hash); - return; - } - }; + const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void { + Ctx.ThrowIfDeadlineExpired(); - m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); + ++ChunkCount; + VerifiedChunkBytes += Size; + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); + if (!Buffer) + { + ReportBadKey(Hash); + return; + } + const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; + ZenContentType ContentType = Payload.Location.GetContentType(); + Buffer.SetContentType(ContentType); + if (!ValidateCacheBucketEntryValue(ContentType, Buffer)) + { + ReportBadKey(Hash); + return; + } + }; - _.ReleaseNow(); + m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); + } + catch (ScrubDeadlineExpiredException&) + { + ZEN_INFO("Scrubbing deadline expired, operation incomplete"); + } - Ctx.ReportScrubbed(ChunkCount, ChunkBytes); + Ctx.ReportScrubbed(ChunkCount, VerifiedChunkBytes); if (!BadKeys.empty()) { - ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName); + ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir); if (Ctx.RunRecovery()) { @@ -1486,9 +1622,10 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) // Let whomever it concerns know about the bad chunks. 
This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do - Ctx.ReportBadCidChunks(BadKeys); - - ZEN_INFO("cache bucket scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); + if (!BadKeys.empty()) + { + Ctx.ReportBadCidChunks(BadKeys); + } } void @@ -1504,7 +1641,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) Stopwatch TotalTimer; const auto _ = MakeGuard([&] { ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})", - m_BucketDir / m_BucketName, + m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), NiceLatencyNs(WriteBlockTimeUs), NiceLatencyNs(WriteBlockLongestTimeUs), @@ -1598,7 +1735,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage"); - ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir / m_BucketName); + ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir); Stopwatch TotalTimer; uint64_t WriteBlockTimeUs = 0; @@ -1618,7 +1755,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) "{} " "of {} " "entires ({}).", - m_BucketDir / m_BucketName, + m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), NiceLatencyNs(WriteBlockTimeUs), NiceLatencyNs(WriteBlockLongestTimeUs), @@ -1647,7 +1784,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) }); if (DeleteCacheKeys.empty()) { - ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName); + ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir); return; } @@ -1700,7 +1837,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) }); if (m_Index.empty()) { - ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName); + ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir); return; } BlockStoreState = 
m_BlockStore.GetReclaimSnapshotState(); @@ -1851,7 +1988,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); uint64_t CurrentTotalSize = TotalSize(); ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} {}", - m_BucketDir / m_BucketName, + m_BucketDir, DeleteCount, TotalChunkCount, NiceBytes(CurrentTotalSize)); @@ -2010,6 +2147,8 @@ ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& Ac void ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) { + ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue"); + uint64_t NewFileSize = Value.Value.Size(); TemporaryFile DataFile; @@ -2398,10 +2537,25 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) { RwLock::SharedLockScope _(m_Lock); - for (auto& Kv : m_Buckets) { - CacheBucket& Bucket = *Kv.second; - Bucket.ScrubStorage(Ctx); + std::vector<std::future<void>> Results; + Results.reserve(m_Buckets.size()); + + for (auto& Kv : m_Buckets) + { +#if 1 + Results.push_back( + Ctx.ThreadPool().EnqueueTask(std::packaged_task<void()>{[Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }})); +#else + CacheBucket& Bucket = *Kv.second; + Bucket.ScrubStorage(Ctx); +#endif + } + + for (auto& Result : Results) + { + Result.get(); + } } } @@ -3736,7 +3890,8 @@ TEST_CASE("z$.scrub") std::vector<IoHash> StructuredCids{CreateKey(1), CreateKey(2), CreateKey(3)}; CreateRecords(true, "mybucket"sv, StructuredCids, AttachmentSizes); - ScrubContext ScrubCtx; + WorkerThreadPool ThreadPool{1}; + ScrubContext ScrubCtx{ThreadPool}; Zcs.ScrubStorage(ScrubCtx); CidStore.ScrubStorage(ScrubCtx); CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size()); |