about summary refs log tree commit diff
diff options
context:
space:
mode:
author Per Larsson <[email protected]> 2021-12-12 12:04:31 +0100
committer Per Larsson <[email protected]> 2021-12-12 12:04:31 +0100
commit a40133fe893100631f9bb8cd68fb7c2edbab0759 (patch)
tree 80cdcb8af8fad50d1004116671a655ec4495d729
parent Added size to GcStorage. (diff)
download zen-a40133fe893100631f9bb8cd68fb7c2edbab0759.tar.xz
zen-a40133fe893100631f9bb8cd68fb7c2edbab0759.zip
Added support for triggering GC with different params and refactored GC scheduler.
-rw-r--r-- zenserver/admin/admin.cpp 51
-rw-r--r-- zenserver/admin/admin.h 14
-rw-r--r-- zenserver/zenserver.cpp 17
-rw-r--r-- zenstore/gc.cpp 181
-rw-r--r-- zenstore/include/zenstore/gc.h 48
5 files changed, 189 insertions, 122 deletions
diff --git a/zenserver/admin/admin.cpp b/zenserver/admin/admin.cpp
index 0d7a57e36..8a3ab460c 100644
--- a/zenserver/admin/admin.cpp
+++ b/zenserver/admin/admin.cpp
@@ -6,10 +6,13 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/string.h>
+#include <zenstore/gc.h>
+
+#include <chrono>
namespace zen {
-HttpAdminService::HttpAdminService()
+HttpAdminService::HttpAdminService(GcScheduler& Scheduler) : m_GcScheduler(Scheduler)
{
using namespace std::literals;
@@ -25,42 +28,42 @@ HttpAdminService::HttpAdminService()
m_Router.RegisterRoute(
"gc",
[this](HttpRouterRequest& Req) {
- CbObject Response;
- if (m_GcHandler.Status)
- {
- Response = m_GcHandler.Status();
- }
- else
- {
- CbObjectWriter Writer;
- Writer << "Status"sv
- << "Ok"sv;
- Response = Writer.Save();
- }
- Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response);
+ const GcSchedulerStatus Status = m_GcScheduler.Status();
+
+ CbObjectWriter Response;
+ Response << "Status"sv << (GcSchedulerStatus::kIdle == Status ? "Idle"sv : "Running"sv);
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save());
},
HttpVerb::kGet);
m_Router.RegisterRoute(
"gc",
[this](HttpRouterRequest& Req) {
- CbObject Response;
- if (m_GcHandler.Trigger)
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+ GcScheduler::TriggerParams GcParams;
+
+ if (auto Param = Params.GetValue("smallobjects"); Param == "true"sv)
{
- Response = m_GcHandler.Trigger();
+ GcParams.CollectSmallObjects = true;
}
- else
+
+ if (auto Param = Params.GetValue("maxcacheduration"); Param.empty() == false)
{
- CbObjectWriter Writer;
- Writer << "Status"sv
- << "Ok"sv;
- Response = Writer.Save();
+ if (auto Value = ParseInt<uint64_t>(Param))
+ {
+ GcParams.MaxCacheDuration = std::chrono::seconds(Value.value());
+ }
}
- Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response);
+
+ const bool Started = m_GcScheduler.Trigger(GcParams);
+
+ CbObjectWriter Response;
+ Response << "Status"sv << (Started ? "Started"sv : "Running"sv);
+ HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save());
},
HttpVerb::kPost);
- // RPC endpoint
m_Router.RegisterRoute(
"",
[this](HttpRouterRequest& Req) {
diff --git a/zenserver/admin/admin.h b/zenserver/admin/admin.h
index f8fcab7de..9463ffbb3 100644
--- a/zenserver/admin/admin.h
+++ b/zenserver/admin/admin.h
@@ -7,26 +7,20 @@
namespace zen {
+class GcScheduler;
+
class HttpAdminService : public zen::HttpService
{
public:
- HttpAdminService();
+ HttpAdminService(GcScheduler& Scheduler);
~HttpAdminService();
virtual const char* BaseUri() const override;
virtual void HandleRequest(zen::HttpServerRequest& Request) override;
- struct GcHandler
- {
- std::function<CbObject()> Trigger;
- std::function<CbObject()> Status;
- };
-
- void RegisterGcHandler(GcHandler&& Handler) { m_GcHandler = std::forward<GcHandler>(Handler); }
-
private:
HttpRequestRouter m_Router;
- GcHandler m_GcHandler;
+ GcScheduler& m_GcScheduler;
};
} // namespace zen
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index c4cc22140..e14f93f5b 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -195,21 +195,6 @@ public:
.HttpServerClass = std::string(ServerOptions.HttpServerClass),
.BuildVersion = std::string(BUILD_VERSION)});
- m_AdminService.RegisterGcHandler({.Trigger =
- [this]() {
- CbObjectWriter Writer;
- const bool Started = m_GcScheduler.ScheduleNow();
- Writer << "Status"sv << (Started ? "Started"sv : "Running"sv);
- return Writer.Save();
- },
- .Status =
- [this]() {
- CbObjectWriter Writer;
- const GcSchedulerStatus Status = m_GcScheduler.Status();
- Writer << "Status"sv << (GcSchedulerStatus::kIdle == Status ? "Idle"sv : "Running"sv);
- return Writer.Save();
- }});
-
// Ok so now we're configured, let's kick things off
m_Http = zen::CreateHttpServer(ServerOptions.HttpServerClass);
@@ -540,7 +525,7 @@ private:
zen::RefPtr<zen::ProjectStore> m_ProjectStore;
std::unique_ptr<zen::HttpProjectService> m_HttpProjectService;
std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService;
- zen::HttpAdminService m_AdminService;
+ zen::HttpAdminService m_AdminService{m_GcScheduler};
zen::HttpHealthService m_HealthService;
zen::Mesh m_ZenMesh{m_IoContext};
std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService;
diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp
index 86f215b97..8f08c8b1f 100644
--- a/zenstore/gc.cpp
+++ b/zenstore/gc.cpp
@@ -6,22 +6,25 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/string.h>
#include <zencore/timer.h>
#include <zenstore/CAS.h>
#include <zenstore/cidstore.h>
+#include <fmt/format.h>
#include <filesystem>
namespace zen {
using namespace std::literals;
+namespace fs = std::filesystem;
//////////////////////////////////////////////////////////////////////////
CbObject
-LoadCompactBinaryObject(const std::filesystem::path& Path)
+LoadCompactBinaryObject(const fs::path& Path)
{
FileContents Result = ReadFile(Path);
@@ -38,7 +41,7 @@ LoadCompactBinaryObject(const std::filesystem::path& Path)
}
void
-SaveCompactBinaryObject(const std::filesystem::path& Path, const CbObject& Object)
+SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
{
WriteFile(Path, Object.GetBuffer().AsIoBuffer());
}
@@ -269,6 +272,23 @@ CasGc::OnDroppedCidReferences(std::span<IoHash> Hashes)
ZEN_UNUSED(Hashes);
}
+GcStorageSize
+CasGc::TotalStorageSize() const
+{
+ RwLock::SharedLockScope _(m_Lock);
+
+ GcStorageSize TotalSize;
+
+ for (GcStorage* Storage : m_GcStorage)
+ {
+ const auto Size = Storage->StorageSize();
+ TotalSize.DiskSize += Size.DiskSize;
+ TotalSize.MemorySize += Size.MemorySize;
+ }
+
+ return TotalSize;
+}
+
//////////////////////////////////////////////////////////////////////////
GcScheduler::GcScheduler(CasGc& CasGc) : m_Log(logging::Get("gc")), m_CasGc(CasGc)
@@ -287,21 +307,27 @@ GcScheduler::Initialize(const GcSchedulerConfig& Config)
m_Config = Config;
+ if (m_Config.Interval.count() && m_Config.Interval < m_Config.MonitorInterval)
+ {
+ m_Config.Interval = m_Config.MonitorInterval;
+ }
+
std::filesystem::create_directories(Config.RootDirectory);
- GcClock::TimePoint LastGcTime = GcClock::Now();
+ m_LastGcTime = GcClock::Now();
if (CbObject SchedulerState = LoadCompactBinaryObject(Config.RootDirectory / "gc_state"))
{
- LastGcTime = GcClock::TimePoint(GcClock::Duration(SchedulerState["LastGcTime"sv].AsInt64()));
+ m_LastGcTime = GcClock::TimePoint(GcClock::Duration(SchedulerState["LastGcTime"sv].AsInt64()));
- if (LastGcTime + m_Config.Interval < GcClock::Now())
+ if (m_LastGcTime + m_Config.Interval < GcClock::Now())
{
- LastGcTime = GcClock::Now();
+ // TODO: Trigger GC?
+ m_LastGcTime = GcClock::Now();
}
}
- m_NextGcTime = NextGcTime(LastGcTime);
+ m_NextGcTime = NextGcTime(m_LastGcTime);
m_GcThread = std::jthread(&GcScheduler::SchedulerThread, this);
}
@@ -315,64 +341,129 @@ GcScheduler::Shutdown()
}
}
+bool
+GcScheduler::Trigger(const GcScheduler::TriggerParams& Params)
+{
+ if (m_Config.Enabled)
+ {
+ std::unique_lock Lock(m_GcMutex);
+ if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status)
+ {
+ m_TriggerParams = Params;
+ m_Status = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
+ m_GcSignal.notify_one();
+ return true;
+ }
+ }
+
+ return false;
+}
+
void
GcScheduler::SchedulerThread()
{
+ using namespace fmt::literals;
+
+ std::chrono::seconds WaitTime = m_Config.MonitorInterval;
+
for (;;)
{
+ bool Timeout = false;
{
+ ZEN_ASSERT(WaitTime.count() >= 0);
std::unique_lock Lock(m_GcMutex);
- if (m_GcSignal.wait_until(Lock, m_NextGcTime, [this]() { return Status() != GcSchedulerStatus::kIdle; }))
- {
- if (Status() == GcSchedulerStatus::kStopped)
- {
- break;
- }
- }
- else
- {
- m_Status.store(static_cast<uint32_t>(GcSchedulerStatus::kRunning));
- }
+ Timeout = std::cv_status::timeout == m_GcSignal.wait_for(Lock, WaitTime);
}
- if (!m_Config.Enabled)
+ if (Status() == GcSchedulerStatus::kStopped)
+ {
+ break;
+ }
+
+ if (!m_Config.Enabled || (!Timeout && Status() == GcSchedulerStatus::kIdle))
{
- ZEN_INFO("disabled");
continue;
}
- Stopwatch Timer;
+ if (Timeout && Status() == GcSchedulerStatus::kIdle)
+ {
+ std::error_code Ec;
+ DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec);
+ GcStorageSize TotalSize = m_CasGc.TotalStorageSize();
+ std::chrono::seconds RemaingTime = std::chrono::duration_cast<std::chrono::seconds>(m_NextGcTime - GcClock::Now());
+
+ if (RemaingTime < std::chrono::seconds::zero())
+ {
+ RemaingTime = std::chrono::seconds::zero();
+ }
+
+ if (Ec)
+ {
+ ZEN_WARN("get disk space info FAILED, reason '{}'", Ec.message());
+ }
+
+ ZEN_INFO("{} in use, {} of total {} free disk space, {}",
+ NiceBytes(TotalSize.DiskSize),
+ NiceBytes(Space.Free),
+ NiceBytes(Space.Total),
+ m_Config.Interval.count()
+ ? "{} until next GC"_format(NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(RemaingTime).count())))
+ : std::string("next scheduled GC no set"));
+
+ // TODO: Trigger GC if max disk usage water mark is reached
+
+ if (RemaingTime.count() > 0)
+ {
+ WaitTime = m_Config.MonitorInterval < RemaingTime ? m_Config.MonitorInterval : RemaingTime;
+ continue;
+ }
+
+ WaitTime = m_Config.MonitorInterval;
+ m_Status = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
+ }
- DiskSpace Space;
- DiskSpaceInfo(m_Config.RootDirectory, Space);
- ZEN_INFO("garbage collection STARTING, disk space {}/{} (free/total)", NiceBytes(Space.Free), NiceBytes(Space.Total));
+ ZEN_ASSERT(Status() == GcSchedulerStatus::kRunning);
GcContext GcCtx;
GcCtx.SetDeletionMode(true);
GcCtx.CollectSmallObjects(m_Config.CollectSmallObjects);
GcCtx.MaxCacheDuration(m_Config.MaxCacheDuration);
- ZEN_DEBUG("collecting small objects {}, max cache duration {}s",
- m_Config.CollectSmallObjects ? "YES"sv : "NO"sv,
- m_Config.MaxCacheDuration.count());
+ if (m_TriggerParams)
+ {
+ const auto TriggerParams = m_TriggerParams.value();
+ m_TriggerParams.reset();
+
+ GcCtx.CollectSmallObjects(TriggerParams.CollectSmallObjects);
+ if (TriggerParams.MaxCacheDuration != std::chrono::seconds::max())
+ {
+ GcCtx.MaxCacheDuration(TriggerParams.MaxCacheDuration);
+ }
+ }
+
+ Stopwatch Timer;
+
+ ZEN_INFO("garbage collection STARTING, small objects gc {}, max cache duration {}",
+ GcCtx.CollectSmallObjects() ? "ENABLED"sv : "DISABLED"sv,
+ NiceTimeSpanMs(uint64_t(std::chrono::duration_cast<std::chrono::milliseconds>(GcCtx.MaxCacheDuration()).count())));
m_CasGc.CollectGarbage(GcCtx);
- m_Status = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
- m_NextGcTime = NextGcTime(GcClock::Now());
+ m_LastGcTime = GcClock::Now();
+ m_NextGcTime = NextGcTime(m_LastGcTime);
+ WaitTime = m_Config.MonitorInterval;
{
+ const fs::path Path = m_Config.RootDirectory / "gc_state";
+ ZEN_DEBUG("saving scheduler state to '{}'", Path);
CbObjectWriter SchedulderState;
- SchedulderState << "LastGcTime"sv << static_cast<int64_t>(GcClock::Now().time_since_epoch().count());
- SaveCompactBinaryObject(m_Config.RootDirectory / "gc_state", SchedulderState.Save());
+ SchedulderState << "LastGcTime"sv << static_cast<int64_t>(m_LastGcTime.time_since_epoch().count());
+ SaveCompactBinaryObject(Path, SchedulderState.Save());
}
- DiskSpaceInfo(m_Config.RootDirectory, Space);
+ ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- ZEN_INFO("garbage collection DONE after {}, disk space {}/{} (free/total)",
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
- NiceBytes(Space.Free),
- NiceBytes(Space.Total));
+ m_Status = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
}
}
@@ -389,24 +480,6 @@ GcScheduler::NextGcTime(GcClock::TimePoint CurrentTime)
}
}
-bool
-GcScheduler::ScheduleNow()
-{
- if (m_Config.Enabled)
- {
- ZEN_DEBUG("schedule NOW");
- std::unique_lock Lock(m_GcMutex);
- if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status)
- {
- m_Status = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
- m_GcSignal.notify_one();
- return true;
- }
- }
-
- return false;
-}
-
//////////////////////////////////////////////////////////////////////////
} // namespace zen
diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h
index 2ea35b131..20aada746 100644
--- a/zenstore/include/zenstore/gc.h
+++ b/zenstore/include/zenstore/gc.h
@@ -9,6 +9,7 @@
#include <chrono>
#include <filesystem>
#include <functional>
+#include <optional>
#include <span>
#include <thread>
@@ -42,7 +43,6 @@ public:
/** Garbage Collection context object
*/
-
class GcContext
{
public:
@@ -83,7 +83,6 @@ private:
retain.
*/
-
class GcContributor
{
public:
@@ -119,7 +118,6 @@ private:
/** GC orchestrator
*/
-
class CasGc
{
public:
@@ -134,13 +132,14 @@ public:
void CollectGarbage(GcContext& GcCtx);
- void SetCidStore(CidStore* Cids);
- void OnNewCidReferences(std::span<IoHash> Hashes);
- void OnCommittedCidReferences(std::span<IoHash> Hashes);
- void OnDroppedCidReferences(std::span<IoHash> Hashes);
+ void SetCidStore(CidStore* Cids);
+ void OnNewCidReferences(std::span<IoHash> Hashes);
+ void OnCommittedCidReferences(std::span<IoHash> Hashes);
+ void OnDroppedCidReferences(std::span<IoHash> Hashes);
+ GcStorageSize TotalStorageSize() const;
private:
- RwLock m_Lock;
+ mutable RwLock m_Lock;
std::vector<GcContributor*> m_GcContribs;
std::vector<GcStorage*> m_GcStorage;
CidStore* m_CidStore;
@@ -156,12 +155,16 @@ enum class GcSchedulerStatus : uint32_t
struct GcSchedulerConfig
{
std::filesystem::path RootDirectory;
+ std::chrono::seconds MonitorInterval{30};
std::chrono::seconds Interval{};
std::chrono::seconds MaxCacheDuration{86400};
bool CollectSmallObjects = false;
bool Enabled = false;
};
+/**
+ * GC scheduler
+ */
class GcScheduler
{
public:
@@ -169,23 +172,32 @@ public:
~GcScheduler();
void Initialize(const GcSchedulerConfig& Config);
- bool ScheduleNow();
- GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); }
void Shutdown();
+ GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); }
+
+ struct TriggerParams
+ {
+ bool CollectSmallObjects = false;
+ std::chrono::seconds MaxCacheDuration = std::chrono::seconds::max();
+ };
+
+ bool Trigger(const TriggerParams& Params);
private:
void SchedulerThread();
GcClock::TimePoint NextGcTime(GcClock::TimePoint CurrentTime);
spdlog::logger& Log() { return m_Log; }
- spdlog::logger& m_Log;
- CasGc& m_CasGc;
- GcSchedulerConfig m_Config;
- GcClock::TimePoint m_NextGcTime{};
- std::atomic_uint32_t m_Status{};
- std::jthread m_GcThread;
- std::mutex m_GcMutex;
- std::condition_variable m_GcSignal;
+ spdlog::logger& m_Log;
+ CasGc& m_CasGc;
+ GcSchedulerConfig m_Config;
+ GcClock::TimePoint m_LastGcTime{};
+ GcClock::TimePoint m_NextGcTime{};
+ std::atomic_uint32_t m_Status{};
+ std::jthread m_GcThread;
+ std::mutex m_GcMutex;
+ std::condition_variable m_GcSignal;
+ std::optional<TriggerParams> m_TriggerParams;
};
} // namespace zen