aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Per Larsson <[email protected]> 2021-12-05 16:03:27 +0100
committer Per Larsson <[email protected]> 2021-12-05 16:03:27 +0100
commit 9eb0876ab1f35317eb04dd8a74f0394e853f4f56 (patch)
tree 1dfd11024baea97b01c69b40153086511987f361
parent Merge branch 'gc' of https://github.com/EpicGames/zen into gc (diff)
download zen-9eb0876ab1f35317eb04dd8a74f0394e853f4f56.tar.xz
zen-9eb0876ab1f35317eb04dd8a74f0394e853f4f56.zip
Added simple GC interval scheduling.
-rw-r--r-- zen/chunk/chunk.cpp 4
-rw-r--r-- zenserver/config.cpp 13
-rw-r--r-- zenserver/config.h 25
-rw-r--r-- zenserver/zenserver.cpp 27
-rw-r--r-- zenstore/CAS.cpp 6
-rw-r--r-- zenstore/gc.cpp 155
-rw-r--r-- zenstore/include/zenstore/CAS.h 2
-rw-r--r-- zenstore/include/zenstore/gc.h 50
8 files changed, 238 insertions, 44 deletions
diff --git a/zen/chunk/chunk.cpp b/zen/chunk/chunk.cpp
index 043832dd3..9697969c0 100644
--- a/zen/chunk/chunk.cpp
+++ b/zen/chunk/chunk.cpp
@@ -950,7 +950,7 @@ ChunkCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
zen::CasStoreConfiguration Config;
Config.RootDirectory = m_RootDirectory;
- CasStore.reset(zen::CreateCasStore(Gc));
+ CasStore = zen::CreateCasStore(Gc);
CasStore->Initialize(Config);
}
@@ -1162,4 +1162,4 @@ TEST_CASE("chunking")
SUBCASE("mod method") { test(/* UseThreshold */ false, /* Random */ Random, 2048, 1 * 1024 * 1024); }
}
-#endif
\ No newline at end of file
+#endif
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index 1e847ce3d..49240d176 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -270,6 +270,19 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
cxxopts::value<int32_t>(ServerOptions.UpstreamCacheConfig.TimeoutMilliseconds)->default_value("0"),
"");
+ options.add_option("gc",
+ "",
+ "gc-enabled",
+ "Whether garbage collection is enabled or not.",
+ cxxopts::value<bool>(ServerOptions.GcConfig.Enabled)->default_value("true"),
+ "");
+
+ options.add_option("gc",
+ "",
+ "gc-interval-seconds",
+ "Garbage collection interval. Default is 1h.",
+ cxxopts::value<int32_t>(ServerOptions.GcConfig.IntervalSeconds)->default_value("3600"),
+ "");
try
{
auto result = options.parse(argc, argv);
diff --git a/zenserver/config.h b/zenserver/config.h
index 19fba71a3..97f339a0e 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -55,9 +55,34 @@ struct ZenUpstreamCacheConfig
bool StatsEnabled = false;
};
+struct ZenCacheEvictionPolicy
+{
+ bool Enabled = true;
+ uint64_t DiskSizeLimit = ~uint64_t(0);
+ uint64_t MemorySizeLimit = 1024 * 1024 * 1024;
+};
+
+struct ZenCasEvictionPolicy
+{
+ bool Enabled = true;
+ bool ContainerGcEnabled = true;
+ uint64_t LargeStrategySizeLimit = ~uint64_t(0);
+ uint64_t SmallStrategySizeLimit = ~uint64_t(0);
+ uint64_t TinyStrategySizeLimit = ~uint64_t(0);
+};
+
+struct ZenGcConfig
+{
+ ZenCasEvictionPolicy Cas;
+ ZenCacheEvictionPolicy Cache;
+ int32_t IntervalSeconds = 60 * 60;
+ bool Enabled = true;
+};
+
struct ZenServerOptions
{
ZenUpstreamCacheConfig UpstreamCacheConfig;
+ ZenGcConfig GcConfig;
std::filesystem::path DataDir; // Root directory for state (used for testing)
std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental)
std::filesystem::path AbsLogFile; // Absolute path to main log file
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index 47c79526b..5b5cfb3b7 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -198,15 +198,15 @@ public:
m_AdminService.RegisterGcHandler({.Trigger =
[this]() {
CbObjectWriter Writer;
- const bool Started = m_Gc.Trigger();
+ const bool Started = m_GcScheduler.ScheduleNow();
Writer << "Status"sv << (Started ? "Started"sv : "Running"sv);
return Writer.Save();
},
.Status =
[this]() {
- CbObjectWriter Writer;
- const GcStatus Status = m_Gc.Status();
- Writer << "Status"sv << (GcStatus::kIdle == Status ? "Idle"sv : "Running"sv);
+ CbObjectWriter Writer;
+ const GcSchedulerStatus Status = m_GcScheduler.Status();
+ Writer << "Status"sv << (GcSchedulerStatus::kIdle == Status ? "Idle"sv : "Running"sv);
return Writer.Save();
}});
@@ -229,11 +229,11 @@ public:
m_CasStore->Initialize(Config);
m_CidStore = std::make_unique<zen::CidStore>(*m_CasStore, m_DataRoot / "cid");
- m_Gc.Cas().SetCidStore(m_CidStore.get());
+ m_CasGc.SetCidStore(m_CidStore.get());
ZEN_INFO("instantiating project service");
- m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_Gc.Cas());
+ m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_CasGc);
m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore});
#if ZEN_USE_NAMED_PIPES
@@ -306,6 +306,14 @@ public:
{
m_Http->RegisterService(*m_FrontendService);
}
+
+ ZEN_INFO("initializing GC, enabled '{}', interval {}s", ServerOptions.GcConfig.Enabled, ServerOptions.GcConfig.IntervalSeconds);
+ zen::GcSchedulerConfig GcConfig{
+ .RootDirectory = m_DataRoot / "gc",
+ .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds),
+ .Enabled = ServerOptions.GcConfig.Enabled,
+ };
+ m_GcScheduler.Initialize(GcConfig);
}
void InitializeState(const ZenServerOptions& ServerOptions);
@@ -518,8 +526,9 @@ private:
zen::Ref<zen::HttpServer> m_Http;
zen::HttpStatusService m_StatusService;
zen::HttpStatsService m_StatsService;
- zen::Gc m_Gc;
- std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore(m_Gc.Cas())};
+ zen::CasGc m_CasGc;
+ zen::GcScheduler m_GcScheduler{m_CasGc};
+ std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore(m_CasGc)};
std::unique_ptr<zen::CidStore> m_CidStore;
std::unique_ptr<zen::ZenCacheStore> m_CacheStore;
zen::CasScrubber m_Scrubber{*m_CasStore};
@@ -661,7 +670,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; };
ZEN_INFO("instantiating structured cache service");
- m_CacheStore = std::make_unique<ZenCacheStore>(m_Gc.Cas(), m_DataRoot / "cache");
+ m_CacheStore = std::make_unique<ZenCacheStore>(m_CasGc, m_DataRoot / "cache");
std::unique_ptr<zen::UpstreamCache> UpstreamCache;
if (ServerOptions.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled)
diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp
index a0d47c213..40846e368 100644
--- a/zenstore/CAS.cpp
+++ b/zenstore/CAS.cpp
@@ -333,10 +333,10 @@ CasImpl::TotalSize() const
//////////////////////////////////////////////////////////////////////////
-CasStore*
+std::unique_ptr<CasStore>
CreateCasStore(CasGc& Gc)
{
- return new CasImpl(Gc);
+ return std::make_unique<CasImpl>(Gc);
}
//////////////////////////////////////////////////////////////////////////
@@ -355,7 +355,7 @@ TEST_CASE("CasStore")
CasGc Gc;
- std::unique_ptr<CasStore> Store{CreateCasStore(Gc)};
+ std::unique_ptr<CasStore> Store = CreateCasStore(Gc);
Store->Initialize(config);
ScrubContext Ctx;
diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp
index 1b987ca08..7ab2045b5 100644
--- a/zenstore/gc.cpp
+++ b/zenstore/gc.cpp
@@ -2,14 +2,47 @@
#include <zenstore/gc.h>
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/filesystem.h>
#include <zencore/logging.h>
#include <zencore/string.h>
#include <zencore/timer.h>
#include <zenstore/CAS.h>
#include <zenstore/cidstore.h>
+#include <filesystem>
+
namespace zen {
+using namespace std::literals;
+
+//////////////////////////////////////////////////////////////////////////
+
+CbObject
+LoadCompactBinaryObject(const std::filesystem::path& Path)
+{
+ FileContents Result = ReadFile(Path);
+
+ if (!Result.ErrorCode)
+ {
+ IoBuffer Buffer = Result.Flatten();
+ if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None)
+ {
+ return LoadCompactBinaryObject(Buffer);
+ }
+ }
+
+ return CbObject();
+}
+
+void
+SaveCompactBinaryObject(const std::filesystem::path& Path, const CbObject& Object)
+{
+ WriteFile(Path, Object.GetBuffer().AsIoBuffer());
+}
+
//////////////////////////////////////////////////////////////////////////
struct GcContext::GcState
@@ -145,16 +178,12 @@ CasGc::RemoveGcStorage(GcStorage* Storage)
}
void
-CasGc::CollectGarbage()
+CasGc::CollectGarbage(GcContext& GcCtx)
{
RwLock::SharedLockScope _(m_Lock);
// First gather reference set
- GcContext GcCtx;
- GcCtx.SetDeletionMode(true);
- GcCtx.SetContainerGcEnabled(false);
-
for (GcContributor* Contributor : m_GcContribs)
{
Contributor->GatherReferences(GcCtx);
@@ -219,28 +248,116 @@ CasGc::OnDroppedCidReferences(std::span<IoHash> Hashes)
ZEN_UNUSED(Hashes);
}
-bool
-Gc::Trigger()
+//////////////////////////////////////////////////////////////////////////
+
+GcScheduler::GcScheduler(CasGc& CasGc) : m_Log(logging::Get("gc")), m_CasGc(CasGc)
+{
+}
+
+GcScheduler::~GcScheduler()
+{
+ Shutdown();
+}
+
+void
+GcScheduler::Initialize(const GcSchedulerConfig& Config)
{
- uint32_t Expected = uint32_t(GcStatus::kIdle);
- if (!m_Status.compare_exchange_strong(Expected, static_cast<uint32_t>(GcStatus::kRunning)))
+ using namespace std::chrono;
+
+ m_Config = Config;
+
+ std::filesystem::create_directories(Config.RootDirectory);
+
+ Timepoint CurrentTime = Clock::now();
+
+ if (CbObject SchedulerState = LoadCompactBinaryObject(Config.RootDirectory / "gc_state"))
{
- return false;
+ const int64_t LastGcTime = SchedulerState["LastGcTime"sv].AsInt64();
+ CurrentTime = Timepoint(Clock::duration(LastGcTime));
+
+ if (CurrentTime + m_Config.Interval < Clock::now())
+ {
+ CurrentTime = Clock::now();
+ }
}
- ZEN_ASSERT(GcStatus::kRunning == Status());
+ m_NextGcTime = CurrentTime + m_Config.Interval;
+ m_GcThread = std::jthread(&GcScheduler::SchedulerThread, this);
+}
- m_GcThread = std::jthread([this]() {
- Stopwatch Timer;
- ZEN_INFO("garbage collection STARTING");
+void
+GcScheduler::Shutdown()
+{
+ if (static_cast<uint32_t>(GcSchedulerStatus::kStopped) != m_Status)
+ {
+ m_Status = static_cast<uint32_t>(GcSchedulerStatus::kStopped);
+ m_GcSignal.notify_one();
+ }
+}
+
+void
+GcScheduler::SchedulerThread()
+{
+ for (;;)
+ {
+ {
+ std::unique_lock Lock(m_GcMutex);
+ if (m_GcSignal.wait_until(Lock, m_NextGcTime, [this]() { return Status() != GcSchedulerStatus::kIdle; }))
+ {
+ if (Status() == GcSchedulerStatus::kStopped)
+ {
+ break;
+ }
+ }
+ else
+ {
+ m_Status.store(static_cast<uint32_t>(GcSchedulerStatus::kRunning));
+ }
+ }
+
+ if (m_Config.Enabled)
+ {
+ Stopwatch Timer;
+ ZEN_INFO("garbage collection STARTING");
- m_CasGc.CollectGarbage();
- m_Status = static_cast<uint32_t>(GcStatus::kIdle);
+ GcContext GcCtx;
+ GcCtx.SetDeletionMode(true);
+ GcCtx.SetContainerGcEnabled(true);
- ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
+ m_CasGc.CollectGarbage(GcCtx);
- return true;
+ m_Status = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
+ m_NextGcTime = Clock::now() + m_Config.Interval;
+
+ {
+ CbObjectWriter SchedulderState;
+ SchedulderState << "LastGcTime"sv << static_cast<int64_t>(Clock::now().time_since_epoch().count());
+ SaveCompactBinaryObject(m_Config.RootDirectory / "gc_state", SchedulderState.Save());
+ }
+
+ ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ }
}
+bool
+GcScheduler::ScheduleNow()
+{
+ if (m_Config.Enabled)
+ {
+ ZEN_DEBUG("schedule NOW");
+ std::unique_lock Lock(m_GcMutex);
+ if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status)
+ {
+ m_Status = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
+ m_GcSignal.notify_one();
+ return true;
+ }
+ }
+
+ return false;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
} // namespace zen
diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h
index bba1bb721..72b750d6c 100644
--- a/zenstore/include/zenstore/CAS.h
+++ b/zenstore/include/zenstore/CAS.h
@@ -129,7 +129,7 @@ protected:
uint64_t m_LastScrubTime = 0;
};
-ZENCORE_API CasStore* CreateCasStore(CasGc& Gc);
+ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(CasGc& Gc);
void CAS_forcelink();
diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h
index 6b00f1ffb..94f3c32ac 100644
--- a/zenstore/include/zenstore/gc.h
+++ b/zenstore/include/zenstore/gc.h
@@ -6,11 +6,18 @@
#include <zencore/thread.h>
#include <atomic>
+#include <chrono>
+#include <filesystem>
#include <functional>
#include <span>
+#include <thread>
#define ZEN_USE_REF_TRACKING 0 // This is not currently functional
+namespace spdlog {
+class logger;
+}
+
namespace zen {
class CasStore;
@@ -97,7 +104,7 @@ public:
void AddGcStorage(GcStorage* Contributor);
void RemoveGcStorage(GcStorage* Contributor);
- void CollectGarbage();
+ void CollectGarbage(GcContext& GcCtx);
void SetCidStore(CidStore* Cids);
void OnNewCidReferences(std::span<IoHash> Hashes);
@@ -111,23 +118,46 @@ private:
CidStore* m_CidStore;
};
-enum class GcStatus : uint32_t
+enum class GcSchedulerStatus : uint32_t
{
kIdle,
- kRunning
+ kRunning,
+ kStopped
+};
+
+struct GcSchedulerConfig
+{
+ std::filesystem::path RootDirectory;
+ std::chrono::seconds Interval{3600};
+ bool Enabled = true;
};
-class Gc
+class GcScheduler
{
+ using Clock = std::chrono::system_clock;
+ using Timepoint = std::chrono::time_point<Clock>;
+
public:
- bool Trigger();
- CasGc& Cas() { return m_CasGc; }
- GcStatus Status() const { return static_cast<GcStatus>(m_Status.load()); }
+ GcScheduler(CasGc& CasGc);
+ ~GcScheduler();
+
+ void Initialize(const GcSchedulerConfig& Config);
+ bool ScheduleNow();
+ GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); }
+ void Shutdown();
private:
- CasGc m_CasGc;
- std::jthread m_GcThread;
- std::atomic_uint32_t m_Status;
+ void SchedulerThread();
+ spdlog::logger& Log() { return m_Log; }
+
+ spdlog::logger& m_Log;
+ CasGc& m_CasGc;
+ GcSchedulerConfig m_Config;
+ Timepoint m_NextGcTime{};
+ std::atomic_uint32_t m_Status{};
+ std::jthread m_GcThread;
+ std::mutex m_GcMutex;
+ std::condition_variable m_GcSignal;
};
} // namespace zen