diff options
| author | Stefan Boberg <[email protected]> | 2021-05-22 11:53:09 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-05-22 11:53:09 +0200 |
| commit | f93d04dd9381eac38be82733c34baa2075efa11c (patch) | |
| tree | b5c4f9a173042580398e4dd96be97702909cc06b /zenserver | |
| parent | Structured cache changes (diff) | |
| download | zen-f93d04dd9381eac38be82733c34baa2075efa11c.tar.xz zen-f93d04dd9381eac38be82733c34baa2075efa11c.zip | |
Split out structured cache store code into dedicated cpp/h pair
Diffstat (limited to 'zenserver')
| -rw-r--r-- | zenserver/cache/cachestore.cpp | 568 | ||||
| -rw-r--r-- | zenserver/cache/cachestore.h | 98 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 2 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 2 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 554 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.h | 117 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 2 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj | 2 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj.filters | 2 |
9 files changed, 688 insertions, 659 deletions
diff --git a/zenserver/cache/cachestore.cpp b/zenserver/cache/cachestore.cpp index f99caa24f..1977d5f01 100644 --- a/zenserver/cache/cachestore.cpp +++ b/zenserver/cache/cachestore.cpp @@ -337,7 +337,7 @@ struct CorruptionTrailer } }; -std::wstring +std::filesystem::path GenerateDdcPath(std::string_view Key, std::filesystem::path& rootDir) { std::filesystem::path FilePath = rootDir; @@ -413,22 +413,22 @@ FileCacheStore::Get(std::string_view Key, CacheValue& OutValue) { CAtlFile File; - std::wstring nativePath; + std::filesystem::path NativePath; HRESULT hRes = E_FAIL; if (m_ReadRootDir.empty() == false) { - nativePath = UE::GenerateDdcPath(Key, m_ReadRootDir); + NativePath = UE::GenerateDdcPath(Key, m_ReadRootDir); - hRes = File.Create(nativePath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); + hRes = File.Create(NativePath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); } if (FAILED(hRes)) { - nativePath = UE::GenerateDdcPath(Key, m_RootDir); + NativePath = UE::GenerateDdcPath(Key, m_RootDir); - hRes = File.Create(nativePath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); + hRes = File.Create(NativePath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); } if (FAILED(hRes)) @@ -448,28 +448,7 @@ FileCacheStore::Get(std::string_view Key, CacheValue& OutValue) FileSize -= 16; // CorruptionWrapper trailer - IoBuffer Value(FileSize); - - uint8_t* ReadPointer = (uint8_t*)Value.Data(); - - while (FileSize) - { - const int MaxChunkSize = 16 * 1024 * 1024; - const int ChunkSize = gsl::narrow_cast<int>((FileSize > MaxChunkSize) ? 
MaxChunkSize : FileSize); - - DWORD BytesRead = 0; - hRes = File.Read(ReadPointer, ChunkSize, BytesRead); - - if (FAILED(hRes)) - { - return false; - } - - ReadPointer += BytesRead; - FileSize -= BytesRead; - } - - OutValue.Value = std::move(Value); + OutValue.Value = IoBuffer(IoBuffer::File, File.Detach(), 0, FileSize); spdlog::debug("GET HIT {}", Key); @@ -485,7 +464,7 @@ FileCacheStore::Put(std::string_view Key, const CacheValue& Value) UE::CorruptionTrailer Trailer; Trailer.Initialize(Data, Size); - std::wstring nativePath = UE::GenerateDdcPath(Key, m_RootDir); + std::filesystem::path NativePath = UE::GenerateDdcPath(Key, m_RootDir); CAtlTemporaryFile File; @@ -495,7 +474,7 @@ FileCacheStore::Put(std::string_view Key, const CacheValue& Value) if (SUCCEEDED(hRes)) { - const uint8_t* WritePointer = (const uint8_t*)Data; + const uint8_t* WritePointer = reinterpret_cast<const uint8_t*>(Data); while (Size) { @@ -510,7 +489,7 @@ FileCacheStore::Put(std::string_view Key, const CacheValue& Value) } File.Write(&Trailer, sizeof Trailer); - hRes = File.Close(nativePath.c_str()); // This renames the file to its final name + hRes = File.Close(NativePath.c_str()); // This renames the file to its final name if (FAILED(hRes)) { @@ -559,530 +538,3 @@ MemoryCacheStore::Put(std::string_view Key, const CacheValue& Value) RwLock::ExclusiveLockScope _(m_Lock); m_CacheMap[std::string(Key)] = Value.Value; } - -////////////////////////////////////////////////////////////////////////// - -ZenCacheStore::ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir} -{ - zen::CreateDirectories(RootDir); -} - -ZenCacheStore::~ZenCacheStore() -{ -} - -bool -ZenCacheStore::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) -{ - bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue); - - if (!Ok) - { - Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue); - -#if 0 // This would keep file handles open - if (ok) - { - 
m_memLayer.Put(InBucket, HashKey, OutValue); - } -#endif - } - - return Ok; -} - -void -ZenCacheStore::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) -{ - m_MemLayer.Put(InBucket, HashKey, Value); - m_DiskLayer.Put(InBucket, HashKey, Value); -} - -////////////////////////////////////////////////////////////////////////// - -ZenCacheMemoryLayer::ZenCacheMemoryLayer() -{ -} - -ZenCacheMemoryLayer::~ZenCacheMemoryLayer() -{ -} - -bool -ZenCacheMemoryLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) -{ - CacheBucket* Bucket = nullptr; - - { - RwLock::SharedLockScope _(m_Lock); - - auto it = m_Buckets.find(std::string(InBucket)); - - if (it != m_Buckets.end()) - { - Bucket = &it->second; - } - } - - if (Bucket == nullptr) - return false; - - ZEN_ASSERT(Bucket != nullptr); - - return Bucket->Get(HashKey, OutValue); -} - -void -ZenCacheMemoryLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) -{ - CacheBucket* Bucket = nullptr; - - { - RwLock::SharedLockScope _(m_Lock); - - auto it = m_Buckets.find(std::string(InBucket)); - - if (it != m_Buckets.end()) - { - Bucket = &it->second; - } - } - - if (Bucket == nullptr) - { - // New bucket - - RwLock::ExclusiveLockScope _(m_Lock); - - Bucket = &m_Buckets[std::string(InBucket)]; - } - - ZEN_ASSERT(Bucket != nullptr); - - Bucket->Put(HashKey, Value); -} - -bool -ZenCacheMemoryLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue) -{ - RwLock::SharedLockScope _(m_bucketLock); - - auto bucketIt = m_cacheMap.find(HashKey); - - if (bucketIt == m_cacheMap.end()) - { - return false; - } - - OutValue.Value = bucketIt->second; - - return true; -} - -void -ZenCacheMemoryLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value) -{ - RwLock::ExclusiveLockScope _(m_bucketLock); - - m_cacheMap[HashKey] = Value.Value; -} - 
-////////////////////////////////////////////////////////////////////////// - -#pragma pack(push) -#pragma pack(1) - -struct DiskLocation -{ - uint64_t Offset; - uint32_t Size; -}; - -struct DiskIndexEntry -{ - zen::IoHash Key; - DiskLocation Location; -}; - -#pragma pack(pop) - -static_assert(sizeof(DiskIndexEntry) == 32); - -struct ZenCacheDiskLayer::CacheBucket -{ - CacheBucket(CasStore& Cas); - ~CacheBucket(); - - void OpenOrCreate(std::filesystem::path BucketDir); - - bool Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const zen::IoHash& HashKey, const ZenCacheValue& Value); - void Flush(); - - inline bool IsOk() const { return m_Ok; } - -private: - CasStore& m_CasStore; - std::filesystem::path m_BucketDir; - Oid m_BucketId; - bool m_Ok = false; - uint64_t m_LargeObjectThreshold = 1024; - - BasicFile m_SobsFile; - TCasLogFile<DiskIndexEntry> m_SlogFile; - BasicFile m_SidxFile; - - void BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey); - void PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value); - - RwLock m_IndexLock; - std::unordered_map<zen::IoHash, DiskLocation, zen::IoHash::Hasher> m_Index; - uint64_t m_WriteCursor = 0; -}; - -ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas) -{ -} - -ZenCacheDiskLayer::CacheBucket::~CacheBucket() -{ -} - -void -ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) -{ - std::filesystem::create_directories(BucketDir); - - m_BucketDir = BucketDir; - - std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"}; - std::filesystem::path SobsPath{m_BucketDir / "zen.sobs"}; - std::filesystem::path SlogPath{m_BucketDir / "zen.slog"}; - std::filesystem::path SidxPath{m_BucketDir / "zen.sidx"}; - - CAtlFile ManifestFile; - - // Try opening existing file first - - bool IsNew = false; - - HRESULT hRes = ManifestFile.Create(ManifestPath.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, OPEN_EXISTING); - - if 
(SUCCEEDED(hRes)) - { - ULONGLONG FileSize; - ManifestFile.GetSize(FileSize); - - if (FileSize == sizeof(Oid)) - { - hRes = ManifestFile.Read(&m_BucketId, sizeof(Oid)); - - if (SUCCEEDED(hRes)) - { - m_Ok = true; - } - } - - if (!m_Ok) - ManifestFile.Close(); - } - - if (!m_Ok) - { - // This is a new bucket - - hRes = ManifestFile.Create(ManifestPath.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, CREATE_ALWAYS); - - if (FAILED(hRes)) - { - ThrowLastError("Failed to create bucket manifest '{}'"_format(ManifestPath)); - } - - m_BucketId.Generate(); - - hRes = ManifestFile.Write(&m_BucketId, sizeof(Oid)); - - IsNew = true; - } - - // Initialize small object storage related files - - m_SobsFile.Open(SobsPath, IsNew); - m_SidxFile.Open(SidxPath, IsNew); - - // Open and replay log - - m_SlogFile.Open(SlogPath, IsNew); - - uint64_t MaxFileOffset = 0; - - { - // This is not technically necessary but may help future static analysis - zen::RwLock::ExclusiveLockScope _(m_IndexLock); - - m_SlogFile.Replay([&](const DiskIndexEntry& Record) { - m_Index[Record.Key] = Record.Location; - - MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset + Record.Location.Size); - }); - } - - m_WriteCursor = (MaxFileOffset + 15) & ~15; - - m_Ok = true; -} - -void -ZenCacheDiskLayer::CacheBucket::BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey) -{ - char hex[sizeof(HashKey.Hash) * 2]; - ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, hex); - - Path.Append(m_BucketDir.c_str()); - Path.Append(L"/"); - Path.AppendAsciiRange(hex, hex + sizeof(hex)); -} - -bool -ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue) -{ - if (!m_Ok) - return false; - - { - zen::RwLock::SharedLockScope _(m_IndexLock); - - if (auto it = m_Index.find(HashKey); it != m_Index.end()) - { - OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), it->second.Offset, it->second.Size); - - return true; - } - } - - 
WideStringBuilder<128> DataFilePath; - BuildPath(DataFilePath, HashKey); - - zen::IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str()); - - if (!Data) - { - return false; - } - - OutValue.Value = Data; - - // TODO: should populate index? - - return true; -} - -void -ZenCacheDiskLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value) -{ - if (!m_Ok) - { - return; - } - - if (Value.Value.Size() >= m_LargeObjectThreshold) - { - return PutLargeObject(HashKey, Value); - } - - // Small object put - - zen::RwLock::ExclusiveLockScope _(m_IndexLock); - - auto it = m_Index.find(HashKey); - - DiskLocation Loc{.Offset = m_WriteCursor, .Size = gsl::narrow<uint32_t>(Value.Value.Size())}; - - m_WriteCursor = (m_WriteCursor + Loc.Size + 15) & ~15; - - if (it == m_Index.end()) - { - m_Index.insert({HashKey, Loc}); - } - else - { - // TODO: should check if write is idempotent and bail out if it is? - - it->second = Loc; - } - - DiskIndexEntry IndexEntry{.Key = HashKey, .Location = Loc}; - - m_SlogFile.Append(IndexEntry); - - m_SobsFile.Write(Value.Value.Data(), Loc.Size, Loc.Offset); - - return; -} - -void -ZenCacheDiskLayer::CacheBucket::Flush() -{ - m_SobsFile.Flush(); - m_SidxFile.Flush(); - m_SlogFile.Flush(); -} - -void -ZenCacheDiskLayer::CacheBucket::PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value) -{ - zen::WideStringBuilder<128> DataFilePath; - BuildPath(DataFilePath, HashKey); - - // TODO: replace this with a more efficient implementation with proper atomic rename - - CAtlTemporaryFile DataFile; - - HRESULT hRes = DataFile.Create(m_BucketDir.c_str()); - - if (FAILED(hRes)) - { - zen::ThrowSystemException(hRes, "Failed to open temporary file for put at '{}'"_format(m_BucketDir)); - } - - hRes = DataFile.Write(Value.Value.Data(), gsl::narrow<DWORD>(Value.Value.Size())); - - if (FAILED(hRes)) - { - zen::ThrowSystemException(hRes, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size()))); - 
} - - hRes = DataFile.Close(DataFilePath.c_str()); - - if (FAILED(hRes)) - { - zen::ThrowSystemException(hRes, "Failed to finalize file '{}'"_format(zen::WideToUtf8(DataFilePath))); - } -} - -////////////////////////////////////////////////////////////////////////// - -ZenCacheDiskLayer::ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir) : m_RootDir(RootDir), m_CasStore(Cas) -{ -} - -ZenCacheDiskLayer::~ZenCacheDiskLayer() = default; - -bool -ZenCacheDiskLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) -{ - CacheBucket* Bucket = nullptr; - - { - zen::RwLock::SharedLockScope _(m_Lock); - - auto it = m_Buckets.find(std::string(InBucket)); - - if (it != m_Buckets.end()) - { - Bucket = &it->second; - } - } - - if (Bucket == nullptr) - { - // Bucket needs to be opened/created - - zen::RwLock::ExclusiveLockScope _(m_Lock); - - auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore); - Bucket = &It.first->second; - - std::filesystem::path BucketPath = m_RootDir; - BucketPath /= std::string(InBucket); - - Bucket->OpenOrCreate(BucketPath.c_str()); - } - - ZEN_ASSERT(Bucket != nullptr); - - return Bucket->Get(HashKey, OutValue); -} - -void -ZenCacheDiskLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) -{ - CacheBucket* Bucket = nullptr; - - { - zen::RwLock::SharedLockScope _(m_Lock); - - auto it = m_Buckets.find(std::string(InBucket)); - - if (it != m_Buckets.end()) - { - Bucket = &it->second; - } - } - - if (Bucket == nullptr) - { - // New bucket needs to be created - - zen::RwLock::ExclusiveLockScope _(m_Lock); - - auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore); - Bucket = &It.first->second; - - std::filesystem::path bucketPath = m_RootDir; - bucketPath /= std::string(InBucket); - - Bucket->OpenOrCreate(bucketPath.c_str()); - } - - ZEN_ASSERT(Bucket != nullptr); - - if (Bucket->IsOk()) - { - Bucket->Put(HashKey, Value); - } -} - -void 
-ZenCacheDiskLayer::Flush() -{ - std::vector<CacheBucket*> Buckets; - Buckets.reserve(m_Buckets.size()); - - { - zen::RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_Buckets) - { - Buckets.push_back(&Kv.second); - } - } - - for (auto& Bucket : Buckets) - { - Bucket->Flush(); - } -} - -////////////////////////////////////////////////////////////////////////// - -ZenCacheTracker::ZenCacheTracker(ZenCacheStore& CacheStore) -{ - ZEN_UNUSED(CacheStore); -} - -ZenCacheTracker::~ZenCacheTracker() -{ -} - -void -ZenCacheTracker::TrackAccess(std::string_view Bucket, const zen::IoHash& HashKey) -{ - ZEN_UNUSED(Bucket); - ZEN_UNUSED(HashKey); -} - -void -ZenCacheTracker::Flush() -{ -} diff --git a/zenserver/cache/cachestore.h b/zenserver/cache/cachestore.h index 3e5fa5fc8..89c6396b8 100644 --- a/zenserver/cache/cachestore.h +++ b/zenserver/cache/cachestore.h @@ -82,101 +82,3 @@ private: zen::RwLock m_Lock; std::unordered_map<std::string, zen::IoBuffer> m_CacheMap; }; - -/****************************************************************************** - - /$$$$$$$$ /$$$$$$ /$$ - |_____ $$ /$$__ $$ | $$ - /$$/ /$$$$$$ /$$$$$$$ | $$ \__/ /$$$$$$ /$$$$$$| $$$$$$$ /$$$$$$ - /$$/ /$$__ $| $$__ $$ | $$ |____ $$/$$_____| $$__ $$/$$__ $$ - /$$/ | $$$$$$$| $$ \ $$ | $$ /$$$$$$| $$ | $$ \ $| $$$$$$$$ - /$$/ | $$_____| $$ | $$ | $$ $$/$$__ $| $$ | $$ | $| $$_____/ - /$$$$$$$| $$$$$$| $$ | $$ | $$$$$$| $$$$$$| $$$$$$| $$ | $| $$$$$$$ - |________/\_______|__/ |__/ \______/ \_______/\_______|__/ |__/\_______/ - - Cache store for UE5. Restricts keys to "{bucket}/{hash}" pairs where the hash - is 40 (hex) chars in size. Values may be opaque blobs or structured objects - which can in turn contain references to other objects (or blobs). 
- -******************************************************************************/ - -struct ZenCacheValue -{ - zen::IoBuffer Value; - bool IsCompactBinary = false; -}; - -class ZenCacheMemoryLayer -{ -public: - ZenCacheMemoryLayer(); - ~ZenCacheMemoryLayer(); - - bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value); - -private: - struct CacheBucket - { - zen::RwLock m_bucketLock; - std::unordered_map<zen::IoHash, zen::IoBuffer, zen::IoHash::Hasher> m_cacheMap; - - bool Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const zen::IoHash& HashKey, const ZenCacheValue& Value); - }; - - zen::RwLock m_Lock; - std::unordered_map<std::string, CacheBucket> m_Buckets; -}; - -class ZenCacheDiskLayer -{ -public: - ZenCacheDiskLayer(zen::CasStore& Cas, const std::filesystem::path& RootDir); - ~ZenCacheDiskLayer(); - - bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value); - - void Flush(); - -private: - /** A cache bucket manages a single directory containing - metadata and data for that bucket - */ - struct CacheBucket; - - zen::CasStore& m_CasStore; - std::filesystem::path m_RootDir; - zen::RwLock m_Lock; - std::unordered_map<std::string, CacheBucket> m_Buckets; // TODO: make this case insensitive -}; - -class ZenCacheStore -{ -public: - ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir); - ~ZenCacheStore(); - - bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value); - -private: - std::filesystem::path m_RootDir; - ZenCacheMemoryLayer m_MemLayer; - ZenCacheDiskLayer m_DiskLayer; -}; - -/** Tracks cache entry access, stats and orchestrates cleanup activities - */ 
-class ZenCacheTracker -{ -public: - ZenCacheTracker(ZenCacheStore& CacheStore); - ~ZenCacheTracker(); - - void TrackAccess(std::string_view Bucket, const zen::IoHash& HashKey); - void Flush(); - -private: -}; diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 9e771a715..2704bedc3 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -6,8 +6,8 @@ #include <zencore/httpserver.h> #include <zencore/timer.h> -#include "cachestore.h" #include "structuredcache.h" +#include "structuredcachestore.h" #include "upstream/jupiter.h" #include <spdlog/spdlog.h> diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index ead9644f5..111a4af50 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -5,7 +5,7 @@ #include <zencore/httpserver.h> #include <zencore/refcount.h> -#include "cachestore.h" +#include "structuredcachestore.h" #include "upstream/jupiter.h" namespace zen { diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp new file mode 100644 index 000000000..3e0050fab --- /dev/null +++ b/zenserver/cache/structuredcachestore.cpp @@ -0,0 +1,554 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#include "structuredcachestore.h" + +#include <zencore/except.h> +#include <zencore/windows.h> + +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> +#include <zencore/string.h> +#include <zencore/thread.h> +#include <zenstore/basicfile.h> +#include <zenstore/cas.h> +#include <zenstore/caslog.h> + +#include <fmt/core.h> +#include <spdlog/spdlog.h> +#include <concepts> +#include <filesystem> +#include <gsl/gsl-lite.hpp> +#include <unordered_map> + +#include <atlfile.h> + +using namespace zen; +using namespace fmt::literals; + +////////////////////////////////////////////////////////////////////////// + +ZenCacheStore::ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir} +{ + zen::CreateDirectories(RootDir); +} + +ZenCacheStore::~ZenCacheStore() +{ +} + +bool +ZenCacheStore::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) +{ + bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue); + + if (!Ok) + { + Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue); + +#if 0 // This would keep file handles open + if (ok) + { + m_memLayer.Put(InBucket, HashKey, OutValue); + } +#endif + } + + return Ok; +} + +void +ZenCacheStore::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) +{ + m_MemLayer.Put(InBucket, HashKey, Value); + m_DiskLayer.Put(InBucket, HashKey, Value); +} + +////////////////////////////////////////////////////////////////////////// + +ZenCacheMemoryLayer::ZenCacheMemoryLayer() +{ +} + +ZenCacheMemoryLayer::~ZenCacheMemoryLayer() +{ +} + +bool +ZenCacheMemoryLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) +{ + CacheBucket* Bucket = nullptr; + + { + RwLock::SharedLockScope _(m_Lock); + + auto it = m_Buckets.find(std::string(InBucket)); + + if (it != m_Buckets.end()) + { + Bucket = &it->second; + } + } + + if (Bucket == nullptr) + return false; + + ZEN_ASSERT(Bucket 
!= nullptr); + + return Bucket->Get(HashKey, OutValue); +} + +void +ZenCacheMemoryLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) +{ + CacheBucket* Bucket = nullptr; + + { + RwLock::SharedLockScope _(m_Lock); + + auto it = m_Buckets.find(std::string(InBucket)); + + if (it != m_Buckets.end()) + { + Bucket = &it->second; + } + } + + if (Bucket == nullptr) + { + // New bucket + + RwLock::ExclusiveLockScope _(m_Lock); + + Bucket = &m_Buckets[std::string(InBucket)]; + } + + ZEN_ASSERT(Bucket != nullptr); + + Bucket->Put(HashKey, Value); +} + +bool +ZenCacheMemoryLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue) +{ + RwLock::SharedLockScope _(m_bucketLock); + + auto bucketIt = m_cacheMap.find(HashKey); + + if (bucketIt == m_cacheMap.end()) + { + return false; + } + + OutValue.Value = bucketIt->second; + + return true; +} + +void +ZenCacheMemoryLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value) +{ + RwLock::ExclusiveLockScope _(m_bucketLock); + + m_cacheMap[HashKey] = Value.Value; +} + +////////////////////////////////////////////////////////////////////////// + +#pragma pack(push) +#pragma pack(1) + +struct DiskLocation +{ + uint64_t Offset; + uint32_t Size; +}; + +struct DiskIndexEntry +{ + zen::IoHash Key; + DiskLocation Location; +}; + +#pragma pack(pop) + +static_assert(sizeof(DiskIndexEntry) == 32); + +struct ZenCacheDiskLayer::CacheBucket +{ + CacheBucket(CasStore& Cas); + ~CacheBucket(); + + void OpenOrCreate(std::filesystem::path BucketDir); + + bool Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const zen::IoHash& HashKey, const ZenCacheValue& Value); + void Flush(); + + inline bool IsOk() const { return m_Ok; } + +private: + CasStore& m_CasStore; + std::filesystem::path m_BucketDir; + Oid m_BucketId; + bool m_Ok = false; + uint64_t m_LargeObjectThreshold = 1024; + + BasicFile m_SobsFile; + TCasLogFile<DiskIndexEntry> m_SlogFile; + 
BasicFile m_SidxFile; + + void BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey); + void PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value); + + RwLock m_IndexLock; + std::unordered_map<zen::IoHash, DiskLocation, zen::IoHash::Hasher> m_Index; + uint64_t m_WriteCursor = 0; +}; + +ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas) +{ +} + +ZenCacheDiskLayer::CacheBucket::~CacheBucket() +{ +} + +void +ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) +{ + std::filesystem::create_directories(BucketDir); + + m_BucketDir = BucketDir; + + std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"}; + std::filesystem::path SobsPath{m_BucketDir / "zen.sobs"}; + std::filesystem::path SlogPath{m_BucketDir / "zen.slog"}; + std::filesystem::path SidxPath{m_BucketDir / "zen.sidx"}; + + CAtlFile ManifestFile; + + // Try opening existing file first + + bool IsNew = false; + + HRESULT hRes = ManifestFile.Create(ManifestPath.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, OPEN_EXISTING); + + if (SUCCEEDED(hRes)) + { + ULONGLONG FileSize; + ManifestFile.GetSize(FileSize); + + if (FileSize == sizeof(Oid)) + { + hRes = ManifestFile.Read(&m_BucketId, sizeof(Oid)); + + if (SUCCEEDED(hRes)) + { + m_Ok = true; + } + } + + if (!m_Ok) + ManifestFile.Close(); + } + + if (!m_Ok) + { + // This is a new bucket + + hRes = ManifestFile.Create(ManifestPath.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, CREATE_ALWAYS); + + if (FAILED(hRes)) + { + ThrowLastError("Failed to create bucket manifest '{}'"_format(ManifestPath)); + } + + m_BucketId.Generate(); + + hRes = ManifestFile.Write(&m_BucketId, sizeof(Oid)); + + IsNew = true; + } + + // Initialize small object storage related files + + m_SobsFile.Open(SobsPath, IsNew); + m_SidxFile.Open(SidxPath, IsNew); + + // Open and replay log + + m_SlogFile.Open(SlogPath, IsNew); + + uint64_t MaxFileOffset = 0; + + { + // This is not 
technically necessary but may help future static analysis + zen::RwLock::ExclusiveLockScope _(m_IndexLock); + + m_SlogFile.Replay([&](const DiskIndexEntry& Record) { + m_Index[Record.Key] = Record.Location; + + MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset + Record.Location.Size); + }); + } + + m_WriteCursor = (MaxFileOffset + 15) & ~15; + + m_Ok = true; +} + +void +ZenCacheDiskLayer::CacheBucket::BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey) +{ + char hex[sizeof(HashKey.Hash) * 2]; + ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, hex); + + Path.Append(m_BucketDir.c_str()); + Path.Append(L"/"); + Path.AppendAsciiRange(hex, hex + sizeof(hex)); +} + +bool +ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue) +{ + if (!m_Ok) + return false; + + { + zen::RwLock::SharedLockScope _(m_IndexLock); + + if (auto it = m_Index.find(HashKey); it != m_Index.end()) + { + OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), it->second.Offset, it->second.Size); + + return true; + } + } + + WideStringBuilder<128> DataFilePath; + BuildPath(DataFilePath, HashKey); + + zen::IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str()); + + if (!Data) + { + return false; + } + + OutValue.Value = Data; + + // TODO: should populate index? 
+ + return true; +} + +void +ZenCacheDiskLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value) +{ + if (!m_Ok) + { + return; + } + + if (Value.Value.Size() >= m_LargeObjectThreshold) + { + return PutLargeObject(HashKey, Value); + } + + // Small object put + + zen::RwLock::ExclusiveLockScope _(m_IndexLock); + + auto it = m_Index.find(HashKey); + + DiskLocation Loc{.Offset = m_WriteCursor, .Size = gsl::narrow<uint32_t>(Value.Value.Size())}; + + m_WriteCursor = (m_WriteCursor + Loc.Size + 15) & ~15; + + if (it == m_Index.end()) + { + m_Index.insert({HashKey, Loc}); + } + else + { + // TODO: should check if write is idempotent and bail out if it is? + + it->second = Loc; + } + + DiskIndexEntry IndexEntry{.Key = HashKey, .Location = Loc}; + + m_SlogFile.Append(IndexEntry); + + m_SobsFile.Write(Value.Value.Data(), Loc.Size, Loc.Offset); + + return; +} + +void +ZenCacheDiskLayer::CacheBucket::Flush() +{ + m_SobsFile.Flush(); + m_SidxFile.Flush(); + m_SlogFile.Flush(); +} + +void +ZenCacheDiskLayer::CacheBucket::PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value) +{ + zen::WideStringBuilder<128> DataFilePath; + BuildPath(DataFilePath, HashKey); + + // TODO: replace this with a more efficient implementation with proper atomic rename + + CAtlTemporaryFile DataFile; + + HRESULT hRes = DataFile.Create(m_BucketDir.c_str()); + + if (FAILED(hRes)) + { + zen::ThrowSystemException(hRes, "Failed to open temporary file for put at '{}'"_format(m_BucketDir)); + } + + hRes = DataFile.Write(Value.Value.Data(), gsl::narrow<DWORD>(Value.Value.Size())); + + if (FAILED(hRes)) + { + zen::ThrowSystemException(hRes, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size()))); + } + + hRes = DataFile.Close(DataFilePath.c_str()); + + if (FAILED(hRes)) + { + zen::ThrowSystemException(hRes, "Failed to finalize file '{}'"_format(zen::WideToUtf8(DataFilePath))); + } +} + 
+////////////////////////////////////////////////////////////////////////// + +ZenCacheDiskLayer::ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir) : m_RootDir(RootDir), m_CasStore(Cas) +{ +} + +ZenCacheDiskLayer::~ZenCacheDiskLayer() = default; + +bool +ZenCacheDiskLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) +{ + CacheBucket* Bucket = nullptr; + + { + zen::RwLock::SharedLockScope _(m_Lock); + + auto it = m_Buckets.find(std::string(InBucket)); + + if (it != m_Buckets.end()) + { + Bucket = &it->second; + } + } + + if (Bucket == nullptr) + { + // Bucket needs to be opened/created + + zen::RwLock::ExclusiveLockScope _(m_Lock); + + auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore); + Bucket = &It.first->second; + + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= std::string(InBucket); + + Bucket->OpenOrCreate(BucketPath.c_str()); + } + + ZEN_ASSERT(Bucket != nullptr); + + return Bucket->Get(HashKey, OutValue); +} + +void +ZenCacheDiskLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) +{ + CacheBucket* Bucket = nullptr; + + { + zen::RwLock::SharedLockScope _(m_Lock); + + auto it = m_Buckets.find(std::string(InBucket)); + + if (it != m_Buckets.end()) + { + Bucket = &it->second; + } + } + + if (Bucket == nullptr) + { + // New bucket needs to be created + + zen::RwLock::ExclusiveLockScope _(m_Lock); + + auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore); + Bucket = &It.first->second; + + std::filesystem::path bucketPath = m_RootDir; + bucketPath /= std::string(InBucket); + + Bucket->OpenOrCreate(bucketPath.c_str()); + } + + ZEN_ASSERT(Bucket != nullptr); + + if (Bucket->IsOk()) + { + Bucket->Put(HashKey, Value); + } +} + +void +ZenCacheDiskLayer::Flush() +{ + std::vector<CacheBucket*> Buckets; + Buckets.reserve(m_Buckets.size()); + + { + zen::RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + 
Buckets.push_back(&Kv.second); + } + } + + for (auto& Bucket : Buckets) + { + Bucket->Flush(); + } +} + +////////////////////////////////////////////////////////////////////////// + +ZenCacheTracker::ZenCacheTracker(ZenCacheStore& CacheStore) +{ + ZEN_UNUSED(CacheStore); +} + +ZenCacheTracker::~ZenCacheTracker() +{ +} + +void +ZenCacheTracker::TrackAccess(std::string_view Bucket, const zen::IoHash& HashKey) +{ + ZEN_UNUSED(Bucket); + ZEN_UNUSED(HashKey); +} + +void +ZenCacheTracker::Flush() +{ +} diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h new file mode 100644 index 000000000..7936f9d84 --- /dev/null +++ b/zenserver/cache/structuredcachestore.h @@ -0,0 +1,117 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/IoBuffer.h> +#include <zencore/iohash.h> +#include <zencore/thread.h> +#include <zencore/uid.h> +#include <zenstore/cas.h> +#include <compare> +#include <filesystem> +#include <unordered_map> + +namespace zen { + +class WideStringBuilderBase; +class CasStore; + +} // namespace zen + +/****************************************************************************** + + /$$$$$$$$ /$$$$$$ /$$ + |_____ $$ /$$__ $$ | $$ + /$$/ /$$$$$$ /$$$$$$$ | $$ \__/ /$$$$$$ /$$$$$$| $$$$$$$ /$$$$$$ + /$$/ /$$__ $| $$__ $$ | $$ |____ $$/$$_____| $$__ $$/$$__ $$ + /$$/ | $$$$$$$| $$ \ $$ | $$ /$$$$$$| $$ | $$ \ $| $$$$$$$$ + /$$/ | $$_____| $$ | $$ | $$ $$/$$__ $| $$ | $$ | $| $$_____/ + /$$$$$$$| $$$$$$| $$ | $$ | $$$$$$| $$$$$$| $$$$$$| $$ | $| $$$$$$$ + |________/\_______|__/ |__/ \______/ \_______/\_______|__/ |__/\_______/ + + Cache store for UE5. Restricts keys to "{bucket}/{hash}" pairs where the hash + is 40 (hex) chars in size. Values may be opaque blobs or structured objects + which can in turn contain references to other objects (or blobs). 
+ +******************************************************************************/ + +struct ZenCacheValue +{ + zen::IoBuffer Value; + bool IsCompactBinary = false; +}; + +class ZenCacheMemoryLayer +{ +public: + ZenCacheMemoryLayer(); + ~ZenCacheMemoryLayer(); + + bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value); + +private: + struct CacheBucket + { + zen::RwLock m_bucketLock; + std::unordered_map<zen::IoHash, zen::IoBuffer, zen::IoHash::Hasher> m_cacheMap; + + bool Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const zen::IoHash& HashKey, const ZenCacheValue& Value); + }; + + zen::RwLock m_Lock; + std::unordered_map<std::string, CacheBucket> m_Buckets; +}; + +class ZenCacheDiskLayer +{ +public: + ZenCacheDiskLayer(zen::CasStore& Cas, const std::filesystem::path& RootDir); + ~ZenCacheDiskLayer(); + + bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value); + + void Flush(); + +private: + /** A cache bucket manages a single directory containing + metadata and data for that bucket + */ + struct CacheBucket; + + zen::CasStore& m_CasStore; + std::filesystem::path m_RootDir; + zen::RwLock m_Lock; + std::unordered_map<std::string, CacheBucket> m_Buckets; // TODO: make this case insensitive +}; + +class ZenCacheStore +{ +public: + ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir); + ~ZenCacheStore(); + + bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value); + +private: + std::filesystem::path m_RootDir; + ZenCacheMemoryLayer m_MemLayer; + ZenCacheDiskLayer m_DiskLayer; +}; + +/** Tracks cache entry access, stats and orchestrates cleanup activities + */ 
class ZenCacheTracker
{
public:
    // NOTE(review): single-argument constructor is implicit; consider `explicit`
    // if no caller relies on the conversion.
    ZenCacheTracker(ZenCacheStore& CacheStore);
    ~ZenCacheTracker();

    /// Record an access to the entry identified by bucket name and hash
    void TrackAccess(std::string_view Bucket, const zen::IoHash& HashKey);

    void Flush();

    // No state yet
};
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 319a6f781..755e0bca4 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -2,7 +2,7 @@ #include "jupiter.h" -#include "cache/cachestore.h" +#include "cache/structuredcachestore.h" #include <fmt/format.h> #include <zencore/compactbinary.h> diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj index 2ba6bd551..449603334 100644 --- a/zenserver/zenserver.vcxproj +++ b/zenserver/zenserver.vcxproj @@ -105,6 +105,7 @@ <ItemGroup> <ClInclude Include="admin\admin.h" /> <ClInclude Include="cache\structuredcache.h" /> + <ClInclude Include="cache\structuredcachestore.h" /> <ClInclude Include="config.h" /> <ClInclude Include="diag\crashreport.h" /> <ClInclude Include="diag\logging.h" /> @@ -124,6 +125,7 @@ <ItemGroup> <ClCompile Include="cache\kvcache.cpp" /> <ClCompile Include="cache\structuredcache.cpp" /> + <ClCompile Include="cache\structuredcachestore.cpp" /> <ClCompile Include="config.cpp" /> <ClCompile Include="diag\crashreport.cpp" /> <ClCompile Include="diag\logging.cpp" /> diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters index fcf869e19..a8375b0b4 100644 --- a/zenserver/zenserver.vcxproj.filters +++ b/zenserver/zenserver.vcxproj.filters @@ -36,6 +36,7 @@ <ClInclude Include="config.h" /> <ClInclude Include="diag\logging.h" /> <ClInclude Include="diag\crashreport.h" /> + <ClInclude Include="cache\structuredcachestore.h" /> </ItemGroup> <ItemGroup> <ClCompile Include="zenserver.cpp" /> @@ -67,6 +68,7 @@ <ClCompile Include="config.cpp" /> <ClCompile Include="diag\logging.cpp" /> <ClCompile Include="diag\crashreport.cpp" /> + <ClCompile Include="cache\structuredcachestore.cpp" /> </ItemGroup> 
<ItemGroup> <Filter Include="cache"> |