diff options
| author | Per Larsson <[email protected]> | 2021-12-08 19:13:43 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-12-08 19:13:43 +0100 |
| commit | d85c32dbf8c0d9ef50974ed609ecf073bd91c1ed (patch) | |
| tree | 2ad88639ff2a5c3c2ecee9afbe3f5cfbe7d52fad | |
| parent | First pass of z$ garbage collection. (diff) | |
| download | zen-d85c32dbf8c0d9ef50974ed609ecf073bd91c1ed.tar.xz zen-d85c32dbf8c0d9ef50974ed609ecf073bd91c1ed.zip | |
Added support for z$ small object garbage collection.
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 247 |
1 file changed, 205 insertions, 42 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 740023098..89214ded6 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -17,6 +17,7 @@ #include <zencore/testing.h> #include <zencore/testutils.h> #include <zencore/thread.h> +#include <zencore/timer.h> #include <zencore/windows.h> #include <zenstore/basicfile.h> #include <zenstore/caslog.h> @@ -460,6 +461,7 @@ struct ZenCacheDiskLayer::CacheBucket void Put(const IoHash& HashKey, const ZenCacheValue& Value); void Drop(); void Flush(); + void SaveManifest(); void Scrub(ScrubContext& Ctx); void GatherReferences(GcContext& GcCtx); void CollectGarbage(GcContext& GcCtx); @@ -484,10 +486,12 @@ private: GcClock::Tick LastAccess{}; }; - RwLock m_IndexLock; - tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher> m_Index; - uint64_t m_WriteCursor = 0; - std::atomic_uint64_t m_TotalSize{}; + using IndexMap = tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher>; + + RwLock m_IndexLock; + IndexMap m_Index; + uint64_t m_WriteCursor = 0; + std::atomic_uint64_t m_TotalSize{}; void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey); void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); @@ -773,32 +777,35 @@ ZenCacheDiskLayer::CacheBucket::Drop() void ZenCacheDiskLayer::CacheBucket::Flush() { - using namespace std::literals; - RwLock::SharedLockScope _(m_IndexLock); m_SobsFile.Flush(); m_SlogFile.Flush(); - // Update manifest - { - CbObjectWriter Writer; - Writer << "BucketId"sv << m_BucketId; + SaveManifest(); +} + +void +ZenCacheDiskLayer::CacheBucket::SaveManifest() +{ + using namespace std::literals; + + CbObjectWriter Writer; + Writer << "BucketId"sv << m_BucketId; - if (!m_Index.empty()) + if (!m_Index.empty()) + { + Writer.BeginArray("Timestamps"sv); + for (auto& Kv : m_Index) { - Writer.BeginArray("Timestamps"sv); - for (auto& Kv : m_Index) - { - const IoHash& Key = Kv.first; - const 
IndexEntry& Entry = Kv.second; - Writer << "Key"sv << Key << "LastAccess"sv << Entry.LastAccess; - } - Writer.EndArray(); + const IoHash& Key = Kv.first; + const IndexEntry& Entry = Kv.second; + Writer << "Key"sv << Key << "LastAccess"sv << Entry.LastAccess; } - - SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save()); + Writer.EndArray(); } + + SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save()); } void @@ -866,8 +873,12 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) const IoHash& HashKey = Kv.first; const DiskLocation& Loc = Kv.second.Location; - if (Loc.IsFlagSet(DiskLocation::kStructured) == false || Loc.IsFlagSet(DiskLocation::kTombStone) || - GcCtx.Expired(Kv.second.LastAccess)) + if (!Loc.IsFlagSet(DiskLocation::kStructured) || Loc.IsFlagSet(DiskLocation::kTombStone)) + { + continue; + } + + if (GcCtx.Expired(Kv.second.LastAccess)) { continue; } @@ -893,41 +904,193 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) void ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { + ZEN_INFO("collecting garbage from z$ bucket '{}'", m_BucketDir); + + Flush(); + + Stopwatch Timer; + const auto Guard = MakeGuard([this, &Timer] { + ZEN_INFO("garbage collect from z$ bucket '{}' DONE after {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + SaveManifest(); + }); + RwLock::ExclusiveLockScope _(m_IndexLock); - const auto Reserved = size_t(m_Index.size() * 0.5); + std::vector<DiskIndexEntry> DiskEntries; + std::vector<std::pair<size_t, GcClock::Tick>> Timestamps; + uint64_t TotalSize{}; - std::vector<std::pair<IoHash, DiskLocation>> Expired; - Expired.reserve(Reserved); + DiskEntries.reserve(m_Index.size()); + Timestamps.reserve(m_Index.size()); - for (auto& Kv : m_Index) + for (size_t Idx = 0; auto& Kv : m_Index) { - const IoHash& HashKey = Kv.first; - const IndexEntry& Entry = Kv.second; - const bool Standalone = Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile); - - if 
(Standalone && GcCtx.Expired(Entry.LastAccess)) - { - Expired.push_back(std::make_pair(HashKey, Entry.Location)); - } + DiskEntries.push_back({.Key = Kv.first, .Location = Kv.second.Location}); + Timestamps.push_back(std::make_pair(Idx++, Kv.second.LastAccess)); + TotalSize += Kv.second.Location.Size(); } + std::sort(Timestamps.begin(), Timestamps.end(), [](const auto& LHS, const auto& RHS) { return LHS.second < RHS.second; }); + + const GcClock::TimePoint Tp = GcCtx.Time() - GcCtx.MaxCacheDuration(); + const GcClock::Tick TicksAllowed = Tp.time_since_epoch().count(); + + const auto LowerIt = + std::lower_bound(Timestamps.begin(), Timestamps.end(), TicksAllowed, [](const auto& Timestamp, GcClock::Tick Ticks) { + return Timestamp.second < Ticks; + }); + + const size_t FirstValid = std::distance(Timestamps.begin(), LowerIt); + const uint64_t NewCount = std::distance(LowerIt, Timestamps.end()); + const uint64_t Count = Timestamps.size(); + + // Remove all standalone file(s) { std::error_code Ec; WideStringBuilder<128> Path; - for (const auto& E : Expired) + + for (size_t Idx = 0; Idx < FirstValid; ++Idx) { - const IoHash& HashKey = E.first; - const DiskLocation& Loc = E.second; + const auto& Entry = DiskEntries[Idx]; + + if (!Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + continue; + } - Ec.clear(); Path.Reset(); - BuildPath(Path, HashKey); - DeleteStandaloneCacheValue(Loc, HashKey, Path.c_str(), Ec); + BuildPath(Path, Entry.Key); + + // NOTE: this will update index and log file + DeleteStandaloneCacheValue(Entry.Location, Entry.Key, Path.c_str(), Ec); if (Ec) { - ZEN_ERROR("delete standalone cache file '{}' FAILED, reason '{}'", WideToUtf8(Path.ToString()), Ec.message()); + ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason '{}'", WideToUtf8(Path.ToString()), Ec.message()); + Ec.clear(); + } + } + } + + if (GcCtx.IsContainerGcEnabled()) + { + // super naive GC implementation for small object container file + + uint64_t NewContainerSize{}; 
+ for (size_t Idx = FirstValid; Idx < Count; ++Idx) + { + const DiskIndexEntry& Entry = DiskEntries[Idx]; + if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile) == false) + { + NewContainerSize += Entry.Location.Size(); + } + } + + { + const uint64_t DiskSpaceMargin = (256 << 10); + + std::error_code Ec; + DiskSpace Space = DiskSpaceInfo(m_BucketDir, Ec); + if (Ec || Space.Free < NewContainerSize + DiskSpaceMargin) + { + ZEN_WARN("garbage collect z$ bucket '{}' FAILED, not enough disk space {}/{} (required/free)", + m_BucketDir, + NiceBytes(NewContainerSize), + NiceBytes(Space.Free)); + return; + } + } + + // Copy non expired entries to temporary container file + + std::filesystem::path TmpSobsPath{m_BucketDir / "zen.sobs.tmp"}; + std::filesystem::path TmpSlogPath{m_BucketDir / "zen.slog.tmp"}; + TCasLogFile<DiskIndexEntry> TmpLog; + BasicFile TmpSobs; + IndexMap TmpIndex; + uint64_t TmpCursor{}; + uint64_t TmpTotalSize{}; + + TmpSobs.Open(TmpSobsPath, true); + std::vector<uint8_t> Chunk; + + for (size_t Idx = FirstValid; Idx != Count; ++Idx) + { + const DiskIndexEntry& Entry = DiskEntries[Idx]; + + DiskLocation NewLoc; + + if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + NewLoc = DiskLocation(0, Entry.Location.Size(), 0, DiskLocation::kStandaloneFile); + } + else + { + Chunk.resize(Entry.Location.Size()); + m_SobsFile.Read(Chunk.data(), Chunk.size(), Entry.Location.Offset()); + + NewLoc = DiskLocation(TmpCursor, Chunk.size(), 0, 0); + TmpSobs.Write(Chunk.data(), Chunk.size(), TmpCursor); + TmpCursor = RoundUp(TmpCursor + Chunk.size(), 16); + } + + TmpLog.Append({.Key = Entry.Key, .Location = NewLoc}); + TmpIndex.insert({Entry.Key, {NewLoc, GcClock::TickCount()}}); + + TmpTotalSize += NewLoc.Size(); + } + + // Swap state + try + { + fs::path SobsPath{m_BucketDir / "zen.sobs"}; + fs::path SlogPath{m_BucketDir / "zen.slog"}; + + m_SobsFile.Close(); + m_SlogFile.Close(); + + fs::remove(SobsPath); + fs::remove(SlogPath); + + 
fs::rename(TmpSobsPath, SobsPath); + fs::rename(TmpSlogPath, SlogPath); + + const bool IsNew = false; + m_SobsFile.Open(SobsPath, IsNew); + m_SlogFile.Open(SobsPath, IsNew); + + std::swap(m_Index, TmpIndex); + std::swap(m_WriteCursor, TmpCursor); + m_TotalSize = TmpTotalSize; + } + catch (std::exception& Err) + { + ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what()); + + // Reset the container file and add standalone file(s) + m_SobsFile.Close(); + m_SlogFile.Close(); + + const bool IsNew = true; + m_SobsFile.Open(m_BucketDir / "zen.sobs", IsNew); + m_SlogFile.Open(m_BucketDir / "zen.slog", IsNew); + + m_Index = IndexMap(); + m_WriteCursor = 0; + m_TotalSize = 0; + + for (size_t Idx = FirstValid; Idx != Count; ++Idx) + { + const DiskIndexEntry& Entry = DiskEntries[Idx]; + + if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + DiskLocation Loc(0, Entry.Location.Size(), 0, DiskLocation::kStandaloneFile); + + m_SlogFile.Append({.Key = Entry.Key, .Location = Loc}); + m_Index.insert({Entry.Key, {Loc, GcClock::TickCount()}}); + m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); + } } } } |