diff options
| author | Dan Engelbrecht <[email protected]> | 2023-05-09 15:11:10 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-09 15:11:10 +0200 |
| commit | 2542797c56b84473395a877376b68fcc77687ea9 (patch) | |
| tree | 698ebb1e4e6fb33ba9b8be973f8a851b2ee46c83 /src | |
| parent | Validate that entries points inside valid blocks at startup (#280) (diff) | |
| download | zen-2542797c56b84473395a877376b68fcc77687ea9.tar.xz zen-2542797c56b84473395a877376b68fcc77687ea9.zip | |
Low disk space detector (#277)
* - Feature: Disk writes are now blocked early and return an insufficient storage error if free disk space falls below the `--low-diskspace-threshold` value
* Never keep an entry in m_ChunkBlocks that points to a nullptr
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/cache/structuredcache.cpp | 124 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcache.h | 41 | ||||
| -rw-r--r-- | src/zenserver/config.cpp | 16 | ||||
| -rw-r--r-- | src/zenserver/config.h | 22 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 118 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 3 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 26 | ||||
| -rw-r--r-- | src/zenstore/blockstore.cpp | 20 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 48 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/gc.h | 29 |
10 files changed, 314 insertions, 133 deletions
diff --git a/src/zenserver/cache/structuredcache.cpp b/src/zenserver/cache/structuredcache.cpp index 3c829f0e8..9f2a448bb 100644 --- a/src/zenserver/cache/structuredcache.cpp +++ b/src/zenserver/cache/structuredcache.cpp @@ -18,6 +18,7 @@ #include <zenhttp/httpserver.h> #include <zenhttp/httpshared.h> #include <zenhttp/httpstats.h> +#include <zenstore/gc.h> #include <zenutil/cache/cache.h> #include <zenutil/cache/rpcrecording.h> @@ -315,17 +316,19 @@ namespace { ////////////////////////////////////////////////////////////////////////// -HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CidStore& InCidStore, - HttpStatsService& StatsService, - HttpStatusService& StatusService, - UpstreamCache& UpstreamCache) +HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, + CidStore& InCidStore, + HttpStatsService& StatsService, + HttpStatusService& StatusService, + UpstreamCache& UpstreamCache, + const DiskWriteBlocker* InDiskWriteBlocker) : m_Log(logging::Get("cache")) , m_CacheStore(InCacheStore) , m_StatsService(StatsService) , m_StatusService(StatusService) , m_CidStore(InCidStore) , m_UpstreamCache(UpstreamCache) +, m_DiskWriteBlocker(InDiskWriteBlocker) { m_StatsService.RegisterHandler("z$", *this); m_StatusService.RegisterHandler("z$", *this); @@ -983,7 +986,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con bool Success = false; const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); - const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal); + const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); ZenCacheValue ClientResultValue; @@ -1097,6 +1100,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con for (const CbAttachment* Attachment : AttachmentsToStoreLocally) { + ZEN_ASSERT_SLOW(StoreLocal); CompressedBuffer Chunk = Attachment->AsCompressedBinary(); CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); @@ -1188,6 +1192,10 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con { return Request.WriteResponse(HttpResponseCode::BadRequest); } + if (!AreDiskWritesAllowed()) + { + return Request.WriteResponse(HttpResponseCode::InsufficientStorage); + } const HttpContentType ContentType = Request.RequestContentType(); @@ -1423,7 +1431,10 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons { if (RawHash == Ref.ValueContentId) { - m_CidStore.AddChunk(UpstreamResult.Value, RawHash); + if (AreDiskWritesAllowed()) + { + m_CidStore.AddChunk(UpstreamResult.Value, RawHash); + } Source = UpstreamResult.Source; } else @@ -1492,6 +1503,10 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons { return Request.WriteResponse(HttpResponseCode::BadRequest); } + if (!AreDiskWritesAllowed()) + { + return Request.WriteResponse(HttpResponseCode::InsufficientStorage); + } Body.SetContentType(Request.RequestContentType()); @@ -1526,12 +1541,13 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons Request.WriteResponse(ResponseCode); } -CbPackage +HttpResponseCode HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, IoBuffer&& Body, uint32_t& OutAcceptMagic, RpcAcceptOptions& OutAcceptFlags, - int& OutTargetProcessId) + int& OutTargetProcessId, + CbPackage& OutResultPackage) { CbPackage Package; CbObjectView Object; @@ -1554,25 +1570,37 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, if (Method == "PutCacheRecords"sv) { - return HandleRpcPutCacheRecords(Package); + if (!AreDiskWritesAllowed()) + { + return HttpResponseCode::InsufficientStorage; + } + OutResultPackage = HandleRpcPutCacheRecords(Package); } else if (Method == "GetCacheRecords"sv) { - return HandleRpcGetCacheRecords(Object); + OutResultPackage = HandleRpcGetCacheRecords(Object); } else if (Method == "PutCacheValues"sv) { - return HandleRpcPutCacheValues(Package); + if (!AreDiskWritesAllowed()) + { + return HttpResponseCode::InsufficientStorage; + } + OutResultPackage = HandleRpcPutCacheValues(Package); } else if (Method == "GetCacheValues"sv) { - return HandleRpcGetCacheValues(Object); + OutResultPackage = HandleRpcGetCacheValues(Object); } else if (Method == "GetCacheChunks"sv) { - return HandleRpcGetCacheChunks(Object); + OutResultPackage = HandleRpcGetCacheChunks(Object); } - return CbPackage{}; + else + { + return HttpResponseCode::BadRequest; + } + return HttpResponseCode::OK; } void @@ -1594,27 +1622,30 @@ HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Re uint32_t AcceptMagic = 0; RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; int TargetPid = 0; - CbPackage RpcResult = HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid); - if (AcceptMagic == kCbPkgMagic) + CbPackage RpcResult; + if (IsHttpSuccessCode(HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid, RpcResult))) { - FormatFlags Flags = FormatFlags::kDefault; - if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + if (AcceptMagic == kCbPkgMagic) { - Flags |= FormatFlags::kAllowLocalReferences; - if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) { - Flags |= FormatFlags::kDenyPartialLocalReferences; + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + { + Flags |= FormatFlags::kDenyPartialLocalReferences; + } } + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid); + ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0); + } + else + { + BinaryWriter MemStream; + RpcResult.Save(MemStream); + IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()); + ZEN_ASSERT(RpcResponseBuffer.Size() > 0); } - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid); - ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0); - } - else - { - BinaryWriter MemStream; - RpcResult.Save(MemStream); - IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()); - ZEN_ASSERT(RpcResponseBuffer.Size() > 0); } } JobLatch.CountDown(); @@ -1652,10 +1683,13 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) uint32_t AcceptMagic = 0; RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; int TargetProcessId = 0; - CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId); - if (RpcResult.IsNull()) + CbPackage RpcResult; + + HttpResponseCode ResultCode = + HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId, RpcResult); + if (!IsHttpSuccessCode(ResultCode)) { - AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); + AsyncRequest.WriteResponse(ResultCode); return; } if (AcceptMagic == kCbPkgMagic) @@ -1706,6 +1740,7 @@ CbPackage HttpStructuredCacheService::HandleRpcPutCacheRecords(const CbPackage& BatchRequest) { ZEN_TRACE_CPU("Z$::RpcPutCacheRecords"); + CbObjectView BatchObject = BatchRequest.GetObject(); ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); @@ -2064,7 +2099,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject); Request.RecordObject = ObjectBuffer; - if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) + bool StoreLocal = + EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) { m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}); } @@ -2087,13 +2124,14 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) } if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) { + bool StoreLocal = EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId)) { if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) { Request.Source = Params.Source; Value.Exists = true; - if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) + if (StoreLocal) { m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); } @@ -2411,7 +2449,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) Request.RawSize = Params.RawSize; const bool HasData = IsCompressedBinary(Params.Value.GetContentType()); const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData); - const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal); + const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); const bool IsHit = SkipData || HasData; if (IsHit) { @@ -2729,7 +2767,8 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac Record.CacheValue.SetContentType(ZenContentType::kCbObject); Record.Source = Params.Source; - if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal)) + bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) { m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); } @@ -2895,7 +2934,8 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names return; } - if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal)) + bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) { if (Request.IsRecordRequest) { @@ -3054,6 +3094,12 @@ HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request) Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } +bool +HttpStructuredCacheService::AreDiskWritesAllowed() const +{ + return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); +} + #if ZEN_WITH_TESTS TEST_CASE("z$service.parse.relative.Uri") diff --git a/src/zenserver/cache/structuredcache.h b/src/zenserver/cache/structuredcache.h index 8309361f4..77a0aee6a 100644 --- a/src/zenserver/cache/structuredcache.h +++ b/src/zenserver/cache/structuredcache.h @@ -24,6 +24,7 @@ struct PutRequestData; class ScrubContext; class UpstreamCache; class ZenCacheStore; +class DiskWriteBlocker; enum class CachePolicy : uint32_t; enum class RpcAcceptOptions : uint16_t; @@ -67,18 +68,18 @@ namespace cache { class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider { public: - HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CidStore& InCidStore, - HttpStatsService& StatsService, - HttpStatusService& StatusService, - UpstreamCache& UpstreamCache); + HttpStructuredCacheService(ZenCacheStore& InCacheStore, + CidStore& InCidStore, + HttpStatsService& StatsService, + HttpStatusService& StatusService, + UpstreamCache& UpstreamCache, + const DiskWriteBlocker* InDiskWriteBlocker); ~HttpStructuredCacheService(); virtual const char* BaseUri() const override; virtual void HandleRequest(HttpServerRequest& Request) override; - - void Flush(); - void Scrub(ScrubContext& Ctx); + void Flush(); + void Scrub(ScrubContext& Ctx); private: struct CacheRef @@ -111,16 +112,17 @@ private: void HandleRpcRequest(HttpServerRequest& Request); void HandleDetailsRequest(HttpServerRequest& Request); - CbPackage HandleRpcPutCacheRecords(const CbPackage& BatchRequest); - CbPackage HandleRpcGetCacheRecords(CbObjectView BatchRequest); - CbPackage HandleRpcPutCacheValues(const CbPackage& BatchRequest); - CbPackage HandleRpcGetCacheValues(CbObjectView BatchRequest); - CbPackage HandleRpcGetCacheChunks(CbObjectView BatchRequest); - CbPackage HandleRpcRequest(const ZenContentType ContentType, - IoBuffer&& Body, - uint32_t& OutAcceptMagic, - RpcAcceptOptions& OutAcceptFlags, - int& OutTargetProcessId); + CbPackage HandleRpcPutCacheRecords(const CbPackage& BatchRequest); + CbPackage HandleRpcGetCacheRecords(CbObjectView BatchRequest); + CbPackage HandleRpcPutCacheValues(const CbPackage& BatchRequest); + CbPackage HandleRpcGetCacheValues(CbObjectView BatchRequest); + CbPackage HandleRpcGetCacheChunks(CbObjectView BatchRequest); + HttpResponseCode HandleRpcRequest(const ZenContentType ContentType, + IoBuffer&& Body, + uint32_t& OutAcceptMagic, + RpcAcceptOptions& OutAcceptFlags, + int& OutTargetProcessId, + CbPackage& OutPackage); void HandleCacheRequest(HttpServerRequest& Request); void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace); @@ -156,6 +158,8 @@ private: /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */ CbPackage WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests); + bool AreDiskWritesAllowed() const; + spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; ZenCacheStore& m_CacheStore; @@ -167,6 +171,7 @@ private: metrics::OperationTiming m_HttpRequests; metrics::OperationTiming m_UpstreamGetRequestTiming; CacheStats m_CacheStats; + const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; void ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount); diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index da0da2593..592de0f7f 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -506,7 +506,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) options.add_option("gc", "", "disk-reserve-size", - "Size of gc disk reserve in bytes. Default set to 268435456 (256 Mb).", + "Size of gc disk reserve in bytes. Default set to 268435456 (256 Mb). Set to zero to disable.", cxxopts::value<uint64_t>(ServerOptions.GcConfig.DiskReserveSize)->default_value("268435456"), ""); @@ -519,6 +519,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) options.add_option("gc", "", + "gc-low-diskspace-threshold", + "Minimum free space on disk to allow writes to disk. Default set to 268435456 (256 Mb). Set to zero to disable.", + cxxopts::value<uint64_t>(ServerOptions.GcConfig.Cache.MinimumFreeDiskSpaceToAllowWrites)->default_value("268435456"), + ""); + + options.add_option("gc", + "", "gc-disksize-softlimit", "Garbage collection disk usage soft limit. Default set to 0 (Off).", cxxopts::value<uint64_t>(ServerOptions.GcConfig.Cache.DiskSizeSoftLimit)->default_value("0"), @@ -867,9 +874,10 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio if (sol::optional<sol::table> GcConfig = lua["gc"]) { - ServerOptions.GcConfig.MonitorIntervalSeconds = GcConfig.value().get_or("monitorintervalseconds", 30); - ServerOptions.GcConfig.IntervalSeconds = GcConfig.value().get_or("intervalseconds", 0); - ServerOptions.GcConfig.DiskReserveSize = GcConfig.value().get_or("diskreservesize", uint64_t(1u << 28)); + ServerOptions.GcConfig.MonitorIntervalSeconds = GcConfig.value().get_or("monitorintervalseconds", 30); + ServerOptions.GcConfig.IntervalSeconds = GcConfig.value().get_or("intervalseconds", 0); + ServerOptions.GcConfig.DiskReserveSize = GcConfig.value().get_or("diskreservesize", uint64_t(1u << 28)); + ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites = GcConfig.value().get_or("lowdiskspacethreshold", 0); if (sol::optional<sol::table> CacheGcConfig = GcConfig.value()["cache"]) { diff --git a/src/zenserver/config.h b/src/zenserver/config.h index 9559cae33..caf4adce7 100644 --- a/src/zenserver/config.h +++ b/src/zenserver/config.h @@ -69,11 +69,12 @@ struct ZenUpstreamCacheConfig struct ZenCacheEvictionPolicy { - uint64_t DiskSizeLimit = ~uint64_t(0); - uint64_t MemorySizeLimit = 1024 * 1024 * 1024; - int32_t MaxDurationSeconds = 24 * 60 * 60; - uint64_t DiskSizeSoftLimit = 0; - bool Enabled = true; + uint64_t DiskSizeLimit = ~uint64_t(0); + uint64_t MemorySizeLimit = 1024 * 1024 * 1024; + int32_t MaxDurationSeconds = 24 * 60 * 60; + uint64_t DiskSizeSoftLimit = 0; + uint64_t MinimumFreeDiskSpaceToAllowWrites = 256u * 1024u * 1024u; + bool Enabled = true; }; struct ZenCasEvictionPolicy @@ -88,11 +89,12 @@ struct ZenGcConfig { ZenCasEvictionPolicy Cas; ZenCacheEvictionPolicy Cache; - int32_t MonitorIntervalSeconds = 30; - int32_t IntervalSeconds = 0; - bool CollectSmallObjects = true; - bool Enabled = true; - uint64_t DiskReserveSize = 1ul << 28; + int32_t MonitorIntervalSeconds = 30; + int32_t IntervalSeconds = 0; + bool CollectSmallObjects = true; + bool Enabled = true; + uint64_t DiskReserveSize = 1ul << 28; + uint64_t MinimumFreeDiskSpaceToAllowWrites = 1ul << 28; }; struct ZenOpenIdProviderConfig diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 80e3f27f1..9dd6ddcfc 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1680,6 +1680,7 @@ ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcMa , m_Log(logging::Get("project")) , m_CidStore(Store) , m_ProjectBasePath(BasePath) +, m_DiskWriteBlocker(Gc.GetDiskWriteBlocker()) { ZEN_INFO("initializing project store at '{}'", BasePath); // m_Log.set_level(spdlog::level::debug); @@ -2560,6 +2561,10 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, if (Method == "import") { + if (!AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } std::pair<HttpResponseCode, std::string> Result = Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); if (Result.second.empty()) { @@ -2602,6 +2607,10 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } else if (Method == "putchunks") { + if (!AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } std::span<const CbAttachment> Attachments = Package.GetAttachments(); for (const CbAttachment& Attachment : Attachments) { @@ -2676,6 +2685,12 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, return ConvertResult(Result); } +bool +ProjectStore::AreDiskWritesAllowed() const +{ + return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); +} + ////////////////////////////////////////////////////////////////////////// HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, HttpStatsService& StatsService, AuthMgr& AuthMgr) @@ -2920,7 +2935,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, uint64_t Offset = 0; uint64_t Size = ~(0ull); - auto QueryParms = Req.ServerRequest().GetQueryParams(); + auto QueryParms = HttpReq.GetQueryParams(); if (auto OffsetParm = QueryParms.GetValue("offset"); OffsetParm.empty() == false) { @@ -2987,7 +3002,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, HttpContentType AcceptType = HttpReq.AcceptContentType(); HttpContentType RequestType = HttpReq.RequestContentType(); - switch (Req.ServerRequest().RequestVerb()) + switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: { @@ -3019,6 +3034,10 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, } case HttpVerb::kPost: { + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->PutChunk(ProjectId, OplogId, Cid, RequestType, HttpReq.ReadPayload()); if (Result.first == HttpResponseCode::OK || Result.first == HttpResponseCode::Created) @@ -3107,6 +3126,11 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); @@ -3252,7 +3276,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, if (std::optional<CbObject> MaybeOp = Oplog.GetOpByIndex(OpId.value())) { CbObject& Op = MaybeOp.value(); - if (Req.ServerRequest().AcceptContentType() == ZenContentType::kCbPackage) + if (HttpReq.AcceptContentType() == ZenContentType::kCbPackage) { CbPackage Package; Package.SetObject(Op); @@ -3319,6 +3343,8 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, m_Router.RegisterRoute( "{project}/oplog/{log}", [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); @@ -3326,22 +3352,22 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, if (!Project) { - return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("project {} not found", ProjectId)); + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("project {} not found", ProjectId)); } Project->TouchProject(); - switch (Req.ServerRequest().RequestVerb()) + switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: { ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId); if (!OplogIt) { - return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("oplog {} not found in project {}", OplogId, ProjectId)); + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("oplog {} not found in project {}", OplogId, ProjectId)); } Project->TouchOplog(OplogId); @@ -3352,14 +3378,18 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, << "markerpath"sv << Log.MarkerPath().c_str() << "totalsize"sv << Log.TotalSize() << "opcount" << Log.OplogCount() << "expired"sv << Project->IsExpired(GcClock::TimePoint::max(), Log); - Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Cb.Save()); + HttpReq.WriteResponse(HttpResponseCode::OK, Cb.Save()); } break; case HttpVerb::kPost: { + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } std::filesystem::path OplogMarkerPath; - if (CbObject Params = Req.ServerRequest().ReadPayloadObject()) + if (CbObject Params = HttpReq.ReadPayloadObject()) { OplogMarkerPath = Params["gcpath"sv].AsString(); } @@ -3370,19 +3400,19 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, if (!Project->NewOplog(OplogId, OplogMarkerPath)) { // TODO: indicate why the operation failed! - return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError); + return HttpReq.WriteResponse(HttpResponseCode::InternalServerError); } Project->TouchOplog(OplogId); ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath); - return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); + return HttpReq.WriteResponse(HttpResponseCode::Created); } // I guess this should ultimately be used to execute RPCs but for now, it // does absolutely nothing - return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } break; @@ -3392,7 +3422,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, Project->DeleteOplog(OplogId); - return Req.ServerRequest().WriteResponse(HttpResponseCode::OK); + return HttpReq.WriteResponse(HttpResponseCode::OK); } break; @@ -3461,13 +3491,19 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, m_Router.RegisterRoute( "{project}", [this](HttpRouterRequest& Req) { - const std::string ProjectId = Req.GetCapture(1); + HttpServerRequest& HttpReq = Req.ServerRequest(); + const std::string ProjectId = Req.GetCapture(1); - switch (Req.ServerRequest().RequestVerb()) + switch (HttpReq.RequestVerb()) { case HttpVerb::kPost: { - IoBuffer Payload = Req.ServerRequest().ReadPayload(); + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + IoBuffer Payload = HttpReq.ReadPayload(); CbObject Params = LoadCompactBinaryObject(Payload); std::string_view Id = Params["id"sv].AsString(); std::string_view Root = Params["root"sv].AsString(); @@ -3487,7 +3523,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, ProjectFilePath, ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : ""); - Req.ServerRequest().WriteResponse(HttpResponseCode::Created); + HttpReq.WriteResponse(HttpResponseCode::Created); } break; @@ -3496,9 +3532,9 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { - return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("project {} not found", ProjectId)); + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("project {} not found", ProjectId)); } Project->TouchProject(); @@ -3520,7 +3556,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, } Response.EndArray(); // oplogs - Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save()); + HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); } break; @@ -3529,20 +3565,20 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { - return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("project {} not found", ProjectId)); + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("project {} not found", ProjectId)); } ZEN_INFO("deleting project '{}'", ProjectId); if (!m_ProjectStore->DeleteProject(ProjectId)) { - return Req.ServerRequest().WriteResponse(HttpResponseCode::Locked, - HttpContentType::kText, - fmt::format("project {} is in use", ProjectId)); + return HttpReq.WriteResponse(HttpResponseCode::Locked, + HttpContentType::kText, + fmt::format("project {} is in use", ProjectId)); } - return Req.ServerRequest().WriteResponse(HttpResponseCode::NoContent); + return HttpReq.WriteResponse(HttpResponseCode::NoContent); } break; @@ -3556,14 +3592,20 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, m_Router.RegisterRoute( "{project}/oplog/{log}/save", [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); + HttpServerRequest& HttpReq = Req.ServerRequest(); + + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); if (HttpReq.RequestContentType() != HttpContentType::kCbObject) { return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid content type"); } - IoBuffer Payload = Req.ServerRequest().ReadPayload(); + IoBuffer Payload = HttpReq.ReadPayload(); CbObject Response; std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->WriteOplog(ProjectId, OplogId, std::move(Payload), Response); @@ -3590,11 +3632,11 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, { return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid accept content type"); } - IoBuffer Payload = Req.ServerRequest().ReadPayload(); + IoBuffer Payload = HttpReq.ReadPayload(); CbObject Response; std::pair<HttpResponseCode, std::string> Result = - m_ProjectStore->ReadOplog(ProjectId, OplogId, Req.ServerRequest().GetQueryParams(), Response); + m_ProjectStore->ReadOplog(ProjectId, OplogId, HttpReq.GetQueryParams(), Response); if (Result.first == HttpResponseCode::OK) { return HttpReq.WriteResponse(HttpResponseCode::OK, Response); @@ -3615,7 +3657,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); - IoBuffer Payload = Req.ServerRequest().ReadPayload(); + IoBuffer Payload = HttpReq.ReadPayload(); m_ProjectStore->Rpc(HttpReq, ProjectId, OplogId, std::move(Payload), m_AuthMgr); }, diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 7b0c96205..b446de543 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -322,12 +322,15 @@ public: CbObjectView&& Params, AuthMgr& AuthManager); + bool AreDiskWritesAllowed() const; + private: spdlog::logger& m_Log; CidStore& m_CidStore; std::filesystem::path m_ProjectBasePath; mutable RwLock m_ProjectsLock; std::map<std::string, Ref<Project>> m_Projects; + const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; std::filesystem::path BasePathForProject(std::string_view ProjectId); }; diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 75a49367c..c6ec4b82b 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -394,14 +394,16 @@ public: } ZEN_INFO("initializing GC, enabled '{}', interval {}s", ServerOptions.GcConfig.Enabled, ServerOptions.GcConfig.IntervalSeconds); - zen::GcSchedulerConfig GcConfig{.RootDirectory = m_DataRoot / "gc", - .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds), - .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds), - .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds), - .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects, - .Enabled = ServerOptions.GcConfig.Enabled, - .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize, - .DiskSizeSoftLimit = ServerOptions.GcConfig.Cache.DiskSizeSoftLimit}; + zen::GcSchedulerConfig GcConfig{ + .RootDirectory = m_DataRoot / "gc", + .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds), + .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds), + .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds), + .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects, + .Enabled = ServerOptions.GcConfig.Enabled, + .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize, + .DiskSizeSoftLimit = ServerOptions.GcConfig.Cache.DiskSizeSoftLimit, + .MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.Cache.MinimumFreeDiskSpaceToAllowWrites}; m_GcScheduler.Initialize(GcConfig); return EffectiveBasePort; @@ -845,8 +847,12 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) } } - m_StructuredCacheService = - std::make_unique<HttpStructuredCacheService>(*m_CacheStore, *m_CidStore, m_StatsService, m_StatusService, *m_UpstreamCache); + m_StructuredCacheService = std::make_unique<HttpStructuredCacheService>(*m_CacheStore, + *m_CidStore, + m_StatsService, + m_StatusService, + *m_UpstreamCache, + m_GcManager.GetDiskWriteBlocker()); m_Http->RegisterService(*m_StructuredCacheService); m_Http->RegisterService(*m_UpstreamService); diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 05bc69fcb..378d9fd52 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -472,7 +472,10 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); - OldBlockFile = m_ChunkBlocks[BlockIndex]; + if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end()) + { + OldBlockFile = It->second; + } } if (!OldBlockFile) @@ -504,8 +507,9 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, }); if (OldBlockFile) { - m_ChunkBlocks[BlockIndex] = nullptr; ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); + ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile); + m_ChunkBlocks.erase(BlockIndex); m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed); OldBlockFile->MarkAsDeleteOnClose(); } @@ -582,7 +586,9 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); + ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); m_ChunkBlocks.erase(NextBlockIndex); + NewBlockFile->MarkAsDeleteOnClose(); return; } @@ -627,8 +633,9 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); - m_ChunkBlocks[BlockIndex] = nullptr; ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); + ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile); + m_ChunkBlocks.erase(BlockIndex); m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed); OldBlockFile->MarkAsDeleteOnClose(); } @@ -704,8 +711,8 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, size_t ChunkIndex = LocationIndexes[LocationIndexOffset]; const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex]; - const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[FirstLocation.BlockIndex]; - if (!BlockFile) + auto FindBlockIt = m_ChunkBlocks.find(FirstLocation.BlockIndex); + if (FindBlockIt == m_ChunkBlocks.end()) { while (ChunkLocations[ChunkIndex].BlockIndex == FirstLocation.BlockIndex) { @@ -719,6 +726,9 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, } continue; } + const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second; + ZEN_ASSERT(BlockFile); + size_t BlockSize = BlockFile->FileSize(); size_t RangeCount = GetNextRange(LocationIndexOffset); if (RangeCount > 0) diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index 370c3c965..f9888722b 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -559,10 +559,12 @@ DiskUsageWindow::FindTimepointThatRemoves(uint64_t Amount, GcClock::Tick EndTick GcScheduler::GcScheduler(GcManager& GcManager) : m_Log(logging::Get("gc")), m_GcManager(GcManager) { + m_GcManager.SetDiskWriteBlocker(this); } GcScheduler::~GcScheduler() { + m_GcManager.SetDiskWriteBlocker(nullptr); Shutdown(); } @@ -573,6 +575,18 @@ GcScheduler::Initialize(const GcSchedulerConfig& Config) m_Config = Config; + std::error_code Ec; + DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec); + if (Ec) + { + m_AreDiskWritesBlocked.store(true); + ZEN_WARN("get disk space info FAILED, blocking disk writes, reason: '{}'", Ec.message()); + } + else + { + CheckDiskSpace(Space); + } + if (m_Config.Interval.count() && m_Config.Interval < m_Config.MonitorInterval) { m_Config.Interval = m_Config.MonitorInterval; @@ -580,7 +594,7 @@ GcScheduler::Initialize(const GcSchedulerConfig& Config) std::filesystem::create_directories(Config.RootDirectory); - std::error_code Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize); + Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize); if (Ec) { ZEN_WARN("unable to create GC reserve at '{}' with size {}, reason '{}'", @@ -664,6 +678,29 @@ GcScheduler::Trigger(const GcScheduler::TriggerParams& Params) } void +GcScheduler::CheckDiskSpace(const DiskSpace& Space) +{ + bool AreDiskWritesBlocked = m_AreDiskWritesBlocked; + bool IsLowOnDiskSpace = (m_Config.MinimumFreeDiskSpaceToAllowWrites) != 0 && (Space.Free < m_Config.MinimumFreeDiskSpaceToAllowWrites); + if (IsLowOnDiskSpace != AreDiskWritesBlocked) + { + m_AreDiskWritesBlocked.store(IsLowOnDiskSpace); + if (IsLowOnDiskSpace) + { + ZEN_WARN("Writing to disk is blocked, free disk space: {}, minimum required {}", + NiceBytes(Space.Free), + NiceBytes(m_Config.MinimumFreeDiskSpaceToAllowWrites)); + } + else + { + ZEN_INFO("Writing to disk is unblocked, free disk space: {}, minimum required {}", + NiceBytes(Space.Free), + NiceBytes(m_Config.MinimumFreeDiskSpaceToAllowWrites)); + } + } +} + +void GcScheduler::SchedulerThread() { std::chrono::seconds WaitTime{0}; @@ -716,16 +753,21 @@ GcScheduler::SchedulerThread() GcClock::TimePoint ExpireTime = MaxCacheDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxCacheDuration; - std::error_code Ec; const GcStorageSize TotalSize = m_GcManager.TotalStorageSize(); if (Timeout && Status() == GcSchedulerStatus::kIdle) { - DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec); + std::error_code Ec; + DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec); if (Ec) { + m_AreDiskWritesBlocked.store(true); ZEN_WARN("get disk space info FAILED, reason: '{}'", Ec.message()); } + else + { + CheckDiskSpace(Space); + } const int64_t PressureGraphLength = 30; const std::chrono::duration LoadGraphTime = PressureGraphLength * m_Config.MonitorInterval; diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h index e0354b331..fe9857e6a 100644 --- a/src/zenstore/include/zenstore/gc.h +++ b/src/zenstore/include/zenstore/gc.h @@ -27,6 +27,7 @@ class HashKeySet; class GcManager; class CidStore; struct IoHash; +struct DiskSpace; /** GC clock */ @@ -121,6 +122,14 @@ private: GcManager& m_Gc; }; +/** Interface for querying if we are running low on disk space, used to deny put/writes to disk + */ +class DiskWriteBlocker +{ +public: + virtual bool AreDiskWritesAllowed() const = 0; +}; + /** GC orchestrator */ class GcManager @@ -139,6 +148,9 @@ public: GcStorageSize TotalStorageSize() const; + const DiskWriteBlocker* GetDiskWriteBlocker() { return m_DiskWriteBlocker; } + void SetDiskWriteBlocker(const DiskWriteBlocker* Monitor) { m_DiskWriteBlocker = Monitor; } + #if ZEN_USE_REF_TRACKING void OnNewCidReferences(std::span<IoHash> Hashes); void OnCommittedCidReferences(std::span<IoHash> Hashes); @@ -151,7 +163,8 @@ private: mutable RwLock m_Lock; std::vector<GcContributor*> m_GcContribs; std::vector<GcStorage*> m_GcStorage; - CidStore* m_CidStore = nullptr; + CidStore* m_CidStore = nullptr; + const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; }; enum class GcSchedulerStatus : uint32_t @@ -167,10 +180,11 @@ struct GcSchedulerConfig std::chrono::seconds MonitorInterval{30}; std::chrono::seconds Interval{}; std::chrono::seconds MaxCacheDuration{86400}; - bool CollectSmallObjects = true; - bool Enabled = true; - uint64_t DiskReserveSize = 1ul << 28; - uint64_t DiskSizeSoftLimit = 0; + bool CollectSmallObjects = true; + bool Enabled = true; + uint64_t DiskReserveSize = 1ul << 28; + uint64_t DiskSizeSoftLimit = 0; + uint64_t MinimumFreeDiskSpaceToAllowWrites = 1ul << 28; }; class DiskUsageWindow @@ -196,7 +210,7 @@ public: /** * GC scheduler */ -class GcScheduler +class GcScheduler : private DiskWriteBlocker { public: GcScheduler(GcManager& GcManager); @@ -220,6 +234,8 @@ private: void CollectGarbage(const GcClock::TimePoint& ExpireTime, bool Delete, bool CollectSmallObjects); GcClock::TimePoint NextGcTime(GcClock::TimePoint CurrentTime); spdlog::logger& Log() { return m_Log; } + virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); } + void CheckDiskSpace(const DiskSpace& Space); spdlog::logger& m_Log; GcManager& m_GcManager; @@ -232,6 +248,7 @@ private: std::mutex m_GcMutex; std::condition_variable m_GcSignal; std::optional<TriggerParams> m_TriggerParams; + std::atomic_bool m_AreDiskWritesBlocked = false; TCasLogFile<DiskUsageWindow::DiskUsageEntry> m_DiskUsageLog; DiskUsageWindow m_DiskUsageWindow; |