diff options
| author | Stefan Boberg <[email protected]> | 2023-05-09 15:33:38 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2023-05-09 15:33:38 +0200 |
| commit | 15f361ad8549a4c0909d164943f9ddd8a714756c (patch) | |
| tree | e3e4f37778015ea12a14b610039906a66377d3a4 /src/zenserver/cache | |
| parent | make logging tests run as part of zencore-test (diff) | |
| parent | Low disk space detector (#277) (diff) | |
| download | zen-15f361ad8549a4c0909d164943f9ddd8a714756c.tar.xz zen-15f361ad8549a4c0909d164943f9ddd8a714756c.zip | |
Merge branch 'main' of https://github.com/EpicGames/zen
Diffstat (limited to 'src/zenserver/cache')
| -rw-r--r-- | src/zenserver/cache/structuredcache.cpp | 124 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcache.h | 41 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.cpp | 48 |
3 files changed, 152 insertions, 61 deletions
diff --git a/src/zenserver/cache/structuredcache.cpp b/src/zenserver/cache/structuredcache.cpp index 3c829f0e8..9f2a448bb 100644 --- a/src/zenserver/cache/structuredcache.cpp +++ b/src/zenserver/cache/structuredcache.cpp @@ -18,6 +18,7 @@ #include <zenhttp/httpserver.h> #include <zenhttp/httpshared.h> #include <zenhttp/httpstats.h> +#include <zenstore/gc.h> #include <zenutil/cache/cache.h> #include <zenutil/cache/rpcrecording.h> @@ -315,17 +316,19 @@ namespace { ////////////////////////////////////////////////////////////////////////// -HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CidStore& InCidStore, - HttpStatsService& StatsService, - HttpStatusService& StatusService, - UpstreamCache& UpstreamCache) +HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, + CidStore& InCidStore, + HttpStatsService& StatsService, + HttpStatusService& StatusService, + UpstreamCache& UpstreamCache, + const DiskWriteBlocker* InDiskWriteBlocker) : m_Log(logging::Get("cache")) , m_CacheStore(InCacheStore) , m_StatsService(StatsService) , m_StatusService(StatusService) , m_CidStore(InCidStore) , m_UpstreamCache(UpstreamCache) +, m_DiskWriteBlocker(InDiskWriteBlocker) { m_StatsService.RegisterHandler("z$", *this); m_StatusService.RegisterHandler("z$", *this); @@ -983,7 +986,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con bool Success = false; const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); - const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal); + const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); ZenCacheValue ClientResultValue; @@ -1097,6 +1100,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con for (const CbAttachment* Attachment : AttachmentsToStoreLocally) { + ZEN_ASSERT_SLOW(StoreLocal); CompressedBuffer Chunk = Attachment->AsCompressedBinary(); CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); @@ -1188,6 +1192,10 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con { return Request.WriteResponse(HttpResponseCode::BadRequest); } + if (!AreDiskWritesAllowed()) + { + return Request.WriteResponse(HttpResponseCode::InsufficientStorage); + } const HttpContentType ContentType = Request.RequestContentType(); @@ -1423,7 +1431,10 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons { if (RawHash == Ref.ValueContentId) { - m_CidStore.AddChunk(UpstreamResult.Value, RawHash); + if (AreDiskWritesAllowed()) + { + m_CidStore.AddChunk(UpstreamResult.Value, RawHash); + } Source = UpstreamResult.Source; } else @@ -1492,6 +1503,10 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons { return Request.WriteResponse(HttpResponseCode::BadRequest); } + if (!AreDiskWritesAllowed()) + { + return Request.WriteResponse(HttpResponseCode::InsufficientStorage); + } Body.SetContentType(Request.RequestContentType()); @@ -1526,12 +1541,13 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons Request.WriteResponse(ResponseCode); } -CbPackage +HttpResponseCode HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, IoBuffer&& Body, uint32_t& OutAcceptMagic, RpcAcceptOptions& OutAcceptFlags, - int& OutTargetProcessId) + int& OutTargetProcessId, + CbPackage& OutResultPackage) { CbPackage Package; CbObjectView Object; @@ -1554,25 +1570,37 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, if (Method == "PutCacheRecords"sv) { - return HandleRpcPutCacheRecords(Package); + if (!AreDiskWritesAllowed()) + { + return HttpResponseCode::InsufficientStorage; + } + OutResultPackage = HandleRpcPutCacheRecords(Package); } else if (Method == "GetCacheRecords"sv) { - return HandleRpcGetCacheRecords(Object); + OutResultPackage = HandleRpcGetCacheRecords(Object); } else if (Method == "PutCacheValues"sv) { - return HandleRpcPutCacheValues(Package); + if (!AreDiskWritesAllowed()) + { + return HttpResponseCode::InsufficientStorage; + } + OutResultPackage = HandleRpcPutCacheValues(Package); } else if (Method == "GetCacheValues"sv) { - return HandleRpcGetCacheValues(Object); + OutResultPackage = HandleRpcGetCacheValues(Object); } else if (Method == "GetCacheChunks"sv) { - return HandleRpcGetCacheChunks(Object); + OutResultPackage = HandleRpcGetCacheChunks(Object); } - return CbPackage{}; + else + { + return HttpResponseCode::BadRequest; + } + return HttpResponseCode::OK; } void @@ -1594,27 +1622,30 @@ HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Re uint32_t AcceptMagic = 0; RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; int TargetPid = 0; - CbPackage RpcResult = HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid); - if (AcceptMagic == kCbPkgMagic) + CbPackage RpcResult; + if (IsHttpSuccessCode(HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid, RpcResult))) { - FormatFlags Flags = FormatFlags::kDefault; - if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + if (AcceptMagic == kCbPkgMagic) { - Flags |= FormatFlags::kAllowLocalReferences; - if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) { - Flags |= FormatFlags::kDenyPartialLocalReferences; + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + { + Flags |= FormatFlags::kDenyPartialLocalReferences; + } } + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid); + ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0); + } + else + { + BinaryWriter MemStream; + RpcResult.Save(MemStream); + IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()); + ZEN_ASSERT(RpcResponseBuffer.Size() > 0); } - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid); - ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0); - } - else - { - BinaryWriter MemStream; - RpcResult.Save(MemStream); - IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()); - ZEN_ASSERT(RpcResponseBuffer.Size() > 0); } } JobLatch.CountDown(); @@ -1652,10 +1683,13 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) uint32_t AcceptMagic = 0; RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; int TargetProcessId = 0; - CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId); - if (RpcResult.IsNull()) + CbPackage RpcResult; + + HttpResponseCode ResultCode = + HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId, RpcResult); + if (!IsHttpSuccessCode(ResultCode)) { - AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); + AsyncRequest.WriteResponse(ResultCode); return; } if (AcceptMagic == kCbPkgMagic) @@ -1706,6 +1740,7 @@ CbPackage HttpStructuredCacheService::HandleRpcPutCacheRecords(const CbPackage& BatchRequest) { ZEN_TRACE_CPU("Z$::RpcPutCacheRecords"); + CbObjectView BatchObject = BatchRequest.GetObject(); ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); @@ -2064,7 +2099,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject); Request.RecordObject = ObjectBuffer; - if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) + bool StoreLocal = + EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) { m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}); } @@ -2087,13 +2124,14 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) } if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) { + bool StoreLocal = EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId)) { if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) { Request.Source = Params.Source; Value.Exists = true; - if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) + if (StoreLocal) { m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); } @@ -2411,7 +2449,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) Request.RawSize = Params.RawSize; const bool HasData = IsCompressedBinary(Params.Value.GetContentType()); const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData); - const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal); + const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); const bool IsHit = SkipData || HasData; if (IsHit) { @@ -2729,7 +2767,8 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac Record.CacheValue.SetContentType(ZenContentType::kCbObject); Record.Source = Params.Source; - if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal)) + bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) { m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); } @@ -2895,7 +2934,8 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names return; } - if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal)) + bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) { if (Request.IsRecordRequest) { @@ -3054,6 +3094,12 @@ HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request) Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } +bool +HttpStructuredCacheService::AreDiskWritesAllowed() const +{ + return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); +} + #if ZEN_WITH_TESTS TEST_CASE("z$service.parse.relative.Uri") diff --git a/src/zenserver/cache/structuredcache.h b/src/zenserver/cache/structuredcache.h index 8309361f4..77a0aee6a 100644 --- a/src/zenserver/cache/structuredcache.h +++ b/src/zenserver/cache/structuredcache.h @@ -24,6 +24,7 @@ struct PutRequestData; class ScrubContext; class UpstreamCache; class ZenCacheStore; +class DiskWriteBlocker; enum class CachePolicy : uint32_t; enum class RpcAcceptOptions : uint16_t; @@ -67,18 +68,18 @@ namespace cache { class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider { public: - HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CidStore& InCidStore, - HttpStatsService& StatsService, - HttpStatusService& StatusService, - UpstreamCache& UpstreamCache); + HttpStructuredCacheService(ZenCacheStore& InCacheStore, + CidStore& InCidStore, + HttpStatsService& StatsService, + HttpStatusService& StatusService, + UpstreamCache& UpstreamCache, + const DiskWriteBlocker* InDiskWriteBlocker); ~HttpStructuredCacheService(); virtual const char* BaseUri() const override; virtual void HandleRequest(HttpServerRequest& Request) override; - - void Flush(); - void Scrub(ScrubContext& Ctx); + void Flush(); + void Scrub(ScrubContext& Ctx); private: struct CacheRef @@ -111,16 +112,17 @@ private: void HandleRpcRequest(HttpServerRequest& Request); void HandleDetailsRequest(HttpServerRequest& Request); - CbPackage HandleRpcPutCacheRecords(const CbPackage& BatchRequest); - CbPackage HandleRpcGetCacheRecords(CbObjectView BatchRequest); - CbPackage HandleRpcPutCacheValues(const CbPackage& BatchRequest); - CbPackage HandleRpcGetCacheValues(CbObjectView BatchRequest); - CbPackage HandleRpcGetCacheChunks(CbObjectView BatchRequest); - CbPackage HandleRpcRequest(const ZenContentType ContentType, - IoBuffer&& Body, - uint32_t& OutAcceptMagic, - RpcAcceptOptions& OutAcceptFlags, - int& OutTargetProcessId); + CbPackage HandleRpcPutCacheRecords(const CbPackage& BatchRequest); + CbPackage HandleRpcGetCacheRecords(CbObjectView BatchRequest); + CbPackage HandleRpcPutCacheValues(const CbPackage& BatchRequest); + CbPackage HandleRpcGetCacheValues(CbObjectView BatchRequest); + CbPackage HandleRpcGetCacheChunks(CbObjectView BatchRequest); + HttpResponseCode HandleRpcRequest(const ZenContentType ContentType, + IoBuffer&& Body, + uint32_t& OutAcceptMagic, + RpcAcceptOptions& OutAcceptFlags, + int& OutTargetProcessId, + CbPackage& OutPackage); void HandleCacheRequest(HttpServerRequest& Request); void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace); @@ -156,6 +158,8 @@ private: /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */ CbPackage WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests); + bool AreDiskWritesAllowed() const; + spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; ZenCacheStore& m_CacheStore; @@ -167,6 +171,7 @@ private: metrics::OperationTiming m_HttpRequests; metrics::OperationTiming m_UpstreamGetRequestTiming; CacheStats m_CacheStats; + const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; void ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount); diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp index 26e970073..99ca23407 100644 --- a/src/zenserver/cache/structuredcachestore.cpp +++ b/src/zenserver/cache/structuredcachestore.cpp @@ -1002,6 +1002,11 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) fs::remove_all(m_BlocksBasePath); } + CreateDirectories(m_BucketDir); + + std::unordered_map<uint32_t, uint64_t> BlockSizes = + m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); + uint64_t LogEntryCount = 0; { uint32_t IndexVersion = 0; @@ -1023,12 +1028,11 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) } } - CreateDirectories(m_BucketDir); - m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); std::vector<BlockStoreLocation> KnownLocations; KnownLocations.reserve(m_Index.size()); + std::vector<DiskIndexEntry> BadEntries; for (const auto& Entry : m_Index) { size_t EntryIndex = Entry.second; @@ -1041,10 +1045,46 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) continue; } const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); - KnownLocations.push_back(BlockLocation); + + auto BlockIt = BlockSizes.find(BlockLocation.BlockIndex); + if (BlockIt == BlockSizes.end()) + { + ZEN_WARN("Unknown block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString()); + } + else + { + uint64_t BlockSize = BlockIt->second; + if (BlockLocation.Offset + BlockLocation.Size > BlockSize) + { + ZEN_WARN("Range is outside of block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString()); + } + else + { + KnownLocations.push_back(BlockLocation); + continue; + } + } + + DiskLocation NewLocation = Payload.Location; + NewLocation.Flags |= DiskLocation::kTombStone; + BadEntries.push_back(DiskIndexEntry{.Key = Entry.first, .Location = NewLocation}); } - m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); + if (!BadEntries.empty()) + { + m_SlogFile.Append(BadEntries); + m_SlogFile.Flush(); + + LogEntryCount += BadEntries.size(); + + for (const DiskIndexEntry& BadEntry : BadEntries) + { + m_Index.erase(BadEntry.Key); + } + } + + m_BlockStore.Prune(KnownLocations); + if (IsNew || LogEntryCount > 0) { MakeIndexSnapshot(); |