Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
-rw-r--r--   src/zenstore/cache/cachedisklayer.cpp   1038
1 file changed, 321 insertions, 717 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 110acba9e..cbc1d6e83 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -22,9 +22,19 @@
//////////////////////////////////////////////////////////////////////////
+#include <zencore/memory/llm.h>
+
namespace zen {
-namespace {
+const FLLMTag&
+GetCacheDiskTag()
+{
+ static FLLMTag _("disk", FLLMTag("cache"));
+
+ return _;
+}
+
+namespace cache::impl {
#pragma pack(push)
#pragma pack(1)
@@ -214,11 +224,15 @@ namespace {
zen::Sleep(100);
} while (true);
}
-} // namespace
+} // namespace cache::impl
namespace fs = std::filesystem;
using namespace std::literals;
+} // namespace zen
+
+namespace zen::cache::impl {
+
class BucketManifestSerializer
{
using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex;
@@ -561,7 +575,8 @@ BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& B
if (Header.EntryCount > ExpectedEntryCount)
{
ZEN_WARN(
- "Failed to read sidecar file '{}'. File is not large enough to hold expected entry count. Header count: {}, file size count: "
+ "Failed to read sidecar file '{}'. File is not large enough to hold expected entry count. Header count: {}, file size "
+ "count: "
"{}",
SidecarPath,
Header.EntryCount,
@@ -685,6 +700,12 @@ BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&,
static const float IndexMinLoadFactor = 0.2f;
static const float IndexMaxLoadFactor = 0.7f;
+} // namespace zen::cache::impl
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen {
+
ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
std::atomic_uint64_t& OuterCacheMemoryUsage,
std::string BucketName,
@@ -695,8 +716,8 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
, m_Configuration(Config)
, m_BucketId(Oid::Zero)
{
- m_Index.min_load_factor(IndexMinLoadFactor);
- m_Index.max_load_factor(IndexMaxLoadFactor);
+ m_Index.min_load_factor(cache::impl::IndexMinLoadFactor);
+ m_Index.max_load_factor(cache::impl::IndexMaxLoadFactor);
if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap")))
{
@@ -740,11 +761,11 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
CreateDirectories(m_BucketDir);
- std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName);
+ std::filesystem::path ManifestPath = cache::impl::GetManifestPath(m_BucketDir, m_BucketName);
bool IsNew = false;
- BucketManifestSerializer ManifestReader;
+ cache::impl::BucketManifestSerializer ManifestReader;
if (ManifestReader.Open(ManifestPath))
{
@@ -760,7 +781,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
ZEN_INFO("Wiping bucket '{}', found version {}, required version {}",
BucketDir,
Version,
- BucketManifestSerializer::CurrentDiskBucketVersion);
+ cache::impl::BucketManifestSerializer::CurrentDiskBucketVersion);
IsNew = true;
}
}
@@ -814,11 +835,11 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition,
namespace fs = std::filesystem;
- fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+ fs::path IndexPath = cache::impl::GetIndexPath(m_BucketDir, m_BucketName);
try
{
- const uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry);
+ const uint64_t IndexSize = sizeof(cache::impl::CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry);
std::error_code Error;
DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
if (Error)
@@ -852,14 +873,14 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition,
// all data is written to the file
BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024);
- CacheBucketIndexHeader Header = {.EntryCount = EntryCount,
- .LogPosition = LogCount,
- .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)};
+ cache::impl::CacheBucketIndexHeader Header = {.EntryCount = EntryCount,
+ .LogPosition = LogCount,
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)};
- Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
- IndexWriter.Write(&Header, sizeof(CacheBucketIndexHeader), 0);
+ Header.Checksum = cache::impl::CacheBucketIndexHeader::ComputeChecksum(Header);
+ IndexWriter.Write(&Header, sizeof(cache::impl::CacheBucketIndexHeader), 0);
- uint64_t IndexWriteOffset = sizeof(CacheBucketIndexHeader);
+ uint64_t IndexWriteOffset = sizeof(cache::impl::CacheBucketIndexHeader);
for (auto& Entry : m_Index)
{
@@ -886,7 +907,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool FlushLockPosition,
// We must only update the log flush position once the snapshot write succeeds
if (FlushLockPosition)
{
- std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+ std::filesystem::path LogPath = cache::impl::GetLogPath(m_BucketDir, m_BucketName);
if (std::filesystem::is_regular_file(LogPath))
{
@@ -928,12 +949,12 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const
BasicFile ObjectIndexFile;
ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
uint64_t FileSize = ObjectIndexFile.FileSize();
- if (FileSize < sizeof(CacheBucketIndexHeader))
+ if (FileSize < sizeof(cache::impl::CacheBucketIndexHeader))
{
return 0;
}
- CacheBucketIndexHeader Header;
+ cache::impl::CacheBucketIndexHeader Header;
ObjectIndexFile.Read(&Header, sizeof(Header), 0);
if (!Header.IsValid())
@@ -941,12 +962,12 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const
return 0;
}
- if (Header.Version != CacheBucketIndexHeader::Version2)
+ if (Header.Version != cache::impl::CacheBucketIndexHeader::Version2)
{
return 0;
}
- const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
+ const uint64_t ExpectedEntryCount = (FileSize - sizeof(cache::impl::CacheBucketIndexHeader)) / sizeof(DiskIndexEntry);
if (Header.EntryCount > ExpectedEntryCount)
{
return 0;
@@ -967,7 +988,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const
BasicFileBuffer FileBuffer(ObjectIndexFile, 128 * 1024);
- uint64_t CurrentReadOffset = sizeof(CacheBucketIndexHeader);
+ uint64_t CurrentReadOffset = sizeof(cache::impl::CacheBucketIndexHeader);
uint64_t RemainingEntryCount = Header.EntryCount;
std::string InvalidEntryReason;
@@ -976,7 +997,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const
const DiskIndexEntry* Entry = FileBuffer.MakeView<DiskIndexEntry>(CurrentReadOffset);
CurrentReadOffset += sizeof(DiskIndexEntry);
- if (!ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason))
+ if (!cache::impl::ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason))
{
ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
continue;
@@ -993,7 +1014,7 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const
m_AccessTimes.resize(EntryCount, AccessTime(GcClock::TickCount()));
- OutVersion = CacheBucketIndexHeader::Version2;
+ OutVersion = cache::impl::CacheBucketIndexHeader::Version2;
return Header.LogPosition;
}
@@ -1040,7 +1061,7 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::
return;
}
- if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason))
+ if (!cache::impl::ValidateCacheBucketIndexEntry(Record, InvalidEntryReason))
{
ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
++InvalidEntryCount;
@@ -1077,8 +1098,8 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco
m_MemCachedPayloads.clear();
m_FreeMemCachedPayloads.clear();
- std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
- std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+ std::filesystem::path LogPath = cache::impl::GetLogPath(m_BucketDir, m_BucketName);
+ std::filesystem::path IndexPath = cache::impl::GetIndexPath(m_BucketDir, m_BucketName);
if (IsNew)
{
@@ -1136,8 +1157,8 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco
}
else
{
- const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment);
- KnownBlocks.Add(BlockLocation.BlockIndex);
+ uint32_t BlockIndex = Location.Location.BlockLocation.GetBlockIndex();
+ KnownBlocks.insert(BlockIndex);
}
}
m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
@@ -1246,6 +1267,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
size_t IndexOffset = 0;
m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
+ ZEN_MEMSCOPE(GetCacheDiskTag());
std::vector<DiskIndexEntry> DiskEntries;
{
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
@@ -1262,7 +1284,9 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
}
if (m_TrackedReferences && HashKeyAndReferences.size() > 1)
{
- m_TrackedReferences->insert(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end());
+ m_TrackedReferences->insert(m_TrackedReferences->end(),
+ HashKeyAndReferences.begin() + 1,
+ HashKeyAndReferences.end());
}
if (auto It = m_Index.find(HashKey); It != m_Index.end())
{
@@ -1311,6 +1335,8 @@ struct ZenCacheDiskLayer::CacheBucket::GetBatchHandle
ResultIndexes.reserve(OutResults.capacity());
}
+ ~GetBatchHandle() {}
+
std::vector<IoHash> Keys;
std::vector<size_t> ResultIndexes;
@@ -1432,7 +1458,7 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
if (SetMetaInfo)
{
// See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the
- // metadata separately, check if it had time to set the metdata
+ // metadata separately, check if it had time to set the metadata
RwLock::SharedLockScope UpdateIndexLock(m_IndexLock);
if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
{
@@ -1501,7 +1527,8 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
{
ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::ReadInline");
m_BlockStore.IterateChunks(InlineBlockLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) -> bool {
- const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 32u * 1024u);
+ // Only read into memory the IoBuffers we could potentially add to memcache
+ const uint64_t LargeChunkSizeLimit = Max(m_Configuration.MemCacheSizeThreshold, 1u * 1024u);
m_BlockStore.IterateBlock(
InlineBlockLocations,
ChunkIndexes,
@@ -1552,7 +1579,7 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
ZenCacheValue& OutValue = Batch->OutResults[ResultIndex];
{
// See ZenCacheDiskLayer::CacheBucket::Get - it sets the memcache part first and then if it needs to it set the
- // metadata separately, check if it had time to set the metdata
+ // metadata separately, check if it had time to set the metadata
RwLock::SharedLockScope UpdateIndexLock(m_IndexLock);
if (auto UpdateIt = m_Index.find(Key); UpdateIt != m_Index.end())
{
@@ -1622,6 +1649,8 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
}
}
}
+
+ delete Batch;
}
catch (const std::exception& Ex)
{
@@ -1798,43 +1827,41 @@ ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime)
uint64_t Trimmed = 0;
GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
- RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size());
- if (MemCachedCount == 0)
- {
- return 0;
- }
+ std::vector<IoBuffer> PurgedBuffers;
- uint32_t WriteIndex = 0;
- for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
{
- MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
- if (!Data.Payload)
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size());
+ if (MemCachedCount == 0)
{
- continue;
+ return 0;
}
- PayloadIndex Index = Data.OwnerIndex;
- ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
- GcClock::Tick AccessTime = m_AccessTimes[Index];
- if (AccessTime < ExpireTicks)
- {
- size_t PayloadSize = Data.Payload.GetSize();
- RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
- Data = {};
- m_Payloads[Index].MemCached = {};
- Trimmed += PayloadSize;
- continue;
- }
- if (ReadIndex > WriteIndex)
+ PurgedBuffers.reserve(MemCachedCount);
+
+ for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
{
- m_MemCachedPayloads[WriteIndex] = MemCacheData{.Payload = std::move(Data.Payload), .OwnerIndex = Index};
- m_Payloads[Index].MemCached = MemCachedIndex(WriteIndex);
+ MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
+ if (!Data.Payload)
+ {
+ continue;
+ }
+ PayloadIndex Index = Data.OwnerIndex;
+ ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
+ GcClock::Tick AccessTime = m_AccessTimes[Index];
+ if (AccessTime < ExpireTicks)
+ {
+ size_t PayloadSize = Data.Payload.GetSize();
+ RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
+ PurgedBuffers.emplace_back(std::move(Data.Payload));
+ Data.OwnerIndex = {};
+ // Data = {};
+ m_Payloads[Index].MemCached = {};
+ Trimmed += PayloadSize;
+ m_FreeMemCachedPayloads.push_back(MemCachedIndex(ReadIndex));
+ }
}
- WriteIndex++;
}
- m_MemCachedPayloads.resize(WriteIndex);
- m_MemCachedPayloads.shrink_to_fit();
- zen::Reset(m_FreeMemCachedPayloads);
+ PurgedBuffers.clear();
return Trimmed;
}
@@ -1843,23 +1870,40 @@ ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock
{
ZEN_TRACE_CPU("Z$::Bucket::GetUsageByAccess");
- size_t SlotCount = InOutUsageSlots.capacity();
- RwLock::SharedLockScope _(m_IndexLock);
- uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size());
- if (MemCachedCount == 0)
- {
- return;
- }
- for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
+ std::vector<uint64_t> PayloadSizes;
+ std::vector<AccessTime> AccessTimes;
+
+ size_t SlotCount = InOutUsageSlots.capacity();
+
{
- MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
- if (!Data.Payload)
+ RwLock::SharedLockScope _(m_IndexLock);
+ uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size());
+ if (MemCachedCount == 0)
{
- continue;
+ return;
+ }
+ PayloadSizes.reserve(MemCachedCount);
+ AccessTimes.reserve(MemCachedCount);
+ for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
+ {
+ MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
+ if (!Data.Payload)
+ {
+ continue;
+ }
+ PayloadIndex Index = Data.OwnerIndex;
+ ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
+ PayloadSizes.push_back(Data.Payload.GetSize());
+ AccessTimes.push_back(m_AccessTimes[Index]);
}
- PayloadIndex Index = Data.OwnerIndex;
- ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
- GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index]));
+ }
+
+ auto PayloadSizeIt = PayloadSizes.begin();
+ auto AccessTimeIt = AccessTimes.begin();
+ for (PayloadSizeIt = PayloadSizes.begin(); PayloadSizeIt != PayloadSizes.end(); PayloadSizeIt++)
+ {
+ ZEN_ASSERT_SLOW(AccessTimeIt != AccessTimes.end());
+ GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(*AccessTimeIt));
GcClock::Duration Age = Now > ItemAccessTime ? Now - ItemAccessTime : GcClock::Duration(0);
size_t Slot = Age < MaxAge ? gsl::narrow<size_t>((Age.count() * SlotCount) / MaxAge.count()) : (SlotCount - 1);
ZEN_ASSERT_SLOW(Slot < SlotCount);
@@ -1867,7 +1911,8 @@ ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock
{
InOutUsageSlots.resize(Slot + 1, 0);
}
- InOutUsageSlots[Slot] += EstimateMemCachePayloadMemory(Data.Payload.GetSize());
+ InOutUsageSlots[Slot] += EstimateMemCachePayloadMemory(*PayloadSizeIt);
+ AccessTimeIt++;
}
}
@@ -1887,7 +1932,7 @@ ZenCacheDiskLayer::CacheBucket::Drop()
m_BlockStore.Close();
m_SlogFile.Close();
- bool Deleted = MoveAndDeleteDirectory(m_BucketDir);
+ const bool Deleted = cache::impl::MoveAndDeleteDirectory(m_BucketDir);
m_Index.clear();
m_Payloads.clear();
@@ -1937,8 +1982,8 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
{
bool UseLegacyScheme = false;
- IoBuffer Buffer;
- BucketManifestSerializer ManifestWriter;
+ IoBuffer Buffer;
+ cache::impl::BucketManifestSerializer ManifestWriter;
if (UseLegacyScheme)
{
@@ -2018,7 +2063,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
}
ManifestWriter.WriteSidecarFile(IndexLock,
- GetMetaPath(m_BucketDir, m_BucketName),
+ cache::impl::GetMetaPath(m_BucketDir, m_BucketName),
m_LogFlushPosition,
m_Index,
m_AccessTimes,
@@ -2026,7 +2071,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
m_MetaDatas);
}
- std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName);
+ const std::filesystem::path ManifestPath = cache::impl::GetManifestPath(m_BucketDir, m_BucketName);
TemporaryFile::SafeWriteFile(ManifestPath, Buffer.GetView());
}
catch (const std::exception& Err)
@@ -2190,7 +2235,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
};
m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) {
- return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk);
+ return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0);
});
}
catch (ScrubDeadlineExpiredException&)
@@ -2275,466 +2320,6 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
}
}
-void
-ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
-{
- ZEN_TRACE_CPU("Z$::Bucket::GatherReferences");
-
-#define CALCULATE_BLOCKING_TIME 0
-
-#if CALCULATE_BLOCKING_TIME
- uint64_t WriteBlockTimeUs = 0;
- uint64_t WriteBlockLongestTimeUs = 0;
- uint64_t ReadBlockTimeUs = 0;
- uint64_t ReadBlockLongestTimeUs = 0;
-#endif // CALCULATE_BLOCKING_TIME
-
- Stopwatch TotalTimer;
- const auto _ = MakeGuard([&] {
-#if CALCULATE_BLOCKING_TIME
- ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})",
- m_BucketDir,
- NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
- NiceLatencyNs(WriteBlockTimeUs),
- NiceLatencyNs(WriteBlockLongestTimeUs),
- NiceLatencyNs(ReadBlockTimeUs),
- NiceLatencyNs(ReadBlockLongestTimeUs));
-#else
- ZEN_DEBUG("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()));
-#endif // CALCULATE_BLOCKING_TIME
- });
-
- const GcClock::TimePoint ExpireTime = GcCtx.CacheExpireTime();
-
- const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
-
- IndexMap Index;
- std::vector<AccessTime> AccessTimes;
- std::vector<BucketPayload> Payloads;
- {
- RwLock::SharedLockScope __(m_IndexLock);
-#if CALCULATE_BLOCKING_TIME
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
-#endif // CALCULATE_BLOCKING_TIME
- if (m_Index.empty())
- {
- return;
- }
- Index = m_Index;
- AccessTimes = m_AccessTimes;
- Payloads = m_Payloads;
- }
-
- std::vector<IoHash> ExpiredKeys;
- ExpiredKeys.reserve(1024);
-
- std::vector<IoHash> Cids;
- if (!GcCtx.SkipCid())
- {
- Cids.reserve(1024);
- }
-
- std::vector<std::pair<IoHash, size_t>> StructuredItemsWithUnknownAttachments;
-
- for (const auto& Entry : Index)
- {
- const IoHash& Key = Entry.first;
- size_t PayloadIndex = Entry.second;
- GcClock::Tick AccessTime = AccessTimes[PayloadIndex];
- if (AccessTime < ExpireTicks)
- {
- ExpiredKeys.push_back(Key);
- continue;
- }
-
- if (GcCtx.SkipCid())
- {
- continue;
- }
-
- BucketPayload& Payload = Payloads[PayloadIndex];
- const DiskLocation& Loc = Payload.Location;
-
- if (!Loc.IsFlagSet(DiskLocation::kStructured))
- {
- continue;
- }
- StructuredItemsWithUnknownAttachments.push_back(Entry);
- }
-
- for (const auto& Entry : StructuredItemsWithUnknownAttachments)
- {
- const IoHash& Key = Entry.first;
- BucketPayload& Payload = Payloads[Entry.second];
- const DiskLocation& Loc = Payload.Location;
- {
- IoBuffer Buffer;
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- if (Buffer = GetStandaloneCacheValue(Loc, Key); !Buffer)
- {
- continue;
- }
- }
- else
- {
- RwLock::SharedLockScope IndexLock(m_IndexLock);
-#if CALCULATE_BLOCKING_TIME
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
-#endif // CALCULATE_BLOCKING_TIME
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- const BucketPayload& CachedPayload = m_Payloads[It->second];
- if (CachedPayload.MemCached)
- {
- Buffer = m_MemCachedPayloads[CachedPayload.MemCached].Payload;
- ZEN_ASSERT_SLOW(Buffer);
- }
- else
- {
- DiskLocation Location = m_Payloads[It->second].Location;
- IndexLock.ReleaseNow();
- Buffer = GetInlineCacheValue(Location);
- // Don't memcache items when doing GC
- }
- }
- if (!Buffer)
- {
- continue;
- }
- }
-
- ZEN_ASSERT(Buffer);
- ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject);
- CbObjectView Obj(Buffer.GetData());
- Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
- if (Cids.size() >= 1024)
- {
- GcCtx.AddRetainedCids(Cids);
- Cids.clear();
- }
- }
- }
-
- GcCtx.AddRetainedCids(Cids);
- GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys));
-}
-
-void
-ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
-{
- ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage");
-
- ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir);
-
- Stopwatch TotalTimer;
- uint64_t WriteBlockTimeUs = 0;
- uint64_t WriteBlockLongestTimeUs = 0;
- uint64_t ReadBlockTimeUs = 0;
- uint64_t ReadBlockLongestTimeUs = 0;
- uint64_t TotalChunkCount = 0;
- uint64_t DeletedSize = 0;
- GcStorageSize OldTotalSize = StorageSize();
-
- std::unordered_set<IoHash> DeletedChunks;
- uint64_t MovedCount = 0;
-
- const auto _ = MakeGuard([&] {
- ZEN_DEBUG(
- "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved "
- "{} "
- "of {} "
- "entries ({}/{}).",
- m_BucketDir,
- NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
- NiceLatencyNs(WriteBlockTimeUs),
- NiceLatencyNs(WriteBlockLongestTimeUs),
- NiceLatencyNs(ReadBlockTimeUs),
- NiceLatencyNs(ReadBlockLongestTimeUs),
- NiceBytes(DeletedSize),
- DeletedChunks.size(),
- MovedCount,
- TotalChunkCount,
- NiceBytes(OldTotalSize.DiskSize),
- NiceBytes(OldTotalSize.MemorySize));
-
- bool Expected = false;
- if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
- {
- return;
- }
- auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
-
- try
- {
- SaveSnapshot([&]() { return GcCtx.ClaimGCReserve(); });
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed to write index and manifest after GC in '{}'. Reason: '{}'", m_BucketDir, Ex.what());
- }
- });
-
- auto __ = MakeGuard([&]() {
- if (!DeletedChunks.empty())
- {
- // Clean up m_AccessTimes and m_Payloads vectors
- std::vector<BucketPayload> Payloads;
- std::vector<AccessTime> AccessTimes;
- std::vector<BucketMetaData> MetaDatas;
- std::vector<MemCacheData> MemCachedPayloads;
- IndexMap Index;
- {
- RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, Index);
- }
- GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end()));
- }
- });
-
- std::span<const IoHash> ExpiredCacheKeySpan = GcCtx.ExpiredCacheKeys(m_BucketDir.string());
- if (ExpiredCacheKeySpan.empty())
- {
- return;
- }
-
- m_SlogFile.Flush();
-
- std::unordered_set<IoHash, IoHash::Hasher> ExpiredCacheKeys(ExpiredCacheKeySpan.begin(), ExpiredCacheKeySpan.end());
-
- std::vector<DiskIndexEntry> ExpiredStandaloneEntries;
- IndexMap IndexSnapshot;
- std::vector<BucketPayload> PayloadsSnapshot;
- BlockStore::ReclaimSnapshotState BlockStoreState;
- {
- bool Expected = false;
- if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
- {
- ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is currently flushing", m_BucketDir);
- return;
- }
- auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
-
- {
- ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::State");
- RwLock::SharedLockScope IndexLock(m_IndexLock);
-
- Stopwatch Timer;
- const auto ____ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
-
- BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
-
- for (const IoHash& Key : ExpiredCacheKeys)
- {
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- const BucketPayload& Payload = m_Payloads[It->second];
- if (Payload.Location.Flags & DiskLocation::kStandaloneFile)
- {
- DiskIndexEntry Entry = {.Key = Key, .Location = Payload.Location};
- Entry.Location.Flags |= DiskLocation::kTombStone;
- ExpiredStandaloneEntries.push_back(Entry);
- }
- }
- }
-
- PayloadsSnapshot = m_Payloads;
- IndexSnapshot = m_Index;
-
- if (GcCtx.IsDeletionMode())
- {
- IndexLock.ReleaseNow();
- RwLock::ExclusiveLockScope __(m_IndexLock);
- for (const auto& Entry : ExpiredStandaloneEntries)
- {
- if (m_Index.erase(Entry.Key) == 1)
- {
- m_StandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
- DeletedChunks.insert(Entry.Key);
- }
- }
- m_SlogFile.Append(ExpiredStandaloneEntries);
- }
- }
- }
-
- if (GcCtx.IsDeletionMode())
- {
- ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::Delete");
-
- ExtendablePathBuilder<256> Path;
-
- for (const auto& Entry : ExpiredStandaloneEntries)
- {
- const IoHash& Key = Entry.Key;
-
- Path.Reset();
- BuildPath(Path, Key);
- fs::path FilePath = Path.ToPath();
-
- {
- RwLock::SharedLockScope IndexLock(m_IndexLock);
- Stopwatch Timer;
- const auto ____ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- if (m_Index.contains(Key))
- {
- // Someone added it back, let the file on disk be
- ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8());
- continue;
- }
- IndexLock.ReleaseNow();
-
- RwLock::ExclusiveLockScope ValueLock(LockForHash(Key));
- if (fs::is_regular_file(FilePath))
- {
- ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8());
- std::error_code Ec;
- fs::remove(FilePath, Ec);
- if (Ec)
- {
- ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message());
- continue;
- }
- }
- }
- DeletedSize += Entry.Location.Size();
- }
- }
-
- TotalChunkCount = IndexSnapshot.size();
-
- std::vector<BlockStoreLocation> ChunkLocations;
- BlockStore::ChunkIndexArray KeepChunkIndexes;
- std::vector<IoHash> ChunkIndexToChunkHash;
- ChunkLocations.reserve(TotalChunkCount);
- ChunkLocations.reserve(TotalChunkCount);
- ChunkIndexToChunkHash.reserve(TotalChunkCount);
- {
- TotalChunkCount = 0;
- for (const auto& Entry : IndexSnapshot)
- {
- size_t EntryIndex = Entry.second;
- const DiskLocation& DiskLocation = PayloadsSnapshot[EntryIndex].Location;
-
- if (DiskLocation.Flags & DiskLocation::kStandaloneFile)
- {
- continue;
- }
- const IoHash& Key = Entry.first;
- BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment);
- size_t ChunkIndex = ChunkLocations.size();
- ChunkLocations.push_back(Location);
- ChunkIndexToChunkHash.push_back(Key);
- if (ExpiredCacheKeys.contains(Key))
- {
- continue;
- }
- KeepChunkIndexes.push_back(ChunkIndex);
- }
- }
- TotalChunkCount = ChunkLocations.size();
- size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size();
-
- const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
- if (!PerformDelete)
- {
- m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_Configuration.PayloadAlignment, true);
- GcStorageSize CurrentTotalSize = StorageSize();
- ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} ({}/{})",
- m_BucketDir,
- DeleteCount,
- TotalChunkCount,
- NiceBytes(CurrentTotalSize.DiskSize),
- NiceBytes(CurrentTotalSize.MemorySize));
- return;
- }
-
- m_BlockStore.ReclaimSpace(
- BlockStoreState,
- ChunkLocations,
- KeepChunkIndexes,
- m_Configuration.PayloadAlignment,
- false,
- [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
- std::vector<DiskIndexEntry> LogEntries;
- LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
- {
- RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- Stopwatch Timer;
- const auto ____ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- for (const auto& Entry : MovedChunks)
- {
- size_t ChunkIndex = Entry.first;
- const BlockStoreLocation& NewLocation = Entry.second;
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- size_t EntryIndex = m_Index[ChunkHash];
- BucketPayload& Payload = m_Payloads[EntryIndex];
- if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != m_Payloads[EntryIndex].Location)
- {
- // Entry has been updated while GC was running, ignore the move
- continue;
- }
- Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags());
- LogEntries.push_back({.Key = ChunkHash, .Location = Payload.Location});
- }
- for (const size_t ChunkIndex : RemovedChunks)
- {
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- size_t EntryIndex = m_Index[ChunkHash];
- BucketPayload& Payload = m_Payloads[EntryIndex];
- if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != Payload.Location)
- {
- // Entry has been updated while GC was running, ignore the delete
- continue;
- }
- const DiskLocation& OldDiskLocation = Payload.Location;
- LogEntries.push_back({.Key = ChunkHash,
- .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment),
- m_Configuration.PayloadAlignment,
- OldDiskLocation.GetFlags() | DiskLocation::kTombStone)});
-
- RemoveMemCachedData(IndexLock, Payload);
- RemoveMetaData(IndexLock, Payload);
-
- m_Index.erase(ChunkHash);
- DeletedChunks.insert(ChunkHash);
- }
- }
-
- m_SlogFile.Append(LogEntries);
- m_SlogFile.Flush();
- },
- [&]() { return GcCtx.ClaimGCReserve(); });
-}
-
ZenCacheDiskLayer::BucketStats
ZenCacheDiskLayer::CacheBucket::Stats()
{
@@ -2817,30 +2402,6 @@ ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents(
}
void
-ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
-{
- ZEN_TRACE_CPU("Z$::CollectGarbage");
-
- std::vector<CacheBucket*> Buckets;
- {
- RwLock::SharedLockScope _(m_Lock);
- Buckets.reserve(m_Buckets.size());
- for (auto& Kv : m_Buckets)
- {
- Buckets.push_back(Kv.second.get());
- }
- }
- for (CacheBucket* Bucket : Buckets)
- {
- Bucket->CollectGarbage(GcCtx);
- }
- if (!m_IsMemCacheTrimming)
- {
- MemCacheTrim(Buckets, GcCtx.CacheExpireTime());
- }
-}
-
-void
ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue");
@@ -2963,7 +2524,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
}
if (m_TrackedReferences)
{
- m_TrackedReferences->insert(References.begin(), References.end());
+ m_TrackedReferences->insert(m_TrackedReferences->end(), References.begin(), References.end());
}
PayloadIndex EntryIndex = {};
@@ -3119,6 +2680,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
Value.Value.Size(),
m_Configuration.PayloadAlignment,
[&](const BlockStoreLocation& BlockStoreLocation) {
+ ZEN_MEMSCOPE(GetCacheDiskTag());
ZEN_TRACE_CPU("Z$::Bucket::UpdateLocation");
DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags);
m_SlogFile.Append({.Key = HashKey, .Location = Location});
@@ -3130,7 +2692,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
}
if (m_TrackedReferences)
{
- m_TrackedReferences->insert(References.begin(), References.end());
+ m_TrackedReferences->insert(m_TrackedReferences->end(), References.begin(), References.end());
}
if (auto It = m_Index.find(HashKey); It != m_Index.end())
{
@@ -3184,7 +2746,7 @@ public:
Stopwatch Timer;
const auto _ = MakeGuard([&] {
- Reset(m_ExpiredStandaloneKeys);
+ cache::impl::Reset(m_ExpiredStandaloneKeys);
if (!Ctx.Settings.Verbose)
{
return;
@@ -3457,10 +3019,6 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
{
return nullptr;
}
- if (m_Index.empty())
- {
- return nullptr;
- }
TotalEntries = m_Index.size();
@@ -3556,7 +3114,7 @@ ZenCacheDiskLayer::CacheBucket::ReadAttachmentsFromMetaData(uint32_t BlockI
ZEN_TRACE_CPU("Z$::Bucket::GetAttachmentsFromMetaData");
return GetAttachmentsFromMetaData<IoHash, IoHash>(
MetaDataPayload,
- BlockMetaDataExpectedMagic,
+ cache::impl::BlockMetaDataExpectedMagic,
[&](std::span<const IoHash> Keys, std::span<const uint32_t> AttachmentCounts, std::span<const IoHash> Attachments) {
auto AttachmentReadIt = Attachments.begin();
OutReferences.resize(OutReferences.size() + Attachments.size());
@@ -3568,10 +3126,9 @@ ZenCacheDiskLayer::CacheBucket::ReadAttachmentsFromMetaData(uint32_t BlockI
{
if (WantedKeys.contains(*KeyIt))
{
- for (uint32_t It = 0u; It < AttachmentCount; It++)
- {
- *OutReferencesWriteIt++ = *AttachmentReadIt++;
- }
+ memcpy(&(*OutReferencesWriteIt), &(*AttachmentReadIt), sizeof(IoHash) * AttachmentCount);
+ OutReferencesWriteIt += AttachmentCount;
+ AttachmentReadIt += AttachmentCount;
}
else
{
@@ -3587,11 +3144,17 @@ ZenCacheDiskLayer::CacheBucket::ReadAttachmentsFromMetaData(uint32_t BlockI
}
bool
-ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHash>& OutReferences)
+ZenCacheDiskLayer::CacheBucket::GetReferences(const LoggerRef& Logger,
+ std::atomic_bool& IsCancelledFlag,
+ bool StateIsAlreadyLocked,
+ bool ReadCacheAttachmentMetaData,
+ bool WriteCacheAttachmentMetaData,
+ std::vector<IoHash>& OutReferences,
+ ReferencesStats* OptionalOutReferencesStats)
{
ZEN_TRACE_CPU("Z$::Bucket::GetReferencesLocked");
- auto Log = [&Ctx]() { return Ctx.Logger; };
+ auto Log = [&Logger]() { return Logger; };
auto GetAttachments = [&](MemoryView Data) -> bool {
if (ValidateCompactBinary(Data, CbValidateMode::Default) == CbValidateError::None)
@@ -3611,10 +3174,14 @@ ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHa
{
std::unordered_map<uint32_t, std::size_t> BlockIndexToChunkIndexes;
-
+ std::unique_ptr<RwLock::SharedLockScope> StateLock;
+ if (!StateIsAlreadyLocked)
+ {
+ StateLock = std::make_unique<RwLock::SharedLockScope>(m_IndexLock);
+ }
for (const auto& Entry : m_Index)
{
- if (Ctx.IsCancelledFlag.load())
+ if (IsCancelledFlag.load())
{
return false;
}
@@ -3623,14 +3190,29 @@ ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHa
const BucketPayload& Payload = m_Payloads[EntryIndex];
const DiskLocation& Loc = Payload.Location;
+ if (OptionalOutReferencesStats != nullptr)
+ {
+ OptionalOutReferencesStats->ValueSizes.push_back(Loc.Size());
+ }
+
if (!Loc.IsFlagSet(DiskLocation::kStructured))
{
continue;
}
+
+ if (OptionalOutReferencesStats)
+ {
+ OptionalOutReferencesStats->StructuredValuesCount++;
+ }
+
const IoHash& Key = Entry.first;
if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
StandaloneKeys.push_back(std::make_pair(Key, Loc));
+ if (OptionalOutReferencesStats)
+ {
+ OptionalOutReferencesStats->StandaloneValuesCount++;
+ }
continue;
}
@@ -3650,20 +3232,22 @@ ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHa
}
}
- for (std::vector<std::size_t> ChunkIndexes : InlineBlockChunkIndexes)
+ OutReferences.reserve(OutReferences.size() + InlineKeys.size() +
+ StandaloneKeys.size()); // Make space for at least one attachment per record
+
+ for (const std::vector<std::size_t>& ChunkIndexes : InlineBlockChunkIndexes)
{
ZEN_ASSERT(!ChunkIndexes.empty());
uint32_t BlockIndex = InlineLocations[ChunkIndexes[0]].BlockIndex;
- if (!m_Configuration.StoreAttachmentMetaData ||
- !ReadAttachmentsFromMetaData(BlockIndex, InlineKeys, ChunkIndexes, OutReferences))
+ if (!ReadCacheAttachmentMetaData || !ReadAttachmentsFromMetaData(BlockIndex, InlineKeys, ChunkIndexes, OutReferences))
{
std::vector<IoHash> Keys;
std::vector<uint32_t> AttachmentCounts;
size_t PrecachedReferencesStart = OutReferences.size();
size_t NextPrecachedReferencesStart = PrecachedReferencesStart;
- bool WriteMetaData = m_Configuration.StoreAttachmentMetaData && !m_BlockStore.IsWriting(BlockIndex);
+ bool WriteMetaData = WriteCacheAttachmentMetaData && !m_BlockStore.IsWriting(BlockIndex);
if (WriteMetaData)
{
Keys.reserve(InlineLocations.size());
@@ -3672,12 +3256,15 @@ ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHa
auto CaptureAttachments = [&](size_t ChunkIndex, MemoryView Data) {
if (GetAttachments(Data))
{
- size_t AttachmentCount = OutReferences.size() - NextPrecachedReferencesStart;
- if (WriteMetaData && AttachmentCount > 0)
+ if (WriteMetaData)
{
- Keys.push_back(InlineKeys[ChunkIndex]);
- AttachmentCounts.push_back(gsl::narrow<uint32_t>(AttachmentCount));
- NextPrecachedReferencesStart += AttachmentCount;
+ size_t AttachmentCount = OutReferences.size() - NextPrecachedReferencesStart;
+ if (AttachmentCount > 0)
+ {
+ Keys.push_back(InlineKeys[ChunkIndex]);
+ AttachmentCounts.push_back(gsl::narrow<uint32_t>(AttachmentCount));
+ NextPrecachedReferencesStart += AttachmentCount;
+ }
}
}
};
@@ -3688,13 +3275,14 @@ ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHa
[&](size_t ChunkIndex, const void* Data, uint64_t Size) {
ZEN_UNUSED(ChunkIndex);
CaptureAttachments(ChunkIndex, MemoryView(Data, Size));
- return !Ctx.IsCancelledFlag.load();
+ return !IsCancelledFlag.load();
},
[&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
ZEN_UNUSED(ChunkIndex);
CaptureAttachments(ChunkIndex, File.GetChunk(Offset, Size).GetView());
- return !Ctx.IsCancelledFlag.load();
- });
+ return !IsCancelledFlag.load();
+ },
+ 0);
if (Continue)
{
@@ -3703,7 +3291,7 @@ ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHa
ZEN_ASSERT(Keys.size() == AttachmentCounts.size());
IoBuffer MetaDataPayload =
BuildReferenceMetaData<IoHash>(
- BlockMetaDataExpectedMagic,
+ cache::impl::BlockMetaDataExpectedMagic,
Keys,
AttachmentCounts,
std::span<const IoHash>(OutReferences)
@@ -3718,7 +3306,7 @@ ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHa
return false;
}
}
- if (Ctx.IsCancelledFlag.load())
+ if (IsCancelledFlag.load())
{
return false;
}
@@ -3727,7 +3315,7 @@ ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHa
for (const auto& It : StandaloneKeys)
{
- if (Ctx.IsCancelledFlag.load())
+ if (IsCancelledFlag.load())
{
return false;
}
@@ -3778,19 +3366,25 @@ public:
}
ZEN_INFO("GCV2: cachebucket [PRECACHE] '{}': found {} references in {}",
m_CacheBucket.m_BucketDir,
- m_References.size(),
+ m_PrecachedReferences.size(),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
- m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = std::make_unique<HashSet>(); });
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = std::make_unique<std::vector<IoHash>>(); });
- RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock);
- bool Continue = m_CacheBucket.GetReferencesLocked(Ctx, m_References);
- IndexLock.ReleaseNow();
+ bool Continue = m_CacheBucket.GetReferences(Ctx.Logger,
+ Ctx.IsCancelledFlag,
+ /*StateIsAlreadyLocked*/ false,
+ Ctx.Settings.StoreCacheAttachmentMetaData,
+ Ctx.Settings.StoreCacheAttachmentMetaData,
+ m_PrecachedReferences,
+ /*OptionalOutReferencesStats*/ nullptr);
if (!Continue)
{
m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); });
+ return;
}
+ FilterReferences(Ctx, fmt::format("cachebucket [PRECACHE] '{}'", m_CacheBucket.m_BucketDir), m_PrecachedReferences);
}
virtual void UpdateLockedState(GcCtx& Ctx) override
@@ -3807,32 +3401,32 @@ public:
}
ZEN_INFO("GCV2: cachebucket [LOCKSTATE] '{}': found {} references in {}",
m_CacheBucket.m_BucketDir,
- m_References.size(),
+ m_PrecachedReferences.size() + m_AddedReferences.size(),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
if (Ctx.IsCancelledFlag.load())
{
- m_References = {};
+ m_PrecachedReferences = {};
m_CacheBucket.m_TrackedReferences.reset();
return;
}
ZEN_ASSERT(m_CacheBucket.m_TrackedReferences);
- HashSet& AddedReferences(*m_CacheBucket.m_TrackedReferences);
- m_References.reserve(m_References.size() + AddedReferences.size());
- m_References.insert(m_References.end(), AddedReferences.begin(), AddedReferences.end());
- AddedReferences = {};
+ m_AddedReferences = std::move(*m_CacheBucket.m_TrackedReferences);
+ FilterReferences(Ctx, fmt::format("cachebucket [LOCKSTATE] '{}'", m_CacheBucket.m_BucketDir), m_AddedReferences);
}
- virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override
+ virtual std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override
{
- ZEN_TRACE_CPU("Z$::Bucket::RemoveUsedReferencesFromSet");
+ ZEN_TRACE_CPU("Z$::Bucket::GetUnusedReferences");
auto Log = [&Ctx]() { return Ctx.Logger; };
- size_t InitialCount = IoCids.size();
+ const size_t InitialCount = IoCids.size();
+ size_t UsedCount = InitialCount;
+
Stopwatch Timer;
const auto _ = MakeGuard([&] {
if (!Ctx.Settings.Verbose)
@@ -3841,24 +3435,20 @@ public:
}
ZEN_INFO("GCV2: cachebucket [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}",
m_CacheBucket.m_BucketDir,
- InitialCount - IoCids.size(),
+ UsedCount,
InitialCount,
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
- for (const IoHash& ReferenceHash : m_References)
- {
- if (IoCids.erase(ReferenceHash) == 1)
- {
- if (IoCids.empty())
- {
- return;
- }
- }
- }
+ std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_PrecachedReferences, IoCids);
+ UnusedReferences = KeepUnusedReferences(m_AddedReferences, UnusedReferences);
+ UsedCount = IoCids.size() - UnusedReferences.size();
+ return UnusedReferences;
}
+
CacheBucket& m_CacheBucket;
- std::vector<IoHash> m_References;
+ std::vector<IoHash> m_PrecachedReferences;
+ std::vector<IoHash> m_AddedReferences;
};
std::vector<GcReferenceChecker*>
@@ -3888,6 +3478,12 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx)
return {new DiskBucketReferenceChecker(*this)};
}
+std::vector<GcReferenceValidator*>
+ZenCacheDiskLayer::CacheBucket::CreateReferenceValidators(GcCtx& /*Ctx*/)
+{
+ return {};
+}
+
void
ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&,
std::vector<BucketPayload>& Payloads,
@@ -3902,8 +3498,8 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&,
Payloads.reserve(EntryCount);
AccessTimes.reserve(EntryCount);
Index.reserve(EntryCount);
- Index.min_load_factor(IndexMinLoadFactor);
- Index.max_load_factor(IndexMaxLoadFactor);
+ Index.min_load_factor(cache::impl::IndexMinLoadFactor);
+ Index.max_load_factor(cache::impl::IndexMaxLoadFactor);
for (auto It : m_Index)
{
PayloadIndex EntryIndex = PayloadIndex(Payloads.size());
@@ -3927,9 +3523,9 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&,
m_Payloads.swap(Payloads);
m_AccessTimes.swap(AccessTimes);
m_MetaDatas.swap(MetaDatas);
- Reset(m_FreeMetaDatas);
+ cache::impl::Reset(m_FreeMetaDatas);
m_MemCachedPayloads.swap(MemCachedPayloads);
- Reset(m_FreeMemCachedPayloads);
+ cache::impl::Reset(m_FreeMemCachedPayloads);
}
RwLock::SharedLockScope
@@ -4028,12 +3624,10 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
CacheBucket* Result = Bucket.get();
m_Buckets.emplace(BucketName, std::move(Bucket));
- m_UpdateCaptureLock.WithExclusiveLock([&]() {
- if (m_CapturedBuckets)
- {
- m_CapturedBuckets->push_back(std::string(BucketName));
- }
- });
+ if (m_CapturedBuckets)
+ {
+ m_CapturedBuckets->push_back(std::string(BucketName));
+ }
return Result;
}
@@ -4323,6 +3917,8 @@ ZenCacheDiskLayer::DiscoverBuckets()
{
WorkLatch.AddCount(1);
Pool.ScheduleWork([this, &WorkLatch, &SyncLock, BucketPath]() {
+ ZEN_MEMSCOPE(GetCacheDiskTag());
+
auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
const std::string BucketName = PathToUtf8(BucketPath.stem());
try
@@ -4380,7 +3976,7 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
// Make sure we remove the folder even if we don't know about the bucket
std::filesystem::path BucketPath = m_RootDir;
BucketPath /= std::string(InBucket);
- return MoveAndDeleteDirectory(BucketPath);
+ return cache::impl::MoveAndDeleteDirectory(BucketPath);
}
bool
@@ -4403,12 +3999,13 @@ ZenCacheDiskLayer::Drop()
return false;
}
}
- return MoveAndDeleteDirectory(m_RootDir);
+ return cache::impl::MoveAndDeleteDirectory(m_RootDir);
}
void
ZenCacheDiskLayer::Flush()
{
+ ZEN_MEMSCOPE(GetCacheDiskTag());
ZEN_TRACE_CPU("Z$::Flush");
std::vector<CacheBucket*> Buckets;
@@ -4439,6 +4036,8 @@ ZenCacheDiskLayer::Flush()
{
WorkLatch.AddCount(1);
Pool.ScheduleWork([&WorkLatch, Bucket]() {
+ ZEN_MEMSCOPE(GetCacheDiskTag());
+
auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
try
{
@@ -4498,26 +4097,6 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
}
}
-void
-ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx)
-{
- ZEN_TRACE_CPU("Z$::GatherReferences");
-
- std::vector<CacheBucket*> Buckets;
- {
- RwLock::SharedLockScope _(m_Lock);
- Buckets.reserve(m_Buckets.size());
- for (auto& Kv : m_Buckets)
- {
- Buckets.push_back(Kv.second.get());
- }
- }
- for (CacheBucket* Bucket : Buckets)
- {
- Bucket->GatherReferences(GcCtx);
- }
-}
-
GcStorageSize
ZenCacheDiskLayer::StorageSize() const
{
@@ -4630,7 +4209,7 @@ ZenCacheDiskLayer::GetGcReferencerLocks()
void
ZenCacheDiskLayer::EnableUpdateCapture()
{
- m_UpdateCaptureLock.WithExclusiveLock([&]() {
+ m_Lock.WithExclusiveLock([&]() {
if (m_UpdateCaptureRefCounter == 0)
{
ZEN_ASSERT(!m_CapturedBuckets);
@@ -4647,7 +4226,7 @@ ZenCacheDiskLayer::EnableUpdateCapture()
void
ZenCacheDiskLayer::DisableUpdateCapture()
{
- m_UpdateCaptureLock.WithExclusiveLock([&]() {
+ m_Lock.WithExclusiveLock([&]() {
ZEN_ASSERT(m_CapturedBuckets);
ZEN_ASSERT(m_UpdateCaptureRefCounter > 0);
m_UpdateCaptureRefCounter--;
@@ -4659,9 +4238,8 @@ ZenCacheDiskLayer::DisableUpdateCapture()
}
std::vector<std::string>
-ZenCacheDiskLayer::GetCapturedBuckets()
+ZenCacheDiskLayer::GetCapturedBucketsLocked()
{
- RwLock::SharedLockScope _(m_UpdateCaptureLock);
if (m_CapturedBuckets)
{
return *m_CapturedBuckets;
@@ -4669,8 +4247,26 @@ ZenCacheDiskLayer::GetCapturedBuckets()
return {};
}
-void
-ZenCacheDiskLayer::MemCacheTrim()
+bool
+ZenCacheDiskLayer::GetContentStats(std::string_view BucketName, CacheContentStats& OutContentStats) const
+{
+ std::atomic_bool CancelFlag = false;
+ if (auto It = m_Buckets.find(std::string(BucketName)); It != m_Buckets.end())
+ {
+ CacheBucket::ReferencesStats BucketStats;
+ if (It->second->GetReferences(Log(), CancelFlag, false, true, false, OutContentStats.Attachments, &BucketStats))
+ {
+ OutContentStats.ValueSizes = std::move(BucketStats.ValueSizes);
+ OutContentStats.StructuredValuesCount = BucketStats.StructuredValuesCount;
+ OutContentStats.StandaloneValuesCount = BucketStats.StandaloneValuesCount;
+ return true;
+ }
+ }
+ return false;
+}
+
+bool
+ZenCacheDiskLayer::StartAsyncMemCacheTrim()
{
ZEN_TRACE_CPU("Z$::MemCacheTrim");
@@ -4681,71 +4277,79 @@ ZenCacheDiskLayer::MemCacheTrim()
bool Expected = false;
if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true))
{
- return;
+ return false;
}
try
{
- m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this](JobContext&) {
- ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]");
-
- const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
- uint64_t TrimmedSize = 0;
- Stopwatch Timer;
- const auto Guard = MakeGuard([&] {
- ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
- NiceBytes(TrimmedSize),
- NiceBytes(m_TotalMemCachedSize),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
-
- const GcClock::Tick NowTick = GcClock::TickCount();
- const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
- m_NextAllowedTrimTick.store(NextTrimTick);
- m_IsMemCacheTrimming.store(false);
- });
+ m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this](JobContext&) { MemCacheTrim(); });
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. Reason: '{}'", Ex.what());
+ m_IsMemCacheTrimming.store(false);
+ return false;
+ }
+ return true;
+}
- const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds);
+void
+ZenCacheDiskLayer::MemCacheTrim()
+{
+ ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim");
+
+ const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
+ uint64_t TrimmedSize = 0;
+ Stopwatch Timer;
+ const auto Guard = MakeGuard([&] {
+ ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
+ NiceBytes(TrimmedSize),
+ NiceBytes(m_TotalMemCachedSize),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- static const size_t UsageSlotCount = 2048;
- std::vector<uint64_t> UsageSlots;
- UsageSlots.reserve(UsageSlotCount);
+ const GcClock::Tick NowTick = GcClock::TickCount();
+ const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
+ m_NextAllowedTrimTick.store(NextTrimTick);
+ m_IsMemCacheTrimming.store(false);
+ });
- std::vector<CacheBucket*> Buckets;
- {
- RwLock::SharedLockScope __(m_Lock);
- Buckets.reserve(m_Buckets.size());
- for (auto& Kv : m_Buckets)
- {
- Buckets.push_back(Kv.second.get());
- }
- }
+ const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds);
- const GcClock::TimePoint Now = GcClock::Now();
- {
- ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim GetUsageByAccess");
- for (CacheBucket* Bucket : Buckets)
- {
- Bucket->GetUsageByAccess(Now, MaxAge, UsageSlots);
- }
- }
+ static const size_t UsageSlotCount = 2048;
+ std::vector<uint64_t> UsageSlots;
+ UsageSlots.reserve(UsageSlotCount);
- uint64_t TotalSize = 0;
- for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
- {
- TotalSize += UsageSlots[Index];
- if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes)
- {
- GcClock::TimePoint ExpireTime = Now - ((GcClock::Duration(MaxAge) * Index) / UsageSlotCount);
- TrimmedSize = MemCacheTrim(Buckets, ExpireTime);
- break;
- }
- }
- });
+ std::vector<CacheBucket*> Buckets;
+ {
+ RwLock::SharedLockScope __(m_Lock);
+ Buckets.reserve(m_Buckets.size());
+ for (auto& Kv : m_Buckets)
+ {
+ Buckets.push_back(Kv.second.get());
+ }
}
- catch (const std::exception& Ex)
+
+ const GcClock::TimePoint Now = GcClock::Now();
{
- ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. Reason: '{}'", Ex.what());
- m_IsMemCacheTrimming.store(false);
+ ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim GetUsageByAccess");
+ for (CacheBucket* Bucket : Buckets)
+ {
+ Bucket->GetUsageByAccess(Now, MaxAge, UsageSlots);
+ }
+ }
+
+ const uint64_t MemCacheTargetFootprintBytes = (m_Configuration.MemCacheTargetFootprintBytes * 75) / 100;
+
+ uint64_t TotalSize = 0;
+ for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
+ {
+ TotalSize += UsageSlots[Index];
+ if (TotalSize >= MemCacheTargetFootprintBytes)
+ {
+ GcClock::TimePoint ExpireTime = Now - ((GcClock::Duration(MaxAge) * Index) / UsageSlotCount);
+ TrimmedSize = MemCacheTrim(Buckets, ExpireTime);
+ break;
+ }
}
}
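
The thresholding applied by the new MemCacheTrim path above can be summarized outside the diff. The following is a minimal standalone sketch, not zenstore code: it uses std::chrono in place of the project's GcClock, and the names PayloadInfo and PickExpireCutoff are hypothetical. Payload sizes are bucketed into fixed age slots (youngest first), the slots are accumulated until the running total crosses the target footprint, and the age of the slot where that happens becomes the expiry cutoff handed to the per-bucket trim. In the patch itself the caller additionally scales MemCacheTargetFootprintBytes to 75% before doing this walk, so trimming aims a little below the configured ceiling.

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

using Clock = std::chrono::steady_clock;

struct PayloadInfo
{
    uint64_t          Bytes;      // estimated memory footprint of one cached payload
    Clock::time_point LastAccess; // when it was last read
};

// Bucket payload sizes by age (youngest slot first), then walk the slots until the
// running total crosses the target footprint; everything older than the matching
// slot age is a trim candidate. Returns nullopt when the cache already fits the
// target. Assumes MaxAge > 0.
std::optional<Clock::time_point> PickExpireCutoff(const std::vector<PayloadInfo>& Payloads,
                                                  Clock::time_point               Now,
                                                  Clock::duration                 MaxAge,
                                                  uint64_t                        TargetFootprintBytes,
                                                  size_t                          SlotCount = 2048)
{
    std::vector<uint64_t> UsageSlots(SlotCount, 0);
    for (const PayloadInfo& Info : Payloads)
    {
        Clock::duration Age = Now > Info.LastAccess ? Now - Info.LastAccess : Clock::duration(0);
        size_t Slot = Age < MaxAge ? static_cast<size_t>((Age.count() * SlotCount) / MaxAge.count())
                                   : SlotCount - 1;
        UsageSlots[Slot] += Info.Bytes;
    }

    uint64_t TotalSize = 0;
    for (size_t Index = 0; Index < SlotCount; ++Index)
    {
        TotalSize += UsageSlots[Index];
        if (TotalSize >= TargetFootprintBytes)
        {
            // Same cutoff formula as the patch: the further we get before crossing
            // the target, the older the payloads we are allowed to keep.
            const Clock::duration Cutoff =
                (MaxAge * static_cast<Clock::rep>(Index)) / static_cast<Clock::rep>(SlotCount);
            return Now - Cutoff;
        }
    }
    return std::nullopt; // under budget, nothing needs trimming
}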