diff options
| author | Per Larsson <[email protected]> | 2021-12-12 12:04:31 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-12-12 12:04:31 +0100 |
| commit | a40133fe893100631f9bb8cd68fb7c2edbab0759 (patch) | |
| tree | 80cdcb8af8fad50d1004116671a655ec4495d729 | |
| parent | Added size to GcStorage. (diff) | |
| download | zen-a40133fe893100631f9bb8cd68fb7c2edbab0759.tar.xz zen-a40133fe893100631f9bb8cd68fb7c2edbab0759.zip | |
Added support for triggering GC with different params and refactored GC scheduler.
| -rw-r--r-- | zenserver/admin/admin.cpp | 51 | ||||
| -rw-r--r-- | zenserver/admin/admin.h | 14 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 17 | ||||
| -rw-r--r-- | zenstore/gc.cpp | 181 | ||||
| -rw-r--r-- | zenstore/include/zenstore/gc.h | 48 |
5 files changed, 189 insertions, 122 deletions
diff --git a/zenserver/admin/admin.cpp b/zenserver/admin/admin.cpp index 0d7a57e36..8a3ab460c 100644 --- a/zenserver/admin/admin.cpp +++ b/zenserver/admin/admin.cpp @@ -6,10 +6,13 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/string.h> +#include <zenstore/gc.h> + +#include <chrono> namespace zen { -HttpAdminService::HttpAdminService() +HttpAdminService::HttpAdminService(GcScheduler& Scheduler) : m_GcScheduler(Scheduler) { using namespace std::literals; @@ -25,42 +28,42 @@ HttpAdminService::HttpAdminService() m_Router.RegisterRoute( "gc", [this](HttpRouterRequest& Req) { - CbObject Response; - if (m_GcHandler.Status) - { - Response = m_GcHandler.Status(); - } - else - { - CbObjectWriter Writer; - Writer << "Status"sv - << "Ok"sv; - Response = Writer.Save(); - } - Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response); + const GcSchedulerStatus Status = m_GcScheduler.Status(); + + CbObjectWriter Response; + Response << "Status"sv << (GcSchedulerStatus::kIdle == Status ? "Idle"sv : "Running"sv); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save()); }, HttpVerb::kGet); m_Router.RegisterRoute( "gc", [this](HttpRouterRequest& Req) { - CbObject Response; - if (m_GcHandler.Trigger) + HttpServerRequest& HttpReq = Req.ServerRequest(); + const HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + GcScheduler::TriggerParams GcParams; + + if (auto Param = Params.GetValue("smallobjects"); Param == "true"sv) { - Response = m_GcHandler.Trigger(); + GcParams.CollectSmallObjects = true; } - else + + if (auto Param = Params.GetValue("maxcacheduration"); Param.empty() == false) { - CbObjectWriter Writer; - Writer << "Status"sv - << "Ok"sv; - Response = Writer.Save(); + if (auto Value = ParseInt<uint64_t>(Param)) + { + GcParams.MaxCacheDuration = std::chrono::seconds(Value.value()); + } } - Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response); + + const bool Started = m_GcScheduler.Trigger(GcParams); + + CbObjectWriter Response; + Response << "Status"sv << (Started ? "Started"sv : "Running"sv); + HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); }, HttpVerb::kPost); - // RPC endpoint m_Router.RegisterRoute( "", [this](HttpRouterRequest& Req) { diff --git a/zenserver/admin/admin.h b/zenserver/admin/admin.h index f8fcab7de..9463ffbb3 100644 --- a/zenserver/admin/admin.h +++ b/zenserver/admin/admin.h @@ -7,26 +7,20 @@ namespace zen { +class GcScheduler; + class HttpAdminService : public zen::HttpService { public: - HttpAdminService(); + HttpAdminService(GcScheduler& Scheduler); ~HttpAdminService(); virtual const char* BaseUri() const override; virtual void HandleRequest(zen::HttpServerRequest& Request) override; - struct GcHandler - { - std::function<CbObject()> Trigger; - std::function<CbObject()> Status; - }; - - void RegisterGcHandler(GcHandler&& Handler) { m_GcHandler = std::forward<GcHandler>(Handler); } - private: HttpRequestRouter m_Router; - GcHandler m_GcHandler; + GcScheduler& m_GcScheduler; }; } // namespace zen diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index c4cc22140..e14f93f5b 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -195,21 +195,6 @@ public: .HttpServerClass = std::string(ServerOptions.HttpServerClass), .BuildVersion = std::string(BUILD_VERSION)}); - m_AdminService.RegisterGcHandler({.Trigger = - [this]() { - CbObjectWriter Writer; - const bool Started = m_GcScheduler.ScheduleNow(); - Writer << "Status"sv << (Started ? "Started"sv : "Running"sv); - return Writer.Save(); - }, - .Status = - [this]() { - CbObjectWriter Writer; - const GcSchedulerStatus Status = m_GcScheduler.Status(); - Writer << "Status"sv << (GcSchedulerStatus::kIdle == Status ? "Idle"sv : "Running"sv); - return Writer.Save(); - }}); - // Ok so now we're configured, let's kick things off m_Http = zen::CreateHttpServer(ServerOptions.HttpServerClass); @@ -540,7 +525,7 @@ private: zen::RefPtr<zen::ProjectStore> m_ProjectStore; std::unique_ptr<zen::HttpProjectService> m_HttpProjectService; std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService; - zen::HttpAdminService m_AdminService; + zen::HttpAdminService m_AdminService{m_GcScheduler}; zen::HttpHealthService m_HealthService; zen::Mesh m_ZenMesh{m_IoContext}; std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService; diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp index 86f215b97..8f08c8b1f 100644 --- a/zenstore/gc.cpp +++ b/zenstore/gc.cpp @@ -6,22 +6,25 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/filesystem.h> +#include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/string.h> #include <zencore/timer.h> #include <zenstore/CAS.h> #include <zenstore/cidstore.h> +#include <fmt/format.h> #include <filesystem> namespace zen { using namespace std::literals; +namespace fs = std::filesystem; ////////////////////////////////////////////////////////////////////////// CbObject -LoadCompactBinaryObject(const std::filesystem::path& Path) +LoadCompactBinaryObject(const fs::path& Path) { FileContents Result = ReadFile(Path); @@ -38,7 +41,7 @@ LoadCompactBinaryObject(const std::filesystem::path& Path) } void -SaveCompactBinaryObject(const std::filesystem::path& Path, const CbObject& Object) +SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) { WriteFile(Path, Object.GetBuffer().AsIoBuffer()); } @@ -269,6 +272,23 @@ CasGc::OnDroppedCidReferences(std::span<IoHash> Hashes) ZEN_UNUSED(Hashes); } +GcStorageSize +CasGc::TotalStorageSize() const +{ + RwLock::SharedLockScope _(m_Lock); + + GcStorageSize TotalSize; + + for (GcStorage* Storage : m_GcStorage) + { + const auto Size = Storage->StorageSize(); + TotalSize.DiskSize += Size.DiskSize; + TotalSize.MemorySize += Size.MemorySize; + } + + return TotalSize; +} + ////////////////////////////////////////////////////////////////////////// GcScheduler::GcScheduler(CasGc& CasGc) : m_Log(logging::Get("gc")), m_CasGc(CasGc) @@ -287,21 +307,27 @@ GcScheduler::Initialize(const GcSchedulerConfig& Config) m_Config = Config; + if (m_Config.Interval.count() && m_Config.Interval < m_Config.MonitorInterval) + { + m_Config.Interval = m_Config.MonitorInterval; + } + std::filesystem::create_directories(Config.RootDirectory); - GcClock::TimePoint LastGcTime = GcClock::Now(); + m_LastGcTime = GcClock::Now(); if (CbObject SchedulerState = LoadCompactBinaryObject(Config.RootDirectory / "gc_state")) { - LastGcTime = GcClock::TimePoint(GcClock::Duration(SchedulerState["LastGcTime"sv].AsInt64())); + m_LastGcTime = GcClock::TimePoint(GcClock::Duration(SchedulerState["LastGcTime"sv].AsInt64())); - if (LastGcTime + m_Config.Interval < GcClock::Now()) + if (m_LastGcTime + m_Config.Interval < GcClock::Now()) { - LastGcTime = GcClock::Now(); + // TODO: Trigger GC? + m_LastGcTime = GcClock::Now(); } } - m_NextGcTime = NextGcTime(LastGcTime); + m_NextGcTime = NextGcTime(m_LastGcTime); m_GcThread = std::jthread(&GcScheduler::SchedulerThread, this); } @@ -315,64 +341,129 @@ GcScheduler::Shutdown() } } +bool +GcScheduler::Trigger(const GcScheduler::TriggerParams& Params) +{ + if (m_Config.Enabled) + { + std::unique_lock Lock(m_GcMutex); + if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status) + { + m_TriggerParams = Params; + m_Status = static_cast<uint32_t>(GcSchedulerStatus::kRunning); + m_GcSignal.notify_one(); + return true; + } + } + + return false; +} + void GcScheduler::SchedulerThread() { + using namespace fmt::literals; + + std::chrono::seconds WaitTime = m_Config.MonitorInterval; + for (;;) { + bool Timeout = false; { + ZEN_ASSERT(WaitTime.count() >= 0); std::unique_lock Lock(m_GcMutex); - if (m_GcSignal.wait_until(Lock, m_NextGcTime, [this]() { return Status() != GcSchedulerStatus::kIdle; })) - { - if (Status() == GcSchedulerStatus::kStopped) - { - break; - } - } - else - { - m_Status.store(static_cast<uint32_t>(GcSchedulerStatus::kRunning)); - } + Timeout = std::cv_status::timeout == m_GcSignal.wait_for(Lock, WaitTime); } - if (!m_Config.Enabled) + if (Status() == GcSchedulerStatus::kStopped) + { + break; + } + + if (!m_Config.Enabled || (!Timeout && Status() == GcSchedulerStatus::kIdle)) { - ZEN_INFO("disabled"); continue; } - Stopwatch Timer; + if (Timeout && Status() == GcSchedulerStatus::kIdle) + { + std::error_code Ec; + DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec); + GcStorageSize TotalSize = m_CasGc.TotalStorageSize(); + std::chrono::seconds RemaingTime = std::chrono::duration_cast<std::chrono::seconds>(m_NextGcTime - GcClock::Now()); + + if (RemaingTime < std::chrono::seconds::zero()) + { + RemaingTime = std::chrono::seconds::zero(); + } + + if (Ec) + { + ZEN_WARN("get disk space info FAILED, reason '{}'", Ec.message()); + } + + ZEN_INFO("{} in use, {} of total {} free disk space, {}", + NiceBytes(TotalSize.DiskSize), + NiceBytes(Space.Free), + NiceBytes(Space.Total), + m_Config.Interval.count() + ? "{} until next GC"_format(NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(RemaingTime).count()))) + : std::string("next scheduled GC no set")); + + // TODO: Trigger GC if max disk usage water mark is reached + + if (RemaingTime.count() > 0) + { + WaitTime = m_Config.MonitorInterval < RemaingTime ? m_Config.MonitorInterval : RemaingTime; + continue; + } + + WaitTime = m_Config.MonitorInterval; + m_Status = static_cast<uint32_t>(GcSchedulerStatus::kRunning); + } - DiskSpace Space; - DiskSpaceInfo(m_Config.RootDirectory, Space); - ZEN_INFO("garbage collection STARTING, disk space {}/{} (free/total)", NiceBytes(Space.Free), NiceBytes(Space.Total)); + ZEN_ASSERT(Status() == GcSchedulerStatus::kRunning); GcContext GcCtx; GcCtx.SetDeletionMode(true); GcCtx.CollectSmallObjects(m_Config.CollectSmallObjects); GcCtx.MaxCacheDuration(m_Config.MaxCacheDuration); - ZEN_DEBUG("collecting small objects {}, max cache duration {}s", - m_Config.CollectSmallObjects ? "YES"sv : "NO"sv, - m_Config.MaxCacheDuration.count()); + if (m_TriggerParams) + { + const auto TriggerParams = m_TriggerParams.value(); + m_TriggerParams.reset(); + + GcCtx.CollectSmallObjects(TriggerParams.CollectSmallObjects); + if (TriggerParams.MaxCacheDuration != std::chrono::seconds::max()) + { + GcCtx.MaxCacheDuration(TriggerParams.MaxCacheDuration); + } + } + + Stopwatch Timer; + + ZEN_INFO("garbage collection STARTING, small objects gc {}, max cache duration {}", + GcCtx.CollectSmallObjects() ? "ENABLED"sv : "DISABLED"sv, + NiceTimeSpanMs(uint64_t(std::chrono::duration_cast<std::chrono::milliseconds>(GcCtx.MaxCacheDuration()).count()))); m_CasGc.CollectGarbage(GcCtx); - m_Status = static_cast<uint32_t>(GcSchedulerStatus::kIdle); - m_NextGcTime = NextGcTime(GcClock::Now()); + m_LastGcTime = GcClock::Now(); + m_NextGcTime = NextGcTime(m_LastGcTime); + WaitTime = m_Config.MonitorInterval; { + const fs::path Path = m_Config.RootDirectory / "gc_state"; + ZEN_DEBUG("saving scheduler state to '{}'", Path); CbObjectWriter SchedulderState; - SchedulderState << "LastGcTime"sv << static_cast<int64_t>(GcClock::Now().time_since_epoch().count()); - SaveCompactBinaryObject(m_Config.RootDirectory / "gc_state", SchedulderState.Save()); + SchedulderState << "LastGcTime"sv << static_cast<int64_t>(m_LastGcTime.time_since_epoch().count()); + SaveCompactBinaryObject(Path, SchedulderState.Save()); } - DiskSpaceInfo(m_Config.RootDirectory, Space); + ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - ZEN_INFO("garbage collection DONE after {}, disk space {}/{} (free/total)", - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - NiceBytes(Space.Free), - NiceBytes(Space.Total)); + m_Status = static_cast<uint32_t>(GcSchedulerStatus::kIdle); } } @@ -389,24 +480,6 @@ GcScheduler::NextGcTime(GcClock::TimePoint CurrentTime) } } -bool -GcScheduler::ScheduleNow() -{ - if (m_Config.Enabled) - { - ZEN_DEBUG("schedule NOW"); - std::unique_lock Lock(m_GcMutex); - if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status) - { - m_Status = static_cast<uint32_t>(GcSchedulerStatus::kRunning); - m_GcSignal.notify_one(); - return true; - } - } - - return false; -} - ////////////////////////////////////////////////////////////////////////// } // namespace zen diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h index 2ea35b131..20aada746 100644 --- a/zenstore/include/zenstore/gc.h +++ b/zenstore/include/zenstore/gc.h @@ -9,6 +9,7 @@ #include <chrono> #include <filesystem> #include <functional> +#include <optional> #include <span> #include <thread> @@ -42,7 +43,6 @@ public: /** Garbage Collection context object */ - class GcContext { public: @@ -83,7 +83,6 @@ private: retain. */ - class GcContributor { public: @@ -119,7 +118,6 @@ private: /** GC orchestrator */ - class CasGc { public: @@ -134,13 +132,14 @@ public: void CollectGarbage(GcContext& GcCtx); - void SetCidStore(CidStore* Cids); - void OnNewCidReferences(std::span<IoHash> Hashes); - void OnCommittedCidReferences(std::span<IoHash> Hashes); - void OnDroppedCidReferences(std::span<IoHash> Hashes); + void SetCidStore(CidStore* Cids); + void OnNewCidReferences(std::span<IoHash> Hashes); + void OnCommittedCidReferences(std::span<IoHash> Hashes); + void OnDroppedCidReferences(std::span<IoHash> Hashes); + GcStorageSize TotalStorageSize() const; private: - RwLock m_Lock; + mutable RwLock m_Lock; std::vector<GcContributor*> m_GcContribs; std::vector<GcStorage*> m_GcStorage; CidStore* m_CidStore; @@ -156,12 +155,16 @@ enum class GcSchedulerStatus : uint32_t struct GcSchedulerConfig { std::filesystem::path RootDirectory; + std::chrono::seconds MonitorInterval{30}; std::chrono::seconds Interval{}; std::chrono::seconds MaxCacheDuration{86400}; bool CollectSmallObjects = false; bool Enabled = false; }; +/** + * GC scheduler + */ class GcScheduler { public: @@ -169,23 +172,32 @@ public: ~GcScheduler(); void Initialize(const GcSchedulerConfig& Config); - bool ScheduleNow(); - GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); } void Shutdown(); + GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); } + + struct TriggerParams + { + bool CollectSmallObjects = false; + std::chrono::seconds MaxCacheDuration = std::chrono::seconds::max(); + }; + + bool Trigger(const TriggerParams& Params); private: void SchedulerThread(); GcClock::TimePoint NextGcTime(GcClock::TimePoint CurrentTime); spdlog::logger& Log() { return m_Log; } - spdlog::logger& m_Log; - CasGc& m_CasGc; - GcSchedulerConfig m_Config; - GcClock::TimePoint m_NextGcTime{}; - std::atomic_uint32_t m_Status{}; - std::jthread m_GcThread; - std::mutex m_GcMutex; - std::condition_variable m_GcSignal; + spdlog::logger& m_Log; + CasGc& m_CasGc; + GcSchedulerConfig m_Config; + GcClock::TimePoint m_LastGcTime{}; + GcClock::TimePoint m_NextGcTime{}; + std::atomic_uint32_t m_Status{}; + std::jthread m_GcThread; + std::mutex m_GcMutex; + std::condition_variable m_GcSignal; + std::optional<TriggerParams> m_TriggerParams; }; } // namespace zen |