aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-16 21:35:39 +0200
committerGitHub <[email protected]>2023-05-16 21:35:39 +0200
commit81b2757f917e34bb338fad7965ae8a74e160bee4 (patch)
tree931ba100471a2369c62a6e41a1b4a7937ed31f6f /src/zenserver/cache/structuredcachestore.cpp
parentadded benchmark utility command `bench` (#298) (diff)
downloadzen-81b2757f917e34bb338fad7965ae8a74e160bee4.tar.xz
zen-81b2757f917e34bb338fad7965ae8a74e160bee4.zip
Content scrubbing (#271)
Added a zen scrub command which may be triggered via the zen CLI helper. This traverses storage and validates contents by content hash and/or by structure. If unexpected data is encountered, it is invalidated.
Diffstat (limited to 'src/zenserver/cache/structuredcachestore.cpp')
-rw-r--r--src/zenserver/cache/structuredcachestore.cpp425
1 file changed, 290 insertions(+), 135 deletions(-)
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp
index 3a6e5cbc3..440da3074 100644
--- a/src/zenserver/cache/structuredcachestore.cpp
+++ b/src/zenserver/cache/structuredcachestore.cpp
@@ -17,11 +17,13 @@
#include <zencore/thread.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
#include <zenstore/cidstore.h>
#include <zenstore/scrubcontext.h>
#include <xxhash.h>
+#include <future>
#include <limits>
#if ZEN_PLATFORM_WINDOWS
@@ -96,13 +98,18 @@ namespace {
return BucketDir / (BucketName + LogExtension);
}
- bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason)
+ bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason)
{
if (Entry.Key == IoHash::Zero)
{
OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
return false;
}
+ if (Entry.Location.Reserved != 0)
+ {
+ OutReason = fmt::format("Reserved field non-zero ({}) for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString());
+ return false;
+ }
if (Entry.Location.GetFlags() &
~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed))
{
@@ -283,6 +290,8 @@ ZenCacheNamespace::ScrubStorage(ScrubContext& Ctx)
return;
}
+ ZEN_INFO("scrubbing '{}'", m_RootDir);
+
m_LastScrubTime = Ctx.ScrubTimestamp();
m_DiskLayer.ScrubStorage(Ctx);
@@ -665,6 +674,12 @@ ZenCacheMemoryLayer::CacheBucket::EntryCount() const
ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)), m_BucketId(Oid::Zero)
{
+ if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap")))
+ {
+ // This is pretty ad hoc but in order to avoid too many individual files
+ // it makes sense to have a different strategy for legacy values
+ m_LargeObjectThreshold = 16 * 1024 * 1024;
+ }
}
ZenCacheDiskLayer::CacheBucket::~CacheBucket()
@@ -676,6 +691,10 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
{
using namespace std::literals;
+ ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate");
+
+ ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir);
+
m_BlocksBasePath = BucketDir / "blocks";
m_BucketDir = BucketDir;
@@ -694,12 +713,12 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
{
return false;
}
- uint32_t Version = Manifest["Version"sv].AsUInt32(0);
- if (Version != CurrentDiskBucketVersion)
- {
- ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion);
- IsNew = true;
- }
+ // uint32_t Version = Manifest["Version"sv].AsUInt32(0);
+ // if (Version != CurrentDiskBucketVersion)
+ //{
+ // ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion);
+ // IsNew = true;
+ // }
}
else if (AllowCreate)
{
@@ -745,8 +764,17 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
{
size_t EntryIndex = It.value();
ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
- m_Payloads[EntryIndex].RawHash = Obj["RawHash"sv].AsHash();
- m_Payloads[EntryIndex].RawSize = Obj["RawSize"sv].AsUInt64();
+
+ const IoHash RawHash = Obj["RawHash"sv].AsHash();
+ const uint64_t RawSize = Obj["RawSize"sv].AsUInt64();
+
+ if (RawHash == IoHash::Zero || RawSize == 0)
+ {
+ ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex);
+ }
+
+ m_Payloads[EntryIndex].RawHash = RawHash;
+ m_Payloads[EntryIndex].RawSize = RawSize;
}
}
}
@@ -757,18 +785,20 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
void
ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
{
+ ZEN_TRACE_CPU("Z$::Bucket::MakeIndexSnapshot");
+
uint64_t LogCount = m_SlogFile.GetLogCount();
if (m_LogFlushPosition == LogCount)
{
return;
}
- ZEN_DEBUG("write store snapshot for '{}'", m_BucketDir / m_BucketName);
+ ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir);
uint64_t EntryCount = 0;
Stopwatch Timer;
const auto _ = MakeGuard([&] {
ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
- m_BucketDir / m_BucketName,
+ m_BucketDir,
EntryCount,
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
@@ -792,10 +822,9 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
{
// Write the current state of the location map to a new index state
std::vector<DiskIndexEntry> Entries;
+ Entries.resize(m_Index.size());
{
- Entries.resize(m_Index.size());
-
uint64_t EntryIndex = 0;
for (auto& Entry : m_Index)
{
@@ -841,6 +870,8 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
uint64_t
ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion)
{
+ ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile");
+
if (std::filesystem::is_regular_file(IndexPath))
{
BasicFile ObjectIndexFile;
@@ -886,7 +917,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index
std::string InvalidEntryReason;
for (const DiskIndexEntry& Entry : Entries)
{
- if (!ValidateEntry(Entry, InvalidEntryReason))
+ if (!ValidateCacheBucketIndexEntry(Entry, InvalidEntryReason))
{
ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
continue;
@@ -914,6 +945,8 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index
uint64_t
ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
{
+ ZEN_TRACE_CPU("Z$::Bucket::ReadLog");
+
if (std::filesystem::is_regular_file(LogPath))
{
uint64_t LogEntryCount = 0;
@@ -942,7 +975,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui
m_Index.erase(Record.Key);
return;
}
- if (!ValidateEntry(Record, InvalidEntryReason))
+ if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason))
{
ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
++InvalidEntryCount;
@@ -956,7 +989,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui
SkipEntryCount);
if (InvalidEntryCount)
{
- ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName);
+ ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir);
}
return LogEntryCount;
}
@@ -967,6 +1000,8 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui
void
ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
{
+ ZEN_TRACE_CPU("Z$::Bucket::OpenLog");
+
m_TotalStandaloneSize = 0;
m_Index.clear();
@@ -1111,6 +1146,8 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) con
IoBuffer
ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const
{
+ ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue");
+
ExtendablePathBuilder<256> DataFilePath;
BuildPath(DataFilePath, HashKey);
@@ -1194,6 +1231,8 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
bool
ZenCacheDiskLayer::CacheBucket::Drop()
{
+ ZEN_TRACE_CPU("Z$::Bucket::Drop");
+
RwLock::ExclusiveLockScope _(m_IndexLock);
std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks;
@@ -1216,6 +1255,8 @@ ZenCacheDiskLayer::CacheBucket::Drop()
void
ZenCacheDiskLayer::CacheBucket::Flush()
{
+ ZEN_TRACE_CPU("Z$::Bucket::Flush");
+
m_BlockStore.Flush();
RwLock::SharedLockScope _(m_IndexLock);
@@ -1229,6 +1270,8 @@ ZenCacheDiskLayer::CacheBucket::SaveManifest()
{
using namespace std::literals;
+ ZEN_TRACE_CPU("Z$::Bucket::SaveManifest");
+
CbObjectWriter Writer;
Writer << "BucketId"sv << m_BucketId;
Writer << "Version"sv << CurrentDiskBucketVersion;
@@ -1277,145 +1320,238 @@ ZenCacheDiskLayer::CacheBucket::SaveManifest()
}
}
-void
-ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
+IoHash
+HashBuffer(const CompositeBuffer& Buffer)
{
- std::vector<IoHash> BadKeys;
- uint64_t ChunkCount{0}, ChunkBytes{0};
- std::vector<BlockStoreLocation> ChunkLocations;
- std::vector<IoHash> ChunkIndexToChunkHash;
+ IoHashStream Hasher;
- auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) {
- if (ContentType == ZenContentType::kCbObject)
+ for (const SharedBuffer& Segment : Buffer.GetSegments())
+ {
+ Hasher.Append(Segment.GetView());
+ }
+
+ return Hasher.GetHash();
+}
+
+bool
+ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer)
+{
+ ZEN_ASSERT_SLOW(Buffer.GetContentType() == ContentType);
+
+ if (ContentType == ZenContentType::kCbObject)
+ {
+ CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
+
+ if (Error == CbValidateError::None)
{
- CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
- return Error == CbValidateError::None;
+ return true;
}
- if (ContentType == ZenContentType::kCompressedBinary)
+
+ ZEN_SCOPED_ERROR("compact binary validation failed: '{}'", ToString(Error));
+
+ return false;
+ }
+ else if (ContentType == ZenContentType::kCompressedBinary)
+ {
+ IoBuffer MemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Buffer);
+
+ IoHash HeaderRawHash;
+ uint64_t RawSize = 0;
+ if (!CompressedBuffer::ValidateCompressedHeader(MemoryBuffer, /* out */ HeaderRawHash, /* out */ RawSize))
{
- IoHash RawHash;
- uint64_t RawSize;
- if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
- {
- return false;
- }
- if (RawHash != Hash)
- {
- return false;
- }
+ ZEN_SCOPED_ERROR("compressed buffer header validation failed");
+
+ return false;
}
- return true;
- };
- RwLock::SharedLockScope _(m_IndexLock);
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(MemoryBuffer), /* out */ HeaderRawHash, /* out */ RawSize);
+ CompositeBuffer Decompressed = Compressed.DecompressToComposite();
+ IoHash DecompressedHash = HashBuffer(Decompressed);
- const size_t BlockChunkInitialCount = m_Index.size() / 4;
- ChunkLocations.reserve(BlockChunkInitialCount);
- ChunkIndexToChunkHash.reserve(BlockChunkInitialCount);
+ if (HeaderRawHash != DecompressedHash)
+ {
+ ZEN_SCOPED_ERROR("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash);
- for (auto& Kv : m_Index)
+ return false;
+ }
+ }
+ else
{
- const IoHash& HashKey = Kv.first;
- const BucketPayload& Payload = m_Payloads[Kv.second];
- const DiskLocation& Loc = Payload.Location;
+ // No way to verify this kind of content (what is it exactly?)
+
+ static int Once = [&] {
+ ZEN_WARN("ValidateCacheBucketEntryValue called with unknown content type ({})", ToString(ContentType));
+ return 42;
+ }();
+ }
+
+ return true;
+};
+
+void
+ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::Scrub");
+
+ ZEN_INFO("scrubbing '{}'", m_BucketDir);
+
+ Stopwatch Timer;
+ uint64_t ChunkCount = 0;
+ uint64_t VerifiedChunkBytes = 0;
+
+ auto LogStats = MakeGuard([&] {
+ const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs());
+
+ ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})",
+ m_BucketName,
+ NiceBytes(VerifiedChunkBytes),
+ NiceTimeSpanMs(DurationMs),
+ ChunkCount,
+ NiceRate(VerifiedChunkBytes, DurationMs));
+ });
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ std::vector<IoHash> BadKeys;
+ auto ReportBadKey = [&](const IoHash& Key) { BadKeys.push_back(Key); };
+
+ try
+ {
+ std::vector<BlockStoreLocation> ChunkLocations;
+ std::vector<IoHash> ChunkIndexToChunkHash;
+
+ RwLock::SharedLockScope _(m_IndexLock);
+
+ const size_t BlockChunkInitialCount = m_Index.size() / 4;
+ ChunkLocations.reserve(BlockChunkInitialCount);
+ ChunkIndexToChunkHash.reserve(BlockChunkInitialCount);
+
+ // Do a pass over the index and verify any standalone file values straight away
+ // all other storage classes are gathered and verified in bulk in order to enable
+ // more efficient I/O scheduling
+
+ for (auto& Kv : m_Index)
{
- ++ChunkCount;
- ChunkBytes += Loc.Size();
- if (Loc.GetContentType() == ZenContentType::kBinary)
+ const IoHash& HashKey = Kv.first;
+ const BucketPayload& Payload = m_Payloads[Kv.second];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- ExtendablePathBuilder<256> DataFilePath;
- BuildPath(DataFilePath, HashKey);
+ Ctx.ThrowIfDeadlineExpired();
- RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
+ ++ChunkCount;
+ VerifiedChunkBytes += Loc.Size();
- std::error_code Ec;
- uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec);
- if (Ec)
+ if (Loc.GetContentType() == ZenContentType::kBinary)
{
- BadKeys.push_back(HashKey);
+ // Blob cache value, not much we can do about data integrity checking
+ // here since there's no hash available
+ ExtendablePathBuilder<256> DataFilePath;
+ BuildPath(DataFilePath, HashKey);
+
+ RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
+
+ std::error_code Ec;
+ uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec);
+ if (Ec)
+ {
+ ReportBadKey(HashKey);
+ }
+ if (size != Loc.Size())
+ {
+ ReportBadKey(HashKey);
+ }
+ continue;
}
- if (size != Loc.Size())
+ else
{
- BadKeys.push_back(HashKey);
+ // Structured cache value
+ IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey);
+ if (!Buffer)
+ {
+ ReportBadKey(HashKey);
+ continue;
+ }
+ if (!ValidateCacheBucketEntryValue(Loc.GetContentType(), Buffer))
+ {
+ ReportBadKey(HashKey);
+ continue;
+ }
}
+ }
+ else
+ {
+ ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment));
+ ChunkIndexToChunkHash.push_back(HashKey);
continue;
}
- IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey);
+ }
+
+ const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void {
+ ++ChunkCount;
+ VerifiedChunkBytes += Size;
+ const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
+ if (!Data)
+ {
+ // ChunkLocation out of range of stored blocks
+ ReportBadKey(Hash);
+ return;
+ }
+ if (!Size)
+ {
+ ReportBadKey(Hash);
+ return;
+ }
+ IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
if (!Buffer)
{
- BadKeys.push_back(HashKey);
- continue;
+ ReportBadKey(Hash);
+ return;
}
- if (!ValidateEntry(HashKey, Loc.GetContentType(), Buffer))
+ const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
+ ZenContentType ContentType = Payload.Location.GetContentType();
+ Buffer.SetContentType(ContentType);
+ if (!ValidateCacheBucketEntryValue(ContentType, Buffer))
{
- BadKeys.push_back(HashKey);
- continue;
+ ReportBadKey(Hash);
+ return;
}
- }
- else
- {
- ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment));
- ChunkIndexToChunkHash.push_back(HashKey);
- continue;
- }
- }
-
- const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
- ++ChunkCount;
- ChunkBytes += Size;
- const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- if (!Data)
- {
- // ChunkLocation out of range of stored blocks
- BadKeys.push_back(Hash);
- return;
- }
- IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
- if (!Buffer)
- {
- BadKeys.push_back(Hash);
- return;
- }
- const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
- ZenContentType ContentType = Payload.Location.GetContentType();
- if (!ValidateEntry(Hash, ContentType, Buffer))
- {
- BadKeys.push_back(Hash);
- return;
- }
- };
+ };
- const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- ++ChunkCount;
- ChunkBytes += Size;
- const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- // TODO: Add API to verify compressed buffer and possible structure data without having to memorymap the whole file
- IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
- if (!Buffer)
- {
- BadKeys.push_back(Hash);
- return;
- }
- const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
- ZenContentType ContentType = Payload.Location.GetContentType();
- if (!ValidateEntry(Hash, ContentType, Buffer))
- {
- BadKeys.push_back(Hash);
- return;
- }
- };
+ const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void {
+ Ctx.ThrowIfDeadlineExpired();
- m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
+ ++ChunkCount;
+ VerifiedChunkBytes += Size;
+ const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
+ IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
+ if (!Buffer)
+ {
+ ReportBadKey(Hash);
+ return;
+ }
+ const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
+ ZenContentType ContentType = Payload.Location.GetContentType();
+ Buffer.SetContentType(ContentType);
+ if (!ValidateCacheBucketEntryValue(ContentType, Buffer))
+ {
+ ReportBadKey(Hash);
+ return;
+ }
+ };
- _.ReleaseNow();
+ m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
+ }
+ catch (ScrubDeadlineExpiredException&)
+ {
+ ZEN_INFO("Scrubbing deadline expired, operation incomplete");
+ }
- Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
+ Ctx.ReportScrubbed(ChunkCount, VerifiedChunkBytes);
if (!BadKeys.empty())
{
- ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName);
+ ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir);
if (Ctx.RunRecovery())
{
@@ -1486,9 +1622,10 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
// Let whomever it concerns know about the bad chunks. This could
// be used to invalidate higher level data structures more efficiently
// than a full validation pass might be able to do
- Ctx.ReportBadCidChunks(BadKeys);
-
- ZEN_INFO("cache bucket scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
+ if (!BadKeys.empty())
+ {
+ Ctx.ReportBadCidChunks(BadKeys);
+ }
}
void
@@ -1504,7 +1641,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
Stopwatch TotalTimer;
const auto _ = MakeGuard([&] {
ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})",
- m_BucketDir / m_BucketName,
+ m_BucketDir,
NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
NiceLatencyNs(WriteBlockTimeUs),
NiceLatencyNs(WriteBlockLongestTimeUs),
@@ -1598,7 +1735,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage");
- ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir / m_BucketName);
+ ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir);
Stopwatch TotalTimer;
uint64_t WriteBlockTimeUs = 0;
@@ -1618,7 +1755,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
"{} "
"of {} "
"entires ({}).",
- m_BucketDir / m_BucketName,
+ m_BucketDir,
NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
NiceLatencyNs(WriteBlockTimeUs),
NiceLatencyNs(WriteBlockLongestTimeUs),
@@ -1647,7 +1784,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
});
if (DeleteCacheKeys.empty())
{
- ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName);
+ ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir);
return;
}
@@ -1700,7 +1837,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
});
if (m_Index.empty())
{
- ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName);
+ ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir);
return;
}
BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
@@ -1851,7 +1988,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
uint64_t CurrentTotalSize = TotalSize();
ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} {}",
- m_BucketDir / m_BucketName,
+ m_BucketDir,
DeleteCount,
TotalChunkCount,
NiceBytes(CurrentTotalSize));
@@ -2010,6 +2147,8 @@ ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& Ac
void
ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
{
+ ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue");
+
uint64_t NewFileSize = Value.Value.Size();
TemporaryFile DataFile;
@@ -2398,10 +2537,25 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
{
RwLock::SharedLockScope _(m_Lock);
- for (auto& Kv : m_Buckets)
{
- CacheBucket& Bucket = *Kv.second;
- Bucket.ScrubStorage(Ctx);
+ std::vector<std::future<void>> Results;
+ Results.reserve(m_Buckets.size());
+
+ for (auto& Kv : m_Buckets)
+ {
+#if 1
+ Results.push_back(
+ Ctx.ThreadPool().EnqueueTask(std::packaged_task<void()>{[Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}));
+#else
+ CacheBucket& Bucket = *Kv.second;
+ Bucket.ScrubStorage(Ctx);
+#endif
+ }
+
+ for (auto& Result : Results)
+ {
+ Result.get();
+ }
}
}
@@ -3736,7 +3890,8 @@ TEST_CASE("z$.scrub")
std::vector<IoHash> StructuredCids{CreateKey(1), CreateKey(2), CreateKey(3)};
CreateRecords(true, "mybucket"sv, StructuredCids, AttachmentSizes);
- ScrubContext ScrubCtx;
+ WorkerThreadPool ThreadPool{1};
+ ScrubContext ScrubCtx{ThreadPool};
Zcs.ScrubStorage(ScrubCtx);
CidStore.ScrubStorage(ScrubCtx);
CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size());