aboutsummaryrefslogtreecommitdiff
path: root/zenserver
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-09-27 12:34:52 +0200
committerStefan Boberg <[email protected]>2021-09-27 12:34:52 +0200
commitf0036eada7f6bcf6e08afe3ea8517367ed73450e (patch)
treeb1ce3466bba36175cad369028fad1b410a34b5ec /zenserver
parentFixed httpsys Windows compilation error (diff)
parentGetWindowsErrorAsString() -> GetSystemErrorAsString() (diff)
downloadzen-f0036eada7f6bcf6e08afe3ea8517367ed73450e.tar.xz
zen-f0036eada7f6bcf6e08afe3ea8517367ed73450e.zip
Merged latest from main
Diffstat (limited to 'zenserver')
-rw-r--r--zenserver/cache/cachestore.cpp252
-rw-r--r--zenserver/cache/cachestore.h84
-rw-r--r--zenserver/cache/structuredcachestore.cpp271
-rw-r--r--zenserver/cache/structuredcachestore.h1
-rw-r--r--zenserver/config.cpp14
-rw-r--r--zenserver/config.h2
-rw-r--r--zenserver/diag/logging.cpp12
-rw-r--r--zenserver/testing/httptest.cpp11
-rw-r--r--zenserver/testing/httptest.h5
-rw-r--r--zenserver/upstream/jupiter.cpp208
-rw-r--r--zenserver/upstream/jupiter.h54
-rw-r--r--zenserver/upstream/upstreamcache.cpp207
-rw-r--r--zenserver/upstream/upstreamcache.h15
-rw-r--r--zenserver/upstream/zen.cpp4
-rw-r--r--zenserver/upstream/zen.h2
-rw-r--r--zenserver/zenserver.cpp15
-rw-r--r--zenserver/zenserver.vcxproj2
-rw-r--r--zenserver/zenserver.vcxproj.filters6
18 files changed, 525 insertions, 640 deletions
diff --git a/zenserver/cache/cachestore.cpp b/zenserver/cache/cachestore.cpp
deleted file mode 100644
index 2fc253a07..000000000
--- a/zenserver/cache/cachestore.cpp
+++ /dev/null
@@ -1,252 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "cachestore.h"
-
-#include <zencore/crc32.h>
-#include <zencore/except.h>
-#include <zencore/logging.h>
-#include <zencore/windows.h>
-
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/iobuffer.h>
-#include <zencore/string.h>
-#include <zencore/thread.h>
-#include <zenstore/basicfile.h>
-#include <zenstore/cas.h>
-#include <zenstore/caslog.h>
-
-#include <fmt/core.h>
-#include <concepts>
-#include <filesystem>
-#include <gsl/gsl-lite.hpp>
-#include <unordered_map>
-
-#include <atlfile.h>
-
-using namespace zen;
-using namespace fmt::literals;
-
-namespace UE {
-
-struct CorruptionTrailer
-{
- enum
- {
- /** Arbitrary number used to identify corruption **/
- MagicConstant = 0x1e873d89
- };
-
- uint32_t Magic = MagicConstant;
- uint32_t Version = 1;
- uint32_t CRCofPayload = 0;
- uint32_t SizeOfPayload = 0;
-
- void Initialize(const void* Data, size_t Size)
- {
- CRCofPayload = zen::MemCrc32_Deprecated(Data, Size);
- SizeOfPayload = (uint32_t)Size;
- }
-};
-
-std::filesystem::path
-GenerateDdcPath(std::string_view Key, std::filesystem::path& rootDir)
-{
- std::filesystem::path FilePath = rootDir;
-
- std::string k8{Key};
- for (auto& c : k8)
- c = (char)toupper(c);
-
- const uint32_t Hash = zen::StrCrc_Deprecated(k8.c_str());
-
- std::wstring DirName;
-
- DirName = u'0' + ((Hash / 100) % 10);
- FilePath /= DirName;
- DirName = u'0' + ((Hash / 10) % 10);
- FilePath /= DirName;
- DirName = u'0' + (Hash % 10);
- FilePath /= DirName;
-
- FilePath /= Key;
-
- auto NativePath = FilePath.native();
- NativePath.append(L".udd");
-
- return NativePath;
-}
-
-} // namespace UE
-
-//////////////////////////////////////////////////////////////////////////
-
-FileCacheStore::FileCacheStore(const char* RootDir, const char* ReadRootDir)
-{
- // Ensure root directory exists - create if it doesn't exist already
-
- ZEN_INFO("Initializing FileCacheStore at '{}'", std::string_view(RootDir));
-
- m_RootDir = RootDir;
-
- std::error_code ErrorCode;
-
- std::filesystem::create_directories(m_RootDir, ErrorCode);
-
- if (ErrorCode)
- {
- ExtendableStringBuilder<256> Name;
- WideToUtf8(m_RootDir.c_str(), Name);
-
- ZEN_ERROR("Could not open file cache directory '{}' for writing ({})", Name.c_str(), ErrorCode.message());
-
- m_IsOk = false;
- }
-
- if (ReadRootDir)
- {
- m_ReadRootDir = ReadRootDir;
-
- if (std::filesystem::exists(m_ReadRootDir, ErrorCode))
- {
- ZEN_INFO("FileCacheStore will use additional read tree at '{}'", std::string_view(ReadRootDir));
-
- m_ReadRootIsValid = true;
- }
- }
-}
-
-FileCacheStore::~FileCacheStore()
-{
-}
-
-bool
-FileCacheStore::Get(std::string_view Key, CacheValue& OutValue)
-{
- CAtlFile File;
-
- std::filesystem::path NativePath;
-
- HRESULT hRes = E_FAIL;
-
- if (m_ReadRootDir.empty() == false)
- {
- NativePath = UE::GenerateDdcPath(Key, m_ReadRootDir);
-
- hRes = File.Create(NativePath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
- }
-
- if (FAILED(hRes))
- {
- NativePath = UE::GenerateDdcPath(Key, m_RootDir);
-
- hRes = File.Create(NativePath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
- }
-
- if (FAILED(hRes))
- {
- ZEN_DEBUG("GET MISS {}", Key);
-
- return false;
- }
-
- ULONGLONG FileSize;
- File.GetSize(FileSize);
-
- if (FileSize <= 16)
- {
- return false;
- }
-
- FileSize -= 16; // CorruptionWrapper trailer
-
- OutValue.Value = IoBuffer(IoBuffer::File, File.Detach(), 0, FileSize);
-
- ZEN_DEBUG("GET HIT {}", Key);
-
- return true;
-}
-
-void
-FileCacheStore::Put(std::string_view Key, const CacheValue& Value)
-{
- const void* Data = Value.Value.Data();
- size_t Size = Value.Value.Size();
-
- UE::CorruptionTrailer Trailer;
- Trailer.Initialize(Data, Size);
-
- std::filesystem::path NativePath = UE::GenerateDdcPath(Key, m_RootDir);
-
- CAtlTemporaryFile File;
-
- ZEN_DEBUG("PUT {}", Key);
-
- HRESULT hRes = File.Create(m_RootDir.c_str());
-
- if (SUCCEEDED(hRes))
- {
- const uint8_t* WritePointer = reinterpret_cast<const uint8_t*>(Data);
-
- while (Size)
- {
- const int MaxChunkSize = 16 * 1024 * 1024;
- const int ChunkSize = (int)((Size > MaxChunkSize) ? MaxChunkSize : Size);
-
- DWORD BytesWritten = 0;
- File.Write(WritePointer, ChunkSize, &BytesWritten);
-
- Size -= BytesWritten;
- WritePointer += BytesWritten;
- }
-
- File.Write(&Trailer, sizeof Trailer);
- hRes = File.Close(NativePath.c_str()); // This renames the file to its final name
-
- if (FAILED(hRes))
- {
- ZEN_WARN("Failed to rename temp file for key '{}' - deleting temporary file", Key);
-
- if (!DeleteFile(File.TempFileName()))
- {
- ZEN_WARN("Temp file for key '{}' could not be deleted - no value persisted", Key);
- }
- }
- }
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-MemoryCacheStore::MemoryCacheStore()
-{
-}
-
-MemoryCacheStore::~MemoryCacheStore()
-{
-}
-
-bool
-MemoryCacheStore::Get(std::string_view InKey, CacheValue& OutValue)
-{
- RwLock::SharedLockScope _(m_Lock);
-
- auto it = m_CacheMap.find(std::string(InKey));
-
- if (it == m_CacheMap.end())
- {
- return false;
- }
- else
- {
- OutValue.Value = it->second;
-
- return true;
- }
-}
-
-void
-MemoryCacheStore::Put(std::string_view Key, const CacheValue& Value)
-{
- RwLock::ExclusiveLockScope _(m_Lock);
- m_CacheMap[std::string(Key)] = Value.Value;
-}
diff --git a/zenserver/cache/cachestore.h b/zenserver/cache/cachestore.h
deleted file mode 100644
index 89c6396b8..000000000
--- a/zenserver/cache/cachestore.h
+++ /dev/null
@@ -1,84 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/IoBuffer.h>
-#include <zencore/iohash.h>
-#include <zencore/thread.h>
-#include <zencore/uid.h>
-#include <zenstore/cas.h>
-#include <compare>
-#include <filesystem>
-#include <unordered_map>
-
-namespace zen {
-
-class WideStringBuilderBase;
-class CasStore;
-
-} // namespace zen
-
-struct CacheValue
-{
- zen::IoBuffer Value;
-};
-
-/******************************************************************************
-
- /$$ /$$/$$ /$$ /$$$$$$ /$$
- | $$ /$$| $$ | $$ /$$__ $$ | $$
- | $$ /$$/| $$ | $$ | $$ \__/ /$$$$$$ /$$$$$$| $$$$$$$ /$$$$$$
- | $$$$$/ | $$ / $$/ | $$ |____ $$/$$_____| $$__ $$/$$__ $$
- | $$ $$ \ $$ $$/ | $$ /$$$$$$| $$ | $$ \ $| $$$$$$$$
- | $$\ $$ \ $$$/ | $$ $$/$$__ $| $$ | $$ | $| $$_____/
- | $$ \ $$ \ $/ | $$$$$$| $$$$$$| $$$$$$| $$ | $| $$$$$$$
- |__/ \__/ \_/ \______/ \_______/\_______|__/ |__/\_______/
-
- Basic Key-Value cache. No restrictions on keys, and values are always opaque
- binary blobs.
-
-******************************************************************************/
-
-class CacheStore
-{
-public:
- virtual bool Get(std::string_view Key, CacheValue& OutValue) = 0;
- virtual void Put(std::string_view Key, const CacheValue& Value) = 0;
-};
-
-/** File system based implementation
-
- Emulates the behaviour of UE4 with regards to file system structure,
- and also adds a file corruption trailer to remain compatible with
- the file-system based implementation (this should be made configurable)
-
- */
-class FileCacheStore : public CacheStore
-{
-public:
- FileCacheStore(const char* RootDir, const char* ReadRootDir = nullptr);
- ~FileCacheStore();
-
- virtual bool Get(std::string_view Key, CacheValue& OutValue) override;
- virtual void Put(std::string_view Key, const CacheValue& Value) override;
-
-private:
- std::filesystem::path m_RootDir;
- std::filesystem::path m_ReadRootDir;
- bool m_IsOk = true;
- bool m_ReadRootIsValid = false;
-};
-
-class MemoryCacheStore : public CacheStore
-{
-public:
- MemoryCacheStore();
- ~MemoryCacheStore();
-
- virtual bool Get(std::string_view Key, CacheValue& OutValue) override;
- virtual void Put(std::string_view Key, const CacheValue& Value) override;
-
-private:
- zen::RwLock m_Lock;
- std::unordered_map<std::string, zen::IoBuffer> m_CacheMap;
-};
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 3d80bb14c..5e93ebaa9 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -22,8 +22,6 @@
#include <gsl/gsl-lite.hpp>
#include <unordered_map>
-#include <atlfile.h>
-
//////////////////////////////////////////////////////////////////////////
namespace zen {
@@ -131,23 +129,18 @@ ZenCacheMemoryLayer::~ZenCacheMemoryLayer()
bool
ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
- CacheBucket* Bucket = nullptr;
-
- {
- RwLock::SharedLockScope _(m_Lock);
+ RwLock::SharedLockScope _(m_Lock);
- auto it = m_Buckets.find(std::string(InBucket));
+ auto it = m_Buckets.find(std::string(InBucket));
- if (it != m_Buckets.end())
- {
- Bucket = &it->second;
- }
+ if (it == m_Buckets.end())
+ {
+ return false;
}
- if (Bucket == nullptr)
- return false;
+ CacheBucket* Bucket = Bucket = &it->second;
- ZEN_ASSERT(Bucket != nullptr);
+ _.ReleaseNow();
return Bucket->Get(HashKey, OutValue);
}
@@ -177,8 +170,6 @@ ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const
Bucket = &m_Buckets[std::string(InBucket)];
}
- ZEN_ASSERT(Bucket != nullptr);
-
// Note that since the underlying IoBuffer is retained, the content type is also
Bucket->Put(HashKey, Value);
@@ -195,7 +186,31 @@ ZenCacheMemoryLayer::DropBucket(std::string_view Bucket)
void
ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx)
{
- ZEN_UNUSED(Ctx);
+ RwLock::SharedLockScope _(m_Lock);
+
+ for (auto& Kv : m_Buckets)
+ {
+ Kv.second.Scrub(Ctx);
+ }
+}
+
+void
+ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
+{
+ std::vector<IoHash> BadHashes;
+
+ for (auto& Kv : m_cacheMap)
+ {
+ if (Kv.first != IoHash::HashBuffer(Kv.second))
+ {
+ BadHashes.push_back(Kv.first);
+ }
+ }
+
+ if (!BadHashes.empty())
+ {
+ Ctx.ReportBadChunks(BadHashes);
+ }
}
bool
@@ -203,16 +218,16 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV
{
RwLock::SharedLockScope _(m_bucketLock);
- auto bucketIt = m_cacheMap.find(HashKey);
-
- if (bucketIt == m_cacheMap.end())
+ if (auto bucketIt = m_cacheMap.find(HashKey); bucketIt == m_cacheMap.end())
{
return false;
}
+ else
+ {
+ OutValue.Value = bucketIt->second;
- OutValue.Value = bucketIt->second;
-
- return true;
+ return true;
+ }
}
void
@@ -241,8 +256,19 @@ struct DiskLocation
static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
- inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
- inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; }
+ inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
+ inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; }
+ inline ZenContentType GetContentType() const
+ {
+ ZenContentType ContentType = ZenContentType::kBinary;
+
+ if (IsFlagSet(DiskLocation::kStructured))
+ {
+ ContentType = ZenContentType::kCbObject;
+ }
+
+ return ContentType;
+ }
};
struct DiskIndexEntry
@@ -267,6 +293,7 @@ struct ZenCacheDiskLayer::CacheBucket
void Put(const IoHash& HashKey, const ZenCacheValue& Value);
void Drop();
void Flush();
+ void Scrub(ScrubContext& Ctx);
inline bool IsOk() const { return m_Ok; }
@@ -277,15 +304,19 @@ private:
bool m_Ok = false;
uint64_t m_LargeObjectThreshold = 64 * 1024;
+ // These files are used to manage storage of small objects for this bucket
+
BasicFile m_SobsFile;
TCasLogFile<DiskIndexEntry> m_SlogFile;
- void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey);
- void PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value);
-
RwLock m_IndexLock;
tsl::robin_map<IoHash, DiskLocation, IoHash::Hasher> m_Index;
uint64_t m_WriteCursor = 0;
+
+ void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey);
+ void PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value);
+ bool GetStandaloneCacheValue(const IoHash& HashKey, ZenCacheValue& OutValue, const DiskLocation& Loc);
+ bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue);
};
ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas)
@@ -320,27 +351,24 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
std::filesystem::path SobsPath{m_BucketDir / "zen.sobs"};
std::filesystem::path SlogPath{m_BucketDir / "zen.slog"};
- CAtlFile ManifestFile;
+ BasicFile ManifestFile;
// Try opening existing manifest file first
bool IsNew = false;
- HRESULT hRes = ManifestFile.Create(ManifestPath.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, OPEN_EXISTING);
+ std::error_code Ec;
+ ManifestFile.Open(ManifestPath, /* IsCreate */ false, Ec);
- if (SUCCEEDED(hRes))
+ if (!Ec)
{
- ULONGLONG FileSize;
- ManifestFile.GetSize(FileSize);
+ uint64_t FileSize = ManifestFile.FileSize();
if (FileSize == sizeof(Oid))
{
- hRes = ManifestFile.Read(&m_BucketId, sizeof(Oid));
+ ManifestFile.Read(&m_BucketId, sizeof(Oid), 0);
- if (SUCCEEDED(hRes))
- {
- m_Ok = true;
- }
+ m_Ok = true;
}
if (!m_Ok)
@@ -353,16 +381,16 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
{
// No manifest file found, this is a new bucket
- hRes = ManifestFile.Create(ManifestPath.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, CREATE_ALWAYS);
+ ManifestFile.Open(ManifestPath, /* IsCreate */ true, Ec);
- if (FAILED(hRes))
+ if (Ec)
{
- ThrowLastError("Failed to create bucket manifest '{}'"_format(ManifestPath));
+ throw std::system_error(Ec, "Failed to create bucket manifest '{}'"_format(ManifestPath));
}
m_BucketId.Generate();
- hRes = ManifestFile.Write(&m_BucketId, sizeof(Oid));
+ ManifestFile.Write(&m_BucketId, sizeof(Oid), /* FileOffset */ 0);
IsNew = true;
}
@@ -407,6 +435,37 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(WideStringBuilderBase& Path, const IoH
}
bool
+ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue)
+{
+ if (!Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size);
+ OutValue.Value.SetContentType(Loc.GetContentType());
+
+ return true;
+ }
+
+ return false;
+}
+
+bool
+ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const IoHash& HashKey, ZenCacheValue& OutValue, const DiskLocation& Loc)
+{
+ WideStringBuilder<128> DataFilePath;
+ BuildPath(DataFilePath, HashKey);
+
+ if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str()))
+ {
+ OutValue.Value = Data;
+ OutValue.Value.SetContentType(Loc.GetContentType());
+
+ return true;
+ }
+
+ return false;
+}
+
+bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
if (!m_Ok)
@@ -420,35 +479,14 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
{
const DiskLocation& Loc = it->second;
- ZenContentType ContentType = ZenContentType::kBinary;
-
- if (Loc.IsFlagSet(DiskLocation::kStructured))
- {
- ContentType = ZenContentType::kCbObject;
- }
-
- if (!Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ if (GetInlineCacheValue(Loc, OutValue))
{
- OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size);
- OutValue.Value.SetContentType(ContentType);
-
return true;
}
- else
- {
- _.ReleaseNow();
-
- WideStringBuilder<128> DataFilePath;
- BuildPath(DataFilePath, HashKey);
- if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str()))
- {
- OutValue.Value = Data;
- OutValue.Value.SetContentType(ContentType);
+ _.ReleaseNow();
- return true;
- }
- }
+ return GetStandaloneCacheValue(HashKey, OutValue, Loc);
}
return false;
@@ -518,9 +556,58 @@ ZenCacheDiskLayer::CacheBucket::Flush()
}
void
-ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
+ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
{
- ZEN_UNUSED(Ctx);
+ std::vector<DiskIndexEntry> StandaloneFiles;
+
+ std::vector<IoHash> BadChunks;
+ std::vector<IoBuffer> BadStandaloneChunks;
+
+ {
+ RwLock::SharedLockScope _(m_IndexLock);
+
+ for (auto& Kv : m_Index)
+ {
+ const IoHash& Hash = Kv.first;
+ const DiskLocation& Loc = Kv.second;
+
+ ZenCacheValue Value;
+
+ if (!GetInlineCacheValue(Loc, Value))
+ {
+ ZEN_ASSERT(Loc.IsFlagSet(DiskLocation::kStandaloneFile));
+ StandaloneFiles.push_back({.Key = Hash, .Location = Loc});
+ }
+ else
+ {
+ if (GetStandaloneCacheValue(Hash, Value, Loc))
+ {
+ // Hash contents
+
+ const IoHash ComputedHash = HashBuffer(Value.Value);
+
+ if (ComputedHash != Hash)
+ {
+ BadChunks.push_back(Hash);
+ }
+ }
+ else
+ {
+ // Non-existent
+ }
+ }
+ }
+ }
+
+ if (Ctx.RunRecovery())
+ {
+ // Clean out bad chunks
+ }
+
+ if (!BadChunks.empty())
+ {
+ Ctx.ReportBadChunks(BadChunks);
+ }
}
void
@@ -529,35 +616,38 @@ ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenC
WideStringBuilder<128> DataFilePath;
BuildPath(DataFilePath, HashKey);
- // TODO: replace this process with a more efficient implementation with proper atomic rename
- // and also avoid creating directories if we can
-
- std::filesystem::path ParentPath = std::filesystem::path(DataFilePath.c_str()).parent_path();
- CreateDirectories(ParentPath);
+ TemporaryFile DataFile;
- CAtlTemporaryFile DataFile;
+ std::error_code Ec;
+ DataFile.CreateTemporary(m_BucketDir.c_str(), Ec);
- HRESULT hRes = DataFile.Create(m_BucketDir.c_str());
-
- if (FAILED(hRes))
+ if (Ec)
{
- ThrowSystemException(hRes, "Failed to open temporary file for put at '{}'"_format(m_BucketDir));
+ throw std::system_error(Ec, "Failed to open temporary file for put at '{}'"_format(m_BucketDir));
}
- hRes = DataFile.Write(Value.Value.Data(), gsl::narrow<DWORD>(Value.Value.Size()));
+ DataFile.WriteAll(Value.Value, Ec);
- if (FAILED(hRes))
+ if (Ec)
{
- ThrowSystemException(hRes, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size())));
+ throw std::system_error(Ec, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size())));
}
- // Move file into place (note: not fully atomic!)
+ // Move file into place (atomically)
- hRes = DataFile.Close(DataFilePath.c_str());
+ DataFile.MoveTemporaryIntoPlace(DataFilePath.c_str(), Ec);
- if (FAILED(hRes))
+ if (Ec)
{
- ThrowSystemException(hRes, "Failed to finalize file '{}'"_format(WideToUtf8(DataFilePath)));
+ std::filesystem::path ParentPath = std::filesystem::path(DataFilePath.c_str()).parent_path();
+ CreateDirectories(ParentPath);
+
+ DataFile.MoveTemporaryIntoPlace(DataFilePath.c_str(), Ec);
+
+ if (Ec)
+ {
+ throw std::system_error(Ec, "Failed to finalize file '{}'"_format(WideToUtf8(DataFilePath)));
+ }
}
// Update index
@@ -729,6 +819,17 @@ ZenCacheDiskLayer::Flush()
}
}
+void
+ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
+{
+ RwLock::SharedLockScope _(m_Lock);
+
+ for (auto& Kv : m_Buckets)
+ {
+ Kv.second.Scrub(Ctx);
+ }
+}
+
//////////////////////////////////////////////////////////////////////////
ZenCacheTracker::ZenCacheTracker(ZenCacheStore& CacheStore)
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index 2cc3abb53..f96757409 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -65,6 +65,7 @@ private:
bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
void Put(const IoHash& HashKey, const ZenCacheValue& Value);
+ void Scrub(ScrubContext& Ctx);
};
RwLock m_Lock;
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index c21638258..91fb80747 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -188,6 +188,13 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
options.add_option("cache",
"",
+ "upstream-jupiter-prod",
+ "Enable Jupiter upstream caching using production settings",
+ cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.JupiterConfig.UseProductionSettings)->default_value("false"),
+ "");
+
+ options.add_option("cache",
+ "",
"upstream-jupiter-dev",
"Enable Jupiter upstream caching using development settings",
cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.JupiterConfig.UseDevelopmentSettings)->default_value("false"),
@@ -214,6 +221,13 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
cxxopts::value<int>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"),
"");
+ options.add_option("cache",
+ "",
+ "upstream-stats",
+ "Collect performance metrics for upstream endpoints",
+ cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("false"),
+ "");
+
try
{
auto result = options.parse(argc, argv);
diff --git a/zenserver/config.h b/zenserver/config.h
index 6ade1b401..ce059bdb2 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -28,6 +28,7 @@ struct ZenUpstreamJupiterConfig
std::string Namespace;
std::string DdcNamespace;
bool UseDevelopmentSettings = false;
+ bool UseProductionSettings = false;
bool UseLegacyDdc = false;
};
@@ -50,6 +51,7 @@ struct ZenUpstreamCacheConfig
ZenUpstreamZenConfig ZenConfig;
int UpstreamThreadCount = 4;
UpstreamCachePolicy CachePolicy = UpstreamCachePolicy::ReadWrite;
+ bool StatsEnabled = false;
};
struct ZenServiceConfig
diff --git a/zenserver/diag/logging.cpp b/zenserver/diag/logging.cpp
index 41b140f90..bc7b883b5 100644
--- a/zenserver/diag/logging.cpp
+++ b/zenserver/diag/logging.cpp
@@ -258,6 +258,18 @@ InitializeLogging(const ZenServerOptions& GlobalOptions)
}
#endif
+ // HTTP server request logging
+
+ std::filesystem::path HttpLogPath = GlobalOptions.DataDir / "logs/http.log";
+
+ auto HttpSink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(zen::WideToUtf8(HttpLogPath.c_str()),
+ /* max size */ 128 * 1024 * 1024,
+ /* max files */ 16,
+ /* rotate on open */ true);
+
+ auto HttpLogger = std::make_shared<spdlog::logger>("http_requests", HttpSink);
+ spdlog::register_logger(HttpLogger);
+
// Jupiter - only log HTTP traffic to file
auto JupiterLogger = std::make_shared<spdlog::logger>("jupiter", FileSink);
diff --git a/zenserver/testing/httptest.cpp b/zenserver/testing/httptest.cpp
index c4fd6003c..18d63a6ef 100644
--- a/zenserver/testing/httptest.cpp
+++ b/zenserver/testing/httptest.cpp
@@ -2,6 +2,7 @@
#include "httptest.h"
+#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
namespace zen {
@@ -14,6 +15,16 @@ HttpTestingService::HttpTestingService()
HttpVerb::kGet);
m_Router.RegisterRoute(
+ "json",
+ [this](HttpRouterRequest& Req) {
+ CbObjectWriter Obj;
+ Obj.AddBool("ok", true);
+ Obj.AddInteger("counter", ++m_Counter);
+ Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
"echo",
[this](HttpRouterRequest& Req) {
IoBuffer Body = Req.ServerRequest().ReadPayload();
diff --git a/zenserver/testing/httptest.h b/zenserver/testing/httptest.h
index b445fb450..f55780d05 100644
--- a/zenserver/testing/httptest.h
+++ b/zenserver/testing/httptest.h
@@ -5,6 +5,8 @@
#include <zencore/logging.h>
#include <zenhttp/httpserver.h>
+#include <atomic>
+
namespace zen {
/**
@@ -37,7 +39,8 @@ public:
};
private:
- HttpRequestRouter m_Router;
+ HttpRequestRouter m_Router;
+ std::atomic<uint32_t> m_Counter{0};
RwLock m_RwLock;
std::unordered_map<uint32_t, Ref<PackageHandler>> m_HandlerMap;
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 14da8cbcc..6eaa6423b 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -40,22 +40,32 @@ namespace detail {
CloudCacheSessionState(CloudCacheClient& Client) : OwnerClient(Client) {}
~CloudCacheSessionState() {}
- void Reset()
+ const CloudCacheAccessToken& GetAccessToken()
{
- std::string Auth;
- OwnerClient.AcquireAccessToken(Auth);
+ if (!AccessToken.IsValid())
+ {
+ AccessToken = OwnerClient.AcquireAccessToken();
+ }
+ return AccessToken;
+ }
+ void InvalidateAccessToken() { AccessToken = {}; }
+
+ void Reset()
+ {
Session.SetBody({});
- Session.SetOption(cpr::Header{{"Authorization", Auth}});
+ Session.SetHeader({});
+ AccessToken = GetAccessToken();
}
- CloudCacheClient& OwnerClient;
- cpr::Session Session;
+ CloudCacheClient& OwnerClient;
+ CloudCacheAccessToken AccessToken;
+ cpr::Session Session;
};
} // namespace detail
-CloudCacheSession::CloudCacheSession(CloudCacheClient* OuterClient) : m_Log(OuterClient->Logger()), m_CacheClient(OuterClient)
+CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient)
{
m_SessionState = m_CacheClient->AllocSessionState();
}
@@ -68,16 +78,18 @@ CloudCacheSession::~CloudCacheSession()
CloudCacheResult
CloudCacheSession::Authenticate()
{
- std::string Auth;
- const bool Success = m_CacheClient->AcquireAccessToken(Auth);
- return {.Success = Success};
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ return {.Success = AccessToken.IsValid()};
}
CloudCacheResult
CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Key)
{
- std::string Auth;
- m_CacheClient->AcquireAccessToken(Auth);
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key << ".raw";
@@ -85,7 +97,7 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", Auth}});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -94,6 +106,10 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -110,9 +126,13 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, const IoHash& Key)
CloudCacheResult
CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenContentType RefType)
{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream";
- std::string Auth;
- m_CacheClient->AcquireAccessToken(Auth);
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
@@ -121,7 +141,7 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", ContentType}});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -130,6 +150,10 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -140,8 +164,11 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte
CloudCacheResult
CloudCacheSession::GetCompressedBlob(const IoHash& Key)
{
- std::string Auth;
- m_CacheClient->AcquireAccessToken(Auth);
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
@@ -149,7 +176,7 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key)
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", "application/x-ue-comp"}});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -168,10 +195,13 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key)
CloudCacheResult
CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData)
{
- IoHash Hash = IoHash::HashBuffer(DerivedData.Data(), DerivedData.Size());
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
- std::string Auth;
- m_CacheClient->AcquireAccessToken(Auth);
+ IoHash Hash = IoHash::HashBuffer(DerivedData.Data(), DerivedData.Size());
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key;
@@ -179,8 +209,9 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke
auto& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(
- cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/octet-stream"}});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value},
+ {"X-Jupiter-IoHash", Hash.ToHexString()},
+ {"Content-Type", "application/octet-stream"}});
Session.SetBody(cpr::Body{(const char*)DerivedData.Data(), DerivedData.Size()});
cpr::Response Response = Session.Put();
@@ -190,6 +221,10 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
@@ -205,11 +240,15 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key,
CloudCacheResult
CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType)
{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream";
- std::string Auth;
- m_CacheClient->AcquireAccessToken(Auth);
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
@@ -218,7 +257,8 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", ContentType}});
+ Session.SetOption(
+ cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", ContentType}});
Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()});
cpr::Response Response = Session.Put();
@@ -228,6 +268,10 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
@@ -237,8 +281,11 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
CloudCacheResult
CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
{
- std::string Auth;
- m_CacheClient->AcquireAccessToken(Auth);
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
@@ -246,7 +293,7 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Content-Type", "application/x-ue-comp"}});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}});
Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()});
cpr::Response Response = Session.Put();
@@ -256,6 +303,10 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
@@ -274,22 +325,21 @@ CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>&
return {};
}
-//////////////////////////////////////////////////////////////////////////
-
-std::string
-CloudCacheAccessToken::GetAuthorizationHeaderValue()
+const CloudCacheAccessToken&
+CloudCacheSession::GetAccessToken()
{
- RwLock::SharedLockScope _(m_Lock);
-
- return "Bearer {}"_format(m_Token);
+ return m_SessionState->GetAccessToken();
}
-inline void
-CloudCacheAccessToken::SetToken(std::string_view Token)
+bool
+CloudCacheSession::VerifyAccessToken(long StatusCode)
{
- RwLock::ExclusiveLockScope _(m_Lock);
- m_Token = Token;
- ++m_Serial;
+ if (StatusCode == 401)
+ {
+ m_SessionState->InvalidateAccessToken();
+ return false;
+ }
+ return true;
}
//////////////////////////////////////////////////////////////////////////
@@ -354,60 +404,33 @@ CloudCacheClient::~CloudCacheClient()
}
}
-bool
-CloudCacheClient::AcquireAccessToken(std::string& AuthorizationHeaderValue)
+CloudCacheAccessToken
+CloudCacheClient::AcquireAccessToken()
{
- // TODO: check for expiration
-
- if (!m_IsValid)
- {
- ExtendableStringBuilder<128> OAuthFormData;
- OAuthFormData << "client_id=" << m_OAuthClientId
- << "&scope=cache_access&grant_type=client_credentials&client_secret=" << m_OAuthSecret;
-
- const uint32_t CurrentSerial = m_AccessToken.GetSerial();
+ using namespace std::chrono;
- static RwLock AuthMutex;
- RwLock::ExclusiveLockScope _(AuthMutex);
-
- // Protect against redundant authentication operations
- if (m_AccessToken.GetSerial() != CurrentSerial)
- {
- // TODO: this could verify that the token is actually valid and retry if not?
-
- return true;
- }
+ ExtendableStringBuilder<128> OAuthFormData;
+ OAuthFormData << "client_id=" << m_OAuthClientId << "&scope=cache_access&grant_type=client_credentials&client_secret=" << m_OAuthSecret;
- std::string data{OAuthFormData};
+ std::string Body{OAuthFormData};
- cpr::Response Response =
- cpr::Post(cpr::Url{m_OAuthFullUri}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{data});
+ cpr::Response Response =
+ cpr::Post(cpr::Url{m_OAuthFullUri}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{Body});
- std::string Body{std::move(Response.text)};
+ Body = std::move(Response.text);
- // Parse JSON response
-
- std::string JsonError;
- json11::Json JsonResponse = json11::Json::parse(Body, /* out */ JsonError);
- if (!JsonError.empty())
- {
- ZEN_WARN("failed to parse OAuth response: '{}'", JsonError);
-
- return false;
- }
-
- std::string AccessToken = JsonResponse["access_token"].string_value();
- int ExpiryTimeSeconds = JsonResponse["expires_in"].int_value();
- ZEN_UNUSED(ExpiryTimeSeconds);
-
- m_AccessToken.SetToken(AccessToken);
-
- m_IsValid = true;
+ std::string JsonError;
+ json11::Json JsonResponse = json11::Json::parse(Body, JsonError);
+ if (!JsonError.empty())
+ {
+ return {};
}
- AuthorizationHeaderValue = m_AccessToken.GetAuthorizationHeaderValue();
+ std::string AccessToken = std::string("Bearer ") + JsonResponse["access_token"].string_value();
+ int64_t ExpiresInSeconds = static_cast<int64_t>(JsonResponse["expires_in"].int_value());
+ steady_clock::time_point ExpireTime = steady_clock::now() + seconds(ExpiresInSeconds);
- return true;
+ return {std::move(AccessToken), ExpireTime};
}
detail::CloudCacheSessionState*
@@ -434,8 +457,19 @@ CloudCacheClient::AllocSessionState()
void
CloudCacheClient::FreeSessionState(detail::CloudCacheSessionState* State)
{
+ const bool IsTokenValid = State->AccessToken.IsValid();
+
RwLock::ExclusiveLockScope _(m_SessionStateLock);
m_SessionStateCache.push_front(State);
+
+ // Invalidate all cached access tokens if any one fails
+ if (!IsTokenValid)
+ {
+ for (auto& CachedState : m_SessionStateCache)
+ {
+ CachedState->AccessToken = {};
+ }
+ }
}
} // namespace zen
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index 94e7e7680..868a7b099 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -8,6 +8,7 @@
#include <zenhttp/httpserver.h>
#include <atomic>
+#include <chrono>
#include <list>
#include <memory>
#include <vector>
@@ -29,15 +30,17 @@ class CbObjectView;
*/
struct CloudCacheAccessToken
{
- std::string GetAuthorizationHeaderValue();
- void SetToken(std::string_view Token);
+ static constexpr int64_t ExpireMarginInSeconds = 30;
- inline uint32_t GetSerial() const { return m_Serial.load(std::memory_order::memory_order_relaxed); }
+ std::string Value;
+ std::chrono::steady_clock::time_point ExpireTime;
-private:
- RwLock m_Lock;
- std::string m_Token;
- std::atomic<uint32_t> m_Serial;
+ bool IsValid() const
+ {
+ return !Value.empty() &&
+ ExpireMarginInSeconds <
+ std::chrono::duration_cast<std::chrono::seconds>(ExpireTime - std::chrono::steady_clock::now()).count();
+ }
};
struct CloudCacheResult
@@ -60,7 +63,7 @@ struct CloudCacheResult
class CloudCacheSession
{
public:
- CloudCacheSession(CloudCacheClient* OuterClient);
+ CloudCacheSession(CloudCacheClient* CacheClient);
~CloudCacheSession();
CloudCacheResult Authenticate();
@@ -77,7 +80,9 @@ public:
std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
private:
- inline spdlog::logger& Log() { return m_Log; }
+ inline spdlog::logger& Log() { return m_Log; }
+ const CloudCacheAccessToken& GetAccessToken();
+ bool VerifyAccessToken(long StatusCode);
spdlog::logger& m_Log;
RefPtr<CloudCacheClient> m_CacheClient;
@@ -104,26 +109,25 @@ public:
CloudCacheClient(const CloudCacheClientOptions& Options);
~CloudCacheClient();
- bool AcquireAccessToken(std::string& AuthorizationHeaderValue);
- std::string_view DdcNamespace() const { return m_DdcNamespace; }
- std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; }
- std::string_view ServiceUrl() const { return m_ServiceUrl; }
- bool IsValid() const { return m_IsValid; }
+ CloudCacheAccessToken AcquireAccessToken();
+ std::string_view DdcNamespace() const { return m_DdcNamespace; }
+ std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; }
+ std::string_view ServiceUrl() const { return m_ServiceUrl; }
+ bool IsValid() const { return m_IsValid; }
spdlog::logger& Logger() { return m_Log; }
private:
- spdlog::logger& m_Log;
- std::string m_ServiceUrl;
- std::string m_OAuthDomain;
- std::string m_OAuthUriPath;
- std::string m_OAuthFullUri;
- std::string m_DdcNamespace;
- std::string m_BlobStoreNamespace;
- std::string m_OAuthClientId;
- std::string m_OAuthSecret;
- CloudCacheAccessToken m_AccessToken;
- bool m_IsValid = false;
+ spdlog::logger& m_Log;
+ std::string m_ServiceUrl;
+ std::string m_OAuthDomain;
+ std::string m_OAuthUriPath;
+ std::string m_OAuthFullUri;
+ std::string m_DdcNamespace;
+ std::string m_BlobStoreNamespace;
+ std::string m_OAuthClientId;
+ std::string m_OAuthSecret;
+ bool m_IsValid = false;
RwLock m_SessionStateLock;
std::list<detail::CloudCacheSessionState*> m_SessionStateCache;
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index a889fb984..f056c1c76 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -153,6 +153,7 @@ namespace detail {
CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) {
CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
+ Result.Bytes += AttachmentResult.Bytes;
Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
Result.ErrorCode = AttachmentResult.ErrorCode;
@@ -176,7 +177,6 @@ namespace detail {
Package.Save(Writer);
Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- Result.Bytes = MemStream.Size();
}
}
}
@@ -247,21 +247,26 @@ namespace detail {
}
else
{
+ bool Success = false;
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
{
- CloudCacheResult Result;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++)
{
- Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ if (CloudCacheResult Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ Result.Success)
+ {
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+ Success = true;
+ break;
+ }
}
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
-
- if (!Result.Success)
+ if (!Success)
{
return {.Reason = "Failed to upload payload",
.Bytes = TotalBytes,
@@ -270,29 +275,38 @@ namespace detail {
}
}
- CloudCacheResult Result;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++)
{
- Result =
- Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject);
+ if (CloudCacheResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ RecordValue,
+ ZenContentType::kCbObject);
+ Result.Success)
+ {
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+ Success = true;
+ break;
+ }
}
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
-
- return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success};
+ return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Success};
}
}
- catch (std::exception& e)
+ catch (std::exception& Err)
{
- return {.Reason = std::string(e.what()), .Success = false};
+ return {.Reason = std::string(Err.what()), .Success = false};
}
}
+ virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
+
private:
bool m_UseLegacyDdc;
std::string m_DisplayName;
RefPtr<CloudCacheClient> m_Client;
+ UpstreamEndpointStats m_Stats;
std::atomic_bool m_HealthOk{false};
};
@@ -317,9 +331,9 @@ namespace detail {
ZenStructuredCacheSession Session(*m_Client);
ZenCacheResult Result;
- for (int32_t Attempt = 0, MaxAttempts = 3; Attempt < MaxAttempts && !Result.Success; ++Attempt)
+ for (int32_t Attempt = 0, MaxAttempts = 2; Attempt < MaxAttempts && !Result.Success; ++Attempt)
{
- Result = Session.SayHello();
+ Result = Session.CheckHealth();
}
m_HealthOk = Result.ErrorCode == 0;
@@ -473,9 +487,12 @@ namespace detail {
}
}
+ virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
+
private:
std::string m_DisplayName;
RefPtr<ZenStructuredCacheClient> m_Client;
+ UpstreamEndpointStats m_Stats;
std::atomic_bool m_HealthOk{false};
};
@@ -483,87 +500,95 @@ namespace detail {
//////////////////////////////////////////////////////////////////////////
-class UpstreamStats final
+struct UpstreamStats
{
static constexpr uint64_t MaxSampleCount = 100ull;
- struct StatCounters
- {
- int64_t Bytes = {};
- int64_t Count = {};
- double Seconds = {};
- };
-
- using StatsMap = std::unordered_map<const UpstreamEndpoint*, StatCounters>;
-
- struct EndpointStats
- {
- mutable std::mutex Lock;
- StatsMap Counters;
- };
-
-public:
- UpstreamStats() : m_Log(logging::Get("upstream")) {}
+ UpstreamStats(bool Enabled) : m_Enabled(Enabled) {}
- void Add(const UpstreamEndpoint& Endpoint, const GetUpstreamCacheResult& Result)
+ void Add(spdlog::logger& Logger,
+ UpstreamEndpoint& Endpoint,
+ const GetUpstreamCacheResult& Result,
+ const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
{
- std::unique_lock Lock(m_DownStats.Lock);
+ if (!m_Enabled)
+ {
+ return;
+ }
- auto& Counters = m_DownStats.Counters[&Endpoint];
- Counters.Bytes += Result.Bytes;
- Counters.Seconds += Result.ElapsedSeconds;
- Counters.Count++;
+ UpstreamEndpointStats& Stats = Endpoint.Stats();
+ if (Result.Success)
+ {
+ Stats.HitCount++;
+ Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
+ Stats.SecondsDown.fetch_add(Result.ElapsedSeconds);
+ }
+ else
+ {
+ Stats.MissCount++;
+ }
- if (Counters.Count >= MaxSampleCount)
+ if (m_SampleCount++ % MaxSampleCount)
{
- LogStats("STATS - (downstream):"sv, m_DownStats.Counters);
- Counters = StatCounters{};
+ Dump(Logger, Endpoints);
}
}
- void Add(const UpstreamEndpoint& Endpoint, const PutUpstreamCacheResult& Result)
+ void Add(spdlog::logger& Logger,
+ UpstreamEndpoint& Endpoint,
+ const PutUpstreamCacheResult& Result,
+ const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
{
- std::unique_lock Lock(m_UpStats.Lock);
+ if (!m_Enabled)
+ {
+ return;
+ }
- auto& Counters = m_UpStats.Counters[&Endpoint];
- Counters.Bytes += Result.Bytes;
- Counters.Seconds += Result.ElapsedSeconds;
- Counters.Count++;
+ UpstreamEndpointStats& Stats = Endpoint.Stats();
+ if (Result.Success)
+ {
+ Stats.UpCount++;
+ Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
+ Stats.SecondsUp.fetch_add(Result.ElapsedSeconds);
+ }
- if (Counters.Count >= MaxSampleCount)
+ if (m_SampleCount++ % MaxSampleCount)
{
- LogStats("STATS - (upstream):"sv, m_UpStats.Counters);
- Counters = StatCounters{};
+ Dump(Logger, Endpoints);
}
}
-private:
- void LogStats(std::string_view What, const std::unordered_map<const UpstreamEndpoint*, StatCounters>& EndpointStats)
+ void Dump(spdlog::logger& Logger, const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
{
- for (const auto& Kv : EndpointStats)
+ for (auto& Ep : Endpoints)
{
- const UpstreamEndpoint& Endpoint = *Kv.first;
- const StatCounters& Counters = Kv.second;
- const double TotalMb = double(Counters.Bytes) / 1024.0 / 1024.0;
-
- ZEN_UNUSED(Endpoint);
-
- ZEN_INFO("{} Endpoint: {}, Bytes: {:.2f} MB, Time: {:.2f} s, Speed: {:.2f} MB/s, Avg: {:.2f} ms/request, Samples: {}",
- What,
- Kv.first->DisplayName(),
- TotalMb,
- Counters.Seconds,
- TotalMb / Counters.Seconds,
- (Counters.Seconds * 1000.0) / double(Counters.Count),
- Counters.Count);
+ // These stats will not be totally correct as the numbers are not captured atomically
+
+ UpstreamEndpointStats& Stats = Ep->Stats();
+ const uint64_t HitCount = Stats.HitCount;
+ const uint64_t MissCount = Stats.MissCount;
+ const double DownBytes = Stats.DownBytes;
+ const double SecondsDown = Stats.SecondsDown;
+ const double UpBytes = Stats.UpBytes;
+ const double SecondsUp = Stats.SecondsUp;
+
+ const double UpSpeed = UpBytes > 0 ? UpBytes / SecondsUp : 0.0;
+ const double DownSpeed = DownBytes > 0 ? DownBytes / SecondsDown : 0.0;
+ const uint64_t TotalCount = HitCount + MissCount;
+ const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0;
+
+ Logger.info("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
+ Ep->DisplayName(),
+ HitRate,
+ DownBytes,
+ DownSpeed,
+ UpBytes,
+ UpSpeed);
}
}
- spdlog::logger& Log() { return m_Log; }
-
- spdlog::logger& m_Log;
- EndpointStats m_UpStats;
- EndpointStats m_DownStats;
+ bool m_Enabled;
+ std::atomic_uint64_t m_SampleCount = {};
};
//////////////////////////////////////////////////////////////////////////
@@ -576,6 +601,7 @@ public:
, m_Options(Options)
, m_CacheStore(CacheStore)
, m_CidStore(CidStore)
+ , m_Stats(Options.StatsEnabled)
{
}
@@ -621,9 +647,11 @@ public:
{
if (Endpoint->IsHealthy())
{
- if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success)
+ const GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type);
+ m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+
+ if (Result.Success)
{
- m_Stats.Add(*Endpoint, Result);
return Result;
}
}
@@ -641,9 +669,11 @@ public:
{
if (Endpoint->IsHealthy())
{
- if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success)
+ const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey);
+ m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+
+ if (Result.Success)
{
- m_Stats.Add(*Endpoint, Result);
return Result;
}
}
@@ -707,18 +737,7 @@ private:
if (Endpoint->IsHealthy())
{
const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
- if (Result.Success)
- {
- m_Stats.Add(*Endpoint, Result);
- }
- else
- {
- ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- Endpoint->DisplayName(),
- Result.Reason);
- }
+ m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
}
}
}
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 96ee8bddc..0e736480b 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -6,6 +6,7 @@
#include <zencore/iohash.h>
#include <zencore/zencore.h>
+#include <atomic>
#include <chrono>
#include <memory>
@@ -40,6 +41,7 @@ struct UpstreamCacheOptions
uint32_t ThreadCount = 4;
bool ReadUpstream = true;
bool WriteUpstream = true;
+ bool StatsEnabled = false;
};
enum class UpstreamStatusCode : uint8_t
@@ -79,6 +81,17 @@ struct UpstreamEndpointHealth
bool Ok = false;
};
+struct UpstreamEndpointStats
+{
+ std::atomic_uint64_t HitCount = {};
+ std::atomic_uint64_t MissCount = {};
+ std::atomic_uint64_t UpCount = {};
+ std::atomic<double> UpBytes = {};
+ std::atomic<double> DownBytes = {};
+ std::atomic<double> SecondsUp = {};
+ std::atomic<double> SecondsDown = {};
+};
+
/**
* The upstream endpont is responsible for handling upload/downloading of cache records.
*/
@@ -100,6 +113,8 @@ public:
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
std::span<IoBuffer const> Payloads) = 0;
+
+ virtual UpstreamEndpointStats& Stats() = 0;
};
/**
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index 710d381c6..530bed32a 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -382,10 +382,10 @@ ZenStructuredCacheSession::~ZenStructuredCacheSession()
}
ZenCacheResult
-ZenStructuredCacheSession::SayHello()
+ZenStructuredCacheSession::CheckHealth()
{
ExtendableStringBuilder<256> Uri;
- Uri << m_Client.ServiceUrl() << "/test/hello";
+ Uri << m_Client.ServiceUrl() << "/health/check";
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index 48886096d..158be668a 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -111,7 +111,7 @@ public:
ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient);
~ZenStructuredCacheSession();
- ZenCacheResult SayHello();
+ ZenCacheResult CheckHealth();
ZenCacheResult GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type);
ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId);
ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type);
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index e3b61568f..fe4f41ab5 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -188,6 +188,8 @@ public:
UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
}
+ UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled;
+
UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore);
if (!UpstreamConfig.ZenConfig.Url.empty())
@@ -198,7 +200,18 @@ public:
{
zen::CloudCacheClientOptions Options;
- if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings)
+ if (UpstreamConfig.JupiterConfig.UseProductionSettings)
+ {
+ Options = zen::CloudCacheClientOptions{
+ .ServiceUrl = "https://jupiter.devtools.epicgames.com"sv,
+ .DdcNamespace = "ue.ddc"sv,
+ .BlobStoreNamespace = "ue.ddc"sv,
+ .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
+ .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
+ .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
+ .UseLegacyDdc = false};
+ }
+ else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings)
{
Options = zen::CloudCacheClientOptions{
.ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv,
diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj
index 1671d98a6..29436d840 100644
--- a/zenserver/zenserver.vcxproj
+++ b/zenserver/zenserver.vcxproj
@@ -115,7 +115,6 @@
<ClInclude Include="upstream\jupiter.h" />
<ClInclude Include="projectstore.h" />
<ClInclude Include="cache\cacheagent.h" />
- <ClInclude Include="cache\cachestore.h" />
<ClInclude Include="testing\launch.h" />
<ClInclude Include="casstore.h" />
<ClInclude Include="diag\diagsvcs.h" />
@@ -138,7 +137,6 @@
<ClCompile Include="testing\httptest.cpp" />
<ClCompile Include="upstream\jupiter.cpp" />
<ClCompile Include="testing\launch.cpp" />
- <ClCompile Include="cache\cachestore.cpp" />
<ClCompile Include="casstore.cpp" />
<ClCompile Include="experimental\usnjournal.cpp" />
<ClCompile Include="upstream\upstreamcache.cpp" />
diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters
index c51a8eb76..6b99ca8d7 100644
--- a/zenserver/zenserver.vcxproj.filters
+++ b/zenserver/zenserver.vcxproj.filters
@@ -9,9 +9,6 @@
<ClInclude Include="cache\cacheagent.h">
<Filter>cache</Filter>
</ClInclude>
- <ClInclude Include="cache\cachestore.h">
- <Filter>cache</Filter>
- </ClInclude>
<ClInclude Include="diag\diagsvcs.h">
<Filter>diag</Filter>
</ClInclude>
@@ -50,9 +47,6 @@
<ClCompile Include="cache\cacheagent.cpp">
<Filter>cache</Filter>
</ClCompile>
- <ClCompile Include="cache\cachestore.cpp">
- <Filter>cache</Filter>
- </ClCompile>
<ClCompile Include="experimental\usnjournal.cpp">
<Filter>experimental</Filter>
</ClCompile>