aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-05-25 09:54:09 +0200
committerStefan Boberg <[email protected]>2021-05-25 09:54:09 +0200
commit882e93e4786f9e67e0edf6c276b16bb40848bae9 (patch)
treec5d4c45679c676c6aeb804c7601f43340b78ea0b /zenserver/cache/structuredcachestore.cpp
parentUpdated structured cache description (diff)
parentCompile out all rocksdb code for a smaller binary (diff)
downloadzen-882e93e4786f9e67e0edf6c276b16bb40848bae9.tar.xz
zen-882e93e4786f9e67e0edf6c276b16bb40848bae9.zip
Merged from origin/main
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
-rw-r--r--zenserver/cache/structuredcachestore.cpp625
1 files changed, 625 insertions, 0 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
new file mode 100644
index 000000000..764bce93e
--- /dev/null
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -0,0 +1,625 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "structuredcachestore.h"
+
+#include <zencore/except.h>
+#include <zencore/windows.h>
+
+#include <zencore/compactbinary.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/iobuffer.h>
+#include <zencore/string.h>
+#include <zencore/thread.h>
+#include <zenstore/basicfile.h>
+#include <zenstore/cas.h>
+#include <zenstore/caslog.h>
+
+#include <fmt/core.h>
+#include <spdlog/spdlog.h>
+#include <concepts>
+#include <filesystem>
+#include <gsl/gsl-lite.hpp>
+#include <unordered_map>
+
+#include <atlfile.h>
+
+using namespace zen;
+using namespace fmt::literals;
+
+//////////////////////////////////////////////////////////////////////////
+
+// Sets up the cache store rooted at RootDir. The disk layer member is
+// constructed from the CAS store and the root path, after which the root
+// directory itself is created.
+ZenCacheStore::ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir)
+: m_DiskLayer{Cas, RootDir}
+{
+    zen::CreateDirectories(RootDir);
+}
+
+// No explicit teardown; the layer members clean up after themselves.
+ZenCacheStore::~ZenCacheStore() = default;
+
+// Looks up HashKey in InBucket. The memory layer is consulted first and the
+// disk layer only on a miss. Returns true and fills OutValue on a hit; a hit
+// is asserted to carry a non-empty payload.
+bool
+ZenCacheStore::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue)
+{
+    if (m_MemLayer.Get(InBucket, HashKey, OutValue))
+    {
+        ZEN_ASSERT(OutValue.Value.Size());
+
+        return true;
+    }
+
+    if (!m_DiskLayer.Get(InBucket, HashKey, OutValue))
+    {
+        return false;
+    }
+
+    ZEN_ASSERT(OutValue.Value.Size());
+
+    // Values at or below the threshold are copied into the memory layer so
+    // that subsequent lookups can be answered without touching disk
+    if (OutValue.Value.Size() <= m_DiskLayerSizeThreshold)
+    {
+        m_MemLayer.Put(InBucket, HashKey, OutValue);
+    }
+
+    return true;
+}
+
+// Stores Value under HashKey in InBucket. The value must be non-empty. It is
+// always handed to the disk layer, and additionally to the memory layer when
+// its size does not exceed m_DiskLayerSizeThreshold.
+void
+ZenCacheStore::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value)
+{
+    ZEN_ASSERT(Value.Value.Size());
+
+    m_DiskLayer.Put(InBucket, HashKey, Value);
+
+    const bool FitsInMemoryLayer = Value.Value.Size() <= m_DiskLayerSizeThreshold;
+
+    if (FitsInMemoryLayer)
+    {
+        m_MemLayer.Put(InBucket, HashKey, Value);
+    }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Nothing to set up beyond default member construction.
+ZenCacheMemoryLayer::ZenCacheMemoryLayer() = default;
+
+// No explicit teardown is performed here.
+ZenCacheMemoryLayer::~ZenCacheMemoryLayer() = default;
+
+// Fetches the value stored for HashKey in the named bucket. Returns false if
+// the bucket does not exist or does not hold the key.
+bool
+ZenCacheMemoryLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue)
+{
+    CacheBucket* Bucket = nullptr;
+
+    {
+        RwLock::SharedLockScope LayerGuard(m_Lock);
+
+        if (auto Entry = m_Buckets.find(std::string(InBucket)); Entry != m_Buckets.end())
+        {
+            Bucket = &Entry->second;
+        }
+    }
+
+    // The bucket is queried after the layer lock has been released; the
+    // bucket takes its own lock inside Get
+    return (Bucket != nullptr) ? Bucket->Get(HashKey, OutValue) : false;
+}
+
+// Stores Value under HashKey in the named bucket, creating the bucket on
+// first use.
+void
+ZenCacheMemoryLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value)
+{
+    const std::string BucketName(InBucket);
+
+    CacheBucket* Bucket = nullptr;
+
+    {
+        RwLock::SharedLockScope ReadGuard(m_Lock);
+
+        if (auto Entry = m_Buckets.find(BucketName); Entry != m_Buckets.end())
+        {
+            Bucket = &Entry->second;
+        }
+    }
+
+    if (Bucket == nullptr)
+    {
+        // Unknown bucket: operator[] inserts it, or returns the existing one
+        // if it was added after the shared lock above was released
+
+        RwLock::ExclusiveLockScope WriteGuard(m_Lock);
+
+        Bucket = &m_Buckets[BucketName];
+    }
+
+    ZEN_ASSERT(Bucket != nullptr);
+
+    // Note that since the underlying IoBuffer is retained, the content type
+    // attached to that buffer is retained as well
+
+    Bucket->Put(HashKey, Value);
+}
+
+// Copies the buffer stored for HashKey into OutValue.Value while holding the
+// bucket lock in shared mode. Returns false when the key is not present.
+bool
+ZenCacheMemoryLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue)
+{
+    RwLock::SharedLockScope BucketGuard(m_bucketLock);
+
+    if (auto Entry = m_cacheMap.find(HashKey); Entry != m_cacheMap.end())
+    {
+        OutValue.Value = Entry->second;
+
+        return true;
+    }
+
+    return false;
+}
+
+// Inserts or overwrites the buffer stored for HashKey while holding the
+// bucket lock exclusively.
+void
+ZenCacheMemoryLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value)
+{
+    RwLock::ExclusiveLockScope BucketGuard(m_bucketLock);
+
+    auto& Slot = m_cacheMap[HashKey];
+    Slot = Value.Value;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+#pragma pack(push)
+#pragma pack(1)
+
+// Describes where a cache entry's payload lives on disk.
+//
+// OffsetAndFlags packs two things into one 64-bit word: the low 56 bits
+// (kOffsetMask) hold a byte offset and the top 8 bits (kFlagsMask) hold
+// flags. Size is the payload size in bytes. IndexDataSize is not written by
+// any code in this file other than being zero-initialised.
+struct DiskLocation
+{
+    uint64_t OffsetAndFlags;
+    uint32_t Size;
+    uint32_t IndexDataSize;
+
+    static constexpr uint64_t kOffsetMask = 0x00FF'ffFF'ffFF'ffFFull;
+    static constexpr uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
+    // Payload is stored in its own file rather than in the shared data file
+    static constexpr uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull;
+    // Payload content type is a compact binary object rather than plain binary
+    static constexpr uint64_t kStructured = 0x4000'0000'0000'0000ull;
+
+    // Merges an offset and a set of flag bits into a single word. No masking
+    // is applied; the caller is expected to pass an offset within kOffsetMask.
+    static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
+
+    // Byte offset with the flag bits stripped
+    inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
+
+    // True if any bit of Flag is set. Returned as bool rather than the raw
+    // masked word: every flag lives in the top byte, so a caller narrowing
+    // the raw value to a smaller integer would silently get zero.
+    inline bool IsFlagSet(uint64_t Flag) const { return (OffsetAndFlags & Flag) != 0; }
+};
+
+// Record type of the bucket index log (TCasLogFile<DiskIndexEntry>): maps a
+// key to the location of its payload. Declared inside the pack(1) region and
+// checked by the static_assert below to be exactly 36 bytes.
+struct DiskIndexEntry
+{
+ // Cache key the record belongs to
+ zen::IoHash Key;
+ // Where the payload for Key is stored
+ DiskLocation Location;
+};
+
+#pragma pack(pop)
+
+static_assert(sizeof(DiskIndexEntry) == 36);
+
+// One on-disk cache bucket, backed by a directory. Payloads below
+// m_LargeObjectThreshold are written into a shared data file; larger payloads
+// are written as one file per key inside the bucket directory. An index log
+// records where each key lives and is replayed into m_Index on open.
+struct ZenCacheDiskLayer::CacheBucket
+{
+ CacheBucket(CasStore& Cas);
+ ~CacheBucket();
+
+ // Opens the bucket in BucketDir, creating the directory and files if needed
+ void OpenOrCreate(std::filesystem::path BucketDir);
+
+ // Get returns false on a miss or when the bucket is not open; Put is a no-op
+ // when the bucket is not open
+ bool Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(const zen::IoHash& HashKey, const ZenCacheValue& Value);
+ // Flushes the data file and the index log
+ void Flush();
+
+ // True once OpenOrCreate has completed
+ inline bool IsOk() const { return m_Ok; }
+
+private:
+ // Held by reference; not otherwise used by the code visible in this file
+ CasStore& m_CasStore;
+ std::filesystem::path m_BucketDir;
+ // Identifier persisted in the bucket manifest file
+ Oid m_BucketId;
+ bool m_Ok = false;
+ // Payloads of at least this many bytes are routed to PutLargeObject
+ uint64_t m_LargeObjectThreshold = 1024;
+
+ // Shared data file holding the small payloads
+ BasicFile m_SobsFile;
+ // Append-only log of index records
+ TCasLogFile<DiskIndexEntry> m_SlogFile;
+
+ // Appends "<bucket dir>/<hex of key>" to Path
+ void BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey);
+ void PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value);
+
+ // Guards m_Index; also held while m_WriteCursor is advanced in Put
+ RwLock m_IndexLock;
+ tsl::robin_map<zen::IoHash, DiskLocation, zen::IoHash::Hasher> m_Index;
+ // Offset in m_SobsFile at which the next small payload is written (kept 16-byte aligned)
+ uint64_t m_WriteCursor = 0;
+};
+
+// Binds the bucket to the CAS store. The bucket is not usable until
+// OpenOrCreate has been called.
+ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas)
+: m_CasStore(Cas)
+{
+}
+
+// No explicit teardown; members release their own resources.
+ZenCacheDiskLayer::CacheBucket::~CacheBucket() = default;
+
+// Opens the bucket stored in BucketDir, creating it when necessary.
+//
+// The bucket directory holds three files:
+//   zen_manifest - the bucket id (exactly sizeof(Oid) bytes)
+//   zen.sobs     - data file for small payloads
+//   zen.slog     - index log, replayed here to rebuild m_Index
+//
+// A manifest that is missing, has the wrong size or cannot be read causes the
+// bucket to be treated as new: a fresh id is generated and written, and the
+// data and log files are opened with the "new" flag set.
+//
+// Throws if the manifest cannot be created or written. m_Ok only becomes true
+// once every step has succeeded, so a bucket whose open failed part-way is
+// never reported as usable.
+void
+ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
+{
+    m_Ok = false;
+
+    zen::CreateDirectories(BucketDir);
+
+    m_BucketDir = BucketDir;
+
+    std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"};
+    std::filesystem::path SobsPath{m_BucketDir / "zen.sobs"};
+    std::filesystem::path SlogPath{m_BucketDir / "zen.slog"};
+
+    CAtlFile ManifestFile;
+
+    // Try opening existing file first
+
+    bool ManifestValid = false;
+    bool IsNew = false;
+
+    HRESULT hRes = ManifestFile.Create(ManifestPath.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, OPEN_EXISTING);
+
+    if (SUCCEEDED(hRes))
+    {
+        ULONGLONG FileSize = 0;
+
+        if (SUCCEEDED(ManifestFile.GetSize(FileSize)) && FileSize == sizeof(Oid))
+        {
+            ManifestValid = SUCCEEDED(ManifestFile.Read(&m_BucketId, sizeof(Oid)));
+        }
+
+        if (!ManifestValid)
+        {
+            // Unusable manifest; close it so it can be recreated below
+            ManifestFile.Close();
+        }
+    }
+
+    if (!ManifestValid)
+    {
+        // This is a new bucket
+
+        hRes = ManifestFile.Create(ManifestPath.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, CREATE_ALWAYS);
+
+        if (FAILED(hRes))
+        {
+            ThrowLastError("Failed to create bucket manifest '{}'"_format(ManifestPath));
+        }
+
+        m_BucketId.Generate();
+
+        hRes = ManifestFile.Write(&m_BucketId, sizeof(Oid));
+
+        if (FAILED(hRes))
+        {
+            // A manifest without a valid id would make the bucket look
+            // corrupt on the next start, so do not continue silently
+            zen::ThrowSystemException(hRes, "Failed to write bucket manifest '{}'"_format(ManifestPath));
+        }
+
+        IsNew = true;
+    }
+
+    // Initialize small object storage related files
+
+    m_SobsFile.Open(SobsPath, IsNew);
+
+    // Open and replay log
+
+    m_SlogFile.Open(SlogPath, IsNew);
+
+    uint64_t MaxFileOffset = 0;
+
+    {
+        // This is not technically necessary but may help future static analysis
+        zen::RwLock::ExclusiveLockScope _(m_IndexLock);
+
+        m_SlogFile.Replay([&](const DiskIndexEntry& Record) {
+            m_Index[Record.Key] = Record.Location;
+
+            MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size);
+        });
+    }
+
+    // Resume writing at the first 16-byte boundary past the last known payload
+    m_WriteCursor = (MaxFileOffset + 15) & ~uint64_t(15);
+
+    m_Ok = true;
+}
+
+// Appends the path of the standalone payload file for HashKey to Path: the
+// bucket directory, a "/" separator, then the key rendered as hex digits
+// (two per hash byte).
+void
+ZenCacheDiskLayer::CacheBucket::BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey)
+{
+    const size_t HexLength = sizeof(HashKey.Hash) * 2;
+
+    // Sized for the digits only; the explicit range below means no
+    // terminator is relied upon
+    char HexDigits[HexLength];
+    ToHexBytes(HashKey.Hash, sizeof(HashKey.Hash), HexDigits);
+
+    Path.Append(m_BucketDir.c_str());
+    Path.Append(L"/");
+    Path.AppendAsciiRange(HexDigits, HexDigits + HexLength);
+}
+
+// Looks up HashKey in the bucket index and, on a hit, fills OutValue with a
+// buffer for the payload and its content type. Returns false when the bucket
+// is not open, the key is unknown, or a standalone payload file could not be
+// turned into a buffer.
+bool
+ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue)
+{
+    if (!m_Ok)
+    {
+        return false;
+    }
+
+    zen::RwLock::SharedLockScope IndexGuard(m_IndexLock);
+
+    const auto Entry = m_Index.find(HashKey);
+
+    if (Entry == m_Index.end())
+    {
+        return false;
+    }
+
+    const DiskLocation& Loc = Entry->second;
+
+    const ZenContentType ContentType =
+        Loc.IsFlagSet(DiskLocation::kStructured) ? ZenContentType::kCbObject : ZenContentType::kBinary;
+
+    if (!Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+    {
+        // Small payload: reference the range inside the shared data file
+        OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size);
+        OutValue.Value.SetContentType(ContentType);
+
+        return true;
+    }
+
+    // Standalone payload. The index lock is not needed for the file access;
+    // Loc refers into the index and is not used past this point.
+    IndexGuard.ReleaseNow();
+
+    WideStringBuilder<128> DataFilePath;
+    BuildPath(DataFilePath, HashKey);
+
+    zen::IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str());
+
+    if (!Data)
+    {
+        return false;
+    }
+
+    OutValue.Value = Data;
+    OutValue.Value.SetContentType(ContentType);
+
+    return true;
+}
+
+// Stores Value under HashKey. Does nothing when the bucket is not open.
+// Payloads of at least m_LargeObjectThreshold bytes are delegated to
+// PutLargeObject; smaller ones are appended to the shared data file at the
+// current write cursor and recorded in the index log and in-memory index.
+void
+ZenCacheDiskLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value)
+{
+    if (!m_Ok)
+    {
+        return;
+    }
+
+    if (Value.Value.Size() >= m_LargeObjectThreshold)
+    {
+        return PutLargeObject(HashKey, Value);
+    }
+
+    // Small object put
+
+    uint64_t EntryFlags = 0;
+
+    if (Value.Value.GetContentType() == ZenContentType::kCbObject)
+    {
+        EntryFlags |= DiskLocation::kStructured;
+    }
+
+    zen::RwLock::ExclusiveLockScope _(m_IndexLock);
+
+    DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(m_WriteCursor, EntryFlags),
+                     .Size = gsl::narrow<uint32_t>(Value.Value.Size())};
+
+    // Write the payload before publishing its location. If the write fails
+    // by throwing, neither the log, the index nor the cursor has been
+    // touched, so nothing refers to data that was never written.
+    m_SobsFile.Write(Value.Value.Data(), Loc.Size, Loc.Offset());
+    m_SlogFile.Append({.Key = HashKey, .Location = Loc});
+
+    // Keep the next payload 16-byte aligned
+    m_WriteCursor = zen::RoundUp(m_WriteCursor + Loc.Size, 16);
+
+    // Inserts a previously unknown key or overwrites the existing location
+    // TODO: should check if write is idempotent and bail out if it is?
+    m_Index[HashKey] = Loc;
+}
+
+// Flushes the small-object data file first and the index log second.
+void
+ZenCacheDiskLayer::CacheBucket::Flush()
+{
+ m_SobsFile.Flush();
+ m_SlogFile.Flush();
+}
+
+// Stores Value as a standalone file named after HashKey inside the bucket
+// directory. The payload is written to a temporary file in that directory,
+// which is then closed onto its final path. Afterwards the index and the
+// index log receive an entry flagged kStandaloneFile with offset and size
+// zero. Throws if the temporary file cannot be created, written or finalized.
+void
+ZenCacheDiskLayer::CacheBucket::PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value)
+{
+    zen::WideStringBuilder<128> TargetPath;
+    BuildPath(TargetPath, HashKey);
+
+    // TODO: replace this with a more efficient implementation with proper atomic rename
+
+    CAtlTemporaryFile TempFile;
+
+    HRESULT hRes = TempFile.Create(m_BucketDir.c_str());
+
+    if (FAILED(hRes))
+    {
+        zen::ThrowSystemException(hRes, "Failed to open temporary file for put at '{}'"_format(m_BucketDir));
+    }
+
+    hRes = TempFile.Write(Value.Value.Data(), gsl::narrow<DWORD>(Value.Value.Size()));
+
+    if (FAILED(hRes))
+    {
+        zen::ThrowSystemException(hRes, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size())));
+    }
+
+    hRes = TempFile.Close(TargetPath.c_str());
+
+    if (FAILED(hRes))
+    {
+        zen::ThrowSystemException(hRes, "Failed to finalize file '{}'"_format(zen::WideToUtf8(TargetPath)));
+    }
+
+    // Update index
+
+    uint64_t EntryFlags = DiskLocation::kStandaloneFile;
+
+    if (Value.Value.GetContentType() == ZenContentType::kCbObject)
+    {
+        EntryFlags |= DiskLocation::kStructured;
+    }
+
+    zen::RwLock::ExclusiveLockScope IndexGuard(m_IndexLock);
+
+    DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(0, EntryFlags), .Size = 0};
+
+    // Inserts a previously unknown key or overwrites the existing location
+    // TODO: should check if write is idempotent and bail out if it is?
+    m_Index[HashKey] = Loc;
+
+    m_SlogFile.Append({.Key = HashKey, .Location = Loc});
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Records the root directory and the CAS store. No file system access takes
+// place here; buckets are opened on first use by Get and Put.
+ZenCacheDiskLayer::ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir)
+: m_RootDir(RootDir)
+, m_CasStore(Cas)
+{
+}
+
+// Defaulted here, in the translation unit where CacheBucket is a complete type.
+ZenCacheDiskLayer::~ZenCacheDiskLayer() = default;
+
+// Fetches the value for HashKey from the named bucket. A bucket that is not
+// yet known to this layer is opened (and created on disk if necessary) under
+// "<root dir>/<bucket name>" first. Returns true and fills OutValue on a hit.
+bool
+ZenCacheDiskLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue)
+{
+    CacheBucket* Bucket = nullptr;
+
+    {
+        zen::RwLock::SharedLockScope _(m_Lock);
+
+        auto it = m_Buckets.find(std::string(InBucket));
+
+        if (it != m_Buckets.end())
+        {
+            Bucket = &it->second;
+        }
+    }
+
+    if (Bucket == nullptr)
+    {
+        // Bucket needs to be opened/created
+
+        zen::RwLock::ExclusiveLockScope _(m_Lock);
+
+        auto [It, Inserted] = m_Buckets.try_emplace(std::string(InBucket), m_CasStore);
+        Bucket = &It->second;
+
+        // Another thread may have added and opened the bucket between the
+        // shared lock being released and the exclusive lock being taken.
+        // Only open the bucket when this call actually inserted it, so a
+        // live bucket is never re-opened underneath its users.
+        if (Inserted)
+        {
+            std::filesystem::path BucketPath = m_RootDir;
+            BucketPath /= std::string(InBucket);
+
+            Bucket->OpenOrCreate(BucketPath.c_str());
+        }
+    }
+
+    ZEN_ASSERT(Bucket != nullptr);
+
+    return Bucket->Get(HashKey, OutValue);
+}
+
+// Stores Value under HashKey in the named bucket. A bucket that is not yet
+// known to this layer is opened (and created on disk if necessary) under
+// "<root dir>/<bucket name>" first. The value is dropped if the bucket did
+// not open successfully.
+void
+ZenCacheDiskLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value)
+{
+    CacheBucket* Bucket = nullptr;
+
+    {
+        zen::RwLock::SharedLockScope _(m_Lock);
+
+        auto it = m_Buckets.find(std::string(InBucket));
+
+        if (it != m_Buckets.end())
+        {
+            Bucket = &it->second;
+        }
+    }
+
+    if (Bucket == nullptr)
+    {
+        // New bucket needs to be created
+
+        zen::RwLock::ExclusiveLockScope _(m_Lock);
+
+        auto [It, Inserted] = m_Buckets.try_emplace(std::string(InBucket), m_CasStore);
+        Bucket = &It->second;
+
+        // Another thread may have added and opened the bucket between the
+        // shared lock being released and the exclusive lock being taken.
+        // Only open the bucket when this call actually inserted it, so a
+        // live bucket is never re-opened underneath its users.
+        if (Inserted)
+        {
+            std::filesystem::path bucketPath = m_RootDir;
+            bucketPath /= std::string(InBucket);
+
+            Bucket->OpenOrCreate(bucketPath.c_str());
+        }
+    }
+
+    ZEN_ASSERT(Bucket != nullptr);
+
+    if (Bucket->IsOk())
+    {
+        Bucket->Put(HashKey, Value);
+    }
+}
+
+// Flushes every bucket currently known to the layer. The bucket pointers are
+// collected under the shared lock and the flushes run after it has been
+// released, so slow disk I/O does not block bucket creation.
+void
+ZenCacheDiskLayer::Flush()
+{
+    std::vector<CacheBucket*> Buckets;
+
+    {
+        zen::RwLock::SharedLockScope _(m_Lock);
+
+        // m_Buckets may only be inspected while the lock is held; that
+        // includes asking for its size
+        Buckets.reserve(m_Buckets.size());
+
+        for (auto& Kv : m_Buckets)
+        {
+            Buckets.push_back(&Kv.second);
+        }
+    }
+
+    for (CacheBucket* Bucket : Buckets)
+    {
+        Bucket->Flush();
+    }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Placeholder: the cache store argument is accepted but not used.
+ZenCacheTracker::ZenCacheTracker([[maybe_unused]] ZenCacheStore& CacheStore)
+{
+}
+
+// Nothing to tear down.
+ZenCacheTracker::~ZenCacheTracker() = default;
+
+// Placeholder: both arguments are accepted but nothing is recorded.
+void
+ZenCacheTracker::TrackAccess([[maybe_unused]] std::string_view Bucket, [[maybe_unused]] const zen::IoHash& HashKey)
+{
+}
+
+// Placeholder: there is no tracker state to flush.
+void
+ZenCacheTracker::Flush()
+{
+}