aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-10-04 15:27:45 +0200
committerStefan Boberg <[email protected]>2021-10-04 15:27:45 +0200
commit2356411f391ae65544ef1cd3810945f3ebf8e8dc (patch)
treecc838ddc32d7e6198af7b2f6532c7d686040e216
parentmonitoring: added stubs for /stats and /status endpoints (diff)
downloadzen-2356411f391ae65544ef1cd3810945f3ebf8e8dc.tar.xz
zen-2356411f391ae65544ef1cd3810945f3ebf8e8dc.zip
zenserver: Changed initialization flow
- HTTP server is now started earlier, so it can be queried while scrubbing/recovery runs - Stats/status services are initialized before anything else, so they can be used to monitor progress while scrubbing happens - Structured cache initialization is now in a separate function - Scrubbing now emits some summary stats at the point of completion
-rw-r--r--zenserver/cache/structuredcachestore.cpp194
-rw-r--r--zenserver/cache/structuredcachestore.h31
-rw-r--r--zenserver/zenserver.cpp210
-rw-r--r--zenstore/cidstore.cpp2
-rw-r--r--zenstore/compactcas.cpp2
-rw-r--r--zenstore/filecas.cpp12
6 files changed, 296 insertions, 155 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index b97f0830f..83a21f87a 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -32,6 +32,8 @@ ZenCacheStore::ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir
{
ZEN_INFO("initializing structured cache at '{}'", RootDir);
CreateDirectories(RootDir);
+
+ m_DiskLayer.DiscoverBuckets();
}
ZenCacheStore::~ZenCacheStore()
@@ -149,6 +151,10 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa
_.ReleaseNow();
+ // There's a race here. Since the lock is released early to allow
+ // inserts, the bucket delete path could end up deleting the
+ // underlying data structure
+
return Bucket->Get(HashKey, OutValue);
}
@@ -210,11 +216,13 @@ ZenCacheMemoryLayer::GarbageCollect(GcContext& GcCtx)
void
ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
{
+ RwLock::SharedLockScope _(m_bucketLock);
+
std::vector<IoHash> BadHashes;
for (auto& Kv : m_cacheMap)
{
- if (Kv.first != IoHash::HashBuffer(Kv.second))
+ if (Kv.first != IoHash::HashBuffer(Kv.second.Payload))
{
BadHashes.push_back(Kv.first);
}
@@ -222,10 +230,16 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
if (!BadHashes.empty())
{
- Ctx.ReportBadChunks(BadHashes);
+ Ctx.ReportBadCasChunks(BadHashes);
}
}
+void
+ZenCacheMemoryLayer::CacheBucket::GarbageCollect(GcContext& GcCtx)
+{
+ ZEN_UNUSED(GcCtx);
+}
+
bool
ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
@@ -237,18 +251,26 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV
}
else
{
- OutValue.Value = bucketIt->second;
+ BucketValue& Value = bucketIt.value();
+ OutValue.Value = Value.Payload;
+ Value.LastAccess = GetCurrentTimeStamp();
return true;
}
}
+uint64_t
+ZenCacheMemoryLayer::CacheBucket::GetCurrentTimeStamp()
+{
+ return GetLofreqTimerValue();
+}
+
void
ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
{
RwLock::ExclusiveLockScope _(m_bucketLock);
- m_cacheMap[HashKey] = Value.Value;
+ m_cacheMap.insert_or_assign(HashKey, BucketValue{.LastAccess = GetCurrentTimeStamp(), .Payload = Value.Value});
}
//////////////////////////////////////////////////////////////////////////
@@ -258,11 +280,17 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue
struct DiskLocation
{
- uint64_t OffsetAndFlags;
- uint32_t Size;
- uint32_t IndexDataSize;
+ inline DiskLocation() = default;
+
+ inline DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
+ : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
+ , LowerSize(ValueSize & 0xFFFFffff)
+ , IndexDataSize(IndexSize)
+ {
+ }
- static const uint64_t kOffsetMask = 0x00FF'ffFF'ffFF'ffFFull;
+ static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
+ static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull;
static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull;
static const uint64_t kStructured = 0x4000'0000'0000'0000ull;
@@ -270,6 +298,7 @@ struct DiskLocation
static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
+ inline uint64_t Size() const { return LowerSize; }
inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; }
inline ZenContentType GetContentType() const
{
@@ -282,6 +311,11 @@ struct DiskLocation
return ContentType;
}
+
+private:
+ uint64_t OffsetAndFlags = 0;
+ uint32_t LowerSize = 0;
+ uint32_t IndexDataSize = 0;
};
struct DiskIndexEntry
@@ -299,7 +333,7 @@ struct ZenCacheDiskLayer::CacheBucket
CacheBucket(CasStore& Cas);
~CacheBucket();
- void OpenOrCreate(std::filesystem::path BucketDir);
+ void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
static bool Delete(std::filesystem::path BucketDir);
bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
@@ -309,13 +343,13 @@ struct ZenCacheDiskLayer::CacheBucket
void Scrub(ScrubContext& Ctx);
void GarbageCollect(GcContext& GcCtx);
- inline bool IsOk() const { return m_Ok; }
+ inline bool IsOk() const { return m_IsOk; }
private:
CasStore& m_CasStore;
std::filesystem::path m_BucketDir;
Oid m_BucketId;
- bool m_Ok = false;
+ bool m_IsOk = false;
uint64_t m_LargeObjectThreshold = 64 * 1024;
// These files are used to manage storage of small objects for this bucket
@@ -355,7 +389,7 @@ ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir)
}
void
-ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
+ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate)
{
CreateDirectories(BucketDir);
@@ -382,17 +416,23 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
{
ManifestFile.Read(&m_BucketId, sizeof(Oid), 0);
- m_Ok = true;
+ m_IsOk = true;
}
- if (!m_Ok)
+ if (!m_IsOk)
{
ManifestFile.Close();
}
}
- if (!m_Ok)
+ if (!m_IsOk)
{
+ if (AllowCreate == false)
+ {
+ // Invalid bucket
+ return;
+ }
+
// No manifest file found, this is a new bucket
ManifestFile.Open(ManifestPath, /* IsCreate */ true, Ec);
@@ -424,13 +464,13 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
m_SlogFile.Replay([&](const DiskIndexEntry& Record) {
m_Index[Record.Key] = Record.Location;
- MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size);
+ MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size());
});
m_WriteCursor = (MaxFileOffset + 15) & ~15;
}
- m_Ok = true;
+ m_IsOk = true;
}
void
@@ -453,7 +493,7 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, Zen
{
if (!Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size);
+ OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size());
OutValue.Value.SetContentType(Loc.GetContentType());
return true;
@@ -482,7 +522,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const IoHash& HashKey, Z
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
- if (!m_Ok)
+ if (!m_IsOk)
{
return false;
}
@@ -509,7 +549,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
void
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
{
- if (!m_Ok)
+ if (!m_IsOk)
{
return;
}
@@ -531,10 +571,9 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
RwLock::ExclusiveLockScope _(m_IndexLock);
- DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(m_WriteCursor, EntryFlags),
- .Size = gsl::narrow<uint32_t>(Value.Value.Size())};
+ DiskLocation Loc(m_WriteCursor, Value.Value.Size(), 0, EntryFlags);
- m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size, 16);
+ m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size(), 16);
if (auto it = m_Index.find(HashKey); it == m_Index.end())
{
@@ -544,11 +583,13 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
else
{
// TODO: should check if write is idempotent and bail out if it is?
+ // this would requiring comparing contents on disk unless we add a
+ // content hash to the index entry
it.value() = Loc;
}
m_SlogFile.Append({.Key = HashKey, .Location = Loc});
- m_SobsFile.Write(Value.Value.Data(), Loc.Size, Loc.Offset());
+ m_SobsFile.Write(Value.Value.Data(), Loc.Size(), Loc.Offset());
}
}
@@ -572,55 +613,45 @@ ZenCacheDiskLayer::CacheBucket::Flush()
void
ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
{
- std::vector<DiskIndexEntry> StandaloneFiles;
-
- std::vector<IoHash> BadChunks;
- std::vector<IoBuffer> BadStandaloneChunks;
+ std::atomic<uint64_t> ScrubbedChunks{0}, ScrubbedBytes{0};
{
RwLock::SharedLockScope _(m_IndexLock);
for (auto& Kv : m_Index)
{
- const IoHash& Hash = Kv.first;
- const DiskLocation& Loc = Kv.second;
+ const IoHash& HashKey = Kv.first;
+ const DiskLocation& Loc = Kv.second;
ZenCacheValue Value;
- if (!GetInlineCacheValue(Loc, Value))
+ if (GetInlineCacheValue(Loc, Value))
{
- ZEN_ASSERT(Loc.IsFlagSet(DiskLocation::kStandaloneFile));
- StandaloneFiles.push_back({.Key = Hash, .Location = Loc});
+ // Validate contents
}
else
{
- if (GetStandaloneCacheValue(Hash, Value, Loc))
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- // Hash contents
-
- const IoHash ComputedHash = HashBuffer(Value.Value);
-
- if (ComputedHash != Hash)
+ if (GetStandaloneCacheValue(HashKey, Value, Loc))
{
- BadChunks.push_back(Hash);
+ // Note: we cannot currently validate contents since we don't
+ // have a content hash!
+ }
+ else
+ {
+ // Value not found
}
- }
- else
- {
- // Non-existent
}
}
}
}
- if (Ctx.RunRecovery())
- {
- // Clean out bad chunks
- }
+ Ctx.ReportScrubbed(ScrubbedChunks, ScrubbedBytes);
- if (!BadChunks.empty())
+ if (Ctx.RunRecovery())
{
- Ctx.ReportBadChunks(BadChunks);
+ // Clean out bad data
}
}
@@ -681,7 +712,7 @@ ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenC
RwLock::ExclusiveLockScope _(m_IndexLock);
- DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(0, EntryFlags), .Size = 0};
+ DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags);
if (auto it = m_Index.find(HashKey); it == m_Index.end())
{
@@ -739,7 +770,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
std::filesystem::path BucketPath = m_RootDir;
BucketPath /= std::string(InBucket);
- Bucket->OpenOrCreate(BucketPath.c_str());
+ Bucket->OpenOrCreate(BucketPath);
}
}
@@ -782,7 +813,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
std::filesystem::path bucketPath = m_RootDir;
bucketPath /= std::string(InBucket);
- Bucket->OpenOrCreate(bucketPath.c_str());
+ Bucket->OpenOrCreate(bucketPath);
}
}
@@ -794,6 +825,63 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
}
}
+void
+ZenCacheDiskLayer::DiscoverBuckets()
+{
+ FileSystemTraversal Traversal;
+ struct Visitor : public FileSystemTraversal::TreeVisitor
+ {
+ virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent,
+ [[maybe_unused]] const path_view& File,
+ [[maybe_unused]] uint64_t FileSize) override
+ {
+ }
+
+ virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override
+ {
+ Dirs.push_back(std::wstring(DirectoryName));
+ return false;
+ }
+
+ std::vector<std::wstring> Dirs;
+ } Visit;
+
+ Traversal.TraverseFileSystem(m_RootDir, Visit);
+
+ // Initialize buckets
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ for (const std::wstring& BucketName : Visit.Dirs)
+ {
+ // New bucket needs to be created
+
+ std::string BucketName8 = WideToUtf8(BucketName);
+
+ if (auto It = m_Buckets.find(BucketName8); It != m_Buckets.end())
+ {
+ }
+ else
+ {
+ auto InsertResult = m_Buckets.try_emplace(BucketName8, m_CasStore);
+
+ std::filesystem::path BucketPath = m_RootDir;
+ BucketPath /= BucketName8;
+
+ CacheBucket& Bucket = InsertResult.first->second;
+
+ Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false);
+
+ if (!Bucket.IsOk())
+ {
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName8, m_RootDir);
+
+ m_Buckets.erase(InsertResult.first);
+ }
+ }
+ }
+}
+
bool
ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
{
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index 011f13323..4753af627 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -46,6 +46,11 @@ struct ZenCacheValue
CbObject IndexData;
};
+/** In-memory cache storage
+
+ Intended for small values which are frequently accessed
+
+ */
class ZenCacheMemoryLayer
{
public:
@@ -58,19 +63,39 @@ public:
void Scrub(ScrubContext& Ctx);
void GarbageCollect(GcContext& GcCtx);
+ struct Configuration
+ {
+ uint64_t TargetFootprintBytes = 16 * 1024 * 1024;
+ uint64_t ScavengeThreshold = 4 * 1024 * 1024;
+ };
+
+ const Configuration& GetConfiguration() const { return m_Configuration; }
+ void SetConfiguration(const Configuration& NewConfig) { m_Configuration = NewConfig; }
+
private:
struct CacheBucket
{
- RwLock m_bucketLock;
- tsl::robin_map<IoHash, IoBuffer> m_cacheMap;
+ struct BucketValue
+ {
+ uint64_t LastAccess = 0;
+ IoBuffer Payload;
+ };
+
+ RwLock m_bucketLock;
+ tsl::robin_map<IoHash, BucketValue> m_cacheMap;
bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
void Put(const IoHash& HashKey, const ZenCacheValue& Value);
void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
+
+ private:
+ uint64_t GetCurrentTimeStamp();
};
RwLock m_Lock;
std::unordered_map<std::string, CacheBucket> m_Buckets;
+ Configuration m_Configuration;
};
class ZenCacheDiskLayer
@@ -86,6 +111,8 @@ public:
void Scrub(ScrubContext& Ctx);
void GarbageCollect(GcContext& GcCtx);
+ void DiscoverBuckets();
+
private:
/** A cache bucket manages a single directory containing
metadata and data for that bucket
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index f5d8364cb..0bec56593 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -32,7 +32,7 @@
#include <unordered_map>
#if !defined(BUILD_VERSION)
-#define BUILD_VERSION ("DEV-BUILD")
+# define BUILD_VERSION ("DEV-BUILD")
#endif
//////////////////////////////////////////////////////////////////////////
@@ -87,6 +87,8 @@
#include "diag/diagsvcs.h"
#include "experimental/frontend.h"
#include "experimental/usnjournal.h"
+#include "monitoring/httpstats.h"
+#include "monitoring/httpstatus.h"
#include "projectstore.h"
#include "testing/httptest.h"
#include "testing/launch.h"
@@ -143,6 +145,14 @@ public:
// Ok so now we're configured, let's kick things off
+ m_Http = zen::CreateHttpServer();
+ m_Http->Initialize(BasePort);
+ m_Http->RegisterService(m_HealthService);
+ m_Http->RegisterService(m_StatsService);
+ m_Http->RegisterService(m_StatusService);
+
+ // Initialize storage and services
+
ZEN_INFO("initializing storage");
zen::CasStoreConfiguration Config;
@@ -170,95 +180,7 @@ public:
if (ServiceConfig.StructuredCacheEnabled)
{
- using namespace std::literals;
- auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; };
-
- ZEN_INFO("instantiating structured cache service");
- m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache");
-
- std::unique_ptr<zen::UpstreamCache> UpstreamCache;
- if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled)
- {
- const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig;
-
- zen::UpstreamCacheOptions UpstreamOptions;
- UpstreamOptions.ReadUpstream =
- (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0;
- UpstreamOptions.WriteUpstream =
- (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0;
-
- if (UpstreamConfig.UpstreamThreadCount < 32)
- {
- UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
- }
-
- UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled;
-
- UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore);
-
- if (!UpstreamConfig.ZenConfig.Urls.empty())
- {
- std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Urls);
- UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint));
- }
-
- {
- zen::CloudCacheClientOptions Options;
- if (UpstreamConfig.JupiterConfig.UseProductionSettings)
- {
- Options = zen::CloudCacheClientOptions{
- .ServiceUrl = "https://jupiter.devtools.epicgames.com"sv,
- .DdcNamespace = "ue.ddc"sv,
- .BlobStoreNamespace = "ue.ddc"sv,
- .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
- .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
- .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
- .UseLegacyDdc = false};
- }
- else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings)
- {
- Options = zen::CloudCacheClientOptions{
- .ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv,
- .DdcNamespace = "ue4.ddc"sv,
- .BlobStoreNamespace = "test.ddc"sv,
- .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
- .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
- .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
- .UseLegacyDdc = false};
- }
-
- Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl);
- Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace);
- Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace);
- Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider);
- Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId);
- Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret);
- Options.UseLegacyDdc |= UpstreamConfig.JupiterConfig.UseLegacyDdc;
-
- if (!Options.ServiceUrl.empty())
- {
- std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options);
- UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint));
- }
- }
-
- if (UpstreamCache->Initialize())
- {
- ZEN_INFO("upstream cache active ({})",
- UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE"
- : UpstreamOptions.ReadUpstream ? "READONLY"
- : UpstreamOptions.WriteUpstream ? "WRITEONLY"
- : "DISABLED");
- }
- else
- {
- UpstreamCache.reset();
- ZEN_INFO("NOT using upstream cache");
- }
- }
-
- m_StructuredCacheService.reset(
- new zen::HttpStructuredCacheService(*m_CacheStore, *m_CasStore, *m_CidStore, std::move(UpstreamCache)));
+ InitializeStructuredCache(ServiceConfig);
}
else
{
@@ -276,13 +198,8 @@ public:
}
#endif
- m_Http = zen::CreateHttpServer();
- m_Http->Initialize(BasePort);
- m_Http->RegisterService(m_HealthService);
-
m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics
m_Http->RegisterService(m_TestingService);
-
m_Http->RegisterService(m_AdminService);
if (m_HttpProjectService)
@@ -308,12 +225,15 @@ public:
}
m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot);
+
if (m_FrontendService)
{
m_Http->RegisterService(*m_FrontendService);
}
}
+ void InitializeStructuredCache(ZenServiceConfig& ServiceConfig);
+
#if ZEN_ENABLE_MESH
void StartMesh(int BasePort)
{
@@ -428,6 +348,7 @@ public:
void Scrub()
{
+ Stopwatch Timer;
ZEN_INFO("Storage validation STARTING");
ScrubContext Ctx;
@@ -436,7 +357,13 @@ public:
m_ProjectStore->Scrub(Ctx);
m_StructuredCacheService->Scrub(Ctx);
- ZEN_INFO("Storage validation DONE");
+ const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs();
+
+ ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})",
+ NiceTimeSpanMs(ElapsedTimeMs),
+ NiceBytes(Ctx.ScrubbedBytes()),
+ Ctx.ScrubbedChunks(),
+ NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs));
}
void Flush()
@@ -466,6 +393,8 @@ private:
zen::NamedMutex m_ServerMutex;
zen::Ref<zen::HttpServer> m_Http;
+ zen::HttpStatusService m_StatusService;
+ zen::HttpStatsService m_StatsService;
std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore()};
std::unique_ptr<zen::CidStore> m_CidStore;
std::unique_ptr<zen::ZenCacheStore> m_CacheStore;
@@ -488,6 +417,95 @@ private:
bool m_DebugOptionForcedCrash = false;
};
+void
+ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig)
+{
+ using namespace std::literals;
+ auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; };
+
+ ZEN_INFO("instantiating structured cache service");
+ m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache");
+
+ std::unique_ptr<zen::UpstreamCache> UpstreamCache;
+ if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled)
+ {
+ const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig;
+
+ zen::UpstreamCacheOptions UpstreamOptions;
+ UpstreamOptions.ReadUpstream = (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0;
+ UpstreamOptions.WriteUpstream = (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0;
+
+ if (UpstreamConfig.UpstreamThreadCount < 32)
+ {
+ UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
+ }
+
+ UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled;
+
+ UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore);
+
+ if (!UpstreamConfig.ZenConfig.Urls.empty())
+ {
+ std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Urls);
+ UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint));
+ }
+
+ {
+ zen::CloudCacheClientOptions Options;
+ if (UpstreamConfig.JupiterConfig.UseProductionSettings)
+ {
+ Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools.epicgames.com"sv,
+ .DdcNamespace = "ue.ddc"sv,
+ .BlobStoreNamespace = "ue.ddc"sv,
+ .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
+ .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
+ .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
+ .UseLegacyDdc = false};
+ }
+ else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings)
+ {
+ Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv,
+ .DdcNamespace = "ue4.ddc"sv,
+ .BlobStoreNamespace = "test.ddc"sv,
+ .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
+ .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
+ .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
+ .UseLegacyDdc = false};
+ }
+
+ Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl);
+ Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace);
+ Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace);
+ Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider);
+ Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId);
+ Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret);
+ Options.UseLegacyDdc |= UpstreamConfig.JupiterConfig.UseLegacyDdc;
+
+ if (!Options.ServiceUrl.empty())
+ {
+ std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options);
+ UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint));
+ }
+ }
+
+ if (UpstreamCache->Initialize())
+ {
+ ZEN_INFO("upstream cache active ({})",
+ UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE"
+ : UpstreamOptions.ReadUpstream ? "READONLY"
+ : UpstreamOptions.WriteUpstream ? "WRITEONLY"
+ : "DISABLED");
+ }
+ else
+ {
+ UpstreamCache.reset();
+ ZEN_INFO("NOT using upstream cache");
+ }
+ }
+
+ m_StructuredCacheService.reset(new zen::HttpStructuredCacheService(*m_CacheStore, *m_CasStore, *m_CidStore, std::move(UpstreamCache)));
+}
+
} // namespace zen
class ZenWindowsService : public WindowsService
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp
index df5c32d25..7a5d7bcf4 100644
--- a/zenstore/cidstore.cpp
+++ b/zenstore/cidstore.cpp
@@ -204,7 +204,7 @@ struct CidStore::Impl
// TODO: Should compute a snapshot index here
- Ctx.ReportBadChunks(BadChunks);
+ Ctx.ReportBadCasChunks(BadChunks);
}
uint64_t m_LastScrubTime = 0;
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 5fc3ac356..612f87c7c 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -254,7 +254,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
// be used to invalidate higher level data structures more efficiently
// than a full validation pass might be able to do
- Ctx.ReportBadChunks(BadChunkHashes);
+ Ctx.ReportBadCasChunks(BadChunkHashes);
}
void
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index 0b18848d5..ee641b80a 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -394,7 +394,8 @@ FileCasStrategy::Flush()
void
FileCasStrategy::Scrub(ScrubContext& Ctx)
{
- std::vector<IoHash> BadHashes;
+ std::vector<IoHash> BadHashes;
+ std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0};
IterateChunks([&](const IoHash& Hash, BasicFile& Payload) {
IoHashStream Hasher;
@@ -405,8 +406,13 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
{
BadHashes.push_back(Hash);
}
+
+ ++ChunkCount;
+ ChunkBytes.fetch_add(Payload.FileSize());
});
+ Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
+
if (!BadHashes.empty())
{
ZEN_ERROR("file CAS scrubbing: {} bad chunks found", BadHashes.size());
@@ -428,7 +434,9 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
}
}
- Ctx.ReportBadChunks(BadHashes);
+ Ctx.ReportBadCasChunks(BadHashes);
+
+ ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes));
}
void