1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
|
// Copyright Epic Games, Inc. All Rights Reserved.
#pragma once
#include <zencore/iohash.h>
#include <zencore/parallelwork.h>
#include <zencore/timer.h>
#include <zenhttp/asynchttpclient.h>
#include <zenutil/cloud/s3client.h>
#include <zenutil/cloud/s3requestbuilder.h>
#include <zenutil/cloud/sigv4.h>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <filesystem>
#include <functional>
#include <limits>
#include <memory>
#include <semaphore>
#include <string>
#include <string_view>
#include <vector>
namespace zen {
// Storage-layer admission semaphore type. The template argument
// (LeastMaxValue) is only the compile-time upper bound on the counter; the
// actual slot count is chosen at runtime when the semaphore is constructed
// (sized to MaxConcurrentRequests). S3AsyncStorage receives it as a
// std::shared_ptr<AdmissionSemaphore>; the caller may pass nullptr to
// disable gating entirely.
using AdmissionSemaphore = std::counting_semaphore<std::numeric_limits<std::ptrdiff_t>::max()>;
// Bundle of references to externally stored counters describing S3 request
// traffic, plus the helpers that update them. Every member is a reference, so
// the referenced atomics and the clock must outlive this object. All atomic
// accesses use relaxed ordering: the counters are statistics only and are not
// used to publish other data.
struct S3AsyncStorageStats
{
    std::atomic<uint64_t>& RequestCount;          // number of EndRequest calls
    std::atomic<uint64_t>& RequestTotalUs;        // sum of ElapsedUs passed to EndRequest
    std::atomic<uint64_t>& RequestMaxUs;          // largest ElapsedUs passed to EndRequest
    std::atomic<uint64_t>& Bytes;                 // sum of BytesValue passed to EndRequest
    std::atomic<uint32_t>& InFlight;              // BeginRequest calls not yet matched by EndRequest
    std::atomic<uint32_t>& InFlightPeak;          // highest InFlight value seen by BeginRequest
    std::atomic<uint64_t>& FirstScheduleUs;       // smallest PhaseClock reading seen by RecordScheduled
    std::atomic<uint64_t>& FirstStartUs;          // smallest PhaseClock reading seen by BeginRequest
    std::atomic<uint64_t>& AdmissionWaitTotalUs;  // sum of waits passed to RecordAdmissionWait
    std::atomic<uint64_t>& AdmissionWaitMaxUs;    // largest wait passed to RecordAdmissionWait
    Stopwatch&             PhaseClock;            // source of the "elapsed microseconds" timestamps

    // Notes that a request was scheduled: lowers FirstScheduleUs to the current
    // PhaseClock reading if that reading is smaller than the stored value.
    // NOTE(review): the value is only ever lowered, so the owner presumably
    // seeds it with a large sentinel - confirm at the definition site.
    void RecordScheduled()
    {
        const uint64_t Stamp = PhaseClock.GetElapsedTimeUs();
        LowerTo(FirstScheduleUs, Stamp);
    }

    // Notes that a request is about to start: lowers FirstStartUs to the
    // current PhaseClock reading, bumps InFlight and raises InFlightPeak to the
    // new in-flight count when it exceeds the stored peak. Returns a freshly
    // default-constructed Stopwatch for the caller to time the request with.
    Stopwatch BeginRequest()
    {
        const uint64_t Stamp = PhaseClock.GetElapsedTimeUs();
        LowerTo(FirstStartUs, Stamp);

        // fetch_add yields the previous value; +1 gives the count including us.
        const uint32_t Outstanding = InFlight.fetch_add(1, std::memory_order_relaxed) + 1;
        RaiseTo(InFlightPeak, Outstanding);

        return Stopwatch{};
    }

    // Notes that a request finished: drops InFlight, counts the request, and
    // accumulates its duration and byte count; RequestMaxUs is raised when
    // ElapsedUs exceeds the stored maximum.
    void EndRequest(uint64_t ElapsedUs, uint64_t BytesValue)
    {
        InFlight.fetch_sub(1, std::memory_order_relaxed);
        RequestCount.fetch_add(1, std::memory_order_relaxed);
        RequestTotalUs.fetch_add(ElapsedUs, std::memory_order_relaxed);
        Bytes.fetch_add(BytesValue, std::memory_order_relaxed);
        RaiseTo(RequestMaxUs, ElapsedUs);
    }

    // Accumulates one admission wait of Us microseconds into the total and
    // raises the recorded maximum when Us exceeds it.
    void RecordAdmissionWait(uint64_t Us)
    {
        AdmissionWaitTotalUs.fetch_add(Us, std::memory_order_relaxed);
        RaiseTo(AdmissionWaitMaxUs, Us);
    }

private:
    // Stores Candidate into Target if it is strictly smaller than the current
    // value. compare_exchange_weak refreshes Seen on failure, so the loop
    // re-tests against the latest value and stops as soon as another thread
    // has stored something at least as small.
    template<typename T>
    static void LowerTo(std::atomic<T>& Target, const T Candidate)
    {
        T Seen = Target.load(std::memory_order_relaxed);
        while (Candidate < Seen)
        {
            if (Target.compare_exchange_weak(Seen, Candidate, std::memory_order_relaxed))
            {
                break;
            }
        }
    }

    // Mirror image of LowerTo: stores Candidate into Target if it is strictly
    // larger than the current value.
    template<typename T>
    static void RaiseTo(std::atomic<T>& Target, const T Candidate)
    {
        T Seen = Target.load(std::memory_order_relaxed);
        while (Candidate > Seen)
        {
            if (Target.compare_exchange_weak(Seen, Candidate, std::memory_order_relaxed))
            {
                break;
            }
        }
    }
};
// Async S3 storage adapter for hub hydration. Mirrors the behavior of the
// blocking `S3Storage` (defined inside hydration.cpp) but submits requests
// via `AsyncHttpClient` and counts in-flight work against `ParallelWork`
// using `ExternalWorkToken` instead of occupying worker-pool threads.
//
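// Construction:
// S3AsyncStorage Storage(AsyncClient, RequestBuilder, GetCreds, KeyPrefix,
// MultipartChunkSize);
// A shared AdmissionSemaphore and its AdmissionCap may optionally be passed
// as trailing arguments; by default (nullptr / 0) admission gating is off.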
//
// AsyncClient and RequestBuilder are owned by the caller (typically the hub
// or a test fixture). GetCreds is a callable returning the latest SigV4
// credentials - keeps the storage decoupled from any specific provider.
//
// Per-call SourcePath/DestinationPath args carry temp-dir state from the
// caller (IncrementalHydrator picks paths under HydrationConfig::TempDir);
// S3AsyncStorage itself never owns a temp directory.
//
// DeleteAll blocks the caller while the prefix listing runs, then issues
// async deletes via Work.
class S3AsyncStorage
{
public:
// Callable returning the SigV4 credentials to sign with; a copy is kept in
// m_GetCreds.
using CredentialsCallback = std::function<SigV4Credentials()>;
// The class keeps AsyncHttpClient& / S3RequestBuilder& members, so Client and
// Builder must outlive this object. Admission defaults to nullptr (no
// admission gating); AdmissionCap is the semaphore's initial slot count and
// is 0 when admission is disabled.
S3AsyncStorage(AsyncHttpClient& Client,
S3RequestBuilder& Builder,
CredentialsCallback GetCreds,
std::string KeyPrefix,
uint64_t MultipartChunkSize,
std::shared_ptr<AdmissionSemaphore> Admission = nullptr,
uint32_t AdmissionCap = 0);
// Returns a non-owning view of the stored key prefix; the view is only valid
// while this object is alive.
std::string_view KeyPrefix() const { return m_KeyPrefix; }
// Async data operations. Each registers a ParallelWork::ExternalWorkToken;
// the async callback completes/fails the token. Caller drives Work.Wait()
// to block until all submissions finish.
//
// Stats (must outlive in-flight callbacks): every S3 request the storage
// issues calls RecordScheduled at submit time, BeginRequest just before
// the network handoff, and EndRequest from the completion callback. Bytes
// is the payload size on success, 0 on failure or zero-payload requests.
// Multipart and ranged GET fire one Begin/EndRequest per part/range.
//
// NOTE(review): Size is presumably the payload size in bytes and selects
// between the PutSmall / PutMedium / PutMultipart (and GetMultipart) paths
// declared below - confirm thresholds in s3asyncstorage.cpp.
void Put(ParallelWork& Work,
WorkerThreadPool& Pool,
const IoHash& Hash,
uint64_t Size,
const std::filesystem::path& SourcePath,
S3AsyncStorageStats& Stats);
void Get(ParallelWork& Work,
WorkerThreadPool& Pool,
const IoHash& Hash,
uint64_t Size,
const std::filesystem::path& DestinationPath,
S3AsyncStorageStats& Stats);
// NOTE(review): takes only a hash (no path or size); presumably refreshes the
// stored object for Hash without transferring its payload - confirm in the
// .cpp.
void Touch(ParallelWork& Work, WorkerThreadPool& Pool, const IoHash& Hash, S3AsyncStorageStats& Stats);
// Synchronous list of all CAS hashes under this module's prefix.
std::vector<IoHash> List();
// Synchronous delete-all under the module prefix. Lists then issues async
// deletes via Work; caller still drives Work.Wait().
void DeleteAll(ParallelWork& Work);
// Forward declarations are public (not the structs themselves) so file-scope
// free helpers in s3asyncstorage.cpp can name the types in their signatures
// without being declared friends. The structs are defined privately in the
// .cpp; no out-of-module caller can construct or inspect them.
public:
struct PutMultipartState;
struct GetMultipartState;
struct GetStreamState;
private:
// Map a content hash to a string key / path.
// NOTE(review): the exact layout (and how m_KeyPrefix participates) is
// defined in the .cpp - confirm there before relying on the format.
std::string CasKey(const IoHash& Hash) const;
std::string CasPath(const IoHash& Hash) const;
// Upload variants sharing the public Put signature.
// NOTE(review): presumably chosen by Size relative to m_MultipartChunkSize -
// confirm the selection logic in Put.
void PutSmall(ParallelWork& Work,
WorkerThreadPool& Pool,
const IoHash& Hash,
uint64_t Size,
const std::filesystem::path& SourcePath,
S3AsyncStorageStats& Stats);
void PutMedium(ParallelWork& Work,
WorkerThreadPool& Pool,
const IoHash& Hash,
uint64_t Size,
const std::filesystem::path& SourcePath,
S3AsyncStorageStats& Stats);
void PutMultipart(ParallelWork& Work,
WorkerThreadPool& Pool,
const IoHash& Hash,
uint64_t Size,
const std::filesystem::path& SourcePath,
S3AsyncStorageStats& Stats);
// Steps of the multipart upload. Each takes the shared PutMultipartState by
// value, i.e. holds its own shared_ptr reference for the duration of the call.
// SlotRef is an opaque std::shared_ptr<void>.
// NOTE(review): SlotRef presumably represents a concurrency/admission slot
// that is passed from a finished part to the next one - confirm in the .cpp.
void DispatchInitialPartWave(std::shared_ptr<PutMultipartState> State);
void DispatchPartUpload(std::shared_ptr<PutMultipartState> State, uint32_t PartNum, std::shared_ptr<void> SlotRef);
void HandoffSlotToNextPart(std::shared_ptr<PutMultipartState> State, uint32_t PartIdx, std::shared_ptr<void> SlotRef);
void DrainUndispatchedParts(std::shared_ptr<PutMultipartState> State);
void FinalizePutPart(std::shared_ptr<PutMultipartState> State);
void CompleteMultipart(std::shared_ptr<PutMultipartState> State);
void AbortMultipart(std::shared_ptr<PutMultipartState> State);
// Download variant sharing the public Get signature, plus the completion
// hooks for its per-part and streaming states.
void GetMultipart(ParallelWork& Work,
WorkerThreadPool& Pool,
const IoHash& Hash,
uint64_t Size,
const std::filesystem::path& DestinationPath,
S3AsyncStorageStats& Stats);
void OnGetPartCompleted(std::shared_ptr<GetMultipartState> State);
void OnGetStreamFinalised(std::shared_ptr<GetStreamState> State);
// Returns object names found under Prefix.
// NOTE(review): presumably the blocking listing used by List/DeleteAll -
// confirm.
std::vector<std::string> ListAllObjects(std::string_view Prefix);
AsyncHttpClient& m_Client; // not owned; must outlive this object
S3RequestBuilder& m_Builder; // not owned; must outlive this object
CredentialsCallback m_GetCreds; // credentials provider, owned copy
std::string m_KeyPrefix; // exposed read-only via KeyPrefix()
uint64_t m_MultipartChunkSize; // NOTE(review): presumably bytes per multipart part - confirm
std::shared_ptr<AdmissionSemaphore> m_Admission; // nullable; when null, no admission gating
uint32_t m_AdmissionCap; // initial slot count; 0 when admission disabled
};
// Declared here, defined out of line.
// NOTE(review): presumably a link anchor - other code calls it so the linker
// keeps this translation unit (e.g. its test registrations) - confirm
// against s3asyncstorage.cpp.
void s3asyncstorage_forcelink();
// Test-only fault-injection hooks; compiled only when ZEN_WITH_TESTS is set.
#if ZEN_WITH_TESTS
namespace s3asyncstorage_test_hooks {
// Per-process counter consumed by DispatchPartUpload. While > 0, each
// invocation decrements it and synthesizes a part-level failure (via
// RecordPutPartFailure + drain + FinalizePutPart fan-out) instead of
// issuing the UploadPart request. Used to drive the AbortMultipart path
// from tests without fault-injecting MinIO.
//
// Count is the number of upcoming part dispatches to fail.
// NOTE(review): whether this overwrites or adds to a previously armed count
// is not visible here - confirm in s3asyncstorage.cpp. The counter is
// per-process, so tests that arm it should not run concurrently with other
// multipart uploads.
void ForceNextPartFailures(uint32_t Count);
} // namespace s3asyncstorage_test_hooks
#endif
} // namespace zen
|