diff options
| author | Stefan Boberg <[email protected]> | 2021-09-27 12:34:52 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-09-27 12:34:52 +0200 |
| commit | f0036eada7f6bcf6e08afe3ea8517367ed73450e (patch) | |
| tree | b1ce3466bba36175cad369028fad1b410a34b5ec /zenserver | |
| parent | Fixed httpsys Windows compilation error (diff) | |
| parent | GetWindowsErrorAsString() -> GetSystemErrorAsString() (diff) | |
| download | zen-f0036eada7f6bcf6e08afe3ea8517367ed73450e.tar.xz zen-f0036eada7f6bcf6e08afe3ea8517367ed73450e.zip | |
Merged latest from main
Diffstat (limited to 'zenserver')
| -rw-r--r-- | zenserver/cache/cachestore.cpp | 252 | ||||
| -rw-r--r-- | zenserver/cache/cachestore.h | 84 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 271 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.h | 1 | ||||
| -rw-r--r-- | zenserver/config.cpp | 14 | ||||
| -rw-r--r-- | zenserver/config.h | 2 | ||||
| -rw-r--r-- | zenserver/diag/logging.cpp | 12 | ||||
| -rw-r--r-- | zenserver/testing/httptest.cpp | 11 | ||||
| -rw-r--r-- | zenserver/testing/httptest.h | 5 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 208 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 54 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 207 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 15 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 4 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 2 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 15 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj | 2 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj.filters | 6 |
18 files changed, 525 insertions, 640 deletions
diff --git a/zenserver/cache/cachestore.cpp b/zenserver/cache/cachestore.cpp deleted file mode 100644 index 2fc253a07..000000000 --- a/zenserver/cache/cachestore.cpp +++ /dev/null @@ -1,252 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "cachestore.h" - -#include <zencore/crc32.h> -#include <zencore/except.h> -#include <zencore/logging.h> -#include <zencore/windows.h> - -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/iobuffer.h> -#include <zencore/string.h> -#include <zencore/thread.h> -#include <zenstore/basicfile.h> -#include <zenstore/cas.h> -#include <zenstore/caslog.h> - -#include <fmt/core.h> -#include <concepts> -#include <filesystem> -#include <gsl/gsl-lite.hpp> -#include <unordered_map> - -#include <atlfile.h> - -using namespace zen; -using namespace fmt::literals; - -namespace UE { - -struct CorruptionTrailer -{ - enum - { - /** Arbitrary number used to identify corruption **/ - MagicConstant = 0x1e873d89 - }; - - uint32_t Magic = MagicConstant; - uint32_t Version = 1; - uint32_t CRCofPayload = 0; - uint32_t SizeOfPayload = 0; - - void Initialize(const void* Data, size_t Size) - { - CRCofPayload = zen::MemCrc32_Deprecated(Data, Size); - SizeOfPayload = (uint32_t)Size; - } -}; - -std::filesystem::path -GenerateDdcPath(std::string_view Key, std::filesystem::path& rootDir) -{ - std::filesystem::path FilePath = rootDir; - - std::string k8{Key}; - for (auto& c : k8) - c = (char)toupper(c); - - const uint32_t Hash = zen::StrCrc_Deprecated(k8.c_str()); - - std::wstring DirName; - - DirName = u'0' + ((Hash / 100) % 10); - FilePath /= DirName; - DirName = u'0' + ((Hash / 10) % 10); - FilePath /= DirName; - DirName = u'0' + (Hash % 10); - FilePath /= DirName; - - FilePath /= Key; - - auto NativePath = FilePath.native(); - NativePath.append(L".udd"); - - return NativePath; -} - -} // namespace UE - -////////////////////////////////////////////////////////////////////////// - -FileCacheStore::FileCacheStore(const char* RootDir, const char* ReadRootDir) -{ - // Ensure root directory exists - create if it doesn't exist already - - ZEN_INFO("Initializing FileCacheStore at '{}'", std::string_view(RootDir)); - - m_RootDir = RootDir; - - std::error_code ErrorCode; - - std::filesystem::create_directories(m_RootDir, ErrorCode); - - if (ErrorCode) - { - ExtendableStringBuilder<256> Name; - WideToUtf8(m_RootDir.c_str(), Name); - - ZEN_ERROR("Could not open file cache directory '{}' for writing ({})", Name.c_str(), ErrorCode.message()); - - m_IsOk = false; - } - - if (ReadRootDir) - { - m_ReadRootDir = ReadRootDir; - - if (std::filesystem::exists(m_ReadRootDir, ErrorCode)) - { - ZEN_INFO("FileCacheStore will use additional read tree at '{}'", std::string_view(ReadRootDir)); - - m_ReadRootIsValid = true; - } - } -} - -FileCacheStore::~FileCacheStore() -{ -} - -bool -FileCacheStore::Get(std::string_view Key, CacheValue& OutValue) -{ - CAtlFile File; - - std::filesystem::path NativePath; - - HRESULT hRes = E_FAIL; - - if (m_ReadRootDir.empty() == false) - { - NativePath = UE::GenerateDdcPath(Key, m_ReadRootDir); - - hRes = File.Create(NativePath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); - } - - if (FAILED(hRes)) - { - NativePath = UE::GenerateDdcPath(Key, m_RootDir); - - hRes = File.Create(NativePath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); - } - - if (FAILED(hRes)) - { - ZEN_DEBUG("GET MISS {}", Key); - - return false; - } - - ULONGLONG FileSize; - File.GetSize(FileSize); - - if (FileSize <= 16) - { - return false; - } - - FileSize -= 16; // CorruptionWrapper trailer - - OutValue.Value = IoBuffer(IoBuffer::File, File.Detach(), 0, FileSize); - - ZEN_DEBUG("GET HIT {}", Key); - - return true; -} - -void -FileCacheStore::Put(std::string_view Key, const CacheValue& Value) -{ - const void* Data = Value.Value.Data(); - size_t Size = Value.Value.Size(); - - UE::CorruptionTrailer Trailer; - Trailer.Initialize(Data, Size); - - std::filesystem::path NativePath = UE::GenerateDdcPath(Key, m_RootDir); - - CAtlTemporaryFile File; - - ZEN_DEBUG("PUT {}", Key); - - HRESULT hRes = File.Create(m_RootDir.c_str()); - - if (SUCCEEDED(hRes)) - { - const uint8_t* WritePointer = reinterpret_cast<const uint8_t*>(Data); - - while (Size) - { - const int MaxChunkSize = 16 * 1024 * 1024; - const int ChunkSize = (int)((Size > MaxChunkSize) ? MaxChunkSize : Size); - - DWORD BytesWritten = 0; - File.Write(WritePointer, ChunkSize, &BytesWritten); - - Size -= BytesWritten; - WritePointer += BytesWritten; - } - - File.Write(&Trailer, sizeof Trailer); - hRes = File.Close(NativePath.c_str()); // This renames the file to its final name - - if (FAILED(hRes)) - { - ZEN_WARN("Failed to rename temp file for key '{}' - deleting temporary file", Key); - - if (!DeleteFile(File.TempFileName())) - { - ZEN_WARN("Temp file for key '{}' could not be deleted - no value persisted", Key); - } - } - } -} - -////////////////////////////////////////////////////////////////////////// - -MemoryCacheStore::MemoryCacheStore() -{ -} - -MemoryCacheStore::~MemoryCacheStore() -{ -} - -bool -MemoryCacheStore::Get(std::string_view InKey, CacheValue& OutValue) -{ - RwLock::SharedLockScope _(m_Lock); - - auto it = m_CacheMap.find(std::string(InKey)); - - if (it == m_CacheMap.end()) - { - return false; - } - else - { - OutValue.Value = it->second; - - return true; - } -} - -void -MemoryCacheStore::Put(std::string_view Key, const CacheValue& Value) -{ - RwLock::ExclusiveLockScope _(m_Lock); - m_CacheMap[std::string(Key)] = Value.Value; -} diff --git a/zenserver/cache/cachestore.h b/zenserver/cache/cachestore.h deleted file mode 100644 index 89c6396b8..000000000 --- a/zenserver/cache/cachestore.h +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/IoBuffer.h> -#include <zencore/iohash.h> -#include <zencore/thread.h> -#include <zencore/uid.h> -#include <zenstore/cas.h> -#include <compare> -#include <filesystem> -#include <unordered_map> - -namespace zen { - -class WideStringBuilderBase; -class CasStore; - -} // namespace zen - -struct CacheValue -{ - zen::IoBuffer Value; -}; - -/****************************************************************************** - - /$$ /$$/$$ /$$ /$$$$$$ /$$ - | $$ /$$| $$ | $$ /$$__ $$ | $$ - | $$ /$$/| $$ | $$ | $$ \__/ /$$$$$$ /$$$$$$| $$$$$$$ /$$$$$$ - | $$$$$/ | $$ / $$/ | $$ |____ $$/$$_____| $$__ $$/$$__ $$ - | $$ $$ \ $$ $$/ | $$ /$$$$$$| $$ | $$ \ $| $$$$$$$$ - | $$\ $$ \ $$$/ | $$ $$/$$__ $| $$ | $$ | $| $$_____/ - | $$ \ $$ \ $/ | $$$$$$| $$$$$$| $$$$$$| $$ | $| $$$$$$$ - |__/ \__/ \_/ \______/ \_______/\_______|__/ |__/\_______/ - - Basic Key-Value cache. No restrictions on keys, and values are always opaque - binary blobs. - -******************************************************************************/ - -class CacheStore -{ -public: - virtual bool Get(std::string_view Key, CacheValue& OutValue) = 0; - virtual void Put(std::string_view Key, const CacheValue& Value) = 0; -}; - -/** File system based implementation - - Emulates the behaviour of UE4 with regards to file system structure, - and also adds a file corruption trailer to remain compatible with - the file-system based implementation (this should be made configurable) - - */ -class FileCacheStore : public CacheStore -{ -public: - FileCacheStore(const char* RootDir, const char* ReadRootDir = nullptr); - ~FileCacheStore(); - - virtual bool Get(std::string_view Key, CacheValue& OutValue) override; - virtual void Put(std::string_view Key, const CacheValue& Value) override; - -private: - std::filesystem::path m_RootDir; - std::filesystem::path m_ReadRootDir; - bool m_IsOk = true; - bool m_ReadRootIsValid = false; -}; - -class MemoryCacheStore : public CacheStore -{ -public: - MemoryCacheStore(); - ~MemoryCacheStore(); - - virtual bool Get(std::string_view Key, CacheValue& OutValue) override; - virtual void Put(std::string_view Key, const CacheValue& Value) override; - -private: - zen::RwLock m_Lock; - std::unordered_map<std::string, zen::IoBuffer> m_CacheMap; -}; diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 3d80bb14c..5e93ebaa9 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -22,8 +22,6 @@ #include <gsl/gsl-lite.hpp> #include <unordered_map> -#include <atlfile.h> - ////////////////////////////////////////////////////////////////////////// namespace zen { @@ -131,23 +129,18 @@ ZenCacheMemoryLayer::~ZenCacheMemoryLayer() bool ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { - CacheBucket* Bucket = nullptr; - - { - RwLock::SharedLockScope _(m_Lock); + RwLock::SharedLockScope _(m_Lock); - auto it = m_Buckets.find(std::string(InBucket)); + auto it = m_Buckets.find(std::string(InBucket)); - if (it != m_Buckets.end()) - { - Bucket = &it->second; - } + if (it == m_Buckets.end()) + { + return false; } - if (Bucket == nullptr) - return false; + CacheBucket* Bucket = Bucket = &it->second; - ZEN_ASSERT(Bucket != nullptr); + _.ReleaseNow(); return Bucket->Get(HashKey, OutValue); } @@ -177,8 +170,6 @@ ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Bucket = &m_Buckets[std::string(InBucket)]; } - ZEN_ASSERT(Bucket != nullptr); - // Note that since the underlying IoBuffer is retained, the content type is also Bucket->Put(HashKey, Value); @@ -195,7 +186,31 @@ ZenCacheMemoryLayer::DropBucket(std::string_view Bucket) void ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx) { - ZEN_UNUSED(Ctx); + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + Kv.second.Scrub(Ctx); + } +} + +void +ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) +{ + std::vector<IoHash> BadHashes; + + for (auto& Kv : m_cacheMap) + { + if (Kv.first != IoHash::HashBuffer(Kv.second)) + { + BadHashes.push_back(Kv.first); + } + } + + if (!BadHashes.empty()) + { + Ctx.ReportBadChunks(BadHashes); + } } bool @@ -203,16 +218,16 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV { RwLock::SharedLockScope _(m_bucketLock); - auto bucketIt = m_cacheMap.find(HashKey); - - if (bucketIt == m_cacheMap.end()) + if (auto bucketIt = m_cacheMap.find(HashKey); bucketIt == m_cacheMap.end()) { return false; } + else + { + OutValue.Value = bucketIt->second; - OutValue.Value = bucketIt->second; - - return true; + return true; + } } void @@ -241,8 +256,19 @@ struct DiskLocation static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; } - inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; } - inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; } + inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; } + inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; } + inline ZenContentType GetContentType() const + { + ZenContentType ContentType = ZenContentType::kBinary; + + if (IsFlagSet(DiskLocation::kStructured)) + { + ContentType = ZenContentType::kCbObject; + } + + return ContentType; + } }; struct DiskIndexEntry @@ -267,6 +293,7 @@ struct ZenCacheDiskLayer::CacheBucket void Put(const IoHash& HashKey, const ZenCacheValue& Value); void Drop(); void Flush(); + void Scrub(ScrubContext& Ctx); inline bool IsOk() const { return m_Ok; } @@ -277,15 +304,19 @@ private: bool m_Ok = false; uint64_t m_LargeObjectThreshold = 64 * 1024; + // These files are used to manage storage of small objects for this bucket + BasicFile m_SobsFile; TCasLogFile<DiskIndexEntry> m_SlogFile; - void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey); - void PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value); - RwLock m_IndexLock; tsl::robin_map<IoHash, DiskLocation, IoHash::Hasher> m_Index; uint64_t m_WriteCursor = 0; + + void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey); + void PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value); + bool GetStandaloneCacheValue(const IoHash& HashKey, ZenCacheValue& OutValue, const DiskLocation& Loc); + bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue); }; ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas) @@ -320,27 +351,24 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) std::filesystem::path SobsPath{m_BucketDir / "zen.sobs"}; std::filesystem::path SlogPath{m_BucketDir / "zen.slog"}; - CAtlFile ManifestFile; + BasicFile ManifestFile; // Try opening existing manifest file first bool IsNew = false; - HRESULT hRes = ManifestFile.Create(ManifestPath.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, OPEN_EXISTING); + std::error_code Ec; + ManifestFile.Open(ManifestPath, /* IsCreate */ false, Ec); - if (SUCCEEDED(hRes)) + if (!Ec) { - ULONGLONG FileSize; - ManifestFile.GetSize(FileSize); + uint64_t FileSize = ManifestFile.FileSize(); if (FileSize == sizeof(Oid)) { - hRes = ManifestFile.Read(&m_BucketId, sizeof(Oid)); + ManifestFile.Read(&m_BucketId, sizeof(Oid), 0); - if (SUCCEEDED(hRes)) - { - m_Ok = true; - } + m_Ok = true; } if (!m_Ok) @@ -353,16 +381,16 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) { // No manifest file found, this is a new bucket - hRes = ManifestFile.Create(ManifestPath.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, CREATE_ALWAYS); + ManifestFile.Open(ManifestPath, /* IsCreate */ true, Ec); - if (FAILED(hRes)) + if (Ec) { - ThrowLastError("Failed to create bucket manifest '{}'"_format(ManifestPath)); + throw std::system_error(Ec, "Failed to create bucket manifest '{}'"_format(ManifestPath)); } m_BucketId.Generate(); - hRes = ManifestFile.Write(&m_BucketId, sizeof(Oid)); + ManifestFile.Write(&m_BucketId, sizeof(Oid), /* FileOffset */ 0); IsNew = true; } @@ -407,6 +435,37 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(WideStringBuilderBase& Path, const IoH } bool +ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue) +{ + if (!Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size); + OutValue.Value.SetContentType(Loc.GetContentType()); + + return true; + } + + return false; +} + +bool +ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const IoHash& HashKey, ZenCacheValue& OutValue, const DiskLocation& Loc) +{ + WideStringBuilder<128> DataFilePath; + BuildPath(DataFilePath, HashKey); + + if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str())) + { + OutValue.Value = Data; + OutValue.Value.SetContentType(Loc.GetContentType()); + + return true; + } + + return false; +} + +bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { if (!m_Ok) @@ -420,35 +479,14 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal { const DiskLocation& Loc = it->second; - ZenContentType ContentType = ZenContentType::kBinary; - - if (Loc.IsFlagSet(DiskLocation::kStructured)) - { - ContentType = ZenContentType::kCbObject; - } - - if (!Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + if (GetInlineCacheValue(Loc, OutValue)) { - OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size); - OutValue.Value.SetContentType(ContentType); - return true; } - else - { - _.ReleaseNow(); - - WideStringBuilder<128> DataFilePath; - BuildPath(DataFilePath, HashKey); - if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str())) - { - OutValue.Value = Data; - OutValue.Value.SetContentType(ContentType); + _.ReleaseNow(); - return true; - } - } + return GetStandaloneCacheValue(HashKey, OutValue, Loc); } return false; @@ -518,9 +556,58 @@ ZenCacheDiskLayer::CacheBucket::Flush() } void -ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) +ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) { - ZEN_UNUSED(Ctx); + std::vector<DiskIndexEntry> StandaloneFiles; + + std::vector<IoHash> BadChunks; + std::vector<IoBuffer> BadStandaloneChunks; + + { + RwLock::SharedLockScope _(m_IndexLock); + + for (auto& Kv : m_Index) + { + const IoHash& Hash = Kv.first; + const DiskLocation& Loc = Kv.second; + + ZenCacheValue Value; + + if (!GetInlineCacheValue(Loc, Value)) + { + ZEN_ASSERT(Loc.IsFlagSet(DiskLocation::kStandaloneFile)); + StandaloneFiles.push_back({.Key = Hash, .Location = Loc}); + } + else + { + if (GetStandaloneCacheValue(Hash, Value, Loc)) + { + // Hash contents + + const IoHash ComputedHash = HashBuffer(Value.Value); + + if (ComputedHash != Hash) + { + BadChunks.push_back(Hash); + } + } + else + { + // Non-existent + } + } + } + } + + if (Ctx.RunRecovery()) + { + // Clean out bad chunks + } + + if (!BadChunks.empty()) + { + Ctx.ReportBadChunks(BadChunks); + } } void @@ -529,35 +616,38 @@ ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenC WideStringBuilder<128> DataFilePath; BuildPath(DataFilePath, HashKey); - // TODO: replace this process with a more efficient implementation with proper atomic rename - // and also avoid creating directories if we can - - std::filesystem::path ParentPath = std::filesystem::path(DataFilePath.c_str()).parent_path(); - CreateDirectories(ParentPath); + TemporaryFile DataFile; - CAtlTemporaryFile DataFile; + std::error_code Ec; + DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); - HRESULT hRes = DataFile.Create(m_BucketDir.c_str()); - - if (FAILED(hRes)) + if (Ec) { - ThrowSystemException(hRes, "Failed to open temporary file for put at '{}'"_format(m_BucketDir)); + throw std::system_error(Ec, "Failed to open temporary file for put at '{}'"_format(m_BucketDir)); } - hRes = DataFile.Write(Value.Value.Data(), gsl::narrow<DWORD>(Value.Value.Size())); + DataFile.WriteAll(Value.Value, Ec); - if (FAILED(hRes)) + if (Ec) { - ThrowSystemException(hRes, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size()))); + throw std::system_error(Ec, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size()))); } - // Move file into place (note: not fully atomic!) + // Move file into place (atomically) - hRes = DataFile.Close(DataFilePath.c_str()); + DataFile.MoveTemporaryIntoPlace(DataFilePath.c_str(), Ec); - if (FAILED(hRes)) + if (Ec) { - ThrowSystemException(hRes, "Failed to finalize file '{}'"_format(WideToUtf8(DataFilePath))); + std::filesystem::path ParentPath = std::filesystem::path(DataFilePath.c_str()).parent_path(); + CreateDirectories(ParentPath); + + DataFile.MoveTemporaryIntoPlace(DataFilePath.c_str(), Ec); + + if (Ec) + { + throw std::system_error(Ec, "Failed to finalize file '{}'"_format(WideToUtf8(DataFilePath))); + } } // Update index @@ -729,6 +819,17 @@ ZenCacheDiskLayer::Flush() } } +void +ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) +{ + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + Kv.second.Scrub(Ctx); + } +} + ////////////////////////////////////////////////////////////////////////// ZenCacheTracker::ZenCacheTracker(ZenCacheStore& CacheStore) diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index 2cc3abb53..f96757409 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -65,6 +65,7 @@ private: bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); void Put(const IoHash& HashKey, const ZenCacheValue& Value); + void Scrub(ScrubContext& Ctx); }; RwLock m_Lock; diff --git a/zenserver/config.cpp b/zenserver/config.cpp index c21638258..91fb80747 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -188,6 +188,13 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z options.add_option("cache", "", + "upstream-jupiter-prod", + "Enable Jupiter upstream caching using production settings", + cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.JupiterConfig.UseProductionSettings)->default_value("false"), + ""); + + options.add_option("cache", + "", "upstream-jupiter-dev", "Enable Jupiter upstream caching using development settings", cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.JupiterConfig.UseDevelopmentSettings)->default_value("false"), @@ -214,6 +221,13 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z cxxopts::value<int>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"), ""); + options.add_option("cache", + "", + "upstream-stats", + "Collect performance metrics for upstream endpoints", + cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("false"), + ""); + try { auto result = options.parse(argc, argv); diff --git a/zenserver/config.h b/zenserver/config.h index 6ade1b401..ce059bdb2 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -28,6 +28,7 @@ struct ZenUpstreamJupiterConfig std::string Namespace; std::string DdcNamespace; bool UseDevelopmentSettings = false; + bool UseProductionSettings = false; bool UseLegacyDdc = false; }; @@ -50,6 +51,7 @@ struct ZenUpstreamCacheConfig ZenUpstreamZenConfig ZenConfig; int UpstreamThreadCount = 4; UpstreamCachePolicy CachePolicy = UpstreamCachePolicy::ReadWrite; + bool StatsEnabled = false; }; struct ZenServiceConfig diff --git a/zenserver/diag/logging.cpp b/zenserver/diag/logging.cpp index 41b140f90..bc7b883b5 100644 --- a/zenserver/diag/logging.cpp +++ b/zenserver/diag/logging.cpp @@ -258,6 +258,18 @@ InitializeLogging(const ZenServerOptions& GlobalOptions) } #endif + // HTTP server request logging + + std::filesystem::path HttpLogPath = GlobalOptions.DataDir / "logs/http.log"; + + auto HttpSink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(zen::WideToUtf8(HttpLogPath.c_str()), + /* max size */ 128 * 1024 * 1024, + /* max files */ 16, + /* rotate on open */ true); + + auto HttpLogger = std::make_shared<spdlog::logger>("http_requests", HttpSink); + spdlog::register_logger(HttpLogger); + // Jupiter - only log HTTP traffic to file auto JupiterLogger = std::make_shared<spdlog::logger>("jupiter", FileSink); diff --git a/zenserver/testing/httptest.cpp b/zenserver/testing/httptest.cpp index c4fd6003c..18d63a6ef 100644 --- a/zenserver/testing/httptest.cpp +++ b/zenserver/testing/httptest.cpp @@ -2,6 +2,7 @@ #include "httptest.h" +#include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> namespace zen { @@ -14,6 +15,16 @@ HttpTestingService::HttpTestingService() HttpVerb::kGet); m_Router.RegisterRoute( + "json", + [this](HttpRouterRequest& Req) { + CbObjectWriter Obj; + Obj.AddBool("ok", true); + Obj.AddInteger("counter", ++m_Counter); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( "echo", [this](HttpRouterRequest& Req) { IoBuffer Body = Req.ServerRequest().ReadPayload(); diff --git a/zenserver/testing/httptest.h b/zenserver/testing/httptest.h index b445fb450..f55780d05 100644 --- a/zenserver/testing/httptest.h +++ b/zenserver/testing/httptest.h @@ -5,6 +5,8 @@ #include <zencore/logging.h> #include <zenhttp/httpserver.h> +#include <atomic> + namespace zen { /** @@ -37,7 +39,8 @@ public: }; private: - HttpRequestRouter m_Router; + HttpRequestRouter m_Router; + std::atomic<uint32_t> m_Counter{0}; RwLock m_RwLock; std::unordered_map<uint32_t, Ref<PackageHandler>> m_HandlerMap; diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 14da8cbcc..6eaa6423b 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -40,22 +40,32 @@ namespace detail { CloudCacheSessionState(CloudCacheClient& Client) : OwnerClient(Client) {} ~CloudCacheSessionState() {} - void Reset() + const CloudCacheAccessToken& GetAccessToken() { - std::string Auth; - OwnerClient.AcquireAccessToken(Auth); + if (!AccessToken.IsValid()) + { + AccessToken = OwnerClient.AcquireAccessToken(); + } + return AccessToken; + } + void InvalidateAccessToken() { AccessToken = {}; } + + void Reset() + { Session.SetBody({}); - Session.SetOption(cpr::Header{{"Authorization", Auth}}); + Session.SetHeader({}); + AccessToken = GetAccessToken(); } - CloudCacheClient& OwnerClient; - cpr::Session Session; + CloudCacheClient& OwnerClient; + CloudCacheAccessToken AccessToken; + cpr::Session Session; }; } // namespace detail -CloudCacheSession::CloudCacheSession(CloudCacheClient* OuterClient) : m_Log(OuterClient->Logger()), m_CacheClient(OuterClient) +CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient) { m_SessionState = m_CacheClient->AllocSessionState(); } @@ -68,16 +78,18 @@ CloudCacheSession::~CloudCacheSession() CloudCacheResult CloudCacheSession::Authenticate() { - std::string Auth; - const bool Success = m_CacheClient->AcquireAccessToken(Auth); - return {.Success = Success}; + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + return {.Success = AccessToken.IsValid()}; } CloudCacheResult CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Key) { - std::string Auth; - m_CacheClient->AcquireAccessToken(Auth); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key << ".raw"; @@ -85,7 +97,7 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", Auth}}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -94,6 +106,10 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -110,9 +126,13 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, const IoHash& Key) CloudCacheResult CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenContentType RefType) { + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; - std::string Auth; - m_CacheClient->AcquireAccessToken(Auth); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" @@ -121,7 +141,7 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", ContentType}}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -130,6 +150,10 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -140,8 +164,11 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte CloudCacheResult CloudCacheSession::GetCompressedBlob(const IoHash& Key) { - std::string Auth; - m_CacheClient->AcquireAccessToken(Auth); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); @@ -149,7 +176,7 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", "application/x-ue-comp"}}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -168,10 +195,13 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) CloudCacheResult CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData) { - IoHash Hash = IoHash::HashBuffer(DerivedData.Data(), DerivedData.Size()); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } - std::string Auth; - m_CacheClient->AcquireAccessToken(Auth); + IoHash Hash = IoHash::HashBuffer(DerivedData.Data(), DerivedData.Size()); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key; @@ -179,8 +209,9 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke auto& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption( - cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/octet-stream"}}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, + {"X-Jupiter-IoHash", Hash.ToHexString()}, + {"Content-Type", "application/octet-stream"}}); Session.SetBody(cpr::Body{(const char*)DerivedData.Data(), DerivedData.Size()}); cpr::Response Response = Session.Put(); @@ -190,6 +221,10 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, @@ -205,11 +240,15 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key, CloudCacheResult CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) { + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; - std::string Auth; - m_CacheClient->AcquireAccessToken(Auth); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" @@ -218,7 +257,8 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", ContentType}}); + Session.SetOption( + cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", ContentType}}); Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()}); cpr::Response Response = Session.Put(); @@ -228,6 +268,10 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, @@ -237,8 +281,11 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer CloudCacheResult CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) { - std::string Auth; - m_CacheClient->AcquireAccessToken(Auth); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); @@ -246,7 +293,7 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Content-Type", "application/x-ue-comp"}}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}}); Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); cpr::Response Response = Session.Put(); @@ -256,6 +303,10 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, @@ -274,22 +325,21 @@ CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>& return {}; } -////////////////////////////////////////////////////////////////////////// - -std::string -CloudCacheAccessToken::GetAuthorizationHeaderValue() +const CloudCacheAccessToken& +CloudCacheSession::GetAccessToken() { - RwLock::SharedLockScope _(m_Lock); - - return "Bearer {}"_format(m_Token); + return m_SessionState->GetAccessToken(); } -inline void -CloudCacheAccessToken::SetToken(std::string_view Token) +bool +CloudCacheSession::VerifyAccessToken(long StatusCode) { - RwLock::ExclusiveLockScope _(m_Lock); - m_Token = Token; - ++m_Serial; + if (StatusCode == 401) + { + m_SessionState->InvalidateAccessToken(); + return false; + } + return true; } ////////////////////////////////////////////////////////////////////////// @@ -354,60 +404,33 @@ CloudCacheClient::~CloudCacheClient() } } -bool -CloudCacheClient::AcquireAccessToken(std::string& AuthorizationHeaderValue) +CloudCacheAccessToken +CloudCacheClient::AcquireAccessToken() { - // TODO: check for expiration - - if (!m_IsValid) - { - ExtendableStringBuilder<128> OAuthFormData; - OAuthFormData << "client_id=" << m_OAuthClientId - << "&scope=cache_access&grant_type=client_credentials&client_secret=" << m_OAuthSecret; - - const uint32_t CurrentSerial = m_AccessToken.GetSerial(); + using namespace std::chrono; - static RwLock AuthMutex; - RwLock::ExclusiveLockScope _(AuthMutex); - - // Protect against redundant authentication operations - if (m_AccessToken.GetSerial() != CurrentSerial) - { - // TODO: this could verify that the token is actually valid and retry if not? - - return true; - } + ExtendableStringBuilder<128> OAuthFormData; + OAuthFormData << "client_id=" << m_OAuthClientId << "&scope=cache_access&grant_type=client_credentials&client_secret=" << m_OAuthSecret; - std::string data{OAuthFormData}; + std::string Body{OAuthFormData}; - cpr::Response Response = - cpr::Post(cpr::Url{m_OAuthFullUri}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{data}); + cpr::Response Response = + cpr::Post(cpr::Url{m_OAuthFullUri}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{Body}); - std::string Body{std::move(Response.text)}; + Body = std::move(Response.text); - // Parse JSON response - - std::string JsonError; - json11::Json JsonResponse = json11::Json::parse(Body, /* out */ JsonError); - if (!JsonError.empty()) - { - ZEN_WARN("failed to parse OAuth response: '{}'", JsonError); - - return false; - } - - std::string AccessToken = JsonResponse["access_token"].string_value(); - int ExpiryTimeSeconds = JsonResponse["expires_in"].int_value(); - ZEN_UNUSED(ExpiryTimeSeconds); - - m_AccessToken.SetToken(AccessToken); - - m_IsValid = true; + std::string JsonError; + json11::Json JsonResponse = json11::Json::parse(Body, JsonError); + if (!JsonError.empty()) + { + return {}; } - AuthorizationHeaderValue = m_AccessToken.GetAuthorizationHeaderValue(); + std::string AccessToken = std::string("Bearer ") + JsonResponse["access_token"].string_value(); + int64_t ExpiresInSeconds = static_cast<int64_t>(JsonResponse["expires_in"].int_value()); + steady_clock::time_point ExpireTime = steady_clock::now() + seconds(ExpiresInSeconds); - return true; + return {std::move(AccessToken), ExpireTime}; } detail::CloudCacheSessionState* @@ -434,8 +457,19 @@ CloudCacheClient::AllocSessionState() void CloudCacheClient::FreeSessionState(detail::CloudCacheSessionState* State) { + const bool IsTokenValid = State->AccessToken.IsValid(); + RwLock::ExclusiveLockScope _(m_SessionStateLock); m_SessionStateCache.push_front(State); + + // Invalidate all cached access tokens if any one fails + if (!IsTokenValid) + { + for (auto& CachedState : m_SessionStateCache) + { + CachedState->AccessToken = {}; + } + } } } // namespace zen diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 94e7e7680..868a7b099 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -8,6 +8,7 @@ #include <zenhttp/httpserver.h> #include <atomic> +#include <chrono> #include <list> #include <memory> #include <vector> @@ -29,15 +30,17 @@ class CbObjectView; */ struct CloudCacheAccessToken { - std::string GetAuthorizationHeaderValue(); - void SetToken(std::string_view Token); + static constexpr int64_t ExpireMarginInSeconds = 30; - inline uint32_t GetSerial() const { return m_Serial.load(std::memory_order::memory_order_relaxed); } + std::string Value; + std::chrono::steady_clock::time_point ExpireTime; -private: - RwLock m_Lock; - std::string m_Token; - std::atomic<uint32_t> m_Serial; + bool IsValid() const + { + return !Value.empty() && + ExpireMarginInSeconds < + std::chrono::duration_cast<std::chrono::seconds>(ExpireTime - std::chrono::steady_clock::now()).count(); + } }; struct CloudCacheResult @@ -60,7 +63,7 @@ struct CloudCacheResult class CloudCacheSession { public: - CloudCacheSession(CloudCacheClient* OuterClient); + CloudCacheSession(CloudCacheClient* CacheClient); ~CloudCacheSession(); CloudCacheResult Authenticate(); @@ -77,7 +80,9 @@ public: std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); private: - inline spdlog::logger& Log() { return m_Log; } + inline spdlog::logger& Log() { return m_Log; } + const CloudCacheAccessToken& GetAccessToken(); + bool VerifyAccessToken(long StatusCode); spdlog::logger& m_Log; RefPtr<CloudCacheClient> m_CacheClient; @@ -104,26 +109,25 @@ public: CloudCacheClient(const CloudCacheClientOptions& Options); ~CloudCacheClient(); - bool AcquireAccessToken(std::string& AuthorizationHeaderValue); - std::string_view DdcNamespace() const { return m_DdcNamespace; } - std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; } - std::string_view ServiceUrl() const { return m_ServiceUrl; } - bool IsValid() const { return m_IsValid; } + CloudCacheAccessToken AcquireAccessToken(); + std::string_view DdcNamespace() const { return m_DdcNamespace; } + std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; } + std::string_view ServiceUrl() const { return m_ServiceUrl; } + bool IsValid() const { return m_IsValid; } spdlog::logger& Logger() { return m_Log; } private: - spdlog::logger& m_Log; - std::string m_ServiceUrl; - std::string m_OAuthDomain; - std::string m_OAuthUriPath; - std::string m_OAuthFullUri; - std::string m_DdcNamespace; - std::string m_BlobStoreNamespace; - std::string m_OAuthClientId; - std::string m_OAuthSecret; - CloudCacheAccessToken m_AccessToken; - bool m_IsValid = false; + spdlog::logger& m_Log; + std::string m_ServiceUrl; + std::string m_OAuthDomain; + std::string m_OAuthUriPath; + std::string m_OAuthFullUri; + std::string m_DdcNamespace; + std::string m_BlobStoreNamespace; + std::string m_OAuthClientId; + std::string m_OAuthSecret; + bool m_IsValid = false; RwLock m_SessionStateLock; std::list<detail::CloudCacheSessionState*> m_SessionStateCache; diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index a889fb984..f056c1c76 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -153,6 +153,7 @@ namespace detail { CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + Result.Bytes += AttachmentResult.Bytes; Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; Result.ErrorCode = AttachmentResult.ErrorCode; @@ -176,7 +177,6 @@ namespace detail { Package.Save(Writer); Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - Result.Bytes = MemStream.Size(); } } } @@ -247,21 +247,26 @@ namespace detail { } else { + bool Success = false; int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) { - CloudCacheResult Result; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + Success = false; + for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) { - Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + if (CloudCacheResult Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + Result.Success) + { + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + Success = true; + break; + } } - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; - - if (!Result.Success) + if (!Success) { return {.Reason = "Failed to upload payload", .Bytes = TotalBytes, @@ -270,29 +275,38 @@ namespace detail { } } - CloudCacheResult Result; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + Success = false; + for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) { - Result = - Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject); + if (CloudCacheResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + RecordValue, + ZenContentType::kCbObject); + Result.Success) + { + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + Success = true; + break; + } } - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; - - return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; + return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Success}; } } - catch (std::exception& e) + catch (std::exception& Err) { - return {.Reason = std::string(e.what()), .Success = false}; + return {.Reason = std::string(Err.what()), .Success = false}; } } + virtual UpstreamEndpointStats& Stats() override { return m_Stats; } + private: bool m_UseLegacyDdc; std::string m_DisplayName; RefPtr<CloudCacheClient> m_Client; + UpstreamEndpointStats m_Stats; std::atomic_bool m_HealthOk{false}; }; @@ -317,9 +331,9 @@ namespace detail { ZenStructuredCacheSession Session(*m_Client); ZenCacheResult Result; - for (int32_t Attempt = 0, MaxAttempts = 3; Attempt < MaxAttempts && !Result.Success; ++Attempt) + for (int32_t Attempt = 0, MaxAttempts = 2; Attempt < MaxAttempts && !Result.Success; ++Attempt) { - Result = Session.SayHello(); + Result = Session.CheckHealth(); } m_HealthOk = Result.ErrorCode == 0; @@ -473,9 +487,12 @@ namespace detail { } } + virtual UpstreamEndpointStats& Stats() override { return m_Stats; } + private: std::string m_DisplayName; RefPtr<ZenStructuredCacheClient> m_Client; + UpstreamEndpointStats m_Stats; std::atomic_bool m_HealthOk{false}; }; @@ -483,87 +500,95 @@ namespace detail { ////////////////////////////////////////////////////////////////////////// -class UpstreamStats final +struct UpstreamStats { static constexpr uint64_t MaxSampleCount = 100ull; - struct StatCounters - { - int64_t Bytes = {}; - int64_t Count = {}; - double Seconds = {}; - }; - - using StatsMap = std::unordered_map<const UpstreamEndpoint*, StatCounters>; - - struct EndpointStats - { - mutable std::mutex Lock; - StatsMap Counters; - }; - -public: - UpstreamStats() : m_Log(logging::Get("upstream")) {} + UpstreamStats(bool Enabled) : m_Enabled(Enabled) {} - void Add(const UpstreamEndpoint& Endpoint, const GetUpstreamCacheResult& Result) + void Add(spdlog::logger& Logger, + UpstreamEndpoint& Endpoint, + const GetUpstreamCacheResult& Result, + const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints) { - std::unique_lock Lock(m_DownStats.Lock); + if (!m_Enabled) + { + return; + } - auto& Counters = m_DownStats.Counters[&Endpoint]; - Counters.Bytes += Result.Bytes; - Counters.Seconds += Result.ElapsedSeconds; - Counters.Count++; + UpstreamEndpointStats& Stats = Endpoint.Stats(); + if (Result.Success) + { + Stats.HitCount++; + Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); + Stats.SecondsDown.fetch_add(Result.ElapsedSeconds); + } + else + { + Stats.MissCount++; + } - if (Counters.Count >= MaxSampleCount) + if (m_SampleCount++ % MaxSampleCount) { - LogStats("STATS - (downstream):"sv, m_DownStats.Counters); - Counters = StatCounters{}; + Dump(Logger, Endpoints); } } - void Add(const UpstreamEndpoint& Endpoint, const PutUpstreamCacheResult& Result) + void Add(spdlog::logger& Logger, + UpstreamEndpoint& Endpoint, + const PutUpstreamCacheResult& Result, + const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints) { - std::unique_lock Lock(m_UpStats.Lock); + if (!m_Enabled) + { + return; + } - auto& Counters = m_UpStats.Counters[&Endpoint]; - Counters.Bytes += Result.Bytes; - Counters.Seconds += Result.ElapsedSeconds; - Counters.Count++; + UpstreamEndpointStats& Stats = Endpoint.Stats(); + if (Result.Success) + { + Stats.UpCount++; + Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); + Stats.SecondsUp.fetch_add(Result.ElapsedSeconds); + } - if (Counters.Count >= MaxSampleCount) + if (m_SampleCount++ % MaxSampleCount) { - LogStats("STATS - (upstream):"sv, m_UpStats.Counters); - Counters = StatCounters{}; + Dump(Logger, Endpoints); } } -private: - void LogStats(std::string_view What, const std::unordered_map<const UpstreamEndpoint*, StatCounters>& EndpointStats) + void Dump(spdlog::logger& Logger, const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints) { - for (const auto& Kv : EndpointStats) + for (auto& Ep : Endpoints) { - const UpstreamEndpoint& Endpoint = *Kv.first; - const StatCounters& Counters = Kv.second; - const double TotalMb = double(Counters.Bytes) / 1024.0 / 1024.0; - - ZEN_UNUSED(Endpoint); - - ZEN_INFO("{} Endpoint: {}, Bytes: {:.2f} MB, Time: {:.2f} s, Speed: {:.2f} MB/s, Avg: {:.2f} ms/request, Samples: {}", - What, - Kv.first->DisplayName(), - TotalMb, - Counters.Seconds, - TotalMb / Counters.Seconds, - (Counters.Seconds * 1000.0) / double(Counters.Count), - Counters.Count); + // These stats will not be totally correct as the numbers are not captured atomically + + UpstreamEndpointStats& Stats = Ep->Stats(); + const uint64_t HitCount = Stats.HitCount; + const uint64_t MissCount = Stats.MissCount; + const double DownBytes = Stats.DownBytes; + const double SecondsDown = Stats.SecondsDown; + const double UpBytes = Stats.UpBytes; + const double SecondsUp = Stats.SecondsUp; + + const double UpSpeed = UpBytes > 0 ? UpBytes / SecondsUp : 0.0; + const double DownSpeed = DownBytes > 0 ? DownBytes / SecondsDown : 0.0; + const uint64_t TotalCount = HitCount + MissCount; + const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0; + + Logger.info("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", + Ep->DisplayName(), + HitRate, + DownBytes, + DownSpeed, + UpBytes, + UpSpeed); } } - spdlog::logger& Log() { return m_Log; } - - spdlog::logger& m_Log; - EndpointStats m_UpStats; - EndpointStats m_DownStats; + bool m_Enabled; + std::atomic_uint64_t m_SampleCount = {}; }; ////////////////////////////////////////////////////////////////////////// @@ -576,6 +601,7 @@ public: , m_Options(Options) , m_CacheStore(CacheStore) , m_CidStore(CidStore) + , m_Stats(Options.StatsEnabled) { } @@ -621,9 +647,11 @@ public: { if (Endpoint->IsHealthy()) { - if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + const GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + + if (Result.Success) { - m_Stats.Add(*Endpoint, Result); return Result; } } @@ -641,9 +669,11 @@ public: { if (Endpoint->IsHealthy()) { - if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + + if (Result.Success) { - m_Stats.Add(*Endpoint, Result); return Result; } } @@ -707,18 +737,7 @@ private: if (Endpoint->IsHealthy()) { const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); - if (Result.Success) - { - m_Stats.Add(*Endpoint, Result); - } - else - { - ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - Endpoint->DisplayName(), - Result.Reason); - } + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); } } } diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 96ee8bddc..0e736480b 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -6,6 +6,7 @@ #include <zencore/iohash.h> #include <zencore/zencore.h> +#include <atomic> #include <chrono> #include <memory> @@ -40,6 +41,7 @@ struct UpstreamCacheOptions uint32_t ThreadCount = 4; bool ReadUpstream = true; bool WriteUpstream = true; + bool StatsEnabled = false; }; enum class UpstreamStatusCode : uint8_t @@ -79,6 +81,17 @@ struct UpstreamEndpointHealth bool Ok = false; }; +struct UpstreamEndpointStats +{ + std::atomic_uint64_t HitCount = {}; + std::atomic_uint64_t MissCount = {}; + std::atomic_uint64_t UpCount = {}; + std::atomic<double> UpBytes = {}; + std::atomic<double> DownBytes = {}; + std::atomic<double> SecondsUp = {}; + std::atomic<double> SecondsDown = {}; +}; + /** * The upstream endpont is responsible for handling upload/downloading of cache records. */ @@ -100,6 +113,8 @@ public: virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span<IoBuffer const> Payloads) = 0; + + virtual UpstreamEndpointStats& Stats() = 0; }; /** diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index 710d381c6..530bed32a 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -382,10 +382,10 @@ ZenStructuredCacheSession::~ZenStructuredCacheSession() } ZenCacheResult -ZenStructuredCacheSession::SayHello() +ZenStructuredCacheSession::CheckHealth() { ExtendableStringBuilder<256> Uri; - Uri << m_Client.ServiceUrl() << "/test/hello"; + Uri << m_Client.ServiceUrl() << "/health/check"; cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index 48886096d..158be668a 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -111,7 +111,7 @@ public: ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient); ~ZenStructuredCacheSession(); - ZenCacheResult SayHello(); + ZenCacheResult CheckHealth(); ZenCacheResult GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type); ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId); ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type); diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index e3b61568f..fe4f41ab5 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -188,6 +188,8 @@ public: UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount); } + UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled; + UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore); if (!UpstreamConfig.ZenConfig.Url.empty()) @@ -198,7 +200,18 @@ public: { zen::CloudCacheClientOptions Options; - if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings) + if (UpstreamConfig.JupiterConfig.UseProductionSettings) + { + Options = zen::CloudCacheClientOptions{ + .ServiceUrl = "https://jupiter.devtools.epicgames.com"sv, + .DdcNamespace = "ue.ddc"sv, + .BlobStoreNamespace = "ue.ddc"sv, + .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, + .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, + .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, + .UseLegacyDdc = false}; + } + else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings) { Options = zen::CloudCacheClientOptions{ .ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv, diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj index 1671d98a6..29436d840 100644 --- a/zenserver/zenserver.vcxproj +++ b/zenserver/zenserver.vcxproj @@ -115,7 +115,6 @@ <ClInclude Include="upstream\jupiter.h" /> <ClInclude Include="projectstore.h" /> <ClInclude Include="cache\cacheagent.h" /> - <ClInclude Include="cache\cachestore.h" /> <ClInclude Include="testing\launch.h" /> <ClInclude Include="casstore.h" /> <ClInclude Include="diag\diagsvcs.h" /> @@ -138,7 +137,6 @@ <ClCompile Include="testing\httptest.cpp" /> <ClCompile Include="upstream\jupiter.cpp" /> <ClCompile Include="testing\launch.cpp" /> - <ClCompile Include="cache\cachestore.cpp" /> <ClCompile Include="casstore.cpp" /> <ClCompile Include="experimental\usnjournal.cpp" /> <ClCompile Include="upstream\upstreamcache.cpp" /> diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters index c51a8eb76..6b99ca8d7 100644 --- a/zenserver/zenserver.vcxproj.filters +++ b/zenserver/zenserver.vcxproj.filters @@ -9,9 +9,6 @@ <ClInclude Include="cache\cacheagent.h"> <Filter>cache</Filter> </ClInclude> - <ClInclude Include="cache\cachestore.h"> - <Filter>cache</Filter> - </ClInclude> <ClInclude Include="diag\diagsvcs.h"> <Filter>diag</Filter> </ClInclude> @@ -50,9 +47,6 @@ <ClCompile Include="cache\cacheagent.cpp"> <Filter>cache</Filter> </ClCompile> - <ClCompile Include="cache\cachestore.cpp"> - <Filter>cache</Filter> - </ClCompile> <ClCompile Include="experimental\usnjournal.cpp"> <Filter>experimental</Filter> </ClCompile> |