diff options
| author | Dan Engelbrecht <[email protected]> | 2023-10-03 13:31:02 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-10-03 13:31:02 +0200 |
| commit | 68a72b68592c416969bd36f413eb2b2762b9fcff (patch) | |
| tree | 9a5fc28eb9040f010c92f86a1745f9418dfc91ca /src/zenserver/cache/cachedisklayer.cpp | |
| parent | clean up date formatting (#440) (diff) | |
| download | zen-68a72b68592c416969bd36f413eb2b2762b9fcff.tar.xz zen-68a72b68592c416969bd36f413eb2b2762b9fcff.zip | |
faster accesstime save restore (#439)
- Improvement: Reduce time a cache bucket is locked for write when flushing/garbage collecting
- Change format for faster read/write and reduced size on disk
- Don't lock index while writing manifest to disk
- Skip garbage collect if we are currently in a Flush operation
- BlockStore::Flush no longer terminates currently writing block
- Garbage collect references to currently writing block but keep the block as new data may be added
- Fix BlockStore::Prune used disk space calculation
- Don't materialize data in filecas when we just need the size
Diffstat (limited to 'src/zenserver/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 300 |
1 files changed, 207 insertions, 93 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 98a24116f..9883e2119 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -238,37 +238,90 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo const auto _ = MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - for (CbFieldView Entry : Manifest["Timestamps"sv]) + uint64_t Count = Manifest["Count"sv].AsUInt64(0); + if (Count != 0) { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - - if (auto It = m_Index.find(Key); It != m_Index.end()) + std::vector<size_t> KeysIndexes; + KeysIndexes.reserve(Count); + CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); + for (CbFieldView& KeyView : KeyArray) + { + if (auto It = m_Index.find(KeyView.AsHash()); It != m_Index.end()) + { + KeysIndexes.push_back(It.value()); + continue; + } + KeysIndexes.push_back((uint64_t)-1); + } + size_t KeyIndexOffset = 0; + CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); + for (CbFieldView& TimeStampView : TimeStampArray) + { + size_t KeyIndex = KeysIndexes[KeyIndexOffset++]; + if (KeyIndex == (uint64_t)-1) + { + continue; + } + m_AccessTimes[KeyIndex] = TimeStampView.AsInt64(); + } + KeyIndexOffset = 0; + CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView(); + for (CbFieldView& RawHashView : RawHashArray) { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64(); + size_t KeyIndex = KeysIndexes[KeyIndexOffset++]; + if (KeyIndex == (uint64_t)-1) + { + continue; + } + m_Payloads[KeyIndex].RawHash = RawHashView.AsHash(); + } + KeyIndexOffset = 0; + CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView(); + for (CbFieldView& RawSizeView : RawSizeArray) + { + size_t KeyIndex = KeysIndexes[KeyIndexOffset++]; + if (KeyIndex == (uint64_t)-1) + { + continue; + } + m_Payloads[KeyIndex].RawSize = RawSizeView.AsUInt64(); } } - for (CbFieldView Entry : Manifest["RawInfo"sv]) + + ////// Legacy format read { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - if (auto It = m_Index.find(Key); It != m_Index.end()) + for (CbFieldView Entry : Manifest["Timestamps"sv]) { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - - const IoHash RawHash = Obj["RawHash"sv].AsHash(); - const uint64_t RawSize = Obj["RawSize"sv].AsUInt64(); + const CbObjectView Obj = Entry.AsObjectView(); + const IoHash Key = Obj["Key"sv].AsHash(); - if (RawHash == IoHash::Zero || RawSize == 0) + if (auto It = m_Index.find(Key); It != m_Index.end()) { - ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex); + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); + m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64(); } + } + for (CbFieldView Entry : Manifest["RawInfo"sv]) + { + const CbObjectView Obj = Entry.AsObjectView(); + const IoHash Key = Obj["Key"sv].AsHash(); + if (auto It = m_Index.find(Key); It != m_Index.end()) + { + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - m_Payloads[EntryIndex].RawHash = RawHash; - m_Payloads[EntryIndex].RawSize = RawSize; + const IoHash RawHash = Obj["RawHash"sv].AsHash(); + const uint64_t RawSize = Obj["RawSize"sv].AsUInt64(); + + if (RawHash == IoHash::Zero || RawSize == 0) + { + ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex); + } + + m_Payloads[EntryIndex].RawHash = RawHash; + m_Payloads[EntryIndex].RawSize = RawSize; + } } } } @@ -578,14 +631,17 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) auto BlockIt = BlockSizes.find(BlockLocation.BlockIndex); if (BlockIt == BlockSizes.end()) { - ZEN_WARN("Unknown block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString()); + ZEN_WARN("Unknown block {} for entry {} in '{}'", BlockLocation.BlockIndex, Entry.first.ToHexString(), m_BucketDir); } else { uint64_t BlockSize = BlockIt->second; if (BlockLocation.Offset + BlockLocation.Size > BlockSize) { - ZEN_WARN("Range is outside of block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString()); + ZEN_WARN("Range is outside of block {} for entry {} in '{}'", + BlockLocation.BlockIndex, + Entry.first.ToHexString(), + m_BucketDir); } else { @@ -783,21 +839,50 @@ void ZenCacheDiskLayer::CacheBucket::Flush() { ZEN_TRACE_CPU("Z$::Disk::Bucket::Flush"); + bool Expected = false; + if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) + { + return; + } + auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); - m_BlockStore.Flush(); - - RwLock::SharedLockScope _(m_IndexLock); + m_BlockStore.Flush(/*ForceNewBlock*/ false); m_SlogFile.Flush(); - MakeIndexSnapshot(); - SaveManifest(); + + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; + IndexMap Index; + + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + MakeIndexSnapshot(); + Index = m_Index; + Payloads = m_Payloads; + AccessTimes = m_AccessTimes; + } + SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), std::move(Payloads))); } void -ZenCacheDiskLayer::CacheBucket::SaveManifest() +ZenCacheDiskLayer::CacheBucket::SaveManifest(CbObject&& Manifest) +{ + ZEN_TRACE_CPU("Z$::Disk::Bucket::SaveManifest"); + try + { + SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Manifest); + } + catch (std::exception& Err) + { + ZEN_WARN("writing manifest FAILED, reason: '{}'", Err.what()); + } +} + +CbObject +ZenCacheDiskLayer::CacheBucket::MakeManifest(IndexMap&& Index, std::vector<AccessTime>&& AccessTimes, std::vector<BucketPayload>&& Payloads) { using namespace std::literals; - ZEN_TRACE_CPU("Z$::Disk::Bucket::SaveManifest"); + ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeManifest"); CbObjectWriter Writer; Writer << "BucketId"sv << m_BucketId; @@ -805,46 +890,40 @@ ZenCacheDiskLayer::CacheBucket::SaveManifest() if (!m_Index.empty()) { - Writer.BeginArray("Timestamps"sv); - for (auto& Kv : m_Index) + Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(Index.size())); + Writer.BeginArray("Keys"sv); + for (auto& Kv : Index) { - const IoHash& Key = Kv.first; - GcClock::Tick AccessTime = m_AccessTimes[Kv.second]; + const IoHash& Key = Kv.first; + Writer.AddHash(Key); + } + Writer.EndArray(); - Writer.BeginObject(); - Writer << "Key"sv << Key; - Writer << "LastAccess"sv << AccessTime; - Writer.EndObject(); + Writer.BeginArray("Timestamps"sv); + for (auto& Kv : Index) + { + GcClock::Tick AccessTime = AccessTimes[Kv.second]; + Writer.AddInteger(AccessTime); } Writer.EndArray(); - Writer.BeginArray("RawInfo"sv); + Writer.BeginArray("RawHash"sv); + for (auto& Kv : Index) { - for (auto& Kv : m_Index) - { - const IoHash& Key = Kv.first; - const BucketPayload& Payload = m_Payloads[Kv.second]; - if (Payload.RawHash != IoHash::Zero) - { - Writer.BeginObject(); - Writer << "Key"sv << Key; - Writer << "RawHash"sv << Payload.RawHash; - Writer << "RawSize"sv << Payload.RawSize; - Writer.EndObject(); - } - } + const BucketPayload& Payload = Payloads[Kv.second]; + Writer.AddHash(Payload.RawHash); } Writer.EndArray(); - } - try - { - SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save()); - } - catch (std::exception& Err) - { - ZEN_WARN("writing manifest FAILED, reason: '{}'", Err.what()); + Writer.BeginArray("RawSize"sv); + for (auto& Kv : Index) + { + const BucketPayload& Payload = Payloads[Kv.second]; + Writer.AddInteger(Payload.RawSize); + } + Writer.EndArray(); } + return Writer.Save(); } IoHash @@ -1200,7 +1279,10 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) ExpiredKeys.reserve(1024); std::vector<IoHash> Cids; - Cids.reserve(1024); + if (!GcCtx.SkipCid()) + { + Cids.reserve(1024); + } for (const auto& Entry : Index) { @@ -1298,8 +1380,25 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) MovedCount, TotalChunkCount, NiceBytes(OldTotalSize)); - RwLock::SharedLockScope _(m_IndexLock); - SaveManifest(); + + bool Expected = false; + if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) + { + return; + } + auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); + + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; + IndexMap Index; + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + MakeIndexSnapshot(); + Index = m_Index; + Payloads = m_Payloads; + AccessTimes = m_AccessTimes; + } + SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), std::move(Payloads))); }); m_SlogFile.Flush(); @@ -1360,48 +1459,63 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) IndexMap Index; BlockStore::ReclaimSnapshotState BlockStoreState; { - ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::State"); - - RwLock::SharedLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - if (m_Index.empty()) + bool Expected = false; + if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) { - ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir); + ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is currently flushing", m_BucketDir); return; } - BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); - - SaveManifest(); - Index = m_Index; + auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); - for (const IoHash& Key : DeleteCacheKeys) + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; { - if (auto It = Index.find(Key); It != Index.end()) + ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::State"); + RwLock::SharedLockScope IndexLock(m_IndexLock); + + Stopwatch Timer; + const auto ____ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (m_Index.empty()) { - const BucketPayload& Payload = m_Payloads[It->second]; - DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location}; - if (Entry.Location.Flags & DiskLocation::kStandaloneFile) + ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir); + return; + } + + BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); + + Payloads = m_Payloads; + AccessTimes = m_AccessTimes; + Index = m_Index; + + for (const IoHash& Key : DeleteCacheKeys) + { + if (auto It = Index.find(Key); It != Index.end()) { - Entry.Location.Flags |= DiskLocation::kTombStone; - ExpiredStandaloneEntries.push_back(Entry); + const BucketPayload& Payload = m_Payloads[It->second]; + DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location}; + if (Entry.Location.Flags & DiskLocation::kStandaloneFile) + { + Entry.Location.Flags |= DiskLocation::kTombStone; + ExpiredStandaloneEntries.push_back(Entry); + } } } - } - if (GcCtx.IsDeletionMode()) - { - for (const auto& Entry : ExpiredStandaloneEntries) + if (GcCtx.IsDeletionMode()) { - m_Index.erase(Entry.Key); - m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); - DeletedChunks.insert(Entry.Key); + for (const auto& Entry : ExpiredStandaloneEntries) + { + m_Index.erase(Entry.Key); + m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + DeletedChunks.insert(Entry.Key); + } + m_SlogFile.Append(ExpiredStandaloneEntries); } - m_SlogFile.Append(ExpiredStandaloneEntries); } + SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), std::move(Payloads))); } if (GcCtx.IsDeletionMode()) |