diff options
| author | Stefan Boberg <[email protected]> | 2023-05-17 10:31:50 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-17 10:31:50 +0200 |
| commit | 5adba30f4528a7d74090a8391d09b287501846a7 (patch) | |
| tree | 25476b8e49fb5a44170b4d181de60de1f2d88ebe /src/zenserver/cache/structuredcachestore.cpp | |
| parent | amended CHANGELOG.md with recent changes (diff) | |
| download | zen-5adba30f4528a7d74090a8391d09b287501846a7.tar.xz zen-5adba30f4528a7d74090a8391d09b287501846a7.zip | |
Restructured structured cache store (#314)
This change separates out the disk and memory storage strategies into separate cpp/h files to improve maintainability.
Diffstat (limited to 'src/zenserver/cache/structuredcachestore.cpp')
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.cpp | 2424 |
1 files changed, 2 insertions, 2422 deletions
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp index 440da3074..bc4248a8a 100644 --- a/src/zenserver/cache/structuredcachestore.cpp +++ b/src/zenserver/cache/structuredcachestore.cpp @@ -2,8 +2,6 @@ #include "structuredcachestore.h" -#include <zencore/except.h> - #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> @@ -17,12 +15,8 @@ #include <zencore/thread.h> #include <zencore/timer.h> #include <zencore/trace.h> -#include <zencore/workthreadpool.h> -#include <zenstore/cidstore.h> #include <zenstore/scrubcontext.h> -#include <xxhash.h> - #include <future> #include <limits> @@ -32,6 +26,7 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <fmt/core.h> +#include <xxhash.h> #include <gsl/gsl-lite.hpp> ZEN_THIRD_PARTY_INCLUDES_END @@ -39,158 +34,12 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <zencore/testing.h> # include <zencore/testutils.h> # include <zencore/workthreadpool.h> +# include <zenstore/cidstore.h> # include <random> #endif -////////////////////////////////////////////////////////////////////////// - namespace zen { -namespace { - -#pragma pack(push) -#pragma pack(1) - - // We use this to indicate if a on disk bucket needs wiping - // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scrable the references - // to block items. - // See: https://github.com/EpicGames/zen/pull/299 - static const uint32_t CurrentDiskBucketVersion = 1; - - struct CacheBucketIndexHeader - { - static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; - static constexpr uint32_t Version2 = 2; - static constexpr uint32_t CurrentVersion = Version2; - - uint32_t Magic = ExpectedMagic; - uint32_t Version = CurrentVersion; - uint64_t EntryCount = 0; - uint64_t LogPosition = 0; - uint32_t PayloadAlignment = 0; - uint32_t Checksum = 0; - - static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header) - { - return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); - } - }; - - static_assert(sizeof(CacheBucketIndexHeader) == 32); - -#pragma pack(pop) - - const char* IndexExtension = ".uidx"; - const char* LogExtension = ".slog"; - - std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) - { - return BucketDir / (BucketName + IndexExtension); - } - - std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) - { - return BucketDir / (BucketName + ".tmp"); - } - - std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) - { - return BucketDir / (BucketName + LogExtension); - } - - bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason) - { - if (Entry.Key == IoHash::Zero) - { - OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); - return false; - } - if (Entry.Location.Reserved != 0) - { - OutReason = fmt::format("Reserved field non-zero ({}) for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); - return false; - } - if (Entry.Location.GetFlags() & - ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) - { - OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString()); - return false; - } - if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) - { - return true; - } - if (Entry.Location.Reserved != 0) - { - OutReason = fmt::format("Invalid reserved field {} for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); - return false; - } - uint64_t Size = Entry.Location.Size(); - if (Size == 0) - { - OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); - return false; - } - return true; - } - - bool MoveAndDeleteDirectory(const std::filesystem::path& Dir) - { - int DropIndex = 0; - do - { - if (!std::filesystem::exists(Dir)) - { - return false; - } - - std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); - std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; - if (std::filesystem::exists(DroppedBucketPath)) - { - DropIndex++; - continue; - } - - std::error_code Ec; - std::filesystem::rename(Dir, DroppedBucketPath, Ec); - if (!Ec) - { - DeleteDirectories(DroppedBucketPath); - return true; - } - // TODO: Do we need to bail at some point? - zen::Sleep(100); - } while (true); - } - -} // namespace - -namespace fs = std::filesystem; - -static CbObject -LoadCompactBinaryObject(const fs::path& Path) -{ - FileContents Result = ReadFile(Path); - - if (!Result.ErrorCode) - { - IoBuffer Buffer = Result.Flatten(); - if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) - { - return LoadCompactBinaryObject(Buffer); - } - } - - return CbObject(); -} - -static void -SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) -{ - WriteFile(Path, Object.GetBuffer().AsIoBuffer()); -} - ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir) : GcStorage(Gc) , GcContributor(Gc) @@ -363,2275 +212,6 @@ ZenCacheNamespace::GetValueDetails(const std::string_view BucketFilter, const st return m_DiskLayer.GetValueDetails(BucketFilter, ValueFilter); } -////////////////////////////////////////////////////////////////////////// - -ZenCacheMemoryLayer::ZenCacheMemoryLayer() -{ -} - -ZenCacheMemoryLayer::~ZenCacheMemoryLayer() -{ -} - -bool -ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) -{ - RwLock::SharedLockScope _(m_Lock); - - auto It = m_Buckets.find(std::string(InBucket)); - - if (It == m_Buckets.end()) - { - return false; - } - - CacheBucket* Bucket = It->second.get(); - - _.ReleaseNow(); - - // There's a race here. Since the lock is released early to allow - // inserts, the bucket delete path could end up deleting the - // underlying data structure - - return Bucket->Get(HashKey, OutValue); -} - -void -ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) -{ - const auto BucketName = std::string(InBucket); - CacheBucket* Bucket = nullptr; - - { - RwLock::SharedLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - } - - if (Bucket == nullptr) - { - // New bucket - - RwLock::ExclusiveLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - else - { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>()); - Bucket = InsertResult.first->second.get(); - } - } - - // Note that since the underlying IoBuffer is retained, the content type is also - - Bucket->Put(HashKey, Value); -} - -bool -ZenCacheMemoryLayer::DropBucket(std::string_view InBucket) -{ - RwLock::ExclusiveLockScope _(m_Lock); - - auto It = m_Buckets.find(std::string(InBucket)); - - if (It != m_Buckets.end()) - { - CacheBucket& Bucket = *It->second; - m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It); - Bucket.Drop(); - return true; - } - return false; -} - -void -ZenCacheMemoryLayer::Drop() -{ - RwLock::ExclusiveLockScope _(m_Lock); - std::vector<std::unique_ptr<CacheBucket>> Buckets; - Buckets.reserve(m_Buckets.size()); - while (!m_Buckets.empty()) - { - const auto& It = m_Buckets.begin(); - CacheBucket& Bucket = *It->second; - m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It->first); - Bucket.Drop(); - } -} - -void -ZenCacheMemoryLayer::ScrubStorage(ScrubContext& Ctx) -{ - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - Kv.second->ScrubStorage(Ctx); - } -} - -void -ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes) -{ - using namespace zen::access_tracking; - - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - std::vector<KeyAccessTime>& Bucket = AccessTimes.Buckets[Kv.first]; - Kv.second->GatherAccessTimes(Bucket); - } -} - -void -ZenCacheMemoryLayer::Reset() -{ - RwLock::ExclusiveLockScope _(m_Lock); - m_Buckets.clear(); -} - -uint64_t -ZenCacheMemoryLayer::TotalSize() const -{ - uint64_t TotalSize{}; - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - TotalSize += Kv.second->TotalSize(); - } - - return TotalSize; -} - -ZenCacheMemoryLayer::Info -ZenCacheMemoryLayer::GetInfo() const -{ - ZenCacheMemoryLayer::Info Info = {.Config = m_Configuration, .TotalSize = TotalSize()}; - - RwLock::SharedLockScope _(m_Lock); - Info.BucketNames.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - Info.BucketNames.push_back(Kv.first); - Info.EntryCount += Kv.second->EntryCount(); - } - return Info; -} - -std::optional<ZenCacheMemoryLayer::BucketInfo> -ZenCacheMemoryLayer::GetBucketInfo(std::string_view Bucket) const -{ - RwLock::SharedLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) - { - return ZenCacheMemoryLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()}; - } - return {}; -} - -void -ZenCacheMemoryLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) -{ - RwLock::SharedLockScope _(m_BucketLock); - - std::vector<IoHash> BadHashes; - - auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) { - if (ContentType == ZenContentType::kCbObject) - { - CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); - return Error == CbValidateError::None; - } - if (ContentType == ZenContentType::kCompressedBinary) - { - IoHash RawHash; - uint64_t RawSize; - if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) - { - return false; - } - if (Hash != RawHash) - { - return false; - } - } - return true; - }; - - for (auto& Kv : m_CacheMap) - { - const BucketPayload& Payload = m_Payloads[Kv.second]; - if (!ValidateEntry(Kv.first, Payload.Payload.GetContentType(), Payload.Payload)) - { - BadHashes.push_back(Kv.first); - } - } - - if (!BadHashes.empty()) - { - Ctx.ReportBadCidChunks(BadHashes); - } -} - -void -ZenCacheMemoryLayer::CacheBucket::GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes) -{ - RwLock::SharedLockScope _(m_BucketLock); - std::transform(m_CacheMap.begin(), m_CacheMap.end(), std::back_inserter(AccessTimes), [this](const auto& Kv) { - return access_tracking::KeyAccessTime{.Key = Kv.first, .LastAccess = m_AccessTimes[Kv.second]}; - }); -} - -bool -ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) -{ - RwLock::SharedLockScope _(m_BucketLock); - - if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end()) - { - uint32_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size()); - - const BucketPayload& Payload = m_Payloads[EntryIndex]; - OutValue = {.Value = Payload.Payload, .RawSize = Payload.RawSize, .RawHash = Payload.RawHash}; - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - - return true; - } - - return false; -} - -void -ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) -{ - size_t PayloadSize = Value.Value.GetSize(); - { - GcClock::Tick AccessTime = GcClock::TickCount(); - RwLock::ExclusiveLockScope _(m_BucketLock); - if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max()) - { - // No more space in our memory cache! - return; - } - if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end()) - { - uint32_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - - m_TotalSize.fetch_sub(PayloadSize, std::memory_order::relaxed); - BucketPayload& Payload = m_Payloads[EntryIndex]; - Payload.Payload = Value.Value; - Payload.RawHash = Value.RawHash; - Payload.RawSize = gsl::narrow<uint32_t>(Value.RawSize); - m_AccessTimes[EntryIndex] = AccessTime; - } - else - { - uint32_t EntryIndex = gsl::narrow<uint32_t>(m_Payloads.size()); - m_Payloads.emplace_back( - BucketPayload{.Payload = Value.Value, .RawSize = gsl::narrow<uint32_t>(Value.RawSize), .RawHash = Value.RawHash}); - m_AccessTimes.emplace_back(AccessTime); - m_CacheMap.insert_or_assign(HashKey, EntryIndex); - } - ZEN_ASSERT_SLOW(m_Payloads.size() == m_CacheMap.size()); - ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size()); - } - - m_TotalSize.fetch_add(PayloadSize, std::memory_order::relaxed); -} - -void -ZenCacheMemoryLayer::CacheBucket::Drop() -{ - RwLock::ExclusiveLockScope _(m_BucketLock); - m_CacheMap.clear(); - m_AccessTimes.clear(); - m_Payloads.clear(); - m_TotalSize.store(0); -} - -uint64_t -ZenCacheMemoryLayer::CacheBucket::EntryCount() const -{ - RwLock::SharedLockScope _(m_BucketLock); - return static_cast<uint64_t>(m_CacheMap.size()); -} - -////////////////////////////////////////////////////////////////////////// - -ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)), m_BucketId(Oid::Zero) -{ - if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap"))) - { - // This is pretty ad hoc but in order to avoid too many individual files - // it makes sense to have a different strategy for legacy values - m_LargeObjectThreshold = 16 * 1024 * 1024; - } -} - -ZenCacheDiskLayer::CacheBucket::~CacheBucket() -{ -} - -bool -ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate) -{ - using namespace std::literals; - - ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate"); - - ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir); - - m_BlocksBasePath = BucketDir / "blocks"; - m_BucketDir = BucketDir; - - CreateDirectories(m_BucketDir); - - std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"}; - - bool IsNew = false; - - CbObject Manifest = LoadCompactBinaryObject(ManifestPath); - - if (Manifest) - { - m_BucketId = Manifest["BucketId"sv].AsObjectId(); - if (m_BucketId == Oid::Zero) - { - return false; - } - // uint32_t Version = Manifest["Version"sv].AsUInt32(0); - // if (Version != CurrentDiskBucketVersion) - //{ - // ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion); - // IsNew = true; - // } - } - else if (AllowCreate) - { - m_BucketId.Generate(); - - CbObjectWriter Writer; - Writer << "BucketId"sv << m_BucketId; - Writer << "Version"sv << CurrentDiskBucketVersion; - Manifest = Writer.Save(); - SaveCompactBinaryObject(ManifestPath, Manifest); - IsNew = true; - } - else - { - return false; - } - - OpenLog(IsNew); - - if (!IsNew) - { - Stopwatch Timer; - const auto _ = - MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - - for (CbFieldView Entry : Manifest["Timestamps"sv]) - { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64(); - } - } - for (CbFieldView Entry : Manifest["RawInfo"sv]) - { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - - const IoHash RawHash = Obj["RawHash"sv].AsHash(); - const uint64_t RawSize = Obj["RawSize"sv].AsUInt64(); - - if (RawHash == IoHash::Zero || RawSize == 0) - { - ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex); - } - - m_Payloads[EntryIndex].RawHash = RawHash; - m_Payloads[EntryIndex].RawSize = RawSize; - } - } - } - - return true; -} - -void -ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() -{ - ZEN_TRACE_CPU("Z$::Bucket::MakeIndexSnapshot"); - - uint64_t LogCount = m_SlogFile.GetLogCount(); - if (m_LogFlushPosition == LogCount) - { - return; - } - - ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir); - uint64_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", - m_BucketDir, - EntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - namespace fs = std::filesystem; - - fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); - fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName); - - // Move index away, we keep it if something goes wrong - if (fs::is_regular_file(STmpIndexPath)) - { - fs::remove(STmpIndexPath); - } - if (fs::is_regular_file(IndexPath)) - { - fs::rename(IndexPath, STmpIndexPath); - } - - try - { - // Write the current state of the location map to a new index state - std::vector<DiskIndexEntry> Entries; - Entries.resize(m_Index.size()); - - { - uint64_t EntryIndex = 0; - for (auto& Entry : m_Index) - { - DiskIndexEntry& IndexEntry = Entries[EntryIndex++]; - IndexEntry.Key = Entry.first; - IndexEntry.Location = m_Payloads[Entry.second].Location; - } - } - - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); - CacheBucketIndexHeader Header = {.EntryCount = Entries.size(), - .LogPosition = LogCount, - .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; - - Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); - - ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0); - ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); - ObjectIndexFile.Flush(); - ObjectIndexFile.Close(); - EntryCount = Entries.size(); - m_LogFlushPosition = LogCount; - } - catch (std::exception& Err) - { - ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); - - // Restore any previous snapshot - - if (fs::is_regular_file(STmpIndexPath)) - { - fs::remove(IndexPath); - fs::rename(STmpIndexPath, IndexPath); - } - } - if (fs::is_regular_file(STmpIndexPath)) - { - fs::remove(STmpIndexPath); - } -} - -uint64_t -ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion) -{ - ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile"); - - if (std::filesystem::is_regular_file(IndexPath)) - { - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); - uint64_t Size = ObjectIndexFile.FileSize(); - if (Size >= sizeof(CacheBucketIndexHeader)) - { - CacheBucketIndexHeader Header; - ObjectIndexFile.Read(&Header, sizeof(Header), 0); - if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && - (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0)) - { - switch (Header.Version) - { - case CacheBucketIndexHeader::Version2: - { - uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); - if (Header.EntryCount > ExpectedEntryCount) - { - break; - } - size_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' index containing {} entries in {}", - IndexPath, - EntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - m_PayloadAlignment = Header.PayloadAlignment; - - std::vector<DiskIndexEntry> Entries; - Entries.resize(Header.EntryCount); - ObjectIndexFile.Read(Entries.data(), - Header.EntryCount * sizeof(DiskIndexEntry), - sizeof(CacheBucketIndexHeader)); - - m_Payloads.reserve(Header.EntryCount); - m_AccessTimes.reserve(Header.EntryCount); - m_Index.reserve(Header.EntryCount); - - std::string InvalidEntryReason; - for (const DiskIndexEntry& Entry : Entries) - { - if (!ValidateCacheBucketIndexEntry(Entry, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); - continue; - } - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = 0, .RawHash = IoHash::Zero}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_Index.insert_or_assign(Entry.Key, EntryIndex); - EntryCount++; - } - OutVersion = CacheBucketIndexHeader::Version2; - return Header.LogPosition; - } - break; - default: - break; - } - } - } - ZEN_WARN("skipping invalid index file '{}'", IndexPath); - } - return 0; -} - -uint64_t -ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount) -{ - ZEN_TRACE_CPU("Z$::Bucket::ReadLog"); - - if (std::filesystem::is_regular_file(LogPath)) - { - uint64_t LogEntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - TCasLogFile<DiskIndexEntry> CasLog; - CasLog.Open(LogPath, CasLogFile::Mode::kRead); - if (CasLog.Initialize()) - { - uint64_t EntryCount = CasLog.GetLogCount(); - if (EntryCount < SkipEntryCount) - { - ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); - SkipEntryCount = 0; - } - LogEntryCount = EntryCount - SkipEntryCount; - m_Index.reserve(LogEntryCount); - uint64_t InvalidEntryCount = 0; - CasLog.Replay( - [&](const DiskIndexEntry& Record) { - std::string InvalidEntryReason; - if (Record.Location.Flags & DiskLocation::kTombStone) - { - m_Index.erase(Record.Key); - return; - } - if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); - ++InvalidEntryCount; - return; - } - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = 0u, .RawHash = IoHash::Zero}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_Index.insert_or_assign(Record.Key, EntryIndex); - }, - SkipEntryCount); - if (InvalidEntryCount) - { - ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); - } - return LogEntryCount; - } - } - return 0; -}; - -void -ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) -{ - ZEN_TRACE_CPU("Z$::Bucket::OpenLog"); - - m_TotalStandaloneSize = 0; - - m_Index.clear(); - m_Payloads.clear(); - m_AccessTimes.clear(); - - std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); - std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); - - if (IsNew) - { - fs::remove(LogPath); - fs::remove(IndexPath); - fs::remove_all(m_BlocksBasePath); - } - - CreateDirectories(m_BucketDir); - - std::unordered_map<uint32_t, uint64_t> BlockSizes = - m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); - - if (std::filesystem::is_regular_file(IndexPath)) - { - uint32_t IndexVersion = 0; - m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion); - if (IndexVersion == 0) - { - ZEN_WARN("removing invalid index file at '{}'", IndexPath); - std::filesystem::remove(IndexPath); - } - } - - uint64_t LogEntryCount = 0; - if (std::filesystem::is_regular_file(LogPath)) - { - if (TCasLogFile<DiskIndexEntry>::IsValid(LogPath)) - { - LogEntryCount = ReadLog(LogPath, m_LogFlushPosition); - } - else if (fs::is_regular_file(LogPath)) - { - ZEN_WARN("removing invalid cas log at '{}'", LogPath); - std::filesystem::remove(LogPath); - } - } - - m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); - - std::vector<BlockStoreLocation> KnownLocations; - KnownLocations.reserve(m_Index.size()); - std::vector<DiskIndexEntry> BadEntries; - for (const auto& Entry : m_Index) - { - size_t EntryIndex = Entry.second; - const BucketPayload& Payload = m_Payloads[EntryIndex]; - const DiskLocation& Location = Payload.Location; - - if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) - { - m_TotalStandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed); - continue; - } - const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); - - auto BlockIt = BlockSizes.find(BlockLocation.BlockIndex); - if (BlockIt == BlockSizes.end()) - { - ZEN_WARN("Unknown block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString()); - } - else - { - uint64_t BlockSize = BlockIt->second; - if (BlockLocation.Offset + BlockLocation.Size > BlockSize) - { - ZEN_WARN("Range is outside of block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString()); - } - else - { - KnownLocations.push_back(BlockLocation); - continue; - } - } - - DiskLocation NewLocation = Payload.Location; - NewLocation.Flags |= DiskLocation::kTombStone; - BadEntries.push_back(DiskIndexEntry{.Key = Entry.first, .Location = NewLocation}); - } - - if (!BadEntries.empty()) - { - m_SlogFile.Append(BadEntries); - m_SlogFile.Flush(); - - LogEntryCount += BadEntries.size(); - - for (const DiskIndexEntry& BadEntry : BadEntries) - { - m_Index.erase(BadEntry.Key); - } - } - - m_BlockStore.Prune(KnownLocations); - - if (IsNew || LogEntryCount > 0) - { - MakeIndexSnapshot(); - } - // TODO: should validate integrity of container files here -} - -void -ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const -{ - char HexString[sizeof(HashKey.Hash) * 2]; - ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString); - - Path.Append(m_BucketDir); - Path.AppendSeparator(); - Path.Append(L"blob"); - Path.AppendSeparator(); - Path.AppendAsciiRange(HexString, HexString + 3); - Path.AppendSeparator(); - Path.AppendAsciiRange(HexString + 3, HexString + 5); - Path.AppendSeparator(); - Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString)); -} - -IoBuffer -ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const -{ - BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment); - - IoBuffer Value = m_BlockStore.TryGetChunk(Location); - if (Value) - { - Value.SetContentType(Loc.GetContentType()); - } - - return Value; -} - -IoBuffer -ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const -{ - ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue"); - - ExtendablePathBuilder<256> DataFilePath; - BuildPath(DataFilePath, HashKey); - - RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); - - if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath())) - { - Data.SetContentType(Loc.GetContentType()); - - return Data; - } - - return {}; -} - -bool -ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) -{ - RwLock::SharedLockScope _(m_IndexLock); - auto It = m_Index.find(HashKey); - if (It == m_Index.end()) - { - return false; - } - size_t EntryIndex = It.value(); - const BucketPayload& Payload = m_Payloads[EntryIndex]; - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - DiskLocation Location = Payload.Location; - OutValue.RawSize = Payload.RawSize; - OutValue.RawHash = Payload.RawHash; - if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) - { - // We don't need to hold the index lock when we read a standalone file - _.ReleaseNow(); - OutValue.Value = GetStandaloneCacheValue(Location, HashKey); - } - else - { - OutValue.Value = GetInlineCacheValue(Location); - } - _.ReleaseNow(); - - if (!Location.IsFlagSet(DiskLocation::kStructured)) - { - if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0 && OutValue.Value.GetSize() > 0) - { - if (Location.IsFlagSet(DiskLocation::kCompressed)) - { - (void)CompressedBuffer::FromCompressed(SharedBuffer(OutValue.Value), OutValue.RawHash, OutValue.RawSize); - } - else - { - OutValue.RawHash = IoHash::HashBuffer(OutValue.Value); - OutValue.RawSize = OutValue.Value.GetSize(); - } - RwLock::ExclusiveLockScope __(m_IndexLock); - if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end()) - { - BucketPayload& WritePayload = m_Payloads[WriteIt.value()]; - WritePayload.RawHash = OutValue.RawHash; - WritePayload.RawSize = OutValue.RawSize; - - m_LogFlushPosition = 0; // Force resave of index on exit - } - } - } - - return (bool)OutValue.Value; -} - -void -ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) -{ - if (Value.Value.Size() >= m_LargeObjectThreshold) - { - return PutStandaloneCacheValue(HashKey, Value); - } - PutInlineCacheValue(HashKey, Value); -} - -bool -ZenCacheDiskLayer::CacheBucket::Drop() -{ - ZEN_TRACE_CPU("Z$::Bucket::Drop"); - - RwLock::ExclusiveLockScope _(m_IndexLock); - - std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks; - ShardLocks.reserve(256); - for (RwLock& Lock : m_ShardedLocks) - { - ShardLocks.push_back(std::make_unique<RwLock::ExclusiveLockScope>(Lock)); - } - m_BlockStore.Close(); - m_SlogFile.Close(); - - bool Deleted = MoveAndDeleteDirectory(m_BucketDir); - - m_Index.clear(); - m_Payloads.clear(); - m_AccessTimes.clear(); - return Deleted; -} - -void -ZenCacheDiskLayer::CacheBucket::Flush() -{ - ZEN_TRACE_CPU("Z$::Bucket::Flush"); - - m_BlockStore.Flush(); - - RwLock::SharedLockScope _(m_IndexLock); - m_SlogFile.Flush(); - MakeIndexSnapshot(); - SaveManifest(); -} - -void -ZenCacheDiskLayer::CacheBucket::SaveManifest() -{ - using namespace std::literals; - - ZEN_TRACE_CPU("Z$::Bucket::SaveManifest"); - - CbObjectWriter Writer; - Writer << "BucketId"sv << m_BucketId; - Writer << "Version"sv << CurrentDiskBucketVersion; - - if (!m_Index.empty()) - { - Writer.BeginArray("Timestamps"sv); - for (auto& Kv : m_Index) - { - const IoHash& Key = Kv.first; - GcClock::Tick AccessTime = m_AccessTimes[Kv.second]; - - Writer.BeginObject(); - Writer << "Key"sv << Key; - Writer << "LastAccess"sv << AccessTime; - Writer.EndObject(); - } - Writer.EndArray(); - - Writer.BeginArray("RawInfo"sv); - { - for (auto& Kv : m_Index) - { - const IoHash& Key = Kv.first; - const BucketPayload& Payload = m_Payloads[Kv.second]; - if (Payload.RawHash != IoHash::Zero) - { - Writer.BeginObject(); - Writer << "Key"sv << Key; - Writer << "RawHash"sv << Payload.RawHash; - Writer << "RawSize"sv << Payload.RawSize; - Writer.EndObject(); - } - } - } - Writer.EndArray(); - } - - try - { - SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save()); - } - catch (std::exception& Err) - { - ZEN_WARN("writing manifest FAILED, reason: '{}'", Err.what()); - } -} - -IoHash -HashBuffer(const CompositeBuffer& Buffer) -{ - IoHashStream Hasher; - - for (const SharedBuffer& Segment : Buffer.GetSegments()) - { - Hasher.Append(Segment.GetView()); - } - - return Hasher.GetHash(); -} - -bool -ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer) -{ - ZEN_ASSERT_SLOW(Buffer.GetContentType() == ContentType); - - if (ContentType == ZenContentType::kCbObject) - { - CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); - - if (Error == CbValidateError::None) - { - return true; - } - - ZEN_SCOPED_ERROR("compact binary validation failed: '{}'", ToString(Error)); - - return false; - } - else if (ContentType == ZenContentType::kCompressedBinary) - { - IoBuffer MemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Buffer); - - IoHash HeaderRawHash; - uint64_t RawSize = 0; - if (!CompressedBuffer::ValidateCompressedHeader(MemoryBuffer, /* out */ HeaderRawHash, /* out */ RawSize)) - { - ZEN_SCOPED_ERROR("compressed buffer header validation failed"); - - return false; - } - - CompressedBuffer Compressed = - CompressedBuffer::FromCompressed(SharedBuffer(MemoryBuffer), /* out */ HeaderRawHash, /* out */ RawSize); - CompositeBuffer Decompressed = Compressed.DecompressToComposite(); - IoHash DecompressedHash = HashBuffer(Decompressed); - - if (HeaderRawHash != DecompressedHash) - { - ZEN_SCOPED_ERROR("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash); - - return false; - } - } - else - { - // No way to verify this kind of content (what is it exactly?) - - static int Once = [&] { - ZEN_WARN("ValidateCacheBucketEntryValue called with unknown content type ({})", ToString(ContentType)); - return 42; - }(); - } - - return true; -}; - -void -ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) -{ - ZEN_TRACE_CPU("Z$::Bucket::Scrub"); - - ZEN_INFO("scrubbing '{}'", m_BucketDir); - - Stopwatch Timer; - uint64_t ChunkCount = 0; - uint64_t VerifiedChunkBytes = 0; - - auto LogStats = MakeGuard([&] { - const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs()); - - ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})", - m_BucketName, - NiceBytes(VerifiedChunkBytes), - NiceTimeSpanMs(DurationMs), - ChunkCount, - NiceRate(VerifiedChunkBytes, DurationMs)); - }); - - std::vector<IoHash> BadKeys; - auto ReportBadKey = [&](const IoHash& Key) { BadKeys.push_back(Key); }; - - try - { - std::vector<BlockStoreLocation> ChunkLocations; - std::vector<IoHash> ChunkIndexToChunkHash; - - RwLock::SharedLockScope _(m_IndexLock); - - const size_t BlockChunkInitialCount = m_Index.size() / 4; - ChunkLocations.reserve(BlockChunkInitialCount); - ChunkIndexToChunkHash.reserve(BlockChunkInitialCount); - - // Do a pass over the index and verify any standalone file values straight away - // all other storage classes are gathered and verified in bulk in order to enable - // more efficient I/O scheduling - - for (auto& Kv : m_Index) - { - const IoHash& HashKey = Kv.first; - const BucketPayload& Payload = m_Payloads[Kv.second]; - const DiskLocation& Loc = Payload.Location; - - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - Ctx.ThrowIfDeadlineExpired(); - - ++ChunkCount; - VerifiedChunkBytes += Loc.Size(); - - if (Loc.GetContentType() == ZenContentType::kBinary) - { - // Blob cache value, not much we can do about data integrity checking - // here since there's no hash available - ExtendablePathBuilder<256> DataFilePath; - BuildPath(DataFilePath, HashKey); - - RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); - - std::error_code Ec; - uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec); - if (Ec) - { - ReportBadKey(HashKey); - } - if (size != Loc.Size()) - { - ReportBadKey(HashKey); - } - continue; - } - else - { - // Structured cache value - IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey); - if (!Buffer) - { - ReportBadKey(HashKey); - continue; - } - if (!ValidateCacheBucketEntryValue(Loc.GetContentType(), Buffer)) - { - ReportBadKey(HashKey); - continue; - } - } - } - else - { - ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment)); - ChunkIndexToChunkHash.push_back(HashKey); - continue; - } - } - - const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void { - ++ChunkCount; - VerifiedChunkBytes += Size; - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - if (!Data) - { - // ChunkLocation out of range of stored blocks - ReportBadKey(Hash); - return; - } - if (!Size) - { - ReportBadKey(Hash); - return; - } - IoBuffer Buffer(IoBuffer::Wrap, Data, Size); - if (!Buffer) - { - ReportBadKey(Hash); - return; - } - const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; - ZenContentType ContentType = Payload.Location.GetContentType(); - Buffer.SetContentType(ContentType); - if (!ValidateCacheBucketEntryValue(ContentType, Buffer)) - { - ReportBadKey(Hash); - return; - } - }; - - const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void { - Ctx.ThrowIfDeadlineExpired(); - - ++ChunkCount; - VerifiedChunkBytes += Size; - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); - if (!Buffer) - { - ReportBadKey(Hash); - return; - } - const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; - ZenContentType ContentType = Payload.Location.GetContentType(); - Buffer.SetContentType(ContentType); - if (!ValidateCacheBucketEntryValue(ContentType, Buffer)) - { - ReportBadKey(Hash); - return; - } - }; - - m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); - } - catch (ScrubDeadlineExpiredException&) - { - ZEN_INFO("Scrubbing deadline expired, operation incomplete"); - } - - Ctx.ReportScrubbed(ChunkCount, VerifiedChunkBytes); - - if (!BadKeys.empty()) - { - ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir); - - if (Ctx.RunRecovery()) - { - // Deal with bad chunks by removing them from our lookup map - - std::vector<DiskIndexEntry> LogEntries; - LogEntries.reserve(BadKeys.size()); - - { - RwLock::ExclusiveLockScope __(m_IndexLock); - for (const IoHash& BadKey : BadKeys) - { - // Log a tombstone and delete the in-memory index for the bad entry - const auto It = m_Index.find(BadKey); - const BucketPayload& Payload = m_Payloads[It->second]; - DiskLocation Location = Payload.Location; - Location.Flags |= DiskLocation::kTombStone; - LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location}); - m_Index.erase(BadKey); - } - } - for (const DiskIndexEntry& Entry : LogEntries) - { - if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) - { - ExtendablePathBuilder<256> Path; - BuildPath(Path, Entry.Key); - fs::path FilePath = Path.ToPath(); - RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key)); - if (fs::is_regular_file(FilePath)) - { - ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8()); - std::error_code Ec; - fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file... - } - m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); - } - } - m_SlogFile.Append(LogEntries); - - // Clean up m_AccessTimes and m_Payloads vectors - { - std::vector<BucketPayload> Payloads; - std::vector<AccessTime> AccessTimes; - IndexMap Index; - - { - RwLock::ExclusiveLockScope __(m_IndexLock); - size_t EntryCount = m_Index.size(); - Payloads.reserve(EntryCount); - AccessTimes.reserve(EntryCount); - Index.reserve(EntryCount); - for (auto It : m_Index) - { - size_t EntryIndex = Payloads.size(); - Payloads.push_back(m_Payloads[EntryIndex]); - AccessTimes.push_back(m_AccessTimes[EntryIndex]); - Index.insert({It.first, EntryIndex}); - } - m_Index.swap(Index); - m_Payloads.swap(Payloads); - m_AccessTimes.swap(AccessTimes); - } - } - } - } - - // Let whomever it concerns know about the bad chunks. This could - // be used to invalidate higher level data structures more efficiently - // than a full validation pass might be able to do - if (!BadKeys.empty()) - { - Ctx.ReportBadCidChunks(BadKeys); - } -} - -void -ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences"); - - uint64_t WriteBlockTimeUs = 0; - uint64_t WriteBlockLongestTimeUs = 0; - uint64_t ReadBlockTimeUs = 0; - uint64_t ReadBlockLongestTimeUs = 0; - - Stopwatch TotalTimer; - const auto _ = MakeGuard([&] { - ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})", - m_BucketDir, - NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), - NiceLatencyNs(WriteBlockTimeUs), - NiceLatencyNs(WriteBlockLongestTimeUs), - NiceLatencyNs(ReadBlockTimeUs), - NiceLatencyNs(ReadBlockLongestTimeUs)); - }); - - const GcClock::TimePoint ExpireTime = GcCtx.CacheExpireTime(); - - const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); - - IndexMap Index; - std::vector<AccessTime> AccessTimes; - std::vector<BucketPayload> Payloads; - { - RwLock::SharedLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - Index = m_Index; - AccessTimes = m_AccessTimes; - Payloads = m_Payloads; - } - - std::vector<IoHash> ExpiredKeys; - ExpiredKeys.reserve(1024); - - std::vector<IoHash> Cids; - Cids.reserve(1024); - - for (const auto& Entry : Index) - { - const IoHash& Key = Entry.first; - GcClock::Tick AccessTime = AccessTimes[Entry.second]; - if (AccessTime < ExpireTicks) - { - ExpiredKeys.push_back(Key); - continue; - } - - const DiskLocation& Loc = Payloads[Entry.second].Location; - - if (Loc.IsFlagSet(DiskLocation::kStructured)) - { - if (Cids.size() > 1024) - { - GcCtx.AddRetainedCids(Cids); - Cids.clear(); - } - - IoBuffer Buffer; - { - RwLock::SharedLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - // We don't need to hold the index lock when we read a standalone file - __.ReleaseNow(); - if (Buffer = GetStandaloneCacheValue(Loc, Key); !Buffer) - { - continue; - } - } - else if (Buffer = GetInlineCacheValue(Loc); !Buffer) - { - continue; - } - } - - ZEN_ASSERT(Buffer); - ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject); - CbObject Obj(SharedBuffer{Buffer}); - Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); - } - } - - GcCtx.AddRetainedCids(Cids); - GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys)); -} - -void -ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage"); - - ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir); - - Stopwatch TotalTimer; - uint64_t WriteBlockTimeUs = 0; - uint64_t WriteBlockLongestTimeUs = 0; - uint64_t ReadBlockTimeUs = 0; - uint64_t ReadBlockLongestTimeUs = 0; - uint64_t TotalChunkCount = 0; - uint64_t DeletedSize = 0; - uint64_t OldTotalSize = TotalSize(); - - std::unordered_set<IoHash> DeletedChunks; - uint64_t MovedCount = 0; - - const auto _ = MakeGuard([&] { - ZEN_DEBUG( - "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved " - "{} " - "of {} " - "entires ({}).", - m_BucketDir, - NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), - NiceLatencyNs(WriteBlockTimeUs), - NiceLatencyNs(WriteBlockLongestTimeUs), - NiceLatencyNs(ReadBlockTimeUs), - NiceLatencyNs(ReadBlockLongestTimeUs), - NiceBytes(DeletedSize), - DeletedChunks.size(), - MovedCount, - TotalChunkCount, - NiceBytes(OldTotalSize)); - RwLock::SharedLockScope _(m_IndexLock); - SaveManifest(); - }); - - m_SlogFile.Flush(); - - std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketDir.string()); - std::vector<IoHash> DeleteCacheKeys; - DeleteCacheKeys.reserve(ExpiredCacheKeys.size()); - GcCtx.FilterCids(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) { - if (Keep) - { - return; - } - DeleteCacheKeys.push_back(ChunkHash); - }); - if (DeleteCacheKeys.empty()) - { - ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir); - return; - } - - auto __ = MakeGuard([&]() { - if (!DeletedChunks.empty()) - { - // Clean up m_AccessTimes and m_Payloads vectors - std::vector<BucketPayload> Payloads; - std::vector<AccessTime> AccessTimes; - IndexMap Index; - - { - RwLock::ExclusiveLockScope _(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - size_t EntryCount = m_Index.size(); - Payloads.reserve(EntryCount); - AccessTimes.reserve(EntryCount); - Index.reserve(EntryCount); - for (auto It : m_Index) - { - size_t OldEntryIndex = It.second; - size_t NewEntryIndex = Payloads.size(); - Payloads.push_back(m_Payloads[OldEntryIndex]); - AccessTimes.push_back(m_AccessTimes[OldEntryIndex]); - Index.insert({It.first, NewEntryIndex}); - } - m_Index.swap(Index); - m_Payloads.swap(Payloads); - m_AccessTimes.swap(AccessTimes); - } - GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end())); - } - }); - - std::vector<DiskIndexEntry> ExpiredStandaloneEntries; - IndexMap Index; - BlockStore::ReclaimSnapshotState BlockStoreState; - { - RwLock::SharedLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - if (m_Index.empty()) - { - ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir); - return; - } - BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); - - SaveManifest(); - Index = m_Index; - - for (const IoHash& Key : DeleteCacheKeys) - { - if (auto It = Index.find(Key); It != Index.end()) - { - const BucketPayload& Payload = m_Payloads[It->second]; - DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location}; - if (Entry.Location.Flags & DiskLocation::kStandaloneFile) - { - Entry.Location.Flags |= DiskLocation::kTombStone; - ExpiredStandaloneEntries.push_back(Entry); - } - } - } - if (GcCtx.IsDeletionMode()) - { - for (const auto& Entry : ExpiredStandaloneEntries) - { - m_Index.erase(Entry.Key); - m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); - DeletedChunks.insert(Entry.Key); - } - m_SlogFile.Append(ExpiredStandaloneEntries); - } - } - - if (GcCtx.IsDeletionMode()) - { - std::error_code Ec; - ExtendablePathBuilder<256> Path; - - for (const auto& Entry : ExpiredStandaloneEntries) - { - const IoHash& Key = Entry.Key; - const DiskLocation& Loc = Entry.Location; - - Path.Reset(); - BuildPath(Path, Key); - fs::path FilePath = Path.ToPath(); - - { - RwLock::SharedLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - if (m_Index.contains(Key)) - { - // Someone added it back, let the file on disk be - ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); - continue; - } - __.ReleaseNow(); - - RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); - if (fs::is_regular_file(FilePath)) - { - ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); - fs::remove(FilePath, Ec); - } - } - - if (Ec) - { - ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); - Ec.clear(); - DiskLocation RestoreLocation = Loc; - RestoreLocation.Flags &= ~DiskLocation::kTombStone; - - RwLock::ExclusiveLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - if (m_Index.contains(Key)) - { - continue; - } - m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation}); - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_Index.insert({Key, EntryIndex}); - m_TotalStandaloneSize.fetch_add(RestoreLocation.Size(), std::memory_order::relaxed); - DeletedChunks.erase(Key); - continue; - } - DeletedSize += Entry.Location.Size(); - } - } - - TotalChunkCount = Index.size(); - - std::vector<IoHash> TotalChunkHashes; - TotalChunkHashes.reserve(TotalChunkCount); - for (const auto& Entry : Index) - { - const DiskLocation& Location = m_Payloads[Entry.second].Location; - - if (Location.Flags & DiskLocation::kStandaloneFile) - { - continue; - } - TotalChunkHashes.push_back(Entry.first); - } - - if (TotalChunkHashes.empty()) - { - return; - } - TotalChunkCount = TotalChunkHashes.size(); - - std::vector<BlockStoreLocation> ChunkLocations; - BlockStore::ChunkIndexArray KeepChunkIndexes; - std::vector<IoHash> ChunkIndexToChunkHash; - ChunkLocations.reserve(TotalChunkCount); - ChunkLocations.reserve(TotalChunkCount); - ChunkIndexToChunkHash.reserve(TotalChunkCount); - - GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { - auto KeyIt = Index.find(ChunkHash); - const DiskLocation& DiskLocation = m_Payloads[KeyIt->second].Location; - BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment); - size_t ChunkIndex = ChunkLocations.size(); - ChunkLocations.push_back(Location); - ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; - if (Keep) - { - KeepChunkIndexes.push_back(ChunkIndex); - } - }); - - size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size(); - - const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); - if (!PerformDelete) - { - m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); - uint64_t CurrentTotalSize = TotalSize(); - ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} {}", - m_BucketDir, - DeleteCount, - TotalChunkCount, - NiceBytes(CurrentTotalSize)); - return; - } - - m_BlockStore.ReclaimSpace( - BlockStoreState, - ChunkLocations, - KeepChunkIndexes, - m_PayloadAlignment, - false, - [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { - std::vector<DiskIndexEntry> LogEntries; - LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); - for (const auto& Entry : MovedChunks) - { - size_t ChunkIndex = Entry.first; - const BlockStoreLocation& NewLocation = Entry.second; - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]]; - const DiskLocation& OldDiskLocation = OldPayload.Location; - LogEntries.push_back( - {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())}); - } - for (const size_t ChunkIndex : RemovedChunks) - { - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]]; - const DiskLocation& OldDiskLocation = OldPayload.Location; - LogEntries.push_back({.Key = ChunkHash, - .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment), - m_PayloadAlignment, - OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); - DeletedChunks.insert(ChunkHash); - } - - m_SlogFile.Append(LogEntries); - m_SlogFile.Flush(); - { - RwLock::ExclusiveLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - for (const DiskIndexEntry& Entry : LogEntries) - { - if (Entry.Location.GetFlags() & DiskLocation::kTombStone) - { - m_Index.erase(Entry.Key); - continue; - } - m_Payloads[m_Index[Entry.Key]].Location = Entry.Location; - } - } - }, - [&]() { return GcCtx.CollectSmallObjects(); }); -} - -void -ZenCacheDiskLayer::CacheBucket::UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes) -{ - using namespace access_tracking; - - for (const KeyAccessTime& KeyTime : AccessTimes) - { - if (auto It = m_Index.find(KeyTime.Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_AccessTimes[EntryIndex] = KeyTime.LastAccess; - } - } -} - -uint64_t -ZenCacheDiskLayer::CacheBucket::EntryCount() const -{ - RwLock::SharedLockScope _(m_IndexLock); - return static_cast<uint64_t>(m_Index.size()); -} - -CacheValueDetails::ValueDetails -ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, size_t Index) const -{ - std::vector<IoHash> Attachments; - const BucketPayload& Payload = m_Payloads[Index]; - if (Payload.Location.IsFlagSet(DiskLocation::kStructured)) - { - IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) ? GetStandaloneCacheValue(Payload.Location, Key) - : GetInlineCacheValue(Payload.Location); - CbObject Obj(SharedBuffer{Value}); - Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); }); - } - return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(), - .RawSize = Payload.RawSize, - .RawHash = Payload.RawHash, - .LastAccess = m_AccessTimes[Index], - .Attachments = std::move(Attachments), - .ContentType = Payload.Location.GetContentType()}; -} - -CacheValueDetails::BucketDetails -ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilter) const -{ - CacheValueDetails::BucketDetails Details; - RwLock::SharedLockScope _(m_IndexLock); - if (ValueFilter.empty()) - { - Details.Values.reserve(m_Index.size()); - for (const auto& It : m_Index) - { - Details.Values.insert_or_assign(It.first, GetValueDetails(It.first, It.second)); - } - } - else - { - IoHash Key = IoHash::FromHexString(ValueFilter); - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - Details.Values.insert_or_assign(It->first, GetValueDetails(It->first, It->second)); - } - } - return Details; -} - -void -ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) -{ - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - CacheBucket& Bucket = *Kv.second; - Bucket.CollectGarbage(GcCtx); - } -} - -void -ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes) -{ - RwLock::SharedLockScope _(m_Lock); - - for (const auto& Kv : AccessTimes.Buckets) - { - if (auto It = m_Buckets.find(Kv.first); It != m_Buckets.end()) - { - CacheBucket& Bucket = *It->second; - Bucket.UpdateAccessTimes(Kv.second); - } - } -} - -void -ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) -{ - ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue"); - - uint64_t NewFileSize = Value.Value.Size(); - - TemporaryFile DataFile; - - std::error_code Ec; - DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); - if (Ec) - { - throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir)); - } - - bool CleanUpTempFile = false; - auto __ = MakeGuard([&] { - if (CleanUpTempFile) - { - std::error_code Ec; - std::filesystem::remove(DataFile.GetPath(), Ec); - if (Ec) - { - ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", - DataFile.GetPath(), - m_BucketDir, - Ec.message()); - } - } - }); - - DataFile.WriteAll(Value.Value, Ec); - if (Ec) - { - throw std::system_error(Ec, - fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'", - NiceBytes(NewFileSize), - DataFile.GetPath().string(), - m_BucketDir)); - } - - ExtendablePathBuilder<256> DataFilePath; - BuildPath(DataFilePath, HashKey); - std::filesystem::path FsPath{DataFilePath.ToPath()}; - - RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); - - // We do a speculative remove of the file instead of probing with a exists call and check the error code instead - std::filesystem::remove(FsPath, Ec); - if (Ec) - { - if (Ec.value() != ENOENT) - { - ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message()); - Sleep(100); - Ec.clear(); - std::filesystem::remove(FsPath, Ec); - if (Ec && Ec.value() != ENOENT) - { - throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir)); - } - } - } - - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); - if (Ec) - { - CreateDirectories(FsPath.parent_path()); - Ec.clear(); - - // Try again - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); - if (Ec) - { - ZEN_WARN("Failed to finalize file '{}', moving from '{}' for put in '{}', reason: '{}', retrying.", - FsPath, - DataFile.GetPath(), - m_BucketDir, - Ec.message()); - Sleep(100); - Ec.clear(); - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); - if (Ec) - { - throw std::system_error( - Ec, - fmt::format("Failed to finalize file '{}', moving from '{}' for put in '{}'", FsPath, DataFile.GetPath(), m_BucketDir)); - } - } - } - - // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file - // will be disabled as the file handle has already been closed - CleanUpTempFile = false; - - uint8_t EntryFlags = DiskLocation::kStandaloneFile; - - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - EntryFlags |= DiskLocation::kCompressed; - } - - DiskLocation Loc(NewFileSize, EntryFlags); - - RwLock::ExclusiveLockScope _(m_IndexLock); - if (auto It = m_Index.find(HashKey); It == m_Index.end()) - { - // Previously unknown object - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_Index.insert_or_assign(HashKey, EntryIndex); - } - else - { - // TODO: should check if write is idempotent and bail out if it is? - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_Payloads[EntryIndex] = BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}; - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_TotalStandaloneSize.fetch_sub(Loc.Size(), std::memory_order::relaxed); - } - - m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_TotalStandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed); -} - -void -ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) -{ - uint8_t EntryFlags = 0; - - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - EntryFlags |= DiskLocation::kCompressed; - } - - m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { - DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); - m_SlogFile.Append({.Key = HashKey, .Location = Location}); - - RwLock::ExclusiveLockScope _(m_IndexLock); - if (auto It = m_Index.find(HashKey); It != m_Index.end()) - { - // TODO: should check if write is idempotent and bail out if it is? - // this would requiring comparing contents on disk unless we add a - // content hash to the index entry - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - } - else - { - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_Index.insert_or_assign(HashKey, EntryIndex); - } - }); -} - -////////////////////////////////////////////////////////////////////////// - -ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir) -{ -} - -ZenCacheDiskLayer::~ZenCacheDiskLayer() = default; - -bool -ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) -{ - const auto BucketName = std::string(InBucket); - CacheBucket* Bucket = nullptr; - - { - RwLock::SharedLockScope _(m_Lock); - - auto It = m_Buckets.find(BucketName); - - if (It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - } - - if (Bucket == nullptr) - { - // Bucket needs to be opened/created - - RwLock::ExclusiveLockScope _(m_Lock); - - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - else - { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName)); - Bucket = InsertResult.first->second.get(); - - std::filesystem::path BucketPath = m_RootDir; - BucketPath /= BucketName; - - if (!Bucket->OpenOrCreate(BucketPath)) - { - m_Buckets.erase(InsertResult.first); - return false; - } - } - } - - ZEN_ASSERT(Bucket != nullptr); - return Bucket->Get(HashKey, OutValue); -} - -void -ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) -{ - const auto BucketName = std::string(InBucket); - CacheBucket* Bucket = nullptr; - - { - RwLock::SharedLockScope _(m_Lock); - - auto It = m_Buckets.find(BucketName); - - if (It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - } - - if (Bucket == nullptr) - { - // New bucket needs to be created - - RwLock::ExclusiveLockScope _(m_Lock); - - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - else - { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName)); - Bucket = InsertResult.first->second.get(); - - std::filesystem::path BucketPath = m_RootDir; - BucketPath /= BucketName; - - try - { - if (!Bucket->OpenOrCreate(BucketPath)) - { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); - m_Buckets.erase(InsertResult.first); - return; - } - } - catch (const std::exception& Err) - { - ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); - return; - } - } - } - - ZEN_ASSERT(Bucket != nullptr); - - Bucket->Put(HashKey, Value); -} - -void -ZenCacheDiskLayer::DiscoverBuckets() -{ - DirectoryContent DirContent; - GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent); - - // Initialize buckets - - RwLock::ExclusiveLockScope _(m_Lock); - - for (const std::filesystem::path& BucketPath : DirContent.Directories) - { - const std::string BucketName = PathToUtf8(BucketPath.stem()); - // New bucket needs to be created - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) - { - continue; - } - - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName)); - CacheBucket& Bucket = *InsertResult.first->second; - - try - { - if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false)) - { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); - - m_Buckets.erase(InsertResult.first); - continue; - } - } - catch (const std::exception& Err) - { - ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); - return; - } - ZEN_INFO("Discovered bucket '{}'", BucketName); - } -} - -bool -ZenCacheDiskLayer::DropBucket(std::string_view InBucket) -{ - RwLock::ExclusiveLockScope _(m_Lock); - - auto It = m_Buckets.find(std::string(InBucket)); - - if (It != m_Buckets.end()) - { - CacheBucket& Bucket = *It->second; - m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It); - - return Bucket.Drop(); - } - - // Make sure we remove the folder even if we don't know about the bucket - std::filesystem::path BucketPath = m_RootDir; - BucketPath /= std::string(InBucket); - return MoveAndDeleteDirectory(BucketPath); -} - -bool -ZenCacheDiskLayer::Drop() -{ - RwLock::ExclusiveLockScope _(m_Lock); - - std::vector<std::unique_ptr<CacheBucket>> Buckets; - Buckets.reserve(m_Buckets.size()); - while (!m_Buckets.empty()) - { - const auto& It = m_Buckets.begin(); - CacheBucket& Bucket = *It->second; - m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It->first); - if (!Bucket.Drop()) - { - return false; - } - } - return MoveAndDeleteDirectory(m_RootDir); -} - -void -ZenCacheDiskLayer::Flush() -{ - std::vector<CacheBucket*> Buckets; - - { - RwLock::SharedLockScope _(m_Lock); - Buckets.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - CacheBucket* Bucket = Kv.second.get(); - Buckets.push_back(Bucket); - } - } - - for (auto& Bucket : Buckets) - { - Bucket->Flush(); - } -} - -void -ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) -{ - RwLock::SharedLockScope _(m_Lock); - - { - std::vector<std::future<void>> Results; - Results.reserve(m_Buckets.size()); - - for (auto& Kv : m_Buckets) - { -#if 1 - Results.push_back( - Ctx.ThreadPool().EnqueueTask(std::packaged_task<void()>{[Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }})); -#else - CacheBucket& Bucket = *Kv.second; - Bucket.ScrubStorage(Ctx); -#endif - } - - for (auto& Result : Results) - { - Result.get(); - } - } -} - -void -ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) -{ - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - CacheBucket& Bucket = *Kv.second; - Bucket.GatherReferences(GcCtx); - } -} - -uint64_t -ZenCacheDiskLayer::TotalSize() const -{ - uint64_t TotalSize{}; - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - TotalSize += Kv.second->TotalSize(); - } - - return TotalSize; -} - -ZenCacheDiskLayer::Info -ZenCacheDiskLayer::GetInfo() const -{ - ZenCacheDiskLayer::Info Info = {.Config = {.RootDir = m_RootDir}, .TotalSize = TotalSize()}; - - RwLock::SharedLockScope _(m_Lock); - Info.BucketNames.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - Info.BucketNames.push_back(Kv.first); - Info.EntryCount += Kv.second->EntryCount(); - } - return Info; -} - -std::optional<ZenCacheDiskLayer::BucketInfo> -ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const -{ - RwLock::SharedLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) - { - return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()}; - } - return {}; -} - -CacheValueDetails::NamespaceDetails -ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const -{ - RwLock::SharedLockScope _(m_Lock); - CacheValueDetails::NamespaceDetails Details; - if (BucketFilter.empty()) - { - Details.Buckets.reserve(BucketFilter.empty() ? m_Buckets.size() : 1); - for (auto& Kv : m_Buckets) - { - Details.Buckets[Kv.first] = Kv.second->GetValueDetails(ValueFilter); - } - } - else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end()) - { - Details.Buckets[It->first] = It->second->GetValueDetails(ValueFilter); - } - return Details; -} - //////////////////////////// ZenCacheStore ZEN_DEFINE_LOG_CATEGORY_STATIC(LogCacheActivity, "z$"); |