// NOTE(review): this copy of the header appears to have been damaged in
// extraction and should be restored from version control before it is edited
// or compiled:
//   * Line breaks are missing, so each `//` comment runs into the code that
//     originally followed it on the next line.
//   * Text that would sit between angle brackets is absent: the `#include`
//     directives name no header, and `std::atomic`, `std::function`,
//     `std::shared_ptr`, `std::vector` and `std::counting_semaphore` appear
//     without their template arguments.
// The code text below is left untouched. The summary that follows describes
// only what is visible in this file.
//
// S3AsyncStorageStats
//   A struct of references to atomic counters plus a reference to a Stopwatch
//   (PhaseClock). Every atomic access uses std::memory_order_relaxed.
//   - RecordScheduled(): reads PhaseClock.GetElapsedTimeUs() and lowers
//     FirstScheduleUs to that value when it is smaller than the stored one
//     (compare-exchange loop).
//   - BeginRequest(): applies the same "keep the minimum" update to
//     FirstStartUs, increments InFlight, raises InFlightPeak to the new
//     in-flight count when that is larger, and returns `Stopwatch{}`.
//   - EndRequest(ElapsedUs, BytesValue): decrements InFlight, increments
//     RequestCount, adds ElapsedUs to RequestTotalUs and BytesValue to Bytes,
//     and raises RequestMaxUs to ElapsedUs when that is larger.
//   - RecordAdmissionWait(Us): adds Us to AdmissionWaitTotalUs and raises
//     AdmissionWaitMaxUs to Us when that is larger.
//   NOTE(review): the "keep the minimum" updates only store a value when it is
//   below the current one, so FirstScheduleUs / FirstStartUs presumably start
//   at a large sentinel - confirm where the counters are created.
//
// S3AsyncStorage
//   Stores m_Client and m_Builder as references, so both objects must outlive
//   the storage instance. m_Admission is a shared_ptr that may be null.
//
// s3asyncstorage_test_hooks
//   Declared only when ZEN_WITH_TESTS is set.
// Copyright Epic Games, Inc. All Rights Reserved. #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace zen { // Storage-layer admission semaphore. Sized at runtime to MaxConcurrentRequests; // LeastMaxValue is the compile-time upper bound. Caller may pass nullptr to // disable gating entirely. using AdmissionSemaphore = std::counting_semaphore::max()>; struct S3AsyncStorageStats { std::atomic& RequestCount; std::atomic& RequestTotalUs; std::atomic& RequestMaxUs; std::atomic& Bytes; std::atomic& InFlight; std::atomic& InFlightPeak; std::atomic& FirstScheduleUs; std::atomic& FirstStartUs; std::atomic& AdmissionWaitTotalUs; std::atomic& AdmissionWaitMaxUs; Stopwatch& PhaseClock; void RecordScheduled() { const uint64_t Now = PhaseClock.GetElapsedTimeUs(); uint64_t Existing = FirstScheduleUs.load(std::memory_order_relaxed); while (Now < Existing && !FirstScheduleUs.compare_exchange_weak(Existing, Now, std::memory_order_relaxed)) { } } Stopwatch BeginRequest() { const uint64_t Now = PhaseClock.GetElapsedTimeUs(); uint64_t Existing = FirstStartUs.load(std::memory_order_relaxed); while (Now < Existing && !FirstStartUs.compare_exchange_weak(Existing, Now, std::memory_order_relaxed)) { } const uint32_t Current = InFlight.fetch_add(1, std::memory_order_relaxed) + 1; uint32_t Peak = InFlightPeak.load(std::memory_order_relaxed); while (Current > Peak && !InFlightPeak.compare_exchange_weak(Peak, Current, std::memory_order_relaxed)) { } return Stopwatch{}; } void EndRequest(uint64_t ElapsedUs, uint64_t BytesValue) { InFlight.fetch_sub(1, std::memory_order_relaxed); RequestCount.fetch_add(1, std::memory_order_relaxed); RequestTotalUs.fetch_add(ElapsedUs, std::memory_order_relaxed); Bytes.fetch_add(BytesValue, std::memory_order_relaxed); uint64_t Existing = RequestMaxUs.load(std::memory_order_relaxed); while (ElapsedUs > Existing && !RequestMaxUs.compare_exchange_weak(Existing, 
ElapsedUs, std::memory_order_relaxed)) { } } void RecordAdmissionWait(uint64_t Us) { AdmissionWaitTotalUs.fetch_add(Us, std::memory_order_relaxed); uint64_t Prev = AdmissionWaitMaxUs.load(std::memory_order_relaxed); while (Us > Prev && !AdmissionWaitMaxUs.compare_exchange_weak(Prev, Us, std::memory_order_relaxed)) { } } }; // Async S3 storage adapter for hub hydration. Mirrors the behavior of the // blocking `S3Storage` (defined inside hydration.cpp) but submits requests // via `AsyncHttpClient` and counts in-flight work against `ParallelWork` // using `ExternalWorkToken` instead of occupying worker-pool threads. // // Construction: // S3AsyncStorage Storage(AsyncClient, RequestBuilder, GetCreds, KeyPrefix, // MultipartChunkSize); // // AsyncClient and RequestBuilder are owned by the caller (typically the hub // or a test fixture). GetCreds is a callable returning the latest SigV4 // credentials - keeps the storage decoupled from any specific provider. // // Per-call SourcePath/DestinationPath args carry temp-dir state from the // caller (IncrementalHydrator picks paths under HydrationConfig::TempDir); // S3AsyncStorage itself never owns a temp directory. // // DeleteAll blocks the caller while the prefix listing runs, then issues // async deletes via Work. class S3AsyncStorage { public: using CredentialsCallback = std::function; S3AsyncStorage(AsyncHttpClient& Client, S3RequestBuilder& Builder, CredentialsCallback GetCreds, std::string KeyPrefix, uint64_t MultipartChunkSize, std::shared_ptr Admission = nullptr, uint32_t AdmissionCap = 0); std::string_view KeyPrefix() const { return m_KeyPrefix; } // Async data operations. Each registers a ParallelWork::ExternalWorkToken; // the async callback completes/fails the token. Caller drives Work.Wait() // to block until all submissions finish. 
// // Stats (must outlive in-flight callbacks): every S3 request the storage // issues calls RecordScheduled at submit time, BeginRequest just before // the network handoff, and EndRequest from the completion callback. Bytes // is the payload size on success, 0 on failure or zero-payload requests. // Multipart and ranged GET fire one Begin/EndRequest per part/range. void Put(ParallelWork& Work, WorkerThreadPool& Pool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, S3AsyncStorageStats& Stats); void Get(ParallelWork& Work, WorkerThreadPool& Pool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& DestinationPath, S3AsyncStorageStats& Stats); void Touch(ParallelWork& Work, WorkerThreadPool& Pool, const IoHash& Hash, S3AsyncStorageStats& Stats); // Synchronous list of all CAS hashes under this module's prefix. std::vector List(); // Synchronous delete-all under the module prefix. Lists then issues async // deletes via Work; caller still drives Work.Wait(). void DeleteAll(ParallelWork& Work); // Forward declarations are public (not the structs themselves) so file-scope // free helpers in s3asyncstorage.cpp can name the types in their signatures // without being declared friends. The structs are defined privately in the // .cpp; no out-of-module caller can construct or inspect them. 
public: struct PutMultipartState; struct GetMultipartState; struct GetStreamState; private: std::string CasKey(const IoHash& Hash) const; std::string CasPath(const IoHash& Hash) const; void PutSmall(ParallelWork& Work, WorkerThreadPool& Pool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, S3AsyncStorageStats& Stats); void PutMedium(ParallelWork& Work, WorkerThreadPool& Pool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, S3AsyncStorageStats& Stats); void PutMultipart(ParallelWork& Work, WorkerThreadPool& Pool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& SourcePath, S3AsyncStorageStats& Stats); void DispatchInitialPartWave(std::shared_ptr State); void DispatchPartUpload(std::shared_ptr State, uint32_t PartNum, std::shared_ptr SlotRef); void HandoffSlotToNextPart(std::shared_ptr State, uint32_t PartIdx, std::shared_ptr SlotRef); void DrainUndispatchedParts(std::shared_ptr State); void FinalizePutPart(std::shared_ptr State); void CompleteMultipart(std::shared_ptr State); void AbortMultipart(std::shared_ptr State); void GetMultipart(ParallelWork& Work, WorkerThreadPool& Pool, const IoHash& Hash, uint64_t Size, const std::filesystem::path& DestinationPath, S3AsyncStorageStats& Stats); void OnGetPartCompleted(std::shared_ptr State); void OnGetStreamFinalised(std::shared_ptr State); std::vector ListAllObjects(std::string_view Prefix); AsyncHttpClient& m_Client; S3RequestBuilder& m_Builder; CredentialsCallback m_GetCreds; std::string m_KeyPrefix; uint64_t m_MultipartChunkSize; std::shared_ptr m_Admission; // nullable; when null, no admission gating uint32_t m_AdmissionCap; // initial slot count; 0 when admission disabled }; void s3asyncstorage_forcelink(); #if ZEN_WITH_TESTS namespace s3asyncstorage_test_hooks { // Per-process counter consumed by DispatchPartUpload. 
While > 0, each // invocation decrements it and synthesizes a part-level failure (via // RecordPutPartFailure + drain + FinalizePutPart fan-out) instead of // issuing the UploadPart request. Used to drive the AbortMultipart path // from tests without fault-injecting MinIO. void ForceNextPartFailures(uint32_t Count); } // namespace s3asyncstorage_test_hooks #endif } // namespace zen