diff options
| author | Stefan Boberg <[email protected]> | 2021-10-04 15:27:45 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-10-04 15:27:45 +0200 |
| commit | 2356411f391ae65544ef1cd3810945f3ebf8e8dc (patch) | |
| tree | cc838ddc32d7e6198af7b2f6532c7d686040e216 | |
| parent | monitoring: added stubs for /stats and /status endpoints (diff) | |
| download | zen-2356411f391ae65544ef1cd3810945f3ebf8e8dc.tar.xz zen-2356411f391ae65544ef1cd3810945f3ebf8e8dc.zip | |
zenserver: Changed initialization flow
- HTTP server is now started earlier, so it can be queried while scrubbing/recovery runs
- Stats/status services are initialized before anything else, so they can be used to monitor progress while scrubbing happens
- Structured cache initialization is now in a separate function
- Scrubbing now emits some summary stats at the point of completion
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 194 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.h | 31 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 210 | ||||
| -rw-r--r-- | zenstore/cidstore.cpp | 2 | ||||
| -rw-r--r-- | zenstore/compactcas.cpp | 2 | ||||
| -rw-r--r-- | zenstore/filecas.cpp | 12 |
6 files changed, 296 insertions, 155 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index b97f0830f..83a21f87a 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -32,6 +32,8 @@ ZenCacheStore::ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir { ZEN_INFO("initializing structured cache at '{}'", RootDir); CreateDirectories(RootDir); + + m_DiskLayer.DiscoverBuckets(); } ZenCacheStore::~ZenCacheStore() @@ -149,6 +151,10 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa _.ReleaseNow(); + // There's a race here. Since the lock is released early to allow + // inserts, the bucket delete path could end up deleting the + // underlying data structure + return Bucket->Get(HashKey, OutValue); } @@ -210,11 +216,13 @@ ZenCacheMemoryLayer::GarbageCollect(GcContext& GcCtx) void ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) { + RwLock::SharedLockScope _(m_bucketLock); + std::vector<IoHash> BadHashes; for (auto& Kv : m_cacheMap) { - if (Kv.first != IoHash::HashBuffer(Kv.second)) + if (Kv.first != IoHash::HashBuffer(Kv.second.Payload)) { BadHashes.push_back(Kv.first); } @@ -222,10 +230,16 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) if (!BadHashes.empty()) { - Ctx.ReportBadChunks(BadHashes); + Ctx.ReportBadCasChunks(BadHashes); } } +void +ZenCacheMemoryLayer::CacheBucket::GarbageCollect(GcContext& GcCtx) +{ + ZEN_UNUSED(GcCtx); +} + bool ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { @@ -237,18 +251,26 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV } else { - OutValue.Value = bucketIt->second; + BucketValue& Value = bucketIt.value(); + OutValue.Value = Value.Payload; + Value.LastAccess = GetCurrentTimeStamp(); return true; } } +uint64_t +ZenCacheMemoryLayer::CacheBucket::GetCurrentTimeStamp() +{ + return GetLofreqTimerValue(); +} + void ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { RwLock::ExclusiveLockScope _(m_bucketLock); - m_cacheMap[HashKey] = Value.Value; + m_cacheMap.insert_or_assign(HashKey, BucketValue{.LastAccess = GetCurrentTimeStamp(), .Payload = Value.Value}); } ////////////////////////////////////////////////////////////////////////// @@ -258,11 +280,17 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue struct DiskLocation { - uint64_t OffsetAndFlags; - uint32_t Size; - uint32_t IndexDataSize; + inline DiskLocation() = default; + + inline DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags) + : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags)) + , LowerSize(ValueSize & 0xFFFFffff) + , IndexDataSize(IndexSize) + { + } - static const uint64_t kOffsetMask = 0x00FF'ffFF'ffFF'ffFFull; + static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull; + static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; static const uint64_t kStructured = 0x4000'0000'0000'0000ull; @@ -270,6 +298,7 @@ struct DiskLocation static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; } inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; } + inline uint64_t Size() const { return LowerSize; } inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; } inline ZenContentType GetContentType() const { @@ -282,6 +311,11 @@ struct DiskLocation return ContentType; } + +private: + uint64_t OffsetAndFlags = 0; + uint32_t LowerSize = 0; + uint32_t IndexDataSize = 0; }; struct DiskIndexEntry @@ -299,7 +333,7 @@ struct ZenCacheDiskLayer::CacheBucket CacheBucket(CasStore& Cas); ~CacheBucket(); - void OpenOrCreate(std::filesystem::path BucketDir); + void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); static bool Delete(std::filesystem::path BucketDir); bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); @@ -309,13 +343,13 @@ struct ZenCacheDiskLayer::CacheBucket void Scrub(ScrubContext& Ctx); void GarbageCollect(GcContext& GcCtx); - inline bool IsOk() const { return m_Ok; } + inline bool IsOk() const { return m_IsOk; } private: CasStore& m_CasStore; std::filesystem::path m_BucketDir; Oid m_BucketId; - bool m_Ok = false; + bool m_IsOk = false; uint64_t m_LargeObjectThreshold = 64 * 1024; // These files are used to manage storage of small objects for this bucket @@ -355,7 +389,7 @@ ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir) } void -ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) +ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate) { CreateDirectories(BucketDir); @@ -382,17 +416,23 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) { ManifestFile.Read(&m_BucketId, sizeof(Oid), 0); - m_Ok = true; + m_IsOk = true; } - if (!m_Ok) + if (!m_IsOk) { ManifestFile.Close(); } } - if (!m_Ok) + if (!m_IsOk) { + if (AllowCreate == false) + { + // Invalid bucket + return; + } + // No manifest file found, this is a new bucket ManifestFile.Open(ManifestPath, /* IsCreate */ true, Ec); @@ -424,13 +464,13 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) m_SlogFile.Replay([&](const DiskIndexEntry& Record) { m_Index[Record.Key] = Record.Location; - MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size); + MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size()); }); m_WriteCursor = (MaxFileOffset + 15) & ~15; } - m_Ok = true; + m_IsOk = true; } void @@ -453,7 +493,7 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, Zen { if (!Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size); + OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size()); OutValue.Value.SetContentType(Loc.GetContentType()); return true; @@ -482,7 +522,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const IoHash& HashKey, Z bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { - if (!m_Ok) + if (!m_IsOk) { return false; } @@ -509,7 +549,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal void ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { - if (!m_Ok) + if (!m_IsOk) { return; } @@ -531,10 +571,9 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& RwLock::ExclusiveLockScope _(m_IndexLock); - DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(m_WriteCursor, EntryFlags), - .Size = gsl::narrow<uint32_t>(Value.Value.Size())}; + DiskLocation Loc(m_WriteCursor, Value.Value.Size(), 0, EntryFlags); - m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size, 16); + m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size(), 16); if (auto it = m_Index.find(HashKey); it == m_Index.end()) { @@ -544,11 +583,13 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& else { // TODO: should check if write is idempotent and bail out if it is? + // this would requiring comparing contents on disk unless we add a + // content hash to the index entry it.value() = Loc; } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_SobsFile.Write(Value.Value.Data(), Loc.Size, Loc.Offset()); + m_SobsFile.Write(Value.Value.Data(), Loc.Size(), Loc.Offset()); } } @@ -572,55 +613,45 @@ ZenCacheDiskLayer::CacheBucket::Flush() void ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) { - std::vector<DiskIndexEntry> StandaloneFiles; - - std::vector<IoHash> BadChunks; - std::vector<IoBuffer> BadStandaloneChunks; + std::atomic<uint64_t> ScrubbedChunks{0}, ScrubbedBytes{0}; { RwLock::SharedLockScope _(m_IndexLock); for (auto& Kv : m_Index) { - const IoHash& Hash = Kv.first; - const DiskLocation& Loc = Kv.second; + const IoHash& HashKey = Kv.first; + const DiskLocation& Loc = Kv.second; ZenCacheValue Value; - if (!GetInlineCacheValue(Loc, Value)) + if (GetInlineCacheValue(Loc, Value)) { - ZEN_ASSERT(Loc.IsFlagSet(DiskLocation::kStandaloneFile)); - StandaloneFiles.push_back({.Key = Hash, .Location = Loc}); + // Validate contents } else { - if (GetStandaloneCacheValue(Hash, Value, Loc)) + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - // Hash contents - - const IoHash ComputedHash = HashBuffer(Value.Value); - - if (ComputedHash != Hash) + if (GetStandaloneCacheValue(HashKey, Value, Loc)) { - BadChunks.push_back(Hash); + // Note: we cannot currently validate contents since we don't + // have a content hash! + } + else + { + // Value not found } - } - else - { - // Non-existent } } } } - if (Ctx.RunRecovery()) - { - // Clean out bad chunks - } + Ctx.ReportScrubbed(ScrubbedChunks, ScrubbedBytes); - if (!BadChunks.empty()) + if (Ctx.RunRecovery()) { - Ctx.ReportBadChunks(BadChunks); + // Clean out bad data } } @@ -681,7 +712,7 @@ ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenC RwLock::ExclusiveLockScope _(m_IndexLock); - DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(0, EntryFlags), .Size = 0}; + DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags); if (auto it = m_Index.find(HashKey); it == m_Index.end()) { @@ -739,7 +770,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach std::filesystem::path BucketPath = m_RootDir; BucketPath /= std::string(InBucket); - Bucket->OpenOrCreate(BucketPath.c_str()); + Bucket->OpenOrCreate(BucketPath); } } @@ -782,7 +813,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z std::filesystem::path bucketPath = m_RootDir; bucketPath /= std::string(InBucket); - Bucket->OpenOrCreate(bucketPath.c_str()); + Bucket->OpenOrCreate(bucketPath); } } @@ -794,6 +825,63 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z } } +void +ZenCacheDiskLayer::DiscoverBuckets() +{ + FileSystemTraversal Traversal; + struct Visitor : public FileSystemTraversal::TreeVisitor + { + virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent, + [[maybe_unused]] const path_view& File, + [[maybe_unused]] uint64_t FileSize) override + { + } + + virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override + { + Dirs.push_back(std::wstring(DirectoryName)); + return false; + } + + std::vector<std::wstring> Dirs; + } Visit; + + Traversal.TraverseFileSystem(m_RootDir, Visit); + + // Initialize buckets + + RwLock::ExclusiveLockScope _(m_Lock); + + for (const std::wstring& BucketName : Visit.Dirs) + { + // New bucket needs to be created + + std::string BucketName8 = WideToUtf8(BucketName); + + if (auto It = m_Buckets.find(BucketName8); It != m_Buckets.end()) + { + } + else + { + auto InsertResult = m_Buckets.try_emplace(BucketName8, m_CasStore); + + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= BucketName8; + + CacheBucket& Bucket = InsertResult.first->second; + + Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false); + + if (!Bucket.IsOk()) + { + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName8, m_RootDir); + + m_Buckets.erase(InsertResult.first); + } + } + } +} + bool ZenCacheDiskLayer::DropBucket(std::string_view InBucket) { diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index 011f13323..4753af627 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -46,6 +46,11 @@ struct ZenCacheValue CbObject IndexData; }; +/** In-memory cache storage + + Intended for small values which are frequently accessed + + */ class ZenCacheMemoryLayer { public: @@ -58,19 +63,39 @@ public: void Scrub(ScrubContext& Ctx); void GarbageCollect(GcContext& GcCtx); + struct Configuration + { + uint64_t TargetFootprintBytes = 16 * 1024 * 1024; + uint64_t ScavengeThreshold = 4 * 1024 * 1024; + }; + + const Configuration& GetConfiguration() const { return m_Configuration; } + void SetConfiguration(const Configuration& NewConfig) { m_Configuration = NewConfig; } + private: struct CacheBucket { - RwLock m_bucketLock; - tsl::robin_map<IoHash, IoBuffer> m_cacheMap; + struct BucketValue + { + uint64_t LastAccess = 0; + IoBuffer Payload; + }; + + RwLock m_bucketLock; + tsl::robin_map<IoHash, BucketValue> m_cacheMap; bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); void Put(const IoHash& HashKey, const ZenCacheValue& Value); void Scrub(ScrubContext& Ctx); + void GarbageCollect(GcContext& GcCtx); + + private: + uint64_t GetCurrentTimeStamp(); }; RwLock m_Lock; std::unordered_map<std::string, CacheBucket> m_Buckets; + Configuration m_Configuration; }; class ZenCacheDiskLayer @@ -86,6 +111,8 @@ public: void Scrub(ScrubContext& Ctx); void GarbageCollect(GcContext& GcCtx); + void DiscoverBuckets(); + private: /** A cache bucket manages a single directory containing metadata and data for that bucket diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index f5d8364cb..0bec56593 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -32,7 +32,7 @@ #include <unordered_map> #if !defined(BUILD_VERSION) -#define BUILD_VERSION ("DEV-BUILD") +# define BUILD_VERSION ("DEV-BUILD") #endif ////////////////////////////////////////////////////////////////////////// @@ -87,6 +87,8 @@ #include "diag/diagsvcs.h" #include "experimental/frontend.h" #include "experimental/usnjournal.h" +#include "monitoring/httpstats.h" +#include "monitoring/httpstatus.h" #include "projectstore.h" #include "testing/httptest.h" #include "testing/launch.h" @@ -143,6 +145,14 @@ public: // Ok so now we're configured, let's kick things off + m_Http = zen::CreateHttpServer(); + m_Http->Initialize(BasePort); + m_Http->RegisterService(m_HealthService); + m_Http->RegisterService(m_StatsService); + m_Http->RegisterService(m_StatusService); + + // Initialize storage and services + ZEN_INFO("initializing storage"); zen::CasStoreConfiguration Config; @@ -170,95 +180,7 @@ public: if (ServiceConfig.StructuredCacheEnabled) { - using namespace std::literals; - auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; }; - - ZEN_INFO("instantiating structured cache service"); - m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache"); - - std::unique_ptr<zen::UpstreamCache> UpstreamCache; - if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) - { - const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig; - - zen::UpstreamCacheOptions UpstreamOptions; - UpstreamOptions.ReadUpstream = - (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; - UpstreamOptions.WriteUpstream = - (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; - - if (UpstreamConfig.UpstreamThreadCount < 32) - { - UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount); - } - - UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled; - - UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore); - - if (!UpstreamConfig.ZenConfig.Urls.empty()) - { - std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Urls); - UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); - } - - { - zen::CloudCacheClientOptions Options; - if (UpstreamConfig.JupiterConfig.UseProductionSettings) - { - Options = zen::CloudCacheClientOptions{ - .ServiceUrl = "https://jupiter.devtools.epicgames.com"sv, - .DdcNamespace = "ue.ddc"sv, - .BlobStoreNamespace = "ue.ddc"sv, - .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, - .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, - .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, - .UseLegacyDdc = false}; - } - else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings) - { - Options = zen::CloudCacheClientOptions{ - .ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv, - .DdcNamespace = "ue4.ddc"sv, - .BlobStoreNamespace = "test.ddc"sv, - .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, - .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, - .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, - .UseLegacyDdc = false}; - } - - Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl); - Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace); - Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace); - Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider); - Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId); - Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret); - Options.UseLegacyDdc |= UpstreamConfig.JupiterConfig.UseLegacyDdc; - - if (!Options.ServiceUrl.empty()) - { - std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options); - UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint)); - } - } - - if (UpstreamCache->Initialize()) - { - ZEN_INFO("upstream cache active ({})", - UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE" - : UpstreamOptions.ReadUpstream ? "READONLY" - : UpstreamOptions.WriteUpstream ? "WRITEONLY" - : "DISABLED"); - } - else - { - UpstreamCache.reset(); - ZEN_INFO("NOT using upstream cache"); - } - } - - m_StructuredCacheService.reset( - new zen::HttpStructuredCacheService(*m_CacheStore, *m_CasStore, *m_CidStore, std::move(UpstreamCache))); + InitializeStructuredCache(ServiceConfig); } else { @@ -276,13 +198,8 @@ public: } #endif - m_Http = zen::CreateHttpServer(); - m_Http->Initialize(BasePort); - m_Http->RegisterService(m_HealthService); - m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics m_Http->RegisterService(m_TestingService); - m_Http->RegisterService(m_AdminService); if (m_HttpProjectService) @@ -308,12 +225,15 @@ public: } m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot); + if (m_FrontendService) { m_Http->RegisterService(*m_FrontendService); } } + void InitializeStructuredCache(ZenServiceConfig& ServiceConfig); + #if ZEN_ENABLE_MESH void StartMesh(int BasePort) { @@ -428,6 +348,7 @@ public: void Scrub() { + Stopwatch Timer; ZEN_INFO("Storage validation STARTING"); ScrubContext Ctx; @@ -436,7 +357,13 @@ public: m_ProjectStore->Scrub(Ctx); m_StructuredCacheService->Scrub(Ctx); - ZEN_INFO("Storage validation DONE"); + const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); + + ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})", + NiceTimeSpanMs(ElapsedTimeMs), + NiceBytes(Ctx.ScrubbedBytes()), + Ctx.ScrubbedChunks(), + NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs)); } void Flush() @@ -466,6 +393,8 @@ private: zen::NamedMutex m_ServerMutex; zen::Ref<zen::HttpServer> m_Http; + zen::HttpStatusService m_StatusService; + zen::HttpStatsService m_StatsService; std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore()}; std::unique_ptr<zen::CidStore> m_CidStore; std::unique_ptr<zen::ZenCacheStore> m_CacheStore; @@ -488,6 +417,95 @@ private: bool m_DebugOptionForcedCrash = false; }; +void +ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig) +{ + using namespace std::literals; + auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; }; + + ZEN_INFO("instantiating structured cache service"); + m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache"); + + std::unique_ptr<zen::UpstreamCache> UpstreamCache; + if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) + { + const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig; + + zen::UpstreamCacheOptions UpstreamOptions; + UpstreamOptions.ReadUpstream = (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; + UpstreamOptions.WriteUpstream = (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; + + if (UpstreamConfig.UpstreamThreadCount < 32) + { + UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount); + } + + UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled; + + UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore); + + if (!UpstreamConfig.ZenConfig.Urls.empty()) + { + std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Urls); + UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); + } + + { + zen::CloudCacheClientOptions Options; + if (UpstreamConfig.JupiterConfig.UseProductionSettings) + { + Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools.epicgames.com"sv, + .DdcNamespace = "ue.ddc"sv, + .BlobStoreNamespace = "ue.ddc"sv, + .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, + .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, + .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, + .UseLegacyDdc = false}; + } + else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings) + { + Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv, + .DdcNamespace = "ue4.ddc"sv, + .BlobStoreNamespace = "test.ddc"sv, + .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, + .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, + .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, + .UseLegacyDdc = false}; + } + + Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl); + Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace); + Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace); + Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider); + Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId); + Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret); + Options.UseLegacyDdc |= UpstreamConfig.JupiterConfig.UseLegacyDdc; + + if (!Options.ServiceUrl.empty()) + { + std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options); + UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint)); + } + } + + if (UpstreamCache->Initialize()) + { + ZEN_INFO("upstream cache active ({})", + UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE" + : UpstreamOptions.ReadUpstream ? "READONLY" + : UpstreamOptions.WriteUpstream ? "WRITEONLY" + : "DISABLED"); + } + else + { + UpstreamCache.reset(); + ZEN_INFO("NOT using upstream cache"); + } + } + + m_StructuredCacheService.reset(new zen::HttpStructuredCacheService(*m_CacheStore, *m_CasStore, *m_CidStore, std::move(UpstreamCache))); +} + } // namespace zen class ZenWindowsService : public WindowsService diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp index df5c32d25..7a5d7bcf4 100644 --- a/zenstore/cidstore.cpp +++ b/zenstore/cidstore.cpp @@ -204,7 +204,7 @@ struct CidStore::Impl // TODO: Should compute a snapshot index here - Ctx.ReportBadChunks(BadChunks); + Ctx.ReportBadCasChunks(BadChunks); } uint64_t m_LastScrubTime = 0; diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 5fc3ac356..612f87c7c 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -254,7 +254,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do - Ctx.ReportBadChunks(BadChunkHashes); + Ctx.ReportBadCasChunks(BadChunkHashes); } void diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index 0b18848d5..ee641b80a 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -394,7 +394,8 @@ FileCasStrategy::Flush() void FileCasStrategy::Scrub(ScrubContext& Ctx) { - std::vector<IoHash> BadHashes; + std::vector<IoHash> BadHashes; + std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0}; IterateChunks([&](const IoHash& Hash, BasicFile& Payload) { IoHashStream Hasher; @@ -405,8 +406,13 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) { BadHashes.push_back(Hash); } + + ++ChunkCount; + ChunkBytes.fetch_add(Payload.FileSize()); }); + Ctx.ReportScrubbed(ChunkCount, ChunkBytes); + if (!BadHashes.empty()) { ZEN_ERROR("file CAS scrubbing: {} bad chunks found", BadHashes.size()); @@ -428,7 +434,9 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) } } - Ctx.ReportBadChunks(BadHashes); + Ctx.ReportBadCasChunks(BadHashes); + + ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes)); } void |