diff options
| author | Per Larsson <[email protected]> | 2021-12-09 12:11:13 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-12-09 12:11:13 +0100 |
| commit | 132a1c285ed6d07d0a210afb9249fbf4669e554d (patch) | |
| tree | 15fb27b2cfd0bca0b8f5fda0472db8b9cccbf67a /zenserver/cache/structuredcachestore.cpp | |
| parent | Added z$ GC tests. (diff) | |
| download | zen-132a1c285ed6d07d0a210afb9249fbf4669e554d.tar.xz zen-132a1c285ed6d07d0a210afb9249fbf4669e554d.zip | |
Fixed bug in z$ garbage collection.
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 267 |
1 files changed, 150 insertions, 117 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index b6f91857c..d97e2d250 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -392,7 +392,7 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue m_cacheMap.insert_or_assign(HashKey, BucketValue{.LastAccess = GcClock::TickCount(), .Payload = Value.Value}); } - m_TotalSize.fetch_add(Value.Value.GetSize()); + m_TotalSize.fetch_add(Value.Value.GetSize(), std::memory_order::relaxed); } ////////////////////////////////////////////////////////////////////////// @@ -589,12 +589,12 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo } else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) { - m_TotalSize.fetch_sub(Entry.Location.Size()); + m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); } else { m_Index[Entry.Key] = {.Location = Entry.Location, .LastAccess = GcClock::TickCount()}; - m_TotalSize.fetch_add(Entry.Location.Size()); + m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); } MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size()); }); @@ -761,7 +761,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& m_SlogFile.Append({.Key = HashKey, .Location = Loc}); m_SobsFile.Write(Value.Value.Data(), Loc.Size(), Loc.Offset()); - m_TotalSize.fetch_add(Loc.Size()); + m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); } } @@ -905,93 +905,108 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) void ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { - ZEN_INFO("collecting garbage from z$ bucket '{}'", m_BucketDir); - Flush(); + RwLock::ExclusiveLockScope _(m_IndexLock); + + const uint64_t OldCount = m_Index.size(); + const uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); + uint64_t NewCount{}; + + ZEN_INFO("collecting garbage from z$ bucket '{}'", m_BucketDir); + Stopwatch Timer; - const auto Guard = MakeGuard([this, &Timer] { - ZEN_INFO("garbage collect from z$ bucket '{}' DONE after {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + const auto Guard = MakeGuard([this, &Timer, &OldCount, &OldTotalSize, &NewCount] { + const uint64_t NewTotalSize = m_TotalSize.load(std::memory_order::relaxed); + ZEN_INFO("garbage collect from '{}' DONE after {}, collected #{} {} chunks of total #{} {}", + m_BucketDir, + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + OldCount - NewCount, + NiceBytes(OldTotalSize - NewTotalSize), + OldCount, + NiceBytes(OldTotalSize)); SaveManifest(); }); - RwLock::ExclusiveLockScope _(m_IndexLock); - if (m_Index.empty()) { return; } - std::vector<DiskIndexEntry> DiskEntries; - std::vector<std::pair<size_t, GcClock::Tick>> Timestamps; - uint64_t TotalSize{}; + struct Candidate + { + IoHash Key; + DiskLocation Loc; + GcClock::Tick LastAccess; + + bool operator<(const Candidate& RHS) const { return LastAccess < RHS.LastAccess; } + }; - DiskEntries.reserve(m_Index.size()); - Timestamps.reserve(m_Index.size()); + std::vector<Candidate> Candidates; + Candidates.reserve(m_Index.size()); - for (size_t Idx = 0; auto& Kv : m_Index) + for (auto& Kv : m_Index) { - DiskEntries.push_back({.Key = Kv.first, .Location = Kv.second.Location}); - Timestamps.push_back(std::make_pair(Idx++, Kv.second.LastAccess)); - TotalSize += Kv.second.Location.Size(); + Candidates.push_back({.Key = Kv.first, .Loc = Kv.second.Location, .LastAccess = Kv.second.LastAccess}); } - std::sort(Timestamps.begin(), Timestamps.end(), [](const auto& LHS, const auto& RHS) { return LHS.second < RHS.second; }); - const GcClock::TimePoint Tp = GcCtx.Time() - GcCtx.MaxCacheDuration(); const GcClock::Tick TicksAllowed = Tp.time_since_epoch().count(); - const auto LowerIt = - std::lower_bound(Timestamps.begin(), Timestamps.end(), TicksAllowed, [](const auto& Timestamp, GcClock::Tick Ticks) { - return Timestamp.second < Ticks; - }); + std::sort(Candidates.begin(), Candidates.end()); + const auto ValidIt = std::lower_bound(Candidates.begin(), Candidates.end(), TicksAllowed, [](const auto& C, auto Ticks) { + return C.LastAccess < Ticks; + }); - const size_t FirstValid = std::distance(Timestamps.begin(), LowerIt); - const uint64_t NewCount = std::distance(LowerIt, Timestamps.end()); - const uint64_t Count = Timestamps.size(); + const size_t ValidIndex = std::distance(Candidates.begin(), ValidIt); + NewCount = std::distance(ValidIt, Candidates.end()); + + if (NewCount == OldCount) + { + return; + } + + const std::span<Candidate const> ValidEntries(Candidates.begin() + ValidIndex, NewCount); + const std::span<Candidate const> ExpiredEntries(Candidates.begin(), Candidates.size() - NewCount); // Remove all standalone file(s) { std::error_code Ec; PathBuilder Path; - for (size_t Idx = 0; Idx < FirstValid; ++Idx) + for (const auto& Entry : ExpiredEntries) { - const auto& Entry = DiskEntries[Idx]; - - if (!Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) + if (Entry.Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - continue; - } - - Path.Reset(); - BuildPath(Path, Entry.Key); + Path.Reset(); + BuildPath(Path, Entry.Key); - // NOTE: this will update index and log file - DeleteStandaloneCacheValue(Entry.Location, Entry.Key, Path.c_str(), Ec); + // NOTE: this will update index and log file + DeleteStandaloneCacheValue(Entry.Loc, Entry.Key, Path.c_str(), Ec); - if (Ec) - { - ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason '{}'", WideToUtf8(Path.ToString()), Ec.message()); - Ec.clear(); + if (Ec) + { + ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason '{}'", WideToUtf8(Path.ToString()), Ec.message()); + Ec.clear(); + } } } } - if (GcCtx.IsContainerGcEnabled() && Count != NewCount) + if (GcCtx.IsContainerGcEnabled() && !ExpiredEntries.empty()) { // super naive GC implementation for small object container file uint64_t NewContainerSize{}; - for (size_t Idx = FirstValid; Idx < Count; ++Idx) + for (const auto& Entry : ValidEntries) { - const DiskIndexEntry& Entry = DiskEntries[Idx]; - if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile) == false) + if (Entry.Loc.IsFlagSet(DiskLocation::kStandaloneFile) == false) { - NewContainerSize += Entry.Location.Size(); + NewContainerSize += (Entry.Loc.Size() + sizeof(DiskLocation)); } } + if (NewContainerSize > 0) { const uint64_t DiskSpaceMargin = (256 << 10); @@ -1005,77 +1020,110 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) NiceBytes(Space.Free)); return; } - } - - // Copy non expired entries to temporary container file - std::filesystem::path TmpSobsPath{m_BucketDir / "zen.sobs.tmp"}; - std::filesystem::path TmpSlogPath{m_BucketDir / "zen.slog.tmp"}; - TCasLogFile<DiskIndexEntry> TmpLog; - IndexMap TmpIndex; - uint64_t TmpCursor{}; - uint64_t TmpTotalSize{}; + // Copy non expired entries to temporary container file - { - BasicFile TmpSobs; - TmpSobs.Open(TmpSobsPath, true); - std::vector<uint8_t> Chunk; + std::filesystem::path TmpSobsPath{m_BucketDir / "zen.sobs.tmp"}; + std::filesystem::path TmpSlogPath{m_BucketDir / "zen.slog.tmp"}; + TCasLogFile<DiskIndexEntry> TmpLog; + IndexMap TmpIndex; + uint64_t TmpCursor{}; + uint64_t TmpTotalSize{}; - for (size_t Idx = FirstValid; Idx != Count; ++Idx) { - const DiskIndexEntry& Entry = DiskEntries[Idx]; - - DiskLocation NewLoc; + BasicFile TmpSobs; + TmpSobs.Open(TmpSobsPath, true); + std::vector<uint8_t> Chunk; - if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) + for (const auto& Entry : ValidEntries) { - NewLoc = DiskLocation(0, Entry.Location.Size(), 0, DiskLocation::kStandaloneFile); + DiskLocation NewLoc; + + if (Entry.Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + NewLoc = DiskLocation(0, Entry.Loc.Size(), 0, DiskLocation::kStandaloneFile); + } + else + { + Chunk.resize(Entry.Loc.Size()); + m_SobsFile.Read(Chunk.data(), Chunk.size(), Entry.Loc.Offset()); + + NewLoc = DiskLocation(TmpCursor, Chunk.size(), 0, 0); + TmpSobs.Write(Chunk.data(), Chunk.size(), TmpCursor); + TmpCursor = RoundUp(TmpCursor + Chunk.size(), 16); + } + + TmpLog.Append({.Key = Entry.Key, .Location = NewLoc}); + TmpIndex.insert({Entry.Key, {NewLoc, GcClock::TickCount()}}); + + TmpTotalSize += NewLoc.Size(); } - else - { - Chunk.resize(Entry.Location.Size()); - m_SobsFile.Read(Chunk.data(), Chunk.size(), Entry.Location.Offset()); + } - NewLoc = DiskLocation(TmpCursor, Chunk.size(), 0, 0); - TmpSobs.Write(Chunk.data(), Chunk.size(), TmpCursor); - TmpCursor = RoundUp(TmpCursor + Chunk.size(), 16); - } + // Swap state + try + { + fs::path SobsPath{m_BucketDir / "zen.sobs"}; + fs::path SlogPath{m_BucketDir / "zen.slog"}; - TmpLog.Append({.Key = Entry.Key, .Location = NewLoc}); - TmpIndex.insert({Entry.Key, {NewLoc, GcClock::TickCount()}}); + m_SobsFile.Close(); + m_SlogFile.Close(); - TmpTotalSize += NewLoc.Size(); - } - } + fs::remove(SobsPath); + fs::remove(SlogPath); - // Swap state - try - { - fs::path SobsPath{m_BucketDir / "zen.sobs"}; - fs::path SlogPath{m_BucketDir / "zen.slog"}; + fs::rename(TmpSobsPath, SobsPath); + fs::rename(TmpSlogPath, SlogPath); - m_SobsFile.Close(); - m_SlogFile.Close(); + const bool IsNew = false; + m_SobsFile.Open(SobsPath, IsNew); + m_SlogFile.Open(SobsPath, IsNew); - fs::remove(SobsPath); - fs::remove(SlogPath); + std::swap(m_Index, TmpIndex); + std::swap(m_WriteCursor, TmpCursor); + m_TotalSize = TmpTotalSize; + } + catch (std::exception& Err) + { + ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what()); + + // Reset the container file and add standalone file(s) + m_SobsFile.Close(); + m_SlogFile.Close(); - fs::rename(TmpSobsPath, SobsPath); - fs::rename(TmpSlogPath, SlogPath); + const bool IsNew = true; + m_SobsFile.Open(m_BucketDir / "zen.sobs", IsNew); + m_SlogFile.Open(m_BucketDir / "zen.slog", IsNew); - const bool IsNew = false; - m_SobsFile.Open(SobsPath, IsNew); - m_SlogFile.Open(SobsPath, IsNew); + m_Index = IndexMap(); + m_WriteCursor = 0; + m_TotalSize = 0; - std::swap(m_Index, TmpIndex); - std::swap(m_WriteCursor, TmpCursor); - m_TotalSize = TmpTotalSize; + for (const auto& Entry : ValidEntries) + { + if (Entry.Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + DiskLocation Loc(0, Entry.Loc.Size(), 0, DiskLocation::kStandaloneFile); + + m_SlogFile.Append({.Key = Entry.Key, .Location = Loc}); + m_Index.insert({Entry.Key, {Loc, GcClock::TickCount()}}); + m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); + } + } + } } - catch (std::exception& Err) + else { - ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what()); + uint64_t SobSize{}; + for (const auto& Entry : ExpiredEntries) + { + if (Entry.Loc.IsFlagSet(DiskLocation::kStandaloneFile) == false) + { + m_Index.erase(Entry.Key); + SobSize += Entry.Loc.Size(); + } + } - // Reset the container file and add standalone file(s) m_SobsFile.Close(); m_SlogFile.Close(); @@ -1083,23 +1131,8 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) m_SobsFile.Open(m_BucketDir / "zen.sobs", IsNew); m_SlogFile.Open(m_BucketDir / "zen.slog", IsNew); - m_Index = IndexMap(); m_WriteCursor = 0; - m_TotalSize = 0; - - for (size_t Idx = FirstValid; Idx != Count; ++Idx) - { - const DiskIndexEntry& Entry = DiskEntries[Idx]; - - if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) - { - DiskLocation Loc(0, Entry.Location.Size(), 0, DiskLocation::kStandaloneFile); - - m_SlogFile.Append({.Key = Entry.Key, .Location = Loc}); - m_Index.insert({Entry.Key, {Loc, GcClock::TickCount()}}); - m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); - } - } + m_TotalSize.fetch_sub(SobSize, std::memory_order::relaxed); } } } @@ -1207,7 +1240,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_TotalSize.fetch_add(Loc.Size()); + m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); } ////////////////////////////////////////////////////////////////////////// |