aboutsummaryrefslogtreecommitdiff
path: root/zenstore
diff options
context:
space:
mode:
Diffstat (limited to 'zenstore')
-rw-r--r--zenstore/CAS.cpp6
-rw-r--r--zenstore/gc.cpp155
-rw-r--r--zenstore/include/zenstore/CAS.h2
-rw-r--r--zenstore/include/zenstore/gc.h50
4 files changed, 180 insertions, 33 deletions
diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp
index a0d47c213..40846e368 100644
--- a/zenstore/CAS.cpp
+++ b/zenstore/CAS.cpp
@@ -333,10 +333,10 @@ CasImpl::TotalSize() const
//////////////////////////////////////////////////////////////////////////
-CasStore*
+std::unique_ptr<CasStore>
CreateCasStore(CasGc& Gc)
{
- return new CasImpl(Gc);
+ return std::make_unique<CasImpl>(Gc);
}
//////////////////////////////////////////////////////////////////////////
@@ -355,7 +355,7 @@ TEST_CASE("CasStore")
CasGc Gc;
- std::unique_ptr<CasStore> Store{CreateCasStore(Gc)};
+ std::unique_ptr<CasStore> Store = CreateCasStore(Gc);
Store->Initialize(config);
ScrubContext Ctx;
diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp
index 1b987ca08..7ab2045b5 100644
--- a/zenstore/gc.cpp
+++ b/zenstore/gc.cpp
@@ -2,14 +2,47 @@
#include <zenstore/gc.h>
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/filesystem.h>
#include <zencore/logging.h>
#include <zencore/string.h>
#include <zencore/timer.h>
#include <zenstore/CAS.h>
#include <zenstore/cidstore.h>
+#include <filesystem>
+
namespace zen {
+using namespace std::literals;
+
+//////////////////////////////////////////////////////////////////////////
+
+CbObject
+LoadCompactBinaryObject(const std::filesystem::path& Path)
+{
+ FileContents Result = ReadFile(Path);
+
+ if (!Result.ErrorCode)
+ {
+ IoBuffer Buffer = Result.Flatten();
+ if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None)
+ {
+ return LoadCompactBinaryObject(Buffer);
+ }
+ }
+
+ return CbObject();
+}
+
+void
+SaveCompactBinaryObject(const std::filesystem::path& Path, const CbObject& Object)
+{
+ WriteFile(Path, Object.GetBuffer().AsIoBuffer());
+}
+
//////////////////////////////////////////////////////////////////////////
struct GcContext::GcState
@@ -145,16 +178,12 @@ CasGc::RemoveGcStorage(GcStorage* Storage)
}
void
-CasGc::CollectGarbage()
+CasGc::CollectGarbage(GcContext& GcCtx)
{
RwLock::SharedLockScope _(m_Lock);
// First gather reference set
- GcContext GcCtx;
- GcCtx.SetDeletionMode(true);
- GcCtx.SetContainerGcEnabled(false);
-
for (GcContributor* Contributor : m_GcContribs)
{
Contributor->GatherReferences(GcCtx);
@@ -219,28 +248,116 @@ CasGc::OnDroppedCidReferences(std::span<IoHash> Hashes)
ZEN_UNUSED(Hashes);
}
-bool
-Gc::Trigger()
+//////////////////////////////////////////////////////////////////////////
+
+GcScheduler::GcScheduler(CasGc& CasGc) : m_Log(logging::Get("gc")), m_CasGc(CasGc)
+{
+}
+
+GcScheduler::~GcScheduler()
+{
+ Shutdown();
+}
+
+void
+GcScheduler::Initialize(const GcSchedulerConfig& Config)
{
- uint32_t Expected = uint32_t(GcStatus::kIdle);
- if (!m_Status.compare_exchange_strong(Expected, static_cast<uint32_t>(GcStatus::kRunning)))
+ using namespace std::chrono;
+
+ m_Config = Config;
+
+ std::filesystem::create_directories(Config.RootDirectory);
+
+ Timepoint CurrentTime = Clock::now();
+
+ if (CbObject SchedulerState = LoadCompactBinaryObject(Config.RootDirectory / "gc_state"))
{
- return false;
+ const int64_t LastGcTime = SchedulerState["LastGcTime"sv].AsInt64();
+ CurrentTime = Timepoint(Clock::duration(LastGcTime));
+
+ if (CurrentTime + m_Config.Interval < Clock::now())
+ {
+ CurrentTime = Clock::now();
+ }
}
- ZEN_ASSERT(GcStatus::kRunning == Status());
+ m_NextGcTime = CurrentTime + m_Config.Interval;
+ m_GcThread = std::jthread(&GcScheduler::SchedulerThread, this);
+}
- m_GcThread = std::jthread([this]() {
- Stopwatch Timer;
- ZEN_INFO("garbage collection STARTING");
+void
+GcScheduler::Shutdown()
+{
+ if (static_cast<uint32_t>(GcSchedulerStatus::kStopped) != m_Status)
+ {
+ m_Status = static_cast<uint32_t>(GcSchedulerStatus::kStopped);
+ m_GcSignal.notify_one();
+ }
+}
+
+void
+GcScheduler::SchedulerThread()
+{
+ for (;;)
+ {
+ {
+ std::unique_lock Lock(m_GcMutex);
+ if (m_GcSignal.wait_until(Lock, m_NextGcTime, [this]() { return Status() != GcSchedulerStatus::kIdle; }))
+ {
+ if (Status() == GcSchedulerStatus::kStopped)
+ {
+ break;
+ }
+ }
+ else
+ {
+ m_Status.store(static_cast<uint32_t>(GcSchedulerStatus::kRunning));
+ }
+ }
+
+ if (m_Config.Enabled)
+ {
+ Stopwatch Timer;
+ ZEN_INFO("garbage collection STARTING");
- m_CasGc.CollectGarbage();
- m_Status = static_cast<uint32_t>(GcStatus::kIdle);
+ GcContext GcCtx;
+ GcCtx.SetDeletionMode(true);
+ GcCtx.SetContainerGcEnabled(true);
- ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
+ m_CasGc.CollectGarbage(GcCtx);
- return true;
+ m_Status = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
+ m_NextGcTime = Clock::now() + m_Config.Interval;
+
+ {
+ CbObjectWriter SchedulderState;
+ SchedulderState << "LastGcTime"sv << static_cast<int64_t>(Clock::now().time_since_epoch().count());
+ SaveCompactBinaryObject(m_Config.RootDirectory / "gc_state", SchedulderState.Save());
+ }
+
+ ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ }
}
+bool
+GcScheduler::ScheduleNow()
+{
+ if (m_Config.Enabled)
+ {
+ ZEN_DEBUG("schedule NOW");
+ std::unique_lock Lock(m_GcMutex);
+ if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status)
+ {
+ m_Status = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
+ m_GcSignal.notify_one();
+ return true;
+ }
+ }
+
+ return false;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
} // namespace zen
diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h
index bba1bb721..72b750d6c 100644
--- a/zenstore/include/zenstore/CAS.h
+++ b/zenstore/include/zenstore/CAS.h
@@ -129,7 +129,7 @@ protected:
uint64_t m_LastScrubTime = 0;
};
-ZENCORE_API CasStore* CreateCasStore(CasGc& Gc);
+ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(CasGc& Gc);
void CAS_forcelink();
diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h
index 6b00f1ffb..94f3c32ac 100644
--- a/zenstore/include/zenstore/gc.h
+++ b/zenstore/include/zenstore/gc.h
@@ -6,11 +6,18 @@
#include <zencore/thread.h>
#include <atomic>
+#include <chrono>
+#include <filesystem>
#include <functional>
#include <span>
+#include <thread>
#define ZEN_USE_REF_TRACKING 0 // This is not currently functional
+namespace spdlog {
+class logger;
+}
+
namespace zen {
class CasStore;
@@ -97,7 +104,7 @@ public:
void AddGcStorage(GcStorage* Contributor);
void RemoveGcStorage(GcStorage* Contributor);
- void CollectGarbage();
+ void CollectGarbage(GcContext& GcCtx);
void SetCidStore(CidStore* Cids);
void OnNewCidReferences(std::span<IoHash> Hashes);
@@ -111,23 +118,46 @@ private:
CidStore* m_CidStore;
};
-enum class GcStatus : uint32_t
+enum class GcSchedulerStatus : uint32_t
{
kIdle,
- kRunning
+ kRunning,
+ kStopped
+};
+
+struct GcSchedulerConfig
+{
+ std::filesystem::path RootDirectory;
+ std::chrono::seconds Interval{3600};
+ bool Enabled = true;
};
-class Gc
+class GcScheduler
{
+ using Clock = std::chrono::system_clock;
+ using Timepoint = std::chrono::time_point<Clock>;
+
public:
- bool Trigger();
- CasGc& Cas() { return m_CasGc; }
- GcStatus Status() const { return static_cast<GcStatus>(m_Status.load()); }
+ GcScheduler(CasGc& CasGc);
+ ~GcScheduler();
+
+ void Initialize(const GcSchedulerConfig& Config);
+ bool ScheduleNow();
+ GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); }
+ void Shutdown();
private:
- CasGc m_CasGc;
- std::jthread m_GcThread;
- std::atomic_uint32_t m_Status;
+ void SchedulerThread();
+ spdlog::logger& Log() { return m_Log; }
+
+ spdlog::logger& m_Log;
+ CasGc& m_CasGc;
+ GcSchedulerConfig m_Config;
+ Timepoint m_NextGcTime{};
+ std::atomic_uint32_t m_Status{};
+ std::jthread m_GcThread;
+ std::mutex m_GcMutex;
+ std::condition_variable m_GcSignal;
};
} // namespace zen