diff options
| author | Dan Engelbrecht <[email protected]> | 2023-05-09 15:11:10 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-09 15:11:10 +0200 |
| commit | 2542797c56b84473395a877376b68fcc77687ea9 (patch) | |
| tree | 698ebb1e4e6fb33ba9b8be973f8a851b2ee46c83 /src/zenserver/cache/structuredcache.cpp | |
| parent | Validate that entries points inside valid blocks at startup (#280) (diff) | |
| download | zen-2542797c56b84473395a877376b68fcc77687ea9.tar.xz zen-2542797c56b84473395a877376b68fcc77687ea9.zip | |
Low disk space detector (#277)
* - Feature: Disk writes are now blocked early and return an insufficient storage error if free disk space falls below the `--low-diskspace-threshold` value
* Never keep an entry in m_ChunkBlocks that points to a nullptr
Diffstat (limited to 'src/zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | src/zenserver/cache/structuredcache.cpp | 124 |
1 files changed, 85 insertions, 39 deletions
diff --git a/src/zenserver/cache/structuredcache.cpp b/src/zenserver/cache/structuredcache.cpp index 3c829f0e8..9f2a448bb 100644 --- a/src/zenserver/cache/structuredcache.cpp +++ b/src/zenserver/cache/structuredcache.cpp @@ -18,6 +18,7 @@ #include <zenhttp/httpserver.h> #include <zenhttp/httpshared.h> #include <zenhttp/httpstats.h> +#include <zenstore/gc.h> #include <zenutil/cache/cache.h> #include <zenutil/cache/rpcrecording.h> @@ -315,17 +316,19 @@ namespace { ////////////////////////////////////////////////////////////////////////// -HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CidStore& InCidStore, - HttpStatsService& StatsService, - HttpStatusService& StatusService, - UpstreamCache& UpstreamCache) +HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, + CidStore& InCidStore, + HttpStatsService& StatsService, + HttpStatusService& StatusService, + UpstreamCache& UpstreamCache, + const DiskWriteBlocker* InDiskWriteBlocker) : m_Log(logging::Get("cache")) , m_CacheStore(InCacheStore) , m_StatsService(StatsService) , m_StatusService(StatusService) , m_CidStore(InCidStore) , m_UpstreamCache(UpstreamCache) +, m_DiskWriteBlocker(InDiskWriteBlocker) { m_StatsService.RegisterHandler("z$", *this); m_StatusService.RegisterHandler("z$", *this); @@ -983,7 +986,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con bool Success = false; const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); - const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal); + const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); ZenCacheValue ClientResultValue; @@ -1097,6 +1100,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con for (const CbAttachment* Attachment : AttachmentsToStoreLocally) { + ZEN_ASSERT_SLOW(StoreLocal); CompressedBuffer Chunk = Attachment->AsCompressedBinary(); CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); @@ -1188,6 +1192,10 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con { return Request.WriteResponse(HttpResponseCode::BadRequest); } + if (!AreDiskWritesAllowed()) + { + return Request.WriteResponse(HttpResponseCode::InsufficientStorage); + } const HttpContentType ContentType = Request.RequestContentType(); @@ -1423,7 +1431,10 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons { if (RawHash == Ref.ValueContentId) { - m_CidStore.AddChunk(UpstreamResult.Value, RawHash); + if (AreDiskWritesAllowed()) + { + m_CidStore.AddChunk(UpstreamResult.Value, RawHash); + } Source = UpstreamResult.Source; } else @@ -1492,6 +1503,10 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons { return Request.WriteResponse(HttpResponseCode::BadRequest); } + if (!AreDiskWritesAllowed()) + { + return Request.WriteResponse(HttpResponseCode::InsufficientStorage); + } Body.SetContentType(Request.RequestContentType()); @@ -1526,12 +1541,13 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons Request.WriteResponse(ResponseCode); } -CbPackage +HttpResponseCode HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, IoBuffer&& Body, uint32_t& OutAcceptMagic, RpcAcceptOptions& OutAcceptFlags, - int& OutTargetProcessId) + int& OutTargetProcessId, + CbPackage& OutResultPackage) { CbPackage Package; CbObjectView Object; @@ -1554,25 +1570,37 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, if (Method == "PutCacheRecords"sv) { - return HandleRpcPutCacheRecords(Package); + if (!AreDiskWritesAllowed()) + { + return HttpResponseCode::InsufficientStorage; + } + OutResultPackage = HandleRpcPutCacheRecords(Package); } else if (Method == "GetCacheRecords"sv) { - return HandleRpcGetCacheRecords(Object); + OutResultPackage = HandleRpcGetCacheRecords(Object); } else if (Method == "PutCacheValues"sv) { - return HandleRpcPutCacheValues(Package); + if (!AreDiskWritesAllowed()) + { + return HttpResponseCode::InsufficientStorage; + } + OutResultPackage = HandleRpcPutCacheValues(Package); } else if (Method == "GetCacheValues"sv) { - return HandleRpcGetCacheValues(Object); + OutResultPackage = HandleRpcGetCacheValues(Object); } else if (Method == "GetCacheChunks"sv) { - return HandleRpcGetCacheChunks(Object); + OutResultPackage = HandleRpcGetCacheChunks(Object); } - return CbPackage{}; + else + { + return HttpResponseCode::BadRequest; + } + return HttpResponseCode::OK; } void @@ -1594,27 +1622,30 @@ HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Re uint32_t AcceptMagic = 0; RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; int TargetPid = 0; - CbPackage RpcResult = HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid); - if (AcceptMagic == kCbPkgMagic) + CbPackage RpcResult; + if (IsHttpSuccessCode(HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid, RpcResult))) { - FormatFlags Flags = FormatFlags::kDefault; - if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + if (AcceptMagic == kCbPkgMagic) { - Flags |= FormatFlags::kAllowLocalReferences; - if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) { - Flags |= FormatFlags::kDenyPartialLocalReferences; + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + { + Flags |= FormatFlags::kDenyPartialLocalReferences; + } } + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid); + ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0); + } + else + { + BinaryWriter MemStream; + RpcResult.Save(MemStream); + IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()); + ZEN_ASSERT(RpcResponseBuffer.Size() > 0); } - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid); - ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0); - } - else - { - BinaryWriter MemStream; - RpcResult.Save(MemStream); - IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()); - ZEN_ASSERT(RpcResponseBuffer.Size() > 0); } } JobLatch.CountDown(); @@ -1652,10 +1683,13 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) uint32_t AcceptMagic = 0; RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; int TargetProcessId = 0; - CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId); - if (RpcResult.IsNull()) + CbPackage RpcResult; + + HttpResponseCode ResultCode = + HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId, RpcResult); + if (!IsHttpSuccessCode(ResultCode)) { - AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); + AsyncRequest.WriteResponse(ResultCode); return; } if (AcceptMagic == kCbPkgMagic) @@ -1706,6 +1740,7 @@ CbPackage HttpStructuredCacheService::HandleRpcPutCacheRecords(const CbPackage& BatchRequest) { ZEN_TRACE_CPU("Z$::RpcPutCacheRecords"); + CbObjectView BatchObject = BatchRequest.GetObject(); ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); @@ -2064,7 +2099,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject); Request.RecordObject = ObjectBuffer; - if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) + bool StoreLocal = + EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) { m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}); } @@ -2087,13 +2124,14 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) } if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) { + bool StoreLocal = EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId)) { if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) { Request.Source = Params.Source; Value.Exists = true; - if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) + if (StoreLocal) { m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); } @@ -2411,7 +2449,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) Request.RawSize = Params.RawSize; const bool HasData = IsCompressedBinary(Params.Value.GetContentType()); const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData); - const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal); + const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); const bool IsHit = SkipData || HasData; if (IsHit) { @@ -2729,7 +2767,8 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac Record.CacheValue.SetContentType(ZenContentType::kCbObject); Record.Source = Params.Source; - if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal)) + bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) { m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); } @@ -2895,7 +2934,8 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names return; } - if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal)) + bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) { if (Request.IsRecordRequest) { @@ -3054,6 +3094,12 @@ HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request) Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } +bool +HttpStructuredCacheService::AreDiskWritesAllowed() const +{ + return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); +} + #if ZEN_WITH_TESTS TEST_CASE("z$service.parse.relative.Uri") |