// Copyright Epic Games, Inc. All Rights Reserved.
#pragma once
#include "hydrationdefaults.h"
#include <zencore/compactbinary.h>
#include <atomic>
#include <filesystem>
#include <memory>
#include <optional>
#include <string>
#include <vector>
namespace zen {
class WorkerThreadPool;
/**
 * @brief Per-module settings handed to a hydrator
 *
 * Describes where the server state lives on disk, which module it belongs to, the
 * optional external threading resources, and the small-file packing parameters.
 */
struct HydrationConfig
{
    // Directory containing the server state that is hydrated into / dehydrated from.
    std::filesystem::path ServerStateDir;

    // Scratch directory that may be used while hydrating or dehydrating.
    std::filesystem::path TempDir;

    // Identifier of the module whose server state is being hydrated/dehydrated.
    std::string ModuleId;

    // Externally supplied threading resources; every member defaults to null.
    // NOTE(review): raw pointers - presumably non-owning and expected to outlive the
    // hydrator that receives them; confirm against the implementation.
    struct ThreadingOptions
    {
        WorkerThreadPool*  WorkerPool = nullptr;
        std::atomic<bool>* AbortFlag  = nullptr;  // presumably a cooperative cancel signal - TODO confirm
        std::atomic<bool>* PauseFlag  = nullptr;  // presumably a cooperative pause signal - TODO confirm
    };

    // Optional external threading used for parallel I/O and hashing. When left unset,
    // the work is performed inline on the calling thread.
    std::optional<ThreadingOptions> Threading;

    // Small-file packing switch. When true, Dehydrate concatenates small files into pack
    // blobs and Hydrate slices them back out. Packs hold raw bytes (no compression) and
    // a pack's CAS key is the hash of its concatenated raw bytes. Intended to cut the
    // request count dramatically for modules dominated by tiny metadata files.
    bool PackEnabled = true;

    // Packing candidate cutoff: only files strictly smaller than this are considered.
    uint64_t PackThresholdBytes = DefaultPackThresholdBytes;

    // Maximum concatenated size of one pack. Candidates are bin-packed greedily; when
    // adding one would exceed this bound the current pack is closed and a new one is
    // started. A single unique candidate larger than this bound is uploaded standalone.
    uint64_t MaxPackBytes = DefaultMaxPackBytes;
};
/**
 * @brief Interface for a state hydration strategy
 *
 * One instance carries out a hydration OR a dehydration of server state. Instances
 * are expected to be single-use: run the operation once and do not reuse the object.
 */
struct HydrationStrategyBase
{
    virtual ~HydrationStrategyBase() = default;

    // Dehydrate - upload the server state to the configured target.
    //   On success: ServerStateDir is wiped.
    //   On failure: ServerStateDir is left intact and the failure is logged at WARN;
    //               no exception propagates to the caller.
    // Success is not signalled through the API. A caller that has to tell the two
    // outcomes apart must inspect the log stream or observe downstream effects (e.g.
    // presence of the metadata file on the backend).
    virtual void Dehydrate(const CbObject& CachedState) = 0;

    // Hydrate - download state from the configured target into ServerStateDir.
    //   Returns the cached state to be used for the next Dehydrate.
    //   On failure: ServerStateDir is wiped, the failure is logged, and an empty
    //               CbObject (a no-op cache) is returned.
    // An empty result can therefore be treated by callers as a failure indicator.
    virtual CbObject Hydrate() = 0;

    // Obliterate - delete all data stored for this module on the configured backend,
    // then clean ServerStateDir and TempDir.
    //   A failed backend delete is retried once. If the retry fails as well, local
    //   cleanup still proceeds and the failure is logged at WARN.
    // Backend deletion is best-effort: returning from Obliterate does not guarantee
    // that the backend data is gone.
    virtual void Obliterate() = 0;
};
/**
 * @brief Hydration backend shared by the whole hub
 *
 * Constructed once per hub through InitHydration. It owns the connection / client /
 * credentials state of the configured backend that all modules share (e.g. a single
 * S3 client and IMDS credential provider). CreateHydrator hands out a ready-to-use
 * per-module HydrationStrategyBase that references this shared state, so there is no
 * per-module backend setup cost.
 */
class HydrationBase
{
public:
    struct Configuration
    {
        // Backend-specific target specification, e.g. "s3://bucket/prefix" or
        // "file:///path".
        std::string TargetSpecification;

        // Full config object; mutually exclusive with TargetSpecification.
        // Backend-specific settings (e.g. S3 "chunksize") are found under
        // Options["settings"]. The common `excludes` entry is parsed once by
        // HydrationBase and shared across modules.
        CbObject Options;

        // true  -> S3 hydration is routed through AsyncHttpClient + S3AsyncStorage.
        // false -> falls back to the blocking S3Client + S3Storage path.
        // FileHydration ignores this setting.
        bool AsyncEnabled = false;

        // Maximum in-flight requests per AsyncHttpClient. Consulted only when
        // AsyncEnabled is set.
        uint32_t AsyncMaxConcurrentRequests = 128;
    };

    // Parses the common Options entries (`excludes`) into m_Excludes. If the field is
    // absent, built-in defaults are applied. A field that is present but empty (`[]`)
    // is honored as an explicit override, i.e. no defaults are applied.
    explicit HydrationBase(const Configuration& Config);

    virtual ~HydrationBase() = default;

    // Creates a configured per-module hydrator on which Hydrate/Dehydrate/Obliterate
    // can be called right away.
    virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) = 0;

protected:
    // Exclude entries populated by the constructor from Options (`excludes`).
    std::vector<std::string> m_Excludes;
};
/**
 * @brief Factory for the hydration backend
 *
 * Parses Config and returns the concrete backend (FileHydration or S3Hydration).
 *
 * @param Config Backend selection and settings (see HydrationBase::Configuration).
 * @return Owning pointer to the constructed backend.
 * @throws zen::runtime_error if the config cannot be resolved to a known backend or
 *         if backend-specific validation fails.
 */
std::unique_ptr<HydrationBase> InitHydration(const HydrationBase::Configuration& Config);
#if ZEN_WITH_TESTS
// Declared only when ZEN_WITH_TESTS is enabled.
// NOTE(review): presumably an anchor symbol referenced from elsewhere so the linker
// keeps this module's test code - confirm against the definition.
void hydration_forcelink();
#endif // ZEN_WITH_TESTS
} // namespace zen