aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-11-27 16:05:56 +0100
committerGitHub Enterprise <[email protected]>2025-11-27 16:05:56 +0100
commit4984e8cd5c38cf77c8cb978f75f808bce0577f2d (patch)
treec298828c6290a669500788f96f8ea25be41ff88a /src/zenserver
parentremove bad assert (#670) (diff)
downloadzen-4984e8cd5c38cf77c8cb978f75f808bce0577f2d.tar.xz
zen-4984e8cd5c38cf77c8cb978f75f808bce0577f2d.zip
automatic scrub on startup (#667)
- Improvement: Deeper validation of data when scrub is activated (cas/cache/project) - Improvement: Enabled more multi threading when running scrub operations - Improvement: Added means to force a scrub operation at startup with a new release using ZEN_DATA_FORCE_SCRUB_VERSION variable in xmake.lua
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/storage/cache/httpstructuredcache.cpp9
-rw-r--r--src/zenserver/storage/upstream/upstreamcache.cpp21
-rw-r--r--src/zenserver/storage/zenstorageserver.cpp48
3 files changed, 46 insertions, 32 deletions
diff --git a/src/zenserver/storage/cache/httpstructuredcache.cpp b/src/zenserver/storage/cache/httpstructuredcache.cpp
index ece1d7a46..03d2140a1 100644
--- a/src/zenserver/storage/cache/httpstructuredcache.cpp
+++ b/src/zenserver/storage/cache/httpstructuredcache.cpp
@@ -1238,7 +1238,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
uint64_t RawSize = Body.GetSize();
if (ContentType == HttpContentType::kCompressedBinary)
{
- if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize))
+ if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize, /*OutOptionalTotalCompressedSize*/ nullptr))
{
m_CacheStats.BadRequestCount++;
return Request.WriteResponse(HttpResponseCode::BadRequest,
@@ -1515,7 +1515,10 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons
{
IoHash RawHash;
uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize))
+ if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value,
+ RawHash,
+ RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
if (RawHash == Ref.ValueContentId)
{
@@ -1601,7 +1604,7 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons
IoHash RawHash;
uint64_t RawSize;
- if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize))
+ if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize, /*OutOptionalTotalCompressedSize*/ nullptr))
{
m_CacheStats.BadRequestCount++;
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv);
diff --git a/src/zenserver/storage/upstream/upstreamcache.cpp b/src/zenserver/storage/upstream/upstreamcache.cpp
index 6c489c5d3..938a1a011 100644
--- a/src/zenserver/storage/upstream/upstreamcache.cpp
+++ b/src/zenserver/storage/upstream/upstreamcache.cpp
@@ -208,7 +208,10 @@ namespace detail {
IoHash RawHash;
uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(AttachmentResult.Response, RawHash, RawSize))
+ if (CompressedBuffer::ValidateCompressedHeader(AttachmentResult.Response,
+ RawHash,
+ RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
Result.Response = AttachmentResult.Response;
++NumAttachments;
@@ -425,7 +428,10 @@ namespace detail {
m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
if (Payload && IsCompressedBinary(Payload.GetContentType()))
{
- IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize);
+ IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload,
+ RawHash,
+ RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr);
}
}
@@ -481,7 +487,11 @@ namespace detail {
{
if (IsCompressedBinary(Payload.GetContentType()))
{
- IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize) && RawHash != PayloadHash;
+ IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload,
+ RawHash,
+ RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr) &&
+ RawHash != PayloadHash;
}
else
{
@@ -559,7 +569,10 @@ namespace detail {
{
IoHash RawHash;
uint64_t RawSize;
- if (!CompressedBuffer::ValidateCompressedHeader(RecordValue, RawHash, RawSize))
+ if (!CompressedBuffer::ValidateCompressedHeader(RecordValue,
+ RawHash,
+ RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
return {.Reason = std::string("Invalid compressed value buffer"), .Success = false};
}
diff --git a/src/zenserver/storage/zenstorageserver.cpp b/src/zenserver/storage/zenstorageserver.cpp
index e4f8c6aa3..827f22ecc 100644
--- a/src/zenserver/storage/zenstorageserver.cpp
+++ b/src/zenserver/storage/zenstorageserver.cpp
@@ -399,9 +399,10 @@ ZenStorageServer::InitializeState(const ZenStorageServerConfig& ServerOptions)
{
m_RootManifest = LoadCompactBinaryObject(Manifest);
- const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0);
- StateId = m_RootManifest["state_id"].AsObjectId();
- CreatedWhen = m_RootManifest["created"].AsDateTime();
+ const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0);
+ const int32_t ForceScrubVersion = m_RootManifest["force_scrub_version"].AsInt32(0);
+ StateId = m_RootManifest["state_id"].AsObjectId();
+ CreatedWhen = m_RootManifest["created"].AsDateTime();
if (ManifestVersion != ZEN_CFG_SCHEMA_VERSION)
{
@@ -424,6 +425,10 @@ ZenStorageServer::InitializeState(const ZenStorageServerConfig& ServerOptions)
fmt::format("Manifest schema version: {}, differs from required: {}", ManifestVersion, ZEN_CFG_SCHEMA_VERSION);
}
}
+ else if (ForceScrubVersion != ZEN_CFG_DATA_FORCE_SCRUB_VERSION)
+ {
+ m_StartupScrubOptions = "nogc,wait";
+ }
}
}
}
@@ -473,7 +478,8 @@ ZenStorageServer::InitializeState(const ZenStorageServerConfig& ServerOptions)
{
CbObjectWriter Cbo;
- Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "created" << CreatedWhen << "updated" << Now << "state_id" << StateId;
+ Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "force_scrub_version" << ZEN_CFG_DATA_FORCE_SCRUB_VERSION << "created"
+ << CreatedWhen << "updated" << Now << "state_id" << StateId;
m_RootManifest = Cbo.Save();
@@ -686,10 +692,6 @@ ZenStorageServer::Run()
const bool IsInteractiveMode = IsInteractiveSession() && !m_TestMode;
- SetNewState(kRunning);
-
- OnReady();
-
if (!m_StartupScrubOptions.empty())
{
using namespace std::literals;
@@ -726,33 +728,29 @@ ZenStorageServer::Run()
if (DoScrub)
{
- m_GcScheduler.TriggerScrub(ScrubParams);
+ ZEN_INFO("Triggering Scrub/GC operation...");
+ while (!m_GcScheduler.TriggerScrub(ScrubParams))
+ {
+ ZEN_INFO("GC scheduler is busy, waiting for it to complete...");
+ Sleep(500);
+ }
if (DoWait)
{
- auto State = m_GcScheduler.Status();
-
- while ((State != GcSchedulerStatus::kRunning) && (State != GcSchedulerStatus::kStopped))
- {
- Sleep(500);
-
- State = m_GcScheduler.Status();
- }
-
- ZEN_INFO("waiting for Scrub/GC to complete...");
-
- while (State == GcSchedulerStatus::kRunning)
+ while (m_GcScheduler.IsManualTriggerPresent())
{
- Sleep(500);
-
- State = m_GcScheduler.Status();
+ ZEN_INFO("Waiting for Scrub/GC to complete...");
+ Sleep(2000);
}
-
ZEN_INFO("Scrub/GC completed");
}
}
}
+ SetNewState(kRunning);
+
+ OnReady();
+
if (m_IsPowerCycle)
{
ZEN_INFO("Power cycle mode enabled -- shutting down");