// Copyright Epic Games, Inc. All Rights Reserved. #include "structuredcachestore.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_PLATFORM_WINDOWS # include #endif ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # include # include # include # include #endif ////////////////////////////////////////////////////////////////////////// namespace zen { namespace { #pragma pack(push) #pragma pack(1) struct CacheBucketIndexHeader { static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; static constexpr uint32_t CurrentVersion = 1; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; uint64_t EntryCount = 0; uint64_t LogPosition = 0; uint32_t PayloadAlignment = 0; uint32_t Checksum = 0; static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header) { return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); } }; static_assert(sizeof(CacheBucketIndexHeader) == 32); struct LegacyDiskLocation { inline LegacyDiskLocation() = default; inline LegacyDiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags) : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags)) , LowerSize(ValueSize & 0xFFFFffff) , IndexDataSize(IndexSize) { } static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull; static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; // Most significant bits of value size (lower 32 bits in LowerSize) static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; // Stored as a separate file static const uint64_t kStructured = 0x4000'0000'0000'0000ull; // Serialized as compact binary static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; // Represents a deleted key/value static const uint64_t kCompressed = 0x1000'0000'0000'0000ull; // Stored in compressed buffer format static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; } inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; } inline uint64_t Size() const { return LowerSize; } inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; } inline ZenContentType GetContentType() const { ZenContentType ContentType = ZenContentType::kBinary; if (IsFlagSet(LegacyDiskLocation::kStructured)) { ContentType = ZenContentType::kCbObject; } if (IsFlagSet(LegacyDiskLocation::kCompressed)) { ContentType = ZenContentType::kCompressedBinary; } return ContentType; } inline uint64_t Flags() const { return OffsetAndFlags & kFlagsMask; } private: uint64_t OffsetAndFlags = 0; uint32_t LowerSize = 0; uint32_t IndexDataSize = 0; }; struct LegacyDiskIndexEntry { IoHash Key; LegacyDiskLocation Location; }; #pragma pack(pop) static_assert(sizeof(LegacyDiskIndexEntry) == 36); const char* IndexExtension = ".uidx"; const char* LogExtension = ".slog"; const char* LegacyDataExtension = ".sobs"; std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + IndexExtension); } std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + ".tmp" + IndexExtension); } std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + LogExtension); } std::filesystem::path GetLegacyLogPath(const std::filesystem::path& BucketDir) { return BucketDir / (std::string("zen") + LogExtension); } std::filesystem::path GetLegacyDataPath(const std::filesystem::path& BucketDir) { return BucketDir / (std::string("zen") + LegacyDataExtension); } bool ValidateLegacyEntry(const LegacyDiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) { OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); return false; } if (Entry.Location.Flags() & ~(LegacyDiskLocation::kStandaloneFile | LegacyDiskLocation::kStructured | LegacyDiskLocation::kTombStone | LegacyDiskLocation::kCompressed)) { OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.Flags(), Entry.Key.ToHexString()); return false; } if (!Entry.Location.IsFlagSet(LegacyDiskLocation::kTombStone)) { return true; } uint64_t Size = Entry.Location.Size(); if (Size == 0) { OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); return false; } return true; } bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) { OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); return false; } if (Entry.Location.GetFlags() & ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) { OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString()); return false; } if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) { return true; } uint64_t Size = Entry.Location.Size(); if (Size == 0) { OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); return false; } return true; } bool MoveAndDeleteDirectory(const std::filesystem::path& Dir) { int DropIndex = 0; do { if (!std::filesystem::exists(Dir)) { return false; } std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; if (std::filesystem::exists(DroppedBucketPath)) { DropIndex++; continue; } std::error_code Ec; std::filesystem::rename(Dir, DroppedBucketPath, Ec); if (!Ec) { DeleteDirectories(DroppedBucketPath); return true; } // TODO: Do we need to bail at some point? zen::Sleep(100); } while (true); } } // namespace namespace fs = std::filesystem; static CbObject LoadCompactBinaryObject(const fs::path& Path) { FileContents Result = ReadFile(Path); if (!Result.ErrorCode) { IoBuffer Buffer = Result.Flatten(); if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) { return LoadCompactBinaryObject(Buffer); } } return CbObject(); } static void SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) { WriteFile(Path, Object.GetBuffer().AsIoBuffer()); } ZenCacheNamespace::ZenCacheNamespace(CasGc& Gc, const std::filesystem::path& RootDir) : GcStorage(Gc) , GcContributor(Gc) , m_RootDir(RootDir) , m_DiskLayer(RootDir) { ZEN_INFO("initializing structured cache at '{}'", RootDir); CreateDirectories(RootDir); m_DiskLayer.DiscoverBuckets(); #if ZEN_USE_CACHE_TRACKER m_AccessTracker.reset(new ZenCacheTracker(RootDir)); #endif } ZenCacheNamespace::~ZenCacheNamespace() { } bool ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { ZEN_TRACE_CPU("Z$::Get"); bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue); #if ZEN_USE_CACHE_TRACKER auto _ = MakeGuard([&] { if (!Ok) return; m_AccessTracker->TrackAccess(InBucket, HashKey); }); #endif if (Ok) { ZEN_ASSERT(OutValue.Value.Size()); return true; } Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue); if (Ok) { ZEN_ASSERT(OutValue.Value.Size()); if (OutValue.Value.Size() <= m_DiskLayerSizeThreshold) { m_MemLayer.Put(InBucket, HashKey, OutValue); } } return Ok; } void ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { ZEN_TRACE_CPU("Z$::Put"); // Store value and index ZEN_ASSERT(Value.Value.Size()); m_DiskLayer.Put(InBucket, HashKey, Value); #if ZEN_USE_REF_TRACKING if (Value.Value.GetContentType() == ZenContentType::kCbObject) { if (ValidateCompactBinary(Value.Value, CbValidateMode::All) == CbValidateError::None) { CbObject Object{SharedBuffer(Value.Value)}; uint8_t TempBuffer[8 * sizeof(IoHash)]; std::pmr::monotonic_buffer_resource Linear{TempBuffer, sizeof TempBuffer}; std::pmr::polymorphic_allocator Allocator{&Linear}; std::pmr::vector CidReferences{Allocator}; Object.IterateAttachments([&](CbFieldView Field) { CidReferences.push_back(Field.AsAttachment()); }); m_Gc.OnNewCidReferences(CidReferences); } } #endif if (Value.Value.Size() <= m_DiskLayerSizeThreshold) { m_MemLayer.Put(InBucket, HashKey, Value); } } bool ZenCacheNamespace::DropBucket(std::string_view Bucket) { ZEN_INFO("dropping bucket '{}'", Bucket); // TODO: should ensure this is done atomically across all layers const bool MemDropped = m_MemLayer.DropBucket(Bucket); const bool DiskDropped = m_DiskLayer.DropBucket(Bucket); const bool AnyDropped = MemDropped || DiskDropped; ZEN_INFO("bucket '{}' was {}", Bucket, AnyDropped ? "dropped" : "not found"); return AnyDropped; } bool ZenCacheNamespace::Drop() { m_MemLayer.Drop(); return m_DiskLayer.Drop(); } void ZenCacheNamespace::Flush() { m_DiskLayer.Flush(); } void ZenCacheNamespace::Scrub(ScrubContext& Ctx) { if (m_LastScrubTime == Ctx.ScrubTimestamp()) { return; } m_LastScrubTime = Ctx.ScrubTimestamp(); m_DiskLayer.Scrub(Ctx); m_MemLayer.Scrub(Ctx); } void ZenCacheNamespace::GatherReferences(GcContext& GcCtx) { Stopwatch Timer; const auto Guard = MakeGuard([&] { ZEN_INFO("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); access_tracking::AccessTimes AccessTimes; m_MemLayer.GatherAccessTimes(AccessTimes); m_DiskLayer.UpdateAccessTimes(AccessTimes); m_DiskLayer.GatherReferences(GcCtx); } void ZenCacheNamespace::CollectGarbage(GcContext& GcCtx) { m_MemLayer.Reset(); m_DiskLayer.CollectGarbage(GcCtx); } GcStorageSize ZenCacheNamespace::StorageSize() const { return {.DiskSize = m_DiskLayer.TotalSize(), .MemorySize = m_MemLayer.TotalSize()}; } ////////////////////////////////////////////////////////////////////////// ZenCacheMemoryLayer::ZenCacheMemoryLayer() { } ZenCacheMemoryLayer::~ZenCacheMemoryLayer() { } bool ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { RwLock::SharedLockScope _(m_Lock); auto It = m_Buckets.find(std::string(InBucket)); if (It == m_Buckets.end()) { return false; } CacheBucket* Bucket = It->second.get(); _.ReleaseNow(); // There's a race here. Since the lock is released early to allow // inserts, the bucket delete path could end up deleting the // underlying data structure return Bucket->Get(HashKey, OutValue); } void ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { const auto BucketName = std::string(InBucket); CacheBucket* Bucket = nullptr; { RwLock::SharedLockScope _(m_Lock); if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) { Bucket = It->second.get(); } } if (Bucket == nullptr) { // New bucket RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) { Bucket = It->second.get(); } else { auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique()); Bucket = InsertResult.first->second.get(); } } // Note that since the underlying IoBuffer is retained, the content type is also Bucket->Put(HashKey, Value); } bool ZenCacheMemoryLayer::DropBucket(std::string_view InBucket) { RwLock::ExclusiveLockScope _(m_Lock); auto It = m_Buckets.find(std::string(InBucket)); if (It != m_Buckets.end()) { CacheBucket& Bucket = *It->second; m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It); Bucket.Drop(); return true; } return false; } void ZenCacheMemoryLayer::Drop() { RwLock::ExclusiveLockScope _(m_Lock); std::vector> Buckets; Buckets.reserve(m_Buckets.size()); while (!m_Buckets.empty()) { const auto& It = m_Buckets.begin(); CacheBucket& Bucket = *It->second; m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It->first); Bucket.Drop(); } } void ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx) { RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { Kv.second->Scrub(Ctx); } } void ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes) { using namespace zen::access_tracking; RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { std::vector& Bucket = AccessTimes.Buckets[Kv.first]; Kv.second->GatherAccessTimes(Bucket); } } void ZenCacheMemoryLayer::Reset() { RwLock::ExclusiveLockScope _(m_Lock); m_Buckets.clear(); } uint64_t ZenCacheMemoryLayer::TotalSize() const { uint64_t TotalSize{}; RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { TotalSize += Kv.second->TotalSize(); } return TotalSize; } void ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) { RwLock::SharedLockScope _(m_BucketLock); std::vector BadHashes; for (auto& Kv : m_CacheMap) { if (Kv.first != IoHash::HashBuffer(Kv.second.Payload)) { BadHashes.push_back(Kv.first); } } if (!BadHashes.empty()) { Ctx.ReportBadCasChunks(BadHashes); } } void ZenCacheMemoryLayer::CacheBucket::GatherAccessTimes(std::vector& AccessTimes) { RwLock::SharedLockScope _(m_BucketLock); std::transform(m_CacheMap.begin(), m_CacheMap.end(), std::back_inserter(AccessTimes), [](const auto& Kv) { return access_tracking::KeyAccessTime{.Key = Kv.first, .LastAccess = Kv.second.LastAccess}; }); } bool ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { RwLock::SharedLockScope _(m_BucketLock); if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end()) { BucketValue& Value = It.value(); OutValue.Value = Value.Payload; Value.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); return true; } return false; } void ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { { RwLock::ExclusiveLockScope _(m_BucketLock); m_CacheMap.insert_or_assign(HashKey, BucketValue(Value.Value, GcClock::TickCount())); } m_TotalSize.fetch_add(Value.Value.GetSize(), std::memory_order::relaxed); } void ZenCacheMemoryLayer::CacheBucket::Drop() { RwLock::ExclusiveLockScope _(m_BucketLock); m_CacheMap.clear(); } ////////////////////////////////////////////////////////////////////////// ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)), m_BucketId(Oid::Zero) { } ZenCacheDiskLayer::CacheBucket::~CacheBucket() { } bool ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate) { using namespace std::literals; m_BlocksBasePath = BucketDir / "blocks"; CreateDirectories(BucketDir); std::filesystem::path ManifestPath{BucketDir / "zen_manifest"}; bool IsNew = false; CbObject Manifest = LoadCompactBinaryObject(ManifestPath); if (Manifest) { m_BucketId = Manifest["BucketId"].AsObjectId(); if (m_BucketId == Oid::Zero) { return false; } } else if (AllowCreate) { m_BucketId.Generate(); CbObjectWriter Writer; Writer << "BucketId"sv << m_BucketId; Manifest = Writer.Save(); SaveCompactBinaryObject(ManifestPath, Manifest); IsNew = true; } else { return false; } OpenLog(BucketDir, IsNew); for (CbFieldView Entry : Manifest["Timestamps"]) { const CbObjectView Obj = Entry.AsObjectView(); const IoHash Key = Obj["Key"sv].AsHash(); if (auto It = m_Index.find(Key); It != m_Index.end()) { It.value().LastAccess.store(Obj["LastAccess"sv].AsInt64(), std::memory_order_relaxed); } } return true; } void ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() { ZEN_INFO("write store snapshot for '{}'", m_BucketDir / m_BucketName); uint64_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}", m_BucketDir / m_BucketName, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); namespace fs = std::filesystem; fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName); // Move index away, we keep it if something goes wrong if (fs::is_regular_file(STmpIndexPath)) { fs::remove(STmpIndexPath); } if (fs::is_regular_file(IndexPath)) { fs::rename(IndexPath, STmpIndexPath); } try { m_SlogFile.Flush(); // Write the current state of the location map to a new index state uint64_t LogCount = 0; std::vector Entries; { Entries.resize(m_Index.size()); uint64_t EntryIndex = 0; for (auto& Entry : m_Index) { DiskIndexEntry& IndexEntry = Entries[EntryIndex++]; IndexEntry.Key = Entry.first; IndexEntry.Location = Entry.second.Location; } LogCount = m_SlogFile.GetLogCount(); } BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); CacheBucketIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = LogCount, .PayloadAlignment = gsl::narrow(m_PayloadAlignment)}; Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0); ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); ObjectIndexFile.Flush(); ObjectIndexFile.Close(); EntryCount = Entries.size(); } catch (std::exception& Err) { ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); // Restore any previous snapshot if (fs::is_regular_file(STmpIndexPath)) { fs::remove(IndexPath); fs::rename(STmpIndexPath, IndexPath); } } if (fs::is_regular_file(STmpIndexPath)) { fs::remove(STmpIndexPath); } } uint64_t ZenCacheDiskLayer::CacheBucket::ReadIndexFile() { std::vector Entries; std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); if (std::filesystem::is_regular_file(IndexPath)) { Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' index containing #{} entries in {}", m_BucketDir / m_BucketName, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); uint64_t Size = ObjectIndexFile.FileSize(); if (Size >= sizeof(CacheBucketIndexHeader)) { uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); CacheBucketIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && (Header.Version == CacheBucketIndexHeader::CurrentVersion) && (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && (Header.EntryCount <= ExpectedEntryCount)) { Entries.resize(Header.EntryCount); ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); m_PayloadAlignment = Header.PayloadAlignment; std::string InvalidEntryReason; for (const DiskIndexEntry& Entry : Entries) { if (!ValidateEntry(Entry, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); continue; } m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); } return Header.LogPosition; } else { ZEN_WARN("skipping invalid index file '{}'", IndexPath); } } } return 0; } uint64_t ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount) { std::vector Entries; std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); if (std::filesystem::is_regular_file(LogPath)) { Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' log containing #{} entries in {}", LogPath, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); TCasLogFile CasLog; CasLog.Open(LogPath, CasLogFile::Mode::kRead); if (CasLog.Initialize()) { uint64_t EntryCount = CasLog.GetLogCount(); if (EntryCount < SkipEntryCount) { ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } uint64_t ReadCount = EntryCount - SkipEntryCount; m_Index.reserve(ReadCount); uint64_t InvalidEntryCount = 0; CasLog.Replay( [&](const DiskIndexEntry& Record) { std::string InvalidEntryReason; if (Record.Location.Flags & DiskLocation::kTombStone) { m_Index.erase(Record.Key); return; } if (!ValidateEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); ++InvalidEntryCount; return; } m_Index.insert_or_assign(Record.Key, IndexEntry(Record.Location, GcClock::TickCount())); }, SkipEntryCount); if (InvalidEntryCount) { ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName); } } } return 0; }; uint64_t ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) { std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir); if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0) { return 0; } ZEN_INFO("migrating store {}", m_BucketDir / m_BucketName); std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir); uint64_t MigratedChunkCount = 0; uint32_t MigratedBlockCount = 0; Stopwatch MigrationTimer; uint64_t TotalSize = 0; const auto _ = MakeGuard([&] { ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})", m_BucketDir / m_BucketName, MigratedChunkCount, MigratedBlockCount, NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()), NiceBytes(TotalSize)); }); uint64_t BlockFileSize = 0; { BasicFile BlockFile; BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); BlockFileSize = BlockFile.FileSize(); } std::unordered_map LegacyDiskIndex; uint64_t InvalidEntryCount = 0; size_t BlockChunkCount = 0; TCasLogFile LegacyCasLog; LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead); { Stopwatch Timer; const auto __ = MakeGuard([&] { ZEN_INFO("read store '{}' legacy log containing #{} entries in {}", LegacyLogPath, LegacyDiskIndex.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); if (LegacyCasLog.Initialize()) { LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount()); LegacyCasLog.Replay( [&](const LegacyDiskIndexEntry& Record) { if (Record.Location.IsFlagSet(LegacyDiskLocation::kTombStone)) { LegacyDiskIndex.erase(Record.Key); return; } std::string InvalidEntryReason; if (!ValidateLegacyEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason); ++InvalidEntryCount; return; } if (m_Index.contains(Record.Key)) { return; } LegacyDiskIndex[Record.Key] = Record; }, 0); std::vector BadEntries; for (const auto& Entry : LegacyDiskIndex) { const LegacyDiskIndexEntry& Record(Entry.second); if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) { continue; } if (Record.Location.Offset() + Record.Location.Size() <= BlockFileSize) { BlockChunkCount++; continue; } ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath); BadEntries.push_back(Entry.first); } for (const IoHash& BadHash : BadEntries) { LegacyDiskIndex.erase(BadHash); } InvalidEntryCount += BadEntries.size(); } } if (InvalidEntryCount) { ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName); } if (LegacyDiskIndex.empty()) { LegacyCasLog.Close(); if (CleanSource) { // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find // a manifest and crashes on startup if they don't. // In order to not break startup when switching back an older version, lets just reset // the legacy data files to zero length. BasicFile LegacyLog; LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); BasicFile LegacySobs; LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); } return 0; } std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); CreateDirectories(LogPath.parent_path()); TCasLogFile CasLog; CasLog.Open(LogPath, CasLogFile::Mode::kWrite); std::unordered_map ChunkIndexToChunkHash; std::vector ChunkLocations; ChunkIndexToChunkHash.reserve(BlockChunkCount); ChunkLocations.reserve(BlockChunkCount); std::vector LogEntries; LogEntries.reserve(LegacyDiskIndex.size() - BlockChunkCount); for (const auto& Entry : LegacyDiskIndex) { const IoHash& ChunkHash = Entry.first; const LegacyDiskLocation& Location = Entry.second.Location; if (Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) { uint8_t Flags = 0xff & (Location.Flags() >> 56); DiskLocation NewLocation = DiskLocation(Location.Size(), Flags); LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation}); continue; } size_t ChunkIndex = ChunkLocations.size(); ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.Offset(), .Size = Location.Size()}); ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; TotalSize += Location.Size(); } for (const DiskIndexEntry& Entry : LogEntries) { m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); } CasLog.Append(LogEntries); m_BlockStore.Split( ChunkLocations, LegacyDataPath, m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, m_PayloadAlignment, CleanSource, [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount]( const BlockStore::MovedChunksArray& MovedChunks) { std::vector LogEntries; LogEntries.reserve(MovedChunks.size()); for (const auto& Entry : MovedChunks) { size_t ChunkIndex = Entry.first; const BlockStoreLocation& NewLocation = Entry.second; const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; const LegacyDiskLocation& OldLocation = OldEntry.Location; uint8_t Flags = 0xff & (OldLocation.Flags() >> 56); LogEntries.push_back({.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, Flags)}); } for (const DiskIndexEntry& Entry : LogEntries) { m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); } CasLog.Append(LogEntries); CasLog.Flush(); if (CleanSource) { std::vector LegacyLogEntries; LegacyLogEntries.reserve(MovedChunks.size()); for (const auto& Entry : MovedChunks) { size_t ChunkIndex = Entry.first; const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; const LegacyDiskLocation& OldLocation = OldEntry.Location; LegacyDiskLocation NewLocation(OldLocation.Offset(), OldLocation.Size(), 0, OldLocation.Flags() | LegacyDiskLocation::kTombStone); LegacyLogEntries.push_back(LegacyDiskIndexEntry{.Key = ChunkHash, .Location = NewLocation}); } LegacyCasLog.Append(LegacyLogEntries); LegacyCasLog.Flush(); } MigratedBlockCount++; MigratedChunkCount += MovedChunks.size(); }); LegacyCasLog.Close(); CasLog.Close(); if (CleanSource) { // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find // a manifest and crashes on startup if they don't. // In order to not break startup when switching back an older version, lets just reset // the legacy data files to zero length. BasicFile LegacyLog; LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); BasicFile LegacySobs; LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); } return MigratedChunkCount; } void ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew) { m_BucketDir = BucketDir; m_TotalSize = 0; m_Index.clear(); std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir); std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); if (IsNew) { std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir); fs::remove(LegacyLogPath); fs::remove(LegacyDataPath); fs::remove(LogPath); fs::remove(IndexPath); fs::remove_all(m_BlocksBasePath); } uint64_t LogPosition = ReadIndexFile(); uint64_t LogEntryCount = ReadLog(LogPosition); uint64_t LegacyLogEntryCount = MigrateLegacyData(true); CreateDirectories(m_BucketDir); m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); std::vector KnownLocations; KnownLocations.reserve(m_Index.size()); for (const auto& Entry : m_Index) { const DiskLocation& Location = Entry.second.Location; m_TotalSize.fetch_add(Location.Size(), std::memory_order::relaxed); if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { continue; } const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); KnownLocations.push_back(BlockLocation); } m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); if (IsNew || ((LogEntryCount + LegacyLogEntryCount) > 0)) { MakeIndexSnapshot(); } // TODO: should validate integrity of container files here } void ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) { char HexString[sizeof(HashKey.Hash) * 2]; ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString); Path.Append(m_BucketDir); Path.Append(L"/blob/"); Path.AppendAsciiRange(HexString, HexString + 3); Path.AppendSeparator(); Path.AppendAsciiRange(HexString + 3, HexString + 5); Path.AppendSeparator(); Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString)); } bool ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue) { BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment); OutValue.Value = m_BlockStore.TryGetChunk(Location); if (!OutValue.Value) { return false; } OutValue.Value.SetContentType(Loc.GetContentType()); return true; } bool ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue) { ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); if (IoBuffer Data = IoBufferBuilder::MakeFromFileWithSharedDelete(DataFilePath.ToPath())) { OutValue.Value = Data; OutValue.Value.SetContentType(Loc.GetContentType()); return true; } return false; } bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { RwLock::SharedLockScope _(m_IndexLock); auto It = m_Index.find(HashKey); if (It == m_Index.end()) { return false; } IndexEntry& Entry = It.value(); Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); DiskLocation Location = Entry.Location; if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { // We don't need to hold the index lock when we read a standalone file _.ReleaseNow(); return GetStandaloneCacheValue(Location, HashKey, OutValue); } return GetInlineCacheValue(Location, OutValue); } void ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { if (Value.Value.Size() >= m_LargeObjectThreshold) { return PutStandaloneCacheValue(HashKey, Value); } PutInlineCacheValue(HashKey, Value); } bool ZenCacheDiskLayer::CacheBucket::Drop() { RwLock::ExclusiveLockScope _(m_IndexLock); std::vector> ShardLocks; ShardLocks.reserve(256); for (RwLock& Lock : m_ShardedLocks) { ShardLocks.push_back(std::make_unique(Lock)); } m_BlockStore.Close(); m_SlogFile.Close(); bool Deleted = MoveAndDeleteDirectory(m_BucketDir); m_Index.clear(); return Deleted; } void ZenCacheDiskLayer::CacheBucket::Flush() { m_BlockStore.Flush(); RwLock::SharedLockScope _(m_IndexLock); MakeIndexSnapshot(); SaveManifest(); } void ZenCacheDiskLayer::CacheBucket::SaveManifest() { using namespace std::literals; CbObjectWriter Writer; Writer << "BucketId"sv << m_BucketId; if (!m_Index.empty()) { Writer.BeginArray("Timestamps"sv); for (auto& Kv : m_Index) { const IoHash& Key = Kv.first; const IndexEntry& Entry = Kv.second; Writer.BeginObject(); Writer << "Key"sv << Key; Writer << "LastAccess"sv << Entry.LastAccess; Writer.EndObject(); } Writer.EndArray(); } SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save()); } void ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) { std::vector BadKeys; { RwLock::SharedLockScope _(m_IndexLock); for (auto& Kv : m_Index) { const IoHash& HashKey = Kv.first; const DiskLocation& Loc = Kv.second.Location; ZenCacheValue Value; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { if (GetStandaloneCacheValue(Loc, HashKey, Value)) { // Note: we cannot currently validate contents since we don't // have a content hash! continue; } } else if (GetInlineCacheValue(Loc, Value)) { // Validate contents continue; } // Value not found BadKeys.push_back(HashKey); } } if (BadKeys.empty()) { return; } if (Ctx.RunRecovery()) { RwLock::ExclusiveLockScope _(m_IndexLock); for (const IoHash& BadKey : BadKeys) { // Log a tombstone and delete the in-memory index for the bad entry const auto It = m_Index.find(BadKey); DiskLocation Location = It->second.Location; Location.Flags |= DiskLocation::kTombStone; m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = Location}); m_Index.erase(BadKey); } } // Let whomever it concerns know about the bad chunks. This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do Ctx.ReportBadCasChunks(BadKeys); } void ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences"); uint64_t WriteBlockTimeUs = 0; uint64_t WriteBlockLongestTimeUs = 0; uint64_t ReadBlockTimeUs = 0; uint64_t ReadBlockLongestTimeUs = 0; Stopwatch TotalTimer; const auto _ = MakeGuard([&] { ZEN_INFO("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})", m_BucketDir / m_BucketName, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), NiceLatencyNs(WriteBlockTimeUs), NiceLatencyNs(WriteBlockLongestTimeUs), NiceLatencyNs(ReadBlockTimeUs), NiceLatencyNs(ReadBlockLongestTimeUs)); }); const GcClock::TimePoint ExpireTime = GcCtx.MaxCacheDuration() == GcClock::Duration::max() ? GcClock::TimePoint::min() : GcCtx.Time() - GcCtx.MaxCacheDuration(); const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); IndexMap Index; { RwLock::SharedLockScope __(m_IndexLock); Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); Index = m_Index; } std::vector ExpiredKeys; ExpiredKeys.reserve(1024); std::vector Cids; Cids.reserve(1024); for (const auto& Entry : Index) { const IoHash& Key = Entry.first; if (Entry.second.LastAccess < ExpireTicks) { ExpiredKeys.push_back(Key); continue; } const DiskLocation& Loc = Entry.second.Location; if (Loc.IsFlagSet(DiskLocation::kStructured)) { if (Cids.size() > 1024) { GcCtx.ContributeCids(Cids); Cids.clear(); } ZenCacheValue CacheValue; { RwLock::SharedLockScope __(m_IndexLock); Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { // We don't need to hold the index lock when we read a standalone file __.ReleaseNow(); if (!GetStandaloneCacheValue(Loc, Key, CacheValue)) { continue; } } else if (!GetInlineCacheValue(Loc, CacheValue)) { continue; } } ZEN_ASSERT(CacheValue.Value); ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject); CbObject Obj(SharedBuffer{CacheValue.Value}); Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); } } GcCtx.ContributeCids(Cids); GcCtx.ContributeCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys)); } void ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage"); ZEN_INFO("collecting garbage from '{}'", m_BucketDir / m_BucketName); Stopwatch TotalTimer; uint64_t WriteBlockTimeUs = 0; uint64_t WriteBlockLongestTimeUs = 0; uint64_t ReadBlockTimeUs = 0; uint64_t ReadBlockLongestTimeUs = 0; uint64_t TotalChunkCount = 0; uint64_t DeletedSize = 0; uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); uint64_t DeletedCount = 0; uint64_t MovedCount = 0; const auto _ = MakeGuard([&] { ZEN_INFO( "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved " "#{} " "of #{} " "entires ({}).", m_BucketDir / m_BucketName, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), NiceLatencyNs(WriteBlockTimeUs), NiceLatencyNs(WriteBlockLongestTimeUs), NiceLatencyNs(ReadBlockTimeUs), NiceLatencyNs(ReadBlockLongestTimeUs), NiceBytes(DeletedSize), DeletedCount, MovedCount, TotalChunkCount, NiceBytes(OldTotalSize)); RwLock::SharedLockScope _(m_IndexLock); SaveManifest(); }); m_SlogFile.Flush(); std::span ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketDir.string()); std::vector DeleteCacheKeys; DeleteCacheKeys.reserve(ExpiredCacheKeys.size()); GcCtx.FilterCas(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) { if (Keep) { return; } DeleteCacheKeys.push_back(ChunkHash); }); if (DeleteCacheKeys.empty()) { ZEN_INFO("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName); return; } std::vector ExpiredStandaloneEntries; IndexMap Index; BlockStore::ReclaimSnapshotState BlockStoreState; { RwLock::SharedLockScope __(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); if (m_Index.empty()) { ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName); return; } BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); SaveManifest(); Index = m_Index; for (const IoHash& Key : DeleteCacheKeys) { if (auto It = Index.find(Key); It != Index.end()) { DiskIndexEntry Entry = {.Key = It->first, .Location = It->second.Location}; if (Entry.Location.Flags & DiskLocation::kStandaloneFile) { Entry.Location.Flags |= DiskLocation::kTombStone; ExpiredStandaloneEntries.push_back(Entry); } } } if (GcCtx.IsDeletionMode()) { for (const auto& Entry : ExpiredStandaloneEntries) { m_Index.erase(Entry.Key); } m_SlogFile.Append(ExpiredStandaloneEntries); } } if (GcCtx.IsDeletionMode()) { std::error_code Ec; ExtendablePathBuilder<256> Path; for (const auto& Entry : ExpiredStandaloneEntries) { const IoHash& Key = Entry.Key; const DiskLocation& Loc = Entry.Location; Path.Reset(); BuildPath(Path, Key); fs::path FilePath = Path.ToPath(); { RwLock::SharedLockScope __(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); if (m_Index.contains(Key)) { // Someone added it back, let the file on disk be ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); continue; } __.ReleaseNow(); RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); if (fs::is_regular_file(FilePath)) { ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); fs::remove(FilePath, Ec); } } if (Ec) { ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); Ec.clear(); DiskLocation RestoreLocation = Loc; RestoreLocation.Flags &= ~DiskLocation::kTombStone; RwLock::ExclusiveLockScope __(m_IndexLock); Stopwatch Timer; const auto ___ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); if (m_Index.contains(Key)) { continue; } m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation}); m_Index.insert({Key, {Loc, GcClock::TickCount()}}); m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); continue; } m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); DeletedSize += Entry.Location.Size(); DeletedCount++; } } TotalChunkCount = Index.size(); std::vector TotalChunkHashes; TotalChunkHashes.reserve(TotalChunkCount); for (const auto& Entry : Index) { const DiskLocation& Location = Entry.second.Location; if (Location.Flags & DiskLocation::kStandaloneFile) { continue; } TotalChunkHashes.push_back(Entry.first); } if (TotalChunkHashes.empty()) { return; } TotalChunkCount = TotalChunkHashes.size(); std::vector ChunkLocations; BlockStore::ChunkIndexArray KeepChunkIndexes; std::vector ChunkIndexToChunkHash; ChunkLocations.reserve(TotalChunkCount); ChunkLocations.reserve(TotalChunkCount); ChunkIndexToChunkHash.reserve(TotalChunkCount); GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { auto KeyIt = Index.find(ChunkHash); const DiskLocation& DiskLocation = KeyIt->second.Location; BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment); size_t ChunkIndex = ChunkLocations.size(); ChunkLocations.push_back(Location); ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; if (Keep) { KeepChunkIndexes.push_back(ChunkIndex); } }); size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size(); const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); if (!PerformDelete) { m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed); ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", m_BucketDir / m_BucketName, DeleteCount, 0, // NiceBytes(TotalSize - NewTotalSize), TotalChunkCount, NiceBytes(TotalSize)); return; } std::vector DeletedChunks; m_BlockStore.ReclaimSpace( BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, false, [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { std::vector LogEntries; LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); for (const auto& Entry : MovedChunks) { size_t ChunkIndex = Entry.first; const BlockStoreLocation& NewLocation = Entry.second; const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; const DiskLocation& OldDiskLocation = Index[ChunkHash].Location; LogEntries.push_back( {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())}); } for (const size_t ChunkIndex : RemovedChunks) { const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; const DiskLocation& OldDiskLocation = Index[ChunkHash].Location; LogEntries.push_back({.Key = ChunkHash, .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment), m_PayloadAlignment, OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); DeletedChunks.push_back(ChunkHash); } m_SlogFile.Append(LogEntries); m_SlogFile.Flush(); { RwLock::ExclusiveLockScope __(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); for (const DiskIndexEntry& Entry : LogEntries) { if (Entry.Location.GetFlags() & DiskLocation::kTombStone) { m_Index.erase(Entry.Key); uint64_t ChunkSize = Entry.Location.GetBlockLocation(m_PayloadAlignment).Size; m_TotalSize.fetch_sub(ChunkSize); continue; } m_Index[Entry.Key].Location = Entry.Location; } } }, [&]() { return GcCtx.CollectSmallObjects(); }); GcCtx.DeletedCas(DeletedChunks); } void ZenCacheDiskLayer::CacheBucket::UpdateAccessTimes(const std::vector& AccessTimes) { using namespace access_tracking; for (const KeyAccessTime& KeyTime : AccessTimes) { if (auto It = m_Index.find(KeyTime.Key); It != m_Index.end()) { IndexEntry& Entry = It.value(); Entry.LastAccess.store(KeyTime.LastAccess, std::memory_order_relaxed); } } } void ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) { RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { CacheBucket& Bucket = *Kv.second; Bucket.CollectGarbage(GcCtx); } } void ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes) { RwLock::SharedLockScope _(m_Lock); for (const auto& Kv : AccessTimes.Buckets) { if (auto It = m_Buckets.find(Kv.first); It != m_Buckets.end()) { CacheBucket& Bucket = *It->second; Bucket.UpdateAccessTimes(Kv.second); } } } void ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) { uint64_t NewFileSize = Value.Value.Size(); TemporaryFile DataFile; std::error_code Ec; DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir)); } bool CleanUpTempFile = false; auto __ = MakeGuard([&] { if (CleanUpTempFile) { std::error_code Ec; std::filesystem::remove(DataFile.GetPath(), Ec); if (Ec) { ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", DataFile.GetPath(), m_BucketDir, Ec.message()); } } }); DataFile.WriteAll(Value.Value, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'", NiceBytes(NewFileSize), DataFile.GetPath().string(), m_BucketDir)); } ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); std::filesystem::path FsPath{DataFilePath.ToPath()}; RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); // We do a speculative remove of the file instead of probing with a exists call and check the error code instead std::filesystem::remove(FsPath, Ec); if (Ec) { if (Ec.value() != ENOENT) { ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message()); Sleep(100); Ec.clear(); std::filesystem::remove(FsPath, Ec); if (Ec && Ec.value() != ENOENT) { throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir)); } } } DataFile.MoveTemporaryIntoPlace(FsPath, Ec); if (Ec) { std::filesystem::path ParentPath = FsPath.parent_path(); if (!std::filesystem::is_directory(ParentPath)) { Ec.clear(); std::filesystem::create_directories(ParentPath, Ec); if (Ec) { throw std::system_error( Ec, fmt::format("Failed to create parent directory '{}' for file '{}' for put in '{}'", ParentPath, FsPath, m_BucketDir)); } } // Try again DataFile.MoveTemporaryIntoPlace(FsPath, Ec); if (Ec) { ZEN_WARN("Failed to finalize file '{}', moving from '{}' for put in '{}', reason: '{}', retrying.", FsPath, DataFile.GetPath(), m_BucketDir, Ec.message()); Sleep(100); Ec.clear(); DataFile.MoveTemporaryIntoPlace(FsPath, Ec); if (Ec) { throw std::system_error( Ec, fmt::format("Failed to finalize file '{}', moving from '{}' for put in '{}'", FsPath, DataFile.GetPath(), m_BucketDir)); } } } // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file // will be disabled as the file handle has already been closed CleanUpTempFile = false; uint8_t EntryFlags = DiskLocation::kStandaloneFile; if (Value.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) { EntryFlags |= DiskLocation::kCompressed; } DiskLocation Loc(NewFileSize, EntryFlags); IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); uint64_t OldFileSize = 0; RwLock::ExclusiveLockScope _(m_IndexLock); if (auto It = m_Index.find(HashKey); It == m_Index.end()) { // Previously unknown object m_Index.insert({HashKey, Entry}); } else { // TODO: should check if write is idempotent and bail out if it is? OldFileSize = It.value().Location.Size(); It.value() = Entry; } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); if (OldFileSize <= NewFileSize) { m_TotalSize.fetch_add(NewFileSize - OldFileSize, std::memory_order::relaxed); } else { m_TotalSize.fetch_sub(OldFileSize - NewFileSize, std::memory_order::relaxed); } } void ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) { uint8_t EntryFlags = 0; if (Value.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) { EntryFlags |= DiskLocation::kCompressed; } m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; m_SlogFile.Append(DiskIndexEntry); RwLock::ExclusiveLockScope _(m_IndexLock); if (auto It = m_Index.find(HashKey); It != m_Index.end()) { // TODO: should check if write is idempotent and bail out if it is? // this would requiring comparing contents on disk unless we add a // content hash to the index entry IndexEntry& Entry = It.value(); Entry.Location = Location; Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); } else { m_Index.insert({HashKey, {Location, GcClock::TickCount()}}); } }); m_TotalSize.fetch_add(Value.Value.Size(), std::memory_order::relaxed); } ////////////////////////////////////////////////////////////////////////// ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir) { } ZenCacheDiskLayer::~ZenCacheDiskLayer() = default; bool ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { const auto BucketName = std::string(InBucket); CacheBucket* Bucket = nullptr; { RwLock::SharedLockScope _(m_Lock); auto It = m_Buckets.find(BucketName); if (It != m_Buckets.end()) { Bucket = It->second.get(); } } if (Bucket == nullptr) { // Bucket needs to be opened/created RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { Bucket = It->second.get(); } else { auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique(BucketName)); Bucket = InsertResult.first->second.get(); std::filesystem::path BucketPath = m_RootDir; BucketPath /= BucketName; if (!Bucket->OpenOrCreate(BucketPath)) { m_Buckets.erase(BucketName); return false; } } } ZEN_ASSERT(Bucket != nullptr); return Bucket->Get(HashKey, OutValue); } void ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { const auto BucketName = std::string(InBucket); CacheBucket* Bucket = nullptr; { RwLock::SharedLockScope _(m_Lock); auto It = m_Buckets.find(BucketName); if (It != m_Buckets.end()) { Bucket = It->second.get(); } } if (Bucket == nullptr) { // New bucket needs to be created RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { Bucket = It->second.get(); } else { auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique(BucketName)); Bucket = InsertResult.first->second.get(); std::filesystem::path BucketPath = m_RootDir; BucketPath /= BucketName; if (!Bucket->OpenOrCreate(BucketPath)) { m_Buckets.erase(BucketName); return; } } } ZEN_ASSERT(Bucket != nullptr); Bucket->Put(HashKey, Value); } void ZenCacheDiskLayer::DiscoverBuckets() { DirectoryContent DirContent; GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent); // Initialize buckets RwLock::ExclusiveLockScope _(m_Lock); for (const std::filesystem::path& BucketPath : DirContent.Directories) { std::string BucketName = PathToUtf8(BucketPath.stem()); // New bucket needs to be created if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { continue; } auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique(BucketName)); CacheBucket& Bucket = *InsertResult.first->second; if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false)) { ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); m_Buckets.erase(InsertResult.first); continue; } ZEN_INFO("Discovered bucket '{}'", BucketName); } } bool ZenCacheDiskLayer::DropBucket(std::string_view InBucket) { RwLock::ExclusiveLockScope _(m_Lock); auto It = m_Buckets.find(std::string(InBucket)); if (It != m_Buckets.end()) { CacheBucket& Bucket = *It->second; m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It); return Bucket.Drop(); } // Make sure we remove the folder even if we don't know about the bucket std::filesystem::path BucketPath = m_RootDir; BucketPath /= std::string(InBucket); return MoveAndDeleteDirectory(BucketPath); } bool ZenCacheDiskLayer::Drop() { RwLock::ExclusiveLockScope _(m_Lock); std::vector> Buckets; Buckets.reserve(m_Buckets.size()); while (!m_Buckets.empty()) { const auto& It = m_Buckets.begin(); CacheBucket& Bucket = *It->second; m_DroppedBuckets.push_back(std::move(It->second)); m_Buckets.erase(It->first); if (!Bucket.Drop()) { return false; } } return MoveAndDeleteDirectory(m_RootDir); } void ZenCacheDiskLayer::Flush() { std::vector Buckets; { RwLock::SharedLockScope _(m_Lock); Buckets.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { CacheBucket* Bucket = Kv.second.get(); Buckets.push_back(Bucket); } } for (auto& Bucket : Buckets) { Bucket->Flush(); } } void ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) { RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { CacheBucket& Bucket = *Kv.second; Bucket.Scrub(Ctx); } } void ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) { RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { CacheBucket& Bucket = *Kv.second; Bucket.GatherReferences(GcCtx); } } uint64_t ZenCacheDiskLayer::TotalSize() const { uint64_t TotalSize{}; RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { TotalSize += Kv.second->TotalSize(); } return TotalSize; } //////////////////////////// ZenCacheStore static constexpr std::string_view UE4DDCNamespaceName = "ue4.ddc"; ZenCacheStore::ZenCacheStore(CasGc& Gc, const Configuration& Configuration) : GcStorage(Gc) , GcContributor(Gc) , m_Gc(Gc) , m_Configuration(Configuration) { CreateDirectories(m_Configuration.BasePath); DirectoryContent DirContent; GetDirectoryContent(m_Configuration.BasePath, DirectoryContent::IncludeDirsFlag, DirContent); std::vector LegacyBuckets; std::vector Namespaces; for (const std::filesystem::path& DirPath : DirContent.Directories) { std::string DirName = PathToUtf8(DirPath.filename()); if (DirName.starts_with(NamespaceDiskPrefix)) { Namespaces.push_back(DirName.substr(NamespaceDiskPrefix.length())); continue; } LegacyBuckets.push_back(DirName); } ZEN_INFO("Found #{} namespaces in '{}' and #{} legacy buckets", Namespaces.size(), m_Configuration.BasePath, LegacyBuckets.size()); if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end()) { // default (unspecified) and ue4-ddc namespace points to the same namespace instance ZEN_INFO("Moving #{} legacy buckets to '{}' namespace", LegacyBuckets.size(), UE4DDCNamespaceName); std::filesystem::path DefaultNamespaceFolder = m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName); CreateDirectories(DefaultNamespaceFolder); // Move any non-namespace folders into the default namespace folder for (const std::string& DirName : LegacyBuckets) { std::filesystem::path LegacyFolder = m_Configuration.BasePath / DirName; std::filesystem::path NewPath = DefaultNamespaceFolder / DirName; std::error_code Ec; std::filesystem::rename(LegacyFolder, NewPath, Ec); if (Ec) { ZEN_ERROR("Unable to move '{}' to '{}', reason '{}'", LegacyFolder, NewPath, Ec.message()); } } Namespaces.push_back(std::string(UE4DDCNamespaceName)); } for (const std::string& NamespaceName : Namespaces) { m_Namespaces[NamespaceName] = std::make_unique(Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName)); } } ZenCacheStore::~ZenCacheStore() { m_Namespaces.clear(); } bool ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue) { if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) { return Store->Get(Bucket, HashKey, OutValue); } ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString()); return false; } void ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value) { if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) { return Store->Put(Bucket, HashKey, Value); } ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString()); } bool ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket) { if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) { return Store->DropBucket(Bucket); } ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropBucket, bucket '{}'", Namespace, Bucket); return false; } bool ZenCacheStore::DropNamespace(std::string_view InNamespace) { RwLock::SharedLockScope _(m_NamespacesLock); if (auto It = m_Namespaces.find(std::string(InNamespace)); It != m_Namespaces.end()) { ZenCacheNamespace& Namespace = *It->second; m_DroppedNamespaces.push_back(std::move(It->second)); m_Namespaces.erase(It); return Namespace.Drop(); } ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropNamespace", InNamespace); return false; } void ZenCacheStore::Flush() { IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); }); } void ZenCacheStore::Scrub(ScrubContext& Ctx) { IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Scrub(Ctx); }); } ZenCacheNamespace* ZenCacheStore::GetNamespace(std::string_view Namespace) { RwLock::SharedLockScope _(m_NamespacesLock); if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end()) { return It->second.get(); } if (Namespace == DefaultNamespace) { if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end()) { return It->second.get(); } } _.ReleaseNow(); if (!m_Configuration.AllowAutomaticCreationOfNamespaces) { return nullptr; } RwLock::ExclusiveLockScope __(m_NamespacesLock); if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end()) { return It->second.get(); } auto NewNamespace = m_Namespaces.insert_or_assign( std::string(Namespace), std::make_unique(m_Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace))); return NewNamespace.first->second.get(); } void ZenCacheStore::IterateNamespaces(const std::function& Callback) const { std::vector> Namespaces; { RwLock::SharedLockScope _(m_NamespacesLock); Namespaces.reserve(m_Namespaces.size()); for (const auto& Entry : m_Namespaces) { if (Entry.first == DefaultNamespace) { continue; } Namespaces.push_back({Entry.first, *Entry.second}); } } for (auto& Entry : Namespaces) { Callback(Entry.first, Entry.second); } } void ZenCacheStore::GatherReferences(GcContext& GcCtx) { IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.GatherReferences(GcCtx); }); } void ZenCacheStore::CollectGarbage(GcContext& GcCtx) { IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.CollectGarbage(GcCtx); }); } GcStorageSize ZenCacheStore::StorageSize() const { GcStorageSize Size; IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { GcStorageSize StoreSize = Store.StorageSize(); Size.MemorySize += StoreSize.MemorySize; Size.DiskSize += StoreSize.DiskSize; }); return Size; } ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS using namespace std::literals; namespace testutils { IoHash CreateKey(size_t KeyValue) { return IoHash::HashBuffer(&KeyValue, sizeof(size_t)); } IoBuffer CreateBinaryCacheValue(uint64_t Size) { static std::random_device rd; static std::mt19937 g(rd()); std::vector Values; Values.resize(Size); for (size_t Idx = 0; Idx < Size; ++Idx) { Values[Idx] = static_cast(Idx); } std::shuffle(Values.begin(), Values.end(), g); IoBuffer Buf(IoBuffer::Clone, Values.data(), Values.size()); Buf.SetContentType(ZenContentType::kBinary); return Buf; }; } // namespace testutils TEST_CASE("z$.store") { ScopedTemporaryDirectory TempDir; CasGc Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const int kIterationCount = 100; for (int i = 0; i < kIterationCount; ++i) { const IoHash Key = IoHash::HashBuffer(&i, sizeof i); CbObjectWriter Cbo; Cbo << "hey" << i; CbObject Obj = Cbo.Save(); ZenCacheValue Value; Value.Value = Obj.GetBuffer().AsIoBuffer(); Value.Value.SetContentType(ZenContentType::kCbObject); Zcs.Put("test_bucket"sv, Key, Value); } for (int i = 0; i < kIterationCount; ++i) { const IoHash Key = IoHash::HashBuffer(&i, sizeof i); ZenCacheValue Value; Zcs.Get("test_bucket"sv, Key, /* out */ Value); REQUIRE(Value.Value); CHECK(Value.Value.GetContentType() == ZenContentType::kCbObject); CHECK_EQ(ValidateCompactBinary(Value.Value, CbValidateMode::All), CbValidateError::None); CbObject Obj = LoadCompactBinaryObject(Value.Value); CHECK_EQ(Obj["hey"].AsInt32(), i); } } TEST_CASE("z$.size") { const auto CreateCacheValue = [](size_t Size) -> CbObject { std::vector Buf; Buf.resize(Size); CbObjectWriter Writer; Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); return Writer.Save(); }; SUBCASE("mem/disklayer") { const size_t Count = 16; ScopedTemporaryDirectory TempDir; GcStorageSize CacheSize; { CasGc Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256); IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); Buffer.SetContentType(ZenContentType::kCbObject); for (size_t Key = 0; Key < Count; ++Key) { const size_t Bucket = Key % 4; Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}); } CacheSize = Zcs.StorageSize(); CHECK_EQ(CacheValue.GetSize() * Count, CacheSize.DiskSize); CHECK_EQ(CacheValue.GetSize() * Count, CacheSize.MemorySize); } { CasGc Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const GcStorageSize SerializedSize = Zcs.StorageSize(); CHECK_EQ(SerializedSize.MemorySize, 0); CHECK_EQ(SerializedSize.DiskSize, CacheSize.DiskSize); for (size_t Bucket = 0; Bucket < 4; ++Bucket) { Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket)); } CHECK_EQ(0, Zcs.StorageSize().DiskSize); } } SUBCASE("disklayer") { const size_t Count = 16; ScopedTemporaryDirectory TempDir; GcStorageSize CacheSize; { CasGc Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64); IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); Buffer.SetContentType(ZenContentType::kCbObject); for (size_t Key = 0; Key < Count; ++Key) { const size_t Bucket = Key % 4; Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}); } CacheSize = Zcs.StorageSize(); CHECK_EQ(CacheValue.GetSize() * Count, CacheSize.DiskSize); CHECK_EQ(0, CacheSize.MemorySize); } { CasGc Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const GcStorageSize SerializedSize = Zcs.StorageSize(); CHECK_EQ(SerializedSize.MemorySize, 0); CHECK_EQ(SerializedSize.DiskSize, CacheSize.DiskSize); for (size_t Bucket = 0; Bucket < 4; ++Bucket) { Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket)); } CHECK_EQ(0, Zcs.StorageSize().DiskSize); } } } TEST_CASE("z$.gc") { using namespace testutils; SUBCASE("gather references does NOT add references for expired cache entries") { ScopedTemporaryDirectory TempDir; std::vector Cids{CreateKey(1), CreateKey(2), CreateKey(3)}; const auto CollectAndFilter = [](CasGc& Gc, GcClock::TimePoint Time, GcClock::Duration MaxDuration, std::span Cids, std::vector& OutKeep) { GcContext GcCtx(Time); GcCtx.MaxCacheDuration(MaxDuration); Gc.CollectGarbage(GcCtx); OutKeep.clear(); GcCtx.FilterCids(Cids, [&OutKeep](const IoHash& Hash) { OutKeep.push_back(Hash); }); }; { CasGc Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const auto Bucket = "teardrinker"sv; // Create a cache record const IoHash Key = CreateKey(42); CbObjectWriter Record; Record << "Key"sv << "SomeRecord"sv; for (size_t Idx = 0; auto& Cid : Cids) { Record.AddBinaryAttachment(fmt::format("attachment-{}", Idx++), Cid); } IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer(); Buffer.SetContentType(ZenContentType::kCbObject); Zcs.Put(Bucket, Key, {.Value = Buffer}); std::vector Keep; // Collect garbage with 1 hour max cache duration { CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep); CHECK_EQ(Cids.size(), Keep.size()); } // Move forward in time { CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep); CHECK_EQ(0, Keep.size()); } } // Expect timestamps to be serialized { CasGc Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); std::vector Keep; // Collect garbage with 1 hour max cache duration { CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep); CHECK_EQ(3, Keep.size()); } // Move forward in time { CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep); CHECK_EQ(0, Keep.size()); } } } SUBCASE("gc removes standalone values") { ScopedTemporaryDirectory TempDir; CasGc Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const auto Bucket = "fortysixandtwo"sv; const GcClock::TimePoint CurrentTime = GcClock::Now(); std::vector Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; for (const auto& Key : Keys) { IoBuffer Value = testutils::CreateBinaryCacheValue(128 << 10); Zcs.Put(Bucket, Key, {.Value = Value}); } { GcContext GcCtx; GcCtx.MaxCacheDuration(std::chrono::hours(46)); Gc.CollectGarbage(GcCtx); for (const auto& Key : Keys) { ZenCacheValue CacheValue; const bool Exists = Zcs.Get(Bucket, Key, CacheValue); CHECK(Exists); } } // Move forward in time and collect again { GcContext GcCtx(CurrentTime + std::chrono::hours(46)); GcCtx.MaxCacheDuration(std::chrono::minutes(2)); Gc.CollectGarbage(GcCtx); for (const auto& Key : Keys) { ZenCacheValue CacheValue; const bool Exists = Zcs.Get(Bucket, Key, CacheValue); CHECK(!Exists); } CHECK_EQ(0, Zcs.StorageSize().DiskSize); } } SUBCASE("gc removes small objects") { ScopedTemporaryDirectory TempDir; CasGc Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const auto Bucket = "rightintwo"sv; const GcClock::TimePoint CurrentTime = GcClock::Now(); std::vector Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; for (const auto& Key : Keys) { IoBuffer Value = testutils::CreateBinaryCacheValue(128); Zcs.Put(Bucket, Key, {.Value = Value}); } { GcContext GcCtx; GcCtx.MaxCacheDuration(std::chrono::hours(2)); GcCtx.CollectSmallObjects(true); Gc.CollectGarbage(GcCtx); for (const auto& Key : Keys) { ZenCacheValue CacheValue; const bool Exists = Zcs.Get(Bucket, Key, CacheValue); CHECK(Exists); } } // Move forward in time and collect again { GcContext GcCtx(CurrentTime + std::chrono::hours(2)); GcCtx.MaxCacheDuration(std::chrono::minutes(2)); GcCtx.CollectSmallObjects(true); Zcs.Flush(); Gc.CollectGarbage(GcCtx); for (const auto& Key : Keys) { ZenCacheValue CacheValue; const bool Exists = Zcs.Get(Bucket, Key, CacheValue); CHECK(!Exists); } CHECK_EQ(0, Zcs.StorageSize().DiskSize); } } } TEST_CASE("z$.legacyconversion") { ScopedTemporaryDirectory TempDir; uint64_t ChunkSizes[] = {2041, 1123, 1223, 1239, 341, 1412, 912, 774, 341, 431, 554, 1098, 2048, 339 + 64 * 1024, 561 + 64 * 1024, 16 + 64 * 1024, 16 + 64 * 1024, 2048, 2048}; size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t); size_t SingleBlockSize = 0; std::vector Chunks; Chunks.reserve(ChunkCount); for (uint64_t Size : ChunkSizes) { Chunks.push_back(testutils::CreateBinaryCacheValue(Size)); SingleBlockSize += Size; } ZEN_UNUSED(SingleBlockSize); std::vector ChunkHashes; ChunkHashes.reserve(ChunkCount); for (const IoBuffer& Chunk : Chunks) { ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } CreateDirectories(TempDir.Path()); const std::string Bucket = "rightintwo"; { CasGc Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path()); const GcClock::TimePoint CurrentTime = GcClock::Now(); for (size_t i = 0; i < ChunkCount; i++) { Zcs.Put(Bucket, ChunkHashes[i], {.Value = Chunks[i]}); } std::vector KeepChunks; for (size_t i = 0; i < ChunkCount; i += 2) { KeepChunks.push_back(ChunkHashes[i]); } GcContext GcCtx(CurrentTime + std::chrono::hours(2)); GcCtx.MaxCacheDuration(std::chrono::minutes(2)); GcCtx.CollectSmallObjects(true); GcCtx.ContributeCas(KeepChunks); Zcs.Flush(); Gc.CollectGarbage(GcCtx); } std::filesystem::path BucketDir = TempDir.Path() / Bucket; std::filesystem::path BlocksBaseDir = BucketDir / "blocks"; std::filesystem::path CasPath = BlockStore ::GetBlockPath(BlocksBaseDir, 1); std::filesystem::path LegacyDataPath = GetLegacyDataPath(BucketDir); std::filesystem::remove(LegacyDataPath); std::filesystem::rename(CasPath, LegacyDataPath); std::vector LogEntries; std::filesystem::path IndexPath = GetIndexPath(BucketDir, Bucket); if (std::filesystem::is_regular_file(IndexPath)) { BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); uint64_t Size = ObjectIndexFile.FileSize(); if (Size >= sizeof(CacheBucketIndexHeader)) { uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); CacheBucketIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); if (Header.Magic == CacheBucketIndexHeader::ExpectedMagic && Header.Version == CacheBucketIndexHeader::CurrentVersion && Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount) { LogEntries.resize(Header.EntryCount); ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); } } ObjectIndexFile.Close(); std::filesystem::remove(IndexPath); } std::filesystem::path LogPath = GetLogPath(BucketDir, Bucket); { TCasLogFile CasLog; CasLog.Open(LogPath, CasLogFile::Mode::kRead); LogEntries.reserve(CasLog.GetLogCount()); CasLog.Replay([&](const DiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0); } TCasLogFile LegacyLog; std::filesystem::path LegacylogPath = GetLegacyLogPath(BucketDir); LegacyLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate); for (const DiskIndexEntry& Entry : LogEntries) { uint64_t Size; uint64_t Offset; if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) { Size = Entry.Location.Location.StandaloneSize; Offset = 0; } else { BlockStoreLocation Location = Entry.Location.GetBlockLocation(16); Size = Location.Size; Offset = Location.Offset; } LegacyDiskLocation LegacyLocation(Offset, Size, 0, static_cast(Entry.Location.Flags) << 56); LegacyDiskIndexEntry LegacyEntry = {.Key = Entry.Key, .Location = LegacyLocation}; LegacyLog.Append(LegacyEntry); } LegacyLog.Close(); std::filesystem::remove_all(BlocksBaseDir); std::filesystem::remove(LogPath); std::filesystem::remove(IndexPath); { CasGc Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path()); for (size_t i = 0; i < ChunkCount; i += 2) { ZenCacheValue Value; CHECK(Zcs.Get(Bucket, ChunkHashes[i], Value)); CHECK(ChunkHashes[i] == IoHash::HashBuffer(Value.Value)); CHECK(!Zcs.Get(Bucket, ChunkHashes[i + 1], Value)); } } } TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) { // for (uint32_t i = 0; i < 100; ++i) { ScopedTemporaryDirectory TempDir; const uint64_t kChunkSize = 1048; const int32_t kChunkCount = 8192; struct Chunk { std::string Bucket; IoBuffer Buffer; }; std::unordered_map Chunks; Chunks.reserve(kChunkCount); const std::string Bucket1 = "rightinone"; const std::string Bucket2 = "rightintwo"; for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { while (true) { IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); IoHash Hash = HashBuffer(Chunk); if (Chunks.contains(Hash)) { continue; } Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; break; } while (true) { IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); IoHash Hash = HashBuffer(Chunk); if (Chunks.contains(Hash)) { continue; } Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; break; } } CreateDirectories(TempDir.Path()); WorkerThreadPool ThreadPool(4); CasGc Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path()); { std::atomic WorkCompleted = 0; for (const auto& Chunk : Chunks) { ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); WorkCompleted.fetch_add(1); }); } while (WorkCompleted < Chunks.size()) { Sleep(1); } } const uint64_t TotalSize = Zcs.StorageSize().DiskSize; CHECK_EQ(kChunkSize * Chunks.size(), TotalSize); { std::atomic WorkCompleted = 0; for (const auto& Chunk : Chunks) { ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { std::string Bucket = Chunk.second.Bucket; IoHash ChunkHash = Chunk.first; ZenCacheValue CacheValue; CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue)); IoHash Hash = IoHash::HashBuffer(CacheValue.Value); CHECK(ChunkHash == Hash); WorkCompleted.fetch_add(1); }); } while (WorkCompleted < Chunks.size()) { Sleep(1); } } std::unordered_map GcChunkHashes; GcChunkHashes.reserve(Chunks.size()); for (const auto& Chunk : Chunks) { GcChunkHashes[Chunk.first] = Chunk.second.Bucket; } { std::unordered_map NewChunks; for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { { IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); IoHash Hash = HashBuffer(Chunk); NewChunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; } { IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); IoHash Hash = HashBuffer(Chunk); NewChunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; } } std::atomic WorkCompleted = 0; std::atomic_uint32_t AddedChunkCount = 0; for (const auto& Chunk : NewChunks) { ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() { Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); AddedChunkCount.fetch_add(1); WorkCompleted.fetch_add(1); }); } for (const auto& Chunk : Chunks) { ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { ZenCacheValue CacheValue; if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) { CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); } WorkCompleted.fetch_add(1); }); } while (AddedChunkCount.load() < NewChunks.size()) { // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope for (const auto& Chunk : NewChunks) { ZenCacheValue CacheValue; if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) { GcChunkHashes[Chunk.first] = Chunk.second.Bucket; } } std::vector KeepHashes; KeepHashes.reserve(GcChunkHashes.size()); for (const auto& Entry : GcChunkHashes) { KeepHashes.push_back(Entry.first); } size_t C = 0; while (C < KeepHashes.size()) { if (C % 155 == 0) { if (C < KeepHashes.size() - 1) { KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; KeepHashes.pop_back(); } if (C + 3 < KeepHashes.size() - 1) { KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; KeepHashes.pop_back(); } } C++; } GcContext GcCtx; GcCtx.CollectSmallObjects(true); GcCtx.ContributeCas(KeepHashes); Zcs.CollectGarbage(GcCtx); CasChunkSet& Deleted = GcCtx.DeletedCas(); Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } while (WorkCompleted < NewChunks.size() + Chunks.size()) { Sleep(1); } { // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope for (const auto& Chunk : NewChunks) { ZenCacheValue CacheValue; if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) { GcChunkHashes[Chunk.first] = Chunk.second.Bucket; } } std::vector KeepHashes; KeepHashes.reserve(GcChunkHashes.size()); for (const auto& Entry : GcChunkHashes) { KeepHashes.push_back(Entry.first); } size_t C = 0; while (C < KeepHashes.size()) { if (C % 155 == 0) { if (C < KeepHashes.size() - 1) { KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; KeepHashes.pop_back(); } if (C + 3 < KeepHashes.size() - 1) { KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; KeepHashes.pop_back(); } } C++; } GcContext GcCtx; GcCtx.CollectSmallObjects(true); GcCtx.ContributeCas(KeepHashes); Zcs.CollectGarbage(GcCtx); CasChunkSet& Deleted = GcCtx.DeletedCas(); Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } } { std::atomic WorkCompleted = 0; for (const auto& Chunk : GcChunkHashes) { ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { ZenCacheValue CacheValue; CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue)); CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); WorkCompleted.fetch_add(1); }); } while (WorkCompleted < GcChunkHashes.size()) { Sleep(1); } } } } TEST_CASE("z$.namespaces") { using namespace testutils; const auto CreateCacheValue = [](size_t Size) -> CbObject { std::vector Buf; Buf.resize(Size); CbObjectWriter Writer; Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); return Writer.Save(); }; ScopedTemporaryDirectory TempDir; CreateDirectories(TempDir.Path()); IoHash Key1; IoHash Key2; { CasGc Gc; ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false}); const auto Bucket = "teardrinker"sv; const auto CustomNamespace = "mynamespace"sv; // Create a cache record Key1 = CreateKey(42); CbObject CacheValue = CreateCacheValue(4096); IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); Buffer.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue = {.Value = Buffer}; Zcs.Put(ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue); ZenCacheValue GetValue; CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue)); CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue)); // This should just be dropped as we don't allow creating of namespaces on the fly Zcs.Put(CustomNamespace, Bucket, Key1, PutValue); CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue)); } { CasGc Gc; ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); const auto Bucket = "teardrinker"sv; const auto CustomNamespace = "mynamespace"sv; Key2 = CreateKey(43); CbObject CacheValue2 = CreateCacheValue(4096); IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); Buffer2.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue2 = {.Value = Buffer2}; Zcs.Put(CustomNamespace, Bucket, Key2, PutValue2); ZenCacheValue GetValue; CHECK(!Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue)); CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue)); CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue)); CHECK(Zcs.Get(CustomNamespace, Bucket, Key2, GetValue)); } } TEST_CASE("z$.drop.bucket") { using namespace testutils; const auto CreateCacheValue = [](size_t Size) -> CbObject { std::vector Buf; Buf.resize(Size); CbObjectWriter Writer; Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); return Writer.Save(); }; ScopedTemporaryDirectory TempDir; CreateDirectories(TempDir.Path()); IoHash Key1; IoHash Key2; auto PutValue = [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) { // Create a cache record IoHash Key = CreateKey(KeyIndex); CbObject CacheValue = CreateCacheValue(Size); IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); Buffer.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue = {.Value = Buffer}; Zcs.Put(Namespace, Bucket, Key, PutValue); return Key; }; auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { ZenCacheValue GetValue; Zcs.Get(Namespace, Bucket, Key, GetValue); return GetValue; }; WorkerThreadPool Workers(1); { CasGc Gc; ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); const auto Bucket = "teardrinker"sv; const auto Namespace = "mynamespace"sv; Key1 = PutValue(Zcs, Namespace, Bucket, 42, 4096); Key2 = PutValue(Zcs, Namespace, Bucket, 43, 2048); ZenCacheValue Value1 = GetValue(Zcs, Namespace, Bucket, Key1); CHECK(Value1.Value); std::atomic_bool WorkComplete = false; Workers.ScheduleWork([&]() { zen::Sleep(100); Value1.Value = IoBuffer{}; WorkComplete = true; }); // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket // Our DropBucket execution blocks any incoming request from completing until we are done with the drop CHECK(Zcs.DropBucket(Namespace, Bucket)); while (!WorkComplete) { zen::Sleep(1); } // Entire bucket should be dropped, but doing a request should will re-create the namespace but it must still be empty Value1 = GetValue(Zcs, Namespace, Bucket, Key1); CHECK(!Value1.Value); ZenCacheValue Value2 = GetValue(Zcs, Namespace, Bucket, Key2); CHECK(!Value2.Value); } } TEST_CASE("z$.drop.namespace") { using namespace testutils; const auto CreateCacheValue = [](size_t Size) -> CbObject { std::vector Buf; Buf.resize(Size); CbObjectWriter Writer; Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); return Writer.Save(); }; ScopedTemporaryDirectory TempDir; CreateDirectories(TempDir.Path()); auto PutValue = [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) { // Create a cache record IoHash Key = CreateKey(KeyIndex); CbObject CacheValue = CreateCacheValue(Size); IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); Buffer.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue = {.Value = Buffer}; Zcs.Put(Namespace, Bucket, Key, PutValue); return Key; }; auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { ZenCacheValue GetValue; Zcs.Get(Namespace, Bucket, Key, GetValue); return GetValue; }; WorkerThreadPool Workers(1); { CasGc Gc; ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); const auto Bucket1 = "teardrinker1"sv; const auto Bucket2 = "teardrinker2"sv; const auto Namespace1 = "mynamespace1"sv; const auto Namespace2 = "mynamespace2"sv; IoHash Key1 = PutValue(Zcs, Namespace1, Bucket1, 42, 4096); IoHash Key2 = PutValue(Zcs, Namespace1, Bucket2, 43, 2048); IoHash Key3 = PutValue(Zcs, Namespace2, Bucket1, 44, 4096); IoHash Key4 = PutValue(Zcs, Namespace2, Bucket2, 45, 2048); ZenCacheValue Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1); CHECK(Value1.Value); ZenCacheValue Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2); CHECK(Value2.Value); ZenCacheValue Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3); CHECK(Value3.Value); ZenCacheValue Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4); CHECK(Value4.Value); std::atomic_bool WorkComplete = false; Workers.ScheduleWork([&]() { zen::Sleep(100); Value1.Value = IoBuffer{}; Value2.Value = IoBuffer{}; Value3.Value = IoBuffer{}; Value4.Value = IoBuffer{}; WorkComplete = true; }); // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket // Our DropBucket execution blocks any incoming request from completing until we are done with the drop CHECK(Zcs.DropNamespace(Namespace1)); while (!WorkComplete) { zen::Sleep(1); } // Entire namespace should be dropped, but doing a request should will re-create the namespace but it must still be empty Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1); CHECK(!Value1.Value); Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2); CHECK(!Value2.Value); Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3); CHECK(Value3.Value); Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4); CHECK(Value4.Value); } } TEST_CASE("z$.blocked.disklayer.put") { ScopedTemporaryDirectory TempDir; GcStorageSize CacheSize; const auto CreateCacheValue = [](size_t Size) -> CbObject { std::vector Buf; Buf.resize(Size, Size & 0xff); CbObjectWriter Writer; Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); return Writer.Save(); }; CasGc Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); CbObject CacheValue = CreateCacheValue(64 * 1024 + 64); IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); Buffer.SetContentType(ZenContentType::kCbObject); size_t Key = Buffer.Size(); IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t)); Zcs.Put("test_bucket", HashKey, {.Value = Buffer}); ZenCacheValue BufferGet; CHECK(Zcs.Get("test_bucket", HashKey, BufferGet)); CbObject CacheValue2 = CreateCacheValue(64 * 1024 + 64 + 1); IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); Buffer2.SetContentType(ZenContentType::kCbObject); // We should be able to overwrite even if the file is open for read Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}); MemoryView OldView = BufferGet.Value.GetView(); ZenCacheValue BufferGet2; CHECK(Zcs.Get("test_bucket", HashKey, BufferGet2)); MemoryView NewView = BufferGet2.Value.GetView(); // Make sure file openend for read before we wrote it still have old data CHECK(OldView.GetSize() == Buffer.GetSize()); CHECK(memcmp(OldView.GetData(), Buffer.GetData(), OldView.GetSize()) == 0); // Make sure we get the new data when reading after we write new data CHECK(NewView.GetSize() == Buffer2.GetSize()); CHECK(memcmp(NewView.GetData(), Buffer2.GetData(), NewView.GetSize()) == 0); } #endif void z$_forcelink() { } } // namespace zen