diff options
Diffstat (limited to 'src/zenserver/hub/hydration.cpp')
| -rw-r--r-- | src/zenserver/hub/hydration.cpp | 689 |
1 files changed, 662 insertions, 27 deletions
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp index 621af8a46..2df326fab 100644 --- a/src/zenserver/hub/hydration.cpp +++ b/src/zenserver/hub/hydration.cpp @@ -2,6 +2,12 @@ #include "hydration.h" +#include "s3asyncstorage.h" + +#include <zenhttp/asynchttpclient.h> +#include <zenutil/cloud/s3requestbuilder.h> +#include <zenutil/cloud/s3response.h> + #include <zencore/basicfile.h> #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> @@ -30,6 +36,7 @@ #include <unordered_set> #if ZEN_WITH_TESTS +# include <zencore/process.h> # include <zencore/testing.h> # include <zencore/testutils.h> # include <zencore/workthreadpool.h> @@ -159,6 +166,12 @@ namespace hydration_impl { std::atomic<uint64_t> FirstScheduleUs{UINT64_MAX}; std::atomic<uint64_t> FirstStartUs{UINT64_MAX}; + // Admission-gate wait. AdmissionWaitTotalUs is summed across all requests that blocked on + // the storage-layer concurrency semaphore; AdmissionWaitMaxUs is the worst single wait + // observed. Total exposes back-pressure cost; Max surfaces head-of-line blocking. 
+ std::atomic<uint64_t> AdmissionWaitTotalUs{0}; + std::atomic<uint64_t> AdmissionWaitMaxUs{0}; + void RecordScheduled() { uint64_t Now = PhaseClock.GetElapsedTimeUs(); @@ -195,6 +208,15 @@ namespace hydration_impl { { } } + + void RecordAdmissionWait(uint64_t Us) + { + AdmissionWaitTotalUs.fetch_add(Us, std::memory_order_relaxed); + uint64_t Prev = AdmissionWaitMaxUs.load(std::memory_order_relaxed); + while (Us > Prev && !AdmissionWaitMaxUs.compare_exchange_weak(Prev, Us, std::memory_order_relaxed)) + { + } + } }; struct DehydrateStatistics @@ -362,6 +384,116 @@ namespace hydration_impl { uint64_t m_MultipartChunkSize; }; + S3AsyncStorageStats AsyncStatsFrom(PhaseStats& Stats) + { + return S3AsyncStorageStats{Stats.RequestCount, + Stats.RequestTotalUs, + Stats.RequestMaxUs, + Stats.Bytes, + Stats.InFlight, + Stats.InFlightPeak, + Stats.FirstScheduleUs, + Stats.FirstStartUs, + Stats.AdmissionWaitTotalUs, + Stats.AdmissionWaitMaxUs, + Stats.PhaseClock}; + } + + class S3AsyncStorageAdapter : public StorageBase + { + public: + static constexpr std::string_view Type = "s3-async"; + + S3AsyncStorageAdapter(AsyncHttpClient& Client, + S3RequestBuilder& Builder, + S3AsyncStorage::CredentialsCallback GetCreds, + std::string KeyPrefix, + uint64_t MultipartChunkSize, + std::shared_ptr<AdmissionSemaphore> Admission, + uint32_t AdmissionCap) + : m_Client(Client) + , m_Builder(Builder) + , m_GetCreds(std::move(GetCreds)) + , m_KeyPrefix(std::move(KeyPrefix)) + , m_MultipartChunkSize(MultipartChunkSize) + , m_Admission(std::move(Admission)) + , m_AdmissionCap(AdmissionCap) + , m_Storage( + std::make_unique<S3AsyncStorage>(Client, Builder, m_GetCreds, m_KeyPrefix, m_MultipartChunkSize, m_Admission, m_AdmissionCap)) + { + } + + virtual std::string Describe() const override { return fmt::format("s3-async://{}/{}"sv, m_Builder.BucketName(), m_KeyPrefix); } + + virtual void SaveMetadata(const CbObject& Data) override; + virtual CbObject LoadMetadata() override; + virtual CbObject 
GetSettings() override + { + CbObjectWriter Writer; + Writer << "MultipartChunkSize"sv << m_MultipartChunkSize; + return Writer.Save(); + } + virtual void ParseSettings(const CbObjectView& Settings) override + { + m_MultipartChunkSize = Settings["MultipartChunkSize"sv].AsUInt64(DefaultMultipartChunkSize); + m_Storage = std::make_unique<S3AsyncStorage>(m_Client, + m_Builder, + m_GetCreds, + m_KeyPrefix, + m_MultipartChunkSize, + m_Admission, + m_AdmissionCap); + } + virtual std::vector<IoHash> List() override { return m_Storage->List(); } + + virtual void Put(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath, + PhaseStats& Stats) override + { + Stats.Files.fetch_add(1, std::memory_order_relaxed); + S3AsyncStorageStats AsyncStats = AsyncStatsFrom(Stats); + m_Storage->Put(Work, WorkerPool, Hash, Size, SourcePath, AsyncStats); + } + + virtual void Get(ParallelWork& Work, + WorkerThreadPool& WorkerPool, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& DestinationPath, + PhaseStats& Stats) override + { + Stats.Files.fetch_add(1, std::memory_order_relaxed); + S3AsyncStorageStats AsyncStats = AsyncStatsFrom(Stats); + m_Storage->Get(Work, WorkerPool, Hash, Size, DestinationPath, AsyncStats); + } + + virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, PhaseStats& Stats) override + { + Stats.Files.fetch_add(1, std::memory_order_relaxed); + S3AsyncStorageStats AsyncStats = AsyncStatsFrom(Stats); + m_Storage->Touch(Work, WorkerPool, Hash, AsyncStats); + } + + virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override + { + ZEN_UNUSED(WorkerPool); + m_Storage->DeleteAll(Work); + } + + private: + AsyncHttpClient& m_Client; + S3RequestBuilder& m_Builder; + S3AsyncStorage::CredentialsCallback m_GetCreds; + std::string m_KeyPrefix; + uint64_t m_MultipartChunkSize; + std::shared_ptr<AdmissionSemaphore> m_Admission; + uint32_t 
m_AdmissionCap = 0; + std::unique_ptr<S3AsyncStorage> m_Storage; + }; + /////////////////////////////////////////////////////////////////////// // FileStorage implementations @@ -701,9 +833,14 @@ namespace hydration_impl { RenameFile(ChunkPath, DestinationPath, Ec); if (Ec) { - Chunk.Content = IoBufferBuilder::MakeFromFile(ChunkPath); - Chunk.Content.SetDeleteOnClose(true); - WriteFile(DestinationPath, Chunk.Content); + // Cross-volume rename failed; copy the temp file to the destination + // using explicit positional reads (no mmap). Caller is responsible + // for the source file's eventual cleanup via the temp dir sweep. + BasicFile Src(ChunkPath, BasicFile::Mode::kRead); + IoBuffer Body = Src.ReadAll(); + WriteFile(DestinationPath, Body); + std::error_code RemoveEc; + std::filesystem::remove(ChunkPath, RemoveEc); } } } @@ -759,6 +896,63 @@ namespace hydration_impl { } } + void S3AsyncStorageAdapter::SaveMetadata(const CbObject& Data) + { + ZEN_TRACE_CPU("S3AsyncStorageAdapter::SaveMetadata"); + BinaryWriter Output; + SaveCompactBinary(Output, Data); + + SigV4Credentials Creds = m_GetCreds(); + if (Creds.AccessKeyId.empty()) + { + throw zen::runtime_error("S3AsyncStorageAdapter::SaveMetadata: no credentials available"sv); + } + + std::string Key = fmt::format("{}/incremental-state.cbo", m_KeyPrefix); + std::string Path = m_Builder.KeyToPath(Key); + std::string Hash = Sha256ToHex(ComputeSha256(Output.GetData(), Output.GetSize())); + HttpClient::KeyValueMap Signed = m_Builder.SignRequest(Creds, "PUT", Path, "", Hash); + IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize()); + + HttpClient::Response Resp = m_Client.Put(Path, Payload, Signed).get(); + if (!Resp.IsSuccess()) + { + throw zen::runtime_error("Failed to save incremental metadata to '{}': {}"sv, Key, S3ErrorMessage("S3 PUT failed", Resp)); + } + } + + CbObject S3AsyncStorageAdapter::LoadMetadata() + { + ZEN_TRACE_CPU("S3AsyncStorageAdapter::LoadMetadata"); + SigV4Credentials Creds = 
m_GetCreds(); + if (Creds.AccessKeyId.empty()) + { + throw zen::runtime_error("S3AsyncStorageAdapter::LoadMetadata: no credentials available"sv); + } + + std::string Key = fmt::format("{}/incremental-state.cbo", m_KeyPrefix); + std::string Path = m_Builder.KeyToPath(Key); + HttpClient::KeyValueMap Signed = m_Builder.SignRequest(Creds, "GET", Path, "", S3EmptyPayloadHash); + + HttpClient::Response Resp = m_Client.Get(Path, Signed).get(); + if (!Resp.IsSuccess()) + { + if (Resp.StatusCode == HttpResponseCode::NotFound) + { + return {}; + } + throw zen::runtime_error("Failed to load incremental metadata from '{}': {}"sv, Key, S3ErrorMessage("S3 GET failed", Resp)); + } + + CbValidateError Error; + CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Resp.ResponsePayload), Error); + if (Error != CbValidateError::None) + { + throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}"sv, Key, ToString(Error)); + } + return Meta; + } + /////////////////////////////////////////////////////////////////////// // IncrementalHydrator: the only HydrationStrategyBase implementation. // Summary emission for hydrate/dehydrate operations. @@ -802,6 +996,7 @@ namespace hydration_impl { // (Stats.PackUpload), loose Touch (Stats.Touch), and pack-blob Touch (Stats.PackTouch). // Per-request data is collected per PhaseStats by Storage::Put / Storage::Touch and // reported as a single combined "Requests" line. + // const uint64_t UpReqCount = Stats.Upload.RequestCount.load() + Stats.PackUpload.RequestCount.load() + Stats.Touch.RequestCount.load() + Stats.PackTouch.RequestCount.load(); const uint64_t UpReqTotalUs = Stats.Upload.RequestTotalUs.load() + Stats.PackUpload.RequestTotalUs.load() + @@ -827,6 +1022,16 @@ namespace hydration_impl { Stats.PackTouch.FirstStartUs.load()}); const uint64_t UpQueueUs = QueueWaitUs(UpFirstSchedUs, UpFirstStartUs); + // Storage-layer admission wait. 
Sum across all four upload sub-phases gives total + // time blocked acquiring slots on the dispatcher; max is the worst single wait + // observed. Both zero when admission is disabled (file-backend / blocking S3). + const uint64_t UpAdmTotalUs = Stats.Upload.AdmissionWaitTotalUs.load() + Stats.PackUpload.AdmissionWaitTotalUs.load() + + Stats.Touch.AdmissionWaitTotalUs.load() + Stats.PackTouch.AdmissionWaitTotalUs.load(); + const uint64_t UpAdmMaxUs = std::max({Stats.Upload.AdmissionWaitMaxUs.load(), + Stats.PackUpload.AdmissionWaitMaxUs.load(), + Stats.Touch.AdmissionWaitMaxUs.load(), + Stats.PackTouch.AdmissionWaitMaxUs.load()}); + const uint64_t LooseFiles = Stats.Upload.Files.load(); const uint64_t LooseBytes = Stats.Upload.Bytes.load(); const uint64_t TouchFiles = Stats.Touch.Files.load(); @@ -852,7 +1057,7 @@ namespace hydration_impl { " List existing: {}\n" " Pack: {} {} packs, {} files, {}, {}bits/s\n" " Upload: {} loose {} files ({}), packed {} blobs ({}), touched {} loose ({}) + {} packs ({}), {}bits/s\n" - " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n" + " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}, admission wait avg {}/req max {}\n" " Save metadata: {}\n" " Clean: {}", Prefix, @@ -896,6 +1101,8 @@ namespace hydration_impl { NiceTimeSpanUs(UpReqMaxUs), UpPeak, NiceTimeSpanUs(UpQueueUs), + NiceTimeSpanUs(SafeAvg(UpAdmTotalUs, UpReqCount)), + NiceTimeSpanUs(UpAdmMaxUs), NiceTimeSpanUs(Stats.SaveMetadataUs.load()), NiceTimeSpanUs(Stats.CleanUs.load())); } @@ -924,6 +1131,12 @@ namespace hydration_impl { const uint64_t DlFirstStartUs = std::min(Stats.Download.FirstStartUs.load(), Stats.PackDownload.FirstStartUs.load()); const uint64_t QueueUs = QueueWaitUs(DlFirstSchedUs, DlFirstStartUs); + // Storage-layer admission wait. Sum across loose + pack downloads gives total + // dispatcher block time; max is the worst single wait. 
Both zero when admission + // is disabled (file-backend / blocking S3). + const uint64_t DlAdmTotalUs = Stats.Download.AdmissionWaitTotalUs.load() + Stats.PackDownload.AdmissionWaitTotalUs.load(); + const uint64_t DlAdmMaxUs = std::max(Stats.Download.AdmissionWaitMaxUs.load(), Stats.PackDownload.AdmissionWaitMaxUs.load()); + const uint64_t PackCount = Stats.PackCount.load(); const uint64_t PackedFiles = Stats.PackedFiles.load(); const uint64_t PackUnpackUs = Stats.PackUnpackUs.load(); @@ -942,7 +1155,7 @@ namespace hydration_impl { " Load metadata: {}\n" " Create dirs: {} {} dirs, {} dirs/s\n" " Download: {} loose {} files ({}), packed {} blobs ({}), {}bits/s\n" - " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n" + " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}, admission wait avg {}/req max {}\n" " Unpack: {} {} packs, {} files ({}), {}bits/s\n" " Clean: {}\n" " Finalize: {}\n" @@ -969,6 +1182,8 @@ namespace hydration_impl { NiceTimeSpanUs(DlReqMaxUs), DlPeak, NiceTimeSpanUs(QueueUs), + NiceTimeSpanUs(SafeAvg(DlAdmTotalUs, DlReqCount)), + NiceTimeSpanUs(DlAdmMaxUs), NiceTimeSpanUs(PackUnpackUs), ThousandsNum(PackCount), ThousandsNum(PackedFiles), @@ -1157,20 +1372,26 @@ namespace hydration_impl { // the hash is a meta-hash combining the embedded RawHash with the file size, which // avoids a collision between an uncompressed file and a same-content compressed file. // All other files use a streaming raw hash via BasicFile + IoHashStream (sequential - // read, friendlier to the Windows cache manager than mmap). + // reads). All reads are explicit positional reads via BasicFile - no mmap, no + // IoBufferBuilder::MakeFromFile materialization. 
void HashFileContent(const std::filesystem::path& AbsPath, Entry& Out) { + BasicFile File(AbsPath, BasicFile::Mode::kRead); + if (AbsPath.extension().empty()) { std::string_view Rel = Out.RelativePath; std::string_view First = Rel.substr(0, Rel.find('/')); if (First.ends_with("cas")) { + // Read compressed bytes into a heap IoBuffer (single positional read, no mmap) + // and probe with FromCompressed. On success, derive a meta-hash from the + // embedded RawHash + size and return without hashing the bytes themselves. + IoBuffer Compressed = File.ReadAll(); IoHash RawHash; uint64_t RawSize; - CompressedBuffer Compressed = - CompressedBuffer::FromCompressed(SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)), RawHash, RawSize); - if (Compressed) + CompressedBuffer Probe = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Compressed)), RawHash, RawSize); + if (Probe) { IoHashStream Hasher; Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash)); @@ -1178,10 +1399,10 @@ namespace hydration_impl { Out.Hash = Hasher.GetHash(); return; } + // Not a compressed file - fall through to streaming raw hash. } } - BasicFile File(AbsPath, BasicFile::Mode::kRead); IoHashStream Hasher; File.StreamFile([&Hasher](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); Out.Hash = Hasher.GetHash(); @@ -1264,9 +1485,7 @@ namespace hydration_impl { // Returns one PackPlan per pack to build (empty if no packs are produced). std::vector<PackPlan> PlanPacks(std::vector<Entry>& Entries, uint64_t Threshold, uint64_t MaxPackBytes) { - // 1. Group small-file Entries[] indices by content hash. Every index in a group - // shares the same bytes, so any one of them sources the pack content; all of - // them get tagged IsPacked once the pack hash is known. + // Group small-file Entries[] indices by content hash. 
std::unordered_map<IoHash, EntryGroup, IoHash::Hasher> UniqueMap; for (size_t Index = 0; Index < Entries.size(); ++Index) { @@ -1277,7 +1496,6 @@ namespace hydration_impl { UniqueMap[Entries[Index].Hash].push_back(Index); } - // Need at least 2 unique groups for any pack to survive the "discard 1-entry packs" rule. if (UniqueMap.size() < 2) { return {}; @@ -1286,7 +1504,7 @@ namespace hydration_impl { auto GroupHash = [&](const EntryGroup& G) -> const IoHash& { return Entries[G.front()].Hash; }; auto GroupSize = [&](const EntryGroup& G) -> uint64_t { return Entries[G.front()].Size; }; - // 2. Deterministic order: ascending IoHash. Drain the map so the index vectors move. + // Sort groups by ascending IoHash for deterministic pack composition. std::vector<EntryGroup> Ordered; Ordered.reserve(UniqueMap.size()); for (auto& [h, g] : UniqueMap) @@ -1295,7 +1513,7 @@ namespace hydration_impl { } std::sort(Ordered.begin(), Ordered.end(), [&](const EntryGroup& A, const EntryGroup& B) { return GroupHash(A) < GroupHash(B); }); - // 3. Bin-pack greedily under MaxPackBytes. + // Bin-pack greedily under MaxPackBytes. std::vector<PackPlan> Plans; PackPlan Current; uint64_t CurrentSize = 0; @@ -1815,6 +2033,17 @@ namespace hydration_impl { const std::vector<PackPlan> Pending = m_Config.PackEnabled ? PlanPacks(Entries, m_Config.PackThresholdBytes, m_Config.MaxPackBytes) : std::vector<PackPlan>{}; + // Pre-build absolute paths once. MakeSafeAbsolutePath does path normalization + + // Windows long-path prefix application; ~microseconds per entry. With 100k+ + // entries the per-iter cost in the dispatch loop adds up. Mirrors the Hydrate + // side which already pre-builds EntryPaths. 
+ std::vector<std::filesystem::path> EntryPaths; + EntryPaths.reserve(Entries.size()); + for (const Entry& CurrentEntry : Entries) + { + EntryPaths.push_back(MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath)); + } + uint64_t DehydrateDurationMs = 0; { // Upload, PackUpload, Touch, and PackTouch share one ParallelWork; reset all @@ -1829,9 +2058,9 @@ namespace hydration_impl { // Schedule loose-CAS uploads first so they begin running while the pack-build // loop below executes serially on this thread. - for (const Entry& CurrentEntry : Entries) + for (size_t I = 0; I < Entries.size(); ++I) { - if (CurrentEntry.IsPacked) + if (Entries[I].IsPacked) { continue; // pack phase covers it } @@ -1839,9 +2068,9 @@ namespace hydration_impl { Work, *m_Threading.WorkerPool, ExistsLookup, - CurrentEntry.Hash, - CurrentEntry.Size, - MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath), + Entries[I].Hash, + Entries[I].Size, + EntryPaths[I], Stats.Upload, Stats.Touch); } @@ -1930,6 +2159,8 @@ namespace hydration_impl { } catch (const std::exception& Ex) { + // Failure is OK to swallow: next dehydrate or fresh hydrate falls back to + // the older state still on the backend. ZEN_WARN("Dehydration of module '{}' failed: {}. Leaving server state '{}'", m_Config.ModuleId, Ex.what(), @@ -2143,6 +2374,8 @@ namespace hydration_impl { } catch (const std::exception& Ex) { + // Failure is OK to swallow: starts the instance with empty state, next + // dehydrate re-publishes from whatever the running instance materializes. ZEN_WARN("Hydration of module '{}' failed: {}. Cleaning server state '{}'", m_Config.ModuleId, Ex.what(), @@ -2227,6 +2460,20 @@ private: Ref<ImdsCredentialProvider> m_CredentialProvider; std::unique_ptr<S3Client> m_Client; uint64_t m_DefaultMultipartChunkSize; + + // Async path: when Config.AsyncEnabled, build AsyncHttpClient + S3RequestBuilder + // shared across all per-module storage instances. Null otherwise. 
+ std::unique_ptr<AsyncHttpClient> m_AsyncClient; + std::unique_ptr<S3RequestBuilder> m_AsyncBuilder; + + // Storage-layer admission gate, shared across all per-module S3AsyncStorage + // instances. Initial slot count = AsyncMaxConcurrentRequests. + std::shared_ptr<AdmissionSemaphore> m_AsyncAdmission; + uint32_t m_AsyncAdmissionCap = 0; + + // Captures m_Credentials / m_CredentialProvider state via callable so per-module + // S3AsyncStorage instances stay decoupled from the credential rotation logic. + S3AsyncStorage::CredentialsCallback BuildCredentialsCallback(); }; HydrationBase::HydrationBase(const Configuration& Config) @@ -2357,6 +2604,49 @@ S3Hydration::S3Hydration(const Configuration& Config) : HydrationBase(Config) ClientOptions.HttpSettings.RetryCount = 3; m_Client = std::make_unique<S3Client>(ClientOptions); + + if (Config.AsyncEnabled) + { + // Curl conn caps pinned to the request cap so handles never sit on + // libcurl's internal queue waiting for a connection slot (CONNECTTIMEOUT + // would tick there). With one S3 endpoint all connections go to the same + // host: PerHost is the binding cap, Total mirrors. MaxConcurrentRequests + // is a hint shared with the storage admission semaphore below. + HttpClientSettings AsyncSettings = ClientOptions.HttpSettings; + AsyncSettings.MaxConcurrentRequests = Config.AsyncMaxConcurrentRequests; + AsyncSettings.MaxConcurrentConnectionsPerHost = Config.AsyncMaxConcurrentRequests; + AsyncSettings.MaxConcurrentConnectionsTotal = Config.AsyncMaxConcurrentRequests; + + m_AsyncBuilder = std::make_unique<S3RequestBuilder>(m_Region, m_Bucket, m_Endpoint, m_PathStyle); + m_AsyncClient = std::make_unique<AsyncHttpClient>(m_AsyncBuilder->Endpoint(), AsyncSettings); + + // Storage-layer admission: paces S3 fan-out at the same in-flight cap + // curl uses for connection limits. 
Acquire happens on the dispatcher + // thread that drives Hydrate/Dehydrate, so back-pressure flows back to + // the caller without blocking io strand or hydration-pool workers. + m_AsyncAdmissionCap = Config.AsyncMaxConcurrentRequests; + m_AsyncAdmission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(m_AsyncAdmissionCap)); + ZEN_INFO("S3 hydration: async path enabled (max-concurrent-requests={})", Config.AsyncMaxConcurrentRequests); + } + else + { + ZEN_INFO("S3 hydration: blocking S3Client path"); + } +} + +S3AsyncStorage::CredentialsCallback +S3Hydration::BuildCredentialsCallback() +{ + if (m_CredentialProvider) + { + Ref<ImdsCredentialProvider> Provider = m_CredentialProvider; + return [Provider]() { + SigV4Credentials Creds = Provider->GetCredentials(); + return Creds; + }; + } + SigV4Credentials Creds = m_Credentials; + return [Creds]() { return Creds; }; } std::unique_ptr<HydrationStrategyBase> @@ -2365,6 +2655,19 @@ S3Hydration::CreateHydrator(const HydrationConfig& Config) using namespace hydration_impl; std::string KeyPrefix = m_KeyPrefixRoot.empty() ? std::string(Config.ModuleId) : fmt::format("{}/{}"sv, m_KeyPrefixRoot, Config.ModuleId); + + if (m_AsyncClient) + { + return std::make_unique<IncrementalHydrator>(Config, + std::make_unique<S3AsyncStorageAdapter>(*m_AsyncClient, + *m_AsyncBuilder, + BuildCredentialsCallback(), + std::move(KeyPrefix), + m_DefaultMultipartChunkSize, + m_AsyncAdmission, + m_AsyncAdmissionCap), + m_Excludes); + } return std::make_unique<IncrementalHydrator>( Config, std::make_unique<S3Storage>(*m_Client, std::move(KeyPrefix), Config.TempDir, m_DefaultMultipartChunkSize), @@ -2993,10 +3296,20 @@ TEST_CASE("hydration.file.concurrent") // The MinIO binary must be present in the same directory as the test executable (copied by xmake). // --------------------------------------------------------------------------- +namespace { + // Per-binary unique MinIO port. 
+ uint16_t AllocateHydrationMinioTestPort() + { + static const uint16_t Base = static_cast<uint16_t>(20000u + (static_cast<uint32_t>(GetCurrentProcessId()) % 30000u)); + static std::atomic<uint16_t> Slot{0}; + return Base + Slot.fetch_add(1, std::memory_order_relaxed); + } +} // namespace + TEST_CASE("hydration.s3.dehydrate_hydrate") { MinioProcessOptions MinioOpts; - MinioOpts.Port = 19011; + MinioOpts.Port = AllocateHydrationMinioTestPort(); MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); @@ -3055,7 +3368,7 @@ TEST_CASE("hydration.s3.concurrent") // N modules dehydrate and hydrate concurrently against MinIO. // Each module has a distinct ModuleId, so S3 key prefixes don't overlap. MinioProcessOptions MinioOpts; - MinioOpts.Port = 19013; + MinioOpts.Port = AllocateHydrationMinioTestPort(); MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); @@ -3149,7 +3462,7 @@ TEST_CASE("hydration.s3.concurrent") TEST_CASE("hydration.s3.obliterate") { MinioProcessOptions MinioOpts; - MinioOpts.Port = 19019; + MinioOpts.Port = AllocateHydrationMinioTestPort(); MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); @@ -3215,7 +3528,7 @@ TEST_CASE("hydration.s3.obliterate") TEST_CASE("hydration.s3.config_overrides") { MinioProcessOptions MinioOpts; - MinioOpts.Port = 19015; + MinioOpts.Port = AllocateHydrationMinioTestPort(); MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); @@ -3293,7 +3606,7 @@ TEST_CASE("hydration.s3.config_overrides") TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip()) { MinioProcessOptions MinioOpts; - MinioOpts.Port = 19010; + MinioOpts.Port = AllocateHydrationMinioTestPort(); MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); @@ -3425,7 +3738,7 @@ TEST_CASE("hydration.file.incremental") 
TEST_CASE("hydration.s3.incremental") { MinioProcessOptions MinioOpts; - MinioOpts.Port = 19017; + MinioOpts.Port = AllocateHydrationMinioTestPort(); MinioProcess Minio(MinioOpts); Minio.SpawnMinioServer(); Minio.CreateBucket("zen-hydration-test"); @@ -3524,6 +3837,328 @@ TEST_CASE("hydration.create_hydrator_rejects_invalid_config") CHECK_THROWS(InitHydration({})); } +TEST_CASE("hydration.s3async.dehydrate_hydrate") +{ + MinioProcessOptions MinioOpts; + MinioOpts.Port = AllocateHydrationMinioTestPort(); + MinioProcess Minio(MinioOpts); + Minio.SpawnMinioServer(); + Minio.CreateBucket("zen-hydration-test"); + + ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); + ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); + + ScopedTemporaryDirectory TempDir; + std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; + std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; + CreateDirectories(ServerStateDir); + CreateDirectories(HydrationTemp); + + HydrationBase::Configuration BaseConfig; + { + std::string ConfigJson = + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + BaseConfig.Options = std::move(Root).AsObject(); + } + BaseConfig.AsyncEnabled = true; + auto Hydration = InitHydration(BaseConfig); + + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3async_roundtrip"}; + + WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); + Hydration->CreateHydrator(Config)->Hydrate(); + CHECK(std::filesystem::is_empty(ServerStateDir)); + + CreateSmallTestTree(ServerStateDir); + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + + CreateSmallTestTree(ServerStateDir); + WriteFile(ServerStateDir / 
"v2marker.bin", CreateSemiRandomBlob(64)); + Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + + CleanDirectory(ServerStateDir, true); + Hydration->CreateHydrator(Config)->Hydrate(); + + CHECK(std::filesystem::exists(ServerStateDir / "v2marker.bin")); + CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "file_b.bin")); + CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "nested" / "file_c.bin")); +} + +// Exercises all three Put tiers (Small/Medium/Multipart) plus pack uploads in +// one round-trip. CreateTestTree adds 256K/512K/9M/63M blobs on top of the +// small-file set; the small files get packed, the 9M lands in PutMedium, and +// the 63M lands in PutMultipart. +TEST_CASE("hydration.s3async.dehydrate_hydrate.all_tiers") +{ + MinioProcessOptions MinioOpts; + MinioOpts.Port = AllocateHydrationMinioTestPort(); + MinioProcess Minio(MinioOpts); + Minio.SpawnMinioServer(); + Minio.CreateBucket("zen-hydration-test"); + + ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); + ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); + + ScopedTemporaryDirectory TempDir; + std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; + std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; + CreateDirectories(ServerStateDir); + CreateDirectories(HydrationTemp); + + HydrationBase::Configuration BaseConfig; + { + std::string ConfigJson = fmt::format( + R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"chunksize":{}}}}})", + Minio.Endpoint(), + 5u * 1024u * 1024u); // 5 MiB chunks -> multipart threshold ~6.25 MiB + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + BaseConfig.Options = std::move(Root).AsObject(); + } + BaseConfig.AsyncEnabled = true; + auto Hydration = InitHydration(BaseConfig); + + TestThreading Threading(8); + 
HydrationConfig Config{.ServerStateDir = ServerStateDir,
+                           .TempDir        = HydrationTemp,
+                           .ModuleId       = "s3async_all_tiers",
+                           .Threading      = Threading.Options};
+
+    // CreateTestTree: small files (pack candidates) + 256K (Small), 512K (Medium),
+    // 9M (Medium), 63M (Multipart).
+    auto Files = CreateTestTree(ServerStateDir);
+    Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+    CHECK(std::filesystem::is_empty(ServerStateDir));
+
+    Hydration->CreateHydrator(Config)->Hydrate();
+    VerifyTree(ServerStateDir, Files);
+}
+
+// Dehydrates and then hydrates kModuleCount independent modules in parallel through a
+// single shared Hydration instance (created with BaseConfig.AsyncEnabled = true) backed
+// by a freshly spawned MinIO server, then verifies that every module's state directory
+// matches the file set that was originally generated for it.
+TEST_CASE("hydration.s3async.concurrent")
+{
+    // Spawn a MinIO server on a test-allocated port and create the bucket used below.
+    MinioProcessOptions MinioOpts;
+    MinioOpts.Port = AllocateHydrationMinioTestPort();
+    MinioProcess Minio(MinioOpts);
+    Minio.SpawnMinioServer();
+    Minio.CreateBucket("zen-hydration-test");
+
+    // Export the MinIO root credentials through the AWS environment variables for the
+    // lifetime of this test.
+    ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+    ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+
+    // More modules than pool threads, so some work items have to queue behind others.
+    constexpr int kModuleCount = 6;
+    constexpr int kThreadCount = 4;
+
+    TestThreading Threading(kThreadCount);
+
+    ScopedTemporaryDirectory TempDir;
+
+    // Per-module hydrator configuration plus the files created for that module, kept so
+    // the hydrated result can be verified at the end.
+    struct ModuleData
+    {
+        HydrationConfig                                         Config;
+        std::vector<std::pair<std::filesystem::path, IoBuffer>> Files;
+    };
+    std::vector<ModuleData> Modules(kModuleCount);
+
+    // Build the backend options from JSON; the endpoint is the only substituted value.
+    HydrationBase::Configuration BaseConfig;
+    {
+        std::string ConfigJson =
+            fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+                        Minio.Endpoint());
+        std::string     ParseError;
+        CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+        ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+        BaseConfig.Options = std::move(Root).AsObject();
+    }
+    BaseConfig.AsyncEnabled = true;
+    auto Hydration          = InitHydration(BaseConfig);
+
+    // Give every module its own state/temp directories under TempDir and populate the
+    // state directory with a test tree.
+    for (int I = 0; I < kModuleCount; ++I)
+    {
+        std::string           ModuleId = fmt::format("s3async_concurrent_{}"sv, I);
+        std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state";
+        std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp";
+        CreateDirectories(StateDir);
+        CreateDirectories(TempPath);
+
+        Modules[I].Config.ServerStateDir = StateDir;
+        Modules[I].Config.TempDir        = TempPath;
+        Modules[I].Config.ModuleId       = ModuleId;
+        Modules[I].Config.Threading      = Threading.Options;
+        Modules[I].Files                 = CreateTestTree(StateDir);
+    }
+
+    // Phase 1: dehydrate all modules concurrently. The lambdas capture Hydration and the
+    // module Config by reference; both are declared in the enclosing scope and
+    // Work.Wait() is called before this block ends.
+    {
+        WorkerThreadPool  Pool(kThreadCount, "hydration_s3async_dehy");
+        std::atomic<bool> AbortFlag{false};
+        std::atomic<bool> PauseFlag{false};
+        ParallelWork      Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+        for (int I = 0; I < kModuleCount; ++I)
+        {
+            Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) {
+                Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+            });
+        }
+        Work.Wait();
+        CHECK_FALSE(Work.IsAborted());
+    }
+
+    // Phase 2: hydrate all modules concurrently. Each task cleans its state directory
+    // first, then hydrates into it.
+    {
+        WorkerThreadPool  Pool(kThreadCount, "hydration_s3async_hy");
+        std::atomic<bool> AbortFlag{false};
+        std::atomic<bool> PauseFlag{false};
+        ParallelWork      Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+        for (int I = 0; I < kModuleCount; ++I)
+        {
+            Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) {
+                CleanDirectory(Config.ServerStateDir, true);
+                Hydration->CreateHydrator(Config)->Hydrate();
+            });
+        }
+        Work.Wait();
+        CHECK_FALSE(Work.IsAborted());
+    }
+
+    // Every module must have come back with exactly the files it started with.
+    for (int I = 0; I < kModuleCount; ++I)
+    {
+        VerifyTree(Modules[I].Config.ServerStateDir, Modules[I].Files);
+    }
+}
+
+// Dehydrates a small tree, confirms via a direct S3Client listing that objects exist
+// under the "<ModuleId>/" prefix, repopulates the local state and temp directories, and
+// then checks that Obliterate() leaves no objects under that prefix and leaves both
+// local directories empty.
+TEST_CASE("hydration.s3async.obliterate")
+{
+    // Spawn a MinIO server on a test-allocated port and create the bucket used below.
+    MinioProcessOptions MinioOpts;
+    MinioOpts.Port = AllocateHydrationMinioTestPort();
+    MinioProcess Minio(MinioOpts);
+    Minio.SpawnMinioServer();
+    Minio.CreateBucket("zen-hydration-test");
+
+    // Export the MinIO root credentials through the AWS environment variables for the
+    // lifetime of this test.
+    ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+    ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+
+    ScopedTemporaryDirectory TempDir;
+    std::filesystem::path    ServerStateDir = TempDir.Path() / "server_state";
+    std::filesystem::path    HydrationTemp  = TempDir.Path() / "hydration_temp";
+    CreateDirectories(ServerStateDir);
+    CreateDirectories(HydrationTemp);
+
+    constexpr std::string_view ModuleId = "s3async_obliterate"sv;
+
+    // Build the backend options from JSON; the endpoint is the only substituted value.
+    HydrationBase::Configuration BaseConfig;
+    {
+        std::string ConfigJson =
+            fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+                        Minio.Endpoint());
+        std::string     ParseError;
+        CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+        ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+        BaseConfig.Options = std::move(Root).AsObject();
+    }
+    BaseConfig.AsyncEnabled = true;
+    auto Hydration          = InitHydration(BaseConfig);
+
+    // Unlike the other s3async tests in this chunk, no Threading options are supplied here.
+    HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = std::string(ModuleId)};
+
+    CreateSmallTestTree(ServerStateDir);
+    Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+
+    // Lists the bucket contents under "<ModuleId>/" with a separately constructed
+    // S3Client, so the checks below do not go through the hydration layer being tested.
+    auto ListModuleObjects = [&]() {
+        S3ClientOptions Opts;
+        Opts.BucketName                  = "zen-hydration-test";
+        Opts.Endpoint                    = Minio.Endpoint();
+        Opts.PathStyle                   = true;
+        Opts.Credentials.AccessKeyId     = Minio.RootUser();
+        Opts.Credentials.SecretAccessKey = Minio.RootPassword();
+        S3Client Client(Opts);
+        return Client.ListObjects(fmt::format("{}/"sv, ModuleId));
+    };
+
+    // Precondition: the dehydrate above must have stored something for this module.
+    CHECK(!ListModuleObjects().Objects.empty());
+
+    // Put content back into both local directories so the emptiness checks after
+    // Obliterate() are meaningful.
+    CreateSmallTestTree(ServerStateDir);
+    WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64));
+
+    Hydration->CreateHydrator(Config)->Obliterate();
+
+    CHECK(ListModuleObjects().Objects.empty());
+    CHECK(std::filesystem::is_empty(ServerStateDir));
+    CHECK(std::filesystem::is_empty(HydrationTemp));
+}
+
+// Runs several Hydrate/Dehydrate rounds for one module, passing the state object returned
+// by each Hydrate into the following Dehydrate, and verifies the restored tree after the
+// rounds in which a tree was uploaded.
+TEST_CASE("hydration.s3async.incremental")
+{
+    // Spawn a MinIO server on a test-allocated port and create the bucket used below.
+    MinioProcessOptions MinioOpts;
+    MinioOpts.Port = AllocateHydrationMinioTestPort();
+    MinioProcess Minio(MinioOpts);
+    Minio.SpawnMinioServer();
+    Minio.CreateBucket("zen-hydration-test");
+
+    // Export the MinIO root credentials through the AWS environment variables for the
+    // lifetime of this test.
+    ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+    ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+
+    ScopedTemporaryDirectory TempDir;
+    std::filesystem::path    ServerStateDir = TempDir.Path() / "server_state";
+    std::filesystem::path    HydrationTemp  = TempDir.Path() / "hydration_temp";
+    CreateDirectories(ServerStateDir);
+    CreateDirectories(HydrationTemp);
+
+    constexpr std::string_view ModuleId = "s3async_incremental"sv;
+
+    TestThreading                Threading(8);
+    // Build the backend options from JSON; the endpoint is the only substituted value.
+    HydrationBase::Configuration BaseConfig;
+    {
+        std::string ConfigJson =
+            fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+                        Minio.Endpoint());
+        std::string     ParseError;
+        CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+        ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+        BaseConfig.Options = std::move(Root).AsObject();
+    }
+    BaseConfig.AsyncEnabled = true;
+    auto Hydration          = InitHydration(BaseConfig);
+
+    HydrationConfig Config{.ServerStateDir = ServerStateDir,
+                           .TempDir        = HydrationTemp,
+                           .ModuleId       = std::string(ModuleId),
+                           .Threading      = Threading.Options};
+
+    // Mirrors hydration.s3.incremental but with BaseConfig.AsyncEnabled set.
+    // NOTE(review): this comment previously said "Config.IoContext set", but no
+    // IoContext is assigned anywhere in this test; presumably AsyncEnabled is what
+    // routes I/O through the async S3AsyncStorageAdapter -- confirm in InitHydration.
+    // Each Dehydrate empties ServerStateDir as a side effect; subsequent
+    // Hydrate/Dehydrate calls thread the prior HydrationState so incremental
+    // dehydrate can hit its cache instead of re-uploading.
+
+    // Nothing has been dehydrated for this module yet, so the first Hydrate is expected
+    // to return an empty (falsy) state object.
+    CbObject HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+    CHECK_FALSE(HydrationState);
+
+    // Round 1: upload a freshly created tree, then restore and verify it.
+    auto TestFiles = CreateTestTree(ServerStateDir);
+    Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
+    CHECK(std::filesystem::is_empty(ServerStateDir));
+
+    HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+    VerifyTree(ServerStateDir, TestFiles);
+
+    // Round 2: dehydrate the just-hydrated tree again, passing the state from the
+    // preceding Hydrate.
+    Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
+    CHECK(std::filesystem::is_empty(ServerStateDir));
+
+    HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+
+    // Round 3: call CreateTestTree again on top of the hydrated directory, dehydrate,
+    // then restore and verify against the newly returned file list.
+    TestFiles = CreateTestTree(ServerStateDir);
+    Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
+
+    HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+    VerifyTree(ServerStateDir, TestFiles);
+
+    // Final dehydrate with the latest state; its result is not checked.
+    Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
+}
+
 TEST_SUITE_END();
 void |