aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-11-27 16:05:56 +0100
committerGitHub Enterprise <[email protected]>2025-11-27 16:05:56 +0100
commit4984e8cd5c38cf77c8cb978f75f808bce0577f2d (patch)
treec298828c6290a669500788f96f8ea25be41ff88a /src
parentremove bad assert (#670) (diff)
downloadzen-4984e8cd5c38cf77c8cb978f75f808bce0577f2d.tar.xz
zen-4984e8cd5c38cf77c8cb978f75f808bce0577f2d.zip
automatic scrub on startup (#667)
- Improvement: Deeper validation of data when scrub is activated (cas/cache/project)
- Improvement: Enabled more multi-threading when running scrub operations
- Improvement: Added a way to force a scrub operation at startup with a new release, using the ZEN_DATA_FORCE_SCRUB_VERSION variable in xmake.lua
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/print_cmd.cpp11
-rw-r--r--src/zencore/compress.cpp61
-rw-r--r--src/zencore/include/zencore/compress.h20
-rw-r--r--src/zencore/include/zencore/config.h.in1
-rw-r--r--src/zenhttp/clients/httpclientcpr.cpp2
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp5
-rw-r--r--src/zenremotestore/builds/filebuildstorage.cpp44
-rw-r--r--src/zenremotestore/jupiter/jupitersession.cpp5
-rw-r--r--src/zenremotestore/projectstore/remoteprojectstore.cpp8
-rw-r--r--src/zenserver/storage/cache/httpstructuredcache.cpp9
-rw-r--r--src/zenserver/storage/upstream/upstreamcache.cpp21
-rw-r--r--src/zenserver/storage/zenstorageserver.cpp48
-rw-r--r--src/zenstore/blockstore.cpp1
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp78
-rw-r--r--src/zenstore/cache/cacherpc.cpp5
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp50
-rw-r--r--src/zenstore/cidstore.cpp7
-rw-r--r--src/zenstore/compactcas.cpp49
-rw-r--r--src/zenstore/filecas.cpp109
-rw-r--r--src/zenstore/gc.cpp241
-rw-r--r--src/zenstore/include/zenstore/cache/cacheshared.h2
-rw-r--r--src/zenstore/include/zenstore/gc.h68
-rw-r--r--src/zenstore/include/zenstore/projectstore.h1
-rw-r--r--src/zenstore/include/zenstore/scrubcontext.h3
-rw-r--r--src/zenstore/projectstore.cpp302
-rw-r--r--src/zenstore/scrubcontext.cpp64
26 files changed, 748 insertions, 467 deletions
diff --git a/src/zen/cmds/print_cmd.cpp b/src/zen/cmds/print_cmd.cpp
index 557808ba7..a4737e66d 100644
--- a/src/zen/cmds/print_cmd.cpp
+++ b/src/zen/cmds/print_cmd.cpp
@@ -97,8 +97,17 @@ PrintCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
IoHash RawHash;
uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(Data, RawHash, RawSize))
+ uint64_t CompressedSize;
+ if (CompressedBuffer::ValidateCompressedHeader(Data, RawHash, RawSize, &CompressedSize))
{
+ if (CompressedSize != Data.GetSize())
+ {
+ ZEN_CONSOLE_WARN(
+ "Compressed binary header total compressed size mismatch. Payload is {} bytes and header says compressed payload is {} "
+ "bytes",
+ Data.GetSize(),
+ CompressedSize);
+ }
ZEN_CONSOLE("Compressed binary: size {}, raw size {}, hash: {}", Data.GetSize(), RawSize, RawHash);
}
else if (IsPackageMessage(Data))
diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp
index 0c53a8000..1c623db4d 100644
--- a/src/zencore/compress.cpp
+++ b/src/zencore/compress.cpp
@@ -59,10 +59,16 @@ struct BufferHeader
BLAKE3 RawHash; // The hash of the uncompressed data
/** Checks validity of the buffer based on the magic number, method, and CRC-32. */
- static bool IsValid(const CompositeBuffer& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize);
- static bool IsValid(const SharedBuffer& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize)
+ static bool IsValid(const CompositeBuffer& CompressedData,
+ IoHash& OutRawHash,
+ uint64_t& OutRawSize,
+ uint64_t* OutOptionalTotalCompressedSize);
+ static bool IsValid(const SharedBuffer& CompressedData,
+ IoHash& OutRawHash,
+ uint64_t& OutRawSize,
+ uint64_t* OutOptionalTotalCompressedSize)
{
- return IsValid(CompositeBuffer(CompressedData), OutRawHash, OutRawSize);
+ return IsValid(CompositeBuffer(CompressedData), OutRawHash, OutRawSize, OutOptionalTotalCompressedSize);
}
/** Read a header from a buffer that is at least sizeof(BufferHeader) without any validation. */
@@ -1467,13 +1473,20 @@ ReadHeader(const CompositeBuffer& CompressedData, BufferHeader& OutHeader, Uniqu
}
bool
-BufferHeader::IsValid(const CompositeBuffer& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize)
+BufferHeader::IsValid(const CompositeBuffer& CompressedData,
+ IoHash& OutRawHash,
+ uint64_t& OutRawSize,
+ uint64_t* OutOptionalTotalCompressedSize)
{
detail::BufferHeader Header;
if (ReadHeader(CompressedData, Header, nullptr))
{
OutRawHash = IoHash::FromBLAKE3(Header.RawHash);
OutRawSize = Header.TotalRawSize;
+ if (OutOptionalTotalCompressedSize)
+ {
+ *OutOptionalTotalCompressedSize = Header.TotalCompressedSize;
+ }
return true;
}
return false;
@@ -1643,10 +1656,11 @@ private:
template<typename BufferType>
inline CompositeBuffer
-ValidBufferOrEmpty(BufferType&& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize)
+ValidBufferOrEmpty(BufferType&& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize, uint64_t* OutOptionalTotalCompressedSize)
{
- return BufferHeader::IsValid(CompressedData, OutRawHash, OutRawSize) ? CompositeBuffer(std::forward<BufferType>(CompressedData))
- : CompositeBuffer();
+ return BufferHeader::IsValid(CompressedData, OutRawHash, OutRawSize, OutOptionalTotalCompressedSize)
+ ? CompositeBuffer(std::forward<BufferType>(CompressedData))
+ : CompositeBuffer();
}
CompositeBuffer
@@ -1893,7 +1907,7 @@ CompressedBuffer
CompressedBuffer::FromCompressed(const CompositeBuffer& InCompressedData, IoHash& OutRawHash, uint64_t& OutRawSize)
{
CompressedBuffer Local;
- Local.CompressedData = detail::ValidBufferOrEmpty(InCompressedData, OutRawHash, OutRawSize);
+ Local.CompressedData = detail::ValidBufferOrEmpty(InCompressedData, OutRawHash, OutRawSize, /*OutOptionalTotalCompressedSize*/ nullptr);
return Local;
}
@@ -1901,7 +1915,8 @@ CompressedBuffer
CompressedBuffer::FromCompressed(CompositeBuffer&& InCompressedData, IoHash& OutRawHash, uint64_t& OutRawSize)
{
CompressedBuffer Local;
- Local.CompressedData = detail::ValidBufferOrEmpty(std::move(InCompressedData), OutRawHash, OutRawSize);
+ Local.CompressedData =
+ detail::ValidBufferOrEmpty(std::move(InCompressedData), OutRawHash, OutRawSize, /*OutOptionalTotalCompressedSize*/ nullptr);
return Local;
}
@@ -1909,7 +1924,7 @@ CompressedBuffer
CompressedBuffer::FromCompressed(const SharedBuffer& InCompressedData, IoHash& OutRawHash, uint64_t& OutRawSize)
{
CompressedBuffer Local;
- Local.CompressedData = detail::ValidBufferOrEmpty(InCompressedData, OutRawHash, OutRawSize);
+ Local.CompressedData = detail::ValidBufferOrEmpty(InCompressedData, OutRawHash, OutRawSize, /*OutOptionalTotalCompressedSize*/ nullptr);
return Local;
}
@@ -1917,7 +1932,8 @@ CompressedBuffer
CompressedBuffer::FromCompressed(SharedBuffer&& InCompressedData, IoHash& OutRawHash, uint64_t& OutRawSize)
{
CompressedBuffer Local;
- Local.CompressedData = detail::ValidBufferOrEmpty(std::move(InCompressedData), OutRawHash, OutRawSize);
+ Local.CompressedData =
+ detail::ValidBufferOrEmpty(std::move(InCompressedData), OutRawHash, OutRawSize, /*OutOptionalTotalCompressedSize*/ nullptr);
return Local;
}
@@ -1946,15 +1962,30 @@ CompressedBuffer::FromCompressedNoValidate(CompositeBuffer&& InCompressedData)
}
bool
-CompressedBuffer::ValidateCompressedHeader(IoBuffer&& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize)
+CompressedBuffer::ValidateCompressedHeader(IoBuffer&& CompressedData,
+ IoHash& OutRawHash,
+ uint64_t& OutRawSize,
+ uint64_t* OutOptionalTotalCompressedSize)
+{
+ return detail::BufferHeader::IsValid(SharedBuffer(std::move(CompressedData)), OutRawHash, OutRawSize, OutOptionalTotalCompressedSize);
+}
+
+bool
+CompressedBuffer::ValidateCompressedHeader(const IoBuffer& CompressedData,
+ IoHash& OutRawHash,
+ uint64_t& OutRawSize,
+ uint64_t* OutOptionalTotalCompressedSize)
{
- return detail::BufferHeader::IsValid(SharedBuffer(std::move(CompressedData)), OutRawHash, OutRawSize);
+ return detail::BufferHeader::IsValid(SharedBuffer(CompressedData), OutRawHash, OutRawSize, OutOptionalTotalCompressedSize);
}
bool
-CompressedBuffer::ValidateCompressedHeader(const IoBuffer& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize)
+CompressedBuffer::ValidateCompressedHeader(const CompositeBuffer& CompressedData,
+ IoHash& OutRawHash,
+ uint64_t& OutRawSize,
+ uint64_t* OutOptionalTotalCompressedSize)
{
- return detail::BufferHeader::IsValid(SharedBuffer(CompressedData), OutRawHash, OutRawSize);
+ return detail::BufferHeader::IsValid(CompressedData, OutRawHash, OutRawSize, OutOptionalTotalCompressedSize);
}
size_t
diff --git a/src/zencore/include/zencore/compress.h b/src/zencore/include/zencore/compress.h
index 09fa6249d..3802a8c31 100644
--- a/src/zencore/include/zencore/compress.h
+++ b/src/zencore/include/zencore/compress.h
@@ -100,12 +100,20 @@ public:
uint64_t& OutRawSize);
[[nodiscard]] ZENCORE_API static CompressedBuffer FromCompressedNoValidate(IoBuffer&& CompressedData);
[[nodiscard]] ZENCORE_API static CompressedBuffer FromCompressedNoValidate(CompositeBuffer&& CompressedData);
- [[nodiscard]] ZENCORE_API static bool ValidateCompressedHeader(IoBuffer&& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize);
- [[nodiscard]] ZENCORE_API static bool ValidateCompressedHeader(const IoBuffer& CompressedData,
- IoHash& OutRawHash,
- uint64_t& OutRawSize);
- [[nodiscard]] ZENCORE_API static size_t GetHeaderSizeForNoneEncoder();
- [[nodiscard]] ZENCORE_API static UniqueBuffer CreateHeaderForNoneEncoder(uint64_t RawSize, const BLAKE3& RawHash);
+ [[nodiscard]] ZENCORE_API static bool ValidateCompressedHeader(IoBuffer&& CompressedData,
+ IoHash& OutRawHash,
+ uint64_t& OutRawSize,
+ uint64_t* OutOptionalTotalCompressedSize);
+ [[nodiscard]] ZENCORE_API static bool ValidateCompressedHeader(const IoBuffer& CompressedData,
+ IoHash& OutRawHash,
+ uint64_t& OutRawSize,
+ uint64_t* OutOptionalTotalCompressedSize);
+ [[nodiscard]] ZENCORE_API static bool ValidateCompressedHeader(const CompositeBuffer& CompressedData,
+ IoHash& OutRawHash,
+ uint64_t& OutRawSize,
+ uint64_t* OutOptionalTotalCompressedSize);
+ [[nodiscard]] ZENCORE_API static size_t GetHeaderSizeForNoneEncoder();
+ [[nodiscard]] ZENCORE_API static UniqueBuffer CreateHeaderForNoneEncoder(uint64_t RawSize, const BLAKE3& RawHash);
/** Reset this to null. */
inline void Reset() { CompressedData.Reset(); }
diff --git a/src/zencore/include/zencore/config.h.in b/src/zencore/include/zencore/config.h.in
index 3372eca2a..93256756d 100644
--- a/src/zencore/include/zencore/config.h.in
+++ b/src/zencore/include/zencore/config.h.in
@@ -14,3 +14,4 @@
#define ZEN_CFG_VERSION_BUILD_STRING "${VERSION}-${plat}-${arch}-${mode}"
#define ZEN_CFG_VERSION_BUILD_STRING_FULL "${VERSION}-${VERSION_BUILD}-${plat}-${arch}-${mode}-${GIT_COMMIT}"
#define ZEN_CFG_SCHEMA_VERSION ${ZEN_SCHEMA_VERSION}
+#define ZEN_CFG_DATA_FORCE_SCRUB_VERSION ${ZEN_DATA_FORCE_SCRUB_VERSION}
diff --git a/src/zenhttp/clients/httpclientcpr.cpp b/src/zenhttp/clients/httpclientcpr.cpp
index 66a4ce16f..7bfc4670c 100644
--- a/src/zenhttp/clients/httpclientcpr.cpp
+++ b/src/zenhttp/clients/httpclientcpr.cpp
@@ -296,7 +296,7 @@ CprHttpClient::ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::
{
IoHash RawHash;
uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(ResponseBuffer, RawHash, RawSize))
+ if (CompressedBuffer::ValidateCompressedHeader(ResponseBuffer, RawHash, RawSize, /*OutOptionalTotalCompressedSize*/ nullptr))
{
return true;
}
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index 76f36a921..ba82c5956 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -2926,7 +2926,10 @@ BuildsOperationUpdateFolder::FindDownloadedChunk(const IoHash& ChunkHash)
{
IoHash RawHash;
uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize))
+ if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart,
+ RawHash,
+ RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
return CompressedChunkPath;
}
diff --git a/src/zenremotestore/builds/filebuildstorage.cpp b/src/zenremotestore/builds/filebuildstorage.cpp
index 96d81b281..153deaa9f 100644
--- a/src/zenremotestore/builds/filebuildstorage.cpp
+++ b/src/zenremotestore/builds/filebuildstorage.cpp
@@ -9,6 +9,7 @@
#include <zencore/scopeguard.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
+#include <zenstore/scrubcontext.h>
namespace zen {
@@ -278,7 +279,7 @@ public:
ZEN_UNUSED(BuildId);
ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
- ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, Payload));
+ ZEN_ASSERT_SLOW(ValidateCompressedBuffer(Payload, &RawHash));
uint64_t ReceivedBytes = 0;
uint64_t SentBytes = Payload.GetSize();
@@ -374,7 +375,7 @@ public:
if (IsLastPart)
{
Workload->TempFile.Flush();
- ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(Workload->TempFile.ReadAll())));
+ ZEN_ASSERT_SLOW(ValidateCompressedBuffer(CompositeBuffer(Workload->TempFile.ReadAll()), &RawHash));
Workload->TempFile.MoveTemporaryIntoPlace(BlockPath, Ec);
if (Ec)
{
@@ -423,7 +424,7 @@ public:
else
{
Payload = File.ReadAll();
- ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload))));
+ ZEN_ASSERT_SLOW(ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(Payload)), &RawHash));
}
Payload.SetContentType(ZenContentType::kCompressedBinary);
ReceivedBytes = Payload.GetSize();
@@ -734,43 +735,6 @@ protected:
return NeededAttachments;
}
- bool ValidateCompressedBuffer(const IoHash& RawHash, const CompositeBuffer& Payload)
- {
- IoHash VerifyHash;
- uint64_t VerifySize;
- CompressedBuffer ValidateBuffer = CompressedBuffer::FromCompressed(Payload, VerifyHash, VerifySize);
- if (!ValidateBuffer)
- {
- return false;
- }
- if (VerifyHash != RawHash)
- {
- return false;
- }
-
- IoHashStream Hash;
- bool CouldDecompress = ValidateBuffer.DecompressToStream(
- 0,
- (uint64_t)-1,
- [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
- ZEN_UNUSED(SourceOffset, SourceSize, Offset);
- for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
- {
- Hash.Append(Segment.GetView());
- }
- return true;
- });
- if (!CouldDecompress)
- {
- return false;
- }
- if (Hash.GetHash() != VerifyHash)
- {
- return false;
- }
- return true;
- }
-
private:
void AddStatistic(Stopwatch& ExecutionTimer, uint64_t UploadedBytes, uint64_t DownloadedBytes)
{
diff --git a/src/zenremotestore/jupiter/jupitersession.cpp b/src/zenremotestore/jupiter/jupitersession.cpp
index 5a2bf9fe8..a0c961e37 100644
--- a/src/zenremotestore/jupiter/jupitersession.cpp
+++ b/src/zenremotestore/jupiter/jupitersession.cpp
@@ -782,7 +782,10 @@ JupiterSession::GetBuildBlob(std::string_view Namespace,
uint64_t ValidateRawSize = 0;
if (!Headers.Entries.contains("Range"))
{
- ZEN_ASSERT_SLOW(CompressedBuffer::ValidateCompressedHeader(Response.ResponsePayload, ValidateRawHash, ValidateRawSize));
+ ZEN_ASSERT_SLOW(CompressedBuffer::ValidateCompressedHeader(Response.ResponsePayload,
+ ValidateRawHash,
+ ValidateRawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr));
ZEN_ASSERT_SLOW(ValidateRawHash == Hash);
ZEN_ASSERT_SLOW(ValidateRawSize > 0);
ZEN_UNUSED(ValidateRawHash, ValidateRawSize);
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index f42856252..99c36b460 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -544,9 +544,11 @@ namespace remotestore_impl {
WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
IoHash RawHash;
uint64_t RawSize;
- ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(),
- RawHash,
- RawSize));
+ ZEN_ASSERT(
+ CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(),
+ RawHash,
+ RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr));
ZEN_ASSERT(RawHash == AttachmentRawHash);
WriteRawHashes.emplace_back(AttachmentRawHash);
WantedChunks.erase(AttachmentRawHash);
diff --git a/src/zenserver/storage/cache/httpstructuredcache.cpp b/src/zenserver/storage/cache/httpstructuredcache.cpp
index ece1d7a46..03d2140a1 100644
--- a/src/zenserver/storage/cache/httpstructuredcache.cpp
+++ b/src/zenserver/storage/cache/httpstructuredcache.cpp
@@ -1238,7 +1238,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
uint64_t RawSize = Body.GetSize();
if (ContentType == HttpContentType::kCompressedBinary)
{
- if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize))
+ if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize, /*OutOptionalTotalCompressedSize*/ nullptr))
{
m_CacheStats.BadRequestCount++;
return Request.WriteResponse(HttpResponseCode::BadRequest,
@@ -1515,7 +1515,10 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons
{
IoHash RawHash;
uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize))
+ if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value,
+ RawHash,
+ RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
if (RawHash == Ref.ValueContentId)
{
@@ -1601,7 +1604,7 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons
IoHash RawHash;
uint64_t RawSize;
- if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize))
+ if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize, /*OutOptionalTotalCompressedSize*/ nullptr))
{
m_CacheStats.BadRequestCount++;
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv);
diff --git a/src/zenserver/storage/upstream/upstreamcache.cpp b/src/zenserver/storage/upstream/upstreamcache.cpp
index 6c489c5d3..938a1a011 100644
--- a/src/zenserver/storage/upstream/upstreamcache.cpp
+++ b/src/zenserver/storage/upstream/upstreamcache.cpp
@@ -208,7 +208,10 @@ namespace detail {
IoHash RawHash;
uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(AttachmentResult.Response, RawHash, RawSize))
+ if (CompressedBuffer::ValidateCompressedHeader(AttachmentResult.Response,
+ RawHash,
+ RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
Result.Response = AttachmentResult.Response;
++NumAttachments;
@@ -425,7 +428,10 @@ namespace detail {
m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
if (Payload && IsCompressedBinary(Payload.GetContentType()))
{
- IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize);
+ IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload,
+ RawHash,
+ RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr);
}
}
@@ -481,7 +487,11 @@ namespace detail {
{
if (IsCompressedBinary(Payload.GetContentType()))
{
- IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize) && RawHash != PayloadHash;
+ IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload,
+ RawHash,
+ RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr) &&
+ RawHash != PayloadHash;
}
else
{
@@ -559,7 +569,10 @@ namespace detail {
{
IoHash RawHash;
uint64_t RawSize;
- if (!CompressedBuffer::ValidateCompressedHeader(RecordValue, RawHash, RawSize))
+ if (!CompressedBuffer::ValidateCompressedHeader(RecordValue,
+ RawHash,
+ RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
return {.Reason = std::string("Invalid compressed value buffer"), .Success = false};
}
diff --git a/src/zenserver/storage/zenstorageserver.cpp b/src/zenserver/storage/zenstorageserver.cpp
index e4f8c6aa3..827f22ecc 100644
--- a/src/zenserver/storage/zenstorageserver.cpp
+++ b/src/zenserver/storage/zenstorageserver.cpp
@@ -399,9 +399,10 @@ ZenStorageServer::InitializeState(const ZenStorageServerConfig& ServerOptions)
{
m_RootManifest = LoadCompactBinaryObject(Manifest);
- const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0);
- StateId = m_RootManifest["state_id"].AsObjectId();
- CreatedWhen = m_RootManifest["created"].AsDateTime();
+ const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0);
+ const int32_t ForceScrubVersion = m_RootManifest["force_scrub_version"].AsInt32(0);
+ StateId = m_RootManifest["state_id"].AsObjectId();
+ CreatedWhen = m_RootManifest["created"].AsDateTime();
if (ManifestVersion != ZEN_CFG_SCHEMA_VERSION)
{
@@ -424,6 +425,10 @@ ZenStorageServer::InitializeState(const ZenStorageServerConfig& ServerOptions)
fmt::format("Manifest schema version: {}, differs from required: {}", ManifestVersion, ZEN_CFG_SCHEMA_VERSION);
}
}
+ else if (ForceScrubVersion != ZEN_CFG_DATA_FORCE_SCRUB_VERSION)
+ {
+ m_StartupScrubOptions = "nogc,wait";
+ }
}
}
}
@@ -473,7 +478,8 @@ ZenStorageServer::InitializeState(const ZenStorageServerConfig& ServerOptions)
{
CbObjectWriter Cbo;
- Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "created" << CreatedWhen << "updated" << Now << "state_id" << StateId;
+ Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "force_scrub_version" << ZEN_CFG_DATA_FORCE_SCRUB_VERSION << "created"
+ << CreatedWhen << "updated" << Now << "state_id" << StateId;
m_RootManifest = Cbo.Save();
@@ -686,10 +692,6 @@ ZenStorageServer::Run()
const bool IsInteractiveMode = IsInteractiveSession() && !m_TestMode;
- SetNewState(kRunning);
-
- OnReady();
-
if (!m_StartupScrubOptions.empty())
{
using namespace std::literals;
@@ -726,33 +728,29 @@ ZenStorageServer::Run()
if (DoScrub)
{
- m_GcScheduler.TriggerScrub(ScrubParams);
+ ZEN_INFO("Triggering Scrub/GC operation...");
+ while (!m_GcScheduler.TriggerScrub(ScrubParams))
+ {
+ ZEN_INFO("GC scheduler is busy, waiting for it to complete...");
+ Sleep(500);
+ }
if (DoWait)
{
- auto State = m_GcScheduler.Status();
-
- while ((State != GcSchedulerStatus::kRunning) && (State != GcSchedulerStatus::kStopped))
- {
- Sleep(500);
-
- State = m_GcScheduler.Status();
- }
-
- ZEN_INFO("waiting for Scrub/GC to complete...");
-
- while (State == GcSchedulerStatus::kRunning)
+ while (m_GcScheduler.IsManualTriggerPresent())
{
- Sleep(500);
-
- State = m_GcScheduler.Status();
+ ZEN_INFO("Waiting for Scrub/GC to complete...");
+ Sleep(2000);
}
-
ZEN_INFO("Scrub/GC completed");
}
}
}
+ SetNewState(kRunning);
+
+ OnReady();
+
if (m_IsPowerCycle)
{
ZEN_INFO("Power cycle mode enabled -- shutting down");
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 6c5b50f58..f97c98e08 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -1129,6 +1129,7 @@ BlockStore::IterateChunks(const std::span<const BlockStoreLocation>& ChunkLocati
++RangeEnd;
}
+ ZEN_LOG_SCOPE("iterating chunks from '{}'", GetBlockPath(m_BlocksBasePath, BlockIndex));
if (!Callback(BlockIndex, ChunkIndexRange.subspan(RangeStart, RangeEnd - RangeStart)))
{
return false;
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 67f587a7c..4f0d412ec 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -241,7 +241,10 @@ UpdateValueWithRawSizeAndHash(ZenCacheValue& Value)
{
if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
{
- return CompressedBuffer::ValidateCompressedHeader(Value.Value, Value.RawHash, Value.RawSize);
+ return CompressedBuffer::ValidateCompressedHeader(Value.Value,
+ Value.RawHash,
+ Value.RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr);
}
else
{
@@ -1623,7 +1626,10 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData");
if (Location.IsFlagSet(DiskLocation::kCompressed))
{
- if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
+ if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value,
+ OutValue.RawHash,
+ OutValue.RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
OutValue = ZenCacheValue{};
AddToMemCache = false;
@@ -1752,7 +1758,10 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept
{
if (OutValue.Value.GetContentType() == ZenContentType::kCompressedBinary)
{
- if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
+ if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value,
+ OutValue.RawHash,
+ OutValue.RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
OutValue = ZenCacheValue{};
}
@@ -1900,7 +1909,10 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
ZEN_TRACE_CPU("Z$::Bucket::Get::MetaData");
if (Location.IsFlagSet(DiskLocation::kCompressed))
{
- if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize))
+ if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value,
+ OutValue.RawHash,
+ OutValue.RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
OutValue = ZenCacheValue{};
m_DiskMissCount++;
@@ -2369,7 +2381,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
auto LogStats = MakeGuard([&] {
const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs());
- ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})",
+ ZEN_INFO("cache bucket '{}' scrubbed {} in {} from {} chunks ({})",
m_BucketName,
NiceBytes(VerifiedChunkBytes.load()),
NiceTimeSpanMs(DurationMs),
@@ -2402,10 +2414,10 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
const BucketPayload& Payload = m_Payloads[Kv.second];
const DiskLocation& Loc = Payload.Location;
+ Ctx.ThrowIfDeadlineExpired();
+
if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- Ctx.ThrowIfDeadlineExpired();
-
ChunkCount.fetch_add(1);
VerifiedChunkBytes.fetch_add(Loc.Size());
@@ -2439,7 +2451,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
ReportBadKey(HashKey);
continue;
}
- if (!ValidateIoBuffer(Loc.GetContentType(), Buffer))
+ if (!ValidateIoBuffer(Loc.GetContentType(), std::move(Buffer)))
{
ReportBadKey(HashKey);
continue;
@@ -2478,7 +2490,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
ZenContentType ContentType = Payload.Location.GetContentType();
Buffer.SetContentType(ContentType);
- if (!ValidateIoBuffer(ContentType, Buffer))
+ if (!ValidateIoBuffer(ContentType, std::move(Buffer)))
{
ReportBadKey(Hash);
return true;
@@ -2501,7 +2513,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
ZenContentType ContentType = Payload.Location.GetContentType();
Buffer.SetContentType(ContentType);
- if (!ValidateIoBuffer(ContentType, Buffer))
+ if (!ValidateIoBuffer(ContentType, std::move(Buffer)))
{
ReportBadKey(Hash);
return true;
@@ -2522,7 +2534,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
if (!BadKeys.empty())
{
- ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir);
+ ZEN_WARN("Scrubbing found {} bad chunks out of {} in '{}'", BadKeys.size(), ChunkCount.load(), m_BucketDir);
if (Ctx.RunRecovery())
{
@@ -4443,35 +4455,33 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
ZEN_TRACE_CPU("Z$::ScrubStorage");
RwLock::SharedLockScope _(m_Lock);
- {
- std::vector<std::future<void>> Results;
- Results.reserve(m_Buckets.size());
- for (auto& Kv : m_Buckets)
- {
-# if 1
- Results.push_back(Ctx.ThreadPool().EnqueueTask(
- std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }},
- WorkerThreadPool::EMode::EnableBacklog));
-# else
- CacheBucket& Bucket = *Kv.second;
- Bucket.ScrubStorage(Ctx);
-# endif
- }
+ std::atomic<bool> Abort;
+ std::atomic<bool> Pause;
+ ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog);
- for (auto& Result : Results)
- {
- if (Result.valid())
- {
- Result.wait();
- }
- }
- for (auto& Result : Results)
+ try
+ {
+ for (auto& Kv : m_Buckets)
{
- Result.get();
+ Ctx.ThrowIfDeadlineExpired();
+ Work.ScheduleWork(Ctx.ThreadPool(), [Bucket = Kv.second.get(), &Ctx](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag)
+ {
+ Bucket->ScrubStorage(Ctx);
+ }
+ });
}
+ Work.Wait();
+ }
+ catch (const ScrubDeadlineExpiredException&)
+ {
+ ZEN_INFO("Scrubbing deadline expired, operation incomplete");
+ Abort = true;
+ Work.Wait();
}
}
+
#endif // ZEN_WITH_TESTS
CacheStoreSize
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index 09a3d0afc..9e57b41c3 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -1757,7 +1757,10 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
else
{
IoHash _;
- if (CompressedBuffer::ValidateCompressedHeader(Payload, _, Request->RawSize))
+ if (CompressedBuffer::ValidateCompressedHeader(Payload,
+ _,
+ Request->RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
Request->Exists = true;
Request->RawSizeKnown = true;
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index a164f66c3..c0b433c51 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -4,7 +4,9 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryutil.h>
#include <zencore/compactbinaryvalidation.h>
+#include <zencore/compositebuffer.h>
#include <zencore/compress.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
@@ -71,48 +73,38 @@ IsKnownBadBucketName(std::string_view Bucket)
}
bool
-ValidateIoBuffer(ZenContentType ContentType, IoBuffer Buffer)
+ValidateIoBuffer(ZenContentType ContentType, IoBuffer&& Buffer)
{
ZEN_ASSERT_SLOW(Buffer.GetContentType() == ContentType);
if (ContentType == ZenContentType::kCbObject)
{
- CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
+ uint64_t BufferSize = Buffer.GetSize();
+ CbValidateError Error = CbValidateError::None;
+ CbObject Object = ValidateAndReadCompactBinaryObject(std::move(Buffer), Error);
if (Error == CbValidateError::None)
{
- return true;
- }
-
- ZEN_SCOPED_ERROR("compact binary validation failed: '{}'", ToString(Error));
-
- return false;
- }
- else if (ContentType == ZenContentType::kCompressedBinary)
- {
- IoBuffer MemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Buffer);
-
- IoHash HeaderRawHash;
- uint64_t RawSize = 0;
- if (!CompressedBuffer::ValidateCompressedHeader(MemoryBuffer, /* out */ HeaderRawHash, /* out */ RawSize))
- {
- ZEN_SCOPED_ERROR("compressed buffer header validation failed");
-
- return false;
+ if (Object.GetSize() == BufferSize)
+ {
+ return true;
+ }
+ else
+ {
+ ZEN_SCOPED_WARN("compact binary object size {} does not match payload size {}", Object.GetSize(), BufferSize);
+ return false;
+ }
}
-
- CompressedBuffer Compressed =
- CompressedBuffer::FromCompressed(SharedBuffer(MemoryBuffer), /* out */ HeaderRawHash, /* out */ RawSize);
- CompositeBuffer Decompressed = Compressed.DecompressToComposite();
- IoHash DecompressedHash = IoHash::HashBuffer(Decompressed);
-
- if (HeaderRawHash != DecompressedHash)
+ else
{
- ZEN_SCOPED_ERROR("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash);
-
+ ZEN_SCOPED_WARN("compact binary validation failed: '{}'", ToString(Error));
return false;
}
}
+ else if (ContentType == ZenContentType::kCompressedBinary)
+ {
+ return ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(std::move(Buffer))), /*OptionalExpectedHash*/ nullptr);
+ }
else
{
// No way to verify this kind of content (what is it exactly?)
diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp
index 52d5df061..bedf91287 100644
--- a/src/zenstore/cidstore.cpp
+++ b/src/zenstore/cidstore.cpp
@@ -31,7 +31,8 @@ struct CidStore::Impl
#if ZEN_BUILD_DEBUG
IoHash VerifyRawHash;
uint64_t _;
- ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _) && RawHash == VerifyRawHash);
+ ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _, /*OutOptionalTotalCompressedSize*/ nullptr) &&
+ RawHash == VerifyRawHash);
#endif
IoBuffer Payload(ChunkData);
@@ -66,7 +67,9 @@ struct CidStore::Impl
#if ZEN_BUILD_DEBUG
IoHash VerifyRawHash;
uint64_t _;
- ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _) && RawHashes[Offset++] == VerifyRawHash);
+ ZEN_ASSERT(
+ CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _, /*OutOptionalTotalCompressedSize*/ nullptr) &&
+ RawHashes[Offset++] == VerifyRawHash);
#endif
Chunks.push_back(ChunkData);
Chunks.back().SetContentType(ZenContentType::kCompressedBinary);
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index e1f17fbb9..a5de5c448 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -557,6 +557,10 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
std::vector<BlockStoreLocation> ChunkLocations;
std::vector<IoHash> ChunkIndexToChunkHash;
+ std::atomic<bool> Abort;
+ std::atomic<bool> Pause;
+ ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog);
+
try
{
RwLock::SharedLockScope _(m_LocationMapLock);
@@ -589,58 +593,57 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
}
IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
+
+ if (!ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(Buffer)), &Hash))
{
- if (RawHash == Hash)
- {
- // TODO: this should also hash the (decompressed) contents
- return true;
- }
+ BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
}
- BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
return true;
};
const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- Ctx.ThrowIfDeadlineExpired();
-
ChunkCount.fetch_add(1);
ChunkBytes.fetch_add(Size);
const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
- IoHash RawHash;
- uint64_t RawSize;
- // TODO: Add API to verify compressed buffer without having to memory-map the whole file
- if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize))
+ if (!ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(Buffer)), &Hash))
{
- if (RawHash == Hash)
- {
- // TODO: this should also hash the (decompressed) contents
- return true;
- }
+ BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
}
- BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); });
return true;
};
m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) {
- return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0);
+ Ctx.ThrowIfDeadlineExpired();
+ Work.ScheduleWork(
+ Ctx.ThreadPool(),
+ [&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag)
+ {
+ m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0);
+ }
+ });
+ return !Abort;
});
+ Work.Wait();
}
catch (const ScrubDeadlineExpiredException&)
{
ZEN_INFO("Scrubbing deadline expired, operation incomplete");
+ Abort = true;
+ Work.Wait();
}
Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
if (!BadKeys.empty())
{
- ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName);
+ ZEN_WARN("Scrubbing found {} bad chunks out of {} in '{}'",
+ BadKeys.size(),
+ ChunkCount.load(),
+ m_RootDirectory / m_ContainerBaseName);
if (Ctx.RunRecovery())
{
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 13d881e0c..31b3a68c4 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -799,8 +799,11 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
ZEN_ASSERT(m_IsInitialized);
- std::vector<IoHash> BadHashes;
- uint64_t ChunkCount{0}, ChunkBytes{0};
+ RwLock BadKeysLock;
+ std::vector<IoHash> BadKeys;
+ auto ReportBadKey = [&](const IoHash& Key) { BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Key); }); };
+
+ uint64_t ChunkCount{0}, ChunkBytes{0};
int DiscoveredFilesNotInIndex = 0;
@@ -820,88 +823,56 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
ZEN_INFO("discovered {} files @ '{}' ({} not in index), scrubbing", m_Index.size(), m_RootDirectory, DiscoveredFilesNotInIndex);
- IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) {
- if (!Payload)
- {
- BadHashes.push_back(Hash);
- return;
- }
- ++ChunkCount;
- ChunkBytes += Payload.GetSize();
+ std::atomic<bool> Abort;
+ std::atomic<bool> Pause;
+ ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog);
+
+ try
+ {
+ IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) {
+ Ctx.ThrowIfDeadlineExpired();
- IoBuffer InMemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Payload);
+ ChunkCount++;
+ ChunkBytes += Payload.GetSize();
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(Payload, /* out */ RawHash, /* out */ RawSize))
- {
- if (RawHash == Hash)
+ if (!Payload)
{
- // Header hash matches the file name, full validation requires that
- // we check that the decompressed data hash also matches
+ ReportBadKey(Hash);
+ return;
+ }
- CompressedBuffer CompBuffer = CompressedBuffer::FromCompressedNoValidate(std::move(InMemoryBuffer));
+ Payload.MakeOwned();
- OodleCompressor Compressor;
- OodleCompressionLevel CompressionLevel;
- uint64_t BlockSize;
- if (CompBuffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
+ Work.ScheduleWork(Ctx.ThreadPool(), [&, Hash = IoHash(Hash), Payload = std::move(Payload)](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag)
{
- if (BlockSize == 0)
- {
- BlockSize = 256 * 1024;
- }
- else if (BlockSize < (1024 * 1024))
- {
- BlockSize = BlockSize * (1024 * 1024 / BlockSize);
- }
-
- std::unique_ptr<uint8_t[]> DecompressionBuffer(new uint8_t[BlockSize]);
-
- IoHashStream Hasher;
-
- uint64_t RawOffset = 0;
- while (RawSize)
+ if (!ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(Payload)), &Hash))
{
- const uint64_t DecompressedBlockSize = Min(BlockSize, RawSize);
-
- bool Ok = CompBuffer.TryDecompressTo(MutableMemoryView((void*)DecompressionBuffer.get(), DecompressedBlockSize),
- RawOffset);
-
- if (Ok)
- {
- Hasher.Append(DecompressionBuffer.get(), DecompressedBlockSize);
- }
-
- RawSize -= DecompressedBlockSize;
- RawOffset += DecompressedBlockSize;
- }
-
- const IoHash FinalHash = Hasher.GetHash();
-
- if (FinalHash == Hash)
- {
- // all good
- return;
+ ReportBadKey(Hash);
}
}
- }
- }
-
- BadHashes.push_back(Hash);
- });
+ });
+ });
+ Work.Wait();
+ }
+ catch (const ScrubDeadlineExpiredException&)
+ {
+ ZEN_INFO("Scrubbing deadline expired, operation incomplete");
+ Abort = true;
+ Work.Wait();
+ }
Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
- if (!BadHashes.empty())
+ if (!BadKeys.empty())
{
- ZEN_WARN("file CAS scrubbing: {} bad chunks found @ '{}'", BadHashes.size(), m_RootDirectory);
+ ZEN_WARN("file CAS scrubbing: {} bad chunks out of {} found @ '{}'", BadKeys.size(), ChunkCount, m_RootDirectory);
if (Ctx.RunRecovery())
{
- ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadHashes.size());
+ ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadKeys.size());
- for (const IoHash& Hash : BadHashes)
+ for (const IoHash& Hash : BadKeys)
{
std::error_code Ec;
DeleteChunk(Hash, Ec);
@@ -914,14 +885,14 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
}
else
{
- ZEN_WARN("recovery: NOT deleting backing files for {} bad chunks", BadHashes.size());
+ ZEN_WARN("recovery: NOT deleting backing files for {} bad chunks", BadKeys.size());
}
}
// Let whomever it concerns know about the bad chunks. This could
// be used to invalidate higher level data structures more efficiently
// than a full validation pass might be able to do
- Ctx.ReportBadCidChunks(BadHashes);
+ Ctx.ReportBadCidChunks(BadKeys);
ZEN_INFO("file CAS @ '{}' scrubbed: {} chunks ({}), took {}",
m_RootDirectory,
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index ba1bce974..a4a141577 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -1810,6 +1810,8 @@ GcScheduler::Shutdown()
ZEN_TRACE_CPU("GcScheduler::Shutdown");
ZEN_MEMSCOPE(GetGcTag());
+ m_GcManager.SetCancelGC(true);
+
if (static_cast<uint32_t>(GcSchedulerStatus::kStopped) != m_Status)
{
bool GcIsRunning = m_Status == static_cast<uint32_t>(GcSchedulerStatus::kRunning);
@@ -1817,18 +1819,18 @@ GcScheduler::Shutdown()
{
ZEN_INFO("Requesting cancel running garbage collection");
}
- m_GcManager.SetCancelGC(true);
m_Status = static_cast<uint32_t>(GcSchedulerStatus::kStopped);
- m_GcSignal.notify_one();
+ m_GcSignal.Set();
+ }
- if (m_GcThread.joinable())
+ if (m_GcThread.joinable())
+ {
+ bool GcIsRunning = m_Status == static_cast<uint32_t>(GcSchedulerStatus::kRunning);
+ if (GcIsRunning)
{
- if (GcIsRunning)
- {
- ZEN_INFO("Waiting for garbage collection to complete");
- }
- m_GcThread.join();
+ ZEN_INFO("Waiting for garbage collection to complete");
}
+ m_GcThread.join();
}
m_DiskUsageLog.Flush();
m_DiskUsageLog.Close();
@@ -1839,17 +1841,17 @@ GcScheduler::TriggerGc(const GcScheduler::TriggerGcParams& Params)
{
ZEN_MEMSCOPE(GetGcTag());
std::unique_lock Lock(m_GcMutex);
- if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status)
+
+ if (m_TriggerGcParams || m_TriggerScrubParams)
{
- m_TriggerGcParams = Params;
- uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
+ return false;
+ }
- if (m_Status.compare_exchange_strong(/* expected */ IdleState,
- /* desired */ static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
- {
- m_GcSignal.notify_one();
- return true;
- }
+ if (static_cast<uint32_t>(GcSchedulerStatus::kStopped) != m_Status)
+ {
+ m_TriggerGcParams = Params;
+ m_GcSignal.Set();
+ return true;
}
return false;
}
@@ -1860,17 +1862,16 @@ GcScheduler::TriggerScrub(const TriggerScrubParams& Params)
ZEN_MEMSCOPE(GetGcTag());
std::unique_lock Lock(m_GcMutex);
- if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status)
+ if (m_TriggerGcParams || m_TriggerScrubParams)
{
- m_TriggerScrubParams = Params;
- uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
-
- if (m_Status.compare_exchange_strong(/* expected */ IdleState, /* desired */ static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
- {
- m_GcSignal.notify_one();
+ return false;
+ }
- return true;
- }
+ if (static_cast<uint32_t>(GcSchedulerStatus::kStopped) != m_Status)
+ {
+ m_TriggerScrubParams = Params;
+ m_GcSignal.Set();
+ return true;
}
return false;
@@ -1977,8 +1978,6 @@ GcScheduler::AppendGCLog(std::string_view Id, GcClock::TimePoint StartTime, cons
MemoryView EntryBuffer(Blob.data(), Blob.size());
{
- RwLock::ExclusiveLockScope _(m_GcLogLock);
-
GcLogFile.Open(Path, BasicFile::Mode::kWrite);
uint64_t AppendPos = GcLogFile.FileSize();
@@ -2096,13 +2095,25 @@ GcScheduler::GetState() const
return Result;
}
+bool
+GcScheduler::IsManualTriggerPresent() const
+{
+ bool IsPending = Status() != GcSchedulerStatus::kStopped;
+ if (IsPending)
+ {
+ std::unique_lock Lock(m_GcMutex);
+ IsPending = m_TriggerGcParams || m_TriggerScrubParams;
+ }
+ return IsPending;
+}
+
void
GcScheduler::SchedulerThread()
{
ZEN_MEMSCOPE(GetGcTag());
SetCurrentThreadName("GcScheduler");
- std::chrono::seconds WaitTime{0};
+ std::chrono::seconds WaitTime{m_Config.Enabled ? std::chrono::seconds{0} : std::chrono::seconds::max()};
const std::chrono::seconds ShortWaitTime{5};
bool SilenceErrors = false;
@@ -2111,60 +2122,67 @@ GcScheduler::SchedulerThread()
(void)CheckDiskSpace();
std::chrono::seconds WaitedTime{0};
- bool Timeout = false;
+
+ std::optional<TriggerGcParams> TriggerGcParams;
+ std::optional<TriggerScrubParams> TriggerScrubParams;
+
+ ZEN_ASSERT(WaitTime.count() >= 0);
+ while (Status() != GcSchedulerStatus::kStopped)
{
- ZEN_ASSERT(WaitTime.count() >= 0);
- std::unique_lock Lock(m_GcMutex);
- while (!Timeout && (Status() != GcSchedulerStatus::kStopped))
- {
- std::chrono::seconds ShortWait = Min(WaitTime, ShortWaitTime);
- bool ShortTimeout = std::cv_status::timeout == m_GcSignal.wait_for(Lock, ShortWait);
+ std::chrono::seconds ShortWait = Min(WaitTime, ShortWaitTime);
+ bool ShortTimeout = !m_GcSignal.Wait(gsl::narrow<int>(ShortWait.count() * 1000));
- if (ShortTimeout)
+ if (ShortTimeout)
+ {
+ if (WaitTime > ShortWaitTime)
{
- if (WaitTime > ShortWaitTime)
+ DiskSpace Space = CheckDiskSpace();
+ if (!AreDiskWritesAllowed())
{
- DiskSpace Space = CheckDiskSpace();
- if (!AreDiskWritesAllowed())
- {
- ZEN_INFO("Triggering GC due to low disk space ({}) on {}", NiceBytes(Space.Free), m_Config.RootDirectory);
- Timeout = true;
- }
- WaitTime -= ShortWaitTime;
- }
- else
- {
- Timeout = true;
+ ZEN_INFO("Triggering GC due to low disk space ({}) on {}", NiceBytes(Space.Free), m_Config.RootDirectory);
+ break;
}
+ WaitTime -= ShortWaitTime;
}
else
{
- // We got a signal
break;
}
}
+ else
+ {
+ m_GcSignal.Reset();
+ {
+ std::unique_lock Lock(m_GcMutex);
+ TriggerGcParams = m_TriggerGcParams;
+ TriggerScrubParams = m_TriggerScrubParams;
+ }
+ break;
+ }
}
+ auto TriggerCleanup = MakeGuard([&]() {
+ if (TriggerGcParams || TriggerScrubParams)
+ {
+ std::unique_lock Lock(m_GcMutex);
+ m_TriggerGcParams.reset();
+ m_TriggerScrubParams.reset();
+ }
+ });
+
if (Status() == GcSchedulerStatus::kStopped)
{
break;
}
- if (!m_Config.Enabled && !m_TriggerScrubParams && !m_TriggerGcParams)
- {
- WaitTime = std::chrono::seconds::max();
- continue;
- }
-
- if (!Timeout && Status() == GcSchedulerStatus::kIdle)
- {
- continue;
- }
-
try
{
- bool DoGc = m_Config.Enabled;
- bool DoScrubbing = false;
+ bool ManualGcTriggered = false;
+ bool ManualScrubbingTriggered = false;
+ bool LowDiskSpaceGCTriggered = false;
+ bool HighDiskSpaceUsageGCTriggered = false;
+ bool TimeBasedGCTriggered = false;
+
std::chrono::seconds ScrubTimeslice = std::chrono::seconds::max();
bool DoDelete = true;
bool CollectSmallObjects = m_Config.CollectSmallObjects;
@@ -2189,16 +2207,33 @@ GcScheduler::SchedulerThread()
uint8_t NextAttachmentPassIndex =
ComputeAttachmentRange(m_AttachmentPassIndex, m_Config.AttachmentPassCount, AttachmentRangeMin, AttachmentRangeMax);
- bool LowDiskSpaceGCTriggered = false;
- bool HighDiskSpaceUsageGCTriggered = false;
- bool TimeBasedGCTriggered = false;
-
GcClock::TimePoint Now = GcClock::Now();
- if (m_TriggerGcParams)
+ if (TriggerScrubParams)
{
- const auto TriggerParams = m_TriggerGcParams.value();
- m_TriggerGcParams.reset();
+ ZEN_ASSERT_SLOW(!TriggerGcParams);
+ ZEN_INFO("Manual scrub triggered");
+ const auto TriggerParams = TriggerScrubParams.value();
+
+ ManualScrubbingTriggered = true;
+
+ if (!TriggerParams.SkipGc)
+ {
+ ManualGcTriggered = true;
+ }
+
+ if (TriggerParams.SkipCas)
+ {
+ SkipCid = true;
+ }
+
+ DoDelete = !TriggerParams.SkipDelete;
+ ScrubTimeslice = TriggerParams.MaxTimeslice;
+ }
+ else if (TriggerGcParams)
+ {
+ ZEN_INFO("Manual gc triggered");
+ const auto TriggerParams = TriggerGcParams.value();
CollectSmallObjects = TriggerParams.CollectSmallObjects;
@@ -2249,34 +2284,10 @@ GcScheduler::SchedulerThread()
{
EnableValidation = TriggerParams.EnableValidation.value();
}
- DoGc = true;
- }
-
- if (m_TriggerScrubParams)
- {
- DoScrubbing = true;
-
- if (m_TriggerScrubParams->SkipGc)
- {
- DoGc = false;
- }
-
- if (m_TriggerScrubParams->SkipCas)
- {
- SkipCid = true;
- }
-
- DoDelete = !m_TriggerScrubParams->SkipDelete;
- ScrubTimeslice = m_TriggerScrubParams->MaxTimeslice;
+ ManualGcTriggered = true;
}
- if (DoScrubbing)
- {
- ScrubStorage(DoDelete, SkipCid, ScrubTimeslice);
- m_TriggerScrubParams.reset();
- }
-
- if (!DoGc)
+ if (!ManualScrubbingTriggered && !ManualGcTriggered && !m_Config.Enabled)
{
continue;
}
@@ -2288,10 +2299,12 @@ GcScheduler::SchedulerThread()
GcClock::TimePoint BuildStoreExpireTime =
MaxBuildStoreDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxBuildStoreDuration;
- const GcStorageSize TotalSize = m_GcManager.TotalStorageSize();
-
- if (Timeout && Status() == GcSchedulerStatus::kIdle)
+ if (!ManualGcTriggered && !ManualScrubbingTriggered)
{
+ // Check for GC triggered by time/size limits
+
+ const GcStorageSize TotalSize = m_GcManager.TotalStorageSize();
+
DiskSpace Space = CheckDiskSpace();
const int64_t PressureGraphLength = 30;
@@ -2508,7 +2521,9 @@ GcScheduler::SchedulerThread()
}
continue;
}
+ }
+ {
uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle);
if (!m_Status.compare_exchange_strong(IdleState, static_cast<uint32_t>(GcSchedulerStatus::kRunning)))
{
@@ -2517,13 +2532,30 @@ GcScheduler::SchedulerThread()
}
}
- if (!SkipCid)
+ auto ResetState = MakeGuard([&]() {
+ uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
+ if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle)))
+ {
+ ZEN_ASSERT(m_Status == static_cast<uint32_t>(GcSchedulerStatus::kStopped));
+ }
+ });
+
+ if (ManualScrubbingTriggered)
{
- m_AttachmentPassIndex = NextAttachmentPassIndex;
+ ScrubStorage(DoDelete, SkipCid, ScrubTimeslice);
+ if (!ManualGcTriggered)
+ {
+ continue;
+ }
}
if (PrepareDiskReserve())
{
+ if (!SkipCid)
+ {
+ m_AttachmentPassIndex = NextAttachmentPassIndex;
+ }
+
bool GcSuccess = CollectGarbage(CacheExpireTime,
ProjectStoreExpireTime,
BuildStoreExpireTime,
@@ -2596,13 +2628,6 @@ GcScheduler::SchedulerThread()
WaitTime = m_Config.MonitorInterval;
}
m_GcManager.SetCancelGC(false);
-
- uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
- if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle)))
- {
- ZEN_ASSERT(m_Status == static_cast<uint32_t>(GcSchedulerStatus::kStopped));
- break;
- }
}
}
diff --git a/src/zenstore/include/zenstore/cache/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h
index 8f40ae727..791720589 100644
--- a/src/zenstore/include/zenstore/cache/cacheshared.h
+++ b/src/zenstore/include/zenstore/cache/cacheshared.h
@@ -78,6 +78,6 @@ enum class PutStatus
};
bool IsKnownBadBucketName(std::string_view BucketName);
-bool ValidateIoBuffer(ZenContentType ContentType, IoBuffer Buffer);
+bool ValidateIoBuffer(ZenContentType ContentType, IoBuffer&& Buffer);
} // namespace zen
diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h
index 5150ecd42..734d2e5a7 100644
--- a/src/zenstore/include/zenstore/gc.h
+++ b/src/zenstore/include/zenstore/gc.h
@@ -488,10 +488,10 @@ public:
GcScheduler(GcManager& GcManager);
~GcScheduler();
- void Initialize(const GcSchedulerConfig& Config);
- void Shutdown();
- GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); }
- GcSchedulerState GetState() const;
+ void Initialize(const GcSchedulerConfig& Config);
+ void Shutdown();
+ bool IsManualTriggerPresent() const;
+ GcSchedulerState GetState() const;
struct TriggerGcParams
{
@@ -528,30 +528,31 @@ public:
bool CancelGC();
private:
- void SchedulerThread();
- bool ReclaimDiskReserve();
- bool PrepareDiskReserve();
- bool CollectGarbage(const GcClock::TimePoint& CacheExpireTime,
- const GcClock::TimePoint& ProjectStoreExpireTime,
- const GcClock::TimePoint& BuildStoreExpireTime,
- bool Delete,
- bool CollectSmallObjects,
- bool SkipCid,
- GcVersion UseGCVersion,
- uint32_t CompactBlockUsageThresholdPercent,
- bool Verbose,
- bool SingleThreaded,
- const IoHash& AttachmentRangeMin,
- const IoHash& AttachmentRangeMax,
- bool StoreCacheAttachmentMetaData,
- bool StoreProjectAttachmentMetaData,
- bool EnableValidation,
- bool SilenceErrors);
- void ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds TimeSlice);
- LoggerRef Log() { return m_Log; }
- virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); }
- DiskSpace CheckDiskSpace();
- void AppendGCLog(std::string_view Id, GcClock::TimePoint GcStartTime, const GcSettings& Settings, const GcResult& Result);
+ GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); }
+ void SchedulerThread();
+ bool ReclaimDiskReserve();
+ bool PrepareDiskReserve();
+ bool CollectGarbage(const GcClock::TimePoint& CacheExpireTime,
+ const GcClock::TimePoint& ProjectStoreExpireTime,
+ const GcClock::TimePoint& BuildStoreExpireTime,
+ bool Delete,
+ bool CollectSmallObjects,
+ bool SkipCid,
+ GcVersion UseGCVersion,
+ uint32_t CompactBlockUsageThresholdPercent,
+ bool Verbose,
+ bool SingleThreaded,
+ const IoHash& AttachmentRangeMin,
+ const IoHash& AttachmentRangeMax,
+ bool StoreCacheAttachmentMetaData,
+ bool StoreProjectAttachmentMetaData,
+ bool EnableValidation,
+ bool SilenceErrors);
+ void ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds TimeSlice);
+ LoggerRef Log() { return m_Log; }
+ virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); }
+ DiskSpace CheckDiskSpace();
+ void AppendGCLog(std::string_view Id, GcClock::TimePoint GcStartTime, const GcSettings& Settings, const GcResult& Result);
LoggerRef m_Log;
GcManager& m_GcManager;
@@ -571,18 +572,17 @@ private:
std::optional<GcResult> m_LastLightweightGCV2Result;
std::optional<GcResult> m_LastFullGCV2Result;
- std::atomic_uint32_t m_Status{};
- std::thread m_GcThread;
- mutable std::mutex m_GcMutex;
- std::condition_variable m_GcSignal;
+ std::atomic_uint32_t m_Status{};
+ std::thread m_GcThread;
+ mutable std::mutex m_GcMutex;
+ Event m_GcSignal;
+
std::optional<TriggerGcParams> m_TriggerGcParams;
std::optional<TriggerScrubParams> m_TriggerScrubParams;
std::atomic_bool m_AreDiskWritesBlocked = false;
TCasLogFile<DiskUsageWindow::DiskUsageEntry> m_DiskUsageLog;
DiskUsageWindow m_DiskUsageWindow;
-
- RwLock m_GcLogLock;
};
void gc_forcelink();
diff --git a/src/zenstore/include/zenstore/projectstore.h b/src/zenstore/include/zenstore/projectstore.h
index 258be5930..ad108f65b 100644
--- a/src/zenstore/include/zenstore/projectstore.h
+++ b/src/zenstore/include/zenstore/projectstore.h
@@ -133,6 +133,7 @@ public:
void IterateOplog(std::function<void(CbObjectView)>&& Fn, const Paging& EntryPaging);
void IterateOplogWithKey(std::function<void(LogSequenceNumber, const Oid&, CbObjectView)>&& Fn);
void IterateOplogWithKey(std::function<void(LogSequenceNumber, const Oid&, CbObjectView)>&& Fn, const Paging& EntryPaging);
+ void IterateOplogWithKeyRaw(std::function<void(LogSequenceNumber, const Oid&, const IoBuffer& Buffer)>&& Handler);
void IterateOplogLocked(std::function<void(CbObjectView)>&& Fn, const Paging& EntryPaging);
size_t GetOplogEntryCount() const;
diff --git a/src/zenstore/include/zenstore/scrubcontext.h b/src/zenstore/include/zenstore/scrubcontext.h
index 2f28cfec7..0562ca8c5 100644
--- a/src/zenstore/include/zenstore/scrubcontext.h
+++ b/src/zenstore/include/zenstore/scrubcontext.h
@@ -8,6 +8,7 @@
namespace zen {
class WorkerThreadPool;
+class CompositeBuffer;
/** Context object for data scrubbing
@@ -67,4 +68,6 @@ public:
~ScrubDeadlineExpiredException();
};
+bool ValidateCompressedBuffer(const CompositeBuffer& Buffer, const IoHash* OptionalExpectedHash);
+
} // namespace zen
diff --git a/src/zenstore/projectstore.cpp b/src/zenstore/projectstore.cpp
index 7570b8513..7e9ff50bb 100644
--- a/src/zenstore/projectstore.cpp
+++ b/src/zenstore/projectstore.cpp
@@ -774,70 +774,125 @@ struct ProjectStore::OplogStorage : public RefCounted
ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries");
BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536);
+ const uint64_t BlobsSize = m_OpBlobs.FileSize();
for (ProjectStore::Oplog::PayloadIndex EntryOffset : Order)
{
const Oplog::OplogPayload& Entry = Entries[EntryOffset];
const uint64_t OpFileOffset = Entry.Address.Offset * m_OpsAlign;
- MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset);
- if (OpBufferView.GetSize() == Entry.Address.Size)
+ if (OpFileOffset + Entry.Address.Size > BlobsSize)
{
- if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error == CbValidateError::None)
+ ZEN_WARN("oplog '{}/{}': skipping op outside of file size - {}. Op offset: {}, Op size: {}, file size {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ OpFileOffset,
+ Entry.Address.Size,
+ BlobsSize);
+ }
+ else
+ {
+ MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset);
+ if (OpBufferView.GetSize() == Entry.Address.Size)
{
- CbObjectView OpView(OpBufferView.GetData());
- if (OpView.GetSize() != OpBufferView.GetSize())
+ if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default);
+ Error == CbValidateError::None)
{
- ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
- m_OwnerOplog->GetOuterProjectIdentifier(),
- m_OwnerOplog->OplogId(),
- Entry.Lsn.Number,
- OpView.GetSize(),
- OpBufferView.GetSize());
+ CbObjectView OpView(OpBufferView.GetData());
+ if (OpView.GetSize() != OpBufferView.GetSize())
+ {
+ ZEN_WARN(
+ "oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ OpView.GetSize(),
+ OpBufferView.GetSize());
+ }
+ else
+ {
+ Handler(Entry.Lsn, OpView);
+ }
}
else
{
- Handler(Entry.Lsn, OpView);
+ ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ ToString(Error));
}
}
else
{
- ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}",
- m_OwnerOplog->GetOuterProjectIdentifier(),
- m_OwnerOplog->OplogId(),
- Entry.Lsn.Number,
- ToString(Error));
- }
- }
- else
- {
- IoBuffer OpBuffer(Entry.Address.Size);
- OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset);
- OpBufferView = OpBuffer.GetView();
- if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error == CbValidateError::None)
- {
- CbObjectView OpView(OpBuffer.Data());
- if (OpView.GetSize() != OpBuffer.GetSize())
+ IoBuffer OpBuffer(Entry.Address.Size);
+ OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset);
+ OpBufferView = OpBuffer.GetView();
+ if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default);
+ Error == CbValidateError::None)
{
- ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
- m_OwnerOplog->GetOuterProjectIdentifier(),
- m_OwnerOplog->OplogId(),
- Entry.Lsn.Number,
- OpView.GetSize(),
- OpBuffer.GetSize());
+ CbObjectView OpView(OpBuffer.Data());
+ if (OpView.GetSize() != OpBuffer.GetSize())
+ {
+ ZEN_WARN(
+ "oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ OpView.GetSize(),
+ OpBuffer.GetSize());
+ }
+ else
+ {
+ Handler(Entry.Lsn, OpView);
+ }
}
else
{
- Handler(Entry.Lsn, OpView);
+ ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}",
+ m_OwnerOplog->GetOuterProjectIdentifier(),
+ m_OwnerOplog->OplogId(),
+ Entry.Lsn.Number,
+ ToString(Error));
}
}
+ }
+ }
+ }
+
+ void ReplayLogEntries(const std::span<const Oplog::OplogPayload> Entries,
+ const std::span<const Oplog::PayloadIndex> Order,
+ std::function<void(LogSequenceNumber Lsn, const IoBuffer& Buffer)>&& Handler)
+ {
+ ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries");
+
+ BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536);
+ const uint64_t BlobsSize = m_OpBlobs.FileSize();
+
+ for (ProjectStore::Oplog::PayloadIndex EntryOffset : Order)
+ {
+ const Oplog::OplogPayload& Entry = Entries[EntryOffset];
+
+ const uint64_t OpFileOffset = Entry.Address.Offset * m_OpsAlign;
+ if (OpFileOffset + Entry.Address.Size > BlobsSize)
+ {
+ Handler(Entry.Lsn, {});
+ }
+ else
+ {
+ MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset);
+ if (OpBufferView.GetSize() == Entry.Address.Size)
+ {
+ IoBuffer Buffer = IoBuffer(IoBuffer::Wrap, OpBufferView.GetData(), OpBufferView.GetSize());
+ Handler(Entry.Lsn, Buffer);
+ }
else
{
- ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}",
- m_OwnerOplog->GetOuterProjectIdentifier(),
- m_OwnerOplog->OplogId(),
- Entry.Lsn.Number,
- ToString(Error));
+ IoBuffer OpBuffer(Entry.Address.Size);
+ OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset);
+ Handler(Entry.Lsn, OpBuffer);
}
}
}
@@ -1118,40 +1173,77 @@ ProjectStore::Oplog::Scrub(ScrubContext& Ctx)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_INFO("scrubbing oplog '{}/{}'", m_OuterProjectId, m_OplogId);
+
ZEN_ASSERT(m_Mode == EMode::kFull);
+ Stopwatch Timer;
+ std::atomic_uint64_t OpCount = 0;
+ std::atomic_uint64_t VerifiedOpBytes = 0;
+
+ auto LogStats = MakeGuard([&] {
+ const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs());
+
+ ZEN_INFO("oplog '{}/{}' scrubbed {} in {} from {} ops ({})",
+ m_OuterProjectId,
+ m_OplogId,
+ NiceBytes(VerifiedOpBytes.load()),
+ NiceTimeSpanMs(DurationMs),
+ OpCount.load(),
+ NiceRate(VerifiedOpBytes, DurationMs));
+ });
+
std::vector<std::pair<LogSequenceNumber, Oid>> BadEntries;
using namespace std::literals;
- IterateOplogWithKey([&](LogSequenceNumber Lsn, const Oid& Key, CbObjectView Op) {
+ IterateOplogWithKeyRaw([&](LogSequenceNumber Lsn, const Oid& Key, const IoBuffer& Buffer) {
+ Ctx.ThrowIfDeadlineExpired();
+
+ OpCount++;
+ VerifiedOpBytes += Buffer.GetSize();
+
+ if (!Buffer)
{
- const Oid KeyHash = ComputeOpKey(Op);
- if (KeyHash != Key)
+ ZEN_WARN("Scrub: oplog data for op {} (Lns: {}) could not be read from disk", Key, Lsn.Number);
+ BadEntries.push_back({Lsn, Key});
+ return;
+ }
+ {
+ MemoryView OpBufferView = Buffer.GetView();
+ if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error != CbValidateError::None)
{
+ ZEN_WARN("Scrub: oplog data for op {} (Lns: {}) is not valid compact binary. Error: {}", Key, Lsn.Number, ToString(Error));
BadEntries.push_back({Lsn, Key});
- ZEN_WARN("Scrub: oplog data does not match information from index (op:{} != index:{})", KeyHash, Key);
return;
}
}
- // TODO: Should we really delete an Op because it points to a missing or malformed Cid chunk?
+ CbObjectView OpView(Buffer.GetData());
+ if (OpView.GetSize() != Buffer.GetSize())
+ {
+ ZEN_WARN("Scrub: oplog payload size {} for op {} (Lns: {}) does not match object size {}",
+ Buffer.GetSize(),
+ Key,
+ Lsn.Number,
+ OpView.GetSize());
+ BadEntries.push_back({Lsn, Key});
+ return;
+ }
- Op.IterateAttachments([&](CbFieldView Visitor) {
- const IoHash Cid = Visitor.AsAttachment();
- if (Ctx.IsBadCid(Cid))
- {
- // oplog entry references a CAS chunk which has been flagged as bad
- BadEntries.push_back({Lsn, Key});
- return;
- }
- if (!m_CidStore.ContainsChunk(Cid))
+ {
+ const Oid KeyHash = ComputeOpKey(OpView);
+ if (KeyHash != Key)
{
- // oplog entry references a CAS chunk which is not present
BadEntries.push_back({Lsn, Key});
+ ZEN_WARN("Scrub: oplog data for op {} (Lns: {}) does not match information from index (op:{} != index:{})",
+ Key,
+ Lsn.Number,
+ KeyHash,
+ Key);
return;
}
- });
+ }
});
if (!BadEntries.empty())
@@ -2577,6 +2669,32 @@ ProjectStore::Oplog::IterateOplogWithKey(std::function<void(LogSequenceNumber, c
}
}
+// Iterates every oplog entry in replay (LSN) order, handing the handler the
+// raw on-disk payload (IoBuffer) together with its LSN and the Oid key the
+// index associates with that payload — without parsing the payload as compact
+// binary. This lets scrub validate the raw bytes before interpreting them.
+// A zero/empty IoBuffer signals that the payload could not be read.
+// NOTE(review): Handler runs while m_OplogLock is held shared — it must not
+// re-acquire the oplog lock exclusively; confirm ProjectStore::Oplog::Scrub
+// callers respect this.
+void
+ProjectStore::Oplog::IterateOplogWithKeyRaw(std::function<void(LogSequenceNumber, const Oid&, const IoBuffer& Buffer)>&& Handler)
+{
+    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    // Reverse mapping from payload location back to op key, filled in by
+    // GetSortedOpPayloadRangeLocked so each replayed buffer can be paired
+    // with the key the index holds for it.
+    tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher> ReverseKeyMap;
+    std::vector<PayloadIndex> ReplayOrder;
+
+    {
+        RwLock::SharedLockScope _(m_OplogLock);
+        if (m_Storage)
+        {
+            ReplayOrder = GetSortedOpPayloadRangeLocked({}, &ReverseKeyMap);
+            if (!ReplayOrder.empty())
+            {
+                // EntryIndex pairs each callback with its entry in ReplayOrder;
+                // assumes ReplayLogEntries invokes the callback exactly once per
+                // entry, in ReplayOrder order — TODO(review): confirm contract.
+                uint32_t EntryIndex = 0;
+                m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, const IoBuffer& Buffer) {
+                    const PayloadIndex PayloadOffset = ReplayOrder[EntryIndex];
+                    Handler(Lsn, ReverseKeyMap.at(PayloadOffset), Buffer);
+                    EntryIndex++;
+                });
+            }
+        }
+    }
+}
+
static constexpr uint32_t OplogMetaDataExpectedMagic = 0x6f'74'6d'62; // bytes 0x6f,0x74,0x6d,0x62 spell 'otmb' (oplog metadata magic); previous annotation 'omta' did not match the constant
void
@@ -3773,18 +3891,64 @@ void
ProjectStore::Project::Scrub(ScrubContext& Ctx)
{
    ZEN_MEMSCOPE(GetProjectstoreTag());
+
+    ZEN_INFO("scrubbing '{}'", ProjectRootDir);
+
    // Scrubbing needs to check all existing oplogs
    std::vector<std::string> OpLogs = ScanForOplogs();
-    for (const std::string& OpLogId : OpLogs)
+
+    // Held shared for the whole scrub pass so the set of oplogs is stable.
+    // NOTE(review): confirm Oplog::Scrub never takes m_ProjectLock exclusively
+    // from the worker threads, or this would deadlock.
+    RwLock::SharedLockScope _(m_ProjectLock);
+
+    // Brace-initialize: ParallelWork workers read these flags immediately, and
+    // std::atomic<bool>'s default constructor leaves the value indeterminate
+    // before C++20 — an uninitialized Abort could cancel the scrub spuriously.
+    std::atomic<bool> Abort{false};
+    std::atomic<bool> Pause{false};
+    ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog);
+
+    try
    {
-        OpenOplog(OpLogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true);
-    }
-    IterateOplogs([&](const RwLock::SharedLockScope&, Oplog& Ops) {
-        if (!IsExpired(GcClock::TimePoint::min(), Ops))
+        for (const std::string& OpLogId : OpLogs)
        {
-            Ops.Scrub(Ctx);
+            // Reuse an already-open oplog when available; otherwise open it
+            // directly from disk without registering it in m_Oplogs.
+            Ref<ProjectStore::Oplog> OpLog;
+            {
+                if (auto OpIt = m_Oplogs.find(OpLogId); OpIt != m_Oplogs.end())
+                {
+                    OpLog = OpIt->second;
+                }
+                else
+                {
+                    std::filesystem::path OplogBasePath = BasePathForOplog(OpLogId);
+                    if (ProjectStore::Oplog::ExistsAt(OplogBasePath))
+                    {
+                        OpLog = new ProjectStore::Oplog(
+                            Log(),
+                            Identifier,
+                            OpLogId,
+                            m_CidStore,
+                            OplogBasePath,
+                            std::filesystem::path{},
+                            ProjectStore::Oplog::EMode::kFull); // We need it to be a full read so we can write a new index snapshot
+                        OpLog->Read();
+                    }
+                }
+            }
+
+            if (OpLog)
+            {
+                // OpLog captured by value (Ref) so the worker keeps it alive.
+                Work.ScheduleWork(Ctx.ThreadPool(), [OpLog, &Ctx](std::atomic<bool>& AbortFlag) {
+                    if (!AbortFlag)
+                    {
+                        OpLog->Scrub(Ctx);
+                    }
+                });
+            }
        }
-    });
+        Work.Wait();
+    }
+    catch (const ScrubDeadlineExpiredException&)
+    {
+        // Deadline hit while scheduling/waiting: flag abort so queued workers
+        // skip their oplog, then drain outstanding work before returning.
+        ZEN_INFO("Scrubbing deadline expired, operation incomplete");
+        Abort = true;
+        Work.Wait();
+    }
}
uint64_t
@@ -4454,7 +4618,10 @@ ProjectStore::GetProjectFiles(LoggerRef InLog, Project& Project, Oplog& Oplog, c
if (WantsRawSizeField)
{
IoHash _;
- if (CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSizes[Index]))
+ if (CompressedBuffer::ValidateCompressedHeader(Payload,
+ _,
+ RawSizes[Index],
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
if (WantsSizeField)
{
@@ -4611,7 +4778,10 @@ ProjectStore::GetProjectChunkInfos(LoggerRef InLog, Project& Project, Oplog& Opl
{
ZEN_ASSERT_SLOW(RawSizes[Index] == (uint64_t)-1);
IoHash _;
- if (CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSizes[Index]))
+ if (CompressedBuffer::ValidateCompressedHeader(Payload,
+ _,
+ RawSizes[Index],
+ /*OutOptionalTotalCompressedSize*/ nullptr))
{
if (WantsSizeField)
{
@@ -4722,7 +4892,7 @@ ProjectStore::GetChunkInfo(LoggerRef InLog, Project& Project, Oplog& Oplog, cons
{
IoHash RawHash;
uint64_t RawSize;
- bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize);
+ bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize, /*OutOptionalTotalCompressedSize*/ nullptr);
if (!IsCompressed)
{
throw std::runtime_error(
diff --git a/src/zenstore/scrubcontext.cpp b/src/zenstore/scrubcontext.cpp
index fbcd7d33c..8f8ec09a7 100644
--- a/src/zenstore/scrubcontext.cpp
+++ b/src/zenstore/scrubcontext.cpp
@@ -2,6 +2,10 @@
#include "zenstore/scrubcontext.h"
+#include <zencore/compress.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/string.h>
#include <zencore/workthreadpool.h>
namespace zen {
@@ -62,4 +66,64 @@ ScrubContext::ThrowIfDeadlineExpired() const
throw ScrubDeadlineExpiredException();
}
+// Deep validation of a compressed CAS chunk. Checks, in order:
+//   1. the compressed-buffer header parses (ValidateCompressedHeader),
+//   2. the header's raw hash matches OptionalExpectedHash when one is given,
+//   3. the buffer's byte size equals the header's total compressed size,
+//   4. the payload decompresses end-to-end, and
+//   5. the decompressed bytes hash back to the header's raw hash.
+// Returns true only when every check passes; each failure emits a
+// ZEN_SCOPED_WARN and returns false. OptionalExpectedHash may be null.
+bool
+ValidateCompressedBuffer(const CompositeBuffer& Buffer, const IoHash* OptionalExpectedHash)
+{
+    IoHash HeaderRawHash;
+    uint64_t RawSize = 0;
+    uint64_t TotalCompressedSize = 0;
+    if (!CompressedBuffer::ValidateCompressedHeader(Buffer, HeaderRawHash, RawSize, &TotalCompressedSize))
+    {
+        if (OptionalExpectedHash)
+        {
+            ZEN_SCOPED_WARN("compressed buffer header validation failed for chunk with hash {}", *OptionalExpectedHash);
+        }
+        else
+        {
+            ZEN_SCOPED_WARN("compressed buffer header validation failed");
+        }
+        return false;
+    }
+
+    if (OptionalExpectedHash != nullptr && HeaderRawHash != (*OptionalExpectedHash))
+    {
+        ZEN_SCOPED_WARN("compressed buffer hash {} does not match expected hash {}", HeaderRawHash, *OptionalExpectedHash);
+        return false;
+    }
+
+    // A truncated or padded buffer can still carry a valid header; catch it
+    // by comparing against the header's declared total compressed size.
+    if (TotalCompressedSize != Buffer.GetSize())
+    {
+        ZEN_SCOPED_WARN("compressed buffer size does not match total compressed size in header for chunk {}", HeaderRawHash);
+        return false;
+    }
+
+    CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Buffer, /* out */ HeaderRawHash, /* out */ RawSize);
+
+    // Stream-decompress the full range [0, RawSize) and hash segments as they
+    // are produced, so the whole raw payload never has to be resident at once.
+    IoHashStream HashStream;
+    if (!Compressed.DecompressToStream(
+            0,
+            RawSize,
+            [&HashStream](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range) -> bool {
+                ZEN_UNUSED(SourceOffset, SourceSize, Offset);
+                for (const SharedBuffer& Segment : Range.GetSegments())
+                {
+                    HashStream.Append(Segment);
+                }
+                return true;
+            }))
+    {
+        ZEN_SCOPED_WARN("compressed buffer could not be decompressed for chunk {}", HeaderRawHash);
+        return false;
+    }
+
+    IoHash DecompressedHash = HashStream.GetHash();
+
+    // Final end-to-end check: decompressed content must hash to the value the
+    // header claims, catching payload corruption the header checks miss.
+    if (HeaderRawHash != DecompressedHash)
+    {
+        ZEN_SCOPED_WARN("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash);
+        return false;
+    }
+    return true;
+}
+
} // namespace zen