aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-09 15:33:38 +0200
committerStefan Boberg <[email protected]>2023-05-09 15:33:38 +0200
commit15f361ad8549a4c0909d164943f9ddd8a714756c (patch)
treee3e4f37778015ea12a14b610039906a66377d3a4 /src
parentmake logging tests run as part of zencore-test (diff)
parentLow disk space detector (#277) (diff)
downloadzen-15f361ad8549a4c0909d164943f9ddd8a714756c.tar.xz
zen-15f361ad8549a4c0909d164943f9ddd8a714756c.zip
Merge branch 'main' of https://github.com/EpicGames/zen
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/cache/structuredcache.cpp124
-rw-r--r--src/zenserver/cache/structuredcache.h41
-rw-r--r--src/zenserver/cache/structuredcachestore.cpp48
-rw-r--r--src/zenserver/config.cpp16
-rw-r--r--src/zenserver/config.h22
-rw-r--r--src/zenserver/projectstore/projectstore.cpp118
-rw-r--r--src/zenserver/projectstore/projectstore.h3
-rw-r--r--src/zenserver/zenserver.cpp26
-rw-r--r--src/zenstore/blockstore.cpp99
-rw-r--r--src/zenstore/compactcas.cpp47
-rw-r--r--src/zenstore/gc.cpp48
-rw-r--r--src/zenstore/include/zenstore/blockstore.h9
-rw-r--r--src/zenstore/include/zenstore/gc.h29
13 files changed, 451 insertions, 179 deletions
diff --git a/src/zenserver/cache/structuredcache.cpp b/src/zenserver/cache/structuredcache.cpp
index 3c829f0e8..9f2a448bb 100644
--- a/src/zenserver/cache/structuredcache.cpp
+++ b/src/zenserver/cache/structuredcache.cpp
@@ -18,6 +18,7 @@
#include <zenhttp/httpserver.h>
#include <zenhttp/httpshared.h>
#include <zenhttp/httpstats.h>
+#include <zenstore/gc.h>
#include <zenutil/cache/cache.h>
#include <zenutil/cache/rpcrecording.h>
@@ -315,17 +316,19 @@ namespace {
//////////////////////////////////////////////////////////////////////////
-HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
- HttpStatsService& StatsService,
- HttpStatusService& StatusService,
- UpstreamCache& UpstreamCache)
+HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
+ CidStore& InCidStore,
+ HttpStatsService& StatsService,
+ HttpStatusService& StatusService,
+ UpstreamCache& UpstreamCache,
+ const DiskWriteBlocker* InDiskWriteBlocker)
: m_Log(logging::Get("cache"))
, m_CacheStore(InCacheStore)
, m_StatsService(StatsService)
, m_StatusService(StatusService)
, m_CidStore(InCidStore)
, m_UpstreamCache(UpstreamCache)
+, m_DiskWriteBlocker(InDiskWriteBlocker)
{
m_StatsService.RegisterHandler("z$", *this);
m_StatusService.RegisterHandler("z$", *this);
@@ -983,7 +986,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
bool Success = false;
const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord);
const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
- const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal);
+ const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData);
ZenCacheValue ClientResultValue;
@@ -1097,6 +1100,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
{
+ ZEN_ASSERT_SLOW(StoreLocal);
CompressedBuffer Chunk = Attachment->AsCompressedBinary();
CidStore::InsertResult InsertResult =
m_CidStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
@@ -1188,6 +1192,10 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
{
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
+ if (!AreDiskWritesAllowed())
+ {
+ return Request.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
const HttpContentType ContentType = Request.RequestContentType();
@@ -1423,7 +1431,10 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons
{
if (RawHash == Ref.ValueContentId)
{
- m_CidStore.AddChunk(UpstreamResult.Value, RawHash);
+ if (AreDiskWritesAllowed())
+ {
+ m_CidStore.AddChunk(UpstreamResult.Value, RawHash);
+ }
Source = UpstreamResult.Source;
}
else
@@ -1492,6 +1503,10 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons
{
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
+ if (!AreDiskWritesAllowed())
+ {
+ return Request.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
Body.SetContentType(Request.RequestContentType());
@@ -1526,12 +1541,13 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons
Request.WriteResponse(ResponseCode);
}
-CbPackage
+HttpResponseCode
HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType,
IoBuffer&& Body,
uint32_t& OutAcceptMagic,
RpcAcceptOptions& OutAcceptFlags,
- int& OutTargetProcessId)
+ int& OutTargetProcessId,
+ CbPackage& OutResultPackage)
{
CbPackage Package;
CbObjectView Object;
@@ -1554,25 +1570,37 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType,
if (Method == "PutCacheRecords"sv)
{
- return HandleRpcPutCacheRecords(Package);
+ if (!AreDiskWritesAllowed())
+ {
+ return HttpResponseCode::InsufficientStorage;
+ }
+ OutResultPackage = HandleRpcPutCacheRecords(Package);
}
else if (Method == "GetCacheRecords"sv)
{
- return HandleRpcGetCacheRecords(Object);
+ OutResultPackage = HandleRpcGetCacheRecords(Object);
}
else if (Method == "PutCacheValues"sv)
{
- return HandleRpcPutCacheValues(Package);
+ if (!AreDiskWritesAllowed())
+ {
+ return HttpResponseCode::InsufficientStorage;
+ }
+ OutResultPackage = HandleRpcPutCacheValues(Package);
}
else if (Method == "GetCacheValues"sv)
{
- return HandleRpcGetCacheValues(Object);
+ OutResultPackage = HandleRpcGetCacheValues(Object);
}
else if (Method == "GetCacheChunks"sv)
{
- return HandleRpcGetCacheChunks(Object);
+ OutResultPackage = HandleRpcGetCacheChunks(Object);
}
- return CbPackage{};
+ else
+ {
+ return HttpResponseCode::BadRequest;
+ }
+ return HttpResponseCode::OK;
}
void
@@ -1594,27 +1622,30 @@ HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Re
uint32_t AcceptMagic = 0;
RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
int TargetPid = 0;
- CbPackage RpcResult = HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid);
- if (AcceptMagic == kCbPkgMagic)
+ CbPackage RpcResult;
+ if (IsHttpSuccessCode(HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid, RpcResult)))
{
- FormatFlags Flags = FormatFlags::kDefault;
- if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ if (AcceptMagic == kCbPkgMagic)
{
- Flags |= FormatFlags::kAllowLocalReferences;
- if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ FormatFlags Flags = FormatFlags::kDefault;
+ if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
{
- Flags |= FormatFlags::kDenyPartialLocalReferences;
+ Flags |= FormatFlags::kAllowLocalReferences;
+ if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ {
+ Flags |= FormatFlags::kDenyPartialLocalReferences;
+ }
}
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid);
+ ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0);
+ }
+ else
+ {
+ BinaryWriter MemStream;
+ RpcResult.Save(MemStream);
+ IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize());
+ ZEN_ASSERT(RpcResponseBuffer.Size() > 0);
}
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid);
- ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResult.Save(MemStream);
- IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize());
- ZEN_ASSERT(RpcResponseBuffer.Size() > 0);
}
}
JobLatch.CountDown();
@@ -1652,10 +1683,13 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
uint32_t AcceptMagic = 0;
RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
int TargetProcessId = 0;
- CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId);
- if (RpcResult.IsNull())
+ CbPackage RpcResult;
+
+ HttpResponseCode ResultCode =
+ HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId, RpcResult);
+ if (!IsHttpSuccessCode(ResultCode))
{
- AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
+ AsyncRequest.WriteResponse(ResultCode);
return;
}
if (AcceptMagic == kCbPkgMagic)
@@ -1706,6 +1740,7 @@ CbPackage
HttpStructuredCacheService::HandleRpcPutCacheRecords(const CbPackage& BatchRequest)
{
ZEN_TRACE_CPU("Z$::RpcPutCacheRecords");
+
CbObjectView BatchObject = BatchRequest.GetObject();
ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv);
@@ -2064,7 +2099,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject);
Request.RecordObject = ObjectBuffer;
- if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal))
+ bool StoreLocal =
+ EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
{
m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}});
}
@@ -2087,13 +2124,14 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
}
if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
{
+ bool StoreLocal = EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId))
{
if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
{
Request.Source = Params.Source;
Value.Exists = true;
- if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
+ if (StoreLocal)
{
m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash());
}
@@ -2411,7 +2449,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
Request.RawSize = Params.RawSize;
const bool HasData = IsCompressedBinary(Params.Value.GetContentType());
const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData);
- const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal);
+ const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
const bool IsHit = SkipData || HasData;
if (IsHit)
{
@@ -2729,7 +2767,8 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespac
Record.CacheValue.SetContentType(ZenContentType::kCbObject);
Record.Source = Params.Source;
- if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal))
+ bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
{
m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue});
}
@@ -2895,7 +2934,8 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
return;
}
- if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal))
+ bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
{
if (Request.IsRecordRequest)
{
@@ -3054,6 +3094,12 @@ HttpStructuredCacheService::HandleStatusRequest(HttpServerRequest& Request)
Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
}
+bool
+HttpStructuredCacheService::AreDiskWritesAllowed() const
+{
+ return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed());
+}
+
#if ZEN_WITH_TESTS
TEST_CASE("z$service.parse.relative.Uri")
diff --git a/src/zenserver/cache/structuredcache.h b/src/zenserver/cache/structuredcache.h
index 8309361f4..77a0aee6a 100644
--- a/src/zenserver/cache/structuredcache.h
+++ b/src/zenserver/cache/structuredcache.h
@@ -24,6 +24,7 @@ struct PutRequestData;
class ScrubContext;
class UpstreamCache;
class ZenCacheStore;
+class DiskWriteBlocker;
enum class CachePolicy : uint32_t;
enum class RpcAcceptOptions : uint16_t;
@@ -67,18 +68,18 @@ namespace cache {
class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider
{
public:
- HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
- HttpStatsService& StatsService,
- HttpStatusService& StatusService,
- UpstreamCache& UpstreamCache);
+ HttpStructuredCacheService(ZenCacheStore& InCacheStore,
+ CidStore& InCidStore,
+ HttpStatsService& StatsService,
+ HttpStatusService& StatusService,
+ UpstreamCache& UpstreamCache,
+ const DiskWriteBlocker* InDiskWriteBlocker);
~HttpStructuredCacheService();
virtual const char* BaseUri() const override;
virtual void HandleRequest(HttpServerRequest& Request) override;
-
- void Flush();
- void Scrub(ScrubContext& Ctx);
+ void Flush();
+ void Scrub(ScrubContext& Ctx);
private:
struct CacheRef
@@ -111,16 +112,17 @@ private:
void HandleRpcRequest(HttpServerRequest& Request);
void HandleDetailsRequest(HttpServerRequest& Request);
- CbPackage HandleRpcPutCacheRecords(const CbPackage& BatchRequest);
- CbPackage HandleRpcGetCacheRecords(CbObjectView BatchRequest);
- CbPackage HandleRpcPutCacheValues(const CbPackage& BatchRequest);
- CbPackage HandleRpcGetCacheValues(CbObjectView BatchRequest);
- CbPackage HandleRpcGetCacheChunks(CbObjectView BatchRequest);
- CbPackage HandleRpcRequest(const ZenContentType ContentType,
- IoBuffer&& Body,
- uint32_t& OutAcceptMagic,
- RpcAcceptOptions& OutAcceptFlags,
- int& OutTargetProcessId);
+ CbPackage HandleRpcPutCacheRecords(const CbPackage& BatchRequest);
+ CbPackage HandleRpcGetCacheRecords(CbObjectView BatchRequest);
+ CbPackage HandleRpcPutCacheValues(const CbPackage& BatchRequest);
+ CbPackage HandleRpcGetCacheValues(CbObjectView BatchRequest);
+ CbPackage HandleRpcGetCacheChunks(CbObjectView BatchRequest);
+ HttpResponseCode HandleRpcRequest(const ZenContentType ContentType,
+ IoBuffer&& Body,
+ uint32_t& OutAcceptMagic,
+ RpcAcceptOptions& OutAcceptFlags,
+ int& OutTargetProcessId,
+ CbPackage& OutPackage);
void HandleCacheRequest(HttpServerRequest& Request);
void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace);
@@ -156,6 +158,8 @@ private:
/** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */
CbPackage WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests);
+ bool AreDiskWritesAllowed() const;
+
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
ZenCacheStore& m_CacheStore;
@@ -167,6 +171,7 @@ private:
metrics::OperationTiming m_HttpRequests;
metrics::OperationTiming m_UpstreamGetRequestTiming;
CacheStats m_CacheStats;
+ const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
void ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount);
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp
index 26e970073..99ca23407 100644
--- a/src/zenserver/cache/structuredcachestore.cpp
+++ b/src/zenserver/cache/structuredcachestore.cpp
@@ -1002,6 +1002,11 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
fs::remove_all(m_BlocksBasePath);
}
+ CreateDirectories(m_BucketDir);
+
+ std::unordered_map<uint32_t, uint64_t> BlockSizes =
+ m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1);
+
uint64_t LogEntryCount = 0;
{
uint32_t IndexVersion = 0;
@@ -1023,12 +1028,11 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
}
}
- CreateDirectories(m_BucketDir);
-
m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
std::vector<BlockStoreLocation> KnownLocations;
KnownLocations.reserve(m_Index.size());
+ std::vector<DiskIndexEntry> BadEntries;
for (const auto& Entry : m_Index)
{
size_t EntryIndex = Entry.second;
@@ -1041,10 +1045,46 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
continue;
}
const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment);
- KnownLocations.push_back(BlockLocation);
+
+ auto BlockIt = BlockSizes.find(BlockLocation.BlockIndex);
+ if (BlockIt == BlockSizes.end())
+ {
+ ZEN_WARN("Unknown block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString());
+ }
+ else
+ {
+ uint64_t BlockSize = BlockIt->second;
+ if (BlockLocation.Offset + BlockLocation.Size > BlockSize)
+ {
+ ZEN_WARN("Range is outside of block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString());
+ }
+ else
+ {
+ KnownLocations.push_back(BlockLocation);
+ continue;
+ }
+ }
+
+ DiskLocation NewLocation = Payload.Location;
+ NewLocation.Flags |= DiskLocation::kTombStone;
+ BadEntries.push_back(DiskIndexEntry{.Key = Entry.first, .Location = NewLocation});
}
- m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);
+ if (!BadEntries.empty())
+ {
+ m_SlogFile.Append(BadEntries);
+ m_SlogFile.Flush();
+
+ LogEntryCount += BadEntries.size();
+
+ for (const DiskIndexEntry& BadEntry : BadEntries)
+ {
+ m_Index.erase(BadEntry.Key);
+ }
+ }
+
+ m_BlockStore.Prune(KnownLocations);
+
if (IsNew || LogEntryCount > 0)
{
MakeIndexSnapshot();
diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp
index da0da2593..592de0f7f 100644
--- a/src/zenserver/config.cpp
+++ b/src/zenserver/config.cpp
@@ -506,7 +506,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
options.add_option("gc",
"",
"disk-reserve-size",
- "Size of gc disk reserve in bytes. Default set to 268435456 (256 Mb).",
+ "Size of gc disk reserve in bytes. Default set to 268435456 (256 Mb). Set to zero to disable.",
cxxopts::value<uint64_t>(ServerOptions.GcConfig.DiskReserveSize)->default_value("268435456"),
"");
@@ -519,6 +519,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
options.add_option("gc",
"",
+ "gc-low-diskspace-threshold",
+ "Minimum free space on disk to allow writes to disk. Default set to 268435456 (256 Mb). Set to zero to disable.",
+ cxxopts::value<uint64_t>(ServerOptions.GcConfig.Cache.MinimumFreeDiskSpaceToAllowWrites)->default_value("268435456"),
+ "");
+
+ options.add_option("gc",
+ "",
"gc-disksize-softlimit",
"Garbage collection disk usage soft limit. Default set to 0 (Off).",
cxxopts::value<uint64_t>(ServerOptions.GcConfig.Cache.DiskSizeSoftLimit)->default_value("0"),
@@ -867,9 +874,10 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio
if (sol::optional<sol::table> GcConfig = lua["gc"])
{
- ServerOptions.GcConfig.MonitorIntervalSeconds = GcConfig.value().get_or("monitorintervalseconds", 30);
- ServerOptions.GcConfig.IntervalSeconds = GcConfig.value().get_or("intervalseconds", 0);
- ServerOptions.GcConfig.DiskReserveSize = GcConfig.value().get_or("diskreservesize", uint64_t(1u << 28));
+ ServerOptions.GcConfig.MonitorIntervalSeconds = GcConfig.value().get_or("monitorintervalseconds", 30);
+ ServerOptions.GcConfig.IntervalSeconds = GcConfig.value().get_or("intervalseconds", 0);
+ ServerOptions.GcConfig.DiskReserveSize = GcConfig.value().get_or("diskreservesize", uint64_t(1u << 28));
+ ServerOptions.GcConfig.MinimumFreeDiskSpaceToAllowWrites = GcConfig.value().get_or("lowdiskspacethreshold", 0);
if (sol::optional<sol::table> CacheGcConfig = GcConfig.value()["cache"])
{
diff --git a/src/zenserver/config.h b/src/zenserver/config.h
index 9559cae33..caf4adce7 100644
--- a/src/zenserver/config.h
+++ b/src/zenserver/config.h
@@ -69,11 +69,12 @@ struct ZenUpstreamCacheConfig
struct ZenCacheEvictionPolicy
{
- uint64_t DiskSizeLimit = ~uint64_t(0);
- uint64_t MemorySizeLimit = 1024 * 1024 * 1024;
- int32_t MaxDurationSeconds = 24 * 60 * 60;
- uint64_t DiskSizeSoftLimit = 0;
- bool Enabled = true;
+ uint64_t DiskSizeLimit = ~uint64_t(0);
+ uint64_t MemorySizeLimit = 1024 * 1024 * 1024;
+ int32_t MaxDurationSeconds = 24 * 60 * 60;
+ uint64_t DiskSizeSoftLimit = 0;
+ uint64_t MinimumFreeDiskSpaceToAllowWrites = 256u * 1024u * 1024u;
+ bool Enabled = true;
};
struct ZenCasEvictionPolicy
@@ -88,11 +89,12 @@ struct ZenGcConfig
{
ZenCasEvictionPolicy Cas;
ZenCacheEvictionPolicy Cache;
- int32_t MonitorIntervalSeconds = 30;
- int32_t IntervalSeconds = 0;
- bool CollectSmallObjects = true;
- bool Enabled = true;
- uint64_t DiskReserveSize = 1ul << 28;
+ int32_t MonitorIntervalSeconds = 30;
+ int32_t IntervalSeconds = 0;
+ bool CollectSmallObjects = true;
+ bool Enabled = true;
+ uint64_t DiskReserveSize = 1ul << 28;
+ uint64_t MinimumFreeDiskSpaceToAllowWrites = 1ul << 28;
};
struct ZenOpenIdProviderConfig
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 80e3f27f1..9dd6ddcfc 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1680,6 +1680,7 @@ ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcMa
, m_Log(logging::Get("project"))
, m_CidStore(Store)
, m_ProjectBasePath(BasePath)
+, m_DiskWriteBlocker(Gc.GetDiskWriteBlocker())
{
ZEN_INFO("initializing project store at '{}'", BasePath);
// m_Log.set_level(spdlog::level::debug);
@@ -2560,6 +2561,10 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
if (Method == "import")
{
+ if (!AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
std::pair<HttpResponseCode, std::string> Result = Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
if (Result.second.empty())
{
@@ -2602,6 +2607,10 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
else if (Method == "putchunks")
{
+ if (!AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
std::span<const CbAttachment> Attachments = Package.GetAttachments();
for (const CbAttachment& Attachment : Attachments)
{
@@ -2676,6 +2685,12 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
return ConvertResult(Result);
}
+bool
+ProjectStore::AreDiskWritesAllowed() const
+{
+ return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed());
+}
+
//////////////////////////////////////////////////////////////////////////
HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, HttpStatsService& StatsService, AuthMgr& AuthMgr)
@@ -2920,7 +2935,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
uint64_t Offset = 0;
uint64_t Size = ~(0ull);
- auto QueryParms = Req.ServerRequest().GetQueryParams();
+ auto QueryParms = HttpReq.GetQueryParams();
if (auto OffsetParm = QueryParms.GetValue("offset"); OffsetParm.empty() == false)
{
@@ -2987,7 +3002,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
HttpContentType AcceptType = HttpReq.AcceptContentType();
HttpContentType RequestType = HttpReq.RequestContentType();
- switch (Req.ServerRequest().RequestVerb())
+ switch (HttpReq.RequestVerb())
{
case HttpVerb::kGet:
{
@@ -3019,6 +3034,10 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
}
case HttpVerb::kPost:
{
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
std::pair<HttpResponseCode, std::string> Result =
m_ProjectStore->PutChunk(ProjectId, OplogId, Cid, RequestType, HttpReq.ReadPayload());
if (Result.first == HttpResponseCode::OK || Result.first == HttpResponseCode::Created)
@@ -3107,6 +3126,11 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
[this](HttpRouterRequest& Req) {
HttpServerRequest& HttpReq = Req.ServerRequest();
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
const auto& ProjectId = Req.GetCapture(1);
const auto& OplogId = Req.GetCapture(2);
@@ -3252,7 +3276,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
if (std::optional<CbObject> MaybeOp = Oplog.GetOpByIndex(OpId.value()))
{
CbObject& Op = MaybeOp.value();
- if (Req.ServerRequest().AcceptContentType() == ZenContentType::kCbPackage)
+ if (HttpReq.AcceptContentType() == ZenContentType::kCbPackage)
{
CbPackage Package;
Package.SetObject(Op);
@@ -3319,6 +3343,8 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
m_Router.RegisterRoute(
"{project}/oplog/{log}",
[this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
const auto& ProjectId = Req.GetCapture(1);
const auto& OplogId = Req.GetCapture(2);
@@ -3326,22 +3352,22 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
if (!Project)
{
- return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("project {} not found", ProjectId));
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("project {} not found", ProjectId));
}
Project->TouchProject();
- switch (Req.ServerRequest().RequestVerb())
+ switch (HttpReq.RequestVerb())
{
case HttpVerb::kGet:
{
ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId);
if (!OplogIt)
{
- return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("oplog {} not found in project {}", OplogId, ProjectId));
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("oplog {} not found in project {}", OplogId, ProjectId));
}
Project->TouchOplog(OplogId);
@@ -3352,14 +3378,18 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
<< "markerpath"sv << Log.MarkerPath().c_str() << "totalsize"sv << Log.TotalSize() << "opcount"
<< Log.OplogCount() << "expired"sv << Project->IsExpired(GcClock::TimePoint::max(), Log);
- Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Cb.Save());
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cb.Save());
}
break;
case HttpVerb::kPost:
{
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
std::filesystem::path OplogMarkerPath;
- if (CbObject Params = Req.ServerRequest().ReadPayloadObject())
+ if (CbObject Params = HttpReq.ReadPayloadObject())
{
OplogMarkerPath = Params["gcpath"sv].AsString();
}
@@ -3370,19 +3400,19 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
if (!Project->NewOplog(OplogId, OplogMarkerPath))
{
// TODO: indicate why the operation failed!
- return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError);
+ return HttpReq.WriteResponse(HttpResponseCode::InternalServerError);
}
Project->TouchOplog(OplogId);
ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath);
- return Req.ServerRequest().WriteResponse(HttpResponseCode::Created);
+ return HttpReq.WriteResponse(HttpResponseCode::Created);
}
// I guess this should ultimately be used to execute RPCs but for now, it
// does absolutely nothing
- return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest);
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
}
break;
@@ -3392,7 +3422,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
Project->DeleteOplog(OplogId);
- return Req.ServerRequest().WriteResponse(HttpResponseCode::OK);
+ return HttpReq.WriteResponse(HttpResponseCode::OK);
}
break;
@@ -3461,13 +3491,19 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
m_Router.RegisterRoute(
"{project}",
[this](HttpRouterRequest& Req) {
- const std::string ProjectId = Req.GetCapture(1);
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const std::string ProjectId = Req.GetCapture(1);
- switch (Req.ServerRequest().RequestVerb())
+ switch (HttpReq.RequestVerb())
{
case HttpVerb::kPost:
{
- IoBuffer Payload = Req.ServerRequest().ReadPayload();
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ IoBuffer Payload = HttpReq.ReadPayload();
CbObject Params = LoadCompactBinaryObject(Payload);
std::string_view Id = Params["id"sv].AsString();
std::string_view Root = Params["root"sv].AsString();
@@ -3487,7 +3523,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
ProjectFilePath,
ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : "");
- Req.ServerRequest().WriteResponse(HttpResponseCode::Created);
+ HttpReq.WriteResponse(HttpResponseCode::Created);
}
break;
@@ -3496,9 +3532,9 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
if (!Project)
{
- return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("project {} not found", ProjectId));
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("project {} not found", ProjectId));
}
Project->TouchProject();
@@ -3520,7 +3556,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
}
Response.EndArray(); // oplogs
- Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save());
+ HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save());
}
break;
@@ -3529,20 +3565,20 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
if (!Project)
{
- return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("project {} not found", ProjectId));
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("project {} not found", ProjectId));
}
ZEN_INFO("deleting project '{}'", ProjectId);
if (!m_ProjectStore->DeleteProject(ProjectId))
{
- return Req.ServerRequest().WriteResponse(HttpResponseCode::Locked,
- HttpContentType::kText,
- fmt::format("project {} is in use", ProjectId));
+ return HttpReq.WriteResponse(HttpResponseCode::Locked,
+ HttpContentType::kText,
+ fmt::format("project {} is in use", ProjectId));
}
- return Req.ServerRequest().WriteResponse(HttpResponseCode::NoContent);
+ return HttpReq.WriteResponse(HttpResponseCode::NoContent);
}
break;
@@ -3556,14 +3592,20 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
m_Router.RegisterRoute(
"{project}/oplog/{log}/save",
[this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ if (!m_ProjectStore->AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
if (HttpReq.RequestContentType() != HttpContentType::kCbObject)
{
return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid content type");
}
- IoBuffer Payload = Req.ServerRequest().ReadPayload();
+ IoBuffer Payload = HttpReq.ReadPayload();
CbObject Response;
std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->WriteOplog(ProjectId, OplogId, std::move(Payload), Response);
@@ -3590,11 +3632,11 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
{
return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid accept content type");
}
- IoBuffer Payload = Req.ServerRequest().ReadPayload();
+ IoBuffer Payload = HttpReq.ReadPayload();
CbObject Response;
std::pair<HttpResponseCode, std::string> Result =
- m_ProjectStore->ReadOplog(ProjectId, OplogId, Req.ServerRequest().GetQueryParams(), Response);
+ m_ProjectStore->ReadOplog(ProjectId, OplogId, HttpReq.GetQueryParams(), Response);
if (Result.first == HttpResponseCode::OK)
{
return HttpReq.WriteResponse(HttpResponseCode::OK, Response);
@@ -3615,7 +3657,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects,
const auto& ProjectId = Req.GetCapture(1);
const auto& OplogId = Req.GetCapture(2);
- IoBuffer Payload = Req.ServerRequest().ReadPayload();
+ IoBuffer Payload = HttpReq.ReadPayload();
m_ProjectStore->Rpc(HttpReq, ProjectId, OplogId, std::move(Payload), m_AuthMgr);
},
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index 7b0c96205..b446de543 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -322,12 +322,15 @@ public:
CbObjectView&& Params,
AuthMgr& AuthManager);
+ bool AreDiskWritesAllowed() const;
+
private:
spdlog::logger& m_Log;
CidStore& m_CidStore;
std::filesystem::path m_ProjectBasePath;
mutable RwLock m_ProjectsLock;
std::map<std::string, Ref<Project>> m_Projects;
+ const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
std::filesystem::path BasePathForProject(std::string_view ProjectId);
};
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 75a49367c..c6ec4b82b 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -394,14 +394,16 @@ public:
}
ZEN_INFO("initializing GC, enabled '{}', interval {}s", ServerOptions.GcConfig.Enabled, ServerOptions.GcConfig.IntervalSeconds);
- zen::GcSchedulerConfig GcConfig{.RootDirectory = m_DataRoot / "gc",
- .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds),
- .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds),
- .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds),
- .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects,
- .Enabled = ServerOptions.GcConfig.Enabled,
- .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize,
- .DiskSizeSoftLimit = ServerOptions.GcConfig.Cache.DiskSizeSoftLimit};
+ zen::GcSchedulerConfig GcConfig{
+ .RootDirectory = m_DataRoot / "gc",
+ .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds),
+ .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds),
+ .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds),
+ .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects,
+ .Enabled = ServerOptions.GcConfig.Enabled,
+ .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize,
+ .DiskSizeSoftLimit = ServerOptions.GcConfig.Cache.DiskSizeSoftLimit,
+ .MinimumFreeDiskSpaceToAllowWrites = ServerOptions.GcConfig.Cache.MinimumFreeDiskSpaceToAllowWrites};
m_GcScheduler.Initialize(GcConfig);
return EffectiveBasePort;
@@ -845,8 +847,12 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
}
}
- m_StructuredCacheService =
- std::make_unique<HttpStructuredCacheService>(*m_CacheStore, *m_CidStore, m_StatsService, m_StatusService, *m_UpstreamCache);
+ m_StructuredCacheService = std::make_unique<HttpStructuredCacheService>(*m_CacheStore,
+ *m_CidStore,
+ m_StatsService,
+ m_StatusService,
+ *m_UpstreamCache,
+ m_GcManager.GetDiskWriteBlocker());
m_Http->RegisterService(*m_StructuredCacheService);
m_Http->RegisterService(*m_UpstreamService);
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index e19712c40..378d9fd52 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -118,28 +118,19 @@ BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::functio
constexpr uint64_t ScrubSmallChunkWindowSize = 4 * 1024 * 1024;
-void
-BlockStore::Initialize(const std::filesystem::path& BlocksBasePath,
- uint64_t MaxBlockSize,
- uint64_t MaxBlockCount,
- const std::vector<BlockStoreLocation>& KnownLocations)
+std::unordered_map<uint32_t, uint64_t>
+BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t MaxBlockSize, uint64_t MaxBlockCount)
{
ZEN_ASSERT(MaxBlockSize > 0);
ZEN_ASSERT(MaxBlockCount > 0);
ZEN_ASSERT(IsPow2(MaxBlockCount));
+ std::unordered_map<uint32_t, uint64_t> FoundBlocks;
+
m_TotalSize = 0;
m_BlocksBasePath = BlocksBasePath;
m_MaxBlockSize = MaxBlockSize;
- m_ChunkBlocks.clear();
-
- std::unordered_set<uint32_t> KnownBlocks;
- for (const auto& Entry : KnownLocations)
- {
- KnownBlocks.insert(Entry.BlockIndex);
- }
-
if (std::filesystem::is_directory(m_BlocksBasePath))
{
std::vector<std::filesystem::path> FoldersToScan;
@@ -168,23 +159,11 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath,
{
continue;
}
- if (!KnownBlocks.contains(BlockIndex))
- {
- // Log removing unreferenced block
- // Clear out unused blocks
- ZEN_DEBUG("removing unused block at '{}'", Path);
- std::error_code Ec;
- std::filesystem::remove(Path, Ec);
- if (Ec)
- {
- ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message());
- }
- continue;
- }
Ref<BlockStoreFile> BlockFile{new BlockStoreFile(Path)};
BlockFile->Open();
m_TotalSize.fetch_add(BlockFile->FileSize(), std::memory_order::relaxed);
m_ChunkBlocks[BlockIndex] = BlockFile;
+ FoundBlocks[BlockIndex] = BlockFile->FileSize();
}
}
++FolderOffset;
@@ -194,6 +173,39 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath,
{
CreateDirectories(m_BlocksBasePath);
}
+ return FoundBlocks;
+}
+
+void
+BlockStore::Prune(const std::vector<BlockStoreLocation>& KnownLocations)
+{
+ RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
+
+ std::unordered_set<uint32_t> KnownBlocks;
+ for (const auto& Entry : KnownLocations)
+ {
+ KnownBlocks.insert(Entry.BlockIndex);
+ }
+ std::vector<uint32_t> BlocksToDelete;
+ for (auto It = m_ChunkBlocks.begin(); It != m_ChunkBlocks.end(); ++It)
+ {
+ uint32_t BlockIndex = It->first;
+ if (!KnownBlocks.contains(BlockIndex))
+ {
+ Ref<BlockStoreFile> BlockFile = m_ChunkBlocks[BlockIndex];
+ m_TotalSize.fetch_add(BlockFile->FileSize(), std::memory_order::relaxed);
+ BlocksToDelete.push_back(BlockIndex);
+ }
+ }
+
+ for (uint32_t BlockIndex : BlocksToDelete)
+ {
+ // Clear out unused blocks
+ Ref<BlockStoreFile> BlockFile = m_ChunkBlocks[BlockIndex];
+ m_ChunkBlocks.erase(BlockIndex);
+ ZEN_DEBUG("marking block store file '{}' for delete, block #{}", BlockFile->GetPath(), BlockIndex);
+ BlockFile->MarkAsDeleteOnClose();
+ }
}
void
@@ -460,7 +472,10 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
WriteBlockTimeUs += ElapsedUs;
WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
});
- OldBlockFile = m_ChunkBlocks[BlockIndex];
+ if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end())
+ {
+ OldBlockFile = It->second;
+ }
}
if (!OldBlockFile)
@@ -492,8 +507,9 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
});
if (OldBlockFile)
{
- m_ChunkBlocks[BlockIndex] = nullptr;
ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
+ ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile);
+ m_ChunkBlocks.erase(BlockIndex);
m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed);
OldBlockFile->MarkAsDeleteOnClose();
}
@@ -570,7 +586,9 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
ReadBlockTimeUs += ElapsedUs;
ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
});
+ ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile);
m_ChunkBlocks.erase(NextBlockIndex);
+ NewBlockFile->MarkAsDeleteOnClose();
return;
}
@@ -615,8 +633,9 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
ReadBlockTimeUs += ElapsedUs;
ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
});
- m_ChunkBlocks[BlockIndex] = nullptr;
ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
+ ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile);
+ m_ChunkBlocks.erase(BlockIndex);
m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed);
OldBlockFile->MarkAsDeleteOnClose();
}
@@ -692,8 +711,8 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
size_t ChunkIndex = LocationIndexes[LocationIndexOffset];
const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex];
- const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[FirstLocation.BlockIndex];
- if (!BlockFile)
+ auto FindBlockIt = m_ChunkBlocks.find(FirstLocation.BlockIndex);
+ if (FindBlockIt == m_ChunkBlocks.end())
{
while (ChunkLocations[ChunkIndex].BlockIndex == FirstLocation.BlockIndex)
{
@@ -707,6 +726,9 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
}
continue;
}
+ const Ref<BlockStoreFile>& BlockFile = FindBlockIt->second;
+ ZEN_ASSERT(BlockFile);
+
size_t BlockSize = BlockFile->FileSize();
size_t RangeCount = GetNextRange(LocationIndexOffset);
if (RangeCount > 0)
@@ -939,7 +961,7 @@ TEST_CASE("blockstore.chunks")
auto RootDirectory = TempDir.Path();
BlockStore Store;
- Store.Initialize(RootDirectory, 128, 1024, {});
+ Store.Initialize(RootDirectory, 128, 1024);
IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512});
CHECK(!BadChunk);
@@ -969,7 +991,7 @@ TEST_CASE("blockstore.clean.stray.blocks")
auto RootDirectory = TempDir.Path();
BlockStore Store;
- Store.Initialize(RootDirectory / "store", 128, 1024, {});
+ Store.Initialize(RootDirectory / "store", 128, 1024);
std::string FirstChunkData = "This is the data of the first chunk that we will write";
BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4);
@@ -982,7 +1004,8 @@ TEST_CASE("blockstore.clean.stray.blocks")
Store.Close();
// Not referencing the second block means that we should be deleted
- Store.Initialize(RootDirectory / "store", 128, 1024, {FirstChunkLocation, SecondChunkLocation});
+ Store.Initialize(RootDirectory / "store", 128, 1024);
+ Store.Prune({FirstChunkLocation, SecondChunkLocation});
CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 1);
}
@@ -995,7 +1018,7 @@ TEST_CASE("blockstore.flush.forces.new.block")
auto RootDirectory = TempDir.Path();
BlockStore Store;
- Store.Initialize(RootDirectory / "store", 128, 1024, {});
+ Store.Initialize(RootDirectory / "store", 128, 1024);
std::string FirstChunkData = "This is the data of the first chunk that we will write";
WriteStringAsChunk(Store, FirstChunkData, 4);
@@ -1018,7 +1041,7 @@ TEST_CASE("blockstore.iterate.chunks")
auto RootDirectory = TempDir.Path();
BlockStore Store;
- Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024, {});
+ Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024);
IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512});
CHECK(!BadChunk);
@@ -1114,7 +1137,7 @@ TEST_CASE("blockstore.reclaim.space")
auto RootDirectory = TempDir.Path();
BlockStore Store;
- Store.Initialize(RootDirectory / "store", 512, 1024, {});
+ Store.Initialize(RootDirectory / "store", 512, 1024);
constexpr size_t ChunkCount = 200;
constexpr size_t Alignment = 8;
@@ -1231,7 +1254,7 @@ TEST_CASE("blockstore.thread.read.write")
auto RootDirectory = TempDir.Path();
BlockStore Store;
- Store.Initialize(RootDirectory / "store", 1088, 1024, {});
+ Store.Initialize(RootDirectory / "store", 1088, 1024);
constexpr size_t ChunkCount = 1000;
constexpr size_t Alignment = 8;
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 7b2c21b0f..2974570e5 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -701,23 +701,60 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
std::filesystem::remove_all(BasePath);
}
+ CreateDirectories(BasePath);
+
+ std::unordered_map<uint32_t, uint64_t> BlockSizes =
+ m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1);
+
m_LogFlushPosition = ReadIndexFile();
uint64_t LogEntryCount = ReadLog(m_LogFlushPosition);
- CreateDirectories(BasePath);
-
std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName);
m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
std::vector<BlockStoreLocation> KnownLocations;
KnownLocations.reserve(m_LocationMap.size());
+ std::vector<CasDiskIndexEntry> BadEntries;
for (const auto& Entry : m_LocationMap)
{
- const BlockStoreDiskLocation& Location = Entry.second;
- KnownLocations.push_back(Location.Get(m_PayloadAlignment));
+ const BlockStoreDiskLocation& DiskLocation = Entry.second;
+ auto BlockIt = BlockSizes.find(DiskLocation.GetBlockIndex());
+ if (BlockIt == BlockSizes.end())
+ {
+ ZEN_WARN("Unknown block {} for entry {}", DiskLocation.GetBlockIndex(), Entry.first.ToHexString());
+ }
+ else
+ {
+ BlockStoreLocation BlockLocation = DiskLocation.Get(m_PayloadAlignment);
+
+ uint64_t BlockSize = BlockIt->second;
+ if (BlockLocation.Offset + BlockLocation.Size > BlockSize)
+ {
+ ZEN_WARN("Range is outside of block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString());
+ }
+ else
+ {
+ KnownLocations.emplace_back(std::move(BlockLocation));
+ continue;
+ }
+ BadEntries.push_back({.Key = Entry.first, .Location = DiskLocation, .Flags = CasDiskIndexEntry::kTombstone});
+ }
+ }
+
+ if (!BadEntries.empty())
+ {
+ m_CasLog.Append(BadEntries);
+ m_CasLog.Flush();
+
+ LogEntryCount += BadEntries.size();
+
+ for (const CasDiskIndexEntry& BadEntry : BadEntries)
+ {
+ m_LocationMap.erase(BadEntry.Key);
+ }
}
- m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);
+ m_BlockStore.Prune(KnownLocations);
if (IsNewStore || (LogEntryCount > 0))
{
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index 370c3c965..f9888722b 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -559,10 +559,12 @@ DiskUsageWindow::FindTimepointThatRemoves(uint64_t Amount, GcClock::Tick EndTick
GcScheduler::GcScheduler(GcManager& GcManager) : m_Log(logging::Get("gc")), m_GcManager(GcManager)
{
+ m_GcManager.SetDiskWriteBlocker(this);
}
GcScheduler::~GcScheduler()
{
+ m_GcManager.SetDiskWriteBlocker(nullptr);
Shutdown();
}
@@ -573,6 +575,18 @@ GcScheduler::Initialize(const GcSchedulerConfig& Config)
m_Config = Config;
+ std::error_code Ec;
+ DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec);
+ if (Ec)
+ {
+ m_AreDiskWritesBlocked.store(true);
+ ZEN_WARN("get disk space info FAILED, blocking disk writes, reason: '{}'", Ec.message());
+ }
+ else
+ {
+ CheckDiskSpace(Space);
+ }
+
if (m_Config.Interval.count() && m_Config.Interval < m_Config.MonitorInterval)
{
m_Config.Interval = m_Config.MonitorInterval;
@@ -580,7 +594,7 @@ GcScheduler::Initialize(const GcSchedulerConfig& Config)
std::filesystem::create_directories(Config.RootDirectory);
- std::error_code Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize);
+ Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize);
if (Ec)
{
ZEN_WARN("unable to create GC reserve at '{}' with size {}, reason '{}'",
@@ -664,6 +678,29 @@ GcScheduler::Trigger(const GcScheduler::TriggerParams& Params)
}
void
+GcScheduler::CheckDiskSpace(const DiskSpace& Space)
+{
+ bool AreDiskWritesBlocked = m_AreDiskWritesBlocked;
+ bool IsLowOnDiskSpace = (m_Config.MinimumFreeDiskSpaceToAllowWrites) != 0 && (Space.Free < m_Config.MinimumFreeDiskSpaceToAllowWrites);
+ if (IsLowOnDiskSpace != AreDiskWritesBlocked)
+ {
+ m_AreDiskWritesBlocked.store(IsLowOnDiskSpace);
+ if (IsLowOnDiskSpace)
+ {
+ ZEN_WARN("Writing to disk is blocked, free disk space: {}, minimum required {}",
+ NiceBytes(Space.Free),
+ NiceBytes(m_Config.MinimumFreeDiskSpaceToAllowWrites));
+ }
+ else
+ {
+ ZEN_INFO("Writing to disk is unblocked, free disk space: {}, minimum required {}",
+ NiceBytes(Space.Free),
+ NiceBytes(m_Config.MinimumFreeDiskSpaceToAllowWrites));
+ }
+ }
+}
+
+void
GcScheduler::SchedulerThread()
{
std::chrono::seconds WaitTime{0};
@@ -716,16 +753,21 @@ GcScheduler::SchedulerThread()
GcClock::TimePoint ExpireTime = MaxCacheDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxCacheDuration;
- std::error_code Ec;
const GcStorageSize TotalSize = m_GcManager.TotalStorageSize();
if (Timeout && Status() == GcSchedulerStatus::kIdle)
{
- DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec);
+ std::error_code Ec;
+ DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec);
if (Ec)
{
+ m_AreDiskWritesBlocked.store(true);
ZEN_WARN("get disk space info FAILED, reason: '{}'", Ec.message());
}
+ else
+ {
+ CheckDiskSpace(Space);
+ }
const int64_t PressureGraphLength = 30;
const std::chrono::duration LoadGraphTime = PressureGraphLength * m_Config.MonitorInterval;
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index 857ccae38..738510cac 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -124,10 +124,11 @@ public:
typedef std::function<void(size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback;
typedef std::function<void(const BlockStoreLocation& Location)> WriteChunkCallback;
- void Initialize(const std::filesystem::path& BlocksBasePath,
- uint64_t MaxBlockSize,
- uint64_t MaxBlockCount,
- const std::vector<BlockStoreLocation>& KnownLocations);
+ std::unordered_map<uint32_t, uint64_t> Initialize(const std::filesystem::path& BlocksBasePath,
+ uint64_t MaxBlockSize,
+ uint64_t MaxBlockCount);
+
+ void Prune(const std::vector<BlockStoreLocation>& KnownLocations);
void Close();
void WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, const WriteChunkCallback& Callback);
diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h
index e0354b331..fe9857e6a 100644
--- a/src/zenstore/include/zenstore/gc.h
+++ b/src/zenstore/include/zenstore/gc.h
@@ -27,6 +27,7 @@ class HashKeySet;
class GcManager;
class CidStore;
struct IoHash;
+struct DiskSpace;
/** GC clock
*/
@@ -121,6 +122,14 @@ private:
GcManager& m_Gc;
};
+/** Interface for querying if we are running low on disk space, used to deny put/writes to disk
+ */
+class DiskWriteBlocker
+{
+public:
+ virtual bool AreDiskWritesAllowed() const = 0;
+};
+
/** GC orchestrator
*/
class GcManager
@@ -139,6 +148,9 @@ public:
GcStorageSize TotalStorageSize() const;
+ const DiskWriteBlocker* GetDiskWriteBlocker() { return m_DiskWriteBlocker; }
+ void SetDiskWriteBlocker(const DiskWriteBlocker* Monitor) { m_DiskWriteBlocker = Monitor; }
+
#if ZEN_USE_REF_TRACKING
void OnNewCidReferences(std::span<IoHash> Hashes);
void OnCommittedCidReferences(std::span<IoHash> Hashes);
@@ -151,7 +163,8 @@ private:
mutable RwLock m_Lock;
std::vector<GcContributor*> m_GcContribs;
std::vector<GcStorage*> m_GcStorage;
- CidStore* m_CidStore = nullptr;
+ CidStore* m_CidStore = nullptr;
+ const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
};
enum class GcSchedulerStatus : uint32_t
@@ -167,10 +180,11 @@ struct GcSchedulerConfig
std::chrono::seconds MonitorInterval{30};
std::chrono::seconds Interval{};
std::chrono::seconds MaxCacheDuration{86400};
- bool CollectSmallObjects = true;
- bool Enabled = true;
- uint64_t DiskReserveSize = 1ul << 28;
- uint64_t DiskSizeSoftLimit = 0;
+ bool CollectSmallObjects = true;
+ bool Enabled = true;
+ uint64_t DiskReserveSize = 1ul << 28;
+ uint64_t DiskSizeSoftLimit = 0;
+ uint64_t MinimumFreeDiskSpaceToAllowWrites = 1ul << 28;
};
class DiskUsageWindow
@@ -196,7 +210,7 @@ public:
/**
* GC scheduler
*/
-class GcScheduler
+class GcScheduler : private DiskWriteBlocker
{
public:
GcScheduler(GcManager& GcManager);
@@ -220,6 +234,8 @@ private:
void CollectGarbage(const GcClock::TimePoint& ExpireTime, bool Delete, bool CollectSmallObjects);
GcClock::TimePoint NextGcTime(GcClock::TimePoint CurrentTime);
spdlog::logger& Log() { return m_Log; }
+ virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); }
+ void CheckDiskSpace(const DiskSpace& Space);
spdlog::logger& m_Log;
GcManager& m_GcManager;
@@ -232,6 +248,7 @@ private:
std::mutex m_GcMutex;
std::condition_variable m_GcSignal;
std::optional<TriggerParams> m_TriggerParams;
+ std::atomic_bool m_AreDiskWritesBlocked = false;
TCasLogFile<DiskUsageWindow::DiskUsageEntry> m_DiskUsageLog;
DiskUsageWindow m_DiskUsageWindow;