aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author Dan Engelbrecht <[email protected]> 2026-04-27 11:14:09 +0200
committer GitHub Enterprise <[email protected]> 2026-04-27 11:14:09 +0200
commit 753ab4e89b9a5952e50bc77d404198520b362a3a (patch)
tree 39dbaad8389677981281b8c1585ac846251539f0 /src
parent fix crash when scavenging sequences or copying local chunks (#1013) (diff)
download archived-zen-753ab4e89b9a5952e50bc77d404198520b362a3a.tar.xz
archived-zen-753ab4e89b9a5952e50bc77d404198520b362a3a.zip
hydration with pack (#1016)
- Feature: Hub hydration packs small files into raw CAS pack blobs to reduce request count for modules dominated by tiny metadata files
  - `--hub-hydration-enable-pack` (Lua: `hub.hydration.enablepack`, default true)
  - `--hub-hydration-pack-threshold-bytes` (Lua: `hub.hydration.packthresholdbytes`, default 256 KiB)
  - `--hub-hydration-max-pack-bytes` (Lua: `hub.hydration.maxpackbytes`, default 4 MiB)
- Feature: Hub hydration and dehydration can be disabled per direction
  - `--hub-enable-hydration` (Lua: `hub.enablehydration`, default true)
  - `--hub-enable-dehydration` (Lua: `hub.enabledehydration`, default true)
- Feature: Hub hydration accepts a configurable file exclude list via `HydrationOptions` `excludes` (array of wildcards). Built-in defaults skip transient runtime files (`.lock`, `.sentry-native/*`, `state_marker`, `*.bak`, `gc/reserve.gc`, `auth/*`) so they no longer participate in dehydrate scans. Override semantics: a present field replaces the default outright; explicit `[]` opts out of all defaults.
- Improvement: Hub hydration completion logs now report per-request average and max latency, peak in-flight workers, queue wait, and hash-cache hit percentage; loose and pack-blob transfers are reported separately
- Improvement: Hub hydration pre-creates unique parent directories before scheduling parallel writes
- Improvement: S3 hydration retries transient HTTP failures (timeouts, 429 throttling, 5xx server errors, connection errors) up to 3 times via the HTTP client retry layer
- Improvement: S3 hydration multipart chunk size is persisted in `state.cbo` per module so hydrate replays the partitioning used at dehydrate; default raised to 64 MiB (was 32 MiB)
- Improvement: Hub hydration `Obliterate` retries backend delete once before falling back to local cleanup
Diffstat (limited to 'src')
-rw-r--r-- src/zencore/include/zencore/string.h 6
-rw-r--r-- src/zencore/string.cpp 62
-rw-r--r-- src/zenserver/hub/hub.cpp 68
-rw-r--r-- src/zenserver/hub/hub.h 5
-rw-r--r-- src/zenserver/hub/hydration.cpp 2246
-rw-r--r-- src/zenserver/hub/hydration.h 45
-rw-r--r-- src/zenserver/hub/hydrationdefaults.h 26
-rw-r--r-- src/zenserver/hub/storageserverinstance.cpp 24
-rw-r--r-- src/zenserver/hub/storageserverinstance.h 5
-rw-r--r-- src/zenserver/hub/zenhubserver.cpp 52
-rw-r--r-- src/zenserver/hub/zenhubserver.h 8
-rw-r--r-- src/zenutil/cloud/s3client.cpp 32
12 files changed, 2063 insertions, 516 deletions
diff --git a/src/zencore/include/zencore/string.h b/src/zencore/include/zencore/string.h
index a1c7a3914..7ca2afc69 100644
--- a/src/zencore/include/zencore/string.h
+++ b/src/zencore/include/zencore/string.h
@@ -742,6 +742,7 @@ size_t NiceBytesToBuffer(uint64_t Num, std::span<char> Buffer);
size_t NiceByteRateToBuffer(uint64_t Num, uint64_t ms, std::span<char> Buffer);
size_t NiceLatencyNsToBuffer(uint64_t NanoSeconds, std::span<char> Buffer);
size_t NiceTimeSpanMsToBuffer(uint64_t Milliseconds, std::span<char> Buffer);
+size_t NiceTimeSpanUsToBuffer(uint64_t Microseconds, std::span<char> Buffer);
struct NiceBase
{
@@ -803,6 +804,11 @@ struct NiceTimeSpanMs : public NiceBase
inline NiceTimeSpanMs(uint64_t Milliseconds) { NiceTimeSpanMsToBuffer(Milliseconds, m_Buffer); }
};
+struct NiceTimeSpanUs : public NiceBase
+{
+ inline NiceTimeSpanUs(uint64_t Microseconds) { NiceTimeSpanUsToBuffer(Microseconds, m_Buffer); }
+};
+
//////////////////////////////////////////////////////////////////////////
inline std::string
diff --git a/src/zencore/string.cpp b/src/zencore/string.cpp
index 34519b83b..4072aec56 100644
--- a/src/zencore/string.cpp
+++ b/src/zencore/string.cpp
@@ -482,6 +482,24 @@ NiceTimeSpanMsToBuffer(uint64_t Millis, std::span<char> Buffer)
}
}
+size_t
+NiceTimeSpanUsToBuffer(uint64_t Micros, std::span<char> Buffer)
+{
+ if (Micros < 1000)
+ {
+ return snprintf(Buffer.data(), Buffer.size(), "%" PRIu64 "us", Micros);
+ }
+ else if (Micros < 10000)
+ {
+ return snprintf(Buffer.data(), Buffer.size(), "%.2fms", Micros / 1000.0);
+ }
+ else if (Micros < 1000000)
+ {
+ return snprintf(Buffer.data(), Buffer.size(), "%.1fms", Micros / 1000.0);
+ }
+ return NiceTimeSpanMsToBuffer(Micros / 1000, Buffer);
+}
+
//////////////////////////////////////////////////////////////////////////
template<typename C>
@@ -850,6 +868,13 @@ TEST_CASE("niceNum")
NiceNumGeneral(100000000000000, Buffer, kNicenumTime);
CHECK(StringEquals(Buffer, "100000s"));
+
+ // floor() instead of round(): 999.5us must NOT render as "1000us".
+ NiceNumGeneral(999500, Buffer, kNicenumTime);
+ CHECK(StringEquals(Buffer, "999us"));
+
+ NiceNumGeneral(999999, Buffer, kNicenumTime);
+ CHECK(StringEquals(Buffer, "999us"));
}
SUBCASE("bytes")
@@ -917,6 +942,43 @@ TEST_CASE("niceNum")
NiceTimeSpanMsToBuffer(360000000, Buffer);
CHECK(StringEquals(Buffer, "100h00m"));
}
+
+ SUBCASE("timespan_us")
+ {
+ NiceTimeSpanUsToBuffer(0, Buffer);
+ CHECK(StringEquals(Buffer, "0us"));
+
+ NiceTimeSpanUsToBuffer(1, Buffer);
+ CHECK(StringEquals(Buffer, "1us"));
+
+ NiceTimeSpanUsToBuffer(999, Buffer);
+ CHECK(StringEquals(Buffer, "999us"));
+
+ NiceTimeSpanUsToBuffer(1000, Buffer);
+ CHECK(StringEquals(Buffer, "1.00ms"));
+
+ NiceTimeSpanUsToBuffer(1500, Buffer);
+ CHECK(StringEquals(Buffer, "1.50ms"));
+
+ NiceTimeSpanUsToBuffer(9999, Buffer);
+ CHECK(StringEquals(Buffer, "10.00ms")); // %.2f rounds 9.999 -> 10.00
+
+ NiceTimeSpanUsToBuffer(10000, Buffer);
+ CHECK(StringEquals(Buffer, "10.0ms"));
+
+ NiceTimeSpanUsToBuffer(999500, Buffer);
+ CHECK(StringEquals(Buffer, "999.5ms"));
+
+ NiceTimeSpanUsToBuffer(999999, Buffer);
+ CHECK(StringEquals(Buffer, "1000.0ms")); // boundary just below 1s
+
+ // >=1s delegates to NiceTimeSpanMs.
+ NiceTimeSpanUsToBuffer(1000000, Buffer);
+ CHECK(StringEquals(Buffer, "1.00s"));
+
+ NiceTimeSpanUsToBuffer(60000000, Buffer);
+ CHECK(StringEquals(Buffer, "1m00s"));
+ }
}
TEST_CASE("StringBuilder")
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index c03c1a9a0..4ae8d0457 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -11,6 +11,7 @@
#include <zencore/scopeguard.h>
#include <zencore/string.h>
#include <zencore/timer.h>
+#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/httpclient.h>
@@ -248,6 +249,7 @@ Hub::~Hub()
void
Hub::Shutdown()
{
+ ZEN_TRACE_CPU("Hub::Shutdown");
ZEN_INFO("Hub service shutting down, deprovisioning any current instances");
bool Expected = false;
@@ -299,6 +301,7 @@ Hub::Shutdown()
Hub::Response
Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
{
+ ZEN_TRACE_CPU("Hub::Provision");
ZEN_ASSERT(!m_ShutdownFlag.load());
StorageServerInstance::ExclusiveLockedPtr Instance;
bool IsNewInstance = false;
@@ -328,17 +331,22 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
auto NewInstance = std::make_unique<StorageServerInstance>(
m_RunEnvironment,
*m_Hydration,
- StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex),
- .StateDir = m_RunEnvironment.CreateChildDir(ModuleId),
- .TempDir = m_HydrationTempPath / ModuleId,
- .HttpThreadCount = m_Config.InstanceHttpThreadCount,
- .CoreLimit = m_Config.InstanceCoreLimit,
- .ConfigPath = m_Config.InstanceConfigPath,
- .Malloc = m_Config.InstanceMalloc,
- .Trace = m_Config.InstanceTrace,
- .TraceHost = m_Config.InstanceTraceHost,
- .TraceFile = m_Config.InstanceTraceFile,
- .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool},
+ StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex),
+ .StateDir = m_RunEnvironment.CreateChildDir(ModuleId),
+ .TempDir = m_HydrationTempPath / ModuleId,
+ .HttpThreadCount = m_Config.InstanceHttpThreadCount,
+ .CoreLimit = m_Config.InstanceCoreLimit,
+ .ConfigPath = m_Config.InstanceConfigPath,
+ .Malloc = m_Config.InstanceMalloc,
+ .Trace = m_Config.InstanceTrace,
+ .TraceHost = m_Config.InstanceTraceHost,
+ .TraceFile = m_Config.InstanceTraceFile,
+ .EnableHydration = m_Config.EnableHydration,
+ .EnableDehydration = m_Config.EnableDehydration,
+ .HydrationPackEnabled = m_Config.HydrationPackEnabled,
+ .HydrationPackThresholdBytes = m_Config.HydrationPackThresholdBytes,
+ .HydrationMaxPackBytes = m_Config.HydrationMaxPackBytes,
+ .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool},
ModuleId);
#if ZEN_PLATFORM_WINDOWS
@@ -511,6 +519,7 @@ Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance,
HubInstanceState OldState,
bool IsNewInstance)
{
+ ZEN_TRACE_CPU("Hub::CompleteProvision");
const std::string ModuleId(Instance.GetModuleId());
const uint16_t Port = Instance.GetBasePort();
std::string BaseUri; // TODO?
@@ -571,6 +580,7 @@ Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance,
Hub::Response
Hub::Deprovision(const std::string& ModuleId)
{
+ ZEN_TRACE_CPU("Hub::Deprovision");
ZEN_ASSERT(!m_ShutdownFlag.load());
return InternalDeprovision(ModuleId, [](ActiveInstance& Instance) {
ZEN_UNUSED(Instance);
@@ -581,6 +591,7 @@ Hub::Deprovision(const std::string& ModuleId)
Hub::Response
Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveInstance& Instance)>&& DeprovisionGate)
{
+ ZEN_TRACE_CPU("Hub::InternalDeprovision");
StorageServerInstance::ExclusiveLockedPtr Instance;
size_t ActiveInstanceIndex = (size_t)-1;
{
@@ -694,6 +705,7 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI
Hub::Response
Hub::Obliterate(const std::string& ModuleId)
{
+ ZEN_TRACE_CPU("Hub::Obliterate");
ZEN_ASSERT(!m_ShutdownFlag.load());
StorageServerInstance::ExclusiveLockedPtr Instance;
@@ -845,6 +857,7 @@ Hub::Obliterate(const std::string& ModuleId)
void
Hub::CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex)
{
+ ZEN_TRACE_CPU("Hub::CompleteObliterate");
const std::string ModuleId(Instance.GetModuleId());
const uint16_t Port = Instance.GetBasePort();
@@ -871,6 +884,7 @@ Hub::CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, siz
void
Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState)
{
+ ZEN_TRACE_CPU("Hub::CompleteDeprovision");
const std::string ModuleId(Instance.GetModuleId());
const uint16_t Port = Instance.GetBasePort();
@@ -938,6 +952,7 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, si
Hub::Response
Hub::Hibernate(const std::string& ModuleId)
{
+ ZEN_TRACE_CPU("Hub::Hibernate");
ZEN_ASSERT(!m_ShutdownFlag.load());
StorageServerInstance::ExclusiveLockedPtr Instance;
@@ -1051,6 +1066,7 @@ Hub::Hibernate(const std::string& ModuleId)
void
Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState)
{
+ ZEN_TRACE_CPU("Hub::CompleteHibernate");
const std::string ModuleId(Instance.GetModuleId());
const uint16_t Port = Instance.GetBasePort();
@@ -1074,6 +1090,7 @@ Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size
Hub::Response
Hub::Wake(const std::string& ModuleId)
{
+ ZEN_TRACE_CPU("Hub::Wake");
ZEN_ASSERT(!m_ShutdownFlag.load());
StorageServerInstance::ExclusiveLockedPtr Instance;
@@ -1185,6 +1202,7 @@ Hub::Wake(const std::string& ModuleId)
void
Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState)
{
+ ZEN_TRACE_CPU("Hub::CompleteWake");
const std::string ModuleId(Instance.GetModuleId());
const uint16_t Port = Instance.GetBasePort();
@@ -1402,6 +1420,7 @@ Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewS
void
Hub::AttemptRecoverInstance(std::string_view ModuleId)
{
+ ZEN_TRACE_CPU("Hub::AttemptRecoverInstance");
StorageServerInstance::ExclusiveLockedPtr Instance;
size_t ActiveInstanceIndex = (size_t)-1;
{
@@ -1506,6 +1525,7 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient,
StorageServerInstance::SharedLockedPtr&& LockedInstance,
size_t ActiveInstanceIndex)
{
+ ZEN_TRACE_CPU("Hub::CheckInstanceStatus");
const std::string ModuleId(LockedInstance.GetModuleId());
HubInstanceState InstanceState = m_ActiveInstances[ActiveInstanceIndex].State.load();
@@ -1645,6 +1665,7 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient,
void
Hub::UpdateMachineMetrics()
{
+ ZEN_TRACE_CPU("Hub::UpdateMachineMetrics");
try
{
bool DiskSpaceOk = false;
@@ -1696,6 +1717,7 @@ Hub::WatchDog()
size_t CheckInstanceIndex = SIZE_MAX; // first increment wraps to 0
while (!m_ShutdownFlag.load() && !m_WatchDogEvent.Wait(gsl::narrow<int>(CycleIntervalMs)))
{
+ ZEN_TRACE_CPU("Hub::WatchDogCycle");
try
{
UpdateMachineMetrics();
@@ -2755,18 +2777,18 @@ TEST_CASE("hub.instance.inactivity.deprovision")
auto PokeInstance = [&](uint16_t Port) {
// Make a real storage request to increment the instance's activity sum.
// The watchdog detects the changed sum on the next cycle and resets LastActivityTime.
- {
- HttpClient PersistentClient(fmt::format("http://localhost:{}", Port),
- HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(200)});
- uint64_t Tick = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() -
- std::chrono::steady_clock::time_point::min())
- .count();
- IoHash Key = IoHash::HashBuffer(&Tick, sizeof(Tick));
- const HttpClient::Response PutResult =
- PersistentClient.Put(fmt::format("/z$/ns1/b/{}", Key),
- IoBufferBuilder::MakeFromMemory(MakeMemoryView(std::string_view("keepalive"))));
- CHECK(PutResult);
- }
+ // Per-attempt connect is kept tight (200ms) so a genuinely dead endpoint fails fast;
+ // RetryCount=3 absorbs transient localhost-accept slowness on loaded CI runners.
+ HttpClient PersistentClient(fmt::format("http://localhost:{}", Port),
+ HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(200), .RetryCount = 3});
+ uint64_t Tick = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() -
+ std::chrono::steady_clock::time_point::min())
+ .count();
+ IoHash Key = IoHash::HashBuffer(&Tick, sizeof(Tick));
+ const HttpClient::Response PutResult =
+ PersistentClient.Put(fmt::format("/z$/ns1/b/{}", Key),
+ IoBufferBuilder::MakeFromMemory(MakeMemoryView(std::string_view("keepalive"))));
+ REQUIRE_MESSAGE(PutResult, "PokeInstance PUT failed - cannot reset activity timer");
};
PokeInstance(IdleInfo.Port);
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h
index 40d046ce0..1ce9bc876 100644
--- a/src/zenserver/hub/hub.h
+++ b/src/zenserver/hub/hub.h
@@ -74,6 +74,11 @@ public:
std::filesystem::path InstanceConfigPath;
std::string HydrationTargetSpecification;
CbObject HydrationOptions;
+ bool EnableHydration = true;
+ bool EnableDehydration = true;
+ bool HydrationPackEnabled = true;
+ uint64_t HydrationPackThresholdBytes = DefaultPackThresholdBytes;
+ uint64_t HydrationMaxPackBytes = DefaultMaxPackBytes;
WatchDogConfiguration WatchDog;
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp
index c7f25bab6..2730ba059 100644
--- a/src/zenserver/hub/hydration.cpp
+++ b/src/zenserver/hub/hydration.cpp
@@ -10,15 +10,20 @@
#include <zencore/except_fmt.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
+#include <zencore/iohash.h>
#include <zencore/logging.h>
#include <zencore/parallelwork.h>
+#include <zencore/scopeguard.h>
#include <zencore/stream.h>
#include <zencore/system.h>
#include <zencore/thread.h>
#include <zencore/timer.h>
+#include <zencore/trace.h>
+#include <zencore/uid.h>
#include <zenutil/cloud/imdscredentials.h>
#include <zenutil/cloud/s3client.h>
#include <zenutil/filesystemutils.h>
+#include <zenutil/wildcard.h>
#include <numeric>
#include <unordered_map>
@@ -34,6 +39,8 @@
namespace zen {
+using namespace std::literals;
+
namespace hydration_impl {
/// UTC time decomposed to calendar fields with sub-second milliseconds.
@@ -76,7 +83,51 @@ namespace hydration_impl {
std::atomic<bool>& PauseFlag,
const std::filesystem::path& Path)
{
- CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector<std::string>{}, {}, 0);
+ CleanDirectoryResult Result = CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector<std::string>{}, {}, 0);
+ for (const auto& [FailedPath, Ec] : Result.FailedRemovePaths)
+ {
+ ZEN_WARN("Failed to remove '{}' while cleaning '{}': {}", FailedPath, Path, Ec.message());
+ }
+ }
+
+ // Returns true if RelKey matches any wildcard in Excludes. Excluded paths are
+ // dropped at the dehydrate-side directory scan and never enter the manifest.
+ bool IsExcluded(std::string_view RelKey, std::span<const std::string> Excludes)
+ {
+ for (const std::string& W : Excludes)
+ {
+ if (MatchWildcard(W, RelKey, /*CaseSensitive*/ true))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ std::vector<std::string> ParseStringArray(CbFieldView Field)
+ {
+ std::vector<std::string> Out;
+ for (CbFieldView Entry : Field.AsArrayView())
+ {
+ Out.emplace_back(Entry.AsString());
+ }
+ return Out;
+ }
+
+ // Built-in exclude wildcards applied unless the hub config supplies an explicit
+ // `excludes` array (an empty array opts out of all defaults). Patterns match the
+ // dehydrate-side relative path (forward-slash form). `*` is path-separator-agnostic
+ // per zenutil/wildcard.h.
+ std::vector<std::string> DefaultExcludes()
+ {
+ return {
+ ".sentry-native/*", // sentry-native crash uploader DB; locked while child runs
+ "state_marker", // root-level liveness marker (zenstorageserver.cpp)
+ ".lock", // FILE_FLAG_DELETE_ON_CLOSE lock; locked while child runs
+ "*.bak", // transient backups produced by atomic file replace
+ "gc/reserve.gc", // GC disk reserve (gc subdir per zenstorageserver.cpp)
+ "auth/*", // encrypted auth state under auth/
+ };
}
///////////////////////////////////////////////////////////////////////
@@ -90,45 +141,110 @@ namespace hydration_impl {
std::atomic<uint64_t> Bytes{0}; // lambda-side: bytes transferred on successful completion
std::atomic<uint64_t> ElapsedUs{0}; // wall time around Work.Wait()
- RwLock ThreadIdsLock;
- std::unordered_set<int> ThreadIds;
+ // Per-request timing gathered inside the work lambdas. RequestCount + RequestTotalUs are
+ // summed across all completed requests. RequestMaxUs is the slowest single request
+ // observed (CAS-updated in EndRequest); avg vs max gap surfaces tail latency without
+ // keeping per-request samples.
+ std::atomic<uint64_t> RequestCount{0};
+ std::atomic<uint64_t> RequestTotalUs{0};
+ std::atomic<uint64_t> RequestMaxUs{0};
+ std::atomic<uint32_t> InFlight{0};
+ std::atomic<uint32_t> InFlightPeak{0};
+
+ // Scheduling latency. PhaseClock starts when the phase begins. FirstScheduleUs /
+ // FirstStartUs are the relative times of the earliest ScheduleWork call and the earliest
+ // worker lambda entry. Their difference is how long requests sat in the pool backlog
+ // before a worker picked one up (pool warm-up / backlog).
+ Stopwatch PhaseClock;
+ std::atomic<uint64_t> FirstScheduleUs{UINT64_MAX};
+ std::atomic<uint64_t> FirstStartUs{UINT64_MAX};
+
+ void RecordScheduled()
+ {
+ uint64_t Now = PhaseClock.GetElapsedTimeUs();
+ uint64_t Existing = FirstScheduleUs.load(std::memory_order_relaxed);
+ while (Now < Existing && !FirstScheduleUs.compare_exchange_weak(Existing, Now, std::memory_order_relaxed))
+ {
+ }
+ }
- void RecordThread()
+ // Returns a Stopwatch the caller runs across the actual request; call EndRequest with
+ // the elapsed microseconds when the request completes.
+ Stopwatch BeginRequest()
{
- int Tid = zen::GetCurrentThreadId();
- ThreadIdsLock.WithExclusiveLock([&] { ThreadIds.insert(Tid); });
+ uint64_t Now = PhaseClock.GetElapsedTimeUs();
+ uint64_t Existing = FirstStartUs.load(std::memory_order_relaxed);
+ while (Now < Existing && !FirstStartUs.compare_exchange_weak(Existing, Now, std::memory_order_relaxed))
+ {
+ }
+ uint32_t Current = InFlight.fetch_add(1, std::memory_order_relaxed) + 1;
+ uint32_t Peak = InFlightPeak.load(std::memory_order_relaxed);
+ while (Current > Peak && !InFlightPeak.compare_exchange_weak(Peak, Current, std::memory_order_relaxed))
+ {
+ }
+ return Stopwatch{};
+ }
+
+ void EndRequest(uint64_t ElapsedUsValue)
+ {
+ InFlight.fetch_sub(1, std::memory_order_relaxed);
+ RequestCount.fetch_add(1, std::memory_order_relaxed);
+ RequestTotalUs.fetch_add(ElapsedUsValue, std::memory_order_relaxed);
+ uint64_t Existing = RequestMaxUs.load(std::memory_order_relaxed);
+ while (ElapsedUsValue > Existing && !RequestMaxUs.compare_exchange_weak(Existing, ElapsedUsValue, std::memory_order_relaxed))
+ {
+ }
}
};
struct DehydrateStatistics
{
PhaseStats Hash;
- PhaseStats Upload;
- PhaseStats Touch; // Touch shares Upload's ParallelWork / ElapsedUs
+ PhaseStats Upload; // Loose CAS PUTs
+ PhaseStats Touch; // Mod-time refresh on pre-existing loose CAS entries; shares Upload's ParallelWork
+ PhaseStats PackUpload; // Pack-blob PUTs; shares Upload's ParallelWork
+ PhaseStats PackTouch; // Mod-time refresh on pre-existing pack blobs; shares Upload's ParallelWork
std::atomic<uint64_t> LoadStateUs{0};
std::atomic<uint64_t> DirScanUs{0};
std::atomic<uint64_t> ListExistingUs{0};
- std::atomic<uint64_t> MetadataSaveUs{0};
+ std::atomic<uint64_t> SaveMetadataUs{0};
std::atomic<uint64_t> CleanUs{0};
std::atomic<uint64_t> TotalFiles{0};
std::atomic<uint64_t> TotalBytes{0};
std::atomic<uint64_t> TotalUs{0};
+
+ // Pack phase stats
+ std::atomic<uint64_t> PackCount{0}; // number of packs built
+ std::atomic<uint64_t> PackedFiles{0}; // Files[] entries folded into packs (includes hash-duplicates)
+ std::atomic<uint64_t> PackBytes{0}; // total bytes across all packs
+ std::atomic<uint64_t> PackBuildUs{0}; // hash + write cost across all packs
};
struct HydrateStatistics
{
- PhaseStats Download;
+ PhaseStats Download; // Loose CAS GETs
+ PhaseStats PackDownload; // Pack-blob GETs; shares Download's ParallelWork
std::atomic<uint64_t> LoadMetadataUs{0};
+ std::atomic<uint64_t> CreateDirsUs{0};
+ std::atomic<uint64_t> CreateDirsCount{0}; // unique parent dirs passed to CreateDirectories
std::atomic<uint64_t> CleanUs{0};
- std::atomic<uint64_t> RenameOrCopyUs{0};
- std::atomic<uint64_t> VerifyScanUs{0};
+ std::atomic<uint64_t> FinalizeUs{0};
+ std::atomic<uint64_t> BuildStateUs{0};
std::atomic<uint64_t> TotalFiles{0};
std::atomic<uint64_t> TotalBytes{0};
std::atomic<uint64_t> TotalUs{0};
+
+ // Pack phase stats
+ std::atomic<uint64_t> PackCount{0}; // packs in manifest
+ std::atomic<uint64_t> PackedFiles{0}; // files unpacked into ServerStateDir (per-destination count)
+ std::atomic<uint64_t> PackUnpackUs{0}; // slice + parallel SafeWriteFile cost
+ // Bytes written to disk during the unpack-and-slice phase (sum of slice sizes
+ // touched by SafeWriteFile). Pairs with PackUnpackUs for disk-write throughput.
+ std::atomic<uint64_t> UnpackWriteBytes{0};
};
// Bits-per-second rate computed at microsecond precision. Zero-safe.
@@ -149,9 +265,13 @@ namespace hydration_impl {
public:
virtual ~StorageBase() = default;
- virtual std::string Describe() const = 0;
- virtual void SaveMetadata(const CbObject& Data) = 0;
- virtual CbObject LoadMetadata() = 0;
+ virtual std::string Describe() const = 0;
+ virtual void SaveMetadata(const CbObject& Data) = 0;
+ virtual CbObject LoadMetadata() = 0;
+ // Backend-specific settings that need to be persisted in state.cbo and reapplied
+ // on hydrate. Today only S3 uses this (MultipartChunkSize - the chunk size used at
+ // dehydrate must be carried forward so hydrate uses the same partitioning). File
+ // backend has no such settings and returns / accepts an empty object.
virtual CbObject GetSettings() = 0;
virtual void ParseSettings(const CbObjectView& Settings) = 0;
virtual std::vector<IoHash> List() = 0;
@@ -179,7 +299,7 @@ namespace hydration_impl {
explicit FileStorage(std::filesystem::path ModulePath);
- virtual std::string Describe() const override { return fmt::format("file://{}", m_StoragePath.generic_string()); }
+ virtual std::string Describe() const override { return fmt::format("file://{}"sv, m_StoragePath.generic_string()); }
virtual void SaveMetadata(const CbObject& Data) override;
virtual CbObject LoadMetadata() override;
virtual CbObject GetSettings() override { return {}; }
@@ -209,13 +329,12 @@ namespace hydration_impl {
class S3Storage : public StorageBase
{
public:
- static constexpr std::string_view Prefix = "s3://";
- static constexpr std::string_view Type = "s3";
- static constexpr uint64_t DefaultMultipartChunkSize = 32u * 1024u * 1024u;
+ static constexpr std::string_view Prefix = "s3://";
+ static constexpr std::string_view Type = "s3";
S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize);
- virtual std::string Describe() const override { return fmt::format("s3://{}/{}", m_Client.BucketName(), m_KeyPrefix); }
+ virtual std::string Describe() const override { return fmt::format("s3://{}/{}"sv, m_Client.BucketName(), m_KeyPrefix); }
virtual void SaveMetadata(const CbObject& Data) override;
virtual CbObject LoadMetadata() override;
virtual CbObject GetSettings() override;
@@ -256,6 +375,7 @@ namespace hydration_impl {
void FileStorage::SaveMetadata(const CbObject& Data)
{
+ ZEN_TRACE_CPU("FileStorage::SaveMetadata");
BinaryWriter Output;
SaveCompactBinary(Output, Data);
WriteFile(m_StatePathName, IoBuffer(IoBuffer::Wrap, Output.GetData(), Output.GetSize()));
@@ -263,6 +383,7 @@ namespace hydration_impl {
CbObject FileStorage::LoadMetadata()
{
+ ZEN_TRACE_CPU("FileStorage::LoadMetadata");
if (!IsFile(m_StatePathName))
{
return {};
@@ -277,7 +398,7 @@ namespace hydration_impl {
CbObject Result = ValidateAndReadCompactBinaryObject(std::move(Payload), Error);
if (Error != CbValidateError::None)
{
- throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}", m_StatePathName, ToString(Error)));
+ throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}"sv, m_StatePathName, ToString(Error)));
}
return Result;
}
@@ -309,13 +430,15 @@ namespace hydration_impl {
Work.ScheduleWork(
WorkerPool,
[this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath), &Stats](std::atomic<bool>& AbortFlag) {
- Stats.RecordThread();
+ ZEN_TRACE_CPU("FileStorage::Put");
if (!AbortFlag.load())
{
- std::filesystem::path DestPath = m_CASPath / fmt::format("{}", Hash);
+ Stopwatch Timer = Stats.BeginRequest();
+ std::filesystem::path DestPath = m_CASPath / fmt::format("{}"sv, Hash);
+ auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); });
if (std::error_code Ec = CopyFile(SourcePath, DestPath, CopyFileOptions{.EnableClone = true}); Ec)
{
- throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestPath));
+ throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'"sv, SourcePath, DestPath));
}
Stats.Bytes.fetch_add(Size, std::memory_order_relaxed);
}
@@ -332,13 +455,16 @@ namespace hydration_impl {
Work.ScheduleWork(WorkerPool,
[this, Hash = IoHash(Hash), Size, DestinationPath = std::filesystem::path(DestinationPath), &Stats](
std::atomic<bool>& AbortFlag) {
- Stats.RecordThread();
+ ZEN_TRACE_CPU("FileStorage::Get");
if (!AbortFlag.load())
{
- std::filesystem::path SourcePath = m_CASPath / fmt::format("{}", Hash);
+ Stopwatch Timer = Stats.BeginRequest();
+ auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); });
+ std::filesystem::path SourcePath = m_CASPath / fmt::format("{}"sv, Hash);
if (std::error_code Ec = CopyFile(SourcePath, DestinationPath, CopyFileOptions{.EnableClone = true}); Ec)
{
- throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestinationPath));
+ throw std::system_error(Ec,
+ fmt::format("Failed to copy '{}' to '{}'"sv, SourcePath, DestinationPath));
}
Stats.Bytes.fetch_add(Size, std::memory_order_relaxed);
}
@@ -365,6 +491,7 @@ namespace hydration_impl {
void S3Storage::SaveMetadata(const CbObject& Data)
{
+ ZEN_TRACE_CPU("S3Storage::SaveMetadata");
BinaryWriter Output;
SaveCompactBinary(Output, Data);
IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize());
@@ -373,12 +500,13 @@ namespace hydration_impl {
S3Result Result = m_Client.PutObject(Key, std::move(Payload));
if (!Result.IsSuccess())
{
- throw zen::runtime_error("Failed to save incremental metadata to '{}': {}", Key, Result.Error);
+ throw zen::runtime_error("Failed to save incremental metadata to '{}': {}"sv, Key, Result.Error);
}
}
CbObject S3Storage::LoadMetadata()
{
+ ZEN_TRACE_CPU("S3Storage::LoadMetadata");
std::string Key = m_KeyPrefix + "/incremental-state.cbo";
S3GetObjectResult Result = m_Client.GetObject(Key);
if (!Result.IsSuccess())
@@ -387,14 +515,14 @@ namespace hydration_impl {
{
return {};
}
- throw zen::runtime_error("Failed to load incremental metadata from '{}': {}", Key, Result.Error);
+ throw zen::runtime_error("Failed to load incremental metadata from '{}': {}"sv, Key, Result.Error);
}
CbValidateError Error;
CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Result.Content), Error);
if (Error != CbValidateError::None)
{
- throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}", Key, ToString(Error));
+ throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}"sv, Key, ToString(Error));
}
return Meta;
}
@@ -402,13 +530,13 @@ namespace hydration_impl {
CbObject S3Storage::GetSettings()
{
CbObjectWriter Writer;
- Writer << "MultipartChunkSize" << m_MultipartChunkSize;
+ Writer << "MultipartChunkSize"sv << m_MultipartChunkSize;
return Writer.Save();
}
void S3Storage::ParseSettings(const CbObjectView& Settings)
{
- m_MultipartChunkSize = Settings["MultipartChunkSize"].AsUInt64(S3Storage::DefaultMultipartChunkSize);
+ m_MultipartChunkSize = Settings["MultipartChunkSize"sv].AsUInt64(DefaultMultipartChunkSize);
}
std::vector<IoHash> S3Storage::List()
@@ -417,7 +545,7 @@ namespace hydration_impl {
S3ListObjectsResult Result = m_Client.ListObjects(CasPrefix);
if (!Result.IsSuccess())
{
- throw zen::runtime_error("Failed to list S3 objects under '{}': {}", CasPrefix, Result.Error);
+ throw zen::runtime_error("Failed to list S3 objects under '{}': {}"sv, CasPrefix, Result.Error);
}
std::vector<IoHash> Hashes;
@@ -448,13 +576,15 @@ namespace hydration_impl {
Work.ScheduleWork(
WorkerPool,
[this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath), &Stats](std::atomic<bool>& AbortFlag) {
- Stats.RecordThread();
+ ZEN_TRACE_CPU("S3Storage::Put");
if (AbortFlag.load())
{
return;
}
- S3Client& Client = m_Client;
- std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
+ Stopwatch Timer = Stats.BeginRequest();
+ auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); });
+ S3Client& Client = m_Client;
+ std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}"sv, Hash);
if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
{
@@ -466,7 +596,7 @@ namespace hydration_impl {
m_MultipartChunkSize);
if (!Result.IsSuccess())
{
- throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
+ throw zen::runtime_error("Failed to upload '{}' to S3: {}"sv, Key, Result.Error);
}
}
else
@@ -475,7 +605,7 @@ namespace hydration_impl {
S3Result Result = Client.PutObject(Key, File.ReadAll());
if (!Result.IsSuccess())
{
- throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
+ throw zen::runtime_error("Failed to upload '{}' to S3: {}"sv, Key, Result.Error);
}
}
Stats.Bytes.fetch_add(Size, std::memory_order_relaxed);
@@ -489,7 +619,7 @@ namespace hydration_impl {
const std::filesystem::path& DestinationPath,
PhaseStats& Stats)
{
- std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
+ std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}"sv, Hash);
if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
{
@@ -515,15 +645,17 @@ namespace hydration_impl {
uint64_t ChunkSize = std::min<uint64_t>(m_MultipartChunkSize, Size - Offset);
Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data, &Stats](std::atomic<bool>& AbortFlag) {
- Stats.RecordThread();
- if (AbortFlag)
+ ZEN_TRACE_CPU("S3Storage::GetRange");
+ if (AbortFlag.load())
{
return;
}
- S3GetObjectResult Chunk = m_Client.GetObjectRange(Key, Offset, ChunkSize);
+ Stopwatch Timer = Stats.BeginRequest();
+ auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); });
+ S3GetObjectResult Chunk = m_Client.GetObjectRange(Key, Offset, ChunkSize);
if (!Chunk.IsSuccess())
{
- throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}",
+ throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}"sv,
Key,
Offset,
Offset + ChunkSize - 1,
@@ -541,15 +673,17 @@ namespace hydration_impl {
Work.ScheduleWork(
WorkerPool,
[this, Key = Key, Size, DestinationPath = std::filesystem::path(DestinationPath), &Stats](std::atomic<bool>& AbortFlag) {
- Stats.RecordThread();
- if (AbortFlag)
+ ZEN_TRACE_CPU("S3Storage::Get");
+ if (AbortFlag.load())
{
return;
}
- S3GetObjectResult Chunk = m_Client.GetObject(Key, m_TempDir);
+ Stopwatch Timer = Stats.BeginRequest();
+ auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); });
+ S3GetObjectResult Chunk = m_Client.GetObject(Key, m_TempDir);
if (!Chunk.IsSuccess())
{
- throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error);
+ throw zen::runtime_error("Failed to download '{}' from S3: {}"sv, Key, Chunk.Error);
}
if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef))
@@ -585,16 +719,18 @@ namespace hydration_impl {
void S3Storage::Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, PhaseStats& Stats)
{
Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash), &Stats](std::atomic<bool>& AbortFlag) {
- Stats.RecordThread();
+ ZEN_TRACE_CPU("S3Storage::Touch");
if (AbortFlag.load())
{
return;
}
- std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
- S3Result Result = m_Client.Touch(Key);
+ Stopwatch Timer = Stats.BeginRequest();
+ auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); });
+ std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}"sv, Hash);
+ S3Result Result = m_Client.Touch(Key);
if (!Result.IsSuccess())
{
- throw zen::runtime_error("Failed to touch '{}' in S3: {}", Key, Result.Error);
+ throw zen::runtime_error("Failed to touch '{}' in S3: {}"sv, Key, Result.Error);
}
});
}
@@ -605,7 +741,7 @@ namespace hydration_impl {
S3ListObjectsResult ListResult = m_Client.ListObjects(ModulePrefix);
if (!ListResult.IsSuccess())
{
- throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}", ModulePrefix, ListResult.Error);
+ throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}"sv, ModulePrefix, ListResult.Error);
}
for (const S3ObjectInfo& Obj : ListResult.Objects)
{
@@ -617,7 +753,7 @@ namespace hydration_impl {
S3Result DelResult = m_Client.DeleteObject(Key);
if (!DelResult.IsSuccess())
{
- throw zen::runtime_error("Failed to delete S3 object '{}': {}", Key, DelResult.Error);
+ throw zen::runtime_error("Failed to delete S3 object '{}': {}"sv, Key, DelResult.Error);
}
});
}
@@ -627,51 +763,141 @@ namespace hydration_impl {
// IncrementalHydrator: the only HydrationStrategyBase implementation.
// Summary emission for hydrate/dehydrate operations.
+ // Returns the delay, in microseconds, between the earliest schedule timestamp and the
+ // earliest worker-start timestamp. The result is 0 when either timestamp still holds the
+ // UINT64_MAX "never happened" sentinel (no work scheduled / nothing ran), or when the
+ // start timestamp is not strictly later than the schedule timestamp.
+ inline uint64_t QueueWaitUs(uint64_t FirstScheduleUs, uint64_t FirstStartUs)
+ {
+     const bool BothRecorded         = (FirstScheduleUs != UINT64_MAX) && (FirstStartUs != UINT64_MAX);
+     const bool StartedAfterSchedule = FirstStartUs > FirstScheduleUs;
+     return (BothRecorded && StartedAfterSchedule) ? (FirstStartUs - FirstScheduleUs) : 0;
+ }
+
+ // Integer average Total / Count. Returns 0 instead of dividing when Count is 0.
+ inline uint64_t SafeAvg(uint64_t Total, uint64_t Count)
+ {
+     if (Count == 0)
+     {
+         return 0;
+     }
+     return Total / Count;
+ }
+
+ // Emits one multi-line ZEN_INFO message summarizing a dehydrate run for ModuleId.
+ // All figures are read from the atomic counters in Stats: totals, per-phase elapsed
+ // times, hash cache-hit percentage, pack build figures, loose/pack upload and touch
+ // counts, and combined per-request latency for the hash and upload phases.
+ // Prefix is printed first on the summary line; Source and Target are printed verbatim
+ // (Source via generic_string()).
void LogDehydrateSummary(std::string_view Prefix,
const DehydrateStatistics& Stats,
std::string_view ModuleId,
const std::filesystem::path& Source,
std::string_view Target)
{
- const uint64_t HashUs = Stats.Hash.ElapsedUs.load();
- const uint64_t UploadUs = Stats.Upload.ElapsedUs.load();
+ const uint64_t TotalFiles = Stats.TotalFiles.load();
+ const uint64_t HashUs = Stats.Hash.ElapsedUs.load();
+ const uint64_t UploadUs = Stats.Upload.ElapsedUs.load();
+
+ // Hash phase: per-request data (BeginRequest/EndRequest tracks each hash op).
+ // Hash is the first phase to schedule work, so cold-pool warm-up shows up here.
+ const uint64_t HashFiles = Stats.Hash.Files.load();
+ const uint64_t HashBytes = Stats.Hash.Bytes.load();
+ const uint64_t HashReqCount = Stats.Hash.RequestCount.load();
+ const uint64_t HashReqTotalUs = Stats.Hash.RequestTotalUs.load();
+ const uint64_t HashReqMaxUs = Stats.Hash.RequestMaxUs.load();
+ const uint32_t HashPeak = Stats.Hash.InFlightPeak.load();
+ const uint64_t HashQueueUs = QueueWaitUs(Stats.Hash.FirstScheduleUs.load(), Stats.Hash.FirstStartUs.load());
+ // Cache hit rate: every TotalFile not re-hashed was served from the cached state.
+ const uint32_t CacheHitPct = TotalFiles ? gsl::narrow_cast<uint32_t>((TotalFiles - HashFiles) * 100 / TotalFiles) : 0;
+
+ // Upload phase shares one ParallelWork across loose Put (Stats.Upload), pack-blob Put
+ // (Stats.PackUpload), loose Touch (Stats.Touch), and pack-blob Touch (Stats.PackTouch).
+ // Per-request data is collected per PhaseStats by Storage::Put / Storage::Touch and
+ // reported as a single combined "Requests" line.
+ const uint64_t UpReqCount = Stats.Upload.RequestCount.load() + Stats.PackUpload.RequestCount.load() +
+ Stats.Touch.RequestCount.load() + Stats.PackTouch.RequestCount.load();
+ const uint64_t UpReqTotalUs = Stats.Upload.RequestTotalUs.load() + Stats.PackUpload.RequestTotalUs.load() +
+ Stats.Touch.RequestTotalUs.load() + Stats.PackTouch.RequestTotalUs.load();
+ const uint64_t UpReqMaxUs = std::max({Stats.Upload.RequestMaxUs.load(),
+ Stats.PackUpload.RequestMaxUs.load(),
+ Stats.Touch.RequestMaxUs.load(),
+ Stats.PackTouch.RequestMaxUs.load()});
+ const uint32_t UpPeak = std::max({Stats.Upload.InFlightPeak.load(),
+ Stats.PackUpload.InFlightPeak.load(),
+ Stats.Touch.InFlightPeak.load(),
+ Stats.PackTouch.InFlightPeak.load()});
+ // Combined first-schedule / first-start across all four phase counters. UINT64_MAX is
+ // the unset sentinel and naturally loses to any real timestamp under std::min, so empty
+ // phases fall through to the others.
+ const uint64_t UpFirstSchedUs = std::min({Stats.Upload.FirstScheduleUs.load(),
+ Stats.PackUpload.FirstScheduleUs.load(),
+ Stats.Touch.FirstScheduleUs.load(),
+ Stats.PackTouch.FirstScheduleUs.load()});
+ const uint64_t UpFirstStartUs = std::min({Stats.Upload.FirstStartUs.load(),
+ Stats.PackUpload.FirstStartUs.load(),
+ Stats.Touch.FirstStartUs.load(),
+ Stats.PackTouch.FirstStartUs.load()});
+ const uint64_t UpQueueUs = QueueWaitUs(UpFirstSchedUs, UpFirstStartUs);
+
+ const uint64_t LooseFiles = Stats.Upload.Files.load();
+ const uint64_t LooseBytes = Stats.Upload.Bytes.load();
+ const uint64_t TouchFiles = Stats.Touch.Files.load();
+ const uint64_t TouchBytes = Stats.Touch.Bytes.load();
+ const uint64_t PackTouchPacks = Stats.PackTouch.Files.load();
+ const uint64_t PackTouchBytes = Stats.PackTouch.Bytes.load();
+
+ const uint64_t PackCount = Stats.PackCount.load();
+ const uint64_t PackedFiles = Stats.PackedFiles.load();
+ const uint64_t PackBytes = Stats.PackBytes.load();
+ const uint64_t PackBuildUs = Stats.PackBuildUs.load();
+ const uint64_t PackUploadFiles = Stats.PackUpload.Files.load();
+ const uint64_t PackUploadBytes = Stats.PackUpload.Bytes.load();
+
+ // The arguments after the format string are positional: each one fills the next {}
+ // placeholder, so the argument order must track the placeholder order exactly.
ZEN_INFO(
"{} module '{}': {} files ({}) in {}\n"
" Source: {}\n"
" Target: {}\n"
" Load state: {}\n"
" Dir scan: {}\n"
- " Hash phase: {} {}/{} ({}) hashed, {}bits/s, {} threads\n"
+ " Hash: {} {}/{} files ({}) hashed, {}% cache hit, {}bits/s\n"
+ " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n"
" List existing: {}\n"
- " Upload phase: {} {}/{} ({}) uploaded, {} ({}) touched, {}bits/s, {} threads\n"
- " Metadata save: {}\n"
+ " Pack: {} {} packs, {} files, {}, {}bits/s\n"
+ " Upload: {} loose {} files ({}), packed {} blobs ({}), touched {} loose ({}) + {} packs ({}), {}bits/s\n"
+ " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n"
+ " Save metadata: {}\n"
" Clean: {}",
Prefix,
ModuleId,
- ThousandsNum(Stats.TotalFiles.load()),
+ ThousandsNum(TotalFiles),
NiceBytes(Stats.TotalBytes.load()),
- NiceLatencyNs(Stats.TotalUs.load() * 1000),
+ NiceTimeSpanUs(Stats.TotalUs.load()),
Source.generic_string(),
Target,
- NiceLatencyNs(Stats.LoadStateUs.load() * 1000),
- NiceLatencyNs(Stats.DirScanUs.load() * 1000),
- NiceLatencyNs(HashUs * 1000),
- ThousandsNum(Stats.Hash.Files.load()),
- ThousandsNum(Stats.TotalFiles.load()),
- NiceBytes(Stats.Hash.Bytes.load()),
- NiceNum(BitsPerSecond(Stats.Hash.Bytes.load(), HashUs)),
- Stats.Hash.ThreadIds.size(),
- NiceLatencyNs(Stats.ListExistingUs.load() * 1000),
- NiceLatencyNs(UploadUs * 1000),
- ThousandsNum(Stats.Upload.Files.load()),
- ThousandsNum(Stats.TotalFiles.load()),
- NiceBytes(Stats.Upload.Bytes.load()),
- ThousandsNum(Stats.Touch.Files.load()),
- NiceBytes(Stats.Touch.Bytes.load()),
- NiceNum(BitsPerSecond(Stats.Upload.Bytes.load(), UploadUs)),
- Stats.Upload.ThreadIds.size(),
- NiceLatencyNs(Stats.MetadataSaveUs.load() * 1000),
- NiceLatencyNs(Stats.CleanUs.load() * 1000));
+ NiceTimeSpanUs(Stats.LoadStateUs.load()),
+ NiceTimeSpanUs(Stats.DirScanUs.load()),
+ NiceTimeSpanUs(HashUs),
+ ThousandsNum(HashFiles),
+ ThousandsNum(TotalFiles),
+ NiceBytes(HashBytes),
+ CacheHitPct,
+ NiceNum(BitsPerSecond(HashBytes, HashUs)),
+ ThousandsNum(HashReqCount),
+ NiceTimeSpanUs(SafeAvg(HashReqTotalUs, HashReqCount)),
+ NiceTimeSpanUs(HashReqMaxUs),
+ HashPeak,
+ NiceTimeSpanUs(HashQueueUs),
+ NiceTimeSpanUs(Stats.ListExistingUs.load()),
+ NiceTimeSpanUs(PackBuildUs),
+ ThousandsNum(PackCount),
+ ThousandsNum(PackedFiles),
+ NiceBytes(PackBytes),
+ NiceNum(BitsPerSecond(PackBytes, PackBuildUs)),
+ NiceTimeSpanUs(UploadUs),
+ ThousandsNum(LooseFiles),
+ NiceBytes(LooseBytes),
+ ThousandsNum(PackUploadFiles),
+ NiceBytes(PackUploadBytes),
+ ThousandsNum(TouchFiles),
+ NiceBytes(TouchBytes),
+ ThousandsNum(PackTouchPacks),
+ NiceBytes(PackTouchBytes),
+ NiceNum(BitsPerSecond(LooseBytes + PackUploadBytes, UploadUs)),
+ ThousandsNum(UpReqCount),
+ NiceTimeSpanUs(SafeAvg(UpReqTotalUs, UpReqCount)),
+ NiceTimeSpanUs(UpReqMaxUs),
+ UpPeak,
+ NiceTimeSpanUs(UpQueueUs),
+ NiceTimeSpanUs(Stats.SaveMetadataUs.load()),
+ NiceTimeSpanUs(Stats.CleanUs.load()));
}
void LogHydrateSummary(std::string_view Prefix,
@@ -681,42 +907,137 @@ namespace hydration_impl {
const std::filesystem::path& Target)
{
const uint64_t DownloadUs = Stats.Download.ElapsedUs.load();
+
+ // Standalone (Stats.Download) and pack (Stats.PackDownload) downloads share one
+ // ParallelWork. Per-request data is collected per PhaseStats by Storage::Get;
+ // reported as a single combined "Requests" line. Multipart GETs may split a pack
+ // into several ranged requests, so DlReqCount can exceed file/blob counts.
+ const uint64_t StandaloneFiles = Stats.Download.Files.load();
+ const uint64_t StandaloneBytes = Stats.Download.Bytes.load();
+ const uint64_t PackDlFiles = Stats.PackDownload.Files.load();
+ const uint64_t PackDlBytes = Stats.PackDownload.Bytes.load();
+ const uint64_t DlReqCount = Stats.Download.RequestCount.load() + Stats.PackDownload.RequestCount.load();
+ const uint64_t DlReqTotalUs = Stats.Download.RequestTotalUs.load() + Stats.PackDownload.RequestTotalUs.load();
+ const uint64_t DlReqMaxUs = std::max(Stats.Download.RequestMaxUs.load(), Stats.PackDownload.RequestMaxUs.load());
+ const uint32_t DlPeak = std::max(Stats.Download.InFlightPeak.load(), Stats.PackDownload.InFlightPeak.load());
+ const uint64_t DlFirstSchedUs = std::min(Stats.Download.FirstScheduleUs.load(), Stats.PackDownload.FirstScheduleUs.load());
+ const uint64_t DlFirstStartUs = std::min(Stats.Download.FirstStartUs.load(), Stats.PackDownload.FirstStartUs.load());
+ const uint64_t QueueUs = QueueWaitUs(DlFirstSchedUs, DlFirstStartUs);
+
+ const uint64_t PackCount = Stats.PackCount.load();
+ const uint64_t PackedFiles = Stats.PackedFiles.load();
+ const uint64_t PackUnpackUs = Stats.PackUnpackUs.load();
+ const uint64_t UnpackWriteBytes = Stats.UnpackWriteBytes.load();
+
+ const uint64_t CreateDirsUs = Stats.CreateDirsUs.load();
+ const uint64_t CreateDirsCount = Stats.CreateDirsCount.load();
+ const uint64_t CreateDirsRate = CreateDirsUs ? (CreateDirsCount * 1'000'000ull / CreateDirsUs) : 0;
+
+ // Standalone and pack downloads share the Download phase elapsed reported below;
+ // the unpack line is its own clock (slice + parallel SafeWriteFile).
ZEN_INFO(
"{} module '{}': {} files ({}) in {}\n"
" Source: {}\n"
" Target: {}\n"
" Load metadata: {}\n"
- " Download phase: {} {}/{} ({}) downloaded, {}bits/s, {} threads\n"
+ " Create dirs: {} {} dirs, {} dirs/s\n"
+ " Download: {} loose {} files ({}), packed {} blobs ({}), {}bits/s\n"
+ " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n"
+ " Unpack: {} {} packs, {} files ({}), {}bits/s\n"
" Clean: {}\n"
- " Rename/copy: {}\n"
- " Verify scan: {}",
+ " Finalize: {}\n"
+ " Build state: {}",
Prefix,
ModuleId,
ThousandsNum(Stats.TotalFiles.load()),
NiceBytes(Stats.TotalBytes.load()),
- NiceLatencyNs(Stats.TotalUs.load() * 1000),
+ NiceTimeSpanUs(Stats.TotalUs.load()),
Source,
Target.generic_string(),
- NiceLatencyNs(Stats.LoadMetadataUs.load() * 1000),
- NiceLatencyNs(DownloadUs * 1000),
- ThousandsNum(Stats.Download.Files.load()),
- ThousandsNum(Stats.TotalFiles.load()),
- NiceBytes(Stats.Download.Bytes.load()),
- NiceNum(BitsPerSecond(Stats.Download.Bytes.load(), DownloadUs)),
- Stats.Download.ThreadIds.size(),
- NiceLatencyNs(Stats.CleanUs.load() * 1000),
- NiceLatencyNs(Stats.RenameOrCopyUs.load() * 1000),
- NiceLatencyNs(Stats.VerifyScanUs.load() * 1000));
+ NiceTimeSpanUs(Stats.LoadMetadataUs.load()),
+ NiceTimeSpanUs(CreateDirsUs),
+ ThousandsNum(CreateDirsCount),
+ NiceNum(CreateDirsRate),
+ NiceTimeSpanUs(DownloadUs),
+ ThousandsNum(StandaloneFiles),
+ NiceBytes(StandaloneBytes),
+ ThousandsNum(PackDlFiles),
+ NiceBytes(PackDlBytes),
+ NiceNum(BitsPerSecond(StandaloneBytes + PackDlBytes, DownloadUs)),
+ ThousandsNum(DlReqCount),
+ NiceTimeSpanUs(SafeAvg(DlReqTotalUs, DlReqCount)),
+ NiceTimeSpanUs(DlReqMaxUs),
+ DlPeak,
+ NiceTimeSpanUs(QueueUs),
+ NiceTimeSpanUs(PackUnpackUs),
+ ThousandsNum(PackCount),
+ ThousandsNum(PackedFiles),
+ NiceBytes(UnpackWriteBytes),
+ NiceNum(BitsPerSecond(UnpackWriteBytes, PackUnpackUs)),
+ NiceTimeSpanUs(Stats.CleanUs.load()),
+ NiceTimeSpanUs(Stats.FinalizeUs.load()),
+ NiceTimeSpanUs(Stats.BuildStateUs.load()));
}
///////////////////////////////////////////////////////////////////////
+ // File-manifest entry: one of these per file in a module's state. Lives in
+ // the namespace (rather than nested in IncrementalHydrator) so the helper
+ // functions below can take it by reference.
+
+ struct Entry
+ {
+     // Path relative to the module's state directory, in generic ('/'-separated) form.
+     std::string RelativePath;
+     // File size in bytes. Zero-initialized so a default-constructed Entry never
+     // carries an indeterminate value.
+     uint64_t Size = 0;
+     // File modification tick; compared together with Size to decide whether a cached
+     // hash can be reused. Zero-initialized for the same reason as Size.
+     uint64_t ModTick = 0;
+     // Content hash of the file.
+     IoHash Hash;
+     bool IsPacked = false;  // true if content is part of a pack (PackHash valid)
+     // Hash of the pack's concatenated raw bytes (= pack's CAS key). Hydrate downloads
+     // the pack once and slices this entry out at the offset recorded in Pack.Entries[]
+     // for the matching Entry.Hash.
+     IoHash PackHash;
+ };
+
+ ///////////////////////////////////////////////////////////////////////
+ // Pack types: produced by dehydrate's pack phase, consumed by the state
+ // writer; consumed during hydrate to reconstruct slices.
+
+ // One source file's contribution to a built pack, recorded in the order the bytes
+ // were appended to the pack.
+ struct BuiltPackEntry
+ {
+     IoHash   Hash;      // content hash of the packed file
+     uint64_t Size = 0;  // byte length of the packed file; zero-initialized rather than indeterminate
+ };
+
+ // Result of building one pack blob: the hash of the concatenated bytes, the total
+ // byte count, and the per-file entries in concatenation order.
+ struct BuiltPack
+ {
+     IoHash PackHash;
+     // Total bytes written to the pack. Zero-initialized so a BuiltPack that is
+     // default-constructed and filled in later never exposes an indeterminate value.
+     uint64_t                    Size = 0;
+     std::vector<BuiltPackEntry> Entries;
+ };
+
+ // Describes one slice of a pack blob as seen at hydrate time.
+ struct PackEntryDescriptor
+ {
+     IoHash   Hash;        // content hash of the sliced file
+     uint64_t Size   = 0;  // slice length in bytes; zero-initialized rather than indeterminate
+     uint64_t Offset = 0;  // slice start within the pack blob; zero-initialized rather than indeterminate
+ };
+
+ // Parsed description of one pack: a size plus the list of per-entry slices.
+ // NOTE(review): Size is presumably the pack blob's total byte count - confirm against
+ // the parser that fills it in.
+ struct PackDescriptor
+ {
+ uint64_t Size = 0;
+ std::vector<PackEntryDescriptor> Entries; // each element carries Hash, Size and Offset
+ };
+
+ // Indices into an Entries vector; PlanPacks fills each group with entries that share
+ // one content hash.
+ using EntryGroup = std::vector<size_t>;
+ // One planned pack: the ordered list of entry groups that go into it.
+ using PackPlan = std::vector<EntryGroup>;
+
+ ///////////////////////////////////////////////////////////////////////
// Holds a per-module StorageBase and threading context; drives the
// hydrate/dehydrate algorithm.
class IncrementalHydrator : public HydrationStrategyBase
{
public:
- IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage);
+ IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage, std::span<const std::string> Excludes);
virtual ~IncrementalHydrator() override;
virtual void Dehydrate(const CbObject& CachedState) override;
@@ -724,16 +1045,9 @@ namespace hydration_impl {
virtual void Obliterate() override;
private:
- struct Entry
- {
- std::filesystem::path RelativePath;
- uint64_t Size;
- uint64_t ModTick;
- IoHash Hash;
- };
-
std::unique_ptr<StorageBase> m_Storage;
HydrationConfig m_Config;
+ std::vector<std::string> m_Excludes;
WorkerThreadPool m_FallbackWorkPool;
std::atomic<bool> m_FallbackAbortFlag{false};
std::atomic<bool> m_FallbackPauseFlag{false};
@@ -743,11 +1057,657 @@ namespace hydration_impl {
};
///////////////////////////////////////////////////////////////////////
+ // Phase helpers used by IncrementalHydrator::Dehydrate and ::Hydrate.
+ // These keep the two big member functions readable as a sequence of
+ // named phases. Helpers take only the data they need and never the
+ // full HydrationConfig or IncrementalHydrator.
+
+ // Best-effort removal of every path in Files. A failed removal is reported through
+ // ZEN_WARN and the loop carries on with the next path. Used by both Dehydrate and
+ // Hydrate pack-cleanup guards plus the explicit pre-rename cleanup on Hydrate.
+ void RemoveStagedPackFiles(const std::vector<std::filesystem::path>& Files)
+ {
+     for (const std::filesystem::path& StagedFile : Files)
+     {
+         std::error_code RemoveError;
+         RemoveFile(StagedFile, RemoveError);
+         if (!RemoveError)
+         {
+             continue;
+         }
+         ZEN_WARN("Failed to remove staged pack file '{}': {}", StagedFile, RemoveError.message());
+     }
+ }
+
+ // Pre-creates the directories that will hold FilePaths so that parallel writers do
+ // not race to create the same parents.
+ //
+ // Steps: gather parent_path() of every input that has one, sort ascending and
+ // de-duplicate (ancestors sort before their descendants), then keep only the "leaf"
+ // directories - an entry is dropped when it is a strict component-prefix of the entry
+ // that follows it, because creating the descendant creates the ancestor as well.
+ // CreateDirectories is called once per surviving leaf.
+ //
+ // Returns the number of leaf directories passed to CreateDirectories (0 when there is
+ // nothing to create).
+ size_t CreateParentDirectories(const std::vector<std::filesystem::path>& FilePaths)
+ {
+     std::vector<std::filesystem::path> Parents;
+     Parents.reserve(FilePaths.size());
+     for (const std::filesystem::path& FilePath : FilePaths)
+     {
+         if (FilePath.has_parent_path())
+         {
+             Parents.push_back(FilePath.parent_path());
+         }
+     }
+     if (Parents.empty())
+     {
+         return 0;
+     }
+
+     std::sort(Parents.begin(), Parents.end());
+     Parents.erase(std::unique(Parents.begin(), Parents.end()), Parents.end());
+
+     std::vector<std::filesystem::path> Leaves;
+     Leaves.reserve(Parents.size());
+     for (size_t Index = 0; Index < Parents.size(); ++Index)
+     {
+         const bool HasSuccessor = (Index + 1) < Parents.size();
+         if (HasSuccessor)
+         {
+             const std::filesystem::path& Candidate = Parents[Index];
+             const std::filesystem::path& Successor = Parents[Index + 1];
+             const auto Diverge = std::mismatch(Candidate.begin(), Candidate.end(), Successor.begin(), Successor.end());
+             const bool CandidateExhausted = Diverge.first == Candidate.end();
+             const bool SuccessorContinues = Diverge.second != Successor.end();
+             if (CandidateExhausted && SuccessorContinues)
+             {
+                 continue;  // Candidate is a component-prefix of Successor; the descendant will create it
+             }
+         }
+         Leaves.push_back(std::move(Parents[Index]));
+     }
+
+     for (const std::filesystem::path& Leaf : Leaves)
+     {
+         CreateDirectories(Leaf);
+     }
+     return Leaves.size();
+ }
+
+ // Reads the "Files" array of CachedState. Every element becomes one Entry appended to
+ // OutEntries (Path, Size, ModTick, Hash), and OutLookup maps the entry's path to its
+ // index in OutEntries; a repeated path overwrites the earlier index. PackHash is not
+ // read here - Dehydrate only uses this data to seed its hash cache.
+ void LoadCachedStateEntries(const CbObject& CachedState,
+                             std::unordered_map<std::string, size_t>& OutLookup,
+                             std::vector<Entry>& OutEntries)
+ {
+     for (CbFieldView Field : CachedState["Files"sv].AsArrayView())
+     {
+         CbObjectView FileView = Field.AsObjectView();
+
+         Entry Cached{.RelativePath = std::string(FileView["Path"sv].AsString()),
+                      .Size         = FileView["Size"sv].AsUInt64(),
+                      .ModTick      = FileView["ModTick"sv].AsUInt64(),
+                      .Hash         = FileView["Hash"sv].AsHash()};
+
+         const size_t NewIndex = OutEntries.size();
+         OutLookup.insert_or_assign(Cached.RelativePath, NewIndex);
+         OutEntries.push_back(std::move(Cached));
+     }
+ }
+
+ // Fills in Out.Hash for the file at AbsPath.
+ //
+ // Special case: when the file has no extension and the first component of
+ // Out.RelativePath ends with "cas", the file is probed as a compressed buffer. If that
+ // succeeds, the hash is a meta-hash over the embedded RawHash followed by Out.Size,
+ // which avoids a collision between an uncompressed file and a same-content compressed
+ // file.
+ //
+ // Every other file (including a probe that fails) is hashed by streaming its raw bytes
+ // through IoHashStream via BasicFile (sequential read, friendlier to the Windows cache
+ // manager than mmap).
+ void HashFileContent(const std::filesystem::path& AbsPath, Entry& Out)
+ {
+     const std::string_view RelPath      = Out.RelativePath;
+     const std::string_view TopComponent = RelPath.substr(0, RelPath.find('/'));
+     const bool             ProbeAsCas   = AbsPath.extension().empty() && TopComponent.ends_with("cas");
+
+     if (ProbeAsCas)
+     {
+         IoHash           RawHash;
+         uint64_t         RawSize;
+         CompressedBuffer Compressed =
+             CompressedBuffer::FromCompressed(SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)), RawHash, RawSize);
+         if (Compressed)
+         {
+             IoHashStream MetaHasher;
+             MetaHasher.Append(RawHash.Hash, sizeof(RawHash.Hash));
+             MetaHasher.Append(&Out.Size, sizeof(Out.Size));
+             Out.Hash = MetaHasher.GetHash();
+             return;
+         }
+     }
+
+     BasicFile    Source(AbsPath, BasicFile::Mode::kRead);
+     IoHashStream ContentHasher;
+     Source.StreamFile([&ContentHasher](const void* Data, uint64_t Size) { ContentHasher.Append(Data, Size); });
+     Out.Hash = ContentHasher.GetHash();
+ }
+
+ // Walks DirContent.Files and fills Entries[0..N) with the accepted files, where N is
+ // the return value. A file is skipped when its relative path (generic form, relative to
+ // ServerStateDir) matches any pattern in Excludes.
+ //
+ // For each accepted file the hash is copied from StateEntries when the cached entry has
+ // the same Size and ModTick; otherwise a hash job is scheduled on Work/Pool and
+ // HashStats is updated (Files, RecordScheduled, and Bytes once the job has run).
+ //
+ // The caller owns Work.Wait(), so other operations (e.g. Storage::List) can overlap
+ // with hashing. Entries is indexed directly, so it must already be large enough to hold
+ // every accepted file.
+ //
+ // Returns the number of accepted (non-filtered) entries; OutTotalBytes receives the sum
+ // of their sizes.
+ size_t ScanAndScheduleHashWork(const DirectoryContent& DirContent,
+                                const std::filesystem::path& ServerStateDir,
+                                const std::unordered_map<std::string, size_t>& StateEntryLookup,
+                                const std::vector<Entry>& StateEntries,
+                                std::span<const std::string> Excludes,
+                                std::vector<Entry>& Entries,
+                                uint64_t& OutTotalBytes,
+                                ParallelWork& Work,
+                                WorkerThreadPool& Pool,
+                                PhaseStats& HashStats)
+ {
+     size_t AcceptedCount = 0;
+     OutTotalBytes        = 0;
+
+     const size_t FileCount = DirContent.Files.size();
+     for (size_t FileIndex = 0; FileIndex < FileCount; ++FileIndex)
+     {
+         const std::filesystem::path& ScannedFile = DirContent.Files[FileIndex];
+
+         std::string RelKey = FastRelativePath(ServerStateDir, ScannedFile).generic_string();
+         if (IsExcluded(RelKey, Excludes))
+         {
+             continue;
+         }
+         const std::filesystem::path AbsPath = MakeSafeAbsolutePath(ScannedFile);
+
+         Entry& Scanned       = Entries[AcceptedCount];
+         Scanned.RelativePath = std::move(RelKey);
+         Scanned.Size         = DirContent.FileSizes[FileIndex];
+         Scanned.ModTick      = DirContent.FileModificationTicks[FileIndex];
+
+         // A cached hash is reusable only when both size and modification tick still match.
+         const auto CachedIt = StateEntryLookup.find(Scanned.RelativePath);
+         const bool CacheHit = CachedIt != StateEntryLookup.end() && StateEntries[CachedIt->second].Size == Scanned.Size &&
+                               StateEntries[CachedIt->second].ModTick == Scanned.ModTick;
+
+         if (CacheHit)
+         {
+             Scanned.Hash = StateEntries[CachedIt->second].Hash;
+         }
+         else
+         {
+             Work.ScheduleWork(Pool, [AbsPath, EntryIndex = AcceptedCount, &Entries, &HashStats](std::atomic<bool>& AbortFlag) {
+                 if (AbortFlag.load())
+                 {
+                     return;
+                 }
+                 Stopwatch Timer    = HashStats.BeginRequest();
+                 auto      GuardEnd = MakeGuard([&] { HashStats.EndRequest(Timer.GetElapsedTimeUs()); });
+                 Entry&    Target   = Entries[EntryIndex];
+                 HashFileContent(AbsPath, Target);
+                 HashStats.Bytes.fetch_add(Target.Size, std::memory_order_relaxed);
+             });
+             HashStats.Files.fetch_add(1, std::memory_order_relaxed);
+             HashStats.RecordScheduled();
+         }
+
+         ++AcceptedCount;
+         OutTotalBytes += Scanned.Size;
+     }
+     return AcceptedCount;
+ }
+
+ // Decides, deterministically, which entries are bundled into which pack.
+ //
+ //  1. Entries with Size < Threshold are grouped by content hash. Every index in a group
+ //     refers to the same bytes, so any one of them can source the pack content.
+ //  2. Groups are ordered by ascending IoHash.
+ //  3. Groups are bin-packed greedily: a group whose size is >= MaxPackBytes is left for
+ //     standalone upload; a pack is closed as soon as the next group would push it past
+ //     MaxPackBytes. A closed pack holding fewer than two groups is discarded.
+ //  4. Every entry that ended up in a published pack gets IsPacked = true, so the caller
+ //     can tell pack-bound entries from loose-CAS uploads.
+ //
+ // Returns one PackPlan per pack to build (empty if no packs are produced).
+ std::vector<PackPlan> PlanPacks(std::vector<Entry>& Entries, uint64_t Threshold, uint64_t MaxPackBytes)
+ {
+     std::unordered_map<IoHash, EntryGroup, IoHash::Hasher> GroupsByHash;
+     for (size_t EntryIndex = 0; EntryIndex < Entries.size(); ++EntryIndex)
+     {
+         const Entry& Candidate = Entries[EntryIndex];
+         if (Candidate.Size < Threshold)
+         {
+             GroupsByHash[Candidate.Hash].push_back(EntryIndex);
+         }
+     }
+
+     // With fewer than two unique groups no pack can survive the "discard 1-entry packs" rule.
+     if (GroupsByHash.size() < 2)
+     {
+         return {};
+     }
+
+     // Drain the map into a vector (moving the index lists) and sort by the group's hash.
+     std::vector<EntryGroup> SortedGroups;
+     SortedGroups.reserve(GroupsByHash.size());
+     for (auto& HashAndGroup : GroupsByHash)
+     {
+         SortedGroups.push_back(std::move(HashAndGroup.second));
+     }
+     std::sort(SortedGroups.begin(), SortedGroups.end(), [&Entries](const EntryGroup& Lhs, const EntryGroup& Rhs) {
+         return Entries[Lhs.front()].Hash < Entries[Rhs.front()].Hash;
+     });
+
+     std::vector<PackPlan> Plans;
+     PackPlan              Pending;
+     uint64_t              PendingBytes = 0;
+
+     // Publishes Pending when it holds at least two groups, then starts a fresh pack.
+     auto ClosePending = [&] {
+         if (Pending.size() >= 2)
+         {
+             Plans.push_back(std::move(Pending));
+         }
+         Pending      = {};
+         PendingBytes = 0;
+     };
+
+     for (EntryGroup& Group : SortedGroups)
+     {
+         const uint64_t GroupBytes = Entries[Group.front()].Size;
+         if (GroupBytes >= MaxPackBytes)
+         {
+             continue;  // fallback to standalone upload
+         }
+         if (!Pending.empty() && PendingBytes + GroupBytes > MaxPackBytes)
+         {
+             ClosePending();
+         }
+         Pending.push_back(std::move(Group));
+         PendingBytes += GroupBytes;
+     }
+     ClosePending();
+
+     // Tagging happens after bin-packing so groups dropped by the <2-entry rule stay untagged.
+     for (const PackPlan& Plan : Plans)
+     {
+         for (const EntryGroup& Group : Plan)
+         {
+             for (const size_t EntryIndex : Group)
+             {
+                 Entries[EntryIndex].IsPacked = true;
+             }
+         }
+     }
+     return Plans;
+ }
+
+ // Builds one pack blob at TempPath from Plan.
+ //
+ // For every group in Plan the first entry is used as the source (all entries in a group
+ // share the same content hash, i.e. the same bytes). The file is read into Scratch,
+ // appended to the running pack hash, and written to the pack at the running offset.
+ //
+ // Scratch is owned by the caller and reused across packs; its size must be >= the
+ // largest candidate file (i.e. the pack threshold).
+ //
+ // Returns a BuiltPack carrying the hash of the concatenated bytes, the total byte count
+ // and one BuiltPackEntry per group, in concatenation order.
+ //
+ // Throws zen::runtime_error (message includes ModuleId for grep) when
+ //   - a source file's on-disk size differs from the size recorded in its Entry, or
+ //   - a source file does not fit in Scratch.
+ // The two conditions are reported with separate messages: a file that matches its
+ // recorded size but overflows Scratch would otherwise be reported as
+ // "expected N bytes, file is N", which hides the real cause.
+ BuiltPack BuildPack(const PackPlan& Plan,
+                     const std::vector<Entry>& Entries,
+                     const std::filesystem::path& ServerStateDir,
+                     const std::filesystem::path& TempPath,
+                     std::string_view ModuleId,
+                     std::vector<uint8_t>& Scratch)
+ {
+     BuiltPack BP;
+     BP.Entries.reserve(Plan.size());
+     IoHashStream Hasher;
+     uint64_t     Offset = 0;
+     {
+         // Scoped so the writer and the pack file are closed before the result is finalized.
+         BasicFile       PackFile(TempPath, BasicFile::Mode::kTruncate);
+         BasicFileWriter Writer(PackFile, /*BufferSize*/ 64 * 1024);
+         for (const EntryGroup& Group : Plan)
+         {
+             // Every Entries[idx] in a group shares the same content hash (= same bytes),
+             // so the first one is a fine source.
+             const Entry&          Rep     = Entries[Group.front()];
+             std::filesystem::path AbsPath = MakeSafeAbsolutePath(ServerStateDir / Rep.RelativePath);
+             BasicFile             Src(AbsPath, BasicFile::Mode::kRead);
+             const uint64_t        Size = Src.FileSize();
+             if (Size != Rep.Size)
+             {
+                 throw zen::runtime_error("Pack entry for hash {} (module '{}'): expected {} bytes, file is {} at '{}'"sv,
+                                          Rep.Hash,
+                                          ModuleId,
+                                          Rep.Size,
+                                          Size,
+                                          AbsPath);
+             }
+             if (Size > Scratch.size())
+             {
+                 throw zen::runtime_error(
+                     "Pack entry for hash {} (module '{}'): file is {} bytes but scratch buffer holds only {} at '{}'"sv,
+                     Rep.Hash,
+                     ModuleId,
+                     Size,
+                     Scratch.size(),
+                     AbsPath);
+             }
+             Src.Read(Scratch.data(), Size, 0);
+             Hasher.Append(Scratch.data(), Size);
+             Writer.Write(Scratch.data(), Size, Offset);
+             Offset += Size;
+             BP.Entries.push_back(BuiltPackEntry{.Hash = Rep.Hash, .Size = Rep.Size});
+         }
+         Writer.Flush();
+     }
+
+     BP.PackHash = Hasher.GetHash();
+     BP.Size     = Offset;
+     return BP;
+ }
+
+ // Schedules exactly one storage operation for Hash on Work/Pool:
+ //   - Hash absent from ExistsLookup  -> Storage.Put,   counted on UploadStats (Files++).
+ //   - Hash present in ExistsLookup   -> Storage.Touch, counted on TouchStats (Files++ and
+ //     Bytes += Size, so touched-bytes accounting tracks size-equivalent work that did
+ //     not transfer).
+ // Both paths call RecordScheduled so the queue-wait line also covers cache-warm
+ // dehydrates where only Touches are scheduled.
+ //
+ // Used for loose CAS (UploadStats=Stats.Upload, TouchStats=Stats.Touch) and for pack
+ // blobs (UploadStats=Stats.PackUpload, TouchStats=Stats.PackTouch). UploadStats and
+ // TouchStats must be distinct PhaseStats so the upload-throughput metric is not
+ // inflated by touched bytes that did not transfer.
+ void ScheduleUploadOrTouch(StorageBase& Storage,
+                            ParallelWork& Work,
+                            WorkerThreadPool& Pool,
+                            const std::unordered_set<IoHash, IoHash::Hasher>& ExistsLookup,
+                            const IoHash& Hash,
+                            uint64_t Size,
+                            const std::filesystem::path& SourcePath,
+                            PhaseStats& UploadStats,
+                            PhaseStats& TouchStats)
+ {
+     const bool AlreadyStored = ExistsLookup.contains(Hash);
+     if (!AlreadyStored)
+     {
+         Storage.Put(Work, Pool, Hash, Size, SourcePath, UploadStats);
+         UploadStats.Files.fetch_add(1, std::memory_order_relaxed);
+         UploadStats.RecordScheduled();
+         return;
+     }
+
+     // Refresh the backend's modification time so lifecycle-expiration policies
+     // do not evict CAS entries that are still referenced by this module.
+     Storage.Touch(Work, Pool, Hash, TouchStats);
+     TouchStats.Files.fetch_add(1, std::memory_order_relaxed);
+     TouchStats.Bytes.fetch_add(Size, std::memory_order_relaxed);
+     TouchStats.RecordScheduled();
+ }
+
+ // Assembles the dehydrate manifest and hands it to Storage.SaveMetadata.
+ //
+ // Layout, in write order: header fields (SchemaVersion, SourceFolder, ModuleId,
+ // HostName, DehydrateTimeUtc, DehydrateDurationMs, TotalSizeBytes, StorageSettings),
+ // then a "Packs" array that is emitted only when BuiltPacks is non-empty, then the
+ // "Files" array with one object per entry. A file object carries "PackHash" only when
+ // the entry is marked IsPacked.
+ void WriteDehydrateMetadata(StorageBase& Storage,
+                             const std::filesystem::path& ServerStateDir,
+                             std::string_view ModuleId,
+                             uint64_t TotalBytes,
+                             uint64_t DehydrateDurationMs,
+                             const std::vector<BuiltPack>& BuiltPacks,
+                             const std::vector<Entry>& Entries)
+ {
+     // Timestamp is captured first and rendered as yyyy-mm-ddThh:mm:ss.mmmZ.
+     const UtcTime     Stamp          = UtcTime::Now();
+     const std::string StampAsIso8601 = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z"sv,
+                                                    Stamp.Tm.tm_year + 1900,
+                                                    Stamp.Tm.tm_mon + 1,
+                                                    Stamp.Tm.tm_mday,
+                                                    Stamp.Tm.tm_hour,
+                                                    Stamp.Tm.tm_min,
+                                                    Stamp.Tm.tm_sec,
+                                                    Stamp.Ms);
+
+     CbObjectWriter Manifest;
+     Manifest << "SchemaVersion"sv << HydrationSchemaVersion;
+     Manifest << "SourceFolder"sv << ServerStateDir.generic_string();
+     Manifest << "ModuleId"sv << ModuleId;
+     Manifest << "HostName"sv << GetMachineName();
+     Manifest << "DehydrateTimeUtc"sv << StampAsIso8601;
+     Manifest << "DehydrateDurationMs"sv << DehydrateDurationMs;
+     Manifest << "TotalSizeBytes"sv << TotalBytes;
+     Manifest << "StorageSettings"sv << Storage.GetSettings();
+
+     if (!BuiltPacks.empty())
+     {
+         Manifest.BeginArray("Packs"sv);
+         for (const BuiltPack& Pack : BuiltPacks)
+         {
+             Manifest.BeginObject();
+             Manifest << "Hash"sv << Pack.PackHash;
+             Manifest << "Size"sv << Pack.Size;
+             Manifest.BeginArray("Entries"sv);
+             for (const BuiltPackEntry& Slice : Pack.Entries)
+             {
+                 Manifest.BeginObject();
+                 Manifest << "Hash"sv << Slice.Hash;
+                 Manifest << "Size"sv << Slice.Size;
+                 Manifest.EndObject();
+             }
+             Manifest.EndArray();
+             Manifest.EndObject();
+         }
+         Manifest.EndArray();
+     }
+
+     Manifest.BeginArray("Files"sv);
+     for (const Entry& FileEntry : Entries)
+     {
+         Manifest.BeginObject();
+         Manifest << "Path"sv << FileEntry.RelativePath;
+         Manifest << "Size"sv << FileEntry.Size;
+         Manifest << "ModTick"sv << FileEntry.ModTick;
+         Manifest << "Hash"sv << FileEntry.Hash;
+         if (FileEntry.IsPacked)
+         {
+             Manifest << "PackHash"sv << FileEntry.PackHash;
+         }
+         Manifest.EndObject();
+     }
+     Manifest.EndArray();
+
+     Storage.SaveMetadata(Manifest.Save());
+ }
+
+ // Decodes the manifest's Files[] array. Used by Hydrate.
+ //   OutEntries   - receives one Entry per array element whose object view tests true
+ //   OutLookup    - maps each entry's path to its index in OutEntries; a repeated path
+ //                  overwrites the earlier mapping
+ //   OutTotalSize - reset to zero, then accumulates the Size of every parsed entry
+ // An entry is flagged IsPacked (and gets its PackHash) when the element carries a
+ // PackHash field that is not IoHash::Zero.
+ void ParseFilesArray(const CbObject& Meta,
+     std::vector<Entry>& OutEntries,
+     std::unordered_map<std::string, size_t>& OutLookup,
+     uint64_t& OutTotalSize)
+ {
+     OutTotalSize = 0;
+     for (CbFieldView Field : Meta["Files"sv])
+     {
+         CbObjectView FileView = Field.AsObjectView();
+         if (!FileView)
+         {
+             continue;
+         }
+
+         Entry Parsed = {.RelativePath{FileView["Path"sv].AsString()},
+             .Size = FileView["Size"sv].AsUInt64(),
+             .ModTick = FileView["ModTick"sv].AsUInt64(),
+             .Hash = FileView["Hash"sv].AsHash()};
+
+         if (IoHash PackHash = FileView["PackHash"sv].AsHash(); PackHash != IoHash::Zero)
+         {
+             Parsed.IsPacked = true;
+             Parsed.PackHash = PackHash;
+         }
+
+         OutTotalSize += Parsed.Size;
+         // Register the index before appending so it equals the new element's position.
+         OutLookup.insert_or_assign(Parsed.RelativePath, OutEntries.size());
+         OutEntries.push_back(std::move(Parsed));
+     }
+ }
+
+ // Decodes the manifest's Packs[] array into a map keyed by pack hash. Every entry
+ // descriptor stores, as Offset, the sum of the sizes of the entries listed before it
+ // in the same pack, so a slice can be located without rescanning. Array elements
+ // (packs or entries) whose object view tests false are skipped. If a pack hash
+ // occurs more than once, the first descriptor is kept.
+ std::unordered_map<IoHash, PackDescriptor, IoHash::Hasher> ParsePacksArray(const CbObject& Meta)
+ {
+     std::unordered_map<IoHash, PackDescriptor, IoHash::Hasher> Result;
+     for (CbFieldView PackField : Meta["Packs"sv])
+     {
+         CbObjectView PackView = PackField.AsObjectView();
+         if (PackView)
+         {
+             IoHash Key = PackView["Hash"sv].AsHash();
+
+             PackDescriptor Descriptor;
+             Descriptor.Size = PackView["Size"sv].AsUInt64();
+
+             // Running prefix sum of entry sizes within this pack.
+             uint64_t RunningOffset = 0;
+             for (CbFieldView MemberField : PackView["Entries"sv])
+             {
+                 CbObjectView MemberView = MemberField.AsObjectView();
+                 if (MemberView)
+                 {
+                     PackEntryDescriptor Member{.Hash = MemberView["Hash"sv].AsHash(),
+                         .Size = MemberView["Size"sv].AsUInt64(),
+                         .Offset = RunningOffset};
+                     RunningOffset += Member.Size;
+                     Descriptor.Entries.push_back(Member);
+                 }
+             }
+
+             Result.emplace(Key, std::move(Descriptor));
+         }
+     }
+     return Result;
+ }
+
+ // Loads every downloaded pack from TempDir/packs/<PackHash>.bin and returns a map from
+ // entry content hash to an IoBuffer view over that entry's byte range inside the pack
+ // buffer (the views reference the pack buffer rather than copying it).
+ //   PackMap  - pack descriptors parsed from the manifest (size + entries with offsets)
+ //   TempDir  - hydrate temp directory; packs are expected under its "packs" subfolder
+ //   ModuleId - used only in error messages
+ // Throws zen::runtime_error when
+ //   - a pack's size on disk differs from the size recorded in the manifest, or
+ //   - an entry's [Offset, Offset + Size) range does not fit inside the loaded pack,
+ //     i.e. the manifest's entry list is inconsistent with the pack it describes.
+ // If the same content hash occurs in several packs, the first slice inserted is kept.
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> BuildHashToSlice(
+     const std::unordered_map<IoHash, PackDescriptor, IoHash::Hasher>& PackMap,
+     const std::filesystem::path& TempDir,
+     std::string_view ModuleId)
+ {
+     std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> HashToSlice;
+     size_t TotalPackEntries = 0;
+     for (const auto& [PackHash, PD] : PackMap)
+     {
+         TotalPackEntries += PD.Entries.size();
+     }
+     HashToSlice.reserve(TotalPackEntries);
+
+     for (const auto& [PackHash, PD] : PackMap)
+     {
+         std::filesystem::path PackPath = TempDir / "packs" / fmt::format("{}.bin"sv, PackHash);
+         // Heap-allocated buffer via direct ReadFile avoids mmap materialization
+         // and page-fault latency during the parallel unpack-write that follows.
+         BasicFile PackFile(PackPath, BasicFile::Mode::kRead);
+         IoBuffer PackBuf = PackFile.ReadAll();
+         if (PackBuf.GetSize() != PD.Size)
+         {
+             throw zen::runtime_error("Pack '{}' size mismatch for module '{}' at '{}': expected {}, got {}"sv,
+                 PackHash,
+                 ModuleId,
+                 PackPath,
+                 PD.Size,
+                 PackBuf.GetSize());
+         }
+
+         const uint64_t PackSize = PackBuf.GetSize();
+         for (const auto& E : PD.Entries)
+         {
+             // The total pack size matching does not prove that the per-entry sizes in
+             // the manifest add up to it. Validate each range before slicing; the
+             // comparison is written so that Offset + Size cannot overflow.
+             if (E.Offset > PackSize || E.Size > PackSize - E.Offset)
+             {
+                 throw zen::runtime_error(
+                     "Pack '{}' entry '{}' out of range for module '{}' at '{}': offset {}, size {}, pack size {}"sv,
+                     PackHash,
+                     E.Hash,
+                     ModuleId,
+                     PackPath,
+                     E.Offset,
+                     E.Size,
+                     PackSize);
+             }
+             HashToSlice.emplace(E.Hash, IoBuffer(PackBuf, E.Offset, E.Size));
+         }
+     }
+     return HashToSlice;
+ }
+
+ // Moves the contents of SourceDir into ServerStateDir.
+ //   - Fast path (paths start with the same component): every top-level directory,
+ //     then every top-level file, is renamed into ServerStateDir. A failed rename
+ //     throws std::system_error naming source and destination.
+ //   - Slow path (no shared leading component): the whole tree is copied with CopyTree.
+ // SourceDir itself is not removed. The caller cleans up the parent temp directory,
+ // which may hold sibling staging dirs (like packs/) that must NOT migrate.
+ // NOTE(review): the same-volume test only compares the first path component. Two
+ // absolute POSIX paths always share the root "/", so they presumably always take the
+ // rename path even across mount points - confirm this is intended.
+ void MigrateTempToState(const std::filesystem::path& SourceDir,
+     const std::filesystem::path& ServerStateDir,
+     const HydrationConfig::ThreadingOptions& Threading)
+ {
+     // Locate the first component at which the two paths diverge.
+     const auto Divergence = std::mismatch(SourceDir.begin(), SourceDir.end(), ServerStateDir.begin(), ServerStateDir.end());
+     const bool SharesLeadingComponent = Divergence.first != SourceDir.begin();
+
+     if (!SharesLeadingComponent)
+     {
+         // Slow path: source and target are on different filesystems, so rename
+         // would fail. Copy the tree instead.
+         ZEN_DEBUG("SourceDir and ServerStateDir are on different filesystems - using CopyTree");
+         CopyTree(SourceDir, ServerStateDir, {.EnableClone = true});
+         return;
+     }
+
+     // Non-recursive listing: renaming a top-level directory carries its subtree along.
+     DirectoryContent TopLevel;
+     GetDirectoryContent(*Threading.WorkerPool,
+         SourceDir,
+         DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs,
+         TopLevel);
+
+     for (const std::filesystem::path& SourceChild : TopLevel.Directories)
+     {
+         const std::filesystem::path Target = MakeSafeAbsolutePath(ServerStateDir / SourceChild.filename());
+         if (std::error_code Failure = RenameDirectoryWithRetry(SourceChild, Target); Failure)
+         {
+             throw std::system_error(Failure, fmt::format("Failed to rename directory from '{}' to '{}'"sv, SourceChild, Target));
+         }
+     }
+
+     for (const std::filesystem::path& SourceChild : TopLevel.Files)
+     {
+         const std::filesystem::path Target = MakeSafeAbsolutePath(ServerStateDir / SourceChild.filename());
+         if (std::error_code Failure = RenameFileWithRetry(SourceChild, Target); Failure)
+         {
+             throw std::system_error(Failure, fmt::format("Failed to rename file from '{}' to '{}'"sv, SourceChild, Target));
+         }
+     }
+ }
+
+ // Scans ServerStateDir recursively and produces the cached-state object ("Files"
+ // array) that the next dehydrate receives for its hash shortcut. Each record carries
+ // the file's path relative to ServerStateDir, its size and modification tick as
+ // found on disk, and the hash taken from the matching manifest entry.
+ //   EntryLookup - manifest path -> index into Entries
+ //   Entries     - manifest entries; only Hash is read here
+ //   ModuleId    - used only in the warning message
+ // Files found on disk that have no manifest entry are logged with a WARN and left
+ // out of the result.
+ CbObject BuildHydrateState(const std::filesystem::path& ServerStateDir,
+     const std::unordered_map<std::string, size_t>& EntryLookup,
+     const std::vector<Entry>& Entries,
+     std::string_view ModuleId,
+     const HydrationConfig::ThreadingOptions& Threading)
+ {
+     DirectoryContent Scan;
+     GetDirectoryContent(*Threading.WorkerPool,
+         ServerStateDir,
+         DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
+             DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
+         Scan);
+
+     CbObjectWriter Writer;
+     Writer.BeginArray("Files"sv);
+
+     const size_t FileCount = Scan.Files.size();
+     for (size_t Index = 0; Index != FileCount; ++Index)
+     {
+         std::filesystem::path Relative = FastRelativePath(ServerStateDir, Scan.Files[Index]);
+         std::string RelKey = Relative.generic_string();
+
+         const auto Match = EntryLookup.find(RelKey);
+         if (Match == EntryLookup.end())
+         {
+             // On disk after hydrate but absent from the manifest, e.g. a leftover from
+             // a prior crashed hydrate that survived to the rename phase. The manifest
+             // is the source of truth for the cached state, so skip the stray file
+             // instead of failing; the next dehydrate's directory scan will see it.
+             ZEN_WARN("Hydrate: file '{}' present on disk but missing from manifest for module '{}'; skipping", RelKey, ModuleId);
+             continue;
+         }
+
+         Writer.BeginObject();
+         Writer << "Path"sv << RelKey;
+         Writer << "Size"sv << Scan.FileSizes[Index];
+         Writer << "ModTick"sv << Scan.FileModificationTicks[Index];
+         Writer << "Hash"sv << Entries[Match->second].Hash;
+         Writer.EndObject();
+     }
+
+     Writer.EndArray();
+     return Writer.Save();
+ }
+
+ ///////////////////////////////////////////////////////////////////////
// IncrementalHydrator implementations
- IncrementalHydrator::IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage)
+ IncrementalHydrator::IncrementalHydrator(const HydrationConfig& Config,
+ std::unique_ptr<StorageBase> Storage,
+ std::span<const std::string> Excludes)
: m_Storage(std::move(Storage))
, m_Config(Config)
+ , m_Excludes(Excludes.begin(), Excludes.end())
, m_FallbackWorkPool(0)
{
if (Config.Threading)
@@ -760,6 +1720,7 @@ namespace hydration_impl {
void IncrementalHydrator::Dehydrate(const CbObject& CachedState)
{
+ ZEN_TRACE_CPU("IncrementalHydrator::Dehydrate");
Stopwatch TotalTimer;
DehydrateStatistics Stats;
const std::string StorageTarget = m_Storage->Describe();
@@ -767,24 +1728,17 @@ namespace hydration_impl {
const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
try
{
+ // Load the cache from the previous dehydrate to short-circuit re-hashing of
+ // unchanged files (matched by Path+Size+ModTick).
std::unordered_map<std::string, size_t> StateEntryLookup;
std::vector<Entry> StateEntries;
{
Stopwatch LoadStateTimer;
- for (CbFieldView FieldView : CachedState["Files"].AsArrayView())
- {
- CbObjectView EntryView = FieldView.AsObjectView();
- std::filesystem::path RelativePath(EntryView["Path"].AsString());
- uint64_t Size = EntryView["Size"].AsUInt64();
- uint64_t ModTick = EntryView["ModTick"].AsUInt64();
- IoHash Hash = EntryView["Hash"].AsHash();
-
- StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size());
- StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash});
- }
+ LoadCachedStateEntries(CachedState, StateEntryLookup, StateEntries);
Stats.LoadStateUs = LoadStateTimer.GetElapsedTimeUs();
}
+ // Scan the server state directory.
DirectoryContent DirContent;
{
Stopwatch DirScanTimer;
@@ -802,101 +1756,29 @@ namespace hydration_impl {
DirContent.Files.size(),
NiceBytes(std::accumulate(DirContent.FileSizes.begin(), DirContent.FileSizes.end(), uint64_t(0))));
+ // Hash phase: build Entries[] and schedule hash work for files not in the cache.
+ // Storage::List runs in parallel with hashing to populate ExistsLookup before Wait.
std::vector<Entry> Entries;
Entries.resize(DirContent.Files.size());
- uint64_t TotalBytes = 0;
- uint64_t TotalFiles = 0;
-
- std::unordered_set<IoHash> ExistsLookup;
-
+ uint64_t TotalBytes = 0;
+ uint64_t TotalFiles = 0;
+ std::unordered_set<IoHash, IoHash::Hasher> ExistsLookup;
{
+ Stats.Hash.PhaseClock.Reset();
Stopwatch HashTimer;
ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
- {
- const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]);
- if (AbsPath.filename() == "reserve.gc")
- {
- continue;
- }
- const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
- if (*RelativePath.begin() == ".sentry-native")
- {
- continue;
- }
- if (RelativePath == ".lock")
- {
- continue;
- }
-
- Entry& CurrentEntry = Entries[TotalFiles];
- CurrentEntry.RelativePath = RelativePath;
- CurrentEntry.Size = DirContent.FileSizes[FileIndex];
- CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex];
-
- bool FoundHash = false;
- if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath.generic_string()); KnownIt != StateEntryLookup.end())
- {
- const Entry& StateEntry = StateEntries[KnownIt->second];
- if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick)
- {
- CurrentEntry.Hash = StateEntry.Hash;
- FoundHash = true;
- }
- }
-
- if (!FoundHash)
- {
- Work.ScheduleWork(*m_Threading.WorkerPool,
- [AbsPath, EntryIndex = TotalFiles, &Entries, &Stats](std::atomic<bool>& AbortFlag) {
- Stats.Hash.RecordThread();
- if (AbortFlag)
- {
- return;
- }
-
- Entry& CurrentEntry = Entries[EntryIndex];
-
- bool FoundHash = false;
- if (AbsPath.extension().empty())
- {
- auto It = CurrentEntry.RelativePath.begin();
- if (It != CurrentEntry.RelativePath.end() && It->filename().string().ends_with("cas"))
- {
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(
- SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)),
- RawHash,
- RawSize);
- if (Compressed)
- {
- // We compose a meta-hash since taking the RawHash might collide with an
- // existing non-compressed file with the same content The collision is
- // unlikely except if the compressed data is zero bytes causing RawHash
- // to be the same as an empty file.
- IoHashStream Hasher;
- Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash));
- Hasher.Append(&CurrentEntry.Size, sizeof(CurrentEntry.Size));
- CurrentEntry.Hash = Hasher.GetHash();
- FoundHash = true;
- }
- }
- }
-
- if (!FoundHash)
- {
- CurrentEntry.Hash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(AbsPath));
- }
- Stats.Hash.Bytes.fetch_add(CurrentEntry.Size, std::memory_order_relaxed);
- });
- Stats.Hash.Files.fetch_add(1, std::memory_order_relaxed);
- }
- TotalFiles++;
- TotalBytes += CurrentEntry.Size;
- }
+ TotalFiles = ScanAndScheduleHashWork(DirContent,
+ ServerStateDir,
+ StateEntryLookup,
+ StateEntries,
+ m_Excludes,
+ Entries,
+ TotalBytes,
+ Work,
+ *m_Threading.WorkerPool,
+ Stats.Hash);
{
Stopwatch ListTimer;
@@ -906,82 +1788,136 @@ namespace hydration_impl {
}
Work.Wait();
-
Entries.resize(TotalFiles);
Stats.Hash.ElapsedUs = HashTimer.GetElapsedTimeUs();
Stats.TotalFiles = TotalFiles;
Stats.TotalBytes = TotalBytes;
}
- uint64_t UploadDurationMs = 0;
+ // Pack planning + unified upload phase. Plan first so we know which entries are
+ // packed, then run loose-CAS uploads and pack builds inside a single ParallelWork.
+ // Loose uploads are scheduled up front so they execute on the worker pool while
+ // the calling thread runs the serial pack-build loop; each completed pack hands
+ // its upload to the same ParallelWork. One Wait covers everything.
+ std::vector<BuiltPack> BuiltPacks;
+ std::vector<std::filesystem::path> StagedPackFiles;
+ auto PackCleanup = MakeGuard([&] {
+ RemoveStagedPackFiles(StagedPackFiles);
+ // Best-effort drop of the now-empty packs/ subdir so TempDir is clean after
+ // dehydrate. Mirrors the explicit cleanup on the hydrate-side success path.
+ std::error_code Ec;
+ DeleteDirectories(MakeSafeAbsolutePath(m_Config.TempDir) / "packs", Ec);
+ });
+
+ // PlanPacks tags Entries[Idx].IsPacked on every index that survives into a pack,
+ // so the loose-upload loop can skip them. PackHash is set later per-pack as each
+ // pack is built.
+ const std::vector<PackPlan> Pending =
+ m_Config.PackEnabled ? PlanPacks(Entries, m_Config.PackThresholdBytes, m_Config.MaxPackBytes) : std::vector<PackPlan>{};
+
+ uint64_t DehydrateDurationMs = 0;
{
+ // Upload, PackUpload, Touch, and PackTouch share one ParallelWork; reset all
+ // four PhaseClocks to the same baseline so the queue-wait line can combine
+ // their FirstScheduleUs / FirstStartUs across the four PhaseStats.
+ Stats.Upload.PhaseClock.Reset();
+ Stats.PackUpload.PhaseClock.Reset();
+ Stats.Touch.PhaseClock.Reset();
+ Stats.PackTouch.PhaseClock.Reset();
Stopwatch UploadTimer;
ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ // Schedule loose-CAS uploads first so they begin running while the pack-build
+ // loop below executes serially on this thread.
for (const Entry& CurrentEntry : Entries)
{
- if (!ExistsLookup.contains(CurrentEntry.Hash))
- {
- m_Storage->Put(Work,
- *m_Threading.WorkerPool,
- CurrentEntry.Hash,
- CurrentEntry.Size,
- MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath),
- Stats.Upload);
- Stats.Upload.Files.fetch_add(1, std::memory_order_relaxed);
- }
- else
+ if (CurrentEntry.IsPacked)
{
- // Refresh the backend's modification time so lifecycle-expiration policies
- // do not evict CAS entries that are still referenced by this module.
- m_Storage->Touch(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, Stats.Touch);
- Stats.Touch.Files.fetch_add(1, std::memory_order_relaxed);
- Stats.Touch.Bytes.fetch_add(CurrentEntry.Size, std::memory_order_relaxed);
+ continue; // pack phase covers it
}
+ ScheduleUploadOrTouch(*m_Storage,
+ Work,
+ *m_Threading.WorkerPool,
+ ExistsLookup,
+ CurrentEntry.Hash,
+ CurrentEntry.Size,
+ MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath),
+ Stats.Upload,
+ Stats.Touch);
}
- Work.Wait();
- Stats.Upload.ElapsedUs = UploadTimer.GetElapsedTimeUs();
- UploadDurationMs = TotalTimer.GetElapsedTimeMs();
-
- Stopwatch MetadataTimer;
- UtcTime Now = UtcTime::Now();
- std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z",
- Now.Tm.tm_year + 1900,
- Now.Tm.tm_mon + 1,
- Now.Tm.tm_mday,
- Now.Tm.tm_hour,
- Now.Tm.tm_min,
- Now.Tm.tm_sec,
- Now.Ms);
-
- CbObjectWriter Meta;
- Meta << "SourceFolder" << ServerStateDir.generic_string();
- Meta << "ModuleId" << m_Config.ModuleId;
- Meta << "HostName" << GetMachineName();
- Meta << "UploadTimeUtc" << UploadTimeUtc;
- Meta << "UploadDurationMs" << UploadDurationMs;
- Meta << "TotalSizeBytes" << TotalBytes;
- Meta << "StorageSettings" << m_Storage->GetSettings();
-
- Meta.BeginArray("Files");
- for (const Entry& CurrentEntry : Entries)
+ if (!Pending.empty())
{
- Meta.BeginObject();
+ ZEN_TRACE_CPU("IncrementalHydrator::Dehydrate::Pack");
+ std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
+ std::filesystem::path PacksDir = TempDir / "packs";
+ CreateDirectories(PacksDir);
+
+ // Reusable scratch for small-file reads. Every pack candidate has Size <
+ // PackThresholdBytes so a single buffer of that size holds any one file.
+ // Build runs serially on the caller's thread - typical modules produce 1-2
+ // packs at ~5 ms each, too small to be worth the parallel-dispatch overhead.
+ std::vector<uint8_t> Scratch(m_Config.PackThresholdBytes);
+
+ for (const PackPlan& Plan : Pending)
{
- Meta << "Path" << CurrentEntry.RelativePath.generic_string();
- Meta << "Size" << CurrentEntry.Size;
- Meta << "ModTick" << CurrentEntry.ModTick;
- Meta << "Hash" << CurrentEntry.Hash;
+ // Pre-register the staging path so PackCleanup removes it even if the
+ // stream-write loop below throws mid-flight.
+ Oid::String_t OidStr;
+ Oid::NewOid().ToString(OidStr);
+ std::filesystem::path PackTempPath = PacksDir / fmt::format("{}.bin"sv, OidStr);
+ StagedPackFiles.push_back(PackTempPath);
+
+ Stopwatch BuildTimer;
+ BuiltPack BP = BuildPack(Plan, Entries, ServerStateDir, PackTempPath, m_Config.ModuleId, Scratch);
+ Stats.PackBuildUs.fetch_add(BuildTimer.GetElapsedTimeUs(), std::memory_order_relaxed);
+
+ // Stamp the pack hash on every matching entry; state.cbo's Files[] reads
+ // PackHash off these entries when emitting per-file PackHash references.
+ uint64_t PackedEntryCount = 0;
+ for (const EntryGroup& Group : Plan)
+ {
+ for (size_t Idx : Group)
+ {
+ Entries[Idx].PackHash = BP.PackHash;
+ }
+ PackedEntryCount += Group.size();
+ }
+
+ Stats.PackCount.fetch_add(1, std::memory_order_relaxed);
+ Stats.PackedFiles.fetch_add(PackedEntryCount, std::memory_order_relaxed);
+ Stats.PackBytes.fetch_add(BP.Size, std::memory_order_relaxed);
+
+ ScheduleUploadOrTouch(*m_Storage,
+ Work,
+ *m_Threading.WorkerPool,
+ ExistsLookup,
+ BP.PackHash,
+ BP.Size,
+ PackTempPath,
+ Stats.PackUpload,
+ Stats.PackTouch);
+
+ BuiltPacks.push_back(std::move(BP));
}
- Meta.EndObject();
}
- Meta.EndArray();
- m_Storage->SaveMetadata(Meta.Save());
- Stats.MetadataSaveUs = MetadataTimer.GetElapsedTimeUs();
+ Work.Wait();
+ // Upload, PackUpload, Touch, and PackTouch share a single ParallelWork. Only
+ // Upload's ElapsedUs is read by the formatter; the others' bytes/requests are
+ // reported against the same Upload phase elapsed.
+ Stats.Upload.ElapsedUs = UploadTimer.GetElapsedTimeUs();
+ DehydrateDurationMs = TotalTimer.GetElapsedTimeMs();
}
+ // Persist the new state.cbo with header, Packs[], and Files[].
+ {
+ Stopwatch SaveMetadataTimer;
+ WriteDehydrateMetadata(*m_Storage, ServerStateDir, m_Config.ModuleId, TotalBytes, DehydrateDurationMs, BuiltPacks, Entries);
+ Stats.SaveMetadataUs = SaveMetadataTimer.GetElapsedTimeUs();
+ }
+
+ // Server-state dir contents have been uploaded; wipe them.
ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
{
Stopwatch CleanTimer;
@@ -990,7 +1926,7 @@ namespace hydration_impl {
}
Stats.TotalUs = TotalTimer.GetElapsedTimeUs();
- LogDehydrateSummary("Dehydration complete", Stats, m_Config.ModuleId, ServerStateDir, StorageTarget);
+ LogDehydrateSummary("Dehydration complete"sv, Stats, m_Config.ModuleId, ServerStateDir, StorageTarget);
}
catch (const std::exception& Ex)
{
@@ -999,20 +1935,35 @@ namespace hydration_impl {
Ex.what(),
m_Config.ServerStateDir);
Stats.TotalUs = TotalTimer.GetElapsedTimeUs();
- LogDehydrateSummary("Dehydration failed", Stats, m_Config.ModuleId, ServerStateDir, StorageTarget);
+ LogDehydrateSummary("Dehydration failed"sv, Stats, m_Config.ModuleId, ServerStateDir, StorageTarget);
}
}
CbObject IncrementalHydrator::Hydrate()
{
+ ZEN_TRACE_CPU("IncrementalHydrator::Hydrate");
Stopwatch TotalTimer;
HydrateStatistics Stats;
const std::string StorageSource = m_Storage->Describe();
const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
+ // Hydrated files land in TempDir/state/, pack staging blobs in TempDir/packs/. Keeping
+ // them in sibling subdirectories means MigrateTempToState only needs to hand the state/
+ // subtree across to ServerStateDir; pack staging never has a chance to leak into the
+ // final server state directory.
+ const std::filesystem::path TempStateDir = TempDir / "state";
try
{
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(TempDir);
+ // A prior hydrate may have crashed after downloading but before the rename phase,
+ // leaving stale files in TempDir that would otherwise get migrated into
+ // ServerStateDir and trip the post-rename manifest check.
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ CreateDirectories(TempStateDir);
+
+ // Load metadata; absent metadata means a fresh module - clean state, return.
CbObject Meta;
{
Stopwatch LoadTimer;
@@ -1028,147 +1979,165 @@ namespace hydration_impl {
return CbObject();
}
+ // Schema-version gate: refuse manifests written by a newer hub. Missing field is
+ // treated as version 0 (legacy / pre-versioning) and decoded best-effort - the
+ // optional fields (Packs[], StorageSettings) absent from v0 manifests fall back
+ // to defaults via ParsePacksArray / ParseSettings.
+ const uint32_t SchemaVersion = Meta["SchemaVersion"sv].AsUInt32(0);
+ if (SchemaVersion > HydrationSchemaVersion)
+ {
+ throw zen::runtime_error("State manifest for module '{}' has schema version {} but this hub supports up to {}"sv,
+ m_Config.ModuleId,
+ SchemaVersion,
+ HydrationSchemaVersion);
+ }
+
+ // Parse manifest: Files[] for per-file metadata, Packs[] (optional) for pack
+ // composition. Missing Packs[] = old-format state; treated as all-standalone.
std::unordered_map<std::string, size_t> EntryLookup;
std::vector<Entry> Entries;
uint64_t TotalSize = 0;
-
- for (CbFieldView FieldView : Meta["Files"])
- {
- CbObjectView EntryView = FieldView.AsObjectView();
- if (EntryView)
- {
- Entry NewEntry = {.RelativePath = std::filesystem::path(EntryView["Path"].AsString()),
- .Size = EntryView["Size"].AsUInt64(),
- .ModTick = EntryView["ModTick"].AsUInt64(),
- .Hash = EntryView["Hash"].AsHash()};
- TotalSize += NewEntry.Size;
- EntryLookup.insert_or_assign(NewEntry.RelativePath.generic_string(), Entries.size());
- Entries.emplace_back(std::move(NewEntry));
- }
- }
+ ParseFilesArray(Meta, Entries, EntryLookup, TotalSize);
+ std::unordered_map<IoHash, PackDescriptor, IoHash::Hasher> PackMap = ParsePacksArray(Meta);
Stats.TotalFiles = Entries.size();
Stats.TotalBytes = TotalSize;
+ Stats.PackCount = PackMap.size();
- ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files",
+ ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files, {} packs",
m_Config.ModuleId,
m_Config.ServerStateDir,
Entries.size(),
- NiceBytes(TotalSize));
-
- m_Storage->ParseSettings(Meta["StorageSettings"].AsObjectView());
+ NiceBytes(TotalSize),
+ PackMap.size());
+
+ // Re-apply storage settings from state.cbo (e.g. S3 multipart chunk size).
+ m_Storage->ParseSettings(Meta["StorageSettings"sv].AsObjectView());
+
+ // Per-entry destination paths under TempStateDir, indexed parallel to Entries[]. Used
+ // by the standalone download dispatch and (for IsPacked entries) by the unpack
+ // dispatch. Pre-creating parents once for the union covers both phases without a second pass.
+ std::vector<std::filesystem::path> EntryPaths;
+ EntryPaths.reserve(Entries.size());
+ for (const Entry& CurrentEntry : Entries)
+ {
+ EntryPaths.push_back(MakeSafeAbsolutePath(TempStateDir / CurrentEntry.RelativePath));
+ }
+ {
+ Stopwatch CreateDirsTimer;
+ auto RecordElapsed = MakeGuard([&] { Stats.CreateDirsUs = CreateDirsTimer.GetElapsedTimeUs(); });
+ Stats.CreateDirsCount = CreateParentDirectories(EntryPaths);
+ }
+ // Download phase: pack GETs first (so unpack can begin sooner), then standalone files.
+ // Both share the same ParallelWork; per-phase byte / request counts stay separate via
+ // PackDownload vs Download stats while the elapsed time is reported once.
{
+ // Download and PackDownload share one ParallelWork; reset both PhaseClocks
+ // to the same baseline so the queue-wait line can combine their FirstScheduleUs
+ // / FirstStartUs across the two PhaseStats.
+ Stats.Download.PhaseClock.Reset();
+ Stats.PackDownload.PhaseClock.Reset();
Stopwatch DownloadTimer;
ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- for (const Entry& CurrentEntry : Entries)
+ const std::filesystem::path PacksDir = TempDir / "packs";
+ if (!PackMap.empty())
+ {
+ CreateDirectories(PacksDir);
+ }
+
+ for (const auto& [PackHash, PD] : PackMap)
+ {
+ std::filesystem::path PackPath = PacksDir / fmt::format("{}.bin"sv, PackHash);
+ m_Storage->Get(Work, *m_Threading.WorkerPool, PackHash, PD.Size, PackPath, Stats.PackDownload);
+ Stats.PackDownload.Files.fetch_add(1, std::memory_order_relaxed);
+ Stats.PackDownload.RecordScheduled();
+ }
+
+ for (size_t I = 0; I < Entries.size(); ++I)
{
- std::filesystem::path Path = MakeSafeAbsolutePath(TempDir / CurrentEntry.RelativePath);
- CreateDirectories(Path.parent_path());
- m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path, Stats.Download);
+ if (Entries[I].IsPacked)
+ {
+ continue; // handled in the unpack phase below
+ }
+ m_Storage->Get(Work, *m_Threading.WorkerPool, Entries[I].Hash, Entries[I].Size, EntryPaths[I], Stats.Download);
Stats.Download.Files.fetch_add(1, std::memory_order_relaxed);
+ Stats.Download.RecordScheduled();
}
Work.Wait();
Stats.Download.ElapsedUs = DownloadTimer.GetElapsedTimeUs();
}
- // Downloaded successfully - swap into ServerStateDir
- ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
+ // Unpack phase: verify each downloaded pack, build hash->slice map, parallel-write.
+ if (!PackMap.empty())
{
- Stopwatch CleanTimer;
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
- Stats.CleanUs = CleanTimer.GetElapsedTimeUs();
- }
+ ZEN_TRACE_CPU("IncrementalHydrator::Hydrate::Unpack");
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> HashToSlice = BuildHashToSlice(PackMap, TempDir, m_Config.ModuleId);
- {
- Stopwatch RenameTimer;
- // If the two paths share at least one common component they are on the same drive/volume
- // and atomic renames will succeed. Otherwise fall back to a full copy.
- auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end());
- if (ItTmp != TempDir.begin())
{
- DirectoryContent DirContent;
- GetDirectoryContent(*m_Threading.WorkerPool,
- TempDir,
- DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs,
- DirContent);
-
- for (const std::filesystem::path& AbsPath : DirContent.Directories)
+ Stopwatch UnpackTimer;
+ ParallelWork UnpackWork(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ for (size_t I = 0; I < Entries.size(); ++I)
{
- std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
- std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest);
- if (Ec)
+ if (!Entries[I].IsPacked)
{
- throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest));
+ continue;
}
- }
- for (const std::filesystem::path& AbsPath : DirContent.Files)
- {
- std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
- std::error_code Ec = RenameFileWithRetry(AbsPath, Dest);
- if (Ec)
+ auto It = HashToSlice.find(Entries[I].Hash);
+ if (It == HashToSlice.end())
{
- throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest));
+ throw zen::runtime_error("Packed file '{}' references unknown pack content hash '{}' in module '{}'"sv,
+ Entries[I].RelativePath,
+ Entries[I].Hash,
+ m_Config.ModuleId);
}
+ UnpackWork.ScheduleWork(*m_Threading.WorkerPool,
+ [&Stats, &Path = EntryPaths[I], Slice = &It->second](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ TemporaryFile::SafeWriteFile(Path, *Slice);
+ Stats.UnpackWriteBytes.fetch_add(Slice->GetSize(), std::memory_order_relaxed);
+ Stats.PackedFiles.fetch_add(1, std::memory_order_relaxed);
+ });
}
-
- ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ UnpackWork.Wait();
+ Stats.PackUnpackUs = UnpackTimer.GetElapsedTimeUs();
}
- else
- {
- // Slow path: TempDir and ServerStateDir are on different filesystems, so rename
- // would fail. Copy the tree instead and clean up the temp files afterwards.
- ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree");
- CopyTree(TempDir, ServerStateDir, {.EnableClone = true});
- ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
- }
- Stats.RenameOrCopyUs = RenameTimer.GetElapsedTimeUs();
+ // Release the pack buffers (each IoBuffer slice holds a ref to the pack's underlying
+ // heap buffer) before the rename/verify phase runs - avoids keeping ~sum(pack sizes)
+ // bytes alive across those phases.
+ HashToSlice.clear();
}
- CbObject StateObject;
+ // Downloaded successfully - swap TempStateDir contents into ServerStateDir, then
+ // sweep the rest of TempDir (empty TempStateDir, packs/, anything else).
+ ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
{
- Stopwatch VerifyTimer;
- DirectoryContent DirContent;
- GetDirectoryContent(*m_Threading.WorkerPool,
- ServerStateDir,
- DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
- DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
- DirContent);
-
- CbObjectWriter HydrateState;
- HydrateState.BeginArray("Files");
- for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
- {
- std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
-
- if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end())
- {
- HydrateState.BeginObject();
- {
- HydrateState << "Path" << RelativePath.generic_string();
- HydrateState << "Size" << DirContent.FileSizes[FileIndex];
- HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex];
- HydrateState << "Hash" << Entries[It->second].Hash;
- }
- HydrateState.EndObject();
- }
- else
- {
- ZEN_ASSERT(false);
- }
- }
- HydrateState.EndArray();
+ Stopwatch CleanTimer;
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
+ Stats.CleanUs = CleanTimer.GetElapsedTimeUs();
+ }
+ {
+ Stopwatch FinalizeTimer;
+ MigrateTempToState(TempStateDir, ServerStateDir, m_Threading);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ Stats.FinalizeUs = FinalizeTimer.GetElapsedTimeUs();
+ }
- StateObject = HydrateState.Save();
- Stats.VerifyScanUs = VerifyTimer.GetElapsedTimeUs();
+ // Build the cached state that the next Dehydrate will receive (mirrors Load state on dehydrate).
+ CbObject StateObject;
+ {
+ Stopwatch BuildStateTimer;
+ StateObject = BuildHydrateState(ServerStateDir, EntryLookup, Entries, m_Config.ModuleId, m_Threading);
+ Stats.BuildStateUs = BuildStateTimer.GetElapsedTimeUs();
}
Stats.TotalUs = TotalTimer.GetElapsedTimeUs();
- LogHydrateSummary("Hydration complete", Stats, m_Config.ModuleId, StorageSource, ServerStateDir);
+ LogHydrateSummary("Hydration complete"sv, Stats, m_Config.ModuleId, StorageSource, ServerStateDir);
return StateObject;
}
@@ -1182,25 +2151,42 @@ namespace hydration_impl {
ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
Stats.TotalUs = TotalTimer.GetElapsedTimeUs();
- LogHydrateSummary("Hydration failed", Stats, m_Config.ModuleId, StorageSource, ServerStateDir);
+ LogHydrateSummary("Hydration failed"sv, Stats, m_Config.ModuleId, StorageSource, ServerStateDir);
return {};
}
}
void IncrementalHydrator::Obliterate()
{
+ ZEN_TRACE_CPU("IncrementalHydrator::Obliterate");
const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
- try
- {
+ auto TryDeleteBackend = [&]() {
ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
m_Storage->Delete(Work, *m_Threading.WorkerPool);
Work.Wait();
+ };
+
+ try
+ {
+ TryDeleteBackend();
}
catch (const std::exception& Ex)
{
- ZEN_WARN("Failed to delete backend storage for module '{}': {}. Proceeding with local cleanup.", m_Config.ModuleId, Ex.what());
+ ZEN_WARN("Obliterate backend delete failed for module '{}' (attempt 1/2): {}. Retrying once.", m_Config.ModuleId, Ex.what());
+ try
+ {
+ TryDeleteBackend();
+ }
+ catch (const std::exception& Ex2)
+ {
+ ZEN_WARN(
+ "Obliterate backend delete failed for module '{}' (attempt 2/2): {}. Proceeding with local cleanup; backend data may "
+ "remain.",
+ m_Config.ModuleId,
+ Ex2.what());
+ }
}
CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
@@ -1243,26 +2229,33 @@ private:
uint64_t m_DefaultMultipartChunkSize;
};
+HydrationBase::HydrationBase(const Configuration& Config)
+{
+ using namespace hydration_impl;
+ CbFieldView ExcludesField = Config.Options["excludes"sv];
+ m_Excludes = ExcludesField.HasValue() ? ParseStringArray(ExcludesField) : DefaultExcludes();
+}
+
///////////////////////////////////////////////////////////////////////////
// Implementations
-FileHydration::FileHydration(const Configuration& Config)
+FileHydration::FileHydration(const Configuration& Config) : HydrationBase(Config)
{
if (!Config.TargetSpecification.empty())
{
m_StorageRoot = Utf8ToWide(Config.TargetSpecification.substr(hydration_impl::FileStorage::Prefix.length()));
if (m_StorageRoot.empty())
{
- throw zen::runtime_error("Hydration config 'file' type requires a directory path");
+ throw zen::runtime_error("Hydration config 'file' type requires a directory path"sv);
}
}
else
{
- CbObjectView Settings = Config.Options["settings"].AsObjectView();
- std::string_view Path = Settings["path"].AsString();
+ CbObjectView Settings = Config.Options["settings"sv].AsObjectView();
+ std::string_view Path = Settings["path"sv].AsString();
if (Path.empty())
{
- throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'");
+ throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'"sv);
}
m_StorageRoot = Utf8ToWide(std::string(Path));
}
@@ -1273,14 +2266,14 @@ std::unique_ptr<HydrationStrategyBase>
FileHydration::CreateHydrator(const HydrationConfig& Config)
{
using namespace hydration_impl;
- return std::make_unique<IncrementalHydrator>(Config, std::make_unique<FileStorage>(m_StorageRoot / Config.ModuleId));
+ return std::make_unique<IncrementalHydrator>(Config, std::make_unique<FileStorage>(m_StorageRoot / Config.ModuleId), m_Excludes);
}
-S3Hydration::S3Hydration(const Configuration& Config)
+S3Hydration::S3Hydration(const Configuration& Config) : HydrationBase(Config)
{
using namespace hydration_impl;
- CbObjectView Settings = Config.Options["settings"].AsObjectView();
+ CbObjectView Settings = Config.Options["settings"sv].AsObjectView();
std::string_view Spec;
if (!Config.TargetSpecification.empty())
{
@@ -1289,10 +2282,10 @@ S3Hydration::S3Hydration(const Configuration& Config)
}
else
{
- std::string_view Uri = Settings["uri"].AsString();
+ std::string_view Uri = Settings["uri"sv].AsString();
if (Uri.empty())
{
- throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'");
+ throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'"sv);
}
Spec = Uri;
Spec.remove_prefix(S3Storage::Prefix.size());
@@ -1304,10 +2297,10 @@ S3Hydration::S3Hydration(const Configuration& Config)
if (m_Bucket.empty())
{
- throw zen::runtime_error("Incremental S3 hydration config requires a bucket name");
+ throw zen::runtime_error("Incremental S3 hydration config requires a bucket name"sv);
}
- std::string Region = std::string(Settings["region"].AsString());
+ std::string Region = std::string(Settings["region"sv].AsString());
if (Region.empty())
{
Region = GetEnvVariable("AWS_DEFAULT_REGION");
@@ -1322,11 +2315,11 @@ S3Hydration::S3Hydration(const Configuration& Config)
}
m_Region = std::move(Region);
- std::string_view Endpoint = Settings["endpoint"].AsString();
+ std::string_view Endpoint = Settings["endpoint"sv].AsString();
if (!Endpoint.empty())
{
m_Endpoint = std::string(Endpoint);
- m_PathStyle = Settings["path-style"].AsBool();
+ m_PathStyle = Settings["path-style"sv].AsBool();
}
std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID");
@@ -1341,7 +2334,7 @@ S3Hydration::S3Hydration(const Configuration& Config)
m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN");
}
- m_DefaultMultipartChunkSize = Settings["chunksize"].AsUInt64(S3Storage::DefaultMultipartChunkSize);
+ m_DefaultMultipartChunkSize = Settings["chunksize"sv].AsUInt64(DefaultMultipartChunkSize);
S3ClientOptions ClientOptions;
ClientOptions.BucketName = m_Bucket;
@@ -1357,6 +2350,11 @@ S3Hydration::S3Hydration(const Configuration& Config)
ClientOptions.Credentials = m_Credentials;
}
ClientOptions.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u;
+ // Retry transient HTTP failures (429 throttle, 503 SlowDown, 5xx, connection errors) at the
+ // HTTP layer. CurlHttpClient::DoWithRetry uses 100*(Attempt+1) ms linear backoff between
+ // attempts. Three retries covers brief S3 rate-limit bursts without holding worker threads
+ // for long under sustained throttle.
+ ClientOptions.HttpSettings.RetryCount = 3;
m_Client = std::make_unique<S3Client>(ClientOptions);
}
@@ -1365,10 +2363,12 @@ std::unique_ptr<HydrationStrategyBase>
S3Hydration::CreateHydrator(const HydrationConfig& Config)
{
using namespace hydration_impl;
- std::string KeyPrefix = m_KeyPrefixRoot.empty() ? std::string(Config.ModuleId) : fmt::format("{}/{}", m_KeyPrefixRoot, Config.ModuleId);
+ std::string KeyPrefix =
+ m_KeyPrefixRoot.empty() ? std::string(Config.ModuleId) : fmt::format("{}/{}"sv, m_KeyPrefixRoot, Config.ModuleId);
return std::make_unique<IncrementalHydrator>(
Config,
- std::make_unique<S3Storage>(*m_Client, std::move(KeyPrefix), Config.TempDir, m_DefaultMultipartChunkSize));
+ std::make_unique<S3Storage>(*m_Client, std::move(KeyPrefix), Config.TempDir, m_DefaultMultipartChunkSize),
+ m_Excludes);
}
std::unique_ptr<HydrationBase>
@@ -1386,10 +2386,10 @@ InitHydration(const HydrationBase::Configuration& Config)
{
return std::make_unique<S3Hydration>(Config);
}
- throw zen::runtime_error("Unknown hydration strategy: {}", Config.TargetSpecification);
+ throw zen::runtime_error("Unknown hydration strategy: {}"sv, Config.TargetSpecification);
}
- std::string_view Type = Config.Options["type"].AsString();
+ std::string_view Type = Config.Options["type"sv].AsString();
if (Type == FileStorage::Type)
{
return std::make_unique<FileHydration>(Config);
@@ -1400,9 +2400,9 @@ InitHydration(const HydrationBase::Configuration& Config)
}
if (!Type.empty())
{
- throw zen::runtime_error("Unknown hydration target type '{}'", Type);
+ throw zen::runtime_error("Unknown hydration target type '{}'"sv, Type);
}
- throw zen::runtime_error("No hydration target configured");
+ throw zen::runtime_error("No hydration target configured"sv);
}
#if ZEN_WITH_TESTS
@@ -1485,6 +2485,34 @@ namespace {
}
}
+ // Test fixture that centralizes the common scaffolding for file-backed hydration tests:
+ // a scratch temp dir containing the server state, the hydration store, and the hydration
+ // temp dir, plus an initialized FileHydration backend.
+ struct FileHarness
+ {
+ ScopedTemporaryDirectory TempDir;
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ std::unique_ptr<HydrationBase> Hydration;
+
+ FileHarness()
+ {
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationStore);
+ CreateDirectories(HydrationTemp);
+ Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
+ }
+
+ HydrationConfig MakeConfig(std::string_view ModuleId, HydrationConfig Overrides = {}) const
+ {
+ Overrides.ServerStateDir = ServerStateDir;
+ Overrides.TempDir = HydrationTemp;
+ Overrides.ModuleId = std::string(ModuleId);
+ return Overrides;
+ }
+ };
+
} // namespace
TEST_SUITE_BEGIN("server.hydration");
@@ -1495,140 +2523,386 @@ TEST_SUITE_BEGIN("server.hydration");
TEST_CASE("hydration.file.dehydrate_hydrate")
{
- ScopedTemporaryDirectory TempDir;
+ FileHarness H;
+ const auto TestFiles = CreateSmallTestTree(H.ServerStateDir);
+ constexpr auto ModuleId = "testmodule";
+ const auto Config = H.MakeConfig(ModuleId);
- std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
- std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
- std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
- CreateDirectories(ServerStateDir);
- CreateDirectories(HydrationStore);
- CreateDirectories(HydrationTemp);
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ CHECK(std::filesystem::exists(H.HydrationStore / ModuleId));
+ CHECK(std::filesystem::is_empty(H.ServerStateDir));
- const std::string ModuleId = "testmodule";
- auto TestFiles = CreateSmallTestTree(ServerStateDir);
-
- auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
+ H.Hydration->CreateHydrator(Config)->Hydrate();
+ VerifyTree(H.ServerStateDir, TestFiles);
+}
- HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId};
+TEST_CASE("hydration.file.hydrate_overwrites_existing_state")
+{
+ FileHarness H;
+ const auto TestFiles = CreateSmallTestTree(H.ServerStateDir);
+ const auto Config = H.MakeConfig("testmodule");
- // Dehydrate: copy server state to file store
- Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
- // Verify the module folder exists in the store and ServerStateDir was wiped
- CHECK(std::filesystem::exists(HydrationStore / ModuleId));
- CHECK(std::filesystem::is_empty(ServerStateDir));
+ // Stale file must be wiped by the rehydrate.
+ WriteFile(H.ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
+ H.Hydration->CreateHydrator(Config)->Hydrate();
- // Hydrate: restore server state from file store
- Hydration->CreateHydrator(Config)->Hydrate();
+ CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "stale.bin"));
+ VerifyTree(H.ServerStateDir, TestFiles);
+}
- // Verify restored contents match the original
- VerifyTree(ServerStateDir, TestFiles);
+TEST_CASE("hydration.file.excluded_files_not_dehydrated")
+{
+ FileHarness H;
+ const auto TestFiles = CreateSmallTestTree(H.ServerStateDir);
+
+ // Files matched by the built-in DefaultExcludes() set in hydration.cpp. Each must be
+ // skipped during dehydrate and not be recreated by hydrate.
+ CreateDirectories(H.ServerStateDir / "gc");
+ WriteFile(H.ServerStateDir / "gc" / "reserve.gc", CreateSemiRandomBlob(64));
+ CreateDirectories(H.ServerStateDir / ".sentry-native");
+ WriteFile(H.ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32));
+ WriteFile(H.ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128));
+ WriteFile(H.ServerStateDir / "state_marker", CreateSemiRandomBlob(16));
+ WriteFile(H.ServerStateDir / ".lock", CreateSemiRandomBlob(8));
+ WriteFile(H.ServerStateDir / "snapshot.bak", CreateSemiRandomBlob(48));
+ CreateDirectories(H.ServerStateDir / "auth");
+ WriteFile(H.ServerStateDir / "auth" / "authstate", CreateSemiRandomBlob(96));
+
+ const auto Config = H.MakeConfig("testmodule_excl");
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+
+ CleanDirectory(H.ServerStateDir, true);
+ H.Hydration->CreateHydrator(Config)->Hydrate();
+
+ VerifyTree(H.ServerStateDir, TestFiles);
+ CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "gc" / "reserve.gc"));
+ CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / ".sentry-native"));
+ CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "state_marker"));
+ CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / ".lock"));
+ CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "snapshot.bak"));
+ CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "auth" / "authstate"));
}
-TEST_CASE("hydration.file.hydrate_overwrites_existing_state")
+TEST_CASE("hydration.options.excludes_override")
{
+ // Explicit `excludes` replaces the built-in default list outright; `.lock` is not
+ // in the override list, so it must appear in the manifest. Use the Options-only
+ // path (type + settings.path) so the same Options object also carries the override
+ // `excludes` array.
ScopedTemporaryDirectory TempDir;
-
- std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
- std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
- std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ std::filesystem::path ServerStateDir = TempDir.Path() / "state";
+ std::filesystem::path HydrationStore = TempDir.Path() / "store";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "tmp";
CreateDirectories(ServerStateDir);
CreateDirectories(HydrationStore);
CreateDirectories(HydrationTemp);
+ WriteFile(ServerStateDir / "regular.bin", CreateSemiRandomBlob(64));
+ WriteFile(ServerStateDir / ".lock", CreateSemiRandomBlob(8));
- auto TestFiles = CreateSmallTestTree(ServerStateDir);
-
- auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
-
- HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule"};
+ CbObjectWriter Options;
+ Options << "type"sv
+ << "file"sv;
+ Options.BeginObject("settings"sv);
+ {
+ Options << "path"sv << HydrationStore.generic_string();
+ }
+ Options.EndObject();
+ Options.BeginArray("excludes"sv);
+ {
+ Options << "*.tmp"sv;
+ }
+ Options.EndArray();
- Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ HydrationBase::Configuration HydrCfg{.Options = Options.Save()};
+ std::unique_ptr<HydrationBase> Hydration = InitHydration(HydrCfg);
+ HydrationConfig PerModuleCfg{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "excl_off"};
+ Hydration->CreateHydrator(PerModuleCfg)->Dehydrate(CbObject());
- // Put a stale file in ServerStateDir to simulate leftover state
- WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
+ const std::filesystem::path StateFile = HydrationStore / "excl_off" / "current-state.cbo";
+ REQUIRE(std::filesystem::exists(StateFile));
- // Hydrate - must wipe stale file and restore original
- Hydration->CreateHydrator(Config)->Hydrate();
+ FileContents Contents = ReadFile(StateFile);
+ REQUIRE(Contents);
+ IoBuffer Payload = Contents.Flatten();
+ CbValidateError Err;
+ CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Payload), Err);
+ REQUIRE_EQ(Err, CbValidateError::None);
- CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin"));
- VerifyTree(ServerStateDir, TestFiles);
+ bool HasLock = false;
+ for (CbFieldView F : Meta["Files"sv])
+ {
+ if (F.AsObjectView()["Path"sv].AsString() == ".lock")
+ {
+ HasLock = true;
+ break;
+ }
+ }
+ CHECK(HasLock);
}
-TEST_CASE("hydration.file.excluded_files_not_dehydrated")
+// ---------------------------------------------------------------------------
+// FileHydrator obliterate test
+// ---------------------------------------------------------------------------
+
+TEST_CASE("hydration.file.obliterate")
{
- ScopedTemporaryDirectory TempDir;
+ FileHarness H;
+ constexpr std::string_view ModuleId = "obliterate_test"sv;
+ CreateSmallTestTree(H.ServerStateDir);
+ const auto Config = H.MakeConfig(ModuleId);
- std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
- std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
- std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
- CreateDirectories(ServerStateDir);
- CreateDirectories(HydrationStore);
- CreateDirectories(HydrationTemp);
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ CHECK(std::filesystem::exists(H.HydrationStore / ModuleId));
- auto TestFiles = CreateSmallTestTree(ServerStateDir);
+ // Put files back in ServerStateDir + TempDir to verify cleanup.
+ CreateSmallTestTree(H.ServerStateDir);
+ WriteFile(H.HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64));
- // Add files that the dehydrator should skip
- WriteFile(ServerStateDir / "reserve.gc", CreateSemiRandomBlob(64));
- CreateDirectories(ServerStateDir / ".sentry-native");
- WriteFile(ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32));
- WriteFile(ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128));
+ H.Hydration->CreateHydrator(Config)->Obliterate();
- auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
+ CHECK_FALSE(std::filesystem::exists(H.HydrationStore / ModuleId));
+ CHECK(std::filesystem::is_empty(H.ServerStateDir));
+ CHECK(std::filesystem::is_empty(H.HydrationTemp));
+}
- HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule_excl"};
+// ---------------------------------------------------------------------------
+// Pack tests - exercise small-file packing, unpacking, and fallback paths.
+// ---------------------------------------------------------------------------
- Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+TEST_CASE("hydration.file.pack_roundtrip")
+{
+ // CreateSmallTestTree produces 6 files all < 2 KB -> single pack with every file in it.
+ FileHarness H;
+ const auto TestFiles = CreateSmallTestTree(H.ServerStateDir);
+ const auto Config = H.MakeConfig("pack_roundtrip");
+
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ CHECK(std::filesystem::is_empty(H.ServerStateDir));
+ H.Hydration->CreateHydrator(Config)->Hydrate();
+ VerifyTree(H.ServerStateDir, TestFiles);
+}
- // Hydrate into a clean directory
- CleanDirectory(ServerStateDir, true);
- Hydration->CreateHydrator(Config)->Hydrate();
+TEST_CASE("hydration.file.pack_disabled_fallback")
+{
+ // PackEnabled=false -> every file is a standalone CAS entry regardless of size.
+ FileHarness H;
+ const auto TestFiles = CreateSmallTestTree(H.ServerStateDir);
+ const auto Config = H.MakeConfig("pack_disabled", HydrationConfig{.PackEnabled = false});
+
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ H.Hydration->CreateHydrator(Config)->Hydrate();
+ VerifyTree(H.ServerStateDir, TestFiles);
+}
- // Normal files must be restored
- VerifyTree(ServerStateDir, TestFiles);
- // Excluded files must NOT be restored
- CHECK_FALSE(std::filesystem::exists(ServerStateDir / "reserve.gc"));
- CHECK_FALSE(std::filesystem::exists(ServerStateDir / ".sentry-native"));
+TEST_CASE("hydration.file.pack_one_unique_fallback")
+{
+ // Only 1 unique small-file candidate -> no pack (min 2 entries); falls back to standalone.
+ FileHarness H;
+
+ std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles;
+ IoBuffer Small = CreateSemiRandomBlob(128);
+ WriteFile(H.ServerStateDir / "tiny.bin", Small);
+ TestFiles.emplace_back("tiny.bin", std::move(Small));
+
+ IoBuffer Big = CreateSemiRandomBlob(8192);
+ WriteFile(H.ServerStateDir / "big.bin", Big);
+ TestFiles.emplace_back("big.bin", std::move(Big));
+
+ const auto Config = H.MakeConfig("pack_one_unique");
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ H.Hydration->CreateHydrator(Config)->Hydrate();
+ VerifyTree(H.ServerStateDir, TestFiles);
}
-// ---------------------------------------------------------------------------
-// FileHydrator obliterate test
-// ---------------------------------------------------------------------------
+TEST_CASE("hydration.file.pack_duplicate_hashes")
+{
+ // 10 files share one hash + 1 distinct file -> pack has 2 unique entries; hydrate writes
+ // all 11 destinations correctly.
+ FileHarness H;
+
+ IoBuffer Shared = CreateSemiRandomBlob(256);
+ IoBuffer Other = CreateSemiRandomBlob(256);
+ std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles;
+ for (int I = 0; I < 10; ++I)
+ {
+ std::filesystem::path Rel = fmt::format("dup_{:02d}.bin"sv, I);
+ WriteFile(H.ServerStateDir / Rel, Shared);
+ TestFiles.emplace_back(Rel, Shared);
+ }
+ WriteFile(H.ServerStateDir / "other.bin", Other);
+ TestFiles.emplace_back("other.bin", Other);
+
+ const auto Config = H.MakeConfig("pack_duplicates");
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ H.Hydration->CreateHydrator(Config)->Hydrate();
+ VerifyTree(H.ServerStateDir, TestFiles);
+}
-TEST_CASE("hydration.file.obliterate")
+TEST_CASE("hydration.file.pack_large_dataset")
{
- ScopedTemporaryDirectory TempDir;
+ // Mix of many small files + a few large ones, with a modest MaxPackBytes
+ // to force bin-packing into multiple packs. Verifies ordering, splitting, and the
+ // interaction between packed and standalone uploads.
+ FileHarness H;
+
+ std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles;
+ constexpr int kSmallCount = 100;
+ constexpr int kLargeCount = 3;
+
+ // Varied small-file sizes (256-2048 B) avoid artificial uniformity in the bin-pack.
+ FastRandom Rand{.Seed = 0xcafebabe};
+ for (int I = 0; I < kSmallCount; ++I)
+ {
+ uint64_t Size = 256 + (Rand.Next() % 1793); // [256, 2048]
+ IoBuffer Blob = CreateSemiRandomBlob(Rand, Size);
+ auto Rel = std::filesystem::path(fmt::format("small/group{}/file_{:04d}.bin"sv, I / 25, I));
+ CreateDirectories((H.ServerStateDir / Rel).parent_path());
+ WriteFile(H.ServerStateDir / Rel, Blob);
+ TestFiles.emplace_back(std::move(Rel), std::move(Blob));
+ }
+ for (int I = 0; I < kLargeCount; ++I)
+ {
+ IoBuffer Blob = CreateSemiRandomBlob(Rand, 32 * 1024 + I * 4096);
+ auto Rel = std::filesystem::path(fmt::format("large/file_{:02d}.bulk"sv, I));
+ CreateDirectories((H.ServerStateDir / Rel).parent_path());
+ WriteFile(H.ServerStateDir / Rel, Blob);
+ TestFiles.emplace_back(std::move(Rel), std::move(Blob));
+ }
+
+ // Cap each pack at ~32 KB -> 100 small files (~115 KB raw) split across ~4 packs.
+ const auto Config = H.MakeConfig("pack_large", HydrationConfig{.MaxPackBytes = 32 * 1024});
+
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ CHECK(std::filesystem::is_empty(H.ServerStateDir));
+ H.Hydration->CreateHydrator(Config)->Hydrate();
+ VerifyTree(H.ServerStateDir, TestFiles);
+}
- std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
- std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
- std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
- CreateDirectories(ServerStateDir);
- CreateDirectories(HydrationStore);
- CreateDirectories(HydrationTemp);
+TEST_CASE("hydration.file.pack_hash_determinism")
+{
+ // Two independent dehydrate runs over the same content must produce the same ordered list
+ // of pack hashes (the state files themselves differ in timestamp / duration fields). This
+ // is what keeps ExistsLookup dedup working across redeploys.
+ FileHarness H;
+
+ FastRandom Rand{.Seed = 0x12345678};
+ std::vector<std::pair<std::filesystem::path, IoBuffer>> Files;
+ for (int I = 0; I < 40; ++I)
+ {
+ IoBuffer Blob = CreateSemiRandomBlob(Rand, 256 + (I % 7) * 200);
+ auto Rel = std::filesystem::path(fmt::format("tree/leaf_{:02d}.dat"sv, I));
+ CreateDirectories((H.ServerStateDir / Rel).parent_path());
+ WriteFile(H.ServerStateDir / Rel, Blob);
+ Files.emplace_back(std::move(Rel), std::move(Blob));
+ }
+
+ const auto Config = H.MakeConfig("pack_determinism");
+ const std::filesystem::path StateFile = H.HydrationStore / "pack_determinism" / "current-state.cbo";
+
+ // Extract the ordered pack-hash list from state.cbo. Timestamp / duration fields vary
+ // across runs so byte-identity is not achievable; the pack identities are.
+ auto ReadPackHashes = [&]() -> std::vector<IoHash> {
+ FileContents Contents = ReadFile(StateFile);
+ REQUIRE(Contents);
+ IoBuffer Payload = Contents.Flatten();
+ CbValidateError Err;
+ CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Payload), Err);
+ REQUIRE_EQ(Err, CbValidateError::None);
+ std::vector<IoHash> Hashes;
+ for (CbFieldView F : Meta["Packs"sv])
+ {
+ Hashes.push_back(F.AsObjectView()["Hash"sv].AsHash());
+ }
+ return Hashes;
+ };
- const std::string ModuleId = "obliterate_test";
- CreateSmallTestTree(ServerStateDir);
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ std::vector<IoHash> First = ReadPackHashes();
+ REQUIRE_FALSE(First.empty());
- auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
+ // Rehydrate so the tree is back on disk, then dehydrate again with a fresh hydrator.
+ H.Hydration->CreateHydrator(Config)->Hydrate();
+ VerifyTree(H.ServerStateDir, Files);
- HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId};
+ auto HydrationB = InitHydration({.TargetSpecification = "file://" + H.HydrationStore.string()});
+ HydrationB->CreateHydrator(Config)->Dehydrate(CbObject());
+ std::vector<IoHash> Second = ReadPackHashes();
- // Dehydrate so the backend store has data
- Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
- CHECK(std::filesystem::exists(HydrationStore / ModuleId));
+ REQUIRE_EQ(First.size(), Second.size());
+ for (size_t I = 0; I < First.size(); ++I)
+ {
+ CHECK_EQ(First[I], Second[I]);
+ }
+}
- // Put some files back in ServerStateDir and TempDir to verify cleanup
- CreateSmallTestTree(ServerStateDir);
- WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64));
+TEST_CASE("hydration.file.pack_backward_compat_read")
+{
+ // Produce a state.cbo without any Packs[] / PackHash fields (old format) by dehydrating with
+ // packing disabled. Hydrate must treat every file as standalone and roundtrip successfully.
+ FileHarness H;
+ const auto TestFiles = CreateSmallTestTree(H.ServerStateDir);
+ const auto Config = H.MakeConfig("pack_oldformat", HydrationConfig{.PackEnabled = false});
+
+ // Dehydrate with PackEnabled=false -> state.cbo has no Packs[] and no PackHash fields.
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ CHECK(std::filesystem::is_empty(H.ServerStateDir));
+
+ // Hydrate with PackEnabled=true -> the hydrator must still handle the old-format state.
+ const auto NewConfig = H.MakeConfig("pack_oldformat");
+ H.Hydration->CreateHydrator(NewConfig)->Hydrate();
+ VerifyTree(H.ServerStateDir, TestFiles);
+}
- // Obliterate
- Hydration->CreateHydrator(Config)->Obliterate();
+// ---------------------------------------------------------------------------
+// CreateParentDirectories helper test
+// ---------------------------------------------------------------------------
- // Backend store directory deleted
- CHECK_FALSE(std::filesystem::exists(HydrationStore / ModuleId));
- // ServerStateDir cleaned
- CHECK(std::filesystem::is_empty(ServerStateDir));
- // TempDir cleaned
- CHECK(std::filesystem::is_empty(HydrationTemp));
+TEST_CASE("hydration.createparentdirectories")
+{
+ ScopedTemporaryDirectory TempDir;
+ const std::filesystem::path Root = TempDir.Path();
+
+ // Edge: empty input.
+ CHECK_EQ(hydration_impl::CreateParentDirectories({}), 0u);
+
+ // Edge: bare filename has no parent_path() -> contributes nothing.
+ std::vector<std::filesystem::path> Bare{"bare.bin"};
+ CHECK_EQ(hydration_impl::CreateParentDirectories(Bare), 0u);
+
+ // Edge: single input. Triggers the Dirs.size() == 1 path that bypasses the prune loop.
+ const std::filesystem::path SingleRoot = Root / "single";
+ std::vector<std::filesystem::path> Single{SingleRoot / "only" / "a.bin"};
+ CHECK_EQ(hydration_impl::CreateParentDirectories(Single), 1u);
+ CHECK(std::filesystem::is_directory(SingleRoot / "only"));
+
+ // Edge: pre-existing dirs must not raise; count still reflects leaf set.
+ const std::filesystem::path PreRoot = Root / "preexisting";
+ CreateDirectories(PreRoot / "pre" / "made");
+ std::vector<std::filesystem::path> Pre{PreRoot / "pre" / "made" / "f.bin"};
+ CHECK_EQ(hydration_impl::CreateParentDirectories(Pre), 1u);
+ CHECK(std::filesystem::is_directory(PreRoot / "pre" / "made"));
+
+ // Generic: ancestor-chain pruning, parent dedup across files in same dir, disjoint
+ // siblings (cannot prune each other), nested-vs-flat coexistence. Expected leaves:
+ // deep/nest/leaf, deep/sibling, flat, lone.
+ const std::filesystem::path MixRoot = Root / "mixed";
+ std::vector<std::filesystem::path> Mix{MixRoot / "deep" / "nest" / "leaf" / "x.bin",
+ MixRoot / "deep" / "nest" / "leaf" / "y.bin", // shares parent with x
+ MixRoot / "deep" / "nest" / "g.bin", // ancestor of leaf -> pruned
+ MixRoot / "deep" / "h.bin", // ancestor of nest -> pruned
+ MixRoot / "deep" / "sibling" / "i.bin", // sibling of nest -> kept
+ MixRoot / "flat" / "j.bin", // top-level sibling -> kept
+ MixRoot / "lone" / "k.bin"}; // disjoint -> kept
+ CHECK_EQ(hydration_impl::CreateParentDirectories(Mix), 4u);
+ CHECK(std::filesystem::is_directory(MixRoot / "deep" / "nest" / "leaf"));
+ CHECK(std::filesystem::is_directory(MixRoot / "deep" / "sibling"));
+ CHECK(std::filesystem::is_directory(MixRoot / "flat"));
+ CHECK(std::filesystem::is_directory(MixRoot / "lone"));
+ // Pruned ancestors still exist via CreateDirectories recursion.
+ CHECK(std::filesystem::is_directory(MixRoot / "deep" / "nest"));
+ CHECK(std::filesystem::is_directory(MixRoot / "deep"));
}
// ---------------------------------------------------------------------------
@@ -1658,7 +2932,7 @@ TEST_CASE("hydration.file.concurrent")
for (int I = 0; I < kModuleCount; ++I)
{
- std::string ModuleId = fmt::format("file_concurrent_{}", I);
+ std::string ModuleId = fmt::format("file_concurrent_{}"sv, I);
std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state";
std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp";
CreateDirectories(StateDir);
@@ -1817,7 +3091,7 @@ TEST_CASE("hydration.s3.concurrent")
for (int I = 0; I < kModuleCount; ++I)
{
- std::string ModuleId = fmt::format("s3_concurrent_{}", I);
+ std::string ModuleId = fmt::format("s3_concurrent_{}"sv, I);
std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state";
std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp";
CreateDirectories(StateDir);
@@ -1890,7 +3164,7 @@ TEST_CASE("hydration.s3.obliterate")
CreateDirectories(ServerStateDir);
CreateDirectories(HydrationTemp);
- const std::string ModuleId = "s3test_obliterate";
+ constexpr std::string_view ModuleId = "s3test_obliterate"sv;
HydrationBase::Configuration BaseConfig;
{
@@ -1904,7 +3178,7 @@ TEST_CASE("hydration.s3.obliterate")
}
auto Hydration = InitHydration(BaseConfig);
- HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId};
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = std::string(ModuleId)};
// Dehydrate to populate backend
CreateSmallTestTree(ServerStateDir);
@@ -1918,7 +3192,7 @@ TEST_CASE("hydration.s3.obliterate")
Opts.Credentials.AccessKeyId = Minio.RootUser();
Opts.Credentials.SecretAccessKey = Minio.RootPassword();
S3Client Client(Opts);
- return Client.ListObjects(ModuleId + "/");
+ return Client.ListObjects(fmt::format("{}/"sv, ModuleId));
};
// Verify objects exist in S3
@@ -2034,7 +3308,7 @@ TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip())
CreateDirectories(ServerStateDir);
CreateDirectories(HydrationTemp);
- const std::string ModuleId = "s3test_performance";
+ constexpr std::string_view ModuleId = "s3test_performance"sv;
CopyTree("E:\\Dev\\hub\\brainrot\\20260402-225355-508", ServerStateDir, {.EnableClone = true});
// auto TestFiles = CreateTestTree(ServerStateDir);
@@ -2054,7 +3328,7 @@ TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip())
HydrationConfig Config{.ServerStateDir = ServerStateDir,
.TempDir = HydrationTemp,
- .ModuleId = ModuleId,
+ .ModuleId = std::string(ModuleId),
.Threading = Threading.Options};
// Dehydrate: upload server state to MinIO
@@ -2091,7 +3365,7 @@ TEST_CASE("hydration.file.incremental")
CreateDirectories(HydrationStore);
CreateDirectories(HydrationTemp);
- const std::string ModuleId = "testmodule";
+ constexpr std::string_view ModuleId = "testmodule"sv;
// auto TestFiles = CreateTestTree(ServerStateDir);
TestThreading Threading(4);
@@ -2100,7 +3374,7 @@ TEST_CASE("hydration.file.incremental")
HydrationConfig Config{.ServerStateDir = ServerStateDir,
.TempDir = HydrationTemp,
- .ModuleId = ModuleId,
+ .ModuleId = std::string(ModuleId),
.Threading = Threading.Options};
// Hydrate with no prior state
@@ -2170,7 +3444,7 @@ TEST_CASE("hydration.s3.incremental")
CreateDirectories(ServerStateDir);
CreateDirectories(HydrationTemp);
- const std::string ModuleId = "s3test_incremental";
+ constexpr std::string_view ModuleId = "s3test_incremental"sv;
TestThreading Threading(8);
@@ -2188,7 +3462,7 @@ TEST_CASE("hydration.s3.incremental")
HydrationConfig Config{.ServerStateDir = ServerStateDir,
.TempDir = HydrationTemp,
- .ModuleId = ModuleId,
+ .ModuleId = std::string(ModuleId),
.Threading = Threading.Options};
// Hydrate with no prior state
diff --git a/src/zenserver/hub/hydration.h b/src/zenserver/hub/hydration.h
index 0455dda91..d9a3dda5b 100644
--- a/src/zenserver/hub/hydration.h
+++ b/src/zenserver/hub/hydration.h
@@ -2,6 +2,8 @@
#pragma once
+#include "hydrationdefaults.h"
+
#include <zencore/compactbinary.h>
#include <atomic>
@@ -9,6 +11,7 @@
#include <memory>
#include <optional>
#include <string>
+#include <vector>
namespace zen {
@@ -32,6 +35,20 @@ struct HydrationConfig
// External threading for parallel I/O and hashing. If not set, work runs inline on the caller's thread.
std::optional<ThreadingOptions> Threading;
+
+ // When true, small files are concatenated into pack blobs during Dehydrate and sliced
+ // back out during Hydrate. Pack contents are stored raw (no compression); the CAS key
+ // of a pack is the hash of its concatenated raw bytes. Dramatically reduces request
+ // count for modules dominated by tiny metadata files.
+ bool PackEnabled = true;
+
+ // Files strictly smaller than this are candidates for packing.
+ uint64_t PackThresholdBytes = DefaultPackThresholdBytes;
+
+ // Upper bound on the concatenation size of a single pack. Candidates are bin-packed
+ // greedily; a pack that would exceed this is closed and a new one is started. A single
+ // unique candidate larger than this falls back to standalone upload.
+ uint64_t MaxPackBytes = DefaultMaxPackBytes;
};
/**
@@ -46,14 +63,23 @@ struct HydrationStrategyBase
virtual ~HydrationStrategyBase() = default;
// Upload server state to the configured target. ServerStateDir is wiped on success.
- // On failure, ServerStateDir is left intact.
+ // On failure, ServerStateDir is left intact and the failure is logged at WARN; no
+ // exception propagates to the caller. Callers that need to distinguish success from
+ // failure must inspect the log stream or observe downstream effects (e.g. presence of
+ // the metadata file on the backend) - success is not signalled through the API.
virtual void Dehydrate(const CbObject& CachedState) = 0;
- // Download state from the configured target into ServerStateDir. Returns cached state for the next Dehydrate.
- // On failure, ServerStateDir is wiped and an empty CbObject is returned.
+ // Download state from the configured target into ServerStateDir. Returns cached state
+ // for the next Dehydrate. On failure, ServerStateDir is wiped, the failure is logged,
+ // and an empty CbObject is returned (a no-op cache). Callers can check the result for
+ // emptiness as a failure indicator.
virtual CbObject Hydrate() = 0;
- // Delete all stored data for this module from the configured backend, then clean ServerStateDir and TempDir.
+ // Delete all stored data for this module from the configured backend, then clean
+ // ServerStateDir and TempDir. Backend-delete failures are retried once; if the retry
+ // also fails, local cleanup proceeds regardless and the failure is logged at WARN.
+ // Because backend deletion is best-effort, a return from Obliterate does not guarantee
+ // backend data is gone.
virtual void Obliterate() = 0;
};
@@ -73,14 +99,23 @@ public:
{
// Back-end specific target specification (e.g. "s3://bucket/prefix", "file:///path")
std::string TargetSpecification;
- // Full config object (mutually exclusive with TargetSpecification)
+ // Full config object (mutually exclusive with TargetSpecification). Backend-specific
+ // settings (e.g. S3 "chunksize") live inside Options["settings"]. The common
+ // `excludes` entry is parsed once by HydrationBase and shared across modules.
CbObject Options;
};
+ // Parses the common Options entries (`excludes`) into m_Excludes, applying the built-in
+ // defaults when the field is absent. A field that is present but empty (`[]`) is honored
+ // as an explicit override (no defaults applied).
+ explicit HydrationBase(const Configuration& Config);
virtual ~HydrationBase() = default;
// Create a configured per-module hydrator, ready to call Hydrate/Dehydrate/Obliterate.
virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) = 0;
+
+protected:
+ std::vector<std::string> m_Excludes;
};
// Factory: parses Config and returns the concrete backend (FileHydration or S3Hydration).
diff --git a/src/zenserver/hub/hydrationdefaults.h b/src/zenserver/hub/hydrationdefaults.h
new file mode 100644
index 000000000..8d9fb6d41
--- /dev/null
+++ b/src/zenserver/hub/hydrationdefaults.h
@@ -0,0 +1,26 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <cstdint>
+
+namespace zen {
+
+// Schema version of state.cbo. Bump on any incompatible change to the manifest layout.
+// Hydrate refuses to read a manifest with a higher version than this. Manifests written
+// by older binaries omit the field; they are treated as version 0 and decoded best-effort
+// (missing optional fields like Packs[] / StorageSettings fall back to defaults).
+inline constexpr uint32_t HydrationSchemaVersion = 1;
+
+// Default multipart chunk size (64 MiB) used by the S3 backend for large files.
+// Dehydrate writes the chosen size into state.cbo so hydrate replays with the same
+// partitioning. State records without the field fall back to this default.
+inline constexpr uint64_t DefaultMultipartChunkSize = 64u * 1024u * 1024u;
+
+// Default pack threshold (256 KiB): files strictly smaller than this are candidates for packing.
+inline constexpr uint64_t DefaultPackThresholdBytes = 256u * 1024u;
+
+// Default upper bound (4 MiB) on the concatenation size of a single pack. Packs are stored raw (no compression).
+inline constexpr uint64_t DefaultMaxPackBytes = 4u * 1024u * 1024u;
+
+} // namespace zen
diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp
index 9d477fb10..8d36e6a46 100644
--- a/src/zenserver/hub/storageserverinstance.cpp
+++ b/src/zenserver/hub/storageserverinstance.cpp
@@ -10,6 +10,7 @@
#include <zencore/logging.h>
#include <zencore/string.h>
#include <zencore/timer.h>
+#include <zencore/trace.h>
namespace zen {
@@ -31,6 +32,7 @@ StorageServerInstance::~StorageServerInstance()
void
StorageServerInstance::SpawnServerProcess()
{
+ ZEN_TRACE_CPU("StorageServerInstance::SpawnServerProcess");
Stopwatch SpawnTimer;
ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId);
@@ -103,6 +105,7 @@ StorageServerInstance::ShutdownServerProcess()
{
return;
}
+ ZEN_TRACE_CPU("StorageServerInstance::ShutdownServerProcess");
Stopwatch ShutdownTimer;
// m_ServerInstance.Shutdown() never throws.
m_ServerInstance.Shutdown();
@@ -131,6 +134,7 @@ StorageServerInstance::ProvisionLocked()
return;
}
+ ZEN_TRACE_CPU("StorageServerInstance::ProvisionLocked");
ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_Config.StateDir);
try
{
@@ -150,6 +154,7 @@ StorageServerInstance::ProvisionLocked()
void
StorageServerInstance::DeprovisionLocked()
{
+ ZEN_TRACE_CPU("StorageServerInstance::DeprovisionLocked");
ShutdownServerProcess();
// Crashed or Hibernated: process already dead; skip Shutdown.
@@ -169,6 +174,7 @@ StorageServerInstance::DeprovisionLocked()
void
StorageServerInstance::ObliterateLocked()
{
+ ZEN_TRACE_CPU("StorageServerInstance::ObliterateLocked");
ShutdownServerProcess();
std::atomic<bool> AbortFlag{false};
@@ -181,6 +187,7 @@ StorageServerInstance::ObliterateLocked()
void
StorageServerInstance::HibernateLocked()
{
+ ZEN_TRACE_CPU("StorageServerInstance::HibernateLocked");
// Shut the server process down, but keep its data around so it can be woken later
ShutdownServerProcess();
}
@@ -195,6 +202,8 @@ StorageServerInstance::WakeLocked()
return;
}
+ ZEN_TRACE_CPU("StorageServerInstance::WakeLocked");
+
try
{
SpawnServerProcess();
@@ -212,6 +221,12 @@ StorageServerInstance::WakeLocked()
void
StorageServerInstance::Hydrate()
{
+ if (!m_Config.EnableHydration)
+ {
+ ZEN_INFO("Hydration disabled; skipping hydrate for module '{}'", m_ModuleId);
+ return;
+ }
+ ZEN_TRACE_CPU("StorageServerInstance::Hydrate");
std::atomic<bool> AbortFlag{false};
std::atomic<bool> PauseFlag{false};
HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag);
@@ -222,6 +237,12 @@ StorageServerInstance::Hydrate()
void
StorageServerInstance::Dehydrate()
{
+ if (!m_Config.EnableDehydration)
+ {
+ ZEN_INFO("Dehydration disabled; skipping dehydrate for module '{}'", m_ModuleId);
+ return;
+ }
+ ZEN_TRACE_CPU("StorageServerInstance::Dehydrate");
std::atomic<bool> AbortFlag{false};
std::atomic<bool> PauseFlag{false};
HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag);
@@ -238,6 +259,9 @@ StorageServerInstance::MakeHydrationConfig(std::atomic<bool>& AbortFlag, std::at
Config.Threading.emplace(
HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalWorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag});
}
+ Config.PackEnabled = m_Config.HydrationPackEnabled;
+ Config.PackThresholdBytes = m_Config.HydrationPackThresholdBytes;
+ Config.MaxPackBytes = m_Config.HydrationMaxPackBytes;
return Config;
}
diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h
index 21ac1ada3..a2f376a23 100644
--- a/src/zenserver/hub/storageserverinstance.h
+++ b/src/zenserver/hub/storageserverinstance.h
@@ -36,6 +36,11 @@ public:
std::string Trace;
std::string TraceHost;
std::string TraceFile;
+ bool EnableHydration = true;
+ bool EnableDehydration = true;
+ bool HydrationPackEnabled = true;
+ uint64_t HydrationPackThresholdBytes = DefaultPackThresholdBytes;
+ uint64_t HydrationMaxPackBytes = DefaultMaxPackBytes;
WorkerThreadPool* OptionalWorkerPool = nullptr;
};
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp
index ebc2cf2f1..27c1c9fc4 100644
--- a/src/zenserver/hub/zenhubserver.cpp
+++ b/src/zenserver/hub/zenhubserver.cpp
@@ -235,6 +235,44 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
Options.add_option("hub",
"",
+ "hub-enable-hydration",
+ "Load instance state from the hydration target on provision (default true)",
+ cxxopts::value<bool>(m_ServerOptions.HubEnableHydration)->default_value("true"),
+ "");
+
+ Options.add_option("hub",
+ "",
+ "hub-enable-dehydration",
+ "Save instance state to the hydration target on deprovision (default true)",
+ cxxopts::value<bool>(m_ServerOptions.HubEnableDehydration)->default_value("true"),
+ "");
+
+ Options.add_option(
+ "hub",
+ "",
+ "hub-hydration-enable-pack",
+ "Concatenate small files into raw CAS pack blobs during dehydrate (default true; see --hub-hydration-pack-threshold-bytes)",
+ cxxopts::value<bool>(m_ServerOptions.HubHydrationPackEnabled)->default_value("true"),
+ "");
+
+ Options.add_option("hub",
+ "",
+ "hub-hydration-pack-threshold-bytes",
+ fmt::format("Files strictly smaller than this are pack candidates (default {})", DefaultPackThresholdBytes),
+ cxxopts::value<uint64_t>(m_ServerOptions.HubHydrationPackThresholdBytes)
+ ->default_value(fmt::format("{}", DefaultPackThresholdBytes)),
+ "<bytes>");
+
+ Options.add_option(
+ "hub",
+ "",
+ "hub-hydration-max-pack-bytes",
+ fmt::format("Upper bound on a pack's concatenation size (default {})", DefaultMaxPackBytes),
+ cxxopts::value<uint64_t>(m_ServerOptions.HubHydrationMaxPackBytes)->default_value(fmt::format("{}", DefaultMaxPackBytes)),
+ "<bytes>");
+
+ Options.add_option("hub",
+ "",
"hub-watchdog-cycle-interval-ms",
"Interval between watchdog cycles in milliseconds",
cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.CycleIntervalMs)->default_value("3000"),
@@ -367,6 +405,13 @@ ZenHubServerConfigurator::AddConfigOptions(LuaConfig::Options& Options)
Options.AddOption("hub.hydration.targetspec"sv, m_ServerOptions.HydrationTargetSpecification, "hub-hydration-target-spec"sv);
Options.AddOption("hub.hydration.targetconfig"sv, m_ServerOptions.HydrationTargetConfigPath, "hub-hydration-target-config"sv);
Options.AddOption("hub.hydration.threads"sv, m_ServerOptions.HubHydrationThreadCount, "hub-hydration-threads"sv);
+ Options.AddOption("hub.enablehydration"sv, m_ServerOptions.HubEnableHydration, "hub-enable-hydration"sv);
+ Options.AddOption("hub.enabledehydration"sv, m_ServerOptions.HubEnableDehydration, "hub-enable-dehydration"sv);
+ Options.AddOption("hub.hydration.enablepack"sv, m_ServerOptions.HubHydrationPackEnabled, "hub-hydration-enable-pack"sv);
+ Options.AddOption("hub.hydration.packthresholdbytes"sv,
+ m_ServerOptions.HubHydrationPackThresholdBytes,
+ "hub-hydration-pack-threshold-bytes"sv);
+ Options.AddOption("hub.hydration.maxpackbytes"sv, m_ServerOptions.HubHydrationMaxPackBytes, "hub-hydration-max-pack-bytes"sv);
Options.AddOption("hub.watchdog.cycleintervalms"sv, m_ServerOptions.WatchdogConfig.CycleIntervalMs, "hub-watchdog-cycle-interval-ms"sv);
Options.AddOption("hub.watchdog.cycleprocessingbudgetms"sv,
@@ -453,7 +498,7 @@ ZenHubServer::OnModuleStateChanged(std::string_view HubInstanceId,
HubInstanceState PreviousState,
HubInstanceState NewState)
{
- ZEN_UNUSED(PreviousState);
+ ZEN_INFO("Module '{}' changed state from {} to {}", ModuleId, zen::ToString(PreviousState), zen::ToString(NewState));
if (NewState == HubInstanceState::Deprovisioning || NewState == HubInstanceState::Hibernating)
{
@@ -661,6 +706,11 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig)
.InstanceTraceFile = ServerConfig.HubInstanceTraceFile,
.InstanceConfigPath = ServerConfig.HubInstanceConfigPath,
.HydrationTargetSpecification = ServerConfig.HydrationTargetSpecification,
+ .EnableHydration = ServerConfig.HubEnableHydration,
+ .EnableDehydration = ServerConfig.HubEnableDehydration,
+ .HydrationPackEnabled = ServerConfig.HubHydrationPackEnabled,
+ .HydrationPackThresholdBytes = ServerConfig.HubHydrationPackThresholdBytes,
+ .HydrationMaxPackBytes = ServerConfig.HubHydrationMaxPackBytes,
.WatchDog =
{
.CycleInterval = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleIntervalMs),
diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h
index 5e465bb14..6416792a6 100644
--- a/src/zenserver/hub/zenhubserver.h
+++ b/src/zenserver/hub/zenhubserver.h
@@ -3,6 +3,7 @@
#pragma once
#include "hubinstancestate.h"
+#include "hydrationdefaults.h"
#include "resourcemetrics.h"
#include "zenserver.h"
@@ -47,7 +48,12 @@ struct ZenHubServerConfig : public ZenServerConfig
uint16_t HubBasePortNumber = 21000;
int HubInstanceLimit = 1000;
bool HubUseJobObject = true;
- std::string HubInstanceHttpClass = "asio";
+ bool HubEnableHydration = true; // Load instance state from hydration target on provision
+ bool HubEnableDehydration = true; // Save instance state to hydration target on deprovision
+ bool HubHydrationPackEnabled = true; // Concatenate small files into raw CAS pack blobs during dehydrate
+ uint64_t HubHydrationPackThresholdBytes = DefaultPackThresholdBytes; // Files strictly smaller than this are pack candidates
+ uint64_t HubHydrationMaxPackBytes = DefaultMaxPackBytes; // Upper bound on a pack's concatenation size
+ std::string HubInstanceHttpClass = "asio";
std::string HubInstanceMalloc;
std::string HubInstanceTrace;
std::string HubInstanceTraceHost;
diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp
index f8bed92da..ab80cfcc7 100644
--- a/src/zenutil/cloud/s3client.cpp
+++ b/src/zenutil/cloud/s3client.cpp
@@ -148,11 +148,31 @@ namespace {
return true;
}
+ /// True if the response indicates S3 throttling: HTTP status 503 or 429, or an XML error
+ /// code of SlowDown, ServiceUnavailable, ThrottlingException, RequestLimitExceeded or
+ /// TooManyRequests. Both are checked so proxies that return 200 with a SlowDown body are caught.
+ bool IsS3Throttled(const HttpClient::Response& Response, std::string_view ErrorCode)
+ {
+ const int Status = static_cast<int>(Response.StatusCode);
+ if (Status == 503 || Status == 429)
+ {
+ return true;
+ }
+ if (ErrorCode == "SlowDown" || ErrorCode == "ServiceUnavailable" || ErrorCode == "ThrottlingException" ||
+ ErrorCode == "RequestLimitExceeded" || ErrorCode == "TooManyRequests")
+ {
+ return true;
+ }
+ return false;
+ }
+
/// Build a human-readable error message for a failed S3 response. When the response body
/// contains an S3 `<Error>` element, the Code and Message fields are included in the string
/// so transient 4xx/5xx failures (SignatureDoesNotMatch, AuthorizationHeaderMalformed, etc.)
/// show up in logs instead of being swallowed. Falls back to the generic HTTP/transport
/// message when no XML body is available (HEAD responses, transport errors).
+ /// Side effect: logs a distinct `S3 THROTTLED` warning when the response indicates throttling,
+ /// so throttling can be grepped for in the logs without parsing the combined error text.
std::string S3ErrorMessage(std::string_view Prefix, const HttpClient::Response& Response)
{
if (!Response.Error.has_value() && Response.ResponsePayload)
@@ -164,9 +184,21 @@ namespace {
{
ExtendableStringBuilder<256> Decoded;
DecodeXmlEntities(Message, Decoded);
+ if (IsS3Throttled(Response, Code))
+ {
+ ZEN_WARN("S3 THROTTLED [{}] status={} code='{}' message='{}'",
+ Prefix,
+ static_cast<int>(Response.StatusCode),
+ Code,
+ Decoded.ToView());
+ }
return fmt::format("{}: HTTP status ({}) {} - {}", Prefix, static_cast<int>(Response.StatusCode), Code, Decoded.ToView());
}
}
+ if (IsS3Throttled(Response, {}))
+ {
+ ZEN_WARN("S3 THROTTLED [{}] status={} (no XML body)", Prefix, static_cast<int>(Response.StatusCode));
+ }
return Response.ErrorMessage(Prefix);
}