diff options
| author | Stefan Boberg <[email protected]> | 2021-05-22 11:53:09 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-05-22 11:53:09 +0200 |
| commit | f93d04dd9381eac38be82733c34baa2075efa11c (patch) | |
| tree | b5c4f9a173042580398e4dd96be97702909cc06b /zenserver/cache/structuredcachestore.cpp | |
| parent | Structured cache changes (diff) | |
| download | zen-f93d04dd9381eac38be82733c34baa2075efa11c.tar.xz zen-f93d04dd9381eac38be82733c34baa2075efa11c.zip | |
Split out structured cache store code into dedicated cpp/h pair
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 554 |
1 files changed, 554 insertions, 0 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp new file mode 100644 index 000000000..3e0050fab --- /dev/null +++ b/zenserver/cache/structuredcachestore.cpp @@ -0,0 +1,554 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "structuredcachestore.h" + +#include <zencore/except.h> +#include <zencore/windows.h> + +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> +#include <zencore/string.h> +#include <zencore/thread.h> +#include <zenstore/basicfile.h> +#include <zenstore/cas.h> +#include <zenstore/caslog.h> + +#include <fmt/core.h> +#include <spdlog/spdlog.h> +#include <concepts> +#include <filesystem> +#include <gsl/gsl-lite.hpp> +#include <unordered_map> + +#include <atlfile.h> + +using namespace zen; +using namespace fmt::literals; + +////////////////////////////////////////////////////////////////////////// + +ZenCacheStore::ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir} +{ + zen::CreateDirectories(RootDir); +} + +ZenCacheStore::~ZenCacheStore() +{ +} + +bool +ZenCacheStore::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) +{ + bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue); + + if (!Ok) + { + Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue); + +#if 0 // This would keep file handles open + if (ok) + { + m_memLayer.Put(InBucket, HashKey, OutValue); + } +#endif + } + + return Ok; +} + +void +ZenCacheStore::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) +{ + m_MemLayer.Put(InBucket, HashKey, Value); + m_DiskLayer.Put(InBucket, HashKey, Value); +} + +////////////////////////////////////////////////////////////////////////// + +ZenCacheMemoryLayer::ZenCacheMemoryLayer() +{ +} + +ZenCacheMemoryLayer::~ZenCacheMemoryLayer() +{ +} + +bool +ZenCacheMemoryLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) +{ + CacheBucket* Bucket = nullptr; + + { + RwLock::SharedLockScope _(m_Lock); + + auto it = m_Buckets.find(std::string(InBucket)); + + if (it != m_Buckets.end()) + { + Bucket = &it->second; + } + } + + if (Bucket == nullptr) + return false; + + ZEN_ASSERT(Bucket != nullptr); + + return Bucket->Get(HashKey, OutValue); +} + +void +ZenCacheMemoryLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) +{ + CacheBucket* Bucket = nullptr; + + { + RwLock::SharedLockScope _(m_Lock); + + auto it = m_Buckets.find(std::string(InBucket)); + + if (it != m_Buckets.end()) + { + Bucket = &it->second; + } + } + + if (Bucket == nullptr) + { + // New bucket + + RwLock::ExclusiveLockScope _(m_Lock); + + Bucket = &m_Buckets[std::string(InBucket)]; + } + + ZEN_ASSERT(Bucket != nullptr); + + Bucket->Put(HashKey, Value); +} + +bool +ZenCacheMemoryLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue) +{ + RwLock::SharedLockScope _(m_bucketLock); + + auto bucketIt = m_cacheMap.find(HashKey); + + if (bucketIt == m_cacheMap.end()) + { + return false; + } + + OutValue.Value = bucketIt->second; + + return true; +} + +void +ZenCacheMemoryLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value) +{ + RwLock::ExclusiveLockScope _(m_bucketLock); + + m_cacheMap[HashKey] = Value.Value; +} + +////////////////////////////////////////////////////////////////////////// + +#pragma pack(push) +#pragma pack(1) + +struct DiskLocation +{ + uint64_t Offset; + uint32_t Size; +}; + +struct DiskIndexEntry +{ + zen::IoHash Key; + DiskLocation Location; +}; + +#pragma pack(pop) + +static_assert(sizeof(DiskIndexEntry) == 32); + +struct ZenCacheDiskLayer::CacheBucket +{ + CacheBucket(CasStore& Cas); + ~CacheBucket(); + + void OpenOrCreate(std::filesystem::path BucketDir); + + bool Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const zen::IoHash& HashKey, const ZenCacheValue& Value); + void Flush(); + + inline bool IsOk() const { return m_Ok; } + +private: + CasStore& m_CasStore; + std::filesystem::path m_BucketDir; + Oid m_BucketId; + bool m_Ok = false; + uint64_t m_LargeObjectThreshold = 1024; + + BasicFile m_SobsFile; + TCasLogFile<DiskIndexEntry> m_SlogFile; + BasicFile m_SidxFile; + + void BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey); + void PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value); + + RwLock m_IndexLock; + std::unordered_map<zen::IoHash, DiskLocation, zen::IoHash::Hasher> m_Index; + uint64_t m_WriteCursor = 0; +}; + +ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas) +{ +} + +ZenCacheDiskLayer::CacheBucket::~CacheBucket() +{ +} + +void +ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) +{ + std::filesystem::create_directories(BucketDir); + + m_BucketDir = BucketDir; + + std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"}; + std::filesystem::path SobsPath{m_BucketDir / "zen.sobs"}; + std::filesystem::path SlogPath{m_BucketDir / "zen.slog"}; + std::filesystem::path SidxPath{m_BucketDir / "zen.sidx"}; + + CAtlFile ManifestFile; + + // Try opening existing file first + + bool IsNew = false; + + HRESULT hRes = ManifestFile.Create(ManifestPath.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, OPEN_EXISTING); + + if (SUCCEEDED(hRes)) + { + ULONGLONG FileSize; + ManifestFile.GetSize(FileSize); + + if (FileSize == sizeof(Oid)) + { + hRes = ManifestFile.Read(&m_BucketId, sizeof(Oid)); + + if (SUCCEEDED(hRes)) + { + m_Ok = true; + } + } + + if (!m_Ok) + ManifestFile.Close(); + } + + if (!m_Ok) + { + // This is a new bucket + + hRes = ManifestFile.Create(ManifestPath.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, CREATE_ALWAYS); + + if (FAILED(hRes)) + { + ThrowLastError("Failed to create bucket manifest '{}'"_format(ManifestPath)); + } + + m_BucketId.Generate(); + + hRes = ManifestFile.Write(&m_BucketId, sizeof(Oid)); + + IsNew = true; + } + + // Initialize small object storage related files + + m_SobsFile.Open(SobsPath, IsNew); + m_SidxFile.Open(SidxPath, IsNew); + + // Open and replay log + + m_SlogFile.Open(SlogPath, IsNew); + + uint64_t MaxFileOffset = 0; + + { + // This is not technically necessary but may help future static analysis + zen::RwLock::ExclusiveLockScope _(m_IndexLock); + + m_SlogFile.Replay([&](const DiskIndexEntry& Record) { + m_Index[Record.Key] = Record.Location; + + MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset + Record.Location.Size); + }); + } + + m_WriteCursor = (MaxFileOffset + 15) & ~15; + + m_Ok = true; +} + +void +ZenCacheDiskLayer::CacheBucket::BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey) +{ + char hex[sizeof(HashKey.Hash) * 2]; + ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, hex); + + Path.Append(m_BucketDir.c_str()); + Path.Append(L"/"); + Path.AppendAsciiRange(hex, hex + sizeof(hex)); +} + +bool +ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue) +{ + if (!m_Ok) + return false; + + { + zen::RwLock::SharedLockScope _(m_IndexLock); + + if (auto it = m_Index.find(HashKey); it != m_Index.end()) + { + OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), it->second.Offset, it->second.Size); + + return true; + } + } + + WideStringBuilder<128> DataFilePath; + BuildPath(DataFilePath, HashKey); + + zen::IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str()); + + if (!Data) + { + return false; + } + + OutValue.Value = Data; + + // TODO: should populate index? + + return true; +} + +void +ZenCacheDiskLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value) +{ + if (!m_Ok) + { + return; + } + + if (Value.Value.Size() >= m_LargeObjectThreshold) + { + return PutLargeObject(HashKey, Value); + } + + // Small object put + + zen::RwLock::ExclusiveLockScope _(m_IndexLock); + + auto it = m_Index.find(HashKey); + + DiskLocation Loc{.Offset = m_WriteCursor, .Size = gsl::narrow<uint32_t>(Value.Value.Size())}; + + m_WriteCursor = (m_WriteCursor + Loc.Size + 15) & ~15; + + if (it == m_Index.end()) + { + m_Index.insert({HashKey, Loc}); + } + else + { + // TODO: should check if write is idempotent and bail out if it is? + + it->second = Loc; + } + + DiskIndexEntry IndexEntry{.Key = HashKey, .Location = Loc}; + + m_SlogFile.Append(IndexEntry); + + m_SobsFile.Write(Value.Value.Data(), Loc.Size, Loc.Offset); + + return; +} + +void +ZenCacheDiskLayer::CacheBucket::Flush() +{ + m_SobsFile.Flush(); + m_SidxFile.Flush(); + m_SlogFile.Flush(); +} + +void +ZenCacheDiskLayer::CacheBucket::PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value) +{ + zen::WideStringBuilder<128> DataFilePath; + BuildPath(DataFilePath, HashKey); + + // TODO: replace this with a more efficient implementation with proper atomic rename + + CAtlTemporaryFile DataFile; + + HRESULT hRes = DataFile.Create(m_BucketDir.c_str()); + + if (FAILED(hRes)) + { + zen::ThrowSystemException(hRes, "Failed to open temporary file for put at '{}'"_format(m_BucketDir)); + } + + hRes = DataFile.Write(Value.Value.Data(), gsl::narrow<DWORD>(Value.Value.Size())); + + if (FAILED(hRes)) + { + zen::ThrowSystemException(hRes, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size()))); + } + + hRes = DataFile.Close(DataFilePath.c_str()); + + if (FAILED(hRes)) + { + zen::ThrowSystemException(hRes, "Failed to finalize file '{}'"_format(zen::WideToUtf8(DataFilePath))); + } +} + +////////////////////////////////////////////////////////////////////////// + +ZenCacheDiskLayer::ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir) : m_RootDir(RootDir), m_CasStore(Cas) +{ +} + +ZenCacheDiskLayer::~ZenCacheDiskLayer() = default; + +bool +ZenCacheDiskLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) +{ + CacheBucket* Bucket = nullptr; + + { + zen::RwLock::SharedLockScope _(m_Lock); + + auto it = m_Buckets.find(std::string(InBucket)); + + if (it != m_Buckets.end()) + { + Bucket = &it->second; + } + } + + if (Bucket == nullptr) + { + // Bucket needs to be opened/created + + zen::RwLock::ExclusiveLockScope _(m_Lock); + + auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore); + Bucket = &It.first->second; + + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= std::string(InBucket); + + Bucket->OpenOrCreate(BucketPath.c_str()); + } + + ZEN_ASSERT(Bucket != nullptr); + + return Bucket->Get(HashKey, OutValue); +} + +void +ZenCacheDiskLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) +{ + CacheBucket* Bucket = nullptr; + + { + zen::RwLock::SharedLockScope _(m_Lock); + + auto it = m_Buckets.find(std::string(InBucket)); + + if (it != m_Buckets.end()) + { + Bucket = &it->second; + } + } + + if (Bucket == nullptr) + { + // New bucket needs to be created + + zen::RwLock::ExclusiveLockScope _(m_Lock); + + auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore); + Bucket = &It.first->second; + + std::filesystem::path bucketPath = m_RootDir; + bucketPath /= std::string(InBucket); + + Bucket->OpenOrCreate(bucketPath.c_str()); + } + + ZEN_ASSERT(Bucket != nullptr); + + if (Bucket->IsOk()) + { + Bucket->Put(HashKey, Value); + } +} + +void +ZenCacheDiskLayer::Flush() +{ + std::vector<CacheBucket*> Buckets; + Buckets.reserve(m_Buckets.size()); + + { + zen::RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + Buckets.push_back(&Kv.second); + } + } + + for (auto& Bucket : Buckets) + { + Bucket->Flush(); + } +} + +////////////////////////////////////////////////////////////////////////// + +ZenCacheTracker::ZenCacheTracker(ZenCacheStore& CacheStore) +{ + ZEN_UNUSED(CacheStore); +} + +ZenCacheTracker::~ZenCacheTracker() +{ +} + +void +ZenCacheTracker::TrackAccess(std::string_view Bucket, const zen::IoHash& HashKey) +{ + ZEN_UNUSED(Bucket); + ZEN_UNUSED(HashKey); +} + +void +ZenCacheTracker::Flush() +{ +} |