// Copyright Epic Games, Inc. All Rights Reserved. #include "zenstore/scrubcontext.h" #include #include #include #include #include namespace zen { ScrubDeadlineExpiredException::ScrubDeadlineExpiredException() : std::runtime_error("scrubbing deadline expired") { } ScrubDeadlineExpiredException::~ScrubDeadlineExpiredException() { } ////////////////////////////////////////////////////////////////////////// ScrubContext::ScrubContext(WorkerThreadPool& InWorkerThreadPool, std::chrono::steady_clock::time_point Deadline) : m_WorkerThreadPool(InWorkerThreadPool) , m_Deadline(Deadline) { } ScrubContext::~ScrubContext() { } HashKeySet ScrubContext::BadCids() const { RwLock::SharedLockScope _(m_Lock); return m_BadCid; } bool ScrubContext::IsBadCid(const IoHash& Cid) const { RwLock::SharedLockScope _(m_Lock); return m_BadCid.ContainsHash(Cid); } void ScrubContext::ReportBadCidChunks(std::span BadCasChunks) { RwLock::ExclusiveLockScope _(m_Lock); m_BadCid.AddHashesToSet(BadCasChunks); } bool ScrubContext::IsWithinDeadline() const { return std::chrono::steady_clock::now() < m_Deadline; } void ScrubContext::ThrowIfDeadlineExpired() const { if (IsWithinDeadline()) return; throw ScrubDeadlineExpiredException(); } bool ValidateCompressedBuffer(const CompositeBuffer& Buffer, const IoHash* OptionalExpectedHash) { IoHash HeaderRawHash; uint64_t RawSize = 0; uint64_t TotalCompressedSize = 0; if (!CompressedBuffer::ValidateCompressedHeader(Buffer, HeaderRawHash, RawSize, &TotalCompressedSize)) { if (OptionalExpectedHash) { ZEN_SCOPED_WARN("compressed buffer header validation failed for chunk with hash {}", *OptionalExpectedHash); } else { ZEN_SCOPED_WARN("compressed buffer header validation failed"); } return false; } if (OptionalExpectedHash != nullptr && HeaderRawHash != (*OptionalExpectedHash)) { ZEN_SCOPED_WARN("compressed buffer hash {} does not match expected hash {}", HeaderRawHash, *OptionalExpectedHash); return false; } if (TotalCompressedSize != Buffer.GetSize()) { ZEN_SCOPED_WARN("compressed buffer size does not match total compressed size in header for chunk {}", HeaderRawHash); return false; } CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Buffer, /* out */ HeaderRawHash, /* out */ RawSize); IoHashStream HashStream; if (!Compressed.DecompressToStream( 0, RawSize, [&HashStream](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range) -> bool { ZEN_UNUSED(SourceOffset, SourceSize, Offset); for (const SharedBuffer& Segment : Range.GetSegments()) { HashStream.Append(Segment); } return true; })) { ZEN_SCOPED_WARN("compressed buffer could not be decompressed for chunk {}", HeaderRawHash); return false; } IoHash DecompressedHash = HashStream.GetHash(); if (HeaderRawHash != DecompressedHash) { ZEN_SCOPED_WARN("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash); return false; } return true; } } // namespace zen