aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-12-08 19:13:43 +0100
committerPer Larsson <[email protected]>2021-12-08 19:13:43 +0100
commitd85c32dbf8c0d9ef50974ed609ecf073bd91c1ed (patch)
tree2ad88639ff2a5c3c2ecee9afbe3f5cfbe7d52fad /zenserver/cache/structuredcachestore.cpp
parentFirst pass of z$ garbage collection. (diff)
downloadzen-d85c32dbf8c0d9ef50974ed609ecf073bd91c1ed.tar.xz
zen-d85c32dbf8c0d9ef50974ed609ecf073bd91c1ed.zip
Added support for z$ small object garbage collection.
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
-rw-r--r--zenserver/cache/structuredcachestore.cpp247
1 files changed, 205 insertions, 42 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 740023098..89214ded6 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -17,6 +17,7 @@
#include <zencore/testing.h>
#include <zencore/testutils.h>
#include <zencore/thread.h>
+#include <zencore/timer.h>
#include <zencore/windows.h>
#include <zenstore/basicfile.h>
#include <zenstore/caslog.h>
@@ -460,6 +461,7 @@ struct ZenCacheDiskLayer::CacheBucket
void Put(const IoHash& HashKey, const ZenCacheValue& Value);
void Drop();
void Flush();
+ void SaveManifest();
void Scrub(ScrubContext& Ctx);
void GatherReferences(GcContext& GcCtx);
void CollectGarbage(GcContext& GcCtx);
@@ -484,10 +486,12 @@ private:
GcClock::Tick LastAccess{};
};
- RwLock m_IndexLock;
- tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher> m_Index;
- uint64_t m_WriteCursor = 0;
- std::atomic_uint64_t m_TotalSize{};
+ using IndexMap = tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher>;
+
+ RwLock m_IndexLock;
+ IndexMap m_Index;
+ uint64_t m_WriteCursor = 0;
+ std::atomic_uint64_t m_TotalSize{};
void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey);
void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
@@ -773,32 +777,35 @@ ZenCacheDiskLayer::CacheBucket::Drop()
void
ZenCacheDiskLayer::CacheBucket::Flush()
{
- using namespace std::literals;
-
RwLock::SharedLockScope _(m_IndexLock);
m_SobsFile.Flush();
m_SlogFile.Flush();
- // Update manifest
- {
- CbObjectWriter Writer;
- Writer << "BucketId"sv << m_BucketId;
+ SaveManifest();
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::SaveManifest()
+{
+ using namespace std::literals;
+
+ CbObjectWriter Writer;
+ Writer << "BucketId"sv << m_BucketId;
- if (!m_Index.empty())
+ if (!m_Index.empty())
+ {
+ Writer.BeginArray("Timestamps"sv);
+ for (auto& Kv : m_Index)
{
- Writer.BeginArray("Timestamps"sv);
- for (auto& Kv : m_Index)
- {
- const IoHash& Key = Kv.first;
- const IndexEntry& Entry = Kv.second;
- Writer << "Key"sv << Key << "LastAccess"sv << Entry.LastAccess;
- }
- Writer.EndArray();
+ const IoHash& Key = Kv.first;
+ const IndexEntry& Entry = Kv.second;
+ Writer << "Key"sv << Key << "LastAccess"sv << Entry.LastAccess;
}
-
- SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save());
+ Writer.EndArray();
}
+
+ SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save());
}
void
@@ -866,8 +873,12 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
const IoHash& HashKey = Kv.first;
const DiskLocation& Loc = Kv.second.Location;
- if (Loc.IsFlagSet(DiskLocation::kStructured) == false || Loc.IsFlagSet(DiskLocation::kTombStone) ||
- GcCtx.Expired(Kv.second.LastAccess))
+ if (!Loc.IsFlagSet(DiskLocation::kStructured) || Loc.IsFlagSet(DiskLocation::kTombStone))
+ {
+ continue;
+ }
+
+ if (GcCtx.Expired(Kv.second.LastAccess))
{
continue;
}
@@ -893,41 +904,193 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
void
ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
{
+ ZEN_INFO("collecting garbage from z$ bucket '{}'", m_BucketDir);
+
+ Flush();
+
+ Stopwatch Timer;
+ const auto Guard = MakeGuard([this, &Timer] {
+ ZEN_INFO("garbage collect from z$ bucket '{}' DONE after {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ SaveManifest();
+ });
+
RwLock::ExclusiveLockScope _(m_IndexLock);
- const auto Reserved = size_t(m_Index.size() * 0.5);
+ std::vector<DiskIndexEntry> DiskEntries;
+ std::vector<std::pair<size_t, GcClock::Tick>> Timestamps;
+ uint64_t TotalSize{};
- std::vector<std::pair<IoHash, DiskLocation>> Expired;
- Expired.reserve(Reserved);
+ DiskEntries.reserve(m_Index.size());
+ Timestamps.reserve(m_Index.size());
- for (auto& Kv : m_Index)
+ for (size_t Idx = 0; auto& Kv : m_Index)
{
- const IoHash& HashKey = Kv.first;
- const IndexEntry& Entry = Kv.second;
- const bool Standalone = Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile);
-
- if (Standalone && GcCtx.Expired(Entry.LastAccess))
- {
- Expired.push_back(std::make_pair(HashKey, Entry.Location));
- }
+ DiskEntries.push_back({.Key = Kv.first, .Location = Kv.second.Location});
+ Timestamps.push_back(std::make_pair(Idx++, Kv.second.LastAccess));
+ TotalSize += Kv.second.Location.Size();
}
+ std::sort(Timestamps.begin(), Timestamps.end(), [](const auto& LHS, const auto& RHS) { return LHS.second < RHS.second; });
+
+ const GcClock::TimePoint Tp = GcCtx.Time() - GcCtx.MaxCacheDuration();
+ const GcClock::Tick TicksAllowed = Tp.time_since_epoch().count();
+
+ const auto LowerIt =
+ std::lower_bound(Timestamps.begin(), Timestamps.end(), TicksAllowed, [](const auto& Timestamp, GcClock::Tick Ticks) {
+ return Timestamp.second < Ticks;
+ });
+
+ const size_t FirstValid = std::distance(Timestamps.begin(), LowerIt);
+ const uint64_t NewCount = std::distance(LowerIt, Timestamps.end());
+ const uint64_t Count = Timestamps.size();
+
+ // Remove all standalone file(s)
{
std::error_code Ec;
WideStringBuilder<128> Path;
- for (const auto& E : Expired)
+
+ for (size_t Idx = 0; Idx < FirstValid; ++Idx)
{
- const IoHash& HashKey = E.first;
- const DiskLocation& Loc = E.second;
+ const auto& Entry = DiskEntries[Idx];
+
+ if (!Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ continue;
+ }
- Ec.clear();
Path.Reset();
- BuildPath(Path, HashKey);
- DeleteStandaloneCacheValue(Loc, HashKey, Path.c_str(), Ec);
+ BuildPath(Path, Entry.Key);
+
+ // NOTE: this will update index and log file
+ DeleteStandaloneCacheValue(Entry.Location, Entry.Key, Path.c_str(), Ec);
if (Ec)
{
- ZEN_ERROR("delete standalone cache file '{}' FAILED, reason '{}'", WideToUtf8(Path.ToString()), Ec.message());
+ ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason '{}'", WideToUtf8(Path.ToString()), Ec.message());
+ Ec.clear();
+ }
+ }
+ }
+
+ if (GcCtx.IsContainerGcEnabled())
+ {
+ // super naive GC implementation for small object container file
+
+ uint64_t NewContainerSize{};
+ for (size_t Idx = FirstValid; Idx < Count; ++Idx)
+ {
+ const DiskIndexEntry& Entry = DiskEntries[Idx];
+ if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile) == false)
+ {
+ NewContainerSize += Entry.Location.Size();
+ }
+ }
+
+ {
+ const uint64_t DiskSpaceMargin = (256 << 10);
+
+ std::error_code Ec;
+ DiskSpace Space = DiskSpaceInfo(m_BucketDir, Ec);
+ if (Ec || Space.Free < NewContainerSize + DiskSpaceMargin)
+ {
+ ZEN_WARN("garbage collect z$ bucket '{}' FAILED, not enough disk space {}/{} (required/free)",
+ m_BucketDir,
+ NiceBytes(NewContainerSize),
+ NiceBytes(Space.Free));
+ return;
+ }
+ }
+
+ // Copy non expired entries to temporary container file
+
+ std::filesystem::path TmpSobsPath{m_BucketDir / "zen.sobs.tmp"};
+ std::filesystem::path TmpSlogPath{m_BucketDir / "zen.slog.tmp"};
+ TCasLogFile<DiskIndexEntry> TmpLog;
+ BasicFile TmpSobs;
+ IndexMap TmpIndex;
+ uint64_t TmpCursor{};
+ uint64_t TmpTotalSize{};
+
+ TmpSobs.Open(TmpSobsPath, true);
+ std::vector<uint8_t> Chunk;
+
+ for (size_t Idx = FirstValid; Idx != Count; ++Idx)
+ {
+ const DiskIndexEntry& Entry = DiskEntries[Idx];
+
+ DiskLocation NewLoc;
+
+ if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ NewLoc = DiskLocation(0, Entry.Location.Size(), 0, DiskLocation::kStandaloneFile);
+ }
+ else
+ {
+ Chunk.resize(Entry.Location.Size());
+ m_SobsFile.Read(Chunk.data(), Chunk.size(), Entry.Location.Offset());
+
+ NewLoc = DiskLocation(TmpCursor, Chunk.size(), 0, 0);
+ TmpSobs.Write(Chunk.data(), Chunk.size(), TmpCursor);
+ TmpCursor = RoundUp(TmpCursor + Chunk.size(), 16);
+ }
+
+ TmpLog.Append({.Key = Entry.Key, .Location = NewLoc});
+ TmpIndex.insert({Entry.Key, {NewLoc, GcClock::TickCount()}});
+
+ TmpTotalSize += NewLoc.Size();
+ }
+
+ // Swap state
+ try
+ {
+ fs::path SobsPath{m_BucketDir / "zen.sobs"};
+ fs::path SlogPath{m_BucketDir / "zen.slog"};
+
+ m_SobsFile.Close();
+ m_SlogFile.Close();
+
+ fs::remove(SobsPath);
+ fs::remove(SlogPath);
+
+ fs::rename(TmpSobsPath, SobsPath);
+ fs::rename(TmpSlogPath, SlogPath);
+
+ const bool IsNew = false;
+ m_SobsFile.Open(SobsPath, IsNew);
+ m_SlogFile.Open(SobsPath, IsNew);
+
+ std::swap(m_Index, TmpIndex);
+ std::swap(m_WriteCursor, TmpCursor);
+ m_TotalSize = TmpTotalSize;
+ }
+ catch (std::exception& Err)
+ {
+ ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what());
+
+ // Reset the container file and add standalone file(s)
+ m_SobsFile.Close();
+ m_SlogFile.Close();
+
+ const bool IsNew = true;
+ m_SobsFile.Open(m_BucketDir / "zen.sobs", IsNew);
+ m_SlogFile.Open(m_BucketDir / "zen.slog", IsNew);
+
+ m_Index = IndexMap();
+ m_WriteCursor = 0;
+ m_TotalSize = 0;
+
+ for (size_t Idx = FirstValid; Idx != Count; ++Idx)
+ {
+ const DiskIndexEntry& Entry = DiskEntries[Idx];
+
+ if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ DiskLocation Loc(0, Entry.Location.Size(), 0, DiskLocation::kStandaloneFile);
+
+ m_SlogFile.Append({.Key = Entry.Key, .Location = Loc});
+ m_Index.insert({Entry.Key, {Loc, GcClock::TickCount()}});
+ m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed);
+ }
}
}
}