aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-05-09 15:11:10 +0200
committerGitHub <[email protected]>2023-05-09 15:11:10 +0200
commit2542797c56b84473395a877376b68fcc77687ea9 (patch)
tree698ebb1e4e6fb33ba9b8be973f8a851b2ee46c83 /src/zenserver/cache/structuredcache.cpp
parentValidate that entries points inside valid blocks at startup (#280) (diff)
downloadzen-2542797c56b84473395a877376b68fcc77687ea9.tar.xz
zen-2542797c56b84473395a877376b68fcc77687ea9.zip
Low disk space detector (#277)
* - Feature: Disk writes are now blocked early and return an insufficient storage error if free disk space falls below the `--low-diskspace-threshold` value * Never keep an entry in m_ChunkBlocks that points to a nullptr
Diffstat (limited to 'src/zenserver/cache/structuredcache.cpp')
-rw-r--r--src/zenserver/cache/structuredcache.cpp124
1 files changed, 85 insertions, 39 deletions
diff --git a/src/zenserver/cache/structuredcache.cpp b/src/zenserver/cache/structuredcache.cpp
index 3c829f0e8..9f2a448bb 100644
--- a/src/zenserver/cache/structuredcache.cpp
+++ b/src/zenserver/cache/structuredcache.cpp
@@ -18,6 +18,7 @@
#include <zenhttp/httpserver.h>
#include <zenhttp/httpshared.h>
#include <zenhttp/httpstats.h>
+#include <zenstore/gc.h>
#include <zenutil/cache/cache.h>
#include <zenutil/cache/rpcrecording.h>
@@ -315,17 +316,19 @@ namespace {
//////////////////////////////////////////////////////////////////////////
-HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
- HttpStatsService& StatsService,
- HttpStatusService& StatusService,
- UpstreamCache& UpstreamCache)
+HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
+ CidStore& InCidStore,
+ HttpStatsService& StatsService,
+ HttpStatusService& StatusService,
+ UpstreamCache& UpstreamCache,
+ const DiskWriteBlocker* InDiskWriteBlocker)
: m_Log(logging::Get("cache"))
, m_CacheStore(InCacheStore)
, m_StatsService(StatsService)
, m_StatusService(StatusService)
, m_CidStore(InCidStore)
, m_UpstreamCache(UpstreamCache)
+, m_DiskWriteBlocker(InDiskWriteBlocker)
{
m_StatsService.RegisterHandler("z$", *this);
m_StatusService.RegisterHandler("z$", *this);
@@ -983,7 +986,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
bool Success = false;
const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord);
const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
- const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal);
+ const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData);
ZenCacheValue ClientResultValue;
@@ -1097,6 +1100,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
{
+ ZEN_ASSERT_SLOW(StoreLocal);
CompressedBuffer Chunk = Attachment->AsCompressedBinary();
CidStore::InsertResult InsertResult =
m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
@@ -1188,6 +1192,10 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
{
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
+ if (!AreDiskWritesAllowed())
+ {
+ return Request.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
const HttpContentType ContentType = Request.RequestContentType();
@@ -1423,7 +1431,10 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons
{
if (RawHash == Ref.ValueContentId)
{
- m_CidStore.AddChunk(UpstreamResult.Value, RawHash);
+ if (AreDiskWritesAllowed())
+ {
+ m_CidStore.AddChunk(UpstreamResult.Value, RawHash);
+ }
Source = UpstreamResult.Source;
}
else
@@ -1492,6 +1503,10 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons
{
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
+ if (!AreDiskWritesAllowed())
+ {
+ return Request.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
Body.SetContentType(Request.RequestContentType());
@@ -1526,12 +1541,13 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons
Request.WriteResponse(ResponseCode);
}
-CbPackage
+HttpResponseCode
HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType,
IoBuffer&& Body,
uint32_t& OutAcceptMagic,
RpcAcceptOptions& OutAcceptFlags,
- int& OutTargetProcessId)
+ int& OutTargetProcessId,
+ CbPackage& OutResultPackage)
{
CbPackage Package;
CbObjectView Object;
@@ -1554,25 +1570,37 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType,
if (Method == "PutCacheRecords"sv)
{
- return HandleRpcPutCacheRecords(Package);
+ if (!AreDiskWritesAllowed())
+ {
+ return HttpResponseCode::InsufficientStorage;
+ }
+ OutResultPackage = HandleRpcPutCacheRecords(Package);
}
else if (Method == "GetCacheRecords"sv)
{
- return HandleRpcGetCacheRecords(Object);
+ OutResultPackage = HandleRpcGetCacheRecords(Object);
}
else if (Method == "PutCacheValues"sv)
{
- return HandleRpcPutCacheValues(Package);
+ if (!AreDiskWritesAllowed())
+ {
+ return HttpResponseCode::InsufficientStorage;
+ }
+ OutResultPackage = HandleRpcPutCacheValues(Package);
}
else if (Method == "GetCacheValues"sv)
{
- return HandleRpcGetCacheValues(Object);
+ OutResultPackage = HandleRpcGetCacheValues(Object);
}
else if (Method == "GetCacheChunks"sv)
{
- return HandleRpcGetCacheChunks(Object);
+ OutResultPackage = HandleRpcGetCacheChunks(Object);
}
- return CbPackage{};
+ else
+ {
+ return HttpResponseCode::BadRequest;
+ }
+ return HttpResponseCode::OK;
}
void
@@ -1594,27 +1622,30 @@ HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Re
uint32_t AcceptMagic = 0;
RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
int TargetPid = 0;
- CbPackage RpcResult = HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid);
- if (AcceptMagic == kCbPkgMagic)
+ CbPackage RpcResult;
+ if (IsHttpSuccessCode(HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid, RpcResult)))
{
- FormatFlags Flags = FormatFlags::kDefault;
- if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ if (AcceptMagic == kCbPkgMagic)
{
- Flags |= FormatFlags::kAllowLocalReferences;
- if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ FormatFlags Flags = FormatFlags::kDefault;
+ if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
{
- Flags |= FormatFlags::kDenyPartialLocalReferences;
+ Flags |= FormatFlags::kAllowLocalReferences;
+ if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ {
+ Flags |= FormatFlags::kDenyPartialLocalReferences;
+ }
}
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid);
+ ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0);
+ }
+ else
+ {
+ BinaryWriter MemStream;
+ RpcResult.Save(MemStream);
+ IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize());
+ ZEN_ASSERT(RpcResponseBuffer.Size() > 0);
}
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid);
- ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResult.Save(MemStream);
- IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize());
- ZEN_ASSERT(RpcResponseBuffer.Size() > 0);
}
}
JobLatch.CountDown();
@@ -1652,10 +1683,13 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
uint32_t AcceptMagic = 0;
RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
int TargetProcessId = 0;
- CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId);
- if (RpcResult.IsNull())
+ CbPackage RpcResult;
+
+ HttpResponseCode ResultCode =
+ HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId, RpcResult);
+ if (!IsHttpSuccessCode(ResultCode))
{
- AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
+ AsyncRequest.WriteResponse(ResultCode);
return;
}
if (AcceptMagic == kCbPkgMagic)
@@ -1706,6 +1740,7 @@ CbPackage
HttpStructuredCacheService::HandleRpcPutCacheRecords(const CbPackage& BatchRequest)
{
ZEN_TRACE_CPU("Z$::RpcPutCacheRecords");
+
CbObjectView BatchObject = BatchRequest.GetObject();
ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv);
@@ -2064,7 +2099,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject);
Request.RecordObject = ObjectBuffer;
- if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal))
+ bool StoreLocal =
+ EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
{
m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}});
}
@@ -2087,13 +2124,14 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
}
if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
{
+ bool StoreLocal = EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId))
{
if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
{
Request.Source = Params.Source;
Value.Exists = true;
- if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
+ if (StoreLocal)
{
m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
}
@@ -2411,7 +2449,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
Request.RawSize = Params.RawSize;
const bool HasData = IsCompressedBinary(Params.Value.GetContentType());
const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData);
- const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal);
+ const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
const bool IsHit = SkipData || HasData;
if (IsHit)
{
@@ -2729,7 +2767,8 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
Record.CacheValue.SetContentType(ZenContentType::kCbObject);
Record.Source = Params.Source;
- if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal))
+ bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
{
m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue});
}
@@ -2895,7 +2934,8 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
return;
}
- if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal))
+ bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
{
if (Request.IsRecordRequest)
{
@@ -3054,6 +3094,12 @@ HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request)
Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
}
+bool
+HttpStructuredCacheService::AreDiskWritesAllowed() const
+{
+ return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed());
+}
+
#if ZEN_WITH_TESTS
TEST_CASE("z$service.parse.relative.Uri")