// Copyright Epic Games, Inc. All Rights Reserved. #include "structuredcachestore.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include ////////////////////////////////////////////////////////////////////////// namespace zen { using namespace fmt::literals; ZenCacheStore::ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir} { ZEN_INFO("initializing structured cache at '{}'", RootDir); CreateDirectories(RootDir); } ZenCacheStore::~ZenCacheStore() { } bool ZenCacheStore::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue); if (Ok) { ZEN_ASSERT(OutValue.Value.Size()); } if (!Ok) { Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue); if (Ok) { ZEN_ASSERT(OutValue.Value.Size()); } if (Ok && (OutValue.Value.Size() <= m_DiskLayerSizeThreshold)) { m_MemLayer.Put(InBucket, HashKey, OutValue); } } return Ok; } void ZenCacheStore::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { // Store value and index ZEN_ASSERT(Value.Value.Size()); m_DiskLayer.Put(InBucket, HashKey, Value); if (Value.Value.Size() <= m_DiskLayerSizeThreshold) { m_MemLayer.Put(InBucket, HashKey, Value); } } bool ZenCacheStore::DropBucket(std::string_view Bucket) { ZEN_INFO("dropping bucket '{}'", Bucket); // TODO: should ensure this is done atomically across all layers const bool MemDropped = m_MemLayer.DropBucket(Bucket); const bool DiskDropped = m_DiskLayer.DropBucket(Bucket); const bool AnyDropped = MemDropped || DiskDropped; ZEN_INFO("bucket '{}' was {}", Bucket, AnyDropped ? "dropped" : "not found"); return AnyDropped; } void ZenCacheStore::Flush() { m_DiskLayer.Flush(); } void ZenCacheStore::Scrub(ScrubContext& Ctx) { if (m_LastScrubTime == Ctx.ScrubTimestamp()) { return; } m_LastScrubTime = Ctx.ScrubTimestamp(); m_DiskLayer.Scrub(Ctx); m_MemLayer.Scrub(Ctx); } ////////////////////////////////////////////////////////////////////////// ZenCacheMemoryLayer::ZenCacheMemoryLayer() { } ZenCacheMemoryLayer::~ZenCacheMemoryLayer() { } bool ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { RwLock::SharedLockScope _(m_Lock); auto it = m_Buckets.find(std::string(InBucket)); if (it == m_Buckets.end()) { return false; } CacheBucket* Bucket = Bucket = &it->second; _.ReleaseNow(); return Bucket->Get(HashKey, OutValue); } void ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { CacheBucket* Bucket = nullptr; { RwLock::SharedLockScope _(m_Lock); auto it = m_Buckets.find(std::string(InBucket)); if (it != m_Buckets.end()) { Bucket = &it->second; } } if (Bucket == nullptr) { // New bucket RwLock::ExclusiveLockScope _(m_Lock); Bucket = &m_Buckets[std::string(InBucket)]; } // Note that since the underlying IoBuffer is retained, the content type is also Bucket->Put(HashKey, Value); } bool ZenCacheMemoryLayer::DropBucket(std::string_view Bucket) { RwLock::ExclusiveLockScope _(m_Lock); return !!m_Buckets.erase(std::string(Bucket)); } void ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx) { RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { Kv.second.Scrub(Ctx); } } void ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) { std::vector BadHashes; for (auto& Kv : m_cacheMap) { if (Kv.first != IoHash::HashBuffer(Kv.second)) { BadHashes.push_back(Kv.first); } } if (!BadHashes.empty()) { Ctx.ReportBadChunks(BadHashes); } } bool ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { RwLock::SharedLockScope _(m_bucketLock); if (auto bucketIt = m_cacheMap.find(HashKey); bucketIt == m_cacheMap.end()) { return false; } else { OutValue.Value = bucketIt->second; return true; } } void ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { RwLock::ExclusiveLockScope _(m_bucketLock); m_cacheMap[HashKey] = Value.Value; } ////////////////////////////////////////////////////////////////////////// #pragma pack(push) #pragma pack(1) struct DiskLocation { uint64_t OffsetAndFlags; uint32_t Size; uint32_t IndexDataSize; static const uint64_t kOffsetMask = 0x00FF'ffFF'ffFF'ffFFull; static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; static const uint64_t kStructured = 0x4000'0000'0000'0000ull; static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; } inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; } inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; } inline ZenContentType GetContentType() const { ZenContentType ContentType = ZenContentType::kBinary; if (IsFlagSet(DiskLocation::kStructured)) { ContentType = ZenContentType::kCbObject; } return ContentType; } }; struct DiskIndexEntry { IoHash Key; DiskLocation Location; }; #pragma pack(pop) static_assert(sizeof(DiskIndexEntry) == 36); struct ZenCacheDiskLayer::CacheBucket { CacheBucket(CasStore& Cas); ~CacheBucket(); void OpenOrCreate(std::filesystem::path BucketDir); static bool Delete(std::filesystem::path BucketDir); bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); void Put(const IoHash& HashKey, const ZenCacheValue& Value); void Drop(); void Flush(); void Scrub(ScrubContext& Ctx); inline bool IsOk() const { return m_Ok; } private: CasStore& m_CasStore; std::filesystem::path m_BucketDir; Oid m_BucketId; bool m_Ok = false; uint64_t m_LargeObjectThreshold = 64 * 1024; // These files are used to manage storage of small objects for this bucket BasicFile m_SobsFile; TCasLogFile m_SlogFile; RwLock m_IndexLock; tsl::robin_map m_Index; uint64_t m_WriteCursor = 0; void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey); void PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value); bool GetStandaloneCacheValue(const IoHash& HashKey, ZenCacheValue& OutValue, const DiskLocation& Loc); bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue); }; ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas) { } ZenCacheDiskLayer::CacheBucket::~CacheBucket() { } bool ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir) { if (std::filesystem::exists(BucketDir)) { DeleteDirectories(BucketDir); return true; } return false; } void ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) { CreateDirectories(BucketDir); m_BucketDir = BucketDir; std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"}; std::filesystem::path SobsPath{m_BucketDir / "zen.sobs"}; std::filesystem::path SlogPath{m_BucketDir / "zen.slog"}; BasicFile ManifestFile; // Try opening existing manifest file first bool IsNew = false; std::error_code Ec; ManifestFile.Open(ManifestPath, /* IsCreate */ false, Ec); if (!Ec) { uint64_t FileSize = ManifestFile.FileSize(); if (FileSize == sizeof(Oid)) { ManifestFile.Read(&m_BucketId, sizeof(Oid), 0); m_Ok = true; } if (!m_Ok) { ManifestFile.Close(); } } if (!m_Ok) { // No manifest file found, this is a new bucket ManifestFile.Open(ManifestPath, /* IsCreate */ true, Ec); if (Ec) { throw std::system_error(Ec, "Failed to create bucket manifest '{}'"_format(ManifestPath)); } m_BucketId.Generate(); ManifestFile.Write(&m_BucketId, sizeof(Oid), /* FileOffset */ 0); IsNew = true; } // Initialize small object storage related files m_SobsFile.Open(SobsPath, IsNew); // Open and replay log m_SlogFile.Open(SlogPath, IsNew); uint64_t MaxFileOffset = 0; if (RwLock::ExclusiveLockScope _(m_IndexLock); m_Index.empty()) { m_SlogFile.Replay([&](const DiskIndexEntry& Record) { m_Index[Record.Key] = Record.Location; MaxFileOffset = std::max(MaxFileOffset, Record.Location.Offset() + Record.Location.Size); }); m_WriteCursor = (MaxFileOffset + 15) & ~15; } m_Ok = true; } void ZenCacheDiskLayer::CacheBucket::BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey) { char HexString[sizeof(HashKey.Hash) * 2]; ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString); Path.Append(m_BucketDir.c_str()); Path.Append(L"/blob/"); Path.AppendAsciiRange(HexString, HexString + 3); Path.Append(L"/"); Path.AppendAsciiRange(HexString + 3, HexString + 5); Path.Append(L"/"); Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString)); } bool ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue) { if (!Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size); OutValue.Value.SetContentType(Loc.GetContentType()); return true; } return false; } bool ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const IoHash& HashKey, ZenCacheValue& OutValue, const DiskLocation& Loc) { WideStringBuilder<128> DataFilePath; BuildPath(DataFilePath, HashKey); if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str())) { OutValue.Value = Data; OutValue.Value.SetContentType(Loc.GetContentType()); return true; } return false; } bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { if (!m_Ok) { return false; } RwLock::SharedLockScope _(m_IndexLock); if (auto it = m_Index.find(HashKey); it != m_Index.end()) { const DiskLocation& Loc = it->second; if (GetInlineCacheValue(Loc, OutValue)) { return true; } _.ReleaseNow(); return GetStandaloneCacheValue(HashKey, OutValue, Loc); } return false; } void ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { if (!m_Ok) { return; } if (Value.Value.Size() >= m_LargeObjectThreshold) { return PutLargeObject(HashKey, Value); } else { // Small object put uint64_t EntryFlags = 0; if (Value.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } RwLock::ExclusiveLockScope _(m_IndexLock); DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(m_WriteCursor, EntryFlags), .Size = gsl::narrow(Value.Value.Size())}; m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size, 16); if (auto it = m_Index.find(HashKey); it == m_Index.end()) { // Previously unknown object m_Index.insert({HashKey, Loc}); } else { // TODO: should check if write is idempotent and bail out if it is? it.value() = Loc; } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); m_SobsFile.Write(Value.Value.Data(), Loc.Size, Loc.Offset()); } } void ZenCacheDiskLayer::CacheBucket::Drop() { // TODO: add error handling m_SobsFile.Close(); m_SlogFile.Close(); DeleteDirectories(m_BucketDir); } void ZenCacheDiskLayer::CacheBucket::Flush() { m_SobsFile.Flush(); m_SlogFile.Flush(); } void ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) { std::vector StandaloneFiles; std::vector BadChunks; std::vector BadStandaloneChunks; { RwLock::SharedLockScope _(m_IndexLock); for (auto& Kv : m_Index) { const IoHash& Hash = Kv.first; const DiskLocation& Loc = Kv.second; ZenCacheValue Value; if (!GetInlineCacheValue(Loc, Value)) { ZEN_ASSERT(Loc.IsFlagSet(DiskLocation::kStandaloneFile)); StandaloneFiles.push_back({.Key = Hash, .Location = Loc}); } else { if (GetStandaloneCacheValue(Hash, Value, Loc)) { // Hash contents const IoHash ComputedHash = HashBuffer(Value.Value); if (ComputedHash != Hash) { BadChunks.push_back(Hash); } } else { // Non-existent } } } } if (Ctx.RunRecovery()) { // Clean out bad chunks } if (!BadChunks.empty()) { Ctx.ReportBadChunks(BadChunks); } } void ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value) { WideStringBuilder<128> DataFilePath; BuildPath(DataFilePath, HashKey); TemporaryFile DataFile; std::error_code Ec; DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); if (Ec) { throw std::system_error(Ec, "Failed to open temporary file for put at '{}'"_format(m_BucketDir)); } DataFile.WriteAll(Value.Value, Ec); if (Ec) { throw std::system_error(Ec, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size()))); } // Move file into place (atomically) DataFile.MoveTemporaryIntoPlace(DataFilePath.c_str(), Ec); if (Ec) { std::filesystem::path ParentPath = std::filesystem::path(DataFilePath.c_str()).parent_path(); CreateDirectories(ParentPath); DataFile.MoveTemporaryIntoPlace(DataFilePath.c_str(), Ec); if (Ec) { throw std::system_error(Ec, "Failed to finalize file '{}'"_format(WideToUtf8(DataFilePath))); } } // Update index uint64_t EntryFlags = DiskLocation::kStandaloneFile; if (Value.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } RwLock::ExclusiveLockScope _(m_IndexLock); DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(0, EntryFlags), .Size = 0}; if (auto it = m_Index.find(HashKey); it == m_Index.end()) { // Previously unknown object m_Index.insert({HashKey, Loc}); } else { // TODO: should check if write is idempotent and bail out if it is? it.value() = Loc; } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); } ////////////////////////////////////////////////////////////////////////// ZenCacheDiskLayer::ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir) : m_RootDir(RootDir), m_CasStore(Cas) { } ZenCacheDiskLayer::~ZenCacheDiskLayer() = default; bool ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { CacheBucket* Bucket = nullptr; { RwLock::SharedLockScope _(m_Lock); auto it = m_Buckets.find(std::string(InBucket)); if (it != m_Buckets.end()) { Bucket = &it->second; } } if (Bucket == nullptr) { // Bucket needs to be opened/created RwLock::ExclusiveLockScope _(m_Lock); if (auto it = m_Buckets.find(std::string(InBucket)); it != m_Buckets.end()) { Bucket = &it->second; } else { auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore); Bucket = &It.first->second; std::filesystem::path BucketPath = m_RootDir; BucketPath /= std::string(InBucket); Bucket->OpenOrCreate(BucketPath.c_str()); } } ZEN_ASSERT(Bucket != nullptr); return Bucket->Get(HashKey, OutValue); } void ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { CacheBucket* Bucket = nullptr; { RwLock::SharedLockScope _(m_Lock); auto it = m_Buckets.find(std::string(InBucket)); if (it != m_Buckets.end()) { Bucket = &it->second; } } if (Bucket == nullptr) { // New bucket needs to be created RwLock::ExclusiveLockScope _(m_Lock); if (auto it = m_Buckets.find(std::string(InBucket)); it != m_Buckets.end()) { Bucket = &it->second; } else { auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore); Bucket = &It.first->second; std::filesystem::path bucketPath = m_RootDir; bucketPath /= std::string(InBucket); Bucket->OpenOrCreate(bucketPath.c_str()); } } ZEN_ASSERT(Bucket != nullptr); if (Bucket->IsOk()) { Bucket->Put(HashKey, Value); } } bool ZenCacheDiskLayer::DropBucket(std::string_view InBucket) { RwLock::ExclusiveLockScope _(m_Lock); auto it = m_Buckets.find(std::string(InBucket)); if (it != m_Buckets.end()) { CacheBucket* Bucket = &it->second; Bucket->Drop(); m_Buckets.erase(it); return true; } std::filesystem::path BucketPath = m_RootDir; BucketPath /= std::string(InBucket); return CacheBucket::Delete(BucketPath); } void ZenCacheDiskLayer::Flush() { std::vector Buckets; Buckets.reserve(m_Buckets.size()); { RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { Buckets.push_back(&Kv.second); } } for (auto& Bucket : Buckets) { Bucket->Flush(); } } void ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) { RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { Kv.second.Scrub(Ctx); } } ////////////////////////////////////////////////////////////////////////// ZenCacheTracker::ZenCacheTracker(ZenCacheStore& CacheStore) { ZEN_UNUSED(CacheStore); } ZenCacheTracker::~ZenCacheTracker() { } void ZenCacheTracker::TrackAccess(std::string_view Bucket, const IoHash& HashKey) { ZEN_UNUSED(Bucket); ZEN_UNUSED(HashKey); } void ZenCacheTracker::Flush() { } } // namespace zen