diff options
| author | Stefan Boberg <[email protected]> | 2023-05-09 15:33:38 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2023-05-09 15:33:38 +0200 |
| commit | 15f361ad8549a4c0909d164943f9ddd8a714756c (patch) | |
| tree | e3e4f37778015ea12a14b610039906a66377d3a4 /src | |
| parent | make logging tests run as part of zencore-test (diff) | |
| parent | Low disk space detector (#277) (diff) | |
| download | zen-15f361ad8549a4c0909d164943f9ddd8a714756c.tar.xz zen-15f361ad8549a4c0909d164943f9ddd8a714756c.zip | |
Merge branch 'main' of https://github.com/EpicGames/zen
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/cache/structuredcache.cpp | 124 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcache.h | 41 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.cpp | 48 | ||||
| -rw-r--r-- | src/zenserver/config.cpp | 16 | ||||
| -rw-r--r-- | src/zenserver/config.h | 22 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 118 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 3 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 26 | ||||
| -rw-r--r-- | src/zenstore/blockstore.cpp | 99 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 47 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 48 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/blockstore.h | 9 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/gc.h | 29 |
13 files changed, 451 insertions, 179 deletions
diff --git a/src/zenserver/cache/structuredcache.cpp b/src/zenserver/cache/structuredcache.cpp index 3c829f0e8..9f2a448bb 100644 --- a/src/zenserver/cache/structuredcache.cpp +++ b/src/zenserver/cache/structuredcache.cpp @@ -18,6 +18,7 @@ #include <zenhttp/httpserver.h> #include <zenhttp/httpshared.h> #include <zenhttp/httpstats.h> +#include <zenstore/gc.h> #include <zenutil/cache/cache.h> #include <zenutil/cache/rpcrecording.h> @@ -315,17 +316,19 @@ namespace { ////////////////////////////////////////////////////////////////////////// -HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CidStore& InCidStore, - HttpStatsService& StatsService, - HttpStatusService& StatusService, - UpstreamCache& UpstreamCache) +HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, + CidStore& InCidStore, + HttpStatsService& StatsService, + HttpStatusService& StatusService, + UpstreamCache& UpstreamCache, + const DiskWriteBlocker* InDiskWriteBlocker) : m_Log(logging::Get("cache")) , m_CacheStore(InCacheStore) , m_StatsService(StatsService) , m_StatusService(StatusService) , m_CidStore(InCidStore) , m_UpstreamCache(UpstreamCache) +, m_DiskWriteBlocker(InDiskWriteBlocker) { m_StatsService.RegisterHandler("z$", *this); m_StatusService.RegisterHandler("z$", *this); @@ -983,7 +986,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con bool Success = false; const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); - const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal); + const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); ZenCacheValue ClientResultValue; @@ -1097,6 +1100,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con for (const CbAttachment* Attachment : AttachmentsToStoreLocally) { + ZEN_ASSERT_SLOW(StoreLocal); CompressedBuffer Chunk = Attachment->AsCompressedBinary(); CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); @@ -1188,6 +1192,10 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con { return Request.WriteResponse(HttpResponseCode::BadRequest); } + if (!AreDiskWritesAllowed()) + { + return Request.WriteResponse(HttpResponseCode::InsufficientStorage); + } const HttpContentType ContentType = Request.RequestContentType(); @@ -1423,7 +1431,10 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons { if (RawHash == Ref.ValueContentId) { - m_CidStore.AddChunk(UpstreamResult.Value, RawHash); + if (AreDiskWritesAllowed()) + { + m_CidStore.AddChunk(UpstreamResult.Value, RawHash); + } Source = UpstreamResult.Source; } else @@ -1492,6 +1503,10 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons { return Request.WriteResponse(HttpResponseCode::BadRequest); } + if (!AreDiskWritesAllowed()) + { + return Request.WriteResponse(HttpResponseCode::InsufficientStorage); + } Body.SetContentType(Request.RequestContentType()); @@ -1526,12 +1541,13 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons Request.WriteResponse(ResponseCode); } -CbPackage +HttpResponseCode HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, IoBuffer&& Body, uint32_t& OutAcceptMagic, RpcAcceptOptions& OutAcceptFlags, - int& OutTargetProcessId) + int& OutTargetProcessId, + CbPackage& OutResultPackage) { CbPackage Package; CbObjectView Object; @@ -1554,25 +1570,37 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, if (Method == "PutCacheRecords"sv) { - return HandleRpcPutCacheRecords(Package); + if (!AreDiskWritesAllowed()) + { + return HttpResponseCode::InsufficientStorage; + } + OutResultPackage = HandleRpcPutCacheRecords(Package); } else if (Method == "GetCacheRecords"sv) { - return HandleRpcGetCacheRecords(Object); + OutResultPackage = HandleRpcGetCacheRecords(Object); } else if (Method == "PutCacheValues"sv) { - return HandleRpcPutCacheValues(Package); + if (!AreDiskWritesAllowed()) + { + return HttpResponseCode::InsufficientStorage; + } + OutResultPackage = HandleRpcPutCacheValues(Package); } else if (Method == "GetCacheValues"sv) { - return HandleRpcGetCacheValues(Object); + OutResultPackage = HandleRpcGetCacheValues(Object); } else if (Method == "GetCacheChunks"sv) { - return HandleRpcGetCacheChunks(Object); + OutResultPackage = HandleRpcGetCacheChunks(Object); } - return CbPackage{}; + else + { + return HttpResponseCode::BadRequest; + } + return HttpResponseCode::OK; } void @@ -1594,27 +1622,30 @@ HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Re uint32_t AcceptMagic = 0; RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; int TargetPid = 0; - CbPackage RpcResult = HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid); - if (AcceptMagic == kCbPkgMagic) + CbPackage RpcResult; + if (IsHttpSuccessCode(HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid, RpcResult))) { - FormatFlags Flags = FormatFlags::kDefault; - if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + if (AcceptMagic == kCbPkgMagic) { - Flags |= FormatFlags::kAllowLocalReferences; - if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) { - Flags |= FormatFlags::kDenyPartialLocalReferences; + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + { + Flags |= FormatFlags::kDenyPartialLocalReferences; + } } + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid); + ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0); + } + else + { + BinaryWriter MemStream; + RpcResult.Save(MemStream); + IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()); + ZEN_ASSERT(RpcResponseBuffer.Size() > 0); } - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid); - ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0); - } - else - { - BinaryWriter MemStream; - RpcResult.Save(MemStream); - IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()); - ZEN_ASSERT(RpcResponseBuffer.Size() > 0); } } JobLatch.CountDown(); @@ -1652,10 +1683,13 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) uint32_t AcceptMagic = 0; RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; int TargetProcessId = 0; - CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId); - if (RpcResult.IsNull()) + CbPackage RpcResult; + + HttpResponseCode ResultCode = + HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId, RpcResult); + if (!IsHttpSuccessCode(ResultCode)) { - AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); + AsyncRequest.WriteResponse(ResultCode); return; } if (AcceptMagic == kCbPkgMagic) @@ -1706,6 +1740,7 @@ CbPackage HttpStructuredCacheService::HandleRpcPutCacheRecords(const CbPackage& BatchRequest) { ZEN_TRACE_CPU("Z$::RpcPutCacheRecords"); + CbObjectView BatchObject = BatchRequest.GetObject(); ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); @@ -2064,7 +2099,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject); Request.RecordObject = ObjectBuffer; - if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) + bool StoreLocal = + EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) { m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}); } @@ -2087,13 +2124,14 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest) } if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) { + bool StoreLocal = EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId)) { if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) { Request.Source = Params.Source; Value.Exists = true; - if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) + if (StoreLocal) { m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); } @@ -2411,7 +2449,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest) Request.RawSize = Params.RawSize; const bool HasData = IsCompressedBinary(Params.Value.GetContentType()); const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData); - const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal); + const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); const bool IsHit = SkipData || HasData; if (IsHit) { @@ -2729,7 +2767,8 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac Record.CacheValue.SetContentType(ZenContentType::kCbObject); Record.Source = Params.Source; - if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal)) + bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) { m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); } @@ -2895,7 +2934,8 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names return; } - if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal)) + bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) { if (Request.IsRecordRequest) { @@ -3054,6 +3094,12 @@ HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request) Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } +bool +HttpStructuredCacheService::AreDiskWritesAllowed() const +{ + return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); +} + #if ZEN_WITH_TESTS TEST_CASE("z$service.parse.relative.Uri") diff --git a/src/zenserver/cache/structuredcache.h b/src/zenserver/cache/structuredcache.h index 8309361f4..77a0aee6a 100644 --- a/src/zenserver/cache/structuredcache.h +++ b/src/zenserver/cache/structuredcache.h @@ -24,6 +24,7 @@ struct PutRequestData; class ScrubContext; class UpstreamCache; class ZenCacheStore; +class DiskWriteBlocker; enum class CachePolicy : uint32_t; enum class RpcAcceptOptions : uint16_t; @@ -67,18 +68,18 @@ namespace cache { class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider { public: - HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CidStore& InCidStore, - HttpStatsService& StatsService, - HttpStatusService& StatusService, - UpstreamCache& UpstreamCache); + HttpStructuredCacheService(ZenCacheStore& InCacheStore, + CidStore& InCidStore, + HttpStatsService& StatsService, + HttpStatusService& StatusService, + UpstreamCache& UpstreamCache, + const DiskWriteBlocker* InDiskWriteBlocker); ~HttpStructuredCacheService(); virtual const char* BaseUri() const override; virtual void HandleRequest(HttpServerRequest& Request) override; - - void Flush(); - void Scrub(ScrubContext& Ctx); + void Flush(); + void Scrub(ScrubContext& Ctx); private: struct CacheRef @@ -111,16 +112,17 @@ private: void HandleRpcRequest(HttpServerRequest& Request); void HandleDetailsRequest(HttpServerRequest& Request); - CbPackage HandleRpcPutCacheRecords(const CbPackage& BatchRequest); - CbPackage HandleRpcGetCacheRecords(CbObjectView BatchRequest); - CbPackage HandleRpcPutCacheValues(const CbPackage& BatchRequest); - CbPackage HandleRpcGetCacheValues(CbObjectView BatchRequest); - CbPackage HandleRpcGetCacheChunks(CbObjectView BatchRequest); - CbPackage HandleRpcRequest(const ZenContentType ContentType, - IoBuffer&& Body, - uint32_t& OutAcceptMagic, - RpcAcceptOptions& OutAcceptFlags, - int& OutTargetProcessId); + CbPackage HandleRpcPutCacheRecords(const CbPackage& BatchRequest); + CbPackage HandleRpcGetCacheRecords(CbObjectView BatchRequest); + CbPackage HandleRpcPutCacheValues(const CbPackage& BatchRequest); + CbPackage HandleRpcGetCacheValues(CbObjectView BatchRequest); + CbPackage HandleRpcGetCacheChunks(CbObjectView BatchRequest); + HttpResponseCode HandleRpcRequest(const ZenContentType ContentType, + IoBuffer&& Body, + uint32_t& OutAcceptMagic, + RpcAcceptOptions& OutAcceptFlags, + int& OutTargetProcessId, + CbPackage& OutPackage); void HandleCacheRequest(HttpServerRequest& Request); void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace); @@ -156,6 +158,8 @@ private: /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */ CbPackage WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests); + bool AreDiskWritesAllowed() const; + spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; ZenCacheStore& m_CacheStore; @@ -167,6 +171,7 @@ private: metrics::OperationTiming m_HttpRequests; metrics::OperationTiming m_UpstreamGetRequestTiming; CacheStats m_CacheStats; + const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; void ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount); diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp index 26e970073..99ca23407 100644 --- a/src/zenserver/cache/structuredcachestore.cpp +++ b/src/zenserver/cache/structuredcachestore.cpp @@ -1002,6 +1002,11 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) fs::remove_all(m_BlocksBasePath); } + CreateDirectories(m_BucketDir); + + std::unordered_map<uint32_t, uint64_t> BlockSizes = + m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); + uint64_t LogEntryCount = 0; { uint32_t IndexVersion = 0; @@ -1023,12 +1028,11 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) } } - CreateDirectories(m_BucketDir); - m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); std::vector<BlockStoreLocation> KnownLocations; KnownLocations.reserve(m_Index.size()); + std::vector<DiskIndexEntry> BadEntries; for (const auto& Entry : m_Index) { size_t EntryIndex = Entry.second; @@ -1041,10 +1045,46 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) continue; } const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment); - KnownLocations.push_back(BlockLocation); + + auto BlockIt = BlockSizes.find(BlockLocation.BlockIndex); + if (BlockIt == BlockSizes.end()) + { + ZEN_WARN("Unknown block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString()); + } + else + { + uint64_t BlockSize = BlockIt->second; + if (BlockLocation.Offset + BlockLocation.Size > BlockSize) + { + ZEN_WARN("Range is outside of block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString()); + } + else + { + KnownLocations.push_back(BlockLocation); + continue; + } + } + + DiskLocation NewLocation = Payload.Location; + NewLocation.Flags |= DiskLocation::kTombStone; + BadEntries.push_back(DiskIndexEntry{.Key = Entry.first, .Location = NewLocation}); } - m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); + if (!BadEntries.empty()) + { + m_SlogFile.Append(BadEntries); + m_SlogFile.Flush(); + + LogEntryCount += BadEntries.size(); + + for (const DiskIndexEntry& BadEntry : BadEntries) + { + m_Index.erase(BadEntry.Key); + } + } + + m_BlockStore.Prune(KnownLocations); + if (IsNew || LogEntryCount > 0) { MakeIndexSnapshot(); diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index da0da2593..592de0f7f 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -506,7 +506,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) options.add_option("gc", "", "disk-reserve-size", - "Size of gc disk reserve in bytes. Default set to 268435456 (256 Mb).", + "Size of gc disk reserve in bytes. Default set to 268435456 (256 Mb). Set to zero to disable.", cxxopts::value<uint64_t>(ServerOptions.GcConfig.DiskReserveSize)->default_value("268435456"), ""); @@ -519,6 +519,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) options.add_option("gc", "", + "gc-low-diskspace-threshold", + "Minimum free space on disk to allow writes to disk. Default set to 268435456 (256 Mb). Set to zero to disable.", + cxxopts::value<uint64_t>(ServerOptions.GcConfig.Cache.MinimumFreeDiskSpaceToAllowWrites)->default_value("268435456"), + ""); + + options.add_option("gc", + "", "gc-disksize-softlimit", "Garbage collection disk usage soft limit. Default set to 0 (Off).", cxxopts::value<uint64_t>(ServerOptions.GcConfig.Cache.DiskSizeSoftLimit)->default_value("0"), @@ -867,9 +874,10 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio if (sol::optional<sol::table> GcConfig = lua["gc"]) { - ServerOptions.GcConfig.MonitorIntervalSeconds = GcConfig.value().get_or("monitorintervalseconds", 30); - ServerOptions.GcConfig.IntervalSeconds = GcConfig.value().get_or("intervalseconds", 0); - ServerOptions.GcConfig.DiskReserveSize = GcConfig.value().get_or("diskreservesize", uint64_t(1u << 28)); + ServerOptions.GcConfig.MonitorIntervalSeconds = GcConfig.value().get_or("monitorintervalseconds", 30); + ServerOptions.GcConfig.IntervalSeconds = GcConfig.value().get_or("intervalseconds", 0); + ServerOptions.GcConfig.DiskReserveSize = GcConfig.value().get_or("diskreservesize", uint64_t(1u << 28)); + ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites = GcConfig.value().get_or("lowdiskspacethreshold", 0); if (sol::optional<sol::table> CacheGcConfig = GcConfig.value()["cache"]) { diff --git a/src/zenserver/config.h b/src/zenserver/config.h index 9559cae33..caf4adce7 100644 --- a/src/zenserver/config.h +++ b/src/zenserver/config.h @@ -69,11 +69,12 @@ struct ZenUpstreamCacheConfig struct ZenCacheEvictionPolicy { - uint64_t DiskSizeLimit = ~uint64_t(0); - uint64_t MemorySizeLimit = 1024 * 1024 * 1024; - int32_t MaxDurationSeconds = 24 * 60 * 60; - uint64_t DiskSizeSoftLimit = 0; - bool Enabled = true; + uint64_t DiskSizeLimit = ~uint64_t(0); + uint64_t MemorySizeLimit = 1024 * 1024 * 1024; + int32_t MaxDurationSeconds = 24 * 60 * 60; + uint64_t DiskSizeSoftLimit = 0; + uint64_t MinimumFreeDiskSpaceToAllowWrites = 256u * 1024u * 1024u; + bool Enabled = true; }; struct ZenCasEvictionPolicy @@ -88,11 +89,12 @@ struct ZenGcConfig { ZenCasEvictionPolicy Cas; ZenCacheEvictionPolicy Cache; - int32_t MonitorIntervalSeconds = 30; - int32_t IntervalSeconds = 0; - bool CollectSmallObjects = true; - bool Enabled = true; - uint64_t DiskReserveSize = 1ul << 28; + int32_t MonitorIntervalSeconds = 30; + int32_t IntervalSeconds = 0; + bool CollectSmallObjects = true; + bool Enabled = true; + uint64_t DiskReserveSize = 1ul << 28; + uint64_t MinimumFreeDiskSpaceToAllowWrites = 1ul << 28; }; struct ZenOpenIdProviderConfig diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 80e3f27f1..9dd6ddcfc 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1680,6 +1680,7 @@ ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcMa , m_Log(logging::Get("project")) , m_CidStore(Store) , m_ProjectBasePath(BasePath) +, m_DiskWriteBlocker(Gc.GetDiskWriteBlocker()) { ZEN_INFO("initializing project store at '{}'", BasePath); // m_Log.set_level(spdlog::level::debug); @@ -2560,6 +2561,10 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, if (Method == "import") { + if (!AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } std::pair<HttpResponseCode, std::string> Result = Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); if (Result.second.empty()) { @@ -2602,6 +2607,10 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } else if (Method == "putchunks") { + if (!AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } std::span<const CbAttachment> Attachments = Package.GetAttachments(); for (const CbAttachment& Attachment : Attachments) { @@ -2676,6 +2685,12 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, return ConvertResult(Result); } +bool +ProjectStore::AreDiskWritesAllowed() const +{ + return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); +} + ////////////////////////////////////////////////////////////////////////// HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, HttpStatsService& StatsService, AuthMgr& AuthMgr) @@ -2920,7 +2935,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, uint64_t Offset = 0; uint64_t Size = ~(0ull); - auto QueryParms = Req.ServerRequest().GetQueryParams(); + auto QueryParms = HttpReq.GetQueryParams(); if (auto OffsetParm = QueryParms.GetValue("offset"); OffsetParm.empty() == false) { @@ -2987,7 +3002,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, HttpContentType AcceptType = HttpReq.AcceptContentType(); HttpContentType RequestType = HttpReq.RequestContentType(); - switch (Req.ServerRequest().RequestVerb()) + switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: { @@ -3019,6 +3034,10 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, } case HttpVerb::kPost: { + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->PutChunk(ProjectId, OplogId, Cid, RequestType, HttpReq.ReadPayload()); if (Result.first == HttpResponseCode::OK || Result.first == HttpResponseCode::Created) @@ -3107,6 +3126,11 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); @@ -3252,7 +3276,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, if (std::optional<CbObject> MaybeOp = Oplog.GetOpByIndex(OpId.value())) { CbObject& Op = MaybeOp.value(); - if (Req.ServerRequest().AcceptContentType() == ZenContentType::kCbPackage) + if (HttpReq.AcceptContentType() == ZenContentType::kCbPackage) { CbPackage Package; Package.SetObject(Op); @@ -3319,6 +3343,8 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, m_Router.RegisterRoute( "{project}/oplog/{log}", [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); @@ -3326,22 +3352,22 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, if (!Project) { - return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("project {} not found", ProjectId)); + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("project {} not found", ProjectId)); } Project->TouchProject(); - switch (Req.ServerRequest().RequestVerb()) + switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: { ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId); if (!OplogIt) { - return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("oplog {} not found in project {}", OplogId, ProjectId)); + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("oplog {} not found in project {}", OplogId, ProjectId)); } Project->TouchOplog(OplogId); @@ -3352,14 +3378,18 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, << "markerpath"sv << Log.MarkerPath().c_str() << "totalsize"sv << Log.TotalSize() << "opcount" << Log.OplogCount() << "expired"sv << Project->IsExpired(GcClock::TimePoint::max(), Log); - Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Cb.Save()); + HttpReq.WriteResponse(HttpResponseCode::OK, Cb.Save()); } break; case HttpVerb::kPost: { + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } std::filesystem::path OplogMarkerPath; - if (CbObject Params = Req.ServerRequest().ReadPayloadObject()) + if (CbObject Params = HttpReq.ReadPayloadObject()) { OplogMarkerPath = Params["gcpath"sv].AsString(); } @@ -3370,19 +3400,19 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, if (!Project->NewOplog(OplogId, OplogMarkerPath)) { // TODO: indicate why the operation failed! - return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError); + return HttpReq.WriteResponse(HttpResponseCode::InternalServerError); } Project->TouchOplog(OplogId); ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath); - return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); + return HttpReq.WriteResponse(HttpResponseCode::Created); } // I guess this should ultimately be used to execute RPCs but for now, it // does absolutely nothing - return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } break; @@ -3392,7 +3422,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, Project->DeleteOplog(OplogId); - return Req.ServerRequest().WriteResponse(HttpResponseCode::OK); + return HttpReq.WriteResponse(HttpResponseCode::OK); } break; @@ -3461,13 +3491,19 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, m_Router.RegisterRoute( "{project}", [this](HttpRouterRequest& Req) { - const std::string ProjectId = Req.GetCapture(1); + HttpServerRequest& HttpReq = Req.ServerRequest(); + const std::string ProjectId = Req.GetCapture(1); - switch (Req.ServerRequest().RequestVerb()) + switch (HttpReq.RequestVerb()) { case HttpVerb::kPost: { - IoBuffer Payload = Req.ServerRequest().ReadPayload(); + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + IoBuffer Payload = HttpReq.ReadPayload(); CbObject Params = LoadCompactBinaryObject(Payload); std::string_view Id = Params["id"sv].AsString(); std::string_view Root = Params["root"sv].AsString(); @@ -3487,7 +3523,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, ProjectFilePath, ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : ""); - Req.ServerRequest().WriteResponse(HttpResponseCode::Created); + HttpReq.WriteResponse(HttpResponseCode::Created); } break; @@ -3496,9 +3532,9 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { - return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("project {} not found", ProjectId)); + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("project {} not found", ProjectId)); } Project->TouchProject(); @@ -3520,7 +3556,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, } Response.EndArray(); // oplogs - Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save()); + HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); } break; @@ -3529,20 +3565,20 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) { - return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound, - HttpContentType::kText, - fmt::format("project {} not found", ProjectId)); + return HttpReq.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("project {} not found", ProjectId)); } ZEN_INFO("deleting project '{}'", ProjectId); if (!m_ProjectStore->DeleteProject(ProjectId)) { - return Req.ServerRequest().WriteResponse(HttpResponseCode::Locked, - HttpContentType::kText, - fmt::format("project {} is in use", ProjectId)); + return HttpReq.WriteResponse(HttpResponseCode::Locked, + HttpContentType::kText, + fmt::format("project {} is in use", ProjectId)); } - return Req.ServerRequest().WriteResponse(HttpResponseCode::NoContent); + return HttpReq.WriteResponse(HttpResponseCode::NoContent); } break; @@ -3556,14 +3592,20 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, m_Router.RegisterRoute( "{project}/oplog/{log}/save", [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); + HttpServerRequest& HttpReq = Req.ServerRequest(); + + if (!m_ProjectStore->AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); if (HttpReq.RequestContentType() != HttpContentType::kCbObject) { return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid content type"); } - IoBuffer Payload = Req.ServerRequest().ReadPayload(); + IoBuffer Payload = HttpReq.ReadPayload(); CbObject Response; std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->WriteOplog(ProjectId, OplogId, std::move(Payload), Response); @@ -3590,11 +3632,11 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, { return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid accept content type"); } - IoBuffer Payload = Req.ServerRequest().ReadPayload(); + IoBuffer Payload = HttpReq.ReadPayload(); CbObject Response; std::pair<HttpResponseCode, std::string> Result = - m_ProjectStore->ReadOplog(ProjectId, OplogId, Req.ServerRequest().GetQueryParams(), Response); + m_ProjectStore->ReadOplog(ProjectId, OplogId, HttpReq.GetQueryParams(), Response); if (Result.first == HttpResponseCode::OK) { return HttpReq.WriteResponse(HttpResponseCode::OK, Response); @@ -3615,7 +3657,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); - IoBuffer Payload = Req.ServerRequest().ReadPayload(); + IoBuffer Payload = HttpReq.ReadPayload(); m_ProjectStore->Rpc(HttpReq, ProjectId, OplogId, std::move(Payload), m_AuthMgr); }, diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 7b0c96205..b446de543 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -322,12 +322,15 @@ public: CbObjectView&& Params, AuthMgr& AuthManager); + bool AreDiskWritesAllowed() const; + private: spdlog::logger& m_Log; CidStore& m_CidStore; std::filesystem::path m_ProjectBasePath; mutable RwLock m_ProjectsLock; std::map<std::string, Ref<Project>> m_Projects; + const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; std::filesystem::path BasePathForProject(std::string_view ProjectId); }; diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 75a49367c..c6ec4b82b 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -394,14 +394,16 @@ public: } ZEN_INFO("initializing GC, enabled '{}', interval {}s", ServerOptions.GcConfig.Enabled, ServerOptions.GcConfig.IntervalSeconds); - zen::GcSchedulerConfig GcConfig{.RootDirectory = m_DataRoot / "gc", - .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds), - .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds), - .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds), - .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects, - .Enabled = ServerOptions.GcConfig.Enabled, - .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize, - .DiskSizeSoftLimit = ServerOptions.GcConfig.Cache.DiskSizeSoftLimit}; + zen::GcSchedulerConfig GcConfig{ + .RootDirectory = m_DataRoot / "gc", + .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds), + .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds), + .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds), + .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects, + .Enabled = ServerOptions.GcConfig.Enabled, + .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize, + .DiskSizeSoftLimit = ServerOptions.GcConfig.Cache.DiskSizeSoftLimit, + .MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.Cache.MinimumFreeDiskSpaceToAllowWrites}; m_GcScheduler.Initialize(GcConfig); return EffectiveBasePort; @@ -845,8 +847,12 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) } } - m_StructuredCacheService = - std::make_unique<HttpStructuredCacheService>(*m_CacheStore, *m_CidStore, m_StatsService, m_StatusService, *m_UpstreamCache); + m_StructuredCacheService = std::make_unique<HttpStructuredCacheService>(*m_CacheStore, + *m_CidStore, + m_StatsService, + m_StatusService, + *m_UpstreamCache, + m_GcManager.GetDiskWriteBlocker()); m_Http->RegisterService(*m_StructuredCacheService); m_Http->RegisterService(*m_UpstreamService); diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index e19712c40..378d9fd52 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -118,28 +118,19 @@ BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::functio constexpr uint64_t ScrubSmallChunkWindowSize = 4 * 1024 * 1024; -void -BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, - uint64_t MaxBlockSize, - uint64_t MaxBlockCount, - const std::vector<BlockStoreLocation>& KnownLocations) +std::unordered_map<uint32_t, uint64_t> +BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t MaxBlockSize, uint64_t MaxBlockCount) { ZEN_ASSERT(MaxBlockSize > 0); ZEN_ASSERT(MaxBlockCount > 0); ZEN_ASSERT(IsPow2(MaxBlockCount)); + std::unordered_map<uint32_t, uint64_t> FoundBlocks; + m_TotalSize = 0; m_BlocksBasePath = BlocksBasePath; m_MaxBlockSize = MaxBlockSize; - m_ChunkBlocks.clear(); - - std::unordered_set<uint32_t> KnownBlocks; - for (const auto& Entry : KnownLocations) - { - KnownBlocks.insert(Entry.BlockIndex); - } - if (std::filesystem::is_directory(m_BlocksBasePath)) { std::vector<std::filesystem::path> FoldersToScan; @@ -168,23 +159,11 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, { continue; } - if (!KnownBlocks.contains(BlockIndex)) - { - // Log removing unreferenced block - // Clear out unused blocks - ZEN_DEBUG("removing unused block at '{}'", Path); - std::error_code Ec; - std::filesystem::remove(Path, Ec); - if (Ec) - { - ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message()); - } - continue; - } Ref<BlockStoreFile> BlockFile{new BlockStoreFile(Path)}; BlockFile->Open(); m_TotalSize.fetch_add(BlockFile->FileSize(), std::memory_order::relaxed); m_ChunkBlocks[BlockIndex] = BlockFile; + FoundBlocks[BlockIndex] = BlockFile->FileSize(); } } ++FolderOffset; @@ -194,6 +173,39 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, { CreateDirectories(m_BlocksBasePath); } + return FoundBlocks; +} + +void +BlockStore::Prune(const std::vector<BlockStoreLocation>& KnownLocations) +{ + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); + + std::unordered_set<uint32_t> KnownBlocks; + for (const auto& Entry : KnownLocations) + { + KnownBlocks.insert(Entry.BlockIndex); + } + std::vector<uint32_t> BlocksToDelete; + for (auto It = m_ChunkBlocks.begin(); It != m_ChunkBlocks.end(); ++It) + { + uint32_t BlockIndex = It->first; + if (!KnownBlocks.contains(BlockIndex)) + { + Ref<BlockStoreFile> BlockFile = m_ChunkBlocks[BlockIndex]; + m_TotalSize.fetch_add(BlockFile->FileSize(), std::memory_order::relaxed); + BlocksToDelete.push_back(BlockIndex); + } + } + + for (uint32_t BlockIndex : BlocksToDelete) + { + // Clear out unused blocks + Ref<BlockStoreFile> BlockFile = m_ChunkBlocks[BlockIndex]; + m_ChunkBlocks.erase(BlockIndex); + ZEN_DEBUG("marking block store file '{}' for delete, block #{}", BlockFile->GetPath(), BlockIndex); + BlockFile->MarkAsDeleteOnClose(); + } } void @@ -460,7 +472,10 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); - OldBlockFile = m_ChunkBlocks[BlockIndex]; + if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end()) + { + OldBlockFile = It->second; + } } if (!OldBlockFile) @@ -492,8 +507,9 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, }); if (OldBlockFile) { - m_ChunkBlocks[BlockIndex] = nullptr; ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); + ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile); + m_ChunkBlocks.erase(BlockIndex); m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed); OldBlockFile->MarkAsDeleteOnClose(); } @@ -570,7 +586,9 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); + ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); m_ChunkBlocks.erase(NextBlockIndex); + NewBlockFile->MarkAsDeleteOnClose(); return; } @@ -615,8 +633,9 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); - m_ChunkBlocks[BlockIndex] = nullptr; ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); + ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile); + m_ChunkBlocks.erase(BlockIndex); m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed); OldBlockFile->MarkAsDeleteOnClose(); } @@ -692,8 +711,8 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, size_t ChunkIndex = LocationIndexes[LocationIndexOffset]; const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex]; - const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[FirstLocation.BlockIndex]; - if (!BlockFile) + auto FindBlockIt = m_ChunkBlocks.find(FirstLocation.BlockIndex); + if (FindBlockIt == m_ChunkBlocks.end()) { while (ChunkLocations[ChunkIndex].BlockIndex == FirstLocation.BlockIndex) { @@ -707,6 +726,9 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, } continue; } + const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second; + ZEN_ASSERT(BlockFile); + size_t BlockSize = BlockFile->FileSize(); size_t RangeCount = GetNextRange(LocationIndexOffset); if (RangeCount > 0) @@ -939,7 +961,7 @@ TEST_CASE("blockstore.chunks") auto RootDirectory = TempDir.Path(); BlockStore Store; - Store.Initialize(RootDirectory, 128, 1024, {}); + Store.Initialize(RootDirectory, 128, 1024); IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512}); CHECK(!BadChunk); @@ -969,7 +991,7 @@ TEST_CASE("blockstore.clean.stray.blocks") auto RootDirectory = TempDir.Path(); BlockStore Store; - Store.Initialize(RootDirectory / "store", 128, 1024, {}); + Store.Initialize(RootDirectory / "store", 128, 1024); std::string FirstChunkData = "This is the data of the first chunk that we will write"; BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); @@ -982,7 +1004,8 @@ TEST_CASE("blockstore.clean.stray.blocks") Store.Close(); // Not referencing the second block means that we should be deleted - Store.Initialize(RootDirectory / "store", 128, 1024, {FirstChunkLocation, SecondChunkLocation}); + Store.Initialize(RootDirectory / "store", 128, 1024); + Store.Prune({FirstChunkLocation, SecondChunkLocation}); CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 1); } @@ -995,7 +1018,7 @@ TEST_CASE("blockstore.flush.forces.new.block") auto RootDirectory = TempDir.Path(); BlockStore Store; - Store.Initialize(RootDirectory / "store", 128, 1024, {}); + Store.Initialize(RootDirectory / "store", 128, 1024); std::string FirstChunkData = "This is the data of the first chunk that we will write"; WriteStringAsChunk(Store, FirstChunkData, 4); @@ -1018,7 +1041,7 @@ TEST_CASE("blockstore.iterate.chunks") auto RootDirectory = TempDir.Path(); BlockStore Store; - Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024, {}); + Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024); IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512}); CHECK(!BadChunk); @@ -1114,7 +1137,7 @@ TEST_CASE("blockstore.reclaim.space") auto RootDirectory = TempDir.Path(); BlockStore Store; - Store.Initialize(RootDirectory / "store", 512, 1024, {}); + Store.Initialize(RootDirectory / "store", 512, 1024); constexpr size_t ChunkCount = 200; constexpr size_t Alignment = 8; @@ -1231,7 +1254,7 @@ TEST_CASE("blockstore.thread.read.write") auto RootDirectory = TempDir.Path(); BlockStore Store; - Store.Initialize(RootDirectory / "store", 1088, 1024, {}); + Store.Initialize(RootDirectory / "store", 1088, 1024); constexpr size_t ChunkCount = 1000; constexpr size_t Alignment = 8; diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 7b2c21b0f..2974570e5 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -701,23 +701,60 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) std::filesystem::remove_all(BasePath); } + CreateDirectories(BasePath); + + std::unordered_map<uint32_t, uint64_t> BlockSizes = + m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); + m_LogFlushPosition = ReadIndexFile(); uint64_t LogEntryCount = ReadLog(m_LogFlushPosition); - CreateDirectories(BasePath); - std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName); m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); std::vector<BlockStoreLocation> KnownLocations; KnownLocations.reserve(m_LocationMap.size()); + std::vector<CasDiskIndexEntry> BadEntries; for (const auto& Entry : m_LocationMap) { - const BlockStoreDiskLocation& Location = Entry.second; - KnownLocations.push_back(Location.Get(m_PayloadAlignment)); + const BlockStoreDiskLocation& DiskLocation = Entry.second; + auto BlockIt = BlockSizes.find(DiskLocation.GetBlockIndex()); + if (BlockIt == BlockSizes.end()) + { + ZEN_WARN("Unknown block {} for entry {}", DiskLocation.GetBlockIndex(), Entry.first.ToHexString()); + } + else + { + BlockStoreLocation BlockLocation = DiskLocation.Get(m_PayloadAlignment); + + uint64_t BlockSize = BlockIt->second; + if (BlockLocation.Offset + BlockLocation.Size > BlockSize) + { + ZEN_WARN("Range is outside of block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString()); + } + else + { + KnownLocations.emplace_back(std::move(BlockLocation)); + continue; + } + BadEntries.push_back({.Key = Entry.first, .Location = DiskLocation, .Flags = CasDiskIndexEntry::kTombstone}); + } + } + + if (!BadEntries.empty()) + { + m_CasLog.Append(BadEntries); + m_CasLog.Flush(); + + LogEntryCount += BadEntries.size(); + + for (const CasDiskIndexEntry& BadEntry : BadEntries) + { + m_LocationMap.erase(BadEntry.Key); + } } - m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); + m_BlockStore.Prune(KnownLocations); if (IsNewStore || (LogEntryCount > 0)) { diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index 370c3c965..f9888722b 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -559,10 +559,12 @@ DiskUsageWindow::FindTimepointThatRemoves(uint64_t Amount, GcClock::Tick EndTick GcScheduler::GcScheduler(GcManager& GcManager) : m_Log(logging::Get("gc")), m_GcManager(GcManager) { + m_GcManager.SetDiskWriteBlocker(this); } GcScheduler::~GcScheduler() { + m_GcManager.SetDiskWriteBlocker(nullptr); Shutdown(); } @@ -573,6 +575,18 @@ GcScheduler::Initialize(const GcSchedulerConfig& Config) m_Config = Config; + std::error_code Ec; + DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec); + if (Ec) + { + m_AreDiskWritesBlocked.store(true); + ZEN_WARN("get disk space info FAILED, blocking disk writes, reason: '{}'", Ec.message()); + } + else + { + CheckDiskSpace(Space); + } + if (m_Config.Interval.count() && m_Config.Interval < m_Config.MonitorInterval) { m_Config.Interval = m_Config.MonitorInterval; @@ -580,7 +594,7 @@ GcScheduler::Initialize(const GcSchedulerConfig& Config) std::filesystem::create_directories(Config.RootDirectory); - std::error_code Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize); + Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize); if (Ec) { ZEN_WARN("unable to create GC reserve at '{}' with size {}, reason '{}'", @@ -664,6 +678,29 @@ GcScheduler::Trigger(const GcScheduler::TriggerParams& Params) } void +GcScheduler::CheckDiskSpace(const DiskSpace& Space) +{ + bool AreDiskWritesBlocked = m_AreDiskWritesBlocked; + bool IsLowOnDiskSpace = (m_Config.MinimumFreeDiskSpaceToAllowWrites) != 0 && (Space.Free < m_Config.MinimumFreeDiskSpaceToAllowWrites); + if (IsLowOnDiskSpace != AreDiskWritesBlocked) + { + m_AreDiskWritesBlocked.store(IsLowOnDiskSpace); + if (IsLowOnDiskSpace) + { + ZEN_WARN("Writing to disk is blocked, free disk space: {}, minimum required {}", + NiceBytes(Space.Free), + NiceBytes(m_Config.MinimumFreeDiskSpaceToAllowWrites)); + } + else + { + ZEN_INFO("Writing to disk is unblocked, free disk space: {}, minimum required {}", + NiceBytes(Space.Free), + NiceBytes(m_Config.MinimumFreeDiskSpaceToAllowWrites)); + } + } +} + +void GcScheduler::SchedulerThread() { std::chrono::seconds WaitTime{0}; @@ -716,16 +753,21 @@ GcScheduler::SchedulerThread() GcClock::TimePoint ExpireTime = MaxCacheDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxCacheDuration; - std::error_code Ec; const GcStorageSize TotalSize = m_GcManager.TotalStorageSize(); if (Timeout && Status() == GcSchedulerStatus::kIdle) { - DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec); + std::error_code Ec; + DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec); if (Ec) { + m_AreDiskWritesBlocked.store(true); ZEN_WARN("get disk space info FAILED, reason: '{}'", Ec.message()); } + else + { + CheckDiskSpace(Space); + } const int64_t PressureGraphLength = 30; const std::chrono::duration LoadGraphTime = PressureGraphLength * m_Config.MonitorInterval; diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index 857ccae38..738510cac 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -124,10 +124,11 @@ public: typedef std::function<void(size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback; typedef std::function<void(const BlockStoreLocation& Location)> WriteChunkCallback; - void Initialize(const std::filesystem::path& BlocksBasePath, - uint64_t MaxBlockSize, - uint64_t MaxBlockCount, - const std::vector<BlockStoreLocation>& KnownLocations); + std::unordered_map<uint32_t, uint64_t> Initialize(const std::filesystem::path& BlocksBasePath, + uint64_t MaxBlockSize, + uint64_t MaxBlockCount); + + void Prune(const std::vector<BlockStoreLocation>& KnownLocations); void Close(); void WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, const WriteChunkCallback& Callback); diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h index e0354b331..fe9857e6a 100644 --- a/src/zenstore/include/zenstore/gc.h +++ b/src/zenstore/include/zenstore/gc.h @@ -27,6 +27,7 @@ class HashKeySet; class GcManager; class CidStore; struct IoHash; +struct DiskSpace; /** GC clock */ @@ -121,6 +122,14 @@ private: GcManager& m_Gc; }; +/** Interface for querying if we are running low on disk space, used to deny put/writes to disk + */ +class DiskWriteBlocker +{ +public: + virtual bool AreDiskWritesAllowed() const = 0; +}; + /** GC orchestrator */ class GcManager @@ -139,6 +148,9 @@ public: GcStorageSize TotalStorageSize() const; + const DiskWriteBlocker* GetDiskWriteBlocker() { return m_DiskWriteBlocker; } + void SetDiskWriteBlocker(const DiskWriteBlocker* Monitor) { m_DiskWriteBlocker = Monitor; } + #if ZEN_USE_REF_TRACKING void OnNewCidReferences(std::span<IoHash> Hashes); void OnCommittedCidReferences(std::span<IoHash> Hashes); @@ -151,7 +163,8 @@ private: mutable RwLock m_Lock; std::vector<GcContributor*> m_GcContribs; std::vector<GcStorage*> m_GcStorage; - CidStore* m_CidStore = nullptr; + CidStore* m_CidStore = nullptr; + const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; }; enum class GcSchedulerStatus : uint32_t @@ -167,10 +180,11 @@ struct GcSchedulerConfig std::chrono::seconds MonitorInterval{30}; std::chrono::seconds Interval{}; std::chrono::seconds MaxCacheDuration{86400}; - bool CollectSmallObjects = true; - bool Enabled = true; - uint64_t DiskReserveSize = 1ul << 28; - uint64_t DiskSizeSoftLimit = 0; + bool CollectSmallObjects = true; + bool Enabled = true; + uint64_t DiskReserveSize = 1ul << 28; + uint64_t DiskSizeSoftLimit = 0; + uint64_t MinimumFreeDiskSpaceToAllowWrites = 1ul << 28; }; class DiskUsageWindow @@ -196,7 +210,7 @@ public: /** * GC scheduler */ -class GcScheduler +class GcScheduler : private DiskWriteBlocker { public: GcScheduler(GcManager& GcManager); @@ -220,6 +234,8 @@ private: void CollectGarbage(const GcClock::TimePoint& ExpireTime, bool Delete, bool CollectSmallObjects); GcClock::TimePoint NextGcTime(GcClock::TimePoint CurrentTime); spdlog::logger& Log() { return m_Log; } + virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); } + void CheckDiskSpace(const DiskSpace& Space); spdlog::logger& m_Log; GcManager& m_GcManager; @@ -232,6 +248,7 @@ private: std::mutex m_GcMutex; std::condition_variable m_GcSignal; std::optional<TriggerParams> m_TriggerParams; + std::atomic_bool m_AreDiskWritesBlocked = false; TCasLogFile<DiskUsageWindow::DiskUsageEntry> m_DiskUsageLog; DiskUsageWindow m_DiskUsageWindow; |