aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-05-22 11:53:09 +0200
committerStefan Boberg <[email protected]>2021-05-22 11:53:09 +0200
commitf93d04dd9381eac38be82733c34baa2075efa11c (patch)
treeb5c4f9a173042580398e4dd96be97702909cc06b /zenserver/cache/structuredcachestore.cpp
parentStructured cache changes (diff)
downloadzen-f93d04dd9381eac38be82733c34baa2075efa11c.tar.xz
zen-f93d04dd9381eac38be82733c34baa2075efa11c.zip
Split out structured cache store code into dedicated cpp/h pair
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
-rw-r--r--zenserver/cache/structuredcachestore.cpp554
1 files changed, 554 insertions, 0 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
new file mode 100644
index 000000000..3e0050fab
--- /dev/null
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -0,0 +1,554 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "structuredcachestore.h"
+
+#include <zencore/except.h>
+#include <zencore/windows.h>
+
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/iobuffer.h>
+#include <zencore/string.h>
+#include <zencore/thread.h>
+#include <zenstore/basicfile.h>
+#include <zenstore/cas.h>
+#include <zenstore/caslog.h>
+
+#include <fmt/core.h>
+#include <spdlog/spdlog.h>
+#include <concepts>
+#include <filesystem>
+#include <gsl/gsl-lite.hpp>
+#include <unordered_map>
+
+#include <atlfile.h>
+
+using namespace zen;
+using namespace fmt::literals;
+
+//////////////////////////////////////////////////////////////////////////
+
+ZenCacheStore::ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir}
+{
+ zen::CreateDirectories(RootDir);
+}
+
+ZenCacheStore::~ZenCacheStore()
+{
+}
+
+bool
+ZenCacheStore::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue)
+{
+ bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue);
+
+ if (!Ok)
+ {
+ Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue);
+
+#if 0 // This would keep file handles open
+ if (ok)
+ {
+ m_memLayer.Put(InBucket, HashKey, OutValue);
+ }
+#endif
+ }
+
+ return Ok;
+}
+
+void
+ZenCacheStore::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value)
+{
+ m_MemLayer.Put(InBucket, HashKey, Value);
+ m_DiskLayer.Put(InBucket, HashKey, Value);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ZenCacheMemoryLayer::ZenCacheMemoryLayer()
+{
+}
+
+ZenCacheMemoryLayer::~ZenCacheMemoryLayer()
+{
+}
+
+bool
+ZenCacheMemoryLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue)
+{
+ CacheBucket* Bucket = nullptr;
+
+ {
+ RwLock::SharedLockScope _(m_Lock);
+
+ auto it = m_Buckets.find(std::string(InBucket));
+
+ if (it != m_Buckets.end())
+ {
+ Bucket = &it->second;
+ }
+ }
+
+ if (Bucket == nullptr)
+ return false;
+
+ ZEN_ASSERT(Bucket != nullptr);
+
+ return Bucket->Get(HashKey, OutValue);
+}
+
+void
+ZenCacheMemoryLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value)
+{
+ CacheBucket* Bucket = nullptr;
+
+ {
+ RwLock::SharedLockScope _(m_Lock);
+
+ auto it = m_Buckets.find(std::string(InBucket));
+
+ if (it != m_Buckets.end())
+ {
+ Bucket = &it->second;
+ }
+ }
+
+ if (Bucket == nullptr)
+ {
+ // New bucket
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ Bucket = &m_Buckets[std::string(InBucket)];
+ }
+
+ ZEN_ASSERT(Bucket != nullptr);
+
+ Bucket->Put(HashKey, Value);
+}
+
+bool
+ZenCacheMemoryLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue)
+{
+ RwLock::SharedLockScope _(m_bucketLock);
+
+ auto bucketIt = m_cacheMap.find(HashKey);
+
+ if (bucketIt == m_cacheMap.end())
+ {
+ return false;
+ }
+
+ OutValue.Value = bucketIt->second;
+
+ return true;
+}
+
+void
+ZenCacheMemoryLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value)
+{
+ RwLock::ExclusiveLockScope _(m_bucketLock);
+
+ m_cacheMap[HashKey] = Value.Value;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+#pragma pack(push)
+#pragma pack(1)
+
+struct DiskLocation
+{
+ uint64_t Offset;
+ uint32_t Size;
+};
+
+struct DiskIndexEntry
+{
+ zen::IoHash Key;
+ DiskLocation Location;
+};
+
+#pragma pack(pop)
+
+static_assert(sizeof(DiskIndexEntry) == 32);
+
+struct ZenCacheDiskLayer::CacheBucket
+{
+ CacheBucket(CasStore& Cas);
+ ~CacheBucket();
+
+ void OpenOrCreate(std::filesystem::path BucketDir);
+
+ bool Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(const zen::IoHash& HashKey, const ZenCacheValue& Value);
+ void Flush();
+
+ inline bool IsOk() const { return m_Ok; }
+
+private:
+ CasStore& m_CasStore;
+ std::filesystem::path m_BucketDir;
+ Oid m_BucketId;
+ bool m_Ok = false;
+ uint64_t m_LargeObjectThreshold = 1024;
+
+ BasicFile m_SobsFile;
+ TCasLogFile<DiskIndexEntry> m_SlogFile;
+ BasicFile m_SidxFile;
+
+ void BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey);
+ void PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value);
+
+ RwLock m_IndexLock;
+ std::unordered_map<zen::IoHash, DiskLocation, zen::IoHash::Hasher> m_Index;
+ uint64_t m_WriteCursor = 0;
+};
+
+ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas)
+{
+}
+
+ZenCacheDiskLayer::CacheBucket::~CacheBucket()
+{
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
+{
+ std::filesystem::create_directories(BucketDir);
+
+ m_BucketDir = BucketDir;
+
+ std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"};
+ std::filesystem::path SobsPath{m_BucketDir / "zen.sobs"};
+ std::filesystem::path SlogPath{m_BucketDir / "zen.slog"};
+ std::filesystem::path SidxPath{m_BucketDir / "zen.sidx"};
+
+ CAtlFile ManifestFile;
+
+ // Try opening existing file first
+
+ bool IsNew = false;
+
+ HRESULT hRes = ManifestFile.Create(ManifestPath.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, OPEN_EXISTING);
+
+ if (SUCCEEDED(hRes))
+ {
+ ULONGLONG FileSize;
+ ManifestFile.GetSize(FileSize);
+
+ if (FileSize == sizeof(Oid))
+ {
+ hRes = ManifestFile.Read(&m_BucketId, sizeof(Oid));
+
+ if (SUCCEEDED(hRes))
+ {
+ m_Ok = true;
+ }
+ }
+
+ if (!m_Ok)
+ ManifestFile.Close();
+ }
+
+ if (!m_Ok)
+ {
+ // This is a new bucket
+
+ hRes = ManifestFile.Create(ManifestPath.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, CREATE_ALWAYS);
+
+ if (FAILED(hRes))
+ {
+ ThrowLastError("Failed to create bucket manifest '{}'"_format(ManifestPath));
+ }
+
+ m_BucketId.Generate();
+
+ hRes = ManifestFile.Write(&m_BucketId, sizeof(Oid));
+
+ IsNew = true;
+ }
+
+ // Initialize small object storage related files
+
+ m_SobsFile.Open(SobsPath, IsNew);
+ m_SidxFile.Open(SidxPath, IsNew);
+
+ // Open and replay log
+
+ m_SlogFile.Open(SlogPath, IsNew);
+
+ uint64_t MaxFileOffset = 0;
+
+ {
+ // This is not technically necessary but may help future static analysis
+ zen::RwLock::ExclusiveLockScope _(m_IndexLock);
+
+ m_SlogFile.Replay([&](const DiskIndexEntry& Record) {
+ m_Index[Record.Key] = Record.Location;
+
+ MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset + Record.Location.Size);
+ });
+ }
+
+ m_WriteCursor = (MaxFileOffset + 15) & ~15;
+
+ m_Ok = true;
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey)
+{
+ char hex[sizeof(HashKey.Hash) * 2];
+ ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, hex);
+
+ Path.Append(m_BucketDir.c_str());
+ Path.Append(L"/");
+ Path.AppendAsciiRange(hex, hex + sizeof(hex));
+}
+
+bool
+ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue)
+{
+ if (!m_Ok)
+ return false;
+
+ {
+ zen::RwLock::SharedLockScope _(m_IndexLock);
+
+ if (auto it = m_Index.find(HashKey); it != m_Index.end())
+ {
+ OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), it->second.Offset, it->second.Size);
+
+ return true;
+ }
+ }
+
+ WideStringBuilder<128> DataFilePath;
+ BuildPath(DataFilePath, HashKey);
+
+ zen::IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str());
+
+ if (!Data)
+ {
+ return false;
+ }
+
+ OutValue.Value = Data;
+
+ // TODO: should populate index?
+
+ return true;
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value)
+{
+ if (!m_Ok)
+ {
+ return;
+ }
+
+ if (Value.Value.Size() >= m_LargeObjectThreshold)
+ {
+ return PutLargeObject(HashKey, Value);
+ }
+
+ // Small object put
+
+ zen::RwLock::ExclusiveLockScope _(m_IndexLock);
+
+ auto it = m_Index.find(HashKey);
+
+ DiskLocation Loc{.Offset = m_WriteCursor, .Size = gsl::narrow<uint32_t>(Value.Value.Size())};
+
+ m_WriteCursor = (m_WriteCursor + Loc.Size + 15) & ~15;
+
+ if (it == m_Index.end())
+ {
+ m_Index.insert({HashKey, Loc});
+ }
+ else
+ {
+ // TODO: should check if write is idempotent and bail out if it is?
+
+ it->second = Loc;
+ }
+
+ DiskIndexEntry IndexEntry{.Key = HashKey, .Location = Loc};
+
+ m_SlogFile.Append(IndexEntry);
+
+ m_SobsFile.Write(Value.Value.Data(), Loc.Size, Loc.Offset);
+
+ return;
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::Flush()
+{
+ m_SobsFile.Flush();
+ m_SidxFile.Flush();
+ m_SlogFile.Flush();
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value)
+{
+ zen::WideStringBuilder<128> DataFilePath;
+ BuildPath(DataFilePath, HashKey);
+
+ // TODO: replace this with a more efficient implementation with proper atomic rename
+
+ CAtlTemporaryFile DataFile;
+
+ HRESULT hRes = DataFile.Create(m_BucketDir.c_str());
+
+ if (FAILED(hRes))
+ {
+ zen::ThrowSystemException(hRes, "Failed to open temporary file for put at '{}'"_format(m_BucketDir));
+ }
+
+ hRes = DataFile.Write(Value.Value.Data(), gsl::narrow<DWORD>(Value.Value.Size()));
+
+ if (FAILED(hRes))
+ {
+ zen::ThrowSystemException(hRes, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size())));
+ }
+
+ hRes = DataFile.Close(DataFilePath.c_str());
+
+ if (FAILED(hRes))
+ {
+ zen::ThrowSystemException(hRes, "Failed to finalize file '{}'"_format(zen::WideToUtf8(DataFilePath)));
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ZenCacheDiskLayer::ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir) : m_RootDir(RootDir), m_CasStore(Cas)
+{
+}
+
+ZenCacheDiskLayer::~ZenCacheDiskLayer() = default;
+
+bool
+ZenCacheDiskLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue)
+{
+ CacheBucket* Bucket = nullptr;
+
+ {
+ zen::RwLock::SharedLockScope _(m_Lock);
+
+ auto it = m_Buckets.find(std::string(InBucket));
+
+ if (it != m_Buckets.end())
+ {
+ Bucket = &it->second;
+ }
+ }
+
+ if (Bucket == nullptr)
+ {
+ // Bucket needs to be opened/created
+
+ zen::RwLock::ExclusiveLockScope _(m_Lock);
+
+ auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore);
+ Bucket = &It.first->second;
+
+ std::filesystem::path BucketPath = m_RootDir;
+ BucketPath /= std::string(InBucket);
+
+ Bucket->OpenOrCreate(BucketPath.c_str());
+ }
+
+ ZEN_ASSERT(Bucket != nullptr);
+
+ return Bucket->Get(HashKey, OutValue);
+}
+
+void
+ZenCacheDiskLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value)
+{
+ CacheBucket* Bucket = nullptr;
+
+ {
+ zen::RwLock::SharedLockScope _(m_Lock);
+
+ auto it = m_Buckets.find(std::string(InBucket));
+
+ if (it != m_Buckets.end())
+ {
+ Bucket = &it->second;
+ }
+ }
+
+ if (Bucket == nullptr)
+ {
+ // New bucket needs to be created
+
+ zen::RwLock::ExclusiveLockScope _(m_Lock);
+
+ auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore);
+ Bucket = &It.first->second;
+
+ std::filesystem::path bucketPath = m_RootDir;
+ bucketPath /= std::string(InBucket);
+
+ Bucket->OpenOrCreate(bucketPath.c_str());
+ }
+
+ ZEN_ASSERT(Bucket != nullptr);
+
+ if (Bucket->IsOk())
+ {
+ Bucket->Put(HashKey, Value);
+ }
+}
+
+void
+ZenCacheDiskLayer::Flush()
+{
+ std::vector<CacheBucket*> Buckets;
+ Buckets.reserve(m_Buckets.size());
+
+ {
+ zen::RwLock::SharedLockScope _(m_Lock);
+
+ for (auto& Kv : m_Buckets)
+ {
+ Buckets.push_back(&Kv.second);
+ }
+ }
+
+ for (auto& Bucket : Buckets)
+ {
+ Bucket->Flush();
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ZenCacheTracker::ZenCacheTracker(ZenCacheStore& CacheStore)
+{
+ ZEN_UNUSED(CacheStore);
+}
+
+ZenCacheTracker::~ZenCacheTracker()
+{
+}
+
+void
+ZenCacheTracker::TrackAccess(std::string_view Bucket, const zen::IoHash& HashKey)
+{
+ ZEN_UNUSED(Bucket);
+ ZEN_UNUSED(HashKey);
+}
+
+void
+ZenCacheTracker::Flush()
+{
+}