aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
author: Stefan Boberg <[email protected]> 2023-05-17 10:31:50 +0200
committer: GitHub <[email protected]> 2023-05-17 10:31:50 +0200
commit: 5adba30f4528a7d74090a8391d09b287501846a7 (patch)
tree: 25476b8e49fb5a44170b4d181de60de1f2d88ebe /src/zenserver/cache/structuredcachestore.cpp
parent: amended CHANGELOG.md with recent changes (diff)
download: zen-5adba30f4528a7d74090a8391d09b287501846a7.tar.xz
zen-5adba30f4528a7d74090a8391d09b287501846a7.zip
Restructured structured cache store (#314)
This change separates out the disk and memory storage strategies into separate cpp/h files to improve maintainability.
Diffstat (limited to 'src/zenserver/cache/structuredcachestore.cpp')
-rw-r--r-- src/zenserver/cache/structuredcachestore.cpp 2424
1 files changed, 2 insertions, 2422 deletions
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp
index 440da3074..bc4248a8a 100644
--- a/src/zenserver/cache/structuredcachestore.cpp
+++ b/src/zenserver/cache/structuredcachestore.cpp
@@ -2,8 +2,6 @@
#include "structuredcachestore.h"
-#include <zencore/except.h>
-
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryvalidation.h>
@@ -17,12 +15,8 @@
#include <zencore/thread.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
-#include <zencore/workthreadpool.h>
-#include <zenstore/cidstore.h>
#include <zenstore/scrubcontext.h>
-#include <xxhash.h>
-
#include <future>
#include <limits>
@@ -32,6 +26,7 @@
ZEN_THIRD_PARTY_INCLUDES_START
#include <fmt/core.h>
+#include <xxhash.h>
#include <gsl/gsl-lite.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
@@ -39,158 +34,12 @@ ZEN_THIRD_PARTY_INCLUDES_END
# include <zencore/testing.h>
# include <zencore/testutils.h>
# include <zencore/workthreadpool.h>
+# include <zenstore/cidstore.h>
# include <random>
#endif
-//////////////////////////////////////////////////////////////////////////
-
namespace zen {
-namespace {
-
-#pragma pack(push)
-#pragma pack(1)
-
- // We use this to indicate if an on-disk bucket needs wiping.
- // In versions 0.2.5 -> 0.2.11 there was a GC corruption bug that would scramble the references
- // to block items.
- // See: https://github.com/EpicGames/zen/pull/299
- static const uint32_t CurrentDiskBucketVersion = 1;
- // Header written at offset 0 of a bucket index snapshot file; the DiskIndexEntry records follow it.
- struct CacheBucketIndexHeader
- {
- static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
- static constexpr uint32_t Version2 = 2;
- static constexpr uint32_t CurrentVersion = Version2;
- // Serialized fields; the struct is written to and read from disk as raw bytes.
- uint32_t Magic = ExpectedMagic;
- uint32_t Version = CurrentVersion;
- uint64_t EntryCount = 0; // number of DiskIndexEntry records stored after the header
- uint64_t LogPosition = 0; // log entry count at the time the snapshot was written
- uint32_t PayloadAlignment = 0;
- uint32_t Checksum = 0; // result of ComputeChecksum() over the fields above
- // XXH32 over every header byte except the trailing Checksum field (hence the "- sizeof(uint32_t)").
- static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header)
- {
- return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
- }
- };
-
- static_assert(sizeof(CacheBucketIndexHeader) == 32);
-
-#pragma pack(pop)
-
- const char* IndexExtension = ".uidx";
- const char* LogExtension = ".slog";
-
- // Location of the index snapshot file for a bucket: BucketDir / (BucketName + IndexExtension).
- std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
- {
-     const std::string FileName = BucketName + IndexExtension;
-     return BucketDir / FileName;
- }
-
- // Location of the temporary index file for a bucket: BucketDir / (BucketName + ".tmp").
- std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
- {
-     const std::string FileName = BucketName + ".tmp";
-     return BucketDir / FileName;
- }
-
- // Location of the log file for a bucket: BucketDir / (BucketName + LogExtension).
- std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
- {
-     const std::string FileName = BucketName + LogExtension;
-     return BucketDir / FileName;
- }
-
- // Sanity-checks one index/log record.
- //
- // Returns true when the record is acceptable. On failure returns false and
- // writes a human-readable explanation to OutReason (OutReason is left
- // untouched on success).
- //
- // Checks, in order: non-zero key, zero Reserved field, no unknown flag bits.
- // Tombstone records are accepted at that point without a size check; all
- // other records must additionally have a non-zero size.
- bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason)
- {
-     if (Entry.Key == IoHash::Zero)
-     {
-         OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
-         return false;
-     }
-     if (Entry.Location.Reserved != 0)
-     {
-         OutReason = fmt::format("Reserved field non-zero ({}) for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString());
-         return false;
-     }
-     if (Entry.Location.GetFlags() &
-         ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed))
-     {
-         OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString());
-         return false;
-     }
-     if (Entry.Location.IsFlagSet(DiskLocation::kTombStone))
-     {
-         // Tombstones only mark a key as deleted, so there is no size to validate
-         return true;
-     }
-     // Note: the Reserved field was already verified above, so it is not re-checked here
-     const uint64_t Size = Entry.Location.Size();
-     if (Size == 0)
-     {
-         OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
-         return false;
-     }
-     return true;
- }
-
- // Renames Dir to a sibling "[dropped]<name>(<n>)" directory and then deletes the renamed directory.
- //
- // Returns false if Dir does not exist (checked on every attempt), true once
- // the rename succeeded and DeleteDirectories has been called on the result.
- // A failed rename is retried after a 100 ms sleep, with no retry limit.
- bool MoveAndDeleteDirectory(const std::filesystem::path& Dir)
- {
-     int DropIndex = 0;
-     while (true)
-     {
-         if (!std::filesystem::exists(Dir))
-         {
-             return false;
-         }
-
-         const std::string           DroppedName       = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex);
-         const std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName;
-         if (std::filesystem::exists(DroppedBucketPath))
-         {
-             // Name already taken by an earlier drop, try the next index
-             ++DropIndex;
-             continue;
-         }
-
-         std::error_code Ec;
-         std::filesystem::rename(Dir, DroppedBucketPath, Ec);
-         if (Ec)
-         {
-             // TODO: Do we need to bail at some point?
-             zen::Sleep(100);
-             continue;
-         }
-
-         DeleteDirectories(DroppedBucketPath);
-         return true;
-     }
- }
-
-} // namespace
-
-namespace fs = std::filesystem;
-
-// Reads the file at Path and returns it as a compact binary object.
-// Returns a default-constructed CbObject if the read reports an error or
-// the contents fail compact binary validation.
-static CbObject
-LoadCompactBinaryObject(const fs::path& Path)
-{
-    FileContents Contents = ReadFile(Path);
-    if (Contents.ErrorCode)
-    {
-        return CbObject();
-    }
-
-    IoBuffer              Flat   = Contents.Flatten();
-    const CbValidateError Status = ValidateCompactBinary(Flat, CbValidateMode::All);
-    if (Status != CbValidateError::None)
-    {
-        return CbObject();
-    }
-
-    return LoadCompactBinaryObject(Flat);
-}
-
-// Writes the serialized buffer of Object to the file at Path.
-static void
-SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
-{
-    auto Buffer = Object.GetBuffer();
-    WriteFile(Path, Buffer.AsIoBuffer());
-}
-
ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir)
: GcStorage(Gc)
, GcContributor(Gc)
@@ -363,2275 +212,6 @@ ZenCacheNamespace::GetValueDetails(const std::string_view BucketFilter, const st
return m_DiskLayer.GetValueDetails(BucketFilter, ValueFilter);
}
-//////////////////////////////////////////////////////////////////////////
-
-// Nothing to set up here; the body is intentionally empty.
-ZenCacheMemoryLayer::ZenCacheMemoryLayer()
-{
-    // intentionally empty
-}
-
-// No explicit teardown; members clean up through their own destructors.
-ZenCacheMemoryLayer::~ZenCacheMemoryLayer()
-{
-    // intentionally empty
-}
-
-// Looks up HashKey in the named bucket.
-// Returns false if the bucket does not exist, otherwise forwards the result
-// of the bucket's own Get (which fills OutValue on a hit).
-bool
-ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
-{
-    CacheBucket* Bucket = nullptr;
-
-    {
-        // The layer lock is only held while resolving the bucket pointer
-        RwLock::SharedLockScope _(m_Lock);
-
-        auto It = m_Buckets.find(std::string(InBucket));
-        if (It == m_Buckets.end())
-        {
-            return false;
-        }
-
-        Bucket = It->second.get();
-    }
-
-    // There's a race here. Since the lock is released early to allow
-    // inserts, the bucket delete path could end up deleting the
-    // underlying data structure
-
-    return Bucket->Get(HashKey, OutValue);
-}
-
-// Stores Value under HashKey in the named bucket, creating the bucket on first use.
-//
-// The bucket is first looked up under a shared lock. Only when it is missing
-// is the exclusive lock taken, and the lookup is then repeated because another
-// thread may have created the bucket in between.
-void
-ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
-{
-    // Materialize the key once and reuse it for every lookup/insert below
-    const auto   BucketName = std::string(InBucket);
-    CacheBucket* Bucket     = nullptr;
-
-    {
-        RwLock::SharedLockScope _(m_Lock);
-
-        if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
-        {
-            Bucket = It->second.get();
-        }
-    }
-
-    if (Bucket == nullptr)
-    {
-        // New bucket
-
-        RwLock::ExclusiveLockScope _(m_Lock);
-
-        if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
-        {
-            Bucket = It->second.get();
-        }
-        else
-        {
-            auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>());
-            Bucket            = InsertResult.first->second.get();
-        }
-    }
-
-    // Note that since the underlying IoBuffer is retained, the content type is also retained
-
-    Bucket->Put(HashKey, Value);
-}
-
-// Removes the named bucket from the layer and drops its contents.
-// The bucket object itself is moved to m_DroppedBuckets rather than destroyed.
-// Returns true if the bucket existed.
-bool
-ZenCacheMemoryLayer::DropBucket(std::string_view InBucket)
-{
-    RwLock::ExclusiveLockScope _(m_Lock);
-
-    auto It = m_Buckets.find(std::string(InBucket));
-    if (It == m_Buckets.end())
-    {
-        return false;
-    }
-
-    // Take the reference before ownership moves to the dropped list
-    CacheBucket& Dropped = *It->second;
-    m_DroppedBuckets.push_back(std::move(It->second));
-    m_Buckets.erase(It);
-    Dropped.Drop();
-    return true;
-}
-
-// Drops every bucket in the layer.
-// Each bucket object is moved to m_DroppedBuckets (not destroyed), removed
-// from the bucket map and then emptied via CacheBucket::Drop().
-void
-ZenCacheMemoryLayer::Drop()
-{
-    RwLock::ExclusiveLockScope _(m_Lock);
-    while (!m_Buckets.empty())
-    {
-        auto It = m_Buckets.begin();
-        // Take the reference before ownership moves to the dropped list
-        CacheBucket& Bucket = *It->second;
-        m_DroppedBuckets.push_back(std::move(It->second));
-        // Erase by iterator (as DropBucket does) instead of re-hashing the key
-        m_Buckets.erase(It);
-        Bucket.Drop();
-    }
-}
-
-// Runs ScrubStorage on every bucket while holding the layer lock in shared mode.
-void
-ZenCacheMemoryLayer::ScrubStorage(ScrubContext& Ctx)
-{
-    RwLock::SharedLockScope _(m_Lock);
-
-    for (auto& NameAndBucket : m_Buckets)
-    {
-        auto& Bucket = NameAndBucket.second;
-        Bucket->ScrubStorage(Ctx);
-    }
-}
-
-// Collects per-key access times from every bucket into AccessTimes.Buckets,
-// keyed by bucket name.
-void
-ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes)
-{
-    using namespace zen::access_tracking;
-
-    RwLock::SharedLockScope _(m_Lock);
-
-    for (auto& NameAndBucket : m_Buckets)
-    {
-        // operator[] creates the destination list for this bucket name if needed
-        std::vector<KeyAccessTime>& Destination = AccessTimes.Buckets[NameAndBucket.first];
-        NameAndBucket.second->GatherAccessTimes(Destination);
-    }
-}
-
-// Removes all buckets from the layer under the exclusive layer lock.
-// Unlike Drop(), the buckets are not moved to m_DroppedBuckets first.
-void
-ZenCacheMemoryLayer::Reset()
-{
-    RwLock::ExclusiveLockScope _(m_Lock);
-
-    m_Buckets.clear();
-}
-
-// Returns the sum of TotalSize() over all buckets currently in the layer.
-uint64_t
-ZenCacheMemoryLayer::TotalSize() const
-{
-    RwLock::SharedLockScope _(m_Lock);
-
-    uint64_t Sum = 0;
-    for (auto& NameAndBucket : m_Buckets)
-    {
-        Sum += NameAndBucket.second->TotalSize();
-    }
-    return Sum;
-}
-
-// Builds a summary of the layer: configuration, total size, bucket names and
-// the accumulated entry count.
-ZenCacheMemoryLayer::Info
-ZenCacheMemoryLayer::GetInfo() const
-{
-    // TotalSize() takes the layer lock itself, so it is evaluated before the lock below
-    ZenCacheMemoryLayer::Info Summary = {.Config = m_Configuration, .TotalSize = TotalSize()};
-
-    RwLock::SharedLockScope _(m_Lock);
-
-    Summary.BucketNames.reserve(m_Buckets.size());
-    for (auto& NameAndBucket : m_Buckets)
-    {
-        Summary.BucketNames.push_back(NameAndBucket.first);
-        Summary.EntryCount += NameAndBucket.second->EntryCount();
-    }
-    return Summary;
-}
-
-// Returns entry count and total size for the named bucket, or an empty
-// optional if no such bucket exists.
-std::optional<ZenCacheMemoryLayer::BucketInfo>
-ZenCacheMemoryLayer::GetBucketInfo(std::string_view Bucket) const
-{
-    RwLock::SharedLockScope _(m_Lock);
-
-    auto It = m_Buckets.find(std::string(Bucket));
-    if (It == m_Buckets.end())
-    {
-        return {};
-    }
-
-    auto& Found = *It->second;
-    return ZenCacheMemoryLayer::BucketInfo{.EntryCount = Found.EntryCount(), .TotalSize = Found.TotalSize()};
-}
-
-// Validates every payload in the bucket and reports the keys of failing
-// entries to Ctx via ReportBadCidChunks.
-//
-// kCbObject payloads must pass full compact binary validation.
-// kCompressedBinary payloads must have a valid compressed header whose raw
-// hash equals the cache key. Other content types are not checked.
-void
-ZenCacheMemoryLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
-{
-    RwLock::SharedLockScope _(m_BucketLock);
-
-    const auto IsPayloadValid = [](const IoHash& Key, ZenContentType Type, IoBuffer Data) -> bool {
-        if (Type == ZenContentType::kCbObject)
-        {
-            return ValidateCompactBinary(Data, CbValidateMode::All) == CbValidateError::None;
-        }
-        if (Type != ZenContentType::kCompressedBinary)
-        {
-            // Nothing to verify for other content types
-            return true;
-        }
-
-        IoHash   HeaderHash;
-        uint64_t HeaderSize;
-        if (!CompressedBuffer::ValidateCompressedHeader(Data, HeaderHash, HeaderSize))
-        {
-            return false;
-        }
-        return !(Key != HeaderHash);
-    };
-
-    std::vector<IoHash> Corrupt;
-    for (auto& KeyAndIndex : m_CacheMap)
-    {
-        const IoHash&        Key   = KeyAndIndex.first;
-        const BucketPayload& Entry = m_Payloads[KeyAndIndex.second];
-        if (!IsPayloadValid(Key, Entry.Payload.GetContentType(), Entry.Payload))
-        {
-            Corrupt.push_back(Key);
-        }
-    }
-
-    if (!Corrupt.empty())
-    {
-        Ctx.ReportBadCidChunks(Corrupt);
-    }
-}
-
-// Appends one KeyAccessTime (key + last access tick) per cached entry to AccessTimes.
-void
-ZenCacheMemoryLayer::CacheBucket::GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes)
-{
-    RwLock::SharedLockScope _(m_BucketLock);
-
-    for (const auto& KeyAndIndex : m_CacheMap)
-    {
-        AccessTimes.push_back(
-            access_tracking::KeyAccessTime{.Key = KeyAndIndex.first, .LastAccess = m_AccessTimes[KeyAndIndex.second]});
-    }
-}
-
-// Looks up HashKey in this bucket.
-// On a hit, fills OutValue (payload, raw size, raw hash), refreshes the
-// entry's access time and returns true. Returns false on a miss.
-bool
-ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
-{
-    RwLock::SharedLockScope _(m_BucketLock);
-
-    auto It = m_CacheMap.find(HashKey);
-    if (It == m_CacheMap.end())
-    {
-        return false;
-    }
-
-    uint32_t EntryIndex = It.value();
-    ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
-    ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size());
-
-    const BucketPayload& Entry = m_Payloads[EntryIndex];
-    OutValue                   = {.Value = Entry.Payload, .RawSize = Entry.RawSize, .RawHash = Entry.RawHash};
-    m_AccessTimes[EntryIndex]  = GcClock::TickCount();
-
-    return true;
-}
-
-// Inserts Value under HashKey, or replaces the existing payload for that key,
-// and keeps m_TotalSize in step with the payload bytes held by the bucket.
-//
-// The call is silently ignored when the map already holds
-// std::numeric_limits<uint32_t>::max() entries, because entry indices are
-// stored as uint32_t. gsl::narrow is applied to Value.RawSize when it is
-// stored as uint32_t.
-void
-ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
-{
-    const size_t PayloadSize = Value.Value.GetSize();
-    {
-        GcClock::Tick              AccessTime = GcClock::TickCount();
-        RwLock::ExclusiveLockScope _(m_BucketLock);
-        if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max())
-        {
-            // No more space in our memory cache!
-            return;
-        }
-        if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end())
-        {
-            uint32_t EntryIndex = It.value();
-            ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
-
-            BucketPayload& Payload = m_Payloads[EntryIndex];
-
-            // Remove the size of the payload being replaced (not the size of the
-            // incoming one), so that the fetch_add below leaves m_TotalSize equal
-            // to the sum of the payloads actually stored.
-            m_TotalSize.fetch_sub(Payload.Payload.GetSize(), std::memory_order::relaxed);
-
-            Payload.Payload           = Value.Value;
-            Payload.RawHash           = Value.RawHash;
-            Payload.RawSize           = gsl::narrow<uint32_t>(Value.RawSize);
-            m_AccessTimes[EntryIndex] = AccessTime;
-        }
-        else
-        {
-            uint32_t EntryIndex = gsl::narrow<uint32_t>(m_Payloads.size());
-            m_Payloads.emplace_back(
-                BucketPayload{.Payload = Value.Value, .RawSize = gsl::narrow<uint32_t>(Value.RawSize), .RawHash = Value.RawHash});
-            m_AccessTimes.emplace_back(AccessTime);
-            m_CacheMap.insert_or_assign(HashKey, EntryIndex);
-        }
-        ZEN_ASSERT_SLOW(m_Payloads.size() == m_CacheMap.size());
-        ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size());
-    }
-
-    m_TotalSize.fetch_add(PayloadSize, std::memory_order::relaxed);
-}
-
-// Empties the bucket: clears the key map, access times and payloads and
-// resets the size counter to zero, all under the exclusive bucket lock.
-void
-ZenCacheMemoryLayer::CacheBucket::Drop()
-{
-    RwLock::ExclusiveLockScope _(m_BucketLock);
-
-    m_CacheMap.clear();
-    m_AccessTimes.clear();
-    m_Payloads.clear();
-
-    m_TotalSize.store(0);
-}
-
-// Returns the number of keys currently stored in the bucket.
-uint64_t
-ZenCacheMemoryLayer::CacheBucket::EntryCount() const
-{
-    RwLock::SharedLockScope _(m_BucketLock);
-
-    const auto Count = m_CacheMap.size();
-    return static_cast<uint64_t>(Count);
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-// Creates an unopened bucket with the given name and a zero bucket id.
-// Buckets whose name starts with "legacy" or ends with "shadermap" get a
-// 16 MiB large-object threshold.
-ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)), m_BucketId(Oid::Zero)
-{
-    const bool IsLegacy    = m_BucketName.starts_with(std::string_view("legacy"));
-    const bool IsShaderMap = m_BucketName.ends_with(std::string_view("shadermap"));
-
-    if (IsLegacy || IsShaderMap)
-    {
-        // This is pretty ad hoc but in order to avoid too many individual files
-        // it makes sense to have a different strategy for legacy values
-        m_LargeObjectThreshold = 16 * 1024 * 1024;
-    }
-}
-
-// No explicit teardown; members clean up through their own destructors.
-ZenCacheDiskLayer::CacheBucket::~CacheBucket()
-{
-    // intentionally empty
-}
-// Opens the bucket stored in BucketDir, or creates it when no valid manifest is found and AllowCreate is set. Returns false if the manifest has a zero BucketId, or if there is no manifest and AllowCreate is false; true otherwise.
-bool
-ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate)
-{
- using namespace std::literals;
-
- ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate");
-
- ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir);
-
- m_BlocksBasePath = BucketDir / "blocks";
- m_BucketDir = BucketDir;
-// Note: the directory is created before the manifest/AllowCreate checks below, i.e. even when this call ends up returning false
- CreateDirectories(m_BucketDir);
-
- std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"};
-
- bool IsNew = false;
-// LoadCompactBinaryObject yields an empty object when the file cannot be read or fails validation
- CbObject Manifest = LoadCompactBinaryObject(ManifestPath);
-
- if (Manifest)
- {
- m_BucketId = Manifest["BucketId"sv].AsObjectId();
- if (m_BucketId == Oid::Zero)
- {
- return false;
- }
- // uint32_t Version = Manifest["Version"sv].AsUInt32(0);
- // if (Version != CurrentDiskBucketVersion)
- //{
- // ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion);
- // IsNew = true;
- // }
- }
- else if (AllowCreate)
- {
- m_BucketId.Generate();
-// Write a fresh manifest holding only the bucket id and the current bucket version
- CbObjectWriter Writer;
- Writer << "BucketId"sv << m_BucketId;
- Writer << "Version"sv << CurrentDiskBucketVersion;
- Manifest = Writer.Save();
- SaveCompactBinaryObject(ManifestPath, Manifest);
- IsNew = true;
- }
- else
- {
- return false;
- }
-// OpenLog rebuilds m_Index/m_Payloads/m_AccessTimes; for a new bucket it first removes any existing log, index and blocks
- OpenLog(IsNew);
-
- if (!IsNew)
- {
- Stopwatch Timer;
- const auto _ =
- MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
-// Restore the access times saved in the manifest for keys that are still present in the index
- for (CbFieldView Entry : Manifest["Timestamps"sv])
- {
- const CbObjectView Obj = Entry.AsObjectView();
- const IoHash Key = Obj["Key"sv].AsHash();
-
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
- m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64();
- }
- }
- for (CbFieldView Entry : Manifest["RawInfo"sv])
- {
- const CbObjectView Obj = Entry.AsObjectView();
- const IoHash Key = Obj["Key"sv].AsHash();
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
-
- const IoHash RawHash = Obj["RawHash"sv].AsHash();
- const uint64_t RawSize = Obj["RawSize"sv].AsUInt64();
-// A zero hash or size is only logged; the values are still stored on the payload below
- if (RawHash == IoHash::Zero || RawSize == 0)
- {
- ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex);
- }
-
- m_Payloads[EntryIndex].RawHash = RawHash;
- m_Payloads[EntryIndex].RawSize = RawSize;
- }
- }
- }
-
- return true;
-}
-// Writes the current in-memory index (key + disk location per entry) to the bucket's index file, unless the log count equals m_LogFlushPosition.
-void
-ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot()
-{
- ZEN_TRACE_CPU("Z$::Bucket::MakeIndexSnapshot");
-
- uint64_t LogCount = m_SlogFile.GetLogCount();
- if (m_LogFlushPosition == LogCount)
- {
- return;
- }
-
- ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir);
- uint64_t EntryCount = 0;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
- m_BucketDir,
- EntryCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- namespace fs = std::filesystem;
-
- fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
- fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName);
-
- // Move index away, we keep it if something goes wrong
- if (fs::is_regular_file(STmpIndexPath))
- {
- fs::remove(STmpIndexPath);
- }
- if (fs::is_regular_file(IndexPath))
- {
- fs::rename(IndexPath, STmpIndexPath);
- }
-
- try
- {
- // Write the current state of the location map to a new index state
- std::vector<DiskIndexEntry> Entries;
- Entries.resize(m_Index.size());
-
- {
- uint64_t EntryIndex = 0;
- for (auto& Entry : m_Index)
- {
- DiskIndexEntry& IndexEntry = Entries[EntryIndex++];
- IndexEntry.Key = Entry.first;
- IndexEntry.Location = m_Payloads[Entry.second].Location;
- }
- }
-
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
- CacheBucketIndexHeader Header = {.EntryCount = Entries.size(),
- .LogPosition = LogCount,
- .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)};
-// The checksum is computed last, after all other header fields have their final values
- Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
-// File layout: header at offset 0, entry array immediately after it
- ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0);
- ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
- ObjectIndexFile.Flush();
- ObjectIndexFile.Close();
- EntryCount = Entries.size();
- m_LogFlushPosition = LogCount;
- }
- catch (std::exception& Err)
- {
- ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what());
-
- // Restore any previous snapshot
-
- if (fs::is_regular_file(STmpIndexPath))
- {
- fs::remove(IndexPath);
- fs::rename(STmpIndexPath, IndexPath);
- }
- }
- if (fs::is_regular_file(STmpIndexPath)) // still present only when the write succeeded; the backup is no longer needed
- {
- fs::remove(STmpIndexPath);
- }
-}
-
-// Loads an index snapshot file into m_Index / m_Payloads / m_AccessTimes.
-//
-// IndexPath   path of the index snapshot to read
-// OutVersion  set to the index format version when the file was accepted;
-//             left untouched otherwise
-//
-// Returns the log position stored in the snapshot header, or 0 if the file is
-// missing or rejected. A file is rejected if it is smaller than the header,
-// has the wrong magic or checksum, a zero payload alignment, an unknown
-// version, or claims more entries than fit in the file. Individual entries
-// that fail ValidateCacheBucketIndexEntry are skipped with a warning.
-uint64_t
-ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion)
-{
-    ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile");
-
-    if (std::filesystem::is_regular_file(IndexPath))
-    {
-        BasicFile ObjectIndexFile;
-        ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
-        uint64_t Size = ObjectIndexFile.FileSize();
-        if (Size >= sizeof(CacheBucketIndexHeader))
-        {
-            CacheBucketIndexHeader Header;
-            ObjectIndexFile.Read(&Header, sizeof(Header), 0);
-            if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) &&
-                (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0))
-            {
-                switch (Header.Version)
-                {
-                    case CacheBucketIndexHeader::Version2:
-                        {
-                            // Number of whole entries that fit after the header. The header size
-                            // must be subtracted here (not sizeof(size_t)), otherwise a truncated
-                            // file could pass the check and be read past its end.
-                            uint64_t ExpectedEntryCount = (Size - sizeof(CacheBucketIndexHeader)) / sizeof(DiskIndexEntry);
-                            if (Header.EntryCount > ExpectedEntryCount)
-                            {
-                                break;
-                            }
-                            size_t     EntryCount = 0;
-                            Stopwatch  Timer;
-                            const auto _ = MakeGuard([&] {
-                                ZEN_INFO("read store '{}' index containing {} entries in {}",
-                                         IndexPath,
-                                         EntryCount,
-                                         NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
-                            });
-
-                            m_PayloadAlignment = Header.PayloadAlignment;
-
-                            std::vector<DiskIndexEntry> Entries;
-                            Entries.resize(Header.EntryCount);
-                            ObjectIndexFile.Read(Entries.data(),
-                                                 Header.EntryCount * sizeof(DiskIndexEntry),
-                                                 sizeof(CacheBucketIndexHeader));
-
-                            m_Payloads.reserve(Header.EntryCount);
-                            m_AccessTimes.reserve(Header.EntryCount);
-                            m_Index.reserve(Header.EntryCount);
-
-                            std::string InvalidEntryReason;
-                            for (const DiskIndexEntry& Entry : Entries)
-                            {
-                                if (!ValidateCacheBucketIndexEntry(Entry, InvalidEntryReason))
-                                {
-                                    ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
-                                    continue;
-                                }
-                                // Raw hash/size are not part of the index file; they start out zeroed
-                                size_t EntryIndex = m_Payloads.size();
-                                m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = 0, .RawHash = IoHash::Zero});
-                                m_AccessTimes.emplace_back(GcClock::TickCount());
-                                m_Index.insert_or_assign(Entry.Key, EntryIndex);
-                                EntryCount++;
-                            }
-                            OutVersion = CacheBucketIndexHeader::Version2;
-                            return Header.LogPosition;
-                        }
-                        break;
-                    default:
-                        break;
-                }
-            }
-        }
-        ZEN_WARN("skipping invalid index file '{}'", IndexPath);
-    }
-    return 0;
-}
-// Replays the bucket log at LogPath into the in-memory index, skipping the first SkipEntryCount records. Returns the number of log records past the skip point, or 0 if the file is missing or fails to initialize.
-uint64_t
-ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
-{
- ZEN_TRACE_CPU("Z$::Bucket::ReadLog");
-
- if (std::filesystem::is_regular_file(LogPath))
- {
- uint64_t LogEntryCount = 0;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
- TCasLogFile<DiskIndexEntry> CasLog;
- CasLog.Open(LogPath, CasLogFile::Mode::kRead);
- if (CasLog.Initialize())
- {
- uint64_t EntryCount = CasLog.GetLogCount();
- if (EntryCount < SkipEntryCount) // skip position lies beyond the end of the log: replay everything instead
- {
- ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
- SkipEntryCount = 0;
- }
- LogEntryCount = EntryCount - SkipEntryCount;
- m_Index.reserve(LogEntryCount);
- uint64_t InvalidEntryCount = 0;
- CasLog.Replay(
- [&](const DiskIndexEntry& Record) {
- std::string InvalidEntryReason;
- if (Record.Location.Flags & DiskLocation::kTombStone) // tombstone: remove the key from the index (its payload slot is left in place)
- {
- m_Index.erase(Record.Key);
- return;
- }
- if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
- ++InvalidEntryCount;
- return;
- }
- size_t EntryIndex = m_Payloads.size(); // a new payload slot is appended for every accepted record; the key is (re)pointed at it
- m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = 0u, .RawHash = IoHash::Zero});
- m_AccessTimes.emplace_back(GcClock::TickCount());
- m_Index.insert_or_assign(Record.Key, EntryIndex);
- },
- SkipEntryCount);
- if (InvalidEntryCount)
- {
- ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir);
- }
- return LogEntryCount;
- }
- }
- return 0;
-};
-// Rebuilds the in-memory index from the index snapshot and log, opens the log for writing, tombstones entries whose block data is missing or out of range, and prunes the block store to the known locations.
-void
-ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
-{
- ZEN_TRACE_CPU("Z$::Bucket::OpenLog");
-
- m_TotalStandaloneSize = 0;
-
- m_Index.clear();
- m_Payloads.clear();
- m_AccessTimes.clear();
-
- std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
- std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
-
- if (IsNew) // a new bucket starts from scratch: remove any existing log, index and block files
- {
- fs::remove(LogPath);
- fs::remove(IndexPath);
- fs::remove_all(m_BlocksBasePath);
- }
-
- CreateDirectories(m_BucketDir);
-// BlockSizes maps block index -> block size, as returned by the block store
- std::unordered_map<uint32_t, uint64_t> BlockSizes =
- m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1);
-
- if (std::filesystem::is_regular_file(IndexPath))
- {
- uint32_t IndexVersion = 0;
- m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion);
- if (IndexVersion == 0) // ReadIndexFile only sets the version when it accepted the file
- {
- ZEN_WARN("removing invalid index file at '{}'", IndexPath);
- std::filesystem::remove(IndexPath);
- }
- }
-
- uint64_t LogEntryCount = 0;
- if (std::filesystem::is_regular_file(LogPath))
- {
- if (TCasLogFile<DiskIndexEntry>::IsValid(LogPath))
- {
- LogEntryCount = ReadLog(LogPath, m_LogFlushPosition); // replay only the records written after the snapshot
- }
- else if (fs::is_regular_file(LogPath))
- {
- ZEN_WARN("removing invalid cas log at '{}'", LogPath);
- std::filesystem::remove(LogPath);
- }
- }
-
- m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
-// Check every indexed entry: standalone files only add to the size counter, block entries must fall inside a known block
- std::vector<BlockStoreLocation> KnownLocations;
- KnownLocations.reserve(m_Index.size());
- std::vector<DiskIndexEntry> BadEntries;
- for (const auto& Entry : m_Index)
- {
- size_t EntryIndex = Entry.second;
- const BucketPayload& Payload = m_Payloads[EntryIndex];
- const DiskLocation& Location = Payload.Location;
-
- if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- m_TotalStandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed);
- continue;
- }
- const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment);
-
- auto BlockIt = BlockSizes.find(BlockLocation.BlockIndex);
- if (BlockIt == BlockSizes.end())
- {
- ZEN_WARN("Unknown block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString());
- }
- else
- {
- uint64_t BlockSize = BlockIt->second;
- if (BlockLocation.Offset + BlockLocation.Size > BlockSize)
- {
- ZEN_WARN("Range is outside of block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString());
- }
- else
- {
- KnownLocations.push_back(BlockLocation);
- continue;
- }
- }
-// Only reached for entries that failed one of the checks above: queue a tombstone record for them
- DiskLocation NewLocation = Payload.Location;
- NewLocation.Flags |= DiskLocation::kTombStone;
- BadEntries.push_back(DiskIndexEntry{.Key = Entry.first, .Location = NewLocation});
- }
-
- if (!BadEntries.empty())
- {
- m_SlogFile.Append(BadEntries);
- m_SlogFile.Flush();
-
- LogEntryCount += BadEntries.size();
-
- for (const DiskIndexEntry& BadEntry : BadEntries)
- {
- m_Index.erase(BadEntry.Key);
- }
- }
-
- m_BlockStore.Prune(KnownLocations);
-
- if (IsNew || LogEntryCount > 0)
- {
- MakeIndexSnapshot();
- }
- // TODO: should validate integrity of container files here
-}
-
-// Appends the standalone file path for HashKey to Path:
-//   <bucket dir>/blob/<hex[0..3)>/<hex[3..5)>/<hex[5..)>
-// where hex is the hexadecimal form of the hash bytes.
-void
-ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const
-{
-    constexpr size_t HexLength = sizeof(HashKey.Hash) * 2;
-
-    // Not null-terminated; only ever consumed through explicit ranges below
-    char Hex[HexLength];
-    ToHexBytes(HashKey.Hash, sizeof(HashKey.Hash), Hex);
-
-    const char* const HexEnd = Hex + HexLength;
-
-    Path.Append(m_BucketDir);
-    Path.AppendSeparator();
-    Path.Append(L"blob");
-    Path.AppendSeparator();
-    Path.AppendAsciiRange(Hex, Hex + 3);
-    Path.AppendSeparator();
-    Path.AppendAsciiRange(Hex + 3, Hex + 5);
-    Path.AppendSeparator();
-    Path.AppendAsciiRange(Hex + 5, HexEnd);
-}
-
-// Fetches a value stored in the block store at the location described by Loc.
-// On success the returned buffer is tagged with the content type recorded in
-// Loc; otherwise the (empty) result of TryGetChunk is returned as is.
-IoBuffer
-ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const
-{
-    const BlockStoreLocation BlockLocation = Loc.GetBlockLocation(m_PayloadAlignment);
-
-    IoBuffer Chunk = m_BlockStore.TryGetChunk(BlockLocation);
-    if (!Chunk)
-    {
-        return Chunk;
-    }
-
-    Chunk.SetContentType(Loc.GetContentType());
-    return Chunk;
-}
-
-// Loads a value stored as a standalone file (path derived from HashKey via BuildPath).
-// The per-hash shard lock is held in shared mode while the file is opened.
-// Returns an empty buffer if the file could not be opened.
-IoBuffer
-ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const
-{
-    ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue");
-
-    ExtendablePathBuilder<256> DataFilePath;
-    BuildPath(DataFilePath, HashKey);
-
-    RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
-
-    IoBuffer FileData = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath());
-    if (!FileData)
-    {
-        return {};
-    }
-
-    FileData.SetContentType(Loc.GetContentType());
-    return FileData;
-}
-// Looks up HashKey, loads its value (standalone file or block store) into OutValue and refreshes the access time. Returns true only if a non-empty value buffer was obtained.
-bool
-ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
-{
- RwLock::SharedLockScope _(m_IndexLock);
- auto It = m_Index.find(HashKey);
- if (It == m_Index.end())
- {
- return false;
- }
- size_t EntryIndex = It.value();
- const BucketPayload& Payload = m_Payloads[EntryIndex];
- m_AccessTimes[EntryIndex] = GcClock::TickCount();
- DiskLocation Location = Payload.Location; // copied so it can still be used after the index lock is released
- OutValue.RawSize = Payload.RawSize;
- OutValue.RawHash = Payload.RawHash;
- if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- // We don't need to hold the index lock when we read a standalone file
- _.ReleaseNow();
- OutValue.Value = GetStandaloneCacheValue(Location, HashKey);
- }
- else
- {
- OutValue.Value = GetInlineCacheValue(Location);
- }
- _.ReleaseNow(); // NOTE(review): second call on the standalone path; presumably a no-op once released - confirm
-// For non-structured values with no cached raw hash/size: compute them from the loaded data and store them back in the index
- if (!Location.IsFlagSet(DiskLocation::kStructured))
- {
- if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0 && OutValue.Value.GetSize() > 0)
- {
- if (Location.IsFlagSet(DiskLocation::kCompressed))
- {
- (void)CompressedBuffer::FromCompressed(SharedBuffer(OutValue.Value), OutValue.RawHash, OutValue.RawSize);
- }
- else
- {
- OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
- OutValue.RawSize = OutValue.Value.GetSize();
- }
- RwLock::ExclusiveLockScope __(m_IndexLock);
- if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end()) // re-lookup: the index lock was released in between
- {
- BucketPayload& WritePayload = m_Payloads[WriteIt.value()];
- WritePayload.RawHash = OutValue.RawHash;
- WritePayload.RawSize = OutValue.RawSize;
-
- m_LogFlushPosition = 0; // Force resave of index on exit
- }
- }
- }
-
- return (bool)OutValue.Value;
-}
-
-// Stores Value under HashKey. Values whose size is at least
-// m_LargeObjectThreshold are written as standalone files; smaller values
-// are stored inline.
-void
-ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
-{
-    const bool IsLargeValue = Value.Value.Size() >= m_LargeObjectThreshold;
-
-    if (IsLargeValue)
-    {
-        PutStandaloneCacheValue(HashKey, Value);
-    }
-    else
-    {
-        PutInlineCacheValue(HashKey, Value);
-    }
-}
-
-// Closes the bucket's block store and log, moves the bucket directory aside
-// and deletes it, then clears the in-memory index.
-// The index lock and every shard lock are held exclusively for the whole call.
-// Returns the result of MoveAndDeleteDirectory.
-bool
-ZenCacheDiskLayer::CacheBucket::Drop()
-{
-    ZEN_TRACE_CPU("Z$::Bucket::Drop");
-
-    RwLock::ExclusiveLockScope _(m_IndexLock);
-
-    // Lock scopes are heap allocated so they can be collected in a vector
-    std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> HeldShardLocks;
-    HeldShardLocks.reserve(256);
-    for (RwLock& ShardLock : m_ShardedLocks)
-    {
-        HeldShardLocks.push_back(std::make_unique<RwLock::ExclusiveLockScope>(ShardLock));
-    }
-
-    m_BlockStore.Close();
-    m_SlogFile.Close();
-
-    const bool WasDeleted = MoveAndDeleteDirectory(m_BucketDir);
-
-    m_Index.clear();
-    m_Payloads.clear();
-    m_AccessTimes.clear();
-
-    return WasDeleted;
-}
-
-// Flushes the block store, then (holding the index lock in shared mode)
-// flushes the log, writes an index snapshot and saves the manifest.
-void
-ZenCacheDiskLayer::CacheBucket::Flush()
-{
-    ZEN_TRACE_CPU("Z$::Bucket::Flush");
-
-    // The block store is flushed before the index lock is taken
-    m_BlockStore.Flush();
-
-    RwLock::SharedLockScope _(m_IndexLock);
-
-    m_SlogFile.Flush();
-    MakeIndexSnapshot();
-    SaveManifest();
-}
-// Writes the bucket manifest ("zen_manifest"): bucket id, version and, when the index is non-empty, per-key access times and raw hash/size info. Write failures are logged, not propagated.
-void
-ZenCacheDiskLayer::CacheBucket::SaveManifest()
-{
- using namespace std::literals;
-
- ZEN_TRACE_CPU("Z$::Bucket::SaveManifest");
-
- CbObjectWriter Writer;
- Writer << "BucketId"sv << m_BucketId;
- Writer << "Version"sv << CurrentDiskBucketVersion;
-
- if (!m_Index.empty())
- {
- Writer.BeginArray("Timestamps"sv);
- for (auto& Kv : m_Index)
- {
- const IoHash& Key = Kv.first;
- GcClock::Tick AccessTime = m_AccessTimes[Kv.second];
-
- Writer.BeginObject();
- Writer << "Key"sv << Key;
- Writer << "LastAccess"sv << AccessTime;
- Writer.EndObject();
- }
- Writer.EndArray();
-
- Writer.BeginArray("RawInfo"sv);
- {
- for (auto& Kv : m_Index)
- {
- const IoHash& Key = Kv.first;
- const BucketPayload& Payload = m_Payloads[Kv.second];
- if (Payload.RawHash != IoHash::Zero) // entries without a known raw hash are left out of the array
- {
- Writer.BeginObject();
- Writer << "Key"sv << Key;
- Writer << "RawHash"sv << Payload.RawHash;
- Writer << "RawSize"sv << Payload.RawSize;
- Writer.EndObject();
- }
- }
- }
- Writer.EndArray();
- }
-
- try
- {
- SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save());
- }
- catch (std::exception& Err)
- {
- ZEN_WARN("writing manifest FAILED, reason: '{}'", Err.what());
- }
-}
-
-// Computes the IoHash of a composite buffer by streaming its segments, in
-// order, through a single IoHashStream.
-IoHash
-HashBuffer(const CompositeBuffer& Buffer)
-{
-    IoHashStream Stream;
-
-    for (const SharedBuffer& Part : Buffer.GetSegments())
-    {
-        Stream.Append(Part.GetView());
-    }
-
-    return Stream.GetHash();
-}
-// Checks that Buffer is consistent with ContentType. Returns false only when validation of a kCbObject or kCompressedBinary value fails; any other content type is accepted with a one-time warning.
-bool
-ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer)
-{
- ZEN_ASSERT_SLOW(Buffer.GetContentType() == ContentType);
-
- if (ContentType == ZenContentType::kCbObject)
- {
- CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
-
- if (Error == CbValidateError::None)
- {
- return true;
- }
-
- ZEN_SCOPED_ERROR("compact binary validation failed: '{}'", ToString(Error));
-
- return false;
- }
- else if (ContentType == ZenContentType::kCompressedBinary)
- {
- IoBuffer MemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Buffer);
-
- IoHash HeaderRawHash;
- uint64_t RawSize = 0;
- if (!CompressedBuffer::ValidateCompressedHeader(MemoryBuffer, /* out */ HeaderRawHash, /* out */ RawSize))
- {
- ZEN_SCOPED_ERROR("compressed buffer header validation failed");
-
- return false;
- }
-// Decompress the whole payload and compare its hash against the raw hash taken from the compressed header
- CompressedBuffer Compressed =
- CompressedBuffer::FromCompressed(SharedBuffer(MemoryBuffer), /* out */ HeaderRawHash, /* out */ RawSize);
- CompositeBuffer Decompressed = Compressed.DecompressToComposite();
- IoHash DecompressedHash = HashBuffer(Decompressed);
-
- if (HeaderRawHash != DecompressedHash)
- {
- ZEN_SCOPED_ERROR("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash);
-
- return false;
- }
- }
- else
- {
- // No way to verify this kind of content (what is it exactly?)
-// The function-local static initializer runs once, so the warning is logged only on the first such call
- static int Once = [&] {
- ZEN_WARN("ValidateCacheBucketEntryValue called with unknown content type ({})", ToString(ContentType));
- return 42;
- }();
- }
-
- return true;
-};
-
-// Verifies every value referenced by the bucket index and, when the scrub context asks for
-// recovery, removes the entries whose data is missing or fails validation.
-//
-// Verification (under a shared index lock):
-//  - standalone binary blobs: only the file size can be checked against the index
-//  - other standalone values: loaded and passed through ValidateCacheBucketEntryValue
-//  - block store values:      collected and validated in bulk via BlockStore::IterateChunks
-//
-// Recovery (Ctx.RunRecovery()):
-//  - a tombstone is logged for every bad key and the key is erased from the index
-//  - bad standalone files are deleted from disk
-//  - m_Payloads / m_AccessTimes are compacted so they only hold live entries
-//
-// Bad keys are finally reported through Ctx.ReportBadCidChunks. A ScrubDeadlineExpiredException
-// raised during verification ends the verification pass early; it is logged and not propagated.
-void
-ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
-{
-    ZEN_TRACE_CPU("Z$::Bucket::Scrub");
-
-    ZEN_INFO("scrubbing '{}'", m_BucketDir);
-
-    Stopwatch Timer;
-    uint64_t  ChunkCount         = 0;
-    uint64_t  VerifiedChunkBytes = 0;
-
-    auto LogStats = MakeGuard([&] {
-        const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs());
-
-        ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})",
-                 m_BucketName,
-                 NiceBytes(VerifiedChunkBytes),
-                 NiceTimeSpanMs(DurationMs),
-                 ChunkCount,
-                 NiceRate(VerifiedChunkBytes, DurationMs));
-    });
-
-    std::vector<IoHash> BadKeys;
-    auto                ReportBadKey = [&](const IoHash& Key) { BadKeys.push_back(Key); };
-
-    try
-    {
-        std::vector<BlockStoreLocation> ChunkLocations;
-        std::vector<IoHash>             ChunkIndexToChunkHash;
-
-        RwLock::SharedLockScope _(m_IndexLock);
-
-        const size_t BlockChunkInitialCount = m_Index.size() / 4;
-        ChunkLocations.reserve(BlockChunkInitialCount);
-        ChunkIndexToChunkHash.reserve(BlockChunkInitialCount);
-
-        // Do a pass over the index and verify any standalone file values straight away
-        // all other storage classes are gathered and verified in bulk in order to enable
-        // more efficient I/O scheduling
-
-        for (const auto& Kv : m_Index)
-        {
-            const IoHash&        HashKey = Kv.first;
-            const BucketPayload& Payload = m_Payloads[Kv.second];
-            const DiskLocation&  Loc     = Payload.Location;
-
-            if (!Loc.IsFlagSet(DiskLocation::kStandaloneFile))
-            {
-                // Block store value, verified in bulk further down
-                ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment));
-                ChunkIndexToChunkHash.push_back(HashKey);
-                continue;
-            }
-
-            Ctx.ThrowIfDeadlineExpired();
-
-            ++ChunkCount;
-            VerifiedChunkBytes += Loc.Size();
-
-            if (Loc.GetContentType() == ZenContentType::kBinary)
-            {
-                // Blob cache value, not much we can do about data integrity checking
-                // here since there's no hash available
-                ExtendablePathBuilder<256> DataFilePath;
-                BuildPath(DataFilePath, HashKey);
-
-                RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
-
-                std::error_code Ec;
-                const uintmax_t FileSize = std::filesystem::file_size(DataFilePath.ToPath(), Ec);
-
-                // file_size() reports failure through Ec and returns uintmax_t(-1), so testing the
-                // two conditions separately would report the same key twice. Report it once only:
-                // the recovery pass below looks every bad key up in the index.
-                if (Ec || FileSize != Loc.Size())
-                {
-                    ReportBadKey(HashKey);
-                }
-                continue;
-            }
-
-            // Structured cache value
-            IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey);
-            if (!Buffer)
-            {
-                ReportBadKey(HashKey);
-                continue;
-            }
-            if (!ValidateCacheBucketEntryValue(Loc.GetContentType(), Buffer))
-            {
-                ReportBadKey(HashKey);
-                continue;
-            }
-        }
-
-        const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void {
-            ++ChunkCount;
-            VerifiedChunkBytes += Size;
-            const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
-            if (!Data)
-            {
-                // ChunkLocation out of range of stored blocks
-                ReportBadKey(Hash);
-                return;
-            }
-            if (!Size)
-            {
-                ReportBadKey(Hash);
-                return;
-            }
-            IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
-            if (!Buffer)
-            {
-                ReportBadKey(Hash);
-                return;
-            }
-            const BucketPayload& Payload     = m_Payloads[m_Index.at(Hash)];
-            ZenContentType       ContentType = Payload.Location.GetContentType();
-            Buffer.SetContentType(ContentType);
-            if (!ValidateCacheBucketEntryValue(ContentType, Buffer))
-            {
-                ReportBadKey(Hash);
-                return;
-            }
-        };
-
-        const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void {
-            Ctx.ThrowIfDeadlineExpired();
-
-            ++ChunkCount;
-            VerifiedChunkBytes += Size;
-            const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
-            IoBuffer      Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
-            if (!Buffer)
-            {
-                ReportBadKey(Hash);
-                return;
-            }
-            const BucketPayload& Payload     = m_Payloads[m_Index.at(Hash)];
-            ZenContentType       ContentType = Payload.Location.GetContentType();
-            Buffer.SetContentType(ContentType);
-            if (!ValidateCacheBucketEntryValue(ContentType, Buffer))
-            {
-                ReportBadKey(Hash);
-                return;
-            }
-        };
-
-        m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
-    }
-    catch (ScrubDeadlineExpiredException&)
-    {
-        ZEN_INFO("Scrubbing deadline expired, operation incomplete");
-    }
-
-    Ctx.ReportScrubbed(ChunkCount, VerifiedChunkBytes);
-
-    if (!BadKeys.empty())
-    {
-        ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir);
-
-        if (Ctx.RunRecovery())
-        {
-            // Deal with bad chunks by removing them from our lookup map
-
-            std::vector<DiskIndexEntry> LogEntries;
-            LogEntries.reserve(BadKeys.size());
-
-            {
-                RwLock::ExclusiveLockScope __(m_IndexLock);
-                for (const IoHash& BadKey : BadKeys)
-                {
-                    // Log a tombstone and delete the in-memory index for the bad entry
-                    const auto It = m_Index.find(BadKey);
-                    if (It == m_Index.end())
-                    {
-                        // The index lock was released after verification, so the entry may no
-                        // longer be present; never dereference the end iterator.
-                        continue;
-                    }
-                    const BucketPayload& Payload  = m_Payloads[It->second];
-                    DiskLocation         Location = Payload.Location;
-                    Location.Flags |= DiskLocation::kTombStone;
-                    LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location});
-                    m_Index.erase(BadKey);
-                }
-            }
-            for (const DiskIndexEntry& Entry : LogEntries)
-            {
-                if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile))
-                {
-                    ExtendablePathBuilder<256> Path;
-                    BuildPath(Path, Entry.Key);
-                    fs::path                   FilePath = Path.ToPath();
-                    RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key));
-                    if (fs::is_regular_file(FilePath))
-                    {
-                        ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8());
-                        std::error_code Ec;
-                        fs::remove(FilePath, Ec);  // We don't care if we fail, we are no longer tracking this file...
-                    }
-                    m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
-                }
-            }
-            m_SlogFile.Append(LogEntries);
-
-            // Clean up m_AccessTimes and m_Payloads vectors
-            {
-                std::vector<BucketPayload> Payloads;
-                std::vector<AccessTime>    AccessTimes;
-                IndexMap                   Index;
-
-                {
-                    RwLock::ExclusiveLockScope __(m_IndexLock);
-                    size_t                     EntryCount = m_Index.size();
-                    Payloads.reserve(EntryCount);
-                    AccessTimes.reserve(EntryCount);
-                    Index.reserve(EntryCount);
-                    for (const auto& It : m_Index)
-                    {
-                        // Copy from the slot the key currently maps to (the old index), not from
-                        // the position it is about to get in the compacted vectors.
-                        const size_t OldEntryIndex = It.second;
-                        const size_t NewEntryIndex = Payloads.size();
-                        Payloads.push_back(m_Payloads[OldEntryIndex]);
-                        AccessTimes.push_back(m_AccessTimes[OldEntryIndex]);
-                        Index.insert({It.first, NewEntryIndex});
-                    }
-                    m_Index.swap(Index);
-                    m_Payloads.swap(Payloads);
-                    m_AccessTimes.swap(AccessTimes);
-                }
-            }
-        }
-    }
-
-    // Let whomever it concerns know about the bad chunks. This could
-    // be used to invalidate higher level data structures more efficiently
-    // than a full validation pass might be able to do
-    if (!BadKeys.empty())
-    {
-        Ctx.ReportBadCidChunks(BadKeys);
-    }
-}
-
-// Walks a snapshot of the bucket index on behalf of the garbage collector.
-//
-// Entries whose last access tick is older than GcCtx.CacheExpireTime() are handed to the
-// context as expired cache keys for this bucket. For every remaining structured entry the
-// value is loaded and its attachments are added to the context as retained cids.
-//
-// Time spent inside the shared index lock scopes is accumulated in the "write block"
-// counters, which are logged when the function exits.
-void
-ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
-{
-    ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences");
-
-    uint64_t WriteBlockTimeUs        = 0;
-    uint64_t WriteBlockLongestTimeUs = 0;
-    uint64_t ReadBlockTimeUs         = 0;
-    uint64_t ReadBlockLongestTimeUs  = 0;
-
-    Stopwatch  TotalTimer;
-    const auto LogOnExit = MakeGuard([&] {
-        ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})",
-                  m_BucketDir,
-                  NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
-                  NiceLatencyNs(WriteBlockTimeUs),
-                  NiceLatencyNs(WriteBlockLongestTimeUs),
-                  NiceLatencyNs(ReadBlockTimeUs),
-                  NiceLatencyNs(ReadBlockLongestTimeUs));
-    });
-
-    // Accounts one lock scope's duration in the write block statistics
-    const auto AccountWriteBlock = [&](uint64_t ElapsedUs) {
-        WriteBlockTimeUs += ElapsedUs;
-        WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
-    };
-
-    const GcClock::TimePoint ExpireTime  = GcCtx.CacheExpireTime();
-    const GcClock::Tick      ExpireTicks = ExpireTime.time_since_epoch().count();
-
-    // Work on copies so that the index lock is only held while copying
-    IndexMap                   IndexSnapshot;
-    std::vector<AccessTime>    AccessTimeSnapshot;
-    std::vector<BucketPayload> PayloadSnapshot;
-    {
-        RwLock::SharedLockScope IndexLock(m_IndexLock);
-        Stopwatch               LockTimer;
-        const auto              TimeGuard = MakeGuard([&] { AccountWriteBlock(LockTimer.GetElapsedTimeUs()); });
-        IndexSnapshot      = m_Index;
-        AccessTimeSnapshot = m_AccessTimes;
-        PayloadSnapshot    = m_Payloads;
-    }
-
-    std::vector<IoHash> ExpiredKeys;
-    ExpiredKeys.reserve(1024);
-
-    std::vector<IoHash> Cids;
-    Cids.reserve(1024);
-
-    for (const auto& Entry : IndexSnapshot)
-    {
-        const IoHash&       Key        = Entry.first;
-        const GcClock::Tick LastAccess = AccessTimeSnapshot[Entry.second];
-        if (LastAccess < ExpireTicks)
-        {
-            ExpiredKeys.push_back(Key);
-            continue;
-        }
-
-        const DiskLocation& Loc = PayloadSnapshot[Entry.second].Location;
-        if (!Loc.IsFlagSet(DiskLocation::kStructured))
-        {
-            // Only structured values can reference attachments
-            continue;
-        }
-
-        // Hand the collected cids over in batches to keep the local vector small
-        if (Cids.size() > 1024)
-        {
-            GcCtx.AddRetainedCids(Cids);
-            Cids.clear();
-        }
-
-        IoBuffer Buffer;
-        {
-            RwLock::SharedLockScope IndexLock(m_IndexLock);
-            Stopwatch               LockTimer;
-            const auto              TimeGuard = MakeGuard([&] { AccountWriteBlock(LockTimer.GetElapsedTimeUs()); });
-
-            if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
-            {
-                // We don't need to hold the index lock when we read a standalone file
-                IndexLock.ReleaseNow();
-                Buffer = GetStandaloneCacheValue(Loc, Key);
-            }
-            else
-            {
-                Buffer = GetInlineCacheValue(Loc);
-            }
-
-            if (!Buffer)
-            {
-                continue;
-            }
-        }
-
-        ZEN_ASSERT(Buffer);
-        ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject);
-        CbObject Obj(SharedBuffer{Buffer});
-        Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
-    }
-
-    GcCtx.AddRetainedCids(Cids);
-    GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys));
-}
-
-// Removes expired cache entries from this bucket.
-//
-// Outline:
-//  1. The keys GatherReferences marked as expired for this bucket are filtered through
-//     GcCtx.FilterCids; the ones that need not be kept become deletion candidates.
-//  2. Under the index lock the block store state and the index are snapshotted, the manifest
-//     is saved, and (deletion mode only) expired standalone entries are tombstoned and
-//     erased from the live index.
-//  3. Deletion mode only: the standalone files are removed from disk. If a removal fails the
-//     entry is restored in the index; if the key was added back in the meantime the file is
-//     left alone.
-//  4. The remaining (block store) entries are passed to BlockStore::ReclaimSpace, either as a
-//     dry run or for real; moved/removed chunks are logged and applied to the live index.
-//
-// On exit the payload/access time vectors are compacted if anything was deleted, the deleted
-// keys are reported through GcCtx.AddDeletedCids, statistics are logged and the manifest is
-// saved again.
-void
-ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
-{
-    ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage");
-
-    ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir);
-
-    Stopwatch TotalTimer;
-    uint64_t  WriteBlockTimeUs        = 0;
-    uint64_t  WriteBlockLongestTimeUs = 0;
-    uint64_t  ReadBlockTimeUs         = 0;
-    uint64_t  ReadBlockLongestTimeUs  = 0;
-    uint64_t  TotalChunkCount         = 0;
-    uint64_t  DeletedSize             = 0;
-    uint64_t  OldTotalSize            = TotalSize();
-
-    std::unordered_set<IoHash> DeletedChunks;
-    uint64_t                   MovedCount = 0;
-
-    // Runs last on every exit path: log statistics and persist the manifest
-    const auto _ = MakeGuard([&] {
-        ZEN_DEBUG(
-            "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved "
-            "{} "
-            "of {} "
-            "entries ({}).",
-            m_BucketDir,
-            NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
-            NiceLatencyNs(WriteBlockTimeUs),
-            NiceLatencyNs(WriteBlockLongestTimeUs),
-            NiceLatencyNs(ReadBlockTimeUs),
-            NiceLatencyNs(ReadBlockLongestTimeUs),
-            NiceBytes(DeletedSize),
-            DeletedChunks.size(),
-            MovedCount,
-            TotalChunkCount,
-            NiceBytes(OldTotalSize));
-        RwLock::SharedLockScope _(m_IndexLock);
-        SaveManifest();
-    });
-
-    m_SlogFile.Flush();
-
-    std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketDir.string());
-    std::vector<IoHash>     DeleteCacheKeys;
-    DeleteCacheKeys.reserve(ExpiredCacheKeys.size());
-    GcCtx.FilterCids(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) {
-        if (Keep)
-        {
-            return;
-        }
-        DeleteCacheKeys.push_back(ChunkHash);
-    });
-    if (DeleteCacheKeys.empty())
-    {
-        ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir);
-        return;
-    }
-
-    // Runs on exit (before the logging guard above): compact the entry vectors and report deletions
-    auto __ = MakeGuard([&]() {
-        if (!DeletedChunks.empty())
-        {
-            // Clean up m_AccessTimes and m_Payloads vectors
-            std::vector<BucketPayload> Payloads;
-            std::vector<AccessTime>    AccessTimes;
-            IndexMap                   Index;
-
-            {
-                RwLock::ExclusiveLockScope _(m_IndexLock);
-                Stopwatch                  Timer;
-                const auto                 ___ = MakeGuard([&] {
-                    uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
-                    WriteBlockTimeUs += ElapsedUs;
-                    WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
-                });
-                size_t                     EntryCount = m_Index.size();
-                Payloads.reserve(EntryCount);
-                AccessTimes.reserve(EntryCount);
-                Index.reserve(EntryCount);
-                for (const auto& It : m_Index)
-                {
-                    size_t OldEntryIndex = It.second;
-                    size_t NewEntryIndex = Payloads.size();
-                    Payloads.push_back(m_Payloads[OldEntryIndex]);
-                    AccessTimes.push_back(m_AccessTimes[OldEntryIndex]);
-                    Index.insert({It.first, NewEntryIndex});
-                }
-                m_Index.swap(Index);
-                m_Payloads.swap(Payloads);
-                m_AccessTimes.swap(AccessTimes);
-            }
-            GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end()));
-        }
-    });
-
-    std::vector<DiskIndexEntry>      ExpiredStandaloneEntries;
-    IndexMap                         Index;
-    BlockStore::ReclaimSnapshotState BlockStoreState;
-    {
-        // Exclusive, not shared: in deletion mode this scope erases entries from m_Index,
-        // which must not happen while other threads may be reading the index.
-        RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
-        Stopwatch                  Timer;
-        const auto                 ____ = MakeGuard([&] {
-            uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
-            ReadBlockTimeUs += ElapsedUs;
-            ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
-        });
-        if (m_Index.empty())
-        {
-            ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir);
-            return;
-        }
-        BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
-
-        SaveManifest();
-        Index = m_Index;
-
-        for (const IoHash& Key : DeleteCacheKeys)
-        {
-            if (auto It = Index.find(Key); It != Index.end())
-            {
-                const BucketPayload& Payload = m_Payloads[It->second];
-                DiskIndexEntry       Entry   = {.Key = It->first, .Location = Payload.Location};
-                if (Entry.Location.Flags & DiskLocation::kStandaloneFile)
-                {
-                    Entry.Location.Flags |= DiskLocation::kTombStone;
-                    ExpiredStandaloneEntries.push_back(Entry);
-                }
-            }
-        }
-        if (GcCtx.IsDeletionMode())
-        {
-            for (const auto& Entry : ExpiredStandaloneEntries)
-            {
-                m_Index.erase(Entry.Key);
-                m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
-                DeletedChunks.insert(Entry.Key);
-            }
-            m_SlogFile.Append(ExpiredStandaloneEntries);
-        }
-    }
-
-    if (GcCtx.IsDeletionMode())
-    {
-        std::error_code            Ec;
-        ExtendablePathBuilder<256> Path;
-
-        for (const auto& Entry : ExpiredStandaloneEntries)
-        {
-            const IoHash&       Key = Entry.Key;
-            const DiskLocation& Loc = Entry.Location;
-
-            Path.Reset();
-            BuildPath(Path, Key);
-            fs::path FilePath = Path.ToPath();
-
-            {
-                RwLock::SharedLockScope IndexLock(m_IndexLock);
-                Stopwatch               Timer;
-                const auto              ____ = MakeGuard([&] {
-                    uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
-                    WriteBlockTimeUs += ElapsedUs;
-                    WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
-                });
-                if (m_Index.contains(Key))
-                {
-                    // Someone added it back, let the file on disk be
-                    ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8());
-                    continue;
-                }
-                IndexLock.ReleaseNow();
-
-                RwLock::ExclusiveLockScope ValueLock(LockForHash(Key));
-                if (fs::is_regular_file(FilePath))
-                {
-                    ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8());
-                    fs::remove(FilePath, Ec);
-                }
-            }
-
-            if (Ec)
-            {
-                // The file could not be removed: put the entry back so it stays tracked
-                ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message());
-                Ec.clear();
-                DiskLocation RestoreLocation = Loc;
-                RestoreLocation.Flags &= ~DiskLocation::kTombStone;
-
-                RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
-                Stopwatch                  Timer;
-                const auto                 ___ = MakeGuard([&] {
-                    uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
-                    ReadBlockTimeUs += ElapsedUs;
-                    ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
-                });
-                if (m_Index.contains(Key))
-                {
-                    continue;
-                }
-                m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation});
-                size_t EntryIndex = m_Payloads.size();
-                m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation});
-                m_AccessTimes.emplace_back(GcClock::TickCount());
-                m_Index.insert({Key, EntryIndex});
-                m_TotalStandaloneSize.fetch_add(RestoreLocation.Size(), std::memory_order::relaxed);
-                DeletedChunks.erase(Key);
-                continue;
-            }
-            DeletedSize += Entry.Location.Size();
-        }
-    }
-
-    TotalChunkCount = Index.size();
-
-    // NOTE(review): from here on m_Payloads is read through indices taken from the snapshot
-    // without holding m_IndexLock - confirm this cannot race with concurrent writers.
-    std::vector<IoHash> TotalChunkHashes;
-    TotalChunkHashes.reserve(TotalChunkCount);
-    for (const auto& Entry : Index)
-    {
-        const DiskLocation& Location = m_Payloads[Entry.second].Location;
-
-        if (Location.Flags & DiskLocation::kStandaloneFile)
-        {
-            continue;
-        }
-        TotalChunkHashes.push_back(Entry.first);
-    }
-
-    if (TotalChunkHashes.empty())
-    {
-        return;
-    }
-    TotalChunkCount = TotalChunkHashes.size();
-
-    std::vector<BlockStoreLocation> ChunkLocations;
-    BlockStore::ChunkIndexArray     KeepChunkIndexes;
-    std::vector<IoHash>             ChunkIndexToChunkHash;
-    ChunkLocations.reserve(TotalChunkCount);
-    ChunkIndexToChunkHash.reserve(TotalChunkCount);
-
-    GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
-        auto                KeyIt        = Index.find(ChunkHash);
-        const DiskLocation& DiskLocation = m_Payloads[KeyIt->second].Location;
-        BlockStoreLocation  Location     = DiskLocation.GetBlockLocation(m_PayloadAlignment);
-        size_t              ChunkIndex   = ChunkLocations.size();
-        ChunkLocations.push_back(Location);
-        // reserve() only allocates capacity, the element has to be appended; both vectors
-        // grow in lock step so ChunkIndex addresses the same slot in each of them
-        ChunkIndexToChunkHash.push_back(ChunkHash);
-        if (Keep)
-        {
-            KeepChunkIndexes.push_back(ChunkIndex);
-        }
-    });
-
-    size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size();
-
-    const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
-    if (!PerformDelete)
-    {
-        m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
-        uint64_t CurrentTotalSize = TotalSize();
-        ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} {}",
-                  m_BucketDir,
-                  DeleteCount,
-                  TotalChunkCount,
-                  NiceBytes(CurrentTotalSize));
-        return;
-    }
-
-    m_BlockStore.ReclaimSpace(
-        BlockStoreState,
-        ChunkLocations,
-        KeepChunkIndexes,
-        m_PayloadAlignment,
-        false,
-        [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
-            // Translate the block store result into log entries: new locations for moved
-            // chunks, tombstones for removed ones
-            std::vector<DiskIndexEntry> LogEntries;
-            LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
-            for (const auto& Entry : MovedChunks)
-            {
-                size_t                    ChunkIndex      = Entry.first;
-                const BlockStoreLocation& NewLocation     = Entry.second;
-                const IoHash&             ChunkHash       = ChunkIndexToChunkHash[ChunkIndex];
-                const BucketPayload&      OldPayload      = m_Payloads[Index[ChunkHash]];
-                const DiskLocation&       OldDiskLocation = OldPayload.Location;
-                LogEntries.push_back(
-                    {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())});
-            }
-            for (const size_t ChunkIndex : RemovedChunks)
-            {
-                const IoHash&        ChunkHash       = ChunkIndexToChunkHash[ChunkIndex];
-                const BucketPayload& OldPayload      = m_Payloads[Index[ChunkHash]];
-                const DiskLocation&  OldDiskLocation = OldPayload.Location;
-                LogEntries.push_back({.Key      = ChunkHash,
-                                      .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment),
-                                                               m_PayloadAlignment,
-                                                               OldDiskLocation.GetFlags() | DiskLocation::kTombStone)});
-                DeletedChunks.insert(ChunkHash);
-            }
-
-            m_SlogFile.Append(LogEntries);
-            m_SlogFile.Flush();
-            {
-                RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
-                Stopwatch                  Timer;
-                const auto                 ____ = MakeGuard([&] {
-                    uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
-                    ReadBlockTimeUs += ElapsedUs;
-                    ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
-                });
-                for (const DiskIndexEntry& Entry : LogEntries)
-                {
-                    if (Entry.Location.GetFlags() & DiskLocation::kTombStone)
-                    {
-                        m_Index.erase(Entry.Key);
-                        continue;
-                    }
-                    m_Payloads[m_Index[Entry.Key]].Location = Entry.Location;
-                }
-            }
-        },
-        [&]() { return GcCtx.CollectSmallObjects(); });
-}
-
-// Applies externally tracked access times: for every key that is present in the index the
-// stored last access time is overwritten with the supplied one. Unknown keys are ignored.
-// NOTE(review): no index lock is taken here - presumably the caller is responsible; confirm.
-void
-ZenCacheDiskLayer::CacheBucket::UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes)
-{
-    using namespace access_tracking;
-
-    for (const KeyAccessTime& Update : AccessTimes)
-    {
-        auto Found = m_Index.find(Update.Key);
-        if (Found == m_Index.end())
-        {
-            continue;
-        }
-
-        const size_t Slot = Found.value();
-        ZEN_ASSERT_SLOW(Slot < m_AccessTimes.size());
-        m_AccessTimes[Slot] = Update.LastAccess;
-    }
-}
-
-// Returns the number of keys currently in the bucket index (read under a shared index lock).
-uint64_t
-ZenCacheDiskLayer::CacheBucket::EntryCount() const
-{
-    RwLock::SharedLockScope IndexLock(m_IndexLock);
-    const uint64_t          Count = static_cast<uint64_t>(m_Index.size());
-    return Count;
-}
-
-// Builds the detail record for the entry stored at position Index of the payload/access time
-// vectors. For structured values the value itself is loaded (standalone file or inline) and
-// its attachment hashes are collected; other values report an empty attachment list.
-CacheValueDetails::ValueDetails
-ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, size_t Index) const
-{
-    const BucketPayload& Payload  = m_Payloads[Index];
-    const DiskLocation&  Location = Payload.Location;
-
-    std::vector<IoHash> Attachments;
-    if (Location.IsFlagSet(DiskLocation::kStructured))
-    {
-        const bool IsStandalone = Location.IsFlagSet(DiskLocation::kStandaloneFile);
-        IoBuffer   Value        = IsStandalone ? GetStandaloneCacheValue(Location, Key) : GetInlineCacheValue(Location);
-
-        CbObject Obj(SharedBuffer{Value});
-        Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); });
-    }
-
-    return CacheValueDetails::ValueDetails{.Size        = Location.Size(),
-                                           .RawSize     = Payload.RawSize,
-                                           .RawHash     = Payload.RawHash,
-                                           .LastAccess  = m_AccessTimes[Index],
-                                           .Attachments = std::move(Attachments),
-                                           .ContentType = Location.GetContentType()};
-}
-
-// Collects value details for this bucket under a shared index lock.
-// An empty ValueFilter selects every entry; otherwise the filter is parsed as a hex key and
-// only that entry (if present) is returned.
-CacheValueDetails::BucketDetails
-ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilter) const
-{
-    CacheValueDetails::BucketDetails Result;
-    RwLock::SharedLockScope          IndexLock(m_IndexLock);
-
-    if (!ValueFilter.empty())
-    {
-        // Single entry requested
-        const IoHash WantedKey = IoHash::FromHexString(ValueFilter);
-        auto         Found     = m_Index.find(WantedKey);
-        if (Found != m_Index.end())
-        {
-            Result.Values.insert_or_assign(Found->first, GetValueDetails(Found->first, Found->second));
-        }
-        return Result;
-    }
-
-    // No filter: report everything in the index
-    Result.Values.reserve(m_Index.size());
-    for (const auto& Entry : m_Index)
-    {
-        Result.Values.insert_or_assign(Entry.first, GetValueDetails(Entry.first, Entry.second));
-    }
-    return Result;
-}
-
-// Runs garbage collection on every known bucket, one after the other, while holding the
-// layer lock in shared mode.
-void
-ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
-{
-    RwLock::SharedLockScope LayerLock(m_Lock);
-
-    for (auto& NameAndBucket : m_Buckets)
-    {
-        NameAndBucket.second->CollectGarbage(GcCtx);
-    }
-}
-
-// Forwards per-bucket access time updates to the matching buckets.
-// Updates addressed to buckets this layer does not know are dropped.
-void
-ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes)
-{
-    RwLock::SharedLockScope LayerLock(m_Lock);
-
-    for (const auto& BucketUpdate : AccessTimes.Buckets)
-    {
-        auto Found = m_Buckets.find(BucketUpdate.first);
-        if (Found == m_Buckets.end())
-        {
-            continue;
-        }
-        Found->second->UpdateAccessTimes(BucketUpdate.second);
-    }
-}
-
-// Stores a cache value as a standalone file in the bucket directory.
-//
-// The payload is first written to a temporary file, any existing file for the key is removed,
-// and the temporary file is then moved into its final place (creating the parent directory
-// and retrying once after a short sleep if needed). Finally the index, the log file and the
-// standalone size counter are updated under the exclusive index lock.
-//
-// Throws std::system_error when the temporary file cannot be created or written, when the
-// previous file cannot be removed, or when the file cannot be moved into place.
-void
-ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
-{
-    ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue");
-
-    uint64_t NewFileSize = Value.Value.Size();
-
-    TemporaryFile DataFile;
-
-    std::error_code Ec;
-    DataFile.CreateTemporary(m_BucketDir.c_str(), Ec);
-    if (Ec)
-    {
-        throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir));
-    }
-
-    // Armed right before the first MoveTemporaryIntoPlace call and disarmed once the move
-    // succeeded; see the comment further down.
-    bool CleanUpTempFile = false;
-    auto __              = MakeGuard([&] {
-        if (CleanUpTempFile)
-        {
-            std::error_code Ec;
-            std::filesystem::remove(DataFile.GetPath(), Ec);
-            if (Ec)
-            {
-                ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'",
-                         DataFile.GetPath(),
-                         m_BucketDir,
-                         Ec.message());
-            }
-        }
-    });
-
-    DataFile.WriteAll(Value.Value, Ec);
-    if (Ec)
-    {
-        throw std::system_error(Ec,
-                                fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'",
-                                            NiceBytes(NewFileSize),
-                                            DataFile.GetPath().string(),
-                                            m_BucketDir));
-    }
-
-    ExtendablePathBuilder<256> DataFilePath;
-    BuildPath(DataFilePath, HashKey);
-    std::filesystem::path FsPath{DataFilePath.ToPath()};
-
-    RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey));
-
-    // We do a speculative remove of the file instead of probing with a exists call and check the error code instead
-    std::filesystem::remove(FsPath, Ec);
-    if (Ec)
-    {
-        if (Ec.value() != ENOENT)
-        {
-            ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message());
-            Sleep(100);
-            Ec.clear();
-            std::filesystem::remove(FsPath, Ec);
-            if (Ec && Ec.value() != ENOENT)
-            {
-                throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir));
-            }
-        }
-    }
-
-    // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file
-    // will be disabled as the file handle has already been closed, so from here on a failure
-    // has to remove the temporary file explicitly (done by the guard above)
-    CleanUpTempFile = true;
-
-    DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
-    if (Ec)
-    {
-        CreateDirectories(FsPath.parent_path());
-        Ec.clear();
-
-        // Try again
-        DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
-        if (Ec)
-        {
-            ZEN_WARN("Failed to finalize file '{}', moving from '{}' for put in '{}', reason: '{}', retrying.",
-                     FsPath,
-                     DataFile.GetPath(),
-                     m_BucketDir,
-                     Ec.message());
-            Sleep(100);
-            Ec.clear();
-            DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
-            if (Ec)
-            {
-                throw std::system_error(
-                    Ec,
-                    fmt::format("Failed to finalize file '{}', moving from '{}' for put in '{}'", FsPath, DataFile.GetPath(), m_BucketDir));
-            }
-        }
-    }
-
-    // The file is in its final place, nothing left to clean up
-    CleanUpTempFile = false;
-
-    uint8_t EntryFlags = DiskLocation::kStandaloneFile;
-
-    if (Value.Value.GetContentType() == ZenContentType::kCbObject)
-    {
-        EntryFlags |= DiskLocation::kStructured;
-    }
-    else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
-    {
-        EntryFlags |= DiskLocation::kCompressed;
-    }
-
-    DiskLocation Loc(NewFileSize, EntryFlags);
-
-    RwLock::ExclusiveLockScope _(m_IndexLock);
-    if (auto It = m_Index.find(HashKey); It == m_Index.end())
-    {
-        // Previously unknown object
-        size_t EntryIndex = m_Payloads.size();
-        m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
-        m_AccessTimes.emplace_back(GcClock::TickCount());
-        m_Index.insert_or_assign(HashKey, EntryIndex);
-    }
-    else
-    {
-        // TODO: should check if write is idempotent and bail out if it is?
-        size_t EntryIndex = It.value();
-        ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
-
-        // Take the replaced entry out of the standalone size accounting. Its own size has to be
-        // used (not the new one), and only if it actually was a standalone file.
-        const DiskLocation& OldLocation = m_Payloads[EntryIndex].Location;
-        if (OldLocation.IsFlagSet(DiskLocation::kStandaloneFile))
-        {
-            m_TotalStandaloneSize.fetch_sub(OldLocation.Size(), std::memory_order::relaxed);
-        }
-
-        m_Payloads[EntryIndex] = BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash};
-        // Refresh the access time of the existing slot instead of appending a new element
-        m_AccessTimes[EntryIndex] = GcClock::TickCount();
-    }
-
-    m_SlogFile.Append({.Key = HashKey, .Location = Loc});
-    m_TotalStandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed);
-}
-
-// Stores a cache value inside the block store.
-// Once the block store has written the chunk, the resulting location is appended to the log
-// file and the index entry for the key is created or overwritten under the exclusive index lock.
-void
-ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
-{
-    // Derive the location flags from the content type of the value
-    uint8_t              EntryFlags  = 0;
-    const ZenContentType ContentType = Value.Value.GetContentType();
-    if (ContentType == ZenContentType::kCbObject)
-    {
-        EntryFlags |= DiskLocation::kStructured;
-    }
-    else if (ContentType == ZenContentType::kCompressedBinary)
-    {
-        EntryFlags |= DiskLocation::kCompressed;
-    }
-
-    const auto OnChunkWritten = [&](const BlockStoreLocation& BlockStoreLocation) {
-        DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags);
-        m_SlogFile.Append({.Key = HashKey, .Location = Location});
-
-        RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
-        auto                       Found = m_Index.find(HashKey);
-        if (Found == m_Index.end())
-        {
-            // New key: append a payload/access time pair and point the index at it
-            const size_t NewSlot = m_Payloads.size();
-            m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
-            m_AccessTimes.emplace_back(GcClock::TickCount());
-            m_Index.insert_or_assign(HashKey, NewSlot);
-            return;
-        }
-
-        // TODO: should check if write is idempotent and bail out if it is?
-        // this would requiring comparing contents on disk unless we add a
-        // content hash to the index entry
-        const size_t Slot = Found.value();
-        ZEN_ASSERT_SLOW(Slot < m_AccessTimes.size());
-        m_Payloads[Slot]    = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
-        m_AccessTimes[Slot] = GcClock::TickCount();
-    };
-
-    m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, OnChunkWritten);
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-// Remembers the root directory; the constructor body itself does no further work.
-ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir)
-: m_RootDir(RootDir)
-{
-}
-
-// Defaulted: the members clean up after themselves.
-ZenCacheDiskLayer::~ZenCacheDiskLayer() = default;
-
-// Looks up HashKey in the named bucket, opening or creating the bucket on first use.
-//
-// Returns false when the bucket cannot be opened or when the bucket reports no value for
-// the key; otherwise returns what CacheBucket::Get returns, with the value in OutValue.
-// An exception thrown while opening the bucket is propagated to the caller, but the
-// half-initialized bucket is removed from the bucket map first.
-bool
-ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
-{
-    const auto   BucketName = std::string(InBucket);
-    CacheBucket* Bucket     = nullptr;
-
-    {
-        // Fast path: the bucket is already known
-        RwLock::SharedLockScope _(m_Lock);
-
-        auto It = m_Buckets.find(BucketName);
-
-        if (It != m_Buckets.end())
-        {
-            Bucket = It->second.get();
-        }
-    }
-
-    if (Bucket == nullptr)
-    {
-        // Bucket needs to be opened/created
-
-        RwLock::ExclusiveLockScope _(m_Lock);
-
-        // Check again, another thread may have registered the bucket between the two locks
-        if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
-        {
-            Bucket = It->second.get();
-        }
-        else
-        {
-            auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName));
-            Bucket            = InsertResult.first->second.get();
-
-            std::filesystem::path BucketPath = m_RootDir;
-            BucketPath /= BucketName;
-
-            bool Opened = false;
-            try
-            {
-                Opened = Bucket->OpenOrCreate(BucketPath);
-            }
-            catch (...)
-            {
-                // Never leave a bucket that failed to open in the map, later lookups would
-                // find it through the fast path and use it as if it were valid
-                m_Buckets.erase(InsertResult.first);
-                throw;
-            }
-
-            if (!Opened)
-            {
-                m_Buckets.erase(InsertResult.first);
-                return false;
-            }
-        }
-    }
-
-    ZEN_ASSERT(Bucket != nullptr);
-    return Bucket->Get(HashKey, OutValue);
-}
-
-// Stores Value under HashKey in the named bucket, creating the bucket on first use.
-//
-// If the bucket cannot be opened or created the failure is logged and the value is not
-// stored; in that case the bucket is not left behind in the bucket map.
-void
-ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
-{
-    const auto   BucketName = std::string(InBucket);
-    CacheBucket* Bucket     = nullptr;
-
-    {
-        // Fast path: the bucket is already known
-        RwLock::SharedLockScope _(m_Lock);
-
-        auto It = m_Buckets.find(BucketName);
-
-        if (It != m_Buckets.end())
-        {
-            Bucket = It->second.get();
-        }
-    }
-
-    if (Bucket == nullptr)
-    {
-        // New bucket needs to be created
-
-        RwLock::ExclusiveLockScope _(m_Lock);
-
-        // Check again, another thread may have registered the bucket between the two locks
-        if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
-        {
-            Bucket = It->second.get();
-        }
-        else
-        {
-            auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName));
-            Bucket            = InsertResult.first->second.get();
-
-            std::filesystem::path BucketPath = m_RootDir;
-            BucketPath /= BucketName;
-
-            try
-            {
-                if (!Bucket->OpenOrCreate(BucketPath))
-                {
-                    ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
-                    m_Buckets.erase(InsertResult.first);
-                    return;
-                }
-            }
-            catch (const std::exception& Err)
-            {
-                ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
-                // Remove the bucket that failed to open, otherwise later calls would find it
-                // through the fast path and use it as if it were valid
-                m_Buckets.erase(InsertResult.first);
-                return;
-            }
-        }
-    }
-
-    ZEN_ASSERT(Bucket != nullptr);
-
-    Bucket->Put(HashKey, Value);
-}
-
-// Scans the root directory and registers every sub directory that opens as a valid bucket.
-// Directories that are already registered are skipped. A directory that is not a valid
-// bucket, or whose opening throws, is logged and skipped; the scan then continues with the
-// remaining directories.
-void
-ZenCacheDiskLayer::DiscoverBuckets()
-{
-    DirectoryContent DirContent;
-    GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent);
-
-    // Initialize buckets
-
-    RwLock::ExclusiveLockScope _(m_Lock);
-
-    for (const std::filesystem::path& BucketPath : DirContent.Directories)
-    {
-        // Use the complete last path component: stem() would cut a directory name at its last
-        // dot, yielding a bucket name that does not match the directory Get()/Put() would use
-        const std::string BucketName = PathToUtf8(BucketPath.filename());
-        // New bucket needs to be created
-        if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
-        {
-            continue;
-        }
-
-        auto         InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName));
-        CacheBucket& Bucket       = *InsertResult.first->second;
-
-        try
-        {
-            if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false))
-            {
-                ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
-
-                m_Buckets.erase(InsertResult.first);
-                continue;
-            }
-        }
-        catch (const std::exception& Err)
-        {
-            ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
-            // Drop the bucket that failed to open and keep going, one broken directory should
-            // not hide the remaining buckets
-            m_Buckets.erase(InsertResult.first);
-            continue;
-        }
-        ZEN_INFO("Discovered bucket '{}'", BucketName);
-    }
-}
-
-// Drops a single bucket. A known bucket is moved to the dropped bucket list, removed from
-// the bucket map and asked to drop itself. For an unknown bucket the directory with that
-// name under the root is moved and deleted. Returns the result of whichever path ran.
-bool
-ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
-{
-    RwLock::ExclusiveLockScope LayerLock(m_Lock);
-
-    auto Found = m_Buckets.find(std::string(InBucket));
-
-    if (Found == m_Buckets.end())
-    {
-        // Make sure we remove the folder even if we don't know about the bucket
-        std::filesystem::path BucketPath = m_RootDir;
-        BucketPath /= std::string(InBucket);
-        return MoveAndDeleteDirectory(BucketPath);
-    }
-
-    // The bucket object is kept alive by m_DroppedBuckets, so the reference stays valid
-    CacheBucket& Bucket = *Found->second;
-    m_DroppedBuckets.push_back(std::move(Found->second));
-    m_Buckets.erase(Found);
-
-    return Bucket.Drop();
-}
-
-// Drops every bucket and finally the root directory itself.
-// Each bucket is moved to the dropped bucket list and removed from the bucket map before it
-// is asked to drop itself. Returns false as soon as one bucket fails to drop (the remaining
-// buckets are then left untouched), otherwise the result of removing the root directory.
-bool
-ZenCacheDiskLayer::Drop()
-{
-    RwLock::ExclusiveLockScope _(m_Lock);
-
-    while (!m_Buckets.empty())
-    {
-        auto It = m_Buckets.begin();
-        // The bucket object is kept alive by m_DroppedBuckets, so the reference stays valid
-        CacheBucket& Bucket = *It->second;
-        m_DroppedBuckets.push_back(std::move(It->second));
-        m_Buckets.erase(It);
-        if (!Bucket.Drop())
-        {
-            return false;
-        }
-    }
-    return MoveAndDeleteDirectory(m_RootDir);
-}
-
-// Flushes every bucket. The bucket pointers are collected under the shared layer lock;
-// the flushes themselves run after that lock has been released.
-void
-ZenCacheDiskLayer::Flush()
-{
-    std::vector<CacheBucket*> ToFlush;
-
-    {
-        RwLock::SharedLockScope LayerLock(m_Lock);
-        ToFlush.reserve(m_Buckets.size());
-        for (auto& NameAndBucket : m_Buckets)
-        {
-            ToFlush.push_back(NameAndBucket.second.get());
-        }
-    }
-
-    for (CacheBucket* Bucket : ToFlush)
-    {
-        Bucket->Flush();
-    }
-}
-
-// Scrubs all buckets in parallel: one task per bucket is queued on the scrub context's
-// thread pool and the function waits for every task before it returns. The shared layer
-// lock is held for the whole duration.
-void
-ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
-{
-    RwLock::SharedLockScope LayerLock(m_Lock);
-
-    std::vector<std::future<void>> Pending;
-    Pending.reserve(m_Buckets.size());
-
-    for (auto& NameAndBucket : m_Buckets)
-    {
-        CacheBucket* Bucket = NameAndBucket.second.get();
-        Pending.push_back(Ctx.ThreadPool().EnqueueTask(std::packaged_task<void()>{[Bucket, &Ctx] { Bucket->ScrubStorage(Ctx); }}));
-    }
-
-    // get() blocks until the task is done and rethrows anything the task threw
-    for (auto& Task : Pending)
-    {
-        Task.get();
-    }
-}
-
-// Lets every bucket report its references and expired keys to the garbage collector,
-// one bucket after the other, under the shared layer lock.
-void
-ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx)
-{
-    RwLock::SharedLockScope LayerLock(m_Lock);
-
-    for (auto& NameAndBucket : m_Buckets)
-    {
-        NameAndBucket.second->GatherReferences(GcCtx);
-    }
-}
-
-// Returns the sum of TotalSize() over all buckets, computed under the shared layer lock.
-uint64_t
-ZenCacheDiskLayer::TotalSize() const
-{
-    uint64_t                Sum = 0;
-    RwLock::SharedLockScope LayerLock(m_Lock);
-
-    for (const auto& NameAndBucket : m_Buckets)
-    {
-        Sum += NameAndBucket.second->TotalSize();
-    }
-
-    return Sum;
-}
-
-// Summarizes the layer: root directory, total size, bucket names and overall entry count.
-// The total size is obtained via TotalSize() before the shared layer lock is taken here.
-ZenCacheDiskLayer::Info
-ZenCacheDiskLayer::GetInfo() const
-{
-    ZenCacheDiskLayer::Info Result = {.Config = {.RootDir = m_RootDir}, .TotalSize = TotalSize()};
-
-    RwLock::SharedLockScope LayerLock(m_Lock);
-    Result.BucketNames.reserve(m_Buckets.size());
-    for (const auto& NameAndBucket : m_Buckets)
-    {
-        Result.BucketNames.push_back(NameAndBucket.first);
-        Result.EntryCount += NameAndBucket.second->EntryCount();
-    }
-    return Result;
-}
-
-// Returns entry count and total size of the named bucket, or an empty optional when this
-// layer has no bucket with that name.
-std::optional<ZenCacheDiskLayer::BucketInfo>
-ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const
-{
-    RwLock::SharedLockScope LayerLock(m_Lock);
-
-    auto Found = m_Buckets.find(std::string(Bucket));
-    if (Found == m_Buckets.end())
-    {
-        return {};
-    }
-    return ZenCacheDiskLayer::BucketInfo{.EntryCount = Found->second->EntryCount(), .TotalSize = Found->second->TotalSize()};
-}
-
-// Collects value details per bucket under the shared layer lock.
-// An empty BucketFilter selects all buckets, otherwise only the bucket with that exact name
-// (if it exists). ValueFilter is passed on to the buckets unchanged.
-CacheValueDetails::NamespaceDetails
-ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const
-{
-    RwLock::SharedLockScope             LayerLock(m_Lock);
-    CacheValueDetails::NamespaceDetails Result;
-
-    if (BucketFilter.empty())
-    {
-        Result.Buckets.reserve(m_Buckets.size());
-        for (const auto& NameAndBucket : m_Buckets)
-        {
-            Result.Buckets[NameAndBucket.first] = NameAndBucket.second->GetValueDetails(ValueFilter);
-        }
-        return Result;
-    }
-
-    auto Found = m_Buckets.find(std::string(BucketFilter));
-    if (Found != m_Buckets.end())
-    {
-        Result.Buckets[Found->first] = Found->second->GetValueDetails(ValueFilter);
-    }
-    return Result;
-}
-
//////////////////////////// ZenCacheStore
ZEN_DEFINE_LOG_CATEGORY_STATIC(LogCacheActivity, "z$");