aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
-rw-r--r--zenserver/cache/structuredcachestore.cpp728
1 files changed, 179 insertions, 549 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 4be33170c..4e7ad522d 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -18,6 +18,7 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
#include <zenstore/cidstore.h>
+#include <zenstore/scrubcontext.h>
#include <xxhash.h>
@@ -66,67 +67,10 @@ namespace {
static_assert(sizeof(CacheBucketIndexHeader) == 32);
- struct LegacyDiskLocation
- {
- inline LegacyDiskLocation() = default;
-
- inline LegacyDiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
- : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
- , LowerSize(ValueSize & 0xFFFFffff)
- , IndexDataSize(IndexSize)
- {
- }
-
- static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
- static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; // Most significant bits of value size (lower 32 bits in LowerSize)
- static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
- static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; // Stored as a separate file
- static const uint64_t kStructured = 0x4000'0000'0000'0000ull; // Serialized as compact binary
- static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; // Represents a deleted key/value
- static const uint64_t kCompressed = 0x1000'0000'0000'0000ull; // Stored in compressed buffer format
-
- static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
-
- inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
- inline uint64_t Size() const { return LowerSize; }
- inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; }
- inline ZenContentType GetContentType() const
- {
- ZenContentType ContentType = ZenContentType::kBinary;
-
- if (IsFlagSet(LegacyDiskLocation::kStructured))
- {
- ContentType = ZenContentType::kCbObject;
- }
-
- if (IsFlagSet(LegacyDiskLocation::kCompressed))
- {
- ContentType = ZenContentType::kCompressedBinary;
- }
-
- return ContentType;
- }
- inline uint64_t Flags() const { return OffsetAndFlags & kFlagsMask; }
-
- private:
- uint64_t OffsetAndFlags = 0;
- uint32_t LowerSize = 0;
- uint32_t IndexDataSize = 0;
- };
-
- struct LegacyDiskIndexEntry
- {
- IoHash Key;
- LegacyDiskLocation Location;
- };
-
#pragma pack(pop)
- static_assert(sizeof(LegacyDiskIndexEntry) == 36);
-
- const char* IndexExtension = ".uidx";
- const char* LogExtension = ".slog";
- const char* LegacyDataExtension = ".sobs";
+ const char* IndexExtension = ".uidx";
+ const char* LogExtension = ".slog";
std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
{
@@ -143,42 +87,6 @@ namespace {
return BucketDir / (BucketName + LogExtension);
}
- std::filesystem::path GetLegacyLogPath(const std::filesystem::path& BucketDir)
- {
- return BucketDir / (std::string("zen") + LogExtension);
- }
-
- std::filesystem::path GetLegacyDataPath(const std::filesystem::path& BucketDir)
- {
- return BucketDir / (std::string("zen") + LegacyDataExtension);
- }
-
- bool ValidateLegacyEntry(const LegacyDiskIndexEntry& Entry, std::string& OutReason)
- {
- if (Entry.Key == IoHash::Zero)
- {
- OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
- return false;
- }
- if (Entry.Location.Flags() & ~(LegacyDiskLocation::kStandaloneFile | LegacyDiskLocation::kStructured |
- LegacyDiskLocation::kTombStone | LegacyDiskLocation::kCompressed))
- {
- OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.Flags(), Entry.Key.ToHexString());
- return false;
- }
- if (!Entry.Location.IsFlagSet(LegacyDiskLocation::kTombStone))
- {
- return true;
- }
- uint64_t Size = Entry.Location.Size();
- if (Size == 0)
- {
- OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
- return false;
- }
- return true;
- }
-
bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason)
{
if (Entry.Key == IoHash::Zero)
@@ -262,7 +170,7 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
WriteFile(Path, Object.GetBuffer().AsIoBuffer());
}
-ZenCacheNamespace::ZenCacheNamespace(CasGc& Gc, const std::filesystem::path& RootDir)
+ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir)
: GcStorage(Gc)
, GcContributor(Gc)
, m_RootDir(RootDir)
@@ -583,9 +491,25 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
std::vector<IoHash> BadHashes;
+    // Structural payload check selected by content type; content types without a validator are accepted
+    auto ValidateEntry = [](ZenContentType ContentType, IoBuffer Buffer) {
+        if (ContentType == ZenContentType::kCompressedBinary)
+        {
+            // Rejected when FromCompressed yields a buffer that tests false
+            CompressedBuffer Parsed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer));
+            const bool IsCorrupt = !Parsed;
+            return !IsCorrupt;
+        }
+        if (ContentType != ZenContentType::kCbObject)
+        {
+            return true;
+        }
+        return ValidateCompactBinary(Buffer, CbValidateMode::All) == CbValidateError::None;
+    };
+
for (auto& Kv : m_CacheMap)
{
- if (Kv.first != IoHash::HashBuffer(Kv.second.Payload))
+ if (!ValidateEntry(Kv.second.Payload.GetContentType(), Kv.second.Payload))
{
BadHashes.push_back(Kv.first);
}
@@ -593,7 +517,7 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
if (!BadHashes.empty())
{
- Ctx.ReportBadCasChunks(BadHashes);
+ Ctx.ReportBadCidChunks(BadHashes);
}
}
@@ -891,229 +815,6 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount)
return 0;
};
-uint64_t
-ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource)
-{
- std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir);
-
- if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0)
- {
- return 0;
- }
-
- ZEN_INFO("migrating store {}", m_BucketDir / m_BucketName);
-
- std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir);
-
- uint64_t MigratedChunkCount = 0;
- uint32_t MigratedBlockCount = 0;
- Stopwatch MigrationTimer;
- uint64_t TotalSize = 0;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})",
- m_BucketDir / m_BucketName,
- MigratedChunkCount,
- MigratedBlockCount,
- NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()),
- NiceBytes(TotalSize));
- });
-
- uint64_t BlockFileSize = 0;
- {
- BasicFile BlockFile;
- BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
- BlockFileSize = BlockFile.FileSize();
- }
-
- std::unordered_map<IoHash, LegacyDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
- uint64_t InvalidEntryCount = 0;
-
- size_t BlockChunkCount = 0;
- TCasLogFile<LegacyDiskIndexEntry> LegacyCasLog;
- LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead);
- {
- Stopwatch Timer;
- const auto __ = MakeGuard([&] {
- ZEN_INFO("read store '{}' legacy log containing #{} entries in {}",
- LegacyLogPath,
- LegacyDiskIndex.size(),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
- if (LegacyCasLog.Initialize())
- {
- LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount());
- LegacyCasLog.Replay(
- [&](const LegacyDiskIndexEntry& Record) {
- if (Record.Location.IsFlagSet(LegacyDiskLocation::kTombStone))
- {
- LegacyDiskIndex.erase(Record.Key);
- return;
- }
- std::string InvalidEntryReason;
- if (!ValidateLegacyEntry(Record, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason);
- ++InvalidEntryCount;
- return;
- }
- if (m_Index.contains(Record.Key))
- {
- return;
- }
- LegacyDiskIndex[Record.Key] = Record;
- },
- 0);
-
- std::vector<IoHash> BadEntries;
- for (const auto& Entry : LegacyDiskIndex)
- {
- const LegacyDiskIndexEntry& Record(Entry.second);
- if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
- {
- continue;
- }
- if (Record.Location.Offset() + Record.Location.Size() <= BlockFileSize)
- {
- BlockChunkCount++;
- continue;
- }
- ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath);
- BadEntries.push_back(Entry.first);
- }
- for (const IoHash& BadHash : BadEntries)
- {
- LegacyDiskIndex.erase(BadHash);
- }
- InvalidEntryCount += BadEntries.size();
- }
- }
- if (InvalidEntryCount)
- {
- ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName);
- }
-
- if (LegacyDiskIndex.empty())
- {
- LegacyCasLog.Close();
- if (CleanSource)
- {
- // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find
- // a manifest and crashes on startup if they don't.
- // In order to not break startup when switching back an older version, lets just reset
- // the legacy data files to zero length.
-
- BasicFile LegacyLog;
- LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
- BasicFile LegacySobs;
- LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
- }
- return 0;
- }
-
- std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
- CreateDirectories(LogPath.parent_path());
- TCasLogFile<DiskIndexEntry> CasLog;
- CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
-
- std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash;
- std::vector<BlockStoreLocation> ChunkLocations;
- ChunkIndexToChunkHash.reserve(BlockChunkCount);
- ChunkLocations.reserve(BlockChunkCount);
-
- std::vector<DiskIndexEntry> LogEntries;
- LogEntries.reserve(LegacyDiskIndex.size() - BlockChunkCount);
-
- for (const auto& Entry : LegacyDiskIndex)
- {
- const IoHash& ChunkHash = Entry.first;
- const LegacyDiskLocation& Location = Entry.second.Location;
- if (Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
- {
- uint8_t Flags = 0xff & (Location.Flags() >> 56);
- DiskLocation NewLocation = DiskLocation(Location.Size(), Flags);
- LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation});
- continue;
- }
- size_t ChunkIndex = ChunkLocations.size();
- ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.Offset(), .Size = Location.Size()});
- ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
- TotalSize += Location.Size();
- }
- for (const DiskIndexEntry& Entry : LogEntries)
- {
- m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
- }
- CasLog.Append(LogEntries);
-
- m_BlockStore.Split(
- ChunkLocations,
- LegacyDataPath,
- m_BlocksBasePath,
- MaxBlockSize,
- BlockStoreDiskLocation::MaxBlockIndex + 1,
- m_PayloadAlignment,
- CleanSource,
- [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount](
- const BlockStore::MovedChunksArray& MovedChunks) {
- std::vector<DiskIndexEntry> LogEntries;
- LogEntries.reserve(MovedChunks.size());
- for (const auto& Entry : MovedChunks)
- {
- size_t ChunkIndex = Entry.first;
- const BlockStoreLocation& NewLocation = Entry.second;
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
- const LegacyDiskLocation& OldLocation = OldEntry.Location;
- uint8_t Flags = 0xff & (OldLocation.Flags() >> 56);
- LogEntries.push_back({.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, Flags)});
- }
- for (const DiskIndexEntry& Entry : LogEntries)
- {
- m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
- }
- CasLog.Append(LogEntries);
- CasLog.Flush();
- if (CleanSource)
- {
- std::vector<LegacyDiskIndexEntry> LegacyLogEntries;
- LegacyLogEntries.reserve(MovedChunks.size());
- for (const auto& Entry : MovedChunks)
- {
- size_t ChunkIndex = Entry.first;
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
- const LegacyDiskLocation& OldLocation = OldEntry.Location;
- LegacyDiskLocation NewLocation(OldLocation.Offset(),
- OldLocation.Size(),
- 0,
- OldLocation.Flags() | LegacyDiskLocation::kTombStone);
- LegacyLogEntries.push_back(LegacyDiskIndexEntry{.Key = ChunkHash, .Location = NewLocation});
- }
- LegacyCasLog.Append(LegacyLogEntries);
- LegacyCasLog.Flush();
- }
- MigratedBlockCount++;
- MigratedChunkCount += MovedChunks.size();
- });
-
- LegacyCasLog.Close();
- CasLog.Close();
-
- if (CleanSource)
- {
- // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find
- // a manifest and crashes on startup if they don't.
- // In order to not break startup when switching back an older version, lets just reset
- // the legacy data files to zero length.
-
- BasicFile LegacyLog;
- LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
- BasicFile LegacySobs;
- LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
- }
- return MigratedChunkCount;
-}
-
void
ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew)
{
@@ -1123,23 +824,18 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is
m_Index.clear();
- std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir);
- std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
- std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+ std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+ std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
if (IsNew)
{
- std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir);
- fs::remove(LegacyLogPath);
- fs::remove(LegacyDataPath);
fs::remove(LogPath);
fs::remove(IndexPath);
fs::remove_all(m_BlocksBasePath);
}
- uint64_t LogPosition = ReadIndexFile();
- uint64_t LogEntryCount = ReadLog(LogPosition);
- uint64_t LegacyLogEntryCount = MigrateLegacyData(true);
+ uint64_t LogPosition = ReadIndexFile();
+ uint64_t LogEntryCount = ReadLog(LogPosition);
CreateDirectories(m_BucketDir);
@@ -1161,7 +857,7 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is
m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);
- if (IsNew || ((LogEntryCount + LegacyLogEntryCount) > 0))
+ if (IsNew || LogEntryCount > 0)
{
MakeIndexSnapshot();
}
@@ -1309,6 +1005,7 @@ void
ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
{
std::vector<IoHash> BadKeys;
+ uint64_t ChunkCount{0}, ChunkBytes{0};
std::vector<BlockStoreLocation> ChunkLocations;
std::vector<IoHash> ChunkIndexToChunkHash;
@@ -1341,6 +1038,8 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
+ ++ChunkCount;
+ ChunkBytes += Loc.Size();
if (Loc.GetContentType() == ZenContentType::kBinary)
{
ExtendablePathBuilder<256> DataFilePath;
@@ -1381,6 +1080,8 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
}
const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ ++ChunkCount;
+ ChunkBytes += Size;
const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
if (!Data)
{
@@ -1403,8 +1104,11 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
};
const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ ++ChunkCount;
+ ChunkBytes += Size;
const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
+ // TODO: Add API to verify compressed buffers and possibly structured data without having to memory-map the whole file
+ IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
if (!Buffer)
{
BadKeys.push_back(Hash);
@@ -1422,40 +1126,41 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
_.ReleaseNow();
- if (BadKeys.empty())
- {
- return;
- }
+ Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
- ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName);
-
- if (Ctx.RunRecovery())
+ if (!BadKeys.empty())
{
- // Deal with bad chunks by removing them from our lookup map
-
- std::vector<DiskIndexEntry> LogEntries;
- LogEntries.reserve(BadKeys.size());
+ ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName);
+ if (Ctx.RunRecovery())
{
- RwLock::ExclusiveLockScope __(m_IndexLock);
- for (const IoHash& BadKey : BadKeys)
- {
- // Log a tombstone and delete the in-memory index for the bad entry
+ // Deal with bad chunks by removing them from our lookup map
- const auto It = m_Index.find(BadKey);
- DiskLocation Location = It->second.Location;
- Location.Flags |= DiskLocation::kTombStone;
- LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location});
- m_Index.erase(BadKey);
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(BadKeys.size());
+
+ {
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ for (const IoHash& BadKey : BadKeys)
+ {
+ // Log a tombstone and delete the in-memory index for the bad entry
+ const auto It = m_Index.find(BadKey);
+ DiskLocation Location = It->second.Location;
+ Location.Flags |= DiskLocation::kTombStone;
+ LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location});
+ m_Index.erase(BadKey);
+ }
}
+ m_SlogFile.Append(LogEntries);
}
- m_SlogFile.Append(LogEntries);
}
// Let whomever it concerns know about the bad chunks. This could
// be used to invalidate higher level data structures more efficiently
// than a full validation pass might be able to do
- Ctx.ReportBadCasChunks(BadKeys);
+ Ctx.ReportBadCidChunks(BadKeys);
+
+ ZEN_INFO("cache bucket scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
}
void
@@ -1517,7 +1222,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
if (Cids.size() > 1024)
{
- GcCtx.ContributeCids(Cids);
+ GcCtx.AddRetainedCids(Cids);
Cids.clear();
}
@@ -1552,8 +1257,8 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
}
}
- GcCtx.ContributeCids(Cids);
- GcCtx.ContributeCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys));
+ GcCtx.AddRetainedCids(Cids);
+ GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys));
}
void
@@ -1601,7 +1306,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketDir.string());
std::vector<IoHash> DeleteCacheKeys;
DeleteCacheKeys.reserve(ExpiredCacheKeys.size());
- GcCtx.FilterCas(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) {
+ GcCtx.FilterCids(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) {
if (Keep)
{
return;
@@ -1752,7 +1457,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
ChunkLocations.reserve(TotalChunkCount);
ChunkIndexToChunkHash.reserve(TotalChunkCount);
- GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
+ GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
auto KeyIt = Index.find(ChunkHash);
const DiskLocation& DiskLocation = KeyIt->second.Location;
BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment);
@@ -1836,7 +1541,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
},
[&]() { return GcCtx.CollectSmallObjects(); });
- GcCtx.DeletedCas(DeletedChunks);
+ GcCtx.AddDeletedCids(DeletedChunks);
}
void
@@ -2302,7 +2007,7 @@ ZenCacheDiskLayer::TotalSize() const
static constexpr std::string_view UE4DDCNamespaceName = "ue4.ddc";
-ZenCacheStore::ZenCacheStore(CasGc& Gc, const Configuration& Configuration)
+ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration)
: GcStorage(Gc)
, GcContributor(Gc)
, m_Gc(Gc)
@@ -2313,7 +2018,6 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, const Configuration& Configuration)
DirectoryContent DirContent;
GetDirectoryContent(m_Configuration.BasePath, DirectoryContent::IncludeDirsFlag, DirContent);
- std::vector<std::string> LegacyBuckets;
std::vector<std::string> Namespaces;
for (const std::filesystem::path& DirPath : DirContent.Directories)
{
@@ -2323,33 +2027,17 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, const Configuration& Configuration)
Namespaces.push_back(DirName.substr(NamespaceDiskPrefix.length()));
continue;
}
- LegacyBuckets.push_back(DirName);
}
- ZEN_INFO("Found #{} namespaces in '{}' and #{} legacy buckets", Namespaces.size(), m_Configuration.BasePath, LegacyBuckets.size());
+ ZEN_INFO("Found #{} namespaces in '{}'", Namespaces.size(), m_Configuration.BasePath);
if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end())
{
// default (unspecified) and ue4-ddc namespace points to the same namespace instance
- ZEN_INFO("Moving #{} legacy buckets to '{}' namespace", LegacyBuckets.size(), UE4DDCNamespaceName);
-
std::filesystem::path DefaultNamespaceFolder =
m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName);
CreateDirectories(DefaultNamespaceFolder);
-
- // Move any non-namespace folders into the default namespace folder
- for (const std::string& DirName : LegacyBuckets)
- {
- std::filesystem::path LegacyFolder = m_Configuration.BasePath / DirName;
- std::filesystem::path NewPath = DefaultNamespaceFolder / DirName;
- std::error_code Ec;
- std::filesystem::rename(LegacyFolder, NewPath, Ec);
- if (Ec)
- {
- ZEN_ERROR("Unable to move '{}' to '{}', reason '{}'", LegacyFolder, NewPath, Ec.message());
- }
- }
Namespaces.push_back(std::string(UE4DDCNamespaceName));
}
@@ -2537,7 +2225,7 @@ TEST_CASE("z$.store")
{
ScopedTemporaryDirectory TempDir;
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
@@ -2592,7 +2280,7 @@ TEST_CASE("z$.size")
GcStorageSize CacheSize;
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256);
@@ -2612,7 +2300,7 @@ TEST_CASE("z$.size")
}
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const GcStorageSize SerializedSize = Zcs.StorageSize();
@@ -2635,7 +2323,7 @@ TEST_CASE("z$.size")
GcStorageSize CacheSize;
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64);
@@ -2655,7 +2343,7 @@ TEST_CASE("z$.size")
}
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const GcStorageSize SerializedSize = Zcs.StorageSize();
@@ -2680,7 +2368,7 @@ TEST_CASE("z$.gc")
ScopedTemporaryDirectory TempDir;
std::vector<IoHash> Cids{CreateKey(1), CreateKey(2), CreateKey(3)};
- const auto CollectAndFilter = [](CasGc& Gc,
+ const auto CollectAndFilter = [](GcManager& Gc,
GcClock::TimePoint Time,
GcClock::Duration MaxDuration,
std::span<const IoHash> Cids,
@@ -2693,7 +2381,7 @@ TEST_CASE("z$.gc")
};
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const auto Bucket = "teardrinker"sv;
@@ -2730,7 +2418,7 @@ TEST_CASE("z$.gc")
// Expect timestamps to be serialized
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
std::vector<IoHash> Keep;
@@ -2751,7 +2439,7 @@ TEST_CASE("z$.gc")
SUBCASE("gc removes standalone values")
{
ScopedTemporaryDirectory TempDir;
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const auto Bucket = "fortysixandtwo"sv;
const GcClock::TimePoint CurrentTime = GcClock::Now();
@@ -2799,7 +2487,7 @@ TEST_CASE("z$.gc")
SUBCASE("gc removes small objects")
{
ScopedTemporaryDirectory TempDir;
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const auto Bucket = "rightintwo"sv;
const GcClock::TimePoint CurrentTime = GcClock::Now();
@@ -2848,154 +2536,6 @@ TEST_CASE("z$.gc")
}
}
-TEST_CASE("z$.legacyconversion")
-{
- ScopedTemporaryDirectory TempDir;
-
- uint64_t ChunkSizes[] = {2041,
- 1123,
- 1223,
- 1239,
- 341,
- 1412,
- 912,
- 774,
- 341,
- 431,
- 554,
- 1098,
- 2048,
- 339 + 64 * 1024,
- 561 + 64 * 1024,
- 16 + 64 * 1024,
- 16 + 64 * 1024,
- 2048,
- 2048};
- size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t);
- size_t SingleBlockSize = 0;
- std::vector<IoBuffer> Chunks;
- Chunks.reserve(ChunkCount);
- for (uint64_t Size : ChunkSizes)
- {
- Chunks.push_back(testutils::CreateBinaryCacheValue(Size));
- SingleBlockSize += Size;
- }
-
- ZEN_UNUSED(SingleBlockSize);
-
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(ChunkCount);
- for (const IoBuffer& Chunk : Chunks)
- {
- ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
- }
-
- CreateDirectories(TempDir.Path());
-
- const std::string Bucket = "rightintwo";
- {
- CasGc Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path());
- const GcClock::TimePoint CurrentTime = GcClock::Now();
-
- for (size_t i = 0; i < ChunkCount; i++)
- {
- Zcs.Put(Bucket, ChunkHashes[i], {.Value = Chunks[i]});
- }
-
- std::vector<IoHash> KeepChunks;
- for (size_t i = 0; i < ChunkCount; i += 2)
- {
- KeepChunks.push_back(ChunkHashes[i]);
- }
- GcContext GcCtx(CurrentTime + std::chrono::hours(2));
- GcCtx.MaxCacheDuration(std::chrono::minutes(2));
- GcCtx.CollectSmallObjects(true);
- GcCtx.ContributeCas(KeepChunks);
- Zcs.Flush();
- Gc.CollectGarbage(GcCtx);
- }
- std::filesystem::path BucketDir = TempDir.Path() / Bucket;
- std::filesystem::path BlocksBaseDir = BucketDir / "blocks";
-
- std::filesystem::path CasPath = BlockStore ::GetBlockPath(BlocksBaseDir, 1);
- std::filesystem::path LegacyDataPath = GetLegacyDataPath(BucketDir);
- std::filesystem::remove(LegacyDataPath);
- std::filesystem::rename(CasPath, LegacyDataPath);
-
- std::vector<DiskIndexEntry> LogEntries;
- std::filesystem::path IndexPath = GetIndexPath(BucketDir, Bucket);
- if (std::filesystem::is_regular_file(IndexPath))
- {
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
- uint64_t Size = ObjectIndexFile.FileSize();
- if (Size >= sizeof(CacheBucketIndexHeader))
- {
- uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
- CacheBucketIndexHeader Header;
- ObjectIndexFile.Read(&Header, sizeof(Header), 0);
- if (Header.Magic == CacheBucketIndexHeader::ExpectedMagic && Header.Version == CacheBucketIndexHeader::CurrentVersion &&
- Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount)
- {
- LogEntries.resize(Header.EntryCount);
- ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
- }
- }
- ObjectIndexFile.Close();
- std::filesystem::remove(IndexPath);
- }
-
- std::filesystem::path LogPath = GetLogPath(BucketDir, Bucket);
- {
- TCasLogFile<DiskIndexEntry> CasLog;
- CasLog.Open(LogPath, CasLogFile::Mode::kRead);
- LogEntries.reserve(CasLog.GetLogCount());
- CasLog.Replay([&](const DiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0);
- }
- TCasLogFile<LegacyDiskIndexEntry> LegacyLog;
- std::filesystem::path LegacylogPath = GetLegacyLogPath(BucketDir);
- LegacyLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate);
-
- for (const DiskIndexEntry& Entry : LogEntries)
- {
- uint64_t Size;
- uint64_t Offset;
- if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- Size = Entry.Location.Location.StandaloneSize;
- Offset = 0;
- }
- else
- {
- BlockStoreLocation Location = Entry.Location.GetBlockLocation(16);
- Size = Location.Size;
- Offset = Location.Offset;
- }
- LegacyDiskLocation LegacyLocation(Offset, Size, 0, static_cast<uint64_t>(Entry.Location.Flags) << 56);
- LegacyDiskIndexEntry LegacyEntry = {.Key = Entry.Key, .Location = LegacyLocation};
- LegacyLog.Append(LegacyEntry);
- }
- LegacyLog.Close();
-
- std::filesystem::remove_all(BlocksBaseDir);
- std::filesystem::remove(LogPath);
- std::filesystem::remove(IndexPath);
-
- {
- CasGc Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path());
-
- for (size_t i = 0; i < ChunkCount; i += 2)
- {
- ZenCacheValue Value;
- CHECK(Zcs.Get(Bucket, ChunkHashes[i], Value));
- CHECK(ChunkHashes[i] == IoHash::HashBuffer(Value.Value));
- CHECK(!Zcs.Get(Bucket, ChunkHashes[i + 1], Value));
- }
- }
-}
-
TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
{
// for (uint32_t i = 0; i < 100; ++i)
@@ -3045,7 +2585,7 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
CreateDirectories(TempDir.Path());
WorkerThreadPool ThreadPool(4);
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path());
{
@@ -3169,10 +2709,10 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
GcContext GcCtx;
GcCtx.CollectSmallObjects(true);
- GcCtx.ContributeCas(KeepHashes);
+ GcCtx.AddRetainedCids(KeepHashes);
Zcs.CollectGarbage(GcCtx);
- CasChunkSet& Deleted = GcCtx.DeletedCas();
- Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ const HashKeySet& Deleted = GcCtx.DeletedCids();
+ Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
while (WorkCompleted < NewChunks.size() + Chunks.size())
@@ -3217,10 +2757,10 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
GcContext GcCtx;
GcCtx.CollectSmallObjects(true);
- GcCtx.ContributeCas(KeepHashes);
+ GcCtx.AddRetainedCids(KeepHashes);
Zcs.CollectGarbage(GcCtx);
- CasChunkSet& Deleted = GcCtx.DeletedCas();
- Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ const HashKeySet& Deleted = GcCtx.DeletedCids();
+ Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
}
{
@@ -3261,7 +2801,7 @@ TEST_CASE("z$.namespaces")
IoHash Key1;
IoHash Key2;
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false});
const auto Bucket = "teardrinker"sv;
const auto CustomNamespace = "mynamespace"sv;
@@ -3286,7 +2826,7 @@ TEST_CASE("z$.namespaces")
}
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
const auto Bucket = "teardrinker"sv;
const auto CustomNamespace = "mynamespace"sv;
@@ -3346,7 +2886,7 @@ TEST_CASE("z$.drop.bucket")
};
WorkerThreadPool Workers(1);
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
const auto Bucket = "teardrinker"sv;
const auto Namespace = "mynamespace"sv;
@@ -3415,7 +2955,7 @@ TEST_CASE("z$.drop.namespace")
};
WorkerThreadPool Workers(1);
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
const auto Bucket1 = "teardrinker1"sv;
const auto Bucket2 = "teardrinker2"sv;
@@ -3480,7 +3020,7 @@ TEST_CASE("z$.blocked.disklayer.put")
return Writer.Save();
};
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
CbObject CacheValue = CreateCacheValue(64 * 1024 + 64);
@@ -3517,6 +3057,96 @@ TEST_CASE("z$.blocked.disklayer.put")
CHECK(memcmp(NewView.GetData(), Buffer2.GetData(), NewView.GetSize()) == 0);
}
+TEST_CASE("z$.scrub")
+{
+    // Stores well-formed records and attachments, then expects a scrub pass to count every chunk and report no bad CIDs
+    ScopedTemporaryDirectory TempDir;
+    using namespace testutils;
+
+    struct CacheRecord
+    {
+        IoBuffer Record;                            // value stored in the cache bucket
+        std::vector<CompressedBuffer> Attachments;  // compressed chunks referenced by a structured record
+    };
+    // Builds a compact binary object referencing compressed attachments (Structured) or one flat binary blob (otherwise)
+    auto CreateCacheRecord = [](bool Structured, std::string_view Bucket, const IoHash& Key, const std::vector<size_t>& AttachmentSizes) {
+        CacheRecord Result;
+        if (Structured)
+        {
+            Result.Attachments.resize(AttachmentSizes.size());
+            CbObjectWriter Record;
+            Record.BeginObject("Key"sv);
+            {
+                Record << "Bucket"sv << Bucket;
+                Record << "Hash"sv << Key;
+            }
+            Record.EndObject();
+            for (size_t Index = 0; Index < AttachmentSizes.size(); Index++)
+            {
+                IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSizes[Index]);
+                CompressedBuffer CompressedAttachmentData = CompressedBuffer::Compress(SharedBuffer(AttachmentData));
+                Record.AddBinaryAttachment(fmt::format("attachment-{}", Index), IoHash::FromBLAKE3(CompressedAttachmentData.GetRawHash()));
+                Result.Attachments[Index] = CompressedAttachmentData;
+            }
+            Result.Record = Record.Save().GetBuffer().AsIoBuffer();
+            Result.Record.SetContentType(ZenContentType::kCbObject);
+        }
+        else
+        {
+            std::string RecordData = fmt::format("{}:{}", Bucket, Key.ToHexString());
+            size_t TotalSize = RecordData.length() + 1;  // header text plus its terminating NUL
+            for (size_t AttachmentSize : AttachmentSizes)
+            {
+                TotalSize += AttachmentSize;
+            }
+            Result.Record = IoBuffer(TotalSize);
+            char* DataPtr = (char*)Result.Record.MutableData();
+            memcpy(DataPtr, RecordData.c_str(), RecordData.length() + 1);
+            DataPtr += RecordData.length() + 1;
+            for (size_t AttachmentSize : AttachmentSizes)
+            {
+                IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSize);
+                memcpy(DataPtr, AttachmentData.GetData(), AttachmentData.GetSize());  // attachment bytes are inlined, no CID chunk
+                DataPtr += AttachmentData.GetSize();
+            }
+        }
+        return Result;
+    };
+
+    GcManager Gc;
+    CidStore AttachmentStore(Gc);  // renamed from 'CidStore' so the local no longer shadows its own type
+    ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+    CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+    AttachmentStore.Initialize(CidConfig);
+    // Puts one record per key into BucketName (previously hard-coded to "mybucket") and adds its attachments to the CID store
+    auto CreateRecords =
+        [&](bool IsStructured, std::string_view BucketName, const std::vector<IoHash>& Cids, const std::vector<size_t>& AttachmentSizes) {
+            for (const IoHash& Cid : Cids)
+            {
+                CacheRecord Record = CreateCacheRecord(IsStructured, BucketName, Cid, AttachmentSizes);
+                Zcs.Put(BucketName, Cid, {.Value = Record.Record});
+                for (const CompressedBuffer& Attachment : Record.Attachments)
+                {
+                    AttachmentStore.AddChunk(Attachment);
+                }
+            }
+        };
+
+    std::vector<size_t> AttachmentSizes = {16, 1000, 2000, 4000, 8000, 64000, 80000};
+
+    std::vector<IoHash> UnstructuredCids{CreateKey(4), CreateKey(5), CreateKey(6)};
+    CreateRecords(false, "mybucket"sv, UnstructuredCids, AttachmentSizes);
+
+    std::vector<IoHash> StructuredCids{CreateKey(1), CreateKey(2), CreateKey(3)};
+    CreateRecords(true, "mybucket"sv, StructuredCids, AttachmentSizes);
+    // Expected count: every record, plus one CID chunk per attachment of each structured record
+    ScrubContext ScrubCtx;
+    Zcs.Scrub(ScrubCtx);
+    AttachmentStore.Scrub(ScrubCtx);
+    CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size());
+    CHECK(ScrubCtx.BadCids().GetSize() == 0);
+}
+
#endif
void