diff options
| author | Dan Engelbrecht <[email protected]> | 2025-11-27 16:05:56 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-11-27 16:05:56 +0100 |
| commit | 4984e8cd5c38cf77c8cb978f75f808bce0577f2d (patch) | |
| tree | c298828c6290a669500788f96f8ea25be41ff88a /src | |
| parent | remove bad assert (#670) (diff) | |
| download | zen-4984e8cd5c38cf77c8cb978f75f808bce0577f2d.tar.xz zen-4984e8cd5c38cf77c8cb978f75f808bce0577f2d.zip | |
automatic scrub on startup (#667)
- Improvement: Deeper validation of data when scrub is activated (cas/cache/project)
- Improvement: Enabled more multithreading when running scrub operations
- Improvement: Added a way to force a scrub operation at startup for a new release, via the ZEN_DATA_FORCE_SCRUB_VERSION variable in xmake.lua
Diffstat (limited to 'src')
26 files changed, 748 insertions, 467 deletions
diff --git a/src/zen/cmds/print_cmd.cpp b/src/zen/cmds/print_cmd.cpp index 557808ba7..a4737e66d 100644 --- a/src/zen/cmds/print_cmd.cpp +++ b/src/zen/cmds/print_cmd.cpp @@ -97,8 +97,17 @@ PrintCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) IoHash RawHash; uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(Data, RawHash, RawSize)) + uint64_t CompressedSize; + if (CompressedBuffer::ValidateCompressedHeader(Data, RawHash, RawSize, &CompressedSize)) { + if (CompressedSize != Data.GetSize()) + { + ZEN_CONSOLE_WARN( + "Compressed binary header total compressed size mismatch. Payload is {} bytes and header says compressed payload is {} " + "bytes", + Data.GetSize(), + CompressedSize); + } ZEN_CONSOLE("Compressed binary: size {}, raw size {}, hash: {}", Data.GetSize(), RawSize, RawHash); } else if (IsPackageMessage(Data)) diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp index 0c53a8000..1c623db4d 100644 --- a/src/zencore/compress.cpp +++ b/src/zencore/compress.cpp @@ -59,10 +59,16 @@ struct BufferHeader BLAKE3 RawHash; // The hash of the uncompressed data /** Checks validity of the buffer based on the magic number, method, and CRC-32. */ - static bool IsValid(const CompositeBuffer& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize); - static bool IsValid(const SharedBuffer& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize) + static bool IsValid(const CompositeBuffer& CompressedData, + IoHash& OutRawHash, + uint64_t& OutRawSize, + uint64_t* OutOptionalTotalCompressedSize); + static bool IsValid(const SharedBuffer& CompressedData, + IoHash& OutRawHash, + uint64_t& OutRawSize, + uint64_t* OutOptionalTotalCompressedSize) { - return IsValid(CompositeBuffer(CompressedData), OutRawHash, OutRawSize); + return IsValid(CompositeBuffer(CompressedData), OutRawHash, OutRawSize, OutOptionalTotalCompressedSize); } /** Read a header from a buffer that is at least sizeof(BufferHeader) without any validation. 
*/ @@ -1467,13 +1473,20 @@ ReadHeader(const CompositeBuffer& CompressedData, BufferHeader& OutHeader, Uniqu } bool -BufferHeader::IsValid(const CompositeBuffer& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize) +BufferHeader::IsValid(const CompositeBuffer& CompressedData, + IoHash& OutRawHash, + uint64_t& OutRawSize, + uint64_t* OutOptionalTotalCompressedSize) { detail::BufferHeader Header; if (ReadHeader(CompressedData, Header, nullptr)) { OutRawHash = IoHash::FromBLAKE3(Header.RawHash); OutRawSize = Header.TotalRawSize; + if (OutOptionalTotalCompressedSize) + { + *OutOptionalTotalCompressedSize = Header.TotalCompressedSize; + } return true; } return false; @@ -1643,10 +1656,11 @@ private: template<typename BufferType> inline CompositeBuffer -ValidBufferOrEmpty(BufferType&& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize) +ValidBufferOrEmpty(BufferType&& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize, uint64_t* OutOptionalTotalCompressedSize) { - return BufferHeader::IsValid(CompressedData, OutRawHash, OutRawSize) ? CompositeBuffer(std::forward<BufferType>(CompressedData)) - : CompositeBuffer(); + return BufferHeader::IsValid(CompressedData, OutRawHash, OutRawSize, OutOptionalTotalCompressedSize) + ? 
CompositeBuffer(std::forward<BufferType>(CompressedData)) + : CompositeBuffer(); } CompositeBuffer @@ -1893,7 +1907,7 @@ CompressedBuffer CompressedBuffer::FromCompressed(const CompositeBuffer& InCompressedData, IoHash& OutRawHash, uint64_t& OutRawSize) { CompressedBuffer Local; - Local.CompressedData = detail::ValidBufferOrEmpty(InCompressedData, OutRawHash, OutRawSize); + Local.CompressedData = detail::ValidBufferOrEmpty(InCompressedData, OutRawHash, OutRawSize, /*OutOptionalTotalCompressedSize*/ nullptr); return Local; } @@ -1901,7 +1915,8 @@ CompressedBuffer CompressedBuffer::FromCompressed(CompositeBuffer&& InCompressedData, IoHash& OutRawHash, uint64_t& OutRawSize) { CompressedBuffer Local; - Local.CompressedData = detail::ValidBufferOrEmpty(std::move(InCompressedData), OutRawHash, OutRawSize); + Local.CompressedData = + detail::ValidBufferOrEmpty(std::move(InCompressedData), OutRawHash, OutRawSize, /*OutOptionalTotalCompressedSize*/ nullptr); return Local; } @@ -1909,7 +1924,7 @@ CompressedBuffer CompressedBuffer::FromCompressed(const SharedBuffer& InCompressedData, IoHash& OutRawHash, uint64_t& OutRawSize) { CompressedBuffer Local; - Local.CompressedData = detail::ValidBufferOrEmpty(InCompressedData, OutRawHash, OutRawSize); + Local.CompressedData = detail::ValidBufferOrEmpty(InCompressedData, OutRawHash, OutRawSize, /*OutOptionalTotalCompressedSize*/ nullptr); return Local; } @@ -1917,7 +1932,8 @@ CompressedBuffer CompressedBuffer::FromCompressed(SharedBuffer&& InCompressedData, IoHash& OutRawHash, uint64_t& OutRawSize) { CompressedBuffer Local; - Local.CompressedData = detail::ValidBufferOrEmpty(std::move(InCompressedData), OutRawHash, OutRawSize); + Local.CompressedData = + detail::ValidBufferOrEmpty(std::move(InCompressedData), OutRawHash, OutRawSize, /*OutOptionalTotalCompressedSize*/ nullptr); return Local; } @@ -1946,15 +1962,30 @@ CompressedBuffer::FromCompressedNoValidate(CompositeBuffer&& InCompressedData) } bool 
-CompressedBuffer::ValidateCompressedHeader(IoBuffer&& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize) +CompressedBuffer::ValidateCompressedHeader(IoBuffer&& CompressedData, + IoHash& OutRawHash, + uint64_t& OutRawSize, + uint64_t* OutOptionalTotalCompressedSize) +{ + return detail::BufferHeader::IsValid(SharedBuffer(std::move(CompressedData)), OutRawHash, OutRawSize, OutOptionalTotalCompressedSize); +} + +bool +CompressedBuffer::ValidateCompressedHeader(const IoBuffer& CompressedData, + IoHash& OutRawHash, + uint64_t& OutRawSize, + uint64_t* OutOptionalTotalCompressedSize) { - return detail::BufferHeader::IsValid(SharedBuffer(std::move(CompressedData)), OutRawHash, OutRawSize); + return detail::BufferHeader::IsValid(SharedBuffer(CompressedData), OutRawHash, OutRawSize, OutOptionalTotalCompressedSize); } bool -CompressedBuffer::ValidateCompressedHeader(const IoBuffer& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize) +CompressedBuffer::ValidateCompressedHeader(const CompositeBuffer& CompressedData, + IoHash& OutRawHash, + uint64_t& OutRawSize, + uint64_t* OutOptionalTotalCompressedSize) { - return detail::BufferHeader::IsValid(SharedBuffer(CompressedData), OutRawHash, OutRawSize); + return detail::BufferHeader::IsValid(CompressedData, OutRawHash, OutRawSize, OutOptionalTotalCompressedSize); } size_t diff --git a/src/zencore/include/zencore/compress.h b/src/zencore/include/zencore/compress.h index 09fa6249d..3802a8c31 100644 --- a/src/zencore/include/zencore/compress.h +++ b/src/zencore/include/zencore/compress.h @@ -100,12 +100,20 @@ public: uint64_t& OutRawSize); [[nodiscard]] ZENCORE_API static CompressedBuffer FromCompressedNoValidate(IoBuffer&& CompressedData); [[nodiscard]] ZENCORE_API static CompressedBuffer FromCompressedNoValidate(CompositeBuffer&& CompressedData); - [[nodiscard]] ZENCORE_API static bool ValidateCompressedHeader(IoBuffer&& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize); - [[nodiscard]] ZENCORE_API static bool 
ValidateCompressedHeader(const IoBuffer& CompressedData, - IoHash& OutRawHash, - uint64_t& OutRawSize); - [[nodiscard]] ZENCORE_API static size_t GetHeaderSizeForNoneEncoder(); - [[nodiscard]] ZENCORE_API static UniqueBuffer CreateHeaderForNoneEncoder(uint64_t RawSize, const BLAKE3& RawHash); + [[nodiscard]] ZENCORE_API static bool ValidateCompressedHeader(IoBuffer&& CompressedData, + IoHash& OutRawHash, + uint64_t& OutRawSize, + uint64_t* OutOptionalTotalCompressedSize); + [[nodiscard]] ZENCORE_API static bool ValidateCompressedHeader(const IoBuffer& CompressedData, + IoHash& OutRawHash, + uint64_t& OutRawSize, + uint64_t* OutOptionalTotalCompressedSize); + [[nodiscard]] ZENCORE_API static bool ValidateCompressedHeader(const CompositeBuffer& CompressedData, + IoHash& OutRawHash, + uint64_t& OutRawSize, + uint64_t* OutOptionalTotalCompressedSize); + [[nodiscard]] ZENCORE_API static size_t GetHeaderSizeForNoneEncoder(); + [[nodiscard]] ZENCORE_API static UniqueBuffer CreateHeaderForNoneEncoder(uint64_t RawSize, const BLAKE3& RawHash); /** Reset this to null. 
*/ inline void Reset() { CompressedData.Reset(); } diff --git a/src/zencore/include/zencore/config.h.in b/src/zencore/include/zencore/config.h.in index 3372eca2a..93256756d 100644 --- a/src/zencore/include/zencore/config.h.in +++ b/src/zencore/include/zencore/config.h.in @@ -14,3 +14,4 @@ #define ZEN_CFG_VERSION_BUILD_STRING "${VERSION}-${plat}-${arch}-${mode}" #define ZEN_CFG_VERSION_BUILD_STRING_FULL "${VERSION}-${VERSION_BUILD}-${plat}-${arch}-${mode}-${GIT_COMMIT}" #define ZEN_CFG_SCHEMA_VERSION ${ZEN_SCHEMA_VERSION} +#define ZEN_CFG_DATA_FORCE_SCRUB_VERSION ${ZEN_DATA_FORCE_SCRUB_VERSION} diff --git a/src/zenhttp/clients/httpclientcpr.cpp b/src/zenhttp/clients/httpclientcpr.cpp index 66a4ce16f..7bfc4670c 100644 --- a/src/zenhttp/clients/httpclientcpr.cpp +++ b/src/zenhttp/clients/httpclientcpr.cpp @@ -296,7 +296,7 @@ CprHttpClient::ValidatePayload(cpr::Response& Response, std::unique_ptr<detail:: { IoHash RawHash; uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(ResponseBuffer, RawHash, RawSize)) + if (CompressedBuffer::ValidateCompressedHeader(ResponseBuffer, RawHash, RawSize, /*OutOptionalTotalCompressedSize*/ nullptr)) { return true; } diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index 76f36a921..ba82c5956 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -2926,7 +2926,10 @@ BuildsOperationUpdateFolder::FindDownloadedChunk(const IoHash& ChunkHash) { IoHash RawHash; uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize)) + if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, + RawHash, + RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)) { return CompressedChunkPath; } diff --git a/src/zenremotestore/builds/filebuildstorage.cpp b/src/zenremotestore/builds/filebuildstorage.cpp index 96d81b281..153deaa9f 100644 --- 
a/src/zenremotestore/builds/filebuildstorage.cpp +++ b/src/zenremotestore/builds/filebuildstorage.cpp @@ -9,6 +9,7 @@ #include <zencore/scopeguard.h> #include <zencore/timer.h> #include <zencore/trace.h> +#include <zenstore/scrubcontext.h> namespace zen { @@ -278,7 +279,7 @@ public: ZEN_UNUSED(BuildId); ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); - ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, Payload)); + ZEN_ASSERT_SLOW(ValidateCompressedBuffer(Payload, &RawHash)); uint64_t ReceivedBytes = 0; uint64_t SentBytes = Payload.GetSize(); @@ -374,7 +375,7 @@ public: if (IsLastPart) { Workload->TempFile.Flush(); - ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(Workload->TempFile.ReadAll()))); + ZEN_ASSERT_SLOW(ValidateCompressedBuffer(CompositeBuffer(Workload->TempFile.ReadAll()), &RawHash)); Workload->TempFile.MoveTemporaryIntoPlace(BlockPath, Ec); if (Ec) { @@ -423,7 +424,7 @@ public: else { Payload = File.ReadAll(); - ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload)))); + ZEN_ASSERT_SLOW(ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(Payload)), &RawHash)); } Payload.SetContentType(ZenContentType::kCompressedBinary); ReceivedBytes = Payload.GetSize(); @@ -734,43 +735,6 @@ protected: return NeededAttachments; } - bool ValidateCompressedBuffer(const IoHash& RawHash, const CompositeBuffer& Payload) - { - IoHash VerifyHash; - uint64_t VerifySize; - CompressedBuffer ValidateBuffer = CompressedBuffer::FromCompressed(Payload, VerifyHash, VerifySize); - if (!ValidateBuffer) - { - return false; - } - if (VerifyHash != RawHash) - { - return false; - } - - IoHashStream Hash; - bool CouldDecompress = ValidateBuffer.DecompressToStream( - 0, - (uint64_t)-1, - [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { - ZEN_UNUSED(SourceOffset, SourceSize, Offset); - for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) - { - 
Hash.Append(Segment.GetView()); - } - return true; - }); - if (!CouldDecompress) - { - return false; - } - if (Hash.GetHash() != VerifyHash) - { - return false; - } - return true; - } - private: void AddStatistic(Stopwatch& ExecutionTimer, uint64_t UploadedBytes, uint64_t DownloadedBytes) { diff --git a/src/zenremotestore/jupiter/jupitersession.cpp b/src/zenremotestore/jupiter/jupitersession.cpp index 5a2bf9fe8..a0c961e37 100644 --- a/src/zenremotestore/jupiter/jupitersession.cpp +++ b/src/zenremotestore/jupiter/jupitersession.cpp @@ -782,7 +782,10 @@ JupiterSession::GetBuildBlob(std::string_view Namespace, uint64_t ValidateRawSize = 0; if (!Headers.Entries.contains("Range")) { - ZEN_ASSERT_SLOW(CompressedBuffer::ValidateCompressedHeader(Response.ResponsePayload, ValidateRawHash, ValidateRawSize)); + ZEN_ASSERT_SLOW(CompressedBuffer::ValidateCompressedHeader(Response.ResponsePayload, + ValidateRawHash, + ValidateRawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)); ZEN_ASSERT_SLOW(ValidateRawHash == Hash); ZEN_ASSERT_SLOW(ValidateRawSize > 0); ZEN_UNUSED(ValidateRawHash, ValidateRawSize); diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index f42856252..99c36b460 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -544,9 +544,11 @@ namespace remotestore_impl { WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); IoHash RawHash; uint64_t RawSize; - ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), - RawHash, - RawSize)); + ZEN_ASSERT( + CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), + RawHash, + RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)); ZEN_ASSERT(RawHash == AttachmentRawHash); WriteRawHashes.emplace_back(AttachmentRawHash); WantedChunks.erase(AttachmentRawHash); diff --git 
a/src/zenserver/storage/cache/httpstructuredcache.cpp b/src/zenserver/storage/cache/httpstructuredcache.cpp index ece1d7a46..03d2140a1 100644 --- a/src/zenserver/storage/cache/httpstructuredcache.cpp +++ b/src/zenserver/storage/cache/httpstructuredcache.cpp @@ -1238,7 +1238,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con uint64_t RawSize = Body.GetSize(); if (ContentType == HttpContentType::kCompressedBinary) { - if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize)) + if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize, /*OutOptionalTotalCompressedSize*/ nullptr)) { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest, @@ -1515,7 +1515,10 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons { IoHash RawHash; uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, RawHash, RawSize)) + if (CompressedBuffer::ValidateCompressedHeader(UpstreamResult.Value, + RawHash, + RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)) { if (RawHash == Ref.ValueContentId) { @@ -1601,7 +1604,7 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons IoHash RawHash; uint64_t RawSize; - if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize)) + if (!CompressedBuffer::ValidateCompressedHeader(Body, RawHash, RawSize, /*OutOptionalTotalCompressedSize*/ nullptr)) { m_CacheStats.BadRequestCount++; return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); diff --git a/src/zenserver/storage/upstream/upstreamcache.cpp b/src/zenserver/storage/upstream/upstreamcache.cpp index 6c489c5d3..938a1a011 100644 --- a/src/zenserver/storage/upstream/upstreamcache.cpp +++ b/src/zenserver/storage/upstream/upstreamcache.cpp @@ -208,7 +208,10 @@ namespace detail { IoHash RawHash; uint64_t RawSize; - if 
(CompressedBuffer::ValidateCompressedHeader(AttachmentResult.Response, RawHash, RawSize)) + if (CompressedBuffer::ValidateCompressedHeader(AttachmentResult.Response, + RawHash, + RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)) { Result.Response = AttachmentResult.Response; ++NumAttachments; @@ -425,7 +428,10 @@ namespace detail { m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); if (Payload && IsCompressedBinary(Payload.GetContentType())) { - IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize); + IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload, + RawHash, + RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr); } } @@ -481,7 +487,11 @@ namespace detail { { if (IsCompressedBinary(Payload.GetContentType())) { - IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize) && RawHash != PayloadHash; + IsCompressed = CompressedBuffer::ValidateCompressedHeader(Payload, + RawHash, + RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr) && + RawHash != PayloadHash; } else { @@ -559,7 +569,10 @@ namespace detail { { IoHash RawHash; uint64_t RawSize; - if (!CompressedBuffer::ValidateCompressedHeader(RecordValue, RawHash, RawSize)) + if (!CompressedBuffer::ValidateCompressedHeader(RecordValue, + RawHash, + RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)) { return {.Reason = std::string("Invalid compressed value buffer"), .Success = false}; } diff --git a/src/zenserver/storage/zenstorageserver.cpp b/src/zenserver/storage/zenstorageserver.cpp index e4f8c6aa3..827f22ecc 100644 --- a/src/zenserver/storage/zenstorageserver.cpp +++ b/src/zenserver/storage/zenstorageserver.cpp @@ -399,9 +399,10 @@ ZenStorageServer::InitializeState(const ZenStorageServerConfig& ServerOptions) { m_RootManifest = LoadCompactBinaryObject(Manifest); - const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0); - StateId = m_RootManifest["state_id"].AsObjectId(); - CreatedWhen = 
m_RootManifest["created"].AsDateTime(); + const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0); + const int32_t ForceScrubVersion = m_RootManifest["force_scrub_version"].AsInt32(0); + StateId = m_RootManifest["state_id"].AsObjectId(); + CreatedWhen = m_RootManifest["created"].AsDateTime(); if (ManifestVersion != ZEN_CFG_SCHEMA_VERSION) { @@ -424,6 +425,10 @@ ZenStorageServer::InitializeState(const ZenStorageServerConfig& ServerOptions) fmt::format("Manifest schema version: {}, differs from required: {}", ManifestVersion, ZEN_CFG_SCHEMA_VERSION); } } + else if (ForceScrubVersion != ZEN_CFG_DATA_FORCE_SCRUB_VERSION) + { + m_StartupScrubOptions = "nogc,wait"; + } } } } @@ -473,7 +478,8 @@ ZenStorageServer::InitializeState(const ZenStorageServerConfig& ServerOptions) { CbObjectWriter Cbo; - Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "created" << CreatedWhen << "updated" << Now << "state_id" << StateId; + Cbo << "schema_version" << ZEN_CFG_SCHEMA_VERSION << "force_scrub_version" << ZEN_CFG_DATA_FORCE_SCRUB_VERSION << "created" + << CreatedWhen << "updated" << Now << "state_id" << StateId; m_RootManifest = Cbo.Save(); @@ -686,10 +692,6 @@ ZenStorageServer::Run() const bool IsInteractiveMode = IsInteractiveSession() && !m_TestMode; - SetNewState(kRunning); - - OnReady(); - if (!m_StartupScrubOptions.empty()) { using namespace std::literals; @@ -726,33 +728,29 @@ ZenStorageServer::Run() if (DoScrub) { - m_GcScheduler.TriggerScrub(ScrubParams); + ZEN_INFO("Triggering Scrub/GC operation..."); + while (!m_GcScheduler.TriggerScrub(ScrubParams)) + { + ZEN_INFO("GC scheduler is busy, waiting for it to complete..."); + Sleep(500); + } if (DoWait) { - auto State = m_GcScheduler.Status(); - - while ((State != GcSchedulerStatus::kRunning) && (State != GcSchedulerStatus::kStopped)) - { - Sleep(500); - - State = m_GcScheduler.Status(); - } - - ZEN_INFO("waiting for Scrub/GC to complete..."); - - while (State == GcSchedulerStatus::kRunning) + while 
(m_GcScheduler.IsManualTriggerPresent()) { - Sleep(500); - - State = m_GcScheduler.Status(); + ZEN_INFO("Waiting for Scrub/GC to complete..."); + Sleep(2000); } - ZEN_INFO("Scrub/GC completed"); } } } + SetNewState(kRunning); + + OnReady(); + if (m_IsPowerCycle) { ZEN_INFO("Power cycle mode enabled -- shutting down"); diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 6c5b50f58..f97c98e08 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -1129,6 +1129,7 @@ BlockStore::IterateChunks(const std::span<const BlockStoreLocation>& ChunkLocati ++RangeEnd; } + ZEN_LOG_SCOPE("iterating chunks from '{}'", GetBlockPath(m_BlocksBasePath, BlockIndex)); if (!Callback(BlockIndex, ChunkIndexRange.subspan(RangeStart, RangeEnd - RangeStart))) { return false; diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 67f587a7c..4f0d412ec 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -241,7 +241,10 @@ UpdateValueWithRawSizeAndHash(ZenCacheValue& Value) { if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) { - return CompressedBuffer::ValidateCompressedHeader(Value.Value, Value.RawHash, Value.RawSize); + return CompressedBuffer::ValidateCompressedHeader(Value.Value, + Value.RawHash, + Value.RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr); } else { @@ -1623,7 +1626,10 @@ ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept ZEN_TRACE_CPU("Z$::Bucket::EndGetBatch::MetaData"); if (Location.IsFlagSet(DiskLocation::kCompressed)) { - if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize)) + if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, + OutValue.RawHash, + OutValue.RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)) { OutValue = ZenCacheValue{}; AddToMemCache = false; @@ -1752,7 +1758,10 @@ 
ZenCacheDiskLayer::CacheBucket::EndGetBatch(GetBatchHandle* Batch) noexcept { if (OutValue.Value.GetContentType() == ZenContentType::kCompressedBinary) { - if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize)) + if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, + OutValue.RawHash, + OutValue.RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)) { OutValue = ZenCacheValue{}; } @@ -1900,7 +1909,10 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal ZEN_TRACE_CPU("Z$::Bucket::Get::MetaData"); if (Location.IsFlagSet(DiskLocation::kCompressed)) { - if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize)) + if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, + OutValue.RawHash, + OutValue.RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)) { OutValue = ZenCacheValue{}; m_DiskMissCount++; @@ -2369,7 +2381,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) auto LogStats = MakeGuard([&] { const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs()); - ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})", + ZEN_INFO("cache bucket '{}' scrubbed {} in {} from {} chunks ({})", m_BucketName, NiceBytes(VerifiedChunkBytes.load()), NiceTimeSpanMs(DurationMs), @@ -2402,10 +2414,10 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) const BucketPayload& Payload = m_Payloads[Kv.second]; const DiskLocation& Loc = Payload.Location; + Ctx.ThrowIfDeadlineExpired(); + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - Ctx.ThrowIfDeadlineExpired(); - ChunkCount.fetch_add(1); VerifiedChunkBytes.fetch_add(Loc.Size()); @@ -2439,7 +2451,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) ReportBadKey(HashKey); continue; } - if (!ValidateIoBuffer(Loc.GetContentType(), Buffer)) + if (!ValidateIoBuffer(Loc.GetContentType(), std::move(Buffer))) { 
ReportBadKey(HashKey); continue; @@ -2478,7 +2490,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; ZenContentType ContentType = Payload.Location.GetContentType(); Buffer.SetContentType(ContentType); - if (!ValidateIoBuffer(ContentType, Buffer)) + if (!ValidateIoBuffer(ContentType, std::move(Buffer))) { ReportBadKey(Hash); return true; @@ -2501,7 +2513,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; ZenContentType ContentType = Payload.Location.GetContentType(); Buffer.SetContentType(ContentType); - if (!ValidateIoBuffer(ContentType, Buffer)) + if (!ValidateIoBuffer(ContentType, std::move(Buffer))) { ReportBadKey(Hash); return true; @@ -2522,7 +2534,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) if (!BadKeys.empty()) { - ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir); + ZEN_WARN("Scrubbing found {} bad chunks out of {} in '{}'", BadKeys.size(), ChunkCount.load(), m_BucketDir); if (Ctx.RunRecovery()) { @@ -4443,35 +4455,33 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) ZEN_TRACE_CPU("Z$::ScrubStorage"); RwLock::SharedLockScope _(m_Lock); - { - std::vector<std::future<void>> Results; - Results.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { -# if 1 - Results.push_back(Ctx.ThreadPool().EnqueueTask( - std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }}, - WorkerThreadPool::EMode::EnableBacklog)); -# else - CacheBucket& Bucket = *Kv.second; - Bucket.ScrubStorage(Ctx); -# endif - } + std::atomic<bool> Abort; + std::atomic<bool> Pause; + ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog); - for (auto& Result : Results) - { - if (Result.valid()) - { - Result.wait(); - } - } - for (auto& Result : Results) + try + { + for (auto& Kv : m_Buckets) { - Result.get(); + 
Ctx.ThrowIfDeadlineExpired(); + Work.ScheduleWork(Ctx.ThreadPool(), [Bucket = Kv.second.get(), &Ctx](std::atomic<bool>& AbortFlag) { + if (!AbortFlag) + { + Bucket->ScrubStorage(Ctx); + } + }); } + Work.Wait(); + } + catch (const ScrubDeadlineExpiredException&) + { + ZEN_INFO("Scrubbing deadline expired, operation incomplete"); + Abort = true; + Work.Wait(); } } + #endif // ZEN_WITH_TESTS CacheStoreSize diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 09a3d0afc..9e57b41c3 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -1757,7 +1757,10 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, else { IoHash _; - if (CompressedBuffer::ValidateCompressedHeader(Payload, _, Request->RawSize)) + if (CompressedBuffer::ValidateCompressedHeader(Payload, + _, + Request->RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)) { Request->Exists = true; Request->RawSizeKnown = true; diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index a164f66c3..c0b433c51 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -4,7 +4,9 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryutil.h> #include <zencore/compactbinaryvalidation.h> +#include <zencore/compositebuffer.h> #include <zencore/compress.h> #include <zencore/except.h> #include <zencore/filesystem.h> @@ -71,48 +73,38 @@ IsKnownBadBucketName(std::string_view Bucket) } bool -ValidateIoBuffer(ZenContentType ContentType, IoBuffer Buffer) +ValidateIoBuffer(ZenContentType ContentType, IoBuffer&& Buffer) { ZEN_ASSERT_SLOW(Buffer.GetContentType() == ContentType); if (ContentType == ZenContentType::kCbObject) { - CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); + uint64_t BufferSize = Buffer.GetSize(); + CbValidateError Error = 
CbValidateError::None; + CbObject Object = ValidateAndReadCompactBinaryObject(std::move(Buffer), Error); if (Error == CbValidateError::None) { - return true; - } - - ZEN_SCOPED_ERROR("compact binary validation failed: '{}'", ToString(Error)); - - return false; - } - else if (ContentType == ZenContentType::kCompressedBinary) - { - IoBuffer MemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Buffer); - - IoHash HeaderRawHash; - uint64_t RawSize = 0; - if (!CompressedBuffer::ValidateCompressedHeader(MemoryBuffer, /* out */ HeaderRawHash, /* out */ RawSize)) - { - ZEN_SCOPED_ERROR("compressed buffer header validation failed"); - - return false; + if (Object.GetSize() == BufferSize) + { + return true; + } + else + { + ZEN_SCOPED_WARN("compact binary object size {} does not match payload size {}", Object.GetSize(), BufferSize); + return false; + } } - - CompressedBuffer Compressed = - CompressedBuffer::FromCompressed(SharedBuffer(MemoryBuffer), /* out */ HeaderRawHash, /* out */ RawSize); - CompositeBuffer Decompressed = Compressed.DecompressToComposite(); - IoHash DecompressedHash = IoHash::HashBuffer(Decompressed); - - if (HeaderRawHash != DecompressedHash) + else { - ZEN_SCOPED_ERROR("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash); - + ZEN_SCOPED_WARN("compact binary validation failed: '{}'", ToString(Error)); return false; } } + else if (ContentType == ZenContentType::kCompressedBinary) + { + return ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(std::move(Buffer))), /*OptionalExpectedHash*/ nullptr); + } else { // No way to verify this kind of content (what is it exactly?) 
diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp index 52d5df061..bedf91287 100644 --- a/src/zenstore/cidstore.cpp +++ b/src/zenstore/cidstore.cpp @@ -31,7 +31,8 @@ struct CidStore::Impl #if ZEN_BUILD_DEBUG IoHash VerifyRawHash; uint64_t _; - ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _) && RawHash == VerifyRawHash); + ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _, /*OutOptionalTotalCompressedSize*/ nullptr) && + RawHash == VerifyRawHash); #endif IoBuffer Payload(ChunkData); @@ -66,7 +67,9 @@ struct CidStore::Impl #if ZEN_BUILD_DEBUG IoHash VerifyRawHash; uint64_t _; - ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _) && RawHashes[Offset++] == VerifyRawHash); + ZEN_ASSERT( + CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _, /*OutOptionalTotalCompressedSize*/ nullptr) && + RawHashes[Offset++] == VerifyRawHash); #endif Chunks.push_back(ChunkData); Chunks.back().SetContentType(ZenContentType::kCompressedBinary); diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index e1f17fbb9..a5de5c448 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -557,6 +557,10 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) std::vector<BlockStoreLocation> ChunkLocations; std::vector<IoHash> ChunkIndexToChunkHash; + std::atomic<bool> Abort; + std::atomic<bool> Pause; + ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog); + try { RwLock::SharedLockScope _(m_LocationMapLock); @@ -589,58 +593,57 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) } IoBuffer Buffer(IoBuffer::Wrap, Data, Size); - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) + + if (!ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(Buffer)), &Hash)) { - if (RawHash == Hash) - { - // TODO: this should also hash the (decompressed) contents 
- return true; - } + BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); } - BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); return true; }; const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - Ctx.ThrowIfDeadlineExpired(); - ChunkCount.fetch_add(1); ChunkBytes.fetch_add(Size); const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); - IoHash RawHash; - uint64_t RawSize; - // TODO: Add API to verify compressed buffer without having to memory-map the whole file - if (CompressedBuffer::ValidateCompressedHeader(Buffer, RawHash, RawSize)) + if (!ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(Buffer)), &Hash)) { - if (RawHash == Hash) - { - // TODO: this should also hash the (decompressed) contents - return true; - } + BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); } - BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Hash); }); return true; }; m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) { - return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0); + Ctx.ThrowIfDeadlineExpired(); + Work.ScheduleWork( + Ctx.ThreadPool(), + [&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())](std::atomic<bool>& AbortFlag) { + if (!AbortFlag) + { + m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0); + } + }); + return !Abort; }); + Work.Wait(); } catch (const ScrubDeadlineExpiredException&) { ZEN_INFO("Scrubbing deadline expired, operation incomplete"); + Abort = true; + Work.Wait(); } Ctx.ReportScrubbed(ChunkCount, ChunkBytes); if (!BadKeys.empty()) { - ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName); + ZEN_WARN("Scrubbing found {} bad chunks out of {} 
in '{}'", + BadKeys.size(), + ChunkCount.load(), + m_RootDirectory / m_ContainerBaseName); if (Ctx.RunRecovery()) { diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 13d881e0c..31b3a68c4 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -799,8 +799,11 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx) ZEN_ASSERT(m_IsInitialized); - std::vector<IoHash> BadHashes; - uint64_t ChunkCount{0}, ChunkBytes{0}; + RwLock BadKeysLock; + std::vector<IoHash> BadKeys; + auto ReportBadKey = [&](const IoHash& Key) { BadKeysLock.WithExclusiveLock([&]() { BadKeys.push_back(Key); }); }; + + uint64_t ChunkCount{0}, ChunkBytes{0}; int DiscoveredFilesNotInIndex = 0; @@ -820,88 +823,56 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx) ZEN_INFO("discovered {} files @ '{}' ({} not in index), scrubbing", m_Index.size(), m_RootDirectory, DiscoveredFilesNotInIndex); - IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) { - if (!Payload) - { - BadHashes.push_back(Hash); - return; - } - ++ChunkCount; - ChunkBytes += Payload.GetSize(); + std::atomic<bool> Abort; + std::atomic<bool> Pause; + ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog); + + try + { + IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) { + Ctx.ThrowIfDeadlineExpired(); - IoBuffer InMemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Payload); + ChunkCount++; + ChunkBytes += Payload.GetSize(); - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(Payload, /* out */ RawHash, /* out */ RawSize)) - { - if (RawHash == Hash) + if (!Payload) { - // Header hash matches the file name, full validation requires that - // we check that the decompressed data hash also matches + ReportBadKey(Hash); + return; + } - CompressedBuffer CompBuffer = CompressedBuffer::FromCompressedNoValidate(std::move(InMemoryBuffer)); + Payload.MakeOwned(); - OodleCompressor Compressor; - OodleCompressionLevel CompressionLevel; - uint64_t BlockSize; - 
if (CompBuffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) + Work.ScheduleWork(Ctx.ThreadPool(), [&, Hash = IoHash(Hash), Payload = std::move(Payload)](std::atomic<bool>& AbortFlag) { + if (!AbortFlag) { - if (BlockSize == 0) - { - BlockSize = 256 * 1024; - } - else if (BlockSize < (1024 * 1024)) - { - BlockSize = BlockSize * (1024 * 1024 / BlockSize); - } - - std::unique_ptr<uint8_t[]> DecompressionBuffer(new uint8_t[BlockSize]); - - IoHashStream Hasher; - - uint64_t RawOffset = 0; - while (RawSize) + if (!ValidateCompressedBuffer(CompositeBuffer(SharedBuffer(Payload)), &Hash)) { - const uint64_t DecompressedBlockSize = Min(BlockSize, RawSize); - - bool Ok = CompBuffer.TryDecompressTo(MutableMemoryView((void*)DecompressionBuffer.get(), DecompressedBlockSize), - RawOffset); - - if (Ok) - { - Hasher.Append(DecompressionBuffer.get(), DecompressedBlockSize); - } - - RawSize -= DecompressedBlockSize; - RawOffset += DecompressedBlockSize; - } - - const IoHash FinalHash = Hasher.GetHash(); - - if (FinalHash == Hash) - { - // all good - return; + ReportBadKey(Hash); } } - } - } - - BadHashes.push_back(Hash); - }); + }); + }); + Work.Wait(); + } + catch (const ScrubDeadlineExpiredException&) + { + ZEN_INFO("Scrubbing deadline expired, operation incomplete"); + Abort = true; + Work.Wait(); + } Ctx.ReportScrubbed(ChunkCount, ChunkBytes); - if (!BadHashes.empty()) + if (!BadKeys.empty()) { - ZEN_WARN("file CAS scrubbing: {} bad chunks found @ '{}'", BadHashes.size(), m_RootDirectory); + ZEN_WARN("file CAS scrubbing: {} bad chunks out of {} found @ '{}'", BadKeys.size(), ChunkCount, m_RootDirectory); if (Ctx.RunRecovery()) { - ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadHashes.size()); + ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadKeys.size()); - for (const IoHash& Hash : BadHashes) + for (const IoHash& Hash : BadKeys) { std::error_code Ec; 
DeleteChunk(Hash, Ec); @@ -914,14 +885,14 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx) } else { - ZEN_WARN("recovery: NOT deleting backing files for {} bad chunks", BadHashes.size()); + ZEN_WARN("recovery: NOT deleting backing files for {} bad chunks", BadKeys.size()); } } // Let whomever it concerns know about the bad chunks. This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do - Ctx.ReportBadCidChunks(BadHashes); + Ctx.ReportBadCidChunks(BadKeys); ZEN_INFO("file CAS @ '{}' scrubbed: {} chunks ({}), took {}", m_RootDirectory, diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index ba1bce974..a4a141577 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -1810,6 +1810,8 @@ GcScheduler::Shutdown() ZEN_TRACE_CPU("GcScheduler::Shutdown"); ZEN_MEMSCOPE(GetGcTag()); + m_GcManager.SetCancelGC(true); + if (static_cast<uint32_t>(GcSchedulerStatus::kStopped) != m_Status) { bool GcIsRunning = m_Status == static_cast<uint32_t>(GcSchedulerStatus::kRunning); @@ -1817,18 +1819,18 @@ GcScheduler::Shutdown() { ZEN_INFO("Requesting cancel running garbage collection"); } - m_GcManager.SetCancelGC(true); m_Status = static_cast<uint32_t>(GcSchedulerStatus::kStopped); - m_GcSignal.notify_one(); + m_GcSignal.Set(); + } - if (m_GcThread.joinable()) + if (m_GcThread.joinable()) + { + bool GcIsRunning = m_Status == static_cast<uint32_t>(GcSchedulerStatus::kRunning); + if (GcIsRunning) { - if (GcIsRunning) - { - ZEN_INFO("Waiting for garbage collection to complete"); - } - m_GcThread.join(); + ZEN_INFO("Waiting for garbage collection to complete"); } + m_GcThread.join(); } m_DiskUsageLog.Flush(); m_DiskUsageLog.Close(); @@ -1839,17 +1841,17 @@ GcScheduler::TriggerGc(const GcScheduler::TriggerGcParams& Params) { ZEN_MEMSCOPE(GetGcTag()); std::unique_lock Lock(m_GcMutex); - if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status) + + if (m_TriggerGcParams || m_TriggerScrubParams) { - 
m_TriggerGcParams = Params; - uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle); + return false; + } - if (m_Status.compare_exchange_strong(/* expected */ IdleState, - /* desired */ static_cast<uint32_t>(GcSchedulerStatus::kRunning))) - { - m_GcSignal.notify_one(); - return true; - } + if (static_cast<uint32_t>(GcSchedulerStatus::kStopped) != m_Status) + { + m_TriggerGcParams = Params; + m_GcSignal.Set(); + return true; } return false; } @@ -1860,17 +1862,16 @@ GcScheduler::TriggerScrub(const TriggerScrubParams& Params) ZEN_MEMSCOPE(GetGcTag()); std::unique_lock Lock(m_GcMutex); - if (static_cast<uint32_t>(GcSchedulerStatus::kIdle) == m_Status) + if (m_TriggerGcParams || m_TriggerScrubParams) { - m_TriggerScrubParams = Params; - uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle); - - if (m_Status.compare_exchange_strong(/* expected */ IdleState, /* desired */ static_cast<uint32_t>(GcSchedulerStatus::kRunning))) - { - m_GcSignal.notify_one(); + return false; + } - return true; - } + if (static_cast<uint32_t>(GcSchedulerStatus::kStopped) != m_Status) + { + m_TriggerScrubParams = Params; + m_GcSignal.Set(); + return true; } return false; @@ -1977,8 +1978,6 @@ GcScheduler::AppendGCLog(std::string_view Id, GcClock::TimePoint StartTime, cons MemoryView EntryBuffer(Blob.data(), Blob.size()); { - RwLock::ExclusiveLockScope _(m_GcLogLock); - GcLogFile.Open(Path, BasicFile::Mode::kWrite); uint64_t AppendPos = GcLogFile.FileSize(); @@ -2096,13 +2095,25 @@ GcScheduler::GetState() const return Result; } +bool +GcScheduler::IsManualTriggerPresent() const +{ + bool IsPending = Status() != GcSchedulerStatus::kStopped; + if (IsPending) + { + std::unique_lock Lock(m_GcMutex); + IsPending = m_TriggerGcParams || m_TriggerScrubParams; + } + return IsPending; +} + void GcScheduler::SchedulerThread() { ZEN_MEMSCOPE(GetGcTag()); SetCurrentThreadName("GcScheduler"); - std::chrono::seconds WaitTime{0}; + std::chrono::seconds WaitTime{m_Config.Enabled ? 
std::chrono::seconds{0} : std::chrono::seconds::max()}; const std::chrono::seconds ShortWaitTime{5}; bool SilenceErrors = false; @@ -2111,60 +2122,67 @@ GcScheduler::SchedulerThread() (void)CheckDiskSpace(); std::chrono::seconds WaitedTime{0}; - bool Timeout = false; + + std::optional<TriggerGcParams> TriggerGcParams; + std::optional<TriggerScrubParams> TriggerScrubParams; + + ZEN_ASSERT(WaitTime.count() >= 0); + while (Status() != GcSchedulerStatus::kStopped) { - ZEN_ASSERT(WaitTime.count() >= 0); - std::unique_lock Lock(m_GcMutex); - while (!Timeout && (Status() != GcSchedulerStatus::kStopped)) - { - std::chrono::seconds ShortWait = Min(WaitTime, ShortWaitTime); - bool ShortTimeout = std::cv_status::timeout == m_GcSignal.wait_for(Lock, ShortWait); + std::chrono::seconds ShortWait = Min(WaitTime, ShortWaitTime); + bool ShortTimeout = !m_GcSignal.Wait(gsl::narrow<int>(ShortWait.count() * 1000)); - if (ShortTimeout) + if (ShortTimeout) + { + if (WaitTime > ShortWaitTime) { - if (WaitTime > ShortWaitTime) + DiskSpace Space = CheckDiskSpace(); + if (!AreDiskWritesAllowed()) { - DiskSpace Space = CheckDiskSpace(); - if (!AreDiskWritesAllowed()) - { - ZEN_INFO("Triggering GC due to low disk space ({}) on {}", NiceBytes(Space.Free), m_Config.RootDirectory); - Timeout = true; - } - WaitTime -= ShortWaitTime; - } - else - { - Timeout = true; + ZEN_INFO("Triggering GC due to low disk space ({}) on {}", NiceBytes(Space.Free), m_Config.RootDirectory); + break; } + WaitTime -= ShortWaitTime; } else { - // We got a signal break; } } + else + { + m_GcSignal.Reset(); + { + std::unique_lock Lock(m_GcMutex); + TriggerGcParams = m_TriggerGcParams; + TriggerScrubParams = m_TriggerScrubParams; + } + break; + } } + auto TriggerCleanup = MakeGuard([&]() { + if (TriggerGcParams || TriggerScrubParams) + { + std::unique_lock Lock(m_GcMutex); + m_TriggerGcParams.reset(); + m_TriggerScrubParams.reset(); + } + }); + if (Status() == GcSchedulerStatus::kStopped) { break; } - if 
(!m_Config.Enabled && !m_TriggerScrubParams && !m_TriggerGcParams) - { - WaitTime = std::chrono::seconds::max(); - continue; - } - - if (!Timeout && Status() == GcSchedulerStatus::kIdle) - { - continue; - } - try { - bool DoGc = m_Config.Enabled; - bool DoScrubbing = false; + bool ManualGcTriggered = false; + bool ManualScrubbingTriggered = false; + bool LowDiskSpaceGCTriggered = false; + bool HighDiskSpaceUsageGCTriggered = false; + bool TimeBasedGCTriggered = false; + std::chrono::seconds ScrubTimeslice = std::chrono::seconds::max(); bool DoDelete = true; bool CollectSmallObjects = m_Config.CollectSmallObjects; @@ -2189,16 +2207,33 @@ GcScheduler::SchedulerThread() uint8_t NextAttachmentPassIndex = ComputeAttachmentRange(m_AttachmentPassIndex, m_Config.AttachmentPassCount, AttachmentRangeMin, AttachmentRangeMax); - bool LowDiskSpaceGCTriggered = false; - bool HighDiskSpaceUsageGCTriggered = false; - bool TimeBasedGCTriggered = false; - GcClock::TimePoint Now = GcClock::Now(); - if (m_TriggerGcParams) + if (TriggerScrubParams) { - const auto TriggerParams = m_TriggerGcParams.value(); - m_TriggerGcParams.reset(); + ZEN_ASSERT_SLOW(!TriggerGcParams); + ZEN_INFO("Manual scrub triggered"); + const auto TriggerParams = TriggerScrubParams.value(); + + ManualScrubbingTriggered = true; + + if (!TriggerParams.SkipGc) + { + ManualGcTriggered = true; + } + + if (TriggerParams.SkipCas) + { + SkipCid = true; + } + + DoDelete = !TriggerParams.SkipDelete; + ScrubTimeslice = TriggerParams.MaxTimeslice; + } + else if (TriggerGcParams) + { + ZEN_INFO("Manual gc triggered"); + const auto TriggerParams = TriggerGcParams.value(); CollectSmallObjects = TriggerParams.CollectSmallObjects; @@ -2249,34 +2284,10 @@ GcScheduler::SchedulerThread() { EnableValidation = TriggerParams.EnableValidation.value(); } - DoGc = true; - } - - if (m_TriggerScrubParams) - { - DoScrubbing = true; - - if (m_TriggerScrubParams->SkipGc) - { - DoGc = false; - } - - if (m_TriggerScrubParams->SkipCas) - { - 
SkipCid = true; - } - - DoDelete = !m_TriggerScrubParams->SkipDelete; - ScrubTimeslice = m_TriggerScrubParams->MaxTimeslice; + ManualGcTriggered = true; } - if (DoScrubbing) - { - ScrubStorage(DoDelete, SkipCid, ScrubTimeslice); - m_TriggerScrubParams.reset(); - } - - if (!DoGc) + if (!ManualScrubbingTriggered && !ManualGcTriggered && !m_Config.Enabled) { continue; } @@ -2288,10 +2299,12 @@ GcScheduler::SchedulerThread() GcClock::TimePoint BuildStoreExpireTime = MaxBuildStoreDuration == GcClock::Duration::max() ? GcClock::TimePoint::min() : Now - MaxBuildStoreDuration; - const GcStorageSize TotalSize = m_GcManager.TotalStorageSize(); - - if (Timeout && Status() == GcSchedulerStatus::kIdle) + if (!ManualGcTriggered && !ManualScrubbingTriggered) { + // Check for GC triggered by time/size limits + + const GcStorageSize TotalSize = m_GcManager.TotalStorageSize(); + DiskSpace Space = CheckDiskSpace(); const int64_t PressureGraphLength = 30; @@ -2508,7 +2521,9 @@ GcScheduler::SchedulerThread() } continue; } + } + { uint32_t IdleState = static_cast<uint32_t>(GcSchedulerStatus::kIdle); if (!m_Status.compare_exchange_strong(IdleState, static_cast<uint32_t>(GcSchedulerStatus::kRunning))) { @@ -2517,13 +2532,30 @@ GcScheduler::SchedulerThread() } } - if (!SkipCid) + auto ResetState = MakeGuard([&]() { + uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning); + if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle))) + { + ZEN_ASSERT(m_Status == static_cast<uint32_t>(GcSchedulerStatus::kStopped)); + } + }); + + if (ManualScrubbingTriggered) { - m_AttachmentPassIndex = NextAttachmentPassIndex; + ScrubStorage(DoDelete, SkipCid, ScrubTimeslice); + if (!ManualGcTriggered) + { + continue; + } } if (PrepareDiskReserve()) { + if (!SkipCid) + { + m_AttachmentPassIndex = NextAttachmentPassIndex; + } + bool GcSuccess = CollectGarbage(CacheExpireTime, ProjectStoreExpireTime, BuildStoreExpireTime, @@ -2596,13 +2628,6 @@ 
GcScheduler::SchedulerThread() WaitTime = m_Config.MonitorInterval; } m_GcManager.SetCancelGC(false); - - uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning); - if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle))) - { - ZEN_ASSERT(m_Status == static_cast<uint32_t>(GcSchedulerStatus::kStopped)); - break; - } } } diff --git a/src/zenstore/include/zenstore/cache/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h index 8f40ae727..791720589 100644 --- a/src/zenstore/include/zenstore/cache/cacheshared.h +++ b/src/zenstore/include/zenstore/cache/cacheshared.h @@ -78,6 +78,6 @@ enum class PutStatus }; bool IsKnownBadBucketName(std::string_view BucketName); -bool ValidateIoBuffer(ZenContentType ContentType, IoBuffer Buffer); +bool ValidateIoBuffer(ZenContentType ContentType, IoBuffer&& Buffer); } // namespace zen diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h index 5150ecd42..734d2e5a7 100644 --- a/src/zenstore/include/zenstore/gc.h +++ b/src/zenstore/include/zenstore/gc.h @@ -488,10 +488,10 @@ public: GcScheduler(GcManager& GcManager); ~GcScheduler(); - void Initialize(const GcSchedulerConfig& Config); - void Shutdown(); - GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); } - GcSchedulerState GetState() const; + void Initialize(const GcSchedulerConfig& Config); + void Shutdown(); + bool IsManualTriggerPresent() const; + GcSchedulerState GetState() const; struct TriggerGcParams { @@ -528,30 +528,31 @@ public: bool CancelGC(); private: - void SchedulerThread(); - bool ReclaimDiskReserve(); - bool PrepareDiskReserve(); - bool CollectGarbage(const GcClock::TimePoint& CacheExpireTime, - const GcClock::TimePoint& ProjectStoreExpireTime, - const GcClock::TimePoint& BuildStoreExpireTime, - bool Delete, - bool CollectSmallObjects, - bool SkipCid, - GcVersion UseGCVersion, - uint32_t CompactBlockUsageThresholdPercent, - 
bool Verbose, - bool SingleThreaded, - const IoHash& AttachmentRangeMin, - const IoHash& AttachmentRangeMax, - bool StoreCacheAttachmentMetaData, - bool StoreProjectAttachmentMetaData, - bool EnableValidation, - bool SilenceErrors); - void ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds TimeSlice); - LoggerRef Log() { return m_Log; } - virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); } - DiskSpace CheckDiskSpace(); - void AppendGCLog(std::string_view Id, GcClock::TimePoint GcStartTime, const GcSettings& Settings, const GcResult& Result); + GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); } + void SchedulerThread(); + bool ReclaimDiskReserve(); + bool PrepareDiskReserve(); + bool CollectGarbage(const GcClock::TimePoint& CacheExpireTime, + const GcClock::TimePoint& ProjectStoreExpireTime, + const GcClock::TimePoint& BuildStoreExpireTime, + bool Delete, + bool CollectSmallObjects, + bool SkipCid, + GcVersion UseGCVersion, + uint32_t CompactBlockUsageThresholdPercent, + bool Verbose, + bool SingleThreaded, + const IoHash& AttachmentRangeMin, + const IoHash& AttachmentRangeMax, + bool StoreCacheAttachmentMetaData, + bool StoreProjectAttachmentMetaData, + bool EnableValidation, + bool SilenceErrors); + void ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds TimeSlice); + LoggerRef Log() { return m_Log; } + virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); } + DiskSpace CheckDiskSpace(); + void AppendGCLog(std::string_view Id, GcClock::TimePoint GcStartTime, const GcSettings& Settings, const GcResult& Result); LoggerRef m_Log; GcManager& m_GcManager; @@ -571,18 +572,17 @@ private: std::optional<GcResult> m_LastLightweightGCV2Result; std::optional<GcResult> m_LastFullGCV2Result; - std::atomic_uint32_t m_Status{}; - std::thread m_GcThread; - mutable std::mutex m_GcMutex; - std::condition_variable m_GcSignal; + 
std::atomic_uint32_t m_Status{}; + std::thread m_GcThread; + mutable std::mutex m_GcMutex; + Event m_GcSignal; + std::optional<TriggerGcParams> m_TriggerGcParams; std::optional<TriggerScrubParams> m_TriggerScrubParams; std::atomic_bool m_AreDiskWritesBlocked = false; TCasLogFile<DiskUsageWindow::DiskUsageEntry> m_DiskUsageLog; DiskUsageWindow m_DiskUsageWindow; - - RwLock m_GcLogLock; }; void gc_forcelink(); diff --git a/src/zenstore/include/zenstore/projectstore.h b/src/zenstore/include/zenstore/projectstore.h index 258be5930..ad108f65b 100644 --- a/src/zenstore/include/zenstore/projectstore.h +++ b/src/zenstore/include/zenstore/projectstore.h @@ -133,6 +133,7 @@ public: void IterateOplog(std::function<void(CbObjectView)>&& Fn, const Paging& EntryPaging); void IterateOplogWithKey(std::function<void(LogSequenceNumber, const Oid&, CbObjectView)>&& Fn); void IterateOplogWithKey(std::function<void(LogSequenceNumber, const Oid&, CbObjectView)>&& Fn, const Paging& EntryPaging); + void IterateOplogWithKeyRaw(std::function<void(LogSequenceNumber, const Oid&, const IoBuffer& Buffer)>&& Handler); void IterateOplogLocked(std::function<void(CbObjectView)>&& Fn, const Paging& EntryPaging); size_t GetOplogEntryCount() const; diff --git a/src/zenstore/include/zenstore/scrubcontext.h b/src/zenstore/include/zenstore/scrubcontext.h index 2f28cfec7..0562ca8c5 100644 --- a/src/zenstore/include/zenstore/scrubcontext.h +++ b/src/zenstore/include/zenstore/scrubcontext.h @@ -8,6 +8,7 @@ namespace zen { class WorkerThreadPool; +class CompositeBuffer; /** Context object for data scrubbing @@ -67,4 +68,6 @@ public: ~ScrubDeadlineExpiredException(); }; +bool ValidateCompressedBuffer(const CompositeBuffer& Buffer, const IoHash* OptionalExpectedHash); + } // namespace zen diff --git a/src/zenstore/projectstore.cpp b/src/zenstore/projectstore.cpp index 7570b8513..7e9ff50bb 100644 --- a/src/zenstore/projectstore.cpp +++ b/src/zenstore/projectstore.cpp @@ -774,70 +774,125 @@ struct 
ProjectStore::OplogStorage : public RefCounted ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries"); BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536); + const uint64_t BlobsSize = m_OpBlobs.FileSize(); for (ProjectStore::Oplog::PayloadIndex EntryOffset : Order) { const Oplog::OplogPayload& Entry = Entries[EntryOffset]; const uint64_t OpFileOffset = Entry.Address.Offset * m_OpsAlign; - MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset); - if (OpBufferView.GetSize() == Entry.Address.Size) + if (OpFileOffset + Entry.Address.Size > BlobsSize) { - if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error == CbValidateError::None) + ZEN_WARN("oplog '{}/{}': skipping op outside of file size - {}. Op offset: {}, Op size: {}, file size {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + Entry.Lsn.Number, + OpFileOffset, + Entry.Address.Size, + BlobsSize); + } + else + { + MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset); + if (OpBufferView.GetSize() == Entry.Address.Size) { - CbObjectView OpView(OpBufferView.GetData()); - if (OpView.GetSize() != OpBufferView.GetSize()) + if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); + Error == CbValidateError::None) { - ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - Entry.Lsn.Number, - OpView.GetSize(), - OpBufferView.GetSize()); + CbObjectView OpView(OpBufferView.GetData()); + if (OpView.GetSize() != OpBufferView.GetSize()) + { + ZEN_WARN( + "oplog '{}/{}': skipping invalid format op - {}. 
Object payload size {} does not match op data size {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + Entry.Lsn.Number, + OpView.GetSize(), + OpBufferView.GetSize()); + } + else + { + Handler(Entry.Lsn, OpView); + } } else { - Handler(Entry.Lsn, OpView); + ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + Entry.Lsn.Number, + ToString(Error)); } } else { - ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - Entry.Lsn.Number, - ToString(Error)); - } - } - else - { - IoBuffer OpBuffer(Entry.Address.Size); - OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset); - OpBufferView = OpBuffer.GetView(); - if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error == CbValidateError::None) - { - CbObjectView OpView(OpBuffer.Data()); - if (OpView.GetSize() != OpBuffer.GetSize()) + IoBuffer OpBuffer(Entry.Address.Size); + OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset); + OpBufferView = OpBuffer.GetView(); + if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); + Error == CbValidateError::None) { - ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Object payload size {} does not match op data size {}", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - Entry.Lsn.Number, - OpView.GetSize(), - OpBuffer.GetSize()); + CbObjectView OpView(OpBuffer.Data()); + if (OpView.GetSize() != OpBuffer.GetSize()) + { + ZEN_WARN( + "oplog '{}/{}': skipping invalid format op - {}. 
Object payload size {} does not match op data size {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + Entry.Lsn.Number, + OpView.GetSize(), + OpBuffer.GetSize()); + } + else + { + Handler(Entry.Lsn, OpView); + } } else { - Handler(Entry.Lsn, OpView); + ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. Validation error: {}", + m_OwnerOplog->GetOuterProjectIdentifier(), + m_OwnerOplog->OplogId(), + Entry.Lsn.Number, + ToString(Error)); } } + } + } + } + + void ReplayLogEntries(const std::span<const Oplog::OplogPayload> Entries, + const std::span<const Oplog::PayloadIndex> Order, + std::function<void(LogSequenceNumber Lsn, const IoBuffer& Buffer)>&& Handler) + { + ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries"); + + BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536); + const uint64_t BlobsSize = m_OpBlobs.FileSize(); + + for (ProjectStore::Oplog::PayloadIndex EntryOffset : Order) + { + const Oplog::OplogPayload& Entry = Entries[EntryOffset]; + + const uint64_t OpFileOffset = Entry.Address.Offset * m_OpsAlign; + if (OpFileOffset + Entry.Address.Size > BlobsSize) + { + Handler(Entry.Lsn, {}); + } + else + { + MemoryView OpBufferView = OpBlobsBuffer.MakeView(Entry.Address.Size, OpFileOffset); + if (OpBufferView.GetSize() == Entry.Address.Size) + { + IoBuffer Buffer = IoBuffer(IoBuffer::Wrap, OpBufferView.GetData(), OpBufferView.GetSize()); + Handler(Entry.Lsn, Buffer); + } else { - ZEN_WARN("oplog '{}/{}': skipping invalid format op - {}. 
Validation error: {}", - m_OwnerOplog->GetOuterProjectIdentifier(), - m_OwnerOplog->OplogId(), - Entry.Lsn.Number, - ToString(Error)); + IoBuffer OpBuffer(Entry.Address.Size); + OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset); + Handler(Entry.Lsn, OpBuffer); } } } @@ -1118,40 +1173,77 @@ ProjectStore::Oplog::Scrub(ScrubContext& Ctx) { ZEN_MEMSCOPE(GetProjectstoreTag()); + ZEN_INFO("scrubbing oplog '{}/{}'", m_OuterProjectId, m_OplogId); + ZEN_ASSERT(m_Mode == EMode::kFull); + Stopwatch Timer; + std::atomic_uint64_t OpCount = 0; + std::atomic_uint64_t VerifiedOpBytes = 0; + + auto LogStats = MakeGuard([&] { + const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs()); + + ZEN_INFO("oplog '{}/{}' scrubbed {} in {} from {} ops ({})", + m_OuterProjectId, + m_OplogId, + NiceBytes(VerifiedOpBytes.load()), + NiceTimeSpanMs(DurationMs), + OpCount.load(), + NiceRate(VerifiedOpBytes, DurationMs)); + }); + std::vector<std::pair<LogSequenceNumber, Oid>> BadEntries; using namespace std::literals; - IterateOplogWithKey([&](LogSequenceNumber Lsn, const Oid& Key, CbObjectView Op) { + IterateOplogWithKeyRaw([&](LogSequenceNumber Lsn, const Oid& Key, const IoBuffer& Buffer) { + Ctx.ThrowIfDeadlineExpired(); + + OpCount++; + VerifiedOpBytes += Buffer.GetSize(); + + if (!Buffer) { - const Oid KeyHash = ComputeOpKey(Op); - if (KeyHash != Key) + ZEN_WARN("Scrub: oplog data for op {} (Lns: {}) could not be read from disk", Key, Lsn.Number); + BadEntries.push_back({Lsn, Key}); + return; + } + { + MemoryView OpBufferView = Buffer.GetView(); + if (CbValidateError Error = ValidateCompactBinary(OpBufferView, CbValidateMode::Default); Error != CbValidateError::None) { + ZEN_WARN("Scrub: oplog data for op {} (Lns: {}) is not valid compact binary. 
Error: {}", Key, Lsn.Number, ToString(Error)); BadEntries.push_back({Lsn, Key}); - ZEN_WARN("Scrub: oplog data does not match information from index (op:{} != index:{})", KeyHash, Key); return; } } - // TODO: Should we really delete an Op because it points to a missing or malformed Cid chunk? + CbObjectView OpView(Buffer.GetData()); + if (OpView.GetSize() != Buffer.GetSize()) + { + ZEN_WARN("Scrub: oplog payload size {} for op {} (Lns: {}) does not match object size {}", + Buffer.GetSize(), + Key, + Lsn.Number, + OpView.GetSize()); + BadEntries.push_back({Lsn, Key}); + return; + } - Op.IterateAttachments([&](CbFieldView Visitor) { - const IoHash Cid = Visitor.AsAttachment(); - if (Ctx.IsBadCid(Cid)) - { - // oplog entry references a CAS chunk which has been flagged as bad - BadEntries.push_back({Lsn, Key}); - return; - } - if (!m_CidStore.ContainsChunk(Cid)) + { + const Oid KeyHash = ComputeOpKey(OpView); + if (KeyHash != Key) { - // oplog entry references a CAS chunk which is not present BadEntries.push_back({Lsn, Key}); + ZEN_WARN("Scrub: oplog data for op {} (Lns: {}) does not match information from index (op:{} != index:{})", + Key, + Lsn.Number, + KeyHash, + Key); return; } - }); + } }); if (!BadEntries.empty()) @@ -2577,6 +2669,32 @@ ProjectStore::Oplog::IterateOplogWithKey(std::function<void(LogSequenceNumber, c } } +void +ProjectStore::Oplog::IterateOplogWithKeyRaw(std::function<void(LogSequenceNumber, const Oid&, const IoBuffer& Buffer)>&& Handler) +{ + ZEN_MEMSCOPE(GetProjectstoreTag()); + + tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher> ReverseKeyMap; + std::vector<PayloadIndex> ReplayOrder; + + { + RwLock::SharedLockScope _(m_OplogLock); + if (m_Storage) + { + ReplayOrder = GetSortedOpPayloadRangeLocked({}, &ReverseKeyMap); + if (!ReplayOrder.empty()) + { + uint32_t EntryIndex = 0; + m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, const IoBuffer& Buffer) { + const PayloadIndex PayloadOffset = 
ReplayOrder[EntryIndex]; + Handler(Lsn, ReverseKeyMap.at(PayloadOffset), Buffer); + EntryIndex++; + }); + } + } + } +} + static constexpr uint32_t OplogMetaDataExpectedMagic = 0x6f'74'6d'62; // 'omta'; void @@ -3773,18 +3891,64 @@ void ProjectStore::Project::Scrub(ScrubContext& Ctx) { ZEN_MEMSCOPE(GetProjectstoreTag()); + + ZEN_INFO("scrubbing '{}'", ProjectRootDir); + // Scrubbing needs to check all existing oplogs std::vector<std::string> OpLogs = ScanForOplogs(); - for (const std::string& OpLogId : OpLogs) + + RwLock::SharedLockScope _(m_ProjectLock); + + std::atomic<bool> Abort; + std::atomic<bool> Pause; + ParallelWork Work(Abort, Pause, WorkerThreadPool::EMode::DisableBacklog); + + try { - OpenOplog(OpLogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); - } - IterateOplogs([&](const RwLock::SharedLockScope&, Oplog& Ops) { - if (!IsExpired(GcClock::TimePoint::min(), Ops)) + for (const std::string& OpLogId : OpLogs) { - Ops.Scrub(Ctx); + Ref<ProjectStore::Oplog> OpLog; + { + if (auto OpIt = m_Oplogs.find(OpLogId); OpIt != m_Oplogs.end()) + { + OpLog = OpIt->second; + } + else + { + std::filesystem::path OplogBasePath = BasePathForOplog(OpLogId); + if (ProjectStore::Oplog::ExistsAt(OplogBasePath)) + { + OpLog = new ProjectStore::Oplog( + Log(), + Identifier, + OpLogId, + m_CidStore, + OplogBasePath, + std::filesystem::path{}, + ProjectStore::Oplog::EMode::kFull); // We need it to be a full read so we can write a new index snapshot + OpLog->Read(); + } + } + } + + if (OpLog) + { + Work.ScheduleWork(Ctx.ThreadPool(), [OpLog, &Ctx](std::atomic<bool>& AbortFlag) { + if (!AbortFlag) + { + OpLog->Scrub(Ctx); + } + }); + } } - }); + Work.Wait(); + } + catch (const ScrubDeadlineExpiredException&) + { + ZEN_INFO("Scrubbing deadline expired, operation incomplete"); + Abort = true; + Work.Wait(); + } } uint64_t @@ -4454,7 +4618,10 @@ ProjectStore::GetProjectFiles(LoggerRef InLog, Project& Project, Oplog& Oplog, c if (WantsRawSizeField) { IoHash _; - if 
(CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSizes[Index])) + if (CompressedBuffer::ValidateCompressedHeader(Payload, + _, + RawSizes[Index], + /*OutOptionalTotalCompressedSize*/ nullptr)) { if (WantsSizeField) { @@ -4611,7 +4778,10 @@ ProjectStore::GetProjectChunkInfos(LoggerRef InLog, Project& Project, Oplog& Opl { ZEN_ASSERT_SLOW(RawSizes[Index] == (uint64_t)-1); IoHash _; - if (CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSizes[Index])) + if (CompressedBuffer::ValidateCompressedHeader(Payload, + _, + RawSizes[Index], + /*OutOptionalTotalCompressedSize*/ nullptr)) { if (WantsSizeField) { @@ -4722,7 +4892,7 @@ ProjectStore::GetChunkInfo(LoggerRef InLog, Project& Project, Oplog& Oplog, cons { IoHash RawHash; uint64_t RawSize; - bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize); + bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize, /*OutOptionalTotalCompressedSize*/ nullptr); if (!IsCompressed) { throw std::runtime_error( diff --git a/src/zenstore/scrubcontext.cpp b/src/zenstore/scrubcontext.cpp index fbcd7d33c..8f8ec09a7 100644 --- a/src/zenstore/scrubcontext.cpp +++ b/src/zenstore/scrubcontext.cpp @@ -2,6 +2,10 @@ #include "zenstore/scrubcontext.h" +#include <zencore/compress.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/string.h> #include <zencore/workthreadpool.h> namespace zen { @@ -62,4 +66,64 @@ ScrubContext::ThrowIfDeadlineExpired() const throw ScrubDeadlineExpiredException(); } +bool +ValidateCompressedBuffer(const CompositeBuffer& Buffer, const IoHash* OptionalExpectedHash) +{ + IoHash HeaderRawHash; + uint64_t RawSize = 0; + uint64_t TotalCompressedSize = 0; + if (!CompressedBuffer::ValidateCompressedHeader(Buffer, HeaderRawHash, RawSize, &TotalCompressedSize)) + { + if (OptionalExpectedHash) + { + ZEN_SCOPED_WARN("compressed buffer header validation failed for chunk with hash {}", *OptionalExpectedHash); + } + else + 
{ + ZEN_SCOPED_WARN("compressed buffer header validation failed"); + } + return false; + } + + if (OptionalExpectedHash != nullptr && HeaderRawHash != (*OptionalExpectedHash)) + { + ZEN_SCOPED_WARN("compressed buffer hash {} does not match expected hash {}", HeaderRawHash, *OptionalExpectedHash); + return false; + } + + if (TotalCompressedSize != Buffer.GetSize()) + { + ZEN_SCOPED_WARN("compressed buffer size does not match total compressed size in header for chunk {}", HeaderRawHash); + return false; + } + + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Buffer, /* out */ HeaderRawHash, /* out */ RawSize); + + IoHashStream HashStream; + if (!Compressed.DecompressToStream( + 0, + RawSize, + [&HashStream](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range) -> bool { + ZEN_UNUSED(SourceOffset, SourceSize, Offset); + for (const SharedBuffer& Segment : Range.GetSegments()) + { + HashStream.Append(Segment); + } + return true; + })) + { + ZEN_SCOPED_WARN("compressed buffer could not be decompressed for chunk {}", HeaderRawHash); + return false; + } + + IoHash DecompressedHash = HashStream.GetHash(); + + if (HeaderRawHash != DecompressedHash) + { + ZEN_SCOPED_WARN("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash); + return false; + } + return true; +} + } // namespace zen |