diff options
| author | Stefan Boberg <[email protected]> | 2021-05-25 09:54:09 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-05-25 09:54:09 +0200 |
| commit | 882e93e4786f9e67e0edf6c276b16bb40848bae9 (patch) | |
| tree | c5d4c45679c676c6aeb804c7601f43340b78ea0b /zenserver/cache/structuredcachestore.cpp | |
| parent | Updated structured cache description (diff) | |
| parent | Compile out all rocksdb code for a smaller binary (diff) | |
| download | zen-882e93e4786f9e67e0edf6c276b16bb40848bae9.tar.xz zen-882e93e4786f9e67e0edf6c276b16bb40848bae9.zip | |
Merged from origin/main
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 625 |
1 files changed, 625 insertions, 0 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp new file mode 100644 index 000000000..764bce93e --- /dev/null +++ b/zenserver/cache/structuredcachestore.cpp @@ -0,0 +1,625 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "structuredcachestore.h" + +#include <zencore/except.h> +#include <zencore/windows.h> + +#include <zencore/compactbinary.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> +#include <zencore/string.h> +#include <zencore/thread.h> +#include <zenstore/basicfile.h> +#include <zenstore/cas.h> +#include <zenstore/caslog.h> + +#include <fmt/core.h> +#include <spdlog/spdlog.h> +#include <concepts> +#include <filesystem> +#include <gsl/gsl-lite.hpp> +#include <unordered_map> + +#include <atlfile.h> + +using namespace zen; +using namespace fmt::literals; + +////////////////////////////////////////////////////////////////////////// + +ZenCacheStore::ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir} +{ + zen::CreateDirectories(RootDir); +} + +ZenCacheStore::~ZenCacheStore() +{ +} + +bool +ZenCacheStore::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) +{ + bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue); + + if (Ok) + { + ZEN_ASSERT(OutValue.Value.Size()); + } + + if (!Ok) + { + Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue); + + if (Ok) + { + ZEN_ASSERT(OutValue.Value.Size()); + } + + if (Ok && (OutValue.Value.Size() <= m_DiskLayerSizeThreshold)) + { + m_MemLayer.Put(InBucket, HashKey, OutValue); + } + } + + return Ok; +} + +void +ZenCacheStore::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) +{ + // Store value and index + + ZEN_ASSERT(Value.Value.Size()); + + m_DiskLayer.Put(InBucket, HashKey, Value); + + if (Value.Value.Size() <= m_DiskLayerSizeThreshold) + { + m_MemLayer.Put(InBucket, HashKey, Value); + } +} + +////////////////////////////////////////////////////////////////////////// + +ZenCacheMemoryLayer::ZenCacheMemoryLayer() +{ +} + +ZenCacheMemoryLayer::~ZenCacheMemoryLayer() +{ +} + +bool +ZenCacheMemoryLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) +{ + CacheBucket* Bucket = nullptr; + + { + RwLock::SharedLockScope _(m_Lock); + + auto it = m_Buckets.find(std::string(InBucket)); + + if (it != m_Buckets.end()) + { + Bucket = &it->second; + } + } + + if (Bucket == nullptr) + return false; + + ZEN_ASSERT(Bucket != nullptr); + + return Bucket->Get(HashKey, OutValue); +} + +void +ZenCacheMemoryLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) +{ + CacheBucket* Bucket = nullptr; + + { + RwLock::SharedLockScope _(m_Lock); + + auto it = m_Buckets.find(std::string(InBucket)); + + if (it != m_Buckets.end()) + { + Bucket = &it->second; + } + } + + if (Bucket == nullptr) + { + // New bucket + + RwLock::ExclusiveLockScope _(m_Lock); + + Bucket = &m_Buckets[std::string(InBucket)]; + } + + ZEN_ASSERT(Bucket != nullptr); + + // Note that since the underlying IoBuffer is retained, the content type is also + + Bucket->Put(HashKey, Value); +} + +bool +ZenCacheMemoryLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue) +{ + RwLock::SharedLockScope _(m_bucketLock); + + auto bucketIt = m_cacheMap.find(HashKey); + + if (bucketIt == m_cacheMap.end()) + { + return false; + } + + OutValue.Value = bucketIt->second; + + return true; +} + +void +ZenCacheMemoryLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value) +{ + RwLock::ExclusiveLockScope _(m_bucketLock); + + m_cacheMap[HashKey] = Value.Value; +} + +////////////////////////////////////////////////////////////////////////// + +#pragma pack(push) +#pragma pack(1) + +struct DiskLocation +{ + uint64_t OffsetAndFlags; + uint32_t Size; + uint32_t IndexDataSize; + + static const uint64_t kOffsetMask = 0x00FF'ffFF'ffFF'ffFFull; + static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; + static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; + static const uint64_t kStructured = 0x4000'0000'0000'0000ull; + + static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; } + + inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; } + inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; } +}; + +struct DiskIndexEntry +{ + zen::IoHash Key; + DiskLocation Location; +}; + +#pragma pack(pop) + +static_assert(sizeof(DiskIndexEntry) == 36); + +struct ZenCacheDiskLayer::CacheBucket +{ + CacheBucket(CasStore& Cas); + ~CacheBucket(); + + void OpenOrCreate(std::filesystem::path BucketDir); + + bool Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const zen::IoHash& HashKey, const ZenCacheValue& Value); + void Flush(); + + inline bool IsOk() const { return m_Ok; } + +private: + CasStore& m_CasStore; + std::filesystem::path m_BucketDir; + Oid m_BucketId; + bool m_Ok = false; + uint64_t m_LargeObjectThreshold = 1024; + + BasicFile m_SobsFile; + TCasLogFile<DiskIndexEntry> m_SlogFile; + + void BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey); + void PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value); + + RwLock m_IndexLock; + tsl::robin_map<zen::IoHash, DiskLocation, zen::IoHash::Hasher> m_Index; + uint64_t m_WriteCursor = 0; +}; + +ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas) +{ +} + +ZenCacheDiskLayer::CacheBucket::~CacheBucket() +{ +} + +void +ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) +{ + zen::CreateDirectories(BucketDir); + + m_BucketDir = BucketDir; + + std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"}; + std::filesystem::path SobsPath{m_BucketDir / "zen.sobs"}; + std::filesystem::path SlogPath{m_BucketDir / "zen.slog"}; + + CAtlFile ManifestFile; + + // Try opening existing file first + + bool IsNew = false; + + HRESULT hRes = ManifestFile.Create(ManifestPath.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, OPEN_EXISTING); + + if (SUCCEEDED(hRes)) + { + ULONGLONG FileSize; + ManifestFile.GetSize(FileSize); + + if (FileSize == sizeof(Oid)) + { + hRes = ManifestFile.Read(&m_BucketId, sizeof(Oid)); + + if (SUCCEEDED(hRes)) + { + m_Ok = true; + } + } + + if (!m_Ok) + { + ManifestFile.Close(); + } + } + + if (!m_Ok) + { + // This is a new bucket + + hRes = ManifestFile.Create(ManifestPath.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, CREATE_ALWAYS); + + if (FAILED(hRes)) + { + ThrowLastError("Failed to create bucket manifest '{}'"_format(ManifestPath)); + } + + m_BucketId.Generate(); + + hRes = ManifestFile.Write(&m_BucketId, sizeof(Oid)); + + IsNew = true; + } + + // Initialize small object storage related files + + m_SobsFile.Open(SobsPath, IsNew); + + // Open and replay log + + m_SlogFile.Open(SlogPath, IsNew); + + uint64_t MaxFileOffset = 0; + + { + // This is not technically necessary but may help future static analysis + zen::RwLock::ExclusiveLockScope _(m_IndexLock); + + m_SlogFile.Replay([&](const DiskIndexEntry& Record) { + m_Index[Record.Key] = Record.Location; + + MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size); + }); + } + + m_WriteCursor = (MaxFileOffset + 15) & ~15; + + m_Ok = true; +} + +void +ZenCacheDiskLayer::CacheBucket::BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey) +{ + char hex[sizeof(HashKey.Hash) * 2]; + ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, hex); + + Path.Append(m_BucketDir.c_str()); + Path.Append(L"/"); + Path.AppendAsciiRange(hex, hex + sizeof(hex)); +} + +bool +ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue) +{ + if (!m_Ok) + { + return false; + } + + zen::RwLock::SharedLockScope _(m_IndexLock); + + if (auto it = m_Index.find(HashKey); it != m_Index.end()) + { + const DiskLocation& Loc = it->second; + + ZenContentType ContentType = ZenContentType::kBinary; + + if (Loc.IsFlagSet(DiskLocation::kStructured)) + { + ContentType = ZenContentType::kCbObject; + } + + if (!Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size); + OutValue.Value.SetContentType(ContentType); + + return true; + } + else + { + _.ReleaseNow(); + + WideStringBuilder<128> DataFilePath; + BuildPath(DataFilePath, HashKey); + + if (zen::IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str())) + { + OutValue.Value = Data; + OutValue.Value.SetContentType(ContentType); + + return true; + } + } + } + + return false; +} + +void +ZenCacheDiskLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value) +{ + if (!m_Ok) + { + return; + } + + if (Value.Value.Size() >= m_LargeObjectThreshold) + { + return PutLargeObject(HashKey, Value); + } + else + { + // Small object put + + uint64_t EntryFlags = 0; + + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + EntryFlags |= DiskLocation::kStructured; + } + + zen::RwLock::ExclusiveLockScope _(m_IndexLock); + + DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(m_WriteCursor, EntryFlags), + .Size = gsl::narrow<uint32_t>(Value.Value.Size())}; + + m_WriteCursor = zen::RoundUp(m_WriteCursor + Loc.Size, 16); + + if (auto it = m_Index.find(HashKey); it == m_Index.end()) + { + // Previously unknown object + m_Index.insert({HashKey, Loc}); + } + else + { + // TODO: should check if write is idempotent and bail out if it is? + it.value() = Loc; + } + + m_SlogFile.Append({.Key = HashKey, .Location = Loc}); + m_SobsFile.Write(Value.Value.Data(), Loc.Size, Loc.Offset()); + } +} + +void +ZenCacheDiskLayer::CacheBucket::Flush() +{ + m_SobsFile.Flush(); + m_SlogFile.Flush(); +} + +void +ZenCacheDiskLayer::CacheBucket::PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value) +{ + zen::WideStringBuilder<128> DataFilePath; + BuildPath(DataFilePath, HashKey); + + // TODO: replace this with a more efficient implementation with proper atomic rename + + CAtlTemporaryFile DataFile; + + HRESULT hRes = DataFile.Create(m_BucketDir.c_str()); + + if (FAILED(hRes)) + { + zen::ThrowSystemException(hRes, "Failed to open temporary file for put at '{}'"_format(m_BucketDir)); + } + + hRes = DataFile.Write(Value.Value.Data(), gsl::narrow<DWORD>(Value.Value.Size())); + + if (FAILED(hRes)) + { + zen::ThrowSystemException(hRes, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size()))); + } + + hRes = DataFile.Close(DataFilePath.c_str()); + + if (FAILED(hRes)) + { + zen::ThrowSystemException(hRes, "Failed to finalize file '{}'"_format(zen::WideToUtf8(DataFilePath))); + } + + // Update index + + uint64_t EntryFlags = DiskLocation::kStandaloneFile; + + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + EntryFlags |= DiskLocation::kStructured; + } + + zen::RwLock::ExclusiveLockScope _(m_IndexLock); + + DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(0, EntryFlags), .Size = 0}; + + if (auto it = m_Index.find(HashKey); it == m_Index.end()) + { + // Previously unknown object + m_Index.insert({HashKey, Loc}); + } + else + { + // TODO: should check if write is idempotent and bail out if it is? + it.value() = Loc; + } + + m_SlogFile.Append({.Key = HashKey, .Location = Loc}); +} + +////////////////////////////////////////////////////////////////////////// + +ZenCacheDiskLayer::ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir) : m_RootDir(RootDir), m_CasStore(Cas) +{ +} + +ZenCacheDiskLayer::~ZenCacheDiskLayer() = default; + +bool +ZenCacheDiskLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) +{ + CacheBucket* Bucket = nullptr; + + { + zen::RwLock::SharedLockScope _(m_Lock); + + auto it = m_Buckets.find(std::string(InBucket)); + + if (it != m_Buckets.end()) + { + Bucket = &it->second; + } + } + + if (Bucket == nullptr) + { + // Bucket needs to be opened/created + + zen::RwLock::ExclusiveLockScope _(m_Lock); + + auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore); + Bucket = &It.first->second; + + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= std::string(InBucket); + + Bucket->OpenOrCreate(BucketPath.c_str()); + } + + ZEN_ASSERT(Bucket != nullptr); + + return Bucket->Get(HashKey, OutValue); +} + +void +ZenCacheDiskLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) +{ + CacheBucket* Bucket = nullptr; + + { + zen::RwLock::SharedLockScope _(m_Lock); + + auto it = m_Buckets.find(std::string(InBucket)); + + if (it != m_Buckets.end()) + { + Bucket = &it->second; + } + } + + if (Bucket == nullptr) + { + // New bucket needs to be created + + zen::RwLock::ExclusiveLockScope _(m_Lock); + + auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore); + Bucket = &It.first->second; + + std::filesystem::path bucketPath = m_RootDir; + bucketPath /= std::string(InBucket); + + Bucket->OpenOrCreate(bucketPath.c_str()); + } + + ZEN_ASSERT(Bucket != nullptr); + + if (Bucket->IsOk()) + { + Bucket->Put(HashKey, Value); + } +} + +void +ZenCacheDiskLayer::Flush() +{ + std::vector<CacheBucket*> Buckets; + Buckets.reserve(m_Buckets.size()); + + { + zen::RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + Buckets.push_back(&Kv.second); + } + } + + for (auto& Bucket : Buckets) + { + Bucket->Flush(); + } +} + +////////////////////////////////////////////////////////////////////////// + +ZenCacheTracker::ZenCacheTracker(ZenCacheStore& CacheStore) +{ + ZEN_UNUSED(CacheStore); +} + +ZenCacheTracker::~ZenCacheTracker() +{ +} + +void +ZenCacheTracker::TrackAccess(std::string_view Bucket, const zen::IoHash& HashKey) +{ + ZEN_UNUSED(Bucket); + ZEN_UNUSED(HashKey); +} + +void +ZenCacheTracker::Flush() +{ +} |