diff options
| author | Per Larsson <[email protected]> | 2021-12-05 16:03:27 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-12-05 16:03:27 +0100 |
| commit | 9eb0876ab1f35317eb04dd8a74f0394e853f4f56 (patch) | |
| tree | 1dfd11024baea97b01c69b40153086511987f361 | |
| parent | Merge branch 'gc' of https://github.com/EpicGames/zen into gc (diff) | |
| download | zen-9eb0876ab1f35317eb04dd8a74f0394e853f4f56.tar.xz zen-9eb0876ab1f35317eb04dd8a74f0394e853f4f56.zip | |
Added simple GC interval scheduling.
| -rw-r--r-- | zen/chunk/chunk.cpp | 4 | ||||
| -rw-r--r-- | zenserver/config.cpp | 13 | ||||
| -rw-r--r-- | zenserver/config.h | 25 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 27 | ||||
| -rw-r--r-- | zenstore/CAS.cpp | 6 | ||||
| -rw-r--r-- | zenstore/gc.cpp | 155 | ||||
| -rw-r--r-- | zenstore/include/zenstore/CAS.h | 2 | ||||
| -rw-r--r-- | zenstore/include/zenstore/gc.h | 50 |
8 files changed, 238 insertions, 44 deletions
diff --git a/zen/chunk/chunk.cpp b/zen/chunk/chunk.cpp index 043832dd3..9697969c0 100644 --- a/zen/chunk/chunk.cpp +++ b/zen/chunk/chunk.cpp @@ -950,7 +950,7 @@ ChunkCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) zen::CasStoreConfiguration Config; Config.RootDirectory = m_RootDirectory; - CasStore.reset(zen::CreateCasStore(Gc)); + CasStore = zen::CreateCasStore(Gc); CasStore->Initialize(Config); } @@ -1162,4 +1162,4 @@ TEST_CASE("chunking") SUBCASE("mod method") { test(/* UseThreshold */ false, /* Random */ Random, 2048, 1 * 1024 * 1024); } } -#endif
\ No newline at end of file +#endif diff --git a/zenserver/config.cpp b/zenserver/config.cpp index 1e847ce3d..49240d176 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -270,6 +270,19 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) cxxopts::value<int32_t>(ServerOptions.UpstreamCacheConfig.TimeoutMilliseconds)->default_value("0"), ""); + options.add_option("gc", + "", + "gc-enabled", + "Whether garbage collection is enabled or not.", + cxxopts::value<bool>(ServerOptions.GcConfig.Enabled)->default_value("true"), + ""); + + options.add_option("gc", + "", + "gc-interval-seconds", + "Garbage collection interval. Default is 1h.", + cxxopts::value<int32_t>(ServerOptions.GcConfig.IntervalSeconds)->default_value("3600"), + ""); try { auto result = options.parse(argc, argv); diff --git a/zenserver/config.h b/zenserver/config.h index 19fba71a3..97f339a0e 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -55,9 +55,34 @@ struct ZenUpstreamCacheConfig bool StatsEnabled = false; }; +struct ZenCacheEvictionPolicy +{ + bool Enabled = true; + uint64_t DiskSizeLimit = ~uint64_t(0); + uint64_t MemorySizeLimit = 1024 * 1024 * 1024; +}; + +struct ZenCasEvictionPolicy +{ + bool Enabled = true; + bool ContainerGcEnabled = true; + uint64_t LargeStrategySizeLimit = ~uint64_t(0); + uint64_t SmallStrategySizeLimit = ~uint64_t(0); + uint64_t TinyStrategySizeLimit = ~uint64_t(0); +}; + +struct ZenGcConfig +{ + ZenCasEvictionPolicy Cas; + ZenCacheEvictionPolicy Cache; + int32_t IntervalSeconds = 60 * 60; + bool Enabled = true; +}; + struct ZenServerOptions { ZenUpstreamCacheConfig UpstreamCacheConfig; + ZenGcConfig GcConfig; std::filesystem::path DataDir; // Root directory for state (used for testing) std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental) std::filesystem::path AbsLogFile; // Absolute path to main log file diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 47c79526b..5b5cfb3b7 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -198,15 +198,15 @@ public: m_AdminService.RegisterGcHandler({.Trigger = [this]() { CbObjectWriter Writer; - const bool Started = m_Gc.Trigger(); + const bool Started = m_GcScheduler.ScheduleNow(); Writer << "Status"sv << (Started ? "Started"sv : "Running"sv); return Writer.Save(); }, .Status = [this]() { - CbObjectWriter Writer; - const GcStatus Status = m_Gc.Status(); - Writer << "Status"sv << (GcStatus::kIdle == Status ? "Idle"sv : "Running"sv); + CbObjectWriter Writer; + const GcSchedulerStatus Status = m_GcScheduler.Status(); + Writer << "Status"sv << (GcSchedulerStatus::kIdle == Status ? "Idle"sv : "Running"sv); return Writer.Save(); }}); @@ -229,11 +229,11 @@ public: m_CasStore->Initialize(Config); m_CidStore = std::make_unique<zen::CidStore>(*m_CasStore, m_DataRoot / "cid"); - m_Gc.Cas().SetCidStore(m_CidStore.get()); + m_CasGc.SetCidStore(m_CidStore.get()); ZEN_INFO("instantiating project service"); - m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_Gc.Cas()); + m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_CasGc); m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore}); #if ZEN_USE_NAMED_PIPES @@ -306,6 +306,14 @@ public: { m_Http->RegisterService(*m_FrontendService); } + + ZEN_INFO("initializing GC, enabled '{}', interval {}s", ServerOptions.GcConfig.Enabled, ServerOptions.GcConfig.IntervalSeconds); + zen::GcSchedulerConfig GcConfig{ + .RootDirectory = m_DataRoot / "gc", + .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds), + .Enabled = ServerOptions.GcConfig.Enabled, + }; + m_GcScheduler.Initialize(GcConfig); } void InitializeState(const ZenServerOptions& ServerOptions); @@ -518,8 +526,9 @@ private: zen::Ref<zen::HttpServer> m_Http; zen::HttpStatusService m_StatusService; zen::HttpStatsService m_StatsService; - zen::Gc m_Gc; - std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore(m_Gc.Cas())}; + zen::CasGc m_CasGc; + zen::GcScheduler m_GcScheduler{m_CasGc}; + std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore(m_CasGc)}; std::unique_ptr<zen::CidStore> m_CidStore; std::unique_ptr<zen::ZenCacheStore> m_CacheStore; zen::CasScrubber m_Scrubber{*m_CasStore}; @@ -661,7 +670,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; }; ZEN_INFO("instantiating structured cache service"); - m_CacheStore = std::make_unique<ZenCacheStore>(m_Gc.Cas(), m_DataRoot / "cache"); + m_CacheStore = std::make_unique<ZenCacheStore>(m_CasGc, m_DataRoot / "cache"); std::unique_ptr<zen::UpstreamCache> UpstreamCache; if (ServerOptions.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp index a0d47c213..40846e368 100644 --- a/zenstore/CAS.cpp +++ b/zenstore/CAS.cpp @@ -333,10 +333,10 @@ CasImpl::TotalSize() const ////////////////////////////////////////////////////////////////////////// -CasStore* +std::unique_ptr<CasStore> CreateCasStore(CasGc& Gc) { - return new CasImpl(Gc); + return std::make_unique<CasImpl>(Gc); } ////////////////////////////////////////////////////////////////////////// @@ -355,7 +355,7 @@ TEST_CASE("CasStore") CasGc Gc; - std::unique_ptr<CasStore> Store{CreateCasStore(Gc)}; + std::unique_ptr<CasStore> Store = CreateCasStore(Gc); Store->Initialize(config); ScrubContext Ctx; diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp index 1b987ca08..7ab2045b5 100644 --- a/zenstore/gc.cpp +++ b/zenstore/gc.cpp @@ -2,14 +2,47 @@ #include <zenstore/gc.h> +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/filesystem.h> #include <zencore/logging.h> #include <zencore/string.h> #include <zencore/timer.h> #include <zenstore/CAS.h> #include <zenstore/cidstore.h> +#include <filesystem> + namespace zen { +using namespace std::literals; + +////////////////////////////////////////////////////////////////////////// + +CbObject +LoadCompactBinaryObject(const std::filesystem::path& Path) +{ + FileContents Result = ReadFile(Path); + + if (!Result.ErrorCode) + { + IoBuffer Buffer = Result.Flatten(); + if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) + { + return LoadCompactBinaryObject(Buffer); + } + } + + return CbObject(); +} + +void +SaveCompactBinaryObject(const std::filesystem::path& Path, const CbObject& Object) +{ + WriteFile(Path, Object.GetBuffer().AsIoBuffer()); +} + ////////////////////////////////////////////////////////////////////////// struct GcContext::GcState @@ -145,16 +178,12 @@ CasGc::RemoveGcStorage(GcStorage* Storage) } void -CasGc::CollectGarbage() +CasGc::CollectGarbage(GcContext& GcCtx) { RwLock::SharedLockScope _(m_Lock); // First gather reference set - GcContext GcCtx; - GcCtx.SetDeletionMode(true); - GcCtx.SetContainerGcEnabled(false); - for (GcContributor* Contributor : m_GcContribs) { Contributor->GatherReferences(GcCtx); @@ -219,28 +248,116 @@ CasGc::OnDroppedCidReferences(std::span<IoHash> Hashes) ZEN_UNUSED(Hashes); } -bool -Gc::Trigger() +////////////////////////////////////////////////////////////////////////// + +GcScheduler::GcScheduler(CasGc& CasGc) : m_Log(logging::Get("gc")), m_CasGc(CasGc) +{ +} + +GcScheduler::~GcScheduler() +{ + Shutdown(); +} + +void +GcScheduler::Initialize(const GcSchedulerConfig& Config) { - uint32_t Expected = uint32_t(GcStatus::kIdle); - if (!m_Status.compare_exchange_strong(Expected, static_cast<uint32_t>(GcStatus::kRunning))) + using namespace std::chrono; + + m_Config = Config; + + std::filesystem::create_directories(Config.RootDirectory); + + Timepoint CurrentTime = Clock::now(); + + if (CbObject SchedulerState = LoadCompactBinaryObject(Config.RootDirectory / "gc_state")) { - return false; + const int64_t LastGcTime = SchedulerState["LastGcTime"sv].AsInt64(); + CurrentTime = Timepoint(Clock::duration(LastGcTime)); + + if (CurrentTime + m_Config.Interval < Clock::now()) + { + CurrentTime = Clock::now(); + } } - ZEN_ASSERT(GcStatus::kRunning == Status()); + m_NextGcTime = CurrentTime + m_Config.Interval; + m_GcThread = std::jthread(&GcScheduler::SchedulerThread, this); +} - m_GcThread = std::jthread([this]() { - Stopwatch Timer; - ZEN_INFO("garbage collection STARTING"); +void +GcScheduler::Shutdown() +{ + if (static_cast<uint32_t>(GcSchedulerStatus::kStopped) != m_Status) + { + m_Status = static_cast<uint32_t>(GcSchedulerStatus::kStopped); + m_GcSignal.notify_one(); + } +} + +void +GcScheduler::SchedulerThread() +{ + for (;;) + { + { + std::unique_lock Lock(m_GcMutex); + if (m_GcSignal.wait_until(Lock, m_NextGcTime, [this]() { return Status() != GcSchedulerStatus::kIdle; })) + { + if (Status() == GcSchedulerStatus::kStopped) + { + break; + } + } + else + { + m_Status.store(static_cast<uint32_t>(GcSchedulerStatus::kRunning)); + } + } + + if (m_Config.Enabled) + { + Stopwatch Timer; + ZEN_INFO("garbage collection STARTING"); - m_CasGc.CollectGarbage(); - m_Status = static_cast<uint32_t>(GcStatus::kIdle); + GcContext GcCtx; + GcCtx.SetDeletionMode(true); + GcCtx.SetContainerGcEnabled(true); - ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); + m_CasGc.CollectGarbage(GcCtx); - return true; + m_Status = static_cast<uint32_t>(GcSchedulerStatus::kIdle); + m_NextGcTime = Clock::now() + m_Config.Interval; + + { + CbObjectWriter SchedulderState; + SchedulderState << "LastGcTime"sv << static_cast<int64_t>(Clock::now().time_since_epoch().count()); + SaveCompactBinaryObject(m_Config.RootDirectory / "gc_state", SchedulderState.Save()); + } + + ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + } } +bool +GcScheduler::ScheduleNow() +{ + if (m_Config.Enabled) + { + ZEN_DEBUG("schedule NOW"); + std::unique_lock Lock(m_GcMutex); + if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status) + { + m_Status = static_cast<uint32_t>(GcSchedulerStatus::kRunning); + m_GcSignal.notify_one(); + return true; + } + } + + return false; +} + +////////////////////////////////////////////////////////////////////////// + } // namespace zen diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h index bba1bb721..72b750d6c 100644 --- a/zenstore/include/zenstore/CAS.h +++ b/zenstore/include/zenstore/CAS.h @@ -129,7 +129,7 @@ protected: uint64_t m_LastScrubTime = 0; }; -ZENCORE_API CasStore* CreateCasStore(CasGc& Gc); +ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(CasGc& Gc); void CAS_forcelink(); diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h index 6b00f1ffb..94f3c32ac 100644 --- a/zenstore/include/zenstore/gc.h +++ b/zenstore/include/zenstore/gc.h @@ -6,11 +6,18 @@ #include <zencore/thread.h> #include <atomic> +#include <chrono> +#include <filesystem> #include <functional> #include <span> +#include <thread> #define ZEN_USE_REF_TRACKING 0 // This is not currently functional +namespace spdlog { +class logger; +} + namespace zen { class CasStore; @@ -97,7 +104,7 @@ public: void AddGcStorage(GcStorage* Contributor); void RemoveGcStorage(GcStorage* Contributor); - void CollectGarbage(); + void CollectGarbage(GcContext& GcCtx); void SetCidStore(CidStore* Cids); void OnNewCidReferences(std::span<IoHash> Hashes); @@ -111,23 +118,46 @@ private: CidStore* m_CidStore; }; -enum class GcStatus : uint32_t +enum class GcSchedulerStatus : uint32_t { kIdle, - kRunning + kRunning, + kStopped +}; + +struct GcSchedulerConfig +{ + std::filesystem::path RootDirectory; + std::chrono::seconds Interval{3600}; + bool Enabled = true; }; -class Gc +class GcScheduler { + using Clock = std::chrono::system_clock; + using Timepoint = std::chrono::time_point<Clock>; + public: - bool Trigger(); - CasGc& Cas() { return m_CasGc; } - GcStatus Status() const { return static_cast<GcStatus>(m_Status.load()); } + GcScheduler(CasGc& CasGc); + ~GcScheduler(); + + void Initialize(const GcSchedulerConfig& Config); + bool ScheduleNow(); + GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); } + void Shutdown(); private: - CasGc m_CasGc; - std::jthread m_GcThread; - std::atomic_uint32_t m_Status; + void SchedulerThread(); + spdlog::logger& Log() { return m_Log; } + + spdlog::logger& m_Log; + CasGc& m_CasGc; + GcSchedulerConfig m_Config; + Timepoint m_NextGcTime{}; + std::atomic_uint32_t m_Status{}; + std::jthread m_GcThread; + std::mutex m_GcMutex; + std::condition_variable m_GcSignal; }; } // namespace zen |