aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-16 21:35:39 +0200
committerGitHub <[email protected]>2023-05-16 21:35:39 +0200
commit81b2757f917e34bb338fad7965ae8a74e160bee4 (patch)
tree931ba100471a2369c62a6e41a1b4a7937ed31f6f /src
parentadded benchmark utility command `bench` (#298) (diff)
downloadzen-81b2757f917e34bb338fad7965ae8a74e160bee4.tar.xz
zen-81b2757f917e34bb338fad7965ae8a74e160bee4.zip
Content scrubbing (#271)
Added a zen scrub command which may be triggered via the zen CLI helper. It traverses storage and validates contents by content hash and/or by structure. If unexpected data is encountered, it is invalidated.
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/admin/admin.cpp16
-rw-r--r--src/zenserver/cache/structuredcachestore.cpp425
-rw-r--r--src/zenserver/zenserver.cpp4
-rw-r--r--src/zenstore/cas.cpp6
-rw-r--r--src/zenstore/compactcas.cpp139
-rw-r--r--src/zenstore/gc.cpp142
-rw-r--r--src/zenstore/include/zenstore/gc.h40
-rw-r--r--src/zenstore/include/zenstore/scrubcontext.h45
-rw-r--r--src/zenstore/include/zenstore/zenstore.h2
-rw-r--r--src/zenstore/scrubcontext.cpp45
10 files changed, 625 insertions, 239 deletions
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp
index c37622cb6..575a10d83 100644
--- a/src/zenserver/admin/admin.cpp
+++ b/src/zenserver/admin/admin.cpp
@@ -79,6 +79,22 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler) : m_GcScheduler(Sched
HttpVerb::kPost);
m_Router.RegisterRoute(
+ "scrub",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+
+ GcScheduler::TriggerScrubParams ScrubParams;
+ ScrubParams.MaxTimeslice = std::chrono::seconds(100);
+ m_GcScheduler.TriggerScrub(ScrubParams);
+
+ CbObjectWriter Response;
+ Response << "ok"sv << true;
+ HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save());
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
"",
[](HttpRouterRequest& Req) {
CbObject Payload = Req.ServerRequest().ReadPayloadObject();
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp
index 3a6e5cbc3..440da3074 100644
--- a/src/zenserver/cache/structuredcachestore.cpp
+++ b/src/zenserver/cache/structuredcachestore.cpp
@@ -17,11 +17,13 @@
#include <zencore/thread.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
#include <zenstore/cidstore.h>
#include <zenstore/scrubcontext.h>
#include <xxhash.h>
+#include <future>
#include <limits>
#if ZEN_PLATFORM_WINDOWS
@@ -96,13 +98,18 @@ namespace {
return BucketDir / (BucketName + LogExtension);
}
- bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason)
+ bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason)
{
if (Entry.Key == IoHash::Zero)
{
OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
return false;
}
+ if (Entry.Location.Reserved != 0)
+ {
+ OutReason = fmt::format("Reserved field non-zero ({}) for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString());
+ return false;
+ }
if (Entry.Location.GetFlags() &
~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed))
{
@@ -283,6 +290,8 @@ ZenCacheNamespace::ScrubStorage(ScrubContext& Ctx)
return;
}
+ ZEN_INFO("scrubbing '{}'", m_RootDir);
+
m_LastScrubTime = Ctx.ScrubTimestamp();
m_DiskLayer.ScrubStorage(Ctx);
@@ -665,6 +674,12 @@ ZenCacheMemoryLayer::CacheBucket::EntryCount() const
ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)), m_BucketId(Oid::Zero)
{
+ if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap")))
+ {
+ // This is pretty ad hoc but in order to avoid too many individual files
+ // it makes sense to have a different strategy for legacy values
+ m_LargeObjectThreshold = 16 * 1024 * 1024;
+ }
}
ZenCacheDiskLayer::CacheBucket::~CacheBucket()
@@ -676,6 +691,10 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
{
using namespace std::literals;
+ ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate");
+
+ ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir);
+
m_BlocksBasePath = BucketDir / "blocks";
m_BucketDir = BucketDir;
@@ -694,12 +713,12 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
{
return false;
}
- uint32_t Version = Manifest["Version"sv].AsUInt32(0);
- if (Version != CurrentDiskBucketVersion)
- {
- ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion);
- IsNew = true;
- }
+ // uint32_t Version = Manifest["Version"sv].AsUInt32(0);
+ // if (Version != CurrentDiskBucketVersion)
+ //{
+ // ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion);
+ // IsNew = true;
+ // }
}
else if (AllowCreate)
{
@@ -745,8 +764,17 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
{
size_t EntryIndex = It.value();
ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
- m_Payloads[EntryIndex].RawHash = Obj["RawHash"sv].AsHash();
- m_Payloads[EntryIndex].RawSize = Obj["RawSize"sv].AsUInt64();
+
+ const IoHash RawHash = Obj["RawHash"sv].AsHash();
+ const uint64_t RawSize = Obj["RawSize"sv].AsUInt64();
+
+ if (RawHash == IoHash::Zero || RawSize == 0)
+ {
+ ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex);
+ }
+
+ m_Payloads[EntryIndex].RawHash = RawHash;
+ m_Payloads[EntryIndex].RawSize = RawSize;
}
}
}
@@ -757,18 +785,20 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
void
ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
{
+ ZEN_TRACE_CPU("Z$::Bucket::MakeIndexSnapshot");
+
uint64_t LogCount = m_SlogFile.GetLogCount();
if (m_LogFlushPosition == LogCount)
{
return;
}
- ZEN_DEBUG("write store snapshot for '{}'", m_BucketDir / m_BucketName);
+ ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir);
uint64_t EntryCount = 0;
Stopwatch Timer;
const auto _ = MakeGuard([&] {
ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
- m_BucketDir / m_BucketName,
+ m_BucketDir,
EntryCount,
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
@@ -792,10 +822,9 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
{
// Write the current state of the location map to a new index state
std::vector<DiskIndexEntry> Entries;
+ Entries.resize(m_Index.size());
{
- Entries.resize(m_Index.size());
-
uint64_t EntryIndex = 0;
for (auto& Entry : m_Index)
{
@@ -841,6 +870,8 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
uint64_t
ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion)
{
+ ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile");
+
if (std::filesystem::is_regular_file(IndexPath))
{
BasicFile ObjectIndexFile;
@@ -886,7 +917,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index
std::string InvalidEntryReason;
for (const DiskIndexEntry& Entry : Entries)
{
- if (!ValidateEntry(Entry, InvalidEntryReason))
+ if (!ValidateCacheBucketIndexEntry(Entry, InvalidEntryReason))
{
ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
continue;
@@ -914,6 +945,8 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index
uint64_t
ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
{
+ ZEN_TRACE_CPU("Z$::Bucket::ReadLog");
+
if (std::filesystem::is_regular_file(LogPath))
{
uint64_t LogEntryCount = 0;
@@ -942,7 +975,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui
m_Index.erase(Record.Key);
return;
}
- if (!ValidateEntry(Record, InvalidEntryReason))
+ if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason))
{
ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
++InvalidEntryCount;
@@ -956,7 +989,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui
SkipEntryCount);
if (InvalidEntryCount)
{
- ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName);
+ ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir);
}
return LogEntryCount;
}
@@ -967,6 +1000,8 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui
void
ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
{
+ ZEN_TRACE_CPU("Z$::Bucket::OpenLog");
+
m_TotalStandaloneSize = 0;
m_Index.clear();
@@ -1111,6 +1146,8 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) con
IoBuffer
ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const
{
+ ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue");
+
ExtendablePathBuilder<256> DataFilePath;
BuildPath(DataFilePath, HashKey);
@@ -1194,6 +1231,8 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
bool
ZenCacheDiskLayer::CacheBucket::Drop()
{
+ ZEN_TRACE_CPU("Z$::Bucket::Drop");
+
RwLock::ExclusiveLockScope _(m_IndexLock);
std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks;
@@ -1216,6 +1255,8 @@ ZenCacheDiskLayer::CacheBucket::Drop()
void
ZenCacheDiskLayer::CacheBucket::Flush()
{
+ ZEN_TRACE_CPU("Z$::Bucket::Flush");
+
m_BlockStore.Flush();
RwLock::SharedLockScope _(m_IndexLock);
@@ -1229,6 +1270,8 @@ ZenCacheDiskLayer::CacheBucket::SaveManifest()
{
using namespace std::literals;
+ ZEN_TRACE_CPU("Z$::Bucket::SaveManifest");
+
CbObjectWriter Writer;
Writer << "BucketId"sv << m_BucketId;
Writer << "Version"sv << CurrentDiskBucketVersion;
@@ -1277,145 +1320,238 @@ ZenCacheDiskLayer::CacheBucket::SaveManifest()
}
}
-void
-ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
+IoHash
+HashBuffer(const CompositeBuffer& Buffer)
{
- std::vector<IoHash> BadKeys;
- uint64_t ChunkCount{0}, ChunkBytes{0};
- std::vector<BlockStoreLocation> ChunkLocations;
- std::vector<IoHash> ChunkIndexToChunkHash;
+ IoHashStream Hasher;
- auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) {
- if (ContentType == ZenContentType::kCbObject)
+ for (const SharedBuffer& Segment : Buffer.GetSegments())
+ {
+ Hasher.Append(Segment.GetView());
+ }
+
+ return Hasher.GetHash();
+}
+
+bool
+ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer)
+{
+ ZEN_ASSERT_SLOW(Buffer.GetContentType() == ContentType);
+
+ if (ContentType == ZenContentType::kCbObject)
+ {
+ CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
+
+ if (Error == CbValidateError::None)
{
- CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
- return Error == CbValidateError::None;
+ return true;
}
- if (ContentType == ZenContentType::kCompressedBinary)
+
+ ZEN_SCOPED_ERROR("compact binary validation failed: '{}'", ToString(Error));
+
+ return false;
+ }
+ else if (ContentType == ZenContentType::kCompressedBinary)
+ {
+ IoBuffer MemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Buffer);
+
+ IoHash HeaderRawHash;
+ uint64_t RawSize = 0;
+ if (!CompressedBuffer::ValidateCompressedHeader(MemoryBuffer, /* out */ HeaderRawHash, /* out */ RawSize))
{
- IoHash RawHash;
- uint64_t RawSize;
- if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
- {
- return false;
- }
- if (RawHash != Hash)
- {
- return false;
- }
+ ZEN_SCOPED_ERROR("compressed buffer header validation failed");
+
+ return false;
}
- return true;
- };
- RwLock::SharedLockScope _(m_IndexLock);
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(MemoryBuffer), /* out */ HeaderRawHash, /* out */ RawSize);
+ CompositeBuffer Decompressed = Compressed.DecompressToComposite();
+ IoHash DecompressedHash = HashBuffer(Decompressed);
- const size_t BlockChunkInitialCount = m_Index.size() / 4;
- ChunkLocations.reserve(BlockChunkInitialCount);
- ChunkIndexToChunkHash.reserve(BlockChunkInitialCount);
+ if (HeaderRawHash != DecompressedHash)
+ {
+ ZEN_SCOPED_ERROR("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash);
- for (auto& Kv : m_Index)
+ return false;
+ }
+ }
+ else
{
- const IoHash& HashKey = Kv.first;
- const BucketPayload& Payload = m_Payloads[Kv.second];
- const DiskLocation& Loc = Payload.Location;
+ // No way to verify this kind of content (what is it exactly?)
+
+ static int Once = [&] {
+ ZEN_WARN("ValidateCacheBucketEntryValue called with unknown content type ({})", ToString(ContentType));
+ return 42;
+ }();
+ }
+
+ return true;
+};
+
+void
+ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::Scrub");
+
+ ZEN_INFO("scrubbing '{}'", m_BucketDir);
+
+ Stopwatch Timer;
+ uint64_t ChunkCount = 0;
+ uint64_t VerifiedChunkBytes = 0;
+
+ auto LogStats = MakeGuard([&] {
+ const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs());
+
+ ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})",
+ m_BucketName,
+ NiceBytes(VerifiedChunkBytes),
+ NiceTimeSpanMs(DurationMs),
+ ChunkCount,
+ NiceRate(VerifiedChunkBytes, DurationMs));
+ });
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ std::vector<IoHash> BadKeys;
+ auto ReportBadKey = [&](const IoHash& Key) { BadKeys.push_back(Key); };
+
+ try
+ {
+ std::vector<BlockStoreLocation> ChunkLocations;
+ std::vector<IoHash> ChunkIndexToChunkHash;
+
+ RwLock::SharedLockScope _(m_IndexLock);
+
+ const size_t BlockChunkInitialCount = m_Index.size() / 4;
+ ChunkLocations.reserve(BlockChunkInitialCount);
+ ChunkIndexToChunkHash.reserve(BlockChunkInitialCount);
+
+ // Do a pass over the index and verify any standalone file values straight away
+ // all other storage classes are gathered and verified in bulk in order to enable
+ // more efficient I/O scheduling
+
+ for (auto& Kv : m_Index)
{
- ++ChunkCount;
- ChunkBytes += Loc.Size();
- if (Loc.GetContentType() == ZenContentType::kBinary)
+ const IoHash& HashKey = Kv.first;
+ const BucketPayload& Payload = m_Payloads[Kv.second];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- ExtendablePathBuilder<256> DataFilePath;
- BuildPath(DataFilePath, HashKey);
+ Ctx.ThrowIfDeadlineExpired();
- RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
+ ++ChunkCount;
+ VerifiedChunkBytes += Loc.Size();
- std::error_code Ec;
- uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec);
- if (Ec)
+ if (Loc.GetContentType() == ZenContentType::kBinary)
{
- BadKeys.push_back(HashKey);
+ // Blob cache value, not much we can do about data integrity checking
+ // here since there's no hash available
+ ExtendablePathBuilder<256> DataFilePath;
+ BuildPath(DataFilePath, HashKey);
+
+ RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
+
+ std::error_code Ec;
+ uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec);
+ if (Ec)
+ {
+ ReportBadKey(HashKey);
+ }
+ if (size != Loc.Size())
+ {
+ ReportBadKey(HashKey);
+ }
+ continue;
}
- if (size != Loc.Size())
+ else
{
- BadKeys.push_back(HashKey);
+ // Structured cache value
+ IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey);
+ if (!Buffer)
+ {
+ ReportBadKey(HashKey);
+ continue;
+ }
+ if (!ValidateCacheBucketEntryValue(Loc.GetContentType(), Buffer))
+ {
+ ReportBadKey(HashKey);
+ continue;
+ }
}
+ }
+ else
+ {
+ ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment));
+ ChunkIndexToChunkHash.push_back(HashKey);
continue;
}
- IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey);
+ }
+
+ const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void {
+ ++ChunkCount;
+ VerifiedChunkBytes += Size;
+ const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
+ if (!Data)
+ {
+ // ChunkLocation out of range of stored blocks
+ ReportBadKey(Hash);
+ return;
+ }
+ if (!Size)
+ {
+ ReportBadKey(Hash);
+ return;
+ }
+ IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
if (!Buffer)
{
- BadKeys.push_back(HashKey);
- continue;
+ ReportBadKey(Hash);
+ return;
}
- if (!ValidateEntry(HashKey, Loc.GetContentType(), Buffer))
+ const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
+ ZenContentType ContentType = Payload.Location.GetContentType();
+ Buffer.SetContentType(ContentType);
+ if (!ValidateCacheBucketEntryValue(ContentType, Buffer))
{
- BadKeys.push_back(HashKey);
- continue;
+ ReportBadKey(Hash);
+ return;
}
- }
- else
- {
- ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment));
- ChunkIndexToChunkHash.push_back(HashKey);
- continue;
- }
- }
-
- const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
- ++ChunkCount;
- ChunkBytes += Size;
- const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- if (!Data)
- {
- // ChunkLocation out of range of stored blocks
- BadKeys.push_back(Hash);
- return;
- }
- IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
- if (!Buffer)
- {
- BadKeys.push_back(Hash);
- return;
- }
- const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
- ZenContentType ContentType = Payload.Location.GetContentType();
- if (!ValidateEntry(Hash, ContentType, Buffer))
- {
- BadKeys.push_back(Hash);
- return;
- }
- };
+ };
- const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- ++ChunkCount;
- ChunkBytes += Size;
- const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- // TODO: Add API to verify compressed buffer and possible structure data without having to memorymap the whole file
- IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
- if (!Buffer)
- {
- BadKeys.push_back(Hash);
- return;
- }
- const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
- ZenContentType ContentType = Payload.Location.GetContentType();
- if (!ValidateEntry(Hash, ContentType, Buffer))
- {
- BadKeys.push_back(Hash);
- return;
- }
- };
+ const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void {
+ Ctx.ThrowIfDeadlineExpired();
- m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
+ ++ChunkCount;
+ VerifiedChunkBytes += Size;
+ const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
+ IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
+ if (!Buffer)
+ {
+ ReportBadKey(Hash);
+ return;
+ }
+ const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
+ ZenContentType ContentType = Payload.Location.GetContentType();
+ Buffer.SetContentType(ContentType);
+ if (!ValidateCacheBucketEntryValue(ContentType, Buffer))
+ {
+ ReportBadKey(Hash);
+ return;
+ }
+ };
- _.ReleaseNow();
+ m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
+ }
+ catch (ScrubDeadlineExpiredException&)
+ {
+ ZEN_INFO("Scrubbing deadline expired, operation incomplete");
+ }
- Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
+ Ctx.ReportScrubbed(ChunkCount, VerifiedChunkBytes);
if (!BadKeys.empty())
{
- ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName);
+ ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir);
if (Ctx.RunRecovery())
{
@@ -1486,9 +1622,10 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
// Let whomever it concerns know about the bad chunks. This could
// be used to invalidate higher level data structures more efficiently
// than a full validation pass might be able to do
- Ctx.ReportBadCidChunks(BadKeys);
-
- ZEN_INFO("cache bucket scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
+ if (!BadKeys.empty())
+ {
+ Ctx.ReportBadCidChunks(BadKeys);
+ }
}
void
@@ -1504,7 +1641,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
Stopwatch TotalTimer;
const auto _ = MakeGuard([&] {
ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})",
- m_BucketDir / m_BucketName,
+ m_BucketDir,
NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
NiceLatencyNs(WriteBlockTimeUs),
NiceLatencyNs(WriteBlockLongestTimeUs),
@@ -1598,7 +1735,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage");
- ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir / m_BucketName);
+ ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir);
Stopwatch TotalTimer;
uint64_t WriteBlockTimeUs = 0;
@@ -1618,7 +1755,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
"{} "
"of {} "
"entires ({}).",
- m_BucketDir / m_BucketName,
+ m_BucketDir,
NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
NiceLatencyNs(WriteBlockTimeUs),
NiceLatencyNs(WriteBlockLongestTimeUs),
@@ -1647,7 +1784,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
});
if (DeleteCacheKeys.empty())
{
- ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName);
+ ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir);
return;
}
@@ -1700,7 +1837,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
});
if (m_Index.empty())
{
- ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName);
+ ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir);
return;
}
BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
@@ -1851,7 +1988,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
uint64_t CurrentTotalSize = TotalSize();
ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} {}",
- m_BucketDir / m_BucketName,
+ m_BucketDir,
DeleteCount,
TotalChunkCount,
NiceBytes(CurrentTotalSize));
@@ -2010,6 +2147,8 @@ ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& Ac
void
ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
{
+ ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue");
+
uint64_t NewFileSize = Value.Value.Size();
TemporaryFile DataFile;
@@ -2398,10 +2537,25 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
{
RwLock::SharedLockScope _(m_Lock);
- for (auto& Kv : m_Buckets)
{
- CacheBucket& Bucket = *Kv.second;
- Bucket.ScrubStorage(Ctx);
+ std::vector<std::future<void>> Results;
+ Results.reserve(m_Buckets.size());
+
+ for (auto& Kv : m_Buckets)
+ {
+#if 1
+ Results.push_back(
+ Ctx.ThreadPool().EnqueueTask(std::packaged_task<void()>{[Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}));
+#else
+ CacheBucket& Bucket = *Kv.second;
+ Bucket.ScrubStorage(Ctx);
+#endif
+ }
+
+ for (auto& Result : Results)
+ {
+ Result.get();
+ }
}
}
@@ -3736,7 +3890,8 @@ TEST_CASE("z$.scrub")
std::vector<IoHash> StructuredCids{CreateKey(1), CreateKey(2), CreateKey(3)};
CreateRecords(true, "mybucket"sv, StructuredCids, AttachmentSizes);
- ScrubContext ScrubCtx;
+ WorkerThreadPool ThreadPool{1};
+ ScrubContext ScrubCtx{ThreadPool};
Zcs.ScrubStorage(ScrubCtx);
CidStore.ScrubStorage(ScrubCtx);
CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size());
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 827d5d2db..30c9e3937 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -14,6 +14,7 @@
#include <zencore/thread.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
#include <zenhttp/httpserver.h>
#include <zenhttp/websocket.h>
#include <zenstore/cidstore.h>
@@ -622,7 +623,8 @@ public:
Stopwatch Timer;
ZEN_INFO("Storage validation STARTING");
- ScrubContext Ctx;
+ WorkerThreadPool ThreadPool{1};
+ ScrubContext Ctx{ThreadPool};
m_CidStore->ScrubStorage(Ctx);
m_ProjectStore->ScrubStorage(Ctx);
m_StructuredCacheService->ScrubStorage(Ctx);
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index ab05e3e7c..b98f01385 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -18,6 +18,7 @@
#include <zencore/thread.h>
#include <zencore/trace.h>
#include <zencore/uid.h>
+#include <zencore/workthreadpool.h>
#include <zenstore/cidstore.h>
#include <zenstore/gc.h>
#include <zenstore/scrubcontext.h>
@@ -74,6 +75,8 @@ private:
void UpdateManifest();
};
+//////////////////////////////////////////////////////////////////////////
+
CasImpl::CasImpl(GcManager& Gc) : m_TinyStrategy(Gc), m_SmallStrategy(Gc), m_LargeStrategy(Gc)
{
}
@@ -323,7 +326,8 @@ TEST_CASE("CasStore")
std::unique_ptr<CasStore> Store = CreateCasStore(Gc);
Store->Initialize(config);
- ScrubContext Ctx;
+ WorkerThreadPool ThreadPool{1};
+ ScrubContext Ctx{ThreadPool};
Store->ScrubStorage(Ctx);
IoBuffer Value1{16};
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index a8a4dc102..e9037b16c 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -244,93 +244,100 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
std::vector<BlockStoreLocation> ChunkLocations;
std::vector<IoHash> ChunkIndexToChunkHash;
- RwLock::SharedLockScope _(m_LocationMapLock);
-
- uint64_t TotalChunkCount = m_LocationMap.size();
- ChunkLocations.reserve(TotalChunkCount);
- ChunkIndexToChunkHash.reserve(TotalChunkCount);
+ try
{
- for (const auto& Entry : m_LocationMap)
+ RwLock::SharedLockScope _(m_LocationMapLock);
+
+ uint64_t TotalChunkCount = m_LocationMap.size();
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkIndexToChunkHash.reserve(TotalChunkCount);
{
- const IoHash& ChunkHash = Entry.first;
- const BlockStoreDiskLocation& DiskLocation = Entry.second;
- BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);
+ for (const auto& Entry : m_LocationMap)
+ {
+ const IoHash& ChunkHash = Entry.first;
+ const BlockStoreDiskLocation& DiskLocation = Entry.second;
+ BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);
- ChunkLocations.push_back(Location);
- ChunkIndexToChunkHash.push_back(ChunkHash);
+ ChunkLocations.push_back(Location);
+ ChunkIndexToChunkHash.push_back(ChunkHash);
+ }
}
- }
- const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
- ++ChunkCount;
- ChunkBytes += Size;
+ const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ ++ChunkCount;
+ ChunkBytes += Size;
- const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- if (!Data)
- {
- // ChunkLocation out of range of stored blocks
- BadKeys.push_back(Hash);
- return;
- }
-
- IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
- {
- if (RawHash != Hash)
+ const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
+ if (!Data)
{
- // Hash mismatch
+ // ChunkLocation out of range of stored blocks
BadKeys.push_back(Hash);
return;
}
- return;
- }
+
+ IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
+ {
+ if (RawHash != Hash)
+ {
+ // Hash mismatch
+ BadKeys.push_back(Hash);
+ return;
+ }
+ return;
+ }
#if ZEN_WITH_TESTS
- IoHash ComputedHash = IoHash::HashBuffer(Data, Size);
- if (ComputedHash == Hash)
- {
- return;
- }
+ IoHash ComputedHash = IoHash::HashBuffer(Data, Size);
+ if (ComputedHash == Hash)
+ {
+ return;
+ }
#endif
- BadKeys.push_back(Hash);
- };
+ BadKeys.push_back(Hash);
+ };
- const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- ++ChunkCount;
- ChunkBytes += Size;
+ const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ Ctx.ThrowIfDeadlineExpired();
- const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
+ ++ChunkCount;
+ ChunkBytes += Size;
- IoHash RawHash;
- uint64_t RawSize;
- // TODO: Add API to verify compressed buffer without having to memorymap the whole file
- if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
- {
- if (RawHash != Hash)
+ const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
+ IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
+
+ IoHash RawHash;
+ uint64_t RawSize;
+ // TODO: Add API to verify compressed buffer without having to memorymap the whole file
+ if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
{
- // Hash mismatch
- BadKeys.push_back(Hash);
+ if (RawHash != Hash)
+ {
+ // Hash mismatch
+ BadKeys.push_back(Hash);
+ return;
+ }
return;
}
- return;
- }
#if ZEN_WITH_TESTS
- IoHashStream Hasher;
- File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
- IoHash ComputedHash = Hasher.GetHash();
- if (ComputedHash == Hash)
- {
- return;
- }
+ IoHashStream Hasher;
+ File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
+ IoHash ComputedHash = Hasher.GetHash();
+ if (ComputedHash == Hash)
+ {
+ return;
+ }
#endif
- BadKeys.push_back(Hash);
- };
-
- m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
+ BadKeys.push_back(Hash);
+ };
- _.ReleaseNow();
+ m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
+ }
+ catch (ScrubDeadlineExpiredException&)
+ {
+ ZEN_INFO("Scrubbing deadline expired, operation incomplete");
+ }
Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index dc19a9a35..516a08f14 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -15,7 +15,9 @@
#include <zencore/testutils.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
#include <zenstore/cidstore.h>
+#include <zenstore/scrubcontext.h>
#include "cas.h"
@@ -378,6 +380,17 @@ GcManager::RemoveGcStorage(GcStorage* Storage)
}
void
+GcManager::ScrubStorage(ScrubContext& GcCtx)
+{
+ RwLock::SharedLockScope _(m_Lock);
+
+ for (GcStorage* Storage : m_GcStorage)
+ {
+ Storage->ScrubStorage(GcCtx);
+ }
+}
+
+void
GcManager::CollectGarbage(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Gc::CollectGarbage");
@@ -435,6 +448,7 @@ GcManager::TotalStorageSize() const
}
//////////////////////////////////////////////////////////////////////////
+
void
DiskUsageWindow::KeepRange(GcClock::Tick StartTick, GcClock::Tick EndTick)
{
@@ -660,7 +674,9 @@ GcScheduler::TriggerGc(const GcScheduler::TriggerGcParams& Params)
{
m_TriggerGcParams = Params;
uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
- if (m_Status.compare_exchange_strong(IdleState, static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
+
+ if (m_Status.compare_exchange_strong(/* expected */ IdleState,
+ /* desired */ static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
{
m_GcSignal.notify_one();
return true;
@@ -671,6 +687,27 @@ GcScheduler::TriggerGc(const GcScheduler::TriggerGcParams& Params)
return false;
}
+bool
+GcScheduler::TriggerScrub(const TriggerScrubParams& Params)
+{
+ std::unique_lock Lock(m_GcMutex);
+
+ if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status)
+ {
+ m_TriggerScrubParams = Params;
+ uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
+
+ if (m_Status.compare_exchange_strong(/* expected */ IdleState, /* desired */ static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
+ {
+ m_GcSignal.notify_one();
+
+ return true;
+ }
+ }
+
+ return false;
+}
+
void
GcScheduler::CheckDiskSpace(const DiskSpace& Space)
{
@@ -697,6 +734,8 @@ GcScheduler::CheckDiskSpace(const DiskSpace& Space)
void
GcScheduler::SchedulerThread()
{
+ SetCurrentThreadName("GcScheduler");
+
std::chrono::seconds WaitTime{0};
for (;;)
@@ -713,7 +752,7 @@ GcScheduler::SchedulerThread()
break;
}
- if (!m_Config.Enabled)
+ if (!m_Config.Enabled && !m_TriggerScrubParams)
{
WaitTime = std::chrono::seconds::max();
continue;
@@ -724,18 +763,23 @@ GcScheduler::SchedulerThread()
continue;
}
- bool Delete = true;
+ bool DoGc = m_Config.Enabled;
+ bool DoScrubbing = false;
+ std::chrono::seconds ScrubTimeslice = std::chrono::seconds::max();
+ bool DoDelete = true;
bool CollectSmallObjects = m_Config.CollectSmallObjects;
std::chrono::seconds MaxCacheDuration = m_Config.MaxCacheDuration;
std::chrono::seconds MaxProjectStoreDuration = m_Config.MaxProjectStoreDuration;
uint64_t DiskSizeSoftLimit = m_Config.DiskSizeSoftLimit;
GcClock::TimePoint Now = GcClock::Now();
+
if (m_TriggerGcParams)
{
const auto TriggerParams = m_TriggerGcParams.value();
m_TriggerGcParams.reset();
CollectSmallObjects = TriggerParams.CollectSmallObjects;
+
if (TriggerParams.MaxCacheDuration != std::chrono::seconds::max())
{
MaxCacheDuration = TriggerParams.MaxCacheDuration;
@@ -750,6 +794,29 @@ GcScheduler::SchedulerThread()
}
}
+ if (m_TriggerScrubParams)
+ {
+ DoScrubbing = true;
+
+ if (m_TriggerScrubParams->SkipGc)
+ {
+ DoGc = false;
+ }
+
+ ScrubTimeslice = m_TriggerScrubParams->MaxTimeslice;
+ }
+
+    if (DoScrubbing)
+    {
+        ScrubStorage(DoDelete, ScrubTimeslice);
+        m_TriggerScrubParams.reset();
+    }
+    if (!DoGc)
+    {   // scrub-only pass: restore kIdle here -- the only other reset runs after CollectGarbage,
+        uint32_t Running = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
+        m_Status.compare_exchange_strong(Running, static_cast<uint32_t>(GcSchedulerStatus::kIdle)); continue;
+    }
+
GcClock::TimePoint CacheExpireTime =
MaxCacheDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxCacheDuration;
GcClock::TimePoint ProjectStoreExpireTime =
@@ -775,14 +842,15 @@ GcScheduler::SchedulerThread()
const std::chrono::duration LoadGraphTime = PressureGraphLength * m_Config.MonitorInterval;
std::vector<uint64_t> DiskDeltas;
uint64_t MaxLoad = 0;
+
{
const GcClock::Tick EpochTickCount = GcClock::Now().time_since_epoch().count();
std::unique_lock Lock(m_GcMutex);
m_DiskUsageWindow.Append({.SampleTime = EpochTickCount, .DiskUsage = TotalSize.DiskSize});
m_DiskUsageLog.Append({.SampleTime = EpochTickCount, .DiskUsage = TotalSize.DiskSize});
const GcClock::TimePoint LoadGraphStartTime = Now - LoadGraphTime;
- GcClock::Tick Start = LoadGraphStartTime.time_since_epoch().count();
- GcClock::Tick End = Now.time_since_epoch().count();
+ const GcClock::Tick Start = LoadGraphStartTime.time_since_epoch().count();
+ const GcClock::Tick End = Now.time_since_epoch().count();
DiskDeltas = m_DiskUsageWindow.GetDiskDeltas(Start,
End,
Max(1, (End - Start + PressureGraphLength - 1) / PressureGraphLength),
@@ -818,7 +886,7 @@ GcScheduler::SchedulerThread()
}
}
- bool DiskSpaceGCTriggered = GcDiskSpaceGoal > 0;
+ const bool DiskSpaceGCTriggered = GcDiskSpaceGoal > 0;
std::chrono::seconds RemaingTime = std::chrono::duration_cast<std::chrono::seconds>(m_NextGcTime - GcClock::Now());
@@ -858,7 +926,7 @@ GcScheduler::SchedulerThread()
}
}
- CollectGarbage(CacheExpireTime, ProjectStoreExpireTime, Delete, CollectSmallObjects);
+ CollectGarbage(CacheExpireTime, ProjectStoreExpireTime, DoDelete, CollectSmallObjects);
uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle)))
@@ -885,6 +953,36 @@ GcScheduler::NextGcTime(GcClock::TimePoint CurrentTime)
}
void
+GcScheduler::ScrubStorage(bool DoDelete, std::chrono::seconds TimeSlice)
+{
+    const std::chrono::steady_clock::time_point TimeNow = std::chrono::steady_clock::now();
+    constexpr std::chrono::steady_clock::time_point MaxTimePoint = std::chrono::steady_clock::time_point::max();
+    // Clamp BEFORE adding: TimeNow + seconds::max() overflows the clock's signed rep (UB),
+    // so the post-add wrap check (Deadline < TimeNow) would rely on undefined behavior.
+    const auto HeadRoom = std::chrono::duration_cast<std::chrono::seconds>(MaxTimePoint - TimeNow);
+    const std::chrono::steady_clock::time_point Deadline =
+        TimeSlice >= HeadRoom ? MaxTimePoint : TimeNow + TimeSlice;
+
+ Stopwatch Timer;
+ ZEN_INFO("scrubbing STARTING (delete mode => {})", DoDelete);
+
+ WorkerThreadPool ThreadPool{4, "scrubber"};
+ ScrubContext Ctx{ThreadPool, Deadline};
+
+ try
+ {
+ Ctx.SetShouldDelete(DoDelete);
+ m_GcManager.ScrubStorage(Ctx);
+ }
+ catch (ScrubDeadlineExpiredException&)
+ {
+ ZEN_INFO("scrubbing deadline expired (top level), operation incomplete!");
+ }
+
+ ZEN_INFO("scrubbing DONE (in {})", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+}
+
+void
GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime,
const GcClock::TimePoint& ProjectStoreExpireTime,
bool Delete,
@@ -1354,6 +1452,36 @@ TEST_CASE("gc.diskusagewindow")
CHECK(Stats.FindTimepointThatRemoves(100000u, 1000));
}
}
+
+TEST_CASE("scrub.basic")
+{
+    using namespace gc::impl;
+
+    ScopedTemporaryDirectory TempDir;
+    CidStoreConfiguration CasConfig;
+    CasConfig.RootDirectory = TempDir.Path() / "cas";
+
+    GcManager Gc;
+    CidStore CidStore(Gc);
+    CidStore.Initialize(CasConfig);
+
+    IoBuffer Chunk = CreateChunk(128);
+    auto CompressedChunk = Compress(Chunk);
+    const auto InsertResult = CidStore.AddChunk(CompressedChunk.GetCompressed().Flatten().AsIoBuffer(), CompressedChunk.DecodeRawHash());
+    CHECK(InsertResult.New);
+    CidStore.Flush();
+
+    // actually scrub (the point of this test): intact content must survive validation
+    WorkerThreadPool ThreadPool{2, "scrubtest"};
+    ScrubContext ScrubCtx{ThreadPool};
+    Gc.ScrubStorage(ScrubCtx);
+    CHECK(CidStore.ContainsChunk(CompressedChunk.DecodeRawHash()));
+    GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
+    GcCtx.CollectSmallObjects(true);
+    Gc.CollectGarbage(GcCtx);
+    CHECK(!CidStore.ContainsChunk(CompressedChunk.DecodeRawHash()));
+}
+
#endif
void
diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h
index 881936d0f..22b9bc284 100644
--- a/src/zenstore/include/zenstore/gc.h
+++ b/src/zenstore/include/zenstore/gc.h
@@ -22,9 +22,10 @@ class logger;
namespace zen {
-class HashKeySet;
-class GcManager;
class CidStore;
+class GcManager;
+class HashKeySet;
+class ScrubContext;
struct IoHash;
struct DiskSpace;
@@ -146,6 +147,7 @@ public:
void RemoveGcStorage(GcStorage* Contributor);
void CollectGarbage(GcContext& GcCtx);
+ void ScrubStorage(ScrubContext& GcCtx);
GcStorageSize TotalStorageSize() const;
@@ -226,29 +228,39 @@ public:
bool TriggerGc(const TriggerGcParams& Params);
+ struct TriggerScrubParams
+ {
+ bool SkipGc = false;
+ std::chrono::seconds MaxTimeslice = std::chrono::seconds::max();
+ };
+
+ bool TriggerScrub(const TriggerScrubParams& Params);
+
private:
void SchedulerThread();
void CollectGarbage(const GcClock::TimePoint& CacheExpireTime,
const GcClock::TimePoint& ProjectStoreExpireTime,
bool Delete,
bool CollectSmallObjects);
+ void ScrubStorage(bool DoDelete, std::chrono::seconds TimeSlice);
GcClock::TimePoint NextGcTime(GcClock::TimePoint CurrentTime);
spdlog::logger& Log() { return m_Log; }
virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); }
void CheckDiskSpace(const DiskSpace& Space);
- spdlog::logger& m_Log;
- GcManager& m_GcManager;
- GcSchedulerConfig m_Config;
- GcClock::TimePoint m_LastGcTime{};
- GcClock::TimePoint m_LastGcExpireTime{};
- GcClock::TimePoint m_NextGcTime{};
- std::atomic_uint32_t m_Status{};
- std::thread m_GcThread;
- std::mutex m_GcMutex;
- std::condition_variable m_GcSignal;
- std::optional<TriggerGcParams> m_TriggerGcParams;
- std::atomic_bool m_AreDiskWritesBlocked = false;
+ spdlog::logger& m_Log;
+ GcManager& m_GcManager;
+ GcSchedulerConfig m_Config;
+ GcClock::TimePoint m_LastGcTime{};
+ GcClock::TimePoint m_LastGcExpireTime{};
+ GcClock::TimePoint m_NextGcTime{};
+ std::atomic_uint32_t m_Status{};
+ std::thread m_GcThread;
+ std::mutex m_GcMutex;
+ std::condition_variable m_GcSignal;
+ std::optional<TriggerGcParams> m_TriggerGcParams;
+ std::optional<TriggerScrubParams> m_TriggerScrubParams;
+ std::atomic_bool m_AreDiskWritesBlocked = false;
TCasLogFile<DiskUsageWindow::DiskUsageEntry> m_DiskUsageLog;
DiskUsageWindow m_DiskUsageWindow;
diff --git a/src/zenstore/include/zenstore/scrubcontext.h b/src/zenstore/include/zenstore/scrubcontext.h
index 8b8ebac3d..cefaf0888 100644
--- a/src/zenstore/include/zenstore/scrubcontext.h
+++ b/src/zenstore/include/zenstore/scrubcontext.h
@@ -7,38 +7,59 @@
namespace zen {
+class WorkerThreadPool;
+
/** Context object for data scrubbing
- *
- * Data scrubbing is when we traverse stored data to validate it and
- * optionally correct/recover
+
+ Data scrubbing is when we traverse stored data to validate it and
+ optionally correct/recover
*/
class ScrubContext
{
public:
- ScrubContext();
+ ScrubContext(WorkerThreadPool& InWorkerThreadPool,
+ std::chrono::steady_clock::time_point Deadline = std::chrono::steady_clock::time_point::max());
~ScrubContext();
- virtual void ReportBadCidChunks(std::span<IoHash> BadCasChunks) { m_BadCid.AddHashesToSet(BadCasChunks); }
+ void ReportBadCidChunks(std::span<IoHash> BadCasChunks);
inline uint64_t ScrubTimestamp() const { return m_ScrubTime; }
- inline bool RunRecovery() const { return m_Recover; }
void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes)
{
m_ChunkCount.fetch_add(ChunkCount);
m_ByteCount.fetch_add(ChunkBytes);
}
+ std::chrono::steady_clock::time_point GetDeadline() const { return m_Deadline; }
+ bool IsWithinDeadline() const;
+ void ThrowIfDeadlineExpired() const;
+
inline uint64_t ScrubbedChunks() const { return m_ChunkCount; }
inline uint64_t ScrubbedBytes() const { return m_ByteCount; }
- const HashKeySet BadCids() const { return m_BadCid; }
+ HashKeySet BadCids() const;
+
+ inline bool RunRecovery() const { return m_Recover; }
+ inline void SetShouldDelete(bool DoDelete) { m_Recover = DoDelete; }
+
+ inline WorkerThreadPool& ThreadPool() { return m_WorkerThreadPool; }
private:
- uint64_t m_ScrubTime = GetHifreqTimerValue();
- bool m_Recover = true;
- std::atomic<uint64_t> m_ChunkCount{0};
- std::atomic<uint64_t> m_ByteCount{0};
- HashKeySet m_BadCid;
+ uint64_t m_ScrubTime = GetHifreqTimerValue();
+ bool m_Recover = true;
+ std::atomic<uint64_t> m_ChunkCount{0};
+ std::atomic<uint64_t> m_ByteCount{0};
+ mutable RwLock m_Lock;
+ HashKeySet m_BadCid;
+ WorkerThreadPool& m_WorkerThreadPool;
+ std::chrono::steady_clock::time_point m_Deadline{};
+};
+
+class ScrubDeadlineExpiredException : public std::runtime_error
+{
+public:
+ ScrubDeadlineExpiredException();
+ ~ScrubDeadlineExpiredException();
};
} // namespace zen
diff --git a/src/zenstore/include/zenstore/zenstore.h b/src/zenstore/include/zenstore/zenstore.h
index 46d62029d..29f3d2639 100644
--- a/src/zenstore/include/zenstore/zenstore.h
+++ b/src/zenstore/include/zenstore/zenstore.h
@@ -10,4 +10,4 @@ namespace zen {
ZENSTORE_API void zenstore_forcelinktests();
-}
+} // namespace zen
diff --git a/src/zenstore/scrubcontext.cpp b/src/zenstore/scrubcontext.cpp
index f35178de6..f5a3784c3 100644
--- a/src/zenstore/scrubcontext.cpp
+++ b/src/zenstore/scrubcontext.cpp
@@ -6,7 +6,19 @@
namespace zen {
-ScrubContext::ScrubContext()
+ScrubDeadlineExpiredException::ScrubDeadlineExpiredException() : std::runtime_error("scrubbing deadline expired")
+{
+}
+
+ScrubDeadlineExpiredException::~ScrubDeadlineExpiredException()
+{
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ScrubContext::ScrubContext(WorkerThreadPool& InWorkerThreadPool, std::chrono::steady_clock::time_point Deadline)
+: m_WorkerThreadPool(InWorkerThreadPool)
+, m_Deadline(Deadline)
{
}
@@ -14,4 +26,33 @@ ScrubContext::~ScrubContext()
{
}
-} // namespace zen \ No newline at end of file
+HashKeySet
+ScrubContext::BadCids() const
+{
+ RwLock::SharedLockScope _(m_Lock);
+ return m_BadCid;
+}
+
+void
+ScrubContext::ReportBadCidChunks(std::span<IoHash> BadCasChunks)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_BadCid.AddHashesToSet(BadCasChunks);
+}
+
+bool
+ScrubContext::IsWithinDeadline() const
+{
+ return std::chrono::steady_clock::now() < m_Deadline;
+}
+
+void
+ScrubContext::ThrowIfDeadlineExpired() const
+{
+ if (IsWithinDeadline())
+ return;
+
+ throw ScrubDeadlineExpiredException();
+}
+
+} // namespace zen