diff options
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 1038 |
1 files changed, 321 insertions, 717 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 110acba9e..cbc1d6e83 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -22,9 +22,19 @@ ////////////////////////////////////////////////////////////////////////// +#include <zencore/memory/llm.h> + namespace zen { -namespace { +const FLLMTag& +GetCacheDiskTag() +{ + static FLLMTag _("disk", FLLMTag("cache")); + + return _; +} + +namespace cache::impl { #pragma pack(push) #pragma pack(1) @@ -214,11 +224,15 @@ namespace { zen::Sleep(100); } while (true); } -} // namespace +} // namespace cache::impl namespace fs = std::filesystem; using namespace std::literals; +} // namespace zen + +namespace zen::cache::impl { + class BucketManifestSerializer { using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex; @@ -561,7 +575,8 @@ BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& B if (Header.EntryCount > ExpectedEntryCount) { ZEN_WARN( - "Failed to read sidecar file '{}'. File is not large enough to hold expected entry count. Header count: {}, file size count: " + "Failed to read sidecar file '{}'. File is not large enough to hold expected entry count. Header count: {}, file size " + "count: " "{}", SidecarPath, Header.EntryCount, @@ -685,6 +700,12 @@ BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&, static const float IndexMinLoadFactor = 0.2f; static const float IndexMaxLoadFactor = 0.7f; +} // namespace zen::cache::impl + +////////////////////////////////////////////////////////////////////////// + +namespace zen { + ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, @@ -695,8 +716,8 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, , m_Configuration(Config) , m_BucketId(Oid::Zero) { - m_Index.min_load_factor(IndexMinLoadFactor); - m_Index.max_load_factor(IndexMaxLoadFactor); + m_Index.min_load_factor(cache::impl::IndexMinLoadFactor); + m_Index.max_load_factor(cache::impl::IndexMaxLoadFactor); if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap"))) { @@ -740,11 +761,11 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo CreateDirectories(m_BucketDir); - std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); + std::filesystem::path ManifestPath = cache::impl::GetManifestPath(m_BucketDir, m_BucketName); bool IsNew = false; - BucketManifestSerializer ManifestReader; + cache::impl::BucketManifestSerializer ManifestReader; if (ManifestReader.Open(ManifestPath)) { @@ -760,7 +781,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, - BucketManifestSerializer::CurrentDiskBucketVersion); + cache::impl::BucketManifestSerializer::CurrentDiskBucketVersion); IsNew = true; } } @@ -814,11 +835,11 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition, namespace fs = std::filesystem; - fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + fs::path IndexPath = cache::impl::GetIndexPath(m_BucketDir, m_BucketName); try { - const uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry); + const uint64_t IndexSize = sizeof(cache::impl::CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry); std::error_code Error; DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); if (Error) @@ -852,14 +873,14 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition, // all data is written to the file BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024); - CacheBucketIndexHeader Header = {.EntryCount = EntryCount, - .LogPosition = LogCount, - .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)}; + cache::impl::CacheBucketIndexHeader Header = {.EntryCount = EntryCount, + .LogPosition = LogCount, + .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)}; - Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); - IndexWriter.Write(&Header, sizeof(CacheBucketIndexHeader), 0); + Header.Checksum = cache::impl::CacheBucketIndexHeader::ComputeChecksum(Header); + IndexWriter.Write(&Header, sizeof(cache::impl::CacheBucketIndexHeader), 0); - uint64_t IndexWriteOffset = sizeof(CacheBucketIndexHeader); + uint64_t IndexWriteOffset = sizeof(cache::impl::CacheBucketIndexHeader); for (auto& Entry : m_Index) { @@ -886,7 +907,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition, // We must only update the log flush position once the snapshot write succeeds if (FlushLockPosition) { - std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); + std::filesystem::path LogPath = cache::impl::GetLogPath(m_BucketDir, m_BucketName); if (std::filesystem::is_regular_file(LogPath)) { @@ -928,12 +949,12 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); uint64_t FileSize = ObjectIndexFile.FileSize(); - if (FileSize < sizeof(CacheBucketIndexHeader)) + if (FileSize < sizeof(cache::impl::CacheBucketIndexHeader)) { return 0; } - CacheBucketIndexHeader Header; + cache::impl::CacheBucketIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); if (!Header.IsValid()) @@ -941,12 +962,12 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const return 0; } - if (Header.Version != CacheBucketIndexHeader::Version2) + if (Header.Version != cache::impl::CacheBucketIndexHeader::Version2) { return 0; } - const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); + const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(cache::impl::CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); if (Header.EntryCount > ExpectedEntryCount) { return 0; @@ -967,7 +988,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const BasicFileBuffer FileBuffer(ObjectIndexFile, 128 * 1024); - uint64_t CurrentReadOffset = sizeof(CacheBucketIndexHeader); + uint64_t CurrentReadOffset = sizeof(cache::impl::CacheBucketIndexHeader); uint64_t RemainingEntryCount = Header.EntryCount; std::string InvalidEntryReason; @@ -976,7 +997,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const const DiskIndexEntry* Entry = FileBuffer.MakeView<DiskIndexEntry>(CurrentReadOffset); CurrentReadOffset += sizeof(DiskIndexEntry); - if (!ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason)) + if (!cache::impl::ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); continue; @@ -993,7 +1014,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const m_AccessTimes.resize(EntryCount, AccessTime(GcClock::TickCount())); - OutVersion = CacheBucketIndexHeader::Version2; + OutVersion = cache::impl::CacheBucketIndexHeader::Version2; return Header.LogPosition; } @@ -1040,7 +1061,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std:: return; } - if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) + if (!cache::impl::ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); ++InvalidEntryCount; @@ -1077,8 +1098,8 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco m_MemCachedPayloads.clear(); m_FreeMemCachedPayloads.clear(); - std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); - std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + std::filesystem::path LogPath = cache::impl::GetLogPath(m_BucketDir, m_BucketName); + std::filesystem::path IndexPath = cache::impl::GetIndexPath(m_BucketDir, m_BucketName); if (IsNew) { @@ -1136,8 +1157,8 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco } else { - const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment); - KnownBlocks.Add(BlockLocation.BlockIndex); + uint32_t BlockIndex = Location.Location.BlockLocation.GetBlockIndex(); + KnownBlocks.insert(BlockIndex); } } m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); @@ -1246,6 +1267,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept size_t IndexOffset = 0; m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) { + ZEN_MEMSCOPE(GetCacheDiskTag()); std::vector<DiskIndexEntry> DiskEntries; { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); @@ -1262,7 +1284,9 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept } if (m_TrackedReferences && HashKeyAndReferences.size() > 1) { - m_TrackedReferences->insert(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); + m_TrackedReferences->insert(m_TrackedReferences->end(), + HashKeyAndReferences.begin() + 1, + HashKeyAndReferences.end()); } if (auto It = m_Index.find(HashKey); It != m_Index.end()) { @@ -1311,6 +1335,8 @@ struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle ResultIndexes.reserve(OutResults.capacity()); } + ~GetBatchHandle() {} + std::vector<IoHash> Keys; std::vector<size_t> ResultIndexes; @@ -1432,7 +1458,7 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept if (SetMetaInfo) { // See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the - // metadata separately, check if it had time to set the metdata + // metadata separately, check if it had time to set the metadata RwLock::SharedLockScope UpdateIndexLock(m_IndexLock); if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end()) { @@ -1501,7 +1527,8 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept { ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline"); m_BlockStore.IterateChunks(InlineBlockLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool { - const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 32u * 1024u); + // Only read into memory the IoBuffers we could potentially add to memcache + const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 1u * 1024u); m_BlockStore.IterateBlock( InlineBlockLocations, ChunkIndexes, @@ -1552,7 +1579,7 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept ZenCacheValue& OutValue = Batch->OutResults[ResultIndex]; { // See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the - // metadata separately, check if it had time to set the metdata + // metadata separately, check if it had time to set the metadata RwLock::SharedLockScope UpdateIndexLock(m_IndexLock); if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end()) { @@ -1622,6 +1649,8 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept } } } + + delete Batch; } catch (const std::exception& Ex) { @@ -1798,43 +1827,41 @@ ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime) uint64_t Trimmed = 0; GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size()); - if (MemCachedCount == 0) - { - return 0; - } + std::vector<IoBuffer> PurgedBuffers; - uint32_t WriteIndex = 0; - for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex) { - MemCacheData& Data = m_MemCachedPayloads[ReadIndex]; - if (!Data.Payload) + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size()); + if (MemCachedCount == 0) { - continue; + return 0; } - PayloadIndex Index = Data.OwnerIndex; - ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex)); - GcClock::Tick AccessTime = m_AccessTimes[Index]; - if (AccessTime < ExpireTicks) - { - size_t PayloadSize = Data.Payload.GetSize(); - RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); - Data = {}; - m_Payloads[Index].MemCached = {}; - Trimmed += PayloadSize; - continue; - } - if (ReadIndex > WriteIndex) + PurgedBuffers.reserve(MemCachedCount); + + for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex) { - m_MemCachedPayloads[WriteIndex] = MemCacheData{.Payload = std::move(Data.Payload), .OwnerIndex = Index}; - m_Payloads[Index].MemCached = MemCachedIndex(WriteIndex); + MemCacheData& Data = m_MemCachedPayloads[ReadIndex]; + if (!Data.Payload) + { + continue; + } + PayloadIndex Index = Data.OwnerIndex; + ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex)); + GcClock::Tick AccessTime = m_AccessTimes[Index]; + if (AccessTime < ExpireTicks) + { + size_t PayloadSize = Data.Payload.GetSize(); + RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); + PurgedBuffers.emplace_back(std::move(Data.Payload)); + Data.OwnerIndex = {}; + // Data = {}; + m_Payloads[Index].MemCached = {}; + Trimmed += PayloadSize; + m_FreeMemCachedPayloads.push_back(MemCachedIndex(ReadIndex)); + } } - WriteIndex++; } - m_MemCachedPayloads.resize(WriteIndex); - m_MemCachedPayloads.shrink_to_fit(); - zen::Reset(m_FreeMemCachedPayloads); + PurgedBuffers.clear(); return Trimmed; } @@ -1843,23 +1870,40 @@ ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock { ZEN_TRACE_CPU("Z$::Bucket::GetUsageByAccess"); - size_t SlotCount = InOutUsageSlots.capacity(); - RwLock::SharedLockScope _(m_IndexLock); - uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size()); - if (MemCachedCount == 0) - { - return; - } - for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex) + std::vector<uint64_t> PayloadSizes; + std::vector<AccessTime> AccessTimes; + + size_t SlotCount = InOutUsageSlots.capacity(); + { - MemCacheData& Data = m_MemCachedPayloads[ReadIndex]; - if (!Data.Payload) + RwLock::SharedLockScope _(m_IndexLock); + uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size()); + if (MemCachedCount == 0) { - continue; + return; + } + PayloadSizes.reserve(MemCachedCount); + AccessTimes.reserve(MemCachedCount); + for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex) + { + MemCacheData& Data = m_MemCachedPayloads[ReadIndex]; + if (!Data.Payload) + { + continue; + } + PayloadIndex Index = Data.OwnerIndex; + ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex)); + PayloadSizes.push_back(Data.Payload.GetSize()); + AccessTimes.push_back(m_AccessTimes[Index]); } - PayloadIndex Index = Data.OwnerIndex; - ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex)); - GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index])); + } + + auto PayloadSizeIt = PayloadSizes.begin(); + auto AccessTimeIt = AccessTimes.begin(); + for (PayloadSizeIt = PayloadSizes.begin(); PayloadSizeIt != PayloadSizes.end(); PayloadSizeIt++) + { + ZEN_ASSERT_SLOW(AccessTimeIt != AccessTimes.end()); + GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(*AccessTimeIt)); GcClock::Duration Age = Now > ItemAccessTime ? Now - ItemAccessTime : GcClock::Duration(0); size_t Slot = Age < MaxAge ? gsl::narrow<size_t>((Age.count() * SlotCount) / MaxAge.count()) : (SlotCount - 1); ZEN_ASSERT_SLOW(Slot < SlotCount); @@ -1867,7 +1911,8 @@ ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock { InOutUsageSlots.resize(Slot + 1, 0); } - InOutUsageSlots[Slot] += EstimateMemCachePayloadMemory(Data.Payload.GetSize()); + InOutUsageSlots[Slot] += EstimateMemCachePayloadMemory(*PayloadSizeIt); + AccessTimeIt++; } } @@ -1887,7 +1932,7 @@ ZenCacheDiskLayer::CacheBucket::Drop() m_BlockStore.Close(); m_SlogFile.Close(); - bool Deleted = MoveAndDeleteDirectory(m_BucketDir); + const bool Deleted = cache::impl::MoveAndDeleteDirectory(m_BucketDir); m_Index.clear(); m_Payloads.clear(); @@ -1937,8 +1982,8 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl { bool UseLegacyScheme = false; - IoBuffer Buffer; - BucketManifestSerializer ManifestWriter; + IoBuffer Buffer; + cache::impl::BucketManifestSerializer ManifestWriter; if (UseLegacyScheme) { @@ -2018,7 +2063,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl } ManifestWriter.WriteSidecarFile(IndexLock, - GetMetaPath(m_BucketDir, m_BucketName), + cache::impl::GetMetaPath(m_BucketDir, m_BucketName), m_LogFlushPosition, m_Index, m_AccessTimes, @@ -2026,7 +2071,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl m_MetaDatas); } - std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); + const std::filesystem::path ManifestPath = cache::impl::GetManifestPath(m_BucketDir, m_BucketName); TemporaryFile::SafeWriteFile(ManifestPath, Buffer.GetView()); } catch (const std::exception& Err) @@ -2190,7 +2235,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) }; m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) { - return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk); + return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0); }); } catch (ScrubDeadlineExpiredException&) @@ -2275,466 +2320,6 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) } } -void -ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("Z$::Bucket::GatherReferences"); - -#define CALCULATE_BLOCKING_TIME 0 - -#if CALCULATE_BLOCKING_TIME - uint64_t WriteBlockTimeUs = 0; - uint64_t WriteBlockLongestTimeUs = 0; - uint64_t ReadBlockTimeUs = 0; - uint64_t ReadBlockLongestTimeUs = 0; -#endif // CALCULATE_BLOCKING_TIME - - Stopwatch TotalTimer; - const auto _ = MakeGuard([&] { -#if CALCULATE_BLOCKING_TIME - ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})", - m_BucketDir, - NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), - NiceLatencyNs(WriteBlockTimeUs), - NiceLatencyNs(WriteBlockLongestTimeUs), - NiceLatencyNs(ReadBlockTimeUs), - NiceLatencyNs(ReadBlockLongestTimeUs)); -#else - ZEN_DEBUG("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs())); -#endif // CALCULATE_BLOCKING_TIME - }); - - const GcClock::TimePoint ExpireTime = GcCtx.CacheExpireTime(); - - const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); - - IndexMap Index; - std::vector<AccessTime> AccessTimes; - std::vector<BucketPayload> Payloads; - { - RwLock::SharedLockScope __(m_IndexLock); -#if CALCULATE_BLOCKING_TIME - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); -#endif // CALCULATE_BLOCKING_TIME - if (m_Index.empty()) - { - return; - } - Index = m_Index; - AccessTimes = m_AccessTimes; - Payloads = m_Payloads; - } - - std::vector<IoHash> ExpiredKeys; - ExpiredKeys.reserve(1024); - - std::vector<IoHash> Cids; - if (!GcCtx.SkipCid()) - { - Cids.reserve(1024); - } - - std::vector<std::pair<IoHash, size_t>> StructuredItemsWithUnknownAttachments; - - for (const auto& Entry : Index) - { - const IoHash& Key = Entry.first; - size_t PayloadIndex = Entry.second; - GcClock::Tick AccessTime = AccessTimes[PayloadIndex]; - if (AccessTime < ExpireTicks) - { - ExpiredKeys.push_back(Key); - continue; - } - - if (GcCtx.SkipCid()) - { - continue; - } - - BucketPayload& Payload = Payloads[PayloadIndex]; - const DiskLocation& Loc = Payload.Location; - - if (!Loc.IsFlagSet(DiskLocation::kStructured)) - { - continue; - } - StructuredItemsWithUnknownAttachments.push_back(Entry); - } - - for (const auto& Entry : StructuredItemsWithUnknownAttachments) - { - const IoHash& Key = Entry.first; - BucketPayload& Payload = Payloads[Entry.second]; - const DiskLocation& Loc = Payload.Location; - { - IoBuffer Buffer; - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - if (Buffer = GetStandaloneCacheValue(Loc, Key); !Buffer) - { - continue; - } - } - else - { - RwLock::SharedLockScope IndexLock(m_IndexLock); -#if CALCULATE_BLOCKING_TIME - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); -#endif // CALCULATE_BLOCKING_TIME - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - const BucketPayload& CachedPayload = m_Payloads[It->second]; - if (CachedPayload.MemCached) - { - Buffer = m_MemCachedPayloads[CachedPayload.MemCached].Payload; - ZEN_ASSERT_SLOW(Buffer); - } - else - { - DiskLocation Location = m_Payloads[It->second].Location; - IndexLock.ReleaseNow(); - Buffer = GetInlineCacheValue(Location); - // Don't memcache items when doing GC - } - } - if (!Buffer) - { - continue; - } - } - - ZEN_ASSERT(Buffer); - ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject); - CbObjectView Obj(Buffer.GetData()); - Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); - if (Cids.size() >= 1024) - { - GcCtx.AddRetainedCids(Cids); - Cids.clear(); - } - } - } - - GcCtx.AddRetainedCids(Cids); - GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys)); -} - -void -ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage"); - - ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir); - - Stopwatch TotalTimer; - uint64_t WriteBlockTimeUs = 0; - uint64_t WriteBlockLongestTimeUs = 0; - uint64_t ReadBlockTimeUs = 0; - uint64_t ReadBlockLongestTimeUs = 0; - uint64_t TotalChunkCount = 0; - uint64_t DeletedSize = 0; - GcStorageSize OldTotalSize = StorageSize(); - - std::unordered_set<IoHash> DeletedChunks; - uint64_t MovedCount = 0; - - const auto _ = MakeGuard([&] { - ZEN_DEBUG( - "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved " - "{} " - "of {} " - "entries ({}/{}).", - m_BucketDir, - NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), - NiceLatencyNs(WriteBlockTimeUs), - NiceLatencyNs(WriteBlockLongestTimeUs), - NiceLatencyNs(ReadBlockTimeUs), - NiceLatencyNs(ReadBlockLongestTimeUs), - NiceBytes(DeletedSize), - DeletedChunks.size(), - MovedCount, - TotalChunkCount, - NiceBytes(OldTotalSize.DiskSize), - NiceBytes(OldTotalSize.MemorySize)); - - bool Expected = false; - if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) - { - return; - } - auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); - - try - { - SaveSnapshot([&]() { return GcCtx.ClaimGCReserve(); }); - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed to write index and manifest after GC in '{}'. Reason: '{}'", m_BucketDir, Ex.what()); - } - }); - - auto __ = MakeGuard([&]() { - if (!DeletedChunks.empty()) - { - // Clean up m_AccessTimes and m_Payloads vectors - std::vector<BucketPayload> Payloads; - std::vector<AccessTime> AccessTimes; - std::vector<BucketMetaData> MetaDatas; - std::vector<MemCacheData> MemCachedPayloads; - IndexMap Index; - { - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, Index); - } - GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end())); - } - }); - - std::span<const IoHash> ExpiredCacheKeySpan = GcCtx.ExpiredCacheKeys(m_BucketDir.string()); - if (ExpiredCacheKeySpan.empty()) - { - return; - } - - m_SlogFile.Flush(); - - std::unordered_set<IoHash, IoHash::Hasher> ExpiredCacheKeys(ExpiredCacheKeySpan.begin(), ExpiredCacheKeySpan.end()); - - std::vector<DiskIndexEntry> ExpiredStandaloneEntries; - IndexMap IndexSnapshot; - std::vector<BucketPayload> PayloadsSnapshot; - BlockStore::ReclaimSnapshotState BlockStoreState; - { - bool Expected = false; - if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) - { - ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is currently flushing", m_BucketDir); - return; - } - auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); - - { - ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::State"); - RwLock::SharedLockScope IndexLock(m_IndexLock); - - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - - BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); - - for (const IoHash& Key : ExpiredCacheKeys) - { - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - const BucketPayload& Payload = m_Payloads[It->second]; - if (Payload.Location.Flags & DiskLocation::kStandaloneFile) - { - DiskIndexEntry Entry = {.Key = Key, .Location = Payload.Location}; - Entry.Location.Flags |= DiskLocation::kTombStone; - ExpiredStandaloneEntries.push_back(Entry); - } - } - } - - PayloadsSnapshot = m_Payloads; - IndexSnapshot = m_Index; - - if (GcCtx.IsDeletionMode()) - { - IndexLock.ReleaseNow(); - RwLock::ExclusiveLockScope __(m_IndexLock); - for (const auto& Entry : ExpiredStandaloneEntries) - { - if (m_Index.erase(Entry.Key) == 1) - { - m_StandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); - DeletedChunks.insert(Entry.Key); - } - } - m_SlogFile.Append(ExpiredStandaloneEntries); - } - } - } - - if (GcCtx.IsDeletionMode()) - { - ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::Delete"); - - ExtendablePathBuilder<256> Path; - - for (const auto& Entry : ExpiredStandaloneEntries) - { - const IoHash& Key = Entry.Key; - - Path.Reset(); - BuildPath(Path, Key); - fs::path FilePath = Path.ToPath(); - - { - RwLock::SharedLockScope IndexLock(m_IndexLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - if (m_Index.contains(Key)) - { - // Someone added it back, let the file on disk be - ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); - continue; - } - IndexLock.ReleaseNow(); - - RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); - if (fs::is_regular_file(FilePath)) - { - ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); - std::error_code Ec; - fs::remove(FilePath, Ec); - if (Ec) - { - ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); - continue; - } - } - } - DeletedSize += Entry.Location.Size(); - } - } - - TotalChunkCount = IndexSnapshot.size(); - - std::vector<BlockStoreLocation> ChunkLocations; - BlockStore::ChunkIndexArray KeepChunkIndexes; - std::vector<IoHash> ChunkIndexToChunkHash; - ChunkLocations.reserve(TotalChunkCount); - ChunkLocations.reserve(TotalChunkCount); - ChunkIndexToChunkHash.reserve(TotalChunkCount); - { - TotalChunkCount = 0; - for (const auto& Entry : IndexSnapshot) - { - size_t EntryIndex = Entry.second; - const DiskLocation& DiskLocation = PayloadsSnapshot[EntryIndex].Location; - - if (DiskLocation.Flags & DiskLocation::kStandaloneFile) - { - continue; - } - const IoHash& Key = Entry.first; - BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment); - size_t ChunkIndex = ChunkLocations.size(); - ChunkLocations.push_back(Location); - ChunkIndexToChunkHash.push_back(Key); - if (ExpiredCacheKeys.contains(Key)) - { - continue; - } - KeepChunkIndexes.push_back(ChunkIndex); - } - } - TotalChunkCount = ChunkLocations.size(); - size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size(); - - const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); - if (!PerformDelete) - { - m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_Configuration.PayloadAlignment, true); - GcStorageSize CurrentTotalSize = StorageSize(); - ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} ({}/{})", - m_BucketDir, - DeleteCount, - TotalChunkCount, - NiceBytes(CurrentTotalSize.DiskSize), - NiceBytes(CurrentTotalSize.MemorySize)); - return; - } - - m_BlockStore.ReclaimSpace( - BlockStoreState, - ChunkLocations, - KeepChunkIndexes, - m_Configuration.PayloadAlignment, - false, - [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { - std::vector<DiskIndexEntry> LogEntries; - LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); - { - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - for (const auto& Entry : MovedChunks) - { - size_t ChunkIndex = Entry.first; - const BlockStoreLocation& NewLocation = Entry.second; - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - size_t EntryIndex = m_Index[ChunkHash]; - BucketPayload& Payload = m_Payloads[EntryIndex]; - if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != m_Payloads[EntryIndex].Location) - { - // Entry has been updated while GC was running, ignore the move - continue; - } - Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags()); - LogEntries.push_back({.Key = ChunkHash, .Location = Payload.Location}); - } - for (const size_t ChunkIndex : RemovedChunks) - { - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - size_t EntryIndex = m_Index[ChunkHash]; - BucketPayload& Payload = m_Payloads[EntryIndex]; - if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != Payload.Location) - { - // Entry has been updated while GC was running, ignore the delete - continue; - } - const DiskLocation& OldDiskLocation = Payload.Location; - LogEntries.push_back({.Key = ChunkHash, - .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment), - m_Configuration.PayloadAlignment, - OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); - - RemoveMemCachedData(IndexLock, Payload); - RemoveMetaData(IndexLock, Payload); - - m_Index.erase(ChunkHash); - DeletedChunks.insert(ChunkHash); - } - } - - m_SlogFile.Append(LogEntries); - m_SlogFile.Flush(); - }, - [&]() { return GcCtx.ClaimGCReserve(); }); -} - ZenCacheDiskLayer::BucketStats ZenCacheDiskLayer::CacheBucket::Stats() { @@ -2817,30 +2402,6 @@ ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents( } void -ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("Z$::CollectGarbage"); - - std::vector<CacheBucket*> Buckets; - { - RwLock::SharedLockScope _(m_Lock); - Buckets.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - Buckets.push_back(Kv.second.get()); - } - } - for (CacheBucket* Bucket : Buckets) - { - Bucket->CollectGarbage(GcCtx); - } - if (!m_IsMemCacheTrimming) - { - MemCacheTrim(Buckets, GcCtx.CacheExpireTime()); - } -} - -void ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) { ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue"); @@ -2963,7 +2524,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c } if (m_TrackedReferences) { - m_TrackedReferences->insert(References.begin(), References.end()); + m_TrackedReferences->insert(m_TrackedReferences->end(), References.begin(), References.end()); } PayloadIndex EntryIndex = {}; @@ -3119,6 +2680,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, Value.Value.Size(), m_Configuration.PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { + ZEN_MEMSCOPE(GetCacheDiskTag()); ZEN_TRACE_CPU("Z$::Bucket::UpdateLocation"); DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags); m_SlogFile.Append({.Key = HashKey, .Location = Location}); @@ -3130,7 +2692,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, } if (m_TrackedReferences) { - m_TrackedReferences->insert(References.begin(), References.end()); + m_TrackedReferences->insert(m_TrackedReferences->end(), References.begin(), References.end()); } if (auto It = m_Index.find(HashKey); It != m_Index.end()) { @@ -3184,7 +2746,7 @@ public: Stopwatch Timer; const auto _ = MakeGuard([&] { - Reset(m_ExpiredStandaloneKeys); + cache::impl::Reset(m_ExpiredStandaloneKeys); if (!Ctx.Settings.Verbose) { return; @@ -3457,10 +3019,6 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) { return nullptr; } - if (m_Index.empty()) - { - return nullptr; - } TotalEntries = m_Index.size(); @@ -3556,7 +3114,7 @@ ZenCacheDiskLayer::CacheBucket::ReadAttachmentsFromMetaData(uint32_t BlockI ZEN_TRACE_CPU("Z$::Bucket::GetAttachmentsFromMetaData"); return GetAttachmentsFromMetaData<IoHash, IoHash>( MetaDataPayload, - BlockMetaDataExpectedMagic, + cache::impl::BlockMetaDataExpectedMagic, [&](std::span<const IoHash> Keys, std::span<const uint32_t> AttachmentCounts, std::span<const IoHash> Attachments) { auto AttachmentReadIt = Attachments.begin(); OutReferences.resize(OutReferences.size() + Attachments.size()); @@ -3568,10 +3126,9 @@ ZenCacheDiskLayer::CacheBucket::ReadAttachmentsFromMetaData(uint32_t BlockI { if (WantedKeys.contains(*KeyIt)) { - for (uint32_t It = 0u; It < AttachmentCount; It++) - { - *OutReferencesWriteIt++ = *AttachmentReadIt++; - } + memcpy(&(*OutReferencesWriteIt), &(*AttachmentReadIt), sizeof(IoHash) * AttachmentCount); + OutReferencesWriteIt += AttachmentCount; + AttachmentReadIt += AttachmentCount; } else { @@ -3587,11 +3144,17 @@ ZenCacheDiskLayer::CacheBucket::ReadAttachmentsFromMetaData(uint32_t BlockI } bool -ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHash>& OutReferences) +ZenCacheDiskLayer::CacheBucket::GetReferences(const LoggerRef& Logger, + std::atomic_bool& IsCancelledFlag, + bool StateIsAlreadyLocked, + bool ReadCacheAttachmentMetaData, + bool WriteCacheAttachmentMetaData, + std::vector<IoHash>& OutReferences, + ReferencesStats* OptionalOutReferencesStats) { ZEN_TRACE_CPU("Z$::Bucket::GetReferencesLocked"); - auto Log = [&Ctx]() { return Ctx.Logger; }; + auto Log = [&Logger]() { return Logger; }; auto GetAttachments = [&](MemoryView Data) -> bool { if (ValidateCompactBinary(Data, CbValidateMode::Default) == CbValidateError::None) @@ -3611,10 +3174,14 @@ ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHa { std::unordered_map<uint32_t, std::size_t> BlockIndexToChunkIndexes; - + std::unique_ptr<RwLock::SharedLockScope> StateLock; + if (!StateIsAlreadyLocked) + { + StateLock = std::make_unique<RwLock::SharedLockScope>(m_IndexLock); + } for (const auto& Entry : m_Index) { - if (Ctx.IsCancelledFlag.load()) + if (IsCancelledFlag.load()) { return false; } @@ -3623,14 +3190,29 @@ ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHa const BucketPayload& Payload = m_Payloads[EntryIndex]; const DiskLocation& Loc = Payload.Location; + if (OptionalOutReferencesStats != nullptr) + { + OptionalOutReferencesStats->ValueSizes.push_back(Loc.Size()); + } + if (!Loc.IsFlagSet(DiskLocation::kStructured)) { continue; } + + if (OptionalOutReferencesStats) + { + OptionalOutReferencesStats->StructuredValuesCount++; + } + const IoHash& Key = Entry.first; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { StandaloneKeys.push_back(std::make_pair(Key, Loc)); + if (OptionalOutReferencesStats) + { + OptionalOutReferencesStats->StandaloneValuesCount++; + } continue; } @@ -3650,20 +3232,22 @@ ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHa } } - for (std::vector<std::size_t> ChunkIndexes : InlineBlockChunkIndexes) + OutReferences.reserve(OutReferences.size() + InlineKeys.size() + + StandaloneKeys.size()); // Make space for at least one attachment per record + + for (const std::vector<std::size_t>& ChunkIndexes : InlineBlockChunkIndexes) { ZEN_ASSERT(!ChunkIndexes.empty()); uint32_t BlockIndex = InlineLocations[ChunkIndexes[0]].BlockIndex; - if (!m_Configuration.StoreAttachmentMetaData || - !ReadAttachmentsFromMetaData(BlockIndex, InlineKeys, ChunkIndexes, OutReferences)) + if (!ReadCacheAttachmentMetaData || !ReadAttachmentsFromMetaData(BlockIndex, InlineKeys, ChunkIndexes, OutReferences)) { std::vector<IoHash> Keys; std::vector<uint32_t> AttachmentCounts; size_t PrecachedReferencesStart = OutReferences.size(); size_t NextPrecachedReferencesStart = PrecachedReferencesStart; - bool WriteMetaData = m_Configuration.StoreAttachmentMetaData && !m_BlockStore.IsWriting(BlockIndex); + bool WriteMetaData = WriteCacheAttachmentMetaData && !m_BlockStore.IsWriting(BlockIndex); if (WriteMetaData) { Keys.reserve(InlineLocations.size()); @@ -3672,12 +3256,15 @@ ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHa auto CaptureAttachments = [&](size_t ChunkIndex, MemoryView Data) { if (GetAttachments(Data)) { - size_t AttachmentCount = OutReferences.size() - NextPrecachedReferencesStart; - if (WriteMetaData && AttachmentCount > 0) + if (WriteMetaData) { - Keys.push_back(InlineKeys[ChunkIndex]); - AttachmentCounts.push_back(gsl::narrow<uint32_t>(AttachmentCount)); - NextPrecachedReferencesStart += AttachmentCount; + size_t AttachmentCount = OutReferences.size() - NextPrecachedReferencesStart; + if (AttachmentCount > 0) + { + Keys.push_back(InlineKeys[ChunkIndex]); + AttachmentCounts.push_back(gsl::narrow<uint32_t>(AttachmentCount)); + NextPrecachedReferencesStart += AttachmentCount; + } } } }; @@ -3688,13 +3275,14 @@ ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHa [&](size_t ChunkIndex, const void* Data, uint64_t Size) { ZEN_UNUSED(ChunkIndex); CaptureAttachments(ChunkIndex, MemoryView(Data, Size)); - return !Ctx.IsCancelledFlag.load(); + return !IsCancelledFlag.load(); }, [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { ZEN_UNUSED(ChunkIndex); CaptureAttachments(ChunkIndex, File.GetChunk(Offset, Size).GetView()); - return !Ctx.IsCancelledFlag.load(); - }); + return !IsCancelledFlag.load(); + }, + 0); if (Continue) { @@ -3703,7 +3291,7 @@ ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHa ZEN_ASSERT(Keys.size() == AttachmentCounts.size()); IoBuffer MetaDataPayload = BuildReferenceMetaData<IoHash>( - BlockMetaDataExpectedMagic, + cache::impl::BlockMetaDataExpectedMagic, Keys, AttachmentCounts, std::span<const IoHash>(OutReferences) @@ -3718,7 +3306,7 @@ ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHa return false; } } - if (Ctx.IsCancelledFlag.load()) + if (IsCancelledFlag.load()) { return false; } @@ -3727,7 +3315,7 @@ ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHa for (const auto& It : StandaloneKeys) { - if (Ctx.IsCancelledFlag.load()) + if (IsCancelledFlag.load()) { return false; } @@ -3778,19 +3366,25 @@ public: } ZEN_INFO("GCV2: cachebucket [PRECACHE] '{}': found {} references in {}", m_CacheBucket.m_BucketDir, - m_References.size(), + m_PrecachedReferences.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = std::make_unique<HashSet>(); }); + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = std::make_unique<std::vector<IoHash>>(); }); - RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock); - bool Continue = m_CacheBucket.GetReferencesLocked(Ctx, m_References); - IndexLock.ReleaseNow(); + bool Continue = m_CacheBucket.GetReferences(Ctx.Logger, + Ctx.IsCancelledFlag, + /*StateIsAlreadyLocked*/ false, + Ctx.Settings.StoreCacheAttachmentMetaData, + Ctx.Settings.StoreCacheAttachmentMetaData, + m_PrecachedReferences, + /*OptionalOutReferencesStats*/ nullptr); if (!Continue) { m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); + return; } + FilterReferences(Ctx, fmt::format("cachebucket [PRECACHE] '{}'", m_CacheBucket.m_BucketDir), m_PrecachedReferences); } virtual void UpdateLockedState(GcCtx& Ctx) override @@ -3807,32 +3401,32 @@ public: } ZEN_INFO("GCV2: cachebucket [LOCKSTATE] '{}': found {} references in {}", m_CacheBucket.m_BucketDir, - m_References.size(), + m_PrecachedReferences.size() + m_AddedReferences.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); if (Ctx.IsCancelledFlag.load()) { - m_References = {}; + m_PrecachedReferences = {}; m_CacheBucket.m_TrackedReferences.reset(); return; } ZEN_ASSERT(m_CacheBucket.m_TrackedReferences); - HashSet& AddedReferences(*m_CacheBucket.m_TrackedReferences); - m_References.reserve(m_References.size() + AddedReferences.size()); - m_References.insert(m_References.end(), AddedReferences.begin(), AddedReferences.end()); - AddedReferences = {}; + m_AddedReferences = std::move(*m_CacheBucket.m_TrackedReferences); + FilterReferences(Ctx, fmt::format("cachebucket [LOCKSTATE] '{}'", m_CacheBucket.m_BucketDir), m_AddedReferences); } - virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override + virtual std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override { - ZEN_TRACE_CPU("Z$::Bucket::RemoveUsedReferencesFromSet"); + ZEN_TRACE_CPU("Z$::Bucket::GetUnusedReferences"); auto Log = [&Ctx]() { return Ctx.Logger; }; - size_t InitialCount = IoCids.size(); + const size_t InitialCount = IoCids.size(); + size_t UsedCount = InitialCount; + Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) @@ -3841,24 +3435,20 @@ public: } ZEN_INFO("GCV2: cachebucket [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", m_CacheBucket.m_BucketDir, - InitialCount - IoCids.size(), + UsedCount, InitialCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - for (const IoHash& ReferenceHash : m_References) - { - if (IoCids.erase(ReferenceHash) == 1) - { - if (IoCids.empty()) - { - return; - } - } - } + std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_PrecachedReferences, IoCids); + UnusedReferences = KeepUnusedReferences(m_AddedReferences, UnusedReferences); + UsedCount = IoCids.size() - UnusedReferences.size(); + return UnusedReferences; } + CacheBucket& m_CacheBucket; - std::vector<IoHash> m_References; + std::vector<IoHash> m_PrecachedReferences; + std::vector<IoHash> m_AddedReferences; }; std::vector<GcReferenceChecker*> @@ -3888,6 +3478,12 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) return {new DiskBucketReferenceChecker(*this)}; } +std::vector<GcReferenceValidator*> +ZenCacheDiskLayer::CacheBucket::CreateReferenceValidators(GcCtx& /*Ctx*/) +{ + return {}; +} + void ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, std::vector<BucketPayload>& Payloads, @@ -3902,8 +3498,8 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, Payloads.reserve(EntryCount); AccessTimes.reserve(EntryCount); Index.reserve(EntryCount); - Index.min_load_factor(IndexMinLoadFactor); - Index.max_load_factor(IndexMaxLoadFactor); + Index.min_load_factor(cache::impl::IndexMinLoadFactor); + Index.max_load_factor(cache::impl::IndexMaxLoadFactor); for (auto It : m_Index) { PayloadIndex EntryIndex = PayloadIndex(Payloads.size()); @@ -3927,9 +3523,9 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, m_Payloads.swap(Payloads); m_AccessTimes.swap(AccessTimes); m_MetaDatas.swap(MetaDatas); - Reset(m_FreeMetaDatas); + cache::impl::Reset(m_FreeMetaDatas); m_MemCachedPayloads.swap(MemCachedPayloads); - Reset(m_FreeMemCachedPayloads); + cache::impl::Reset(m_FreeMemCachedPayloads); } RwLock::SharedLockScope @@ -4028,12 +3624,10 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) CacheBucket* Result = Bucket.get(); m_Buckets.emplace(BucketName, std::move(Bucket)); - m_UpdateCaptureLock.WithExclusiveLock([&]() { - if (m_CapturedBuckets) - { - m_CapturedBuckets->push_back(std::string(BucketName)); - } - }); + if (m_CapturedBuckets) + { + m_CapturedBuckets->push_back(std::string(BucketName)); + } return Result; } @@ -4323,6 +3917,8 @@ ZenCacheDiskLayer::DiscoverBuckets() { WorkLatch.AddCount(1); Pool.ScheduleWork([this, &WorkLatch, &SyncLock, BucketPath]() { + ZEN_MEMSCOPE(GetCacheDiskTag()); + auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); const std::string BucketName = PathToUtf8(BucketPath.stem()); try @@ -4380,7 +3976,7 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket) // Make sure we remove the folder even if we don't know about the bucket std::filesystem::path BucketPath = m_RootDir; BucketPath /= std::string(InBucket); - return MoveAndDeleteDirectory(BucketPath); + return cache::impl::MoveAndDeleteDirectory(BucketPath); } bool @@ -4403,12 +3999,13 @@ ZenCacheDiskLayer::Drop() return false; } } - return MoveAndDeleteDirectory(m_RootDir); + return cache::impl::MoveAndDeleteDirectory(m_RootDir); } void ZenCacheDiskLayer::Flush() { + ZEN_MEMSCOPE(GetCacheDiskTag()); ZEN_TRACE_CPU("Z$::Flush"); std::vector<CacheBucket*> Buckets; @@ -4439,6 +4036,8 @@ ZenCacheDiskLayer::Flush() { WorkLatch.AddCount(1); Pool.ScheduleWork([&WorkLatch, Bucket]() { + ZEN_MEMSCOPE(GetCacheDiskTag()); + auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); try { @@ -4498,26 +4097,6 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) } } -void -ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("Z$::GatherReferences"); - - std::vector<CacheBucket*> Buckets; - { - RwLock::SharedLockScope _(m_Lock); - Buckets.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - Buckets.push_back(Kv.second.get()); - } - } - for (CacheBucket* Bucket : Buckets) - { - Bucket->GatherReferences(GcCtx); - } -} - GcStorageSize ZenCacheDiskLayer::StorageSize() const { @@ -4630,7 +4209,7 @@ ZenCacheDiskLayer::GetGcReferencerLocks() void ZenCacheDiskLayer::EnableUpdateCapture() { - m_UpdateCaptureLock.WithExclusiveLock([&]() { + m_Lock.WithExclusiveLock([&]() { if (m_UpdateCaptureRefCounter == 0) { ZEN_ASSERT(!m_CapturedBuckets); @@ -4647,7 +4226,7 @@ ZenCacheDiskLayer::EnableUpdateCapture() void ZenCacheDiskLayer::DisableUpdateCapture() { - m_UpdateCaptureLock.WithExclusiveLock([&]() { + m_Lock.WithExclusiveLock([&]() { ZEN_ASSERT(m_CapturedBuckets); ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); m_UpdateCaptureRefCounter--; @@ -4659,9 +4238,8 @@ ZenCacheDiskLayer::DisableUpdateCapture() } std::vector<std::string> -ZenCacheDiskLayer::GetCapturedBuckets() +ZenCacheDiskLayer::GetCapturedBucketsLocked() { - RwLock::SharedLockScope _(m_UpdateCaptureLock); if (m_CapturedBuckets) { return *m_CapturedBuckets; @@ -4669,8 +4247,26 @@ ZenCacheDiskLayer::GetCapturedBuckets() return {}; } -void -ZenCacheDiskLayer::MemCacheTrim() +bool +ZenCacheDiskLayer::GetContentStats(std::string_view BucketName, CacheContentStats& OutContentStats) const +{ + std::atomic_bool CancelFlag = false; + if (auto It = m_Buckets.find(std::string(BucketName)); It != m_Buckets.end()) + { + CacheBucket::ReferencesStats BucketStats; + if (It->second->GetReferences(Log(), CancelFlag, false, true, false, OutContentStats.Attachments, &BucketStats)) + { + OutContentStats.ValueSizes = std::move(BucketStats.ValueSizes); + OutContentStats.StructuredValuesCount = BucketStats.StructuredValuesCount; + OutContentStats.StandaloneValuesCount = BucketStats.StandaloneValuesCount; + return true; + } + } + return false; +} + +bool +ZenCacheDiskLayer::StartAsyncMemCacheTrim() { ZEN_TRACE_CPU("Z$::MemCacheTrim"); @@ -4681,71 +4277,79 @@ ZenCacheDiskLayer::MemCacheTrim() bool Expected = false; if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true)) { - return; + return false; } try { - m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this](JobContext&) { - ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]"); - - const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); - uint64_t TrimmedSize = 0; - Stopwatch Timer; - const auto Guard = MakeGuard([&] { - ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", - NiceBytes(TrimmedSize), - NiceBytes(m_TotalMemCachedSize), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - - const GcClock::Tick NowTick = GcClock::TickCount(); - const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); - m_NextAllowedTrimTick.store(NextTrimTick); - m_IsMemCacheTrimming.store(false); - }); + m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this](JobContext&) { MemCacheTrim(); }); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. Reason: '{}'", Ex.what()); + m_IsMemCacheTrimming.store(false); + return false; + } + return true; +} - const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds); +void +ZenCacheDiskLayer::MemCacheTrim() +{ + ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim"); + + const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); + uint64_t TrimmedSize = 0; + Stopwatch Timer; + const auto Guard = MakeGuard([&] { + ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", + NiceBytes(TrimmedSize), + NiceBytes(m_TotalMemCachedSize), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - static const size_t UsageSlotCount = 2048; - std::vector<uint64_t> UsageSlots; - UsageSlots.reserve(UsageSlotCount); + const GcClock::Tick NowTick = GcClock::TickCount(); + const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); + m_NextAllowedTrimTick.store(NextTrimTick); + m_IsMemCacheTrimming.store(false); + }); - std::vector<CacheBucket*> Buckets; - { - RwLock::SharedLockScope __(m_Lock); - Buckets.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - Buckets.push_back(Kv.second.get()); - } - } + const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds); - const GcClock::TimePoint Now = GcClock::Now(); - { - ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim GetUsageByAccess"); - for (CacheBucket* Bucket : Buckets) - { - Bucket->GetUsageByAccess(Now, MaxAge, UsageSlots); - } - } + static const size_t UsageSlotCount = 2048; + std::vector<uint64_t> UsageSlots; + UsageSlots.reserve(UsageSlotCount); - uint64_t TotalSize = 0; - for (size_t Index = 0; Index < UsageSlots.size(); ++Index) - { - TotalSize += UsageSlots[Index]; - if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes) - { - GcClock::TimePoint ExpireTime = Now - ((GcClock::Duration(MaxAge) * Index) / UsageSlotCount); - TrimmedSize = MemCacheTrim(Buckets, ExpireTime); - break; - } - } - }); + std::vector<CacheBucket*> Buckets; + { + RwLock::SharedLockScope __(m_Lock); + Buckets.reserve(m_Buckets.size()); + for (auto& Kv : m_Buckets) + { + Buckets.push_back(Kv.second.get()); + } } - catch (const std::exception& Ex) + + const GcClock::TimePoint Now = GcClock::Now(); { - ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. Reason: '{}'", Ex.what()); - m_IsMemCacheTrimming.store(false); + ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim GetUsageByAccess"); + for (CacheBucket* Bucket : Buckets) + { + Bucket->GetUsageByAccess(Now, MaxAge, UsageSlots); + } + } + + const uint64_t MemCacheTargetFootprintBytes = (m_Configuration.MemCacheTargetFootprintBytes * 75) / 100; + + uint64_t TotalSize = 0; + for (size_t Index = 0; Index < UsageSlots.size(); ++Index) + { + TotalSize += UsageSlots[Index]; + if (TotalSize >= MemCacheTargetFootprintBytes) + { + GcClock::TimePoint ExpireTime = Now - ((GcClock::Duration(MaxAge) * Index) / UsageSlotCount); + TrimmedSize = MemCacheTrim(Buckets, ExpireTime); + break; + } } } |