aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-05-09 15:11:10 +0200
committerGitHub <[email protected]>2023-05-09 15:11:10 +0200
commit2542797c56b84473395a877376b68fcc77687ea9 (patch)
tree698ebb1e4e6fb33ba9b8be973f8a851b2ee46c83 /src
parentValidate that entries points inside valid blocks at startup (#280) (diff)
downloadzen-2542797c56b84473395a877376b68fcc77687ea9.tar.xz
zen-2542797c56b84473395a877376b68fcc77687ea9.zip
Low disk space detector (#277)
* - Feature: Disk writes are now blocked early and return an insufficient storage error if free disk space falls below the `--low-diskspace-threshold` value * Never keep an entry in m_ChunkBlocks that points to a nullptr
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/cache/structuredcache.cpp124
-rw-r--r--src/zenserver/cache/structuredcache.h41
-rw-r--r--src/zenserver/config.cpp16
-rw-r--r--src/zenserver/config.h22
-rw-r--r--src/zenserver/projectstore/projectstore.cpp118
-rw-r--r--src/zenserver/projectstore/projectstore.h3
-rw-r--r--src/zenserver/zenserver.cpp26
-rw-r--r--src/zenstore/blockstore.cpp20
-rw-r--r--src/zenstore/gc.cpp48
-rw-r--r--src/zenstore/include/zenstore/gc.h29
10 files changed, 314 insertions, 133 deletions
diff --git a/src/zenserver/cache/structuredcache.cpp b/src/zenserver/cache/structuredcache.cpp
index 3c829f0e8..9f2a448bb 100644
--- a/src/zenserver/cache/structuredcache.cpp
+++ b/src/zenserver/cache/structuredcache.cpp
@@ -18,6 +18,7 @@
#include <zenhttp/httpserver.h>
#include <zenhttp/httpshared.h>
#include <zenhttp/httpstats.h>
+#include <zenstore/gc.h>
#include <zenutil/cache/cache.h>
#include <zenutil/cache/rpcrecording.h>
@@ -315,17 +316,19 @@ namespace {
//////////////////////////////////////////////////////////////////////////
-HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
- HttpStatsService& StatsService,
- HttpStatusService& StatusService,
- UpstreamCache& UpstreamCache)
+HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
+ CidStore& InCidStore,
+ HttpStatsService& StatsService,
+ HttpStatusService& StatusService,
+ UpstreamCache& UpstreamCache,
+ const DiskWriteBlocker* InDiskWriteBlocker)
: m_Log(logging::Get("cache"))
, m_CacheStore(InCacheStore)
, m_StatsService(StatsService)
, m_StatusService(StatusService)
, m_CidStore(InCidStore)
, m_UpstreamCache(UpstreamCache)
+, m_DiskWriteBlocker(InDiskWriteBlocker)
{
m_StatsService.RegisterHandler("z$", *this);
m_StatusService.RegisterHandler("z$", *this);
@@ -983,7 +986,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
bool Success = false;
const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord);
const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
- const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal);
+ const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData);
ZenCacheValue ClientResultValue;
@@ -1097,6 +1100,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
{
+ ZEN_ASSERT_SLOW(StoreLocal);
CompressedBuffer Chunk = Attachment->AsCompressedBinary();
CidStore::InsertResult InsertResult =
m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
@@ -1188,6 +1192,10 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
{
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
+ if (!AreDiskWritesAllowed())
+ {
+ return Request.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
const HttpContentType ContentType = Request.RequestContentType();
@@ -1423,7 +1431,10 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons
{
if (RawHash == Ref.ValueContentId)
{
- m_CidStore.AddChunk(UpstreamResult.Value, RawHash);
+ if (AreDiskWritesAllowed())
+ {
+ m_CidStore.AddChunk(UpstreamResult.Value, RawHash);
+ }
Source = UpstreamResult.Source;
}
else
@@ -1492,6 +1503,10 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons
{
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
+ if (!AreDiskWritesAllowed())
+ {
+ return Request.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
Body.SetContentType(Request.RequestContentType());
@@ -1526,12 +1541,13 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons
Request.WriteResponse(ResponseCode);
}
-CbPackage
+HttpResponseCode
HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType,
IoBuffer&& Body,
uint32_t& OutAcceptMagic,
RpcAcceptOptions& OutAcceptFlags,
- int& OutTargetProcessId)
+ int& OutTargetProcessId,
+ CbPackage& OutResultPackage)
{
CbPackage Package;
CbObjectView Object;
@@ -1554,25 +1570,37 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType,
if (Method == "PutCacheRecords"sv)
{
- return HandleRpcPutCacheRecords(Package);
+ if (!AreDiskWritesAllowed())
+ {
+ return HttpResponseCode::InsufficientStorage;
+ }
+ OutResultPackage = HandleRpcPutCacheRecords(Package);
}
else if (Method == "GetCacheRecords"sv)
{
- return HandleRpcGetCacheRecords(Object);
+ OutResultPackage = HandleRpcGetCacheRecords(Object);
}
else if (Method == "PutCacheValues"sv)
{
- return HandleRpcPutCacheValues(Package);
+ if (!AreDiskWritesAllowed())
+ {
+ return HttpResponseCode::InsufficientStorage;
+ }
+ OutResultPackage = HandleRpcPutCacheValues(Package);
}
else if (Method == "GetCacheValues"sv)
{
- return HandleRpcGetCacheValues(Object);
+ OutResultPackage = HandleRpcGetCacheValues(Object);
}
else if (Method == "GetCacheChunks"sv)
{
- return HandleRpcGetCacheChunks(Object);
+ OutResultPackage = HandleRpcGetCacheChunks(Object);
}
- return CbPackage{};
+ else
+ {
+ return HttpResponseCode::BadRequest;
+ }
+ return HttpResponseCode::OK;
}
void
@@ -1594,27 +1622,30 @@ HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Re
uint32_t AcceptMagic = 0;
RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
int TargetPid = 0;
- CbPackage RpcResult = HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid);
- if (AcceptMagic == kCbPkgMagic)
+ CbPackage RpcResult;
+ if (IsHttpSuccessCode(HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid, RpcResult)))
{
- FormatFlags Flags = FormatFlags::kDefault;
- if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ if (AcceptMagic == kCbPkgMagic)
{
- Flags |= FormatFlags::kAllowLocalReferences;
- if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ FormatFlags Flags = FormatFlags::kDefault;
+ if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
{
- Flags |= FormatFlags::kDenyPartialLocalReferences;
+ Flags |= FormatFlags::kAllowLocalReferences;
+ if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ {
+ Flags |= FormatFlags::kDenyPartialLocalReferences;
+ }
}
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid);
+ ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0);
+ }
+ else
+ {
+ BinaryWriter MemStream;
+ RpcResult.Save(MemStream);
+ IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize());
+ ZEN_ASSERT(RpcResponseBuffer.Size() > 0);
}
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid);
- ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResult.Save(MemStream);
- IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize());
- ZEN_ASSERT(RpcResponseBuffer.Size() > 0);
}
}
JobLatch.CountDown();
@@ -1652,10 +1683,13 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
uint32_t AcceptMagic = 0;
RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
int TargetProcessId = 0;
- CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId);
- if (RpcResult.IsNull())
+ CbPackage RpcResult;
+
+ HttpResponseCode ResultCode =
+ HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId, RpcResult);
+ if (!IsHttpSuccessCode(ResultCode))
{
- AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
+ AsyncRequest.WriteResponse(ResultCode);
return;
}
if (AcceptMagic == kCbPkgMagic)
@@ -1706,6 +1740,7 @@ CbPackage
HttpStructuredCacheService::HandleRpcPutCacheRecords(const CbPackage& BatchRequest)
{
ZEN_TRACE_CPU("Z$::RpcPutCacheRecords");
+
CbObjectView BatchObject = BatchRequest.GetObject();
ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv);
@@ -2064,7 +2099,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject);
Request.RecordObject = ObjectBuffer;
- if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal))
+ bool StoreLocal =
+ EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
{
m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}});
}
@@ -2087,13 +2124,14 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
}
if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
{
+ bool StoreLocal = EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId))
{
if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
{
Request.Source = Params.Source;
Value.Exists = true;
- if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
+ if (StoreLocal)
{
m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
}
@@ -2411,7 +2449,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
Request.RawSize = Params.RawSize;
const bool HasData = IsCompressedBinary(Params.Value.GetContentType());
const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData);
- const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal);
+ const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
const bool IsHit = SkipData || HasData;
if (IsHit)
{
@@ -2729,7 +2767,8 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
Record.CacheValue.SetContentType(ZenContentType::kCbObject);
Record.Source = Params.Source;
- if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal))
+ bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
{
m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue});
}
@@ -2895,7 +2934,8 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
return;
}
- if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal))
+ bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
{
if (Request.IsRecordRequest)
{
@@ -3054,6 +3094,12 @@ HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request)
Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
}
+bool
+HttpStructuredCacheService::AreDiskWritesAllowed() const
+{
+ return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed());
+}
+
#if ZEN_WITH_TESTS
TEST_CASE("z$service.parse.relative.Uri")
diff --git a/src/zenserver/cache/structuredcache.h b/src/zenserver/cache/structuredcache.h
index 8309361f4..77a0aee6a 100644
--- a/src/zenserver/cache/structuredcache.h
+++ b/src/zenserver/cache/structuredcache.h
@@ -24,6 +24,7 @@ struct PutRequestData;
class ScrubContext;
class UpstreamCache;
class ZenCacheStore;
+class DiskWriteBlocker;
enum class CachePolicy : uint32_t;
enum class RpcAcceptOptions : uint16_t;
@@ -67,18 +68,18 @@ namespace cache {
class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider
{
public:
- HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
- HttpStatsService& StatsService,
- HttpStatusService& StatusService,
- UpstreamCache& UpstreamCache);
+ HttpStructuredCacheService(ZenCacheStore& InCacheStore,
+ CidStore& InCidStore,
+ HttpStatsService& StatsService,
+ HttpStatusService& StatusService,
+ UpstreamCache& UpstreamCache,
+ const DiskWriteBlocker* InDiskWriteBlocker);
~HttpStructuredCacheService();
virtual const char* BaseUri() const override;
virtual void HandleRequest(HttpServerRequest& Request) override;
-
- void Flush();
- void Scrub(ScrubContext& Ctx);
+ void Flush();
+ void Scrub(ScrubContext& Ctx);
private:
struct CacheRef
@@ -111,16 +112,17 @@ private:
void HandleRpcRequest(HttpServerRequest& Request);
void HandleDetailsRequest(HttpServerRequest& Request);
- CbPackage HandleRpcPutCacheRecords(const CbPackage& BatchRequest);
- CbPackage HandleRpcGetCacheRecords(CbObjectView BatchRequest);
- CbPackage HandleRpcPutCacheValues(const CbPackage& BatchRequest);
- CbPackage HandleRpcGetCacheValues(CbObjectView BatchRequest);
- CbPackage HandleRpcGetCacheChunks(CbObjectView BatchRequest);
- CbPackage HandleRpcRequest(const ZenContentType ContentType,
- IoBuffer&& Body,
- uint32_t& OutAcceptMagic,
- RpcAcceptOptions& OutAcceptFlags,
- int& OutTargetProcessId);
+ CbPackage HandleRpcPutCacheRecords(const CbPackage& BatchRequest);
+ CbPackage HandleRpcGetCacheRecords(CbObjectView BatchRequest);
+ CbPackage HandleRpcPutCacheValues(const CbPackage& BatchRequest);
+ CbPackage HandleRpcGetCacheValues(CbObjectView BatchRequest);
+ CbPackage HandleRpcGetCacheChunks(CbObjectView BatchRequest);
+ HttpResponseCode HandleRpcRequest(const ZenContentType ContentType,
+ IoBuffer&& Body,
+ uint32_t& OutAcceptMagic,
+ RpcAcceptOptions& OutAcceptFlags,
+ int& OutTargetProcessId,
+ CbPackage& OutPackage);
void HandleCacheRequest(HttpServerRequest& Request);
void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace);
@@ -156,6 +158,8 @@ private:
/** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */
CbPackage WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests);
+ bool AreDiskWritesAllowed() const;
+
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
ZenCacheStore& m_CacheStore;
@@ -167,6 +171,7 @@ private:
metrics::OperationTiming m_HttpRequests;
metrics::OperationTiming m_UpstreamGetRequestTiming;
CacheStats m_CacheStats;
+ const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
void ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount);
diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp
index da0da2593..592de0f7f 100644
--- a/src/zenserver/config.cpp
+++ b/src/zenserver/config.cpp
@@ -506,7 +506,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
options.add_option("gc",
"",
"disk-reserve-size",
- "Size of gc disk reserve in bytes. Default set to 268435456 (256 Mb).",
+ "Size of gc disk reserve in bytes. Default set to 268435456 (256 Mb). Set to zero to disable.",
cxxopts::value<uint64_t>(ServerOptions.GcConfig.DiskReserveSize)->default_value("268435456"),
"");
@@ -519,6 +519,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
options.add_option("gc",
"",
+ "gc-low-diskspace-threshold",
+ "Minimum free space on disk to allow writes to disk. Default set to 268435456 (256 Mb). Set to zero to disable.",
+ cxxopts::value<uint64_t>(ServerOptions.GcConfig.Cache.MinimumFreeDiskSpaceToAllowWrites)->default_value("268435456"),
+ "");
+
+ options.add_option("gc",
+ "",
"gc-disksize-softlimit",
"Garbage collection disk usage soft limit. Default set to 0 (Off).",
cxxopts::value<uint64_t>(ServerOptions.GcConfig.Cache.DiskSizeSoftLimit)->default_value("0"),
@@ -867,9 +874,10 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio
if (sol::optional<sol::table> GcConfig = lua["gc"])
{
- ServerOptions.GcConfig.MonitorIntervalSeconds = GcConfig.value().get_or("monitorintervalseconds", 30);
- ServerOptions.GcConfig.IntervalSeconds = GcConfig.value().get_or("intervalseconds", 0);
- ServerOptions.GcConfig.DiskReserveSize = GcConfig.value().get_or("diskreservesize", uint64_t(1u << 28));
+ ServerOptions.GcConfig.MonitorIntervalSeconds = GcConfig.value().get_or("monitorintervalseconds", 30);
+ ServerOptions.GcConfig.IntervalSeconds = GcConfig.value().get_or("intervalseconds", 0);
+ ServerOptions.GcConfig.DiskReserveSize = GcConfig.value().get_or("diskreservesize", uint64_t(1u << 28));
+ ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites = GcConfig.value().get_or("lowdiskspacethreshold", 0);
if (sol::optional<sol::table> CacheGcConfig = GcConfig.value()["cache"])
{
diff --git a/src/zenserver/config.h b/src/zenserver/config.h
index 9559cae33..caf4adce7 100644
--- a/src/zenserver/config.h
+++ b/src/zenserver/config.h
@@ -69,11 +69,12 @@ struct ZenUpstreamCacheConfig
struct ZenCacheEvictionPolicy
{
- uint64_t DiskSizeLimit = ~uint64_t(0);
- uint64_t MemorySizeLimit = 1024 * 1024 * 1024;
- int32_t MaxDurationSeconds = 24 * 60 * 60;
- uint64_t DiskSizeSoftLimit = 0;
- bool Enabled = true;
+ uint64_t DiskSizeLimit = ~uint64_t(0);
+ uint64_t MemorySizeLimit = 1024 * 1024 * 1024;
+ int32_t MaxDurationSeconds = 24 * 60 * 60;
+ uint64_t DiskSizeSoftLimit = 0;
+ uint64_t MinimumFreeDiskSpaceToAllowWrites = 256u * 1024u * 1024u;
+ bool Enabled = true;
};
struct ZenCasEvictionPolicy
@@ -88,11 +89,12 @@ struct ZenGcConfig
{
ZenCasEvictionPolicy Cas;
ZenCacheEvictionPolicy Cache;
- int32_t MonitorIntervalSeconds = 30;
- int32_t IntervalSeconds = 0;
- bool CollectSmallObjects = true;
- bool Enabled = true;
- uint64_t DiskReserveSize = 1ul << 28;
+ int32_t MonitorIntervalSeconds = 30;
+ int32_t IntervalSeconds = 0;
+ bool CollectSmallObjects = true;
+ bool Enabled = true;
+ uint64_t DiskReserveSize = 1ul << 28;
+ uint64_t MinimumFreeDiskSpaceToAllowWrites = 1ul << 28;
};
struct ZenOpenIdProviderConfig
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 80e3f27f1..9dd6ddcfc 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1680,6 +1680,7 @@ ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcMa
, m_Log(logging::Get("project"))
, m_CidStore(Store)
, m_ProjectBasePath(BasePath)
+, m_DiskWriteBlocker(Gc.GetDiskWriteBlocker())
{
ZEN_INFO("initializing project store at '{}'", BasePath);
// m_Log.set_level(spdlog::level::debug);
@@ -2560,6 +2561,10 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
if (Method == "import")
{
+ if (!AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
std::pair<HttpResponseCode, std::string> Result = Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
if (Result.second.empty())
{
@@ -2602,6 +2607,10 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
else if (Method == "putchunks")
{
+ if (!AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
std::span<const CbAttachment> Attachments = Package.GetAttachments();
for (const CbAttachment& Attachment : Attachments)
{
@@ -2676,6 +2685,12 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
return ConvertResult(Result);
}
+bool
+ProjectStore::AreDiskWritesAllowed() const
+{
+ return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed());
+}
+
//////////////////////////////////////////////////////////////////////////
HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, HttpStatsService& StatsService, AuthMgr& AuthMgr)
@@ -2920,7 +2935,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
uint64_t Offset = 0;
uint64_t Size = ~(0ull);
- auto QueryParms = Req.ServerRequest().GetQueryParams();
+ auto QueryParms = HttpReq.GetQueryParams();
if (auto OffsetParm = QueryParms.GetValue("offset"); OffsetParm.empty() == false)
{
@@ -2987,7 +3002,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
HttpContentType AcceptType = HttpReq.AcceptContentType();
HttpContentType RequestType = HttpReq.RequestContentType();
- switch (Req.ServerRequest().RequestVerb())
+ switch (HttpReq.RequestVerb())
{
case HttpVerb::kGet:
{
@@ -3019,6 +3034,10 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
}
case HttpVerb::kPost:
{
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
std::pair<HttpResponseCode, std::string> Result =
m_ProjectStore->PutChunk(ProjectId, OplogId, Cid, RequestType, HttpReq.ReadPayload());
if (Result.first == HttpResponseCode::OK || Result.first == HttpResponseCode::Created)
@@ -3107,6 +3126,11 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
[this](HttpRouterRequest& Req) {
HttpServerRequest& HttpReq = Req.ServerRequest();
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
const auto& ProjectId = Req.GetCapture(1);
const auto& OplogId = Req.GetCapture(2);
@@ -3252,7 +3276,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
if (std::optional<CbObject> MaybeOp = Oplog.GetOpByIndex(OpId.value()))
{
CbObject& Op = MaybeOp.value();
- if (Req.ServerRequest().AcceptContentType() == ZenContentType::kCbPackage)
+ if (HttpReq.AcceptContentType() == ZenContentType::kCbPackage)
{
CbPackage Package;
Package.SetObject(Op);
@@ -3319,6 +3343,8 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
m_Router.RegisterRoute(
"{project}/oplog/{log}",
[this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
const auto& ProjectId = Req.GetCapture(1);
const auto& OplogId = Req.GetCapture(2);
@@ -3326,22 +3352,22 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
if (!Project)
{
- return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("project {} not found", ProjectId));
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("project {} not found", ProjectId));
}
Project->TouchProject();
- switch (Req.ServerRequest().RequestVerb())
+ switch (HttpReq.RequestVerb())
{
case HttpVerb::kGet:
{
ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId);
if (!OplogIt)
{
- return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("oplog {} not found in project {}", OplogId, ProjectId));
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("oplog {} not found in project {}", OplogId, ProjectId));
}
Project->TouchOplog(OplogId);
@@ -3352,14 +3378,18 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
<< "markerpath"sv << Log.MarkerPath().c_str() << "totalsize"sv << Log.TotalSize() << "opcount"
<< Log.OplogCount() << "expired"sv << Project->IsExpired(GcClock::TimePoint::max(), Log);
- Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Cb.Save());
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cb.Save());
}
break;
case HttpVerb::kPost:
{
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
std::filesystem::path OplogMarkerPath;
- if (CbObject Params = Req.ServerRequest().ReadPayloadObject())
+ if (CbObject Params = HttpReq.ReadPayloadObject())
{
OplogMarkerPath = Params["gcpath"sv].AsString();
}
@@ -3370,19 +3400,19 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
if (!Project->NewOplog(OplogId, OplogMarkerPath))
{
// TODO: indicate why the operation failed!
- return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError);
+ return HttpReq.WriteResponse(HttpResponseCode::InternalServerError);
}
Project->TouchOplog(OplogId);
ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath);
- return Req.ServerRequest().WriteResponse(HttpResponseCode::Created);
+ return HttpReq.WriteResponse(HttpResponseCode::Created);
}
// I guess this should ultimately be used to execute RPCs but for now, it
// does absolutely nothing
- return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest);
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
}
break;
@@ -3392,7 +3422,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
Project->DeleteOplog(OplogId);
- return Req.ServerRequest().WriteResponse(HttpResponseCode::OK);
+ return HttpReq.WriteResponse(HttpResponseCode::OK);
}
break;
@@ -3461,13 +3491,19 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
m_Router.RegisterRoute(
"{project}",
[this](HttpRouterRequest& Req) {
- const std::string ProjectId = Req.GetCapture(1);
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const std::string ProjectId = Req.GetCapture(1);
- switch (Req.ServerRequest().RequestVerb())
+ switch (HttpReq.RequestVerb())
{
case HttpVerb::kPost:
{
- IoBuffer Payload = Req.ServerRequest().ReadPayload();
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ IoBuffer Payload = HttpReq.ReadPayload();
CbObject Params = LoadCompactBinaryObject(Payload);
std::string_view Id = Params["id"sv].AsString();
std::string_view Root = Params["root"sv].AsString();
@@ -3487,7 +3523,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
ProjectFilePath,
ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : "");
- Req.ServerRequest().WriteResponse(HttpResponseCode::Created);
+ HttpReq.WriteResponse(HttpResponseCode::Created);
}
break;
@@ -3496,9 +3532,9 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
if (!Project)
{
- return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("project {} not found", ProjectId));
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("project {} not found", ProjectId));
}
Project->TouchProject();
@@ -3520,7 +3556,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
}
Response.EndArray(); // oplogs
- Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save());
+ HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save());
}
break;
@@ -3529,20 +3565,20 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
if (!Project)
{
- return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("project {} not found", ProjectId));
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("project {} not found", ProjectId));
}
ZEN_INFO("deleting project '{}'", ProjectId);
if (!m_ProjectStore->DeleteProject(ProjectId))
{
- return Req.ServerRequest().WriteResponse(HttpResponseCode::Locked,
- HttpContentType::kText,
- fmt::format("project {} is in use", ProjectId));
+ return HttpReq.WriteResponse(HttpResponseCode::Locked,
+ HttpContentType::kText,
+ fmt::format("project {} is in use", ProjectId));
}
- return Req.ServerRequest().WriteResponse(HttpResponseCode::NoContent);
+ return HttpReq.WriteResponse(HttpResponseCode::NoContent);
}
break;
@@ -3556,14 +3592,20 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
m_Router.RegisterRoute(
"{project}/oplog/{log}/save",
[this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
if (HttpReq.RequestContentType() != HttpContentType::kCbObject)
{
return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid content type");
}
- IoBuffer Payload = Req.ServerRequest().ReadPayload();
+ IoBuffer Payload = HttpReq.ReadPayload();
CbObject Response;
std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->WriteOplog(ProjectId, OplogId, std::move(Payload), Response);
@@ -3590,11 +3632,11 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
{
return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid accept content type");
}
- IoBuffer Payload = Req.ServerRequest().ReadPayload();
+ IoBuffer Payload = HttpReq.ReadPayload();
CbObject Response;
std::pair<HttpResponseCode, std::string> Result =
- m_ProjectStore->ReadOplog(ProjectId, OplogId, Req.ServerRequest().GetQueryParams(), Response);
+ m_ProjectStore->ReadOplog(ProjectId, OplogId, HttpReq.GetQueryParams(), Response);
if (Result.first == HttpResponseCode::OK)
{
return HttpReq.WriteResponse(HttpResponseCode::OK, Response);
@@ -3615,7 +3657,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
const auto& ProjectId = Req.GetCapture(1);
const auto& OplogId = Req.GetCapture(2);
- IoBuffer Payload = Req.ServerRequest().ReadPayload();
+ IoBuffer Payload = HttpReq.ReadPayload();
m_ProjectStore->Rpc(HttpReq, ProjectId, OplogId, std::move(Payload), m_AuthMgr);
},
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index 7b0c96205..b446de543 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -322,12 +322,15 @@ public:
CbObjectView&& Params,
AuthMgr& AuthManager);
+ bool AreDiskWritesAllowed() const;
+
private:
spdlog::logger& m_Log;
CidStore& m_CidStore;
std::filesystem::path m_ProjectBasePath;
mutable RwLock m_ProjectsLock;
std::map<std::string, Ref<Project>> m_Projects;
+ const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
std::filesystem::path BasePathForProject(std::string_view ProjectId);
};
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 75a49367c..c6ec4b82b 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -394,14 +394,16 @@ public:
}
ZEN_INFO("initializing GC, enabled '{}', interval {}s", ServerOptions.GcConfig.Enabled, ServerOptions.GcConfig.IntervalSeconds);
- zen::GcSchedulerConfig GcConfig{.RootDirectory = m_DataRoot / "gc",
- .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds),
- .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds),
- .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds),
- .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects,
- .Enabled = ServerOptions.GcConfig.Enabled,
- .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize,
- .DiskSizeSoftLimit = ServerOptions.GcConfig.Cache.DiskSizeSoftLimit};
+ zen::GcSchedulerConfig GcConfig{
+ .RootDirectory = m_DataRoot / "gc",
+ .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds),
+ .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds),
+ .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds),
+ .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects,
+ .Enabled = ServerOptions.GcConfig.Enabled,
+ .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize,
+ .DiskSizeSoftLimit = ServerOptions.GcConfig.Cache.DiskSizeSoftLimit,
+ .MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.Cache.MinimumFreeDiskSpaceToAllowWrites};
m_GcScheduler.Initialize(GcConfig);
return EffectiveBasePort;
@@ -845,8 +847,12 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
}
}
- m_StructuredCacheService =
- std::make_unique<HttpStructuredCacheService>(*m_CacheStore, *m_CidStore, m_StatsService, m_StatusService, *m_UpstreamCache);
+ m_StructuredCacheService = std::make_unique<HttpStructuredCacheService>(*m_CacheStore,
+ *m_CidStore,
+ m_StatsService,
+ m_StatusService,
+ *m_UpstreamCache,
+ m_GcManager.GetDiskWriteBlocker());
m_Http->RegisterService(*m_StructuredCacheService);
m_Http->RegisterService(*m_UpstreamService);
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 05bc69fcb..378d9fd52 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -472,7 +472,10 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
WriteBlockTimeUs += ElapsedUs;
WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
});
- OldBlockFile = m_ChunkBlocks[BlockIndex];
+ if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end())
+ {
+ OldBlockFile = It->second;
+ }
}
if (!OldBlockFile)
@@ -504,8 +507,9 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
});
if (OldBlockFile)
{
- m_ChunkBlocks[BlockIndex] = nullptr;
ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
+ ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile);
+ m_ChunkBlocks.erase(BlockIndex);
m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed);
OldBlockFile->MarkAsDeleteOnClose();
}
@@ -582,7 +586,9 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
ReadBlockTimeUs += ElapsedUs;
ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
});
+ ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile);
m_ChunkBlocks.erase(NextBlockIndex);
+ NewBlockFile->MarkAsDeleteOnClose();
return;
}
@@ -627,8 +633,9 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
ReadBlockTimeUs += ElapsedUs;
ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
});
- m_ChunkBlocks[BlockIndex] = nullptr;
ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
+ ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile);
+ m_ChunkBlocks.erase(BlockIndex);
m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed);
OldBlockFile->MarkAsDeleteOnClose();
}
@@ -704,8 +711,8 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
size_t ChunkIndex = LocationIndexes[LocationIndexOffset];
const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex];
- const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[FirstLocation.BlockIndex];
- if (!BlockFile)
+ auto FindBlockIt = m_ChunkBlocks.find(FirstLocation.BlockIndex);
+ if (FindBlockIt == m_ChunkBlocks.end())
{
while (ChunkLocations[ChunkIndex].BlockIndex == FirstLocation.BlockIndex)
{
@@ -719,6 +726,9 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
}
continue;
}
+ const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second;
+ ZEN_ASSERT(BlockFile);
+
size_t BlockSize = BlockFile->FileSize();
size_t RangeCount = GetNextRange(LocationIndexOffset);
if (RangeCount > 0)
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index 370c3c965..f9888722b 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -559,10 +559,12 @@ DiskUsageWindow::FindTimepointThatRemoves(uint64_t Amount, GcClock::Tick EndTick
GcScheduler::GcScheduler(GcManager& GcManager) : m_Log(logging::Get("gc")), m_GcManager(GcManager)
{
+ m_GcManager.SetDiskWriteBlocker(this);
}
GcScheduler::~GcScheduler()
{
+ m_GcManager.SetDiskWriteBlocker(nullptr);
Shutdown();
}
@@ -573,6 +575,18 @@ GcScheduler::Initialize(const GcSchedulerConfig& Config)
m_Config = Config;
+ std::error_code Ec;
+ DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec);
+ if (Ec)
+ {
+ m_AreDiskWritesBlocked.store(true);
+ ZEN_WARN("get disk space info FAILED, blocking disk writes, reason: '{}'", Ec.message());
+ }
+ else
+ {
+ CheckDiskSpace(Space);
+ }
+
if (m_Config.Interval.count() && m_Config.Interval < m_Config.MonitorInterval)
{
m_Config.Interval = m_Config.MonitorInterval;
@@ -580,7 +594,7 @@ GcScheduler::Initialize(const GcSchedulerConfig& Config)
std::filesystem::create_directories(Config.RootDirectory);
- std::error_code Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize);
+ Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize);
if (Ec)
{
ZEN_WARN("unable to create GC reserve at '{}' with size {}, reason '{}'",
@@ -664,6 +678,29 @@ GcScheduler::Trigger(const GcScheduler::TriggerParams& Params)
}
void
+GcScheduler::CheckDiskSpace(const DiskSpace& Space)
+{
+ bool AreDiskWritesBlocked = m_AreDiskWritesBlocked;
+ bool IsLowOnDiskSpace = (m_Config.MinimumFreeDiskSpaceToAllowWrites) != 0 && (Space.Free < m_Config.MinimumFreeDiskSpaceToAllowWrites);
+ if (IsLowOnDiskSpace != AreDiskWritesBlocked)
+ {
+ m_AreDiskWritesBlocked.store(IsLowOnDiskSpace);
+ if (IsLowOnDiskSpace)
+ {
+ ZEN_WARN("Writing to disk is blocked, free disk space: {}, minimum required {}",
+ NiceBytes(Space.Free),
+ NiceBytes(m_Config.MinimumFreeDiskSpaceToAllowWrites));
+ }
+ else
+ {
+ ZEN_INFO("Writing to disk is unblocked, free disk space: {}, minimum required {}",
+ NiceBytes(Space.Free),
+ NiceBytes(m_Config.MinimumFreeDiskSpaceToAllowWrites));
+ }
+ }
+}
+
+void
GcScheduler::SchedulerThread()
{
std::chrono::seconds WaitTime{0};
@@ -716,16 +753,21 @@ GcScheduler::SchedulerThread()
GcClock::TimePoint ExpireTime = MaxCacheDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxCacheDuration;
- std::error_code Ec;
const GcStorageSize TotalSize = m_GcManager.TotalStorageSize();
if (Timeout && Status() == GcSchedulerStatus::kIdle)
{
- DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec);
+ std::error_code Ec;
+ DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec);
if (Ec)
{
+ m_AreDiskWritesBlocked.store(true);
ZEN_WARN("get disk space info FAILED, reason: '{}'", Ec.message());
}
+ else
+ {
+ CheckDiskSpace(Space);
+ }
const int64_t PressureGraphLength = 30;
const std::chrono::duration LoadGraphTime = PressureGraphLength * m_Config.MonitorInterval;
diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h
index e0354b331..fe9857e6a 100644
--- a/src/zenstore/include/zenstore/gc.h
+++ b/src/zenstore/include/zenstore/gc.h
@@ -27,6 +27,7 @@ class HashKeySet;
class GcManager;
class CidStore;
struct IoHash;
+struct DiskSpace;
/** GC clock
*/
@@ -121,6 +122,14 @@ private:
GcManager& m_Gc;
};
+/** Interface for querying if we are running low on disk space, used to deny put/writes to disk
+ */
+class DiskWriteBlocker
+{
+public:
+ virtual bool AreDiskWritesAllowed() const = 0;
+};
+
/** GC orchestrator
*/
class GcManager
@@ -139,6 +148,9 @@ public:
GcStorageSize TotalStorageSize() const;
+ const DiskWriteBlocker* GetDiskWriteBlocker() { return m_DiskWriteBlocker; }
+ void SetDiskWriteBlocker(const DiskWriteBlocker* Monitor) { m_DiskWriteBlocker = Monitor; }
+
#if ZEN_USE_REF_TRACKING
void OnNewCidReferences(std::span<IoHash> Hashes);
void OnCommittedCidReferences(std::span<IoHash> Hashes);
@@ -151,7 +163,8 @@ private:
mutable RwLock m_Lock;
std::vector<GcContributor*> m_GcContribs;
std::vector<GcStorage*> m_GcStorage;
- CidStore* m_CidStore = nullptr;
+ CidStore* m_CidStore = nullptr;
+ const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
};
enum class GcSchedulerStatus : uint32_t
@@ -167,10 +180,11 @@ struct GcSchedulerConfig
std::chrono::seconds MonitorInterval{30};
std::chrono::seconds Interval{};
std::chrono::seconds MaxCacheDuration{86400};
- bool CollectSmallObjects = true;
- bool Enabled = true;
- uint64_t DiskReserveSize = 1ul << 28;
- uint64_t DiskSizeSoftLimit = 0;
+ bool CollectSmallObjects = true;
+ bool Enabled = true;
+ uint64_t DiskReserveSize = 1ul << 28;
+ uint64_t DiskSizeSoftLimit = 0;
+ uint64_t MinimumFreeDiskSpaceToAllowWrites = 1ul << 28;
};
class DiskUsageWindow
@@ -196,7 +210,7 @@ public:
/**
* GC scheduler
*/
-class GcScheduler
+class GcScheduler : private DiskWriteBlocker
{
public:
GcScheduler(GcManager& GcManager);
@@ -220,6 +234,8 @@ private:
void CollectGarbage(const GcClock::TimePoint& ExpireTime, bool Delete, bool CollectSmallObjects);
GcClock::TimePoint NextGcTime(GcClock::TimePoint CurrentTime);
spdlog::logger& Log() { return m_Log; }
+ virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); }
+ void CheckDiskSpace(const DiskSpace& Space);
spdlog::logger& m_Log;
GcManager& m_GcManager;
@@ -232,6 +248,7 @@ private:
std::mutex m_GcMutex;
std::condition_variable m_GcSignal;
std::optional<TriggerParams> m_TriggerParams;
+ std::atomic_bool m_AreDiskWritesBlocked = false;
TCasLogFile<DiskUsageWindow::DiskUsageEntry> m_DiskUsageLog;
DiskUsageWindow m_DiskUsageWindow;