diff options
Diffstat (limited to 'zenstore')
| -rw-r--r-- | zenstore/gc.cpp | 181 | ||||
| -rw-r--r-- | zenstore/include/zenstore/gc.h | 48 |
2 files changed, 157 insertions, 72 deletions
diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp index 86f215b97..8f08c8b1f 100644 --- a/zenstore/gc.cpp +++ b/zenstore/gc.cpp @@ -6,22 +6,25 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/filesystem.h> +#include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/string.h> #include <zencore/timer.h> #include <zenstore/CAS.h> #include <zenstore/cidstore.h> +#include <fmt/format.h> #include <filesystem> namespace zen { using namespace std::literals; +namespace fs = std::filesystem; ////////////////////////////////////////////////////////////////////////// CbObject -LoadCompactBinaryObject(const std::filesystem::path& Path) +LoadCompactBinaryObject(const fs::path& Path) { FileContents Result = ReadFile(Path); @@ -38,7 +41,7 @@ LoadCompactBinaryObject(const std::filesystem::path& Path) } void -SaveCompactBinaryObject(const std::filesystem::path& Path, const CbObject& Object) +SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) { WriteFile(Path, Object.GetBuffer().AsIoBuffer()); } @@ -269,6 +272,23 @@ CasGc::OnDroppedCidReferences(std::span<IoHash> Hashes) ZEN_UNUSED(Hashes); } +GcStorageSize +CasGc::TotalStorageSize() const +{ + RwLock::SharedLockScope _(m_Lock); + + GcStorageSize TotalSize; + + for (GcStorage* Storage : m_GcStorage) + { + const auto Size = Storage->StorageSize(); + TotalSize.DiskSize += Size.DiskSize; + TotalSize.MemorySize += Size.MemorySize; + } + + return TotalSize; +} + ////////////////////////////////////////////////////////////////////////// GcScheduler::GcScheduler(CasGc& CasGc) : m_Log(logging::Get("gc")), m_CasGc(CasGc) @@ -287,21 +307,27 @@ GcScheduler::Initialize(const GcSchedulerConfig& Config) m_Config = Config; + if (m_Config.Interval.count() && m_Config.Interval < m_Config.MonitorInterval) + { + m_Config.Interval = m_Config.MonitorInterval; + } + std::filesystem::create_directories(Config.RootDirectory); - GcClock::TimePoint LastGcTime = GcClock::Now(); + m_LastGcTime = GcClock::Now(); if (CbObject SchedulerState = LoadCompactBinaryObject(Config.RootDirectory / "gc_state")) { - LastGcTime = GcClock::TimePoint(GcClock::Duration(SchedulerState["LastGcTime"sv].AsInt64())); + m_LastGcTime = GcClock::TimePoint(GcClock::Duration(SchedulerState["LastGcTime"sv].AsInt64())); - if (LastGcTime + m_Config.Interval < GcClock::Now()) + if (m_LastGcTime + m_Config.Interval < GcClock::Now()) { - LastGcTime = GcClock::Now(); + // TODO: Trigger GC? + m_LastGcTime = GcClock::Now(); } } - m_NextGcTime = NextGcTime(LastGcTime); + m_NextGcTime = NextGcTime(m_LastGcTime); m_GcThread = std::jthread(&GcScheduler::SchedulerThread, this); } @@ -315,64 +341,129 @@ GcScheduler::Shutdown() } } +bool +GcScheduler::Trigger(const GcScheduler::TriggerParams& Params) +{ + if (m_Config.Enabled) + { + std::unique_lock Lock(m_GcMutex); + if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status) + { + m_TriggerParams = Params; + m_Status = static_cast<uint32_t>(GcSchedulerStatus::kRunning); + m_GcSignal.notify_one(); + return true; + } + } + + return false; +} + void GcScheduler::SchedulerThread() { + using namespace fmt::literals; + + std::chrono::seconds WaitTime = m_Config.MonitorInterval; + for (;;) { + bool Timeout = false; { + ZEN_ASSERT(WaitTime.count() >= 0); std::unique_lock Lock(m_GcMutex); - if (m_GcSignal.wait_until(Lock, m_NextGcTime, [this]() { return Status() != GcSchedulerStatus::kIdle; })) - { - if (Status() == GcSchedulerStatus::kStopped) - { - break; - } - } - else - { - m_Status.store(static_cast<uint32_t>(GcSchedulerStatus::kRunning)); - } + Timeout = std::cv_status::timeout == m_GcSignal.wait_for(Lock, WaitTime); } - if (!m_Config.Enabled) + if (Status() == GcSchedulerStatus::kStopped) + { + break; + } + + if (!m_Config.Enabled || (!Timeout && Status() == GcSchedulerStatus::kIdle)) { - ZEN_INFO("disabled"); continue; } - Stopwatch Timer; + if (Timeout && Status() == GcSchedulerStatus::kIdle) + { + std::error_code Ec; + DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec); + GcStorageSize TotalSize = m_CasGc.TotalStorageSize(); + std::chrono::seconds RemaingTime = std::chrono::duration_cast<std::chrono::seconds>(m_NextGcTime - GcClock::Now()); + + if (RemaingTime < std::chrono::seconds::zero()) + { + RemaingTime = std::chrono::seconds::zero(); + } + + if (Ec) + { + ZEN_WARN("get disk space info FAILED, reason '{}'", Ec.message()); + } + + ZEN_INFO("{} in use, {} of total {} free disk space, {}", + NiceBytes(TotalSize.DiskSize), + NiceBytes(Space.Free), + NiceBytes(Space.Total), + m_Config.Interval.count() + ? "{} until next GC"_format(NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(RemaingTime).count()))) + : std::string("next scheduled GC no set")); + + // TODO: Trigger GC if max disk usage water mark is reached + + if (RemaingTime.count() > 0) + { + WaitTime = m_Config.MonitorInterval < RemaingTime ? m_Config.MonitorInterval : RemaingTime; + continue; + } + + WaitTime = m_Config.MonitorInterval; + m_Status = static_cast<uint32_t>(GcSchedulerStatus::kRunning); + } - DiskSpace Space; - DiskSpaceInfo(m_Config.RootDirectory, Space); - ZEN_INFO("garbage collection STARTING, disk space {}/{} (free/total)", NiceBytes(Space.Free), NiceBytes(Space.Total)); + ZEN_ASSERT(Status() == GcSchedulerStatus::kRunning); GcContext GcCtx; GcCtx.SetDeletionMode(true); GcCtx.CollectSmallObjects(m_Config.CollectSmallObjects); GcCtx.MaxCacheDuration(m_Config.MaxCacheDuration); - ZEN_DEBUG("collecting small objects {}, max cache duration {}s", - m_Config.CollectSmallObjects ? "YES"sv : "NO"sv, - m_Config.MaxCacheDuration.count()); + if (m_TriggerParams) + { + const auto TriggerParams = m_TriggerParams.value(); + m_TriggerParams.reset(); + + GcCtx.CollectSmallObjects(TriggerParams.CollectSmallObjects); + if (TriggerParams.MaxCacheDuration != std::chrono::seconds::max()) + { + GcCtx.MaxCacheDuration(TriggerParams.MaxCacheDuration); + } + } + + Stopwatch Timer; + + ZEN_INFO("garbage collection STARTING, small objects gc {}, max cache duration {}", + GcCtx.CollectSmallObjects() ? "ENABLED"sv : "DISABLED"sv, + NiceTimeSpanMs(uint64_t(std::chrono::duration_cast<std::chrono::milliseconds>(GcCtx.MaxCacheDuration()).count()))); m_CasGc.CollectGarbage(GcCtx); - m_Status = static_cast<uint32_t>(GcSchedulerStatus::kIdle); - m_NextGcTime = NextGcTime(GcClock::Now()); + m_LastGcTime = GcClock::Now(); + m_NextGcTime = NextGcTime(m_LastGcTime); + WaitTime = m_Config.MonitorInterval; { + const fs::path Path = m_Config.RootDirectory / "gc_state"; + ZEN_DEBUG("saving scheduler state to '{}'", Path); CbObjectWriter SchedulderState; - SchedulderState << "LastGcTime"sv << static_cast<int64_t>(GcClock::Now().time_since_epoch().count()); - SaveCompactBinaryObject(m_Config.RootDirectory / "gc_state", SchedulderState.Save()); + SchedulderState << "LastGcTime"sv << static_cast<int64_t>(m_LastGcTime.time_since_epoch().count()); + SaveCompactBinaryObject(Path, SchedulderState.Save()); } - DiskSpaceInfo(m_Config.RootDirectory, Space); + ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - ZEN_INFO("garbage collection DONE after {}, disk space {}/{} (free/total)", - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - NiceBytes(Space.Free), - NiceBytes(Space.Total)); + m_Status = static_cast<uint32_t>(GcSchedulerStatus::kIdle); } } @@ -389,24 +480,6 @@ GcScheduler::NextGcTime(GcClock::TimePoint CurrentTime) } } -bool -GcScheduler::ScheduleNow() -{ - if (m_Config.Enabled) - { - ZEN_DEBUG("schedule NOW"); - std::unique_lock Lock(m_GcMutex); - if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status) - { - m_Status = static_cast<uint32_t>(GcSchedulerStatus::kRunning); - m_GcSignal.notify_one(); - return true; - } - } - - return false; -} - ////////////////////////////////////////////////////////////////////////// } // namespace zen diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h index 2ea35b131..20aada746 100644 --- a/zenstore/include/zenstore/gc.h +++ b/zenstore/include/zenstore/gc.h @@ -9,6 +9,7 @@ #include <chrono> #include <filesystem> #include <functional> +#include <optional> #include <span> #include <thread> @@ -42,7 +43,6 @@ public: /** Garbage Collection context object */ - class GcContext { public: @@ -83,7 +83,6 @@ private: retain. */ - class GcContributor { public: @@ -119,7 +118,6 @@ private: /** GC orchestrator */ - class CasGc { public: @@ -134,13 +132,14 @@ public: void CollectGarbage(GcContext& GcCtx); - void SetCidStore(CidStore* Cids); - void OnNewCidReferences(std::span<IoHash> Hashes); - void OnCommittedCidReferences(std::span<IoHash> Hashes); - void OnDroppedCidReferences(std::span<IoHash> Hashes); + void SetCidStore(CidStore* Cids); + void OnNewCidReferences(std::span<IoHash> Hashes); + void OnCommittedCidReferences(std::span<IoHash> Hashes); + void OnDroppedCidReferences(std::span<IoHash> Hashes); + GcStorageSize TotalStorageSize() const; private: - RwLock m_Lock; + mutable RwLock m_Lock; std::vector<GcContributor*> m_GcContribs; std::vector<GcStorage*> m_GcStorage; CidStore* m_CidStore; @@ -156,12 +155,16 @@ enum class GcSchedulerStatus : uint32_t struct GcSchedulerConfig { std::filesystem::path RootDirectory; + std::chrono::seconds MonitorInterval{30}; std::chrono::seconds Interval{}; std::chrono::seconds MaxCacheDuration{86400}; bool CollectSmallObjects = false; bool Enabled = false; }; +/** + * GC scheduler + */ class GcScheduler { public: @@ -169,23 +172,32 @@ public: ~GcScheduler(); void Initialize(const GcSchedulerConfig& Config); - bool ScheduleNow(); - GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); } void Shutdown(); + GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); } + + struct TriggerParams + { + bool CollectSmallObjects = false; + std::chrono::seconds MaxCacheDuration = std::chrono::seconds::max(); + }; + + bool Trigger(const TriggerParams& Params); private: void SchedulerThread(); GcClock::TimePoint NextGcTime(GcClock::TimePoint CurrentTime); spdlog::logger& Log() { return m_Log; } - spdlog::logger& m_Log; - CasGc& m_CasGc; - GcSchedulerConfig m_Config; - GcClock::TimePoint m_NextGcTime{}; - std::atomic_uint32_t m_Status{}; - std::jthread m_GcThread; - std::mutex m_GcMutex; - std::condition_variable m_GcSignal; + spdlog::logger& m_Log; + CasGc& m_CasGc; + GcSchedulerConfig m_Config; + GcClock::TimePoint m_LastGcTime{}; + GcClock::TimePoint m_NextGcTime{}; + std::atomic_uint32_t m_Status{}; + std::jthread m_GcThread; + std::mutex m_GcMutex; + std::condition_variable m_GcSignal; + std::optional<TriggerParams> m_TriggerParams; }; } // namespace zen |