diff options
| author | Dan Engelbrecht <[email protected]> | 2026-04-27 11:14:09 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-04-27 11:14:09 +0200 |
| commit | 753ab4e89b9a5952e50bc77d404198520b362a3a (patch) | |
| tree | 39dbaad8389677981281b8c1585ac846251539f0 /src | |
| parent | fix crash when scavenging sequences or copying local chunks (#1013) (diff) | |
| download | archived-zen-753ab4e89b9a5952e50bc77d404198520b362a3a.tar.xz archived-zen-753ab4e89b9a5952e50bc77d404198520b362a3a.zip | |
hydration with pack (#1016)
- Feature: Hub hydration packs small files into raw CAS pack blobs to reduce request count for modules dominated by tiny metadata files
- `--hub-hydration-enable-pack` (Lua: `hub.hydration.enablepack`, default true)
- `--hub-hydration-pack-threshold-bytes` (Lua: `hub.hydration.packthresholdbytes`, default 256 KiB)
- `--hub-hydration-max-pack-bytes` (Lua: `hub.hydration.maxpackbytes`, default 4 MiB)
- Feature: Hub hydration and dehydration can be disabled per direction
- `--hub-enable-hydration` (Lua: `hub.enablehydration`, default true)
- `--hub-enable-dehydration` (Lua: `hub.enabledehydration`, default true)
- Feature: Hub hydration accepts a configurable file exclude list via the `excludes` field of `HydrationOptions` (an array of wildcards). Built-in defaults skip transient runtime files (`.lock`, `.sentry-native/*`, `state_marker`, `*.bak`, `gc/reserve.gc`, `auth/*`) so they no longer participate in dehydrate scans. Override semantics: when the field is present it replaces the defaults outright; an explicit `[]` opts out of all defaults.
- Improvement: Hub hydration completion logs now report per-request average and max latency, peak in-flight workers, queue wait, and hash-cache hit percentage; loose and pack-blob transfers are reported separately
- Improvement: Hub hydration pre-creates unique parent directories before scheduling parallel writes
- Improvement: S3 hydration retries transient HTTP failures (timeouts, 429 throttling, 5xx server errors, connection errors) up to 3 times via the HTTP client retry layer
- Improvement: S3 hydration multipart chunk size is persisted in `state.cbo` per module so that hydrate reuses the same partitioning that was used at dehydrate; default raised to 64 MiB (was 32 MiB)
- Improvement: Hub hydration `Obliterate` retries backend delete once before falling back to local cleanup
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/include/zencore/string.h | 6 | ||||
| -rw-r--r-- | src/zencore/string.cpp | 62 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 68 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.h | 5 | ||||
| -rw-r--r-- | src/zenserver/hub/hydration.cpp | 2246 | ||||
| -rw-r--r-- | src/zenserver/hub/hydration.h | 45 | ||||
| -rw-r--r-- | src/zenserver/hub/hydrationdefaults.h | 26 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.cpp | 24 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.h | 5 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.cpp | 52 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.h | 8 | ||||
| -rw-r--r-- | src/zenutil/cloud/s3client.cpp | 32 |
12 files changed, 2063 insertions, 516 deletions
diff --git a/src/zencore/include/zencore/string.h b/src/zencore/include/zencore/string.h index a1c7a3914..7ca2afc69 100644 --- a/src/zencore/include/zencore/string.h +++ b/src/zencore/include/zencore/string.h @@ -742,6 +742,7 @@ size_t NiceBytesToBuffer(uint64_t Num, std::span<char> Buffer); size_t NiceByteRateToBuffer(uint64_t Num, uint64_t ms, std::span<char> Buffer); size_t NiceLatencyNsToBuffer(uint64_t NanoSeconds, std::span<char> Buffer); size_t NiceTimeSpanMsToBuffer(uint64_t Milliseconds, std::span<char> Buffer); +size_t NiceTimeSpanUsToBuffer(uint64_t Microseconds, std::span<char> Buffer); struct NiceBase { @@ -803,6 +804,11 @@ struct NiceTimeSpanMs : public NiceBase inline NiceTimeSpanMs(uint64_t Milliseconds) { NiceTimeSpanMsToBuffer(Milliseconds, m_Buffer); } }; +struct NiceTimeSpanUs : public NiceBase +{ + inline NiceTimeSpanUs(uint64_t Microseconds) { NiceTimeSpanUsToBuffer(Microseconds, m_Buffer); } +}; + ////////////////////////////////////////////////////////////////////////// inline std::string diff --git a/src/zencore/string.cpp b/src/zencore/string.cpp index 34519b83b..4072aec56 100644 --- a/src/zencore/string.cpp +++ b/src/zencore/string.cpp @@ -482,6 +482,24 @@ NiceTimeSpanMsToBuffer(uint64_t Millis, std::span<char> Buffer) } } +size_t +NiceTimeSpanUsToBuffer(uint64_t Micros, std::span<char> Buffer) +{ + if (Micros < 1000) + { + return snprintf(Buffer.data(), Buffer.size(), "%" PRIu64 "us", Micros); + } + else if (Micros < 10000) + { + return snprintf(Buffer.data(), Buffer.size(), "%.2fms", Micros / 1000.0); + } + else if (Micros < 1000000) + { + return snprintf(Buffer.data(), Buffer.size(), "%.1fms", Micros / 1000.0); + } + return NiceTimeSpanMsToBuffer(Micros / 1000, Buffer); +} + ////////////////////////////////////////////////////////////////////////// template<typename C> @@ -850,6 +868,13 @@ TEST_CASE("niceNum") NiceNumGeneral(100000000000000, Buffer, kNicenumTime); CHECK(StringEquals(Buffer, "100000s")); + + // floor() instead of 
round(): 999.5us must NOT render as "1000us". + NiceNumGeneral(999500, Buffer, kNicenumTime); + CHECK(StringEquals(Buffer, "999us")); + + NiceNumGeneral(999999, Buffer, kNicenumTime); + CHECK(StringEquals(Buffer, "999us")); } SUBCASE("bytes") @@ -917,6 +942,43 @@ TEST_CASE("niceNum") NiceTimeSpanMsToBuffer(360000000, Buffer); CHECK(StringEquals(Buffer, "100h00m")); } + + SUBCASE("timespan_us") + { + NiceTimeSpanUsToBuffer(0, Buffer); + CHECK(StringEquals(Buffer, "0us")); + + NiceTimeSpanUsToBuffer(1, Buffer); + CHECK(StringEquals(Buffer, "1us")); + + NiceTimeSpanUsToBuffer(999, Buffer); + CHECK(StringEquals(Buffer, "999us")); + + NiceTimeSpanUsToBuffer(1000, Buffer); + CHECK(StringEquals(Buffer, "1.00ms")); + + NiceTimeSpanUsToBuffer(1500, Buffer); + CHECK(StringEquals(Buffer, "1.50ms")); + + NiceTimeSpanUsToBuffer(9999, Buffer); + CHECK(StringEquals(Buffer, "10.00ms")); // %.2f rounds 9.999 -> 10.00 + + NiceTimeSpanUsToBuffer(10000, Buffer); + CHECK(StringEquals(Buffer, "10.0ms")); + + NiceTimeSpanUsToBuffer(999500, Buffer); + CHECK(StringEquals(Buffer, "999.5ms")); + + NiceTimeSpanUsToBuffer(999999, Buffer); + CHECK(StringEquals(Buffer, "1000.0ms")); // boundary just below 1s + + // >=1s delegates to NiceTimeSpanMs. 
+ NiceTimeSpanUsToBuffer(1000000, Buffer); + CHECK(StringEquals(Buffer, "1.00s")); + + NiceTimeSpanUsToBuffer(60000000, Buffer); + CHECK(StringEquals(Buffer, "1m00s")); + } } TEST_CASE("StringBuilder") diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index c03c1a9a0..4ae8d0457 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -11,6 +11,7 @@ #include <zencore/scopeguard.h> #include <zencore/string.h> #include <zencore/timer.h> +#include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zenhttp/httpclient.h> @@ -248,6 +249,7 @@ Hub::~Hub() void Hub::Shutdown() { + ZEN_TRACE_CPU("Hub::Shutdown"); ZEN_INFO("Hub service shutting down, deprovisioning any current instances"); bool Expected = false; @@ -299,6 +301,7 @@ Hub::Shutdown() Hub::Response Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) { + ZEN_TRACE_CPU("Hub::Provision"); ZEN_ASSERT(!m_ShutdownFlag.load()); StorageServerInstance::ExclusiveLockedPtr Instance; bool IsNewInstance = false; @@ -328,17 +331,22 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) auto NewInstance = std::make_unique<StorageServerInstance>( m_RunEnvironment, *m_Hydration, - StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex), - .StateDir = m_RunEnvironment.CreateChildDir(ModuleId), - .TempDir = m_HydrationTempPath / ModuleId, - .HttpThreadCount = m_Config.InstanceHttpThreadCount, - .CoreLimit = m_Config.InstanceCoreLimit, - .ConfigPath = m_Config.InstanceConfigPath, - .Malloc = m_Config.InstanceMalloc, - .Trace = m_Config.InstanceTrace, - .TraceHost = m_Config.InstanceTraceHost, - .TraceFile = m_Config.InstanceTraceFile, - .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool}, + StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex), + .StateDir = m_RunEnvironment.CreateChildDir(ModuleId), + .TempDir = m_HydrationTempPath / 
ModuleId, + .HttpThreadCount = m_Config.InstanceHttpThreadCount, + .CoreLimit = m_Config.InstanceCoreLimit, + .ConfigPath = m_Config.InstanceConfigPath, + .Malloc = m_Config.InstanceMalloc, + .Trace = m_Config.InstanceTrace, + .TraceHost = m_Config.InstanceTraceHost, + .TraceFile = m_Config.InstanceTraceFile, + .EnableHydration = m_Config.EnableHydration, + .EnableDehydration = m_Config.EnableDehydration, + .HydrationPackEnabled = m_Config.HydrationPackEnabled, + .HydrationPackThresholdBytes = m_Config.HydrationPackThresholdBytes, + .HydrationMaxPackBytes = m_Config.HydrationMaxPackBytes, + .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool}, ModuleId); #if ZEN_PLATFORM_WINDOWS @@ -511,6 +519,7 @@ Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, HubInstanceState OldState, bool IsNewInstance) { + ZEN_TRACE_CPU("Hub::CompleteProvision"); const std::string ModuleId(Instance.GetModuleId()); const uint16_t Port = Instance.GetBasePort(); std::string BaseUri; // TODO? 
@@ -571,6 +580,7 @@ Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, Hub::Response Hub::Deprovision(const std::string& ModuleId) { + ZEN_TRACE_CPU("Hub::Deprovision"); ZEN_ASSERT(!m_ShutdownFlag.load()); return InternalDeprovision(ModuleId, [](ActiveInstance& Instance) { ZEN_UNUSED(Instance); @@ -581,6 +591,7 @@ Hub::Deprovision(const std::string& ModuleId) Hub::Response Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveInstance& Instance)>&& DeprovisionGate) { + ZEN_TRACE_CPU("Hub::InternalDeprovision"); StorageServerInstance::ExclusiveLockedPtr Instance; size_t ActiveInstanceIndex = (size_t)-1; { @@ -694,6 +705,7 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI Hub::Response Hub::Obliterate(const std::string& ModuleId) { + ZEN_TRACE_CPU("Hub::Obliterate"); ZEN_ASSERT(!m_ShutdownFlag.load()); StorageServerInstance::ExclusiveLockedPtr Instance; @@ -845,6 +857,7 @@ Hub::Obliterate(const std::string& ModuleId) void Hub::CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex) { + ZEN_TRACE_CPU("Hub::CompleteObliterate"); const std::string ModuleId(Instance.GetModuleId()); const uint16_t Port = Instance.GetBasePort(); @@ -871,6 +884,7 @@ Hub::CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, siz void Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState) { + ZEN_TRACE_CPU("Hub::CompleteDeprovision"); const std::string ModuleId(Instance.GetModuleId()); const uint16_t Port = Instance.GetBasePort(); @@ -938,6 +952,7 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, si Hub::Response Hub::Hibernate(const std::string& ModuleId) { + ZEN_TRACE_CPU("Hub::Hibernate"); ZEN_ASSERT(!m_ShutdownFlag.load()); StorageServerInstance::ExclusiveLockedPtr Instance; @@ -1051,6 +1066,7 @@ Hub::Hibernate(const std::string& 
ModuleId) void Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState) { + ZEN_TRACE_CPU("Hub::CompleteHibernate"); const std::string ModuleId(Instance.GetModuleId()); const uint16_t Port = Instance.GetBasePort(); @@ -1074,6 +1090,7 @@ Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size Hub::Response Hub::Wake(const std::string& ModuleId) { + ZEN_TRACE_CPU("Hub::Wake"); ZEN_ASSERT(!m_ShutdownFlag.load()); StorageServerInstance::ExclusiveLockedPtr Instance; @@ -1185,6 +1202,7 @@ Hub::Wake(const std::string& ModuleId) void Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState) { + ZEN_TRACE_CPU("Hub::CompleteWake"); const std::string ModuleId(Instance.GetModuleId()); const uint16_t Port = Instance.GetBasePort(); @@ -1402,6 +1420,7 @@ Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewS void Hub::AttemptRecoverInstance(std::string_view ModuleId) { + ZEN_TRACE_CPU("Hub::AttemptRecoverInstance"); StorageServerInstance::ExclusiveLockedPtr Instance; size_t ActiveInstanceIndex = (size_t)-1; { @@ -1506,6 +1525,7 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient, StorageServerInstance::SharedLockedPtr&& LockedInstance, size_t ActiveInstanceIndex) { + ZEN_TRACE_CPU("Hub::CheckInstanceStatus"); const std::string ModuleId(LockedInstance.GetModuleId()); HubInstanceState InstanceState = m_ActiveInstances[ActiveInstanceIndex].State.load(); @@ -1645,6 +1665,7 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient, void Hub::UpdateMachineMetrics() { + ZEN_TRACE_CPU("Hub::UpdateMachineMetrics"); try { bool DiskSpaceOk = false; @@ -1696,6 +1717,7 @@ Hub::WatchDog() size_t CheckInstanceIndex = SIZE_MAX; // first increment wraps to 0 while (!m_ShutdownFlag.load() && !m_WatchDogEvent.Wait(gsl::narrow<int>(CycleIntervalMs))) { + ZEN_TRACE_CPU("Hub::WatchDogCycle"); try { 
UpdateMachineMetrics(); @@ -2755,18 +2777,18 @@ TEST_CASE("hub.instance.inactivity.deprovision") auto PokeInstance = [&](uint16_t Port) { // Make a real storage request to increment the instance's activity sum. // The watchdog detects the changed sum on the next cycle and resets LastActivityTime. - { - HttpClient PersistentClient(fmt::format("http://localhost:{}", Port), - HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(200)}); - uint64_t Tick = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() - - std::chrono::steady_clock::time_point::min()) - .count(); - IoHash Key = IoHash::HashBuffer(&Tick, sizeof(Tick)); - const HttpClient::Response PutResult = - PersistentClient.Put(fmt::format("/z$/ns1/b/{}", Key), - IoBufferBuilder::MakeFromMemory(MakeMemoryView(std::string_view("keepalive")))); - CHECK(PutResult); - } + // Per-attempt connect is kept tight (200ms) so a genuinely dead endpoint fails fast; + // RetryCount=3 absorbs transient localhost-accept slowness on loaded CI runners. 
+ HttpClient PersistentClient(fmt::format("http://localhost:{}", Port), + HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(200), .RetryCount = 3}); + uint64_t Tick = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() - + std::chrono::steady_clock::time_point::min()) + .count(); + IoHash Key = IoHash::HashBuffer(&Tick, sizeof(Tick)); + const HttpClient::Response PutResult = + PersistentClient.Put(fmt::format("/z$/ns1/b/{}", Key), + IoBufferBuilder::MakeFromMemory(MakeMemoryView(std::string_view("keepalive")))); + REQUIRE_MESSAGE(PutResult, "PokeInstance PUT failed - cannot reset activity timer"); }; PokeInstance(IdleInfo.Port); diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h index 40d046ce0..1ce9bc876 100644 --- a/src/zenserver/hub/hub.h +++ b/src/zenserver/hub/hub.h @@ -74,6 +74,11 @@ public: std::filesystem::path InstanceConfigPath; std::string HydrationTargetSpecification; CbObject HydrationOptions; + bool EnableHydration = true; + bool EnableDehydration = true; + bool HydrationPackEnabled = true; + uint64_t HydrationPackThresholdBytes = DefaultPackThresholdBytes; + uint64_t HydrationMaxPackBytes = DefaultMaxPackBytes; WatchDogConfiguration WatchDog; diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp index c7f25bab6..2730ba059 100644 --- a/src/zenserver/hub/hydration.cpp +++ b/src/zenserver/hub/hydration.cpp @@ -10,15 +10,20 @@ #include <zencore/except_fmt.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> +#include <zencore/iohash.h> #include <zencore/logging.h> #include <zencore/parallelwork.h> +#include <zencore/scopeguard.h> #include <zencore/stream.h> #include <zencore/system.h> #include <zencore/thread.h> #include <zencore/timer.h> +#include <zencore/trace.h> +#include <zencore/uid.h> #include <zenutil/cloud/imdscredentials.h> #include <zenutil/cloud/s3client.h> #include <zenutil/filesystemutils.h> +#include <zenutil/wildcard.h> #include <numeric> 
#include <unordered_map> @@ -34,6 +39,8 @@ namespace zen { +using namespace std::literals; + namespace hydration_impl { /// UTC time decomposed to calendar fields with sub-second milliseconds. @@ -76,7 +83,51 @@ namespace hydration_impl { std::atomic<bool>& PauseFlag, const std::filesystem::path& Path) { - CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector<std::string>{}, {}, 0); + CleanDirectoryResult Result = CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector<std::string>{}, {}, 0); + for (const auto& [FailedPath, Ec] : Result.FailedRemovePaths) + { + ZEN_WARN("Failed to remove '{}' while cleaning '{}': {}", FailedPath, Path, Ec.message()); + } + } + + // Returns true if RelKey matches any wildcard in Excludes. Excluded paths are + // dropped at the dehydrate-side directory scan and never enter the manifest. + bool IsExcluded(std::string_view RelKey, std::span<const std::string> Excludes) + { + for (const std::string& W : Excludes) + { + if (MatchWildcard(W, RelKey, /*CaseSensitive*/ true)) + { + return true; + } + } + return false; + } + + std::vector<std::string> ParseStringArray(CbFieldView Field) + { + std::vector<std::string> Out; + for (CbFieldView Entry : Field.AsArrayView()) + { + Out.emplace_back(Entry.AsString()); + } + return Out; + } + + // Built-in exclude wildcards applied unless the hub config supplies an explicit + // `excludes` array (an empty array opts out of all defaults). Patterns match the + // dehydrate-side relative path (forward-slash form). `*` is path-separator-agnostic + // per zenutil/wildcard.h. 
+ std::vector<std::string> DefaultExcludes() + { + return { + ".sentry-native/*", // sentry-native crash uploader DB; locked while child runs + "state_marker", // root-level liveness marker (zenstorageserver.cpp) + ".lock", // FILE_FLAG_DELETE_ON_CLOSE lock; locked while child runs + "*.bak", // transient backups produced by atomic file replace + "gc/reserve.gc", // GC disk reserve (gc subdir per zenstorageserver.cpp) + "auth/*", // encrypted auth state under auth/ + }; } /////////////////////////////////////////////////////////////////////// @@ -90,45 +141,110 @@ namespace hydration_impl { std::atomic<uint64_t> Bytes{0}; // lambda-side: bytes transferred on successful completion std::atomic<uint64_t> ElapsedUs{0}; // wall time around Work.Wait() - RwLock ThreadIdsLock; - std::unordered_set<int> ThreadIds; + // Per-request timing gathered inside the work lambdas. RequestCount + RequestTotalUs are + // summed across all completed requests. RequestMaxUs is the slowest single request + // observed (CAS-updated in EndRequest); avg vs max gap surfaces tail latency without + // keeping per-request samples. + std::atomic<uint64_t> RequestCount{0}; + std::atomic<uint64_t> RequestTotalUs{0}; + std::atomic<uint64_t> RequestMaxUs{0}; + std::atomic<uint32_t> InFlight{0}; + std::atomic<uint32_t> InFlightPeak{0}; + + // Scheduling latency. PhaseClock starts when the phase begins. FirstScheduleUs / + // FirstStartUs are the relative times of the earliest ScheduleWork call and the earliest + // worker lambda entry. Their difference is how long requests sat in the pool backlog + // before a worker picked one up (pool warm-up / backlog). 
+ Stopwatch PhaseClock; + std::atomic<uint64_t> FirstScheduleUs{UINT64_MAX}; + std::atomic<uint64_t> FirstStartUs{UINT64_MAX}; + + void RecordScheduled() + { + uint64_t Now = PhaseClock.GetElapsedTimeUs(); + uint64_t Existing = FirstScheduleUs.load(std::memory_order_relaxed); + while (Now < Existing && !FirstScheduleUs.compare_exchange_weak(Existing, Now, std::memory_order_relaxed)) + { + } + } - void RecordThread() + // Returns a Stopwatch the caller runs across the actual request; call EndRequest with + // the elapsed microseconds when the request completes. + Stopwatch BeginRequest() { - int Tid = zen::GetCurrentThreadId(); - ThreadIdsLock.WithExclusiveLock([&] { ThreadIds.insert(Tid); }); + uint64_t Now = PhaseClock.GetElapsedTimeUs(); + uint64_t Existing = FirstStartUs.load(std::memory_order_relaxed); + while (Now < Existing && !FirstStartUs.compare_exchange_weak(Existing, Now, std::memory_order_relaxed)) + { + } + uint32_t Current = InFlight.fetch_add(1, std::memory_order_relaxed) + 1; + uint32_t Peak = InFlightPeak.load(std::memory_order_relaxed); + while (Current > Peak && !InFlightPeak.compare_exchange_weak(Peak, Current, std::memory_order_relaxed)) + { + } + return Stopwatch{}; + } + + void EndRequest(uint64_t ElapsedUsValue) + { + InFlight.fetch_sub(1, std::memory_order_relaxed); + RequestCount.fetch_add(1, std::memory_order_relaxed); + RequestTotalUs.fetch_add(ElapsedUsValue, std::memory_order_relaxed); + uint64_t Existing = RequestMaxUs.load(std::memory_order_relaxed); + while (ElapsedUsValue > Existing && !RequestMaxUs.compare_exchange_weak(Existing, ElapsedUsValue, std::memory_order_relaxed)) + { + } } }; struct DehydrateStatistics { PhaseStats Hash; - PhaseStats Upload; - PhaseStats Touch; // Touch shares Upload's ParallelWork / ElapsedUs + PhaseStats Upload; // Loose CAS PUTs + PhaseStats Touch; // Mod-time refresh on pre-existing loose CAS entries; shares Upload's ParallelWork + PhaseStats PackUpload; // Pack-blob PUTs; shares Upload's 
ParallelWork + PhaseStats PackTouch; // Mod-time refresh on pre-existing pack blobs; shares Upload's ParallelWork std::atomic<uint64_t> LoadStateUs{0}; std::atomic<uint64_t> DirScanUs{0}; std::atomic<uint64_t> ListExistingUs{0}; - std::atomic<uint64_t> MetadataSaveUs{0}; + std::atomic<uint64_t> SaveMetadataUs{0}; std::atomic<uint64_t> CleanUs{0}; std::atomic<uint64_t> TotalFiles{0}; std::atomic<uint64_t> TotalBytes{0}; std::atomic<uint64_t> TotalUs{0}; + + // Pack phase stats + std::atomic<uint64_t> PackCount{0}; // number of packs built + std::atomic<uint64_t> PackedFiles{0}; // Files[] entries folded into packs (includes hash-duplicates) + std::atomic<uint64_t> PackBytes{0}; // total bytes across all packs + std::atomic<uint64_t> PackBuildUs{0}; // hash + write cost across all packs }; struct HydrateStatistics { - PhaseStats Download; + PhaseStats Download; // Loose CAS GETs + PhaseStats PackDownload; // Pack-blob GETs; shares Download's ParallelWork std::atomic<uint64_t> LoadMetadataUs{0}; + std::atomic<uint64_t> CreateDirsUs{0}; + std::atomic<uint64_t> CreateDirsCount{0}; // unique parent dirs passed to CreateDirectories std::atomic<uint64_t> CleanUs{0}; - std::atomic<uint64_t> RenameOrCopyUs{0}; - std::atomic<uint64_t> VerifyScanUs{0}; + std::atomic<uint64_t> FinalizeUs{0}; + std::atomic<uint64_t> BuildStateUs{0}; std::atomic<uint64_t> TotalFiles{0}; std::atomic<uint64_t> TotalBytes{0}; std::atomic<uint64_t> TotalUs{0}; + + // Pack phase stats + std::atomic<uint64_t> PackCount{0}; // packs in manifest + std::atomic<uint64_t> PackedFiles{0}; // files unpacked into ServerStateDir (per-destination count) + std::atomic<uint64_t> PackUnpackUs{0}; // slice + parallel SafeWriteFile cost + // Bytes written to disk during the unpack-and-slice phase (sum of slice sizes + // touched by SafeWriteFile). Pairs with PackUnpackUs for disk-write throughput. + std::atomic<uint64_t> UnpackWriteBytes{0}; }; // Bits-per-second rate computed at microsecond precision. Zero-safe. 
@@ -149,9 +265,13 @@ namespace hydration_impl { public: virtual ~StorageBase() = default; - virtual std::string Describe() const = 0; - virtual void SaveMetadata(const CbObject& Data) = 0; - virtual CbObject LoadMetadata() = 0; + virtual std::string Describe() const = 0; + virtual void SaveMetadata(const CbObject& Data) = 0; + virtual CbObject LoadMetadata() = 0; + // Backend-specific settings that need to be persisted in state.cbo and reapplied + // on hydrate. Today only S3 uses this (MultipartChunkSize - the chunk size used at + // dehydrate must be carried forward so hydrate uses the same partitioning). File + // backend has no such settings and returns / accepts an empty object. virtual CbObject GetSettings() = 0; virtual void ParseSettings(const CbObjectView& Settings) = 0; virtual std::vector<IoHash> List() = 0; @@ -179,7 +299,7 @@ namespace hydration_impl { explicit FileStorage(std::filesystem::path ModulePath); - virtual std::string Describe() const override { return fmt::format("file://{}", m_StoragePath.generic_string()); } + virtual std::string Describe() const override { return fmt::format("file://{}"sv, m_StoragePath.generic_string()); } virtual void SaveMetadata(const CbObject& Data) override; virtual CbObject LoadMetadata() override; virtual CbObject GetSettings() override { return {}; } @@ -209,13 +329,12 @@ namespace hydration_impl { class S3Storage : public StorageBase { public: - static constexpr std::string_view Prefix = "s3://"; - static constexpr std::string_view Type = "s3"; - static constexpr uint64_t DefaultMultipartChunkSize = 32u * 1024u * 1024u; + static constexpr std::string_view Prefix = "s3://"; + static constexpr std::string_view Type = "s3"; S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize); - virtual std::string Describe() const override { return fmt::format("s3://{}/{}", m_Client.BucketName(), m_KeyPrefix); } + virtual std::string Describe() const override { return 
fmt::format("s3://{}/{}"sv, m_Client.BucketName(), m_KeyPrefix); } virtual void SaveMetadata(const CbObject& Data) override; virtual CbObject LoadMetadata() override; virtual CbObject GetSettings() override; @@ -256,6 +375,7 @@ namespace hydration_impl { void FileStorage::SaveMetadata(const CbObject& Data) { + ZEN_TRACE_CPU("FileStorage::SaveMetadata"); BinaryWriter Output; SaveCompactBinary(Output, Data); WriteFile(m_StatePathName, IoBuffer(IoBuffer::Wrap, Output.GetData(), Output.GetSize())); @@ -263,6 +383,7 @@ namespace hydration_impl { CbObject FileStorage::LoadMetadata() { + ZEN_TRACE_CPU("FileStorage::LoadMetadata"); if (!IsFile(m_StatePathName)) { return {}; @@ -277,7 +398,7 @@ namespace hydration_impl { CbObject Result = ValidateAndReadCompactBinaryObject(std::move(Payload), Error); if (Error != CbValidateError::None) { - throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}", m_StatePathName, ToString(Error))); + throw std::runtime_error(fmt::format("Failed to read {} state file. 
Reason: {}"sv, m_StatePathName, ToString(Error))); } return Result; } @@ -309,13 +430,15 @@ namespace hydration_impl { Work.ScheduleWork( WorkerPool, [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath), &Stats](std::atomic<bool>& AbortFlag) { - Stats.RecordThread(); + ZEN_TRACE_CPU("FileStorage::Put"); if (!AbortFlag.load()) { - std::filesystem::path DestPath = m_CASPath / fmt::format("{}", Hash); + Stopwatch Timer = Stats.BeginRequest(); + std::filesystem::path DestPath = m_CASPath / fmt::format("{}"sv, Hash); + auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); }); if (std::error_code Ec = CopyFile(SourcePath, DestPath, CopyFileOptions{.EnableClone = true}); Ec) { - throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestPath)); + throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'"sv, SourcePath, DestPath)); } Stats.Bytes.fetch_add(Size, std::memory_order_relaxed); } @@ -332,13 +455,16 @@ namespace hydration_impl { Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash), Size, DestinationPath = std::filesystem::path(DestinationPath), &Stats]( std::atomic<bool>& AbortFlag) { - Stats.RecordThread(); + ZEN_TRACE_CPU("FileStorage::Get"); if (!AbortFlag.load()) { - std::filesystem::path SourcePath = m_CASPath / fmt::format("{}", Hash); + Stopwatch Timer = Stats.BeginRequest(); + auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); }); + std::filesystem::path SourcePath = m_CASPath / fmt::format("{}"sv, Hash); if (std::error_code Ec = CopyFile(SourcePath, DestinationPath, CopyFileOptions{.EnableClone = true}); Ec) { - throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestinationPath)); + throw std::system_error(Ec, + fmt::format("Failed to copy '{}' to '{}'"sv, SourcePath, DestinationPath)); } Stats.Bytes.fetch_add(Size, std::memory_order_relaxed); } @@ -365,6 +491,7 @@ namespace hydration_impl { void 
S3Storage::SaveMetadata(const CbObject& Data) { + ZEN_TRACE_CPU("S3Storage::SaveMetadata"); BinaryWriter Output; SaveCompactBinary(Output, Data); IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize()); @@ -373,12 +500,13 @@ namespace hydration_impl { S3Result Result = m_Client.PutObject(Key, std::move(Payload)); if (!Result.IsSuccess()) { - throw zen::runtime_error("Failed to save incremental metadata to '{}': {}", Key, Result.Error); + throw zen::runtime_error("Failed to save incremental metadata to '{}': {}"sv, Key, Result.Error); } } CbObject S3Storage::LoadMetadata() { + ZEN_TRACE_CPU("S3Storage::LoadMetadata"); std::string Key = m_KeyPrefix + "/incremental-state.cbo"; S3GetObjectResult Result = m_Client.GetObject(Key); if (!Result.IsSuccess()) @@ -387,14 +515,14 @@ namespace hydration_impl { { return {}; } - throw zen::runtime_error("Failed to load incremental metadata from '{}': {}", Key, Result.Error); + throw zen::runtime_error("Failed to load incremental metadata from '{}': {}"sv, Key, Result.Error); } CbValidateError Error; CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Result.Content), Error); if (Error != CbValidateError::None) { - throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}", Key, ToString(Error)); + throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}"sv, Key, ToString(Error)); } return Meta; } @@ -402,13 +530,13 @@ namespace hydration_impl { CbObject S3Storage::GetSettings() { CbObjectWriter Writer; - Writer << "MultipartChunkSize" << m_MultipartChunkSize; + Writer << "MultipartChunkSize"sv << m_MultipartChunkSize; return Writer.Save(); } void S3Storage::ParseSettings(const CbObjectView& Settings) { - m_MultipartChunkSize = Settings["MultipartChunkSize"].AsUInt64(S3Storage::DefaultMultipartChunkSize); + m_MultipartChunkSize = Settings["MultipartChunkSize"sv].AsUInt64(DefaultMultipartChunkSize); } std::vector<IoHash> S3Storage::List() @@ -417,7 +545,7 @@ namespace 
hydration_impl { S3ListObjectsResult Result = m_Client.ListObjects(CasPrefix); if (!Result.IsSuccess()) { - throw zen::runtime_error("Failed to list S3 objects under '{}': {}", CasPrefix, Result.Error); + throw zen::runtime_error("Failed to list S3 objects under '{}': {}"sv, CasPrefix, Result.Error); } std::vector<IoHash> Hashes; @@ -448,13 +576,15 @@ namespace hydration_impl { Work.ScheduleWork( WorkerPool, [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath), &Stats](std::atomic<bool>& AbortFlag) { - Stats.RecordThread(); + ZEN_TRACE_CPU("S3Storage::Put"); if (AbortFlag.load()) { return; } - S3Client& Client = m_Client; - std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); + Stopwatch Timer = Stats.BeginRequest(); + auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); }); + S3Client& Client = m_Client; + std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}"sv, Hash); if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4))) { @@ -466,7 +596,7 @@ namespace hydration_impl { m_MultipartChunkSize); if (!Result.IsSuccess()) { - throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error); + throw zen::runtime_error("Failed to upload '{}' to S3: {}"sv, Key, Result.Error); } } else @@ -475,7 +605,7 @@ namespace hydration_impl { S3Result Result = Client.PutObject(Key, File.ReadAll()); if (!Result.IsSuccess()) { - throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error); + throw zen::runtime_error("Failed to upload '{}' to S3: {}"sv, Key, Result.Error); } } Stats.Bytes.fetch_add(Size, std::memory_order_relaxed); @@ -489,7 +619,7 @@ namespace hydration_impl { const std::filesystem::path& DestinationPath, PhaseStats& Stats) { - std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); + std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}"sv, Hash); if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4))) { @@ -515,15 +645,17 @@ 
namespace hydration_impl { uint64_t ChunkSize = std::min<uint64_t>(m_MultipartChunkSize, Size - Offset); Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data, &Stats](std::atomic<bool>& AbortFlag) { - Stats.RecordThread(); - if (AbortFlag) + ZEN_TRACE_CPU("S3Storage::GetRange"); + if (AbortFlag.load()) { return; } - S3GetObjectResult Chunk = m_Client.GetObjectRange(Key, Offset, ChunkSize); + Stopwatch Timer = Stats.BeginRequest(); + auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); }); + S3GetObjectResult Chunk = m_Client.GetObjectRange(Key, Offset, ChunkSize); if (!Chunk.IsSuccess()) { - throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}", + throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}"sv, Key, Offset, Offset + ChunkSize - 1, @@ -541,15 +673,17 @@ namespace hydration_impl { Work.ScheduleWork( WorkerPool, [this, Key = Key, Size, DestinationPath = std::filesystem::path(DestinationPath), &Stats](std::atomic<bool>& AbortFlag) { - Stats.RecordThread(); - if (AbortFlag) + ZEN_TRACE_CPU("S3Storage::Get"); + if (AbortFlag.load()) { return; } - S3GetObjectResult Chunk = m_Client.GetObject(Key, m_TempDir); + Stopwatch Timer = Stats.BeginRequest(); + auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); }); + S3GetObjectResult Chunk = m_Client.GetObject(Key, m_TempDir); if (!Chunk.IsSuccess()) { - throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error); + throw zen::runtime_error("Failed to download '{}' from S3: {}"sv, Key, Chunk.Error); } if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef)) @@ -585,16 +719,18 @@ namespace hydration_impl { void S3Storage::Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, PhaseStats& Stats) { Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash), &Stats](std::atomic<bool>& AbortFlag) { - Stats.RecordThread(); + 
ZEN_TRACE_CPU("S3Storage::Touch"); if (AbortFlag.load()) { return; } - std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); - S3Result Result = m_Client.Touch(Key); + Stopwatch Timer = Stats.BeginRequest(); + auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); }); + std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}"sv, Hash); + S3Result Result = m_Client.Touch(Key); if (!Result.IsSuccess()) { - throw zen::runtime_error("Failed to touch '{}' in S3: {}", Key, Result.Error); + throw zen::runtime_error("Failed to touch '{}' in S3: {}"sv, Key, Result.Error); } }); } @@ -605,7 +741,7 @@ namespace hydration_impl { S3ListObjectsResult ListResult = m_Client.ListObjects(ModulePrefix); if (!ListResult.IsSuccess()) { - throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}", ModulePrefix, ListResult.Error); + throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}"sv, ModulePrefix, ListResult.Error); } for (const S3ObjectInfo& Obj : ListResult.Objects) { @@ -617,7 +753,7 @@ namespace hydration_impl { S3Result DelResult = m_Client.DeleteObject(Key); if (!DelResult.IsSuccess()) { - throw zen::runtime_error("Failed to delete S3 object '{}': {}", Key, DelResult.Error); + throw zen::runtime_error("Failed to delete S3 object '{}': {}"sv, Key, DelResult.Error); } }); } @@ -627,51 +763,141 @@ namespace hydration_impl { // IncrementalHydrator: the only HydrationStrategyBase implementation. // Summary emission for hydrate/dehydrate operations. + // Queue-wait helper: time between earliest schedule and earliest worker start. UINT64_MAX + // sentinels mean the corresponding event never happened (no work scheduled / nothing ran). 
+ inline uint64_t QueueWaitUs(uint64_t FirstScheduleUs, uint64_t FirstStartUs) + { + if (FirstScheduleUs == UINT64_MAX || FirstStartUs == UINT64_MAX || FirstStartUs <= FirstScheduleUs) + { + return 0; + } + return FirstStartUs - FirstScheduleUs; + } + + inline uint64_t SafeAvg(uint64_t Total, uint64_t Count) { return Count ? Total / Count : 0; } + void LogDehydrateSummary(std::string_view Prefix, const DehydrateStatistics& Stats, std::string_view ModuleId, const std::filesystem::path& Source, std::string_view Target) { - const uint64_t HashUs = Stats.Hash.ElapsedUs.load(); - const uint64_t UploadUs = Stats.Upload.ElapsedUs.load(); + const uint64_t TotalFiles = Stats.TotalFiles.load(); + const uint64_t HashUs = Stats.Hash.ElapsedUs.load(); + const uint64_t UploadUs = Stats.Upload.ElapsedUs.load(); + + // Hash phase: per-request data (BeginRequest/EndRequest tracks each hash op). + // Hash is the first phase to schedule work, so cold-pool warm-up shows up here. + const uint64_t HashFiles = Stats.Hash.Files.load(); + const uint64_t HashBytes = Stats.Hash.Bytes.load(); + const uint64_t HashReqCount = Stats.Hash.RequestCount.load(); + const uint64_t HashReqTotalUs = Stats.Hash.RequestTotalUs.load(); + const uint64_t HashReqMaxUs = Stats.Hash.RequestMaxUs.load(); + const uint32_t HashPeak = Stats.Hash.InFlightPeak.load(); + const uint64_t HashQueueUs = QueueWaitUs(Stats.Hash.FirstScheduleUs.load(), Stats.Hash.FirstStartUs.load()); + // Cache hit rate: every TotalFile not re-hashed was served from the cached state. + const uint32_t CacheHitPct = TotalFiles ? gsl::narrow_cast<uint32_t>((TotalFiles - HashFiles) * 100 / TotalFiles) : 0; + + // Upload phase shares one ParallelWork across loose Put (Stats.Upload), pack-blob Put + // (Stats.PackUpload), loose Touch (Stats.Touch), and pack-blob Touch (Stats.PackTouch). + // Per-request data is collected per PhaseStats by Storage::Put / Storage::Touch and + // reported as a single combined "Requests" line. 
+ const uint64_t UpReqCount = Stats.Upload.RequestCount.load() + Stats.PackUpload.RequestCount.load() + + Stats.Touch.RequestCount.load() + Stats.PackTouch.RequestCount.load(); + const uint64_t UpReqTotalUs = Stats.Upload.RequestTotalUs.load() + Stats.PackUpload.RequestTotalUs.load() + + Stats.Touch.RequestTotalUs.load() + Stats.PackTouch.RequestTotalUs.load(); + const uint64_t UpReqMaxUs = std::max({Stats.Upload.RequestMaxUs.load(), + Stats.PackUpload.RequestMaxUs.load(), + Stats.Touch.RequestMaxUs.load(), + Stats.PackTouch.RequestMaxUs.load()}); + const uint32_t UpPeak = std::max({Stats.Upload.InFlightPeak.load(), + Stats.PackUpload.InFlightPeak.load(), + Stats.Touch.InFlightPeak.load(), + Stats.PackTouch.InFlightPeak.load()}); + // Combined first-schedule / first-start across all four phase counters. UINT64_MAX is + // the unset sentinel and naturally loses to any real timestamp under std::min, so empty + // phases fall through to the others. + const uint64_t UpFirstSchedUs = std::min({Stats.Upload.FirstScheduleUs.load(), + Stats.PackUpload.FirstScheduleUs.load(), + Stats.Touch.FirstScheduleUs.load(), + Stats.PackTouch.FirstScheduleUs.load()}); + const uint64_t UpFirstStartUs = std::min({Stats.Upload.FirstStartUs.load(), + Stats.PackUpload.FirstStartUs.load(), + Stats.Touch.FirstStartUs.load(), + Stats.PackTouch.FirstStartUs.load()}); + const uint64_t UpQueueUs = QueueWaitUs(UpFirstSchedUs, UpFirstStartUs); + + const uint64_t LooseFiles = Stats.Upload.Files.load(); + const uint64_t LooseBytes = Stats.Upload.Bytes.load(); + const uint64_t TouchFiles = Stats.Touch.Files.load(); + const uint64_t TouchBytes = Stats.Touch.Bytes.load(); + const uint64_t PackTouchPacks = Stats.PackTouch.Files.load(); + const uint64_t PackTouchBytes = Stats.PackTouch.Bytes.load(); + + const uint64_t PackCount = Stats.PackCount.load(); + const uint64_t PackedFiles = Stats.PackedFiles.load(); + const uint64_t PackBytes = Stats.PackBytes.load(); + const uint64_t PackBuildUs = 
Stats.PackBuildUs.load(); + const uint64_t PackUploadFiles = Stats.PackUpload.Files.load(); + const uint64_t PackUploadBytes = Stats.PackUpload.Bytes.load(); + ZEN_INFO( "{} module '{}': {} files ({}) in {}\n" " Source: {}\n" " Target: {}\n" " Load state: {}\n" " Dir scan: {}\n" - " Hash phase: {} {}/{} ({}) hashed, {}bits/s, {} threads\n" + " Hash: {} {}/{} files ({}) hashed, {}% cache hit, {}bits/s\n" + " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n" " List existing: {}\n" - " Upload phase: {} {}/{} ({}) uploaded, {} ({}) touched, {}bits/s, {} threads\n" - " Metadata save: {}\n" + " Pack: {} {} packs, {} files, {}, {}bits/s\n" + " Upload: {} loose {} files ({}), packed {} blobs ({}), touched {} loose ({}) + {} packs ({}), {}bits/s\n" + " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n" + " Save metadata: {}\n" " Clean: {}", Prefix, ModuleId, - ThousandsNum(Stats.TotalFiles.load()), + ThousandsNum(TotalFiles), NiceBytes(Stats.TotalBytes.load()), - NiceLatencyNs(Stats.TotalUs.load() * 1000), + NiceTimeSpanUs(Stats.TotalUs.load()), Source.generic_string(), Target, - NiceLatencyNs(Stats.LoadStateUs.load() * 1000), - NiceLatencyNs(Stats.DirScanUs.load() * 1000), - NiceLatencyNs(HashUs * 1000), - ThousandsNum(Stats.Hash.Files.load()), - ThousandsNum(Stats.TotalFiles.load()), - NiceBytes(Stats.Hash.Bytes.load()), - NiceNum(BitsPerSecond(Stats.Hash.Bytes.load(), HashUs)), - Stats.Hash.ThreadIds.size(), - NiceLatencyNs(Stats.ListExistingUs.load() * 1000), - NiceLatencyNs(UploadUs * 1000), - ThousandsNum(Stats.Upload.Files.load()), - ThousandsNum(Stats.TotalFiles.load()), - NiceBytes(Stats.Upload.Bytes.load()), - ThousandsNum(Stats.Touch.Files.load()), - NiceBytes(Stats.Touch.Bytes.load()), - NiceNum(BitsPerSecond(Stats.Upload.Bytes.load(), UploadUs)), - Stats.Upload.ThreadIds.size(), - NiceLatencyNs(Stats.MetadataSaveUs.load() * 1000), - NiceLatencyNs(Stats.CleanUs.load() * 1000)); + 
NiceTimeSpanUs(Stats.LoadStateUs.load()), + NiceTimeSpanUs(Stats.DirScanUs.load()), + NiceTimeSpanUs(HashUs), + ThousandsNum(HashFiles), + ThousandsNum(TotalFiles), + NiceBytes(HashBytes), + CacheHitPct, + NiceNum(BitsPerSecond(HashBytes, HashUs)), + ThousandsNum(HashReqCount), + NiceTimeSpanUs(SafeAvg(HashReqTotalUs, HashReqCount)), + NiceTimeSpanUs(HashReqMaxUs), + HashPeak, + NiceTimeSpanUs(HashQueueUs), + NiceTimeSpanUs(Stats.ListExistingUs.load()), + NiceTimeSpanUs(PackBuildUs), + ThousandsNum(PackCount), + ThousandsNum(PackedFiles), + NiceBytes(PackBytes), + NiceNum(BitsPerSecond(PackBytes, PackBuildUs)), + NiceTimeSpanUs(UploadUs), + ThousandsNum(LooseFiles), + NiceBytes(LooseBytes), + ThousandsNum(PackUploadFiles), + NiceBytes(PackUploadBytes), + ThousandsNum(TouchFiles), + NiceBytes(TouchBytes), + ThousandsNum(PackTouchPacks), + NiceBytes(PackTouchBytes), + NiceNum(BitsPerSecond(LooseBytes + PackUploadBytes, UploadUs)), + ThousandsNum(UpReqCount), + NiceTimeSpanUs(SafeAvg(UpReqTotalUs, UpReqCount)), + NiceTimeSpanUs(UpReqMaxUs), + UpPeak, + NiceTimeSpanUs(UpQueueUs), + NiceTimeSpanUs(Stats.SaveMetadataUs.load()), + NiceTimeSpanUs(Stats.CleanUs.load())); } void LogHydrateSummary(std::string_view Prefix, @@ -681,42 +907,137 @@ namespace hydration_impl { const std::filesystem::path& Target) { const uint64_t DownloadUs = Stats.Download.ElapsedUs.load(); + + // Standalone (Stats.Download) and pack (Stats.PackDownload) downloads share one + // ParallelWork. Per-request data is collected per PhaseStats by Storage::Get; + // reported as a single combined "Requests" line. Multipart GETs may split a pack + // into several ranged requests, so DlReqCount can exceed file/blob counts. 
+ const uint64_t StandaloneFiles = Stats.Download.Files.load(); + const uint64_t StandaloneBytes = Stats.Download.Bytes.load(); + const uint64_t PackDlFiles = Stats.PackDownload.Files.load(); + const uint64_t PackDlBytes = Stats.PackDownload.Bytes.load(); + const uint64_t DlReqCount = Stats.Download.RequestCount.load() + Stats.PackDownload.RequestCount.load(); + const uint64_t DlReqTotalUs = Stats.Download.RequestTotalUs.load() + Stats.PackDownload.RequestTotalUs.load(); + const uint64_t DlReqMaxUs = std::max(Stats.Download.RequestMaxUs.load(), Stats.PackDownload.RequestMaxUs.load()); + const uint32_t DlPeak = std::max(Stats.Download.InFlightPeak.load(), Stats.PackDownload.InFlightPeak.load()); + const uint64_t DlFirstSchedUs = std::min(Stats.Download.FirstScheduleUs.load(), Stats.PackDownload.FirstScheduleUs.load()); + const uint64_t DlFirstStartUs = std::min(Stats.Download.FirstStartUs.load(), Stats.PackDownload.FirstStartUs.load()); + const uint64_t QueueUs = QueueWaitUs(DlFirstSchedUs, DlFirstStartUs); + + const uint64_t PackCount = Stats.PackCount.load(); + const uint64_t PackedFiles = Stats.PackedFiles.load(); + const uint64_t PackUnpackUs = Stats.PackUnpackUs.load(); + const uint64_t UnpackWriteBytes = Stats.UnpackWriteBytes.load(); + + const uint64_t CreateDirsUs = Stats.CreateDirsUs.load(); + const uint64_t CreateDirsCount = Stats.CreateDirsCount.load(); + const uint64_t CreateDirsRate = CreateDirsUs ? (CreateDirsCount * 1'000'000ull / CreateDirsUs) : 0; + + // Standalone and pack downloads share the Download phase elapsed reported below; + // the unpack line is its own clock (slice + parallel SafeWriteFile). 
ZEN_INFO( "{} module '{}': {} files ({}) in {}\n" " Source: {}\n" " Target: {}\n" " Load metadata: {}\n" - " Download phase: {} {}/{} ({}) downloaded, {}bits/s, {} threads\n" + " Create dirs: {} {} dirs, {} dirs/s\n" + " Download: {} loose {} files ({}), packed {} blobs ({}), {}bits/s\n" + " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n" + " Unpack: {} {} packs, {} files ({}), {}bits/s\n" " Clean: {}\n" - " Rename/copy: {}\n" - " Verify scan: {}", + " Finalize: {}\n" + " Build state: {}", Prefix, ModuleId, ThousandsNum(Stats.TotalFiles.load()), NiceBytes(Stats.TotalBytes.load()), - NiceLatencyNs(Stats.TotalUs.load() * 1000), + NiceTimeSpanUs(Stats.TotalUs.load()), Source, Target.generic_string(), - NiceLatencyNs(Stats.LoadMetadataUs.load() * 1000), - NiceLatencyNs(DownloadUs * 1000), - ThousandsNum(Stats.Download.Files.load()), - ThousandsNum(Stats.TotalFiles.load()), - NiceBytes(Stats.Download.Bytes.load()), - NiceNum(BitsPerSecond(Stats.Download.Bytes.load(), DownloadUs)), - Stats.Download.ThreadIds.size(), - NiceLatencyNs(Stats.CleanUs.load() * 1000), - NiceLatencyNs(Stats.RenameOrCopyUs.load() * 1000), - NiceLatencyNs(Stats.VerifyScanUs.load() * 1000)); + NiceTimeSpanUs(Stats.LoadMetadataUs.load()), + NiceTimeSpanUs(CreateDirsUs), + ThousandsNum(CreateDirsCount), + NiceNum(CreateDirsRate), + NiceTimeSpanUs(DownloadUs), + ThousandsNum(StandaloneFiles), + NiceBytes(StandaloneBytes), + ThousandsNum(PackDlFiles), + NiceBytes(PackDlBytes), + NiceNum(BitsPerSecond(StandaloneBytes + PackDlBytes, DownloadUs)), + ThousandsNum(DlReqCount), + NiceTimeSpanUs(SafeAvg(DlReqTotalUs, DlReqCount)), + NiceTimeSpanUs(DlReqMaxUs), + DlPeak, + NiceTimeSpanUs(QueueUs), + NiceTimeSpanUs(PackUnpackUs), + ThousandsNum(PackCount), + ThousandsNum(PackedFiles), + NiceBytes(UnpackWriteBytes), + NiceNum(BitsPerSecond(UnpackWriteBytes, PackUnpackUs)), + NiceTimeSpanUs(Stats.CleanUs.load()), + NiceTimeSpanUs(Stats.FinalizeUs.load()), + 
NiceTimeSpanUs(Stats.BuildStateUs.load())); } /////////////////////////////////////////////////////////////////////// + // File-manifest entry: one of these per file in a module's state. Lives in + // the namespace (rather than nested in IncrementalHydrator) so the helper + // functions below can take it by reference. + + struct Entry + { + std::string RelativePath; + uint64_t Size; + uint64_t ModTick; + IoHash Hash; + bool IsPacked = false; // true if content is part of a pack (PackHash valid) + // Hash of the pack's concatenated raw bytes (= pack's CAS key). Hydrate downloads + // the pack once and slices this entry out at the offset recorded in Pack.Entries[] + // for the matching Entry.Hash. + IoHash PackHash; + }; + + /////////////////////////////////////////////////////////////////////// + // Pack types: produced by dehydrate's pack phase, consumed by the state + // writer; consumed during hydrate to reconstruct slices. + + struct BuiltPackEntry + { + IoHash Hash; + uint64_t Size; + }; + + struct BuiltPack + { + IoHash PackHash; + uint64_t Size; + std::vector<BuiltPackEntry> Entries; + }; + + struct PackEntryDescriptor + { + IoHash Hash; + uint64_t Size; + uint64_t Offset; + }; + + struct PackDescriptor + { + uint64_t Size = 0; + std::vector<PackEntryDescriptor> Entries; + }; + + using EntryGroup = std::vector<size_t>; + using PackPlan = std::vector<EntryGroup>; + + /////////////////////////////////////////////////////////////////////// // Holds a per-module StorageBase and threading context; drives the // hydrate/dehydrate algorithm. 
class IncrementalHydrator : public HydrationStrategyBase { public: - IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage); + IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage, std::span<const std::string> Excludes); virtual ~IncrementalHydrator() override; virtual void Dehydrate(const CbObject& CachedState) override; @@ -724,16 +1045,9 @@ namespace hydration_impl { virtual void Obliterate() override; private: - struct Entry - { - std::filesystem::path RelativePath; - uint64_t Size; - uint64_t ModTick; - IoHash Hash; - }; - std::unique_ptr<StorageBase> m_Storage; HydrationConfig m_Config; + std::vector<std::string> m_Excludes; WorkerThreadPool m_FallbackWorkPool; std::atomic<bool> m_FallbackAbortFlag{false}; std::atomic<bool> m_FallbackPauseFlag{false}; @@ -743,11 +1057,657 @@ namespace hydration_impl { }; /////////////////////////////////////////////////////////////////////// + // Phase helpers used by IncrementalHydrator::Dehydrate and ::Hydrate. + // These keep the two big member functions readable as a sequence of + // named phases. Helpers take only the data they need and never the + // full HydrationConfig or IncrementalHydrator. + + // Removes each path, ignoring errors. Used by both Dehydrate and Hydrate + // pack-cleanup guards plus the explicit pre-rename cleanup on Hydrate. + void RemoveStagedPackFiles(const std::vector<std::filesystem::path>& Files) + { + for (const std::filesystem::path& P : Files) + { + std::error_code Ec; + RemoveFile(P, Ec); + if (Ec) + { + ZEN_WARN("Failed to remove staged pack file '{}': {}", P, Ec.message()); + } + } + } + + // Collects parent_path() of each input, sorts lex ascending (= ancestor-first), + // uniques, then drops any entry that is a strict component-prefix of the next + // entry (its descendant's CreateDirectories recursion will create it). Calls + // CreateDirectories on the surviving leaves. 
Use before scheduling parallel + // writes so worker threads do not race to create the same parents. + size_t CreateParentDirectories(const std::vector<std::filesystem::path>& FilePaths) + { + if (FilePaths.empty()) + { + return 0; + } + + std::vector<std::filesystem::path> Dirs; + Dirs.reserve(FilePaths.size()); + for (const std::filesystem::path& File : FilePaths) + { + if (File.has_parent_path()) + { + Dirs.push_back(File.parent_path()); + } + } + if (Dirs.empty()) + { + return 0; + } + + std::sort(Dirs.begin(), Dirs.end()); + Dirs.erase(std::unique(Dirs.begin(), Dirs.end()), Dirs.end()); + + size_t Write = 0; + for (size_t Read = 0; Read < Dirs.size(); ++Read) + { + if (Read + 1 < Dirs.size()) + { + const std::filesystem::path& Cur = Dirs[Read]; + const std::filesystem::path& Next = Dirs[Read + 1]; + const auto [ItCur, ItNext] = std::mismatch(Cur.begin(), Cur.end(), Next.begin(), Next.end()); + if (ItCur == Cur.end() && ItNext != Next.end()) + { + continue; // Cur is component-prefix of Next; descendant will create it + } + } + if (Write != Read) + { + Dirs[Write] = std::move(Dirs[Read]); + } + ++Write; + } + Dirs.resize(Write); + + for (const std::filesystem::path& Dir : Dirs) + { + CreateDirectories(Dir); + } + return Dirs.size(); + } + + // Parses CachedState["Files"sv] into a path-keyed lookup + parallel Entries vector. + // Used by Dehydrate to seed its hash cache; Dehydrate ignores PackHash here. 
+ void LoadCachedStateEntries(const CbObject& CachedState, + std::unordered_map<std::string, size_t>& OutLookup, + std::vector<Entry>& OutEntries) + { + for (CbFieldView FieldView : CachedState["Files"sv].AsArrayView()) + { + CbObjectView EntryView = FieldView.AsObjectView(); + std::string RelativePath(EntryView["Path"sv].AsString()); + uint64_t Size = EntryView["Size"sv].AsUInt64(); + uint64_t ModTick = EntryView["ModTick"sv].AsUInt64(); + IoHash Hash = EntryView["Hash"sv].AsHash(); + + OutLookup.insert_or_assign(RelativePath, OutEntries.size()); + OutEntries.push_back(Entry{.RelativePath = std::move(RelativePath), .Size = Size, .ModTick = ModTick, .Hash = Hash}); + } + } + + // Computes Out.Hash for a single file. For Oodle-compressed files in a `cas/` subdir + // the hash is a meta-hash combining the embedded RawHash with the file size, which + // avoids a collision between an uncompressed file and a same-content compressed file. + // All other files use a streaming raw hash via BasicFile + IoHashStream (sequential + // read, friendlier to the Windows cache manager than mmap). 
+ void HashFileContent(const std::filesystem::path& AbsPath, Entry& Out) + { + if (AbsPath.extension().empty()) + { + std::string_view Rel = Out.RelativePath; + std::string_view First = Rel.substr(0, Rel.find('/')); + if (First.ends_with("cas")) + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)), RawHash, RawSize); + if (Compressed) + { + IoHashStream Hasher; + Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash)); + Hasher.Append(&Out.Size, sizeof(Out.Size)); + Out.Hash = Hasher.GetHash(); + return; + } + } + } + + BasicFile File(AbsPath, BasicFile::Mode::kRead); + IoHashStream Hasher; + File.StreamFile([&Hasher](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); + Out.Hash = Hasher.GetHash(); + } + + // Walks DirContent, fills Entries[], schedules hash work for files whose hash + // is not in the StateEntries cache. The caller owns the ParallelWork's Wait() + // so other operations (e.g. Storage::List) can overlap with hashing. Files whose + // relative path matches any pattern in Excludes are dropped here (the hub-wide + // default list - see DefaultExcludes() above - covers transient runtime files + // like .lock and .sentry-native; the user can override via HydrationOptions). + // Returns the number of accepted (non-filtered) entries; OutTotalBytes accumulates + // their sizes. 
+ size_t ScanAndScheduleHashWork(const DirectoryContent& DirContent, + const std::filesystem::path& ServerStateDir, + const std::unordered_map<std::string, size_t>& StateEntryLookup, + const std::vector<Entry>& StateEntries, + std::span<const std::string> Excludes, + std::vector<Entry>& Entries, + uint64_t& OutTotalBytes, + ParallelWork& Work, + WorkerThreadPool& Pool, + PhaseStats& HashStats) + { + size_t TotalFiles = 0; + OutTotalBytes = 0; + for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) + { + const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); + std::string RelKey = RelativePath.generic_string(); + if (IsExcluded(RelKey, Excludes)) + { + continue; + } + const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]); + + Entry& CurrentEntry = Entries[TotalFiles]; + CurrentEntry.RelativePath = std::move(RelKey); + CurrentEntry.Size = DirContent.FileSizes[FileIndex]; + CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex]; + + bool FoundHash = false; + if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath); KnownIt != StateEntryLookup.end()) + { + const Entry& StateEntry = StateEntries[KnownIt->second]; + if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick) + { + CurrentEntry.Hash = StateEntry.Hash; + FoundHash = true; + } + } + + if (!FoundHash) + { + Work.ScheduleWork(Pool, [AbsPath, EntryIndex = TotalFiles, &Entries, &HashStats](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + Stopwatch Timer = HashStats.BeginRequest(); + auto GuardEnd = MakeGuard([&] { HashStats.EndRequest(Timer.GetElapsedTimeUs()); }); + Entry& CurrentEntry = Entries[EntryIndex]; + HashFileContent(AbsPath, CurrentEntry); + HashStats.Bytes.fetch_add(CurrentEntry.Size, std::memory_order_relaxed); + }); + HashStats.Files.fetch_add(1, std::memory_order_relaxed); + HashStats.RecordScheduled(); + } + 
TotalFiles++; + OutTotalBytes += CurrentEntry.Size; + } + return TotalFiles; + } + + // Plans pack composition deterministically: groups Entries by content hash for + // candidates Size < Threshold, sorts groups by ascending IoHash, bin-packs greedily + // up to MaxPackBytes, and discards any pack with fewer than two entries. Sets + // `IsPacked = true` on every entry that survives into a published pack so the caller + // can immediately distinguish loose-CAS uploads from pack-bound uploads. + // Returns one PackPlan per pack to build (empty if no packs are produced). + std::vector<PackPlan> PlanPacks(std::vector<Entry>& Entries, uint64_t Threshold, uint64_t MaxPackBytes) + { + // 1. Group small-file Entries[] indices by content hash. Every index in a group + // shares the same bytes, so any one of them sources the pack content; all of + // them get tagged IsPacked once the pack hash is known. + std::unordered_map<IoHash, EntryGroup, IoHash::Hasher> UniqueMap; + for (size_t Index = 0; Index < Entries.size(); ++Index) + { + if (Entries[Index].Size >= Threshold) + { + continue; + } + UniqueMap[Entries[Index].Hash].push_back(Index); + } + + // Need at least 2 unique groups for any pack to survive the "discard 1-entry packs" rule. + if (UniqueMap.size() < 2) + { + return {}; + } + + auto GroupHash = [&](const EntryGroup& G) -> const IoHash& { return Entries[G.front()].Hash; }; + auto GroupSize = [&](const EntryGroup& G) -> uint64_t { return Entries[G.front()].Size; }; + + // 2. Deterministic order: ascending IoHash. Drain the map so the index vectors move. + std::vector<EntryGroup> Ordered; + Ordered.reserve(UniqueMap.size()); + for (auto& [h, g] : UniqueMap) + { + Ordered.push_back(std::move(g)); + } + std::sort(Ordered.begin(), Ordered.end(), [&](const EntryGroup& A, const EntryGroup& B) { return GroupHash(A) < GroupHash(B); }); + + // 3. Bin-pack greedily under MaxPackBytes. 
+ std::vector<PackPlan> Plans; + PackPlan Current; + uint64_t CurrentSize = 0; + for (EntryGroup& Group : Ordered) + { + const uint64_t Size = GroupSize(Group); + if (Size >= MaxPackBytes) + { + continue; // fallback to standalone upload + } + if (CurrentSize + Size > MaxPackBytes && !Current.empty()) + { + if (Current.size() >= 2) + { + Plans.push_back(std::move(Current)); + } + Current = {}; + CurrentSize = 0; + } + Current.push_back(std::move(Group)); + CurrentSize += Size; + } + if (Current.size() >= 2) + { + Plans.push_back(std::move(Current)); + } + + // Tag entries that survived into a published pack so the loose-upload loop can skip + // them. Done after bin-packing so groups discarded by the <2-entry rule are not tagged. + for (const PackPlan& Plan : Plans) + { + for (const EntryGroup& Group : Plan) + { + for (size_t Idx : Group) + { + Entries[Idx].IsPacked = true; + } + } + } + return Plans; + } + + // Reads each source file in Plan, hashes the concatenation, writes raw bytes to + // TempPath. Throws on size mismatch (message includes ModuleId for grep). Scratch + // is owned by the caller and reused across packs; size must be >= the largest + // candidate file (i.e. the pack threshold). + BuiltPack BuildPack(const PackPlan& Plan, + const std::vector<Entry>& Entries, + const std::filesystem::path& ServerStateDir, + const std::filesystem::path& TempPath, + std::string_view ModuleId, + std::vector<uint8_t>& Scratch) + { + BuiltPack BP; + BP.Entries.reserve(Plan.size()); + IoHashStream Hasher; + uint64_t Offset = 0; + { + BasicFile PackFile(TempPath, BasicFile::Mode::kTruncate); + BasicFileWriter Writer(PackFile, /*BufferSize*/ 64 * 1024); + for (const EntryGroup& Group : Plan) + { + // Every Entries[idx] in a group shares the same content hash (= same bytes), + // so the first one is a fine source. 
+ const Entry& Rep = Entries[Group.front()]; + std::filesystem::path AbsPath = MakeSafeAbsolutePath(ServerStateDir / Rep.RelativePath); + BasicFile Src(AbsPath, BasicFile::Mode::kRead); + const uint64_t Size = Src.FileSize(); + if (Size != Rep.Size || Size > Scratch.size()) + { + throw zen::runtime_error("Pack entry for hash {} (module '{}'): expected {} bytes, file is {} at '{}'"sv, + Rep.Hash, + ModuleId, + Rep.Size, + Size, + AbsPath); + } + Src.Read(Scratch.data(), Size, 0); + Hasher.Append(Scratch.data(), Size); + Writer.Write(Scratch.data(), Size, Offset); + Offset += Size; + BP.Entries.push_back(BuiltPackEntry{.Hash = Rep.Hash, .Size = Rep.Size}); + } + Writer.Flush(); + } + + BP.PackHash = Hasher.GetHash(); + BP.Size = Offset; + return BP; + } + + // Schedules either a Put (if Hash is not in CAS) or a Touch (if it is). Updates + // counters on the matching PhaseStats - Files++ in both cases, Bytes+=Size on the + // touch path so touched-bytes accounting tracks size-equivalent work that did not + // transfer. Both paths call RecordScheduled so the queue-wait line covers cache-warm + // dehydrates (only Touches scheduled). Used for both loose CAS (UploadStats=Stats.Upload, + // TouchStats=Stats.Touch) and pack blobs (UploadStats=Stats.PackUpload, TouchStats= + // Stats.PackTouch). UploadStats and TouchStats must be distinct PhaseStats so the upload- + // throughput metric is not inflated by touched bytes that did not transfer. + void ScheduleUploadOrTouch(StorageBase& Storage, + ParallelWork& Work, + WorkerThreadPool& Pool, + const std::unordered_set<IoHash, IoHash::Hasher>& ExistsLookup, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath, + PhaseStats& UploadStats, + PhaseStats& TouchStats) + { + if (ExistsLookup.contains(Hash)) + { + // Refresh the backend's modification time so lifecycle-expiration policies + // do not evict CAS entries that are still referenced by this module. 
+ Storage.Touch(Work, Pool, Hash, TouchStats); + TouchStats.Files.fetch_add(1, std::memory_order_relaxed); + TouchStats.Bytes.fetch_add(Size, std::memory_order_relaxed); + TouchStats.RecordScheduled(); + } + else + { + Storage.Put(Work, Pool, Hash, Size, SourcePath, UploadStats); + UploadStats.Files.fetch_add(1, std::memory_order_relaxed); + UploadStats.RecordScheduled(); + } + } + + // Builds and saves the dehydrate state.cbo: header fields, optional Packs[] array, + // and the Files[] array. ModuleId is stored in the manifest. + void WriteDehydrateMetadata(StorageBase& Storage, + const std::filesystem::path& ServerStateDir, + std::string_view ModuleId, + uint64_t TotalBytes, + uint64_t DehydrateDurationMs, + const std::vector<BuiltPack>& BuiltPacks, + const std::vector<Entry>& Entries) + { + UtcTime Now = UtcTime::Now(); + std::string DehydrateTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z"sv, + Now.Tm.tm_year + 1900, + Now.Tm.tm_mon + 1, + Now.Tm.tm_mday, + Now.Tm.tm_hour, + Now.Tm.tm_min, + Now.Tm.tm_sec, + Now.Ms); + + CbObjectWriter Meta; + Meta << "SchemaVersion"sv << HydrationSchemaVersion; + Meta << "SourceFolder"sv << ServerStateDir.generic_string(); + Meta << "ModuleId"sv << ModuleId; + Meta << "HostName"sv << GetMachineName(); + Meta << "DehydrateTimeUtc"sv << DehydrateTimeUtc; + Meta << "DehydrateDurationMs"sv << DehydrateDurationMs; + Meta << "TotalSizeBytes"sv << TotalBytes; + Meta << "StorageSettings"sv << Storage.GetSettings(); + + if (!BuiltPacks.empty()) + { + Meta.BeginArray("Packs"sv); + for (const BuiltPack& BP : BuiltPacks) + { + Meta.BeginObject(); + { + Meta << "Hash"sv << BP.PackHash; + Meta << "Size"sv << BP.Size; + Meta.BeginArray("Entries"sv); + for (const BuiltPackEntry& BPE : BP.Entries) + { + Meta.BeginObject(); + { + Meta << "Hash"sv << BPE.Hash; + Meta << "Size"sv << BPE.Size; + } + Meta.EndObject(); + } + Meta.EndArray(); + } + Meta.EndObject(); + } + Meta.EndArray(); + } + + Meta.BeginArray("Files"sv); + 
for (const Entry& CurrentEntry : Entries) + { + Meta.BeginObject(); + { + Meta << "Path"sv << CurrentEntry.RelativePath; + Meta << "Size"sv << CurrentEntry.Size; + Meta << "ModTick"sv << CurrentEntry.ModTick; + Meta << "Hash"sv << CurrentEntry.Hash; + if (CurrentEntry.IsPacked) + { + Meta << "PackHash"sv << CurrentEntry.PackHash; + } + } + Meta.EndObject(); + } + Meta.EndArray(); + + Storage.SaveMetadata(Meta.Save()); + } + + // Parses Meta["Files"sv] into Entries[] + path lookup. Reads PackHash and sets + // IsPacked when present. Used by Hydrate. + void ParseFilesArray(const CbObject& Meta, + std::vector<Entry>& OutEntries, + std::unordered_map<std::string, size_t>& OutLookup, + uint64_t& OutTotalSize) + { + OutTotalSize = 0; + for (CbFieldView FieldView : Meta["Files"sv]) + { + CbObjectView EntryView = FieldView.AsObjectView(); + if (EntryView) + { + Entry NewEntry = {.RelativePath{EntryView["Path"sv].AsString()}, + .Size = EntryView["Size"sv].AsUInt64(), + .ModTick = EntryView["ModTick"sv].AsUInt64(), + .Hash = EntryView["Hash"sv].AsHash()}; + IoHash PackHash = EntryView["PackHash"sv].AsHash(); + if (PackHash != IoHash::Zero) + { + NewEntry.IsPacked = true; + NewEntry.PackHash = PackHash; + } + OutTotalSize += NewEntry.Size; + OutLookup.insert_or_assign(NewEntry.RelativePath, OutEntries.size()); + OutEntries.emplace_back(std::move(NewEntry)); + } + } + } + + // Parses Meta["Packs"sv] into a hash-keyed descriptor map. Each PackDescriptor's + // Entries[] gets a prefix-sum offset for O(1) slice lookup at unpack time. 
+ std::unordered_map<IoHash, PackDescriptor, IoHash::Hasher> ParsePacksArray(const CbObject& Meta) + { + std::unordered_map<IoHash, PackDescriptor, IoHash::Hasher> PackMap; + for (CbFieldView FieldView : Meta["Packs"sv]) + { + CbObjectView PackView = FieldView.AsObjectView(); + if (!PackView) + { + continue; + } + IoHash PackHash = PackView["Hash"sv].AsHash(); + PackDescriptor PD; + PD.Size = PackView["Size"sv].AsUInt64(); + uint64_t Offset = 0; + for (CbFieldView EF : PackView["Entries"sv]) + { + CbObjectView EV = EF.AsObjectView(); + if (!EV) + { + continue; + } + PackEntryDescriptor E{.Hash = EV["Hash"sv].AsHash(), .Size = EV["Size"sv].AsUInt64(), .Offset = Offset}; + Offset += E.Size; + PD.Entries.push_back(E); + } + PackMap.emplace(PackHash, std::move(PD)); + } + return PackMap; + } + + // For each downloaded pack: read it into a heap buffer, verify size, and slice + // into per-entry IoBuffers (zero-copy views). Throws on size mismatch with the + // existing message format. + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> BuildHashToSlice( + const std::unordered_map<IoHash, PackDescriptor, IoHash::Hasher>& PackMap, + const std::filesystem::path& TempDir, + std::string_view ModuleId) + { + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> HashToSlice; + size_t TotalPackEntries = 0; + for (const auto& [PackHash, PD] : PackMap) + { + TotalPackEntries += PD.Entries.size(); + } + HashToSlice.reserve(TotalPackEntries); + + for (const auto& [PackHash, PD] : PackMap) + { + std::filesystem::path PackPath = TempDir / "packs" / fmt::format("{}.bin"sv, PackHash); + // Heap-allocated buffer via direct ReadFile avoids mmap materialization + // and page-fault latency during the parallel unpack-write that follows. 
+ BasicFile PackFile(PackPath, BasicFile::Mode::kRead); + IoBuffer PackBuf = PackFile.ReadAll(); + if (PackBuf.GetSize() != PD.Size) + { + throw zen::runtime_error("Pack '{}' size mismatch for module '{}' at '{}': expected {}, got {}"sv, + PackHash, + ModuleId, + PackPath, + PD.Size, + PackBuf.GetSize()); + } + + for (const auto& E : PD.Entries) + { + HashToSlice.emplace(E.Hash, IoBuffer(PackBuf, E.Offset, E.Size)); + } + } + return HashToSlice; + } + + // Migrates contents of SourceDir into ServerStateDir. Same-volume: top-level rename + // per child. Different-volume: full CopyTree fallback. Caller is responsible for + // final cleanup of the parent temp directory (which may hold sibling staging dirs + // like packs/ that must NOT migrate). + void MigrateTempToState(const std::filesystem::path& SourceDir, + const std::filesystem::path& ServerStateDir, + const HydrationConfig::ThreadingOptions& Threading) + { + // If the two paths share at least one common component they are on the same drive/volume + // and atomic renames will succeed. Otherwise fall back to a full copy. 
+ auto [ItSrc, ItState] = std::mismatch(SourceDir.begin(), SourceDir.end(), ServerStateDir.begin(), ServerStateDir.end()); + if (ItSrc != SourceDir.begin()) + { + DirectoryContent DirContent; + GetDirectoryContent(*Threading.WorkerPool, + SourceDir, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs, + DirContent); + + for (const std::filesystem::path& AbsPath : DirContent.Directories) + { + std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); + std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'"sv, AbsPath, Dest)); + } + } + for (const std::filesystem::path& AbsPath : DirContent.Files) + { + std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); + std::error_code Ec = RenameFileWithRetry(AbsPath, Dest); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'"sv, AbsPath, Dest)); + } + } + } + else + { + // Slow path: source and target are on different filesystems, so rename + // would fail. Copy the tree instead. + ZEN_DEBUG("SourceDir and ServerStateDir are on different filesystems - using CopyTree"); + CopyTree(SourceDir, ServerStateDir, {.EnableClone = true}); + } + } + + // Walks ServerStateDir and emits a Files[] cache for the next dehydrate's + // hash-shortcut (mirrors Load state on dehydrate). Files on disk that aren't in + // EntryLookup (manifest) are skipped with a WARN - typically leftovers from an + // earlier crashed hydrate. 
+ CbObject BuildHydrateState(const std::filesystem::path& ServerStateDir, + const std::unordered_map<std::string, size_t>& EntryLookup, + const std::vector<Entry>& Entries, + std::string_view ModuleId, + const HydrationConfig::ThreadingOptions& Threading) + { + DirectoryContent DirContent; + GetDirectoryContent(*Threading.WorkerPool, + ServerStateDir, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | + DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, + DirContent); + + CbObjectWriter HydrateState; + HydrateState.BeginArray("Files"sv); + for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) + { + std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); + std::string RelKey = RelativePath.generic_string(); + + if (auto It = EntryLookup.find(RelKey); It != EntryLookup.end()) + { + HydrateState.BeginObject(); + { + HydrateState << "Path"sv << RelKey; + HydrateState << "Size"sv << DirContent.FileSizes[FileIndex]; + HydrateState << "ModTick"sv << DirContent.FileModificationTicks[FileIndex]; + HydrateState << "Hash"sv << Entries[It->second].Hash; + } + HydrateState.EndObject(); + } + else + { + // File on disk after hydrate but not in the manifest. Can happen when TempDir + // contained leftovers from a prior crashed hydrate that survived to the rename + // phase. Skip it rather than failing - the manifest is the source of truth for + // the cached state; the stray file is harmless and gets caught by the next + // dehydrate's directory scan. 
+ ZEN_WARN("Hydrate: file '{}' present on disk but missing from manifest for module '{}'; skipping", RelKey, ModuleId); + } + } + HydrateState.EndArray(); + + return HydrateState.Save(); + } + + /////////////////////////////////////////////////////////////////////// // IncrementalHydrator implementations - IncrementalHydrator::IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage) + IncrementalHydrator::IncrementalHydrator(const HydrationConfig& Config, + std::unique_ptr<StorageBase> Storage, + std::span<const std::string> Excludes) : m_Storage(std::move(Storage)) , m_Config(Config) + , m_Excludes(Excludes.begin(), Excludes.end()) , m_FallbackWorkPool(0) { if (Config.Threading) @@ -760,6 +1720,7 @@ namespace hydration_impl { void IncrementalHydrator::Dehydrate(const CbObject& CachedState) { + ZEN_TRACE_CPU("IncrementalHydrator::Dehydrate"); Stopwatch TotalTimer; DehydrateStatistics Stats; const std::string StorageTarget = m_Storage->Describe(); @@ -767,24 +1728,17 @@ namespace hydration_impl { const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); try { + // Load the cache from the previous dehydrate to short-circuit re-hashing of + // unchanged files (matched by Path+Size+ModTick). 
std::unordered_map<std::string, size_t> StateEntryLookup; std::vector<Entry> StateEntries; { Stopwatch LoadStateTimer; - for (CbFieldView FieldView : CachedState["Files"].AsArrayView()) - { - CbObjectView EntryView = FieldView.AsObjectView(); - std::filesystem::path RelativePath(EntryView["Path"].AsString()); - uint64_t Size = EntryView["Size"].AsUInt64(); - uint64_t ModTick = EntryView["ModTick"].AsUInt64(); - IoHash Hash = EntryView["Hash"].AsHash(); - - StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size()); - StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash}); - } + LoadCachedStateEntries(CachedState, StateEntryLookup, StateEntries); Stats.LoadStateUs = LoadStateTimer.GetElapsedTimeUs(); } + // Scan the server state directory. DirectoryContent DirContent; { Stopwatch DirScanTimer; @@ -802,101 +1756,29 @@ namespace hydration_impl { DirContent.Files.size(), NiceBytes(std::accumulate(DirContent.FileSizes.begin(), DirContent.FileSizes.end(), uint64_t(0)))); + // Hash phase: build Entries[] and schedule hash work for files not in the cache. + // Storage::List runs in parallel with hashing to populate ExistsLookup before Wait. 
std::vector<Entry> Entries; Entries.resize(DirContent.Files.size()); - uint64_t TotalBytes = 0; - uint64_t TotalFiles = 0; - - std::unordered_set<IoHash> ExistsLookup; - + uint64_t TotalBytes = 0; + uint64_t TotalFiles = 0; + std::unordered_set<IoHash, IoHash::Hasher> ExistsLookup; { + Stats.Hash.PhaseClock.Reset(); Stopwatch HashTimer; ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) - { - const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]); - if (AbsPath.filename() == "reserve.gc") - { - continue; - } - const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); - if (*RelativePath.begin() == ".sentry-native") - { - continue; - } - if (RelativePath == ".lock") - { - continue; - } - - Entry& CurrentEntry = Entries[TotalFiles]; - CurrentEntry.RelativePath = RelativePath; - CurrentEntry.Size = DirContent.FileSizes[FileIndex]; - CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex]; - - bool FoundHash = false; - if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath.generic_string()); KnownIt != StateEntryLookup.end()) - { - const Entry& StateEntry = StateEntries[KnownIt->second]; - if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick) - { - CurrentEntry.Hash = StateEntry.Hash; - FoundHash = true; - } - } - - if (!FoundHash) - { - Work.ScheduleWork(*m_Threading.WorkerPool, - [AbsPath, EntryIndex = TotalFiles, &Entries, &Stats](std::atomic<bool>& AbortFlag) { - Stats.Hash.RecordThread(); - if (AbortFlag) - { - return; - } - - Entry& CurrentEntry = Entries[EntryIndex]; - - bool FoundHash = false; - if (AbsPath.extension().empty()) - { - auto It = CurrentEntry.RelativePath.begin(); - if (It != CurrentEntry.RelativePath.end() && It->filename().string().ends_with("cas")) - { - IoHash RawHash; - 
uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed( - SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)), - RawHash, - RawSize); - if (Compressed) - { - // We compose a meta-hash since taking the RawHash might collide with an - // existing non-compressed file with the same content The collision is - // unlikely except if the compressed data is zero bytes causing RawHash - // to be the same as an empty file. - IoHashStream Hasher; - Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash)); - Hasher.Append(&CurrentEntry.Size, sizeof(CurrentEntry.Size)); - CurrentEntry.Hash = Hasher.GetHash(); - FoundHash = true; - } - } - } - - if (!FoundHash) - { - CurrentEntry.Hash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(AbsPath)); - } - Stats.Hash.Bytes.fetch_add(CurrentEntry.Size, std::memory_order_relaxed); - }); - Stats.Hash.Files.fetch_add(1, std::memory_order_relaxed); - } - TotalFiles++; - TotalBytes += CurrentEntry.Size; - } + TotalFiles = ScanAndScheduleHashWork(DirContent, + ServerStateDir, + StateEntryLookup, + StateEntries, + m_Excludes, + Entries, + TotalBytes, + Work, + *m_Threading.WorkerPool, + Stats.Hash); { Stopwatch ListTimer; @@ -906,82 +1788,136 @@ namespace hydration_impl { } Work.Wait(); - Entries.resize(TotalFiles); Stats.Hash.ElapsedUs = HashTimer.GetElapsedTimeUs(); Stats.TotalFiles = TotalFiles; Stats.TotalBytes = TotalBytes; } - uint64_t UploadDurationMs = 0; + // Pack planning + unified upload phase. Plan first so we know which entries are + // packed, then run loose-CAS uploads and pack builds inside a single ParallelWork. + // Loose uploads are scheduled up front so they execute on the worker pool while + // the calling thread runs the serial pack-build loop; each completed pack hands + // its upload to the same ParallelWork. One Wait covers everything. 
+ std::vector<BuiltPack> BuiltPacks; + std::vector<std::filesystem::path> StagedPackFiles; + auto PackCleanup = MakeGuard([&] { + RemoveStagedPackFiles(StagedPackFiles); + // Best-effort drop of the now-empty packs/ subdir so TempDir is clean after + // dehydrate. Mirrors the explicit cleanup on the hydrate-side success path. + std::error_code Ec; + DeleteDirectories(MakeSafeAbsolutePath(m_Config.TempDir) / "packs", Ec); + }); + + // PlanPacks tags Entries[Idx].IsPacked on every index that survives into a pack, + // so the loose-upload loop can skip them. PackHash is set later per-pack as each + // pack is built. + const std::vector<PackPlan> Pending = + m_Config.PackEnabled ? PlanPacks(Entries, m_Config.PackThresholdBytes, m_Config.MaxPackBytes) : std::vector<PackPlan>{}; + + uint64_t DehydrateDurationMs = 0; { + // Upload, PackUpload, Touch, and PackTouch share one ParallelWork; reset all + // four PhaseClocks to the same baseline so the queue-wait line can combine + // their FirstScheduleUs / FirstStartUs across the four PhaseStats. + Stats.Upload.PhaseClock.Reset(); + Stats.PackUpload.PhaseClock.Reset(); + Stats.Touch.PhaseClock.Reset(); + Stats.PackTouch.PhaseClock.Reset(); Stopwatch UploadTimer; ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + // Schedule loose-CAS uploads first so they begin running while the pack-build + // loop below executes serially on this thread. for (const Entry& CurrentEntry : Entries) { - if (!ExistsLookup.contains(CurrentEntry.Hash)) - { - m_Storage->Put(Work, - *m_Threading.WorkerPool, - CurrentEntry.Hash, - CurrentEntry.Size, - MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath), - Stats.Upload); - Stats.Upload.Files.fetch_add(1, std::memory_order_relaxed); - } - else + if (CurrentEntry.IsPacked) { - // Refresh the backend's modification time so lifecycle-expiration policies - // do not evict CAS entries that are still referenced by this module. 
- m_Storage->Touch(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, Stats.Touch); - Stats.Touch.Files.fetch_add(1, std::memory_order_relaxed); - Stats.Touch.Bytes.fetch_add(CurrentEntry.Size, std::memory_order_relaxed); + continue; // pack phase covers it } + ScheduleUploadOrTouch(*m_Storage, + Work, + *m_Threading.WorkerPool, + ExistsLookup, + CurrentEntry.Hash, + CurrentEntry.Size, + MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath), + Stats.Upload, + Stats.Touch); } - Work.Wait(); - Stats.Upload.ElapsedUs = UploadTimer.GetElapsedTimeUs(); - UploadDurationMs = TotalTimer.GetElapsedTimeMs(); - - Stopwatch MetadataTimer; - UtcTime Now = UtcTime::Now(); - std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z", - Now.Tm.tm_year + 1900, - Now.Tm.tm_mon + 1, - Now.Tm.tm_mday, - Now.Tm.tm_hour, - Now.Tm.tm_min, - Now.Tm.tm_sec, - Now.Ms); - - CbObjectWriter Meta; - Meta << "SourceFolder" << ServerStateDir.generic_string(); - Meta << "ModuleId" << m_Config.ModuleId; - Meta << "HostName" << GetMachineName(); - Meta << "UploadTimeUtc" << UploadTimeUtc; - Meta << "UploadDurationMs" << UploadDurationMs; - Meta << "TotalSizeBytes" << TotalBytes; - Meta << "StorageSettings" << m_Storage->GetSettings(); - - Meta.BeginArray("Files"); - for (const Entry& CurrentEntry : Entries) + if (!Pending.empty()) { - Meta.BeginObject(); + ZEN_TRACE_CPU("IncrementalHydrator::Dehydrate::Pack"); + std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); + std::filesystem::path PacksDir = TempDir / "packs"; + CreateDirectories(PacksDir); + + // Reusable scratch for small-file reads. Every pack candidate has Size < + // PackThresholdBytes so a single buffer of that size holds any one file. + // Build runs serially on the caller's thread - typical modules produce 1-2 + // packs at ~5 ms each, too small to be worth the parallel-dispatch overhead. 
+ std::vector<uint8_t> Scratch(m_Config.PackThresholdBytes); + + for (const PackPlan& Plan : Pending) { - Meta << "Path" << CurrentEntry.RelativePath.generic_string(); - Meta << "Size" << CurrentEntry.Size; - Meta << "ModTick" << CurrentEntry.ModTick; - Meta << "Hash" << CurrentEntry.Hash; + // Pre-register the staging path so PackCleanup removes it even if the + // stream-write loop below throws mid-flight. + Oid::String_t OidStr; + Oid::NewOid().ToString(OidStr); + std::filesystem::path PackTempPath = PacksDir / fmt::format("{}.bin"sv, OidStr); + StagedPackFiles.push_back(PackTempPath); + + Stopwatch BuildTimer; + BuiltPack BP = BuildPack(Plan, Entries, ServerStateDir, PackTempPath, m_Config.ModuleId, Scratch); + Stats.PackBuildUs.fetch_add(BuildTimer.GetElapsedTimeUs(), std::memory_order_relaxed); + + // Stamp the pack hash on every matching entry; state.cbo's Files[] reads + // PackHash off these entries when emitting per-file PackHash references. + uint64_t PackedEntryCount = 0; + for (const EntryGroup& Group : Plan) + { + for (size_t Idx : Group) + { + Entries[Idx].PackHash = BP.PackHash; + } + PackedEntryCount += Group.size(); + } + + Stats.PackCount.fetch_add(1, std::memory_order_relaxed); + Stats.PackedFiles.fetch_add(PackedEntryCount, std::memory_order_relaxed); + Stats.PackBytes.fetch_add(BP.Size, std::memory_order_relaxed); + + ScheduleUploadOrTouch(*m_Storage, + Work, + *m_Threading.WorkerPool, + ExistsLookup, + BP.PackHash, + BP.Size, + PackTempPath, + Stats.PackUpload, + Stats.PackTouch); + + BuiltPacks.push_back(std::move(BP)); } - Meta.EndObject(); } - Meta.EndArray(); - m_Storage->SaveMetadata(Meta.Save()); - Stats.MetadataSaveUs = MetadataTimer.GetElapsedTimeUs(); + Work.Wait(); + // Upload, PackUpload, Touch, and PackTouch share a single ParallelWork. Only + // Upload's ElapsedUs is read by the formatter; the others' bytes/requests are + // reported against the same Upload phase elapsed. 
+ Stats.Upload.ElapsedUs = UploadTimer.GetElapsedTimeUs(); + DehydrateDurationMs = TotalTimer.GetElapsedTimeMs(); } + // Persist the new state.cbo with header, Packs[], and Files[]. + { + Stopwatch SaveMetadataTimer; + WriteDehydrateMetadata(*m_Storage, ServerStateDir, m_Config.ModuleId, TotalBytes, DehydrateDurationMs, BuiltPacks, Entries); + Stats.SaveMetadataUs = SaveMetadataTimer.GetElapsedTimeUs(); + } + + // Server-state dir contents have been uploaded; wipe them. ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); { Stopwatch CleanTimer; @@ -990,7 +1926,7 @@ namespace hydration_impl { } Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); - LogDehydrateSummary("Dehydration complete", Stats, m_Config.ModuleId, ServerStateDir, StorageTarget); + LogDehydrateSummary("Dehydration complete"sv, Stats, m_Config.ModuleId, ServerStateDir, StorageTarget); } catch (const std::exception& Ex) { @@ -999,20 +1935,35 @@ namespace hydration_impl { Ex.what(), m_Config.ServerStateDir); Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); - LogDehydrateSummary("Dehydration failed", Stats, m_Config.ModuleId, ServerStateDir, StorageTarget); + LogDehydrateSummary("Dehydration failed"sv, Stats, m_Config.ModuleId, ServerStateDir, StorageTarget); } } CbObject IncrementalHydrator::Hydrate() { + ZEN_TRACE_CPU("IncrementalHydrator::Hydrate"); Stopwatch TotalTimer; HydrateStatistics Stats; const std::string StorageSource = m_Storage->Describe(); const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); + // Hydrated files land in TempDir/state/, pack staging blobs in TempDir/packs/. Keeping + // them in sibling subdirectories means MigrateTempToState only needs to hand the state/ + // subtree across to ServerStateDir; pack staging never has a chance to leak into the + // final server state directory. 
+ const std::filesystem::path TempStateDir = TempDir / "state"; try { + CreateDirectories(ServerStateDir); + CreateDirectories(TempDir); + // A prior hydrate may have crashed after downloading but before the rename phase, + // leaving stale files in TempDir that would otherwise get migrated into + // ServerStateDir and trip the post-rename manifest check. + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); + CreateDirectories(TempStateDir); + + // Load metadata; absent metadata means a fresh module - clean state, return. CbObject Meta; { Stopwatch LoadTimer; @@ -1028,147 +1979,165 @@ namespace hydration_impl { return CbObject(); } + // Schema-version gate: refuse manifests written by a newer hub. Missing field is + // treated as version 0 (legacy / pre-versioning) and decoded best-effort - the + // optional fields (Packs[], StorageSettings) absent from v0 manifests fall back + // to defaults via ParsePacksArray / ParseSettings. + const uint32_t SchemaVersion = Meta["SchemaVersion"sv].AsUInt32(0); + if (SchemaVersion > HydrationSchemaVersion) + { + throw zen::runtime_error("State manifest for module '{}' has schema version {} but this hub supports up to {}"sv, + m_Config.ModuleId, + SchemaVersion, + HydrationSchemaVersion); + } + + // Parse manifest: Files[] for per-file metadata, Packs[] (optional) for pack + // composition. Missing Packs[] = old-format state; treated as all-standalone. 
std::unordered_map<std::string, size_t> EntryLookup; std::vector<Entry> Entries; uint64_t TotalSize = 0; - - for (CbFieldView FieldView : Meta["Files"]) - { - CbObjectView EntryView = FieldView.AsObjectView(); - if (EntryView) - { - Entry NewEntry = {.RelativePath = std::filesystem::path(EntryView["Path"].AsString()), - .Size = EntryView["Size"].AsUInt64(), - .ModTick = EntryView["ModTick"].AsUInt64(), - .Hash = EntryView["Hash"].AsHash()}; - TotalSize += NewEntry.Size; - EntryLookup.insert_or_assign(NewEntry.RelativePath.generic_string(), Entries.size()); - Entries.emplace_back(std::move(NewEntry)); - } - } + ParseFilesArray(Meta, Entries, EntryLookup, TotalSize); + std::unordered_map<IoHash, PackDescriptor, IoHash::Hasher> PackMap = ParsePacksArray(Meta); Stats.TotalFiles = Entries.size(); Stats.TotalBytes = TotalSize; + Stats.PackCount = PackMap.size(); - ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files", + ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files, {} packs", m_Config.ModuleId, m_Config.ServerStateDir, Entries.size(), - NiceBytes(TotalSize)); - - m_Storage->ParseSettings(Meta["StorageSettings"].AsObjectView()); + NiceBytes(TotalSize), + PackMap.size()); + + // Re-apply storage settings from state.cbo (e.g. S3 multipart chunk size). + m_Storage->ParseSettings(Meta["StorageSettings"sv].AsObjectView()); + + // Per-entry destination paths under TempStateDir, indexed parallel to Entries[]. Used + // by the standalone download dispatch and (for IsPacked entries) by the unpack + // dispatch. Pre-creating parents once for the union covers both phases without a second pass. 
+ std::vector<std::filesystem::path> EntryPaths; + EntryPaths.reserve(Entries.size()); + for (const Entry& CurrentEntry : Entries) + { + EntryPaths.push_back(MakeSafeAbsolutePath(TempStateDir / CurrentEntry.RelativePath)); + } + { + Stopwatch CreateDirsTimer; + auto RecordElapsed = MakeGuard([&] { Stats.CreateDirsUs = CreateDirsTimer.GetElapsedTimeUs(); }); + Stats.CreateDirsCount = CreateParentDirectories(EntryPaths); + } + // Download phase: pack GETs first (so unpack can begin sooner), then standalone files. + // Both share the same ParallelWork; per-phase byte / request counts stay separate via + // PackDownload vs Download stats while the elapsed time is reported once. { + // Download and PackDownload share one ParallelWork; reset both PhaseClocks + // to the same baseline so the queue-wait line can combine their FirstScheduleUs + // / FirstStartUs across the two PhaseStats. + Stats.Download.PhaseClock.Reset(); + Stats.PackDownload.PhaseClock.Reset(); Stopwatch DownloadTimer; ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - for (const Entry& CurrentEntry : Entries) + const std::filesystem::path PacksDir = TempDir / "packs"; + if (!PackMap.empty()) + { + CreateDirectories(PacksDir); + } + + for (const auto& [PackHash, PD] : PackMap) + { + std::filesystem::path PackPath = PacksDir / fmt::format("{}.bin"sv, PackHash); + m_Storage->Get(Work, *m_Threading.WorkerPool, PackHash, PD.Size, PackPath, Stats.PackDownload); + Stats.PackDownload.Files.fetch_add(1, std::memory_order_relaxed); + Stats.PackDownload.RecordScheduled(); + } + + for (size_t I = 0; I < Entries.size(); ++I) { - std::filesystem::path Path = MakeSafeAbsolutePath(TempDir / CurrentEntry.RelativePath); - CreateDirectories(Path.parent_path()); - m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path, Stats.Download); + if (Entries[I].IsPacked) + { + continue; // handled in the unpack phase below + } + 
m_Storage->Get(Work, *m_Threading.WorkerPool, Entries[I].Hash, Entries[I].Size, EntryPaths[I], Stats.Download); Stats.Download.Files.fetch_add(1, std::memory_order_relaxed); + Stats.Download.RecordScheduled(); } Work.Wait(); Stats.Download.ElapsedUs = DownloadTimer.GetElapsedTimeUs(); } - // Downloaded successfully - swap into ServerStateDir - ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); + // Unpack phase: verify each downloaded pack, build hash->slice map, parallel-write. + if (!PackMap.empty()) { - Stopwatch CleanTimer; - CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); - Stats.CleanUs = CleanTimer.GetElapsedTimeUs(); - } + ZEN_TRACE_CPU("IncrementalHydrator::Hydrate::Unpack"); + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> HashToSlice = BuildHashToSlice(PackMap, TempDir, m_Config.ModuleId); - { - Stopwatch RenameTimer; - // If the two paths share at least one common component they are on the same drive/volume - // and atomic renames will succeed. Otherwise fall back to a full copy. 
- auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end()); - if (ItTmp != TempDir.begin()) { - DirectoryContent DirContent; - GetDirectoryContent(*m_Threading.WorkerPool, - TempDir, - DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs, - DirContent); - - for (const std::filesystem::path& AbsPath : DirContent.Directories) + Stopwatch UnpackTimer; + ParallelWork UnpackWork(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + for (size_t I = 0; I < Entries.size(); ++I) { - std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); - std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest); - if (Ec) + if (!Entries[I].IsPacked) { - throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest)); + continue; } - } - for (const std::filesystem::path& AbsPath : DirContent.Files) - { - std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); - std::error_code Ec = RenameFileWithRetry(AbsPath, Dest); - if (Ec) + auto It = HashToSlice.find(Entries[I].Hash); + if (It == HashToSlice.end()) { - throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest)); + throw zen::runtime_error("Packed file '{}' references unknown pack content hash '{}' in module '{}'"sv, + Entries[I].RelativePath, + Entries[I].Hash, + m_Config.ModuleId); } + UnpackWork.ScheduleWork(*m_Threading.WorkerPool, + [&Stats, &Path = EntryPaths[I], Slice = &It->second](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + TemporaryFile::SafeWriteFile(Path, *Slice); + Stats.UnpackWriteBytes.fetch_add(Slice->GetSize(), std::memory_order_relaxed); + Stats.PackedFiles.fetch_add(1, std::memory_order_relaxed); + }); } - - ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); - CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, 
*m_Threading.PauseFlag, TempDir); + UnpackWork.Wait(); + Stats.PackUnpackUs = UnpackTimer.GetElapsedTimeUs(); } - else - { - // Slow path: TempDir and ServerStateDir are on different filesystems, so rename - // would fail. Copy the tree instead and clean up the temp files afterwards. - ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree"); - CopyTree(TempDir, ServerStateDir, {.EnableClone = true}); - ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); - CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); - } - Stats.RenameOrCopyUs = RenameTimer.GetElapsedTimeUs(); + // Release the pack buffers (each IoBuffer slice holds a ref to the pack's underlying + // heap buffer) before the rename/verify phase runs - avoids keeping ~sum(pack sizes) + // bytes alive across those phases. + HashToSlice.clear(); } - CbObject StateObject; + // Downloaded successfully - swap TempStateDir contents into ServerStateDir, then + // sweep the rest of TempDir (empty TempStateDir, packs/, anything else). 
+ ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); { - Stopwatch VerifyTimer; - DirectoryContent DirContent; - GetDirectoryContent(*m_Threading.WorkerPool, - ServerStateDir, - DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | - DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, - DirContent); - - CbObjectWriter HydrateState; - HydrateState.BeginArray("Files"); - for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) - { - std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); - - if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end()) - { - HydrateState.BeginObject(); - { - HydrateState << "Path" << RelativePath.generic_string(); - HydrateState << "Size" << DirContent.FileSizes[FileIndex]; - HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex]; - HydrateState << "Hash" << Entries[It->second].Hash; - } - HydrateState.EndObject(); - } - else - { - ZEN_ASSERT(false); - } - } - HydrateState.EndArray(); + Stopwatch CleanTimer; + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); + Stats.CleanUs = CleanTimer.GetElapsedTimeUs(); + } + { + Stopwatch FinalizeTimer; + MigrateTempToState(TempStateDir, ServerStateDir, m_Threading); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); + Stats.FinalizeUs = FinalizeTimer.GetElapsedTimeUs(); + } - StateObject = HydrateState.Save(); - Stats.VerifyScanUs = VerifyTimer.GetElapsedTimeUs(); + // Build the cached state that the next Dehydrate will receive (mirrors Load state on dehydrate). 
+ CbObject StateObject; + { + Stopwatch BuildStateTimer; + StateObject = BuildHydrateState(ServerStateDir, EntryLookup, Entries, m_Config.ModuleId, m_Threading); + Stats.BuildStateUs = BuildStateTimer.GetElapsedTimeUs(); } Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); - LogHydrateSummary("Hydration complete", Stats, m_Config.ModuleId, StorageSource, ServerStateDir); + LogHydrateSummary("Hydration complete"sv, Stats, m_Config.ModuleId, StorageSource, ServerStateDir); return StateObject; } @@ -1182,25 +2151,42 @@ namespace hydration_impl { ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); - LogHydrateSummary("Hydration failed", Stats, m_Config.ModuleId, StorageSource, ServerStateDir); + LogHydrateSummary("Hydration failed"sv, Stats, m_Config.ModuleId, StorageSource, ServerStateDir); return {}; } } void IncrementalHydrator::Obliterate() { + ZEN_TRACE_CPU("IncrementalHydrator::Obliterate"); const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); - try - { + auto TryDeleteBackend = [&]() { ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); m_Storage->Delete(Work, *m_Threading.WorkerPool); Work.Wait(); + }; + + try + { + TryDeleteBackend(); } catch (const std::exception& Ex) { - ZEN_WARN("Failed to delete backend storage for module '{}': {}. Proceeding with local cleanup.", m_Config.ModuleId, Ex.what()); + ZEN_WARN("Obliterate backend delete failed for module '{}' (attempt 1/2): {}. Retrying once.", m_Config.ModuleId, Ex.what()); + try + { + TryDeleteBackend(); + } + catch (const std::exception& Ex2) + { + ZEN_WARN( + "Obliterate backend delete failed for module '{}' (attempt 2/2): {}. 
Proceeding with local cleanup; backend data may " + "remain.", + m_Config.ModuleId, + Ex2.what()); + } } CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); @@ -1243,26 +2229,33 @@ private: uint64_t m_DefaultMultipartChunkSize; }; +HydrationBase::HydrationBase(const Configuration& Config) +{ + using namespace hydration_impl; + CbFieldView ExcludesField = Config.Options["excludes"sv]; + m_Excludes = ExcludesField.HasValue() ? ParseStringArray(ExcludesField) : DefaultExcludes(); +} + /////////////////////////////////////////////////////////////////////////// // Implementations -FileHydration::FileHydration(const Configuration& Config) +FileHydration::FileHydration(const Configuration& Config) : HydrationBase(Config) { if (!Config.TargetSpecification.empty()) { m_StorageRoot = Utf8ToWide(Config.TargetSpecification.substr(hydration_impl::FileStorage::Prefix.length())); if (m_StorageRoot.empty()) { - throw zen::runtime_error("Hydration config 'file' type requires a directory path"); + throw zen::runtime_error("Hydration config 'file' type requires a directory path"sv); } } else { - CbObjectView Settings = Config.Options["settings"].AsObjectView(); - std::string_view Path = Settings["path"].AsString(); + CbObjectView Settings = Config.Options["settings"sv].AsObjectView(); + std::string_view Path = Settings["path"sv].AsString(); if (Path.empty()) { - throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'"); + throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'"sv); } m_StorageRoot = Utf8ToWide(std::string(Path)); } @@ -1273,14 +2266,14 @@ std::unique_ptr<HydrationStrategyBase> FileHydration::CreateHydrator(const HydrationConfig& Config) { using namespace hydration_impl; - return std::make_unique<IncrementalHydrator>(Config, std::make_unique<FileStorage>(m_StorageRoot / Config.ModuleId)); + return std::make_unique<IncrementalHydrator>(Config, 
std::make_unique<FileStorage>(m_StorageRoot / Config.ModuleId), m_Excludes); } -S3Hydration::S3Hydration(const Configuration& Config) +S3Hydration::S3Hydration(const Configuration& Config) : HydrationBase(Config) { using namespace hydration_impl; - CbObjectView Settings = Config.Options["settings"].AsObjectView(); + CbObjectView Settings = Config.Options["settings"sv].AsObjectView(); std::string_view Spec; if (!Config.TargetSpecification.empty()) { @@ -1289,10 +2282,10 @@ S3Hydration::S3Hydration(const Configuration& Config) } else { - std::string_view Uri = Settings["uri"].AsString(); + std::string_view Uri = Settings["uri"sv].AsString(); if (Uri.empty()) { - throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'"); + throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'"sv); } Spec = Uri; Spec.remove_prefix(S3Storage::Prefix.size()); @@ -1304,10 +2297,10 @@ S3Hydration::S3Hydration(const Configuration& Config) if (m_Bucket.empty()) { - throw zen::runtime_error("Incremental S3 hydration config requires a bucket name"); + throw zen::runtime_error("Incremental S3 hydration config requires a bucket name"sv); } - std::string Region = std::string(Settings["region"].AsString()); + std::string Region = std::string(Settings["region"sv].AsString()); if (Region.empty()) { Region = GetEnvVariable("AWS_DEFAULT_REGION"); @@ -1322,11 +2315,11 @@ S3Hydration::S3Hydration(const Configuration& Config) } m_Region = std::move(Region); - std::string_view Endpoint = Settings["endpoint"].AsString(); + std::string_view Endpoint = Settings["endpoint"sv].AsString(); if (!Endpoint.empty()) { m_Endpoint = std::string(Endpoint); - m_PathStyle = Settings["path-style"].AsBool(); + m_PathStyle = Settings["path-style"sv].AsBool(); } std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID"); @@ -1341,7 +2334,7 @@ S3Hydration::S3Hydration(const Configuration& Config) m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN"); } - 
m_DefaultMultipartChunkSize = Settings["chunksize"].AsUInt64(S3Storage::DefaultMultipartChunkSize); + m_DefaultMultipartChunkSize = Settings["chunksize"sv].AsUInt64(DefaultMultipartChunkSize); S3ClientOptions ClientOptions; ClientOptions.BucketName = m_Bucket; @@ -1357,6 +2350,11 @@ S3Hydration::S3Hydration(const Configuration& Config) ClientOptions.Credentials = m_Credentials; } ClientOptions.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u; + // Retry transient HTTP failures (429 throttle, 503 SlowDown, 5xx, connection errors) at the + // HTTP layer. CurlHttpClient::DoWithRetry uses 100*(Attempt+1) ms linear backoff between + // attempts. Three retries covers brief S3 rate-limit bursts without holding worker threads + // for long under sustained throttle. + ClientOptions.HttpSettings.RetryCount = 3; m_Client = std::make_unique<S3Client>(ClientOptions); } @@ -1365,10 +2363,12 @@ std::unique_ptr<HydrationStrategyBase> S3Hydration::CreateHydrator(const HydrationConfig& Config) { using namespace hydration_impl; - std::string KeyPrefix = m_KeyPrefixRoot.empty() ? std::string(Config.ModuleId) : fmt::format("{}/{}", m_KeyPrefixRoot, Config.ModuleId); + std::string KeyPrefix = + m_KeyPrefixRoot.empty() ? 
std::string(Config.ModuleId) : fmt::format("{}/{}"sv, m_KeyPrefixRoot, Config.ModuleId); return std::make_unique<IncrementalHydrator>( Config, - std::make_unique<S3Storage>(*m_Client, std::move(KeyPrefix), Config.TempDir, m_DefaultMultipartChunkSize)); + std::make_unique<S3Storage>(*m_Client, std::move(KeyPrefix), Config.TempDir, m_DefaultMultipartChunkSize), + m_Excludes); } std::unique_ptr<HydrationBase> @@ -1386,10 +2386,10 @@ InitHydration(const HydrationBase::Configuration& Config) { return std::make_unique<S3Hydration>(Config); } - throw zen::runtime_error("Unknown hydration strategy: {}", Config.TargetSpecification); + throw zen::runtime_error("Unknown hydration strategy: {}"sv, Config.TargetSpecification); } - std::string_view Type = Config.Options["type"].AsString(); + std::string_view Type = Config.Options["type"sv].AsString(); if (Type == FileStorage::Type) { return std::make_unique<FileHydration>(Config); @@ -1400,9 +2400,9 @@ InitHydration(const HydrationBase::Configuration& Config) } if (!Type.empty()) { - throw zen::runtime_error("Unknown hydration target type '{}'", Type); + throw zen::runtime_error("Unknown hydration target type '{}'"sv, Type); } - throw zen::runtime_error("No hydration target configured"); + throw zen::runtime_error("No hydration target configured"sv); } #if ZEN_WITH_TESTS @@ -1485,6 +2485,34 @@ namespace { } } + // Test fixture that centralizes the common scaffolding for file-backed hydration tests: + // a scratch temp dir containing the server state, the hydration store, and the hydration + // temp dir, plus an initialized FileHydration backend. 
+ struct FileHarness + { + ScopedTemporaryDirectory TempDir; + std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; + std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; + std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; + std::unique_ptr<HydrationBase> Hydration; + + FileHarness() + { + CreateDirectories(ServerStateDir); + CreateDirectories(HydrationStore); + CreateDirectories(HydrationTemp); + Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); + } + + HydrationConfig MakeConfig(std::string_view ModuleId, HydrationConfig Overrides = {}) const + { + Overrides.ServerStateDir = ServerStateDir; + Overrides.TempDir = HydrationTemp; + Overrides.ModuleId = std::string(ModuleId); + return Overrides; + } + }; + } // namespace TEST_SUITE_BEGIN("server.hydration"); @@ -1495,140 +2523,386 @@ TEST_SUITE_BEGIN("server.hydration"); TEST_CASE("hydration.file.dehydrate_hydrate") { - ScopedTemporaryDirectory TempDir; + FileHarness H; + const auto TestFiles = CreateSmallTestTree(H.ServerStateDir); + constexpr auto ModuleId = "testmodule"; + const auto Config = H.MakeConfig(ModuleId); - std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; - std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; - std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; - CreateDirectories(ServerStateDir); - CreateDirectories(HydrationStore); - CreateDirectories(HydrationTemp); + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + CHECK(std::filesystem::exists(H.HydrationStore / ModuleId)); + CHECK(std::filesystem::is_empty(H.ServerStateDir)); - const std::string ModuleId = "testmodule"; - auto TestFiles = CreateSmallTestTree(ServerStateDir); - - auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); + H.Hydration->CreateHydrator(Config)->Hydrate(); + VerifyTree(H.ServerStateDir, TestFiles); +} - 
HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId}; +TEST_CASE("hydration.file.hydrate_overwrites_existing_state") +{ + FileHarness H; + const auto TestFiles = CreateSmallTestTree(H.ServerStateDir); + const auto Config = H.MakeConfig("testmodule"); - // Dehydrate: copy server state to file store - Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); - // Verify the module folder exists in the store and ServerStateDir was wiped - CHECK(std::filesystem::exists(HydrationStore / ModuleId)); - CHECK(std::filesystem::is_empty(ServerStateDir)); + // Stale file must be wiped by the rehydrate. + WriteFile(H.ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); + H.Hydration->CreateHydrator(Config)->Hydrate(); - // Hydrate: restore server state from file store - Hydration->CreateHydrator(Config)->Hydrate(); + CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "stale.bin")); + VerifyTree(H.ServerStateDir, TestFiles); +} - // Verify restored contents match the original - VerifyTree(ServerStateDir, TestFiles); +TEST_CASE("hydration.file.excluded_files_not_dehydrated") +{ + FileHarness H; + const auto TestFiles = CreateSmallTestTree(H.ServerStateDir); + + // Files matched by the built-in DefaultExcludes() set in hydration.cpp. Each must be + // skipped during dehydrate and not be recreated by hydrate. 
+ CreateDirectories(H.ServerStateDir / "gc"); + WriteFile(H.ServerStateDir / "gc" / "reserve.gc", CreateSemiRandomBlob(64)); + CreateDirectories(H.ServerStateDir / ".sentry-native"); + WriteFile(H.ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32)); + WriteFile(H.ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128)); + WriteFile(H.ServerStateDir / "state_marker", CreateSemiRandomBlob(16)); + WriteFile(H.ServerStateDir / ".lock", CreateSemiRandomBlob(8)); + WriteFile(H.ServerStateDir / "snapshot.bak", CreateSemiRandomBlob(48)); + CreateDirectories(H.ServerStateDir / "auth"); + WriteFile(H.ServerStateDir / "auth" / "authstate", CreateSemiRandomBlob(96)); + + const auto Config = H.MakeConfig("testmodule_excl"); + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + + CleanDirectory(H.ServerStateDir, true); + H.Hydration->CreateHydrator(Config)->Hydrate(); + + VerifyTree(H.ServerStateDir, TestFiles); + CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "gc" / "reserve.gc")); + CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / ".sentry-native")); + CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "state_marker")); + CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / ".lock")); + CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "snapshot.bak")); + CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "auth" / "authstate")); } -TEST_CASE("hydration.file.hydrate_overwrites_existing_state") +TEST_CASE("hydration.options.excludes_override") { + // Explicit `excludes` replaces the built-in default list outright; `.lock` is not + // in the override list, so it must appear in the manifest. Use the Options-only + // path (type + settings.path) so the same Options object also carries the override + // `excludes` array. 
ScopedTemporaryDirectory TempDir; - - std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; - std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; - std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; + std::filesystem::path ServerStateDir = TempDir.Path() / "state"; + std::filesystem::path HydrationStore = TempDir.Path() / "store"; + std::filesystem::path HydrationTemp = TempDir.Path() / "tmp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); + WriteFile(ServerStateDir / "regular.bin", CreateSemiRandomBlob(64)); + WriteFile(ServerStateDir / ".lock", CreateSemiRandomBlob(8)); - auto TestFiles = CreateSmallTestTree(ServerStateDir); - - auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); - - HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule"}; + CbObjectWriter Options; + Options << "type"sv + << "file"sv; + Options.BeginObject("settings"sv); + { + Options << "path"sv << HydrationStore.generic_string(); + } + Options.EndObject(); + Options.BeginArray("excludes"sv); + { + Options << "*.tmp"sv; + } + Options.EndArray(); - Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + HydrationBase::Configuration HydrCfg{.Options = Options.Save()}; + std::unique_ptr<HydrationBase> Hydration = InitHydration(HydrCfg); + HydrationConfig PerModuleCfg{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "excl_off"}; + Hydration->CreateHydrator(PerModuleCfg)->Dehydrate(CbObject()); - // Put a stale file in ServerStateDir to simulate leftover state - WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); + const std::filesystem::path StateFile = HydrationStore / "excl_off" / "current-state.cbo"; + REQUIRE(std::filesystem::exists(StateFile)); - // Hydrate - must wipe stale file and restore original - 
Hydration->CreateHydrator(Config)->Hydrate(); + FileContents Contents = ReadFile(StateFile); + REQUIRE(Contents); + IoBuffer Payload = Contents.Flatten(); + CbValidateError Err; + CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Payload), Err); + REQUIRE_EQ(Err, CbValidateError::None); - CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin")); - VerifyTree(ServerStateDir, TestFiles); + bool HasLock = false; + for (CbFieldView F : Meta["Files"sv]) + { + if (F.AsObjectView()["Path"sv].AsString() == ".lock") + { + HasLock = true; + break; + } + } + CHECK(HasLock); } -TEST_CASE("hydration.file.excluded_files_not_dehydrated") +// --------------------------------------------------------------------------- +// FileHydrator obliterate test +// --------------------------------------------------------------------------- + +TEST_CASE("hydration.file.obliterate") { - ScopedTemporaryDirectory TempDir; + FileHarness H; + constexpr std::string_view ModuleId = "obliterate_test"sv; + CreateSmallTestTree(H.ServerStateDir); + const auto Config = H.MakeConfig(ModuleId); - std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; - std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; - std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; - CreateDirectories(ServerStateDir); - CreateDirectories(HydrationStore); - CreateDirectories(HydrationTemp); + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + CHECK(std::filesystem::exists(H.HydrationStore / ModuleId)); - auto TestFiles = CreateSmallTestTree(ServerStateDir); + // Put files back in ServerStateDir + TempDir to verify cleanup. 
+ CreateSmallTestTree(H.ServerStateDir); + WriteFile(H.HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64)); - // Add files that the dehydrator should skip - WriteFile(ServerStateDir / "reserve.gc", CreateSemiRandomBlob(64)); - CreateDirectories(ServerStateDir / ".sentry-native"); - WriteFile(ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32)); - WriteFile(ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128)); + H.Hydration->CreateHydrator(Config)->Obliterate(); - auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); + CHECK_FALSE(std::filesystem::exists(H.HydrationStore / ModuleId)); + CHECK(std::filesystem::is_empty(H.ServerStateDir)); + CHECK(std::filesystem::is_empty(H.HydrationTemp)); +} - HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule_excl"}; +// --------------------------------------------------------------------------- +// Pack tests - exercise small-file packing, unpacking, and fallback paths. +// --------------------------------------------------------------------------- - Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); +TEST_CASE("hydration.file.pack_roundtrip") +{ + // CreateSmallTestTree produces 6 files all < 2 KB -> single pack with every file in it. + FileHarness H; + const auto TestFiles = CreateSmallTestTree(H.ServerStateDir); + const auto Config = H.MakeConfig("pack_roundtrip"); + + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + CHECK(std::filesystem::is_empty(H.ServerStateDir)); + H.Hydration->CreateHydrator(Config)->Hydrate(); + VerifyTree(H.ServerStateDir, TestFiles); +} - // Hydrate into a clean directory - CleanDirectory(ServerStateDir, true); - Hydration->CreateHydrator(Config)->Hydrate(); +TEST_CASE("hydration.file.pack_disabled_fallback") +{ + // PackEnabled=false -> every file is a standalone CAS entry regardless of size. 
+ FileHarness H; + const auto TestFiles = CreateSmallTestTree(H.ServerStateDir); + const auto Config = H.MakeConfig("pack_disabled", HydrationConfig{.PackEnabled = false}); + + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + H.Hydration->CreateHydrator(Config)->Hydrate(); + VerifyTree(H.ServerStateDir, TestFiles); +} - // Normal files must be restored - VerifyTree(ServerStateDir, TestFiles); - // Excluded files must NOT be restored - CHECK_FALSE(std::filesystem::exists(ServerStateDir / "reserve.gc")); - CHECK_FALSE(std::filesystem::exists(ServerStateDir / ".sentry-native")); +TEST_CASE("hydration.file.pack_one_unique_fallback") +{ + // Only 1 unique small-file candidate -> no pack (min 2 entries); falls back to standalone. + FileHarness H; + + std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles; + IoBuffer Small = CreateSemiRandomBlob(128); + WriteFile(H.ServerStateDir / "tiny.bin", Small); + TestFiles.emplace_back("tiny.bin", std::move(Small)); + + IoBuffer Big = CreateSemiRandomBlob(8192); + WriteFile(H.ServerStateDir / "big.bin", Big); + TestFiles.emplace_back("big.bin", std::move(Big)); + + const auto Config = H.MakeConfig("pack_one_unique"); + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + H.Hydration->CreateHydrator(Config)->Hydrate(); + VerifyTree(H.ServerStateDir, TestFiles); } -// --------------------------------------------------------------------------- -// FileHydrator obliterate test -// --------------------------------------------------------------------------- +TEST_CASE("hydration.file.pack_duplicate_hashes") +{ + // 10 files share one hash + 1 distinct file -> pack has 2 unique entries; hydrate writes + // all 11 destinations correctly. 
+ FileHarness H; + + IoBuffer Shared = CreateSemiRandomBlob(256); + IoBuffer Other = CreateSemiRandomBlob(256); + std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles; + for (int I = 0; I < 10; ++I) + { + std::filesystem::path Rel = fmt::format("dup_{:02d}.bin"sv, I); + WriteFile(H.ServerStateDir / Rel, Shared); + TestFiles.emplace_back(Rel, Shared); + } + WriteFile(H.ServerStateDir / "other.bin", Other); + TestFiles.emplace_back("other.bin", Other); + + const auto Config = H.MakeConfig("pack_duplicates"); + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + H.Hydration->CreateHydrator(Config)->Hydrate(); + VerifyTree(H.ServerStateDir, TestFiles); +} -TEST_CASE("hydration.file.obliterate") +TEST_CASE("hydration.file.pack_large_dataset") { - ScopedTemporaryDirectory TempDir; + // Mix of many small files + a few large ones, with a modest MaxPackBytes + // to force bin-packing into multiple packs. Verifies ordering, splitting, and the + // interaction between packed and standalone uploads. + FileHarness H; + + std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles; + constexpr int kSmallCount = 100; + constexpr int kLargeCount = 3; + + // Varied small-file sizes (256-2048 B) avoid artificial uniformity in the bin-pack. 
+ FastRandom Rand{.Seed = 0xcafebabe}; + for (int I = 0; I < kSmallCount; ++I) + { + uint64_t Size = 256 + (Rand.Next() % 1793); // [256, 2048] + IoBuffer Blob = CreateSemiRandomBlob(Rand, Size); + auto Rel = std::filesystem::path(fmt::format("small/group{}/file_{:04d}.bin"sv, I / 25, I)); + CreateDirectories((H.ServerStateDir / Rel).parent_path()); + WriteFile(H.ServerStateDir / Rel, Blob); + TestFiles.emplace_back(std::move(Rel), std::move(Blob)); + } + for (int I = 0; I < kLargeCount; ++I) + { + IoBuffer Blob = CreateSemiRandomBlob(Rand, 32 * 1024 + I * 4096); + auto Rel = std::filesystem::path(fmt::format("large/file_{:02d}.bulk"sv, I)); + CreateDirectories((H.ServerStateDir / Rel).parent_path()); + WriteFile(H.ServerStateDir / Rel, Blob); + TestFiles.emplace_back(std::move(Rel), std::move(Blob)); + } + + // Cap each pack at ~32 KB -> 100 small files (~115 KB raw) split across ~4 packs. + const auto Config = H.MakeConfig("pack_large", HydrationConfig{.MaxPackBytes = 32 * 1024}); + + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + CHECK(std::filesystem::is_empty(H.ServerStateDir)); + H.Hydration->CreateHydrator(Config)->Hydrate(); + VerifyTree(H.ServerStateDir, TestFiles); +} - std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; - std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; - std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; - CreateDirectories(ServerStateDir); - CreateDirectories(HydrationStore); - CreateDirectories(HydrationTemp); +TEST_CASE("hydration.file.pack_hash_determinism") +{ + // Two independent dehydrate runs over the same content must produce the same ordered list of + // pack hashes in the state file (the file as a whole is not byte-identical). This is what + // keeps ExistsLookup dedup working across redeploys. 
+ FileHarness H; + + FastRandom Rand{.Seed = 0x12345678}; + std::vector<std::pair<std::filesystem::path, IoBuffer>> Files; + for (int I = 0; I < 40; ++I) + { + IoBuffer Blob = CreateSemiRandomBlob(Rand, 256 + (I % 7) * 200); + auto Rel = std::filesystem::path(fmt::format("tree/leaf_{:02d}.dat"sv, I)); + CreateDirectories((H.ServerStateDir / Rel).parent_path()); + WriteFile(H.ServerStateDir / Rel, Blob); + Files.emplace_back(std::move(Rel), std::move(Blob)); + } + + const auto Config = H.MakeConfig("pack_determinism"); + const std::filesystem::path StateFile = H.HydrationStore / "pack_determinism" / "current-state.cbo"; + + // Extract the ordered pack-hash list from state.cbo. Timestamp / duration fields vary + // across runs so byte-identity is not achievable; the pack identities are. + auto ReadPackHashes = [&]() -> std::vector<IoHash> { + FileContents Contents = ReadFile(StateFile); + REQUIRE(Contents); + IoBuffer Payload = Contents.Flatten(); + CbValidateError Err; + CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Payload), Err); + REQUIRE_EQ(Err, CbValidateError::None); + std::vector<IoHash> Hashes; + for (CbFieldView F : Meta["Packs"sv]) + { + Hashes.push_back(F.AsObjectView()["Hash"sv].AsHash()); + } + return Hashes; + }; - const std::string ModuleId = "obliterate_test"; - CreateSmallTestTree(ServerStateDir); + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + std::vector<IoHash> First = ReadPackHashes(); + REQUIRE_FALSE(First.empty()); - auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); + // Rehydrate so the tree is back on disk, then dehydrate again with a fresh hydrator. 
+ H.Hydration->CreateHydrator(Config)->Hydrate(); + VerifyTree(H.ServerStateDir, Files); - HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId}; + auto HydrationB = InitHydration({.TargetSpecification = "file://" + H.HydrationStore.string()}); + HydrationB->CreateHydrator(Config)->Dehydrate(CbObject()); + std::vector<IoHash> Second = ReadPackHashes(); - // Dehydrate so the backend store has data - Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); - CHECK(std::filesystem::exists(HydrationStore / ModuleId)); + REQUIRE_EQ(First.size(), Second.size()); + for (size_t I = 0; I < First.size(); ++I) + { + CHECK_EQ(First[I], Second[I]); + } +} - // Put some files back in ServerStateDir and TempDir to verify cleanup - CreateSmallTestTree(ServerStateDir); - WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64)); +TEST_CASE("hydration.file.pack_backward_compat_read") +{ + // Produce an old-format state.cbo (no Packs[] / PackHash fields) by dehydrating with packing + // disabled. Hydrate must treat every file as standalone and roundtrip successfully. + FileHarness H; + const auto TestFiles = CreateSmallTestTree(H.ServerStateDir); + const auto Config = H.MakeConfig("pack_oldformat", HydrationConfig{.PackEnabled = false}); + + // Dehydrate with PackEnabled=false -> state.cbo has no Packs[] and no PackHash fields. + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + CHECK(std::filesystem::is_empty(H.ServerStateDir)); + + // Hydrate with PackEnabled=true -> the hydrator must still handle the old-format state. 
+ const auto NewConfig = H.MakeConfig("pack_oldformat"); + H.Hydration->CreateHydrator(NewConfig)->Hydrate(); + VerifyTree(H.ServerStateDir, TestFiles); +} - // Obliterate - Hydration->CreateHydrator(Config)->Obliterate(); +// --------------------------------------------------------------------------- +// CreateParentDirectories helper test +// --------------------------------------------------------------------------- - // Backend store directory deleted - CHECK_FALSE(std::filesystem::exists(HydrationStore / ModuleId)); - // ServerStateDir cleaned - CHECK(std::filesystem::is_empty(ServerStateDir)); - // TempDir cleaned - CHECK(std::filesystem::is_empty(HydrationTemp)); +TEST_CASE("hydration.createparentdirectories") +{ + ScopedTemporaryDirectory TempDir; + const std::filesystem::path Root = TempDir.Path(); + + // Edge: empty input. + CHECK_EQ(hydration_impl::CreateParentDirectories({}), 0u); + + // Edge: bare filename has no parent_path() -> contributes nothing. + std::vector<std::filesystem::path> Bare{"bare.bin"}; + CHECK_EQ(hydration_impl::CreateParentDirectories(Bare), 0u); + + // Edge: single input. Triggers the Dirs.size() == 1 path that bypasses the prune loop. + const std::filesystem::path SingleRoot = Root / "single"; + std::vector<std::filesystem::path> Single{SingleRoot / "only" / "a.bin"}; + CHECK_EQ(hydration_impl::CreateParentDirectories(Single), 1u); + CHECK(std::filesystem::is_directory(SingleRoot / "only")); + + // Edge: pre-existing dirs must not raise; count still reflects leaf set. + const std::filesystem::path PreRoot = Root / "preexisting"; + CreateDirectories(PreRoot / "pre" / "made"); + std::vector<std::filesystem::path> Pre{PreRoot / "pre" / "made" / "f.bin"}; + CHECK_EQ(hydration_impl::CreateParentDirectories(Pre), 1u); + CHECK(std::filesystem::is_directory(PreRoot / "pre" / "made")); + + // Generic: ancestor-chain pruning, parent dedup across files in same dir, disjoint + // siblings (cannot prune each other), nested-vs-flat coexistence. 
Expected leaves: + // deep/nest/leaf, deep/sibling, flat, lone. + const std::filesystem::path MixRoot = Root / "mixed"; + std::vector<std::filesystem::path> Mix{MixRoot / "deep" / "nest" / "leaf" / "x.bin", + MixRoot / "deep" / "nest" / "leaf" / "y.bin", // shares parent with x + MixRoot / "deep" / "nest" / "g.bin", // ancestor of leaf -> pruned + MixRoot / "deep" / "h.bin", // ancestor of nest -> pruned + MixRoot / "deep" / "sibling" / "i.bin", // sibling of nest -> kept + MixRoot / "flat" / "j.bin", // top-level sibling -> kept + MixRoot / "lone" / "k.bin"}; // disjoint -> kept + CHECK_EQ(hydration_impl::CreateParentDirectories(Mix), 4u); + CHECK(std::filesystem::is_directory(MixRoot / "deep" / "nest" / "leaf")); + CHECK(std::filesystem::is_directory(MixRoot / "deep" / "sibling")); + CHECK(std::filesystem::is_directory(MixRoot / "flat")); + CHECK(std::filesystem::is_directory(MixRoot / "lone")); + // Pruned ancestors still exist via CreateDirectories recursion. + CHECK(std::filesystem::is_directory(MixRoot / "deep" / "nest")); + CHECK(std::filesystem::is_directory(MixRoot / "deep")); } // --------------------------------------------------------------------------- @@ -1658,7 +2932,7 @@ TEST_CASE("hydration.file.concurrent") for (int I = 0; I < kModuleCount; ++I) { - std::string ModuleId = fmt::format("file_concurrent_{}", I); + std::string ModuleId = fmt::format("file_concurrent_{}"sv, I); std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state"; std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp"; CreateDirectories(StateDir); @@ -1817,7 +3091,7 @@ TEST_CASE("hydration.s3.concurrent") for (int I = 0; I < kModuleCount; ++I) { - std::string ModuleId = fmt::format("s3_concurrent_{}", I); + std::string ModuleId = fmt::format("s3_concurrent_{}"sv, I); std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state"; std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp"; CreateDirectories(StateDir); @@ -1890,7 +3164,7 @@ 
TEST_CASE("hydration.s3.obliterate") CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); - const std::string ModuleId = "s3test_obliterate"; + constexpr std::string_view ModuleId = "s3test_obliterate"sv; HydrationBase::Configuration BaseConfig; { @@ -1904,7 +3178,7 @@ TEST_CASE("hydration.s3.obliterate") } auto Hydration = InitHydration(BaseConfig); - HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId}; + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = std::string(ModuleId)}; // Dehydrate to populate backend CreateSmallTestTree(ServerStateDir); @@ -1918,7 +3192,7 @@ TEST_CASE("hydration.s3.obliterate") Opts.Credentials.AccessKeyId = Minio.RootUser(); Opts.Credentials.SecretAccessKey = Minio.RootPassword(); S3Client Client(Opts); - return Client.ListObjects(ModuleId + "/"); + return Client.ListObjects(fmt::format("{}/"sv, ModuleId)); }; // Verify objects exist in S3 @@ -2034,7 +3308,7 @@ TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip()) CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); - const std::string ModuleId = "s3test_performance"; + constexpr std::string_view ModuleId = "s3test_performance"sv; CopyTree("E:\\Dev\\hub\\brainrot\\20260402-225355-508", ServerStateDir, {.EnableClone = true}); // auto TestFiles = CreateTestTree(ServerStateDir); @@ -2054,7 +3328,7 @@ TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip()) HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, - .ModuleId = ModuleId, + .ModuleId = std::string(ModuleId), .Threading = Threading.Options}; // Dehydrate: upload server state to MinIO @@ -2091,7 +3365,7 @@ TEST_CASE("hydration.file.incremental") CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); - const std::string ModuleId = "testmodule"; + constexpr std::string_view ModuleId = "testmodule"sv; // auto TestFiles = 
CreateTestTree(ServerStateDir); TestThreading Threading(4); @@ -2100,7 +3374,7 @@ TEST_CASE("hydration.file.incremental") HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, - .ModuleId = ModuleId, + .ModuleId = std::string(ModuleId), .Threading = Threading.Options}; // Hydrate with no prior state @@ -2170,7 +3444,7 @@ TEST_CASE("hydration.s3.incremental") CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); - const std::string ModuleId = "s3test_incremental"; + constexpr std::string_view ModuleId = "s3test_incremental"sv; TestThreading Threading(8); @@ -2188,7 +3462,7 @@ TEST_CASE("hydration.s3.incremental") HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, - .ModuleId = ModuleId, + .ModuleId = std::string(ModuleId), .Threading = Threading.Options}; // Hydrate with no prior state diff --git a/src/zenserver/hub/hydration.h b/src/zenserver/hub/hydration.h index 0455dda91..d9a3dda5b 100644 --- a/src/zenserver/hub/hydration.h +++ b/src/zenserver/hub/hydration.h @@ -2,6 +2,8 @@ #pragma once +#include "hydrationdefaults.h" + #include <zencore/compactbinary.h> #include <atomic> @@ -9,6 +11,7 @@ #include <memory> #include <optional> #include <string> +#include <vector> namespace zen { @@ -32,6 +35,20 @@ struct HydrationConfig // External threading for parallel I/O and hashing. If not set, work runs inline on the caller's thread. std::optional<ThreadingOptions> Threading; + + // When true, small files are concatenated into pack blobs during Dehydrate and sliced + // back out during Hydrate. Pack contents are stored raw (no compression); the CAS key + // of a pack is the hash of its concatenated raw bytes. Dramatically reduces request + // count for modules dominated by tiny metadata files. + bool PackEnabled = true; + + // Files strictly smaller than this are candidates for packing. 
+ uint64_t PackThresholdBytes = DefaultPackThresholdBytes; + + // Upper bound on the concatenation size of a single pack. Candidates are bin-packed + // greedily; a pack that would exceed this is closed and a new one is started. A single + // unique candidate larger than this falls back to standalone upload. + uint64_t MaxPackBytes = DefaultMaxPackBytes; }; /** @@ -46,14 +63,23 @@ struct HydrationStrategyBase virtual ~HydrationStrategyBase() = default; // Upload server state to the configured target. ServerStateDir is wiped on success. - // On failure, ServerStateDir is left intact. + // On failure, ServerStateDir is left intact and the failure is logged at WARN; no + // exception propagates to the caller. Callers that need to distinguish success from + // failure must inspect the log stream or observe downstream effects (e.g. presence of + // the metadata file on the backend) - success is not signalled through the API. virtual void Dehydrate(const CbObject& CachedState) = 0; - // Download state from the configured target into ServerStateDir. Returns cached state for the next Dehydrate. - // On failure, ServerStateDir is wiped and an empty CbObject is returned. + // Download state from the configured target into ServerStateDir. Returns cached state + // for the next Dehydrate. On failure, ServerStateDir is wiped, the failure is logged, + // and an empty CbObject is returned (a no-op cache). Callers can check the result for + // emptiness as a failure indicator. virtual CbObject Hydrate() = 0; - // Delete all stored data for this module from the configured backend, then clean ServerStateDir and TempDir. + // Delete all stored data for this module from the configured backend, then clean + // ServerStateDir and TempDir. Backend-delete failures are retried once; if the retry + // also fails, local cleanup proceeds regardless and the failure is logged at WARN. 
+ // Because backend deletion is best-effort, a return from Obliterate does not guarantee + // backend data is gone. virtual void Obliterate() = 0; }; @@ -73,14 +99,23 @@ public: { // Back-end specific target specification (e.g. "s3://bucket/prefix", "file:///path") std::string TargetSpecification; - // Full config object (mutually exclusive with TargetSpecification) + // Full config object (mutually exclusive with TargetSpecification). Backend-specific + // settings (e.g. S3 "chunksize") live inside Options["settings"]. The common + // `excludes` entry is parsed once by HydrationBase and shared across modules. CbObject Options; }; + // Parses common Options entries (`excludes`) into m_Excludes, applying built-in + // defaults when the field is absent. Field present-but-empty `[]` is honored as an + // explicit override (no defaults applied). + explicit HydrationBase(const Configuration& Config); virtual ~HydrationBase() = default; // Create a configured per-module hydrator, ready to call Hydrate/Dehydrate/Obliterate. virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) = 0; + +protected: + std::vector<std::string> m_Excludes; }; // Factory: parses Config and returns the concrete backend (FileHydration or S3Hydration). diff --git a/src/zenserver/hub/hydrationdefaults.h b/src/zenserver/hub/hydrationdefaults.h new file mode 100644 index 000000000..8d9fb6d41 --- /dev/null +++ b/src/zenserver/hub/hydrationdefaults.h @@ -0,0 +1,26 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <cstdint> + +namespace zen { + +// state.cbo schema version. Bump on any incompatible change to the manifest layout. +// Hydrate refuses to read a manifest with a higher version than this. Manifests written +// by older binaries omit the field; treated as version 0 and decoded best-effort +// (missing optional fields like Packs[] / StorageSettings fall back to defaults). 
+inline constexpr uint32_t HydrationSchemaVersion = 1; + +// Multipart chunk size used by the S3 backend when hydrating/dehydrating large files. +// Dehydrate writes the chosen size into state.cbo so hydrate replays with the same +// partitioning. State records without the field fall back to this default. +inline constexpr uint64_t DefaultMultipartChunkSize = 64u * 1024u * 1024u; + +// Files strictly smaller than this are candidates for packing. +inline constexpr uint64_t DefaultPackThresholdBytes = 256u * 1024u; + +// Upper bound on the concatenation size of a single pack. Packs are stored raw (no compression). +inline constexpr uint64_t DefaultMaxPackBytes = 4u * 1024u * 1024u; + +} // namespace zen diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp index 9d477fb10..8d36e6a46 100644 --- a/src/zenserver/hub/storageserverinstance.cpp +++ b/src/zenserver/hub/storageserverinstance.cpp @@ -10,6 +10,7 @@ #include <zencore/logging.h> #include <zencore/string.h> #include <zencore/timer.h> +#include <zencore/trace.h> namespace zen { @@ -31,6 +32,7 @@ StorageServerInstance::~StorageServerInstance() void StorageServerInstance::SpawnServerProcess() { + ZEN_TRACE_CPU("StorageServerInstance::SpawnServerProcess"); Stopwatch SpawnTimer; ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); @@ -103,6 +105,7 @@ StorageServerInstance::ShutdownServerProcess() { return; } + ZEN_TRACE_CPU("StorageServerInstance::ShutdownServerProcess"); Stopwatch ShutdownTimer; // m_ServerInstance.Shutdown() never throws. 
m_ServerInstance.Shutdown(); @@ -131,6 +134,7 @@ StorageServerInstance::ProvisionLocked() return; } + ZEN_TRACE_CPU("StorageServerInstance::ProvisionLocked"); ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_Config.StateDir); try { @@ -150,6 +154,7 @@ StorageServerInstance::ProvisionLocked() void StorageServerInstance::DeprovisionLocked() { + ZEN_TRACE_CPU("StorageServerInstance::DeprovisionLocked"); ShutdownServerProcess(); // Crashed or Hibernated: process already dead; skip Shutdown. @@ -169,6 +174,7 @@ StorageServerInstance::DeprovisionLocked() void StorageServerInstance::ObliterateLocked() { + ZEN_TRACE_CPU("StorageServerInstance::ObliterateLocked"); ShutdownServerProcess(); std::atomic<bool> AbortFlag{false}; @@ -181,6 +187,7 @@ StorageServerInstance::ObliterateLocked() void StorageServerInstance::HibernateLocked() { + ZEN_TRACE_CPU("StorageServerInstance::HibernateLocked"); // Signal server to shut down, but keep data around for later wake ShutdownServerProcess(); } @@ -195,6 +202,8 @@ StorageServerInstance::WakeLocked() return; } + ZEN_TRACE_CPU("StorageServerInstance::WakeLocked"); + try { SpawnServerProcess(); @@ -212,6 +221,12 @@ StorageServerInstance::WakeLocked() void StorageServerInstance::Hydrate() { + if (!m_Config.EnableHydration) + { + ZEN_INFO("Hydration disabled; skipping hydrate for module '{}'", m_ModuleId); + return; + } + ZEN_TRACE_CPU("StorageServerInstance::Hydrate"); std::atomic<bool> AbortFlag{false}; std::atomic<bool> PauseFlag{false}; HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag); @@ -222,6 +237,12 @@ StorageServerInstance::Hydrate() void StorageServerInstance::Dehydrate() { + if (!m_Config.EnableDehydration) + { + ZEN_INFO("Dehydration disabled; skipping dehydrate for module '{}'", m_ModuleId); + return; + } + ZEN_TRACE_CPU("StorageServerInstance::Dehydrate"); std::atomic<bool> AbortFlag{false}; std::atomic<bool> PauseFlag{false}; HydrationConfig Config = 
MakeHydrationConfig(AbortFlag, PauseFlag); @@ -238,6 +259,9 @@ StorageServerInstance::MakeHydrationConfig(std::atomic<bool>& AbortFlag, std::at Config.Threading.emplace( HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalWorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag}); } + Config.PackEnabled = m_Config.HydrationPackEnabled; + Config.PackThresholdBytes = m_Config.HydrationPackThresholdBytes; + Config.MaxPackBytes = m_Config.HydrationMaxPackBytes; return Config; } diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h index 21ac1ada3..a2f376a23 100644 --- a/src/zenserver/hub/storageserverinstance.h +++ b/src/zenserver/hub/storageserverinstance.h @@ -36,6 +36,11 @@ public: std::string Trace; std::string TraceHost; std::string TraceFile; + bool EnableHydration = true; + bool EnableDehydration = true; + bool HydrationPackEnabled = true; + uint64_t HydrationPackThresholdBytes = DefaultPackThresholdBytes; + uint64_t HydrationMaxPackBytes = DefaultMaxPackBytes; WorkerThreadPool* OptionalWorkerPool = nullptr; }; diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp index ebc2cf2f1..27c1c9fc4 100644 --- a/src/zenserver/hub/zenhubserver.cpp +++ b/src/zenserver/hub/zenhubserver.cpp @@ -235,6 +235,44 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) Options.add_option("hub", "", + "hub-enable-hydration", + "Load instance state from the hydration target on provision (default true)", + cxxopts::value<bool>(m_ServerOptions.HubEnableHydration)->default_value("true"), + ""); + + Options.add_option("hub", + "", + "hub-enable-dehydration", + "Save instance state to the hydration target on deprovision (default true)", + cxxopts::value<bool>(m_ServerOptions.HubEnableDehydration)->default_value("true"), + ""); + + Options.add_option( + "hub", + "", + "hub-hydration-enable-pack", + "Concatenate small files into raw CAS pack blobs during dehydrate (default true; see 
--hub-hydration-pack-threshold-bytes)", + cxxopts::value<bool>(m_ServerOptions.HubHydrationPackEnabled)->default_value("true"), + ""); + + Options.add_option("hub", + "", + "hub-hydration-pack-threshold-bytes", + fmt::format("Files strictly smaller than this are pack candidates (default {})", DefaultPackThresholdBytes), + cxxopts::value<uint64_t>(m_ServerOptions.HubHydrationPackThresholdBytes) + ->default_value(fmt::format("{}", DefaultPackThresholdBytes)), + "<bytes>"); + + Options.add_option( + "hub", + "", + "hub-hydration-max-pack-bytes", + fmt::format("Upper bound on a pack's concatenation size (default {})", DefaultMaxPackBytes), + cxxopts::value<uint64_t>(m_ServerOptions.HubHydrationMaxPackBytes)->default_value(fmt::format("{}", DefaultMaxPackBytes)), + "<bytes>"); + + Options.add_option("hub", + "", "hub-watchdog-cycle-interval-ms", "Interval between watchdog cycles in milliseconds", cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.CycleIntervalMs)->default_value("3000"), @@ -367,6 +405,13 @@ ZenHubServerConfigurator::AddConfigOptions(LuaConfig::Options& Options) Options.AddOption("hub.hydration.targetspec"sv, m_ServerOptions.HydrationTargetSpecification, "hub-hydration-target-spec"sv); Options.AddOption("hub.hydration.targetconfig"sv, m_ServerOptions.HydrationTargetConfigPath, "hub-hydration-target-config"sv); Options.AddOption("hub.hydration.threads"sv, m_ServerOptions.HubHydrationThreadCount, "hub-hydration-threads"sv); + Options.AddOption("hub.enablehydration"sv, m_ServerOptions.HubEnableHydration, "hub-enable-hydration"sv); + Options.AddOption("hub.enabledehydration"sv, m_ServerOptions.HubEnableDehydration, "hub-enable-dehydration"sv); + Options.AddOption("hub.hydration.enablepack"sv, m_ServerOptions.HubHydrationPackEnabled, "hub-hydration-enable-pack"sv); + Options.AddOption("hub.hydration.packthresholdbytes"sv, + m_ServerOptions.HubHydrationPackThresholdBytes, + "hub-hydration-pack-threshold-bytes"sv); + 
Options.AddOption("hub.hydration.maxpackbytes"sv, m_ServerOptions.HubHydrationMaxPackBytes, "hub-hydration-max-pack-bytes"sv); Options.AddOption("hub.watchdog.cycleintervalms"sv, m_ServerOptions.WatchdogConfig.CycleIntervalMs, "hub-watchdog-cycle-interval-ms"sv); Options.AddOption("hub.watchdog.cycleprocessingbudgetms"sv, @@ -453,7 +498,7 @@ ZenHubServer::OnModuleStateChanged(std::string_view HubInstanceId, HubInstanceState PreviousState, HubInstanceState NewState) { - ZEN_UNUSED(PreviousState); + ZEN_INFO("Module '{}' changed state from {} to {}", ModuleId, zen::ToString(PreviousState), zen::ToString(NewState)); if (NewState == HubInstanceState::Deprovisioning || NewState == HubInstanceState::Hibernating) { @@ -661,6 +706,11 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) .InstanceTraceFile = ServerConfig.HubInstanceTraceFile, .InstanceConfigPath = ServerConfig.HubInstanceConfigPath, .HydrationTargetSpecification = ServerConfig.HydrationTargetSpecification, + .EnableHydration = ServerConfig.HubEnableHydration, + .EnableDehydration = ServerConfig.HubEnableDehydration, + .HydrationPackEnabled = ServerConfig.HubHydrationPackEnabled, + .HydrationPackThresholdBytes = ServerConfig.HubHydrationPackThresholdBytes, + .HydrationMaxPackBytes = ServerConfig.HubHydrationMaxPackBytes, .WatchDog = { .CycleInterval = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleIntervalMs), diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h index 5e465bb14..6416792a6 100644 --- a/src/zenserver/hub/zenhubserver.h +++ b/src/zenserver/hub/zenhubserver.h @@ -3,6 +3,7 @@ #pragma once #include "hubinstancestate.h" +#include "hydrationdefaults.h" #include "resourcemetrics.h" #include "zenserver.h" @@ -47,7 +48,12 @@ struct ZenHubServerConfig : public ZenServerConfig uint16_t HubBasePortNumber = 21000; int HubInstanceLimit = 1000; bool HubUseJobObject = true; - std::string HubInstanceHttpClass = "asio"; + bool HubEnableHydration = 
true; // Load instance state from hydration target on provision + bool HubEnableDehydration = true; // Save instance state to hydration target on deprovision + bool HubHydrationPackEnabled = true; // Concatenate small files into raw CAS pack blobs during dehydrate + uint64_t HubHydrationPackThresholdBytes = DefaultPackThresholdBytes; // Files strictly smaller than this are pack candidates + uint64_t HubHydrationMaxPackBytes = DefaultMaxPackBytes; // Upper bound on a pack's concatenation size + std::string HubInstanceHttpClass = "asio"; std::string HubInstanceMalloc; std::string HubInstanceTrace; std::string HubInstanceTraceHost; diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp index f8bed92da..ab80cfcc7 100644 --- a/src/zenutil/cloud/s3client.cpp +++ b/src/zenutil/cloud/s3client.cpp @@ -148,11 +148,31 @@ namespace { return true; } + /// True if the response indicates S3 throttling (503 SlowDown / ServiceUnavailable / 429). + /// Code is checked on both the HTTP status and the XML error code so we catch proxies that + /// return 200 with a SlowDown body. + bool IsS3Throttled(const HttpClient::Response& Response, std::string_view ErrorCode) + { + const int Status = static_cast<int>(Response.StatusCode); + if (Status == 503 || Status == 429) + { + return true; + } + if (ErrorCode == "SlowDown" || ErrorCode == "ServiceUnavailable" || ErrorCode == "ThrottlingException" || + ErrorCode == "RequestLimitExceeded" || ErrorCode == "TooManyRequests") + { + return true; + } + return false; + } + /// Build a human-readable error message for a failed S3 response. When the response body /// contains an S3 `<Error>` element, the Code and Message fields are included in the string /// so transient 4xx/5xx failures (SignatureDoesNotMatch, AuthorizationHeaderMalformed, etc.) /// show up in logs instead of being swallowed. Falls back to the generic HTTP/transport /// message when no XML body is available (HEAD responses, transport errors). 
+ /// Also emits a distinct `S3 THROTTLED` warning when the response indicates throttling so + /// callers can grep for it without parsing combined error text. std::string S3ErrorMessage(std::string_view Prefix, const HttpClient::Response& Response) { if (!Response.Error.has_value() && Response.ResponsePayload) @@ -164,9 +184,21 @@ namespace { { ExtendableStringBuilder<256> Decoded; DecodeXmlEntities(Message, Decoded); + if (IsS3Throttled(Response, Code)) + { + ZEN_WARN("S3 THROTTLED [{}] status={} code='{}' message='{}'", + Prefix, + static_cast<int>(Response.StatusCode), + Code, + Decoded.ToView()); + } return fmt::format("{}: HTTP status ({}) {} - {}", Prefix, static_cast<int>(Response.StatusCode), Code, Decoded.ToView()); } } + if (IsS3Throttled(Response, {})) + { + ZEN_WARN("S3 THROTTLED [{}] status={} (no XML body)", Prefix, static_cast<int>(Response.StatusCode)); + } return Response.ErrorMessage(Prefix); } |