aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/hub/s3asyncstorage.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/hub/s3asyncstorage.cpp')
-rw-r--r--src/zenserver/hub/s3asyncstorage.cpp2808
1 files changed, 2808 insertions, 0 deletions
diff --git a/src/zenserver/hub/s3asyncstorage.cpp b/src/zenserver/hub/s3asyncstorage.cpp
new file mode 100644
index 000000000..b8bbc55b2
--- /dev/null
+++ b/src/zenserver/hub/s3asyncstorage.cpp
@@ -0,0 +1,2808 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "s3asyncstorage.h"
+
+#include <zencore/basicfile.h>
+#include <zencore/except_fmt.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/timer.h>
+#include <zenutil/cloud/s3response.h>
+
+#include <cstring>
+
+#if ZEN_WITH_TESTS
+# include <zencore/iohash.h>
+# include <zencore/process.h>
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zenutil/cloud/minioprocess.h>
+#endif
+
+namespace zen {
+
+using namespace std::literals;
+
+namespace {
+ // Bounded write-buffer pool. OnData callbacks on the io strand call
+ // Acquire to obtain a fixed-size IoBuffer; workers call Release after
+ // the dispatched write completes. The pool retains at most as many
+ // buffers as future Acquire calls remain (PendingAcquires), so it
+ // stabilises at peak write concurrency without dragging buffers past
+ // the point they can ever be re-used.
+ class WriteBufferPool
+ {
+ public:
+ WriteBufferPool(size_t InBufferSize, uint32_t TotalAcquires) : m_BufferSize(InBufferSize), m_PendingAcquires(TotalAcquires) {}
+
+ size_t BufferSize() const { return m_BufferSize; }
+
+ IoBuffer Acquire()
+ {
+ IoBuffer Buf;
+ bool Exhausted = false;
+ m_Lock.WithExclusiveLock([&] {
+ if (m_PendingAcquires == 0)
+ {
+ Exhausted = true;
+ return;
+ }
+ --m_PendingAcquires;
+ if (!m_Pool.empty())
+ {
+ Buf = std::move(m_Pool.back());
+ m_Pool.pop_back();
+ }
+ });
+ if (Exhausted)
+ {
+ // Caller (OnData / OnComplete tail flush on the curl io strand)
+ // over-acquired vs the precomputed TotalBlocks budget. Throw a
+ // real exception so the boundary catch in
+ // AsyncCurlStreamWriteCallback can surface it; never let
+ // ZEN_ASSERT propagate through curl's C frames.
+ throw zen::runtime_error("WriteBufferPool exhausted: more buffers requested than expected");
+ }
+ if (!Buf)
+ {
+ Buf = IoBuffer(m_BufferSize);
+ }
+ return Buf;
+ }
+
+ void Release(IoBuffer Buf)
+ {
+ m_Lock.WithExclusiveLock([&] {
+ if (m_Pool.size() < m_PendingAcquires)
+ {
+ m_Pool.push_back(std::move(Buf));
+ }
+ });
+ }
+
+ // Returns the slot consumed by Acquire() back to the budget without
+ // supplying a buffer; for paths where the buffer was already moved into a
+ // lambda (e.g. ScheduleWork threw after the move).
+ void RestoreAcquireSlot()
+ {
+ m_Lock.WithExclusiveLock([&] { ++m_PendingAcquires; });
+ }
+
+ private:
+ const size_t m_BufferSize;
+ RwLock m_Lock;
+ std::vector<IoBuffer> m_Pool;
+ uint32_t m_PendingAcquires = 0;
+ };
+
+ // Acquire one admission slot on the dispatcher thread. Returns a refcounted
+ // handle whose deleter releases the slot on last drop, so callers thread it
+ // through worker lambdas and AsyncXxx callbacks; whichever runs last fires
+ // the release. nullptr when admission is disabled (Sem == nullptr).
+ //
+ // Stats may be null when the caller has no per-request stat block (DeleteAll);
+ // the slot is still held but the wait time is not recorded.
+ //
+ // Exception safety: counting_semaphore::acquire is noexcept; only the
+ // shared_ptr control-block allocation can throw. The standard's deleter-
+ // invocation guarantee for shared_ptr(p, d) only kicks in when p is non-
+ // null (p == nullptr here), so libstdc++/MSVC happen to invoke the deleter
+ // on alloc failure but it is not portably required - hence the explicit
+ // guard.
+ std::shared_ptr<void> AcquireAdmissionSlot(const std::shared_ptr<AdmissionSemaphore>& Sem, S3AsyncStorageStats* Stats = nullptr)
+ {
+ if (!Sem)
+ {
+ return nullptr;
+ }
+ Stopwatch AdmWait;
+ Sem->acquire();
+ auto ReleaseGuard = MakeGuard([&Sem] { Sem->release(); });
+ std::shared_ptr<void> SlotRef(nullptr, [Sem](void*) { Sem->release(); });
+ ReleaseGuard.Dismiss();
+ if (Stats)
+ {
+ Stats->RecordAdmissionWait(AdmWait.GetElapsedTimeUs());
+ }
+ return SlotRef;
+ }
+} // namespace
+
+S3AsyncStorage::S3AsyncStorage(AsyncHttpClient& Client,
+ S3RequestBuilder& Builder,
+ CredentialsCallback GetCreds,
+ std::string KeyPrefix,
+ uint64_t MultipartChunkSize,
+ std::shared_ptr<AdmissionSemaphore> Admission,
+ uint32_t AdmissionCap)
+: m_Client(Client)
+, m_Builder(Builder)
+, m_GetCreds(std::move(GetCreds))
+, m_KeyPrefix(std::move(KeyPrefix))
+, m_MultipartChunkSize(MultipartChunkSize)
+, m_Admission(std::move(Admission))
+, m_AdmissionCap(m_Admission ? AdmissionCap : 0u)
+{
+}
+
+std::string
+S3AsyncStorage::CasKey(const IoHash& Hash) const
+{
+ return fmt::format("{}/cas/{}", m_KeyPrefix, Hash);
+}
+
+std::string
+S3AsyncStorage::CasPath(const IoHash& Hash) const
+{
+ return m_Builder.KeyToPath(CasKey(Hash));
+}
+
+// Per-range writes target the prealloc'd file at-offset; BasicFile::Write is positional and concurrency-safe.
+struct S3AsyncStorage::GetMultipartState
+{
+ GetMultipartState(ParallelWork& InWork, WorkerThreadPool& InPool, S3AsyncStorageStats InStats, uint32_t TotalBlocks)
+ : Work(InWork)
+ , Pool(InPool)
+ , Stats(InStats)
+ , Buffers(WriteBufferSize, TotalBlocks)
+ {
+ }
+
+ static constexpr size_t WriteBufferSize = 512u * 1024u;
+
+ ParallelWork& Work;
+ WorkerThreadPool& Pool;
+ std::shared_ptr<BasicFile> DestFile;
+ std::filesystem::path DestPath;
+ std::shared_ptr<ParallelWork::ExternalWorkToken> Token;
+ IoHash ContentHash;
+ S3AsyncStorageStats Stats;
+ uint32_t TotalRanges = 0;
+ std::atomic<uint32_t> PendingRanges{0};
+ std::atomic<bool> AnyFailed{false};
+ RwLock ErrorLock;
+ std::string FirstError;
+
+ WriteBufferPool Buffers;
+};
+
+// Shared state for a single-stream GET (medium tier - one ranged request,
+// streamed body). OnData fills 512 KiB IoBuffers from a shared pool on the
+// io strand and dispatches each filled buffer to a worker for one positional
+// Write. PendingWork counts in-flight writes plus a +1 slot for the stream
+// itself; the side that drops it to zero (last writer or OnComplete after
+// the final flush) finalises the request.
+struct S3AsyncStorage::GetStreamState
+{
+ GetStreamState(ParallelWork& InWork, WorkerThreadPool& InPool, S3AsyncStorageStats InStats, uint32_t TotalBlocks)
+ : Work(InWork)
+ , Pool(InPool)
+ , Stats(InStats)
+ , Buffers(WriteBufferSize, TotalBlocks)
+ {
+ }
+
+ static constexpr size_t WriteBufferSize = 512u * 1024u;
+
+ ParallelWork& Work;
+ WorkerThreadPool& Pool;
+ std::shared_ptr<BasicFile> DestFile;
+ std::filesystem::path DestPath;
+ std::shared_ptr<ParallelWork::ExternalWorkToken> Token;
+ IoHash ContentHash;
+ S3AsyncStorageStats Stats;
+ uint64_t ExpectedSize = 0;
+
+ IoBuffer ActiveBuf;
+ size_t BufFill = 0;
+ uint64_t NextAbsOffset = 0;
+ uint64_t TotalReceived = 0;
+
+ std::atomic<uint32_t> PendingWork{1}; // +1 for stream completion
+ std::atomic<bool> Failed{false};
+ RwLock ErrorLock;
+ std::string FirstError;
+
+ WriteBufferPool Buffers;
+};
+
+// Shared state for an in-flight multipart upload. PendingParts.fetch_sub(acq_rel)
+// in FinalizePutPart is the publication barrier for ETag writes; CompleteMultipart
+// (gated on Remaining == 1) sees every part's slot. ETagLock guards FirstError;
+// AnyFailed short-circuits the pipeline on first error.
+struct S3AsyncStorage::PutMultipartState
+{
+ PutMultipartState(ParallelWork& InWork, WorkerThreadPool& InPool, S3AsyncStorageStats InStats)
+ : Work(InWork)
+ , Pool(InPool)
+ , Stats(InStats)
+ {
+ }
+
+ ParallelWork& Work;
+ WorkerThreadPool& Pool;
+ std::shared_ptr<BasicFile> File;
+ std::shared_ptr<ParallelWork::ExternalWorkToken> Token;
+ IoHash ContentHash;
+ S3AsyncStorageStats Stats;
+ // Snapshot of credentials taken once at PutMultipart entry. Reused across
+ // all per-part signing + Complete/Abort. Avoids per-part m_GetCreds()
+ // shared-lock contention on the credential provider.
+ SigV4Credentials Creds;
+ std::string Key;
+ std::string Path;
+ std::string UploadId;
+ uint64_t TotalSize = 0;
+ uint64_t PartSize = 0;
+ uint32_t TotalParts = 0;
+ std::atomic<uint32_t> PendingParts{0};
+ // Monotonic dispatch cursor. Initialized to PreAcquired (min(TotalParts,
+ // AdmissionCap)) in PutMultipart. Each AsyncPut completion racing the
+ // handoff path issues fetch_add(1); whichever completion observes a value
+ // < TotalParts owns the next dispatch.
+ std::atomic<uint32_t> NextPartToDispatch{0};
+ uint32_t PreAcquired = 0;
+ // Counts pending ReadRange calls into File. Used to release File from the
+ // worker thread that does the last read so the close runs off the curl
+ // io strand.
+ std::atomic<uint32_t> ReadsRemaining{0};
+ std::atomic<bool> AnyFailed{false};
+
+ RwLock ETagLock;
+ std::vector<std::string> ETags; // indexed by PartNumber-1
+ std::string FirstError;
+};
+
+namespace {
+ // Sets AnyFailed (CAS) and captures FirstError on first failure only.
+ // Does NOT touch the dispatch cursor; callers that need to drain the
+ // undispatched tail invoke ClaimUndispatchedParts and fan out skips.
+ void RecordPutPartFailure(S3AsyncStorage::PutMultipartState& State, const std::string& Err)
+ {
+ // Log every failure so correlated S3 errors (e.g. AccessDenied on one
+ // part, ServiceUnavailable on another) all reach the operator. CAS still
+ // keeps the first message for the eventual Token->Fail.
+ ZEN_WARN("S3AsyncStorage::PutMultipart '{}': {}", State.ContentHash, Err);
+ bool ExpectedFalse = false;
+ if (State.AnyFailed.compare_exchange_strong(ExpectedFalse, true))
+ {
+ State.ETagLock.WithExclusiveLock([&] { State.FirstError = Err; });
+ }
+ }
+
+ // Claims every part index from NextPartToDispatch up to TotalParts.
+ // Returns the count claimed; caller is responsible for firing one
+ // FinalizePutPart per claimed index so PendingParts can reach 1 and
+ // trip CompleteMultipart/AbortMultipart. Idempotent on repeated calls
+ // (subsequent invocations return 0).
+ uint32_t ClaimUndispatchedParts(S3AsyncStorage::PutMultipartState& State)
+ {
+ const uint32_t Claimed = State.NextPartToDispatch.exchange(State.TotalParts, std::memory_order_acq_rel);
+ return Claimed < State.TotalParts ? State.TotalParts - Claimed : 0u;
+ }
+
+ void RecordGetPartFailure(S3AsyncStorage::GetMultipartState& State, const std::string& Err)
+ {
+ ZEN_WARN("S3AsyncStorage::GetMultipart '{}': {}", State.ContentHash, Err);
+ bool ExpectedFalse = false;
+ if (State.AnyFailed.compare_exchange_strong(ExpectedFalse, true))
+ {
+ State.ErrorLock.WithExclusiveLock([&] { State.FirstError = Err; });
+ }
+ }
+
+ void RecordGetStreamFailure(S3AsyncStorage::GetStreamState& State, const std::string& Err)
+ {
+ ZEN_WARN("S3AsyncStorage::Get '{}': {}", State.ContentHash, Err);
+ bool ExpectedFalse = false;
+ if (State.Failed.compare_exchange_strong(ExpectedFalse, true))
+ {
+ State.ErrorLock.WithExclusiveLock([&] { State.FirstError = Err; });
+ }
+ }
+} // namespace
+
+// Three tiers selected by Size:
+// - Small (< kPutSmallThreshold): single PUT, body materialized into one IoBuffer; SHA in one pass.
+// - Medium (< MultipartThreshold): single PUT, streaming source; two-pass (hash, then upload) with bounded RAM.
+// - Large (>= MultipartThreshold): S3 multipart; per-part body materialized in DispatchPartUpload.
+namespace {
+ constexpr uint64_t kPutSmallThreshold = 512u * 1024u;
+ constexpr size_t kPutReadChunk = 256u * 1024u;
+} // namespace
+
+void
+S3AsyncStorage::Put(ParallelWork& Work,
+ WorkerThreadPool& Pool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath,
+ S3AsyncStorageStats& Stats)
+{
+ const uint64_t MultipartThreshold = m_MultipartChunkSize + (m_MultipartChunkSize / 4);
+ if (Size >= MultipartThreshold)
+ {
+ PutMultipart(Work, Pool, Hash, Size, SourcePath, Stats);
+ return;
+ }
+ if (Size < kPutSmallThreshold)
+ {
+ PutSmall(Work, Pool, Hash, Size, SourcePath, Stats);
+ return;
+ }
+ PutMedium(Work, Pool, Hash, Size, SourcePath, Stats);
+}
+
+void
+S3AsyncStorage::PutSmall(ParallelWork& Work,
+ WorkerThreadPool& Pool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath,
+ S3AsyncStorageStats& Stats)
+{
+ auto Token = std::make_shared<ParallelWork::ExternalWorkToken>(Work.RegisterExternal());
+ Stats.RecordScheduled();
+
+ // Acquire one admission slot on the dispatcher; SlotRef threads through
+ // worker lambda + AsyncPut callback so the slot is held until the network
+ // transfer completes (or the chain is destroyed on shutdown/cancel).
+ std::shared_ptr<void> SlotRef = AcquireAdmissionSlot(m_Admission, &Stats);
+
+ // Snapshot creds once on the dispatcher (single shared-lock acquire on the
+ // credential provider) and capture by value into the worker. Mirrors
+ // PutMultipart and avoids per-fanout contention on m_GetCreds.
+ SigV4Credentials Creds = m_GetCreds();
+
+ // Capture Stats BY VALUE: S3AsyncStorageStats is a struct of references to
+ // the long-lived PhaseStats atomics (PhaseStats outlives all in-flight
+ // transfers via the ParallelWork latch). The Stats wrapper struct itself
+ // is a local in the caller (S3AsyncStorageAdapter::Put) and goes out of
+ // scope before deferred work runs, so capturing &Stats would dangle.
+ Work.ScheduleWork(
+ Pool,
+ [this,
+ Hash = IoHash(Hash),
+ Size,
+ SourcePath = std::filesystem::path(SourcePath),
+ Token,
+ Stats,
+ Creds = std::move(Creds),
+ SlotRef = std::move(SlotRef)](std::atomic<bool>& Abort) mutable {
+ if (Abort.load())
+ {
+ Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutSmall '{}': aborted"sv, Hash)));
+ return;
+ }
+ try
+ {
+ if (Creds.AccessKeyId.empty())
+ {
+ Token->Fail(
+ std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutSmall '{}': no credentials available"sv, Hash)));
+ return;
+ }
+
+ BasicFile File(SourcePath, BasicFile::Mode::kRead);
+ if (File.FileSize() != Size)
+ {
+ Token->Fail(std::make_exception_ptr(
+ zen::runtime_error("S3AsyncStorage::PutSmall '{}': source size {} differs from declared {}"sv,
+ Hash,
+ File.FileSize(),
+ Size)));
+ return;
+ }
+
+ // Heap IoBuffer of exact size; explicit chunked pread fills it. No
+ // IoBuffer auto-materialize, no mmap. SHA computed in one pass over
+ // the buffer once it's filled.
+ IoBuffer Body(static_cast<size_t>(Size));
+ uint8_t* Dst = static_cast<uint8_t*>(Body.MutableData());
+ uint64_t Off = 0;
+ while (Off < Size)
+ {
+ const size_t Take = static_cast<size_t>(std::min<uint64_t>(kPutReadChunk, Size - Off));
+ File.Read(Dst + Off, Take, Off);
+ Off += Take;
+ }
+
+ std::string PayloadHash = Sha256ToHex(ComputeSha256(Body.GetData(), Body.GetSize()));
+ std::string Path = CasPath(Hash);
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "PUT", Path, "", PayloadHash);
+
+ Stopwatch Timer = Stats.BeginRequest();
+ // Pair Begin with End so an alloc throw inside AsyncPut (lambda capture
+ // before submit) does not strand InFlight elevated.
+ auto InFlightGuard = MakeGuard([&Stats] { Stats.EndRequest(0, 0); });
+ m_Client.AsyncPut(
+ Path,
+ std::move(Body),
+ [Hash, Token, Timer, Stats, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable {
+ Stats.EndRequest(Timer.GetElapsedTimeUs(), Resp.IsSuccess() ? static_cast<uint64_t>(Resp.UploadedBytes) : 0);
+ if (!Resp.IsSuccess())
+ {
+ std::string Err = S3ErrorMessage("S3 PUT failed", Resp);
+ Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutSmall '{}': {}"sv, Hash, Err)));
+ return;
+ }
+ Token->Complete();
+ },
+ Headers);
+ InFlightGuard.Dismiss();
+ }
+ catch (...)
+ {
+ Token->Fail(std::current_exception());
+ }
+ });
+}
+
+void
+S3AsyncStorage::PutMedium(ParallelWork& Work,
+ WorkerThreadPool& Pool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath,
+ S3AsyncStorageStats& Stats)
+{
+ auto Token = std::make_shared<ParallelWork::ExternalWorkToken>(Work.RegisterExternal());
+ Stats.RecordScheduled();
+
+ std::shared_ptr<void> SlotRef = AcquireAdmissionSlot(m_Admission, &Stats);
+
+ // Snapshot creds on the dispatcher; see PutSmall.
+ SigV4Credentials Creds = m_GetCreds();
+
+ Work.ScheduleWork(
+ Pool,
+ [this,
+ Hash = IoHash(Hash),
+ Size,
+ SourcePath = std::filesystem::path(SourcePath),
+ Token,
+ Stats,
+ Creds = std::move(Creds),
+ SlotRef = std::move(SlotRef)](std::atomic<bool>& Abort) mutable {
+ if (Abort.load())
+ {
+ Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMedium '{}': aborted"sv, Hash)));
+ return;
+ }
+ try
+ {
+ if (Creds.AccessKeyId.empty())
+ {
+ Token->Fail(
+ std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMedium '{}': no credentials available"sv, Hash)));
+ return;
+ }
+
+ // Hash pass: stream-read 256 KiB chunks into a heap scratch buffer,
+ // feed Sha256Stream incrementally. No body materialization. The file
+ // handle is shared with the upload pass via shared_ptr so libcurl's
+ // READ callback can pread from the same handle on the io thread.
+ auto File = std::make_shared<BasicFile>(SourcePath, BasicFile::Mode::kRead);
+ if (File->FileSize() != Size)
+ {
+ Token->Fail(std::make_exception_ptr(
+ zen::runtime_error("S3AsyncStorage::PutMedium '{}': source size {} differs from declared {}"sv,
+ Hash,
+ File->FileSize(),
+ Size)));
+ return;
+ }
+
+ Sha256Stream Hasher;
+ {
+ std::unique_ptr<uint8_t[]> Scratch(new uint8_t[kPutReadChunk]);
+ uint64_t Off = 0;
+ while (Off < Size)
+ {
+ const size_t Take = static_cast<size_t>(std::min<uint64_t>(kPutReadChunk, Size - Off));
+ File->Read(Scratch.get(), Take, Off);
+ Hasher.Update(Scratch.get(), Take);
+ Off += Take;
+ }
+ }
+ std::string PayloadHash = Sha256ToHex(Hasher.Finalize());
+ std::string Path = CasPath(Hash);
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "PUT", Path, "", PayloadHash);
+
+ Stopwatch Timer = Stats.BeginRequest();
+ // Pair Begin with End so an alloc throw inside AsyncPut does not
+ // strand InFlight elevated.
+ auto InFlightGuard = MakeGuard([&Stats] { Stats.EndRequest(0, 0); });
+
+ // Streaming source: libcurl pulls 256 KiB chunks from the file on
+ // the io thread. Local-disk pread is fast enough for medium tier;
+ // page cache makes the second pass over the same bytes near-free.
+ m_Client.AsyncPut(
+ Path,
+ Size,
+ [File](uint8_t* DstBuf, size_t MaxBytes, uint64_t AbsOffset) -> size_t {
+ const uint64_t Remaining = File->FileSize() > AbsOffset ? File->FileSize() - AbsOffset : 0;
+ const size_t Take = static_cast<size_t>(std::min<uint64_t>(MaxBytes, Remaining));
+ if (Take == 0)
+ {
+ return 0;
+ }
+ File->Read(DstBuf, Take, AbsOffset);
+ return Take;
+ },
+ [Hash, Token, Timer, Size, Stats, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable {
+ Stats.EndRequest(Timer.GetElapsedTimeUs(), Resp.IsSuccess() ? Size : 0);
+ if (!Resp.IsSuccess())
+ {
+ std::string Err = S3ErrorMessage("S3 PUT failed", Resp);
+ Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMedium '{}': {}"sv, Hash, Err)));
+ return;
+ }
+ Token->Complete();
+ },
+ Headers);
+ InFlightGuard.Dismiss();
+ }
+ catch (...)
+ {
+ Token->Fail(std::current_exception());
+ }
+ });
+}
+
+void
+S3AsyncStorage::PutMultipart(ParallelWork& Work,
+ WorkerThreadPool& Pool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath,
+ S3AsyncStorageStats& Stats)
+{
+ auto Token = std::make_shared<ParallelWork::ExternalWorkToken>(Work.RegisterExternal());
+ Stats.RecordScheduled();
+
+ // Fires only when an exception is in flight; a missed Dismiss() on a
+ // non-throwing early return falls through to Token's destructor safety
+ // net rather than asserting on Token->Fail(nullptr).
+ auto FailGuard = MakeGuard([Token] {
+ if (auto Ex = std::current_exception())
+ {
+ Token->Fail(Ex);
+ }
+ });
+
+ SigV4Credentials Creds = m_GetCreds();
+ if (Creds.AccessKeyId.empty())
+ {
+ FailGuard.Dismiss();
+ Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': no credentials available"sv, Hash)));
+ return;
+ }
+
+ auto File = std::make_shared<BasicFile>(SourcePath, BasicFile::Mode::kRead);
+ if (File->FileSize() != Size)
+ {
+ FailGuard.Dismiss();
+ Token->Fail(
+ std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': source size {} differs from declared {}"sv,
+ Hash,
+ File->FileSize(),
+ Size)));
+ return;
+ }
+
+ auto State = std::make_shared<PutMultipartState>(Work, Pool, Stats);
+ State->File = std::move(File);
+ State->Token = Token;
+ State->ContentHash = Hash;
+ State->Creds = Creds; // snapshot reused across all parts + complete/abort
+ State->Key = CasKey(Hash);
+ State->Path = CasPath(Hash);
+ State->TotalSize = Size;
+ State->PartSize = m_MultipartChunkSize;
+ State->TotalParts = static_cast<uint32_t>((Size + State->PartSize - 1) / State->PartSize);
+ State->PendingParts = State->TotalParts;
+ State->ReadsRemaining = State->TotalParts;
+ State->ETags.resize(State->TotalParts);
+
+ // Admission gating: the wave acquires min(TotalParts, AdmissionCap) slots
+ // inside DispatchInitialPartWave on a worker thread (off the dispatcher
+ // and off the io strand) so a single multipart cannot starve other
+ // dispatcher work while it drains the semaphore. Tail parts (TotalParts >
+ // cap) are dispatched lazily by HandoffSlotToNextPart from each AsyncPut
+ // completion, so the in-flight count per upload stays <= cap.
+ State->PreAcquired = m_Admission ? std::min<uint32_t>(State->TotalParts, m_AdmissionCap) : State->TotalParts;
+ State->NextPartToDispatch.store(State->PreAcquired, std::memory_order_relaxed);
+
+ std::string CanonicalQs = BuildCanonicalQueryString({{"uploads", ""}});
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "POST", State->Path, CanonicalQs, S3EmptyPayloadHash);
+ std::string FullPath = S3BuildRequestPath(State->Path, CanonicalQs);
+
+ // CreateMultipartUpload / CompleteMultipartUpload / AbortMultipartUpload are
+ // metadata operations - they do not move payload bytes and would distort the
+ // per-request stats (avg/max latency) if folded into RequestCount. Counted
+ // separately here means PhaseStats.RequestCount tracks data-bearing PUTs only,
+ // matching the sync S3Storage path's accounting more closely.
+
+ m_Client.AsyncPost(
+ FullPath,
+ [this, State](HttpClient::Response Resp) {
+ if (!Resp.IsSuccess())
+ {
+ std::string Err = S3ErrorMessage("S3 CreateMultipartUpload failed", Resp);
+ State->Token->Fail(
+ std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': {}"sv, State->ContentHash, Err)));
+ return;
+ }
+ // See PutMultipart entry: tolerate null exception_ptr on a missed Dismiss().
+ auto CallbackGuard = MakeGuard([State] {
+ if (auto Ex = std::current_exception())
+ {
+ State->Token->Fail(Ex);
+ }
+ });
+ std::string_view Body = Resp.AsText();
+ // S3 can answer 200 with an embedded <Error> body even on Create.
+ // Mirror CompleteMultipart's parse so the original error code/message
+ // surfaces instead of a generic "missing UploadId".
+ std::string_view ErrorCode;
+ std::string_view ErrorMessage;
+ if (S3ExtractError(Body, ErrorCode, ErrorMessage))
+ {
+ CallbackGuard.Dismiss();
+ State->Token->Fail(
+ std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': create returned error: {} - {}"sv,
+ State->ContentHash,
+ ErrorCode,
+ ErrorMessage)));
+ return;
+ }
+ std::string_view UploadId = S3ExtractXmlValue(Body, "UploadId");
+ if (UploadId.empty())
+ {
+ CallbackGuard.Dismiss();
+ State->Token->Fail(std::make_exception_ptr(
+ zen::runtime_error("S3AsyncStorage::PutMultipart '{}': missing UploadId in CreateMultipartUpload response"sv,
+ State->ContentHash)));
+ return;
+ }
+ State->UploadId = std::string(UploadId);
+ // Hop the wave dispatch onto a worker so the per-part admission
+ // acquire never blocks the io strand or the caller dispatcher.
+ try
+ {
+ State->Work.ScheduleWork(
+ State->Pool,
+ [this, State](std::atomic<bool>&) { DispatchInitialPartWave(State); },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (...)
+ {
+ CallbackGuard.Dismiss();
+ RecordPutPartFailure(*State, "S3 UploadPart wave schedule failed");
+ for (uint32_t J = 0; J < State->TotalParts; ++J)
+ {
+ FinalizePutPart(State);
+ }
+ return;
+ }
+ CallbackGuard.Dismiss();
+ },
+ Headers);
+ FailGuard.Dismiss();
+}
+
+void
+S3AsyncStorage::DispatchInitialPartWave(std::shared_ptr<PutMultipartState> State)
+{
+ const bool AdmissionEnabled = (m_Admission != nullptr);
+ const uint32_t InitialDispatch = AdmissionEnabled ? State->PreAcquired : State->TotalParts;
+
+ for (uint32_t I = 0; I < InitialDispatch; ++I)
+ {
+ const uint32_t PartNum = I + 1;
+ std::shared_ptr<void> SlotRef;
+ try
+ {
+ if (AdmissionEnabled)
+ {
+ SlotRef = AcquireAdmissionSlot(m_Admission, &State->Stats);
+ }
+ State->Work.ScheduleWork(
+ State->Pool,
+ [this, State, PartNum, SlotRef = std::move(SlotRef)](std::atomic<bool>&) mutable {
+ DispatchPartUpload(State, PartNum, std::move(SlotRef));
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& Ex)
+ {
+ // Acquire/schedule failed mid-wave. Account for this part plus
+ // the un-iterated wave tail, then drain the lazy tail past the
+ // wave. Slot (if acquired) releases when SlotRef destructs.
+ RecordPutPartFailure(*State, fmt::format("S3 UploadPart {} schedule failed: {}", PartNum, Ex.what()));
+ const uint32_t WaveTail = InitialDispatch - I;
+ for (uint32_t J = 0; J < WaveTail; ++J)
+ {
+ FinalizePutPart(State);
+ }
+ DrainUndispatchedParts(State);
+ return;
+ }
+ }
+}
+
+void
+S3AsyncStorage::HandoffSlotToNextPart(std::shared_ptr<PutMultipartState> State, uint32_t PartIdx, std::shared_ptr<void> SlotRef)
+{
+ const uint32_t PartNum = PartIdx + 1;
+ try
+ {
+ State->Work.ScheduleWork(
+ State->Pool,
+ [this, State, PartNum, SlotRef = std::move(SlotRef)](std::atomic<bool>&) mutable {
+ DispatchPartUpload(State, PartNum, std::move(SlotRef));
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& Ex)
+ {
+ // Handoff dispatch failed. The slot we were about to hand off
+ // releases when SlotRef destructs at scope exit. Account for this
+ // never-dispatched part and drain the rest of the tail.
+ RecordPutPartFailure(*State, fmt::format("S3 UploadPart {} handoff failed: {}", PartNum, Ex.what()));
+ FinalizePutPart(State);
+ DrainUndispatchedParts(State);
+ }
+}
+
+void
+S3AsyncStorage::DrainUndispatchedParts(std::shared_ptr<PutMultipartState> State)
+{
+ const uint32_t Skipped = ClaimUndispatchedParts(*State);
+ for (uint32_t J = 0; J < Skipped; ++J)
+ {
+ FinalizePutPart(State);
+ }
+}
+
+#if ZEN_WITH_TESTS
+namespace s3asyncstorage_test_hooks {
+ std::atomic<uint32_t> g_ForceNextPartFailures{0};
+ void ForceNextPartFailures(uint32_t Count) { g_ForceNextPartFailures.store(Count, std::memory_order_relaxed); }
+} // namespace s3asyncstorage_test_hooks
+#endif
+
+void
+S3AsyncStorage::DispatchPartUpload(std::shared_ptr<PutMultipartState> State, uint32_t PartNum, std::shared_ptr<void> SlotRef)
+{
+ const uint64_t Offset = static_cast<uint64_t>(PartNum - 1) * State->PartSize;
+ const uint64_t ChunkSize = std::min<uint64_t>(State->PartSize, State->TotalSize - Offset);
+
+ // Release File on this worker thread once all reads have been served, so
+ // the close never runs on the curl io strand from a captured State ref in
+ // an AsyncPut callback.
+ auto ReleaseFileIfLast = [&State] {
+ if (State->ReadsRemaining.fetch_sub(1, std::memory_order_acq_rel) == 1)
+ {
+ State->File.reset();
+ }
+ };
+
+#if ZEN_WITH_TESTS
+ // Test hook: synthesize a part-level failure to drive the AbortMultipart
+ // path. Decrement the counter while > 0; each consumed slot fails one part.
+ {
+ uint32_t Cur = s3asyncstorage_test_hooks::g_ForceNextPartFailures.load(std::memory_order_relaxed);
+ while (Cur > 0 &&
+ !s3asyncstorage_test_hooks::g_ForceNextPartFailures.compare_exchange_weak(Cur, Cur - 1, std::memory_order_acq_rel))
+ {
+ }
+ if (Cur > 0)
+ {
+ ReleaseFileIfLast();
+ RecordPutPartFailure(*State, fmt::format("test-injected part failure (part {})", PartNum));
+ FinalizePutPart(State);
+ DrainUndispatchedParts(State);
+ return;
+ }
+ }
+#endif
+
+ // Short-circuit if a foreign failure has flipped AnyFailed since this part was
+ // scheduled (handoff race: success callback bumps the cursor to dispatch the
+ // next part, drain runs concurrently). Skip the file read + sign + AsyncPut for
+ // a doomed transfer; FinalizePutPart accounting is symmetric to the success path.
+ if (State->AnyFailed.load(std::memory_order_acquire))
+ {
+ ReleaseFileIfLast();
+ FinalizePutPart(State);
+ return;
+ }
+
+ // Explicit chunked positional pread + Sha256Stream in one pass. One heap
+ // IoBuffer of exact part size; no mmap, no IoBuffer auto-materialize.
+ IoBuffer Part(static_cast<size_t>(ChunkSize));
+ uint8_t* Dst = static_cast<uint8_t*>(Part.MutableData());
+ Sha256Stream Hasher;
+ try
+ {
+ uint64_t Read = 0;
+ while (Read < ChunkSize)
+ {
+ const size_t Take = static_cast<size_t>(std::min<uint64_t>(kPutReadChunk, ChunkSize - Read));
+ State->File->Read(Dst + Read, Take, Offset + Read);
+ Hasher.Update(Dst + Read, Take);
+ Read += Take;
+ }
+ ReleaseFileIfLast();
+ }
+ catch (const std::exception& Ex)
+ {
+ ReleaseFileIfLast();
+ RecordPutPartFailure(*State, fmt::format("S3 UploadPart {} read failed: {}", PartNum, Ex.what()));
+ FinalizePutPart(State);
+ DrainUndispatchedParts(State);
+ return;
+ }
+
+ try
+ {
+ // Use the snapshot taken at PutMultipart entry; saves ~TotalParts
+ // shared-lock acquisitions on the credential provider.
+ const SigV4Credentials& Creds = State->Creds;
+
+ std::string CanonicalQs = BuildCanonicalQueryString({
+ {"partNumber", fmt::format("{}", PartNum)},
+ {"uploadId", State->UploadId},
+ });
+ std::string PayloadHash = Sha256ToHex(Hasher.Finalize());
+
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "PUT", State->Path, CanonicalQs, PayloadHash);
+ std::string FullPath = S3BuildRequestPath(State->Path, CanonicalQs);
+
+ Stopwatch Timer = State->Stats.BeginRequest();
+ // Pair Begin with End so an alloc throw inside AsyncPut does not strand
+ // InFlight elevated.
+ auto InFlightGuard = MakeGuard([&State] { State->Stats.EndRequest(0, 0); });
+
+ m_Client.AsyncPut(
+ FullPath,
+ std::move(Part),
+ [this, State, PartNum, Timer, ChunkSize, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable {
+ State->Stats.EndRequest(Timer.GetElapsedTimeUs(), Resp.IsSuccess() ? ChunkSize : 0);
+ if (!Resp.IsSuccess())
+ {
+ RecordPutPartFailure(*State, S3ErrorMessage(fmt::format("S3 UploadPart {} failed", PartNum), Resp));
+ DrainUndispatchedParts(State);
+ FinalizePutPart(State);
+ return;
+ }
+ std::string_view ETag = Resp.FindHeader("etag");
+ if (ETag.empty())
+ {
+ RecordPutPartFailure(*State, fmt::format("S3 UploadPart {} response missing ETag header", PartNum));
+ DrainUndispatchedParts(State);
+ FinalizePutPart(State);
+ return;
+ }
+ // Per-slot writes don't race - each part owns its slot.
+ // PendingParts.fetch_sub(acq_rel) in FinalizePutPart is the
+ // publication barrier: CompleteMultipart (gated on Remaining
+ // == 1) sees every ETag write. New readers must go through
+ // the same barrier or take ETagLock.
+ State->ETags[PartNum - 1].assign(ETag);
+
+ // Try to hand the slot off to the next undispatched part. Skip
+ // the handoff if a prior failure has flipped AnyFailed (cursor
+ // was already exchanged to TotalParts by the drain there).
+ if (!State->AnyFailed.load(std::memory_order_acquire))
+ {
+ const uint32_t NextIdx = State->NextPartToDispatch.fetch_add(1, std::memory_order_acq_rel);
+ if (NextIdx < State->TotalParts)
+ {
+ HandoffSlotToNextPart(State, NextIdx, std::move(SlotRef));
+ }
+ }
+ // Account for THIS part. SlotRef may already be moved-from
+ // (if handed off); if not, releases when this lambda destructs.
+ FinalizePutPart(State);
+ },
+ Headers);
+ InFlightGuard.Dismiss();
+ }
+ catch (const std::exception& Ex)
+ {
+ RecordPutPartFailure(*State, fmt::format("S3 UploadPart {} dispatch failed: {}", PartNum, Ex.what()));
+ FinalizePutPart(State);
+ DrainUndispatchedParts(State);
+ }
+}
+
+void
+S3AsyncStorage::FinalizePutPart(std::shared_ptr<PutMultipartState> State)
+{
+ const uint32_t Remaining = State->PendingParts.fetch_sub(1, std::memory_order_acq_rel);
+ if (Remaining != 1)
+ {
+ return;
+ }
+
+ // Hop the Complete/Abort dispatch off the curl io strand. SHA over the
+ // parts-XML and SignRequest are CPU work; AsyncPost itself queues a strand
+ // wakeup. Running both inline here would pin the strand thread.
+ const bool Failed = State->AnyFailed.load(std::memory_order_acquire);
+ try
+ {
+ State->Work.ScheduleWork(State->Pool, [this, State, Failed](std::atomic<bool>&) {
+ if (Failed)
+ {
+ AbortMultipart(State);
+ }
+ else
+ {
+ CompleteMultipart(State);
+ }
+ });
+ }
+ catch (...)
+ {
+ // ScheduleWork failed before Complete/Abort could run; surface the leak via Token->Fail.
+ State->Token->Fail(std::current_exception());
+ }
+}
+
+void
+S3AsyncStorage::CompleteMultipart(std::shared_ptr<PutMultipartState> State)
+{
+ // Reuse the snapshot taken at PutMultipart entry. The upload-id was issued
+ // against these creds; refreshing here would only matter if creds expired
+ // mid-upload, in which case all the part uploads would already have failed.
+ const SigV4Credentials& Creds = State->Creds;
+
+ ExtendableStringBuilder<1024> Xml;
+ Xml.Append("<CompleteMultipartUpload>");
+ for (uint32_t I = 0; I < State->TotalParts; ++I)
+ {
+ Xml.Append(fmt::format("<Part><PartNumber>{}</PartNumber><ETag>{}</ETag></Part>", I + 1, State->ETags[I]));
+ }
+ Xml.Append("</CompleteMultipartUpload>");
+ std::string_view XmlView = Xml.ToView();
+
+ std::string CanonicalQs = BuildCanonicalQueryString({{"uploadId", State->UploadId}});
+ std::string PayloadHash = Sha256ToHex(ComputeSha256(XmlView));
+
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "POST", State->Path, CanonicalQs, PayloadHash);
+ std::string FullPath = S3BuildRequestPath(State->Path, CanonicalQs);
+
+ IoBuffer Payload(IoBuffer::Clone, XmlView.data(), XmlView.size());
+
+ // Metadata op; not counted in PhaseStats (see CreateMultipartUpload comment).
+ m_Client.AsyncPost(
+ FullPath,
+ Payload,
+ ZenContentType::kXML,
+ [State](HttpClient::Response Resp) {
+ if (!Resp.IsSuccess())
+ {
+ std::string Err = S3ErrorMessage("S3 CompleteMultipartUpload failed", Resp);
+ State->Token->Fail(
+ std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': {}"sv, State->ContentHash, Err)));
+ return;
+ }
+ std::string_view Body = Resp.AsText();
+ std::string_view ErrorCode;
+ std::string_view ErrorMessage;
+ if (S3ExtractError(Body, ErrorCode, ErrorMessage))
+ {
+ State->Token->Fail(
+ std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': complete returned error: {} - {}"sv,
+ State->ContentHash,
+ ErrorCode,
+ ErrorMessage)));
+ return;
+ }
+ State->Token->Complete();
+ },
+ Headers);
+}
+
+void
+S3AsyncStorage::AbortMultipart(std::shared_ptr<PutMultipartState> State)
+{
+ // Reuse the snapshot taken at PutMultipart entry.
+ const SigV4Credentials& Creds = State->Creds;
+
+ std::string CanonicalQs = BuildCanonicalQueryString({{"uploadId", State->UploadId}});
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "DELETE", State->Path, CanonicalQs, S3EmptyPayloadHash);
+ std::string FullPath = S3BuildRequestPath(State->Path, CanonicalQs);
+
+ // Metadata op; not counted in PhaseStats (see CreateMultipartUpload comment).
+ m_Client.AsyncDelete(
+ FullPath,
+ [State](HttpClient::Response Resp) {
+ std::string FirstError;
+ State->ETagLock.WithExclusiveLock([&] { FirstError = std::move(State->FirstError); });
+ if (!Resp.IsSuccess())
+ {
+ State->Token->Fail(std::make_exception_ptr(
+ zen::runtime_error("S3AsyncStorage::PutMultipart '{}': part failed ({}); abort also failed: {}"sv,
+ State->ContentHash,
+ FirstError,
+ S3ErrorMessage("S3 AbortMultipartUpload failed", Resp))));
+ return;
+ }
+ State->Token->Fail(
+ std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::PutMultipart '{}': {}"sv, State->ContentHash, FirstError)));
+ },
+ Headers);
+}
+
+void
+S3AsyncStorage::Get(ParallelWork& Work,
+ WorkerThreadPool& Pool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath,
+ S3AsyncStorageStats& Stats)
+{
+ // Three tiers: in-memory AsyncGet (< 512 KiB), AsyncStream into pooled
+ // 512 KiB write buffers (medium), GetMultipart (>= MultiRangeThreshold).
+ constexpr uint64_t StreamingThreshold = 512u * 1024u;
+
+ const uint64_t MultiRangeThreshold = m_MultipartChunkSize + (m_MultipartChunkSize / 4);
+ if (Size >= MultiRangeThreshold)
+ {
+ GetMultipart(Work, Pool, Hash, Size, DestinationPath, Stats);
+ return;
+ }
+
+ auto Token = std::make_shared<ParallelWork::ExternalWorkToken>(Work.RegisterExternal());
+ Stats.RecordScheduled();
+
+ // See PutMultipart: tolerate null exception_ptr from a refactor that
+ // adds a non-throwing early return without Dismiss().
+ auto FailGuard = MakeGuard([Token] {
+ if (auto Ex = std::current_exception())
+ {
+ Token->Fail(Ex);
+ }
+ });
+
+ std::shared_ptr<void> SlotRef = AcquireAdmissionSlot(m_Admission, &Stats);
+
+ SigV4Credentials Creds = m_GetCreds();
+ if (Creds.AccessKeyId.empty())
+ {
+ FailGuard.Dismiss();
+ Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': no credentials available"sv, Hash)));
+ return;
+ }
+
+ std::string Path = CasPath(Hash);
+
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "GET", Path, "", S3EmptyPayloadHash);
+
+ Stopwatch Timer = Stats.BeginRequest();
+ // Pair Begin with End so an alloc throw inside AsyncGet/AsyncStream does not
+ // strand InFlight elevated.
+ auto InFlightGuard = MakeGuard([&Stats] { Stats.EndRequest(0, 0); });
+
+ if (Size < StreamingThreshold)
+ {
+ // Small: in-memory AsyncGet, single worker-hop write at completion.
+ m_Client.AsyncGet(
+ Path,
+ [Hash = IoHash(Hash), Token, Timer, Stats, DestPath = DestinationPath, Size, &Work, &Pool, SlotRef = std::move(SlotRef)](
+ HttpClient::Response Resp) mutable {
+ Stats.EndRequest(Timer.GetElapsedTimeUs(), Resp.IsSuccess() ? Resp.ResponsePayload.GetSize() : 0);
+ // No remove() on the failure / size-mismatch paths: the destination
+ // file is only created by the worker hop below on success, so
+ // nothing exists to delete here.
+ if (!Resp.IsSuccess())
+ {
+ std::string Err = S3ErrorMessage("S3 GET failed", Resp);
+ Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': {}"sv, Hash, Err)));
+ return;
+ }
+ if (Resp.ResponsePayload.GetSize() != Size)
+ {
+ Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': received {} bytes, expected {}"sv,
+ Hash,
+ Resp.ResponsePayload.GetSize(),
+ Size)));
+ return;
+ }
+ try
+ {
+ Work.ScheduleWork(
+ Pool,
+ [Token, DestPath, Hash, Payload = std::move(Resp.ResponsePayload)](std::atomic<bool>&) mutable {
+ try
+ {
+ BasicFile Out(DestPath, BasicFile::Mode::kTruncate);
+ Out.Write(Payload.GetData(), Payload.GetSize(), 0);
+ Token->Complete();
+ }
+ catch (const std::exception& Ex)
+ {
+ std::error_code Ec;
+ std::filesystem::remove(DestPath, Ec);
+ Token->Fail(std::make_exception_ptr(
+ zen::runtime_error("S3AsyncStorage::Get '{}': write failed: {}"sv, Hash, Ex.what())));
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& Ex)
+ {
+ Token->Fail(
+ std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': schedule failed: {}"sv, Hash, Ex.what())));
+ }
+ },
+ Headers);
+ InFlightGuard.Dismiss();
+ FailGuard.Dismiss();
+ return;
+ }
+
+ // Medium tier: AsyncStream + GetStreamState (see struct doc).
+ const uint32_t StreamTotalBlocks =
+ static_cast<uint32_t>((Size + GetStreamState::WriteBufferSize - 1) / GetStreamState::WriteBufferSize);
+ auto State = std::make_shared<GetStreamState>(Work, Pool, Stats, StreamTotalBlocks);
+ State->Token = Token;
+ State->ContentHash = Hash;
+ State->DestPath = DestinationPath;
+ State->ExpectedSize = Size;
+
+ try
+ {
+ State->DestFile = std::make_shared<BasicFile>(DestinationPath, BasicFile::Mode::kTruncate);
+ std::error_code PrepareEc = PrepareFileForScatteredWrite(State->DestFile->Handle(), Size);
+ if (PrepareEc)
+ {
+ throw zen::runtime_error("PrepareFileForScatteredWrite failed: {}"sv, PrepareEc.message());
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ FailGuard.Dismiss();
+ // Drop DestFile (if opened) before remove() so Windows lets the unlink
+ // proceed; otherwise we leave a 0-byte file behind on the prealloc path.
+ State->DestFile.reset();
+ std::error_code Ec;
+ std::filesystem::remove(DestinationPath, Ec);
+ Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': preallocate failed: {}"sv, Hash, Ex.what())));
+ return;
+ }
+
+ m_Client.AsyncStream(
+ Path,
+ [this, State](const uint8_t* Data, size_t SizeBytes, uint64_t /*TotalSize*/) -> bool {
+ constexpr size_t WriteBufferSize = GetStreamState::WriteBufferSize;
+ if (State->TotalReceived + SizeBytes > State->ExpectedSize)
+ {
+ RecordGetStreamFailure(
+ *State,
+ fmt::format("S3 GET overflow: got {} bytes past expected {}", State->TotalReceived + SizeBytes, State->ExpectedSize));
+ return false;
+ }
+ const uint8_t* Cur = Data;
+ size_t Remaining = SizeBytes;
+ while (Remaining > 0)
+ {
+ if (!State->ActiveBuf)
+ {
+ State->ActiveBuf = State->Buffers.Acquire();
+ }
+ const size_t Avail = WriteBufferSize - State->BufFill;
+ const size_t Take = std::min(Remaining, Avail);
+ std::memcpy(State->ActiveBuf.MutableData<uint8_t>() + State->BufFill, Cur, Take);
+ State->BufFill += Take;
+ State->TotalReceived += Take;
+ Cur += Take;
+ Remaining -= Take;
+ if (State->BufFill == WriteBufferSize)
+ {
+ IoBuffer Buf = std::move(State->ActiveBuf);
+ const size_t Fill = State->BufFill;
+ const uint64_t AbsOffset = State->NextAbsOffset;
+ State->NextAbsOffset += Fill;
+ State->BufFill = 0;
+ State->PendingWork.fetch_add(1, std::memory_order_acq_rel);
+ try
+ {
+ State->Work.ScheduleWork(
+ State->Pool,
+ [this, State, Buf = std::move(Buf), Fill, AbsOffset](std::atomic<bool>&) mutable {
+ try
+ {
+ State->DestFile->Write(Buf.GetData(), Fill, AbsOffset);
+ }
+ catch (const std::exception& Ex)
+ {
+ RecordGetStreamFailure(*State, fmt::format("S3 GET write failed: {}", Ex.what()));
+ }
+ State->Buffers.Release(std::move(Buf));
+ if (State->PendingWork.fetch_sub(1, std::memory_order_acq_rel) == 1)
+ {
+ OnGetStreamFinalised(State);
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& Ex)
+ {
+ // ScheduleWork failed before the worker took ownership of the
+ // PendingWork increment we did above; balance it here and
+ // abort the transfer so OnComplete tears down cleanly. Buf is
+ // gone (moved into the lambda then destroyed when ScheduleWork
+ // threw); restore the pool's acquire slot so OnComplete's
+ // tail-flush can still Acquire if needed.
+ RecordGetStreamFailure(*State, fmt::format("S3 GET schedule failed: {}", Ex.what()));
+ State->Buffers.RestoreAcquireSlot();
+ State->PendingWork.fetch_sub(1, std::memory_order_acq_rel);
+ return false;
+ }
+ }
+ }
+ return true;
+ },
+ [this, State, Timer, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable {
+ State->Stats.EndRequest(Timer.GetElapsedTimeUs(), Resp.IsSuccess() ? State->TotalReceived : 0);
+ bool Failed = !Resp.IsSuccess();
+ if (Failed)
+ {
+ RecordGetStreamFailure(*State, S3ErrorMessage("S3 GET failed", Resp));
+ }
+ else if (State->TotalReceived != State->ExpectedSize)
+ {
+ RecordGetStreamFailure(*State,
+ fmt::format("S3 GET wrote {} bytes, expected {}", State->TotalReceived, State->ExpectedSize));
+ Failed = true;
+ }
+ if (!Failed && State->BufFill > 0)
+ {
+ IoBuffer Buf = std::move(State->ActiveBuf);
+ const size_t Fill = State->BufFill;
+ const uint64_t AbsOffset = State->NextAbsOffset;
+ State->BufFill = 0;
+ State->PendingWork.fetch_add(1, std::memory_order_acq_rel);
+ try
+ {
+ State->Work.ScheduleWork(
+ State->Pool,
+ [this, State, Buf = std::move(Buf), Fill, AbsOffset](std::atomic<bool>&) mutable {
+ try
+ {
+ State->DestFile->Write(Buf.GetData(), Fill, AbsOffset);
+ }
+ catch (const std::exception& Ex)
+ {
+ RecordGetStreamFailure(*State, fmt::format("S3 GET write failed: {}", Ex.what()));
+ }
+ State->Buffers.Release(std::move(Buf));
+ if (State->PendingWork.fetch_sub(1, std::memory_order_acq_rel) == 1)
+ {
+ OnGetStreamFinalised(State);
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& Ex)
+ {
+ RecordGetStreamFailure(*State, fmt::format("S3 GET tail-flush schedule failed: {}", Ex.what()));
+ State->Buffers.RestoreAcquireSlot();
+ State->PendingWork.fetch_sub(1, std::memory_order_acq_rel);
+ }
+ }
+ // PendingWork = 1 (stream slot, set in ctor) + N (per-buffer worker
+ // dispatched in OnData) + (optional 1 for the tail-flush dispatched
+ // just above). The fetch_sub returning 1 means we observed the value
+ // that was 1 right before our decrement - i.e. we are the last
+ // participant. Either this branch (no tail flush, no in-flight
+ // workers) or the last per-buffer worker performs the finalise. Both
+ // paths use acq_rel so the work-item writes are visible to whichever
+ // thread runs OnGetStreamFinalised.
+ if (State->PendingWork.fetch_sub(1, std::memory_order_acq_rel) == 1)
+ {
+ OnGetStreamFinalised(State);
+ }
+ },
+ Headers);
+ InFlightGuard.Dismiss();
+ FailGuard.Dismiss();
+}
+
+void
+S3AsyncStorage::OnGetStreamFinalised(std::shared_ptr<GetStreamState> State)
+{
+ // All disk I/O (close + unlink on failure) is hopped into the worker pool.
+ // The last PendingWork dec can land on the curl io strand, and inline
+ // disk syscalls there would block the curl_multi poll loop.
+ const bool Failed = State->Failed.load(std::memory_order_acquire);
+ try
+ {
+ State->Work.ScheduleWork(
+ State->Pool,
+ [State, Failed](std::atomic<bool>&) {
+ if (Failed)
+ {
+ std::string Err;
+ State->ErrorLock.WithExclusiveLock([&] { Err = std::move(State->FirstError); });
+ State->DestFile.reset();
+ std::error_code Ec;
+ std::filesystem::remove(State->DestPath, Ec);
+ State->Token->Fail(
+ std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': {}"sv, State->ContentHash, Err)));
+ return;
+ }
+
+ // No Flush: consumer reads via page cache; crash recovery re-hydrates.
+ State->DestFile.reset();
+ State->Token->Complete();
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (...)
+ {
+ // Schedule failed; release file handle inline and surface via Token.
+ State->DestFile.reset();
+ std::error_code Ec;
+ std::filesystem::remove(State->DestPath, Ec);
+ State->Token->Fail(std::current_exception());
+ }
+}
+
+void
+S3AsyncStorage::GetMultipart(ParallelWork& Work,
+ WorkerThreadPool& Pool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath,
+ S3AsyncStorageStats& Stats)
+{
+ auto Token = std::make_shared<ParallelWork::ExternalWorkToken>(Work.RegisterExternal());
+ Stats.RecordScheduled();
+
+ const uint64_t Chunk = m_MultipartChunkSize;
+ const uint32_t RangeCount = static_cast<uint32_t>((Size + Chunk - 1) / Chunk);
+ uint32_t TotalBlocks = 0;
+ for (uint32_t I = 0; I < RangeCount; ++I)
+ {
+ const uint64_t RS = std::min<uint64_t>(Chunk, Size - static_cast<uint64_t>(I) * Chunk);
+ TotalBlocks += static_cast<uint32_t>((RS + GetMultipartState::WriteBufferSize - 1) / GetMultipartState::WriteBufferSize);
+ }
+
+ auto State = std::make_shared<GetMultipartState>(Work, Pool, Stats, TotalBlocks);
+ State->Token = Token;
+ State->ContentHash = Hash;
+ State->DestPath = DestinationPath;
+ State->TotalRanges = RangeCount;
+ State->PendingRanges = RangeCount;
+
+ // RangeCount > AdmissionCap is fine: the per-range acquire below blocks the
+ // dispatcher (caller thread, NOT the io strand) until in-flight ranges fire
+ // their AsyncStream completions and release slots. In-flight ranges per
+ // upload stay bounded by AdmissionCap.
+
+ try
+ {
+ // Sparse + preallocated. Sparse mode lets per-range writes land at
+ // arbitrary offsets without the OS zero-filling intervening pages
+ // first; preallocating the full size up front avoids fragmentation
+ // and lazy-commit page faults during the writes themselves.
+ State->DestFile = std::make_shared<BasicFile>(DestinationPath, BasicFile::Mode::kTruncate);
+ std::error_code PrepareEc = PrepareFileForScatteredWrite(State->DestFile->Handle(), Size);
+ if (PrepareEc)
+ {
+ throw zen::runtime_error("PrepareFileForScatteredWrite failed: {}"sv, PrepareEc.message());
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ // Drop DestFile (if opened) before remove() so Windows lets the unlink
+ // proceed; otherwise we leave a 0-byte file behind on the prealloc path.
+ State->DestFile.reset();
+ std::error_code Ec;
+ std::filesystem::remove(DestinationPath, Ec);
+ Token->Fail(
+ std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::GetMultipart '{}': preallocate failed: {}"sv, Hash, Ex.what())));
+ return;
+ }
+
+ std::string Path = CasPath(Hash);
+
+ // Snapshot creds once for the whole multipart Get. Mirrors PutMultipart;
+ // avoids N shared-lock acquisitions on the credential provider when the
+ // range count is large.
+ const SigV4Credentials Creds = m_GetCreds();
+ if (Creds.AccessKeyId.empty())
+ {
+ // Empty creds are loop-invariant; drive PendingRanges to zero in one shot
+ // rather than acquiring TotalRanges admission slots only to fire identical
+ // failures. RecordGetPartFailure CAS keeps the first message; subsequent
+ // completions just decrement the counter.
+ RecordGetPartFailure(*State, "no credentials available");
+ for (uint32_t I = 0; I < State->TotalRanges; ++I)
+ {
+ OnGetPartCompleted(State);
+ }
+ return;
+ }
+
+ for (uint32_t I = 0; I < State->TotalRanges; ++I)
+ {
+ const uint64_t Offset = static_cast<uint64_t>(I) * Chunk;
+ const uint64_t RangeSize = std::min<uint64_t>(Chunk, Size - Offset);
+ const uint32_t RangeIdx = I;
+
+ try
+ {
+ // Per-range admission slot. Acquire blocks the caller thread when the
+ // cap is reached; in-flight ranges release slots via SlotRef destruct
+ // in their AsyncStream completion callback. Acquire on the caller
+ // (vs the io strand) is safe to block: GetMultipart is called from
+ // the provision-pool worker driving Storage::Get, never from the
+ // curl io thread that has to drain completions.
+ std::shared_ptr<void> SlotRef = AcquireAdmissionSlot(m_Admission, &Stats);
+
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "GET", Path, "", S3EmptyPayloadHash);
+ Headers->emplace("Range", fmt::format("bytes={}-{}", Offset, Offset + RangeSize - 1));
+
+ Stopwatch Timer = State->Stats.BeginRequest();
+ // Pair Begin with End so an alloc throw inside AsyncStream does not
+ // strand InFlight elevated.
+ auto InFlightGuard = MakeGuard([&State] { State->Stats.EndRequest(0, 0); });
+
+ // Per-range mirror of GetStreamState's buffered-write machinery (see
+ // the medium-tier branch in Get()).
+ constexpr size_t WriteBufferSize = GetMultipartState::WriteBufferSize;
+
+ struct RangeWriteState
+ {
+ IoBuffer ActiveBuf;
+ size_t BufFill = 0;
+ uint64_t NextAbsOffset;
+ uint64_t TotalReceived = 0;
+ std::atomic<uint32_t> PendingWork{1}; // +1 for stream completion
+ };
+
+ auto WState = std::make_shared<RangeWriteState>();
+ WState->NextAbsOffset = Offset;
+
+ m_Client.AsyncStream(
+ Path,
+ [this, State, RangeSize, RangeIdx, WState](const uint8_t* Data, size_t SizeBytes, uint64_t /*TotalSize*/) -> bool {
+ if (WState->TotalReceived + SizeBytes > RangeSize)
+ {
+ RecordGetPartFailure(*State,
+ fmt::format("S3 GET range {} overflow: got {} bytes past expected {}",
+ RangeIdx,
+ WState->TotalReceived + SizeBytes,
+ RangeSize));
+ return false;
+ }
+ const uint8_t* Cur = Data;
+ size_t Remaining = SizeBytes;
+ while (Remaining > 0)
+ {
+ if (!WState->ActiveBuf)
+ {
+ WState->ActiveBuf = State->Buffers.Acquire();
+ }
+ const size_t Avail = WriteBufferSize - WState->BufFill;
+ const size_t Take = std::min(Remaining, Avail);
+ std::memcpy(WState->ActiveBuf.MutableData<uint8_t>() + WState->BufFill, Cur, Take);
+ WState->BufFill += Take;
+ WState->TotalReceived += Take;
+ Cur += Take;
+ Remaining -= Take;
+ if (WState->BufFill == WriteBufferSize)
+ {
+ IoBuffer Buf = std::move(WState->ActiveBuf);
+ const size_t Fill = WState->BufFill;
+ const uint64_t AbsOffset = WState->NextAbsOffset;
+ WState->NextAbsOffset += Fill;
+ WState->BufFill = 0;
+ WState->PendingWork.fetch_add(1, std::memory_order_acq_rel);
+ try
+ {
+ State->Work.ScheduleWork(
+ State->Pool,
+ [this, State, RangeIdx, WState, Buf = std::move(Buf), Fill, AbsOffset](std::atomic<bool>&) mutable {
+ try
+ {
+ State->DestFile->Write(Buf.GetData(), Fill, AbsOffset);
+ }
+ catch (const std::exception& Ex)
+ {
+ RecordGetPartFailure(*State,
+ fmt::format("S3 GET range {} write failed: {}", RangeIdx, Ex.what()));
+ }
+ State->Buffers.Release(std::move(Buf));
+ if (WState->PendingWork.fetch_sub(1, std::memory_order_acq_rel) == 1)
+ {
+ OnGetPartCompleted(State);
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& Ex)
+ {
+ RecordGetPartFailure(*State, fmt::format("S3 GET range {} schedule failed: {}", RangeIdx, Ex.what()));
+ State->Buffers.RestoreAcquireSlot();
+ WState->PendingWork.fetch_sub(1, std::memory_order_acq_rel);
+ return false;
+ }
+ }
+ }
+ return true;
+ },
+ [this, State, RangeSize, RangeIdx, WState, Timer, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable {
+ State->Stats.EndRequest(Timer.GetElapsedTimeUs(), Resp.IsSuccess() ? WState->TotalReceived : 0);
+ bool Failed = !Resp.IsSuccess();
+ if (Failed)
+ {
+ RecordGetPartFailure(*State, S3ErrorMessage(fmt::format("S3 GET range {} failed", RangeIdx), Resp));
+ }
+ else if (WState->TotalReceived != RangeSize)
+ {
+ RecordGetPartFailure(
+ *State,
+ fmt::format("S3 GET range {} wrote {} bytes, expected {}", RangeIdx, WState->TotalReceived, RangeSize));
+ Failed = true;
+ }
+ if (!Failed && WState->BufFill > 0)
+ {
+ IoBuffer Buf = std::move(WState->ActiveBuf);
+ const size_t Fill = WState->BufFill;
+ const uint64_t AbsOffset = WState->NextAbsOffset;
+ WState->BufFill = 0;
+ WState->PendingWork.fetch_add(1, std::memory_order_acq_rel);
+ try
+ {
+ State->Work.ScheduleWork(
+ State->Pool,
+ [this, State, RangeIdx, WState, Buf = std::move(Buf), Fill, AbsOffset](std::atomic<bool>&) mutable {
+ try
+ {
+ State->DestFile->Write(Buf.GetData(), Fill, AbsOffset);
+ }
+ catch (const std::exception& Ex)
+ {
+ RecordGetPartFailure(*State, fmt::format("S3 GET range {} write failed: {}", RangeIdx, Ex.what()));
+ }
+ State->Buffers.Release(std::move(Buf));
+ if (WState->PendingWork.fetch_sub(1, std::memory_order_acq_rel) == 1)
+ {
+ OnGetPartCompleted(State);
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& Ex)
+ {
+ RecordGetPartFailure(*State,
+ fmt::format("S3 GET range {} tail-flush schedule failed: {}", RangeIdx, Ex.what()));
+ State->Buffers.RestoreAcquireSlot();
+ WState->PendingWork.fetch_sub(1, std::memory_order_acq_rel);
+ }
+ }
+ if (WState->PendingWork.fetch_sub(1, std::memory_order_acq_rel) == 1)
+ {
+ OnGetPartCompleted(State);
+ }
+ },
+ Headers);
+ InFlightGuard.Dismiss();
+ }
+ catch (const std::exception& Ex)
+ {
+ RecordGetPartFailure(*State, fmt::format("S3 GET range {} dispatch failed: {}", RangeIdx, Ex.what()));
+ OnGetPartCompleted(State);
+ }
+ }
+}
+
+void
+S3AsyncStorage::OnGetPartCompleted(std::shared_ptr<GetMultipartState> State)
+{
+ const uint32_t Remaining = State->PendingRanges.fetch_sub(1, std::memory_order_acq_rel);
+ if (Remaining != 1)
+ {
+ return;
+ }
+
+ // All disk I/O hopped to the worker pool: see note in
+ // OnGetStreamFinalised.
+ const bool Failed = State->AnyFailed.load(std::memory_order_acquire);
+ try
+ {
+ State->Work.ScheduleWork(
+ State->Pool,
+ [State, Failed](std::atomic<bool>&) {
+ if (Failed)
+ {
+ std::string FirstError;
+ State->ErrorLock.WithExclusiveLock([&] { FirstError = std::move(State->FirstError); });
+ State->DestFile.reset();
+ std::error_code Ec;
+ std::filesystem::remove(State->DestPath, Ec);
+ State->Token->Fail(
+ std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Get '{}': {}"sv, State->ContentHash, FirstError)));
+ return;
+ }
+
+ State->DestFile.reset();
+ State->Token->Complete();
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (...)
+ {
+ State->DestFile.reset();
+ std::error_code Ec;
+ std::filesystem::remove(State->DestPath, Ec);
+ State->Token->Fail(std::current_exception());
+ }
+}
+
+void
+S3AsyncStorage::Touch(ParallelWork& Work, WorkerThreadPool& Pool, const IoHash& Hash, S3AsyncStorageStats& Stats)
+{
+ auto Token = std::make_shared<ParallelWork::ExternalWorkToken>(Work.RegisterExternal());
+ Stats.RecordScheduled();
+
+ std::shared_ptr<void> SlotRef = AcquireAdmissionSlot(m_Admission, &Stats);
+
+ // Snapshot creds on the dispatcher; see PutSmall.
+ SigV4Credentials Creds = m_GetCreds();
+
+ Work.ScheduleWork(
+ Pool,
+ [this, Hash = IoHash(Hash), Token, Stats, Creds = std::move(Creds), SlotRef = std::move(SlotRef)](
+ std::atomic<bool>& Abort) mutable {
+ if (Abort.load())
+ {
+ Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Touch '{}': aborted"sv, Hash)));
+ return;
+ }
+ try
+ {
+ if (Creds.AccessKeyId.empty())
+ {
+ Token->Fail(
+ std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Touch '{}': no credentials available"sv, Hash)));
+ return;
+ }
+
+ std::string Key = CasKey(Hash);
+ std::string Path = CasPath(Hash);
+
+ std::vector<std::pair<std::string, std::string>> ExtraSigned{
+ {"x-amz-copy-source", fmt::format("/{}/{}", m_Builder.BucketName(), AwsUriEncode(Key, false))},
+ {"x-amz-metadata-directive", "REPLACE"},
+ };
+
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "PUT", Path, "", S3EmptyPayloadHash, ExtraSigned);
+
+ Stopwatch Timer = Stats.BeginRequest();
+ // Pair Begin with End so an alloc throw inside AsyncPut does not strand
+ // InFlight elevated.
+ auto InFlightGuard = MakeGuard([&Stats] { Stats.EndRequest(0, 0); });
+ m_Client.AsyncPut(
+ Path,
+ [Hash, Token, Timer, Stats, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable {
+ Stats.EndRequest(Timer.GetElapsedTimeUs(), 0);
+ if (!Resp.IsSuccess())
+ {
+ std::string Err = S3ErrorMessage("S3 Touch failed", Resp);
+ Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Touch '{}': {}"sv, Hash, Err)));
+ return;
+ }
+ // PUT-COPY (REPLACE) can return HTTP 200 with an <Error> body. Mirror
+ // the sync S3Client::Touch check; without this the dehydrate-touch
+ // fails silently.
+ std::string_view Body = Resp.AsText();
+ std::string_view ErrorCode;
+ std::string_view ErrorMessage;
+ if (S3ExtractError(Body, ErrorCode, ErrorMessage))
+ {
+ Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::Touch '{}': returned error: {} - {}"sv,
+ Hash,
+ ErrorCode,
+ ErrorMessage)));
+ return;
+ }
+ Token->Complete();
+ },
+ Headers);
+ InFlightGuard.Dismiss();
+ }
+ catch (...)
+ {
+ Token->Fail(std::current_exception());
+ }
+ });
+}
+
+std::vector<std::string>
+S3AsyncStorage::ListAllObjects(std::string_view Prefix)
+{
+ constexpr std::string_view ContentsCloseTag = "</Contents>";
+
+ // List requests are not counted in PhaseStats; List/DeleteAll is admin
+ // scaffolding (test fixture teardown, manual obliterate). Mirrors the
+ // CreateMultipart/Complete/Abort exclusion in PutMultipart.
+
+ // Snapshot creds once for the listing run; ListObjectsV2 pagination is
+ // fast enough that mid-list refresh isn't needed.
+ const SigV4Credentials Creds = m_GetCreds();
+ if (Creds.AccessKeyId.empty())
+ {
+ throw zen::runtime_error("S3AsyncStorage::ListAllObjects: no credentials available"sv);
+ }
+
+ std::vector<std::string> Keys;
+ std::string ContinuationToken;
+ std::string RootPath = m_Builder.BucketRootPath();
+ while (true)
+ {
+ std::string CanonicalQs = fmt::format("list-type=2&prefix={}", AwsUriEncode(Prefix));
+ if (!ContinuationToken.empty())
+ {
+ CanonicalQs += fmt::format("&continuation-token={}", AwsUriEncode(ContinuationToken));
+ }
+
+ HttpClient::KeyValueMap Headers = m_Builder.SignRequest(Creds, "GET", RootPath, CanonicalQs, S3EmptyPayloadHash);
+
+ std::string FullPath = S3BuildRequestPath(RootPath, CanonicalQs);
+ HttpClient::Response Resp = m_Client.Get(FullPath, Headers).get();
+ if (!Resp.IsSuccess())
+ {
+ throw zen::runtime_error("S3AsyncStorage::ListAllObjects '{}': {}"sv, Prefix, S3ErrorMessage("S3 LIST failed", Resp));
+ }
+
+ if (!Resp.ResponsePayload)
+ {
+ break;
+ }
+ std::string_view Body(reinterpret_cast<const char*>(Resp.ResponsePayload.GetData()), Resp.ResponsePayload.GetSize());
+
+ std::string_view Cursor = Body;
+ while (true)
+ {
+ size_t ContentsOpen = Cursor.find("<Contents>");
+ if (ContentsOpen == std::string_view::npos)
+ {
+ break;
+ }
+ size_t ContentsClose = Cursor.find(ContentsCloseTag, ContentsOpen);
+ if (ContentsClose == std::string_view::npos)
+ {
+ break;
+ }
+ std::string_view ContentsBlock = Cursor.substr(ContentsOpen, ContentsClose - ContentsOpen);
+ std::string_view Key = S3ExtractXmlValue(ContentsBlock, "Key");
+ Keys.emplace_back(Key);
+ Cursor = Cursor.substr(ContentsClose + ContentsCloseTag.size());
+ }
+
+ std::string_view IsTruncated = S3ExtractXmlValue(Body, "IsTruncated");
+ if (IsTruncated != "true")
+ {
+ break;
+ }
+ std::string_view NextToken = S3ExtractXmlValue(Body, "NextContinuationToken");
+ if (NextToken.empty())
+ {
+ break;
+ }
+ ContinuationToken.assign(NextToken);
+ }
+ return Keys;
+}
+
+std::vector<IoHash>
+S3AsyncStorage::List()
+{
+ std::string CasPrefix = fmt::format("{}/cas/", m_KeyPrefix);
+ std::vector<std::string> Keys = ListAllObjects(CasPrefix);
+
+ std::vector<IoHash> Hashes;
+ Hashes.reserve(Keys.size());
+ for (const std::string& Key : Keys)
+ {
+ size_t LastSlash = Key.rfind('/');
+ if (LastSlash == std::string::npos)
+ {
+ continue;
+ }
+ IoHash Hash;
+ if (IoHash::TryParse(std::string_view(Key).substr(LastSlash + 1), Hash))
+ {
+ Hashes.push_back(Hash);
+ }
+ }
+ return Hashes;
+}
+
+void
+S3AsyncStorage::DeleteAll(ParallelWork& Work)
+{
+ std::string Prefix = fmt::format("{}/", m_KeyPrefix);
+ std::vector<std::string> Keys = ListAllObjects(Prefix);
+
+ // Snapshot creds once for the whole obliterate; saves N shared-lock
+ // acquisitions on the credential provider.
+ const SigV4Credentials DelCreds = m_GetCreds();
+ if (DelCreds.AccessKeyId.empty())
+ {
+ auto Token = std::make_shared<ParallelWork::ExternalWorkToken>(Work.RegisterExternal());
+ Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::DeleteAll: no credentials available"sv)));
+ return;
+ }
+
+ for (const std::string& Key : Keys)
+ {
+ auto Token = std::make_shared<ParallelWork::ExternalWorkToken>(Work.RegisterExternal());
+
+ try
+ {
+ std::string Path = m_Builder.KeyToPath(Key);
+
+ HttpClient::KeyValueMap DelHeaders = m_Builder.SignRequest(DelCreds, "DELETE", Path, "", S3EmptyPayloadHash);
+
+ // DeleteAll is untracked by stats - admission slot is acquired but the
+ // wait is not recorded; passes Stats == nullptr to the helper.
+ std::shared_ptr<void> SlotRef = AcquireAdmissionSlot(m_Admission);
+
+ m_Client.AsyncDelete(
+ Path,
+ [KeyCopy = Key, Token, SlotRef = std::move(SlotRef)](HttpClient::Response Resp) mutable {
+ if (!Resp.IsSuccess() && Resp.StatusCode != HttpResponseCode::NotFound)
+ {
+ std::string Err = S3ErrorMessage("S3 DELETE failed", Resp);
+ Token->Fail(std::make_exception_ptr(zen::runtime_error("S3AsyncStorage::DeleteAll '{}': {}"sv, KeyCopy, Err)));
+ return;
+ }
+ Token->Complete();
+ },
+ DelHeaders);
+ }
+ catch (...)
+ {
+ // Sign / acquire / dispatch threw before AsyncDelete posted. Surface via Token so
+ // Wait() reports the partial failure instead of silently dropping the key.
+ Token->Fail(std::current_exception());
+ }
+ }
+}
+
+void
+s3asyncstorage_forcelink()
+{
+}
+
+#if ZEN_WITH_TESTS
+
+namespace {
+ // Per-binary unique MinIO port.
+ uint16_t AllocateMinioTestPort()
+ {
+ static const uint16_t Base = static_cast<uint16_t>(20000u + (static_cast<uint32_t>(GetCurrentProcessId()) % 30000u));
+ static std::atomic<uint16_t> Slot{0};
+ return Base + Slot.fetch_add(1, std::memory_order_relaxed);
+ }
+
+ MinioProcessOptions MakeMinioOpts()
+ {
+ MinioProcessOptions Opts;
+ Opts.Port = AllocateMinioTestPort();
+ return Opts;
+ }
+
+ struct AsyncS3Fixture
+ {
+ MinioProcess Minio{MakeMinioOpts()};
+ ScopedTemporaryDirectory TmpDir;
+ std::unique_ptr<AsyncHttpClient> ClientStorage;
+ std::unique_ptr<S3RequestBuilder> Builder;
+ AsyncHttpClient* Client = nullptr;
+ SigV4Credentials Creds;
+ WorkerThreadPool Pool{4};
+
+ AsyncS3Fixture()
+ {
+ Minio.SpawnMinioServer();
+ Minio.CreateBucket("async-test");
+
+ Creds.AccessKeyId = "minioadmin";
+ Creds.SecretAccessKey = "minioadmin";
+
+ Builder = std::make_unique<S3RequestBuilder>("us-east-1", "async-test", Minio.Endpoint(), /*PathStyle*/ true);
+
+ HttpClientSettings Settings;
+ Settings.LogCategory = "async-s3-test";
+ Settings.MaxConcurrentConnectionsPerHost = 16;
+ // Match production S3Hydration: cover MinIO post-spawn 503
+ // (XMinioServerNotInitialized) transients before storage backend
+ // finishes warming up after the spawn ready-check passes.
+ Settings.RetryCount = 3;
+ ClientStorage = std::make_unique<AsyncHttpClient>(Minio.Endpoint(), Settings);
+ Client = ClientStorage.get();
+ }
+ };
+
+ std::filesystem::path WriteBlob(const std::filesystem::path& Path, const std::vector<uint8_t>& Bytes)
+ {
+ WriteFile(Path, IoBuffer(IoBuffer::Wrap, Bytes.data(), Bytes.size()));
+ return Path;
+ }
+
+ struct StatsBlock
+ {
+ std::atomic<uint64_t> RequestCount{0};
+ std::atomic<uint64_t> RequestTotalUs{0};
+ std::atomic<uint64_t> RequestMaxUs{0};
+ std::atomic<uint64_t> Bytes{0};
+ std::atomic<uint32_t> InFlight{0};
+ std::atomic<uint32_t> InFlightPeak{0};
+ std::atomic<uint64_t> FirstScheduleUs{UINT64_MAX};
+ std::atomic<uint64_t> FirstStartUs{UINT64_MAX};
+ std::atomic<uint64_t> AdmissionWaitTotalUs{0};
+ std::atomic<uint64_t> AdmissionWaitMaxUs{0};
+ Stopwatch PhaseClock;
+
+ S3AsyncStorageStats Ref()
+ {
+ return S3AsyncStorageStats{RequestCount,
+ RequestTotalUs,
+ RequestMaxUs,
+ Bytes,
+ InFlight,
+ InFlightPeak,
+ FirstScheduleUs,
+ FirstStartUs,
+ AdmissionWaitTotalUs,
+ AdmissionWaitMaxUs,
+ PhaseClock};
+ }
+ };
+} // namespace
+
+TEST_SUITE_BEGIN("server.s3asyncstorage");
+
+TEST_CASE("s3asyncstorage.put_get_round_trip")
+{
+ StatsBlock Sb;
+ AsyncS3Fixture F;
+ S3AsyncStorage Storage(
+ *F.Client,
+ *F.Builder,
+ [&F]() { return F.Creds; },
+ "module-A",
+ 8u * 1024u * 1024u);
+
+ const uint64_t Size = 64u * 1024u;
+ std::vector<uint8_t> Bytes(Size);
+ for (size_t I = 0; I < Size; ++I)
+ {
+ Bytes[I] = static_cast<uint8_t>((I * 31u) & 0xFF);
+ }
+
+ std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "src.bin", Bytes);
+ IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size());
+
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ Storage.Put(Work, F.Pool, ContentHash, Size, SrcPath, Stats);
+ Work.Wait();
+ }
+
+ std::filesystem::path DstPath = F.TmpDir.Path() / "dst.bin";
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ Storage.Get(Work, F.Pool, ContentHash, Size, DstPath, Stats);
+ Work.Wait();
+ }
+
+ REQUIRE(std::filesystem::exists(DstPath));
+ REQUIRE(std::filesystem::file_size(DstPath) == Size);
+
+ BasicFile Out(DstPath, BasicFile::Mode::kRead);
+ IoBuffer Read = Out.ReadAll();
+ IoHash ReadHash = IoHash::HashBuffer(Read);
+ CHECK(ReadHash == ContentHash);
+}
+
+TEST_CASE("s3asyncstorage.touch_existing_object")
+{
+ StatsBlock Sb;
+ AsyncS3Fixture F;
+ S3AsyncStorage Storage(
+ *F.Client,
+ *F.Builder,
+ [&F]() { return F.Creds; },
+ "module-touch",
+ 8u * 1024u * 1024u);
+
+ std::vector<uint8_t> Bytes{1, 2, 3, 4, 5};
+ std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "touch.bin", Bytes);
+ IoHash Hash = IoHash::HashBuffer(Bytes.data(), Bytes.size());
+
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ Storage.Put(Work, F.Pool, Hash, Bytes.size(), SrcPath, Stats);
+ Work.Wait();
+ }
+
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ Storage.Touch(Work, F.Pool, Hash, Stats);
+ Work.Wait();
+ }
+}
+
+TEST_CASE("s3asyncstorage.list_returns_uploaded_hashes")
+{
+ StatsBlock Sb;
+ AsyncS3Fixture F;
+ S3AsyncStorage Storage(
+ *F.Client,
+ *F.Builder,
+ [&F]() { return F.Creds; },
+ "module-list",
+ 8u * 1024u * 1024u);
+
+ const size_t N = 5;
+ std::vector<IoHash> Hashes;
+ std::vector<std::filesystem::path> SrcPaths;
+ for (size_t I = 0; I < N; ++I)
+ {
+ std::vector<uint8_t> Bytes(64, static_cast<uint8_t>(I + 1));
+ std::filesystem::path P = WriteBlob(F.TmpDir.Path() / fmt::format("blob_{}.bin", I), Bytes);
+ Hashes.push_back(IoHash::HashBuffer(Bytes.data(), Bytes.size()));
+ SrcPaths.push_back(P);
+ }
+
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ for (size_t I = 0; I < N; ++I)
+ {
+ Storage.Put(Work, F.Pool, Hashes[I], 64, SrcPaths[I], Stats);
+ }
+ Work.Wait();
+ }
+
+ std::vector<IoHash> Listed = Storage.List();
+ std::sort(Listed.begin(), Listed.end());
+ std::vector<IoHash> Expected = Hashes;
+ std::sort(Expected.begin(), Expected.end());
+ CHECK(Listed == Expected);
+}
+
+TEST_CASE("s3asyncstorage.streaming_download_round_trip")
+{
+ // Object size sits between the streaming threshold (512 KiB) and the
+ // multipart threshold (PartSize + PartSize/4). Exercises the medium-tier
+ // AsyncStream branch in S3AsyncStorage::Get - body streams via a 512 KiB
+ // IoBuffer pool with per-buffer worker write hops.
+ StatsBlock Sb;
+ AsyncS3Fixture F;
+ S3AsyncStorage Storage(
+ *F.Client,
+ *F.Builder,
+ [&F]() { return F.Creds; },
+ "module-stream",
+ 8u * 1024u * 1024u);
+
+ const uint64_t Size = 2u * 1024u * 1024u; // 2 MiB - between 512 KiB and 10 MiB
+ std::vector<uint8_t> Bytes(Size);
+ for (size_t I = 0; I < Size; ++I)
+ {
+ Bytes[I] = static_cast<uint8_t>((I * 53u + 11u) & 0xFF);
+ }
+ std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "stream.bin", Bytes);
+ IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size());
+
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ Storage.Put(Work, F.Pool, ContentHash, Size, SrcPath, Stats);
+ Work.Wait();
+ }
+
+ std::filesystem::path DstPath = F.TmpDir.Path() / "stream_out.bin";
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ Storage.Get(Work, F.Pool, ContentHash, Size, DstPath, Stats);
+ Work.Wait();
+ }
+
+ REQUIRE(std::filesystem::exists(DstPath));
+ REQUIRE(std::filesystem::file_size(DstPath) == Size);
+ BasicFile Out(DstPath, BasicFile::Mode::kRead);
+ IoBuffer Read = Out.ReadAll();
+ IoHash ReadHash = IoHash::HashBuffer(Read);
+ CHECK(ReadHash == ContentHash);
+}
+
+TEST_CASE("s3asyncstorage.multipart_round_trip")
+{
+ StatsBlock Sb;
+ AsyncS3Fixture F;
+ const uint64_t PartSize = 5u * 1024u * 1024u; // MinIO accepts >=5MiB parts.
+ S3AsyncStorage Storage(
+ *F.Client,
+ *F.Builder,
+ [&F]() { return F.Creds; },
+ "module-multipart",
+ PartSize);
+
+ // Three parts: 5 MiB + 5 MiB + 1 MiB. Threshold is PartSize + PartSize/4 (=
+ // 6.25 MiB). Total 11 MiB triggers multipart path.
+ const uint64_t TotalSize = 11u * 1024u * 1024u;
+ std::vector<uint8_t> Bytes(TotalSize);
+ for (size_t I = 0; I < TotalSize; ++I)
+ {
+ Bytes[I] = static_cast<uint8_t>((I * 131u + 7u) & 0xFF);
+ }
+
+ std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "big.bin", Bytes);
+ IoHash Hash = IoHash::HashBuffer(Bytes.data(), Bytes.size());
+
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ Storage.Put(Work, F.Pool, Hash, TotalSize, SrcPath, Stats);
+ Work.Wait();
+ }
+
+ std::filesystem::path DstPath = F.TmpDir.Path() / "big_out.bin";
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ Storage.Get(Work, F.Pool, Hash, TotalSize, DstPath, Stats);
+ Work.Wait();
+ }
+
+ REQUIRE(std::filesystem::exists(DstPath));
+ REQUIRE(std::filesystem::file_size(DstPath) == TotalSize);
+
+ BasicFile Out(DstPath, BasicFile::Mode::kRead);
+ IoBuffer Read = Out.ReadAll();
+ IoHash ReadHash = IoHash::HashBuffer(Read);
+ CHECK(ReadHash == Hash);
+}
+
+TEST_CASE("s3asyncstorage.parallel_puts")
+{
+ StatsBlock Sb;
+ AsyncS3Fixture F;
+ S3AsyncStorage Storage(
+ *F.Client,
+ *F.Builder,
+ [&F]() { return F.Creds; },
+ "module-parallel",
+ 8u * 1024u * 1024u);
+
+ const size_t N = 32;
+ std::vector<IoHash> Hashes;
+ std::vector<std::filesystem::path> SrcPaths;
+ for (size_t I = 0; I < N; ++I)
+ {
+ std::vector<uint8_t> Bytes(1024, static_cast<uint8_t>(I));
+ std::filesystem::path P = WriteBlob(F.TmpDir.Path() / fmt::format("p_{}.bin", I), Bytes);
+ Hashes.push_back(IoHash::HashBuffer(Bytes.data(), Bytes.size()));
+ SrcPaths.push_back(P);
+ }
+
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ for (size_t I = 0; I < N; ++I)
+ {
+ Storage.Put(Work, F.Pool, Hashes[I], 1024, SrcPaths[I], Stats);
+ }
+ Work.Wait();
+
+ CHECK(Storage.List().size() == N);
+}
+
+namespace {
+ // One MinIO + builder + creds + pool, reused across multiple admission /
+ // throttle test scopes. Each scope constructs its own AsyncHttpClient via
+ // MakeClient(Cap) so different MaxConcurrentRequests values are exercised
+ // without re-spawning MinIO.
+ struct ThrottledAsyncS3Fixture
+ {
+ MinioProcess Minio{MakeMinioOpts()};
+ ScopedTemporaryDirectory TmpDir;
+ std::unique_ptr<S3RequestBuilder> Builder;
+ SigV4Credentials Creds;
+ WorkerThreadPool Pool{4};
+
+ ThrottledAsyncS3Fixture()
+ {
+ Minio.SpawnMinioServer();
+ Minio.CreateBucket("async-test-throttle");
+
+ Creds.AccessKeyId = "minioadmin";
+ Creds.SecretAccessKey = "minioadmin";
+
+ Builder = std::make_unique<S3RequestBuilder>("us-east-1", "async-test-throttle", Minio.Endpoint(), /*PathStyle*/ true);
+ }
+
+ std::unique_ptr<AsyncHttpClient> MakeClient(uint32_t MaxConcurrentRequests)
+ {
+ HttpClientSettings Settings;
+ Settings.LogCategory = "async-s3-throttle-test";
+ Settings.MaxConcurrentConnectionsPerHost = 16;
+ Settings.MaxConcurrentRequests = MaxConcurrentRequests;
+ // Match production S3Hydration: cover MinIO post-spawn 503
+ // (XMinioServerNotInitialized) transients before storage backend
+ // finishes warming up after the spawn ready-check passes.
+ Settings.RetryCount = 3;
+ return std::make_unique<AsyncHttpClient>(Minio.Endpoint(), Settings);
+ }
+ };
+} // namespace
+
+// Non-multipart admission scenarios: small-file fanout under a cap, fanout
+// with admission disabled, and admission-wait stat recording. All share one
+// MinIO instance via ThrottledAsyncS3Fixture; each scope picks its own Cap
+// and KeyPrefix so the bucket entries don't collide across scopes.
+TEST_CASE("s3asyncstorage.admission.fanout")
+{
+ ThrottledAsyncS3Fixture F;
+
+ // respects.async.client.cap - in-flight peak <= storage admission cap.
+ {
+ StatsBlock Sb;
+ const uint32_t Cap = 2;
+ auto Admission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(Cap));
+ auto Client = F.MakeClient(Cap);
+ S3AsyncStorage Storage(
+ *Client,
+ *F.Builder,
+ [&F]() { return F.Creds; },
+ "module-cap",
+ 8u * 1024u * 1024u,
+ Admission,
+ Cap);
+
+ const size_t N = 8;
+ std::vector<IoHash> Hashes;
+ std::vector<std::filesystem::path> SrcPaths;
+ for (size_t I = 0; I < N; ++I)
+ {
+ std::vector<uint8_t> Bytes(256, static_cast<uint8_t>(I));
+ std::filesystem::path P = WriteBlob(F.TmpDir.Path() / fmt::format("c_{}.bin", I), Bytes);
+ Hashes.push_back(IoHash::HashBuffer(Bytes.data(), Bytes.size()));
+ SrcPaths.push_back(P);
+ }
+
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ for (size_t I = 0; I < N; ++I)
+ {
+ Storage.Put(Work, F.Pool, Hashes[I], 256, SrcPaths[I], Stats);
+ }
+ Work.Wait();
+
+ std::vector<IoHash> Listed = Storage.List();
+ std::sort(Listed.begin(), Listed.end());
+ std::vector<IoHash> Expected = Hashes;
+ std::sort(Expected.begin(), Expected.end());
+ CHECK(Listed == Expected);
+ CHECK(Sb.InFlightPeak.load() <= 2u);
+ }
+
+ // unlimited_concurrent_requests - admission disabled (nullptr semaphore,
+ // Cap=0). Storage drains the fanout cleanly without gating.
+ {
+ StatsBlock Sb;
+ auto Client = F.MakeClient(0);
+ S3AsyncStorage Storage(
+ *Client,
+ *F.Builder,
+ [&F]() { return F.Creds; },
+ "module-uncapped",
+ 8u * 1024u * 1024u);
+
+ const size_t N = 8;
+ std::vector<IoHash> Hashes;
+ std::vector<std::filesystem::path> SrcPaths;
+ for (size_t I = 0; I < N; ++I)
+ {
+ std::vector<uint8_t> Bytes(256, static_cast<uint8_t>(I));
+ std::filesystem::path P = WriteBlob(F.TmpDir.Path() / fmt::format("u_{}.bin", I), Bytes);
+ Hashes.push_back(IoHash::HashBuffer(Bytes.data(), Bytes.size()));
+ SrcPaths.push_back(P);
+ }
+
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ for (size_t I = 0; I < N; ++I)
+ {
+ Storage.Put(Work, F.Pool, Hashes[I], 256, SrcPaths[I], Stats);
+ }
+ Work.Wait();
+
+ std::vector<IoHash> Listed = Storage.List();
+ std::sort(Listed.begin(), Listed.end());
+ std::vector<IoHash> Expected = Hashes;
+ std::sort(Expected.begin(), Expected.end());
+ CHECK(Listed == Expected);
+ }
+
+ // admission.wait.us.recorded - cap=2 with 8 fanout submits forces at least
+ // some submissions to block on the admission semaphore. AdmissionWait
+ // totals must be non-zero.
+ {
+ StatsBlock Sb;
+ const uint32_t Cap = 2;
+ auto Admission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(Cap));
+ auto Client = F.MakeClient(Cap);
+ S3AsyncStorage Storage(
+ *Client,
+ *F.Builder,
+ [&F]() { return F.Creds; },
+ "module-adm-wait",
+ 8u * 1024u * 1024u,
+ Admission,
+ Cap);
+
+ const size_t N = 8;
+ std::vector<IoHash> Hashes;
+ std::vector<std::filesystem::path> SrcPaths;
+ for (size_t I = 0; I < N; ++I)
+ {
+ std::vector<uint8_t> Bytes(256, static_cast<uint8_t>(I));
+ std::filesystem::path P = WriteBlob(F.TmpDir.Path() / fmt::format("aw_{}.bin", I), Bytes);
+ Hashes.push_back(IoHash::HashBuffer(Bytes.data(), Bytes.size()));
+ SrcPaths.push_back(P);
+ }
+
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ for (size_t I = 0; I < N; ++I)
+ {
+ Storage.Put(Work, F.Pool, Hashes[I], 256, SrcPaths[I], Stats);
+ }
+ Work.Wait();
+
+ CHECK(Sb.AdmissionWaitTotalUs.load() > 0u);
+ CHECK(Sb.AdmissionWaitMaxUs.load() > 0u);
+ CHECK(Sb.InFlightPeak.load() <= Cap);
+ }
+}
+
+// Drives the AbortMultipart path: a 3-part multipart upload where one part is
+// forced to fail via the test hook. AnyFailed flips, FinalizePutPart hops
+// off-strand once PendingParts reaches 1, AbortMultipart sends DELETE to S3
+// to discard the upload-id.
+// Token must surface the original part failure; bucket must not contain the
+// uploaded CAS object after Wait returns.
+TEST_CASE("s3asyncstorage.multipart.abort_on_part_failure")
+{
+ StatsBlock Sb;
+ AsyncS3Fixture F;
+ const uint64_t PartSize = 5u * 1024u * 1024u;
+ S3AsyncStorage Storage(
+ *F.Client,
+ *F.Builder,
+ [&F]() { return F.Creds; },
+ "module-multipart-abort",
+ PartSize);
+
+ const uint64_t TotalSize = 11u * 1024u * 1024u; // 3 parts: 5+5+1 MiB
+ std::vector<uint8_t> Bytes(TotalSize);
+ for (size_t I = 0; I < TotalSize; ++I)
+ {
+ Bytes[I] = static_cast<uint8_t>((I * 23u + 9u) & 0xFF);
+ }
+ std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "abort.bin", Bytes);
+ IoHash Hash = IoHash::HashBuffer(Bytes.data(), Bytes.size());
+
+ // Force one part to fail; AnyFailed -> AbortMultipart.
+ s3asyncstorage_test_hooks::ForceNextPartFailures(1);
+
+ bool ThrewFromWait = false;
+ std::string ErrMessage;
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ Storage.Put(Work, F.Pool, Hash, TotalSize, SrcPath, Stats);
+ try
+ {
+ Work.Wait();
+ }
+ catch (const std::exception& Ex)
+ {
+ ThrewFromWait = true;
+ ErrMessage = Ex.what();
+ }
+ }
+ CHECK(ThrewFromWait);
+ CHECK(ErrMessage.find("test-injected part failure") != std::string::npos);
+
+ // Reset hook so subsequent multipart tests aren't affected.
+ s3asyncstorage_test_hooks::ForceNextPartFailures(0);
+
+ // Bucket must not contain the CAS object - AbortMultipart discards the
+ // upload before S3 publishes the assembled object.
+ std::vector<IoHash> Listed = Storage.List();
+ CHECK(std::find(Listed.begin(), Listed.end(), Hash) == Listed.end());
+}
+
+// Multipart admission scenarios sharing one MinIO instance. Each scope picks
+// its own Cap, KeyPrefix, and payload pattern so bucket entries don't collide.
+// Covers:
+// - under.cap: TotalParts < cap (no slot handoff needed); round-trip Put+Get.
+// - no.deadlock.streaming.get: Put + ranged Get on same hydration pool.
+// - parts.exceed.cap.paces: TotalParts > cap; wave + slot-handoff completes
+// successfully with InFlightPeak <= cap.
+// - aborts.release.tokens: forced part failure releases held slots so a
+// follow-up Put can run.
+// - handoff.in.flight: cap=1 forces strictly sequential dispatch.
+TEST_CASE("s3asyncstorage.admission.multipart")
+{
+ ThrottledAsyncS3Fixture F;
+ const uint64_t PartSize = 5u * 1024u * 1024u; // MinIO accepts >= 5 MiB parts.
+
+ // under.cap: 3-part Put + Get sits within the initial wave (cap=4).
+ {
+ StatsBlock Sb;
+ const uint32_t Cap = 4;
+ auto Admission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(Cap));
+ auto Client = F.MakeClient(Cap);
+ S3AsyncStorage Storage(
+ *Client,
+ *F.Builder,
+ [&F]() { return F.Creds; },
+ "module-multipart-cap",
+ PartSize,
+ Admission,
+ Cap);
+
+ const uint64_t TotalSize = 11u * 1024u * 1024u;
+ std::vector<uint8_t> Bytes(TotalSize);
+ for (size_t I = 0; I < TotalSize; ++I)
+ {
+ Bytes[I] = static_cast<uint8_t>((I * 17u + 5u) & 0xFF);
+ }
+ std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "mp.bin", Bytes);
+ IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size());
+
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ Storage.Put(Work, F.Pool, ContentHash, TotalSize, SrcPath, Stats);
+ Work.Wait();
+ }
+
+ std::filesystem::path DstPath = F.TmpDir.Path() / "mp_out.bin";
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ Storage.Get(Work, F.Pool, ContentHash, TotalSize, DstPath, Stats);
+ Work.Wait();
+ }
+
+ REQUIRE(std::filesystem::exists(DstPath));
+ REQUIRE(std::filesystem::file_size(DstPath) == TotalSize);
+ BasicFile Out(DstPath, BasicFile::Mode::kRead);
+ IoBuffer Read = Out.ReadAll();
+ IoHash ReadHash = IoHash::HashBuffer(Read);
+ CHECK(ReadHash == ContentHash);
+ }
+
+ // no.deadlock.streaming.get: PutMultipart fans onto hydration pool while
+ // io strand fires AsyncPut completions; followed by Get exercising stream
+ // write-back on the same pool.
+ {
+ StatsBlock Sb;
+ const uint32_t Cap = 4;
+ auto Admission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(Cap));
+ auto Client = F.MakeClient(Cap);
+ S3AsyncStorage Storage(
+ *Client,
+ *F.Builder,
+ [&F]() { return F.Creds; },
+ "module-stream-no-deadlock",
+ PartSize,
+ Admission,
+ Cap);
+
+ const uint64_t TotalSize = 11u * 1024u * 1024u;
+ std::vector<uint8_t> Bytes(TotalSize);
+ for (size_t I = 0; I < TotalSize; ++I)
+ {
+ Bytes[I] = static_cast<uint8_t>((I * 41u + 7u) & 0xFF);
+ }
+ std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "sd.bin", Bytes);
+ IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size());
+
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ Storage.Put(Work, F.Pool, ContentHash, TotalSize, SrcPath, Stats);
+ Work.Wait();
+ }
+
+ std::filesystem::path DstPath = F.TmpDir.Path() / "sd_out.bin";
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ Storage.Get(Work, F.Pool, ContentHash, TotalSize, DstPath, Stats);
+ Work.Wait();
+ }
+
+ REQUIRE(std::filesystem::exists(DstPath));
+ REQUIRE(std::filesystem::file_size(DstPath) == TotalSize);
+ BasicFile Out(DstPath, BasicFile::Mode::kRead);
+ IoBuffer Read = Out.ReadAll();
+ IoHash ReadHash = IoHash::HashBuffer(Read);
+ CHECK(ReadHash == ContentHash);
+ }
+
+ // parts.exceed.cap.paces: TotalParts > cap completes by pacing via slot
+ // handoff. Initial wave dispatches Cap parts; each completion hands its
+ // slot to the next undispatched part. InFlightPeak stays bounded by Cap.
+ {
+ StatsBlock Sb;
+ const uint32_t Cap = 2;
+ auto Admission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(Cap));
+ auto Client = F.MakeClient(Cap);
+ S3AsyncStorage Storage(
+ *Client,
+ *F.Builder,
+ [&F]() { return F.Creds; },
+ "module-mp-paces",
+ PartSize,
+ Admission,
+ Cap);
+
+ const uint64_t TotalSize = 16u * 1024u * 1024u; // 4 parts > cap=2
+ std::vector<uint8_t> Bytes(TotalSize, 0xA5);
+ std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "mp_paces.bin", Bytes);
+ IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size());
+
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ Storage.Put(Work, F.Pool, ContentHash, TotalSize, SrcPath, Stats);
+ Work.Wait();
+ }
+ CHECK(Sb.InFlightPeak.load() <= Cap);
+ std::vector<IoHash> Listed = Storage.List();
+ CHECK(std::find(Listed.begin(), Listed.end(), ContentHash) != Listed.end());
+ }
+
+ // aborts.release.tokens: forced part failure mid-flight; drain releases
+ // admission slots so a follow-up small Put can run without blocking.
+ {
+ StatsBlock Sb;
+ const uint32_t Cap = 4;
+ auto Admission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(Cap));
+ auto Client = F.MakeClient(Cap);
+ S3AsyncStorage Storage(
+ *Client,
+ *F.Builder,
+ [&F]() { return F.Creds; },
+ "module-mp-abort-release",
+ PartSize,
+ Admission,
+ Cap);
+
+ const uint64_t TotalSize = 11u * 1024u * 1024u; // 3 parts under cap=4
+ std::vector<uint8_t> Bytes(TotalSize, 0x5A);
+ std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "mp_abrel.bin", Bytes);
+ IoHash AbortHash = IoHash::HashBuffer(Bytes.data(), Bytes.size());
+
+ s3asyncstorage_test_hooks::ForceNextPartFailures(1);
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ Storage.Put(Work, F.Pool, AbortHash, TotalSize, SrcPath, Stats);
+ try
+ {
+ Work.Wait();
+ }
+ catch (...)
+ {
+ }
+ }
+ s3asyncstorage_test_hooks::ForceNextPartFailures(0);
+
+ std::vector<uint8_t> Small(1024, 0x11);
+ std::filesystem::path SmallPath = WriteBlob(F.TmpDir.Path() / "mp_abrel_small.bin", Small);
+ IoHash SmallHash = IoHash::HashBuffer(Small.data(), Small.size());
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ Storage.Put(Work, F.Pool, SmallHash, Small.size(), SmallPath, Stats);
+ Work.Wait();
+ }
+ std::vector<IoHash> Listed = Storage.List();
+ CHECK(std::find(Listed.begin(), Listed.end(), SmallHash) != Listed.end());
+ }
+
+ // handoff.in.flight: cap=1 forces strictly sequential dispatch (initial
+ // wave size 1, every subsequent part dispatched via handoff from the prior
+ // AsyncPut completion).
+ {
+ StatsBlock Sb;
+ const uint32_t Cap = 1;
+ auto Admission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(Cap));
+ auto Client = F.MakeClient(Cap);
+ S3AsyncStorage Storage(
+ *Client,
+ *F.Builder,
+ [&F]() { return F.Creds; },
+ "module-mp-handoff",
+ PartSize,
+ Admission,
+ Cap);
+
+ const uint64_t TotalSize = 16u * 1024u * 1024u; // 4 parts; handoff fires 3 times
+ std::vector<uint8_t> Bytes(TotalSize, 0x33);
+ std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "mp_handoff.bin", Bytes);
+ IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size());
+
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ Storage.Put(Work, F.Pool, ContentHash, TotalSize, SrcPath, Stats);
+ Work.Wait();
+ }
+ CHECK(Sb.InFlightPeak.load() == 1u);
+ std::vector<IoHash> Listed = Storage.List();
+ CHECK(std::find(Listed.begin(), Listed.end(), ContentHash) != Listed.end());
+ }
+
+ // ranges.exceed.cap.paces: GetMultipart with RangeCount > AdmissionCap.
+ // The dispatcher loop's per-range AcquireAdmissionSlot blocks the caller
+ // thread until in-flight ranges fire AsyncStream completions and release
+ // slots. InFlightPeak per Get stays bounded by Cap. Upload first with a
+ // larger client cap so the Put itself doesn't pace, then re-open Storage
+ // with the small cap purely to exercise the Get pacing path.
+ {
+ StatsBlock Sb;
+ const uint32_t UploadCap = 8;
+ auto UploadAdmission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(UploadCap));
+ auto UploadClient = F.MakeClient(UploadCap);
+ S3AsyncStorage UploadStorage(
+ *UploadClient,
+ *F.Builder,
+ [&F]() { return F.Creds; },
+ "module-mp-get-paces",
+ PartSize,
+ UploadAdmission,
+ UploadCap);
+
+ const uint64_t TotalSize = 16u * 1024u * 1024u; // 4 ranges with PartSize=5 MiB
+ std::vector<uint8_t> Bytes(TotalSize, 0x77);
+ std::filesystem::path SrcPath = WriteBlob(F.TmpDir.Path() / "mp_get_paces.bin", Bytes);
+ IoHash ContentHash = IoHash::HashBuffer(Bytes.data(), Bytes.size());
+
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = Sb.Ref();
+ UploadStorage.Put(Work, F.Pool, ContentHash, TotalSize, SrcPath, Stats);
+ Work.Wait();
+ }
+
+ StatsBlock GetSb;
+ const uint32_t GetCap = 2;
+ auto GetAdmission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(GetCap));
+ auto GetClient = F.MakeClient(GetCap);
+ S3AsyncStorage GetStorage(
+ *GetClient,
+ *F.Builder,
+ [&F]() { return F.Creds; },
+ "module-mp-get-paces",
+ PartSize,
+ GetAdmission,
+ GetCap);
+
+ std::filesystem::path DstPath = F.TmpDir.Path() / "mp_get_paces_out.bin";
+ {
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ S3AsyncStorageStats Stats = GetSb.Ref();
+ GetStorage.Get(Work, F.Pool, ContentHash, TotalSize, DstPath, Stats);
+ Work.Wait();
+ }
+ CHECK(GetSb.InFlightPeak.load() <= GetCap);
+ REQUIRE(std::filesystem::exists(DstPath));
+ REQUIRE(std::filesystem::file_size(DstPath) == TotalSize);
+ BasicFile Out(DstPath, BasicFile::Mode::kRead);
+ IoBuffer Read = Out.ReadAll();
+ IoHash ReadHash = IoHash::HashBuffer(Read);
+ CHECK(ReadHash == ContentHash);
+ }
+}
+
+TEST_SUITE_END();
+
+#endif // ZEN_WITH_TESTS
+
+} // namespace zen