aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author: Dan Engelbrecht <[email protected]> 2023-12-01 04:48:58 -0500
committer: GitHub <[email protected]> 2023-12-01 10:48:58 +0100
commit: 1bbdc86732464170c2e7c6145a5a19cdb48fe396 (patch)
tree: 8f224088f9621406b0a8a459b91612c612af63b5 /src
parent: WinIoThreadPool teardown cleaned up (#580) (diff)
download: zen-1bbdc86732464170c2e7c6145a5a19cdb48fe396.tar.xz
          zen-1bbdc86732464170c2e7c6145a5a19cdb48fe396.zip
add separate PreCache step for GcReferenceChecker (#578)
- Improvement: GCv2: Use separate PreCache step to improve concurrency when checking references
- Improvement: GCv2: Improved verbose logging
- Improvement: GCv2: Sort chunks to read by block/offset when finding references
- Improvement: GCv2: Exit as soon as no more unreferenced items are left
Diffstat (limited to 'src')
-rw-r--r-- src/zenserver/cache/cachedisklayer.cpp 483
-rw-r--r-- src/zenserver/cache/cachedisklayer.h 2
-rw-r--r-- src/zenserver/cache/structuredcachestore.cpp 99
-rw-r--r-- src/zenserver/projectstore/projectstore.cpp 43
-rw-r--r-- src/zenstore/blockstore.cpp 11
-rw-r--r-- src/zenstore/gc.cpp 60
-rw-r--r-- src/zenstore/include/zenstore/blockstore.h 2
-rw-r--r-- src/zenstore/include/zenstore/gc.h 4
8 files changed, 481 insertions, 223 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index 9117b8820..955ab3a04 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -2502,6 +2502,10 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
ValueLock.ReleaseNow();
+ if (m_UpdatedKeys)
+ {
+ m_UpdatedKeys->insert(HashKey);
+ }
PayloadIndex EntryIndex = {};
if (auto It = m_Index.find(HashKey); It == m_Index.end())
@@ -2652,6 +2656,10 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const
m_SlogFile.Append({.Key = HashKey, .Location = Location});
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ if (m_UpdatedKeys)
+ {
+ m_UpdatedKeys->insert(HashKey);
+ }
if (auto It = m_Index.find(HashKey); It != m_Index.end())
{
PayloadIndex EntryIndex = It.value();
@@ -2767,6 +2775,10 @@ public:
}
else
{
+ RwLock::SharedLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first));
+ IndexLock.ReleaseNow();
+ ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': checking standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8());
+
std::error_code Ec;
bool Existed = std::filesystem::is_regular_file(FilePath, Ec);
if (Ec)
@@ -2792,9 +2804,12 @@ public:
if (Ctx.Settings.CollectSmallObjects)
{
+ m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys = std::make_unique<HashSet>(); });
+ auto __ = MakeGuard([&]() { m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys.reset(); }); });
+
std::unordered_map<uint32_t, uint64_t> BlockUsage;
{
- RwLock::SharedLockScope __(m_Bucket.m_IndexLock);
+ RwLock::SharedLockScope ___(m_Bucket.m_IndexLock);
for (const auto& Entry : m_Bucket.m_Index)
{
ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second;
@@ -2807,14 +2822,13 @@ public:
}
uint32_t BlockIndex = Loc.Location.BlockLocation.GetBlockIndex();
uint64_t ChunkSize = RoundUp(Loc.Size(), m_Bucket.m_Configuration.PayloadAlignment);
- auto It = BlockUsage.find(BlockIndex);
- if (It == BlockUsage.end())
+ if (auto It = BlockUsage.find(BlockIndex); It != BlockUsage.end())
{
- BlockUsage.insert_or_assign(BlockIndex, ChunkSize);
+ It->second += ChunkSize;
}
else
{
- It->second += ChunkSize;
+ BlockUsage.insert_or_assign(BlockIndex, ChunkSize);
}
}
}
@@ -2830,7 +2844,7 @@ public:
if (BlocksToCompact.size() > 0)
{
{
- RwLock::SharedLockScope __(m_Bucket.m_IndexLock);
+ RwLock::SharedLockScope ___(m_Bucket.m_IndexLock);
for (const auto& Entry : m_Bucket.m_Index)
{
ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second;
@@ -2863,27 +2877,25 @@ public:
m_Bucket.m_Configuration.PayloadAlignment,
[&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) {
std::vector<DiskIndexEntry> MovedEntries;
- RwLock::ExclusiveLockScope _(m_Bucket.m_IndexLock);
+ MovedEntries.reserve(MovedArray.size());
+ RwLock::ExclusiveLockScope _(m_Bucket.m_IndexLock);
for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray)
{
size_t ChunkIndex = Moved.first;
const IoHash& Key = BlockCompactStateKeys[ChunkIndex];
+ if (m_Bucket.m_UpdatedKeys->contains(Key))
+ {
+ continue;
+ }
+
if (auto It = m_Bucket.m_Index.find(Key); It != m_Bucket.m_Index.end())
{
- ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[It->second];
- const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex);
- if (Payload.Location.GetBlockLocation(m_Bucket.m_Configuration.PayloadAlignment) != OldLocation)
- {
- // Someone has moved our chunk so lets just skip the new location we were provided, it will be
- // GC:d at a later time
- continue;
- }
- const BlockStoreLocation& NewLocation = Moved.second;
-
- Payload.Location = DiskLocation(NewLocation,
- m_Bucket.m_Configuration.PayloadAlignment,
- Payload.Location.GetFlags());
+ ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[It->second];
+ const BlockStoreLocation& NewLocation = Moved.second;
+ Payload.Location = DiskLocation(NewLocation,
+ m_Bucket.m_Configuration.PayloadAlignment,
+ Payload.Location.GetFlags());
MovedEntries.push_back({.Key = Key, .Location = Payload.Location});
}
}
@@ -2955,9 +2967,9 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
// Find out expired keys
for (const auto& Entry : m_Index)
{
- const IoHash& Key = Entry.first;
- ZenCacheDiskLayer::CacheBucket::PayloadIndex EntryIndex = Entry.second;
- GcClock::Tick AccessTime = m_AccessTimes[EntryIndex];
+ const IoHash& Key = Entry.first;
+ PayloadIndex EntryIndex = Entry.second;
+ GcClock::Tick AccessTime = m_AccessTimes[EntryIndex];
if (AccessTime >= ExpireTicks)
{
continue;
@@ -3004,7 +3016,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
}
}
- if (!ExpiredEntries.empty())
+ if (Ctx.Settings.IsDeleteMode && !ExpiredEntries.empty())
{
std::vector<BucketPayload> Payloads;
std::vector<AccessTime> AccessTimes;
@@ -3028,22 +3040,253 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
class DiskBucketReferenceChecker : public GcReferenceChecker
{
+ using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex;
+ using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload;
+ using CacheBucket = ZenCacheDiskLayer::CacheBucket;
+ using ReferenceIndex = ZenCacheDiskLayer::CacheBucket::ReferenceIndex;
+
public:
- DiskBucketReferenceChecker(ZenCacheDiskLayer::CacheBucket& Owner) : m_CacheBucket(Owner) {}
+ DiskBucketReferenceChecker(CacheBucket& Owner) : m_CacheBucket(Owner) {}
virtual ~DiskBucketReferenceChecker()
{
- m_IndexLock.reset();
- if (!m_CacheBucket.m_Configuration.EnableReferenceCaching)
+ try
+ {
+ m_IndexLock.reset();
+ if (!m_CacheBucket.m_Configuration.EnableReferenceCaching)
+ {
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
+ // If reference caching is not enabled, we temporarily used the data structure for reference caching, lets reset it
+ m_CacheBucket.ClearReferenceCache();
+ }
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("~DiskBucketReferenceChecker threw exception: '{}'", Ex.what());
+ }
+ }
+
+ virtual void PreCache(GcCtx& Ctx) override
+ {
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::PreCache");
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: cachebucket [PRECACHE] '{}': found {} references in {}",
+ m_CacheBucket.m_BucketDir,
+ m_CacheBucket.m_ReferenceCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ std::vector<IoHash> UpdateKeys;
+ std::vector<size_t> ReferenceCounts;
+ std::vector<IoHash> References;
+
+ auto GetAttachments = [&References, &ReferenceCounts](const void* CbObjectData) {
+ size_t CurrentReferenceCount = References.size();
+ CbObjectView Obj(CbObjectData);
+ Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); });
+ ReferenceCounts.push_back(References.size() - CurrentReferenceCount);
+ };
+
+ // Refresh cache
{
- // If reference caching is not enabled, we temporarily used the data structure for reference caching, lets reset it
- m_CacheBucket.ClearReferenceCache();
+ // If reference caching is enabled the references will be updated at modification for us so we don't need to track modifications
+ if (!m_CacheBucket.m_Configuration.EnableReferenceCaching)
+ {
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys = std::make_unique<HashSet>(); });
+ }
+
+ std::vector<IoHash> StandaloneKeys;
+ {
+ std::vector<IoHash> InlineKeys;
+ std::unordered_map<uint32_t, std::size_t> BlockIndexToEntriesPerBlockIndex;
+ struct InlineEntry
+ {
+ uint32_t InlineKeyIndex;
+ uint32_t Offset;
+ uint32_t Size;
+ };
+ std::vector<std::vector<InlineEntry>> EntriesPerBlock;
+
+ {
+ RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock);
+ for (const auto& Entry : m_CacheBucket.m_Index)
+ {
+ if (Ctx.IsCancelledFlag.load())
+ {
+ IndexLock.ReleaseNow();
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
+ return;
+ }
+
+ PayloadIndex EntryIndex = Entry.second;
+ const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (!Loc.IsFlagSet(DiskLocation::kStructured))
+ {
+ continue;
+ }
+ if (m_CacheBucket.m_Configuration.EnableReferenceCaching &&
+ m_CacheBucket.m_FirstReferenceIndex[EntryIndex] != ReferenceIndex::Unknown())
+ {
+ continue;
+ }
+ const IoHash& Key = Entry.first;
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ StandaloneKeys.push_back(Key);
+ continue;
+ }
+
+ BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_CacheBucket.m_Configuration.PayloadAlignment);
+ InlineEntry UpdateEntry = {.InlineKeyIndex = gsl::narrow<uint32_t>(InlineKeys.size()),
+ .Offset = gsl::narrow<uint32_t>(ChunkLocation.Offset),
+ .Size = gsl::narrow<uint32_t>(ChunkLocation.Size)};
+ InlineKeys.push_back(Key);
+
+ if (auto It = BlockIndexToEntriesPerBlockIndex.find(ChunkLocation.BlockIndex);
+ It != BlockIndexToEntriesPerBlockIndex.end())
+ {
+ EntriesPerBlock[It->second].emplace_back(UpdateEntry);
+ }
+ else
+ {
+ BlockIndexToEntriesPerBlockIndex.insert_or_assign(ChunkLocation.BlockIndex, EntriesPerBlock.size());
+ EntriesPerBlock.emplace_back(std::vector<InlineEntry>{UpdateEntry});
+ }
+ }
+ }
+
+ for (auto It : BlockIndexToEntriesPerBlockIndex)
+ {
+ uint32_t BlockIndex = It.first;
+
+ Ref<BlockStoreFile> BlockFile = m_CacheBucket.m_BlockStore.GetBlockFile(BlockIndex);
+ if (BlockFile)
+ {
+ size_t EntriesPerBlockIndex = It.second;
+ std::vector<InlineEntry>& InlineEntries = EntriesPerBlock[EntriesPerBlockIndex];
+
+ std::sort(InlineEntries.begin(), InlineEntries.end(), [&](const InlineEntry& Lhs, const InlineEntry& Rhs) -> bool {
+ return Lhs.Offset < Rhs.Offset;
+ });
+
+ uint64_t BlockFileSize = BlockFile->FileSize();
+ BasicFileBuffer BlockBuffer(BlockFile->GetBasicFile(), 32768);
+ for (const InlineEntry& InlineEntry : InlineEntries)
+ {
+ if ((InlineEntry.Offset + InlineEntry.Size) > BlockFileSize)
+ {
+ ReferenceCounts.push_back(0);
+ }
+ else
+ {
+ MemoryView ChunkView = BlockBuffer.MakeView(InlineEntry.Size, InlineEntry.Offset);
+ if (ChunkView.GetSize() == InlineEntry.Size)
+ {
+ GetAttachments(ChunkView.GetData());
+ }
+ else
+ {
+ std::vector<uint8_t> Buffer(InlineEntry.Size);
+ BlockBuffer.Read(Buffer.data(), InlineEntry.Size, InlineEntry.Offset);
+ GetAttachments(Buffer.data());
+ }
+ }
+ const IoHash& Key = InlineKeys[InlineEntry.InlineKeyIndex];
+ UpdateKeys.push_back(Key);
+ }
+ }
+ }
+ }
+ {
+ for (const IoHash& Key : StandaloneKeys)
+ {
+ if (Ctx.IsCancelledFlag.load())
+ {
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
+ return;
+ }
+
+ IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(ZenContentType::kCbObject, Key);
+ if (!Buffer)
+ {
+ continue;
+ }
+
+ GetAttachments(Buffer.GetData());
+ UpdateKeys.push_back(Key);
+ }
+ }
+ }
+
+ {
+ size_t ReferenceOffset = 0;
+ RwLock::ExclusiveLockScope IndexLock(m_CacheBucket.m_IndexLock);
+
+ if (!m_CacheBucket.m_Configuration.EnableReferenceCaching)
+ {
+ ZEN_ASSERT(m_CacheBucket.m_FirstReferenceIndex.empty());
+ ZEN_ASSERT(m_CacheBucket.m_ReferenceHashes.empty());
+ ZEN_ASSERT(m_CacheBucket.m_NextReferenceHashesIndexes.empty());
+ ZEN_ASSERT(m_CacheBucket.m_ReferenceCount == 0);
+ ZEN_ASSERT(m_CacheBucket.m_UpdatedKeys);
+
+ // If reference caching is not enabled, we will resize and use the data structure in place for reference caching when
+ // we figure out what this bucket references. This will be reset once the DiskBucketReferenceChecker is deleted.
+ m_CacheBucket.m_FirstReferenceIndex.resize(m_CacheBucket.m_Payloads.size(), ReferenceIndex::Unknown());
+ m_CacheBucket.m_ReferenceHashes.reserve(References.size());
+ m_CacheBucket.m_NextReferenceHashesIndexes.reserve(References.size());
+ }
+ else
+ {
+ ZEN_ASSERT(!m_CacheBucket.m_UpdatedKeys);
+ }
+
+ for (size_t Index = 0; Index < UpdateKeys.size(); Index++)
+ {
+ const IoHash& Key = UpdateKeys[Index];
+ size_t ReferenceCount = ReferenceCounts[Index];
+ if (auto It = m_CacheBucket.m_Index.find(Key); It != m_CacheBucket.m_Index.end())
+ {
+ PayloadIndex EntryIndex = It->second;
+ if (m_CacheBucket.m_Configuration.EnableReferenceCaching)
+ {
+ if (m_CacheBucket.m_FirstReferenceIndex[EntryIndex] != ReferenceIndex::Unknown())
+ {
+ // The reference data is valid and what we have is old/redundant
+ continue;
+ }
+ }
+ else if (m_CacheBucket.m_UpdatedKeys->contains(Key))
+ {
+ // Our pre-cache data is invalid
+ continue;
+ }
+
+ m_CacheBucket.SetReferences(IndexLock,
+ m_CacheBucket.m_FirstReferenceIndex[EntryIndex],
+ std::span<IoHash>{References.data() + ReferenceOffset, ReferenceCount});
+ }
+ ReferenceOffset += ReferenceCount;
+ }
+
+ if (m_CacheBucket.m_Configuration.EnableReferenceCaching && !UpdateKeys.empty())
+ {
+ m_CacheBucket.CompactReferences(IndexLock);
+ }
}
}
virtual void LockState(GcCtx& Ctx) override
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveExpiredData");
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::LockState");
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3062,31 +3305,38 @@ public:
{
m_UncachedReferences.clear();
m_IndexLock.reset();
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
return;
}
- // Rescan to see if any cache items needs refreshing since last pass when we had the lock
- for (const auto& Entry : m_CacheBucket.m_Index)
+ if (m_CacheBucket.m_UpdatedKeys)
{
- if (Ctx.IsCancelledFlag.load())
+ const HashSet& UpdatedKeys(*m_CacheBucket.m_UpdatedKeys);
+ for (const IoHash& Key : UpdatedKeys)
{
- m_UncachedReferences.clear();
- m_IndexLock.reset();
- return;
- }
+ if (Ctx.IsCancelledFlag.load())
+ {
+ m_UncachedReferences.clear();
+ m_IndexLock.reset();
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
+ return;
+ }
+
+ auto It = m_CacheBucket.m_Index.find(Key);
+ if (It == m_CacheBucket.m_Index.end())
+ {
+ continue;
+ }
- size_t PayloadIndex = Entry.second;
- const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_CacheBucket.m_Payloads[PayloadIndex];
- const DiskLocation& Loc = Payload.Location;
+ PayloadIndex EntryIndex = It->second;
+ const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (!Loc.IsFlagSet(DiskLocation::kStructured))
+ {
+ continue;
+ }
- if (!Loc.IsFlagSet(DiskLocation::kStructured))
- {
- continue;
- }
- ZEN_ASSERT(!m_CacheBucket.m_FirstReferenceIndex.empty());
- const IoHash& Key = Entry.first;
- if (m_CacheBucket.m_FirstReferenceIndex[PayloadIndex] == ZenCacheDiskLayer::CacheBucket::ReferenceIndex::Unknown())
- {
IoBuffer Buffer;
if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
@@ -3128,15 +3378,27 @@ public:
for (const IoHash& ReferenceHash : m_CacheBucket.m_ReferenceHashes)
{
- IoCids.erase(ReferenceHash);
+ if (IoCids.erase(ReferenceHash) == 1)
+ {
+ if (IoCids.empty())
+ {
+ return;
+ }
+ }
}
for (const IoHash& ReferenceHash : m_UncachedReferences)
{
- IoCids.erase(ReferenceHash);
+ if (IoCids.erase(ReferenceHash) == 1)
+ {
+ if (IoCids.empty())
+ {
+ return;
+ }
+ }
}
}
- ZenCacheDiskLayer::CacheBucket& m_CacheBucket;
+ CacheBucket& m_CacheBucket;
std::unique_ptr<RwLock::SharedLockScope> m_IndexLock;
HashSet m_UncachedReferences;
};
@@ -3152,126 +3414,9 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx)
{
return;
}
- ZEN_INFO("GCV2: cachebucket [CREATE CHECKERS] '{}': found {} references in {}",
- m_BucketDir,
- m_ReferenceCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ ZEN_INFO("GCV2: cachebucket [CREATE CHECKERS] '{}': completed in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
- std::vector<IoHash> UpdateKeys;
- std::vector<IoHash> StandaloneKeys;
- std::vector<size_t> ReferenceCounts;
- std::vector<IoHash> References;
-
- // Refresh cache
- {
- RwLock::SharedLockScope IndexLock(m_IndexLock);
- for (const auto& Entry : m_Index)
- {
- if (Ctx.IsCancelledFlag.load())
- {
- return {};
- }
-
- size_t PayloadIndex = Entry.second;
- const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Payloads[PayloadIndex];
- const DiskLocation& Loc = Payload.Location;
-
- if (!Loc.IsFlagSet(DiskLocation::kStructured))
- {
- continue;
- }
- if (m_Configuration.EnableReferenceCaching &&
- m_FirstReferenceIndex[PayloadIndex] != ZenCacheDiskLayer::CacheBucket::ReferenceIndex::Unknown())
- {
- continue;
- }
- const IoHash& Key = Entry.first;
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- StandaloneKeys.push_back(Key);
- continue;
- }
- IoBuffer Buffer = GetInlineCacheValue(Loc);
- if (!Buffer)
- {
- UpdateKeys.push_back(Key);
- ReferenceCounts.push_back(0);
- continue;
- }
- size_t CurrentReferenceCount = References.size();
- {
- CbObjectView Obj(Buffer.GetData());
- Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); });
- Buffer = {};
- }
- UpdateKeys.push_back(Key);
- ReferenceCounts.push_back(References.size() - CurrentReferenceCount);
- }
- }
- {
- for (const IoHash& Key : StandaloneKeys)
- {
- if (Ctx.IsCancelledFlag.load())
- {
- return {};
- }
-
- IoBuffer Buffer = GetStandaloneCacheValue(ZenContentType::kCbObject, Key);
- if (!Buffer)
- {
- continue;
- }
-
- size_t CurrentReferenceCount = References.size();
- {
- CbObjectView Obj(Buffer.GetData());
- Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); });
- Buffer = {};
- }
- UpdateKeys.push_back(Key);
- ReferenceCounts.push_back(References.size() - CurrentReferenceCount);
- }
- }
-
- {
- size_t ReferenceOffset = 0;
- RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- if (!m_Configuration.EnableReferenceCaching)
- {
- ZEN_ASSERT(m_FirstReferenceIndex.empty());
- ZEN_ASSERT(m_ReferenceHashes.empty());
- ZEN_ASSERT(m_NextReferenceHashesIndexes.empty());
- ZEN_ASSERT(m_ReferenceCount == 0);
- // If reference caching is not enabled, we will resize and use the data structure in place for reference caching when
- // we figure out what this bucket references. This will be reset once the DiskBucketReferenceChecker is deleted.
- m_FirstReferenceIndex.resize(m_Payloads.size());
- }
- for (size_t Index = 0; Index < UpdateKeys.size(); Index++)
- {
- const IoHash& Key = UpdateKeys[Index];
- size_t ReferenceCount = ReferenceCounts[Index];
- auto It = m_Index.find(Key);
- if (It == m_Index.end())
- {
- ReferenceOffset += ReferenceCount;
- continue;
- }
- if (m_FirstReferenceIndex[It->second] != ReferenceIndex::Unknown())
- {
- continue;
- }
- SetReferences(IndexLock,
- m_FirstReferenceIndex[It->second],
- std::span<IoHash>{References.data() + ReferenceOffset, ReferenceCount});
- ReferenceOffset += ReferenceCount;
- }
- if (m_Configuration.EnableReferenceCaching)
- {
- CompactReferences(IndexLock);
- }
- }
-
return {new DiskBucketReferenceChecker(*this)};
}
diff --git a/src/zenserver/cache/cachedisklayer.h b/src/zenserver/cache/cachedisklayer.h
index 8d015d127..55d2a98f4 100644
--- a/src/zenserver/cache/cachedisklayer.h
+++ b/src/zenserver/cache/cachedisklayer.h
@@ -293,7 +293,6 @@ public:
using IndexMap = tsl::robin_map<IoHash, PayloadIndex, IoHash::Hasher>;
- private:
GcManager& m_Gc;
std::atomic_uint64_t& m_OuterCacheMemoryUsage;
std::string m_BucketName;
@@ -329,6 +328,7 @@ public:
std::vector<ReferenceIndex> m_FirstReferenceIndex;
std::vector<IoHash> m_ReferenceHashes;
std::vector<ReferenceIndex> m_NextReferenceHashesIndexes;
+ std::unique_ptr<HashSet> m_UpdatedKeys;
size_t m_ReferenceCount = 0;
std::atomic_uint64_t m_StandaloneSize{};
std::atomic_uint64_t m_MemCachedSize{};
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp
index 25dfd103d..9155e209c 100644
--- a/src/zenserver/cache/structuredcachestore.cpp
+++ b/src/zenserver/cache/structuredcachestore.cpp
@@ -816,16 +816,28 @@ namespace testutils {
return {Key, Buffer};
}
+ struct FalseType
+ {
+ static const bool Enabled = false;
+ };
+ struct TrueType
+ {
+ static const bool Enabled = true;
+ };
+
} // namespace testutils
-TEST_CASE("z$.store")
+TEST_CASE_TEMPLATE("z$.store", ReferenceCaching, testutils::FalseType, testutils::TrueType)
{
ScopedTemporaryDirectory TempDir;
GcManager Gc;
auto JobQueue = MakeJobQueue(1, "testqueue");
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
const int kIterationCount = 100;
@@ -859,7 +871,7 @@ TEST_CASE("z$.store")
}
}
-TEST_CASE("z$.size")
+TEST_CASE_TEMPLATE("z$.size", ReferenceCaching, testutils::FalseType, testutils::TrueType)
{
auto JobQueue = MakeJobQueue(1, "testqueue");
@@ -881,7 +893,10 @@ TEST_CASE("z$.size")
{
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
CbObject CacheValue = CreateCacheValue(Zcs.GetConfig().DiskLayerConfig.BucketConfig.MemCacheSizeThreshold - 256);
@@ -915,7 +930,10 @@ TEST_CASE("z$.size")
{
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
const GcStorageSize SerializedSize = Zcs.StorageSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
@@ -939,7 +957,10 @@ TEST_CASE("z$.size")
{
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
CbObject CacheValue = CreateCacheValue(Zcs.GetConfig().DiskLayerConfig.BucketConfig.MemCacheSizeThreshold + 64);
@@ -959,7 +980,10 @@ TEST_CASE("z$.size")
{
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
const GcStorageSize SerializedSize = Zcs.StorageSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
@@ -974,7 +998,7 @@ TEST_CASE("z$.size")
}
}
-TEST_CASE("z$.gc")
+TEST_CASE_TEMPLATE("z$.gc", ReferenceCaching, testutils::FalseType, testutils::TrueType)
{
using namespace testutils;
@@ -1001,7 +1025,7 @@ TEST_CASE("z$.gc")
ZenCacheNamespace Zcs(Gc,
*JobQueue,
TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
const auto Bucket = "teardrinker"sv;
// Create a cache record
@@ -1041,7 +1065,7 @@ TEST_CASE("z$.gc")
ZenCacheNamespace Zcs(Gc,
*JobQueue,
TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
std::vector<IoHash> Keep;
// Collect garbage with 1 hour max cache duration
@@ -1065,7 +1089,7 @@ TEST_CASE("z$.gc")
ZenCacheNamespace Zcs(Gc,
*JobQueue,
TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
const auto Bucket = "fortysixandtwo"sv;
const GcClock::TimePoint CurrentTime = GcClock::Now();
@@ -1114,7 +1138,7 @@ TEST_CASE("z$.gc")
ZenCacheNamespace Zcs(Gc,
*JobQueue,
TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
const auto Bucket = "rightintwo"sv;
std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)};
@@ -1162,13 +1186,13 @@ TEST_CASE("z$.gc")
ZenCacheNamespace Zcs(Gc,
*JobQueue,
TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
CHECK_EQ(0, Zcs.StorageSize().DiskSize);
}
}
}
-TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
+TEST_CASE_TEMPLATE("z$.threadedinsert", ReferenceCaching, testutils::FalseType, testutils::TrueType) // * doctest::skip(true))
{
// for (uint32_t i = 0; i < 100; ++i)
{
@@ -1219,7 +1243,10 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
WorkerThreadPool ThreadPool(4);
GcManager Gc;
auto JobQueue = MakeJobQueue(1, "testqueue");
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path(), {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path(),
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
{
std::atomic<size_t> WorkCompleted = 0;
@@ -1648,7 +1675,7 @@ TEST_CASE("z$.drop.namespace")
}
}
-TEST_CASE("z$.blocked.disklayer.put")
+TEST_CASE_TEMPLATE("z$.blocked.disklayer.put", ReferenceCaching, testutils::FalseType, testutils::TrueType)
{
ScopedTemporaryDirectory TempDir;
@@ -1665,7 +1692,10 @@ TEST_CASE("z$.blocked.disklayer.put")
GcManager Gc;
auto JobQueue = MakeJobQueue(1, "testqueue");
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
CbObject CacheValue = CreateCacheValue(64 * 1024 + 64);
@@ -1701,7 +1731,7 @@ TEST_CASE("z$.blocked.disklayer.put")
CHECK(memcmp(NewView.GetData(), Buffer2.GetData(), NewView.GetSize()) == 0);
}
-TEST_CASE("z$.scrub")
+TEST_CASE_TEMPLATE("z$.scrub", ReferenceCaching, testutils::FalseType, testutils::TrueType)
{
ScopedTemporaryDirectory TempDir;
@@ -1760,7 +1790,10 @@ TEST_CASE("z$.scrub")
GcManager Gc;
CidStore CidStore(Gc);
auto JobQueue = MakeJobQueue(1, "testqueue");
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {});
+ ZenCacheNamespace Zcs(Gc,
+ *JobQueue,
+ TempDir.Path() / "cache",
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
CidStore.Initialize(CidConfig);
@@ -1795,7 +1828,7 @@ TEST_CASE("z$.scrub")
CHECK(ScrubCtx.BadCids().GetSize() == 0);
}
-TEST_CASE("z$.newgc.basics")
+TEST_CASE_TEMPLATE("z$.newgc.basics", ReferenceCaching, testutils::FalseType, testutils::TrueType)
{
using namespace testutils;
@@ -1915,7 +1948,7 @@ TEST_CASE("z$.newgc.basics")
ZenCacheNamespace Zcs(Gc,
*JobQueue,
TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
// Create some basic data
{
@@ -1949,7 +1982,7 @@ TEST_CASE("z$.newgc.basics")
ZenCacheNamespace Zcs(Gc,
*JobQueue,
TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() - std::chrono::hours(1),
@@ -1983,7 +2016,7 @@ TEST_CASE("z$.newgc.basics")
ZenCacheNamespace Zcs(Gc,
*JobQueue,
TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
@@ -2017,7 +2050,7 @@ TEST_CASE("z$.newgc.basics")
ZenCacheNamespace Zcs(Gc,
*JobQueue,
TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
@@ -2051,7 +2084,7 @@ TEST_CASE("z$.newgc.basics")
ZenCacheNamespace Zcs(Gc,
*JobQueue,
TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
@@ -2086,7 +2119,7 @@ TEST_CASE("z$.newgc.basics")
ZenCacheNamespace Zcs(Gc,
*JobQueue,
TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
@@ -2121,7 +2154,7 @@ TEST_CASE("z$.newgc.basics")
ZenCacheNamespace Zcs(Gc,
*JobQueue,
TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
@@ -2162,7 +2195,7 @@ TEST_CASE("z$.newgc.basics")
ZenCacheNamespace Zcs(Gc,
*JobQueue,
TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
GcResult Result = Gc.CollectGarbage(GcSettings{.CacheExpireTime = GcClock::Now() + std::chrono::hours(1),
@@ -2198,7 +2231,7 @@ TEST_CASE("z$.newgc.basics")
ZenCacheNamespace Zcs(Gc,
*JobQueue,
TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[0], GcClock::Now() + std::chrono::hours(2));
@@ -2238,7 +2271,7 @@ TEST_CASE("z$.newgc.basics")
ZenCacheNamespace Zcs(Gc,
*JobQueue,
TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
Zcs.SetAccessTime(TearDrinkerBucket, CacheRecords[0], GcClock::Now() + std::chrono::hours(2));
@@ -2277,7 +2310,7 @@ TEST_CASE("z$.newgc.basics")
ZenCacheNamespace Zcs(Gc,
*JobQueue,
TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
Zcs.SetAccessTime(TearDrinkerBucket, UnstructuredCacheValues[1], GcClock::Now() + std::chrono::hours(2));
@@ -2317,7 +2350,7 @@ TEST_CASE("z$.newgc.basics")
ZenCacheNamespace Zcs(Gc,
*JobQueue,
TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
// Prime so we can check GC of memory layer
@@ -2370,7 +2403,7 @@ TEST_CASE("z$.newgc.basics")
ZenCacheNamespace Zcs(Gc,
*JobQueue,
TempDir.Path() / "cache",
- {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = true}}});
+ {.DiskLayerConfig = {.BucketConfig = {.EnableReferenceCaching = ReferenceCaching::Enabled}}});
CHECK_EQ(7, Zcs.GetBucketInfo(TearDrinkerBucket).value().DiskLayerInfo.EntryCount);
auto Attachments =
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index dde824b6f..c6097dea2 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -3311,11 +3311,15 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
class ProjectStoreReferenceChecker : public GcReferenceChecker
{
public:
- ProjectStoreReferenceChecker(GcCtx& Ctx, ProjectStore::Oplog& Owner, bool PreCache) : m_Oplog(Owner)
+ ProjectStoreReferenceChecker(ProjectStore::Oplog& Owner, bool PreCache) : m_Oplog(Owner), m_PreCache(PreCache) {}
+
+ virtual ~ProjectStoreReferenceChecker() {}
+
+ virtual void PreCache(GcCtx& Ctx) override
{
- if (PreCache)
+ if (m_PreCache)
{
- ZEN_TRACE_CPU("Store::ReferencesPreCache");
+ ZEN_TRACE_CPU("Store::PreCache");
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3323,7 +3327,7 @@ public:
{
return;
}
- ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': precached {} references in {} from {}/{}",
+ ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': precached {} references in {} from {}/{}",
m_Oplog.m_BasePath,
m_References.size(),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
@@ -3332,14 +3336,17 @@ public:
});
RwLock::SharedLockScope __(m_Oplog.m_OplogLock);
- m_Oplog.IterateOplog(
- [&](CbObjectView Op) { Op.IterateAttachments([&](CbFieldView Visitor) { m_References.insert(Visitor.AsAttachment()); }); });
+ if (Ctx.IsCancelledFlag)
+ {
+ return;
+ }
+ m_Oplog.IterateOplog([&](CbObjectView Op) {
+ Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); });
+ });
m_PreCachedLsn = m_Oplog.GetMaxOpIndex();
}
}
- virtual ~ProjectStoreReferenceChecker() {}
-
virtual void LockState(GcCtx& Ctx) override
{
ZEN_TRACE_CPU("Store::LockState");
@@ -3363,9 +3370,10 @@ public:
{
// TODO: Maybe we could just check the added oplog entries - we might get a few extra references from obsolete entries
// but I don't think that would be critical
- m_References.clear();
- m_Oplog.IterateOplog(
- [&](CbObjectView Op) { Op.IterateAttachments([&](CbFieldView Visitor) { m_References.insert(Visitor.AsAttachment()); }); });
+ m_References.resize(0);
+ m_Oplog.IterateOplog([&](CbObjectView Op) {
+ Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); });
+ });
}
}
@@ -3391,12 +3399,19 @@ public:
for (const IoHash& ReferenceHash : m_References)
{
- IoCids.erase(ReferenceHash);
+ if (IoCids.erase(ReferenceHash) == 1)
+ {
+ if (IoCids.empty())
+ {
+ return;
+ }
+ }
}
}
ProjectStore::Oplog& m_Oplog;
+ bool m_PreCache;
std::unique_ptr<RwLock::SharedLockScope> m_OplogLock;
- HashSet m_References;
+ std::vector<IoHash> m_References;
int m_PreCachedLsn = -1;
};
@@ -3446,7 +3461,7 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx)
ProjectStore::Oplog* Oplog = Project->OpenOplog(OpLogId);
GcClock::TimePoint Now = GcClock::Now();
bool TryPreCache = Project->LastOplogAccessTime(OpLogId) < (Now - std::chrono::minutes(5));
- Checkers.emplace_back(new ProjectStoreReferenceChecker(Ctx, *Oplog, TryPreCache));
+ Checkers.emplace_back(new ProjectStoreReferenceChecker(*Oplog, TryPreCache));
}
OplogCount += OpLogs.size();
}
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 03c7f4b95..cc727787f 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -1274,6 +1274,17 @@ BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint
return Path.ToPath();
}
+Ref<BlockStoreFile>
+BlockStore::GetBlockFile(uint32_t BlockIndex)
+{
+ RwLock::SharedLockScope _(m_InsertLock);
+ if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end())
+ {
+ return It->second;
+ }
+ return {};
+}
+
#if ZEN_WITH_TESTS
TEST_CASE("blockstore.blockstoredisklocation")
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index e2ab34d1e..2660c2643 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -381,6 +381,7 @@ WriteReferencerStats(CbObjectWriter& Writer, const GcReferencerStats& Stats, boo
Writer.EndObject();
Writer << "CreateReferenceCheckers" << ToTimeSpan(Stats.CreateReferenceCheckersMS);
+ Writer << "PreCacheState" << ToTimeSpan(Stats.PreCacheStateMS);
Writer << "LockState" << ToTimeSpan(Stats.LockStateMS);
Writer << "Elapsed" << ToTimeSpan(Stats.ElapsedMS);
};
@@ -449,6 +450,7 @@ WriteGCResult(CbObjectWriter& Writer, const GcResult& Result, bool HumanReadable
Writer << "RemoveExpiredData" << ToTimeSpan(Result.RemoveExpiredDataMS);
Writer << "CreateReferenceCheckers" << ToTimeSpan(Result.CreateReferenceCheckersMS);
+ Writer << "PreCacheState" << ToTimeSpan(Result.PreCacheStateMS);
Writer << "LockState" << ToTimeSpan(Result.LockStateMS);
Writer << "CreateReferencePruners" << ToTimeSpan(Result.CreateReferencePrunersMS);
@@ -507,8 +509,8 @@ Add(GcStats& Sum, const GcStats& Sub)
void
Sum(GcReferencerStats& Stat)
{
- Stat.ElapsedMS =
- Stat.RemoveExpiredDataStats.ElapsedMS + Stat.CompactStoreStats.ElapsedMS + Stat.CreateReferenceCheckersMS + Stat.LockStateMS;
+ Stat.ElapsedMS = Stat.RemoveExpiredDataStats.ElapsedMS + Stat.CompactStoreStats.ElapsedMS + Stat.CreateReferenceCheckersMS +
+ Stat.PreCacheStateMS + Stat.LockStateMS;
}
void
@@ -518,6 +520,7 @@ Add(GcReferencerStats& Sum, const GcReferencerStats& Sub)
Add(Sum.CompactStoreStats, Sub.CompactStoreStats);
Sum.CreateReferenceCheckersMS += Sub.CreateReferenceCheckersMS;
+ Sum.PreCacheStateMS += Sub.PreCacheStateMS;
Sum.LockStateMS += Sub.LockStateMS;
Sum.ElapsedMS += Sub.ElapsedMS;
@@ -802,10 +805,54 @@ GcManager::CollectGarbage(const GcSettings& Settings)
}
}
- ZEN_INFO("GCV2: Locking state for {} reference checkers", ReferenceCheckers.size());
{
- SCOPED_TIMER(uint64_t ElapsedMS = Timer.GetElapsedTimeMs(); Result.WriteBlockMS = std::chrono::milliseconds(ElapsedMS);
- ZEN_INFO("GCV2: Writes blocked for {}", NiceTimeSpanMs(ElapsedMS)));
+ ZEN_INFO("GCV2: Precaching state for {} reference checkers", ReferenceCheckers.size());
+ if (!ReferenceCheckers.empty())
+ {
+ if (CheckGCCancel())
+ {
+ return Sum(Result, true);
+ }
+ ZEN_TRACE_CPU("GcV2::PreCache");
+
+ Latch WorkLeft(1);
+
+ {
+ SCOPED_TIMER(Result.PreCacheStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());
+ if (Ctx.Settings.Verbose) {
+ ZEN_INFO("GCV2: Precached state using {} reference checkers in {}",
+ ReferenceCheckers.size(),
+ NiceTimeSpanMs(Result.PreCacheStateMS.count()));
+ });
+ for (auto& It : ReferenceCheckers)
+ {
+ if (CheckGCCancel())
+ {
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ return Sum(Result, true);
+ }
+
+ GcReferenceChecker* Checker = It.first.get();
+ size_t Index = It.second;
+ std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index];
+ WorkLeft.AddCount(1);
+ ThreadPool.ScheduleWork([&Ctx, Checker, Index, &Stats, &WorkLeft]() {
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+ SCOPED_TIMER(Stats.second.PreCacheStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ Checker->PreCache(Ctx);
+ });
+ }
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ }
+ }
+ }
+
+ SCOPED_TIMER(uint64_t ElapsedMS = Timer.GetElapsedTimeMs(); Result.WriteBlockMS = std::chrono::milliseconds(ElapsedMS);
+ ZEN_INFO("GCV2: Writes blocked for {}", NiceTimeSpanMs(ElapsedMS)));
+ {
+ ZEN_INFO("GCV2: Locking state for {} reference checkers", ReferenceCheckers.size());
if (!ReferenceCheckers.empty())
{
if (CheckGCCancel())
@@ -849,7 +896,8 @@ GcManager::CollectGarbage(const GcSettings& Settings)
WorkLeft.Wait();
}
}
-
+ }
+ {
ZEN_INFO("GCV2: Removing unreferenced data for {} reference pruners", ReferencePruners.size());
{
const auto GetUnusedReferences = [&ReferenceCheckers, &Ctx](std::span<IoHash> References) -> std::vector<IoHash> {
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index 82e1c71c6..dcd4b5e87 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -180,6 +180,8 @@ public:
inline uint64_t TotalSize() const { return m_TotalSize.load(std::memory_order::relaxed); }
+ Ref<BlockStoreFile> GetBlockFile(uint32_t BlockIndex);
+
private:
uint32_t GetFreeBlockIndex(uint32_t StartProbeIndex, RwLock::ExclusiveLockScope&, std::filesystem::path& OutBlockPath) const;
diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h
index 7a6249970..698b0d4e8 100644
--- a/src/zenstore/include/zenstore/gc.h
+++ b/src/zenstore/include/zenstore/gc.h
@@ -87,6 +87,7 @@ struct GcReferencerStats
GcCompactStoreStats CompactStoreStats;
std::chrono::milliseconds CreateReferenceCheckersMS = {};
+ std::chrono::milliseconds PreCacheStateMS = {};
std::chrono::milliseconds LockStateMS = {};
std::chrono::milliseconds ElapsedMS = {};
};
@@ -112,6 +113,7 @@ struct GcResult
// Wall times, not sum of each
std::chrono::milliseconds RemoveExpiredDataMS = {};
std::chrono::milliseconds CreateReferenceCheckersMS = {};
+ std::chrono::milliseconds PreCacheStateMS = {};
std::chrono::milliseconds LockStateMS = {};
std::chrono::milliseconds CreateReferencePrunersMS = {};
@@ -171,6 +173,8 @@ public:
// Destructor should unlock what was locked in LockState
virtual ~GcReferenceChecker() = default;
+ virtual void PreCache(GcCtx& Ctx) = 0;
+
// Lock the state and make sure no references changes, usually a read-lock is taken until the destruction
// of the instance. Called once before any calls to RemoveUsedReferencesFromSet
// The implementation should be as fast as possible as LockState is part of a stop the world (from changes)