diff options
| author | Dan Engelbrecht <[email protected]> | 2023-10-03 13:31:02 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-10-03 13:31:02 +0200 |
| commit | 68a72b68592c416969bd36f413eb2b2762b9fcff (patch) | |
| tree | 9a5fc28eb9040f010c92f86a1745f9418dfc91ca /src/zenserver/cache | |
| parent | clean up date formatting (#440) (diff) | |
| download | zen-68a72b68592c416969bd36f413eb2b2762b9fcff.tar.xz zen-68a72b68592c416969bd36f413eb2b2762b9fcff.zip | |
faster accesstime save restore (#439)
- Improvement: Reduce time a cache bucket is locked for write when flushing/garbage collecting
- Change format for faster read/write and reduced size on disk
- Don't lock index while writing manifest to disk
- Skip garbage collect if we are currently in a Flush operation
- BlockStore::Flush no longer terminates currently writing block
- Garbage collect references to currently writing block but keep the block as new data may be added
- Fix BlockStore::Prune used disk space calculation
- Don't materialize data in filecas when we just need the size
Diffstat (limited to 'src/zenserver/cache')
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 300 | ||||
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.h | 22 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.cpp | 62 |
3 files changed, 252 insertions, 132 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 98a24116f..9883e2119 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -238,37 +238,90 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo const auto _ = MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - for (CbFieldView Entry : Manifest["Timestamps"sv]) + uint64_t Count = Manifest["Count"sv].AsUInt64(0); + if (Count != 0) { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - - if (auto It = m_Index.find(Key); It != m_Index.end()) + std::vector<size_t> KeysIndexes; + KeysIndexes.reserve(Count); + CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); + for (CbFieldView& KeyView : KeyArray) + { + if (auto It = m_Index.find(KeyView.AsHash()); It != m_Index.end()) + { + KeysIndexes.push_back(It.value()); + continue; + } + KeysIndexes.push_back((uint64_t)-1); + } + size_t KeyIndexOffset = 0; + CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); + for (CbFieldView& TimeStampView : TimeStampArray) + { + size_t KeyIndex = KeysIndexes[KeyIndexOffset++]; + if (KeyIndex == (uint64_t)-1) + { + continue; + } + m_AccessTimes[KeyIndex] = TimeStampView.AsInt64(); + } + KeyIndexOffset = 0; + CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView(); + for (CbFieldView& RawHashView : RawHashArray) { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64(); + size_t KeyIndex = KeysIndexes[KeyIndexOffset++]; + if (KeyIndex == (uint64_t)-1) + { + continue; + } + m_Payloads[KeyIndex].RawHash = RawHashView.AsHash(); + } + KeyIndexOffset = 0; + CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView(); + for (CbFieldView& RawSizeView : RawSizeArray) + { + size_t KeyIndex = KeysIndexes[KeyIndexOffset++]; + if (KeyIndex == (uint64_t)-1) + { + continue; + } + m_Payloads[KeyIndex].RawSize = RawSizeView.AsUInt64(); } } - for (CbFieldView Entry : Manifest["RawInfo"sv]) + + ////// Legacy format read { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - if (auto It = m_Index.find(Key); It != m_Index.end()) + for (CbFieldView Entry : Manifest["Timestamps"sv]) { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - - const IoHash RawHash = Obj["RawHash"sv].AsHash(); - const uint64_t RawSize = Obj["RawSize"sv].AsUInt64(); + const CbObjectView Obj = Entry.AsObjectView(); + const IoHash Key = Obj["Key"sv].AsHash(); - if (RawHash == IoHash::Zero || RawSize == 0) + if (auto It = m_Index.find(Key); It != m_Index.end()) { - ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex); + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); + m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64(); } + } + for (CbFieldView Entry : Manifest["RawInfo"sv]) + { + const CbObjectView Obj = Entry.AsObjectView(); + const IoHash Key = Obj["Key"sv].AsHash(); + if (auto It = m_Index.find(Key); It != m_Index.end()) + { + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - m_Payloads[EntryIndex].RawHash = RawHash; - m_Payloads[EntryIndex].RawSize = RawSize; + const IoHash RawHash = Obj["RawHash"sv].AsHash(); + const uint64_t RawSize = Obj["RawSize"sv].AsUInt64(); + + if (RawHash == IoHash::Zero || RawSize == 0) + { + ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex); + } + + m_Payloads[EntryIndex].RawHash = RawHash; + m_Payloads[EntryIndex].RawSize = RawSize; + } } } } @@ -578,14 +631,17 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) auto BlockIt = BlockSizes.find(BlockLocation.BlockIndex); if (BlockIt == BlockSizes.end()) { - ZEN_WARN("Unknown block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString()); + ZEN_WARN("Unknown block {} for entry {} in '{}'", BlockLocation.BlockIndex, Entry.first.ToHexString(), m_BucketDir); } else { uint64_t BlockSize = BlockIt->second; if (BlockLocation.Offset + BlockLocation.Size > BlockSize) { - ZEN_WARN("Range is outside of block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString()); + ZEN_WARN("Range is outside of block {} for entry {} in '{}'", + BlockLocation.BlockIndex, + Entry.first.ToHexString(), + m_BucketDir); } else { @@ -783,21 +839,50 @@ void ZenCacheDiskLayer::CacheBucket::Flush() { ZEN_TRACE_CPU("Z$::Disk::Bucket::Flush"); + bool Expected = false; + if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) + { + return; + } + auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); - m_BlockStore.Flush(); - - RwLock::SharedLockScope _(m_IndexLock); + m_BlockStore.Flush(/*ForceNewBlock*/ false); m_SlogFile.Flush(); - MakeIndexSnapshot(); - SaveManifest(); + + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; + IndexMap Index; + + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + MakeIndexSnapshot(); + Index = m_Index; + Payloads = m_Payloads; + AccessTimes = m_AccessTimes; + } + SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), std::move(Payloads))); } void -ZenCacheDiskLayer::CacheBucket::SaveManifest() +ZenCacheDiskLayer::CacheBucket::SaveManifest(CbObject&& Manifest) +{ + ZEN_TRACE_CPU("Z$::Disk::Bucket::SaveManifest"); + try + { + SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Manifest); + } + catch (std::exception& Err) + { + ZEN_WARN("writing manifest FAILED, reason: '{}'", Err.what()); + } +} + +CbObject +ZenCacheDiskLayer::CacheBucket::MakeManifest(IndexMap&& Index, std::vector<AccessTime>&& AccessTimes, std::vector<BucketPayload>&& Payloads) { using namespace std::literals; - ZEN_TRACE_CPU("Z$::Disk::Bucket::SaveManifest"); + ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeManifest"); CbObjectWriter Writer; Writer << "BucketId"sv << m_BucketId; @@ -805,46 +890,40 @@ ZenCacheDiskLayer::CacheBucket::SaveManifest() if (!m_Index.empty()) { - Writer.BeginArray("Timestamps"sv); - for (auto& Kv : m_Index) + Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(Index.size())); + Writer.BeginArray("Keys"sv); + for (auto& Kv : Index) { - const IoHash& Key = Kv.first; - GcClock::Tick AccessTime = m_AccessTimes[Kv.second]; + const IoHash& Key = Kv.first; + Writer.AddHash(Key); + } + Writer.EndArray(); - Writer.BeginObject(); - Writer << "Key"sv << Key; - Writer << "LastAccess"sv << AccessTime; - Writer.EndObject(); + Writer.BeginArray("Timestamps"sv); + for (auto& Kv : Index) + { + GcClock::Tick AccessTime = AccessTimes[Kv.second]; + Writer.AddInteger(AccessTime); } Writer.EndArray(); - Writer.BeginArray("RawInfo"sv); + Writer.BeginArray("RawHash"sv); + for (auto& Kv : Index) { - for (auto& Kv : m_Index) - { - const IoHash& Key = Kv.first; - const BucketPayload& Payload = m_Payloads[Kv.second]; - if (Payload.RawHash != IoHash::Zero) - { - Writer.BeginObject(); - Writer << "Key"sv << Key; - Writer << "RawHash"sv << Payload.RawHash; - Writer << "RawSize"sv << Payload.RawSize; - Writer.EndObject(); - } - } + const BucketPayload& Payload = Payloads[Kv.second]; + Writer.AddHash(Payload.RawHash); } Writer.EndArray(); - } - try - { - SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save()); - } - catch (std::exception& Err) - { - ZEN_WARN("writing manifest FAILED, reason: '{}'", Err.what()); + Writer.BeginArray("RawSize"sv); + for (auto& Kv : Index) + { + const BucketPayload& Payload = Payloads[Kv.second]; + Writer.AddInteger(Payload.RawSize); + } + Writer.EndArray(); } + return Writer.Save(); } IoHash @@ -1200,7 +1279,10 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) ExpiredKeys.reserve(1024); std::vector<IoHash> Cids; - Cids.reserve(1024); + if (!GcCtx.SkipCid()) + { + Cids.reserve(1024); + } for (const auto& Entry : Index) { @@ -1298,8 +1380,25 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) MovedCount, TotalChunkCount, NiceBytes(OldTotalSize)); - RwLock::SharedLockScope _(m_IndexLock); - SaveManifest(); + + bool Expected = false; + if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) + { + return; + } + auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); + + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; + IndexMap Index; + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + MakeIndexSnapshot(); + Index = m_Index; + Payloads = m_Payloads; + AccessTimes = m_AccessTimes; + } + SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), std::move(Payloads))); }); m_SlogFile.Flush(); @@ -1360,48 +1459,63 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) IndexMap Index; BlockStore::ReclaimSnapshotState BlockStoreState; { - ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::State"); - - RwLock::SharedLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - if (m_Index.empty()) + bool Expected = false; + if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) { - ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir); + ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is currently flushing", m_BucketDir); return; } - BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); - - SaveManifest(); - Index = m_Index; + auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); - for (const IoHash& Key : DeleteCacheKeys) + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; { - if (auto It = Index.find(Key); It != Index.end()) + ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::State"); + RwLock::SharedLockScope IndexLock(m_IndexLock); + + Stopwatch Timer; + const auto ____ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (m_Index.empty()) { - const BucketPayload& Payload = m_Payloads[It->second]; - DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location}; - if (Entry.Location.Flags & DiskLocation::kStandaloneFile) + ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir); + return; + } + + BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); + + Payloads = m_Payloads; + AccessTimes = m_AccessTimes; + Index = m_Index; + + for (const IoHash& Key : DeleteCacheKeys) + { + if (auto It = Index.find(Key); It != Index.end()) { - Entry.Location.Flags |= DiskLocation::kTombStone; - ExpiredStandaloneEntries.push_back(Entry); + const BucketPayload& Payload = m_Payloads[It->second]; + DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location}; + if (Entry.Location.Flags & DiskLocation::kStandaloneFile) + { + Entry.Location.Flags |= DiskLocation::kTombStone; + ExpiredStandaloneEntries.push_back(Entry); + } } } - } - if (GcCtx.IsDeletionMode()) - { - for (const auto& Entry : ExpiredStandaloneEntries) + if (GcCtx.IsDeletionMode()) { - m_Index.erase(Entry.Key); - m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); - DeletedChunks.insert(Entry.Key); + for (const auto& Entry : ExpiredStandaloneEntries) + { + m_Index.erase(Entry.Key); + m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + DeletedChunks.insert(Entry.Key); + } + m_SlogFile.Append(ExpiredStandaloneEntries); } - m_SlogFile.Append(ExpiredStandaloneEntries); } + SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), std::move(Payloads))); } if (GcCtx.IsDeletionMode()) diff --git a/src/zenserver/cache/cachedisklayer.h b/src/zenserver/cache/cachedisklayer.h index 80c643afa..c4bedfee8 100644 --- a/src/zenserver/cache/cachedisklayer.h +++ b/src/zenserver/cache/cachedisklayer.h @@ -188,6 +188,7 @@ private: BlockStore m_BlockStore; Oid m_BucketId; uint64_t m_LargeObjectThreshold = 128 * 1024; + std::atomic_bool m_IsFlushing{}; // These files are used to manage storage of small objects for this bucket @@ -221,16 +222,17 @@ private: std::atomic_uint64_t m_TotalStandaloneSize{}; - void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; - void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); - IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const; - void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); - IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; - void MakeIndexSnapshot(); - uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion); - uint64_t ReadLog(const std::filesystem::path& LogPath, uint64_t LogPosition); - void OpenLog(const bool IsNew); - void SaveManifest(); + void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; + void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); + IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const; + void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); + IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; + void MakeIndexSnapshot(); + uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion); + uint64_t ReadLog(const std::filesystem::path& LogPath, uint64_t LogPosition); + void OpenLog(const bool IsNew); + CbObject MakeManifest(IndexMap&& Index, std::vector<AccessTime>&& AccessTimes, std::vector<BucketPayload>&& Payloads); + void SaveManifest(CbObject&& Manifest); CacheValueDetails::ValueDetails GetValueDetails(const IoHash& Key, size_t Index) const; // These locks are here to avoid contention on file creation, therefore it's sufficient // that we take the same lock for the same hash diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp index 0a2947b16..1b6eeca3a 100644 --- a/src/zenserver/cache/structuredcachestore.cpp +++ b/src/zenserver/cache/structuredcachestore.cpp @@ -1030,46 +1030,50 @@ TEST_CASE("z$.gc") { ScopedTemporaryDirectory TempDir; GcManager Gc; - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache"); - const auto Bucket = "rightintwo"sv; - - std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; - - for (const auto& Key : Keys) { - IoBuffer Value = testutils::CreateBinaryCacheValue(128); - Zcs.Put(Bucket, Key, {.Value = Value}); - } - - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(2), GcClock::Now() - std::chrono::hours(2)); - GcCtx.CollectSmallObjects(true); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache"); + const auto Bucket = "rightintwo"sv; - Gc.CollectGarbage(GcCtx); + std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; for (const auto& Key : Keys) { - ZenCacheValue CacheValue; - const bool Exists = Zcs.Get(Bucket, Key, CacheValue); - CHECK(Exists); + IoBuffer Value = testutils::CreateBinaryCacheValue(128); + Zcs.Put(Bucket, Key, {.Value = Value}); } - } - // Move forward in time and collect again - { - GcContext GcCtx(GcClock::Now() + std::chrono::minutes(2), GcClock::Now() + std::chrono::minutes(2)); - GcCtx.CollectSmallObjects(true); + { + GcContext GcCtx(GcClock::Now() - std::chrono::hours(2), GcClock::Now() - std::chrono::hours(2)); + GcCtx.CollectSmallObjects(true); - Zcs.Flush(); - Gc.CollectGarbage(GcCtx); + Gc.CollectGarbage(GcCtx); - for (const auto& Key : Keys) - { - ZenCacheValue CacheValue; - const bool Exists = Zcs.Get(Bucket, Key, CacheValue); - CHECK(!Exists); + for (const auto& Key : Keys) + { + ZenCacheValue CacheValue; + const bool Exists = Zcs.Get(Bucket, Key, CacheValue); + CHECK(Exists); + } } + // Move forward in time and collect again + { + GcContext GcCtx(GcClock::Now() + std::chrono::minutes(2), GcClock::Now() + std::chrono::minutes(2)); + GcCtx.CollectSmallObjects(true); + + Zcs.Flush(); + Gc.CollectGarbage(GcCtx); + + for (const auto& Key : Keys) + { + ZenCacheValue CacheValue; + const bool Exists = Zcs.Get(Bucket, Key, CacheValue); + CHECK(!Exists); + } + } + } + { + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache"); CHECK_EQ(0, Zcs.StorageSize().DiskSize); } } |