// Copyright Epic Games, Inc. All Rights Reserved. #include "s3asyncstorage.h" #include #include #include #include #include #include #include #include #include #if ZEN_WITH_TESTS # include # include # include # include # include #endif namespace zen { using namespace std::literals; namespace { // Bounded write-buffer pool. OnData callbacks on the io strand call // Acquire to obtain a fixed-size IoBuffer; workers call Release after // the dispatched write completes. The pool retains at most as many // buffers as future Acquire calls remain (PendingAcquires), so it // stabilises at peak write concurrency without dragging buffers past // the point they can ever be re-used. class WriteBufferPool { public: WriteBufferPool(size_t InBufferSize, uint32_t TotalAcquires) : m_BufferSize(InBufferSize), m_PendingAcquires(TotalAcquires) {} size_t BufferSize() const { return m_BufferSize; } IoBuffer Acquire() { IoBuffer Buf; bool Exhausted = false; m_Lock.WithExclusiveLock([&] { if (m_PendingAcquires == 0) { Exhausted = true; return; } --m_PendingAcquires; if (!m_Pool.empty()) { Buf = std::move(m_Pool.back()); m_Pool.pop_back(); } }); if (Exhausted) { // Caller (OnData / OnComplete tail flush on the curl io strand) // over-acquired vs the precomputed TotalBlocks budget. Throw a // real exception so the boundary catch in // AsyncCurlStreamWriteCallback can surface it; never let // ZEN_ASSERT propagate through curl's C frames. throw zen::runtime_error("WriteBufferPool exhausted: more buffers requested than expected"); } if (!Buf) { Buf = IoBuffer(m_BufferSize); } return Buf; } void Release(IoBuffer Buf) { m_Lock.WithExclusiveLock([&] { if (m_Pool.size() < m_PendingAcquires) { m_Pool.push_back(std::move(Buf)); } }); } // Returns the slot consumed by Acquire() back to the budget without // supplying a buffer; for paths where the buffer was already moved into a // lambda (e.g. ScheduleWork threw after the move). 
void RestoreAcquireSlot() { m_Lock.WithExclusiveLock([&] { ++m_PendingAcquires; }); } private: const size_t m_BufferSize; RwLock m_Lock; std::vector m_Pool; uint32_t m_PendingAcquires = 0; }; // Acquire one admission slot on the dispatcher thread. Returns a refcounted // handle whose deleter releases the slot on last drop, so callers thread it // through worker lambdas and AsyncXxx callbacks; whichever runs last fires // the release. nullptr when admission is disabled (Sem == nullptr). // // Stats may be null when the caller has no per-request stat block (DeleteAll); // the slot is still held but the wait time is not recorded. // // Exception safety: counting_semaphore::acquire is noexcept; only the // shared_ptr control-block allocation can throw. The standard's deleter- // invocation guarantee for shared_ptr(p, d) only kicks in when p is non- // null (p == nullptr here), so libstdc++/MSVC happen to invoke the deleter // on alloc failure but it is not portably required - hence the explicit // guard. std::shared_ptr AcquireAdmissionSlot(const std::shared_ptr& Sem, S3AsyncStorageStats* Stats = nullptr) { if (!Sem) { return nullptr; } Stopwatch AdmWait; Sem->acquire(); auto ReleaseGuard = MakeGuard([&Sem] { Sem->release(); }); std::shared_ptr SlotRef(nullptr, [Sem](void*) { Sem->release(); }); ReleaseGuard.Dismiss(); if (Stats) { Stats->RecordAdmissionWait(AdmWait.GetElapsedTimeUs()); } return SlotRef; } } // namespace S3AsyncStorage::S3AsyncStorage(AsyncHttpClient& Client, S3RequestBuilder& Builder, CredentialsCallback GetCreds, std::string KeyPrefix, uint64_t MultipartChunkSize, std::shared_ptr Admission, uint32_t AdmissionCap) : m_Client(Client) , m_Builder(Builder) , m_GetCreds(std::move(GetCreds)) , m_KeyPrefix(std::move(KeyPrefix)) , m_MultipartChunkSize(MultipartChunkSize) , m_Admission(std::move(Admission)) , m_AdmissionCap(m_Admission ? 
AdmissionCap : 0u) { } std::string S3AsyncStorage::CasKey(const IoHash& Hash) const { return fmt::format("{}/cas/{}", m_KeyPrefix, Hash); } std::string S3AsyncStorage::CasPath(const IoHash& Hash) const { return m_Builder.KeyToPath(CasKey(Hash)); } // Per-range writes target the prealloc'd file at-offset; BasicFile::Write is positional and concurrency-safe. struct S3AsyncStorage::GetMultipartState { GetMultipartState(ParallelWork& InWork, WorkerThreadPool& InPool, S3AsyncStorageStats InStats, uint32_t TotalBlocks) : Work(InWork) , Pool(InPool) , Stats(InStats) , Buffers(WriteBufferSize, TotalBlocks) { } static constexpr size_t WriteBufferSize = 512u * 1024u; ParallelWork& Work; WorkerThreadPool& Pool; std::shared_ptr DestFile; std::filesystem::path DestPath; std::shared_ptr Token; IoHash ContentHash; S3AsyncStorageStats Stats; uint32_t TotalRanges = 0; std::atomic PendingRanges{0}; std::atomic AnyFailed{false}; RwLock ErrorLock; std::string FirstError; WriteBufferPool Buffers; }; // Shared state for a single-stream GET (medium tier - one ranged request, // streamed body). OnData fills 512 KiB IoBuffers from a shared pool on the // io strand and dispatches each filled buffer to a worker for one positional // Write. PendingWork counts in-flight writes plus a +1 slot for the stream // itself; the side that drops it to zero (last writer or OnComplete after // the final flush) finalises the request. 
struct S3AsyncStorage::GetStreamState
{
    // TotalBlocks is the acquire budget handed to the write-buffer pool.
    GetStreamState(ParallelWork& InWork, WorkerThreadPool& InPool, S3AsyncStorageStats InStats, uint32_t TotalBlocks)
    : Work(InWork)
    , Pool(InPool)
    , Stats(InStats)
    , Buffers(WriteBufferSize, TotalBlocks)
    {
    }

    static constexpr size_t WriteBufferSize = 512u * 1024u;

    ParallelWork& Work;
    WorkerThreadPool& Pool;
    std::shared_ptr DestFile;
    std::filesystem::path DestPath;
    std::shared_ptr Token;
    IoHash ContentHash;
    S3AsyncStorageStats Stats;
    uint64_t ExpectedSize = 0;
    // NOTE(review): the four fields below are not atomic and are not used in
    // this part of the file; presumably they are touched only from the io
    // strand - confirm against the streaming Get implementation.
    IoBuffer ActiveBuf;
    size_t BufFill = 0;
    uint64_t NextAbsOffset = 0;
    uint64_t TotalReceived = 0;
    std::atomic PendingWork{1};  // +1 for stream completion
    std::atomic Failed{false};
    RwLock ErrorLock;  // guards FirstError (see RecordGetStreamFailure)
    std::string FirstError;
    WriteBufferPool Buffers;
};

// Shared state for an in-flight multipart upload. PendingParts.fetch_sub(acq_rel)
// in FinalizePutPart is the publication barrier for ETag writes; CompleteMultipart
// (gated on Remaining == 1) sees every part's slot. ETagLock guards FirstError;
// AnyFailed short-circuits the pipeline on first error.
struct S3AsyncStorage::PutMultipartState
{
    PutMultipartState(ParallelWork& InWork, WorkerThreadPool& InPool, S3AsyncStorageStats InStats)
    : Work(InWork)
    , Pool(InPool)
    , Stats(InStats)
    {
    }

    ParallelWork& Work;
    WorkerThreadPool& Pool;
    std::shared_ptr File;
    std::shared_ptr Token;
    IoHash ContentHash;
    S3AsyncStorageStats Stats;
    // Snapshot of credentials taken once at PutMultipart entry. Reused across
    // all per-part signing + Complete/Abort. Avoids per-part m_GetCreds()
    // shared-lock contention on the credential provider.
    SigV4Credentials Creds;
    std::string Key;
    std::string Path;
    std::string UploadId;
    uint64_t TotalSize = 0;
    uint64_t PartSize = 0;
    uint32_t TotalParts = 0;
    std::atomic PendingParts{0};
    // Monotonic dispatch cursor. Initialized to PreAcquired (min(TotalParts,
    // AdmissionCap)) in PutMultipart. Each AsyncPut completion racing the
    // handoff path issues fetch_add(1); whichever completion observes a value
    // < TotalParts owns the next dispatch.
    std::atomic NextPartToDispatch{0};
    uint32_t PreAcquired = 0;
    // Counts pending ReadRange calls into File. Used to release File from the
    // worker thread that does the last read so the close runs off the curl
    // io strand.
    std::atomic ReadsRemaining{0};
    std::atomic AnyFailed{false};
    RwLock ETagLock;
    std::vector ETags;  // indexed by PartNumber-1
    std::string FirstError;
};

namespace {

    // Sets AnyFailed (CAS) and captures FirstError on first failure only.
    // Does NOT touch the dispatch cursor; callers that need to drain the
    // undispatched tail invoke ClaimUndispatchedParts and fan out skips.
    //
    // State: shared multipart-upload state to mark as failed.
    // Err:   message logged for every call; stored in FirstError only by the
    //        call that wins the AnyFailed compare-exchange.
    void RecordPutPartFailure(S3AsyncStorage::PutMultipartState& State, const std::string& Err)
    {
        // Log every failure so correlated S3 errors (e.g. AccessDenied on one
        // part, ServiceUnavailable on another) all reach the operator. CAS still
        // keeps the first message for the eventual Token->Fail.
        ZEN_WARN("S3AsyncStorage::PutMultipart '{}': {}", State.ContentHash, Err);
        bool ExpectedFalse = false;
        if (State.AnyFailed.compare_exchange_strong(ExpectedFalse, true))
        {
            State.ETagLock.WithExclusiveLock([&] { State.FirstError = Err; });
        }
    }

    // Claims every part index from NextPartToDispatch up to TotalParts.
    // Returns the count claimed; caller is responsible for firing one
    // FinalizePutPart per claimed index so PendingParts can reach 1 and
    // trip CompleteMultipart/AbortMultipart. Idempotent on repeated calls
    // (subsequent invocations return 0).
    uint32_t ClaimUndispatchedParts(S3AsyncStorage::PutMultipartState& State)
    {
        // exchange() parks the cursor at TotalParts and yields its previous
        // value; a previous value at or past TotalParts means nothing was left.
        const uint32_t Claimed = State.NextPartToDispatch.exchange(State.TotalParts, std::memory_order_acq_rel);
        return Claimed < State.TotalParts ? State.TotalParts - Claimed : 0u;
    }

    // Multi-range GET counterpart of RecordPutPartFailure: logs Err, and the
    // call that flips AnyFailed stores Err in FirstError under ErrorLock.
    void RecordGetPartFailure(S3AsyncStorage::GetMultipartState& State, const std::string& Err)
    {
        ZEN_WARN("S3AsyncStorage::GetMultipart '{}': {}", State.ContentHash, Err);
        bool ExpectedFalse = false;
        if (State.AnyFailed.compare_exchange_strong(ExpectedFalse, true))
        {
            State.ErrorLock.WithExclusiveLock([&] { State.FirstError = Err; });
        }
    }

    // Single-stream GET counterpart: logs Err, and the call that flips Failed
    // stores Err in FirstError under ErrorLock.
    void RecordGetStreamFailure(S3AsyncStorage::GetStreamState& State, const std::string& Err)
    {
        ZEN_WARN("S3AsyncStorage::Get '{}': {}", State.ContentHash, Err);
        bool ExpectedFalse = false;
        if (State.Failed.compare_exchange_strong(ExpectedFalse, true))
        {
            State.ErrorLock.WithExclusiveLock([&] { State.FirstError = Err; });
        }
    }

}  // namespace

// Three tiers selected by Size:
// - Small (< kPutSmallThreshold): single PUT, body materialized into one IoBuffer; SHA in one pass.
// - Medium (< MultipartThreshold): single PUT, streaming source; two-pass (hash, then upload) with bounded RAM.
// - Large (>= MultipartThreshold): S3 multipart; per-part body materialized in DispatchPartUpload.
namespace { constexpr uint64_t kPutSmallThreshold = 512u * 1024u; constexpr size_t kPutReadChunk = 256u * 1024u; } // namespace void S3AsyncStorage::Put(ParallelWork& Work, WorkerThreadPool& Pool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, S3AsyncStorageStats& Stats) { const uint64_t MultipartThreshold = m_MultipartChunkSize + (m_MultipartChunkSize / 4); if (Size >= MultipartThreshold) { PutMultipart(Work, Pool, Hash, Size, SourcePath, Stats); return; } if (Size < kPutSmallThreshold) { PutSmall(Work, Pool, Hash, Size, SourcePath, Stats); return; } PutMedium(Work, Pool, Hash, Size, SourcePath, Stats); } void S3AsyncStorage::PutSmall(ParallelWork& Work, WorkerThreadPool& Pool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, S3AsyncStorageStats& Stats) { auto Token = std::make_shared(Work.RegisterExternal()); Stats.RecordScheduled(); // Acquire one admission slot on the dispatcher; SlotRef threads through // worker lambda + AsyncPut callback so the slot is held until the network // transfer completes (or the chain is destroyed on shutdown/cancel). std::shared_ptr SlotRef = AcquireAdmissionSlot(m_Admission, &Stats); // Snapshot creds once on the dispatcher (single shared-lock acquire on the // credential provider) and capture by value into the worker. Mirrors // PutMultipart and avoids per-fanout contention on m_GetCreds. SigV4Credentials Creds = m_GetCreds(); // Capture Stats BY VALUE: S3AsyncStorageStats is a struct of references to // the long-lived PhaseStats atomics (PhaseStats outlives all in-flight // transfers via the ParallelWork latch). The Stats wrapper struct itself // is a local in the caller (S3AsyncStorageAdapter::Put) and goes out of // scope before deferred work runs, so capturing &Stats would dangle. 
Work.ScheduleWork( Pool, [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath), Token, Stats, Creds = std::move(Creds), SlotRef = std::move(SlotRef)](std::atomic& Abort) mutable { if (Abort.load()) { Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutSmall '{}': aborted"sv, Hash))); return; } try { if (Creds.AccessKeyId.empty()) { Token->Fail( std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutSmall '{}': no credentials available"sv, Hash))); return; } BasicFile File(SourcePath, BasicFile::Mode::kRead); if (File.FileSize() != Size) { Token->Fail(std::make_exception_ptr( zen::runtime_error("S3AsyncStorage::PutSmall '{}': source size {} differs from declared {}"sv, Hash, File.FileSize(), Size))); return; } // Heap IoBuffer of exact size; explicit chunked pread fills it. No // IoBuffer auto-materialize, no mmap. SHA computed in one pass over // the buffer once it's filled. IoBuffer Body(static_cast(Size)); uint8_t* Dst = static_cast(Body.MutableData()); uint64_t Off = 0; while (Off < Size) { const size_t Take = static_cast(std::min(kPutReadChunk, Size - Off)); File.Read(Dst + Off, Take, Off); Off += Take; } std::string PayloadHash = Sha256ToHex(ComputeSha256(Body.GetData(), Body.GetSize())); std::string Path = CasPath(Hash); HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "PUT", Path, "", PayloadHash); Stopwatch Timer = Stats.BeginRequest(); // Pair Begin with End so an alloc throw inside AsyncPut (lambda capture // before submit) does not strand InFlight elevated. auto InFlightGuard = MakeGuard([&Stats] { Stats.EndRequest(0, 0); }); m_Client.AsyncPut( Path, std::move(Body), [Hash, Token, Timer, Stats, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable { Stats.EndRequest(Timer.GetElapsedTimeUs(), Resp.IsSuccess() ? 
static_cast(Resp.UploadedBytes) : 0); if (!Resp.IsSuccess()) { std::string Err = S3ErrorMessage("S3 PUT failed", Resp); Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutSmall '{}': {}"sv, Hash, Err))); return; } Token->Complete(); }, Headers); InFlightGuard.Dismiss(); } catch (...) { Token->Fail(std::current_exception()); } }); } void S3AsyncStorage::PutMedium(ParallelWork& Work, WorkerThreadPool& Pool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, S3AsyncStorageStats& Stats) { auto Token = std::make_shared(Work.RegisterExternal()); Stats.RecordScheduled(); std::shared_ptr SlotRef = AcquireAdmissionSlot(m_Admission, &Stats); // Snapshot creds on the dispatcher; see PutSmall. SigV4Credentials Creds = m_GetCreds(); Work.ScheduleWork( Pool, [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath), Token, Stats, Creds = std::move(Creds), SlotRef = std::move(SlotRef)](std::atomic& Abort) mutable { if (Abort.load()) { Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMedium '{}': aborted"sv, Hash))); return; } try { if (Creds.AccessKeyId.empty()) { Token->Fail( std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMedium '{}': no credentials available"sv, Hash))); return; } // Hash pass: stream-read 256 KiB chunks into a heap scratch buffer, // feed Sha256Stream incrementally. No body materialization. The file // handle is shared with the upload pass via shared_ptr so libcurl's // READ callback can pread from the same handle on the io thread. 
auto File = std::make_shared(SourcePath, BasicFile::Mode::kRead); if (File->FileSize() != Size) { Token->Fail(std::make_exception_ptr( zen::runtime_error("S3AsyncStorage::PutMedium '{}': source size {} differs from declared {}"sv, Hash, File->FileSize(), Size))); return; } Sha256Stream Hasher; { std::unique_ptr Scratch(new uint8_t[kPutReadChunk]); uint64_t Off = 0; while (Off < Size) { const size_t Take = static_cast(std::min(kPutReadChunk, Size - Off)); File->Read(Scratch.get(), Take, Off); Hasher.Update(Scratch.get(), Take); Off += Take; } } std::string PayloadHash = Sha256ToHex(Hasher.Finalize()); std::string Path = CasPath(Hash); HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "PUT", Path, "", PayloadHash); Stopwatch Timer = Stats.BeginRequest(); // Pair Begin with End so an alloc throw inside AsyncPut does not // strand InFlight elevated. auto InFlightGuard = MakeGuard([&Stats] { Stats.EndRequest(0, 0); }); // Streaming source: libcurl pulls 256 KiB chunks from the file on // the io thread. Local-disk pread is fast enough for medium tier; // page cache makes the second pass over the same bytes near-free. m_Client.AsyncPut( Path, Size, [File](uint8_t* DstBuf, size_t MaxBytes, uint64_t AbsOffset) -> size_t { const uint64_t Remaining = File->FileSize() > AbsOffset ? File->FileSize() - AbsOffset : 0; const size_t Take = static_cast(std::min(MaxBytes, Remaining)); if (Take == 0) { return 0; } File->Read(DstBuf, Take, AbsOffset); return Take; }, [Hash, Token, Timer, Size, Stats, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable { Stats.EndRequest(Timer.GetElapsedTimeUs(), Resp.IsSuccess() ? Size : 0); if (!Resp.IsSuccess()) { std::string Err = S3ErrorMessage("S3 PUT failed", Resp); Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMedium '{}': {}"sv, Hash, Err))); return; } Token->Complete(); }, Headers); InFlightGuard.Dismiss(); } catch (...) 
{ Token->Fail(std::current_exception()); } }); } void S3AsyncStorage::PutMultipart(ParallelWork& Work, WorkerThreadPool& Pool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, S3AsyncStorageStats& Stats) { auto Token = std::make_shared(Work.RegisterExternal()); Stats.RecordScheduled(); // Fires only when an exception is in flight; a missed Dismiss() on a // non-throwing early return falls through to Token's destructor safety // net rather than asserting on Token->Fail(nullptr). auto FailGuard = MakeGuard([Token] { if (auto Ex = std::current_exception()) { Token->Fail(Ex); } }); SigV4Credentials Creds = m_GetCreds(); if (Creds.AccessKeyId.empty()) { FailGuard.Dismiss(); Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': no credentials available"sv, Hash))); return; } auto File = std::make_shared(SourcePath, BasicFile::Mode::kRead); if (File->FileSize() != Size) { FailGuard.Dismiss(); Token->Fail( std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': source size {} differs from declared {}"sv, Hash, File->FileSize(), Size))); return; } auto State = std::make_shared(Work, Pool, Stats); State->File = std::move(File); State->Token = Token; State->ContentHash = Hash; State->Creds = Creds; // snapshot reused across all parts + complete/abort State->Key = CasKey(Hash); State->Path = CasPath(Hash); State->TotalSize = Size; State->PartSize = m_MultipartChunkSize; State->TotalParts = static_cast((Size + State->PartSize - 1) / State->PartSize); State->PendingParts = State->TotalParts; State->ReadsRemaining = State->TotalParts; State->ETags.resize(State->TotalParts); // Admission gating: the wave acquires min(TotalParts, AdmissionCap) slots // inside DispatchInitialPartWave on a worker thread (off the dispatcher // and off the io strand) so a single multipart cannot starve other // dispatcher work while it drains the semaphore. 
Tail parts (TotalParts > // cap) are dispatched lazily by HandoffSlotToNextPart from each AsyncPut // completion, so the in-flight count per upload stays <= cap. State->PreAcquired = m_Admission ? std::min(State->TotalParts, m_AdmissionCap) : State->TotalParts; State->NextPartToDispatch.store(State->PreAcquired, std::memory_order_relaxed); std::string CanonicalQs = BuildCanonicalQueryString({{"uploads", ""}}); HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "POST", State->Path, CanonicalQs, S3EmptyPayloadHash); std::string FullPath = S3BuildRequestPath(State->Path, CanonicalQs); // CreateMultipartUpload / CompleteMultipartUpload / AbortMultipartUpload are // metadata operations - they do not move payload bytes and would distort the // per-request stats (avg/max latency) if folded into RequestCount. Counted // separately here means PhaseStats.RequestCount tracks data-bearing PUTs only, // matching the sync S3Storage path's accounting more closely. m_Client.AsyncPost( FullPath, [this, State](HttpClient::Response Resp) { if (!Resp.IsSuccess()) { std::string Err = S3ErrorMessage("S3 CreateMultipartUpload failed", Resp); State->Token->Fail( std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': {}"sv, State->ContentHash, Err))); return; } // See PutMultipart entry: tolerate null exception_ptr on a missed Dismiss(). auto CallbackGuard = MakeGuard([State] { if (auto Ex = std::current_exception()) { State->Token->Fail(Ex); } }); std::string_view Body = Resp.AsText(); // S3 can answer 200 with an embedded body even on Create. // Mirror CompleteMultipart's parse so the original error code/message // surfaces instead of a generic "missing UploadId". 
std::string_view ErrorCode; std::string_view ErrorMessage; if (S3ExtractError(Body, ErrorCode, ErrorMessage)) { CallbackGuard.Dismiss(); State->Token->Fail( std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': create returned error: {} - {}"sv, State->ContentHash, ErrorCode, ErrorMessage))); return; } std::string_view UploadId = S3ExtractXmlValue(Body, "UploadId"); if (UploadId.empty()) { CallbackGuard.Dismiss(); State->Token->Fail(std::make_exception_ptr( zen::runtime_error("S3AsyncStorage::PutMultipart '{}': missing UploadId in CreateMultipartUpload response"sv, State->ContentHash))); return; } State->UploadId = std::string(UploadId); // Hop the wave dispatch onto a worker so the per-part admission // acquire never blocks the io strand or the caller dispatcher. try { State->Work.ScheduleWork( State->Pool, [this, State](std::atomic&) { DispatchInitialPartWave(State); }, WorkerThreadPool::EMode::EnableBacklog); } catch (...) { CallbackGuard.Dismiss(); RecordPutPartFailure(*State, "S3 UploadPart wave schedule failed"); for (uint32_t J = 0; J < State->TotalParts; ++J) { FinalizePutPart(State); } return; } CallbackGuard.Dismiss(); }, Headers); FailGuard.Dismiss(); } void S3AsyncStorage::DispatchInitialPartWave(std::shared_ptr State) { const bool AdmissionEnabled = (m_Admission != nullptr); const uint32_t InitialDispatch = AdmissionEnabled ? State->PreAcquired : State->TotalParts; for (uint32_t I = 0; I < InitialDispatch; ++I) { const uint32_t PartNum = I + 1; std::shared_ptr SlotRef; try { if (AdmissionEnabled) { SlotRef = AcquireAdmissionSlot(m_Admission, &State->Stats); } State->Work.ScheduleWork( State->Pool, [this, State, PartNum, SlotRef = std::move(SlotRef)](std::atomic&) mutable { DispatchPartUpload(State, PartNum, std::move(SlotRef)); }, WorkerThreadPool::EMode::EnableBacklog); } catch (const std::exception& Ex) { // Acquire/schedule failed mid-wave. 
Account for this part plus // the un-iterated wave tail, then drain the lazy tail past the // wave. Slot (if acquired) releases when SlotRef destructs. RecordPutPartFailure(*State, fmt::format("S3 UploadPart {} schedule failed: {}", PartNum, Ex.what())); const uint32_t WaveTail = InitialDispatch - I; for (uint32_t J = 0; J < WaveTail; ++J) { FinalizePutPart(State); } DrainUndispatchedParts(State); return; } } } void S3AsyncStorage::HandoffSlotToNextPart(std::shared_ptr State, uint32_t PartIdx, std::shared_ptr SlotRef) { const uint32_t PartNum = PartIdx + 1; try { State->Work.ScheduleWork( State->Pool, [this, State, PartNum, SlotRef = std::move(SlotRef)](std::atomic&) mutable { DispatchPartUpload(State, PartNum, std::move(SlotRef)); }, WorkerThreadPool::EMode::EnableBacklog); } catch (const std::exception& Ex) { // Handoff dispatch failed. The slot we were about to hand off // releases when SlotRef destructs at scope exit. Account for this // never-dispatched part and drain the rest of the tail. 
RecordPutPartFailure(*State, fmt::format("S3 UploadPart {} handoff failed: {}", PartNum, Ex.what())); FinalizePutPart(State); DrainUndispatchedParts(State); } } void S3AsyncStorage::DrainUndispatchedParts(std::shared_ptr State) { const uint32_t Skipped = ClaimUndispatchedParts(*State); for (uint32_t J = 0; J < Skipped; ++J) { FinalizePutPart(State); } } #if ZEN_WITH_TESTS namespace s3asyncstorage_test_hooks { std::atomic g_ForceNextPartFailures{0}; void ForceNextPartFailures(uint32_t Count) { g_ForceNextPartFailures.store(Count, std::memory_order_relaxed); } } // namespace s3asyncstorage_test_hooks #endif void S3AsyncStorage::DispatchPartUpload(std::shared_ptr State, uint32_t PartNum, std::shared_ptr SlotRef) { const uint64_t Offset = static_cast(PartNum - 1) * State->PartSize; const uint64_t ChunkSize = std::min(State->PartSize, State->TotalSize - Offset); // Release File on this worker thread once all reads have been served, so // the close never runs on the curl io strand from a captured State ref in // an AsyncPut callback. auto ReleaseFileIfLast = [&State] { if (State->ReadsRemaining.fetch_sub(1, std::memory_order_acq_rel) == 1) { State->File.reset(); } }; #if ZEN_WITH_TESTS // Test hook: synthesize a part-level failure to drive the AbortMultipart // path. Decrement the counter while > 0; each consumed slot fails one part. { uint32_t Cur = s3asyncstorage_test_hooks::g_ForceNextPartFailures.load(std::memory_order_relaxed); while (Cur > 0 && !s3asyncstorage_test_hooks::g_ForceNextPartFailures.compare_exchange_weak(Cur, Cur - 1, std::memory_order_acq_rel)) { } if (Cur > 0) { ReleaseFileIfLast(); RecordPutPartFailure(*State, fmt::format("test-injected part failure (part {})", PartNum)); FinalizePutPart(State); DrainUndispatchedParts(State); return; } } #endif // Short-circuit if a foreign failure has flipped AnyFailed since this part was // scheduled (handoff race: success callback bumps the cursor to dispatch the // next part, drain runs concurrently). 
Skip the file read + sign + AsyncPut for // a doomed transfer; FinalizePutPart accounting is symmetric to the success path. if (State->AnyFailed.load(std::memory_order_acquire)) { ReleaseFileIfLast(); FinalizePutPart(State); return; } // Explicit chunked positional pread + Sha256Stream in one pass. One heap // IoBuffer of exact part size; no mmap, no IoBuffer auto-materialize. IoBuffer Part(static_cast(ChunkSize)); uint8_t* Dst = static_cast(Part.MutableData()); Sha256Stream Hasher; try { uint64_t Read = 0; while (Read < ChunkSize) { const size_t Take = static_cast(std::min(kPutReadChunk, ChunkSize - Read)); State->File->Read(Dst + Read, Take, Offset + Read); Hasher.Update(Dst + Read, Take); Read += Take; } ReleaseFileIfLast(); } catch (const std::exception& Ex) { ReleaseFileIfLast(); RecordPutPartFailure(*State, fmt::format("S3 UploadPart {} read failed: {}", PartNum, Ex.what())); FinalizePutPart(State); DrainUndispatchedParts(State); return; } try { // Use the snapshot taken at PutMultipart entry; saves ~TotalParts // shared-lock acquisitions on the credential provider. const SigV4Credentials& Creds = State->Creds; std::string CanonicalQs = BuildCanonicalQueryString({ {"partNumber", fmt::format("{}", PartNum)}, {"uploadId", State->UploadId}, }); std::string PayloadHash = Sha256ToHex(Hasher.Finalize()); HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "PUT", State->Path, CanonicalQs, PayloadHash); std::string FullPath = S3BuildRequestPath(State->Path, CanonicalQs); Stopwatch Timer = State->Stats.BeginRequest(); // Pair Begin with End so an alloc throw inside AsyncPut does not strand // InFlight elevated. auto InFlightGuard = MakeGuard([&State] { State->Stats.EndRequest(0, 0); }); m_Client.AsyncPut( FullPath, std::move(Part), [this, State, PartNum, Timer, ChunkSize, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable { State->Stats.EndRequest(Timer.GetElapsedTimeUs(), Resp.IsSuccess() ? 
ChunkSize : 0); if (!Resp.IsSuccess()) { RecordPutPartFailure(*State, S3ErrorMessage(fmt::format("S3 UploadPart {} failed", PartNum), Resp)); DrainUndispatchedParts(State); FinalizePutPart(State); return; } std::string_view ETag = Resp.FindHeader("etag"); if (ETag.empty()) { RecordPutPartFailure(*State, fmt::format("S3 UploadPart {} response missing ETag header", PartNum)); DrainUndispatchedParts(State); FinalizePutPart(State); return; } // Per-slot writes don't race - each part owns its slot. // PendingParts.fetch_sub(acq_rel) in FinalizePutPart is the // publication barrier: CompleteMultipart (gated on Remaining // == 1) sees every ETag write. New readers must go through // the same barrier or take ETagLock. State->ETags[PartNum - 1].assign(ETag); // Try to hand the slot off to the next undispatched part. Skip // the handoff if a prior failure has flipped AnyFailed (cursor // was already exchanged to TotalParts by the drain there). if (!State->AnyFailed.load(std::memory_order_acquire)) { const uint32_t NextIdx = State->NextPartToDispatch.fetch_add(1, std::memory_order_acq_rel); if (NextIdx < State->TotalParts) { HandoffSlotToNextPart(State, NextIdx, std::move(SlotRef)); } } // Account for THIS part. SlotRef may already be moved-from // (if handed off); if not, releases when this lambda destructs. FinalizePutPart(State); }, Headers); InFlightGuard.Dismiss(); } catch (const std::exception& Ex) { RecordPutPartFailure(*State, fmt::format("S3 UploadPart {} dispatch failed: {}", PartNum, Ex.what())); FinalizePutPart(State); DrainUndispatchedParts(State); } } void S3AsyncStorage::FinalizePutPart(std::shared_ptr State) { const uint32_t Remaining = State->PendingParts.fetch_sub(1, std::memory_order_acq_rel); if (Remaining != 1) { return; } // Hop the Complete/Abort dispatch off the curl io strand. SHA over the // parts-XML and SignRequest are CPU work; AsyncPost itself queues a strand // wakeup. Running both inline here would pin the strand thread. 
const bool Failed = State->AnyFailed.load(std::memory_order_acquire); try { State->Work.ScheduleWork(State->Pool, [this, State, Failed](std::atomic&) { if (Failed) { AbortMultipart(State); } else { CompleteMultipart(State); } }); } catch (...) { // ScheduleWork failed before Complete/Abort could run; surface the leak via Token->Fail. State->Token->Fail(std::current_exception()); } } void S3AsyncStorage::CompleteMultipart(std::shared_ptr State) { // Reuse the snapshot taken at PutMultipart entry. The upload-id was issued // against these creds; refreshing here would only matter if creds expired // mid-upload, in which case all the part uploads would already have failed. const SigV4Credentials& Creds = State->Creds; ExtendableStringBuilder<1024> Xml; Xml.Append(""); for (uint32_t I = 0; I < State->TotalParts; ++I) { Xml.Append(fmt::format("{}{}", I + 1, State->ETags[I])); } Xml.Append(""); std::string_view XmlView = Xml.ToView(); std::string CanonicalQs = BuildCanonicalQueryString({{"uploadId", State->UploadId}}); std::string PayloadHash = Sha256ToHex(ComputeSha256(XmlView)); HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "POST", State->Path, CanonicalQs, PayloadHash); std::string FullPath = S3BuildRequestPath(State->Path, CanonicalQs); IoBuffer Payload(IoBuffer::Clone, XmlView.data(), XmlView.size()); // Metadata op; not counted in PhaseStats (see CreateMultipartUpload comment). 
m_Client.AsyncPost( FullPath, Payload, ZenContentType::kXML, [State](HttpClient::Response Resp) { if (!Resp.IsSuccess()) { std::string Err = S3ErrorMessage("S3 CompleteMultipartUpload failed", Resp); State->Token->Fail( std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': {}"sv, State->ContentHash, Err))); return; } std::string_view Body = Resp.AsText(); std::string_view ErrorCode; std::string_view ErrorMessage; if (S3ExtractError(Body, ErrorCode, ErrorMessage)) { State->Token->Fail( std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': complete returned error: {} - {}"sv, State->ContentHash, ErrorCode, ErrorMessage))); return; } State->Token->Complete(); }, Headers); } void S3AsyncStorage::AbortMultipart(std::shared_ptr State) { // Reuse the snapshot taken at PutMultipart entry. const SigV4Credentials& Creds = State->Creds; std::string CanonicalQs = BuildCanonicalQueryString({{"uploadId", State->UploadId}}); HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "DELETE", State->Path, CanonicalQs, S3EmptyPayloadHash); std::string FullPath = S3BuildRequestPath(State->Path, CanonicalQs); // Metadata op; not counted in PhaseStats (see CreateMultipartUpload comment). 
m_Client.AsyncDelete( FullPath, [State](HttpClient::Response Resp) { std::string FirstError; State->ETagLock.WithExclusiveLock([&] { FirstError = std::move(State->FirstError); }); if (!Resp.IsSuccess()) { State->Token->Fail(std::make_exception_ptr( zen::runtime_error("S3AsyncStorage::PutMultipart '{}': part failed ({}); abort also failed: {}"sv, State->ContentHash, FirstError, S3ErrorMessage("S3 AbortMultipartUpload failed", Resp)))); return; } State->Token->Fail( std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': {}"sv, State->ContentHash, FirstError))); }, Headers); } void S3AsyncStorage::Get(ParallelWork& Work, WorkerThreadPool& Pool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& DestinationPath, S3AsyncStorageStats& Stats) { // Three tiers: in-memory AsyncGet (< 512 KiB), AsyncStream into pooled // 512 KiB write buffers (medium), GetMultipart (>= MultiRangeThreshold). constexpr uint64_t StreamingThreshold = 512u * 1024u; const uint64_t MultiRangeThreshold = m_MultipartChunkSize + (m_MultipartChunkSize / 4); if (Size >= MultiRangeThreshold) { GetMultipart(Work, Pool, Hash, Size, DestinationPath, Stats); return; } auto Token = std::make_shared(Work.RegisterExternal()); Stats.RecordScheduled(); // See PutMultipart: tolerate null exception_ptr from a refactor that // adds a non-throwing early return without Dismiss(). 
auto FailGuard = MakeGuard([Token] {
		// NOTE(review): if this guard fires from a destructor during stack unwinding (i.e. not
		// from inside a catch handler), std::current_exception() is null at that point, so the
		// token would not be failed here - confirm MakeGuard/Token semantics.
		if (auto Ex = std::current_exception())
		{
			Token->Fail(Ex);
		}
	});

	std::shared_ptr SlotRef = AcquireAdmissionSlot(m_Admission, &Stats);

	SigV4Credentials Creds = m_GetCreds();
	if (Creds.AccessKeyId.empty())
	{
		FailGuard.Dismiss();
		Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': no credentials available"sv, Hash)));
		return;
	}

	std::string				Path	= CasPath(Hash);
	HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "GET", Path, "", S3EmptyPayloadHash);

	Stopwatch Timer = Stats.BeginRequest();
	// Pair Begin with End so an alloc throw inside AsyncGet/AsyncStream does not
	// strand InFlight elevated.
	auto InFlightGuard = MakeGuard([&Stats] { Stats.EndRequest(0, 0); });

	if (Size < StreamingThreshold)
	{
		// Small: in-memory AsyncGet, single worker-hop write at completion.
		m_Client.AsyncGet(
			Path,
			[Hash = IoHash(Hash), Token, Timer, Stats, DestPath = DestinationPath, Size, &Work, &Pool, SlotRef = std::move(SlotRef)](
				HttpClient::Response Resp) mutable {
				Stats.EndRequest(Timer.GetElapsedTimeUs(), Resp.IsSuccess() ? Resp.ResponsePayload.GetSize() : 0);

				// No remove() on the failure / size-mismatch paths: the destination
				// file is only created by the worker hop below on success, so
				// nothing exists to delete here.
				if (!Resp.IsSuccess())
				{
					std::string Err = S3ErrorMessage("S3 GET failed", Resp);
					Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': {}"sv, Hash, Err)));
					return;
				}
				// Reject a body whose length differs from the size the caller asked for.
				if (Resp.ResponsePayload.GetSize() != Size)
				{
					Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': received {} bytes, expected {}"sv,
																		   Hash,
																		   Resp.ResponsePayload.GetSize(),
																		   Size)));
					return;
				}
				try
				{
					// The file write runs as a scheduled work item; the payload is moved into it.
					Work.ScheduleWork(
						Pool,
						[Token, DestPath, Hash, Payload = std::move(Resp.ResponsePayload)](std::atomic&) mutable {
							try
							{
								BasicFile Out(DestPath, BasicFile::Mode::kTruncate);
								Out.Write(Payload.GetData(), Payload.GetSize(), 0);
								Token->Complete();
							}
							catch (const std::exception& Ex)
							{
								// Best-effort cleanup of a partially written file; the remove error is ignored.
								std::error_code Ec;
								std::filesystem::remove(DestPath, Ec);
								Token->Fail(std::make_exception_ptr(
									zen::runtime_error("S3AsyncStorage::Get '{}': write failed: {}"sv, Hash, Ex.what())));
							}
						},
						WorkerThreadPool::EMode::EnableBacklog);
				}
				catch (const std::exception& Ex)
				{
					Token->Fail(
						std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': schedule failed: {}"sv, Hash, Ex.what())));
				}
			},
			Headers);
		// Request posted: the completion callback now owns EndRequest and token resolution.
		InFlightGuard.Dismiss();
		FailGuard.Dismiss();
		return;
	}

	// Medium tier: AsyncStream + GetStreamState (see struct doc).
	// Number of WriteBufferSize-sized blocks needed to cover Size (rounded up).
	const uint32_t StreamTotalBlocks =
		static_cast((Size + GetStreamState::WriteBufferSize - 1) / GetStreamState::WriteBufferSize);
	auto State			= std::make_shared(Work, Pool, Stats, StreamTotalBlocks);
	State->Token		= Token;
	State->ContentHash	= Hash;
	State->DestPath		= DestinationPath;
	State->ExpectedSize = Size;
	try
	{
		State->DestFile			  = std::make_shared(DestinationPath, BasicFile::Mode::kTruncate);
		std::error_code PrepareEc = PrepareFileForScatteredWrite(State->DestFile->Handle(), Size);
		if (PrepareEc)
		{
			throw zen::runtime_error("PrepareFileForScatteredWrite failed: {}"sv, PrepareEc.message());
		}
	}
	catch (const std::exception& Ex)
	{
		// InFlightGuard is intentionally left armed here so BeginRequest above is paired on return.
		FailGuard.Dismiss();
		// Drop DestFile (if opened) before remove() so Windows lets the unlink
		// proceed; otherwise we leave a 0-byte file behind on the prealloc path.
		State->DestFile.reset();
		std::error_code Ec;
		std::filesystem::remove(DestinationPath, Ec);
		Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': preallocate failed: {}"sv, Hash, Ex.what())));
		return;
	}

	m_Client.AsyncStream(
		Path,
		// Data callback: copies incoming bytes into the active buffer and dispatches a write
		// work item each time a buffer fills. Returning false aborts the transfer.
		[this, State](const uint8_t* Data, size_t SizeBytes, uint64_t /*TotalSize*/) -> bool {
			constexpr size_t WriteBufferSize = GetStreamState::WriteBufferSize;
			// More bytes than expected: record the failure and stop the stream.
			if (State->TotalReceived + SizeBytes > State->ExpectedSize)
			{
				RecordGetStreamFailure(
					*State,
					fmt::format("S3 GET overflow: got {} bytes past expected {}", State->TotalReceived + SizeBytes, State->ExpectedSize));
				return false;
			}

			const uint8_t* Cur		 = Data;
			size_t		   Remaining = SizeBytes;
			while (Remaining > 0)
			{
				if (!State->ActiveBuf)
				{
					State->ActiveBuf = State->Buffers.Acquire();
				}
				const size_t Avail = WriteBufferSize - State->BufFill;
				const size_t Take  = std::min(Remaining, Avail);
				std::memcpy(State->ActiveBuf.MutableData() + State->BufFill, Cur, Take);
				State->BufFill += Take;
				State->TotalReceived += Take;
				Cur += Take;
				Remaining -= Take;

				if (State->BufFill == WriteBufferSize)
				{
					// Buffer full: detach it, advance the file offset, and hand it to a worker.
					IoBuffer	   Buf		 = std::move(State->ActiveBuf);
					const size_t   Fill		 = State->BufFill;
					const uint64_t AbsOffset = State->NextAbsOffset;
					State->NextAbsOffset += Fill;
					State->BufFill = 0;
					// Count the pending write before scheduling it.
					State->PendingWork.fetch_add(1, std::memory_order_acq_rel);
					try
					{
						State->Work.ScheduleWork(
							State->Pool,
							[this, State, Buf = std::move(Buf), Fill, AbsOffset](std::atomic&) mutable {
								try
								{
									State->DestFile->Write(Buf.GetData(), Fill, AbsOffset);
								}
								catch (const std::exception& Ex)
								{
									RecordGetStreamFailure(*State, fmt::format("S3 GET write failed: {}", Ex.what()));
								}
								State->Buffers.Release(std::move(Buf));
								// Whoever drops PendingWork to zero runs the finalise step.
								if (State->PendingWork.fetch_sub(1, std::memory_order_acq_rel) == 1)
								{
									OnGetStreamFinalised(State);
								}
							},
							WorkerThreadPool::EMode::EnableBacklog);
					}
					catch (const std::exception& Ex)
					{
						// ScheduleWork failed before the worker took ownership of the
						// PendingWork increment we did above; balance it here and
						// abort the transfer so OnComplete tears down cleanly. Buf is
						// gone (moved into the lambda then destroyed when ScheduleWork
						// threw); restore the pool's acquire slot so OnComplete's
						// tail-flush can still Acquire if needed.
						RecordGetStreamFailure(*State, fmt::format("S3 GET schedule failed: {}", Ex.what()));
						State->Buffers.RestoreAcquireSlot();
						State->PendingWork.fetch_sub(1, std::memory_order_acq_rel);
						return false;
					}
				}
			}
			return true;
		},
		// Completion callback: validates the transfer, flushes a partially filled tail buffer,
		// then releases the stream's own PendingWork reference.
		[this, State, Timer, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable {
			State->Stats.EndRequest(Timer.GetElapsedTimeUs(), Resp.IsSuccess() ? State->TotalReceived : 0);

			bool Failed = !Resp.IsSuccess();
			if (Failed)
			{
				RecordGetStreamFailure(*State, S3ErrorMessage("S3 GET failed", Resp));
			}
			else if (State->TotalReceived != State->ExpectedSize)
			{
				RecordGetStreamFailure(*State,
									   fmt::format("S3 GET wrote {} bytes, expected {}", State->TotalReceived, State->ExpectedSize));
				Failed = true;
			}

			// Tail flush: only on success and only when bytes remain in the active buffer.
			if (!Failed && State->BufFill > 0)
			{
				IoBuffer	   Buf		 = std::move(State->ActiveBuf);
				const size_t   Fill		 = State->BufFill;
				const uint64_t AbsOffset = State->NextAbsOffset;
				State->BufFill			 = 0;
				State->PendingWork.fetch_add(1, std::memory_order_acq_rel);
				try
				{
					State->Work.ScheduleWork(
						State->Pool,
						[this, State, Buf = std::move(Buf), Fill, AbsOffset](std::atomic&) mutable {
							try
							{
								State->DestFile->Write(Buf.GetData(), Fill, AbsOffset);
							}
							catch (const std::exception& Ex)
							{
								RecordGetStreamFailure(*State, fmt::format("S3 GET write failed: {}", Ex.what()));
							}
							State->Buffers.Release(std::move(Buf));
							if (State->PendingWork.fetch_sub(1, std::memory_order_acq_rel) == 1)
							{
								OnGetStreamFinalised(State);
							}
						},
						WorkerThreadPool::EMode::EnableBacklog);
				}
				catch (const std::exception& Ex)
				{
					// Same balancing as the data-callback path: undo the slot and the PendingWork increment.
					RecordGetStreamFailure(*State, fmt::format("S3 GET tail-flush schedule failed: {}", Ex.what()));
					State->Buffers.RestoreAcquireSlot();
					State->PendingWork.fetch_sub(1, std::memory_order_acq_rel);
				}
			}

			// PendingWork = 1 (stream slot, set in ctor) + N (per-buffer worker
			// dispatched in OnData) + (optional 1 for the tail-flush dispatched
			// just above). The fetch_sub returning 1 means we observed the value
			// that was 1 right before our decrement - i.e. we are the last
			// participant. Either this branch (no tail flush, no in-flight
			// workers) or the last per-buffer worker performs the finalise. Both
			// paths use acq_rel so the work-item writes are visible to whichever
			// thread runs OnGetStreamFinalised.
if (State->PendingWork.fetch_sub(1, std::memory_order_acq_rel) == 1)
			{
				OnGetStreamFinalised(State);
			}
		},
		Headers);
	// Stream posted: the callbacks now own EndRequest and token resolution.
	InFlightGuard.Dismiss();
	FailGuard.Dismiss();
}

// Final step of a medium-tier (streamed) Get, run once by whichever participant drops
// State->PendingWork to zero. Schedules a work item that closes the destination file and
// either completes the token or, when State->Failed is set, removes the file and fails the
// token with the recorded FirstError. If scheduling itself throws, the cleanup is done
// inline and the token is failed with the scheduling exception.
void
S3AsyncStorage::OnGetStreamFinalised(std::shared_ptr State)
{
	// All disk I/O (close + unlink on failure) is hopped into the worker pool.
	// The last PendingWork dec can land on the curl io strand, and inline
	// disk syscalls there would block the curl_multi poll loop.
	const bool Failed = State->Failed.load(std::memory_order_acquire);
	try
	{
		State->Work.ScheduleWork(
			State->Pool,
			[State, Failed](std::atomic&) {
				if (Failed)
				{
					std::string Err;
					State->ErrorLock.WithExclusiveLock([&] { Err = std::move(State->FirstError); });
					// Release the file handle before attempting to remove the file.
					State->DestFile.reset();
					std::error_code Ec;
					std::filesystem::remove(State->DestPath, Ec);
					State->Token->Fail(
						std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': {}"sv, State->ContentHash, Err)));
					return;
				}
				// No Flush: consumer reads via page cache; crash recovery re-hydrates.
				State->DestFile.reset();
				State->Token->Complete();
			},
			WorkerThreadPool::EMode::EnableBacklog);
	}
	catch (...)
	{
		// Schedule failed; release file handle inline and surface via Token.
		State->DestFile.reset();
		std::error_code Ec;
		std::filesystem::remove(State->DestPath, Ec);
		State->Token->Fail(std::current_exception());
	}
}

// Large-object download: splits [0, Size) into ranges of m_MultipartChunkSize bytes (last one
// shorter), issues one ranged AsyncStream per range, and writes each range at its own offset
// in a preallocated destination file. Completion is reported through a token registered with
// Work once every range has finished (see OnGetPartCompleted).
void
S3AsyncStorage::GetMultipart(ParallelWork&				  Work,
							 WorkerThreadPool&			  Pool,
							 const IoHash&				  Hash,
							 uint64_t					  Size,
							 const std::filesystem::path& DestinationPath,
							 S3AsyncStorageStats&		  Stats)
{
	auto Token = std::make_shared(Work.RegisterExternal());
	Stats.RecordScheduled();

	const uint64_t Chunk	  = m_MultipartChunkSize;
	const uint32_t RangeCount = static_cast((Size + Chunk - 1) / Chunk);

	// Sum, over all ranges, the number of write buffers each range needs (rounded up per range).
	uint32_t TotalBlocks = 0;
	for (uint32_t I = 0; I < RangeCount; ++I)
	{
		const uint64_t RS = std::min(Chunk, Size - static_cast(I) * Chunk);
		TotalBlocks += static_cast((RS + GetMultipartState::WriteBufferSize - 1) / GetMultipartState::WriteBufferSize);
	}

	auto State			 = std::make_shared(Work, Pool, Stats, TotalBlocks);
	State->Token		 = Token;
	State->ContentHash	 = Hash;
	State->DestPath		 = DestinationPath;
	State->TotalRanges	 = RangeCount;
	State->PendingRanges = RangeCount;

	// RangeCount > AdmissionCap is fine: the per-range acquire below blocks the
	// dispatcher (caller thread, NOT the io strand) until in-flight ranges fire
	// their AsyncStream completions and release slots. In-flight ranges per
	// upload stay bounded by AdmissionCap.

	try
	{
		// Sparse + preallocated. Sparse mode lets per-range writes land at
		// arbitrary offsets without the OS zero-filling intervening pages
		// first; preallocating the full size up front avoids fragmentation
		// and lazy-commit page faults during the writes themselves.
		State->DestFile			  = std::make_shared(DestinationPath, BasicFile::Mode::kTruncate);
		std::error_code PrepareEc = PrepareFileForScatteredWrite(State->DestFile->Handle(), Size);
		if (PrepareEc)
		{
			throw zen::runtime_error("PrepareFileForScatteredWrite failed: {}"sv, PrepareEc.message());
		}
	}
	catch (const std::exception& Ex)
	{
		// Drop DestFile (if opened) before remove() so Windows lets the unlink
		// proceed; otherwise we leave a 0-byte file behind on the prealloc path.
State->DestFile.reset();
		std::error_code Ec;
		std::filesystem::remove(DestinationPath, Ec);
		Token->Fail(
			std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::GetMultipart '{}': preallocate failed: {}"sv, Hash, Ex.what())));
		return;
	}

	std::string Path = CasPath(Hash);

	// Snapshot creds once for the whole multipart Get. Mirrors PutMultipart;
	// avoids N shared-lock acquisitions on the credential provider when the
	// range count is large.
	const SigV4Credentials Creds = m_GetCreds();
	if (Creds.AccessKeyId.empty())
	{
		// Empty creds are loop-invariant; drive PendingRanges to zero in one shot
		// rather than acquiring TotalRanges admission slots only to fire identical
		// failures. RecordGetPartFailure CAS keeps the first message; subsequent
		// completions just decrement the counter.
		RecordGetPartFailure(*State, "no credentials available");
		for (uint32_t I = 0; I < State->TotalRanges; ++I)
		{
			OnGetPartCompleted(State);
		}
		return;
	}

	// Dispatch one ranged GET per chunk; the last range covers whatever remains of Size.
	for (uint32_t I = 0; I < State->TotalRanges; ++I)
	{
		const uint64_t Offset	 = static_cast(I) * Chunk;
		const uint64_t RangeSize = std::min(Chunk, Size - Offset);
		const uint32_t RangeIdx	 = I;

		try
		{
			// Per-range admission slot. Acquire blocks the caller thread when the
			// cap is reached; in-flight ranges release slots via SlotRef destruct
			// in their AsyncStream completion callback. Acquire on the caller
			// (vs the io strand) is safe to block: GetMultipart is called from
			// the provision-pool worker driving Storage::Get, never from the
			// curl io thread that has to drain completions.
			std::shared_ptr SlotRef = AcquireAdmissionSlot(m_Admission, &Stats);

			HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "GET", Path, "", S3EmptyPayloadHash);
			// The Range header (inclusive byte bounds) is added after signing.
			Headers->emplace("Range", fmt::format("bytes={}-{}", Offset, Offset + RangeSize - 1));

			Stopwatch Timer = State->Stats.BeginRequest();
			// Pair Begin with End so an alloc throw inside AsyncStream does not
			// strand InFlight elevated.
auto InFlightGuard = MakeGuard([&State] { State->Stats.EndRequest(0, 0); });

			// Per-range mirror of GetStreamState's buffered-write machinery (see
			// the medium-tier branch in Get()).
			constexpr size_t WriteBufferSize = GetMultipartState::WriteBufferSize;
			struct RangeWriteState
			{
				IoBuffer	ActiveBuf;			// buffer currently being filled by the data callback
				size_t		BufFill = 0;		// bytes already copied into ActiveBuf
				uint64_t	NextAbsOffset;		// file offset for the next dispatched write (set to the range start below)
				uint64_t	TotalReceived = 0;	// bytes received so far for this range
				std::atomic PendingWork{1};		// +1 for stream completion
			};
			auto WState			  = std::make_shared();
			WState->NextAbsOffset = Offset;

			m_Client.AsyncStream(
				Path,
				// Data callback for this range; returning false aborts the range's transfer.
				[this, State, RangeSize, RangeIdx, WState](const uint8_t* Data, size_t SizeBytes, uint64_t /*TotalSize*/) -> bool {
					if (WState->TotalReceived + SizeBytes > RangeSize)
					{
						RecordGetPartFailure(*State,
											 fmt::format("S3 GET range {} overflow: got {} bytes past expected {}",
														 RangeIdx,
														 WState->TotalReceived + SizeBytes,
														 RangeSize));
						return false;
					}

					const uint8_t* Cur		 = Data;
					size_t		   Remaining = SizeBytes;
					while (Remaining > 0)
					{
						if (!WState->ActiveBuf)
						{
							WState->ActiveBuf = State->Buffers.Acquire();
						}
						const size_t Avail = WriteBufferSize - WState->BufFill;
						const size_t Take  = std::min(Remaining, Avail);
						std::memcpy(WState->ActiveBuf.MutableData() + WState->BufFill, Cur, Take);
						WState->BufFill += Take;
						WState->TotalReceived += Take;
						Cur += Take;
						Remaining -= Take;

						if (WState->BufFill == WriteBufferSize)
						{
							// Buffer full: detach it and schedule the write at the range's next offset.
							IoBuffer	   Buf		 = std::move(WState->ActiveBuf);
							const size_t   Fill		 = WState->BufFill;
							const uint64_t AbsOffset = WState->NextAbsOffset;
							WState->NextAbsOffset += Fill;
							WState->BufFill = 0;
							WState->PendingWork.fetch_add(1, std::memory_order_acq_rel);
							try
							{
								State->Work.ScheduleWork(
									State->Pool,
									[this, State, RangeIdx, WState, Buf = std::move(Buf), Fill, AbsOffset](std::atomic&) mutable {
										try
										{
											State->DestFile->Write(Buf.GetData(), Fill, AbsOffset);
										}
										catch (const std::exception& Ex)
										{
											RecordGetPartFailure(*State,
																 fmt::format("S3 GET range {} write failed: {}", RangeIdx, Ex.what()));
										}
										State->Buffers.Release(std::move(Buf));
										// Last participant of this range reports the range as completed.
										if (WState->PendingWork.fetch_sub(1, std::memory_order_acq_rel) == 1)
										{
											OnGetPartCompleted(State);
										}
									},
									WorkerThreadPool::EMode::EnableBacklog);
							}
							catch (const std::exception& Ex)
							{
								// Scheduling failed: undo the acquire slot and the PendingWork increment, then abort.
								RecordGetPartFailure(*State, fmt::format("S3 GET range {} schedule failed: {}", RangeIdx, Ex.what()));
								State->Buffers.RestoreAcquireSlot();
								WState->PendingWork.fetch_sub(1, std::memory_order_acq_rel);
								return false;
							}
						}
					}
					return true;
				},
				// Completion callback for this range: validate, flush the tail buffer, drop the stream's reference.
				[this, State, RangeSize, RangeIdx, WState, Timer, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable {
					State->Stats.EndRequest(Timer.GetElapsedTimeUs(), Resp.IsSuccess() ? WState->TotalReceived : 0);

					bool Failed = !Resp.IsSuccess();
					if (Failed)
					{
						RecordGetPartFailure(*State, S3ErrorMessage(fmt::format("S3 GET range {} failed", RangeIdx), Resp));
					}
					else if (WState->TotalReceived != RangeSize)
					{
						RecordGetPartFailure(
							*State,
							fmt::format("S3 GET range {} wrote {} bytes, expected {}", RangeIdx, WState->TotalReceived, RangeSize));
						Failed = true;
					}

					// Tail flush: only on success and only when bytes remain in the active buffer.
					if (!Failed && WState->BufFill > 0)
					{
						IoBuffer	   Buf		 = std::move(WState->ActiveBuf);
						const size_t   Fill		 = WState->BufFill;
						const uint64_t AbsOffset = WState->NextAbsOffset;
						WState->BufFill			 = 0;
						WState->PendingWork.fetch_add(1, std::memory_order_acq_rel);
						try
						{
							State->Work.ScheduleWork(
								State->Pool,
								[this, State, RangeIdx, WState, Buf = std::move(Buf), Fill, AbsOffset](std::atomic&) mutable {
									try
									{
										State->DestFile->Write(Buf.GetData(), Fill, AbsOffset);
									}
									catch (const std::exception& Ex)
									{
										RecordGetPartFailure(*State, fmt::format("S3 GET range {} write failed: {}", RangeIdx, Ex.what()));
									}
									State->Buffers.Release(std::move(Buf));
									if (WState->PendingWork.fetch_sub(1, std::memory_order_acq_rel) == 1)
									{
										OnGetPartCompleted(State);
									}
								},
								WorkerThreadPool::EMode::EnableBacklog);
						}
						catch (const std::exception& Ex)
						{
							RecordGetPartFailure(*State,
												 fmt::format("S3 GET range {} tail-flush schedule failed: {}", RangeIdx, Ex.what()));
							State->Buffers.RestoreAcquireSlot();
							WState->PendingWork.fetch_sub(1, std::memory_order_acq_rel);
						}
					}

					// Drop the stream's own reference; if no writes are outstanding this completes the range.
					if (WState->PendingWork.fetch_sub(1, std::memory_order_acq_rel) == 1)
					{
						OnGetPartCompleted(State);
					}
				},
				Headers);
			InFlightGuard.Dismiss();
		}
		catch (const std::exception& Ex)
		{
			// Dispatch of this range failed before its callbacks took over: record it and
			// account for the range so PendingRanges still reaches zero.
			RecordGetPartFailure(*State, fmt::format("S3 GET range {} dispatch failed: {}", RangeIdx, Ex.what()));
			OnGetPartCompleted(State);
		}
	}
}

// Called once per finished range of a multipart Get. Decrements State->PendingRanges; the
// call that observes the last outstanding range schedules the finalise work item, which
// closes the destination file and completes the token, or - when State->AnyFailed is set -
// removes the file and fails the token with the recorded FirstError. If scheduling throws,
// cleanup runs inline and the token is failed with the scheduling exception.
void
S3AsyncStorage::OnGetPartCompleted(std::shared_ptr State)
{
	const uint32_t Remaining = State->PendingRanges.fetch_sub(1, std::memory_order_acq_rel);
	// fetch_sub returns the previous value: anything other than 1 means ranges are still pending.
	if (Remaining != 1)
	{
		return;
	}

	// All disk I/O hopped to the worker pool: see note in
	// OnGetStreamFinalised.
	const bool Failed = State->AnyFailed.load(std::memory_order_acquire);
	try
	{
		State->Work.ScheduleWork(
			State->Pool,
			[State, Failed](std::atomic&) {
				if (Failed)
				{
					std::string FirstError;
					State->ErrorLock.WithExclusiveLock([&] { FirstError = std::move(State->FirstError); });
					// Release the file handle before attempting to remove the file.
					State->DestFile.reset();
					std::error_code Ec;
					std::filesystem::remove(State->DestPath, Ec);
					State->Token->Fail(
						std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': {}"sv, State->ContentHash, FirstError)));
					return;
				}
				State->DestFile.reset();
				State->Token->Complete();
			},
			WorkerThreadPool::EMode::EnableBacklog);
	}
	catch (...)
	{
		State->DestFile.reset();
		std::error_code Ec;
		std::filesystem::remove(State->DestPath, Ec);
		State->Token->Fail(std::current_exception());
	}
}

// Refreshes an existing object by issuing a PUT with x-amz-copy-source pointing at the
// object's own key and x-amz-metadata-directive: REPLACE. The request is signed and posted
// from a scheduled work item; the result is reported through a token registered with Work.
void
S3AsyncStorage::Touch(ParallelWork& Work, WorkerThreadPool& Pool, const IoHash& Hash, S3AsyncStorageStats& Stats)
{
	auto Token = std::make_shared(Work.RegisterExternal());
	Stats.RecordScheduled();

	std::shared_ptr SlotRef = AcquireAdmissionSlot(m_Admission, &Stats);

	// Snapshot creds on the dispatcher; see PutSmall.
SigV4Credentials Creds = m_GetCreds(); Work.ScheduleWork( Pool, [this, Hash = IoHash(Hash), Token, Stats, Creds = std::move(Creds), SlotRef = std::move(SlotRef)]( std::atomic& Abort) mutable { if (Abort.load()) { Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Touch '{}': aborted"sv, Hash))); return; } try { if (Creds.AccessKeyId.empty()) { Token->Fail( std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Touch '{}': no credentials available"sv, Hash))); return; } std::string Key = CasKey(Hash); std::string Path = CasPath(Hash); std::vector> ExtraSigned{ {"x-amz-copy-source", fmt::format("/{}/{}", m_Builder.BucketName(), AwsUriEncode(Key, false))}, {"x-amz-metadata-directive", "REPLACE"}, }; HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "PUT", Path, "", S3EmptyPayloadHash, ExtraSigned); Stopwatch Timer = Stats.BeginRequest(); // Pair Begin with End so an alloc throw inside AsyncPut does not strand // InFlight elevated. auto InFlightGuard = MakeGuard([&Stats] { Stats.EndRequest(0, 0); }); m_Client.AsyncPut( Path, [Hash, Token, Timer, Stats, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable { Stats.EndRequest(Timer.GetElapsedTimeUs(), 0); if (!Resp.IsSuccess()) { std::string Err = S3ErrorMessage("S3 Touch failed", Resp); Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Touch '{}': {}"sv, Hash, Err))); return; } // PUT-COPY (REPLACE) can return HTTP 200 with an body. Mirror // the sync S3Client::Touch check; without this the dehydrate-touch // fails silently. std::string_view Body = Resp.AsText(); std::string_view ErrorCode; std::string_view ErrorMessage; if (S3ExtractError(Body, ErrorCode, ErrorMessage)) { Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Touch '{}': returned error: {} - {}"sv, Hash, ErrorCode, ErrorMessage))); return; } Token->Complete(); }, Headers); InFlightGuard.Dismiss(); } catch (...) 
{ Token->Fail(std::current_exception()); } }); } std::vector S3AsyncStorage::ListAllObjects(std::string_view Prefix) { constexpr std::string_view ContentsCloseTag = ""; // List requests are not counted in PhaseStats; List/DeleteAll is admin // scaffolding (test fixture teardown, manual obliterate). Mirrors the // CreateMultipart/Complete/Abort exclusion in PutMultipart. // Snapshot creds once for the listing run; ListObjectsV2 pagination is // fast enough that mid-list refresh isn't needed. const SigV4Credentials Creds = m_GetCreds(); if (Creds.AccessKeyId.empty()) { throw zen::runtime_error("S3AsyncStorage::ListAllObjects: no credentials available"sv); } std::vector Keys; std::string ContinuationToken; std::string RootPath = m_Builder.BucketRootPath(); while (true) { std::string CanonicalQs = fmt::format("list-type=2&prefix={}", AwsUriEncode(Prefix)); if (!ContinuationToken.empty()) { CanonicalQs += fmt::format("&continuation-token={}", AwsUriEncode(ContinuationToken)); } HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "GET", RootPath, CanonicalQs, S3EmptyPayloadHash); std::string FullPath = S3BuildRequestPath(RootPath, CanonicalQs); HttpClient::Response Resp = m_Client.Get(FullPath, Headers).get(); if (!Resp.IsSuccess()) { throw zen::runtime_error("S3AsyncStorage::ListAllObjects '{}': {}"sv, Prefix, S3ErrorMessage("S3 LIST failed", Resp)); } if (!Resp.ResponsePayload) { break; } std::string_view Body(reinterpret_cast(Resp.ResponsePayload.GetData()), Resp.ResponsePayload.GetSize()); std::string_view Cursor = Body; while (true) { size_t ContentsOpen = Cursor.find(""); if (ContentsOpen == std::string_view::npos) { break; } size_t ContentsClose = Cursor.find(ContentsCloseTag, ContentsOpen); if (ContentsClose == std::string_view::npos) { break; } std::string_view ContentsBlock = Cursor.substr(ContentsOpen, ContentsClose - ContentsOpen); std::string_view Key = S3ExtractXmlValue(ContentsBlock, "Key"); Keys.emplace_back(Key); Cursor = 
Cursor.substr(ContentsClose + ContentsCloseTag.size()); } std::string_view IsTruncated = S3ExtractXmlValue(Body, "IsTruncated"); if (IsTruncated != "true") { break; } std::string_view NextToken = S3ExtractXmlValue(Body, "NextContinuationToken"); if (NextToken.empty()) { break; } ContinuationToken.assign(NextToken); } return Keys; } std::vector S3AsyncStorage::List() { std::string CasPrefix = fmt::format("{}/cas/", m_KeyPrefix); std::vector Keys = ListAllObjects(CasPrefix); std::vector Hashes; Hashes.reserve(Keys.size()); for (const std::string& Key : Keys) { size_t LastSlash = Key.rfind('/'); if (LastSlash == std::string::npos) { continue; } IoHash Hash; if (IoHash::TryParse(std::string_view(Key).substr(LastSlash + 1), Hash)) { Hashes.push_back(Hash); } } return Hashes; } void S3AsyncStorage::DeleteAll(ParallelWork& Work) { std::string Prefix = fmt::format("{}/", m_KeyPrefix); std::vector Keys = ListAllObjects(Prefix); // Snapshot creds once for the whole obliterate; saves N shared-lock // acquisitions on the credential provider. const SigV4Credentials DelCreds = m_GetCreds(); if (DelCreds.AccessKeyId.empty()) { auto Token = std::make_shared(Work.RegisterExternal()); Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::DeleteAll: no credentials available"sv))); return; } for (const std::string& Key : Keys) { auto Token = std::make_shared(Work.RegisterExternal()); try { std::string Path = m_Builder.KeyToPath(Key); HttpClient::KeyValueMap DelHeaders = m_Builder.SignRequest(DelCreds, "DELETE", Path, "", S3EmptyPayloadHash); // DeleteAll is untracked by stats - admission slot is acquired but the // wait is not recorded; passes Stats == nullptr to the helper. 
std::shared_ptr SlotRef = AcquireAdmissionSlot(m_Admission);
			m_Client.AsyncDelete(
				Path,
				[KeyCopy = Key, Token, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable {
					// A NotFound response is treated the same as a successful delete.
					if (!Resp.IsSuccess() && Resp.StatusCode != HttpResponseCode::NotFound)
					{
						std::string Err = S3ErrorMessage("S3 DELETE failed", Resp);
						Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::DeleteAll '{}': {}"sv, KeyCopy, Err)));
						return;
					}
					Token->Complete();
				},
				DelHeaders);
		}
		catch (...)
		{
			// Sign / acquire / dispatch threw before AsyncDelete posted. Surface via Token so
			// Wait() reports the partial failure instead of silently dropping the key.
			Token->Fail(std::current_exception());
		}
	}
}

// Intentionally empty function.
// NOTE(review): presumably referenced from elsewhere so the linker keeps this translation
// unit (and its tests) - confirm against the caller.
void
s3asyncstorage_forcelink()
{
}

#if ZEN_WITH_TESTS

namespace {

	// Per-binary unique MinIO port.
	// The base is derived once from the process id (20000 + pid % 30000); each call then
	// hands out the next port above that base.
	uint16_t AllocateMinioTestPort()
	{
		static const uint16_t Base = static_cast(20000u + (static_cast(GetCurrentProcessId()) % 30000u));
		static std::atomic	  Slot{0};
		return Base + Slot.fetch_add(1, std::memory_order_relaxed);
	}

	// Default MinIO options with a freshly allocated test port.
	MinioProcessOptions MakeMinioOpts()
	{
		MinioProcessOptions Opts;
		Opts.Port = AllocateMinioTestPort();
		return Opts;
	}

	// Test fixture: spawns a MinIO server, creates the "async-test" bucket, and builds the
	// request builder, credentials, HTTP client and worker pool the tests share.
	struct AsyncS3Fixture
	{
		MinioProcess			 Minio{MakeMinioOpts()};
		ScopedTemporaryDirectory TmpDir;
		std::unique_ptr			 ClientStorage;
		std::unique_ptr			 Builder;
		AsyncHttpClient*		 Client = nullptr;	// non-owning view of ClientStorage
		SigV4Credentials		 Creds;
		WorkerThreadPool		 Pool{4};

		AsyncS3Fixture()
		{
			Minio.SpawnMinioServer();
			Minio.CreateBucket("async-test");

			Creds.AccessKeyId	  = "minioadmin";
			Creds.SecretAccessKey = "minioadmin";

			Builder = std::make_unique("us-east-1", "async-test", Minio.Endpoint(), /*PathStyle*/ true);

			HttpClientSettings Settings;
			Settings.LogCategory					 = "async-s3-test";
			Settings.MaxConcurrentConnectionsPerHost = 16;
			// Match production S3Hydration: cover MinIO post-spawn 503
			// (XMinioServerNotInitialized) transients before storage backend
			// finishes warming up after the spawn ready-check passes.
			Settings.RetryCount = 3;
			ClientStorage		= std::make_unique(Minio.Endpoint(), Settings);
			Client				= ClientStorage.get();
		}
	};

	// Writes Bytes to Path (wrapping the vector's storage in an IoBuffer) and returns Path.
	std::filesystem::path WriteBlob(const std::filesystem::path& Path, const std::vector& Bytes)
	{
		WriteFile(Path, IoBuffer(IoBuffer::Wrap, Bytes.data(), Bytes.size()));
		return Path;
	}

	// Owns the counters and phase clock that an S3AsyncStorageStats is constructed from.
	struct StatsBlock
	{
		std::atomic RequestCount{0};
		std::atomic RequestTotalUs{0};
		std::atomic RequestMaxUs{0};
		std::atomic Bytes{0};
		std::atomic InFlight{0};
		std::atomic InFlightPeak{0};
		std::atomic FirstScheduleUs{UINT64_MAX};
		std::atomic FirstStartUs{UINT64_MAX};
		std::atomic AdmissionWaitTotalUs{0};
		std::atomic AdmissionWaitMaxUs{0};
		Stopwatch	PhaseClock;

		// Builds an S3AsyncStorageStats from this block's members.
		S3AsyncStorageStats Ref()
		{
			return S3AsyncStorageStats{RequestCount,
									   RequestTotalUs,
									   RequestMaxUs,
									   Bytes,
									   InFlight,
									   InFlightPeak,
									   FirstScheduleUs,
									   FirstStartUs,
									   AdmissionWaitTotalUs,
									   AdmissionWaitMaxUs,
									   PhaseClock};
		}
	};

}  // namespace

TEST_SUITE_BEGIN("server.s3asyncstorage");

// Uploads a 64 KiB blob and downloads it again (below the 512 KiB streaming threshold, so
// the in-memory Get tier is used), then checks existence, size and content hash.
TEST_CASE("s3asyncstorage.put_get_round_trip")
{
	StatsBlock	   Sb;
	AsyncS3Fixture F;
	S3AsyncStorage Storage(
		*F.Client,
		*F.Builder,
		[&F]() { return F.Creds; },
		"module-A",
		8u * 1024u * 1024u);

	const uint64_t Size = 64u * 1024u;
	std::vector	   Bytes(Size);
	// Deterministic byte pattern so the round trip can be verified by hash.
	for (size_t I = 0; I < Size; ++I)
	{
		Bytes[I] = static_cast((I * 31u) & 0xFF);
	}
	std::filesystem::path SrcPath	  = WriteBlob(F.TmpDir.Path() / "src.bin", Bytes);
	IoHash				  ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size());

	// Upload phase.
	{
		std::atomic			AbortFlag{false};
		std::atomic			PauseFlag{false};
		ParallelWork		Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
		S3AsyncStorageStats Stats = Sb.Ref();
		Storage.Put(Work, F.Pool, ContentHash, Size, SrcPath, Stats);
		Work.Wait();
	}

	std::filesystem::path DstPath = F.TmpDir.Path() / "dst.bin";
	// Download phase.
	{
		std::atomic			AbortFlag{false};
		std::atomic			PauseFlag{false};
		ParallelWork		Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
		S3AsyncStorageStats Stats = Sb.Ref();
		Storage.Get(Work, F.Pool, ContentHash, Size, DstPath, Stats);
		Work.Wait();
	}
REQUIRE(std::filesystem::exists(DstPath));
	REQUIRE(std::filesystem::file_size(DstPath) == Size);
	BasicFile Out(DstPath, BasicFile::Mode::kRead);
	IoBuffer  Read	   = Out.ReadAll();
	IoHash	  ReadHash = IoHash::HashBuffer(Read);
	CHECK(ReadHash == ContentHash);
}

// Uploads a 5-byte object and then runs Touch on it. There are no explicit assertions; the
// test only drives Put followed by Touch through Work.Wait().
TEST_CASE("s3asyncstorage.touch_existing_object")
{
	StatsBlock	   Sb;
	AsyncS3Fixture F;
	S3AsyncStorage Storage(
		*F.Client,
		*F.Builder,
		[&F]() { return F.Creds; },
		"module-touch",
		8u * 1024u * 1024u);

	std::vector			  Bytes{1, 2, 3, 4, 5};
	std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "touch.bin", Bytes);
	IoHash				  Hash	  = IoHash::HashBuffer(Bytes.data(), Bytes.size());

	// Upload the object that will be touched.
	{
		std::atomic			AbortFlag{false};
		std::atomic			PauseFlag{false};
		ParallelWork		Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
		S3AsyncStorageStats Stats = Sb.Ref();
		Storage.Put(Work, F.Pool, Hash, Bytes.size(), SrcPath, Stats);
		Work.Wait();
	}

	// Touch the uploaded object.
	{
		std::atomic			AbortFlag{false};
		std::atomic			PauseFlag{false};
		ParallelWork		Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
		S3AsyncStorageStats Stats = Sb.Ref();
		Storage.Touch(Work, F.Pool, Hash, Stats);
		Work.Wait();
	}
}

// Uploads five distinct 64-byte blobs and checks that List() returns exactly their hashes
// (compared after sorting both sides).
TEST_CASE("s3asyncstorage.list_returns_uploaded_hashes")
{
	StatsBlock	   Sb;
	AsyncS3Fixture F;
	S3AsyncStorage Storage(
		*F.Client,
		*F.Builder,
		[&F]() { return F.Creds; },
		"module-list",
		8u * 1024u * 1024u);

	const size_t N = 5;
	std::vector	 Hashes;
	std::vector	 SrcPaths;
	// Each blob is filled with a different byte value so every hash is unique.
	for (size_t I = 0; I < N; ++I)
	{
		std::vector			  Bytes(64, static_cast(I + 1));
		std::filesystem::path P = WriteBlob(F.TmpDir.Path() / fmt::format("blob_{}.bin", I), Bytes);
		Hashes.push_back(IoHash::HashBuffer(Bytes.data(), Bytes.size()));
		SrcPaths.push_back(P);
	}

	{
		std::atomic			AbortFlag{false};
		std::atomic			PauseFlag{false};
		ParallelWork		Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
		S3AsyncStorageStats Stats = Sb.Ref();
		for (size_t I = 0; I < N; ++I)
		{
			Storage.Put(Work, F.Pool, Hashes[I], 64, SrcPaths[I], Stats);
		}
		Work.Wait();
	}

	std::vector Listed = Storage.List();
	std::sort(Listed.begin(), Listed.end());
	std::vector Expected = Hashes;
	std::sort(Expected.begin(), Expected.end());
	CHECK(Listed == Expected);
}

TEST_CASE("s3asyncstorage.streaming_download_round_trip")
{
	// Object size sits between the streaming threshold (512 KiB) and the
	// multipart threshold (PartSize + PartSize/4). Exercises the medium-tier
	// AsyncStream branch in S3AsyncStorage::Get - body streams via a 512 KiB
	// IoBuffer pool with per-buffer worker write hops.
	StatsBlock	   Sb;
	AsyncS3Fixture F;
	S3AsyncStorage Storage(
		*F.Client,
		*F.Builder,
		[&F]() { return F.Creds; },
		"module-stream",
		8u * 1024u * 1024u);

	const uint64_t Size = 2u * 1024u * 1024u;  // 2 MiB - between 512 KiB and 10 MiB
	std::vector	   Bytes(Size);
	for (size_t I = 0; I < Size; ++I)
	{
		Bytes[I] = static_cast((I * 53u + 11u) & 0xFF);
	}
	std::filesystem::path SrcPath	  = WriteBlob(F.TmpDir.Path() / "stream.bin", Bytes);
	IoHash				  ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size());

	// Upload phase.
	{
		std::atomic			AbortFlag{false};
		std::atomic			PauseFlag{false};
		ParallelWork		Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
		S3AsyncStorageStats Stats = Sb.Ref();
		Storage.Put(Work, F.Pool, ContentHash, Size, SrcPath, Stats);
		Work.Wait();
	}

	std::filesystem::path DstPath = F.TmpDir.Path() / "stream_out.bin";
	// Download phase.
	{
		std::atomic			AbortFlag{false};
		std::atomic			PauseFlag{false};
		ParallelWork		Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
		S3AsyncStorageStats Stats = Sb.Ref();
		Storage.Get(Work, F.Pool, ContentHash, Size, DstPath, Stats);
		Work.Wait();
	}

	REQUIRE(std::filesystem::exists(DstPath));
	REQUIRE(std::filesystem::file_size(DstPath) == Size);
	BasicFile Out(DstPath, BasicFile::Mode::kRead);
	IoBuffer  Read	   = Out.ReadAll();
	IoHash	  ReadHash = IoHash::HashBuffer(Read);
	CHECK(ReadHash == ContentHash);
}

// Round-trips an object large enough to take the multipart upload and download paths.
TEST_CASE("s3asyncstorage.multipart_round_trip")
{
	StatsBlock	   Sb;
	AsyncS3Fixture F;
	const uint64_t PartSize = 5u * 1024u * 1024u;  // MinIO accepts >=5MiB parts.
S3AsyncStorage Storage(
		*F.Client,
		*F.Builder,
		[&F]() { return F.Creds; },
		"module-multipart",
		PartSize);

	// Three parts: 5 MiB + 5 MiB + 1 MiB. Threshold is PartSize + PartSize/4 (=
	// 6.25 MiB). Total 11 MiB triggers multipart path.
	const uint64_t TotalSize = 11u * 1024u * 1024u;
	std::vector	   Bytes(TotalSize);
	for (size_t I = 0; I < TotalSize; ++I)
	{
		Bytes[I] = static_cast((I * 131u + 7u) & 0xFF);
	}
	std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "big.bin", Bytes);
	IoHash				  Hash	  = IoHash::HashBuffer(Bytes.data(), Bytes.size());

	// Upload phase.
	{
		std::atomic			AbortFlag{false};
		std::atomic			PauseFlag{false};
		ParallelWork		Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
		S3AsyncStorageStats Stats = Sb.Ref();
		Storage.Put(Work, F.Pool, Hash, TotalSize, SrcPath, Stats);
		Work.Wait();
	}

	std::filesystem::path DstPath = F.TmpDir.Path() / "big_out.bin";
	// Download phase.
	{
		std::atomic			AbortFlag{false};
		std::atomic			PauseFlag{false};
		ParallelWork		Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
		S3AsyncStorageStats Stats = Sb.Ref();
		Storage.Get(Work, F.Pool, Hash, TotalSize, DstPath, Stats);
		Work.Wait();
	}

	REQUIRE(std::filesystem::exists(DstPath));
	REQUIRE(std::filesystem::file_size(DstPath) == TotalSize);
	BasicFile Out(DstPath, BasicFile::Mode::kRead);
	IoBuffer  Read	   = Out.ReadAll();
	IoHash	  ReadHash = IoHash::HashBuffer(Read);
	CHECK(ReadHash == Hash);
}

// Issues 32 small Puts against one ParallelWork and checks that List() then reports 32 entries.
TEST_CASE("s3asyncstorage.parallel_puts")
{
	StatsBlock	   Sb;
	AsyncS3Fixture F;
	S3AsyncStorage Storage(
		*F.Client,
		*F.Builder,
		[&F]() { return F.Creds; },
		"module-parallel",
		8u * 1024u * 1024u);

	const size_t N = 32;
	std::vector	 Hashes;
	std::vector	 SrcPaths;
	// Each 1 KiB blob is filled with a different byte value so every hash is unique.
	for (size_t I = 0; I < N; ++I)
	{
		std::vector			  Bytes(1024, static_cast(I));
		std::filesystem::path P = WriteBlob(F.TmpDir.Path() / fmt::format("p_{}.bin", I), Bytes);
		Hashes.push_back(IoHash::HashBuffer(Bytes.data(), Bytes.size()));
		SrcPaths.push_back(P);
	}

	std::atomic			AbortFlag{false};
	std::atomic			PauseFlag{false};
	ParallelWork		Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
	S3AsyncStorageStats Stats = Sb.Ref();
	for (size_t I = 0; I < N; ++I)
	{
		Storage.Put(Work, F.Pool, Hashes[I], 1024, SrcPaths[I], Stats);
	}
	Work.Wait();

	CHECK(Storage.List().size() == N);
}

namespace {

	// One MinIO + builder + creds + pool, reused across multiple admission /
	// throttle test scopes. Each scope constructs its own AsyncHttpClient via
	// MakeClient(Cap) so different MaxConcurrentRequests values are exercised
	// without re-spawning MinIO.
	struct ThrottledAsyncS3Fixture
	{
		MinioProcess			 Minio{MakeMinioOpts()};
		ScopedTemporaryDirectory TmpDir;
		std::unique_ptr			 Builder;
		SigV4Credentials		 Creds;
		WorkerThreadPool		 Pool{4};

		ThrottledAsyncS3Fixture()
		{
			Minio.SpawnMinioServer();
			Minio.CreateBucket("async-test-throttle");

			Creds.AccessKeyId	  = "minioadmin";
			Creds.SecretAccessKey = "minioadmin";

			Builder = std::make_unique("us-east-1", "async-test-throttle", Minio.Endpoint(), /*PathStyle*/ true);
		}

		// Builds a client against this fixture's MinIO endpoint with the given request cap.
		std::unique_ptr MakeClient(uint32_t MaxConcurrentRequests)
		{
			HttpClientSettings Settings;
			Settings.LogCategory					 = "async-s3-throttle-test";
			Settings.MaxConcurrentConnectionsPerHost = 16;
			Settings.MaxConcurrentRequests			 = MaxConcurrentRequests;
			// Match production S3Hydration: cover MinIO post-spawn 503
			// (XMinioServerNotInitialized) transients before storage backend
			// finishes warming up after the spawn ready-check passes.
			Settings.RetryCount = 3;
			return std::make_unique(Minio.Endpoint(), Settings);
		}
	};

}  // namespace

// Non-multipart admission scenarios: small-file fanout under a cap, fanout
// with admission disabled, and admission-wait stat recording. All share one
// MinIO instance via ThrottledAsyncS3Fixture; each scope picks its own Cap
// and KeyPrefix so the bucket entries don't collide across scopes.
TEST_CASE("s3asyncstorage.admission.fanout")
{
	ThrottledAsyncS3Fixture F;

	// respects.async.client.cap - in-flight peak <= storage admission cap.
{ StatsBlock Sb; const uint32_t Cap = 2; auto Admission = std::make_shared(static_cast(Cap)); auto Client = F.MakeClient(Cap); S3AsyncStorage Storage( *Client, *F.Builder, [&F]() { return F.Creds; }, "module-cap", 8u * 1024u * 1024u, Admission, Cap); const size_t N = 8; std::vector Hashes; std::vector SrcPaths; for (size_t I = 0; I < N; ++I) { std::vector Bytes(256, static_cast(I)); std::filesystem::path P = WriteBlob(F.TmpDir.Path() / fmt::format("c_{}.bin", I), Bytes); Hashes.push_back(IoHash::HashBuffer(Bytes.data(), Bytes.size())); SrcPaths.push_back(P); } std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); S3AsyncStorageStats Stats = Sb.Ref(); for (size_t I = 0; I < N; ++I) { Storage.Put(Work, F.Pool, Hashes[I], 256, SrcPaths[I], Stats); } Work.Wait(); std::vector Listed = Storage.List(); std::sort(Listed.begin(), Listed.end()); std::vector Expected = Hashes; std::sort(Expected.begin(), Expected.end()); CHECK(Listed == Expected); CHECK(Sb.InFlightPeak.load() <= 2u); } // unlimited_concurrent_requests - admission disabled (nullptr semaphore, // Cap=0). Storage drains the fanout cleanly without gating. 
{ StatsBlock Sb; auto Client = F.MakeClient(0); S3AsyncStorage Storage( *Client, *F.Builder, [&F]() { return F.Creds; }, "module-uncapped", 8u * 1024u * 1024u); const size_t N = 8; std::vector Hashes; std::vector SrcPaths; for (size_t I = 0; I < N; ++I) { std::vector Bytes(256, static_cast(I)); std::filesystem::path P = WriteBlob(F.TmpDir.Path() / fmt::format("u_{}.bin", I), Bytes); Hashes.push_back(IoHash::HashBuffer(Bytes.data(), Bytes.size())); SrcPaths.push_back(P); } std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); S3AsyncStorageStats Stats = Sb.Ref(); for (size_t I = 0; I < N; ++I) { Storage.Put(Work, F.Pool, Hashes[I], 256, SrcPaths[I], Stats); } Work.Wait(); std::vector Listed = Storage.List(); std::sort(Listed.begin(), Listed.end()); std::vector Expected = Hashes; std::sort(Expected.begin(), Expected.end()); CHECK(Listed == Expected); } // admission.wait.us.recorded - cap=2 with 8 fanout submits forces at least // some submissions to block on the admission semaphore. AdmissionWait // totals must be non-zero. 
{ StatsBlock Sb; const uint32_t Cap = 2; auto Admission = std::make_shared(static_cast(Cap)); auto Client = F.MakeClient(Cap); S3AsyncStorage Storage( *Client, *F.Builder, [&F]() { return F.Creds; }, "module-adm-wait", 8u * 1024u * 1024u, Admission, Cap); const size_t N = 8; std::vector Hashes; std::vector SrcPaths; for (size_t I = 0; I < N; ++I) { std::vector Bytes(256, static_cast(I)); std::filesystem::path P = WriteBlob(F.TmpDir.Path() / fmt::format("aw_{}.bin", I), Bytes); Hashes.push_back(IoHash::HashBuffer(Bytes.data(), Bytes.size())); SrcPaths.push_back(P); } std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); S3AsyncStorageStats Stats = Sb.Ref(); for (size_t I = 0; I < N; ++I) { Storage.Put(Work, F.Pool, Hashes[I], 256, SrcPaths[I], Stats); } Work.Wait(); CHECK(Sb.AdmissionWaitTotalUs.load() > 0u); CHECK(Sb.AdmissionWaitMaxUs.load() > 0u); CHECK(Sb.InFlightPeak.load() <= Cap); } } // Drives the AbortMultipart path: a 3-part multipart upload where one part is // forced to fail via the test hook. AnyFailed flips, FinalizePutPart hops // off-strand once PendingParts reaches 1, AbortMultipart sends DELETE to S3 // to discard the upload-id. // Token must surface the original part failure; bucket must not contain the // uploaded CAS object after Wait returns. 
TEST_CASE("s3asyncstorage.multipart.abort_on_part_failure")
{
    StatsBlock Sb;
    AsyncS3Fixture F;
    const uint64_t PartSize = 5u * 1024u * 1024u;
    S3AsyncStorage Storage(
        *F.Client, *F.Builder, [&F]() { return F.Creds; }, "module-multipart-abort", PartSize);

    const uint64_t TotalSize = 11u * 1024u * 1024u;  // 3 parts: 5+5+1 MiB
    std::vector Bytes(TotalSize);
    // Deterministic, non-constant byte pattern derived from the index.
    for (size_t I = 0; I < TotalSize; ++I)
    {
        Bytes[I] = static_cast((I * 23u + 9u) & 0xFF);
    }
    std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "abort.bin", Bytes);
    IoHash Hash = IoHash::HashBuffer(Bytes.data(), Bytes.size());

    bool ThrewFromWait = false;
    std::string ErrMessage;
    {
        // Force one part to fail; AnyFailed -> AbortMultipart.
        s3asyncstorage_test_hooks::ForceNextPartFailures(1);
        // Reset hook so subsequent multipart tests aren't affected. Done via a
        // scope guard so the reset also happens when Put throws or Wait()
        // raises something that is not a std::exception. Declared before Work
        // so the reset runs after Work has been destroyed, as it did before.
        auto DisarmHook = MakeGuard([] { s3asyncstorage_test_hooks::ForceNextPartFailures(0); });

        std::atomic AbortFlag{false};
        std::atomic PauseFlag{false};
        ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
        S3AsyncStorageStats Stats = Sb.Ref();
        Storage.Put(Work, F.Pool, Hash, TotalSize, SrcPath, Stats);
        try
        {
            Work.Wait();
        }
        catch (const std::exception& Ex)
        {
            // Capture the message so it can be asserted on outside the scope.
            ThrewFromWait = true;
            ErrMessage = Ex.what();
        }
    }
    CHECK(ThrewFromWait);
    CHECK(ErrMessage.find("test-injected part failure") != std::string::npos);

    // Bucket must not contain the CAS object - AbortMultipart discards the
    // upload before S3 publishes the assembled object.
    std::vector Listed = Storage.List();
    CHECK(std::find(Listed.begin(), Listed.end(), Hash) == Listed.end());
}

// Multipart admission scenarios sharing one MinIO instance. Each scope picks
// its own Cap, KeyPrefix, and payload pattern so bucket entries don't collide.
// Covers:
// - under.cap: TotalParts < cap (no slot handoff needed); round-trip Put+Get.
// - no.deadlock.streaming.get: Put + ranged Get on same hydration pool.
// - parts.exceed.cap.paces: TotalParts > cap; wave + slot-handoff completes
//   successfully with InFlightPeak <= cap.
// - aborts.release.tokens: forced part failure releases held slots so a // follow-up Put can run. // - handoff.in.flight: cap=1 forces strictly sequential dispatch. TEST_CASE("s3asyncstorage.admission.multipart") { ThrottledAsyncS3Fixture F; const uint64_t PartSize = 5u * 1024u * 1024u; // MinIO accepts >= 5 MiB parts. // under.cap: 3-part Put + Get sits within the initial wave (cap=4). { StatsBlock Sb; const uint32_t Cap = 4; auto Admission = std::make_shared(static_cast(Cap)); auto Client = F.MakeClient(Cap); S3AsyncStorage Storage( *Client, *F.Builder, [&F]() { return F.Creds; }, "module-multipart-cap", PartSize, Admission, Cap); const uint64_t TotalSize = 11u * 1024u * 1024u; std::vector Bytes(TotalSize); for (size_t I = 0; I < TotalSize; ++I) { Bytes[I] = static_cast((I * 17u + 5u) & 0xFF); } std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "mp.bin", Bytes); IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size()); { std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); S3AsyncStorageStats Stats = Sb.Ref(); Storage.Put(Work, F.Pool, ContentHash, TotalSize, SrcPath, Stats); Work.Wait(); } std::filesystem::path DstPath = F.TmpDir.Path() / "mp_out.bin"; { std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); S3AsyncStorageStats Stats = Sb.Ref(); Storage.Get(Work, F.Pool, ContentHash, TotalSize, DstPath, Stats); Work.Wait(); } REQUIRE(std::filesystem::exists(DstPath)); REQUIRE(std::filesystem::file_size(DstPath) == TotalSize); BasicFile Out(DstPath, BasicFile::Mode::kRead); IoBuffer Read = Out.ReadAll(); IoHash ReadHash = IoHash::HashBuffer(Read); CHECK(ReadHash == ContentHash); } // no.deadlock.streaming.get: PutMultipart fans onto hydration pool while // io strand fires AsyncPut completions; followed by Get exercising stream // write-back on the same pool. 
{ StatsBlock Sb; const uint32_t Cap = 4; auto Admission = std::make_shared(static_cast(Cap)); auto Client = F.MakeClient(Cap); S3AsyncStorage Storage( *Client, *F.Builder, [&F]() { return F.Creds; }, "module-stream-no-deadlock", PartSize, Admission, Cap); const uint64_t TotalSize = 11u * 1024u * 1024u; std::vector Bytes(TotalSize); for (size_t I = 0; I < TotalSize; ++I) { Bytes[I] = static_cast((I * 41u + 7u) & 0xFF); } std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "sd.bin", Bytes); IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size()); { std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); S3AsyncStorageStats Stats = Sb.Ref(); Storage.Put(Work, F.Pool, ContentHash, TotalSize, SrcPath, Stats); Work.Wait(); } std::filesystem::path DstPath = F.TmpDir.Path() / "sd_out.bin"; { std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); S3AsyncStorageStats Stats = Sb.Ref(); Storage.Get(Work, F.Pool, ContentHash, TotalSize, DstPath, Stats); Work.Wait(); } REQUIRE(std::filesystem::exists(DstPath)); REQUIRE(std::filesystem::file_size(DstPath) == TotalSize); BasicFile Out(DstPath, BasicFile::Mode::kRead); IoBuffer Read = Out.ReadAll(); IoHash ReadHash = IoHash::HashBuffer(Read); CHECK(ReadHash == ContentHash); } // parts.exceed.cap.paces: TotalParts > cap completes by pacing via slot // handoff. Initial wave dispatches Cap parts; each completion hands its // slot to the next undispatched part. InFlightPeak stays bounded by Cap. 
{ StatsBlock Sb; const uint32_t Cap = 2; auto Admission = std::make_shared(static_cast(Cap)); auto Client = F.MakeClient(Cap); S3AsyncStorage Storage( *Client, *F.Builder, [&F]() { return F.Creds; }, "module-mp-paces", PartSize, Admission, Cap); const uint64_t TotalSize = 16u * 1024u * 1024u; // 4 parts > cap=2 std::vector Bytes(TotalSize, 0xA5); std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "mp_paces.bin", Bytes); IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size()); { std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); S3AsyncStorageStats Stats = Sb.Ref(); Storage.Put(Work, F.Pool, ContentHash, TotalSize, SrcPath, Stats); Work.Wait(); } CHECK(Sb.InFlightPeak.load() <= Cap); std::vector Listed = Storage.List(); CHECK(std::find(Listed.begin(), Listed.end(), ContentHash) != Listed.end()); } // aborts.release.tokens: forced part failure mid-flight; drain releases // admission slots so a follow-up small Put can run without blocking. { StatsBlock Sb; const uint32_t Cap = 4; auto Admission = std::make_shared(static_cast(Cap)); auto Client = F.MakeClient(Cap); S3AsyncStorage Storage( *Client, *F.Builder, [&F]() { return F.Creds; }, "module-mp-abort-release", PartSize, Admission, Cap); const uint64_t TotalSize = 11u * 1024u * 1024u; // 3 parts under cap=4 std::vector Bytes(TotalSize, 0x5A); std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "mp_abrel.bin", Bytes); IoHash AbortHash = IoHash::HashBuffer(Bytes.data(), Bytes.size()); s3asyncstorage_test_hooks::ForceNextPartFailures(1); { std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); S3AsyncStorageStats Stats = Sb.Ref(); Storage.Put(Work, F.Pool, AbortHash, TotalSize, SrcPath, Stats); try { Work.Wait(); } catch (...) 
{ } } s3asyncstorage_test_hooks::ForceNextPartFailures(0); std::vector Small(1024, 0x11); std::filesystem::path SmallPath = WriteBlob(F.TmpDir.Path() / "mp_abrel_small.bin", Small); IoHash SmallHash = IoHash::HashBuffer(Small.data(), Small.size()); { std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); S3AsyncStorageStats Stats = Sb.Ref(); Storage.Put(Work, F.Pool, SmallHash, Small.size(), SmallPath, Stats); Work.Wait(); } std::vector Listed = Storage.List(); CHECK(std::find(Listed.begin(), Listed.end(), SmallHash) != Listed.end()); } // handoff.in.flight: cap=1 forces strictly sequential dispatch (initial // wave size 1, every subsequent part dispatched via handoff from the prior // AsyncPut completion). { StatsBlock Sb; const uint32_t Cap = 1; auto Admission = std::make_shared(static_cast(Cap)); auto Client = F.MakeClient(Cap); S3AsyncStorage Storage( *Client, *F.Builder, [&F]() { return F.Creds; }, "module-mp-handoff", PartSize, Admission, Cap); const uint64_t TotalSize = 16u * 1024u * 1024u; // 4 parts; handoff fires 3 times std::vector Bytes(TotalSize, 0x33); std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "mp_handoff.bin", Bytes); IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size()); { std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); S3AsyncStorageStats Stats = Sb.Ref(); Storage.Put(Work, F.Pool, ContentHash, TotalSize, SrcPath, Stats); Work.Wait(); } CHECK(Sb.InFlightPeak.load() == 1u); std::vector Listed = Storage.List(); CHECK(std::find(Listed.begin(), Listed.end(), ContentHash) != Listed.end()); } // ranges.exceed.cap.paces: GetMultipart with RangeCount > AdmissionCap. // The dispatcher loop's per-range AcquireAdmissionSlot blocks the caller // thread until in-flight ranges fire AsyncStream completions and release // slots. 
InFlightPeak per Get stays bounded by Cap. Upload first with a // larger client cap so the Put itself doesn't pace, then re-open Storage // with the small cap purely to exercise the Get pacing path. { StatsBlock Sb; const uint32_t UploadCap = 8; auto UploadAdmission = std::make_shared(static_cast(UploadCap)); auto UploadClient = F.MakeClient(UploadCap); S3AsyncStorage UploadStorage( *UploadClient, *F.Builder, [&F]() { return F.Creds; }, "module-mp-get-paces", PartSize, UploadAdmission, UploadCap); const uint64_t TotalSize = 16u * 1024u * 1024u; // 4 ranges with PartSize=5 MiB std::vector Bytes(TotalSize, 0x77); std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "mp_get_paces.bin", Bytes); IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size()); { std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); S3AsyncStorageStats Stats = Sb.Ref(); UploadStorage.Put(Work, F.Pool, ContentHash, TotalSize, SrcPath, Stats); Work.Wait(); } StatsBlock GetSb; const uint32_t GetCap = 2; auto GetAdmission = std::make_shared(static_cast(GetCap)); auto GetClient = F.MakeClient(GetCap); S3AsyncStorage GetStorage( *GetClient, *F.Builder, [&F]() { return F.Creds; }, "module-mp-get-paces", PartSize, GetAdmission, GetCap); std::filesystem::path DstPath = F.TmpDir.Path() / "mp_get_paces_out.bin"; { std::atomic AbortFlag{false}; std::atomic PauseFlag{false}; ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); S3AsyncStorageStats Stats = GetSb.Ref(); GetStorage.Get(Work, F.Pool, ContentHash, TotalSize, DstPath, Stats); Work.Wait(); } CHECK(GetSb.InFlightPeak.load() <= GetCap); REQUIRE(std::filesystem::exists(DstPath)); REQUIRE(std::filesystem::file_size(DstPath) == TotalSize); BasicFile Out(DstPath, BasicFile::Mode::kRead); IoBuffer Read = Out.ReadAll(); IoHash ReadHash = IoHash::HashBuffer(Read); CHECK(ReadHash == ContentHash); } } TEST_SUITE_END(); #endif // 
ZEN_WITH_TESTS } // namespace zen