aboutsummaryrefslogtreecommitdiff
path: root/zenstore
diff options
context:
space:
mode:
Diffstat (limited to 'zenstore')
-rw-r--r--zenstore/gc.cpp181
-rw-r--r--zenstore/include/zenstore/gc.h48
2 files changed, 157 insertions, 72 deletions
diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp
index 86f215b97..8f08c8b1f 100644
--- a/zenstore/gc.cpp
+++ b/zenstore/gc.cpp
@@ -6,22 +6,25 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/string.h>
#include <zencore/timer.h>
#include <zenstore/CAS.h>
#include <zenstore/cidstore.h>
+#include <fmt/format.h>
#include <filesystem>
namespace zen {
using namespace std::literals;
+namespace fs = std::filesystem;
//////////////////////////////////////////////////////////////////////////
CbObject
-LoadCompactBinaryObject(const std::filesystem::path& Path)
+LoadCompactBinaryObject(const fs::path& Path)
{
FileContents Result = ReadFile(Path);
@@ -38,7 +41,7 @@ LoadCompactBinaryObject(const std::filesystem::path& Path)
}
void
-SaveCompactBinaryObject(const std::filesystem::path& Path, const CbObject& Object)
+SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
{
WriteFile(Path, Object.GetBuffer().AsIoBuffer());
}
@@ -269,6 +272,23 @@ CasGc::OnDroppedCidReferences(std::span<IoHash> Hashes)
ZEN_UNUSED(Hashes);
}
+GcStorageSize
+CasGc::TotalStorageSize() const
+{
+ RwLock::SharedLockScope _(m_Lock);
+
+ GcStorageSize TotalSize;
+
+ for (GcStorage* Storage : m_GcStorage)
+ {
+ const auto Size = Storage->StorageSize();
+ TotalSize.DiskSize += Size.DiskSize;
+ TotalSize.MemorySize += Size.MemorySize;
+ }
+
+ return TotalSize;
+}
+
//////////////////////////////////////////////////////////////////////////
GcScheduler::GcScheduler(CasGc& CasGc) : m_Log(logging::Get("gc")), m_CasGc(CasGc)
@@ -287,21 +307,27 @@ GcScheduler::Initialize(const GcSchedulerConfig& Config)
m_Config = Config;
+ if (m_Config.Interval.count() && m_Config.Interval < m_Config.MonitorInterval)
+ {
+ m_Config.Interval = m_Config.MonitorInterval;
+ }
+
std::filesystem::create_directories(Config.RootDirectory);
- GcClock::TimePoint LastGcTime = GcClock::Now();
+ m_LastGcTime = GcClock::Now();
if (CbObject SchedulerState = LoadCompactBinaryObject(Config.RootDirectory / "gc_state"))
{
- LastGcTime = GcClock::TimePoint(GcClock::Duration(SchedulerState["LastGcTime"sv].AsInt64()));
+ m_LastGcTime = GcClock::TimePoint(GcClock::Duration(SchedulerState["LastGcTime"sv].AsInt64()));
- if (LastGcTime + m_Config.Interval < GcClock::Now())
+ if (m_LastGcTime + m_Config.Interval < GcClock::Now())
{
- LastGcTime = GcClock::Now();
+ // TODO: Trigger GC?
+ m_LastGcTime = GcClock::Now();
}
}
- m_NextGcTime = NextGcTime(LastGcTime);
+ m_NextGcTime = NextGcTime(m_LastGcTime);
m_GcThread = std::jthread(&GcScheduler::SchedulerThread, this);
}
@@ -315,64 +341,129 @@ GcScheduler::Shutdown()
}
}
+bool
+GcScheduler::Trigger(const GcScheduler::TriggerParams& Params)
+{
+ if (m_Config.Enabled)
+ {
+ std::unique_lock Lock(m_GcMutex);
+ if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status)
+ {
+ m_TriggerParams = Params;
+ m_Status = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
+ m_GcSignal.notify_one();
+ return true;
+ }
+ }
+
+ return false;
+}
+
void
GcScheduler::SchedulerThread()
{
+ using namespace fmt::literals;
+
+ std::chrono::seconds WaitTime = m_Config.MonitorInterval;
+
for (;;)
{
+ bool Timeout = false;
{
+ ZEN_ASSERT(WaitTime.count() >= 0);
std::unique_lock Lock(m_GcMutex);
- if (m_GcSignal.wait_until(Lock, m_NextGcTime, [this]() { return Status() != GcSchedulerStatus::kIdle; }))
- {
- if (Status() == GcSchedulerStatus::kStopped)
- {
- break;
- }
- }
- else
- {
- m_Status.store(static_cast<uint32_t>(GcSchedulerStatus::kRunning));
- }
+ Timeout = std::cv_status::timeout == m_GcSignal.wait_for(Lock, WaitTime);
}
- if (!m_Config.Enabled)
+ if (Status() == GcSchedulerStatus::kStopped)
+ {
+ break;
+ }
+
+ if (!m_Config.Enabled || (!Timeout && Status() == GcSchedulerStatus::kIdle))
{
- ZEN_INFO("disabled");
continue;
}
- Stopwatch Timer;
+ if (Timeout && Status() == GcSchedulerStatus::kIdle)
+ {
+ std::error_code Ec;
+ DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec);
+ GcStorageSize TotalSize = m_CasGc.TotalStorageSize();
+ std::chrono::seconds RemaingTime = std::chrono::duration_cast<std::chrono::seconds>(m_NextGcTime - GcClock::Now());
+
+ if (RemaingTime < std::chrono::seconds::zero())
+ {
+ RemaingTime = std::chrono::seconds::zero();
+ }
+
+ if (Ec)
+ {
+ ZEN_WARN("get disk space info FAILED, reason '{}'", Ec.message());
+ }
+
+ ZEN_INFO("{} in use, {} of total {} free disk space, {}",
+ NiceBytes(TotalSize.DiskSize),
+ NiceBytes(Space.Free),
+ NiceBytes(Space.Total),
+ m_Config.Interval.count()
+ ? "{} until next GC"_format(NiceTimeSpanMs(uint64_t(std::chrono::milliseconds(RemaingTime).count())))
+ : std::string("next scheduled GC no set"));
+
+ // TODO: Trigger GC if max disk usage water mark is reached
+
+ if (RemaingTime.count() > 0)
+ {
+ WaitTime = m_Config.MonitorInterval < RemaingTime ? m_Config.MonitorInterval : RemaingTime;
+ continue;
+ }
+
+ WaitTime = m_Config.MonitorInterval;
+ m_Status = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
+ }
- DiskSpace Space;
- DiskSpaceInfo(m_Config.RootDirectory, Space);
- ZEN_INFO("garbage collection STARTING, disk space {}/{} (free/total)", NiceBytes(Space.Free), NiceBytes(Space.Total));
+ ZEN_ASSERT(Status() == GcSchedulerStatus::kRunning);
GcContext GcCtx;
GcCtx.SetDeletionMode(true);
GcCtx.CollectSmallObjects(m_Config.CollectSmallObjects);
GcCtx.MaxCacheDuration(m_Config.MaxCacheDuration);
- ZEN_DEBUG("collecting small objects {}, max cache duration {}s",
- m_Config.CollectSmallObjects ? "YES"sv : "NO"sv,
- m_Config.MaxCacheDuration.count());
+ if (m_TriggerParams)
+ {
+ const auto TriggerParams = m_TriggerParams.value();
+ m_TriggerParams.reset();
+
+ GcCtx.CollectSmallObjects(TriggerParams.CollectSmallObjects);
+ if (TriggerParams.MaxCacheDuration != std::chrono::seconds::max())
+ {
+ GcCtx.MaxCacheDuration(TriggerParams.MaxCacheDuration);
+ }
+ }
+
+ Stopwatch Timer;
+
+ ZEN_INFO("garbage collection STARTING, small objects gc {}, max cache duration {}",
+ GcCtx.CollectSmallObjects() ? "ENABLED"sv : "DISABLED"sv,
+ NiceTimeSpanMs(uint64_t(std::chrono::duration_cast<std::chrono::milliseconds>(GcCtx.MaxCacheDuration()).count())));
m_CasGc.CollectGarbage(GcCtx);
- m_Status = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
- m_NextGcTime = NextGcTime(GcClock::Now());
+ m_LastGcTime = GcClock::Now();
+ m_NextGcTime = NextGcTime(m_LastGcTime);
+ WaitTime = m_Config.MonitorInterval;
{
+ const fs::path Path = m_Config.RootDirectory / "gc_state";
+ ZEN_DEBUG("saving scheduler state to '{}'", Path);
CbObjectWriter SchedulderState;
- SchedulderState << "LastGcTime"sv << static_cast<int64_t>(GcClock::Now().time_since_epoch().count());
- SaveCompactBinaryObject(m_Config.RootDirectory / "gc_state", SchedulderState.Save());
+ SchedulderState << "LastGcTime"sv << static_cast<int64_t>(m_LastGcTime.time_since_epoch().count());
+ SaveCompactBinaryObject(Path, SchedulderState.Save());
}
- DiskSpaceInfo(m_Config.RootDirectory, Space);
+ ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- ZEN_INFO("garbage collection DONE after {}, disk space {}/{} (free/total)",
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
- NiceBytes(Space.Free),
- NiceBytes(Space.Total));
+ m_Status = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
}
}
@@ -389,24 +480,6 @@ GcScheduler::NextGcTime(GcClock::TimePoint CurrentTime)
}
}
-bool
-GcScheduler::ScheduleNow()
-{
- if (m_Config.Enabled)
- {
- ZEN_DEBUG("schedule NOW");
- std::unique_lock Lock(m_GcMutex);
- if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status)
- {
- m_Status = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
- m_GcSignal.notify_one();
- return true;
- }
- }
-
- return false;
-}
-
//////////////////////////////////////////////////////////////////////////
} // namespace zen
diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h
index 2ea35b131..20aada746 100644
--- a/zenstore/include/zenstore/gc.h
+++ b/zenstore/include/zenstore/gc.h
@@ -9,6 +9,7 @@
#include <chrono>
#include <filesystem>
#include <functional>
+#include <optional>
#include <span>
#include <thread>
@@ -42,7 +43,6 @@ public:
/** Garbage Collection context object
*/
-
class GcContext
{
public:
@@ -83,7 +83,6 @@ private:
retain.
*/
-
class GcContributor
{
public:
@@ -119,7 +118,6 @@ private:
/** GC orchestrator
*/
-
class CasGc
{
public:
@@ -134,13 +132,14 @@ public:
void CollectGarbage(GcContext& GcCtx);
- void SetCidStore(CidStore* Cids);
- void OnNewCidReferences(std::span<IoHash> Hashes);
- void OnCommittedCidReferences(std::span<IoHash> Hashes);
- void OnDroppedCidReferences(std::span<IoHash> Hashes);
+ void SetCidStore(CidStore* Cids);
+ void OnNewCidReferences(std::span<IoHash> Hashes);
+ void OnCommittedCidReferences(std::span<IoHash> Hashes);
+ void OnDroppedCidReferences(std::span<IoHash> Hashes);
+ GcStorageSize TotalStorageSize() const;
private:
- RwLock m_Lock;
+ mutable RwLock m_Lock;
std::vector<GcContributor*> m_GcContribs;
std::vector<GcStorage*> m_GcStorage;
CidStore* m_CidStore;
@@ -156,12 +155,16 @@ enum class GcSchedulerStatus : uint32_t
struct GcSchedulerConfig
{
std::filesystem::path RootDirectory;
+ std::chrono::seconds MonitorInterval{30};
std::chrono::seconds Interval{};
std::chrono::seconds MaxCacheDuration{86400};
bool CollectSmallObjects = false;
bool Enabled = false;
};
+/**
+ * GC scheduler
+ */
class GcScheduler
{
public:
@@ -169,23 +172,32 @@ public:
~GcScheduler();
void Initialize(const GcSchedulerConfig& Config);
- bool ScheduleNow();
- GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); }
void Shutdown();
+ GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); }
+
+ struct TriggerParams
+ {
+ bool CollectSmallObjects = false;
+ std::chrono::seconds MaxCacheDuration = std::chrono::seconds::max();
+ };
+
+ bool Trigger(const TriggerParams& Params);
private:
void SchedulerThread();
GcClock::TimePoint NextGcTime(GcClock::TimePoint CurrentTime);
spdlog::logger& Log() { return m_Log; }
- spdlog::logger& m_Log;
- CasGc& m_CasGc;
- GcSchedulerConfig m_Config;
- GcClock::TimePoint m_NextGcTime{};
- std::atomic_uint32_t m_Status{};
- std::jthread m_GcThread;
- std::mutex m_GcMutex;
- std::condition_variable m_GcSignal;
+ spdlog::logger& m_Log;
+ CasGc& m_CasGc;
+ GcSchedulerConfig m_Config;
+ GcClock::TimePoint m_LastGcTime{};
+ GcClock::TimePoint m_NextGcTime{};
+ std::atomic_uint32_t m_Status{};
+ std::jthread m_GcThread;
+ std::mutex m_GcMutex;
+ std::condition_variable m_GcSignal;
+ std::optional<TriggerParams> m_TriggerParams;
};
} // namespace zen