aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/cache/cachedisklayer.cpp')
-rw-r--r--src/zenserver/cache/cachedisklayer.cpp443
1 files changed, 387 insertions, 56 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index 177d37aa9..ce5770763 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -164,7 +164,13 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
//////////////////////////////////////////////////////////////////////////
-ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)), m_BucketId(Oid::Zero)
+const size_t ZenCacheDiskLayer::CacheBucket::UnknownReferencesIndex;
+const size_t ZenCacheDiskLayer::CacheBucket::NoReferencesIndex;
+
+ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName, bool EnableReferenceCaching)
+: m_BucketName(std::move(BucketName))
+, m_BucketId(Oid::Zero)
+, m_EnableReferenceCaching(EnableReferenceCaching)
{
if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap")))
{
@@ -473,6 +479,10 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index
sizeof(CacheBucketIndexHeader));
m_Payloads.reserve(Header.EntryCount);
+ if (m_EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.reserve(Header.EntryCount);
+ }
m_AccessTimes.reserve(Header.EntryCount);
m_Index.reserve(Header.EntryCount);
@@ -487,6 +497,10 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index
size_t EntryIndex = m_Payloads.size();
m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location, .RawSize = 0, .RawHash = IoHash::Zero});
m_AccessTimes.emplace_back(GcClock::TickCount());
+ if (m_EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex);
+ }
m_Index.insert_or_assign(Entry.Key, EntryIndex);
EntryCount++;
}
@@ -546,6 +560,10 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui
size_t EntryIndex = m_Payloads.size();
m_Payloads.emplace_back(BucketPayload{.Location = Record.Location, .RawSize = 0u, .RawHash = IoHash::Zero});
m_AccessTimes.emplace_back(GcClock::TickCount());
+ if (m_EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex);
+ }
m_Index.insert_or_assign(Record.Key, EntryIndex);
},
SkipEntryCount);
@@ -569,6 +587,10 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
m_Index.clear();
m_Payloads.clear();
m_AccessTimes.clear();
+ m_FirstReferenceIndex.clear();
+ m_ReferenceHashes.clear();
+ m_NextReferenceHashesIndexes.clear();
+ m_ReferenceCount = 0;
std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
@@ -711,7 +733,7 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) con
}
IoBuffer
-ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const
+ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::GetStandaloneCacheValue");
@@ -722,7 +744,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath()))
{
- Data.SetContentType(Loc.GetContentType());
+ Data.SetContentType(ContentType);
return Data;
}
@@ -752,7 +774,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
{
// We don't need to hold the index lock when we read a standalone file
_.ReleaseNow();
- OutValue.Value = GetStandaloneCacheValue(Location, HashKey);
+ OutValue.Value = GetStandaloneCacheValue(Location.GetContentType(), HashKey);
}
else
{
@@ -798,15 +820,15 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
}
void
-ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
+ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
if (Value.Value.Size() >= m_LargeObjectThreshold)
{
- return PutStandaloneCacheValue(HashKey, Value);
+ return PutStandaloneCacheValue(HashKey, Value, References);
}
- PutInlineCacheValue(HashKey, Value);
+ PutInlineCacheValue(HashKey, Value, References);
m_WriteCount++;
}
@@ -832,6 +854,11 @@ ZenCacheDiskLayer::CacheBucket::Drop()
m_Index.clear();
m_Payloads.clear();
m_AccessTimes.clear();
+ m_FirstReferenceIndex.clear();
+ m_ReferenceHashes.clear();
+ m_NextReferenceHashesIndexes.clear();
+ m_ReferenceCount = 0;
+
return Deleted;
}
@@ -1081,7 +1108,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
else
{
// Structured cache value
- IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey);
+ IoBuffer Buffer = GetStandaloneCacheValue(Loc.GetContentType(), HashKey);
if (!Buffer)
{
ReportBadKey(HashKey);
@@ -1176,13 +1203,17 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
LogEntries.reserve(BadKeys.size());
{
- RwLock::ExclusiveLockScope __(m_IndexLock);
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
for (const IoHash& BadKey : BadKeys)
{
// Log a tombstone and delete the in-memory index for the bad entry
- const auto It = m_Index.find(BadKey);
- const BucketPayload& Payload = m_Payloads[It->second];
- DiskLocation Location = Payload.Location;
+ const auto It = m_Index.find(BadKey);
+ BucketPayload& Payload = m_Payloads[It->second];
+ if (m_EnableReferenceCaching)
+ {
+ RemoveReferences(IndexLock, m_FirstReferenceIndex[It->second]);
+ }
+ DiskLocation Location = Payload.Location;
Location.Flags |= DiskLocation::kTombStone;
LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location});
m_Index.erase(BadKey);
@@ -1211,6 +1242,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
{
std::vector<BucketPayload> Payloads;
std::vector<AccessTime> AccessTimes;
+ std::vector<size_t> FirstReferenceIndex;
IndexMap Index;
{
@@ -1218,17 +1250,27 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
size_t EntryCount = m_Index.size();
Payloads.reserve(EntryCount);
AccessTimes.reserve(EntryCount);
+ if (m_EnableReferenceCaching)
+ {
+ FirstReferenceIndex.reserve(EntryCount);
+ }
Index.reserve(EntryCount);
for (auto It : m_Index)
{
size_t EntryIndex = Payloads.size();
Payloads.push_back(m_Payloads[It.second]);
AccessTimes.push_back(m_AccessTimes[It.second]);
+ if (m_EnableReferenceCaching)
+ {
+ FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]);
+ }
Index.insert({It.first, EntryIndex});
}
m_Index.swap(Index);
m_Payloads.swap(Payloads);
m_AccessTimes.swap(AccessTimes);
+ m_FirstReferenceIndex.swap(FirstReferenceIndex);
+ CompactReferences(__);
}
}
}
@@ -1248,13 +1290,18 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::GatherReferences");
+#define CALCULATE_BLOCKING_TIME 0
+
+#if CALCULATE_BLOCKING_TIME
uint64_t WriteBlockTimeUs = 0;
uint64_t WriteBlockLongestTimeUs = 0;
uint64_t ReadBlockTimeUs = 0;
uint64_t ReadBlockLongestTimeUs = 0;
+#endif // CALCULATE_BLOCKING_TIME
Stopwatch TotalTimer;
const auto _ = MakeGuard([&] {
+#if CALCULATE_BLOCKING_TIME
ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})",
m_BucketDir,
NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
@@ -1262,6 +1309,9 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
NiceLatencyNs(WriteBlockLongestTimeUs),
NiceLatencyNs(ReadBlockTimeUs),
NiceLatencyNs(ReadBlockLongestTimeUs));
+#else
+ ZEN_DEBUG("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()));
+#endif // CALCULATE_BLOCKING_TIME
});
const GcClock::TimePoint ExpireTime = GcCtx.CacheExpireTime();
@@ -1271,17 +1321,21 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
IndexMap Index;
std::vector<AccessTime> AccessTimes;
std::vector<BucketPayload> Payloads;
+ std::vector<size_t> FirstReferenceIndex;
{
RwLock::SharedLockScope __(m_IndexLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- Index = m_Index;
- AccessTimes = m_AccessTimes;
- Payloads = m_Payloads;
+#if CALCULATE_BLOCKING_TIME
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+#endif // CALCULATE_BLOCKING_TIME
+ Index = m_Index;
+ AccessTimes = m_AccessTimes;
+ Payloads = m_Payloads;
+ FirstReferenceIndex = m_FirstReferenceIndex;
}
std::vector<IoHash> ExpiredKeys;
@@ -1293,6 +1347,8 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
Cids.reserve(1024);
}
+ std::vector<std::pair<IoHash, size_t>> StructuredItemsWithUnknownAttachments;
+
for (const auto& Entry : Index)
{
const IoHash& Key = Entry.first;
@@ -1308,35 +1364,80 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
continue;
}
- const DiskLocation& Loc = Payloads[Entry.second].Location;
+ BucketPayload& Payload = Payloads[Entry.second];
+ const DiskLocation& Loc = Payload.Location;
- if (Loc.IsFlagSet(DiskLocation::kStructured))
+ if (!Loc.IsFlagSet(DiskLocation::kStructured))
{
- if (Cids.size() > 1024)
+ continue;
+ }
+ if (m_EnableReferenceCaching)
+ {
+ if (FirstReferenceIndex.empty() || (FirstReferenceIndex[Entry.second] == UnknownReferencesIndex))
{
- GcCtx.AddRetainedCids(Cids);
- Cids.clear();
+ StructuredItemsWithUnknownAttachments.push_back(Entry);
+ continue;
}
+ bool ReferencesAreKnown = false;
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+#if CALCULATE_BLOCKING_TIME
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+#endif // CALCULATE_BLOCKING_TIME
+ if (auto It = m_Index.find(Entry.first); It != m_Index.end())
+ {
+ ReferencesAreKnown = GetReferences(IndexLock, m_FirstReferenceIndex[It->second], Cids);
+ }
+ }
+ if (ReferencesAreKnown)
+ {
+ if (Cids.size() >= 1024)
+ {
+ GcCtx.AddRetainedCids(Cids);
+ Cids.clear();
+ }
+ continue;
+ }
+ }
+ StructuredItemsWithUnknownAttachments.push_back(Entry);
+ }
+
+ for (const auto& Entry : StructuredItemsWithUnknownAttachments)
+ {
+ BucketPayload& Payload = Payloads[Entry.second];
+ const DiskLocation& Loc = Payload.Location;
+ {
IoBuffer Buffer;
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- RwLock::SharedLockScope __(m_IndexLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ if (Buffer = GetStandaloneCacheValue(Loc.GetContentType(), Entry.first); !Buffer)
{
- // We don't need to hold the index lock when we read a standalone file
- __.ReleaseNow();
- if (Buffer = GetStandaloneCacheValue(Loc, Key); !Buffer)
- {
- continue;
- }
+ continue;
}
- else if (Buffer = GetInlineCacheValue(Loc); !Buffer)
+ }
+ else
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+#if CALCULATE_BLOCKING_TIME
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+#endif // CALCULATE_BLOCKING_TIME
+ if (auto It = m_Index.find(Entry.first); It != m_Index.end())
+ {
+ BucketPayload& UpdatePayload = m_Payloads[It->second];
+ Buffer = GetInlineCacheValue(UpdatePayload.Location);
+ }
+ if (!Buffer)
{
continue;
}
@@ -1345,7 +1446,39 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
ZEN_ASSERT(Buffer);
ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject);
CbObject Obj(SharedBuffer{Buffer});
+ size_t CurrentCidCount = Cids.size();
Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); });
+ if (m_EnableReferenceCaching)
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+#if CALCULATE_BLOCKING_TIME
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+#endif // CALCULATE_BLOCKING_TIME
+ if (auto It = m_Index.find(Entry.first); It != m_Index.end())
+ {
+ if (m_FirstReferenceIndex[It->second] == UnknownReferencesIndex)
+ {
+ SetReferences(IndexLock,
+ m_FirstReferenceIndex[It->second],
+ std::span<IoHash>(Cids.data() + CurrentCidCount, Cids.size() - CurrentCidCount));
+ }
+ else
+ {
+ Cids.resize(CurrentCidCount);
+ (void)GetReferences(IndexLock, m_FirstReferenceIndex[It->second], Cids);
+ }
+ }
+ }
+ if (Cids.size() >= 1024)
+ {
+ GcCtx.AddRetainedCids(Cids);
+ Cids.clear();
+ }
}
}
@@ -1434,6 +1567,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
// Clean up m_AccessTimes and m_Payloads vectors
std::vector<BucketPayload> Payloads;
std::vector<AccessTime> AccessTimes;
+ std::vector<size_t> FirstReferenceIndex;
IndexMap Index;
{
@@ -1447,6 +1581,10 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
size_t EntryCount = m_Index.size();
Payloads.reserve(EntryCount);
AccessTimes.reserve(EntryCount);
+ if (m_EnableReferenceCaching)
+ {
+ FirstReferenceIndex.reserve(EntryCount);
+ }
Index.reserve(EntryCount);
for (auto It : m_Index)
{
@@ -1454,11 +1592,17 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
size_t NewEntryIndex = Payloads.size();
Payloads.push_back(m_Payloads[OldEntryIndex]);
AccessTimes.push_back(m_AccessTimes[OldEntryIndex]);
+ if (m_EnableReferenceCaching)
+ {
+ FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]);
+ }
Index.insert({It.first, NewEntryIndex});
}
m_Index.swap(Index);
m_Payloads.swap(Payloads);
m_AccessTimes.swap(AccessTimes);
+ m_FirstReferenceIndex.swap(FirstReferenceIndex);
+ CompactReferences(_);
}
GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end()));
}
@@ -1589,6 +1733,10 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
size_t EntryIndex = m_Payloads.size();
m_Payloads.emplace_back(BucketPayload{.Location = RestoreLocation});
m_AccessTimes.emplace_back(GcClock::TickCount());
+ if (m_EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex);
+ }
m_Index.insert({Key, EntryIndex});
m_TotalStandaloneSize.fetch_add(RestoreLocation.Size(), std::memory_order::relaxed);
DeletedChunks.erase(Key);
@@ -1757,8 +1905,9 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, size_t Index)
const BucketPayload& Payload = m_Payloads[Index];
if (Payload.Location.IsFlagSet(DiskLocation::kStructured))
{
- IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) ? GetStandaloneCacheValue(Payload.Location, Key)
- : GetInlineCacheValue(Payload.Location);
+ IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile)
+ ? GetStandaloneCacheValue(Payload.Location.GetContentType(), Key)
+ : GetInlineCacheValue(Payload.Location);
CbObject Obj(SharedBuffer{Value});
Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); });
}
@@ -1843,7 +1992,7 @@ ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& Ac
}
void
-ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
+ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::PutStandaloneCacheValue");
@@ -1957,13 +2106,18 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
DiskLocation Loc(NewFileSize, EntryFlags);
- RwLock::ExclusiveLockScope _(m_IndexLock);
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
if (auto It = m_Index.find(HashKey); It == m_Index.end())
{
// Previously unknown object
size_t EntryIndex = m_Payloads.size();
m_Payloads.emplace_back(BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
m_AccessTimes.emplace_back(GcClock::TickCount());
+ if (m_EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex);
+ SetReferences(IndexLock, m_FirstReferenceIndex.back(), References);
+ }
m_Index.insert_or_assign(HashKey, EntryIndex);
}
else
@@ -1971,8 +2125,13 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
// TODO: should check if write is idempotent and bail out if it is?
size_t EntryIndex = It.value();
ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
- m_Payloads[EntryIndex] = BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash};
- m_AccessTimes.emplace_back(GcClock::TickCount());
+ BucketPayload& Payload = m_Payloads[EntryIndex];
+ Payload = BucketPayload{.Location = Loc, .RawSize = Value.RawSize, .RawHash = Value.RawHash};
+ if (m_EnableReferenceCaching)
+ {
+ SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References);
+ }
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
m_TotalStandaloneSize.fetch_sub(Loc.Size(), std::memory_order::relaxed);
}
@@ -1981,7 +2140,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
}
void
-ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
+ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::PutInlineCacheValue");
@@ -2000,7 +2159,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const
DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags);
m_SlogFile.Append({.Key = HashKey, .Location = Location});
- RwLock::ExclusiveLockScope _(m_IndexLock);
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
if (auto It = m_Index.find(HashKey); It != m_Index.end())
{
// TODO: should check if write is idempotent and bail out if it is?
@@ -2010,20 +2169,191 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const
ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
m_Payloads[EntryIndex] = (BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
m_AccessTimes[EntryIndex] = GcClock::TickCount();
+ if (m_EnableReferenceCaching)
+ {
+ SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References);
+ }
}
else
{
size_t EntryIndex = m_Payloads.size();
m_Payloads.emplace_back(BucketPayload{.Location = Location, .RawSize = Value.RawSize, .RawHash = Value.RawHash});
m_AccessTimes.emplace_back(GcClock::TickCount());
+ if (m_EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.emplace_back(UnknownReferencesIndex);
+ SetReferences(IndexLock, m_FirstReferenceIndex.back(), References);
+ }
m_Index.insert_or_assign(HashKey, EntryIndex);
}
});
}
+void
+ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&)
+{
+ std::vector<size_t> FirstReferenceIndex;
+ std::vector<IoHash> NewReferenceHashes;
+ std::vector<size_t> NewNextReferenceHashesIndexes;
+
+ FirstReferenceIndex.reserve(m_ReferenceCount);
+ NewReferenceHashes.reserve(m_ReferenceCount);
+ NewNextReferenceHashesIndexes.reserve(m_ReferenceCount);
+
+ for (const auto& It : m_Index)
+ {
+ size_t SourceIndex = m_FirstReferenceIndex[It.second];
+ if (SourceIndex == UnknownReferencesIndex)
+ {
+ continue;
+ }
+ if (SourceIndex == NoReferencesIndex)
+ {
+ continue;
+ }
+ FirstReferenceIndex.push_back(NewNextReferenceHashesIndexes.size());
+ NewReferenceHashes.push_back(m_ReferenceHashes[SourceIndex]);
+ NewNextReferenceHashesIndexes.push_back(NoReferencesIndex);
+
+ SourceIndex = m_NextReferenceHashesIndexes[SourceIndex];
+ while (SourceIndex != NoReferencesIndex)
+ {
+ NewNextReferenceHashesIndexes.back() = NewReferenceHashes.size();
+ NewReferenceHashes.push_back(m_ReferenceHashes[SourceIndex]);
+ NewNextReferenceHashesIndexes.push_back(NoReferencesIndex);
+ SourceIndex = m_NextReferenceHashesIndexes[SourceIndex];
+ }
+ }
+ m_FirstReferenceIndex.swap(FirstReferenceIndex);
+ m_ReferenceHashes.swap(NewReferenceHashes);
+ m_NextReferenceHashesIndexes.swap(NewNextReferenceHashesIndexes);
+ m_ReferenceCount = m_ReferenceHashes.size();
+}
+
+size_t
+ZenCacheDiskLayer::CacheBucket::AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key)
+{
+ size_t ReferenceIndex = m_ReferenceHashes.size();
+ m_ReferenceHashes.push_back(Key);
+ m_NextReferenceHashesIndexes.push_back(NoReferencesIndex);
+ m_ReferenceCount++;
+ return ReferenceIndex;
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::SetReferences(RwLock::ExclusiveLockScope& Lock,
+ std::size_t& FirstReferenceIndex,
+ std::span<IoHash> References)
+{
+ auto ReferenceIt = References.begin();
+
+ if (FirstReferenceIndex == UnknownReferencesIndex)
+ {
+ FirstReferenceIndex = NoReferencesIndex;
+ }
+
+ size_t CurrentIndex = FirstReferenceIndex;
+ if (CurrentIndex != NoReferencesIndex)
+ {
+ if (ReferenceIt != References.end())
+ {
+ ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero);
+ if (CurrentIndex == NoReferencesIndex)
+ {
+ CurrentIndex = AllocateReferenceEntry(Lock, *ReferenceIt);
+ FirstReferenceIndex = CurrentIndex;
+ }
+ else
+ {
+ m_ReferenceHashes[CurrentIndex] = *ReferenceIt;
+ }
+ ReferenceIt++;
+ }
+ }
+ else
+ {
+ if (ReferenceIt != References.end())
+ {
+ ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero);
+ CurrentIndex = AllocateReferenceEntry(Lock, *ReferenceIt);
+ ReferenceIt++;
+ }
+ FirstReferenceIndex = CurrentIndex;
+ }
+
+ while (ReferenceIt != References.end())
+ {
+ ZEN_ASSERT(CurrentIndex != NoReferencesIndex);
+ ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero);
+ size_t ReferenceIndex = m_NextReferenceHashesIndexes[CurrentIndex];
+ if (ReferenceIndex == NoReferencesIndex)
+ {
+ ReferenceIndex = AllocateReferenceEntry(Lock, *ReferenceIt);
+ m_NextReferenceHashesIndexes[CurrentIndex] = ReferenceIndex;
+ }
+ else
+ {
+ m_ReferenceHashes[ReferenceIndex] = *ReferenceIt;
+ }
+ CurrentIndex = ReferenceIndex;
+ ReferenceIt++;
+ }
+
+ while (CurrentIndex != NoReferencesIndex)
+ {
+ size_t NextIndex = m_NextReferenceHashesIndexes[CurrentIndex];
+ if (NextIndex != NoReferencesIndex)
+ {
+ m_ReferenceHashes[CurrentIndex] = IoHash::Zero;
+ ZEN_ASSERT(m_ReferenceCount > 0);
+ m_ReferenceCount--;
+ m_NextReferenceHashesIndexes[CurrentIndex] = NoReferencesIndex;
+ }
+ CurrentIndex = NextIndex;
+ }
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::RemoveReferences(RwLock::ExclusiveLockScope&, std::size_t& FirstReferenceIndex)
+{
+ if (FirstReferenceIndex == UnknownReferencesIndex)
+ {
+ return;
+ }
+ size_t CurrentIndex = FirstReferenceIndex;
+ while (CurrentIndex != NoReferencesIndex)
+ {
+ m_ReferenceHashes[CurrentIndex] = IoHash::Zero;
+ ZEN_ASSERT(m_ReferenceCount > 0);
+ m_ReferenceCount--;
+ CurrentIndex = m_NextReferenceHashesIndexes[CurrentIndex];
+ }
+ FirstReferenceIndex = UnknownReferencesIndex;
+}
+
+bool
+ZenCacheDiskLayer::CacheBucket::LockedGetReferences(std::size_t FirstReferenceIndex, std::vector<IoHash>& OutReferences) const
+{
+ if (FirstReferenceIndex == UnknownReferencesIndex)
+ {
+ return false;
+ }
+
+ size_t CurrentIndex = FirstReferenceIndex;
+ while (CurrentIndex != NoReferencesIndex)
+ {
+ ZEN_ASSERT_SLOW(m_ReferenceHashes[CurrentIndex] != IoHash::Zero);
+ OutReferences.push_back(m_ReferenceHashes[CurrentIndex]);
+ CurrentIndex = m_NextReferenceHashesIndexes[CurrentIndex];
+ }
+ return true;
+}
+
//////////////////////////////////////////////////////////////////////////
-ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir)
+ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir, bool EnableReferenceCaching)
+: m_RootDir(RootDir)
+, m_EnableReferenceCaching(EnableReferenceCaching)
{
}
@@ -2060,7 +2390,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
else
{
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName));
+ auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_EnableReferenceCaching));
Bucket = InsertResult.first->second.get();
std::filesystem::path BucketPath = m_RootDir;
@@ -2079,7 +2409,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
void
-ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
+ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
{
ZEN_TRACE_CPU("Z$::Disk::Put");
@@ -2109,7 +2439,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
}
else
{
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName));
+ auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_EnableReferenceCaching));
Bucket = InsertResult.first->second.get();
std::filesystem::path BucketPath = m_RootDir;
@@ -2134,7 +2464,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
ZEN_ASSERT(Bucket != nullptr);
- Bucket->Put(HashKey, Value);
+ Bucket->Put(HashKey, Value, References);
}
void
@@ -2165,7 +2495,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
continue;
}
- auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName));
+ auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName, m_EnableReferenceCaching));
CacheBucket& Bucket = *InsertResult.first->second;
try
@@ -2352,7 +2682,8 @@ ZenCacheDiskLayer::Stats() const
ZenCacheDiskLayer::Info
ZenCacheDiskLayer::GetInfo() const
{
- ZenCacheDiskLayer::Info Info = {.Config = {.RootDir = m_RootDir}, .TotalSize = TotalSize()};
+ ZenCacheDiskLayer::Info Info = {.Config = {.RootDir = m_RootDir, .EnableReferenceCaching = m_EnableReferenceCaching},
+ .TotalSize = TotalSize()};
{
RwLock::SharedLockScope _(m_Lock);
Info.BucketNames.reserve(m_Buckets.size());