diff options
Diffstat (limited to 'zenstore')
| -rw-r--r-- | zenstore/CAS.cpp | 6 | ||||
| -rw-r--r-- | zenstore/gc.cpp | 155 | ||||
| -rw-r--r-- | zenstore/include/zenstore/CAS.h | 2 | ||||
| -rw-r--r-- | zenstore/include/zenstore/gc.h | 50 |
4 files changed, 180 insertions, 33 deletions
diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp index a0d47c213..40846e368 100644 --- a/zenstore/CAS.cpp +++ b/zenstore/CAS.cpp @@ -333,10 +333,10 @@ CasImpl::TotalSize() const ////////////////////////////////////////////////////////////////////////// -CasStore* +std::unique_ptr<CasStore> CreateCasStore(CasGc& Gc) { - return new CasImpl(Gc); + return std::make_unique<CasImpl>(Gc); } ////////////////////////////////////////////////////////////////////////// @@ -355,7 +355,7 @@ TEST_CASE("CasStore") CasGc Gc; - std::unique_ptr<CasStore> Store{CreateCasStore(Gc)}; + std::unique_ptr<CasStore> Store = CreateCasStore(Gc); Store->Initialize(config); ScrubContext Ctx; diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp index 1b987ca08..7ab2045b5 100644 --- a/zenstore/gc.cpp +++ b/zenstore/gc.cpp @@ -2,14 +2,47 @@ #include <zenstore/gc.h> +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/filesystem.h> #include <zencore/logging.h> #include <zencore/string.h> #include <zencore/timer.h> #include <zenstore/CAS.h> #include <zenstore/cidstore.h> +#include <filesystem> + namespace zen { +using namespace std::literals; + +////////////////////////////////////////////////////////////////////////// + +CbObject +LoadCompactBinaryObject(const std::filesystem::path& Path) +{ + FileContents Result = ReadFile(Path); + + if (!Result.ErrorCode) + { + IoBuffer Buffer = Result.Flatten(); + if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) + { + return LoadCompactBinaryObject(Buffer); + } + } + + return CbObject(); +} + +void +SaveCompactBinaryObject(const std::filesystem::path& Path, const CbObject& Object) +{ + WriteFile(Path, Object.GetBuffer().AsIoBuffer()); +} + ////////////////////////////////////////////////////////////////////////// struct GcContext::GcState @@ -145,16 +178,12 @@ CasGc::RemoveGcStorage(GcStorage* Storage) } void -CasGc::CollectGarbage() +CasGc::CollectGarbage(GcContext& GcCtx) { RwLock::SharedLockScope _(m_Lock); // First gather reference set - GcContext GcCtx; - GcCtx.SetDeletionMode(true); - GcCtx.SetContainerGcEnabled(false); - for (GcContributor* Contributor : m_GcContribs) { Contributor->GatherReferences(GcCtx); @@ -219,28 +248,116 @@ CasGc::OnDroppedCidReferences(std::span<IoHash> Hashes) ZEN_UNUSED(Hashes); } -bool -Gc::Trigger() +////////////////////////////////////////////////////////////////////////// + +GcScheduler::GcScheduler(CasGc& CasGc) : m_Log(logging::Get("gc")), m_CasGc(CasGc) +{ +} + +GcScheduler::~GcScheduler() +{ + Shutdown(); +} + +void +GcScheduler::Initialize(const GcSchedulerConfig& Config) { - uint32_t Expected = uint32_t(GcStatus::kIdle); - if (!m_Status.compare_exchange_strong(Expected, static_cast<uint32_t>(GcStatus::kRunning))) + using namespace std::chrono; + + m_Config = Config; + + std::filesystem::create_directories(Config.RootDirectory); + + Timepoint CurrentTime = Clock::now(); + + if (CbObject SchedulerState = LoadCompactBinaryObject(Config.RootDirectory / "gc_state")) { - return false; + const int64_t LastGcTime = SchedulerState["LastGcTime"sv].AsInt64(); + CurrentTime = Timepoint(Clock::duration(LastGcTime)); + + if (CurrentTime + m_Config.Interval < Clock::now()) + { + CurrentTime = Clock::now(); + } } - ZEN_ASSERT(GcStatus::kRunning == Status()); + m_NextGcTime = CurrentTime + m_Config.Interval; + m_GcThread = std::jthread(&GcScheduler::SchedulerThread, this); +} - m_GcThread = std::jthread([this]() { - Stopwatch Timer; - ZEN_INFO("garbage collection STARTING"); +void +GcScheduler::Shutdown() +{ + if (static_cast<uint32_t>(GcSchedulerStatus::kStopped) != m_Status) + { + m_Status = static_cast<uint32_t>(GcSchedulerStatus::kStopped); + m_GcSignal.notify_one(); + } +} + +void +GcScheduler::SchedulerThread() +{ + for (;;) + { + { + std::unique_lock Lock(m_GcMutex); + if (m_GcSignal.wait_until(Lock, m_NextGcTime, [this]() { return Status() != GcSchedulerStatus::kIdle; })) + { + if (Status() == GcSchedulerStatus::kStopped) + { + break; + } + } + else + { + m_Status.store(static_cast<uint32_t>(GcSchedulerStatus::kRunning)); + } + } + + if (m_Config.Enabled) + { + Stopwatch Timer; + ZEN_INFO("garbage collection STARTING"); - m_CasGc.CollectGarbage(); - m_Status = static_cast<uint32_t>(GcStatus::kIdle); + GcContext GcCtx; + GcCtx.SetDeletionMode(true); + GcCtx.SetContainerGcEnabled(true); - ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); + m_CasGc.CollectGarbage(GcCtx); - return true; + m_Status = static_cast<uint32_t>(GcSchedulerStatus::kIdle); + m_NextGcTime = Clock::now() + m_Config.Interval; + + { + CbObjectWriter SchedulderState; + SchedulderState << "LastGcTime"sv << static_cast<int64_t>(Clock::now().time_since_epoch().count()); + SaveCompactBinaryObject(m_Config.RootDirectory / "gc_state", SchedulderState.Save()); + } + + ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + } } +bool +GcScheduler::ScheduleNow() +{ + if (m_Config.Enabled) + { + ZEN_DEBUG("schedule NOW"); + std::unique_lock Lock(m_GcMutex); + if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status) + { + m_Status = static_cast<uint32_t>(GcSchedulerStatus::kRunning); + m_GcSignal.notify_one(); + return true; + } + } + + return false; +} + +////////////////////////////////////////////////////////////////////////// + } // namespace zen diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h index bba1bb721..72b750d6c 100644 --- a/zenstore/include/zenstore/CAS.h +++ b/zenstore/include/zenstore/CAS.h @@ -129,7 +129,7 @@ protected: uint64_t m_LastScrubTime = 0; }; -ZENCORE_API CasStore* CreateCasStore(CasGc& Gc); +ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(CasGc& Gc); void CAS_forcelink(); diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h index 6b00f1ffb..94f3c32ac 100644 --- a/zenstore/include/zenstore/gc.h +++ b/zenstore/include/zenstore/gc.h @@ -6,11 +6,18 @@ #include <zencore/thread.h> #include <atomic> +#include <chrono> +#include <filesystem> #include <functional> #include <span> +#include <thread> #define ZEN_USE_REF_TRACKING 0 // This is not currently functional +namespace spdlog { +class logger; +} + namespace zen { class CasStore; @@ -97,7 +104,7 @@ public: void AddGcStorage(GcStorage* Contributor); void RemoveGcStorage(GcStorage* Contributor); - void CollectGarbage(); + void CollectGarbage(GcContext& GcCtx); void SetCidStore(CidStore* Cids); void OnNewCidReferences(std::span<IoHash> Hashes); @@ -111,23 +118,46 @@ private: CidStore* m_CidStore; }; -enum class GcStatus : uint32_t +enum class GcSchedulerStatus : uint32_t { kIdle, - kRunning + kRunning, + kStopped +}; + +struct GcSchedulerConfig +{ + std::filesystem::path RootDirectory; + std::chrono::seconds Interval{3600}; + bool Enabled = true; }; -class Gc +class GcScheduler { + using Clock = std::chrono::system_clock; + using Timepoint = std::chrono::time_point<Clock>; + public: - bool Trigger(); - CasGc& Cas() { return m_CasGc; } - GcStatus Status() const { return static_cast<GcStatus>(m_Status.load()); } + GcScheduler(CasGc& CasGc); + ~GcScheduler(); + + void Initialize(const GcSchedulerConfig& Config); + bool ScheduleNow(); + GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); } + void Shutdown(); private: - CasGc m_CasGc; - std::jthread m_GcThread; - std::atomic_uint32_t m_Status; + void SchedulerThread(); + spdlog::logger& Log() { return m_Log; } + + spdlog::logger& m_Log; + CasGc& m_CasGc; + GcSchedulerConfig m_Config; + Timepoint m_NextGcTime{}; + std::atomic_uint32_t m_Status{}; + std::jthread m_GcThread; + std::mutex m_GcMutex; + std::condition_variable m_GcSignal; }; } // namespace zen |