diff options
Diffstat (limited to 'src/zenserver/hub/s3asyncstorage.cpp')
| -rw-r--r-- | src/zenserver/hub/s3asyncstorage.cpp | 2808 |
1 files changed, 2808 insertions, 0 deletions
diff --git a/src/zenserver/hub/s3asyncstorage.cpp b/src/zenserver/hub/s3asyncstorage.cpp new file mode 100644 index 000000000..b8bbc55b2 --- /dev/null +++ b/src/zenserver/hub/s3asyncstorage.cpp @@ -0,0 +1,2808 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "s3asyncstorage.h" + +#include <zencore/basicfile.h> +#include <zencore/except_fmt.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/timer.h> +#include <zenutil/cloud/s3response.h> + +#include <cstring> + +#if ZEN_WITH_TESTS +# include <zencore/iohash.h> +# include <zencore/process.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <zenutil/cloud/minioprocess.h> +#endif + +namespace zen { + +using namespace std::literals; + +namespace { + // Bounded write-buffer pool. OnData callbacks on the io strand call + // Acquire to obtain a fixed-size IoBuffer; workers call Release after + // the dispatched write completes. The pool retains at most as many + // buffers as future Acquire calls remain (PendingAcquires), so it + // stabilises at peak write concurrency without dragging buffers past + // the point they can ever be re-used. + class WriteBufferPool + { + public: + WriteBufferPool(size_t InBufferSize, uint32_t TotalAcquires) : m_BufferSize(InBufferSize), m_PendingAcquires(TotalAcquires) {} + + size_t BufferSize() const { return m_BufferSize; } + + IoBuffer Acquire() + { + IoBuffer Buf; + bool Exhausted = false; + m_Lock.WithExclusiveLock([&] { + if (m_PendingAcquires == 0) + { + Exhausted = true; + return; + } + --m_PendingAcquires; + if (!m_Pool.empty()) + { + Buf = std::move(m_Pool.back()); + m_Pool.pop_back(); + } + }); + if (Exhausted) + { + // Caller (OnData / OnComplete tail flush on the curl io strand) + // over-acquired vs the precomputed TotalBlocks budget. Throw a + // real exception so the boundary catch in + // AsyncCurlStreamWriteCallback can surface it; never let + // ZEN_ASSERT propagate through curl's C frames. + throw zen::runtime_error("WriteBufferPool exhausted: more buffers requested than expected"); + } + if (!Buf) + { + Buf = IoBuffer(m_BufferSize); + } + return Buf; + } + + void Release(IoBuffer Buf) + { + m_Lock.WithExclusiveLock([&] { + if (m_Pool.size() < m_PendingAcquires) + { + m_Pool.push_back(std::move(Buf)); + } + }); + } + + // Returns the slot consumed by Acquire() back to the budget without + // supplying a buffer; for paths where the buffer was already moved into a + // lambda (e.g. ScheduleWork threw after the move). + void RestoreAcquireSlot() + { + m_Lock.WithExclusiveLock([&] { ++m_PendingAcquires; }); + } + + private: + const size_t m_BufferSize; + RwLock m_Lock; + std::vector<IoBuffer> m_Pool; + uint32_t m_PendingAcquires = 0; + }; + + // Acquire one admission slot on the dispatcher thread. Returns a refcounted + // handle whose deleter releases the slot on last drop, so callers thread it + // through worker lambdas and AsyncXxx callbacks; whichever runs last fires + // the release. nullptr when admission is disabled (Sem == nullptr). + // + // Stats may be null when the caller has no per-request stat block (DeleteAll); + // the slot is still held but the wait time is not recorded. + // + // Exception safety: counting_semaphore::acquire is noexcept; only the + // shared_ptr control-block allocation can throw. The standard's deleter- + // invocation guarantee for shared_ptr(p, d) only kicks in when p is non- + // null (p == nullptr here), so libstdc++/MSVC happen to invoke the deleter + // on alloc failure but it is not portably required - hence the explicit + // guard. + std::shared_ptr<void> AcquireAdmissionSlot(const std::shared_ptr<AdmissionSemaphore>& Sem, S3AsyncStorageStats* Stats = nullptr) + { + if (!Sem) + { + return nullptr; + } + Stopwatch AdmWait; + Sem->acquire(); + auto ReleaseGuard = MakeGuard([&Sem] { Sem->release(); }); + std::shared_ptr<void> SlotRef(nullptr, [Sem](void*) { Sem->release(); }); + ReleaseGuard.Dismiss(); + if (Stats) + { + Stats->RecordAdmissionWait(AdmWait.GetElapsedTimeUs()); + } + return SlotRef; + } +} // namespace + +S3AsyncStorage::S3AsyncStorage(AsyncHttpClient& Client, + S3RequestBuilder& Builder, + CredentialsCallback GetCreds, + std::string KeyPrefix, + uint64_t MultipartChunkSize, + std::shared_ptr<AdmissionSemaphore> Admission, + uint32_t AdmissionCap) +: m_Client(Client) +, m_Builder(Builder) +, m_GetCreds(std::move(GetCreds)) +, m_KeyPrefix(std::move(KeyPrefix)) +, m_MultipartChunkSize(MultipartChunkSize) +, m_Admission(std::move(Admission)) +, m_AdmissionCap(m_Admission ? AdmissionCap : 0u) +{ +} + +std::string +S3AsyncStorage::CasKey(const IoHash& Hash) const +{ + return fmt::format("{}/cas/{}", m_KeyPrefix, Hash); +} + +std::string +S3AsyncStorage::CasPath(const IoHash& Hash) const +{ + return m_Builder.KeyToPath(CasKey(Hash)); +} + +// Per-range writes target the prealloc'd file at-offset; BasicFile::Write is positional and concurrency-safe. +struct S3AsyncStorage::GetMultipartState +{ + GetMultipartState(ParallelWork& InWork, WorkerThreadPool& InPool, S3AsyncStorageStats InStats, uint32_t TotalBlocks) + : Work(InWork) + , Pool(InPool) + , Stats(InStats) + , Buffers(WriteBufferSize, TotalBlocks) + { + } + + static constexpr size_t WriteBufferSize = 512u * 1024u; + + ParallelWork& Work; + WorkerThreadPool& Pool; + std::shared_ptr<BasicFile> DestFile; + std::filesystem::path DestPath; + std::shared_ptr<ParallelWork::ExternalWorkToken> Token; + IoHash ContentHash; + S3AsyncStorageStats Stats; + uint32_t TotalRanges = 0; + std::atomic<uint32_t> PendingRanges{0}; + std::atomic<bool> AnyFailed{false}; + RwLock ErrorLock; + std::string FirstError; + + WriteBufferPool Buffers; +}; + +// Shared state for a single-stream GET (medium tier - one ranged request, +// streamed body). OnData fills 512 KiB IoBuffers from a shared pool on the +// io strand and dispatches each filled buffer to a worker for one positional +// Write. PendingWork counts in-flight writes plus a +1 slot for the stream +// itself; the side that drops it to zero (last writer or OnComplete after +// the final flush) finalises the request. +struct S3AsyncStorage::GetStreamState +{ + GetStreamState(ParallelWork& InWork, WorkerThreadPool& InPool, S3AsyncStorageStats InStats, uint32_t TotalBlocks) + : Work(InWork) + , Pool(InPool) + , Stats(InStats) + , Buffers(WriteBufferSize, TotalBlocks) + { + } + + static constexpr size_t WriteBufferSize = 512u * 1024u; + + ParallelWork& Work; + WorkerThreadPool& Pool; + std::shared_ptr<BasicFile> DestFile; + std::filesystem::path DestPath; + std::shared_ptr<ParallelWork::ExternalWorkToken> Token; + IoHash ContentHash; + S3AsyncStorageStats Stats; + uint64_t ExpectedSize = 0; + + IoBuffer ActiveBuf; + size_t BufFill = 0; + uint64_t NextAbsOffset = 0; + uint64_t TotalReceived = 0; + + std::atomic<uint32_t> PendingWork{1}; // +1 for stream completion + std::atomic<bool> Failed{false}; + RwLock ErrorLock; + std::string FirstError; + + WriteBufferPool Buffers; +}; + +// Shared state for an in-flight multipart upload. PendingParts.fetch_sub(acq_rel) +// in FinalizePutPart is the publication barrier for ETag writes; CompleteMultipart +// (gated on Remaining == 1) sees every part's slot. ETagLock guards FirstError; +// AnyFailed short-circuits the pipeline on first error. +struct S3AsyncStorage::PutMultipartState +{ + PutMultipartState(ParallelWork& InWork, WorkerThreadPool& InPool, S3AsyncStorageStats InStats) + : Work(InWork) + , Pool(InPool) + , Stats(InStats) + { + } + + ParallelWork& Work; + WorkerThreadPool& Pool; + std::shared_ptr<BasicFile> File; + std::shared_ptr<ParallelWork::ExternalWorkToken> Token; + IoHash ContentHash; + S3AsyncStorageStats Stats; + // Snapshot of credentials taken once at PutMultipart entry. Reused across + // all per-part signing + Complete/Abort. Avoids per-part m_GetCreds() + // shared-lock contention on the credential provider. + SigV4Credentials Creds; + std::string Key; + std::string Path; + std::string UploadId; + uint64_t TotalSize = 0; + uint64_t PartSize = 0; + uint32_t TotalParts = 0; + std::atomic<uint32_t> PendingParts{0}; + // Monotonic dispatch cursor. Initialized to PreAcquired (min(TotalParts, + // AdmissionCap)) in PutMultipart. Each AsyncPut completion racing the + // handoff path issues fetch_add(1); whichever completion observes a value + // < TotalParts owns the next dispatch. + std::atomic<uint32_t> NextPartToDispatch{0}; + uint32_t PreAcquired = 0; + // Counts pending ReadRange calls into File. Used to release File from the + // worker thread that does the last read so the close runs off the curl + // io strand. + std::atomic<uint32_t> ReadsRemaining{0}; + std::atomic<bool> AnyFailed{false}; + + RwLock ETagLock; + std::vector<std::string> ETags; // indexed by PartNumber-1 + std::string FirstError; +}; + +namespace { + // Sets AnyFailed (CAS) and captures FirstError on first failure only. + // Does NOT touch the dispatch cursor; callers that need to drain the + // undispatched tail invoke ClaimUndispatchedParts and fan out skips. + void RecordPutPartFailure(S3AsyncStorage::PutMultipartState& State, const std::string& Err) + { + // Log every failure so correlated S3 errors (e.g. AccessDenied on one + // part, ServiceUnavailable on another) all reach the operator. CAS still + // keeps the first message for the eventual Token->Fail. + ZEN_WARN("S3AsyncStorage::PutMultipart '{}': {}", State.ContentHash, Err); + bool ExpectedFalse = false; + if (State.AnyFailed.compare_exchange_strong(ExpectedFalse, true)) + { + State.ETagLock.WithExclusiveLock([&] { State.FirstError = Err; }); + } + } + + // Claims every part index from NextPartToDispatch up to TotalParts. + // Returns the count claimed; caller is responsible for firing one + // FinalizePutPart per claimed index so PendingParts can reach 1 and + // trip CompleteMultipart/AbortMultipart. Idempotent on repeated calls + // (subsequent invocations return 0). + uint32_t ClaimUndispatchedParts(S3AsyncStorage::PutMultipartState& State) + { + const uint32_t Claimed = State.NextPartToDispatch.exchange(State.TotalParts, std::memory_order_acq_rel); + return Claimed < State.TotalParts ? State.TotalParts - Claimed : 0u; + } + + void RecordGetPartFailure(S3AsyncStorage::GetMultipartState& State, const std::string& Err) + { + ZEN_WARN("S3AsyncStorage::GetMultipart '{}': {}", State.ContentHash, Err); + bool ExpectedFalse = false; + if (State.AnyFailed.compare_exchange_strong(ExpectedFalse, true)) + { + State.ErrorLock.WithExclusiveLock([&] { State.FirstError = Err; }); + } + } + + void RecordGetStreamFailure(S3AsyncStorage::GetStreamState& State, const std::string& Err) + { + ZEN_WARN("S3AsyncStorage::Get '{}': {}", State.ContentHash, Err); + bool ExpectedFalse = false; + if (State.Failed.compare_exchange_strong(ExpectedFalse, true)) + { + State.ErrorLock.WithExclusiveLock([&] { State.FirstError = Err; }); + } + } +} // namespace + +// Three tiers selected by Size: +// - Small (< kPutSmallThreshold): single PUT, body materialized into one IoBuffer; SHA in one pass. +// - Medium (< MultipartThreshold): single PUT, streaming source; two-pass (hash, then upload) with bounded RAM. +// - Large (>= MultipartThreshold): S3 multipart; per-part body materialized in DispatchPartUpload. +namespace { + constexpr uint64_t kPutSmallThreshold = 512u * 1024u; + constexpr size_t kPutReadChunk = 256u * 1024u; +} // namespace + +void +S3AsyncStorage::Put(ParallelWork& Work, + WorkerThreadPool& Pool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath, + S3AsyncStorageStats& Stats) +{ + const uint64_t MultipartThreshold = m_MultipartChunkSize + (m_MultipartChunkSize / 4); + if (Size >= MultipartThreshold) + { + PutMultipart(Work, Pool, Hash, Size, SourcePath, Stats); + return; + } + if (Size < kPutSmallThreshold) + { + PutSmall(Work, Pool, Hash, Size, SourcePath, Stats); + return; + } + PutMedium(Work, Pool, Hash, Size, SourcePath, Stats); +} + +void +S3AsyncStorage::PutSmall(ParallelWork& Work, + WorkerThreadPool& Pool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath, + S3AsyncStorageStats& Stats) +{ + auto Token = std::make_shared<ParallelWork::ExternalWorkToken>(Work.RegisterExternal()); + Stats.RecordScheduled(); + + // Acquire one admission slot on the dispatcher; SlotRef threads through + // worker lambda + AsyncPut callback so the slot is held until the network + // transfer completes (or the chain is destroyed on shutdown/cancel). + std::shared_ptr<void> SlotRef = AcquireAdmissionSlot(m_Admission, &Stats); + + // Snapshot creds once on the dispatcher (single shared-lock acquire on the + // credential provider) and capture by value into the worker. Mirrors + // PutMultipart and avoids per-fanout contention on m_GetCreds. + SigV4Credentials Creds = m_GetCreds(); + + // Capture Stats BY VALUE: S3AsyncStorageStats is a struct of references to + // the long-lived PhaseStats atomics (PhaseStats outlives all in-flight + // transfers via the ParallelWork latch). The Stats wrapper struct itself + // is a local in the caller (S3AsyncStorageAdapter::Put) and goes out of + // scope before deferred work runs, so capturing &Stats would dangle. + Work.ScheduleWork( + Pool, + [this, + Hash = IoHash(Hash), + Size, + SourcePath = std::filesystem::path(SourcePath), + Token, + Stats, + Creds = std::move(Creds), + SlotRef = std::move(SlotRef)](std::atomic<bool>& Abort) mutable { + if (Abort.load()) + { + Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutSmall '{}': aborted"sv, Hash))); + return; + } + try + { + if (Creds.AccessKeyId.empty()) + { + Token->Fail( + std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutSmall '{}': no credentials available"sv, Hash))); + return; + } + + BasicFile File(SourcePath, BasicFile::Mode::kRead); + if (File.FileSize() != Size) + { + Token->Fail(std::make_exception_ptr( + zen::runtime_error("S3AsyncStorage::PutSmall '{}': source size {} differs from declared {}"sv, + Hash, + File.FileSize(), + Size))); + return; + } + + // Heap IoBuffer of exact size; explicit chunked pread fills it. No + // IoBuffer auto-materialize, no mmap. SHA computed in one pass over + // the buffer once it's filled. + IoBuffer Body(static_cast<size_t>(Size)); + uint8_t* Dst = static_cast<uint8_t*>(Body.MutableData()); + uint64_t Off = 0; + while (Off < Size) + { + const size_t Take = static_cast<size_t>(std::min<uint64_t>(kPutReadChunk, Size - Off)); + File.Read(Dst + Off, Take, Off); + Off += Take; + } + + std::string PayloadHash = Sha256ToHex(ComputeSha256(Body.GetData(), Body.GetSize())); + std::string Path = CasPath(Hash); + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "PUT", Path, "", PayloadHash); + + Stopwatch Timer = Stats.BeginRequest(); + // Pair Begin with End so an alloc throw inside AsyncPut (lambda capture + // before submit) does not strand InFlight elevated. + auto InFlightGuard = MakeGuard([&Stats] { Stats.EndRequest(0, 0); }); + m_Client.AsyncPut( + Path, + std::move(Body), + [Hash, Token, Timer, Stats, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable { + Stats.EndRequest(Timer.GetElapsedTimeUs(), Resp.IsSuccess() ? static_cast<uint64_t>(Resp.UploadedBytes) : 0); + if (!Resp.IsSuccess()) + { + std::string Err = S3ErrorMessage("S3 PUT failed", Resp); + Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutSmall '{}': {}"sv, Hash, Err))); + return; + } + Token->Complete(); + }, + Headers); + InFlightGuard.Dismiss(); + } + catch (...) + { + Token->Fail(std::current_exception()); + } + }); +} + +void +S3AsyncStorage::PutMedium(ParallelWork& Work, + WorkerThreadPool& Pool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath, + S3AsyncStorageStats& Stats) +{ + auto Token = std::make_shared<ParallelWork::ExternalWorkToken>(Work.RegisterExternal()); + Stats.RecordScheduled(); + + std::shared_ptr<void> SlotRef = AcquireAdmissionSlot(m_Admission, &Stats); + + // Snapshot creds on the dispatcher; see PutSmall. + SigV4Credentials Creds = m_GetCreds(); + + Work.ScheduleWork( + Pool, + [this, + Hash = IoHash(Hash), + Size, + SourcePath = std::filesystem::path(SourcePath), + Token, + Stats, + Creds = std::move(Creds), + SlotRef = std::move(SlotRef)](std::atomic<bool>& Abort) mutable { + if (Abort.load()) + { + Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMedium '{}': aborted"sv, Hash))); + return; + } + try + { + if (Creds.AccessKeyId.empty()) + { + Token->Fail( + std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMedium '{}': no credentials available"sv, Hash))); + return; + } + + // Hash pass: stream-read 256 KiB chunks into a heap scratch buffer, + // feed Sha256Stream incrementally. No body materialization. The file + // handle is shared with the upload pass via shared_ptr so libcurl's + // READ callback can pread from the same handle on the io thread. + auto File = std::make_shared<BasicFile>(SourcePath, BasicFile::Mode::kRead); + if (File->FileSize() != Size) + { + Token->Fail(std::make_exception_ptr( + zen::runtime_error("S3AsyncStorage::PutMedium '{}': source size {} differs from declared {}"sv, + Hash, + File->FileSize(), + Size))); + return; + } + + Sha256Stream Hasher; + { + std::unique_ptr<uint8_t[]> Scratch(new uint8_t[kPutReadChunk]); + uint64_t Off = 0; + while (Off < Size) + { + const size_t Take = static_cast<size_t>(std::min<uint64_t>(kPutReadChunk, Size - Off)); + File->Read(Scratch.get(), Take, Off); + Hasher.Update(Scratch.get(), Take); + Off += Take; + } + } + std::string PayloadHash = Sha256ToHex(Hasher.Finalize()); + std::string Path = CasPath(Hash); + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "PUT", Path, "", PayloadHash); + + Stopwatch Timer = Stats.BeginRequest(); + // Pair Begin with End so an alloc throw inside AsyncPut does not + // strand InFlight elevated. + auto InFlightGuard = MakeGuard([&Stats] { Stats.EndRequest(0, 0); }); + + // Streaming source: libcurl pulls 256 KiB chunks from the file on + // the io thread. Local-disk pread is fast enough for medium tier; + // page cache makes the second pass over the same bytes near-free. + m_Client.AsyncPut( + Path, + Size, + [File](uint8_t* DstBuf, size_t MaxBytes, uint64_t AbsOffset) -> size_t { + const uint64_t Remaining = File->FileSize() > AbsOffset ? File->FileSize() - AbsOffset : 0; + const size_t Take = static_cast<size_t>(std::min<uint64_t>(MaxBytes, Remaining)); + if (Take == 0) + { + return 0; + } + File->Read(DstBuf, Take, AbsOffset); + return Take; + }, + [Hash, Token, Timer, Size, Stats, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable { + Stats.EndRequest(Timer.GetElapsedTimeUs(), Resp.IsSuccess() ? Size : 0); + if (!Resp.IsSuccess()) + { + std::string Err = S3ErrorMessage("S3 PUT failed", Resp); + Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMedium '{}': {}"sv, Hash, Err))); + return; + } + Token->Complete(); + }, + Headers); + InFlightGuard.Dismiss(); + } + catch (...) + { + Token->Fail(std::current_exception()); + } + }); +} + +void +S3AsyncStorage::PutMultipart(ParallelWork& Work, + WorkerThreadPool& Pool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath, + S3AsyncStorageStats& Stats) +{ + auto Token = std::make_shared<ParallelWork::ExternalWorkToken>(Work.RegisterExternal()); + Stats.RecordScheduled(); + + // Fires only when an exception is in flight; a missed Dismiss() on a + // non-throwing early return falls through to Token's destructor safety + // net rather than asserting on Token->Fail(nullptr). + auto FailGuard = MakeGuard([Token] { + if (auto Ex = std::current_exception()) + { + Token->Fail(Ex); + } + }); + + SigV4Credentials Creds = m_GetCreds(); + if (Creds.AccessKeyId.empty()) + { + FailGuard.Dismiss(); + Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': no credentials available"sv, Hash))); + return; + } + + auto File = std::make_shared<BasicFile>(SourcePath, BasicFile::Mode::kRead); + if (File->FileSize() != Size) + { + FailGuard.Dismiss(); + Token->Fail( + std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': source size {} differs from declared {}"sv, + Hash, + File->FileSize(), + Size))); + return; + } + + auto State = std::make_shared<PutMultipartState>(Work, Pool, Stats); + State->File = std::move(File); + State->Token = Token; + State->ContentHash = Hash; + State->Creds = Creds; // snapshot reused across all parts + complete/abort + State->Key = CasKey(Hash); + State->Path = CasPath(Hash); + State->TotalSize = Size; + State->PartSize = m_MultipartChunkSize; + State->TotalParts = static_cast<uint32_t>((Size + State->PartSize - 1) / State->PartSize); + State->PendingParts = State->TotalParts; + State->ReadsRemaining = State->TotalParts; + State->ETags.resize(State->TotalParts); + + // Admission gating: the wave acquires min(TotalParts, AdmissionCap) slots + // inside DispatchInitialPartWave on a worker thread (off the dispatcher + // and off the io strand) so a single multipart cannot starve other + // dispatcher work while it drains the semaphore. Tail parts (TotalParts > + // cap) are dispatched lazily by HandoffSlotToNextPart from each AsyncPut + // completion, so the in-flight count per upload stays <= cap. + State->PreAcquired = m_Admission ? std::min<uint32_t>(State->TotalParts, m_AdmissionCap) : State->TotalParts; + State->NextPartToDispatch.store(State->PreAcquired, std::memory_order_relaxed); + + std::string CanonicalQs = BuildCanonicalQueryString({{"uploads", ""}}); + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "POST", State->Path, CanonicalQs, S3EmptyPayloadHash); + std::string FullPath = S3BuildRequestPath(State->Path, CanonicalQs); + + // CreateMultipartUpload / CompleteMultipartUpload / AbortMultipartUpload are + // metadata operations - they do not move payload bytes and would distort the + // per-request stats (avg/max latency) if folded into RequestCount. Counted + // separately here means PhaseStats.RequestCount tracks data-bearing PUTs only, + // matching the sync S3Storage path's accounting more closely. + + m_Client.AsyncPost( + FullPath, + [this, State](HttpClient::Response Resp) { + if (!Resp.IsSuccess()) + { + std::string Err = S3ErrorMessage("S3 CreateMultipartUpload failed", Resp); + State->Token->Fail( + std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': {}"sv, State->ContentHash, Err))); + return; + } + // See PutMultipart entry: tolerate null exception_ptr on a missed Dismiss(). + auto CallbackGuard = MakeGuard([State] { + if (auto Ex = std::current_exception()) + { + State->Token->Fail(Ex); + } + }); + std::string_view Body = Resp.AsText(); + // S3 can answer 200 with an embedded <Error> body even on Create. + // Mirror CompleteMultipart's parse so the original error code/message + // surfaces instead of a generic "missing UploadId". + std::string_view ErrorCode; + std::string_view ErrorMessage; + if (S3ExtractError(Body, ErrorCode, ErrorMessage)) + { + CallbackGuard.Dismiss(); + State->Token->Fail( + std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': create returned error: {} - {}"sv, + State->ContentHash, + ErrorCode, + ErrorMessage))); + return; + } + std::string_view UploadId = S3ExtractXmlValue(Body, "UploadId"); + if (UploadId.empty()) + { + CallbackGuard.Dismiss(); + State->Token->Fail(std::make_exception_ptr( + zen::runtime_error("S3AsyncStorage::PutMultipart '{}': missing UploadId in CreateMultipartUpload response"sv, + State->ContentHash))); + return; + } + State->UploadId = std::string(UploadId); + // Hop the wave dispatch onto a worker so the per-part admission + // acquire never blocks the io strand or the caller dispatcher. + try + { + State->Work.ScheduleWork( + State->Pool, + [this, State](std::atomic<bool>&) { DispatchInitialPartWave(State); }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (...) + { + CallbackGuard.Dismiss(); + RecordPutPartFailure(*State, "S3 UploadPart wave schedule failed"); + for (uint32_t J = 0; J < State->TotalParts; ++J) + { + FinalizePutPart(State); + } + return; + } + CallbackGuard.Dismiss(); + }, + Headers); + FailGuard.Dismiss(); +} + +void +S3AsyncStorage::DispatchInitialPartWave(std::shared_ptr<PutMultipartState> State) +{ + const bool AdmissionEnabled = (m_Admission != nullptr); + const uint32_t InitialDispatch = AdmissionEnabled ? State->PreAcquired : State->TotalParts; + + for (uint32_t I = 0; I < InitialDispatch; ++I) + { + const uint32_t PartNum = I + 1; + std::shared_ptr<void> SlotRef; + try + { + if (AdmissionEnabled) + { + SlotRef = AcquireAdmissionSlot(m_Admission, &State->Stats); + } + State->Work.ScheduleWork( + State->Pool, + [this, State, PartNum, SlotRef = std::move(SlotRef)](std::atomic<bool>&) mutable { + DispatchPartUpload(State, PartNum, std::move(SlotRef)); + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& Ex) + { + // Acquire/schedule failed mid-wave. Account for this part plus + // the un-iterated wave tail, then drain the lazy tail past the + // wave. Slot (if acquired) releases when SlotRef destructs. + RecordPutPartFailure(*State, fmt::format("S3 UploadPart {} schedule failed: {}", PartNum, Ex.what())); + const uint32_t WaveTail = InitialDispatch - I; + for (uint32_t J = 0; J < WaveTail; ++J) + { + FinalizePutPart(State); + } + DrainUndispatchedParts(State); + return; + } + } +} + +void +S3AsyncStorage::HandoffSlotToNextPart(std::shared_ptr<PutMultipartState> State, uint32_t PartIdx, std::shared_ptr<void> SlotRef) +{ + const uint32_t PartNum = PartIdx + 1; + try + { + State->Work.ScheduleWork( + State->Pool, + [this, State, PartNum, SlotRef = std::move(SlotRef)](std::atomic<bool>&) mutable { + DispatchPartUpload(State, PartNum, std::move(SlotRef)); + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& Ex) + { + // Handoff dispatch failed. The slot we were about to hand off + // releases when SlotRef destructs at scope exit. Account for this + // never-dispatched part and drain the rest of the tail. + RecordPutPartFailure(*State, fmt::format("S3 UploadPart {} handoff failed: {}", PartNum, Ex.what())); + FinalizePutPart(State); + DrainUndispatchedParts(State); + } +} + +void +S3AsyncStorage::DrainUndispatchedParts(std::shared_ptr<PutMultipartState> State) +{ + const uint32_t Skipped = ClaimUndispatchedParts(*State); + for (uint32_t J = 0; J < Skipped; ++J) + { + FinalizePutPart(State); + } +} + +#if ZEN_WITH_TESTS +namespace s3asyncstorage_test_hooks { + std::atomic<uint32_t> g_ForceNextPartFailures{0}; + void ForceNextPartFailures(uint32_t Count) { g_ForceNextPartFailures.store(Count, std::memory_order_relaxed); } +} // namespace s3asyncstorage_test_hooks +#endif + +void +S3AsyncStorage::DispatchPartUpload(std::shared_ptr<PutMultipartState> State, uint32_t PartNum, std::shared_ptr<void> SlotRef) +{ + const uint64_t Offset = static_cast<uint64_t>(PartNum - 1) * State->PartSize; + const uint64_t ChunkSize = std::min<uint64_t>(State->PartSize, State->TotalSize - Offset); + + // Release File on this worker thread once all reads have been served, so + // the close never runs on the curl io strand from a captured State ref in + // an AsyncPut callback. + auto ReleaseFileIfLast = [&State] { + if (State->ReadsRemaining.fetch_sub(1, std::memory_order_acq_rel) == 1) + { + State->File.reset(); + } + }; + +#if ZEN_WITH_TESTS + // Test hook: synthesize a part-level failure to drive the AbortMultipart + // path. Decrement the counter while > 0; each consumed slot fails one part. + { + uint32_t Cur = s3asyncstorage_test_hooks::g_ForceNextPartFailures.load(std::memory_order_relaxed); + while (Cur > 0 && + !s3asyncstorage_test_hooks::g_ForceNextPartFailures.compare_exchange_weak(Cur, Cur - 1, std::memory_order_acq_rel)) + { + } + if (Cur > 0) + { + ReleaseFileIfLast(); + RecordPutPartFailure(*State, fmt::format("test-injected part failure (part {})", PartNum)); + FinalizePutPart(State); + DrainUndispatchedParts(State); + return; + } + } +#endif + + // Short-circuit if a foreign failure has flipped AnyFailed since this part was + // scheduled (handoff race: success callback bumps the cursor to dispatch the + // next part, drain runs concurrently). Skip the file read + sign + AsyncPut for + // a doomed transfer; FinalizePutPart accounting is symmetric to the success path. + if (State->AnyFailed.load(std::memory_order_acquire)) + { + ReleaseFileIfLast(); + FinalizePutPart(State); + return; + } + + // Explicit chunked positional pread + Sha256Stream in one pass. One heap + // IoBuffer of exact part size; no mmap, no IoBuffer auto-materialize. + IoBuffer Part(static_cast<size_t>(ChunkSize)); + uint8_t* Dst = static_cast<uint8_t*>(Part.MutableData()); + Sha256Stream Hasher; + try + { + uint64_t Read = 0; + while (Read < ChunkSize) + { + const size_t Take = static_cast<size_t>(std::min<uint64_t>(kPutReadChunk, ChunkSize - Read)); + State->File->Read(Dst + Read, Take, Offset + Read); + Hasher.Update(Dst + Read, Take); + Read += Take; + } + ReleaseFileIfLast(); + } + catch (const std::exception& Ex) + { + ReleaseFileIfLast(); + RecordPutPartFailure(*State, fmt::format("S3 UploadPart {} read failed: {}", PartNum, Ex.what())); + FinalizePutPart(State); + DrainUndispatchedParts(State); + return; + } + + try + { + // Use the snapshot taken at PutMultipart entry; saves ~TotalParts + // shared-lock acquisitions on the credential provider. + const SigV4Credentials& Creds = State->Creds; + + std::string CanonicalQs = BuildCanonicalQueryString({ + {"partNumber", fmt::format("{}", PartNum)}, + {"uploadId", State->UploadId}, + }); + std::string PayloadHash = Sha256ToHex(Hasher.Finalize()); + + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "PUT", State->Path, CanonicalQs, PayloadHash); + std::string FullPath = S3BuildRequestPath(State->Path, CanonicalQs); + + Stopwatch Timer = State->Stats.BeginRequest(); + // Pair Begin with End so an alloc throw inside AsyncPut does not strand + // InFlight elevated. + auto InFlightGuard = MakeGuard([&State] { State->Stats.EndRequest(0, 0); }); + + m_Client.AsyncPut( + FullPath, + std::move(Part), + [this, State, PartNum, Timer, ChunkSize, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable { + State->Stats.EndRequest(Timer.GetElapsedTimeUs(), Resp.IsSuccess() ? ChunkSize : 0); + if (!Resp.IsSuccess()) + { + RecordPutPartFailure(*State, S3ErrorMessage(fmt::format("S3 UploadPart {} failed", PartNum), Resp)); + DrainUndispatchedParts(State); + FinalizePutPart(State); + return; + } + std::string_view ETag = Resp.FindHeader("etag"); + if (ETag.empty()) + { + RecordPutPartFailure(*State, fmt::format("S3 UploadPart {} response missing ETag header", PartNum)); + DrainUndispatchedParts(State); + FinalizePutPart(State); + return; + } + // Per-slot writes don't race - each part owns its slot. + // PendingParts.fetch_sub(acq_rel) in FinalizePutPart is the + // publication barrier: CompleteMultipart (gated on Remaining + // == 1) sees every ETag write. New readers must go through + // the same barrier or take ETagLock. + State->ETags[PartNum - 1].assign(ETag); + + // Try to hand the slot off to the next undispatched part. Skip + // the handoff if a prior failure has flipped AnyFailed (cursor + // was already exchanged to TotalParts by the drain there). + if (!State->AnyFailed.load(std::memory_order_acquire)) + { + const uint32_t NextIdx = State->NextPartToDispatch.fetch_add(1, std::memory_order_acq_rel); + if (NextIdx < State->TotalParts) + { + HandoffSlotToNextPart(State, NextIdx, std::move(SlotRef)); + } + } + // Account for THIS part. SlotRef may already be moved-from + // (if handed off); if not, releases when this lambda destructs. + FinalizePutPart(State); + }, + Headers); + InFlightGuard.Dismiss(); + } + catch (const std::exception& Ex) + { + RecordPutPartFailure(*State, fmt::format("S3 UploadPart {} dispatch failed: {}", PartNum, Ex.what())); + FinalizePutPart(State); + DrainUndispatchedParts(State); + } +} + +void +S3AsyncStorage::FinalizePutPart(std::shared_ptr<PutMultipartState> State) +{ + const uint32_t Remaining = State->PendingParts.fetch_sub(1, std::memory_order_acq_rel); + if (Remaining != 1) + { + return; + } + + // Hop the Complete/Abort dispatch off the curl io strand. SHA over the + // parts-XML and SignRequest are CPU work; AsyncPost itself queues a strand + // wakeup. Running both inline here would pin the strand thread. + const bool Failed = State->AnyFailed.load(std::memory_order_acquire); + try + { + State->Work.ScheduleWork(State->Pool, [this, State, Failed](std::atomic<bool>&) { + if (Failed) + { + AbortMultipart(State); + } + else + { + CompleteMultipart(State); + } + }); + } + catch (...) + { + // ScheduleWork failed before Complete/Abort could run; surface the leak via Token->Fail. + State->Token->Fail(std::current_exception()); + } +} + +void +S3AsyncStorage::CompleteMultipart(std::shared_ptr<PutMultipartState> State) +{ + // Reuse the snapshot taken at PutMultipart entry. The upload-id was issued + // against these creds; refreshing here would only matter if creds expired + // mid-upload, in which case all the part uploads would already have failed. + const SigV4Credentials& Creds = State->Creds; + + ExtendableStringBuilder<1024> Xml; + Xml.Append("<CompleteMultipartUpload>"); + for (uint32_t I = 0; I < State->TotalParts; ++I) + { + Xml.Append(fmt::format("<Part><PartNumber>{}</PartNumber><ETag>{}</ETag></Part>", I + 1, State->ETags[I])); + } + Xml.Append("</CompleteMultipartUpload>"); + std::string_view XmlView = Xml.ToView(); + + std::string CanonicalQs = BuildCanonicalQueryString({{"uploadId", State->UploadId}}); + std::string PayloadHash = Sha256ToHex(ComputeSha256(XmlView)); + + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "POST", State->Path, CanonicalQs, PayloadHash); + std::string FullPath = S3BuildRequestPath(State->Path, CanonicalQs); + + IoBuffer Payload(IoBuffer::Clone, XmlView.data(), XmlView.size()); + + // Metadata op; not counted in PhaseStats (see CreateMultipartUpload comment). + m_Client.AsyncPost( + FullPath, + Payload, + ZenContentType::kXML, + [State](HttpClient::Response Resp) { + if (!Resp.IsSuccess()) + { + std::string Err = S3ErrorMessage("S3 CompleteMultipartUpload failed", Resp); + State->Token->Fail( + std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': {}"sv, State->ContentHash, Err))); + return; + } + std::string_view Body = Resp.AsText(); + std::string_view ErrorCode; + std::string_view ErrorMessage; + if (S3ExtractError(Body, ErrorCode, ErrorMessage)) + { + State->Token->Fail( + std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': complete returned error: {} - {}"sv, + State->ContentHash, + ErrorCode, + ErrorMessage))); + return; + } + State->Token->Complete(); + }, + Headers); +} + +void +S3AsyncStorage::AbortMultipart(std::shared_ptr<PutMultipartState> State) +{ + // Reuse the snapshot taken at PutMultipart entry. + const SigV4Credentials& Creds = State->Creds; + + std::string CanonicalQs = BuildCanonicalQueryString({{"uploadId", State->UploadId}}); + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "DELETE", State->Path, CanonicalQs, S3EmptyPayloadHash); + std::string FullPath = S3BuildRequestPath(State->Path, CanonicalQs); + + // Metadata op; not counted in PhaseStats (see CreateMultipartUpload comment). + m_Client.AsyncDelete( + FullPath, + [State](HttpClient::Response Resp) { + std::string FirstError; + State->ETagLock.WithExclusiveLock([&] { FirstError = std::move(State->FirstError); }); + if (!Resp.IsSuccess()) + { + State->Token->Fail(std::make_exception_ptr( + zen::runtime_error("S3AsyncStorage::PutMultipart '{}': part failed ({}); abort also failed: {}"sv, + State->ContentHash, + FirstError, + S3ErrorMessage("S3 AbortMultipartUpload failed", Resp)))); + return; + } + State->Token->Fail( + std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': {}"sv, State->ContentHash, FirstError))); + }, + Headers); +} + +void +S3AsyncStorage::Get(ParallelWork& Work, + WorkerThreadPool& Pool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath, + S3AsyncStorageStats& Stats) +{ + // Three tiers: in-memory AsyncGet (< 512 KiB), AsyncStream into pooled + // 512 KiB write buffers (medium), GetMultipart (>= MultiRangeThreshold). + constexpr uint64_t StreamingThreshold = 512u * 1024u; + + const uint64_t MultiRangeThreshold = m_MultipartChunkSize + (m_MultipartChunkSize / 4); + if (Size >= MultiRangeThreshold) + { + GetMultipart(Work, Pool, Hash, Size, DestinationPath, Stats); + return; + } + + auto Token = std::make_shared<ParallelWork::ExternalWorkToken>(Work.RegisterExternal()); + Stats.RecordScheduled(); + + // See PutMultipart: tolerate null exception_ptr from a refactor that + // adds a non-throwing early return without Dismiss(). + auto FailGuard = MakeGuard([Token] { + if (auto Ex = std::current_exception()) + { + Token->Fail(Ex); + } + }); + + std::shared_ptr<void> SlotRef = AcquireAdmissionSlot(m_Admission, &Stats); + + SigV4Credentials Creds = m_GetCreds(); + if (Creds.AccessKeyId.empty()) + { + FailGuard.Dismiss(); + Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': no credentials available"sv, Hash))); + return; + } + + std::string Path = CasPath(Hash); + + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "GET", Path, "", S3EmptyPayloadHash); + + Stopwatch Timer = Stats.BeginRequest(); + // Pair Begin with End so an alloc throw inside AsyncGet/AsyncStream does not + // strand InFlight elevated. + auto InFlightGuard = MakeGuard([&Stats] { Stats.EndRequest(0, 0); }); + + if (Size < StreamingThreshold) + { + // Small: in-memory AsyncGet, single worker-hop write at completion. + m_Client.AsyncGet( + Path, + [Hash = IoHash(Hash), Token, Timer, Stats, DestPath = DestinationPath, Size, &Work, &Pool, SlotRef = std::move(SlotRef)]( + HttpClient::Response Resp) mutable { + Stats.EndRequest(Timer.GetElapsedTimeUs(), Resp.IsSuccess() ? Resp.ResponsePayload.GetSize() : 0); + // No remove() on the failure / size-mismatch paths: the destination + // file is only created by the worker hop below on success, so + // nothing exists to delete here. + if (!Resp.IsSuccess()) + { + std::string Err = S3ErrorMessage("S3 GET failed", Resp); + Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': {}"sv, Hash, Err))); + return; + } + if (Resp.ResponsePayload.GetSize() != Size) + { + Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': received {} bytes, expected {}"sv, + Hash, + Resp.ResponsePayload.GetSize(), + Size))); + return; + } + try + { + Work.ScheduleWork( + Pool, + [Token, DestPath, Hash, Payload = std::move(Resp.ResponsePayload)](std::atomic<bool>&) mutable { + try + { + BasicFile Out(DestPath, BasicFile::Mode::kTruncate); + Out.Write(Payload.GetData(), Payload.GetSize(), 0); + Token->Complete(); + } + catch (const std::exception& Ex) + { + std::error_code Ec; + std::filesystem::remove(DestPath, Ec); + Token->Fail(std::make_exception_ptr( + zen::runtime_error("S3AsyncStorage::Get '{}': write failed: {}"sv, Hash, Ex.what()))); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& Ex) + { + Token->Fail( + std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': schedule failed: {}"sv, Hash, Ex.what()))); + } + }, + Headers); + InFlightGuard.Dismiss(); + FailGuard.Dismiss(); + return; + } + + // Medium tier: AsyncStream + GetStreamState (see struct doc). + const uint32_t StreamTotalBlocks = + static_cast<uint32_t>((Size + GetStreamState::WriteBufferSize - 1) / GetStreamState::WriteBufferSize); + auto State = std::make_shared<GetStreamState>(Work, Pool, Stats, StreamTotalBlocks); + State->Token = Token; + State->ContentHash = Hash; + State->DestPath = DestinationPath; + State->ExpectedSize = Size; + + try + { + State->DestFile = std::make_shared<BasicFile>(DestinationPath, BasicFile::Mode::kTruncate); + std::error_code PrepareEc = PrepareFileForScatteredWrite(State->DestFile->Handle(), Size); + if (PrepareEc) + { + throw zen::runtime_error("PrepareFileForScatteredWrite failed: {}"sv, PrepareEc.message()); + } + } + catch (const std::exception& Ex) + { + FailGuard.Dismiss(); + // Drop DestFile (if opened) before remove() so Windows lets the unlink + // proceed; otherwise we leave a 0-byte file behind on the prealloc path. + State->DestFile.reset(); + std::error_code Ec; + std::filesystem::remove(DestinationPath, Ec); + Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': preallocate failed: {}"sv, Hash, Ex.what()))); + return; + } + + m_Client.AsyncStream( + Path, + [this, State](const uint8_t* Data, size_t SizeBytes, uint64_t /*TotalSize*/) -> bool { + constexpr size_t WriteBufferSize = GetStreamState::WriteBufferSize; + if (State->TotalReceived + SizeBytes > State->ExpectedSize) + { + RecordGetStreamFailure( + *State, + fmt::format("S3 GET overflow: got {} bytes past expected {}", State->TotalReceived + SizeBytes, State->ExpectedSize)); + return false; + } + const uint8_t* Cur = Data; + size_t Remaining = SizeBytes; + while (Remaining > 0) + { + if (!State->ActiveBuf) + { + State->ActiveBuf = State->Buffers.Acquire(); + } + const size_t Avail = WriteBufferSize - State->BufFill; + const size_t Take = std::min(Remaining, Avail); + std::memcpy(State->ActiveBuf.MutableData<uint8_t>() + State->BufFill, Cur, Take); + State->BufFill += Take; + State->TotalReceived += Take; + Cur += Take; + Remaining -= Take; + if (State->BufFill == WriteBufferSize) + { + IoBuffer Buf = std::move(State->ActiveBuf); + const size_t Fill = State->BufFill; + const uint64_t AbsOffset = State->NextAbsOffset; + State->NextAbsOffset += Fill; + State->BufFill = 0; + State->PendingWork.fetch_add(1, std::memory_order_acq_rel); + try + { + State->Work.ScheduleWork( + State->Pool, + [this, State, Buf = std::move(Buf), Fill, AbsOffset](std::atomic<bool>&) mutable { + try + { + State->DestFile->Write(Buf.GetData(), Fill, AbsOffset); + } + catch (const std::exception& Ex) + { + RecordGetStreamFailure(*State, fmt::format("S3 GET write failed: {}", Ex.what())); + } + State->Buffers.Release(std::move(Buf)); + if (State->PendingWork.fetch_sub(1, std::memory_order_acq_rel) == 1) + { + OnGetStreamFinalised(State); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& Ex) + { + // ScheduleWork failed before the worker took ownership of the + // PendingWork increment we did above; balance it here and + // abort the transfer so OnComplete tears down cleanly. Buf is + // gone (moved into the lambda then destroyed when ScheduleWork + // threw); restore the pool's acquire slot so OnComplete's + // tail-flush can still Acquire if needed. + RecordGetStreamFailure(*State, fmt::format("S3 GET schedule failed: {}", Ex.what())); + State->Buffers.RestoreAcquireSlot(); + State->PendingWork.fetch_sub(1, std::memory_order_acq_rel); + return false; + } + } + } + return true; + }, + [this, State, Timer, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable { + State->Stats.EndRequest(Timer.GetElapsedTimeUs(), Resp.IsSuccess() ? State->TotalReceived : 0); + bool Failed = !Resp.IsSuccess(); + if (Failed) + { + RecordGetStreamFailure(*State, S3ErrorMessage("S3 GET failed", Resp)); + } + else if (State->TotalReceived != State->ExpectedSize) + { + RecordGetStreamFailure(*State, + fmt::format("S3 GET wrote {} bytes, expected {}", State->TotalReceived, State->ExpectedSize)); + Failed = true; + } + if (!Failed && State->BufFill > 0) + { + IoBuffer Buf = std::move(State->ActiveBuf); + const size_t Fill = State->BufFill; + const uint64_t AbsOffset = State->NextAbsOffset; + State->BufFill = 0; + State->PendingWork.fetch_add(1, std::memory_order_acq_rel); + try + { + State->Work.ScheduleWork( + State->Pool, + [this, State, Buf = std::move(Buf), Fill, AbsOffset](std::atomic<bool>&) mutable { + try + { + State->DestFile->Write(Buf.GetData(), Fill, AbsOffset); + } + catch (const std::exception& Ex) + { + RecordGetStreamFailure(*State, fmt::format("S3 GET write failed: {}", Ex.what())); + } + State->Buffers.Release(std::move(Buf)); + if (State->PendingWork.fetch_sub(1, std::memory_order_acq_rel) == 1) + { + OnGetStreamFinalised(State); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& Ex) + { + RecordGetStreamFailure(*State, fmt::format("S3 GET tail-flush schedule failed: {}", Ex.what())); + State->Buffers.RestoreAcquireSlot(); + State->PendingWork.fetch_sub(1, std::memory_order_acq_rel); + } + } + // PendingWork = 1 (stream slot, set in ctor) + N (per-buffer worker + // dispatched in OnData) + (optional 1 for the tail-flush dispatched + // just above). The fetch_sub returning 1 means we observed the value + // that was 1 right before our decrement - i.e. we are the last + // participant. Either this branch (no tail flush, no in-flight + // workers) or the last per-buffer worker performs the finalise. Both + // paths use acq_rel so the work-item writes are visible to whichever + // thread runs OnGetStreamFinalised. + if (State->PendingWork.fetch_sub(1, std::memory_order_acq_rel) == 1) + { + OnGetStreamFinalised(State); + } + }, + Headers); + InFlightGuard.Dismiss(); + FailGuard.Dismiss(); +} + +void +S3AsyncStorage::OnGetStreamFinalised(std::shared_ptr<GetStreamState> State) +{ + // All disk I/O (close + unlink on failure) is hopped into the worker pool. + // The last PendingWork dec can land on the curl io strand, and inline + // disk syscalls there would block the curl_multi poll loop. + const bool Failed = State->Failed.load(std::memory_order_acquire); + try + { + State->Work.ScheduleWork( + State->Pool, + [State, Failed](std::atomic<bool>&) { + if (Failed) + { + std::string Err; + State->ErrorLock.WithExclusiveLock([&] { Err = std::move(State->FirstError); }); + State->DestFile.reset(); + std::error_code Ec; + std::filesystem::remove(State->DestPath, Ec); + State->Token->Fail( + std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': {}"sv, State->ContentHash, Err))); + return; + } + + // No Flush: consumer reads via page cache; crash recovery re-hydrates. + State->DestFile.reset(); + State->Token->Complete(); + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (...) + { + // Schedule failed; release file handle inline and surface via Token. + State->DestFile.reset(); + std::error_code Ec; + std::filesystem::remove(State->DestPath, Ec); + State->Token->Fail(std::current_exception()); + } +} + +void +S3AsyncStorage::GetMultipart(ParallelWork& Work, + WorkerThreadPool& Pool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath, + S3AsyncStorageStats& Stats) +{ + auto Token = std::make_shared<ParallelWork::ExternalWorkToken>(Work.RegisterExternal()); + Stats.RecordScheduled(); + + const uint64_t Chunk = m_MultipartChunkSize; + const uint32_t RangeCount = static_cast<uint32_t>((Size + Chunk - 1) / Chunk); + uint32_t TotalBlocks = 0; + for (uint32_t I = 0; I < RangeCount; ++I) + { + const uint64_t RS = std::min<uint64_t>(Chunk, Size - static_cast<uint64_t>(I) * Chunk); + TotalBlocks += static_cast<uint32_t>((RS + GetMultipartState::WriteBufferSize - 1) / GetMultipartState::WriteBufferSize); + } + + auto State = std::make_shared<GetMultipartState>(Work, Pool, Stats, TotalBlocks); + State->Token = Token; + State->ContentHash = Hash; + State->DestPath = DestinationPath; + State->TotalRanges = RangeCount; + State->PendingRanges = RangeCount; + + // RangeCount > AdmissionCap is fine: the per-range acquire below blocks the + // dispatcher (caller thread, NOT the io strand) until in-flight ranges fire + // their AsyncStream completions and release slots. In-flight ranges per + // upload stay bounded by AdmissionCap. + + try + { + // Sparse + preallocated. Sparse mode lets per-range writes land at + // arbitrary offsets without the OS zero-filling intervening pages + // first; preallocating the full size up front avoids fragmentation + // and lazy-commit page faults during the writes themselves. + State->DestFile = std::make_shared<BasicFile>(DestinationPath, BasicFile::Mode::kTruncate); + std::error_code PrepareEc = PrepareFileForScatteredWrite(State->DestFile->Handle(), Size); + if (PrepareEc) + { + throw zen::runtime_error("PrepareFileForScatteredWrite failed: {}"sv, PrepareEc.message()); + } + } + catch (const std::exception& Ex) + { + // Drop DestFile (if opened) before remove() so Windows lets the unlink + // proceed; otherwise we leave a 0-byte file behind on the prealloc path. + State->DestFile.reset(); + std::error_code Ec; + std::filesystem::remove(DestinationPath, Ec); + Token->Fail( + std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::GetMultipart '{}': preallocate failed: {}"sv, Hash, Ex.what()))); + return; + } + + std::string Path = CasPath(Hash); + + // Snapshot creds once for the whole multipart Get. Mirrors PutMultipart; + // avoids N shared-lock acquisitions on the credential provider when the + // range count is large. + const SigV4Credentials Creds = m_GetCreds(); + if (Creds.AccessKeyId.empty()) + { + // Empty creds are loop-invariant; drive PendingRanges to zero in one shot + // rather than acquiring TotalRanges admission slots only to fire identical + // failures. RecordGetPartFailure CAS keeps the first message; subsequent + // completions just decrement the counter. + RecordGetPartFailure(*State, "no credentials available"); + for (uint32_t I = 0; I < State->TotalRanges; ++I) + { + OnGetPartCompleted(State); + } + return; + } + + for (uint32_t I = 0; I < State->TotalRanges; ++I) + { + const uint64_t Offset = static_cast<uint64_t>(I) * Chunk; + const uint64_t RangeSize = std::min<uint64_t>(Chunk, Size - Offset); + const uint32_t RangeIdx = I; + + try + { + // Per-range admission slot. Acquire blocks the caller thread when the + // cap is reached; in-flight ranges release slots via SlotRef destruct + // in their AsyncStream completion callback. Acquire on the caller + // (vs the io strand) is safe to block: GetMultipart is called from + // the provision-pool worker driving Storage::Get, never from the + // curl io thread that has to drain completions. + std::shared_ptr<void> SlotRef = AcquireAdmissionSlot(m_Admission, &Stats); + + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "GET", Path, "", S3EmptyPayloadHash); + Headers->emplace("Range", fmt::format("bytes={}-{}", Offset, Offset + RangeSize - 1)); + + Stopwatch Timer = State->Stats.BeginRequest(); + // Pair Begin with End so an alloc throw inside AsyncStream does not + // strand InFlight elevated. + auto InFlightGuard = MakeGuard([&State] { State->Stats.EndRequest(0, 0); }); + + // Per-range mirror of GetStreamState's buffered-write machinery (see + // the medium-tier branch in Get()). + constexpr size_t WriteBufferSize = GetMultipartState::WriteBufferSize; + + struct RangeWriteState + { + IoBuffer ActiveBuf; + size_t BufFill = 0; + uint64_t NextAbsOffset; + uint64_t TotalReceived = 0; + std::atomic<uint32_t> PendingWork{1}; // +1 for stream completion + }; + + auto WState = std::make_shared<RangeWriteState>(); + WState->NextAbsOffset = Offset; + + m_Client.AsyncStream( + Path, + [this, State, RangeSize, RangeIdx, WState](const uint8_t* Data, size_t SizeBytes, uint64_t /*TotalSize*/) -> bool { + if (WState->TotalReceived + SizeBytes > RangeSize) + { + RecordGetPartFailure(*State, + fmt::format("S3 GET range {} overflow: got {} bytes past expected {}", + RangeIdx, + WState->TotalReceived + SizeBytes, + RangeSize)); + return false; + } + const uint8_t* Cur = Data; + size_t Remaining = SizeBytes; + while (Remaining > 0) + { + if (!WState->ActiveBuf) + { + WState->ActiveBuf = State->Buffers.Acquire(); + } + const size_t Avail = WriteBufferSize - WState->BufFill; + const size_t Take = std::min(Remaining, Avail); + std::memcpy(WState->ActiveBuf.MutableData<uint8_t>() + WState->BufFill, Cur, Take); + WState->BufFill += Take; + WState->TotalReceived += Take; + Cur += Take; + Remaining -= Take; + if (WState->BufFill == WriteBufferSize) + { + IoBuffer Buf = std::move(WState->ActiveBuf); + const size_t Fill = WState->BufFill; + const uint64_t AbsOffset = WState->NextAbsOffset; + WState->NextAbsOffset += Fill; + WState->BufFill = 0; + WState->PendingWork.fetch_add(1, std::memory_order_acq_rel); + try + { + State->Work.ScheduleWork( + State->Pool, + [this, State, RangeIdx, WState, Buf = std::move(Buf), Fill, AbsOffset](std::atomic<bool>&) mutable { + try + { + State->DestFile->Write(Buf.GetData(), Fill, AbsOffset); + } + catch (const std::exception& Ex) + { + RecordGetPartFailure(*State, + fmt::format("S3 GET range {} write failed: {}", RangeIdx, Ex.what())); + } + State->Buffers.Release(std::move(Buf)); + if (WState->PendingWork.fetch_sub(1, std::memory_order_acq_rel) == 1) + { + OnGetPartCompleted(State); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& Ex) + { + RecordGetPartFailure(*State, fmt::format("S3 GET range {} schedule failed: {}", RangeIdx, Ex.what())); + State->Buffers.RestoreAcquireSlot(); + WState->PendingWork.fetch_sub(1, std::memory_order_acq_rel); + return false; + } + } + } + return true; + }, + [this, State, RangeSize, RangeIdx, WState, Timer, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable { + State->Stats.EndRequest(Timer.GetElapsedTimeUs(), Resp.IsSuccess() ? WState->TotalReceived : 0); + bool Failed = !Resp.IsSuccess(); + if (Failed) + { + RecordGetPartFailure(*State, S3ErrorMessage(fmt::format("S3 GET range {} failed", RangeIdx), Resp)); + } + else if (WState->TotalReceived != RangeSize) + { + RecordGetPartFailure( + *State, + fmt::format("S3 GET range {} wrote {} bytes, expected {}", RangeIdx, WState->TotalReceived, RangeSize)); + Failed = true; + } + if (!Failed && WState->BufFill > 0) + { + IoBuffer Buf = std::move(WState->ActiveBuf); + const size_t Fill = WState->BufFill; + const uint64_t AbsOffset = WState->NextAbsOffset; + WState->BufFill = 0; + WState->PendingWork.fetch_add(1, std::memory_order_acq_rel); + try + { + State->Work.ScheduleWork( + State->Pool, + [this, State, RangeIdx, WState, Buf = std::move(Buf), Fill, AbsOffset](std::atomic<bool>&) mutable { + try + { + State->DestFile->Write(Buf.GetData(), Fill, AbsOffset); + } + catch (const std::exception& Ex) + { + RecordGetPartFailure(*State, fmt::format("S3 GET range {} write failed: {}", RangeIdx, Ex.what())); + } + State->Buffers.Release(std::move(Buf)); + if (WState->PendingWork.fetch_sub(1, std::memory_order_acq_rel) == 1) + { + OnGetPartCompleted(State); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& Ex) + { + RecordGetPartFailure(*State, + fmt::format("S3 GET range {} tail-flush schedule failed: {}", RangeIdx, Ex.what())); + State->Buffers.RestoreAcquireSlot(); + WState->PendingWork.fetch_sub(1, std::memory_order_acq_rel); + } + } + if (WState->PendingWork.fetch_sub(1, std::memory_order_acq_rel) == 1) + { + OnGetPartCompleted(State); + } + }, + Headers); + InFlightGuard.Dismiss(); + } + catch (const std::exception& Ex) + { + RecordGetPartFailure(*State, fmt::format("S3 GET range {} dispatch failed: {}", RangeIdx, Ex.what())); + OnGetPartCompleted(State); + } + } +} + +void +S3AsyncStorage::OnGetPartCompleted(std::shared_ptr<GetMultipartState> State) +{ + const uint32_t Remaining = State->PendingRanges.fetch_sub(1, std::memory_order_acq_rel); + if (Remaining != 1) + { + return; + } + + // All disk I/O hopped to the worker pool: see note in + // OnGetStreamFinalised. + const bool Failed = State->AnyFailed.load(std::memory_order_acquire); + try + { + State->Work.ScheduleWork( + State->Pool, + [State, Failed](std::atomic<bool>&) { + if (Failed) + { + std::string FirstError; + State->ErrorLock.WithExclusiveLock([&] { FirstError = std::move(State->FirstError); }); + State->DestFile.reset(); + std::error_code Ec; + std::filesystem::remove(State->DestPath, Ec); + State->Token->Fail( + std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': {}"sv, State->ContentHash, FirstError))); + return; + } + + State->DestFile.reset(); + State->Token->Complete(); + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (...) + { + State->DestFile.reset(); + std::error_code Ec; + std::filesystem::remove(State->DestPath, Ec); + State->Token->Fail(std::current_exception()); + } +} + +void +S3AsyncStorage::Touch(ParallelWork& Work, WorkerThreadPool& Pool, const IoHash& Hash, S3AsyncStorageStats& Stats) +{ + auto Token = std::make_shared<ParallelWork::ExternalWorkToken>(Work.RegisterExternal()); + Stats.RecordScheduled(); + + std::shared_ptr<void> SlotRef = AcquireAdmissionSlot(m_Admission, &Stats); + + // Snapshot creds on the dispatcher; see PutSmall. + SigV4Credentials Creds = m_GetCreds(); + + Work.ScheduleWork( + Pool, + [this, Hash = IoHash(Hash), Token, Stats, Creds = std::move(Creds), SlotRef = std::move(SlotRef)]( + std::atomic<bool>& Abort) mutable { + if (Abort.load()) + { + Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Touch '{}': aborted"sv, Hash))); + return; + } + try + { + if (Creds.AccessKeyId.empty()) + { + Token->Fail( + std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Touch '{}': no credentials available"sv, Hash))); + return; + } + + std::string Key = CasKey(Hash); + std::string Path = CasPath(Hash); + + std::vector<std::pair<std::string, std::string>> ExtraSigned{ + {"x-amz-copy-source", fmt::format("/{}/{}", m_Builder.BucketName(), AwsUriEncode(Key, false))}, + {"x-amz-metadata-directive", "REPLACE"}, + }; + + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "PUT", Path, "", S3EmptyPayloadHash, ExtraSigned); + + Stopwatch Timer = Stats.BeginRequest(); + // Pair Begin with End so an alloc throw inside AsyncPut does not strand + // InFlight elevated. + auto InFlightGuard = MakeGuard([&Stats] { Stats.EndRequest(0, 0); }); + m_Client.AsyncPut( + Path, + [Hash, Token, Timer, Stats, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable { + Stats.EndRequest(Timer.GetElapsedTimeUs(), 0); + if (!Resp.IsSuccess()) + { + std::string Err = S3ErrorMessage("S3 Touch failed", Resp); + Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Touch '{}': {}"sv, Hash, Err))); + return; + } + // PUT-COPY (REPLACE) can return HTTP 200 with an <Error> body. Mirror + // the sync S3Client::Touch check; without this the dehydrate-touch + // fails silently. + std::string_view Body = Resp.AsText(); + std::string_view ErrorCode; + std::string_view ErrorMessage; + if (S3ExtractError(Body, ErrorCode, ErrorMessage)) + { + Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Touch '{}': returned error: {} - {}"sv, + Hash, + ErrorCode, + ErrorMessage))); + return; + } + Token->Complete(); + }, + Headers); + InFlightGuard.Dismiss(); + } + catch (...) + { + Token->Fail(std::current_exception()); + } + }); +} + +std::vector<std::string> +S3AsyncStorage::ListAllObjects(std::string_view Prefix) +{ + constexpr std::string_view ContentsCloseTag = "</Contents>"; + + // List requests are not counted in PhaseStats; List/DeleteAll is admin + // scaffolding (test fixture teardown, manual obliterate). Mirrors the + // CreateMultipart/Complete/Abort exclusion in PutMultipart. + + // Snapshot creds once for the listing run; ListObjectsV2 pagination is + // fast enough that mid-list refresh isn't needed. + const SigV4Credentials Creds = m_GetCreds(); + if (Creds.AccessKeyId.empty()) + { + throw zen::runtime_error("S3AsyncStorage::ListAllObjects: no credentials available"sv); + } + + std::vector<std::string> Keys; + std::string ContinuationToken; + std::string RootPath = m_Builder.BucketRootPath(); + while (true) + { + std::string CanonicalQs = fmt::format("list-type=2&prefix={}", AwsUriEncode(Prefix)); + if (!ContinuationToken.empty()) + { + CanonicalQs += fmt::format("&continuation-token={}", AwsUriEncode(ContinuationToken)); + } + + HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "GET", RootPath, CanonicalQs, S3EmptyPayloadHash); + + std::string FullPath = S3BuildRequestPath(RootPath, CanonicalQs); + HttpClient::Response Resp = m_Client.Get(FullPath, Headers).get(); + if (!Resp.IsSuccess()) + { + throw zen::runtime_error("S3AsyncStorage::ListAllObjects '{}': {}"sv, Prefix, S3ErrorMessage("S3 LIST failed", Resp)); + } + + if (!Resp.ResponsePayload) + { + break; + } + std::string_view Body(reinterpret_cast<const char*>(Resp.ResponsePayload.GetData()), Resp.ResponsePayload.GetSize()); + + std::string_view Cursor = Body; + while (true) + { + size_t ContentsOpen = Cursor.find("<Contents>"); + if (ContentsOpen == std::string_view::npos) + { + break; + } + size_t ContentsClose = Cursor.find(ContentsCloseTag, ContentsOpen); + if (ContentsClose == std::string_view::npos) + { + break; + } + std::string_view ContentsBlock = Cursor.substr(ContentsOpen, ContentsClose - ContentsOpen); + std::string_view Key = S3ExtractXmlValue(ContentsBlock, "Key"); + Keys.emplace_back(Key); + Cursor = Cursor.substr(ContentsClose + ContentsCloseTag.size()); + } + + std::string_view IsTruncated = S3ExtractXmlValue(Body, "IsTruncated"); + if (IsTruncated != "true") + { + break; + } + std::string_view NextToken = S3ExtractXmlValue(Body, "NextContinuationToken"); + if (NextToken.empty()) + { + break; + } + ContinuationToken.assign(NextToken); + } + return Keys; +} + +std::vector<IoHash> +S3AsyncStorage::List() +{ + std::string CasPrefix = fmt::format("{}/cas/", m_KeyPrefix); + std::vector<std::string> Keys = ListAllObjects(CasPrefix); + + std::vector<IoHash> Hashes; + Hashes.reserve(Keys.size()); + for (const std::string& Key : Keys) + { + size_t LastSlash = Key.rfind('/'); + if (LastSlash == std::string::npos) + { + continue; + } + IoHash Hash; + if (IoHash::TryParse(std::string_view(Key).substr(LastSlash + 1), Hash)) + { + Hashes.push_back(Hash); + } + } + return Hashes; +} + +void +S3AsyncStorage::DeleteAll(ParallelWork& Work) +{ + std::string Prefix = fmt::format("{}/", m_KeyPrefix); + std::vector<std::string> Keys = ListAllObjects(Prefix); + + // Snapshot creds once for the whole obliterate; saves N shared-lock + // acquisitions on the credential provider. + const SigV4Credentials DelCreds = m_GetCreds(); + if (DelCreds.AccessKeyId.empty()) + { + auto Token = std::make_shared<ParallelWork::ExternalWorkToken>(Work.RegisterExternal()); + Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::DeleteAll: no credentials available"sv))); + return; + } + + for (const std::string& Key : Keys) + { + auto Token = std::make_shared<ParallelWork::ExternalWorkToken>(Work.RegisterExternal()); + + try + { + std::string Path = m_Builder.KeyToPath(Key); + + HttpClient::KeyValueMap DelHeaders = m_Builder.SignRequest(DelCreds, "DELETE", Path, "", S3EmptyPayloadHash); + + // DeleteAll is untracked by stats - admission slot is acquired but the + // wait is not recorded; passes Stats == nullptr to the helper. + std::shared_ptr<void> SlotRef = AcquireAdmissionSlot(m_Admission); + + m_Client.AsyncDelete( + Path, + [KeyCopy = Key, Token, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable { + if (!Resp.IsSuccess() && Resp.StatusCode != HttpResponseCode::NotFound) + { + std::string Err = S3ErrorMessage("S3 DELETE failed", Resp); + Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::DeleteAll '{}': {}"sv, KeyCopy, Err))); + return; + } + Token->Complete(); + }, + DelHeaders); + } + catch (...) + { + // Sign / acquire / dispatch threw before AsyncDelete posted. Surface via Token so + // Wait() reports the partial failure instead of silently dropping the key. + Token->Fail(std::current_exception()); + } + } +} + +void +s3asyncstorage_forcelink() +{ +} + +#if ZEN_WITH_TESTS + +namespace { + // Per-binary unique MinIO port. + uint16_t AllocateMinioTestPort() + { + static const uint16_t Base = static_cast<uint16_t>(20000u + (static_cast<uint32_t>(GetCurrentProcessId()) % 30000u)); + static std::atomic<uint16_t> Slot{0}; + return Base + Slot.fetch_add(1, std::memory_order_relaxed); + } + + MinioProcessOptions MakeMinioOpts() + { + MinioProcessOptions Opts; + Opts.Port = AllocateMinioTestPort(); + return Opts; + } + + struct AsyncS3Fixture + { + MinioProcess Minio{MakeMinioOpts()}; + ScopedTemporaryDirectory TmpDir; + std::unique_ptr<AsyncHttpClient> ClientStorage; + std::unique_ptr<S3RequestBuilder> Builder; + AsyncHttpClient* Client = nullptr; + SigV4Credentials Creds; + WorkerThreadPool Pool{4}; + + AsyncS3Fixture() + { + Minio.SpawnMinioServer(); + Minio.CreateBucket("async-test"); + + Creds.AccessKeyId = "minioadmin"; + Creds.SecretAccessKey = "minioadmin"; + + Builder = std::make_unique<S3RequestBuilder>("us-east-1", "async-test", Minio.Endpoint(), /*PathStyle*/ true); + + HttpClientSettings Settings; + Settings.LogCategory = "async-s3-test"; + Settings.MaxConcurrentConnectionsPerHost = 16; + // Match production S3Hydration: cover MinIO post-spawn 503 + // (XMinioServerNotInitialized) transients before storage backend + // finishes warming up after the spawn ready-check passes. + Settings.RetryCount = 3; + ClientStorage = std::make_unique<AsyncHttpClient>(Minio.Endpoint(), Settings); + Client = ClientStorage.get(); + } + }; + + std::filesystem::path WriteBlob(const std::filesystem::path& Path, const std::vector<uint8_t>& Bytes) + { + WriteFile(Path, IoBuffer(IoBuffer::Wrap, Bytes.data(), Bytes.size())); + return Path; + } + + struct StatsBlock + { + std::atomic<uint64_t> RequestCount{0}; + std::atomic<uint64_t> RequestTotalUs{0}; + std::atomic<uint64_t> RequestMaxUs{0}; + std::atomic<uint64_t> Bytes{0}; + std::atomic<uint32_t> InFlight{0}; + std::atomic<uint32_t> InFlightPeak{0}; + std::atomic<uint64_t> FirstScheduleUs{UINT64_MAX}; + std::atomic<uint64_t> FirstStartUs{UINT64_MAX}; + std::atomic<uint64_t> AdmissionWaitTotalUs{0}; + std::atomic<uint64_t> AdmissionWaitMaxUs{0}; + Stopwatch PhaseClock; + + S3AsyncStorageStats Ref() + { + return S3AsyncStorageStats{RequestCount, + RequestTotalUs, + RequestMaxUs, + Bytes, + InFlight, + InFlightPeak, + FirstScheduleUs, + FirstStartUs, + AdmissionWaitTotalUs, + AdmissionWaitMaxUs, + PhaseClock}; + } + }; +} // namespace + +TEST_SUITE_BEGIN("server.s3asyncstorage"); + +TEST_CASE("s3asyncstorage.put_get_round_trip") +{ + StatsBlock Sb; + AsyncS3Fixture F; + S3AsyncStorage Storage( + *F.Client, + *F.Builder, + [&F]() { return F.Creds; }, + "module-A", + 8u * 1024u * 1024u); + + const uint64_t Size = 64u * 1024u; + std::vector<uint8_t> Bytes(Size); + for (size_t I = 0; I < Size; ++I) + { + Bytes[I] = static_cast<uint8_t>((I * 31u) & 0xFF); + } + + std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "src.bin", Bytes); + IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size()); + + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + Storage.Put(Work, F.Pool, ContentHash, Size, SrcPath, Stats); + Work.Wait(); + } + + std::filesystem::path DstPath = F.TmpDir.Path() / "dst.bin"; + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + Storage.Get(Work, F.Pool, ContentHash, Size, DstPath, Stats); + Work.Wait(); + } + + REQUIRE(std::filesystem::exists(DstPath)); + REQUIRE(std::filesystem::file_size(DstPath) == Size); + + BasicFile Out(DstPath, BasicFile::Mode::kRead); + IoBuffer Read = Out.ReadAll(); + IoHash ReadHash = IoHash::HashBuffer(Read); + CHECK(ReadHash == ContentHash); +} + +TEST_CASE("s3asyncstorage.touch_existing_object") +{ + StatsBlock Sb; + AsyncS3Fixture F; + S3AsyncStorage Storage( + *F.Client, + *F.Builder, + [&F]() { return F.Creds; }, + "module-touch", + 8u * 1024u * 1024u); + + std::vector<uint8_t> Bytes{1, 2, 3, 4, 5}; + std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "touch.bin", Bytes); + IoHash Hash = IoHash::HashBuffer(Bytes.data(), Bytes.size()); + + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + Storage.Put(Work, F.Pool, Hash, Bytes.size(), SrcPath, Stats); + Work.Wait(); + } + + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + Storage.Touch(Work, F.Pool, Hash, Stats); + Work.Wait(); + } +} + +TEST_CASE("s3asyncstorage.list_returns_uploaded_hashes") +{ + StatsBlock Sb; + AsyncS3Fixture F; + S3AsyncStorage Storage( + *F.Client, + *F.Builder, + [&F]() { return F.Creds; }, + "module-list", + 8u * 1024u * 1024u); + + const size_t N = 5; + std::vector<IoHash> Hashes; + std::vector<std::filesystem::path> SrcPaths; + for (size_t I = 0; I < N; ++I) + { + std::vector<uint8_t> Bytes(64, static_cast<uint8_t>(I + 1)); + std::filesystem::path P = WriteBlob(F.TmpDir.Path() / fmt::format("blob_{}.bin", I), Bytes); + Hashes.push_back(IoHash::HashBuffer(Bytes.data(), Bytes.size())); + SrcPaths.push_back(P); + } + + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + for (size_t I = 0; I < N; ++I) + { + Storage.Put(Work, F.Pool, Hashes[I], 64, SrcPaths[I], Stats); + } + Work.Wait(); + } + + std::vector<IoHash> Listed = Storage.List(); + std::sort(Listed.begin(), Listed.end()); + std::vector<IoHash> Expected = Hashes; + std::sort(Expected.begin(), Expected.end()); + CHECK(Listed == Expected); +} + +TEST_CASE("s3asyncstorage.streaming_download_round_trip") +{ + // Object size sits between the streaming threshold (512 KiB) and the + // multipart threshold (PartSize + PartSize/4). Exercises the medium-tier + // AsyncStream branch in S3AsyncStorage::Get - body streams via a 512 KiB + // IoBuffer pool with per-buffer worker write hops. + StatsBlock Sb; + AsyncS3Fixture F; + S3AsyncStorage Storage( + *F.Client, + *F.Builder, + [&F]() { return F.Creds; }, + "module-stream", + 8u * 1024u * 1024u); + + const uint64_t Size = 2u * 1024u * 1024u; // 2 MiB - between 512 KiB and 10 MiB + std::vector<uint8_t> Bytes(Size); + for (size_t I = 0; I < Size; ++I) + { + Bytes[I] = static_cast<uint8_t>((I * 53u + 11u) & 0xFF); + } + std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "stream.bin", Bytes); + IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size()); + + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + Storage.Put(Work, F.Pool, ContentHash, Size, SrcPath, Stats); + Work.Wait(); + } + + std::filesystem::path DstPath = F.TmpDir.Path() / "stream_out.bin"; + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + Storage.Get(Work, F.Pool, ContentHash, Size, DstPath, Stats); + Work.Wait(); + } + + REQUIRE(std::filesystem::exists(DstPath)); + REQUIRE(std::filesystem::file_size(DstPath) == Size); + BasicFile Out(DstPath, BasicFile::Mode::kRead); + IoBuffer Read = Out.ReadAll(); + IoHash ReadHash = IoHash::HashBuffer(Read); + CHECK(ReadHash == ContentHash); +} + +TEST_CASE("s3asyncstorage.multipart_round_trip") +{ + StatsBlock Sb; + AsyncS3Fixture F; + const uint64_t PartSize = 5u * 1024u * 1024u; // MinIO accepts >=5MiB parts. + S3AsyncStorage Storage( + *F.Client, + *F.Builder, + [&F]() { return F.Creds; }, + "module-multipart", + PartSize); + + // Three parts: 5 MiB + 5 MiB + 1 MiB. Threshold is PartSize + PartSize/4 (= + // 6.25 MiB). Total 11 MiB triggers multipart path. + const uint64_t TotalSize = 11u * 1024u * 1024u; + std::vector<uint8_t> Bytes(TotalSize); + for (size_t I = 0; I < TotalSize; ++I) + { + Bytes[I] = static_cast<uint8_t>((I * 131u + 7u) & 0xFF); + } + + std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "big.bin", Bytes); + IoHash Hash = IoHash::HashBuffer(Bytes.data(), Bytes.size()); + + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + Storage.Put(Work, F.Pool, Hash, TotalSize, SrcPath, Stats); + Work.Wait(); + } + + std::filesystem::path DstPath = F.TmpDir.Path() / "big_out.bin"; + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + Storage.Get(Work, F.Pool, Hash, TotalSize, DstPath, Stats); + Work.Wait(); + } + + REQUIRE(std::filesystem::exists(DstPath)); + REQUIRE(std::filesystem::file_size(DstPath) == TotalSize); + + BasicFile Out(DstPath, BasicFile::Mode::kRead); + IoBuffer Read = Out.ReadAll(); + IoHash ReadHash = IoHash::HashBuffer(Read); + CHECK(ReadHash == Hash); +} + +TEST_CASE("s3asyncstorage.parallel_puts") +{ + StatsBlock Sb; + AsyncS3Fixture F; + S3AsyncStorage Storage( + *F.Client, + *F.Builder, + [&F]() { return F.Creds; }, + "module-parallel", + 8u * 1024u * 1024u); + + const size_t N = 32; + std::vector<IoHash> Hashes; + std::vector<std::filesystem::path> SrcPaths; + for (size_t I = 0; I < N; ++I) + { + std::vector<uint8_t> Bytes(1024, static_cast<uint8_t>(I)); + std::filesystem::path P = WriteBlob(F.TmpDir.Path() / fmt::format("p_{}.bin", I), Bytes); + Hashes.push_back(IoHash::HashBuffer(Bytes.data(), Bytes.size())); + SrcPaths.push_back(P); + } + + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + for (size_t I = 0; I < N; ++I) + { + Storage.Put(Work, F.Pool, Hashes[I], 1024, SrcPaths[I], Stats); + } + Work.Wait(); + + CHECK(Storage.List().size() == N); +} + +namespace { + // One MinIO + builder + creds + pool, reused across multiple admission / + // throttle test scopes. Each scope constructs its own AsyncHttpClient via + // MakeClient(Cap) so different MaxConcurrentRequests values are exercised + // without re-spawning MinIO. + struct ThrottledAsyncS3Fixture + { + MinioProcess Minio{MakeMinioOpts()}; + ScopedTemporaryDirectory TmpDir; + std::unique_ptr<S3RequestBuilder> Builder; + SigV4Credentials Creds; + WorkerThreadPool Pool{4}; + + ThrottledAsyncS3Fixture() + { + Minio.SpawnMinioServer(); + Minio.CreateBucket("async-test-throttle"); + + Creds.AccessKeyId = "minioadmin"; + Creds.SecretAccessKey = "minioadmin"; + + Builder = std::make_unique<S3RequestBuilder>("us-east-1", "async-test-throttle", Minio.Endpoint(), /*PathStyle*/ true); + } + + std::unique_ptr<AsyncHttpClient> MakeClient(uint32_t MaxConcurrentRequests) + { + HttpClientSettings Settings; + Settings.LogCategory = "async-s3-throttle-test"; + Settings.MaxConcurrentConnectionsPerHost = 16; + Settings.MaxConcurrentRequests = MaxConcurrentRequests; + // Match production S3Hydration: cover MinIO post-spawn 503 + // (XMinioServerNotInitialized) transients before storage backend + // finishes warming up after the spawn ready-check passes. + Settings.RetryCount = 3; + return std::make_unique<AsyncHttpClient>(Minio.Endpoint(), Settings); + } + }; +} // namespace + +// Non-multipart admission scenarios: small-file fanout under a cap, fanout +// with admission disabled, and admission-wait stat recording. All share one +// MinIO instance via ThrottledAsyncS3Fixture; each scope picks its own Cap +// and KeyPrefix so the bucket entries don't collide across scopes. +TEST_CASE("s3asyncstorage.admission.fanout") +{ + ThrottledAsyncS3Fixture F; + + // respects.async.client.cap - in-flight peak <= storage admission cap. + { + StatsBlock Sb; + const uint32_t Cap = 2; + auto Admission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(Cap)); + auto Client = F.MakeClient(Cap); + S3AsyncStorage Storage( + *Client, + *F.Builder, + [&F]() { return F.Creds; }, + "module-cap", + 8u * 1024u * 1024u, + Admission, + Cap); + + const size_t N = 8; + std::vector<IoHash> Hashes; + std::vector<std::filesystem::path> SrcPaths; + for (size_t I = 0; I < N; ++I) + { + std::vector<uint8_t> Bytes(256, static_cast<uint8_t>(I)); + std::filesystem::path P = WriteBlob(F.TmpDir.Path() / fmt::format("c_{}.bin", I), Bytes); + Hashes.push_back(IoHash::HashBuffer(Bytes.data(), Bytes.size())); + SrcPaths.push_back(P); + } + + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + for (size_t I = 0; I < N; ++I) + { + Storage.Put(Work, F.Pool, Hashes[I], 256, SrcPaths[I], Stats); + } + Work.Wait(); + + std::vector<IoHash> Listed = Storage.List(); + std::sort(Listed.begin(), Listed.end()); + std::vector<IoHash> Expected = Hashes; + std::sort(Expected.begin(), Expected.end()); + CHECK(Listed == Expected); + CHECK(Sb.InFlightPeak.load() <= 2u); + } + + // unlimited_concurrent_requests - admission disabled (nullptr semaphore, + // Cap=0). Storage drains the fanout cleanly without gating. + { + StatsBlock Sb; + auto Client = F.MakeClient(0); + S3AsyncStorage Storage( + *Client, + *F.Builder, + [&F]() { return F.Creds; }, + "module-uncapped", + 8u * 1024u * 1024u); + + const size_t N = 8; + std::vector<IoHash> Hashes; + std::vector<std::filesystem::path> SrcPaths; + for (size_t I = 0; I < N; ++I) + { + std::vector<uint8_t> Bytes(256, static_cast<uint8_t>(I)); + std::filesystem::path P = WriteBlob(F.TmpDir.Path() / fmt::format("u_{}.bin", I), Bytes); + Hashes.push_back(IoHash::HashBuffer(Bytes.data(), Bytes.size())); + SrcPaths.push_back(P); + } + + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + for (size_t I = 0; I < N; ++I) + { + Storage.Put(Work, F.Pool, Hashes[I], 256, SrcPaths[I], Stats); + } + Work.Wait(); + + std::vector<IoHash> Listed = Storage.List(); + std::sort(Listed.begin(), Listed.end()); + std::vector<IoHash> Expected = Hashes; + std::sort(Expected.begin(), Expected.end()); + CHECK(Listed == Expected); + } + + // admission.wait.us.recorded - cap=2 with 8 fanout submits forces at least + // some submissions to block on the admission semaphore. AdmissionWait + // totals must be non-zero. + { + StatsBlock Sb; + const uint32_t Cap = 2; + auto Admission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(Cap)); + auto Client = F.MakeClient(Cap); + S3AsyncStorage Storage( + *Client, + *F.Builder, + [&F]() { return F.Creds; }, + "module-adm-wait", + 8u * 1024u * 1024u, + Admission, + Cap); + + const size_t N = 8; + std::vector<IoHash> Hashes; + std::vector<std::filesystem::path> SrcPaths; + for (size_t I = 0; I < N; ++I) + { + std::vector<uint8_t> Bytes(256, static_cast<uint8_t>(I)); + std::filesystem::path P = WriteBlob(F.TmpDir.Path() / fmt::format("aw_{}.bin", I), Bytes); + Hashes.push_back(IoHash::HashBuffer(Bytes.data(), Bytes.size())); + SrcPaths.push_back(P); + } + + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + for (size_t I = 0; I < N; ++I) + { + Storage.Put(Work, F.Pool, Hashes[I], 256, SrcPaths[I], Stats); + } + Work.Wait(); + + CHECK(Sb.AdmissionWaitTotalUs.load() > 0u); + CHECK(Sb.AdmissionWaitMaxUs.load() > 0u); + CHECK(Sb.InFlightPeak.load() <= Cap); + } +} + +// Drives the AbortMultipart path: a 3-part multipart upload where one part is +// forced to fail via the test hook. AnyFailed flips, FinalizePutPart hops +// off-strand once PendingParts reaches 1, AbortMultipart sends DELETE to S3 +// to discard the upload-id. +// Token must surface the original part failure; bucket must not contain the +// uploaded CAS object after Wait returns. +TEST_CASE("s3asyncstorage.multipart.abort_on_part_failure") +{ + StatsBlock Sb; + AsyncS3Fixture F; + const uint64_t PartSize = 5u * 1024u * 1024u; + S3AsyncStorage Storage( + *F.Client, + *F.Builder, + [&F]() { return F.Creds; }, + "module-multipart-abort", + PartSize); + + const uint64_t TotalSize = 11u * 1024u * 1024u; // 3 parts: 5+5+1 MiB + std::vector<uint8_t> Bytes(TotalSize); + for (size_t I = 0; I < TotalSize; ++I) + { + Bytes[I] = static_cast<uint8_t>((I * 23u + 9u) & 0xFF); + } + std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "abort.bin", Bytes); + IoHash Hash = IoHash::HashBuffer(Bytes.data(), Bytes.size()); + + // Force one part to fail; AnyFailed -> AbortMultipart. + s3asyncstorage_test_hooks::ForceNextPartFailures(1); + + bool ThrewFromWait = false; + std::string ErrMessage; + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + Storage.Put(Work, F.Pool, Hash, TotalSize, SrcPath, Stats); + try + { + Work.Wait(); + } + catch (const std::exception& Ex) + { + ThrewFromWait = true; + ErrMessage = Ex.what(); + } + } + CHECK(ThrewFromWait); + CHECK(ErrMessage.find("test-injected part failure") != std::string::npos); + + // Reset hook so subsequent multipart tests aren't affected. + s3asyncstorage_test_hooks::ForceNextPartFailures(0); + + // Bucket must not contain the CAS object - AbortMultipart discards the + // upload before S3 publishes the assembled object. + std::vector<IoHash> Listed = Storage.List(); + CHECK(std::find(Listed.begin(), Listed.end(), Hash) == Listed.end()); +} + +// Multipart admission scenarios sharing one MinIO instance. Each scope picks +// its own Cap, KeyPrefix, and payload pattern so bucket entries don't collide. +// Covers: +// - under.cap: TotalParts < cap (no slot handoff needed); round-trip Put+Get. +// - no.deadlock.streaming.get: Put + ranged Get on same hydration pool. +// - parts.exceed.cap.paces: TotalParts > cap; wave + slot-handoff completes +// successfully with InFlightPeak <= cap. +// - aborts.release.tokens: forced part failure releases held slots so a +// follow-up Put can run. +// - handoff.in.flight: cap=1 forces strictly sequential dispatch. +TEST_CASE("s3asyncstorage.admission.multipart") +{ + ThrottledAsyncS3Fixture F; + const uint64_t PartSize = 5u * 1024u * 1024u; // MinIO accepts >= 5 MiB parts. + + // under.cap: 3-part Put + Get sits within the initial wave (cap=4). + { + StatsBlock Sb; + const uint32_t Cap = 4; + auto Admission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(Cap)); + auto Client = F.MakeClient(Cap); + S3AsyncStorage Storage( + *Client, + *F.Builder, + [&F]() { return F.Creds; }, + "module-multipart-cap", + PartSize, + Admission, + Cap); + + const uint64_t TotalSize = 11u * 1024u * 1024u; + std::vector<uint8_t> Bytes(TotalSize); + for (size_t I = 0; I < TotalSize; ++I) + { + Bytes[I] = static_cast<uint8_t>((I * 17u + 5u) & 0xFF); + } + std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "mp.bin", Bytes); + IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size()); + + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + Storage.Put(Work, F.Pool, ContentHash, TotalSize, SrcPath, Stats); + Work.Wait(); + } + + std::filesystem::path DstPath = F.TmpDir.Path() / "mp_out.bin"; + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + Storage.Get(Work, F.Pool, ContentHash, TotalSize, DstPath, Stats); + Work.Wait(); + } + + REQUIRE(std::filesystem::exists(DstPath)); + REQUIRE(std::filesystem::file_size(DstPath) == TotalSize); + BasicFile Out(DstPath, BasicFile::Mode::kRead); + IoBuffer Read = Out.ReadAll(); + IoHash ReadHash = IoHash::HashBuffer(Read); + CHECK(ReadHash == ContentHash); + } + + // no.deadlock.streaming.get: PutMultipart fans onto hydration pool while + // io strand fires AsyncPut completions; followed by Get exercising stream + // write-back on the same pool. + { + StatsBlock Sb; + const uint32_t Cap = 4; + auto Admission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(Cap)); + auto Client = F.MakeClient(Cap); + S3AsyncStorage Storage( + *Client, + *F.Builder, + [&F]() { return F.Creds; }, + "module-stream-no-deadlock", + PartSize, + Admission, + Cap); + + const uint64_t TotalSize = 11u * 1024u * 1024u; + std::vector<uint8_t> Bytes(TotalSize); + for (size_t I = 0; I < TotalSize; ++I) + { + Bytes[I] = static_cast<uint8_t>((I * 41u + 7u) & 0xFF); + } + std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "sd.bin", Bytes); + IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size()); + + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + Storage.Put(Work, F.Pool, ContentHash, TotalSize, SrcPath, Stats); + Work.Wait(); + } + + std::filesystem::path DstPath = F.TmpDir.Path() / "sd_out.bin"; + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + Storage.Get(Work, F.Pool, ContentHash, TotalSize, DstPath, Stats); + Work.Wait(); + } + + REQUIRE(std::filesystem::exists(DstPath)); + REQUIRE(std::filesystem::file_size(DstPath) == TotalSize); + BasicFile Out(DstPath, BasicFile::Mode::kRead); + IoBuffer Read = Out.ReadAll(); + IoHash ReadHash = IoHash::HashBuffer(Read); + CHECK(ReadHash == ContentHash); + } + + // parts.exceed.cap.paces: TotalParts > cap completes by pacing via slot + // handoff. Initial wave dispatches Cap parts; each completion hands its + // slot to the next undispatched part. InFlightPeak stays bounded by Cap. + { + StatsBlock Sb; + const uint32_t Cap = 2; + auto Admission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(Cap)); + auto Client = F.MakeClient(Cap); + S3AsyncStorage Storage( + *Client, + *F.Builder, + [&F]() { return F.Creds; }, + "module-mp-paces", + PartSize, + Admission, + Cap); + + const uint64_t TotalSize = 16u * 1024u * 1024u; // 4 parts > cap=2 + std::vector<uint8_t> Bytes(TotalSize, 0xA5); + std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "mp_paces.bin", Bytes); + IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size()); + + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + Storage.Put(Work, F.Pool, ContentHash, TotalSize, SrcPath, Stats); + Work.Wait(); + } + CHECK(Sb.InFlightPeak.load() <= Cap); + std::vector<IoHash> Listed = Storage.List(); + CHECK(std::find(Listed.begin(), Listed.end(), ContentHash) != Listed.end()); + } + + // aborts.release.tokens: forced part failure mid-flight; drain releases + // admission slots so a follow-up small Put can run without blocking. + { + StatsBlock Sb; + const uint32_t Cap = 4; + auto Admission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(Cap)); + auto Client = F.MakeClient(Cap); + S3AsyncStorage Storage( + *Client, + *F.Builder, + [&F]() { return F.Creds; }, + "module-mp-abort-release", + PartSize, + Admission, + Cap); + + const uint64_t TotalSize = 11u * 1024u * 1024u; // 3 parts under cap=4 + std::vector<uint8_t> Bytes(TotalSize, 0x5A); + std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "mp_abrel.bin", Bytes); + IoHash AbortHash = IoHash::HashBuffer(Bytes.data(), Bytes.size()); + + s3asyncstorage_test_hooks::ForceNextPartFailures(1); + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + Storage.Put(Work, F.Pool, AbortHash, TotalSize, SrcPath, Stats); + try + { + Work.Wait(); + } + catch (...) + { + } + } + s3asyncstorage_test_hooks::ForceNextPartFailures(0); + + std::vector<uint8_t> Small(1024, 0x11); + std::filesystem::path SmallPath = WriteBlob(F.TmpDir.Path() / "mp_abrel_small.bin", Small); + IoHash SmallHash = IoHash::HashBuffer(Small.data(), Small.size()); + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + Storage.Put(Work, F.Pool, SmallHash, Small.size(), SmallPath, Stats); + Work.Wait(); + } + std::vector<IoHash> Listed = Storage.List(); + CHECK(std::find(Listed.begin(), Listed.end(), SmallHash) != Listed.end()); + } + + // handoff.in.flight: cap=1 forces strictly sequential dispatch (initial + // wave size 1, every subsequent part dispatched via handoff from the prior + // AsyncPut completion). + { + StatsBlock Sb; + const uint32_t Cap = 1; + auto Admission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(Cap)); + auto Client = F.MakeClient(Cap); + S3AsyncStorage Storage( + *Client, + *F.Builder, + [&F]() { return F.Creds; }, + "module-mp-handoff", + PartSize, + Admission, + Cap); + + const uint64_t TotalSize = 16u * 1024u * 1024u; // 4 parts; handoff fires 3 times + std::vector<uint8_t> Bytes(TotalSize, 0x33); + std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "mp_handoff.bin", Bytes); + IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size()); + + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + Storage.Put(Work, F.Pool, ContentHash, TotalSize, SrcPath, Stats); + Work.Wait(); + } + CHECK(Sb.InFlightPeak.load() == 1u); + std::vector<IoHash> Listed = Storage.List(); + CHECK(std::find(Listed.begin(), Listed.end(), ContentHash) != Listed.end()); + } + + // ranges.exceed.cap.paces: GetMultipart with RangeCount > AdmissionCap. + // The dispatcher loop's per-range AcquireAdmissionSlot blocks the caller + // thread until in-flight ranges fire AsyncStream completions and release + // slots. InFlightPeak per Get stays bounded by Cap. Upload first with a + // larger client cap so the Put itself doesn't pace, then re-open Storage + // with the small cap purely to exercise the Get pacing path. + { + StatsBlock Sb; + const uint32_t UploadCap = 8; + auto UploadAdmission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(UploadCap)); + auto UploadClient = F.MakeClient(UploadCap); + S3AsyncStorage UploadStorage( + *UploadClient, + *F.Builder, + [&F]() { return F.Creds; }, + "module-mp-get-paces", + PartSize, + UploadAdmission, + UploadCap); + + const uint64_t TotalSize = 16u * 1024u * 1024u; // 4 ranges with PartSize=5 MiB + std::vector<uint8_t> Bytes(TotalSize, 0x77); + std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "mp_get_paces.bin", Bytes); + IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size()); + + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = Sb.Ref(); + UploadStorage.Put(Work, F.Pool, ContentHash, TotalSize, SrcPath, Stats); + Work.Wait(); + } + + StatsBlock GetSb; + const uint32_t GetCap = 2; + auto GetAdmission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(GetCap)); + auto GetClient = F.MakeClient(GetCap); + S3AsyncStorage GetStorage( + *GetClient, + *F.Builder, + [&F]() { return F.Creds; }, + "module-mp-get-paces", + PartSize, + GetAdmission, + GetCap); + + std::filesystem::path DstPath = F.TmpDir.Path() / "mp_get_paces_out.bin"; + { + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + S3AsyncStorageStats Stats = GetSb.Ref(); + GetStorage.Get(Work, F.Pool, ContentHash, TotalSize, DstPath, Stats); + Work.Wait(); + } + CHECK(GetSb.InFlightPeak.load() <= GetCap); + REQUIRE(std::filesystem::exists(DstPath)); + REQUIRE(std::filesystem::file_size(DstPath) == TotalSize); + BasicFile Out(DstPath, BasicFile::Mode::kRead); + IoBuffer Read = Out.ReadAll(); + IoHash ReadHash = IoHash::HashBuffer(Read); + CHECK(ReadHash == ContentHash); + } +} + +TEST_SUITE_END(); + +#endif // ZEN_WITH_TESTS + +} // namespace zen |