diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /zenserver/cache/structuredcachestore.cpp | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 3648 |
1 files changed, 0 insertions, 3648 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp deleted file mode 100644 index 26e970073..000000000 --- a/zenserver/cache/structuredcachestore.cpp +++ /dev/null @@ -1,3648 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "structuredcachestore.h" - -#include <zencore/except.h> - -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinarypackage.h> -#include <zencore/compactbinaryvalidation.h> -#include <zencore/compress.h> -#include <zencore/except.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/scopeguard.h> -#include <zencore/string.h> -#include <zencore/thread.h> -#include <zencore/timer.h> -#include <zencore/trace.h> -#include <zenstore/cidstore.h> -#include <zenstore/scrubcontext.h> - -#include <xxhash.h> - -#include <limits> - -#if ZEN_PLATFORM_WINDOWS -# include <zencore/windows.h> -#endif - -ZEN_THIRD_PARTY_INCLUDES_START -#include <fmt/core.h> -#include <gsl/gsl-lite.hpp> -ZEN_THIRD_PARTY_INCLUDES_END - -#if ZEN_WITH_TESTS -# include <zencore/testing.h> -# include <zencore/testutils.h> -# include <zencore/workthreadpool.h> -# include <random> -#endif - -////////////////////////////////////////////////////////////////////////// - -namespace zen { - -namespace { - -#pragma pack(push) -#pragma pack(1) - - struct CacheBucketIndexHeader - { - static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; - static constexpr uint32_t Version2 = 2; - static constexpr uint32_t CurrentVersion = Version2; - - uint32_t Magic = ExpectedMagic; - uint32_t Version = CurrentVersion; - uint64_t EntryCount = 0; - uint64_t LogPosition = 0; - uint32_t PayloadAlignment = 0; - uint32_t Checksum = 0; - - static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header) - { - return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); - } - }; - - static_assert(sizeof(CacheBucketIndexHeader) == 32); - -#pragma pack(pop) - - const char* IndexExtension = ".uidx"; - const char* LogExtension = ".slog"; - - std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) - { - return BucketDir / (BucketName + IndexExtension); - } - - std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) - { - return BucketDir / (BucketName + ".tmp"); - } - - std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) - { - return BucketDir / (BucketName + LogExtension); - } - - bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason) - { - if (Entry.Key == IoHash::Zero) - { - OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); - return false; - } - if (Entry.Location.GetFlags() & - ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) - { - OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString()); - return false; - } - if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) - { - return true; - } - if (Entry.Location.Reserved != 0) - { - OutReason = fmt::format("Invalid reserved field {} for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); - return false; - } - uint64_t Size = Entry.Location.Size(); - if (Size == 0) - { - OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); - return false; - } - return true; - } - - bool MoveAndDeleteDirectory(const std::filesystem::path& Dir) - { - int DropIndex = 0; - do - { - if (!std::filesystem::exists(Dir)) - { - return false; - } - - std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); - std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; - if (std::filesystem::exists(DroppedBucketPath)) - { - DropIndex++; - continue; - } - - std::error_code Ec; - std::filesystem::rename(Dir, DroppedBucketPath, Ec); - if (!Ec) - { - DeleteDirectories(DroppedBucketPath); - return true; - } - // TODO: Do we need to bail at some point? - zen::Sleep(100); - } while (true); - } - -} // namespace - -namespace fs = std::filesystem; - -static CbObject -LoadCompactBinaryObject(const fs::path& Path) -{ - FileContents Result = ReadFile(Path); - - if (!Result.ErrorCode) - { - IoBuffer Buffer = Result.Flatten(); - if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) - { - return LoadCompactBinaryObject(Buffer); - } - } - - return CbObject(); -} - -static void -SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) -{ - WriteFile(Path, Object.GetBuffer().AsIoBuffer()); -} - -ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir) -: GcStorage(Gc) -, GcContributor(Gc) -, m_RootDir(RootDir) -, m_DiskLayer(RootDir) -{ - ZEN_INFO("initializing structured cache at '{}'", RootDir); - CreateDirectories(RootDir); - - m_DiskLayer.DiscoverBuckets(); - -#if ZEN_USE_CACHE_TRACKER - m_AccessTracker.reset(new ZenCacheTracker(RootDir)); -#endif -} - -ZenCacheNamespace::~ZenCacheNamespace() -{ -} - -bool -ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) -{ - ZEN_TRACE_CPU("Z$::Get"); - - bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue); - -#if ZEN_USE_CACHE_TRACKER - auto _ = MakeGuard([&] { - if (!Ok) - return; - - m_AccessTracker->TrackAccess(InBucket, HashKey); - }); -#endif - - if (Ok) - { - ZEN_ASSERT(OutValue.Value.Size()); - - return true; - } - - Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue); - - if (Ok) - { - ZEN_ASSERT(OutValue.Value.Size()); - - if (OutValue.Value.Size() <= m_DiskLayerSizeThreshold) - { - m_MemLayer.Put(InBucket, HashKey, OutValue); - } - } - - return Ok; -} - -void -ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) -{ - ZEN_TRACE_CPU("Z$::Put"); - - // Store value and index - - ZEN_ASSERT(Value.Value.Size()); - - m_DiskLayer.Put(InBucket, HashKey, Value); - -#if ZEN_USE_REF_TRACKING - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - if (ValidateCompactBinary(Value.Value, CbValidateMode::All) == CbValidateError::None) - { - CbObject Object{SharedBuffer(Value.Value)}; - - uint8_t TempBuffer[8 * sizeof(IoHash)]; - std::pmr::monotonic_buffer_resource Linear{TempBuffer, sizeof TempBuffer}; - std::pmr::polymorphic_allocator Allocator{&Linear}; - std::pmr::vector<IoHash> CidReferences{Allocator}; - - Object.IterateAttachments([&](CbFieldView Field) { CidReferences.push_back(Field.AsAttachment()); }); - - m_Gc.OnNewCidReferences(CidReferences); - } - } -#endif - - if (Value.Value.Size() <= m_DiskLayerSizeThreshold) - { - m_MemLayer.Put(InBucket, HashKey, Value); - } -} - -bool -ZenCacheNamespace::DropBucket(std::string_view Bucket) -{ - ZEN_INFO("dropping bucket '{}'", Bucket); - - // TODO: should ensure this is done atomically across all layers - - const bool MemDropped = m_MemLayer.DropBucket(Bucket); - const bool DiskDropped = m_DiskLayer.DropBucket(Bucket); - const bool AnyDropped = MemDropped || DiskDropped; - - ZEN_INFO("bucket '{}' was {}", Bucket, AnyDropped ? "dropped" : "not found"); - - return AnyDropped; -} - -bool -ZenCacheNamespace::Drop() -{ - m_MemLayer.Drop(); - return m_DiskLayer.Drop(); -} - -void -ZenCacheNamespace::Flush() -{ - m_DiskLayer.Flush(); -} - -void -ZenCacheNamespace::Scrub(ScrubContext& Ctx) -{ - if (m_LastScrubTime == Ctx.ScrubTimestamp()) - { - return; - } - - m_LastScrubTime = Ctx.ScrubTimestamp(); - - m_DiskLayer.Scrub(Ctx); - m_MemLayer.Scrub(Ctx); -} - -void -ZenCacheNamespace::GatherReferences(GcContext& GcCtx) -{ - Stopwatch Timer; - const auto Guard = - MakeGuard([&] { ZEN_DEBUG("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - - access_tracking::AccessTimes AccessTimes; - m_MemLayer.GatherAccessTimes(AccessTimes); - - m_DiskLayer.UpdateAccessTimes(AccessTimes); - m_DiskLayer.GatherReferences(GcCtx); -} - -void -ZenCacheNamespace::CollectGarbage(GcContext& GcCtx) -{ - m_MemLayer.Reset(); - m_DiskLayer.CollectGarbage(GcCtx); -} - -GcStorageSize -ZenCacheNamespace::StorageSize() const -{ - return {.DiskSize = m_DiskLayer.TotalSize(), .MemorySize = m_MemLayer.TotalSize()}; -} - -ZenCacheNamespace::Info -ZenCacheNamespace::GetInfo() const -{ - ZenCacheNamespace::Info Info = {.Config = {.RootDir = m_RootDir, .DiskLayerThreshold = m_DiskLayerSizeThreshold}, - .DiskLayerInfo = m_DiskLayer.GetInfo(), - .MemoryLayerInfo = m_MemLayer.GetInfo()}; - std::unordered_set<std::string> BucketNames; - for (const std::string& BucketName : Info.DiskLayerInfo.BucketNames) - { - BucketNames.insert(BucketName); - } - for (const std::string& BucketName : Info.MemoryLayerInfo.BucketNames) - { - BucketNames.insert(BucketName); - } - Info.BucketNames.insert(Info.BucketNames.end(), BucketNames.begin(), BucketNames.end()); - return Info; -} - -std::optional<ZenCacheNamespace::BucketInfo> -ZenCacheNamespace::GetBucketInfo(std::string_view Bucket) const -{ - std::optional<ZenCacheDiskLayer::BucketInfo> DiskBucketInfo = m_DiskLayer.GetBucketInfo(Bucket); - if (!DiskBucketInfo.has_value()) - { - return {}; - } - ZenCacheNamespace::BucketInfo Info = {.DiskLayerInfo = *DiskBucketInfo, - .MemoryLayerInfo = m_MemLayer.GetBucketInfo(Bucket).value_or(ZenCacheMemoryLayer::BucketInfo{})}; - return Info; -} - -CacheValueDetails::NamespaceDetails -ZenCacheNamespace::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const -{ - return m_DiskLayer.GetValueDetails(BucketFilter, ValueFilter); -} - -////////////////////////////////////////////////////////////////////////// - -ZenCacheMemoryLayer::ZenCacheMemoryLayer() -{ -} - -ZenCacheMemoryLayer::~ZenCacheMemoryLayer() -{ -} - -bool -ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) -{ - RwLock::SharedLockScope _(m_Lock); - - auto It = m_Buckets.find(std::string(InBucket)); - - if (It == m_Buckets.end()) - { - return false; - } - - CacheBucket* Bucket = It->second.get(); - - _.ReleaseNow(); - - // There's a race here. Since the lock is released early to allow - // inserts, the bucket delete path could end up deleting the - // underlying data structure - - return Bucket->Get(HashKey, OutValue); -} - -void -ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) -{ - const auto BucketName = std::string(InBucket); - CacheBucket* Bucket = nullptr; - - { - RwLock::SharedLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - } - - if (Bucket == nullptr) - { - // New bucket - - RwLock::ExclusiveLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - else - { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>()); - Bucket = InsertResult.first->second.get(); - } - } - - // Note that since the underlying IoBuffer is retained, the content type is also - - Bucket->Put(HashKey, Value); -} - -bool -ZenCacheMemoryLayer::DropBucket(std::string_view InBucket) -{ - RwLock::ExclusiveLockScope _(m_Lock); - - auto It = m_Buckets.find(std::string(InBucket)); - - if (It != m_Buckets.end()) - { - CacheBucket& Bucket = *It->second; - m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It); - Bucket.Drop(); - return true; - } - return false; -} - -void -ZenCacheMemoryLayer::Drop() -{ - RwLock::ExclusiveLockScope _(m_Lock); - std::vector<std::unique_ptr<CacheBucket>> Buckets; - Buckets.reserve(m_Buckets.size()); - while (!m_Buckets.empty()) - { - const auto& It = m_Buckets.begin(); - CacheBucket& Bucket = *It->second; - m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It->first); - Bucket.Drop(); - } -} - -void -ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx) -{ - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - Kv.second->Scrub(Ctx); - } -} - -void -ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes) -{ - using namespace zen::access_tracking; - - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - std::vector<KeyAccessTime>& Bucket = AccessTimes.Buckets[Kv.first]; - Kv.second->GatherAccessTimes(Bucket); - } -} - -void -ZenCacheMemoryLayer::Reset() -{ - RwLock::ExclusiveLockScope _(m_Lock); - m_Buckets.clear(); -} - -uint64_t -ZenCacheMemoryLayer::TotalSize() const -{ - uint64_t TotalSize{}; - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - TotalSize += Kv.second->TotalSize(); - } - - return TotalSize; -} - -ZenCacheMemoryLayer::Info -ZenCacheMemoryLayer::GetInfo() const -{ - ZenCacheMemoryLayer::Info Info = {.Config = m_Configuration, .TotalSize = TotalSize()}; - - RwLock::SharedLockScope _(m_Lock); - Info.BucketNames.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - Info.BucketNames.push_back(Kv.first); - Info.EntryCount += Kv.second->EntryCount(); - } - return Info; -} - -std::optional<ZenCacheMemoryLayer::BucketInfo> -ZenCacheMemoryLayer::GetBucketInfo(std::string_view Bucket) const -{ - RwLock::SharedLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) - { - return ZenCacheMemoryLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()}; - } - return {}; -} - -void -ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) -{ - RwLock::SharedLockScope _(m_BucketLock); - - std::vector<IoHash> BadHashes; - - auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) { - if (ContentType == ZenContentType::kCbObject) - { - CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); - return Error == CbValidateError::None; - } - if (ContentType == ZenContentType::kCompressedBinary) - { - IoHash RawHash; - uint64_t RawSize; - if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) - { - return false; - } - if (Hash != RawHash) - { - return false; - } - } - return true; - }; - - for (auto& Kv : m_CacheMap) - { - const BucketPayload& Payload = m_Payloads[Kv.second]; - if (!ValidateEntry(Kv.first, Payload.Payload.GetContentType(), Payload.Payload)) - { - BadHashes.push_back(Kv.first); - } - } - - if (!BadHashes.empty()) - { - Ctx.ReportBadCidChunks(BadHashes); - } -} - -void -ZenCacheMemoryLayer::CacheBucket::GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes) -{ - RwLock::SharedLockScope _(m_BucketLock); - std::transform(m_CacheMap.begin(), m_CacheMap.end(), std::back_inserter(AccessTimes), [this](const auto& Kv) { - return access_tracking::KeyAccessTime{.Key = Kv.first, .LastAccess = m_AccessTimes[Kv.second]}; - }); -} - -bool -ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) -{ - RwLock::SharedLockScope _(m_BucketLock); - - if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end()) - { - uint32_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size()); - - const BucketPayload& Payload = m_Payloads[EntryIndex]; - OutValue = {.Value = Payload.Payload, .RawSize = Payload.RawSize, .RawHash = Payload.RawHash}; - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - - return true; - } - - return false; -} - -void -ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) -{ - size_t PayloadSize = Value.Value.GetSize(); - { - GcClock::Tick AccessTime = GcClock::TickCount(); - RwLock::ExclusiveLockScope _(m_BucketLock); - if (m_CacheMap.size() == std::numeric_limits<uint32_t>::max()) - { - // No more space in our memory cache! - return; - } - if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end()) - { - uint32_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - - m_TotalSize.fetch_sub(PayloadSize, std::memory_order::relaxed); - BucketPayload& Payload = m_Payloads[EntryIndex]; - Payload.Payload = Value.Value; - Payload.RawHash = Value.RawHash; - Payload.RawSize = gsl::narrow<uint32_t>(Value.RawSize); - m_AccessTimes[EntryIndex] = AccessTime; - } - else - { - uint32_t EntryIndex = gsl::narrow<uint32_t>(m_Payloads.size()); - m_Payloads.emplace_back( - BucketPayload{.Payload = Value.Value, .RawSize = gsl::narrow<uint32_t>(Value.RawSize), .RawHash = Value.RawHash}); - m_AccessTimes.emplace_back(AccessTime); - m_CacheMap.insert_or_assign(HashKey, EntryIndex); - } - ZEN_ASSERT_SLOW(m_Payloads.size() == m_CacheMap.size()); - ZEN_ASSERT_SLOW(m_AccessTimes.size() == m_Payloads.size()); - } - - m_TotalSize.fetch_add(PayloadSize, std::memory_order::relaxed); -} - -void -ZenCacheMemoryLayer::CacheBucket::Drop() -{ - RwLock::ExclusiveLockScope _(m_BucketLock); - m_CacheMap.clear(); - m_AccessTimes.clear(); - m_Payloads.clear(); - m_TotalSize.store(0); -} - -uint64_t -ZenCacheMemoryLayer::CacheBucket::EntryCount() const -{ - RwLock::SharedLockScope _(m_BucketLock); - return static_cast<uint64_t>(m_CacheMap.size()); -} - -////////////////////////////////////////////////////////////////////////// - -ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)), m_BucketId(Oid::Zero) -{ -} - -ZenCacheDiskLayer::CacheBucket::~CacheBucket() -{ -} - -bool -ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate) -{ - using namespace std::literals; - - m_BlocksBasePath = BucketDir / "blocks"; - m_BucketDir = BucketDir; - - CreateDirectories(m_BucketDir); - - std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"}; - - bool IsNew = false; - - CbObject Manifest = LoadCompactBinaryObject(ManifestPath); - - if (Manifest) - { - m_BucketId = Manifest["BucketId"sv].AsObjectId(); - if (m_BucketId == Oid::Zero) - { - return false; - } - } - else if (AllowCreate) - { - m_BucketId.Generate(); - - CbObjectWriter Writer; - Writer << "BucketId"sv << m_BucketId; - Manifest = Writer.Save(); - SaveCompactBinaryObject(ManifestPath, Manifest); - IsNew = true; - } - else - { - return false; - } - - OpenLog(IsNew); - - if (!IsNew) - { - Stopwatch Timer; - const auto _ = - MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - - for (CbFieldView Entry : Manifest["Timestamps"sv]) - { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64(); - } - } - for (CbFieldView Entry : Manifest["RawInfo"sv]) - { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - m_Payloads[EntryIndex].RawHash = Obj["RawHash"sv].AsHash(); - m_Payloads[EntryIndex].RawSize = Obj["RawSize"sv].AsUInt64(); - } - } - } - - return true; -} - -void -ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot() -{ - uint64_t LogCount = m_SlogFile.GetLogCount(); - if (m_LogFlushPosition == LogCount) - { - return; - } - - ZEN_DEBUG("write store snapshot for '{}'", m_BucketDir / m_BucketName); - uint64_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", - m_BucketDir / m_BucketName, - EntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - namespace fs = std::filesystem; - - fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); - fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName); - - // Move index away, we keep it if something goes wrong - if (fs::is_regular_file(STmpIndexPath)) - { - fs::remove(STmpIndexPath); - } - if (fs::is_regular_file(IndexPath)) - { - fs::rename(IndexPath, STmpIndexPath); - } - - try - { - // Write the current state of the location map to a new index state - std::vector<DiskIndexEntry> Entries; - - { - Entries.resize(m_Index.size()); - - uint64_t EntryIndex = 0; - for (auto& Entry : m_Index) - { - DiskIndexEntry& IndexEntry = Entries[EntryIndex++]; - IndexEntry.Key = Entry.first; - IndexEntry.Location = m_Payloads[Entry.second].Location; - } - } - - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); - CacheBucketIndexHeader Header = {.EntryCount = Entries.size(), - .LogPosition = LogCount, - .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; - - Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); - - ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0); - ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); - ObjectIndexFile.Flush(); - ObjectIndexFile.Close(); - EntryCount = Entries.size(); - m_LogFlushPosition = LogCount; - } - catch (std::exception& Err) - { - ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); - - // Restore any previous snapshot - - if (fs::is_regular_file(STmpIndexPath)) - { - fs::remove(IndexPath); - fs::rename(STmpIndexPath, IndexPath); - } - } - if (fs::is_regular_file(STmpIndexPath)) - { - fs::remove(STmpIndexPath); - } -} - -uint64_t -ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion) -{ - if (std::filesystem::is_regular_file(IndexPath)) - { - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); - uint64_t Size = ObjectIndexFile.FileSize(); - if (Size >= sizeof(CacheBucketIndexHeader)) - { - CacheBucketIndexHeader Header; - ObjectIndexFile.Read(&Header, sizeof(Header), 0); - if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && - (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0)) - { - switch (Header.Version) - { - case CacheBucketIndexHeader::Version2: - { - uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); - if (Header.EntryCount > ExpectedEntryCount) - { - break; - } - size_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' index containing {} entries in {}", - IndexPath, - EntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - m_PayloadAlignment = Header.PayloadAlignment; - - std::vector<DiskIndexEntry> Entries; - Entries.resize(Header.EntryCount); - ObjectIndexFile.Read(Entries.data(), - Header.EntryCount * sizeof(DiskIndexEntry), - sizeof(CacheBucketIndexHeader)); - - m_Payloads.reserve(Header.EntryCount); - m_AccessTimes.reserve(Header.EntryCount); - m_Index.reserve(Header.EntryCount); - - std::string InvalidEntryReason; - for (const DiskIndexEntry& Entry : Entries) - { - if (!ValidateEntry(Entry, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); - continue; - } - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = 0, .RawHash = IoHash::Zero}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_Index.insert_or_assign(Entry.Key, EntryIndex); - EntryCount++; - } - OutVersion = CacheBucketIndexHeader::Version2; - return Header.LogPosition; - } - break; - default: - break; - } - } - } - ZEN_WARN("skipping invalid index file '{}'", IndexPath); - } - return 0; -} - -uint64_t -ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount) -{ - if (std::filesystem::is_regular_file(LogPath)) - { - uint64_t LogEntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - TCasLogFile<DiskIndexEntry> CasLog; - CasLog.Open(LogPath, CasLogFile::Mode::kRead); - if (CasLog.Initialize()) - { - uint64_t EntryCount = CasLog.GetLogCount(); - if (EntryCount < SkipEntryCount) - { - ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); - SkipEntryCount = 0; - } - LogEntryCount = EntryCount - SkipEntryCount; - m_Index.reserve(LogEntryCount); - uint64_t InvalidEntryCount = 0; - CasLog.Replay( - [&](const DiskIndexEntry& Record) { - std::string InvalidEntryReason; - if (Record.Location.Flags & DiskLocation::kTombStone) - { - m_Index.erase(Record.Key); - return; - } - if (!ValidateEntry(Record, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); - ++InvalidEntryCount; - return; - } - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = 0u, .RawHash = IoHash::Zero}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_Index.insert_or_assign(Record.Key, EntryIndex); - }, - SkipEntryCount); - if (InvalidEntryCount) - { - ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName); - } - return LogEntryCount; - } - } - return 0; -}; - -void -ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) -{ - m_TotalStandaloneSize = 0; - - m_Index.clear(); - m_Payloads.clear(); - m_AccessTimes.clear(); - - std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); - std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); - - if (IsNew) - { - fs::remove(LogPath); - fs::remove(IndexPath); - fs::remove_all(m_BlocksBasePath); - } - - uint64_t LogEntryCount = 0; - { - uint32_t IndexVersion = 0; - m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion); - if (IndexVersion == 0 && std::filesystem::is_regular_file(IndexPath)) - { - ZEN_WARN("removing invalid index file at '{}'", IndexPath); - fs::remove(IndexPath); - } - - if (TCasLogFile<DiskIndexEntry>::IsValid(LogPath)) - { - LogEntryCount = ReadLog(LogPath, m_LogFlushPosition); - } - else - { - ZEN_WARN("removing invalid cas log at '{}'", LogPath); - fs::remove(LogPath); - } - } - - CreateDirectories(m_BucketDir); - - m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); - - std::vector<BlockStoreLocation> KnownLocations; - KnownLocations.reserve(m_Index.size()); - for (const auto& Entry : m_Index) - { - size_t EntryIndex = Entry.second; - const BucketPayload& Payload = m_Payloads[EntryIndex]; - const DiskLocation& Location = Payload.Location; - - if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) - { - m_TotalStandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed); - continue; - } - const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); - KnownLocations.push_back(BlockLocation); - } - - m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); - if (IsNew || LogEntryCount > 0) - { - MakeIndexSnapshot(); - } - // TODO: should validate integrity of container files here -} - -void -ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const -{ - char HexString[sizeof(HashKey.Hash) * 2]; - ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString); - - Path.Append(m_BucketDir); - Path.AppendSeparator(); - Path.Append(L"blob"); - Path.AppendSeparator(); - Path.AppendAsciiRange(HexString, HexString + 3); - Path.AppendSeparator(); - Path.AppendAsciiRange(HexString + 3, HexString + 5); - Path.AppendSeparator(); - Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString)); -} - -IoBuffer -ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const -{ - BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment); - - IoBuffer Value = m_BlockStore.TryGetChunk(Location); - if (Value) - { - Value.SetContentType(Loc.GetContentType()); - } - - return Value; -} - -IoBuffer -ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const -{ - ExtendablePathBuilder<256> DataFilePath; - BuildPath(DataFilePath, HashKey); - - RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); - - if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath())) - { - Data.SetContentType(Loc.GetContentType()); - - return Data; - } - - return {}; -} - -bool -ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) -{ - RwLock::SharedLockScope _(m_IndexLock); - auto It = m_Index.find(HashKey); - if (It == m_Index.end()) - { - return false; - } - size_t EntryIndex = It.value(); - const BucketPayload& Payload = m_Payloads[EntryIndex]; - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - DiskLocation Location = Payload.Location; - OutValue.RawSize = Payload.RawSize; - OutValue.RawHash = Payload.RawHash; - if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) - { - // We don't need to hold the index lock when we read a standalone file - _.ReleaseNow(); - OutValue.Value = GetStandaloneCacheValue(Location, HashKey); - } - else - { - OutValue.Value = GetInlineCacheValue(Location); - } - _.ReleaseNow(); - - if (!Location.IsFlagSet(DiskLocation::kStructured)) - { - if (OutValue.RawHash == IoHash::Zero && OutValue.RawSize == 0 && OutValue.Value.GetSize() > 0) - { - if (Location.IsFlagSet(DiskLocation::kCompressed)) - { - (void)CompressedBuffer::FromCompressed(SharedBuffer(OutValue.Value), OutValue.RawHash, OutValue.RawSize); - } - else - { - OutValue.RawHash = IoHash::HashBuffer(OutValue.Value); - OutValue.RawSize = OutValue.Value.GetSize(); - } - RwLock::ExclusiveLockScope __(m_IndexLock); - if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end()) - { - BucketPayload& WritePayload = m_Payloads[WriteIt.value()]; - WritePayload.RawHash = OutValue.RawHash; - WritePayload.RawSize = OutValue.RawSize; - - m_LogFlushPosition = 0; // Force resave of index on exit - } - } - } - - return (bool)OutValue.Value; -} - -void -ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) -{ - if (Value.Value.Size() >= m_LargeObjectThreshold) - { - return PutStandaloneCacheValue(HashKey, Value); - } - PutInlineCacheValue(HashKey, Value); -} - -bool -ZenCacheDiskLayer::CacheBucket::Drop() -{ - RwLock::ExclusiveLockScope _(m_IndexLock); - - std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks; - ShardLocks.reserve(256); - for (RwLock& Lock : m_ShardedLocks) - { - ShardLocks.push_back(std::make_unique<RwLock::ExclusiveLockScope>(Lock)); - } - m_BlockStore.Close(); - m_SlogFile.Close(); - - bool Deleted = MoveAndDeleteDirectory(m_BucketDir); - - m_Index.clear(); - m_Payloads.clear(); - m_AccessTimes.clear(); - return Deleted; -} - -void -ZenCacheDiskLayer::CacheBucket::Flush() -{ - m_BlockStore.Flush(); - - RwLock::SharedLockScope _(m_IndexLock); - m_SlogFile.Flush(); - MakeIndexSnapshot(); - SaveManifest(); -} - -void -ZenCacheDiskLayer::CacheBucket::SaveManifest() -{ - using namespace std::literals; - - CbObjectWriter Writer; - Writer << "BucketId"sv << m_BucketId; - - if (!m_Index.empty()) - { - Writer.BeginArray("Timestamps"sv); - for (auto& Kv : m_Index) - { - const IoHash& Key = Kv.first; - GcClock::Tick AccessTime = m_AccessTimes[Kv.second]; - - Writer.BeginObject(); - Writer << "Key"sv << Key; - Writer << "LastAccess"sv << AccessTime; - Writer.EndObject(); - } - Writer.EndArray(); - - Writer.BeginArray("RawInfo"sv); - { - for (auto& Kv : m_Index) - { - const IoHash& Key = Kv.first; - const BucketPayload& Payload = m_Payloads[Kv.second]; - if (Payload.RawHash != IoHash::Zero) - { - Writer.BeginObject(); - Writer << "Key"sv << Key; - Writer << "RawHash"sv << Payload.RawHash; - Writer << "RawSize"sv << Payload.RawSize; - Writer.EndObject(); - } - } - } - Writer.EndArray(); - } - - SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save()); -} - -void -ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) -{ - std::vector<IoHash> BadKeys; - uint64_t ChunkCount{0}, ChunkBytes{0}; - std::vector<BlockStoreLocation> ChunkLocations; - std::vector<IoHash> ChunkIndexToChunkHash; - - auto ValidateEntry = [](const IoHash& Hash, ZenContentType ContentType, IoBuffer Buffer) { - if (ContentType == ZenContentType::kCbObject) - { - CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); - return Error == CbValidateError::None; - } - if (ContentType == ZenContentType::kCompressedBinary) - { - IoHash RawHash; - uint64_t RawSize; - if (!CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) - { - return false; - } - if (RawHash != Hash) - { - return false; - } - } - return true; - }; - - RwLock::SharedLockScope _(m_IndexLock); - - const size_t BlockChunkInitialCount = m_Index.size() / 4; - ChunkLocations.reserve(BlockChunkInitialCount); - ChunkIndexToChunkHash.reserve(BlockChunkInitialCount); - - for (auto& Kv : m_Index) - { - const IoHash& HashKey = Kv.first; - const BucketPayload& Payload = m_Payloads[Kv.second]; - const DiskLocation& Loc = Payload.Location; - - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - ++ChunkCount; - ChunkBytes += Loc.Size(); - if (Loc.GetContentType() == ZenContentType::kBinary) - { - ExtendablePathBuilder<256> DataFilePath; - BuildPath(DataFilePath, HashKey); - - RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); - - std::error_code Ec; - uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec); - if (Ec) - { - BadKeys.push_back(HashKey); - } - if (size != Loc.Size()) - { - BadKeys.push_back(HashKey); - } - continue; - } - IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey); - if (!Buffer) - { - BadKeys.push_back(HashKey); - continue; - } - if (!ValidateEntry(HashKey, Loc.GetContentType(), Buffer)) - { - BadKeys.push_back(HashKey); - continue; - } - } - else - { - ChunkLocations.emplace_back(Loc.GetBlockLocation(m_PayloadAlignment)); - ChunkIndexToChunkHash.push_back(HashKey); - continue; - } - } - - const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - ++ChunkCount; - ChunkBytes += Size; - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - if (!Data) - { - // ChunkLocation out of range of stored blocks - BadKeys.push_back(Hash); - return; - } - IoBuffer Buffer(IoBuffer::Wrap, Data, Size); - if (!Buffer) - { - BadKeys.push_back(Hash); - return; - } - const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; - ZenContentType ContentType = Payload.Location.GetContentType(); - if (!ValidateEntry(Hash, ContentType, Buffer)) - { - BadKeys.push_back(Hash); - return; - } - }; - - const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - ++ChunkCount; - ChunkBytes += Size; - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - // TODO: Add API to verify compressed buffer and possible structure data without having to memorymap the whole file - IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); - if (!Buffer) - { - BadKeys.push_back(Hash); - return; - } - const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; - ZenContentType ContentType = Payload.Location.GetContentType(); - if (!ValidateEntry(Hash, ContentType, Buffer)) - { - BadKeys.push_back(Hash); - return; - } - }; - - m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); - - _.ReleaseNow(); - - Ctx.ReportScrubbed(ChunkCount, ChunkBytes); - - if (!BadKeys.empty()) - { - ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName); - - if (Ctx.RunRecovery()) - { - // Deal with bad chunks by removing them from our lookup map - - std::vector<DiskIndexEntry> LogEntries; - LogEntries.reserve(BadKeys.size()); - - { - RwLock::ExclusiveLockScope __(m_IndexLock); - for (const IoHash& BadKey : BadKeys) - { - // Log a tombstone and delete the in-memory index for the bad entry - const auto It = m_Index.find(BadKey); - const BucketPayload& Payload = m_Payloads[It->second]; - DiskLocation Location = Payload.Location; - Location.Flags |= DiskLocation::kTombStone; - LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location}); - m_Index.erase(BadKey); - } - } - for (const DiskIndexEntry& Entry : LogEntries) - { - if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) - { - ExtendablePathBuilder<256> Path; - BuildPath(Path, Entry.Key); - fs::path FilePath = Path.ToPath(); - RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key)); - if (fs::is_regular_file(FilePath)) - { - ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8()); - std::error_code Ec; - fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file... - } - m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); - } - } - m_SlogFile.Append(LogEntries); - - // Clean up m_AccessTimes and m_Payloads vectors - { - std::vector<BucketPayload> Payloads; - std::vector<AccessTime> AccessTimes; - IndexMap Index; - - { - RwLock::ExclusiveLockScope __(m_IndexLock); - size_t EntryCount = m_Index.size(); - Payloads.reserve(EntryCount); - AccessTimes.reserve(EntryCount); - Index.reserve(EntryCount); - for (auto It : m_Index) - { - size_t EntryIndex = Payloads.size(); - Payloads.push_back(m_Payloads[EntryIndex]); - AccessTimes.push_back(m_AccessTimes[EntryIndex]); - Index.insert({It.first, EntryIndex}); - } - m_Index.swap(Index); - m_Payloads.swap(Payloads); - m_AccessTimes.swap(AccessTimes); - } - } - } - } - - // Let whomever it concerns know about the bad chunks. This could - // be used to invalidate higher level data structures more efficiently - // than a full validation pass might be able to do - Ctx.ReportBadCidChunks(BadKeys); - - ZEN_INFO("cache bucket scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); -} - -void -ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences"); - - uint64_t WriteBlockTimeUs = 0; - uint64_t WriteBlockLongestTimeUs = 0; - uint64_t ReadBlockTimeUs = 0; - uint64_t ReadBlockLongestTimeUs = 0; - - Stopwatch TotalTimer; - const auto _ = MakeGuard([&] { - ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})", - m_BucketDir / m_BucketName, - NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), - NiceLatencyNs(WriteBlockTimeUs), - NiceLatencyNs(WriteBlockLongestTimeUs), - NiceLatencyNs(ReadBlockTimeUs), - NiceLatencyNs(ReadBlockLongestTimeUs)); - }); - - const GcClock::TimePoint ExpireTime = GcCtx.ExpireTime(); - - const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); - - IndexMap Index; - std::vector<AccessTime> AccessTimes; - std::vector<BucketPayload> Payloads; - { - RwLock::SharedLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - Index = m_Index; - AccessTimes = m_AccessTimes; - Payloads = m_Payloads; - } - - std::vector<IoHash> ExpiredKeys; - ExpiredKeys.reserve(1024); - - std::vector<IoHash> Cids; - Cids.reserve(1024); - - for (const auto& Entry : Index) - { - const IoHash& Key = Entry.first; - GcClock::Tick AccessTime = AccessTimes[Entry.second]; - if (AccessTime < ExpireTicks) - { - ExpiredKeys.push_back(Key); - continue; - } - - const DiskLocation& Loc = Payloads[Entry.second].Location; - - if (Loc.IsFlagSet(DiskLocation::kStructured)) - { - if (Cids.size() > 1024) - { - GcCtx.AddRetainedCids(Cids); - Cids.clear(); - } - - IoBuffer Buffer; - { - RwLock::SharedLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - // We don't need to hold the index lock when we read a standalone file - __.ReleaseNow(); - if (Buffer = GetStandaloneCacheValue(Loc, Key); !Buffer) - { - continue; - } - } - else if (Buffer = GetInlineCacheValue(Loc); !Buffer) - { - continue; - } - } - - ZEN_ASSERT(Buffer); - ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject); - CbObject Obj(SharedBuffer{Buffer}); - Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); - } - } - - GcCtx.AddRetainedCids(Cids); - GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys)); -} - -void -ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::CollectGarbage"); - - ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir / m_BucketName); - - Stopwatch TotalTimer; - uint64_t WriteBlockTimeUs = 0; - uint64_t WriteBlockLongestTimeUs = 0; - uint64_t ReadBlockTimeUs = 0; - uint64_t ReadBlockLongestTimeUs = 0; - uint64_t TotalChunkCount = 0; - uint64_t DeletedSize = 0; - uint64_t OldTotalSize = TotalSize(); - - std::unordered_set<IoHash> DeletedChunks; - uint64_t MovedCount = 0; - - const auto _ = MakeGuard([&] { - ZEN_DEBUG( - "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved " - "{} " - "of {} " - "entires ({}).", - m_BucketDir / m_BucketName, - NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), - NiceLatencyNs(WriteBlockTimeUs), - NiceLatencyNs(WriteBlockLongestTimeUs), - NiceLatencyNs(ReadBlockTimeUs), - NiceLatencyNs(ReadBlockLongestTimeUs), - NiceBytes(DeletedSize), - DeletedChunks.size(), - MovedCount, - TotalChunkCount, - NiceBytes(OldTotalSize)); - RwLock::SharedLockScope _(m_IndexLock); - SaveManifest(); - }); - - m_SlogFile.Flush(); - - std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketDir.string()); - std::vector<IoHash> DeleteCacheKeys; - DeleteCacheKeys.reserve(ExpiredCacheKeys.size()); - GcCtx.FilterCids(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) { - if (Keep) - { - return; - } - DeleteCacheKeys.push_back(ChunkHash); - }); - if (DeleteCacheKeys.empty()) - { - ZEN_DEBUG("garbage collect SKIPPED, for '{}', no expired cache keys found", m_BucketDir / m_BucketName); - return; - } - - auto __ = MakeGuard([&]() { - if (!DeletedChunks.empty()) - { - // Clean up m_AccessTimes and m_Payloads vectors - std::vector<BucketPayload> Payloads; - std::vector<AccessTime> AccessTimes; - IndexMap Index; - - { - RwLock::ExclusiveLockScope _(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - size_t EntryCount = m_Index.size(); - Payloads.reserve(EntryCount); - AccessTimes.reserve(EntryCount); - Index.reserve(EntryCount); - for (auto It : m_Index) - { - size_t EntryIndex = Payloads.size(); - Payloads.push_back(m_Payloads[EntryIndex]); - AccessTimes.push_back(m_AccessTimes[EntryIndex]); - Index.insert({It.first, EntryIndex}); - } - m_Index.swap(Index); - m_Payloads.swap(Payloads); - m_AccessTimes.swap(AccessTimes); - } - GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end())); - } - }); - - std::vector<DiskIndexEntry> ExpiredStandaloneEntries; - IndexMap Index; - BlockStore::ReclaimSnapshotState BlockStoreState; - { - RwLock::SharedLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - if (m_Index.empty()) - { - ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName); - return; - } - BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); - - SaveManifest(); - Index = m_Index; - - for (const IoHash& Key : DeleteCacheKeys) - { - if (auto It = Index.find(Key); It != Index.end()) - { - const BucketPayload& Payload = m_Payloads[It->second]; - DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location}; - if (Entry.Location.Flags & DiskLocation::kStandaloneFile) - { - Entry.Location.Flags |= DiskLocation::kTombStone; - ExpiredStandaloneEntries.push_back(Entry); - } - } - } - if (GcCtx.IsDeletionMode()) - { - for (const auto& Entry : ExpiredStandaloneEntries) - { - m_Index.erase(Entry.Key); - m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); - DeletedChunks.insert(Entry.Key); - } - m_SlogFile.Append(ExpiredStandaloneEntries); - } - } - - if (GcCtx.IsDeletionMode()) - { - std::error_code Ec; - ExtendablePathBuilder<256> Path; - - for (const auto& Entry : ExpiredStandaloneEntries) - { - const IoHash& Key = Entry.Key; - const DiskLocation& Loc = Entry.Location; - - Path.Reset(); - BuildPath(Path, Key); - fs::path FilePath = Path.ToPath(); - - { - RwLock::SharedLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - if (m_Index.contains(Key)) - { - // Someone added it back, let the file on disk be - ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); - continue; - } - __.ReleaseNow(); - - RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); - if (fs::is_regular_file(FilePath)) - { - ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); - fs::remove(FilePath, Ec); - } - } - - if (Ec) - { - ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); - Ec.clear(); - DiskLocation RestoreLocation = Loc; - RestoreLocation.Flags &= ~DiskLocation::kTombStone; - - RwLock::ExclusiveLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - if (m_Index.contains(Key)) - { - continue; - } - m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation}); - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_Index.insert({Key, EntryIndex}); - m_TotalStandaloneSize.fetch_add(RestoreLocation.Size(), std::memory_order::relaxed); - DeletedChunks.erase(Key); - continue; - } - DeletedSize += Entry.Location.Size(); - } - } - - TotalChunkCount = Index.size(); - - std::vector<IoHash> TotalChunkHashes; - TotalChunkHashes.reserve(TotalChunkCount); - for (const auto& Entry : Index) - { - const DiskLocation& Location = m_Payloads[Entry.second].Location; - - if (Location.Flags & DiskLocation::kStandaloneFile) - { - continue; - } - TotalChunkHashes.push_back(Entry.first); - } - - if (TotalChunkHashes.empty()) - { - return; - } - TotalChunkCount = TotalChunkHashes.size(); - - std::vector<BlockStoreLocation> ChunkLocations; - BlockStore::ChunkIndexArray KeepChunkIndexes; - std::vector<IoHash> ChunkIndexToChunkHash; - ChunkLocations.reserve(TotalChunkCount); - ChunkLocations.reserve(TotalChunkCount); - ChunkIndexToChunkHash.reserve(TotalChunkCount); - - GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { - auto KeyIt = Index.find(ChunkHash); - const DiskLocation& DiskLocation = m_Payloads[KeyIt->second].Location; - BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment); - size_t ChunkIndex = ChunkLocations.size(); - ChunkLocations.push_back(Location); - ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; - if (Keep) - { - KeepChunkIndexes.push_back(ChunkIndex); - } - }); - - size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size(); - - const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); - if (!PerformDelete) - { - m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); - uint64_t CurrentTotalSize = TotalSize(); - ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} {}", - m_BucketDir / m_BucketName, - DeleteCount, - TotalChunkCount, - NiceBytes(CurrentTotalSize)); - return; - } - - m_BlockStore.ReclaimSpace( - BlockStoreState, - ChunkLocations, - KeepChunkIndexes, - m_PayloadAlignment, - false, - [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { - std::vector<DiskIndexEntry> LogEntries; - LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); - for (const auto& Entry : MovedChunks) - { - size_t ChunkIndex = Entry.first; - const BlockStoreLocation& NewLocation = Entry.second; - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]]; - const DiskLocation& OldDiskLocation = OldPayload.Location; - LogEntries.push_back( - {.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, OldDiskLocation.GetFlags())}); - } - for (const size_t ChunkIndex : RemovedChunks) - { - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - const BucketPayload& OldPayload = m_Payloads[Index[ChunkHash]]; - const DiskLocation& OldDiskLocation = OldPayload.Location; - LogEntries.push_back({.Key = ChunkHash, - .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_PayloadAlignment), - m_PayloadAlignment, - OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); - DeletedChunks.insert(ChunkHash); - } - - m_SlogFile.Append(LogEntries); - m_SlogFile.Flush(); - { - RwLock::ExclusiveLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - for (const DiskIndexEntry& Entry : LogEntries) - { - if (Entry.Location.GetFlags() & DiskLocation::kTombStone) - { - m_Index.erase(Entry.Key); - continue; - } - m_Payloads[m_Index[Entry.Key]].Location = Entry.Location; - } - } - }, - [&]() { return GcCtx.CollectSmallObjects(); }); -} - -void -ZenCacheDiskLayer::CacheBucket::UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes) -{ - using namespace access_tracking; - - for (const KeyAccessTime& KeyTime : AccessTimes) - { - if (auto It = m_Index.find(KeyTime.Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_AccessTimes[EntryIndex] = KeyTime.LastAccess; - } - } -} - -uint64_t -ZenCacheDiskLayer::CacheBucket::EntryCount() const -{ - RwLock::SharedLockScope _(m_IndexLock); - return static_cast<uint64_t>(m_Index.size()); -} - -CacheValueDetails::ValueDetails -ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, size_t Index) const -{ - std::vector<IoHash> Attachments; - const BucketPayload& Payload = m_Payloads[Index]; - if (Payload.Location.IsFlagSet(DiskLocation::kStructured)) - { - IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) ? GetStandaloneCacheValue(Payload.Location, Key) - : GetInlineCacheValue(Payload.Location); - CbObject Obj(SharedBuffer{Value}); - Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); }); - } - return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(), - .RawSize = Payload.RawSize, - .RawHash = Payload.RawHash, - .LastAccess = m_AccessTimes[Index], - .Attachments = std::move(Attachments), - .ContentType = Payload.Location.GetContentType()}; -} - -CacheValueDetails::BucketDetails -ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilter) const -{ - CacheValueDetails::BucketDetails Details; - RwLock::SharedLockScope _(m_IndexLock); - if (ValueFilter.empty()) - { - Details.Values.reserve(m_Index.size()); - for (const auto& It : m_Index) - { - Details.Values.insert_or_assign(It.first, GetValueDetails(It.first, It.second)); - } - } - else - { - IoHash Key = IoHash::FromHexString(ValueFilter); - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - Details.Values.insert_or_assign(It->first, GetValueDetails(It->first, It->second)); - } - } - return Details; -} - -void -ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) -{ - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - CacheBucket& Bucket = *Kv.second; - Bucket.CollectGarbage(GcCtx); - } -} - -void -ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& AccessTimes) -{ - RwLock::SharedLockScope _(m_Lock); - - for (const auto& Kv : AccessTimes.Buckets) - { - if (auto It = m_Buckets.find(Kv.first); It != m_Buckets.end()) - { - CacheBucket& Bucket = *It->second; - Bucket.UpdateAccessTimes(Kv.second); - } - } -} - -void -ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) -{ - uint64_t NewFileSize = Value.Value.Size(); - - TemporaryFile DataFile; - - std::error_code Ec; - DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); - if (Ec) - { - throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir)); - } - - bool CleanUpTempFile = false; - auto __ = MakeGuard([&] { - if (CleanUpTempFile) - { - std::error_code Ec; - std::filesystem::remove(DataFile.GetPath(), Ec); - if (Ec) - { - ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", - DataFile.GetPath(), - m_BucketDir, - Ec.message()); - } - } - }); - - DataFile.WriteAll(Value.Value, Ec); - if (Ec) - { - throw std::system_error(Ec, - fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'", - NiceBytes(NewFileSize), - DataFile.GetPath().string(), - m_BucketDir)); - } - - ExtendablePathBuilder<256> DataFilePath; - BuildPath(DataFilePath, HashKey); - std::filesystem::path FsPath{DataFilePath.ToPath()}; - - RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); - - // We do a speculative remove of the file instead of probing with a exists call and check the error code instead - std::filesystem::remove(FsPath, Ec); - if (Ec) - { - if (Ec.value() != ENOENT) - { - ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message()); - Sleep(100); - Ec.clear(); - std::filesystem::remove(FsPath, Ec); - if (Ec && Ec.value() != ENOENT) - { - throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir)); - } - } - } - - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); - if (Ec) - { - CreateDirectories(FsPath.parent_path()); - Ec.clear(); - - // Try again - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); - if (Ec) - { - ZEN_WARN("Failed to finalize file '{}', moving from '{}' for put in '{}', reason: '{}', retrying.", - FsPath, - DataFile.GetPath(), - m_BucketDir, - Ec.message()); - Sleep(100); - Ec.clear(); - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); - if (Ec) - { - throw std::system_error( - Ec, - fmt::format("Failed to finalize file '{}', moving from '{}' for put in '{}'", FsPath, DataFile.GetPath(), m_BucketDir)); - } - } - } - - // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file - // will be disabled as the file handle has already been closed - CleanUpTempFile = false; - - uint8_t EntryFlags = DiskLocation::kStandaloneFile; - - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - EntryFlags |= DiskLocation::kCompressed; - } - - DiskLocation Loc(NewFileSize, EntryFlags); - - RwLock::ExclusiveLockScope _(m_IndexLock); - if (auto It = m_Index.find(HashKey); It == m_Index.end()) - { - // Previously unknown object - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_Index.insert_or_assign(HashKey, EntryIndex); - } - else - { - // TODO: should check if write is idempotent and bail out if it is? - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_Payloads[EntryIndex] = BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash}; - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_TotalStandaloneSize.fetch_sub(Loc.Size(), std::memory_order::relaxed); - } - - m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_TotalStandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed); -} - -void -ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) -{ - uint8_t EntryFlags = 0; - - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - EntryFlags |= DiskLocation::kCompressed; - } - - m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { - DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); - m_SlogFile.Append({.Key = HashKey, .Location = Location}); - - RwLock::ExclusiveLockScope _(m_IndexLock); - if (auto It = m_Index.find(HashKey); It != m_Index.end()) - { - // TODO: should check if write is idempotent and bail out if it is? - // this would requiring comparing contents on disk unless we add a - // content hash to the index entry - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - } - else - { - size_t EntryIndex = m_Payloads.size(); - m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - m_Index.insert_or_assign(HashKey, EntryIndex); - } - }); -} - -////////////////////////////////////////////////////////////////////////// - -ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir) -{ -} - -ZenCacheDiskLayer::~ZenCacheDiskLayer() = default; - -bool -ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) -{ - const auto BucketName = std::string(InBucket); - CacheBucket* Bucket = nullptr; - - { - RwLock::SharedLockScope _(m_Lock); - - auto It = m_Buckets.find(BucketName); - - if (It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - } - - if (Bucket == nullptr) - { - // Bucket needs to be opened/created - - RwLock::ExclusiveLockScope _(m_Lock); - - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - else - { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName)); - Bucket = InsertResult.first->second.get(); - - std::filesystem::path BucketPath = m_RootDir; - BucketPath /= BucketName; - - if (!Bucket->OpenOrCreate(BucketPath)) - { - m_Buckets.erase(InsertResult.first); - return false; - } - } - } - - ZEN_ASSERT(Bucket != nullptr); - return Bucket->Get(HashKey, OutValue); -} - -void -ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) -{ - const auto BucketName = std::string(InBucket); - CacheBucket* Bucket = nullptr; - - { - RwLock::SharedLockScope _(m_Lock); - - auto It = m_Buckets.find(BucketName); - - if (It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - } - - if (Bucket == nullptr) - { - // New bucket needs to be created - - RwLock::ExclusiveLockScope _(m_Lock); - - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - else - { - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName)); - Bucket = InsertResult.first->second.get(); - - std::filesystem::path BucketPath = m_RootDir; - BucketPath /= BucketName; - - try - { - if (!Bucket->OpenOrCreate(BucketPath)) - { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); - m_Buckets.erase(InsertResult.first); - return; - } - } - catch (const std::exception& Err) - { - ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); - return; - } - } - } - - ZEN_ASSERT(Bucket != nullptr); - - Bucket->Put(HashKey, Value); -} - -void -ZenCacheDiskLayer::DiscoverBuckets() -{ - DirectoryContent DirContent; - GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent); - - // Initialize buckets - - RwLock::ExclusiveLockScope _(m_Lock); - - for (const std::filesystem::path& BucketPath : DirContent.Directories) - { - const std::string BucketName = PathToUtf8(BucketPath.stem()); - // New bucket needs to be created - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) - { - continue; - } - - auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName)); - CacheBucket& Bucket = *InsertResult.first->second; - - try - { - if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false)) - { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); - - m_Buckets.erase(InsertResult.first); - continue; - } - } - catch (const std::exception& Err) - { - ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); - return; - } - ZEN_INFO("Discovered bucket '{}'", BucketName); - } -} - -bool -ZenCacheDiskLayer::DropBucket(std::string_view InBucket) -{ - RwLock::ExclusiveLockScope _(m_Lock); - - auto It = m_Buckets.find(std::string(InBucket)); - - if (It != m_Buckets.end()) - { - CacheBucket& Bucket = *It->second; - m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It); - - return Bucket.Drop(); - } - - // Make sure we remove the folder even if we don't know about the bucket - std::filesystem::path BucketPath = m_RootDir; - BucketPath /= std::string(InBucket); - return MoveAndDeleteDirectory(BucketPath); -} - -bool -ZenCacheDiskLayer::Drop() -{ - RwLock::ExclusiveLockScope _(m_Lock); - - std::vector<std::unique_ptr<CacheBucket>> Buckets; - Buckets.reserve(m_Buckets.size()); - while (!m_Buckets.empty()) - { - const auto& It = m_Buckets.begin(); - CacheBucket& Bucket = *It->second; - m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It->first); - if (!Bucket.Drop()) - { - return false; - } - } - return MoveAndDeleteDirectory(m_RootDir); -} - -void -ZenCacheDiskLayer::Flush() -{ - std::vector<CacheBucket*> Buckets; - - { - RwLock::SharedLockScope _(m_Lock); - Buckets.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - CacheBucket* Bucket = Kv.second.get(); - Buckets.push_back(Bucket); - } - } - - for (auto& Bucket : Buckets) - { - Bucket->Flush(); - } -} - -void -ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) -{ - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - CacheBucket& Bucket = *Kv.second; - Bucket.Scrub(Ctx); - } -} - -void -ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) -{ - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - CacheBucket& Bucket = *Kv.second; - Bucket.GatherReferences(GcCtx); - } -} - -uint64_t -ZenCacheDiskLayer::TotalSize() const -{ - uint64_t TotalSize{}; - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - TotalSize += Kv.second->TotalSize(); - } - - return TotalSize; -} - -ZenCacheDiskLayer::Info -ZenCacheDiskLayer::GetInfo() const -{ - ZenCacheDiskLayer::Info Info = {.Config = {.RootDir = m_RootDir}, .TotalSize = TotalSize()}; - - RwLock::SharedLockScope _(m_Lock); - Info.BucketNames.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - Info.BucketNames.push_back(Kv.first); - Info.EntryCount += Kv.second->EntryCount(); - } - return Info; -} - -std::optional<ZenCacheDiskLayer::BucketInfo> -ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const -{ - RwLock::SharedLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) - { - return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .TotalSize = It->second->TotalSize()}; - } - return {}; -} - -CacheValueDetails::NamespaceDetails -ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const -{ - RwLock::SharedLockScope _(m_Lock); - CacheValueDetails::NamespaceDetails Details; - if (BucketFilter.empty()) - { - Details.Buckets.reserve(BucketFilter.empty() ? m_Buckets.size() : 1); - for (auto& Kv : m_Buckets) - { - Details.Buckets[Kv.first] = Kv.second->GetValueDetails(ValueFilter); - } - } - else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end()) - { - Details.Buckets[It->first] = It->second->GetValueDetails(ValueFilter); - } - return Details; -} - -//////////////////////////// ZenCacheStore - -static constexpr std::string_view UE4DDCNamespaceName = "ue4.ddc"; - -ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration) : m_Gc(Gc), m_Configuration(Configuration) -{ - CreateDirectories(m_Configuration.BasePath); - - DirectoryContent DirContent; - GetDirectoryContent(m_Configuration.BasePath, DirectoryContent::IncludeDirsFlag, DirContent); - - std::vector<std::string> Namespaces; - for (const std::filesystem::path& DirPath : DirContent.Directories) - { - std::string DirName = PathToUtf8(DirPath.filename()); - if (DirName.starts_with(NamespaceDiskPrefix)) - { - Namespaces.push_back(DirName.substr(NamespaceDiskPrefix.length())); - continue; - } - } - - ZEN_INFO("Found {} namespaces in '{}'", Namespaces.size(), m_Configuration.BasePath); - - if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end()) - { - // default (unspecified) and ue4-ddc namespace points to the same namespace instance - - std::filesystem::path DefaultNamespaceFolder = - m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName); - CreateDirectories(DefaultNamespaceFolder); - Namespaces.push_back(std::string(UE4DDCNamespaceName)); - } - - for (const std::string& NamespaceName : Namespaces) - { - m_Namespaces[NamespaceName] = - std::make_unique<ZenCacheNamespace>(Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName)); - } -} - -ZenCacheStore::~ZenCacheStore() -{ - m_Namespaces.clear(); -} - -bool -ZenCacheStore::Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue) -{ - if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) - { - return Store->Get(Bucket, HashKey, OutValue); - } - ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Get, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString()); - - return false; -} - -void -ZenCacheStore::Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value) -{ - if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) - { - return Store->Put(Bucket, HashKey, Value); - } - ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put, bucket '{}', key '{}'", Namespace, Bucket, HashKey.ToHexString()); -} - -bool -ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket) -{ - if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) - { - return Store->DropBucket(Bucket); - } - ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropBucket, bucket '{}'", Namespace, Bucket); - return false; -} - -bool -ZenCacheStore::DropNamespace(std::string_view InNamespace) -{ - RwLock::SharedLockScope _(m_NamespacesLock); - if (auto It = m_Namespaces.find(std::string(InNamespace)); It != m_Namespaces.end()) - { - ZenCacheNamespace& Namespace = *It->second; - m_DroppedNamespaces.push_back(std::move(It->second)); - m_Namespaces.erase(It); - return Namespace.Drop(); - } - ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropNamespace", InNamespace); - return false; -} - -void -ZenCacheStore::Flush() -{ - IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); }); -} - -void -ZenCacheStore::Scrub(ScrubContext& Ctx) -{ - IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Scrub(Ctx); }); -} - -CacheValueDetails -ZenCacheStore::GetValueDetails(const std::string_view NamespaceFilter, - const std::string_view BucketFilter, - const std::string_view ValueFilter) const -{ - CacheValueDetails Details; - if (NamespaceFilter.empty()) - { - IterateNamespaces([&](std::string_view Namespace, ZenCacheNamespace& Store) { - Details.Namespaces[std::string(Namespace)] = Store.GetValueDetails(BucketFilter, ValueFilter); - }); - } - else if (const ZenCacheNamespace* Store = FindNamespace(NamespaceFilter); Store != nullptr) - { - Details.Namespaces[std::string(NamespaceFilter)] = Store->GetValueDetails(BucketFilter, ValueFilter); - } - return Details; -} - -ZenCacheNamespace* -ZenCacheStore::GetNamespace(std::string_view Namespace) -{ - RwLock::SharedLockScope _(m_NamespacesLock); - if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end()) - { - return It->second.get(); - } - if (Namespace == DefaultNamespace) - { - if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end()) - { - return It->second.get(); - } - } - _.ReleaseNow(); - - if (!m_Configuration.AllowAutomaticCreationOfNamespaces) - { - return nullptr; - } - - RwLock::ExclusiveLockScope __(m_NamespacesLock); - if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end()) - { - return It->second.get(); - } - - auto NewNamespace = m_Namespaces.insert_or_assign( - std::string(Namespace), - std::make_unique<ZenCacheNamespace>(m_Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace))); - return NewNamespace.first->second.get(); -} - -const ZenCacheNamespace* -ZenCacheStore::FindNamespace(std::string_view Namespace) const -{ - RwLock::SharedLockScope _(m_NamespacesLock); - if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end()) - { - return It->second.get(); - } - if (Namespace == DefaultNamespace) - { - if (auto It = m_Namespaces.find(std::string(UE4DDCNamespaceName)); It != m_Namespaces.end()) - { - return It->second.get(); - } - } - return nullptr; -} - -void -ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const -{ - std::vector<std::pair<std::string, ZenCacheNamespace&>> Namespaces; - { - RwLock::SharedLockScope _(m_NamespacesLock); - Namespaces.reserve(m_Namespaces.size()); - for (const auto& Entry : m_Namespaces) - { - if (Entry.first == DefaultNamespace) - { - continue; - } - Namespaces.push_back({Entry.first, *Entry.second}); - } - } - for (auto& Entry : Namespaces) - { - Callback(Entry.first, Entry.second); - } -} - -GcStorageSize -ZenCacheStore::StorageSize() const -{ - GcStorageSize Size; - IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { - GcStorageSize StoreSize = Store.StorageSize(); - Size.MemorySize += StoreSize.MemorySize; - Size.DiskSize += StoreSize.DiskSize; - }); - return Size; -} - -ZenCacheStore::Info -ZenCacheStore::GetInfo() const -{ - ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = StorageSize()}; - - IterateNamespaces([&Info](std::string_view NamespaceName, ZenCacheNamespace& Namespace) { - Info.NamespaceNames.push_back(std::string(NamespaceName)); - ZenCacheNamespace::Info NamespaceInfo = Namespace.GetInfo(); - Info.DiskEntryCount += NamespaceInfo.DiskLayerInfo.EntryCount; - Info.MemoryEntryCount += NamespaceInfo.MemoryLayerInfo.EntryCount; - }); - - return Info; -} - -std::optional<ZenCacheNamespace::Info> -ZenCacheStore::GetNamespaceInfo(std::string_view NamespaceName) -{ - if (const ZenCacheNamespace* Namespace = FindNamespace(NamespaceName); Namespace) - { - return Namespace->GetInfo(); - } - return {}; -} - -std::optional<ZenCacheNamespace::BucketInfo> -ZenCacheStore::GetBucketInfo(std::string_view NamespaceName, std::string_view BucketName) -{ - if (const ZenCacheNamespace* Namespace = FindNamespace(NamespaceName); Namespace) - { - return Namespace->GetBucketInfo(BucketName); - } - return {}; -} - -////////////////////////////////////////////////////////////////////////// - -#if ZEN_WITH_TESTS - -using namespace std::literals; - -namespace testutils { - IoHash CreateKey(size_t KeyValue) { return IoHash::HashBuffer(&KeyValue, sizeof(size_t)); } - - IoBuffer CreateBinaryCacheValue(uint64_t Size) - { - static std::random_device rd; - static std::mt19937 g(rd()); - - std::vector<uint8_t> Values; - Values.resize(Size); - for (size_t Idx = 0; Idx < Size; ++Idx) - { - Values[Idx] = static_cast<uint8_t>(Idx); - } - std::shuffle(Values.begin(), Values.end(), g); - - IoBuffer Buf(IoBuffer::Clone, Values.data(), Values.size()); - Buf.SetContentType(ZenContentType::kBinary); - return Buf; - }; - -} // namespace testutils - -TEST_CASE("z$.store") -{ - ScopedTemporaryDirectory TempDir; - - GcManager Gc; - - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - - const int kIterationCount = 100; - - for (int i = 0; i < kIterationCount; ++i) - { - const IoHash Key = IoHash::HashBuffer(&i, sizeof i); - - CbObjectWriter Cbo; - Cbo << "hey" << i; - CbObject Obj = Cbo.Save(); - - ZenCacheValue Value; - Value.Value = Obj.GetBuffer().AsIoBuffer(); - Value.Value.SetContentType(ZenContentType::kCbObject); - - Zcs.Put("test_bucket"sv, Key, Value); - } - - for (int i = 0; i < kIterationCount; ++i) - { - const IoHash Key = IoHash::HashBuffer(&i, sizeof i); - - ZenCacheValue Value; - Zcs.Get("test_bucket"sv, Key, /* out */ Value); - - REQUIRE(Value.Value); - CHECK(Value.Value.GetContentType() == ZenContentType::kCbObject); - CHECK_EQ(ValidateCompactBinary(Value.Value, CbValidateMode::All), CbValidateError::None); - CbObject Obj = LoadCompactBinaryObject(Value.Value); - CHECK_EQ(Obj["hey"].AsInt32(), i); - } -} - -TEST_CASE("z$.size") -{ - const auto CreateCacheValue = [](size_t Size) -> CbObject { - std::vector<uint8_t> Buf; - Buf.resize(Size); - - CbObjectWriter Writer; - Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); - return Writer.Save(); - }; - - SUBCASE("mem/disklayer") - { - const size_t Count = 16; - ScopedTemporaryDirectory TempDir; - - GcStorageSize CacheSize; - - { - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - - CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256); - - IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); - Buffer.SetContentType(ZenContentType::kCbObject); - - for (size_t Key = 0; Key < Count; ++Key) - { - const size_t Bucket = Key % 4; - Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), ZenCacheValue{.Value = Buffer}); - } - - CacheSize = Zcs.StorageSize(); - CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize); - CHECK_LE(CacheValue.GetSize() * Count, CacheSize.MemorySize); - } - - { - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - - const GcStorageSize SerializedSize = Zcs.StorageSize(); - CHECK_EQ(SerializedSize.MemorySize, 0); - CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize); - - for (size_t Bucket = 0; Bucket < 4; ++Bucket) - { - Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket)); - } - CHECK_EQ(0, Zcs.StorageSize().DiskSize); - } - } - - SUBCASE("disklayer") - { - const size_t Count = 16; - ScopedTemporaryDirectory TempDir; - - GcStorageSize CacheSize; - - { - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - - CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64); - - IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); - Buffer.SetContentType(ZenContentType::kCbObject); - - for (size_t Key = 0; Key < Count; ++Key) - { - const size_t Bucket = Key % 4; - Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}); - } - - CacheSize = Zcs.StorageSize(); - CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize); - CHECK_EQ(0, CacheSize.MemorySize); - } - - { - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - - const GcStorageSize SerializedSize = Zcs.StorageSize(); - CHECK_EQ(SerializedSize.MemorySize, 0); - CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize); - - for (size_t Bucket = 0; Bucket < 4; ++Bucket) - { - Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket)); - } - CHECK_EQ(0, Zcs.StorageSize().DiskSize); - } - } -} - -TEST_CASE("z$.gc") -{ - using namespace testutils; - - SUBCASE("gather references does NOT add references for expired cache entries") - { - ScopedTemporaryDirectory TempDir; - std::vector<IoHash> Cids{CreateKey(1), CreateKey(2), CreateKey(3)}; - - const auto CollectAndFilter = [](GcManager& Gc, - GcClock::TimePoint Time, - GcClock::Duration MaxDuration, - std::span<const IoHash> Cids, - std::vector<IoHash>& OutKeep) { - GcContext GcCtx(Time - MaxDuration); - Gc.CollectGarbage(GcCtx); - OutKeep.clear(); - GcCtx.FilterCids(Cids, [&OutKeep](const IoHash& Hash) { OutKeep.push_back(Hash); }); - }; - - { - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - const auto Bucket = "teardrinker"sv; - - // Create a cache record - const IoHash Key = CreateKey(42); - CbObjectWriter Record; - Record << "Key"sv - << "SomeRecord"sv; - - for (size_t Idx = 0; auto& Cid : Cids) - { - Record.AddBinaryAttachment(fmt::format("attachment-{}", Idx++), Cid); - } - - IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer(); - Buffer.SetContentType(ZenContentType::kCbObject); - - Zcs.Put(Bucket, Key, {.Value = Buffer}); - - std::vector<IoHash> Keep; - - // Collect garbage with 1 hour max cache duration - { - CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep); - CHECK_EQ(Cids.size(), Keep.size()); - } - - // Move forward in time - { - CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep); - CHECK_EQ(0, Keep.size()); - } - } - - // Expect timestamps to be serialized - { - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - std::vector<IoHash> Keep; - - // Collect garbage with 1 hour max cache duration - { - CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep); - CHECK_EQ(3, Keep.size()); - } - - // Move forward in time - { - CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep); - CHECK_EQ(0, Keep.size()); - } - } - } - - SUBCASE("gc removes standalone values") - { - ScopedTemporaryDirectory TempDir; - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - const auto Bucket = "fortysixandtwo"sv; - const GcClock::TimePoint CurrentTime = GcClock::Now(); - - std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; - - for (const auto& Key : Keys) - { - IoBuffer Value = testutils::CreateBinaryCacheValue(128 << 10); - Zcs.Put(Bucket, Key, {.Value = Value}); - } - - { - GcContext GcCtx(CurrentTime - std::chrono::hours(46)); - - Gc.CollectGarbage(GcCtx); - - for (const auto& Key : Keys) - { - ZenCacheValue CacheValue; - const bool Exists = Zcs.Get(Bucket, Key, CacheValue); - CHECK(Exists); - } - } - - // Move forward in time and collect again - { - GcContext GcCtx(CurrentTime + std::chrono::minutes(2)); - Gc.CollectGarbage(GcCtx); - - for (const auto& Key : Keys) - { - ZenCacheValue CacheValue; - const bool Exists = Zcs.Get(Bucket, Key, CacheValue); - CHECK(!Exists); - } - - CHECK_EQ(0, Zcs.StorageSize().DiskSize); - } - } - - SUBCASE("gc removes small objects") - { - ScopedTemporaryDirectory TempDir; - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - const auto Bucket = "rightintwo"sv; - - std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; - - for (const auto& Key : Keys) - { - IoBuffer Value = testutils::CreateBinaryCacheValue(128); - Zcs.Put(Bucket, Key, {.Value = Value}); - } - - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(2)); - GcCtx.CollectSmallObjects(true); - - Gc.CollectGarbage(GcCtx); - - for (const auto& Key : Keys) - { - ZenCacheValue CacheValue; - const bool Exists = Zcs.Get(Bucket, Key, CacheValue); - CHECK(Exists); - } - } - - // Move forward in time and collect again - { - GcContext GcCtx(GcClock::Now() + std::chrono::minutes(2)); - GcCtx.CollectSmallObjects(true); - - Zcs.Flush(); - Gc.CollectGarbage(GcCtx); - - for (const auto& Key : Keys) - { - ZenCacheValue CacheValue; - const bool Exists = Zcs.Get(Bucket, Key, CacheValue); - CHECK(!Exists); - } - - CHECK_EQ(0, Zcs.StorageSize().DiskSize); - } - } -} - -TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) -{ - // for (uint32_t i = 0; i < 100; ++i) - { - ScopedTemporaryDirectory TempDir; - - const uint64_t kChunkSize = 1048; - const int32_t kChunkCount = 8192; - - struct Chunk - { - std::string Bucket; - IoBuffer Buffer; - }; - std::unordered_map<IoHash, Chunk, IoHash::Hasher> Chunks; - Chunks.reserve(kChunkCount); - - const std::string Bucket1 = "rightinone"; - const std::string Bucket2 = "rightintwo"; - - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) - { - while (true) - { - IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); - IoHash Hash = HashBuffer(Chunk); - if (Chunks.contains(Hash)) - { - continue; - } - Chunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; - break; - } - while (true) - { - IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); - IoHash Hash = HashBuffer(Chunk); - if (Chunks.contains(Hash)) - { - continue; - } - Chunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; - break; - } - } - - CreateDirectories(TempDir.Path()); - - WorkerThreadPool ThreadPool(4); - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path()); - - { - std::atomic<size_t> WorkCompleted = 0; - for (const auto& Chunk : Chunks) - { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { - Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); - WorkCompleted.fetch_add(1); - }); - } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); - } - } - - const uint64_t TotalSize = Zcs.StorageSize().DiskSize; - CHECK_LE(kChunkSize * Chunks.size(), TotalSize); - - { - std::atomic<size_t> WorkCompleted = 0; - for (const auto& Chunk : Chunks) - { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { - std::string Bucket = Chunk.second.Bucket; - IoHash ChunkHash = Chunk.first; - ZenCacheValue CacheValue; - - CHECK(Zcs.Get(Bucket, ChunkHash, CacheValue)); - IoHash Hash = IoHash::HashBuffer(CacheValue.Value); - CHECK(ChunkHash == Hash); - WorkCompleted.fetch_add(1); - }); - } - while (WorkCompleted < Chunks.size()) - { - Sleep(1); - } - } - std::unordered_map<IoHash, std::string, IoHash::Hasher> GcChunkHashes; - GcChunkHashes.reserve(Chunks.size()); - for (const auto& Chunk : Chunks) - { - GcChunkHashes[Chunk.first] = Chunk.second.Bucket; - } - { - std::unordered_map<IoHash, Chunk, IoHash::Hasher> NewChunks; - - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) - { - { - IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); - IoHash Hash = HashBuffer(Chunk); - NewChunks[Hash] = {.Bucket = Bucket1, .Buffer = Chunk}; - } - { - IoBuffer Chunk = testutils::CreateBinaryCacheValue(kChunkSize); - IoHash Hash = HashBuffer(Chunk); - NewChunks[Hash] = {.Bucket = Bucket2, .Buffer = Chunk}; - } - } - - std::atomic<size_t> WorkCompleted = 0; - std::atomic_uint32_t AddedChunkCount = 0; - for (const auto& Chunk : NewChunks) - { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() { - Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}); - AddedChunkCount.fetch_add(1); - WorkCompleted.fetch_add(1); - }); - } - - for (const auto& Chunk : Chunks) - { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { - ZenCacheValue CacheValue; - if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) - { - CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); - } - WorkCompleted.fetch_add(1); - }); - } - while (AddedChunkCount.load() < NewChunks.size()) - { - // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (const auto& Chunk : NewChunks) - { - ZenCacheValue CacheValue; - if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) - { - GcChunkHashes[Chunk.first] = Chunk.second.Bucket; - } - } - std::vector<IoHash> KeepHashes; - KeepHashes.reserve(GcChunkHashes.size()); - for (const auto& Entry : GcChunkHashes) - { - KeepHashes.push_back(Entry.first); - } - size_t C = 0; - while (C < KeepHashes.size()) - { - if (C % 155 == 0) - { - if (C < KeepHashes.size() - 1) - { - KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - if (C + 3 < KeepHashes.size() - 1) - { - KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - } - C++; - } - - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - GcCtx.AddRetainedCids(KeepHashes); - Zcs.CollectGarbage(GcCtx); - const HashKeySet& Deleted = GcCtx.DeletedCids(); - Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); - } - - while (WorkCompleted < NewChunks.size() + Chunks.size()) - { - Sleep(1); - } - - { - // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (const auto& Chunk : NewChunks) - { - ZenCacheValue CacheValue; - if (Zcs.Get(Chunk.second.Bucket, Chunk.first, CacheValue)) - { - GcChunkHashes[Chunk.first] = Chunk.second.Bucket; - } - } - std::vector<IoHash> KeepHashes; - KeepHashes.reserve(GcChunkHashes.size()); - for (const auto& Entry : GcChunkHashes) - { - KeepHashes.push_back(Entry.first); - } - size_t C = 0; - while (C < KeepHashes.size()) - { - if (C % 155 == 0) - { - if (C < KeepHashes.size() - 1) - { - KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - if (C + 3 < KeepHashes.size() - 1) - { - KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - } - C++; - } - - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - GcCtx.AddRetainedCids(KeepHashes); - Zcs.CollectGarbage(GcCtx); - const HashKeySet& Deleted = GcCtx.DeletedCids(); - Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); - } - } - { - std::atomic<size_t> WorkCompleted = 0; - for (const auto& Chunk : GcChunkHashes) - { - ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk]() { - ZenCacheValue CacheValue; - CHECK(Zcs.Get(Chunk.second, Chunk.first, CacheValue)); - CHECK(Chunk.first == IoHash::HashBuffer(CacheValue.Value)); - WorkCompleted.fetch_add(1); - }); - } - while (WorkCompleted < GcChunkHashes.size()) - { - Sleep(1); - } - } - } -} - -TEST_CASE("z$.namespaces") -{ - using namespace testutils; - - const auto CreateCacheValue = [](size_t Size) -> CbObject { - std::vector<uint8_t> Buf; - Buf.resize(Size); - - CbObjectWriter Writer; - Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); - return Writer.Save(); - }; - - ScopedTemporaryDirectory TempDir; - CreateDirectories(TempDir.Path()); - - IoHash Key1; - IoHash Key2; - { - GcManager Gc; - ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false}); - const auto Bucket = "teardrinker"sv; - const auto CustomNamespace = "mynamespace"sv; - - // Create a cache record - Key1 = CreateKey(42); - CbObject CacheValue = CreateCacheValue(4096); - - IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); - Buffer.SetContentType(ZenContentType::kCbObject); - - ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue); - - ZenCacheValue GetValue; - CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue)); - CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue)); - - // This should just be dropped as we don't allow creating of namespaces on the fly - Zcs.Put(CustomNamespace, Bucket, Key1, PutValue); - CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue)); - } - - { - GcManager Gc; - ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); - const auto Bucket = "teardrinker"sv; - const auto CustomNamespace = "mynamespace"sv; - - Key2 = CreateKey(43); - CbObject CacheValue2 = CreateCacheValue(4096); - - IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); - Buffer2.SetContentType(ZenContentType::kCbObject); - ZenCacheValue PutValue2 = {.Value = Buffer2}; - Zcs.Put(CustomNamespace, Bucket, Key2, PutValue2); - - ZenCacheValue GetValue; - CHECK(!Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue)); - CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue)); - CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue)); - CHECK(Zcs.Get(CustomNamespace, Bucket, Key2, GetValue)); - } -} - -TEST_CASE("z$.drop.bucket") -{ - using namespace testutils; - - const auto CreateCacheValue = [](size_t Size) -> CbObject { - std::vector<uint8_t> Buf; - Buf.resize(Size); - - CbObjectWriter Writer; - Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); - return Writer.Save(); - }; - - ScopedTemporaryDirectory TempDir; - CreateDirectories(TempDir.Path()); - - IoHash Key1; - IoHash Key2; - - auto PutValue = - [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) { - // Create a cache record - IoHash Key = CreateKey(KeyIndex); - CbObject CacheValue = CreateCacheValue(Size); - - IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); - Buffer.SetContentType(ZenContentType::kCbObject); - - ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(Namespace, Bucket, Key, PutValue); - return Key; - }; - auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { - ZenCacheValue GetValue; - Zcs.Get(Namespace, Bucket, Key, GetValue); - return GetValue; - }; - WorkerThreadPool Workers(1); - { - GcManager Gc; - ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); - const auto Bucket = "teardrinker"sv; - const auto Namespace = "mynamespace"sv; - - Key1 = PutValue(Zcs, Namespace, Bucket, 42, 4096); - Key2 = PutValue(Zcs, Namespace, Bucket, 43, 2048); - - ZenCacheValue Value1 = GetValue(Zcs, Namespace, Bucket, Key1); - CHECK(Value1.Value); - - std::atomic_bool WorkComplete = false; - Workers.ScheduleWork([&]() { - zen::Sleep(100); - Value1.Value = IoBuffer{}; - WorkComplete = true; - }); - // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket - // Our DropBucket execution blocks any incoming request from completing until we are done with the drop - CHECK(Zcs.DropBucket(Namespace, Bucket)); - while (!WorkComplete) - { - zen::Sleep(1); - } - - // Entire bucket should be dropped, but doing a request should will re-create the namespace but it must still be empty - Value1 = GetValue(Zcs, Namespace, Bucket, Key1); - CHECK(!Value1.Value); - ZenCacheValue Value2 = GetValue(Zcs, Namespace, Bucket, Key2); - CHECK(!Value2.Value); - } -} - -TEST_CASE("z$.drop.namespace") -{ - using namespace testutils; - - const auto CreateCacheValue = [](size_t Size) -> CbObject { - std::vector<uint8_t> Buf; - Buf.resize(Size); - - CbObjectWriter Writer; - Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); - return Writer.Save(); - }; - - ScopedTemporaryDirectory TempDir; - CreateDirectories(TempDir.Path()); - - auto PutValue = - [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) { - // Create a cache record - IoHash Key = CreateKey(KeyIndex); - CbObject CacheValue = CreateCacheValue(Size); - - IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); - Buffer.SetContentType(ZenContentType::kCbObject); - - ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(Namespace, Bucket, Key, PutValue); - return Key; - }; - auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { - ZenCacheValue GetValue; - Zcs.Get(Namespace, Bucket, Key, GetValue); - return GetValue; - }; - WorkerThreadPool Workers(1); - { - GcManager Gc; - ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); - const auto Bucket1 = "teardrinker1"sv; - const auto Bucket2 = "teardrinker2"sv; - const auto Namespace1 = "mynamespace1"sv; - const auto Namespace2 = "mynamespace2"sv; - - IoHash Key1 = PutValue(Zcs, Namespace1, Bucket1, 42, 4096); - IoHash Key2 = PutValue(Zcs, Namespace1, Bucket2, 43, 2048); - IoHash Key3 = PutValue(Zcs, Namespace2, Bucket1, 44, 4096); - IoHash Key4 = PutValue(Zcs, Namespace2, Bucket2, 45, 2048); - - ZenCacheValue Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1); - CHECK(Value1.Value); - ZenCacheValue Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2); - CHECK(Value2.Value); - ZenCacheValue Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3); - CHECK(Value3.Value); - ZenCacheValue Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4); - CHECK(Value4.Value); - - std::atomic_bool WorkComplete = false; - Workers.ScheduleWork([&]() { - zen::Sleep(100); - Value1.Value = IoBuffer{}; - Value2.Value = IoBuffer{}; - Value3.Value = IoBuffer{}; - Value4.Value = IoBuffer{}; - WorkComplete = true; - }); - // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket - // Our DropBucket execution blocks any incoming request from completing until we are done with the drop - CHECK(Zcs.DropNamespace(Namespace1)); - while (!WorkComplete) - { - zen::Sleep(1); - } - - // Entire namespace should be dropped, but doing a request should will re-create the namespace but it must still be empty - Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1); - CHECK(!Value1.Value); - Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2); - CHECK(!Value2.Value); - Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3); - CHECK(Value3.Value); - Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4); - CHECK(Value4.Value); - } -} - -TEST_CASE("z$.blocked.disklayer.put") -{ - ScopedTemporaryDirectory TempDir; - - GcStorageSize CacheSize; - - const auto CreateCacheValue = [](size_t Size) -> CbObject { - std::vector<uint8_t> Buf; - Buf.resize(Size, Size & 0xff); - - CbObjectWriter Writer; - Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); - return Writer.Save(); - }; - - GcManager Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - - CbObject CacheValue = CreateCacheValue(64 * 1024 + 64); - - IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); - Buffer.SetContentType(ZenContentType::kCbObject); - - size_t Key = Buffer.Size(); - IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t)); - Zcs.Put("test_bucket", HashKey, {.Value = Buffer}); - - ZenCacheValue BufferGet; - CHECK(Zcs.Get("test_bucket", HashKey, BufferGet)); - - CbObject CacheValue2 = CreateCacheValue(64 * 1024 + 64 + 1); - IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); - Buffer2.SetContentType(ZenContentType::kCbObject); - - // We should be able to overwrite even if the file is open for read - Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}); - - MemoryView OldView = BufferGet.Value.GetView(); - - ZenCacheValue BufferGet2; - CHECK(Zcs.Get("test_bucket", HashKey, BufferGet2)); - MemoryView NewView = BufferGet2.Value.GetView(); - - // Make sure file openend for read before we wrote it still have old data - CHECK(OldView.GetSize() == Buffer.GetSize()); - CHECK(memcmp(OldView.GetData(), Buffer.GetData(), OldView.GetSize()) == 0); - - // Make sure we get the new data when reading after we write new data - CHECK(NewView.GetSize() == Buffer2.GetSize()); - CHECK(memcmp(NewView.GetData(), Buffer2.GetData(), NewView.GetSize()) == 0); -} - -TEST_CASE("z$.scrub") -{ - ScopedTemporaryDirectory TempDir; - - using namespace testutils; - - struct CacheRecord - { - IoBuffer Record; - std::vector<CompressedBuffer> Attachments; - }; - - auto CreateCacheRecord = [](bool Structured, std::string_view Bucket, const IoHash& Key, const std::vector<size_t>& AttachmentSizes) { - CacheRecord Result; - if (Structured) - { - Result.Attachments.resize(AttachmentSizes.size()); - CbObjectWriter Record; - Record.BeginObject("Key"sv); - { - Record << "Bucket"sv << Bucket; - Record << "Hash"sv << Key; - } - Record.EndObject(); - for (size_t Index = 0; Index < AttachmentSizes.size(); Index++) - { - IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSizes[Index]); - CompressedBuffer CompressedAttachmentData = CompressedBuffer::Compress(SharedBuffer(AttachmentData)); - Record.AddBinaryAttachment(fmt::format("attachment-{}", Index), CompressedAttachmentData.DecodeRawHash()); - Result.Attachments[Index] = CompressedAttachmentData; - } - Result.Record = Record.Save().GetBuffer().AsIoBuffer(); - Result.Record.SetContentType(ZenContentType::kCbObject); - } - else - { - std::string RecordData = fmt::format("{}:{}", Bucket, Key.ToHexString()); - size_t TotalSize = RecordData.length() + 1; - for (size_t AttachmentSize : AttachmentSizes) - { - TotalSize += AttachmentSize; - } - Result.Record = IoBuffer(TotalSize); - char* DataPtr = (char*)Result.Record.MutableData(); - memcpy(DataPtr, RecordData.c_str(), RecordData.length() + 1); - DataPtr += RecordData.length() + 1; - for (size_t AttachmentSize : AttachmentSizes) - { - IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSize); - memcpy(DataPtr, AttachmentData.GetData(), AttachmentData.GetSize()); - DataPtr += AttachmentData.GetSize(); - } - } - return Result; - }; - - GcManager Gc; - CidStore CidStore(Gc); - ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); - CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); - - auto CreateRecords = - [&](bool IsStructured, std::string_view BucketName, const std::vector<IoHash>& Cids, const std::vector<size_t>& AttachmentSizes) { - for (const IoHash& Cid : Cids) - { - CacheRecord Record = CreateCacheRecord(IsStructured, BucketName, Cid, AttachmentSizes); - Zcs.Put("mybucket", Cid, {.Value = Record.Record}); - for (const CompressedBuffer& Attachment : Record.Attachments) - { - CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), Attachment.DecodeRawHash()); - } - } - }; - - std::vector<size_t> AttachmentSizes = {16, 1000, 2000, 4000, 8000, 64000, 80000}; - - std::vector<IoHash> UnstructuredCids{CreateKey(4), CreateKey(5), CreateKey(6)}; - CreateRecords(false, "mybucket"sv, UnstructuredCids, AttachmentSizes); - - std::vector<IoHash> StructuredCids{CreateKey(1), CreateKey(2), CreateKey(3)}; - CreateRecords(true, "mybucket"sv, StructuredCids, AttachmentSizes); - - ScrubContext ScrubCtx; - Zcs.Scrub(ScrubCtx); - CidStore.Scrub(ScrubCtx); - CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size()); - CHECK(ScrubCtx.BadCids().GetSize() == 0); -} - -#endif - -void -z$_forcelink() -{ -} - -} // namespace zen |