aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/hub/hydration.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/hub/hydration.cpp')
-rw-r--r--src/zenserver/hub/hydration.cpp689
1 files changed, 662 insertions, 27 deletions
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp
index 621af8a46..2df326fab 100644
--- a/src/zenserver/hub/hydration.cpp
+++ b/src/zenserver/hub/hydration.cpp
@@ -2,6 +2,12 @@
#include "hydration.h"
+#include "s3asyncstorage.h"
+
+#include <zenhttp/asynchttpclient.h>
+#include <zenutil/cloud/s3requestbuilder.h>
+#include <zenutil/cloud/s3response.h>
+
#include <zencore/basicfile.h>
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
@@ -30,6 +36,7 @@
#include <unordered_set>
#if ZEN_WITH_TESTS
+# include <zencore/process.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
# include <zencore/workthreadpool.h>
@@ -159,6 +166,12 @@ namespace hydration_impl {
std::atomic<uint64_t> FirstScheduleUs{UINT64_MAX};
std::atomic<uint64_t> FirstStartUs{UINT64_MAX};
+ // Admission-gate wait. AdmissionWaitTotalUs is summed across all requests that blocked on
+ // the storage-layer concurrency semaphore; AdmissionWaitMaxUs is the worst single wait
+ // observed. Total exposes back-pressure cost; Max surfaces head-of-line blocking.
+ std::atomic<uint64_t> AdmissionWaitTotalUs{0};
+ std::atomic<uint64_t> AdmissionWaitMaxUs{0};
+
void RecordScheduled()
{
uint64_t Now = PhaseClock.GetElapsedTimeUs();
@@ -195,6 +208,15 @@ namespace hydration_impl {
{
}
}
+
+ void RecordAdmissionWait(uint64_t Us)
+ {
+ AdmissionWaitTotalUs.fetch_add(Us, std::memory_order_relaxed);
+ uint64_t Prev = AdmissionWaitMaxUs.load(std::memory_order_relaxed);
+ while (Us > Prev && !AdmissionWaitMaxUs.compare_exchange_weak(Prev, Us, std::memory_order_relaxed))
+ {
+ }
+ }
};
struct DehydrateStatistics
@@ -362,6 +384,116 @@ namespace hydration_impl {
uint64_t m_MultipartChunkSize;
};
+ S3AsyncStorageStats AsyncStatsFrom(PhaseStats& Stats)
+ {
+ return S3AsyncStorageStats{Stats.RequestCount,
+ Stats.RequestTotalUs,
+ Stats.RequestMaxUs,
+ Stats.Bytes,
+ Stats.InFlight,
+ Stats.InFlightPeak,
+ Stats.FirstScheduleUs,
+ Stats.FirstStartUs,
+ Stats.AdmissionWaitTotalUs,
+ Stats.AdmissionWaitMaxUs,
+ Stats.PhaseClock};
+ }
+
+ class S3AsyncStorageAdapter : public StorageBase
+ {
+ public:
+ static constexpr std::string_view Type = "s3-async";
+
+ S3AsyncStorageAdapter(AsyncHttpClient& Client,
+ S3RequestBuilder& Builder,
+ S3AsyncStorage::CredentialsCallback GetCreds,
+ std::string KeyPrefix,
+ uint64_t MultipartChunkSize,
+ std::shared_ptr<AdmissionSemaphore> Admission,
+ uint32_t AdmissionCap)
+ : m_Client(Client)
+ , m_Builder(Builder)
+ , m_GetCreds(std::move(GetCreds))
+ , m_KeyPrefix(std::move(KeyPrefix))
+ , m_MultipartChunkSize(MultipartChunkSize)
+ , m_Admission(std::move(Admission))
+ , m_AdmissionCap(AdmissionCap)
+ , m_Storage(
+ std::make_unique<S3AsyncStorage>(Client, Builder, m_GetCreds, m_KeyPrefix, m_MultipartChunkSize, m_Admission, m_AdmissionCap))
+ {
+ }
+
+ virtual std::string Describe() const override { return fmt::format("s3-async://{}/{}"sv, m_Builder.BucketName(), m_KeyPrefix); }
+
+ virtual void SaveMetadata(const CbObject& Data) override;
+ virtual CbObject LoadMetadata() override;
+ virtual CbObject GetSettings() override
+ {
+ CbObjectWriter Writer;
+ Writer << "MultipartChunkSize"sv << m_MultipartChunkSize;
+ return Writer.Save();
+ }
+ virtual void ParseSettings(const CbObjectView& Settings) override
+ {
+ m_MultipartChunkSize = Settings["MultipartChunkSize"sv].AsUInt64(DefaultMultipartChunkSize);
+ m_Storage = std::make_unique<S3AsyncStorage>(m_Client,
+ m_Builder,
+ m_GetCreds,
+ m_KeyPrefix,
+ m_MultipartChunkSize,
+ m_Admission,
+ m_AdmissionCap);
+ }
+ virtual std::vector<IoHash> List() override { return m_Storage->List(); }
+
+ virtual void Put(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath,
+ PhaseStats& Stats) override
+ {
+ Stats.Files.fetch_add(1, std::memory_order_relaxed);
+ S3AsyncStorageStats AsyncStats = AsyncStatsFrom(Stats);
+ m_Storage->Put(Work, WorkerPool, Hash, Size, SourcePath, AsyncStats);
+ }
+
+ virtual void Get(ParallelWork& Work,
+ WorkerThreadPool& WorkerPool,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& DestinationPath,
+ PhaseStats& Stats) override
+ {
+ Stats.Files.fetch_add(1, std::memory_order_relaxed);
+ S3AsyncStorageStats AsyncStats = AsyncStatsFrom(Stats);
+ m_Storage->Get(Work, WorkerPool, Hash, Size, DestinationPath, AsyncStats);
+ }
+
+ virtual void Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, PhaseStats& Stats) override
+ {
+ Stats.Files.fetch_add(1, std::memory_order_relaxed);
+ S3AsyncStorageStats AsyncStats = AsyncStatsFrom(Stats);
+ m_Storage->Touch(Work, WorkerPool, Hash, AsyncStats);
+ }
+
+ virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override
+ {
+ ZEN_UNUSED(WorkerPool);
+ m_Storage->DeleteAll(Work);
+ }
+
+ private:
+ AsyncHttpClient& m_Client;
+ S3RequestBuilder& m_Builder;
+ S3AsyncStorage::CredentialsCallback m_GetCreds;
+ std::string m_KeyPrefix;
+ uint64_t m_MultipartChunkSize;
+ std::shared_ptr<AdmissionSemaphore> m_Admission;
+ uint32_t m_AdmissionCap = 0;
+ std::unique_ptr<S3AsyncStorage> m_Storage;
+ };
+
///////////////////////////////////////////////////////////////////////
// FileStorage implementations
@@ -701,9 +833,14 @@ namespace hydration_impl {
RenameFile(ChunkPath, DestinationPath, Ec);
if (Ec)
{
- Chunk.Content = IoBufferBuilder::MakeFromFile(ChunkPath);
- Chunk.Content.SetDeleteOnClose(true);
- WriteFile(DestinationPath, Chunk.Content);
+ // Cross-volume rename failed; copy the temp file to the destination
+ // using explicit positional reads (no mmap). Caller is responsible
+ // for the source file's eventual cleanup via the temp dir sweep.
+ BasicFile Src(ChunkPath, BasicFile::Mode::kRead);
+ IoBuffer Body = Src.ReadAll();
+ WriteFile(DestinationPath, Body);
+ std::error_code RemoveEc;
+ std::filesystem::remove(ChunkPath, RemoveEc);
}
}
}
@@ -759,6 +896,63 @@ namespace hydration_impl {
}
}
+ void S3AsyncStorageAdapter::SaveMetadata(const CbObject& Data)
+ {
+ ZEN_TRACE_CPU("S3AsyncStorageAdapter::SaveMetadata");
+ BinaryWriter Output;
+ SaveCompactBinary(Output, Data);
+
+ SigV4Credentials Creds = m_GetCreds();
+ if (Creds.AccessKeyId.empty())
+ {
+ throw zen::runtime_error("S3AsyncStorageAdapter::SaveMetadata: no credentials available"sv);
+ }
+
+ std::string Key = fmt::format("{}/incremental-state.cbo", m_KeyPrefix);
+ std::string Path = m_Builder.KeyToPath(Key);
+ std::string Hash = Sha256ToHex(ComputeSha256(Output.GetData(), Output.GetSize()));
+ HttpClient::KeyValueMap Signed = m_Builder.SignRequest(Creds, "PUT", Path, "", Hash);
+ IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize());
+
+ HttpClient::Response Resp = m_Client.Put(Path, Payload, Signed).get();
+ if (!Resp.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to save incremental metadata to '{}': {}"sv, Key, S3ErrorMessage("S3 PUT failed", Resp));
+ }
+ }
+
+ CbObject S3AsyncStorageAdapter::LoadMetadata()
+ {
+ ZEN_TRACE_CPU("S3AsyncStorageAdapter::LoadMetadata");
+ SigV4Credentials Creds = m_GetCreds();
+ if (Creds.AccessKeyId.empty())
+ {
+ throw zen::runtime_error("S3AsyncStorageAdapter::LoadMetadata: no credentials available"sv);
+ }
+
+ std::string Key = fmt::format("{}/incremental-state.cbo", m_KeyPrefix);
+ std::string Path = m_Builder.KeyToPath(Key);
+ HttpClient::KeyValueMap Signed = m_Builder.SignRequest(Creds, "GET", Path, "", S3EmptyPayloadHash);
+
+ HttpClient::Response Resp = m_Client.Get(Path, Signed).get();
+ if (!Resp.IsSuccess())
+ {
+ if (Resp.StatusCode == HttpResponseCode::NotFound)
+ {
+ return {};
+ }
+ throw zen::runtime_error("Failed to load incremental metadata from '{}': {}"sv, Key, S3ErrorMessage("S3 GET failed", Resp));
+ }
+
+ CbValidateError Error;
+ CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Resp.ResponsePayload), Error);
+ if (Error != CbValidateError::None)
+ {
+ throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}"sv, Key, ToString(Error));
+ }
+ return Meta;
+ }
+
///////////////////////////////////////////////////////////////////////
// IncrementalHydrator: the only HydrationStrategyBase implementation.
// Summary emission for hydrate/dehydrate operations.
@@ -802,6 +996,7 @@ namespace hydration_impl {
// (Stats.PackUpload), loose Touch (Stats.Touch), and pack-blob Touch (Stats.PackTouch).
// Per-request data is collected per PhaseStats by Storage::Put / Storage::Touch and
// reported as a single combined "Requests" line.
+ //
const uint64_t UpReqCount = Stats.Upload.RequestCount.load() + Stats.PackUpload.RequestCount.load() +
Stats.Touch.RequestCount.load() + Stats.PackTouch.RequestCount.load();
const uint64_t UpReqTotalUs = Stats.Upload.RequestTotalUs.load() + Stats.PackUpload.RequestTotalUs.load() +
@@ -827,6 +1022,16 @@ namespace hydration_impl {
Stats.PackTouch.FirstStartUs.load()});
const uint64_t UpQueueUs = QueueWaitUs(UpFirstSchedUs, UpFirstStartUs);
+ // Storage-layer admission wait. Sum across all four upload sub-phases gives total
+ // time blocked acquiring slots on the dispatcher; max is the worst single wait
+ // observed. Both zero when admission is disabled (file-backend / blocking S3).
+ const uint64_t UpAdmTotalUs = Stats.Upload.AdmissionWaitTotalUs.load() + Stats.PackUpload.AdmissionWaitTotalUs.load() +
+ Stats.Touch.AdmissionWaitTotalUs.load() + Stats.PackTouch.AdmissionWaitTotalUs.load();
+ const uint64_t UpAdmMaxUs = std::max({Stats.Upload.AdmissionWaitMaxUs.load(),
+ Stats.PackUpload.AdmissionWaitMaxUs.load(),
+ Stats.Touch.AdmissionWaitMaxUs.load(),
+ Stats.PackTouch.AdmissionWaitMaxUs.load()});
+
const uint64_t LooseFiles = Stats.Upload.Files.load();
const uint64_t LooseBytes = Stats.Upload.Bytes.load();
const uint64_t TouchFiles = Stats.Touch.Files.load();
@@ -852,7 +1057,7 @@ namespace hydration_impl {
" List existing: {}\n"
" Pack: {} {} packs, {} files, {}, {}bits/s\n"
" Upload: {} loose {} files ({}), packed {} blobs ({}), touched {} loose ({}) + {} packs ({}), {}bits/s\n"
- " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n"
+ " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}, admission wait avg {}/req max {}\n"
" Save metadata: {}\n"
" Clean: {}",
Prefix,
@@ -896,6 +1101,8 @@ namespace hydration_impl {
NiceTimeSpanUs(UpReqMaxUs),
UpPeak,
NiceTimeSpanUs(UpQueueUs),
+ NiceTimeSpanUs(SafeAvg(UpAdmTotalUs, UpReqCount)),
+ NiceTimeSpanUs(UpAdmMaxUs),
NiceTimeSpanUs(Stats.SaveMetadataUs.load()),
NiceTimeSpanUs(Stats.CleanUs.load()));
}
@@ -924,6 +1131,12 @@ namespace hydration_impl {
const uint64_t DlFirstStartUs = std::min(Stats.Download.FirstStartUs.load(), Stats.PackDownload.FirstStartUs.load());
const uint64_t QueueUs = QueueWaitUs(DlFirstSchedUs, DlFirstStartUs);
+ // Storage-layer admission wait. Sum across loose + pack downloads gives total
+ // dispatcher block time; max is the worst single wait. Both zero when admission
+ // is disabled (file-backend / blocking S3).
+ const uint64_t DlAdmTotalUs = Stats.Download.AdmissionWaitTotalUs.load() + Stats.PackDownload.AdmissionWaitTotalUs.load();
+ const uint64_t DlAdmMaxUs = std::max(Stats.Download.AdmissionWaitMaxUs.load(), Stats.PackDownload.AdmissionWaitMaxUs.load());
+
const uint64_t PackCount = Stats.PackCount.load();
const uint64_t PackedFiles = Stats.PackedFiles.load();
const uint64_t PackUnpackUs = Stats.PackUnpackUs.load();
@@ -942,7 +1155,7 @@ namespace hydration_impl {
" Load metadata: {}\n"
" Create dirs: {} {} dirs, {} dirs/s\n"
" Download: {} loose {} files ({}), packed {} blobs ({}), {}bits/s\n"
- " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n"
+ " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}, admission wait avg {}/req max {}\n"
" Unpack: {} {} packs, {} files ({}), {}bits/s\n"
" Clean: {}\n"
" Finalize: {}\n"
@@ -969,6 +1182,8 @@ namespace hydration_impl {
NiceTimeSpanUs(DlReqMaxUs),
DlPeak,
NiceTimeSpanUs(QueueUs),
+ NiceTimeSpanUs(SafeAvg(DlAdmTotalUs, DlReqCount)),
+ NiceTimeSpanUs(DlAdmMaxUs),
NiceTimeSpanUs(PackUnpackUs),
ThousandsNum(PackCount),
ThousandsNum(PackedFiles),
@@ -1157,20 +1372,26 @@ namespace hydration_impl {
// the hash is a meta-hash combining the embedded RawHash with the file size, which
// avoids a collision between an uncompressed file and a same-content compressed file.
// All other files use a streaming raw hash via BasicFile + IoHashStream (sequential
- // read, friendlier to the Windows cache manager than mmap).
+ // reads). All reads are explicit positional reads via BasicFile - no mmap, no
+ // IoBufferBuilder::MakeFromFile materialization.
void HashFileContent(const std::filesystem::path& AbsPath, Entry& Out)
{
+ BasicFile File(AbsPath, BasicFile::Mode::kRead);
+
if (AbsPath.extension().empty())
{
std::string_view Rel = Out.RelativePath;
std::string_view First = Rel.substr(0, Rel.find('/'));
if (First.ends_with("cas"))
{
+ // Read compressed bytes into a heap IoBuffer (single positional read, no mmap)
+ // and probe with FromCompressed. On success, derive a meta-hash from the
+ // embedded RawHash + size and return without hashing the bytes themselves.
+ IoBuffer Compressed = File.ReadAll();
IoHash RawHash;
uint64_t RawSize;
- CompressedBuffer Compressed =
- CompressedBuffer::FromCompressed(SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)), RawHash, RawSize);
- if (Compressed)
+ CompressedBuffer Probe = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Compressed)), RawHash, RawSize);
+ if (Probe)
{
IoHashStream Hasher;
Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash));
@@ -1178,10 +1399,10 @@ namespace hydration_impl {
Out.Hash = Hasher.GetHash();
return;
}
+ // Not a compressed file - fall through to streaming raw hash.
}
}
- BasicFile File(AbsPath, BasicFile::Mode::kRead);
IoHashStream Hasher;
File.StreamFile([&Hasher](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); });
Out.Hash = Hasher.GetHash();
@@ -1264,9 +1485,7 @@ namespace hydration_impl {
// Returns one PackPlan per pack to build (empty if no packs are produced).
std::vector<PackPlan> PlanPacks(std::vector<Entry>& Entries, uint64_t Threshold, uint64_t MaxPackBytes)
{
- // 1. Group small-file Entries[] indices by content hash. Every index in a group
- // shares the same bytes, so any one of them sources the pack content; all of
- // them get tagged IsPacked once the pack hash is known.
+ // Group small-file Entries[] indices by content hash.
std::unordered_map<IoHash, EntryGroup, IoHash::Hasher> UniqueMap;
for (size_t Index = 0; Index < Entries.size(); ++Index)
{
@@ -1277,7 +1496,6 @@ namespace hydration_impl {
UniqueMap[Entries[Index].Hash].push_back(Index);
}
- // Need at least 2 unique groups for any pack to survive the "discard 1-entry packs" rule.
if (UniqueMap.size() < 2)
{
return {};
@@ -1286,7 +1504,7 @@ namespace hydration_impl {
auto GroupHash = [&](const EntryGroup& G) -> const IoHash& { return Entries[G.front()].Hash; };
auto GroupSize = [&](const EntryGroup& G) -> uint64_t { return Entries[G.front()].Size; };
- // 2. Deterministic order: ascending IoHash. Drain the map so the index vectors move.
+ // Sort groups by ascending IoHash for deterministic pack composition.
std::vector<EntryGroup> Ordered;
Ordered.reserve(UniqueMap.size());
for (auto& [h, g] : UniqueMap)
@@ -1295,7 +1513,7 @@ namespace hydration_impl {
}
std::sort(Ordered.begin(), Ordered.end(), [&](const EntryGroup& A, const EntryGroup& B) { return GroupHash(A) < GroupHash(B); });
- // 3. Bin-pack greedily under MaxPackBytes.
+ // Bin-pack greedily under MaxPackBytes.
std::vector<PackPlan> Plans;
PackPlan Current;
uint64_t CurrentSize = 0;
@@ -1815,6 +2033,17 @@ namespace hydration_impl {
const std::vector<PackPlan> Pending =
m_Config.PackEnabled ? PlanPacks(Entries, m_Config.PackThresholdBytes, m_Config.MaxPackBytes) : std::vector<PackPlan>{};
+ // Pre-build absolute paths once. MakeSafeAbsolutePath does path normalization +
+ // Windows long-path prefix application; ~microseconds per entry. With 100k+
+ // entries the per-iter cost in the dispatch loop adds up. Mirrors the Hydrate
+ // side which already pre-builds EntryPaths.
+ std::vector<std::filesystem::path> EntryPaths;
+ EntryPaths.reserve(Entries.size());
+ for (const Entry& CurrentEntry : Entries)
+ {
+ EntryPaths.push_back(MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath));
+ }
+
uint64_t DehydrateDurationMs = 0;
{
// Upload, PackUpload, Touch, and PackTouch share one ParallelWork; reset all
@@ -1829,9 +2058,9 @@ namespace hydration_impl {
// Schedule loose-CAS uploads first so they begin running while the pack-build
// loop below executes serially on this thread.
- for (const Entry& CurrentEntry : Entries)
+ for (size_t I = 0; I < Entries.size(); ++I)
{
- if (CurrentEntry.IsPacked)
+ if (Entries[I].IsPacked)
{
continue; // pack phase covers it
}
@@ -1839,9 +2068,9 @@ namespace hydration_impl {
Work,
*m_Threading.WorkerPool,
ExistsLookup,
- CurrentEntry.Hash,
- CurrentEntry.Size,
- MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath),
+ Entries[I].Hash,
+ Entries[I].Size,
+ EntryPaths[I],
Stats.Upload,
Stats.Touch);
}
@@ -1930,6 +2159,8 @@ namespace hydration_impl {
}
catch (const std::exception& Ex)
{
+ // Failure is OK to swallow: next dehydrate or fresh hydrate falls back to
+ // the older state still on the backend.
ZEN_WARN("Dehydration of module '{}' failed: {}. Leaving server state '{}'",
m_Config.ModuleId,
Ex.what(),
@@ -2143,6 +2374,8 @@ namespace hydration_impl {
}
catch (const std::exception& Ex)
{
+ // Failure is OK to swallow: starts the instance with empty state, next
+ // dehydrate re-publishes from whatever the running instance materializes.
ZEN_WARN("Hydration of module '{}' failed: {}. Cleaning server state '{}'",
m_Config.ModuleId,
Ex.what(),
@@ -2227,6 +2460,20 @@ private:
Ref<ImdsCredentialProvider> m_CredentialProvider;
std::unique_ptr<S3Client> m_Client;
uint64_t m_DefaultMultipartChunkSize;
+
+ // Async path: when Config.AsyncEnabled, build AsyncHttpClient + S3RequestBuilder
+ // shared across all per-module storage instances. Null otherwise.
+ std::unique_ptr<AsyncHttpClient> m_AsyncClient;
+ std::unique_ptr<S3RequestBuilder> m_AsyncBuilder;
+
+ // Storage-layer admission gate, shared across all per-module S3AsyncStorage
+ // instances. Initial slot count = AsyncMaxConcurrentRequests.
+ std::shared_ptr<AdmissionSemaphore> m_AsyncAdmission;
+ uint32_t m_AsyncAdmissionCap = 0;
+
+ // Captures m_Credentials / m_CredentialProvider state via callable so per-module
+ // S3AsyncStorage instances stay decoupled from the credential rotation logic.
+ S3AsyncStorage::CredentialsCallback BuildCredentialsCallback();
};
HydrationBase::HydrationBase(const Configuration& Config)
@@ -2357,6 +2604,49 @@ S3Hydration::S3Hydration(const Configuration& Config) : HydrationBase(Config)
ClientOptions.HttpSettings.RetryCount = 3;
m_Client = std::make_unique<S3Client>(ClientOptions);
+
+ if (Config.AsyncEnabled)
+ {
+ // Curl conn caps pinned to the request cap so handles never sit on
+ // libcurl's internal queue waiting for a connection slot (CONNECTTIMEOUT
+ // would tick there). With one S3 endpoint all connections go to the same
+ // host: PerHost is the binding cap, Total mirrors. MaxConcurrentRequests
+ // is a hint shared with the storage admission semaphore below.
+ HttpClientSettings AsyncSettings = ClientOptions.HttpSettings;
+ AsyncSettings.MaxConcurrentRequests = Config.AsyncMaxConcurrentRequests;
+ AsyncSettings.MaxConcurrentConnectionsPerHost = Config.AsyncMaxConcurrentRequests;
+ AsyncSettings.MaxConcurrentConnectionsTotal = Config.AsyncMaxConcurrentRequests;
+
+ m_AsyncBuilder = std::make_unique<S3RequestBuilder>(m_Region, m_Bucket, m_Endpoint, m_PathStyle);
+ m_AsyncClient = std::make_unique<AsyncHttpClient>(m_AsyncBuilder->Endpoint(), AsyncSettings);
+
+ // Storage-layer admission: paces S3 fan-out at the same in-flight cap
+ // curl uses for connection limits. Acquire happens on the dispatcher
+ // thread that drives Hydrate/Dehydrate, so back-pressure flows back to
+ // the caller without blocking io strand or hydration-pool workers.
+ m_AsyncAdmissionCap = Config.AsyncMaxConcurrentRequests;
+ m_AsyncAdmission = std::make_shared<AdmissionSemaphore>(static_cast<std::ptrdiff_t>(m_AsyncAdmissionCap));
+ ZEN_INFO("S3 hydration: async path enabled (max-concurrent-requests={})", Config.AsyncMaxConcurrentRequests);
+ }
+ else
+ {
+ ZEN_INFO("S3 hydration: blocking S3Client path");
+ }
+}
+
+S3AsyncStorage::CredentialsCallback
+S3Hydration::BuildCredentialsCallback()
+{
+ if (m_CredentialProvider)
+ {
+ Ref<ImdsCredentialProvider> Provider = m_CredentialProvider;
+ return [Provider]() {
+ SigV4Credentials Creds = Provider->GetCredentials();
+ return Creds;
+ };
+ }
+ SigV4Credentials Creds = m_Credentials;
+ return [Creds]() { return Creds; };
}
std::unique_ptr<HydrationStrategyBase>
@@ -2365,6 +2655,19 @@ S3Hydration::CreateHydrator(const HydrationConfig& Config)
using namespace hydration_impl;
std::string KeyPrefix =
m_KeyPrefixRoot.empty() ? std::string(Config.ModuleId) : fmt::format("{}/{}"sv, m_KeyPrefixRoot, Config.ModuleId);
+
+ if (m_AsyncClient)
+ {
+ return std::make_unique<IncrementalHydrator>(Config,
+ std::make_unique<S3AsyncStorageAdapter>(*m_AsyncClient,
+ *m_AsyncBuilder,
+ BuildCredentialsCallback(),
+ std::move(KeyPrefix),
+ m_DefaultMultipartChunkSize,
+ m_AsyncAdmission,
+ m_AsyncAdmissionCap),
+ m_Excludes);
+ }
return std::make_unique<IncrementalHydrator>(
Config,
std::make_unique<S3Storage>(*m_Client, std::move(KeyPrefix), Config.TempDir, m_DefaultMultipartChunkSize),
@@ -2993,10 +3296,20 @@ TEST_CASE("hydration.file.concurrent")
// The MinIO binary must be present in the same directory as the test executable (copied by xmake).
// ---------------------------------------------------------------------------
+namespace {
+ // Per-binary unique MinIO port.
+ uint16_t AllocateHydrationMinioTestPort()
+ {
+ static const uint16_t Base = static_cast<uint16_t>(20000u + (static_cast<uint32_t>(GetCurrentProcessId()) % 30000u));
+ static std::atomic<uint16_t> Slot{0};
+ return Base + Slot.fetch_add(1, std::memory_order_relaxed);
+ }
+} // namespace
+
TEST_CASE("hydration.s3.dehydrate_hydrate")
{
MinioProcessOptions MinioOpts;
- MinioOpts.Port = 19011;
+ MinioOpts.Port = AllocateHydrationMinioTestPort();
MinioProcess Minio(MinioOpts);
Minio.SpawnMinioServer();
Minio.CreateBucket("zen-hydration-test");
@@ -3055,7 +3368,7 @@ TEST_CASE("hydration.s3.concurrent")
// N modules dehydrate and hydrate concurrently against MinIO.
// Each module has a distinct ModuleId, so S3 key prefixes don't overlap.
MinioProcessOptions MinioOpts;
- MinioOpts.Port = 19013;
+ MinioOpts.Port = AllocateHydrationMinioTestPort();
MinioProcess Minio(MinioOpts);
Minio.SpawnMinioServer();
Minio.CreateBucket("zen-hydration-test");
@@ -3149,7 +3462,7 @@ TEST_CASE("hydration.s3.concurrent")
TEST_CASE("hydration.s3.obliterate")
{
MinioProcessOptions MinioOpts;
- MinioOpts.Port = 19019;
+ MinioOpts.Port = AllocateHydrationMinioTestPort();
MinioProcess Minio(MinioOpts);
Minio.SpawnMinioServer();
Minio.CreateBucket("zen-hydration-test");
@@ -3215,7 +3528,7 @@ TEST_CASE("hydration.s3.obliterate")
TEST_CASE("hydration.s3.config_overrides")
{
MinioProcessOptions MinioOpts;
- MinioOpts.Port = 19015;
+ MinioOpts.Port = AllocateHydrationMinioTestPort();
MinioProcess Minio(MinioOpts);
Minio.SpawnMinioServer();
Minio.CreateBucket("zen-hydration-test");
@@ -3293,7 +3606,7 @@ TEST_CASE("hydration.s3.config_overrides")
TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip())
{
MinioProcessOptions MinioOpts;
- MinioOpts.Port = 19010;
+ MinioOpts.Port = AllocateHydrationMinioTestPort();
MinioProcess Minio(MinioOpts);
Minio.SpawnMinioServer();
Minio.CreateBucket("zen-hydration-test");
@@ -3425,7 +3738,7 @@ TEST_CASE("hydration.file.incremental")
TEST_CASE("hydration.s3.incremental")
{
MinioProcessOptions MinioOpts;
- MinioOpts.Port = 19017;
+ MinioOpts.Port = AllocateHydrationMinioTestPort();
MinioProcess Minio(MinioOpts);
Minio.SpawnMinioServer();
Minio.CreateBucket("zen-hydration-test");
@@ -3524,6 +3837,328 @@ TEST_CASE("hydration.create_hydrator_rejects_invalid_config")
CHECK_THROWS(InitHydration({}));
}
+TEST_CASE("hydration.s3async.dehydrate_hydrate")
+{
+ MinioProcessOptions MinioOpts;
+ MinioOpts.Port = AllocateHydrationMinioTestPort();
+ MinioProcess Minio(MinioOpts);
+ Minio.SpawnMinioServer();
+ Minio.CreateBucket("zen-hydration-test");
+
+ ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+ ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+
+ ScopedTemporaryDirectory TempDir;
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationTemp);
+
+ HydrationBase::Configuration BaseConfig;
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ BaseConfig.Options = std::move(Root).AsObject();
+ }
+ BaseConfig.AsyncEnabled = true;
+ auto Hydration = InitHydration(BaseConfig);
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "s3async_roundtrip"};
+
+ WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
+ Hydration->CreateHydrator(Config)->Hydrate();
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+
+ CreateSmallTestTree(ServerStateDir);
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+
+ CreateSmallTestTree(ServerStateDir);
+ WriteFile(ServerStateDir / "v2marker.bin", CreateSemiRandomBlob(64));
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+
+ CleanDirectory(ServerStateDir, true);
+ Hydration->CreateHydrator(Config)->Hydrate();
+
+ CHECK(std::filesystem::exists(ServerStateDir / "v2marker.bin"));
+ CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "file_b.bin"));
+ CHECK(std::filesystem::exists(ServerStateDir / "subdir" / "nested" / "file_c.bin"));
+}
+
+// Exercises all three Put tiers (Small/Medium/Multipart) plus pack uploads in
+// one round-trip. CreateTestTree adds 256K/512K/9M/63M blobs on top of the
+// small-file set; the small files get packed, the 9M lands in PutMedium, and
+// the 63M lands in PutMultipart.
+TEST_CASE("hydration.s3async.dehydrate_hydrate.all_tiers")
+{
+ MinioProcessOptions MinioOpts;
+ MinioOpts.Port = AllocateHydrationMinioTestPort();
+ MinioProcess Minio(MinioOpts);
+ Minio.SpawnMinioServer();
+ Minio.CreateBucket("zen-hydration-test");
+
+ ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+ ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+
+ ScopedTemporaryDirectory TempDir;
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationTemp);
+
+ HydrationBase::Configuration BaseConfig;
+ {
+ std::string ConfigJson = fmt::format(
+ R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"chunksize":{}}}}})",
+ Minio.Endpoint(),
+ 5u * 1024u * 1024u); // 5 MiB chunks -> multipart threshold ~6.25 MiB
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ BaseConfig.Options = std::move(Root).AsObject();
+ }
+ BaseConfig.AsyncEnabled = true;
+ auto Hydration = InitHydration(BaseConfig);
+
+ TestThreading Threading(8);
+ HydrationConfig Config{.ServerStateDir = ServerStateDir,
+ .TempDir = HydrationTemp,
+ .ModuleId = "s3async_all_tiers",
+ .Threading = Threading.Options};
+
+ // CreateTestTree: small files (pack candidates) + 256K (Small), 512K (Medium),
+ // 9M (Medium), 63M (Multipart).
+ auto Files = CreateTestTree(ServerStateDir);
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+
+ Hydration->CreateHydrator(Config)->Hydrate();
+ VerifyTree(ServerStateDir, Files);
+}
+
+TEST_CASE("hydration.s3async.concurrent")
+{
+ MinioProcessOptions MinioOpts;
+ MinioOpts.Port = AllocateHydrationMinioTestPort();
+ MinioProcess Minio(MinioOpts);
+ Minio.SpawnMinioServer();
+ Minio.CreateBucket("zen-hydration-test");
+
+ ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+ ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+
+ constexpr int kModuleCount = 6;
+ constexpr int kThreadCount = 4;
+
+ TestThreading Threading(kThreadCount);
+
+ ScopedTemporaryDirectory TempDir;
+
+ struct ModuleData
+ {
+ HydrationConfig Config;
+ std::vector<std::pair<std::filesystem::path, IoBuffer>> Files;
+ };
+ std::vector<ModuleData> Modules(kModuleCount);
+
+ HydrationBase::Configuration BaseConfig;
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ BaseConfig.Options = std::move(Root).AsObject();
+ }
+ BaseConfig.AsyncEnabled = true;
+ auto Hydration = InitHydration(BaseConfig);
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ std::string ModuleId = fmt::format("s3async_concurrent_{}"sv, I);
+ std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state";
+ std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp";
+ CreateDirectories(StateDir);
+ CreateDirectories(TempPath);
+
+ Modules[I].Config.ServerStateDir = StateDir;
+ Modules[I].Config.TempDir = TempPath;
+ Modules[I].Config.ModuleId = ModuleId;
+ Modules[I].Config.Threading = Threading.Options;
+ Modules[I].Files = CreateTestTree(StateDir);
+ }
+
+ {
+ WorkerThreadPool Pool(kThreadCount, "hydration_s3async_dehy");
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) {
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ });
+ }
+ Work.Wait();
+ CHECK_FALSE(Work.IsAborted());
+ }
+
+ {
+ WorkerThreadPool Pool(kThreadCount, "hydration_s3async_hy");
+ std::atomic<bool> AbortFlag{false};
+ std::atomic<bool> PauseFlag{false};
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ Work.ScheduleWork(Pool, [&Hydration, &Config = Modules[I].Config](std::atomic<bool>&) {
+ CleanDirectory(Config.ServerStateDir, true);
+ Hydration->CreateHydrator(Config)->Hydrate();
+ });
+ }
+ Work.Wait();
+ CHECK_FALSE(Work.IsAborted());
+ }
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ VerifyTree(Modules[I].Config.ServerStateDir, Modules[I].Files);
+ }
+}
+
+TEST_CASE("hydration.s3async.obliterate")
+{
+ MinioProcessOptions MinioOpts;
+ MinioOpts.Port = AllocateHydrationMinioTestPort();
+ MinioProcess Minio(MinioOpts);
+ Minio.SpawnMinioServer();
+ Minio.CreateBucket("zen-hydration-test");
+
+ ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+ ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+
+ ScopedTemporaryDirectory TempDir;
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationTemp);
+
+ constexpr std::string_view ModuleId = "s3async_obliterate"sv;
+
+ HydrationBase::Configuration BaseConfig;
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ BaseConfig.Options = std::move(Root).AsObject();
+ }
+ BaseConfig.AsyncEnabled = true;
+ auto Hydration = InitHydration(BaseConfig);
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = std::string(ModuleId)};
+
+ CreateSmallTestTree(ServerStateDir);
+ Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+
+ auto ListModuleObjects = [&]() {
+ S3ClientOptions Opts;
+ Opts.BucketName = "zen-hydration-test";
+ Opts.Endpoint = Minio.Endpoint();
+ Opts.PathStyle = true;
+ Opts.Credentials.AccessKeyId = Minio.RootUser();
+ Opts.Credentials.SecretAccessKey = Minio.RootPassword();
+ S3Client Client(Opts);
+ return Client.ListObjects(fmt::format("{}/"sv, ModuleId));
+ };
+
+ CHECK(!ListModuleObjects().Objects.empty());
+
+ CreateSmallTestTree(ServerStateDir);
+ WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64));
+
+ Hydration->CreateHydrator(Config)->Obliterate();
+
+ CHECK(ListModuleObjects().Objects.empty());
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+ CHECK(std::filesystem::is_empty(HydrationTemp));
+}
+
+TEST_CASE("hydration.s3async.incremental")
+{
+ MinioProcessOptions MinioOpts;
+ MinioOpts.Port = AllocateHydrationMinioTestPort();
+ MinioProcess Minio(MinioOpts);
+ Minio.SpawnMinioServer();
+ Minio.CreateBucket("zen-hydration-test");
+
+ ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+ ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+
+ ScopedTemporaryDirectory TempDir;
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationTemp);
+
+ constexpr std::string_view ModuleId = "s3async_incremental"sv;
+
+ TestThreading Threading(8);
+ HydrationBase::Configuration BaseConfig;
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ BaseConfig.Options = std::move(Root).AsObject();
+ }
+ BaseConfig.AsyncEnabled = true;
+ auto Hydration = InitHydration(BaseConfig);
+
+ HydrationConfig Config{.ServerStateDir = ServerStateDir,
+ .TempDir = HydrationTemp,
+ .ModuleId = std::string(ModuleId),
+ .Threading = Threading.Options};
+
+ // Mirrors hydration.s3.incremental but with Config.IoContext set so the
+ // async S3AsyncStorageAdapter handles I/O. Each Dehydrate empties
+ // ServerStateDir as a side effect; subsequent Hydrate/Dehydrate calls
+ // thread the prior HydrationState so incremental dehydrate can hit its
+ // cache instead of re-uploading.
+
+ CbObject HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+ CHECK_FALSE(HydrationState);
+
+ auto TestFiles = CreateTestTree(ServerStateDir);
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+ VerifyTree(ServerStateDir, TestFiles);
+
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+
+ TestFiles = CreateTestTree(ServerStateDir);
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
+
+ HydrationState = Hydration->CreateHydrator(Config)->Hydrate();
+ VerifyTree(ServerStateDir, TestFiles);
+
+ Hydration->CreateHydrator(Config)->Dehydrate(HydrationState);
+}
+
TEST_SUITE_END();
void