// Copyright Epic Games, Inc. All Rights Reserved. #include "structuredcachestore.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_PLATFORM_WINDOWS # include #endif ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END ////////////////////////////////////////////////////////////////////////// namespace zen { namespace fs = std::filesystem; static CbObject LoadCompactBinaryObject(const fs::path& Path) { FileContents Result = ReadFile(Path); if (!Result.ErrorCode) { IoBuffer Buffer = Result.Flatten(); if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) { return LoadCompactBinaryObject(Buffer); } } return CbObject(); } static void SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) { WriteFile(Path, Object.GetBuffer().AsIoBuffer()); } ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir) : GcStorage(Gc) , GcContributor(Gc) , m_RootDir(RootDir) , m_DiskLayer(RootDir) { ZEN_INFO("initializing structured cache at '{}'", RootDir); CreateDirectories(RootDir); m_DiskLayer.DiscoverBuckets(); #if ZEN_USE_CACHE_TRACKER m_AccessTracker.reset(new ZenCacheTracker(RootDir)); #endif } ZenCacheStore::~ZenCacheStore() { } bool ZenCacheStore::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { ZEN_TRACE_CPU("Z$::Get"); bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue); #if ZEN_USE_CACHE_TRACKER auto _ = MakeGuard([&] { if (!Ok) return; m_AccessTracker->TrackAccess(InBucket, HashKey); }); #endif if (Ok) { ZEN_ASSERT(OutValue.Value.Size()); return true; } Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue); if (Ok) { ZEN_ASSERT(OutValue.Value.Size()); if (OutValue.Value.Size() <= m_DiskLayerSizeThreshold) { m_MemLayer.Put(InBucket, HashKey, OutValue); } } return Ok; } void ZenCacheStore::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { ZEN_TRACE_CPU("Z$::Put"); // Store value and index ZEN_ASSERT(Value.Value.Size()); m_DiskLayer.Put(InBucket, HashKey, Value); #if ZEN_USE_REF_TRACKING if (Value.Value.GetContentType() == ZenContentType::kCbObject) { if (ValidateCompactBinary(Value.Value, CbValidateMode::All) == CbValidateError::None) { CbObject Object{SharedBuffer(Value.Value)}; uint8_t TempBuffer[8 * sizeof(IoHash)]; std::pmr::monotonic_buffer_resource Linear{TempBuffer, sizeof TempBuffer}; std::pmr::polymorphic_allocator Allocator{&Linear}; std::pmr::vector CidReferences{Allocator}; Object.IterateAttachments([&](CbFieldView Field) { CidReferences.push_back(Field.AsAttachment()); }); m_Gc.OnNewCidReferences(CidReferences); } } #endif if (Value.Value.Size() <= m_DiskLayerSizeThreshold) { m_MemLayer.Put(InBucket, HashKey, Value); } } bool ZenCacheStore::DropBucket(std::string_view Bucket) { ZEN_INFO("dropping bucket '{}'", Bucket); // TODO: should ensure this is done atomically across all layers const bool MemDropped = m_MemLayer.DropBucket(Bucket); const bool DiskDropped = m_DiskLayer.DropBucket(Bucket); const bool AnyDropped = MemDropped || DiskDropped; ZEN_INFO("bucket '{}' was {}", Bucket, AnyDropped ? "dropped" : "not found"); return AnyDropped; } void ZenCacheStore::Flush() { m_DiskLayer.Flush(); } void ZenCacheStore::Scrub(ScrubContext& Ctx) { if (m_LastScrubTime == Ctx.ScrubTimestamp()) { return; } m_LastScrubTime = Ctx.ScrubTimestamp(); m_DiskLayer.Scrub(Ctx); m_MemLayer.Scrub(Ctx); } void ZenCacheStore::GatherReferences(GcContext& GcCtx) { Stopwatch Timer; const auto Guard = MakeGuard( [this, &Timer] { ZEN_INFO("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); access_tracking::AccessTimes AccessTimes; m_MemLayer.GatherAccessTimes(AccessTimes); m_DiskLayer.UpdateAccessTimes(AccessTimes); m_DiskLayer.GatherReferences(GcCtx); } void ZenCacheStore::CollectGarbage(GcContext& GcCtx) { m_MemLayer.Reset(); m_DiskLayer.CollectGarbage(GcCtx); } GcStorageSize ZenCacheStore::StorageSize() const { return {.DiskSize = m_DiskLayer.TotalSize(), .MemorySize = m_MemLayer.TotalSize()}; } ////////////////////////////////////////////////////////////////////////// ZenCacheMemoryLayer::ZenCacheMemoryLayer() { } ZenCacheMemoryLayer::~ZenCacheMemoryLayer() { } bool ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { RwLock::SharedLockScope _(m_Lock); auto it = m_Buckets.find(std::string(InBucket)); if (it == m_Buckets.end()) { return false; } CacheBucket* Bucket = &it->second; _.ReleaseNow(); // There's a race here. Since the lock is released early to allow // inserts, the bucket delete path could end up deleting the // underlying data structure return Bucket->Get(HashKey, OutValue); } void ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { CacheBucket* Bucket = nullptr; { RwLock::SharedLockScope _(m_Lock); if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) { Bucket = &It->second; } } if (Bucket == nullptr) { // New bucket RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) { Bucket = &It->second; } else { Bucket = &m_Buckets[std::string(InBucket)]; } } // Note that since the underlying IoBuffer is retained, the content type is also Bucket->Put(HashKey, Value); } bool ZenCacheMemoryLayer::DropBucket(std::string_view Bucket) { RwLock::ExclusiveLockScope _(m_Lock); return !!m_Buckets.erase(std::string(Bucket)); } void ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx) { RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { Kv.second.Scrub(Ctx); } } void ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes) { using namespace zen::access_tracking; RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { std::vector& Bucket = AccessTimes.Buckets[Kv.first]; Kv.second.GatherAccessTimes(Bucket); } } void ZenCacheMemoryLayer::Reset() { RwLock::ExclusiveLockScope _(m_Lock); m_Buckets.clear(); } uint64_t ZenCacheMemoryLayer::TotalSize() const { uint64_t TotalSize{}; RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { TotalSize += Kv.second.TotalSize(); } return TotalSize; } void ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) { RwLock::SharedLockScope _(m_BucketLock); std::vector BadHashes; for (auto& Kv : m_CacheMap) { if (Kv.first != IoHash::HashBuffer(Kv.second.Payload)) { BadHashes.push_back(Kv.first); } } if (!BadHashes.empty()) { Ctx.ReportBadCasChunks(BadHashes); } } void ZenCacheMemoryLayer::CacheBucket::GatherAccessTimes(std::vector& AccessTimes) { RwLock::SharedLockScope _(m_BucketLock); std::transform(m_CacheMap.begin(), m_CacheMap.end(), std::back_inserter(AccessTimes), [](const auto& Kv) { return access_tracking::KeyAccessTime{.Key = Kv.first, .LastAccess = Kv.second.LastAccess}; }); } bool ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { RwLock::SharedLockScope _(m_BucketLock); if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end()) { BucketValue& Value = It.value(); OutValue.Value = Value.Payload; Value.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); return true; } return false; } void ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { { RwLock::ExclusiveLockScope _(m_BucketLock); m_CacheMap.insert_or_assign(HashKey, BucketValue(Value.Value, GcClock::TickCount())); } m_TotalSize.fetch_add(Value.Value.GetSize(), std::memory_order::relaxed); } ////////////////////////////////////////////////////////////////////////// ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)) { } ZenCacheDiskLayer::CacheBucket::~CacheBucket() { } bool ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir) { if (std::filesystem::exists(BucketDir)) { DeleteDirectories(BucketDir); return true; } return false; } void ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate) { using namespace std::literals; CreateDirectories(BucketDir); std::filesystem::path ManifestPath{BucketDir / "zen_manifest"}; bool IsNew = false; CbObject Manifest = LoadCompactBinaryObject(ManifestPath); if (Manifest) { m_BucketId = Manifest["BucketId"].AsObjectId(); m_IsOk = m_BucketId != Oid::Zero; } else if (AllowCreate) { m_BucketId.Generate(); CbObjectWriter Writer; Writer << "BucketId"sv << m_BucketId; Manifest = Writer.Save(); SaveCompactBinaryObject(ManifestPath, Manifest); IsNew = true; } else { return; } OpenLog(BucketDir, IsNew); for (CbFieldView Entry : Manifest["Timestamps"]) { const CbObjectView Obj = Entry.AsObjectView(); const IoHash Key = Obj["Key"sv].AsHash(); if (auto It = m_Index.find(Key); It != m_Index.end()) { It.value().LastAccess.store(Obj["LastAccess"sv].AsInt64(), std::memory_order_relaxed); } } m_IsOk = true; } void ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew) { m_BucketDir = BucketDir; uint64_t MaxFileOffset = 0; uint64_t InvalidEntryCount = 0; m_SobsCursor = 0; m_TotalSize = 0; m_Index.clear(); std::filesystem::path SobsPath{BucketDir / "zen.sobs"}; std::filesystem::path SlogPath{BucketDir / "zen.slog"}; m_SobsFile.Open(SobsPath, IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); m_SlogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); m_SlogFile.Replay( [&](const DiskIndexEntry& Entry) { if (Entry.Key == IoHash::Zero) { ++InvalidEntryCount; } else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) { m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); } else { m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); } MaxFileOffset = std::max(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size()); }, 0); if (InvalidEntryCount) { ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, SlogPath); } m_SobsCursor = (MaxFileOffset + 15) & ~15; } void ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) { char HexString[sizeof(HashKey.Hash) * 2]; ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString); Path.Append(m_BucketDir); Path.Append(L"/blob/"); Path.AppendAsciiRange(HexString, HexString + 3); Path.AppendSeparator(); Path.AppendAsciiRange(HexString + 3, HexString + 5); Path.AppendSeparator(); Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString)); } bool ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue) { if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { return false; } OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size()); OutValue.Value.SetContentType(Loc.GetContentType()); return true; } bool ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue) { ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath())) { OutValue.Value = Data; OutValue.Value.SetContentType(Loc.GetContentType()); return true; } return false; } void ZenCacheDiskLayer::CacheBucket::DeleteStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, const fs::path& Path, std::error_code& Ec) { ZEN_DEBUG("deleting standalone cache file '{}'", Path); fs::remove(Path, Ec); if (!Ec) { m_SlogFile.Append(DiskIndexEntry{.Key = HashKey, .Location = {0, Loc.Size(), 0, DiskLocation::kTombStone}}); m_Index.erase(HashKey); m_TotalSize.fetch_sub(Loc.Size(), std::memory_order::relaxed); } } bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { if (!m_IsOk) { return false; } RwLock::SharedLockScope _(m_IndexLock); if (auto It = m_Index.find(HashKey); It != m_Index.end()) { IndexEntry& Entry = It.value(); Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); if (GetInlineCacheValue(Entry.Location, OutValue)) { return true; } _.ReleaseNow(); return GetStandaloneCacheValue(Entry.Location, HashKey, OutValue); } return false; } void ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { if (!m_IsOk) { return; } if (Value.Value.Size() >= m_LargeObjectThreshold) { return PutStandaloneCacheValue(HashKey, Value); } else { // Small object put uint64_t EntryFlags = 0; if (Value.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) { EntryFlags |= DiskLocation::kCompressed; } RwLock::ExclusiveLockScope _(m_IndexLock); DiskLocation Loc(m_SobsCursor, Value.Value.Size(), 0, EntryFlags); m_SobsCursor = RoundUp(m_SobsCursor + Loc.Size(), 16); if (auto It = m_Index.find(HashKey); It == m_Index.end()) { // Previously unknown object m_Index.insert({HashKey, {Loc, GcClock::TickCount()}}); } else { // TODO: should check if write is idempotent and bail out if it is? // this would requiring comparing contents on disk unless we add a // content hash to the index entry IndexEntry& Entry = It.value(); Entry.Location = Loc; Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); m_SobsFile.Write(Value.Value.Data(), Loc.Size(), Loc.Offset()); m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); } } void ZenCacheDiskLayer::CacheBucket::Drop() { // TODO: add error handling m_SobsFile.Close(); m_SlogFile.Close(); DeleteDirectories(m_BucketDir); } void ZenCacheDiskLayer::CacheBucket::Flush() { RwLock::SharedLockScope _(m_IndexLock); m_SobsFile.Flush(); m_SlogFile.Flush(); SaveManifest(); } void ZenCacheDiskLayer::CacheBucket::SaveManifest() { using namespace std::literals; CbObjectWriter Writer; Writer << "BucketId"sv << m_BucketId; if (!m_Index.empty()) { Writer.BeginArray("Timestamps"sv); for (auto& Kv : m_Index) { const IoHash& Key = Kv.first; const IndexEntry& Entry = Kv.second; Writer.BeginObject(); Writer << "Key"sv << Key; Writer << "LastAccess"sv << Entry.LastAccess; Writer.EndObject(); } Writer.EndArray(); } SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save()); } void ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) { std::vector BadKeys; { RwLock::SharedLockScope _(m_IndexLock); for (auto& Kv : m_Index) { const IoHash& HashKey = Kv.first; const DiskLocation& Loc = Kv.second.Location; ZenCacheValue Value; if (GetInlineCacheValue(Loc, Value)) { // Validate contents } else if (GetStandaloneCacheValue(Loc, HashKey, Value)) { // Note: we cannot currently validate contents since we don't // have a content hash! } else { // Value not found BadKeys.push_back(HashKey); } } } if (BadKeys.empty()) { return; } if (Ctx.RunRecovery()) { RwLock::ExclusiveLockScope _(m_IndexLock); for (const IoHash& BadKey : BadKeys) { // Log a tombstone and delete the in-memory index for the bad entry const auto It = m_Index.find(BadKey); const DiskLocation& Location = It->second.Location; m_SlogFile.Append(DiskIndexEntry{.Key = BadKey, .Location = {Location.Offset(), Location.Size(), 0, DiskLocation::kTombStone}}); m_Index.erase(BadKey); } } } void ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences"); Stopwatch Timer; const auto Guard = MakeGuard( [this, &Timer] { ZEN_INFO("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); const GcClock::TimePoint ExpireTime = GcCtx.MaxCacheDuration() == GcClock::Duration::max() ? GcClock::TimePoint::min() : GcCtx.Time() - GcCtx.MaxCacheDuration(); const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); RwLock::SharedLockScope _(m_IndexLock); std::vector ValidKeys; std::vector ExpiredKeys; std::vector Cids; std::vector Entries(m_Index.begin(), m_Index.end()); std::sort(Entries.begin(), Entries.end(), [](const auto& LHS, const auto& RHS) { return LHS.second.LastAccess < RHS.second.LastAccess; }); const auto ValidIt = std::lower_bound(Entries.begin(), Entries.end(), ExpireTicks, [](const auto& Kv, auto Ticks) { const IndexEntry& Entry = Kv.second; return Entry.LastAccess < Ticks; }); Cids.reserve(1024); for (auto Kv = ValidIt; Kv != Entries.end(); ++Kv) { const IoHash& Key = Kv->first; const DiskLocation& Loc = Kv->second.Location; if (Loc.IsFlagSet(DiskLocation::kStructured)) { ZenCacheValue CacheValue; if (!GetInlineCacheValue(Loc, CacheValue)) { GetStandaloneCacheValue(Loc, Key, CacheValue); } if (CacheValue.Value) { ZEN_ASSERT(CacheValue.Value.GetContentType() == ZenContentType::kCbObject); if (Cids.size() > 1024) { GcCtx.ContributeCids(Cids); Cids.clear(); } CbObject Obj(SharedBuffer{CacheValue.Value}); Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); } } } ValidKeys.reserve(std::distance(ValidIt, Entries.end())); ExpiredKeys.reserve(std::distance(Entries.begin(), ValidIt)); std::transform(ValidIt, Entries.end(), std::back_inserter(ValidKeys), [](const auto& Kv) { return Kv.first; }); std::transform(Entries.begin(), ValidIt, std::back_inserter(ExpiredKeys), [](const auto& Kv) { return Kv.first; }); GcCtx.ContributeCids(Cids); GcCtx.ContributeCacheKeys(m_BucketName, std::move(ValidKeys), std::move(ExpiredKeys)); } void ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage"); Flush(); RwLock::ExclusiveLockScope _(m_IndexLock); const uint64_t OldCount = m_Index.size(); const uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); ZEN_INFO("collecting garbage from z$ bucket '{}'", m_BucketDir); Stopwatch Timer; const auto Guard = MakeGuard([this, &Timer, &OldCount, &OldTotalSize] { const uint64_t NewCount = m_Index.size(); const uint64_t NewTotalSize = m_TotalSize.load(std::memory_order::relaxed); ZEN_INFO("garbage collect from '{}' DONE after {}, collected {} ({}) chunks of total {} ({})", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs()), OldCount - NewCount, NiceBytes(OldTotalSize - NewTotalSize), OldCount, NiceBytes(OldTotalSize)); SaveManifest(); }); if (m_Index.empty()) { return; } auto AddEntries = [this](std::span Keys, std::vector& OutEntries) { for (const IoHash& Key : Keys) { if (auto It = m_Index.find(Key); It != m_Index.end()) { OutEntries.push_back(*It); } } }; std::vector ValidEntries; std::vector ExpiredEntries; AddEntries(GcCtx.ValidCacheKeys(m_BucketName), ValidEntries); AddEntries(GcCtx.ExpiredCacheKeys(m_BucketName), ExpiredEntries); // Remove all standalone file(s) // NOTE: This can probably be made asynchronously { std::error_code Ec; ExtendablePathBuilder<256> Path; for (const auto& Entry : ExpiredEntries) { const IoHash& Key = Entry.first; const DiskLocation& Loc = Entry.second.Location; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { Path.Reset(); BuildPath(Path, Key); // NOTE: this will update index and log file DeleteStandaloneCacheValue(Loc, Key, Path.c_str(), Ec); if (Ec) { ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason '{}'", Path.ToUtf8(), Ec.message()); Ec.clear(); } } } } if (GcCtx.CollectSmallObjects() && !ExpiredEntries.empty()) { // Naive GC implementation of small objects. Needs enough free // disk space to store intermediate sob container along side the // old container const auto ResetSobStorage = [this, &ValidEntries]() { m_SobsFile.Close(); m_SlogFile.Close(); const bool IsNew = true; m_SobsFile.Open(m_BucketDir / "zen.sobs", IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); m_SlogFile.Open(m_BucketDir / "zen.slog", IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); m_SobsCursor = 0; m_TotalSize = 0; m_Index.clear(); for (const auto& Entry : ValidEntries) { const IoHash& Key = Entry.first; const DiskLocation& Loc = Entry.second.Location; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { m_SlogFile.Append({.Key = Key, .Location = Loc}); m_Index.insert({Key, {Loc, GcClock::TickCount()}}); m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); } } }; uint64_t NewContainerSize{}; for (const auto& Entry : ValidEntries) { const DiskLocation& Loc = Entry.second.Location; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile) == false) { NewContainerSize += (Loc.Size() + sizeof(DiskLocation)); } } if (NewContainerSize == 0) { ResetSobStorage(); return; } const uint64_t DiskSpaceMargin = (256 << 10); std::error_code Ec; DiskSpace Space = DiskSpaceInfo(m_BucketDir, Ec); if (Ec || Space.Free < NewContainerSize + DiskSpaceMargin) { ZEN_WARN("garbage collect z$ bucket '{}' FAILED, not enough disk space {}/{} (required/free)", m_BucketDir, NiceBytes(NewContainerSize), NiceBytes(Space.Free)); return; } std::filesystem::path TmpSobsPath{m_BucketDir / "zen.sobs.tmp"}; std::filesystem::path TmpSlogPath{m_BucketDir / "zen.slog.tmp"}; // Copy non expired sob(s) to temporary sob container { BasicFile TmpSobs; TCasLogFile TmpLog; uint64_t TmpCursor{}; std::vector Chunk; TmpSobs.Open(TmpSobsPath, BasicFile::Mode::kTruncate); TmpLog.Open(TmpSlogPath, CasLogFile::Mode::kTruncate); for (const auto& Entry : ValidEntries) { const IoHash& Key = Entry.first; const DiskLocation& Loc = Entry.second.Location; DiskLocation NewLoc; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { NewLoc = DiskLocation(0, Loc.Size(), 0, Loc.GetFlags()); } else { Chunk.resize(Loc.Size()); m_SobsFile.Read(Chunk.data(), Chunk.size(), Loc.Offset()); NewLoc = DiskLocation(TmpCursor, Chunk.size(), 0, Loc.GetFlags()); TmpSobs.Write(Chunk.data(), Chunk.size(), TmpCursor); TmpCursor = RoundUp(TmpCursor + Chunk.size(), 16); } TmpLog.Append(DiskIndexEntry{.Key = Key, .Location = NewLoc}); } } // Swap state try { fs::path SobsPath{m_BucketDir / "zen.sobs"}; fs::path SlogPath{m_BucketDir / "zen.slog"}; m_SobsFile.Close(); m_SlogFile.Close(); fs::remove(SobsPath); fs::remove(SlogPath); fs::rename(TmpSobsPath, SobsPath); fs::rename(TmpSlogPath, SlogPath); const bool IsNew = false; OpenLog(m_BucketDir, IsNew); } catch (std::exception& Err) { ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what()); ResetSobStorage(); } } } void ZenCacheDiskLayer::CacheBucket::UpdateAccessTimes(const std::vector& AccessTimes) { using namespace access_tracking; for (const KeyAccessTime& KeyTime : AccessTimes) { if (auto It = m_Index.find(KeyTime.Key); It != m_Index.end()) { IndexEntry& Entry = It.value(); Entry.LastAccess.store(KeyTime.LastAccess, std::memory_order_relaxed); } } } void ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) { RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { Kv.second.CollectGarbage(GcCtx); } } void ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes) { RwLock::SharedLockScope _(m_Lock); for (const auto& Kv : AccessTimes.Buckets) { if (auto It = m_Buckets.find(Kv.first); It != m_Buckets.end()) { CacheBucket& Bucket = It->second; Bucket.UpdateAccessTimes(Kv.second); } } } void ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) { RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); TemporaryFile DataFile; std::error_code Ec; DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to open temporary file for put at '{}'", m_BucketDir)); } DataFile.WriteAll(Value.Value, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to write payload ({} bytes) to file", NiceBytes(Value.Value.Size()))); } // Move file into place (atomically) std::filesystem::path FsPath{DataFilePath.ToPath()}; DataFile.MoveTemporaryIntoPlace(FsPath, Ec); if (Ec) { int RetryCount = 3; do { std::filesystem::path ParentPath = FsPath.parent_path(); CreateDirectories(ParentPath); DataFile.MoveTemporaryIntoPlace(FsPath, Ec); if (!Ec) { break; } std::error_code InnerEc; const uint64_t ExistingFileSize = std::filesystem::file_size(FsPath, InnerEc); if (!InnerEc && ExistingFileSize == Value.Value.Size()) { // Concurrent write of same value? return; } // Semi arbitrary back-off zen::Sleep(1000 * RetryCount); } while (RetryCount--); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to finalize file '{}'", DataFilePath.ToUtf8())); } } // Update index uint64_t EntryFlags = DiskLocation::kStandaloneFile; if (Value.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } RwLock::ExclusiveLockScope _(m_IndexLock); DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags); IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); if (auto It = m_Index.find(HashKey); It == m_Index.end()) { // Previously unknown object m_Index.insert({HashKey, Entry}); } else { // TODO: should check if write is idempotent and bail out if it is? It.value() = Entry; } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); } ////////////////////////////////////////////////////////////////////////// ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir) { } ZenCacheDiskLayer::~ZenCacheDiskLayer() = default; bool ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { const auto BucketName = std::string(InBucket); CacheBucket* Bucket = nullptr; { RwLock::SharedLockScope _(m_Lock); auto it = m_Buckets.find(BucketName); if (it != m_Buckets.end()) { Bucket = &it->second; } } if (Bucket == nullptr) { // Bucket needs to be opened/created RwLock::ExclusiveLockScope _(m_Lock); if (auto it = m_Buckets.find(BucketName); it != m_Buckets.end()) { Bucket = &it->second; } else { auto It = m_Buckets.try_emplace(BucketName, BucketName); Bucket = &It.first->second; std::filesystem::path BucketPath = m_RootDir; BucketPath /= BucketName; Bucket->OpenOrCreate(BucketPath); } } ZEN_ASSERT(Bucket != nullptr); return Bucket->Get(HashKey, OutValue); } void ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { const auto BucketName = std::string(InBucket); CacheBucket* Bucket = nullptr; { RwLock::SharedLockScope _(m_Lock); auto it = m_Buckets.find(BucketName); if (it != m_Buckets.end()) { Bucket = &it->second; } } if (Bucket == nullptr) { // New bucket needs to be created RwLock::ExclusiveLockScope _(m_Lock); if (auto it = m_Buckets.find(BucketName); it != m_Buckets.end()) { Bucket = &it->second; } else { auto It = m_Buckets.try_emplace(BucketName, BucketName); Bucket = &It.first->second; std::filesystem::path bucketPath = m_RootDir; bucketPath /= BucketName; Bucket->OpenOrCreate(bucketPath); } } ZEN_ASSERT(Bucket != nullptr); if (Bucket->IsOk()) { Bucket->Put(HashKey, Value); } } void ZenCacheDiskLayer::DiscoverBuckets() { FileSystemTraversal Traversal; struct Visitor : public FileSystemTraversal::TreeVisitor { virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent, [[maybe_unused]] const path_view& File, [[maybe_unused]] uint64_t FileSize) override { } virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override { Dirs.push_back((decltype(Dirs)::value_type)(DirectoryName)); return false; } std::vector Dirs; } Visit; Traversal.TraverseFileSystem(m_RootDir, Visit); // Initialize buckets RwLock::ExclusiveLockScope _(m_Lock); for (const auto& BucketName : Visit.Dirs) { // New bucket needs to be created #if ZEN_PLATFORM_WINDOWS std::string BucketName8 = WideToUtf8(BucketName); #else const auto& BucketName8 = BucketName; #endif if (auto It = m_Buckets.find(BucketName8); It != m_Buckets.end()) { } else { auto InsertResult = m_Buckets.try_emplace(BucketName8, BucketName8); std::filesystem::path BucketPath = m_RootDir; BucketPath /= BucketName8; CacheBucket& Bucket = InsertResult.first->second; Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false); if (Bucket.IsOk()) { ZEN_INFO("Discovered bucket '{}'", BucketName8); } else { ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName8, m_RootDir); m_Buckets.erase(InsertResult.first); } } } } bool ZenCacheDiskLayer::DropBucket(std::string_view InBucket) { RwLock::ExclusiveLockScope _(m_Lock); auto it = m_Buckets.find(std::string(InBucket)); if (it != m_Buckets.end()) { CacheBucket* Bucket = &it->second; Bucket->Drop(); m_Buckets.erase(it); return true; } std::filesystem::path BucketPath = m_RootDir; BucketPath /= std::string(InBucket); return CacheBucket::Delete(BucketPath); } void ZenCacheDiskLayer::Flush() { std::vector Buckets; Buckets.reserve(m_Buckets.size()); { RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { Buckets.push_back(&Kv.second); } } for (auto& Bucket : Buckets) { Bucket->Flush(); } } void ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) { RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { Kv.second.Scrub(Ctx); } } void ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) { RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { Kv.second.GatherReferences(GcCtx); } } uint64_t ZenCacheDiskLayer::TotalSize() const { uint64_t TotalSize{}; RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { TotalSize += Kv.second.TotalSize(); } return TotalSize; } ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS using namespace std::literals; namespace testutils { IoHash CreateKey(size_t KeyValue) { return IoHash::HashBuffer(&KeyValue, sizeof(size_t)); } IoBuffer CreateBinaryCacheValue(uint64_t Size) { std::vector Data(size_t(Size / sizeof(uint32_t))); std::generate(Data.begin(), Data.end(), [Idx = 0]() mutable { return Idx++; }); IoBuffer Buf(IoBuffer::Clone, Data.data(), Data.size() * sizeof(uint32_t)); Buf.SetContentType(ZenContentType::kBinary); return Buf; }; } // namespace testutils TEST_CASE("z$.store") { ScopedTemporaryDirectory TempDir; CasGc Gc; ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); const int kIterationCount = 100; for (int i = 0; i < kIterationCount; ++i) { const IoHash Key = IoHash::HashBuffer(&i, sizeof i); CbObjectWriter Cbo; Cbo << "hey" << i; CbObject Obj = Cbo.Save(); ZenCacheValue Value; Value.Value = Obj.GetBuffer().AsIoBuffer(); Value.Value.SetContentType(ZenContentType::kCbObject); Zcs.Put("test_bucket"sv, Key, Value); } for (int i = 0; i < kIterationCount; ++i) { const IoHash Key = IoHash::HashBuffer(&i, sizeof i); ZenCacheValue Value; Zcs.Get("test_bucket"sv, Key, /* out */ Value); REQUIRE(Value.Value); CHECK(Value.Value.GetContentType() == ZenContentType::kCbObject); CHECK_EQ(ValidateCompactBinary(Value.Value, CbValidateMode::All), CbValidateError::None); CbObject Obj = LoadCompactBinaryObject(Value.Value); CHECK_EQ(Obj["hey"].AsInt32(), i); } } TEST_CASE("z$.size") { const auto CreateCacheValue = [](size_t Size) -> CbObject { std::vector Buf; Buf.resize(Size); CbObjectWriter Writer; Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); return Writer.Save(); }; SUBCASE("mem/disklayer") { const size_t Count = 16; ScopedTemporaryDirectory TempDir; GcStorageSize CacheSize; { CasGc Gc; ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256); IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); Buffer.SetContentType(ZenContentType::kCbObject); for (size_t Key = 0; Key < Count; ++Key) { const size_t Bucket = Key % 4; Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}); } CacheSize = Zcs.StorageSize(); CHECK_EQ(CacheValue.GetSize() * Count, CacheSize.DiskSize); CHECK_EQ(CacheValue.GetSize() * Count, CacheSize.MemorySize); } { CasGc Gc; ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); const GcStorageSize SerializedSize = Zcs.StorageSize(); CHECK_EQ(SerializedSize.MemorySize, 0); CHECK_EQ(SerializedSize.DiskSize, CacheSize.DiskSize); for (size_t Bucket = 0; Bucket < 4; ++Bucket) { Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket)); } CHECK_EQ(0, Zcs.StorageSize().DiskSize); } } SUBCASE("disklayer") { const size_t Count = 16; ScopedTemporaryDirectory TempDir; GcStorageSize CacheSize; { CasGc Gc; ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64); IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); Buffer.SetContentType(ZenContentType::kCbObject); for (size_t Key = 0; Key < Count; ++Key) { const size_t Bucket = Key % 4; Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}); } CacheSize = Zcs.StorageSize(); CHECK_EQ(CacheValue.GetSize() * Count, CacheSize.DiskSize); CHECK_EQ(0, CacheSize.MemorySize); } { CasGc Gc; ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); const GcStorageSize SerializedSize = Zcs.StorageSize(); CHECK_EQ(SerializedSize.MemorySize, 0); CHECK_EQ(SerializedSize.DiskSize, CacheSize.DiskSize); for (size_t Bucket = 0; Bucket < 4; ++Bucket) { Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket)); } CHECK_EQ(0, Zcs.StorageSize().DiskSize); } } } TEST_CASE("z$.gc") { using namespace testutils; SUBCASE("gather references does NOT add references for expired cache entries") { ScopedTemporaryDirectory TempDir; std::vector Cids{CreateKey(1), CreateKey(2), CreateKey(3)}; const auto CollectAndFilter = [](CasGc& Gc, GcClock::TimePoint Time, GcClock::Duration MaxDuration, std::span Cids, std::vector& OutKeep) { GcContext GcCtx(Time); GcCtx.MaxCacheDuration(MaxDuration); Gc.CollectGarbage(GcCtx); OutKeep.clear(); GcCtx.FilterCids(Cids, [&OutKeep](const IoHash& Hash) { OutKeep.push_back(Hash); }); }; { CasGc Gc; ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); const auto Bucket = "teardrinker"sv; // Create a cache record const IoHash Key = CreateKey(42); CbObjectWriter Record; Record << "Key"sv << "SomeRecord"sv; for (size_t Idx = 0; auto& Cid : Cids) { Record.AddBinaryAttachment(fmt::format("attachment-{}", Idx++), Cid); } IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer(); Buffer.SetContentType(ZenContentType::kCbObject); Zcs.Put(Bucket, Key, {.Value = Buffer}); std::vector Keep; // Collect garbage with 1 hour max cache duration { CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep); CHECK_EQ(Cids.size(), Keep.size()); } // Move forward in time { CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep); CHECK_EQ(0, Keep.size()); } } // Expect timestamps to be serialized { CasGc Gc; ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); std::vector Keep; // Collect garbage with 1 hour max cache duration { CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep); CHECK_EQ(3, Keep.size()); } // Move forward in time { CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep); CHECK_EQ(0, Keep.size()); } } } SUBCASE("gc removes standalone values") { ScopedTemporaryDirectory TempDir; CasGc Gc; ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); const auto Bucket = "fortysixandtwo"sv; const GcClock::TimePoint CurrentTime = GcClock::Now(); std::vector Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; for (const auto& Key : Keys) { IoBuffer Value = testutils::CreateBinaryCacheValue(128 << 10); Zcs.Put(Bucket, Key, {.Value = Value}); } { GcContext GcCtx; GcCtx.MaxCacheDuration(std::chrono::hours(46)); Gc.CollectGarbage(GcCtx); for (const auto& Key : Keys) { ZenCacheValue CacheValue; const bool Exists = Zcs.Get(Bucket, Key, CacheValue); CHECK(Exists); } } // Move forward in time and collect again { GcContext GcCtx(CurrentTime + std::chrono::hours(46)); GcCtx.MaxCacheDuration(std::chrono::minutes(2)); Gc.CollectGarbage(GcCtx); for (const auto& Key : Keys) { ZenCacheValue CacheValue; const bool Exists = Zcs.Get(Bucket, Key, CacheValue); CHECK(!Exists); } CHECK_EQ(0, Zcs.StorageSize().DiskSize); } } SUBCASE("gc removes small objects") { ScopedTemporaryDirectory TempDir; CasGc Gc; ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); const auto Bucket = "rightintwo"sv; const GcClock::TimePoint CurrentTime = GcClock::Now(); std::vector Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; for (const auto& Key : Keys) { IoBuffer Value = testutils::CreateBinaryCacheValue(128); Zcs.Put(Bucket, Key, {.Value = Value}); } { GcContext GcCtx; GcCtx.MaxCacheDuration(std::chrono::hours(2)); GcCtx.CollectSmallObjects(true); Gc.CollectGarbage(GcCtx); for (const auto& Key : Keys) { ZenCacheValue CacheValue; const bool Exists = Zcs.Get(Bucket, Key, CacheValue); CHECK(Exists); } } // Move forward in time and collect again { GcContext GcCtx(CurrentTime + std::chrono::hours(2)); GcCtx.MaxCacheDuration(std::chrono::minutes(2)); GcCtx.CollectSmallObjects(true); Gc.CollectGarbage(GcCtx); for (const auto& Key : Keys) { ZenCacheValue CacheValue; const bool Exists = Zcs.Get(Bucket, Key, CacheValue); CHECK(!Exists); } CHECK_EQ(0, Zcs.StorageSize().DiskSize); } } } #endif void z$_forcelink() { } } // namespace zen