diff options
| author | Stefan Boberg <[email protected]> | 2023-12-11 13:09:03 +0100 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2023-12-11 13:09:03 +0100 |
| commit | 93afeddbc7a5b5df390a29407f5515acd5a70fc1 (patch) | |
| tree | 6f85ee551aabe20dece64a750c0b2d5d2c5d2d5d /src/zenserver/cache/cachedisklayer.cpp | |
| parent | removed unnecessary SHA1 references (diff) | |
| parent | Make sure that PathFromHandle don't hide true error when throwing exceptions ... (diff) | |
| download | zen-93afeddbc7a5b5df390a29407f5515acd5a70fc1.tar.xz zen-93afeddbc7a5b5df390a29407f5515acd5a70fc1.zip | |
Merge branch 'main' of https://github.com/EpicGames/zen
Diffstat (limited to 'src/zenserver/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 2728 |
1 files changed, 1746 insertions, 982 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 9bb75480e..0987cd0f1 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -14,6 +14,7 @@ #include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zencore/xxhash.h> +#include <zenutil/workerpools.h> #include <future> @@ -25,12 +26,6 @@ namespace { #pragma pack(push) #pragma pack(1) - // We use this to indicate if a on disk bucket needs wiping - // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scrable the references - // to block items. - // See: https://github.com/EpicGames/zen/pull/299 - static const uint32_t CurrentDiskBucketVersion = 1; - struct CacheBucketIndexHeader { static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; @@ -48,23 +43,94 @@ namespace { { return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); } + + bool IsValid() const + { + if (Magic != ExpectedMagic) + { + return false; + } + + if (Checksum != ComputeChecksum(*this)) + { + return false; + } + + if (PayloadAlignment == 0) + { + return false; + } + + return true; + } }; static_assert(sizeof(CacheBucketIndexHeader) == 32); + struct BucketMetaHeader + { + static constexpr uint32_t ExpectedMagic = 0x61'74'65'6d; // 'meta'; + static constexpr uint32_t Version1 = 1; + static constexpr uint32_t CurrentVersion = Version1; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t Padding = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const BucketMetaHeader& Header) + { + return XXH32(&Header.Magic, sizeof(BucketMetaHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } + + bool IsValid() const + { + if (Magic != ExpectedMagic) + { + return false; + } + + if (Checksum != ComputeChecksum(*this)) + { + return false; + } + + if (Padding != 0) + { + return false; + } + + return true; + } + }; + + static_assert(sizeof(BucketMetaHeader) == 32); + #pragma pack(pop) + ////////////////////////////////////////////////////////////////////////// + + template<typename T> + void Reset(T& V) + { + T Tmp; + V.swap(Tmp); + } + const char* IndexExtension = ".uidx"; const char* LogExtension = ".slog"; + const char* MetaExtension = ".meta"; std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + IndexExtension); } - std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + std::filesystem::path GetMetaPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { - return BucketDir / (BucketName + ".tmp"); + return BucketDir / (BucketName + MetaExtension); } std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) @@ -72,6 +138,12 @@ namespace { return BucketDir / (BucketName + LogExtension); } + std::filesystem::path GetManifestPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + ZEN_UNUSED(BucketName); + return BucketDir / "zen_manifest"; + } + bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) @@ -140,26 +212,458 @@ namespace { } // namespace namespace fs = std::filesystem; +using namespace std::literals; -static CbObject -LoadCompactBinaryObject(const fs::path& Path) +class BucketManifestSerializer { - FileContents Result = ReadFile(Path); + using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex; + using BucketMetaData = ZenCacheDiskLayer::CacheBucket::BucketMetaData; + + using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; + using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload; + +public: + // We use this to indicate if a on disk bucket needs wiping + // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scramble the references + // to block items. + // See: https://github.com/EpicGames/zen/pull/299 + static inline const uint32_t CurrentDiskBucketVersion = 1; - if (!Result.ErrorCode) + bool Open(std::filesystem::path ManifestPath) { - IoBuffer Buffer = Result.Flatten(); - if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) + Manifest = LoadCompactBinaryObject(ManifestPath); + return !!Manifest; + } + + Oid GetBucketId() const { return Manifest["BucketId"sv].AsObjectId(); } + + bool IsCurrentVersion(uint32_t& OutVersion) const + { + OutVersion = Manifest["Version"sv].AsUInt32(0); + return OutVersion == CurrentDiskBucketVersion; + } + + void ParseManifest(RwLock::ExclusiveLockScope& BucketLock, + ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path ManifestPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector<AccessTime>& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads); + + Oid GenerateNewManifest(std::filesystem::path ManifestPath); + + IoBuffer MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount); + uint64_t GetSidecarSize() const { return m_ManifestEntryCount * sizeof(ManifestData); } + void WriteSidecarFile(RwLock::SharedLockScope& BucketLock, + const std::filesystem::path& SidecarPath, + uint64_t SnapshotLogPosition, + const ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + const std::vector<AccessTime>& AccessTimes, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas); + bool ReadSidecarFile(RwLock::ExclusiveLockScope& BucketLock, + ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path SidecarPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector<AccessTime>& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads); + + IoBuffer MakeManifest(const Oid& BucketId, + ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, + std::vector<AccessTime>&& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>&& Payloads, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>&& MetaDatas); + + CbObject Manifest; + +private: + CbObject LoadCompactBinaryObject(const fs::path& Path) + { + FileContents Result = ReadFile(Path); + + if (!Result.ErrorCode) { - return LoadCompactBinaryObject(Buffer); + IoBuffer Buffer = Result.Flatten(); + if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) + { + return zen::LoadCompactBinaryObject(Buffer); + } } + + return CbObject(); } - return CbObject(); + uint64_t m_ManifestEntryCount = 0; + + struct ManifestData + { + IoHash Key; // 20 + AccessTime Timestamp; // 4 + IoHash RawHash; // 20 + uint32_t Padding_0; // 4 + size_t RawSize; // 8 + uint64_t Padding_1; // 8 + }; + + static_assert(sizeof(ManifestData) == 64); +}; + +void +BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& BucketLock, + ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path ManifestPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector<AccessTime>& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads) +{ + if (Manifest["UsingMetaFile"sv].AsBool()) + { + ReadSidecarFile(BucketLock, Bucket, GetMetaPath(Bucket.m_BucketDir, Bucket.m_BucketName), Index, AccessTimes, Payloads); + + return; + } + + ZEN_TRACE_CPU("Z$::ParseManifest"); + + Stopwatch Timer; + const auto _ = MakeGuard([&] { ZEN_INFO("parsed store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + + const uint64_t Count = Manifest["Count"sv].AsUInt64(0); + std::vector<PayloadIndex> KeysIndexes; + KeysIndexes.reserve(Count); + + CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); + for (CbFieldView& KeyView : KeyArray) + { + if (auto It = Index.find(KeyView.AsHash()); It != Index.end()) + { + KeysIndexes.push_back(It.value()); + } + else + { + KeysIndexes.push_back(PayloadIndex()); + } + } + + size_t KeyIndexOffset = 0; + CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); + for (CbFieldView& TimeStampView : TimeStampArray) + { + const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; + if (KeyIndex) + { + AccessTimes[KeyIndex] = TimeStampView.AsInt64(); + } + } + + KeyIndexOffset = 0; + CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView(); + CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView(); + if (RawHashArray.Num() == RawSizeArray.Num()) + { + auto RawHashIt = RawHashArray.CreateViewIterator(); + auto RawSizeIt = RawSizeArray.CreateViewIterator(); + while (RawHashIt != CbFieldViewIterator()) + { + const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; + + if (KeyIndex) + { + uint64_t RawSize = RawSizeIt.AsUInt64(); + IoHash RawHash = RawHashIt.AsHash(); + if (RawSize != 0 || RawHash != IoHash::Zero) + { + BucketPayload& Payload = Payloads[KeyIndex]; + Bucket.SetMetaData(BucketLock, Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash}); + } + } + + RawHashIt++; + RawSizeIt++; + } + } + else + { + ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath); + } +} + +Oid +BucketManifestSerializer::GenerateNewManifest(std::filesystem::path ManifestPath) +{ + const Oid BucketId = Oid::NewOid(); + + CbObjectWriter Writer; + Writer << "BucketId"sv << BucketId; + Writer << "Version"sv << CurrentDiskBucketVersion; + Manifest = Writer.Save(); + WriteFile(ManifestPath, Manifest.GetBuffer().AsIoBuffer()); + + return BucketId; +} + +IoBuffer +BucketManifestSerializer::MakeManifest(const Oid& BucketId, + ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, + std::vector<AccessTime>&& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>&& Payloads, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>&& MetaDatas) +{ + using namespace std::literals; + + ZEN_TRACE_CPU("Z$::MakeManifest"); + + size_t ItemCount = Index.size(); + + // This tends to overestimate a little bit but it is still way more accurate than what we get with exponential growth + // And we don't need to reallocate the underlying buffer in almost every case + const size_t EstimatedSizePerItem = 54u; + const size_t ReserveSize = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128); + CbObjectWriter Writer(ReserveSize); + + Writer << "BucketId"sv << BucketId; + Writer << "Version"sv << CurrentDiskBucketVersion; + + if (!Index.empty()) + { + Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(Index.size())); + Writer.BeginArray("Keys"sv); + for (auto& Kv : Index) + { + const IoHash& Key = Kv.first; + Writer.AddHash(Key); + } + Writer.EndArray(); + + Writer.BeginArray("Timestamps"sv); + for (auto& Kv : Index) + { + GcClock::Tick AccessTime = AccessTimes[Kv.second]; + Writer.AddInteger(AccessTime); + } + Writer.EndArray(); + + if (!MetaDatas.empty()) + { + Writer.BeginArray("RawHash"sv); + for (auto& Kv : Index) + { + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second]; + if (Payload.MetaData) + { + Writer.AddHash(MetaDatas[Payload.MetaData].RawHash); + } + else + { + Writer.AddHash(IoHash::Zero); + } + } + Writer.EndArray(); + + Writer.BeginArray("RawSize"sv); + for (auto& Kv : Index) + { + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second]; + if (Payload.MetaData) + { + Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize); + } + else + { + Writer.AddInteger(0); + } + } + Writer.EndArray(); + } + } + + Manifest = Writer.Save(); + return Manifest.GetBuffer().AsIoBuffer(); +} + +IoBuffer +BucketManifestSerializer::MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount) +{ + m_ManifestEntryCount = EntryCount; + + CbObjectWriter Writer; + Writer << "BucketId"sv << BucketId; + Writer << "Version"sv << CurrentDiskBucketVersion; + Writer << "Count"sv << EntryCount; + Writer << "UsingMetaFile"sv << true; + Manifest = Writer.Save(); + + return Manifest.GetBuffer().AsIoBuffer(); +} + +bool +BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& BucketLock, + ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path SidecarPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector<AccessTime>& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads) +{ + ZEN_ASSERT(AccessTimes.size() == Payloads.size()); + + std::error_code Ec; + + BasicFile SidecarFile; + SidecarFile.Open(SidecarPath, BasicFile::Mode::kRead, Ec); + + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed to open sidecar file '{}'", SidecarPath)); + } + + uint64_t FileSize = SidecarFile.FileSize(); + + auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid sidecar file '{}'", SidecarPath); }); + + if (FileSize < sizeof(BucketMetaHeader)) + { + return false; + } + + BasicFileBuffer Sidecar(SidecarFile, 128 * 1024); + + BucketMetaHeader Header; + Sidecar.Read(&Header, sizeof Header, 0); + + if (!Header.IsValid()) + { + return false; + } + + if (Header.Version != BucketMetaHeader::Version1) + { + return false; + } + + const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(BucketMetaHeader))) / sizeof(ManifestData); + if (Header.EntryCount > ExpectedEntryCount) + { + return false; + } + + InvalidGuard.Dismiss(); + + uint64_t RemainingEntryCount = ExpectedEntryCount; + uint64_t EntryCount = 0; + uint64_t CurrentReadOffset = sizeof(Header); + + while (RemainingEntryCount--) + { + const ManifestData* Entry = Sidecar.MakeView<ManifestData>(CurrentReadOffset); + CurrentReadOffset += sizeof(ManifestData); + + if (auto It = Index.find(Entry->Key); It != Index.end()) + { + PayloadIndex PlIndex = It.value(); + + ZEN_ASSERT(size_t(PlIndex) <= Payloads.size()); + + ZenCacheDiskLayer::CacheBucket::BucketPayload& PayloadEntry = Payloads[PlIndex]; + + AccessTimes[PlIndex] = Entry->Timestamp; + + if (Entry->RawSize && Entry->RawHash != IoHash::Zero) + { + Bucket.SetMetaData(BucketLock, PayloadEntry, BucketMetaData{.RawSize = Entry->RawSize, .RawHash = Entry->RawHash}); + } + } + + EntryCount++; + } + + ZEN_ASSERT(EntryCount == ExpectedEntryCount); + + return true; +} + +void +BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&, + const std::filesystem::path& SidecarPath, + uint64_t SnapshotLogPosition, + const ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + const std::vector<AccessTime>& AccessTimes, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas) +{ + BucketMetaHeader Header; + Header.EntryCount = m_ManifestEntryCount; + Header.LogPosition = SnapshotLogPosition; + Header.Checksum = Header.ComputeChecksum(Header); + + std::error_code Ec; + + TemporaryFile SidecarFile; + SidecarFile.CreateTemporary(SidecarPath.parent_path(), Ec); + + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed creating '{}'", SidecarFile.GetPath())); + } + + SidecarFile.Write(&Header, sizeof Header, 0); + + // TODO: make this batching for better performance + { + uint64_t WriteOffset = sizeof Header; + + // BasicFileWriter SidecarWriter(SidecarFile, 128 * 1024); + + std::vector<ManifestData> ManifestDataBuffer; + const size_t MaxManifestDataBufferCount = Min(Index.size(), 4096u); // 256 Kb + ManifestDataBuffer.reserve(MaxManifestDataBufferCount); + for (auto& Kv : Index) + { + const IoHash& Key = Kv.first; + const PayloadIndex PlIndex = Kv.second; + + IoHash RawHash = IoHash::Zero; + uint64_t RawSize = 0; + + if (const MetaDataIndex MetaIndex = Payloads[PlIndex].MetaData) + { + RawHash = MetaDatas[MetaIndex].RawHash; + RawSize = MetaDatas[MetaIndex].RawSize; + } + + ManifestDataBuffer.emplace_back(ManifestData{.Key = Key, + .Timestamp = AccessTimes[PlIndex], + .RawHash = RawHash, + .Padding_0 = 0, + .RawSize = RawSize, + .Padding_1 = 0}); + if (ManifestDataBuffer.size() == MaxManifestDataBufferCount) + { + const uint64_t WriteSize = sizeof(ManifestData) * ManifestDataBuffer.size(); + SidecarFile.Write(ManifestDataBuffer.data(), WriteSize, WriteOffset); + WriteOffset += WriteSize; + ManifestDataBuffer.clear(); + ManifestDataBuffer.reserve(MaxManifestDataBufferCount); + } + } + if (ManifestDataBuffer.size() > 0) + { + SidecarFile.Write(ManifestDataBuffer.data(), sizeof(ManifestData) * ManifestDataBuffer.size(), WriteOffset); + } + } + + SidecarFile.MoveTemporaryIntoPlace(SidecarPath, Ec); + + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed to move '{}' into '{}'", SidecarFile.GetPath(), SidecarPath)); + } } ////////////////////////////////////////////////////////////////////////// +static const float IndexMinLoadFactor = 0.2f; +static const float IndexMaxLoadFactor = 0.7f; + ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, std::string BucketName, @@ -170,6 +674,9 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, , m_Configuration(Config) , m_BucketId(Oid::Zero) { + m_Index.min_load_factor(IndexMinLoadFactor); + m_Index.max_load_factor(IndexMaxLoadFactor); + if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap"))) { const uint64_t LegacyOverrideSize = 16 * 1024 * 1024; @@ -192,6 +699,10 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo using namespace std::literals; ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenOrCreate"); + ZEN_ASSERT(m_IsFlushing.load()); + + // We want to take the lock here since we register as a GC referencer a construction + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir); @@ -200,169 +711,72 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo CreateDirectories(m_BucketDir); - std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"}; + std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); bool IsNew = false; - CbObject Manifest = LoadCompactBinaryObject(ManifestPath); + BucketManifestSerializer ManifestReader; - if (Manifest) + if (ManifestReader.Open(ManifestPath)) { - m_BucketId = Manifest["BucketId"sv].AsObjectId(); + m_BucketId = ManifestReader.GetBucketId(); if (m_BucketId == Oid::Zero) { return false; } - const uint32_t Version = Manifest["Version"sv].AsUInt32(0); - if (Version != CurrentDiskBucketVersion) + + uint32_t Version = 0; + if (ManifestReader.IsCurrentVersion(/* out */ Version) == false) { - ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion); + ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", + BucketDir, + Version, + BucketManifestSerializer::CurrentDiskBucketVersion); IsNew = true; } } else if (AllowCreate) { - m_BucketId.Generate(); - - CbObjectWriter Writer; - Writer << "BucketId"sv << m_BucketId; - Writer << "Version"sv << CurrentDiskBucketVersion; - Manifest = Writer.Save(); - WriteFile(m_BucketDir / "zen_manifest", Manifest.GetBuffer().AsIoBuffer()); - IsNew = true; + m_BucketId = ManifestReader.GenerateNewManifest(ManifestPath); + IsNew = true; } else { return false; } - OpenLog(IsNew); - - if (!IsNew) - { - ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenOrCreate::Manifest"); - - Stopwatch Timer; - const auto _ = - MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - - const uint64_t kInvalidIndex = ~(0ull); + InitializeIndexFromDisk(IndexLock, IsNew); - const uint64_t Count = Manifest["Count"sv].AsUInt64(0); - if (Count != 0) - { - std::vector<size_t> KeysIndexes; - KeysIndexes.reserve(Count); - CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); - for (CbFieldView& KeyView : KeyArray) - { - if (auto It = m_Index.find(KeyView.AsHash()); It != m_Index.end()) - { - KeysIndexes.push_back(It.value()); - } - else - { - KeysIndexes.push_back(kInvalidIndex); - } - } - size_t KeyIndexOffset = 0; - CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); - for (CbFieldView& TimeStampView : TimeStampArray) - { - const size_t KeyIndex = KeysIndexes[KeyIndexOffset++]; - if (KeyIndex != kInvalidIndex) - { - m_AccessTimes[KeyIndex] = TimeStampView.AsInt64(); - } - } - KeyIndexOffset = 0; - CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView(); - CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView(); - if (RawHashArray.Num() == RawSizeArray.Num()) - { - auto RawHashIt = RawHashArray.CreateViewIterator(); - auto RawSizeIt = RawSizeArray.CreateViewIterator(); - while (RawHashIt != CbFieldViewIterator()) - { - const size_t KeyIndex = KeysIndexes[KeyIndexOffset++]; - - if (KeyIndex != kInvalidIndex) - { - uint64_t RawSize = RawSizeIt.AsUInt64(); - IoHash RawHash = RawHashIt.AsHash(); - if (RawSize != 0 || RawHash != IoHash::Zero) - { - BucketPayload& Payload = m_Payloads[KeyIndex]; - SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash}); - } - } - - RawHashIt++; - RawSizeIt++; - } - } - else - { - ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath); - } - } - - ////// Legacy format read - { - for (CbFieldView Entry : Manifest["Timestamps"sv]) - { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64(); - } - } - for (CbFieldView Entry : Manifest["RawInfo"sv]) - { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - - const IoHash RawHash = Obj["RawHash"sv].AsHash(); - const uint64_t RawSize = Obj["RawSize"sv].AsUInt64(); - - if (RawHash == IoHash::Zero || RawSize == 0) - { - ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex); - } + auto _ = MakeGuard([&]() { + // We are now initialized, allow flushing when we exit + m_IsFlushing.store(false); + }); - BucketPayload& Payload = m_Payloads[EntryIndex]; - SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash}); - } - } - } + if (IsNew) + { + return true; } + ManifestReader.ParseManifest(IndexLock, *this, ManifestPath, m_Index, m_AccessTimes, m_Payloads); + return true; } void -ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc) +ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(const std::function<uint64_t()>& ClaimDiskReserveFunc) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeIndexSnapshot"); + ZEN_TRACE_CPU("Z$::Disk::Bucket::WriteIndexSnapshot"); - uint64_t LogCount = m_SlogFile.GetLogCount(); + const uint64_t LogCount = m_SlogFile.GetLogCount(); if (m_LogFlushPosition == LogCount) { return; } ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir); - uint64_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { + const uint64_t EntryCount = m_Index.size(); + Stopwatch Timer; + const auto _ = MakeGuard([&] { ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", m_BucketDir, EntryCount, @@ -371,42 +785,11 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function<uint64_t() namespace fs = std::filesystem; - fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); - fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName); - - // Move index away, we keep it if something goes wrong - if (fs::is_regular_file(STmpIndexPath)) - { - std::error_code Ec; - if (!fs::remove(STmpIndexPath, Ec) || Ec) - { - ZEN_WARN("snapshot failed to clean up temp snapshot at {}, reason: '{}'", STmpIndexPath, Ec.message()); - return; - } - } + fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); try { - if (fs::is_regular_file(IndexPath)) - { - fs::rename(IndexPath, STmpIndexPath); - } - - // Write the current state of the location map to a new index state - std::vector<DiskIndexEntry> Entries; - Entries.resize(m_Index.size()); - - { - uint64_t EntryIndex = 0; - for (auto& Entry : m_Index) - { - DiskIndexEntry& IndexEntry = Entries[EntryIndex++]; - IndexEntry.Key = Entry.first; - IndexEntry.Location = m_Payloads[Entry.second].Location; - } - } - - uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + Entries.size() * sizeof(DiskIndexEntry); + const uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry); std::error_code Error; DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); if (Error) @@ -426,185 +809,230 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function<uint64_t() fmt::format("not enough free disk space in '{}' to save index of size {}", m_BucketDir, NiceBytes(IndexSize))); } - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); - CacheBucketIndexHeader Header = {.EntryCount = Entries.size(), - .LogPosition = LogCount, - .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)}; + TemporaryFile ObjectIndexFile; + std::error_code Ec; + ObjectIndexFile.CreateTemporary(m_BucketDir, Ec); + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir)); + } + + { + // This is in a separate scope just to ensure IndexWriter goes out + // of scope before the file is flushed/closed, in order to ensure + // all data is written to the file + BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024); - Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); - ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0); - ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); - ObjectIndexFile.Flush(); - ObjectIndexFile.Close(); - EntryCount = Entries.size(); - m_LogFlushPosition = LogCount; - } - catch (std::exception& Err) - { - ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); + CacheBucketIndexHeader Header = {.EntryCount = EntryCount, + .LogPosition = LogCount, + .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)}; + + Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); + IndexWriter.Write(&Header, sizeof(CacheBucketIndexHeader), 0); + + uint64_t IndexWriteOffset = sizeof(CacheBucketIndexHeader); - // Restore any previous snapshot + for (auto& Entry : m_Index) + { + DiskIndexEntry IndexEntry; + IndexEntry.Key = Entry.first; + IndexEntry.Location = m_Payloads[Entry.second].Location; + IndexWriter.Write(&IndexEntry, sizeof(DiskIndexEntry), IndexWriteOffset); + + IndexWriteOffset += sizeof(DiskIndexEntry); + } + + IndexWriter.Flush(); + } - if (fs::is_regular_file(STmpIndexPath)) + ObjectIndexFile.Flush(); + ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); + if (Ec) { - std::error_code Ec; - fs::remove(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless - fs::rename(STmpIndexPath, IndexPath, Ec); - if (Ec) + std::filesystem::path TempFilePath = ObjectIndexFile.GetPath(); + ZEN_WARN("snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", TempFilePath, IndexPath, Ec.message()); + + if (std::filesystem::is_regular_file(TempFilePath)) { - ZEN_WARN("snapshot failed to restore old snapshot from {}, reason: '{}'", STmpIndexPath, Ec.message()); + if (!std::filesystem::remove(TempFilePath, Ec) || Ec) + { + ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", TempFilePath, Ec.message()); + } } } - } - if (fs::is_regular_file(STmpIndexPath)) - { - std::error_code Ec; - if (!fs::remove(STmpIndexPath, Ec) || Ec) + else { - ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", STmpIndexPath, Ec.message()); + // We must only update the log flush position once the snapshot write succeeds + m_LogFlushPosition = LogCount; } } + catch (std::exception& Err) + { + ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); + } } uint64_t -ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion) +ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const std::filesystem::path& IndexPath, uint32_t& OutVersion) { ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadIndexFile"); - if (std::filesystem::is_regular_file(IndexPath)) + if (!std::filesystem::is_regular_file(IndexPath)) { - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); - uint64_t Size = ObjectIndexFile.FileSize(); - if (Size >= sizeof(CacheBucketIndexHeader)) - { - CacheBucketIndexHeader Header; - ObjectIndexFile.Read(&Header, sizeof(Header), 0); - if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && - (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0)) - { - switch (Header.Version) - { - case CacheBucketIndexHeader::Version2: - { - uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); - if (Header.EntryCount > ExpectedEntryCount) - { - break; - } - size_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' index containing {} entries in {}", - IndexPath, - EntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); + return 0; + } - m_Configuration.PayloadAlignment = Header.PayloadAlignment; + auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid index file '{}'", IndexPath); }); - std::vector<DiskIndexEntry> Entries; - Entries.resize(Header.EntryCount); - ObjectIndexFile.Read(Entries.data(), - Header.EntryCount * sizeof(DiskIndexEntry), - sizeof(CacheBucketIndexHeader)); + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t FileSize = ObjectIndexFile.FileSize(); + if (FileSize < sizeof(CacheBucketIndexHeader)) + { + return 0; + } - m_Payloads.reserve(Header.EntryCount); - m_Index.reserve(Header.EntryCount); + CacheBucketIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); - std::string InvalidEntryReason; - for (const DiskIndexEntry& Entry : Entries) - { - if (!ValidateCacheBucketIndexEntry(Entry, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); - continue; - } - PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); - m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location}); - m_Index.insert_or_assign(Entry.Key, EntryIndex); - EntryCount++; - } - m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); - if (m_Configuration.EnableReferenceCaching) - { - m_FirstReferenceIndex.resize(m_Payloads.size()); - } - OutVersion = CacheBucketIndexHeader::Version2; - return Header.LogPosition; - } - break; - default: - break; - } - } + if (!Header.IsValid()) + { + return 0; + } + + if (Header.Version != CacheBucketIndexHeader::Version2) + { + return 0; + } + + const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); + if (Header.EntryCount > ExpectedEntryCount) + { + return 0; + } + + InvalidGuard.Dismiss(); + + size_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + m_Configuration.PayloadAlignment = Header.PayloadAlignment; + + m_Payloads.reserve(Header.EntryCount); + m_Index.reserve(Header.EntryCount); + + BasicFileBuffer FileBuffer(ObjectIndexFile, 128 * 1024); + + uint64_t CurrentReadOffset = sizeof(CacheBucketIndexHeader); + uint64_t RemainingEntryCount = Header.EntryCount; + + std::string InvalidEntryReason; + while (RemainingEntryCount--) + { + const DiskIndexEntry* Entry = FileBuffer.MakeView<DiskIndexEntry>(CurrentReadOffset); + CurrentReadOffset += sizeof(DiskIndexEntry); + + if (!ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; } - ZEN_WARN("skipping invalid index file '{}'", IndexPath); + + const PayloadIndex EntryIndex = PayloadIndex(EntryCount); + m_Payloads.emplace_back(BucketPayload{.Location = Entry->Location}); + m_Index.insert_or_assign(Entry->Key, EntryIndex); + + EntryCount++; } - return 0; + + ZEN_ASSERT(EntryCount == m_Payloads.size()); + + m_AccessTimes.resize(EntryCount, AccessTime(GcClock::TickCount())); + + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.resize(EntryCount); + } + + OutVersion = CacheBucketIndexHeader::Version2; + return Header.LogPosition; } uint64_t -ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount) +ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadLog"); - if (std::filesystem::is_regular_file(LogPath)) + if (!std::filesystem::is_regular_file(LogPath)) { - uint64_t LogEntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - TCasLogFile<DiskIndexEntry> CasLog; - CasLog.Open(LogPath, CasLogFile::Mode::kRead); - if (CasLog.Initialize()) - { - uint64_t EntryCount = CasLog.GetLogCount(); - if (EntryCount < SkipEntryCount) - { - ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); - SkipEntryCount = 0; - } - LogEntryCount = EntryCount - SkipEntryCount; - uint64_t InvalidEntryCount = 0; - CasLog.Replay( - [&](const DiskIndexEntry& Record) { - std::string InvalidEntryReason; - if (Record.Location.Flags & DiskLocation::kTombStone) - { - m_Index.erase(Record.Key); - return; - } - if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); - ++InvalidEntryCount; - return; - } - PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); - m_Payloads.emplace_back(BucketPayload{.Location = Record.Location}); - m_Index.insert_or_assign(Record.Key, EntryIndex); - }, - SkipEntryCount); - m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); - if (m_Configuration.EnableReferenceCaching) + return 0; + } + + uint64_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + TCasLogFile<DiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (!CasLog.Initialize()) + { + return 0; + } + + const uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + + LogEntryCount = EntryCount - SkipEntryCount; + uint64_t InvalidEntryCount = 0; + + CasLog.Replay( + [&](const DiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Location.Flags & DiskLocation::kTombStone) { - m_FirstReferenceIndex.resize(m_Payloads.size()); + // Note: this leaves m_Payloads and other arrays with 'holes' in them + m_Index.erase(Record.Key); + return; } - if (InvalidEntryCount) + + if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) { - ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; } - return LogEntryCount; - } + PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); + m_Payloads.emplace_back(BucketPayload{.Location = Record.Location}); + m_Index.insert_or_assign(Record.Key, EntryIndex); + }, + SkipEntryCount); + + m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); + + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.resize(m_Payloads.size()); } - return 0; + + if (InvalidEntryCount) + { + ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); + } + + return LogEntryCount; }; void -ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) +ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockScope& IndexLock, const bool IsNew) { ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenLog"); @@ -639,7 +1067,7 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) if (std::filesystem::is_regular_file(IndexPath)) { uint32_t IndexVersion = 0; - m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion); + m_LogFlushPosition = ReadIndexFile(IndexLock, IndexPath, IndexVersion); if (IndexVersion == 0) { ZEN_WARN("removing invalid index file at '{}'", IndexPath); @@ -652,19 +1080,18 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) { if (TCasLogFile<DiskIndexEntry>::IsValid(LogPath)) { - LogEntryCount = ReadLog(LogPath, m_LogFlushPosition); + LogEntryCount = ReadLog(IndexLock, LogPath, m_LogFlushPosition); } else if (fs::is_regular_file(LogPath)) { - ZEN_WARN("removing invalid cas log at '{}'", LogPath); + ZEN_WARN("removing invalid log at '{}'", LogPath); std::filesystem::remove(LogPath); } } m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); - std::vector<BlockStoreLocation> KnownLocations; - KnownLocations.reserve(m_Index.size()); + BlockStore::BlockIndexSet KnownBlocks; for (const auto& Entry : m_Index) { size_t EntryIndex = Entry.second; @@ -674,19 +1101,19 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { m_StandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed); - continue; } - const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment); - KnownLocations.push_back(BlockLocation); + else + { + const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment); + KnownBlocks.Add(BlockLocation.BlockIndex); + } } - - m_BlockStore.SyncExistingBlocksOnDisk(KnownLocations); + m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); if (IsNew || LogEntryCount > 0) { - MakeIndexSnapshot(); + WriteIndexSnapshot(IndexLock); } - // TODO: should validate integrity of container files here } void @@ -759,7 +1186,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal return false; } - size_t EntryIndex = It.value(); + PayloadIndex EntryIndex = It.value(); m_AccessTimes[EntryIndex] = GcClock::TickCount(); DiskLocation Location = m_Payloads[EntryIndex].Location; @@ -776,7 +1203,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal if (Payload->MemCached) { - OutValue.Value = m_MemCachedPayloads[Payload->MemCached]; + OutValue.Value = m_MemCachedPayloads[Payload->MemCached].Payload; Payload = nullptr; IndexLock.ReleaseNow(); m_MemoryHitCount++; @@ -803,14 +1230,14 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal { ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MemCache"); OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value); - RwLock::ExclusiveLockScope _(m_IndexLock); + RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock); if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end()) { - BucketPayload& WritePayload = m_Payloads[EntryIndex]; + BucketPayload& WritePayload = m_Payloads[UpdateIt->second]; // Only update if it has not already been updated by other thread if (!WritePayload.MemCached) { - SetMemCachedData(WritePayload, OutValue.Value); + SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value); } } } @@ -835,7 +1262,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal OutValue.RawHash = IoHash::HashBuffer(OutValue.Value); OutValue.RawSize = OutValue.Value.GetSize(); } - RwLock::ExclusiveLockScope __(m_IndexLock); + RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock); if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end()) { BucketPayload& WritePayload = m_Payloads[WriteIt.value()]; @@ -843,7 +1270,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal // Only set if no other path has already updated the meta data if (!WritePayload.MetaData) { - SetMetaData(WritePayload, {.RawSize = OutValue.RawSize, .RawHash = OutValue.RawHash}); + SetMetaData(UpdateIndexLock, WritePayload, {.RawSize = OutValue.RawSize, .RawHash = OutValue.RawHash}); } } } @@ -877,48 +1304,84 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& m_DiskWriteCount++; } -void +uint64_t ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime) { + ZEN_TRACE_CPU("Z$::Disk::Bucket::MemCacheTrim"); + + uint64_t Trimmed = 0; GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); - RwLock::ExclusiveLockScope _(m_IndexLock); - for (const auto& Kv : m_Index) + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size()); + if (MemCachedCount == 0) { - if (m_AccessTimes[Kv.second] < ExpireTicks) + return 0; + } + + uint32_t WriteIndex = 0; + for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex) + { + MemCacheData& Data = m_MemCachedPayloads[ReadIndex]; + if (!Data.Payload) { - BucketPayload& Payload = m_Payloads[Kv.second]; - RemoveMemCachedData(Payload); + continue; } + PayloadIndex Index = Data.OwnerIndex; + ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex)); + GcClock::Tick AccessTime = m_AccessTimes[Index]; + if (AccessTime < ExpireTicks) + { + size_t PayloadSize = Data.Payload.GetSize(); + RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); + Data = {}; + m_Payloads[Index].MemCached = {}; + Trimmed += PayloadSize; + continue; + } + if (ReadIndex > WriteIndex) + { + m_MemCachedPayloads[WriteIndex] = MemCacheData{.Payload = std::move(Data.Payload), .OwnerIndex = Index}; + m_Payloads[Index].MemCached = MemCachedIndex(WriteIndex); + } + WriteIndex++; } + m_MemCachedPayloads.resize(WriteIndex); + m_MemCachedPayloads.shrink_to_fit(); + zen::Reset(m_FreeMemCachedPayloads); + return Trimmed; } void -ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart, - GcClock::Duration SectionLength, - std::vector<uint64_t>& InOutUsageSlots) +ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots) { + ZEN_TRACE_CPU("Z$::Disk::Bucket::GetUsageByAccess"); + + size_t SlotCount = InOutUsageSlots.capacity(); RwLock::SharedLockScope _(m_IndexLock); - for (const auto& It : m_Index) + uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size()); + if (MemCachedCount == 0) { - size_t Index = It.second; - BucketPayload& Payload = m_Payloads[Index]; - if (!Payload.MemCached) + return; + } + for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex) + { + MemCacheData& Data = m_MemCachedPayloads[ReadIndex]; + if (!Data.Payload) { continue; } + PayloadIndex Index = Data.OwnerIndex; + ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex)); GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index])); - GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch(); - uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? Age.count() / SectionLength.count() : 0); - if (Slot >= InOutUsageSlots.capacity()) + GcClock::Duration Age = Now > ItemAccessTime ? Now - ItemAccessTime : GcClock::Duration(0); + size_t Slot = Age < MaxAge ? gsl::narrow<size_t>((Age.count() * SlotCount) / MaxAge.count()) : (SlotCount - 1); + ZEN_ASSERT_SLOW(Slot < SlotCount); + if (Slot >= InOutUsageSlots.size()) { - Slot = InOutUsageSlots.capacity() - 1; + InOutUsageSlots.resize(Slot + 1, 0); } - if (Slot > InOutUsageSlots.size()) - { - InOutUsageSlots.resize(uint64_t(Slot + 1), 0); - } - InOutUsageSlots[Slot] += m_MemCachedPayloads[Payload.MemCached].GetSize(); + InOutUsageSlots[Slot] += EstimateMemCachePayloadMemory(Data.Payload.GetSize()); } } @@ -976,20 +1439,7 @@ ZenCacheDiskLayer::CacheBucket::Flush() m_BlockStore.Flush(/*ForceNewBlock*/ false); m_SlogFile.Flush(); - std::vector<AccessTime> AccessTimes; - std::vector<BucketPayload> Payloads; - std::vector<BucketMetaData> MetaDatas; - IndexMap Index; - - { - RwLock::SharedLockScope IndexLock(m_IndexLock); - MakeIndexSnapshot(); - Index = m_Index; - Payloads = m_Payloads; - AccessTimes = m_AccessTimes; - MetaDatas = m_MetaDatas; - } - SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), Payloads, MetaDatas)); + SaveSnapshot(); } catch (std::exception& Ex) { @@ -998,113 +1448,108 @@ ZenCacheDiskLayer::CacheBucket::Flush() } void -ZenCacheDiskLayer::CacheBucket::SaveManifest(CbObject&& Manifest, const std::function<uint64_t()>& ClaimDiskReserveFunc) +ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::SaveManifest"); try { - IoBuffer Buffer = Manifest.GetBuffer().AsIoBuffer(); - - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); - if (Error) - { - ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); - return; - } - bool EnoughSpace = Space.Free >= Buffer.GetSize() + 1024 * 512; - if (!EnoughSpace) - { - uint64_t ReclaimedSpace = ClaimDiskReserveFunc(); - EnoughSpace = (Space.Free + ReclaimedSpace) >= Buffer.GetSize() + 1024 * 512; - } - if (!EnoughSpace) - { - ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", m_BucketDir, NiceBytes(Buffer.GetSize())); - return; - } - WriteFile(m_BucketDir / "zen_manifest", Buffer); - } - catch (std::exception& Err) - { - ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what()); - } -} + bool UseLegacyScheme = false; -CbObject -ZenCacheDiskLayer::CacheBucket::MakeManifest(IndexMap&& Index, - std::vector<AccessTime>&& AccessTimes, - const std::vector<BucketPayload>& Payloads, - const std::vector<BucketMetaData>& MetaDatas) -{ - using namespace std::literals; + IoBuffer Buffer; + BucketManifestSerializer ManifestWriter; - ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeManifest"); - - size_t ItemCount = Index.size(); + if (UseLegacyScheme) + { + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; + std::vector<BucketMetaData> MetaDatas; + IndexMap Index; - // This tends to overestimate a little bit but it is still way more accurate than what we get with exponential growth - // And we don't need to reallocate theunderying buffer in almost every case - const size_t EstimatedSizePerItem = 54u; - const size_t ReserveSize = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128); - CbObjectWriter Writer(ReserveSize); + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + WriteIndexSnapshot(IndexLock); + // Note: this copy could be eliminated on shutdown to + // reduce memory usage and execution time + Index = m_Index; + Payloads = m_Payloads; + AccessTimes = m_AccessTimes; + MetaDatas = m_MetaDatas; + } - Writer << "BucketId"sv << m_BucketId; - Writer << "Version"sv << CurrentDiskBucketVersion; + Buffer = ManifestWriter.MakeManifest(m_BucketId, + std::move(Index), + std::move(AccessTimes), + std::move(Payloads), + std::move(MetaDatas)); + const uint64_t RequiredSpace = Buffer.GetSize() + 1024 * 512; - if (!Index.empty()) - { - Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(Index.size())); - Writer.BeginArray("Keys"sv); - for (auto& Kv : Index) - { - const IoHash& Key = Kv.first; - Writer.AddHash(Key); + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); + if (Error) + { + ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); + return; + } + bool EnoughSpace = Space.Free >= RequiredSpace; + if (!EnoughSpace) + { + uint64_t ReclaimedSpace = ClaimDiskReserveFunc(); + EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace; + } + if (!EnoughSpace) + { + ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", + m_BucketDir, + NiceBytes(Buffer.GetSize())); + return; + } } - Writer.EndArray(); - - Writer.BeginArray("Timestamps"sv); - for (auto& Kv : Index) + else { - GcClock::Tick AccessTime = AccessTimes[Kv.second]; - Writer.AddInteger(AccessTime); - } - Writer.EndArray(); + RwLock::SharedLockScope IndexLock(m_IndexLock); + WriteIndexSnapshot(IndexLock); + const uint64_t EntryCount = m_Index.size(); + Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount); + uint64_t SidecarSize = ManifestWriter.GetSidecarSize(); - if (!MetaDatas.empty()) - { - Writer.BeginArray("RawHash"sv); - for (auto& Kv : Index) + const uint64_t RequiredSpace = SidecarSize + Buffer.GetSize() + 1024 * 512; + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); + if (Error) { - const BucketPayload& Payload = Payloads[Kv.second]; - if (Payload.MetaData) - { - Writer.AddHash(MetaDatas[Payload.MetaData].RawHash); - } - else - { - Writer.AddHash(IoHash::Zero); - } + ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); + return; } - Writer.EndArray(); - - Writer.BeginArray("RawSize"sv); - for (auto& Kv : Index) + bool EnoughSpace = Space.Free >= RequiredSpace; + if (!EnoughSpace) { - const BucketPayload& Payload = Payloads[Kv.second]; - if (Payload.MetaData) - { - Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize); - } - else - { - Writer.AddInteger(0); - } + uint64_t ReclaimedSpace = ClaimDiskReserveFunc(); + EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace; } - Writer.EndArray(); + if (!EnoughSpace) + { + ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", + m_BucketDir, + NiceBytes(Buffer.GetSize())); + return; + } + + ManifestWriter.WriteSidecarFile(IndexLock, + GetMetaPath(m_BucketDir, m_BucketName), + m_LogFlushPosition, + m_Index, + m_AccessTimes, + m_Payloads, + m_MetaDatas); } + + std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); + WriteFile(ManifestPath, Buffer); + } + catch (std::exception& Err) + { + ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what()); } - return Writer.Save(); } IoHash @@ -1364,8 +1809,8 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) m_StandaloneSize.fetch_sub(Location.Size(), std::memory_order::relaxed); } - RemoveMemCachedData(Payload); - RemoveMetaData(Payload); + RemoveMemCachedData(IndexLock, Payload); + RemoveMetaData(IndexLock, Payload); Location.Flags |= DiskLocation::kTombStone; LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location}); @@ -1395,13 +1840,13 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) std::vector<BucketPayload> Payloads; std::vector<AccessTime> AccessTimes; std::vector<BucketMetaData> MetaDatas; - std::vector<IoBuffer> MemCachedPayloads; + std::vector<MemCacheData> MemCachedPayloads; std::vector<ReferenceIndex> FirstReferenceIndex; IndexMap Index; { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - CompactState(Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); + CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); } } } @@ -1463,6 +1908,10 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); #endif // CALCULATE_BLOCKING_TIME + if (m_Index.empty()) + { + return; + } Index = m_Index; AccessTimes = m_AccessTimes; Payloads = m_Payloads; @@ -1542,10 +1991,9 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) for (const auto& Entry : StructuredItemsWithUnknownAttachments) { - const IoHash& Key = Entry.first; - size_t PayloadIndex = Entry.second; - BucketPayload& Payload = Payloads[PayloadIndex]; - const DiskLocation& Loc = Payload.Location; + const IoHash& Key = Entry.first; + BucketPayload& Payload = Payloads[Entry.second]; + const DiskLocation& Loc = Payload.Location; { IoBuffer Buffer; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) @@ -1568,10 +2016,10 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) #endif // CALCULATE_BLOCKING_TIME if (auto It = m_Index.find(Key); It != m_Index.end()) { - const BucketPayload& CachedPayload = Payloads[PayloadIndex]; + const BucketPayload& CachedPayload = Payloads[It->second]; if (CachedPayload.MemCached) { - Buffer = m_MemCachedPayloads[CachedPayload.MemCached]; + Buffer = m_MemCachedPayloads[CachedPayload.MemCached].Payload; ZEN_ASSERT_SLOW(Buffer); } else @@ -1678,20 +2126,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) try { - std::vector<AccessTime> AccessTimes; - std::vector<BucketPayload> Payloads; - std::vector<BucketMetaData> MetaDatas; - IndexMap Index; - { - RwLock::SharedLockScope IndexLock(m_IndexLock); - MakeIndexSnapshot([&]() { return GcCtx.ClaimGCReserve(); }); - Index = m_Index; - Payloads = m_Payloads; - AccessTimes = m_AccessTimes; - MetaDatas = m_MetaDatas; - } - SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), Payloads, MetaDatas), - [&]() { return GcCtx.ClaimGCReserve(); }); + SaveSnapshot([&]() { return GcCtx.ClaimGCReserve(); }); } catch (std::exception& Ex) { @@ -1699,8 +2134,6 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) } }); - m_SlogFile.Flush(); - auto __ = MakeGuard([&]() { if (!DeletedChunks.empty()) { @@ -1708,7 +2141,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) std::vector<BucketPayload> Payloads; std::vector<AccessTime> AccessTimes; std::vector<BucketMetaData> MetaDatas; - std::vector<IoBuffer> MemCachedPayloads; + std::vector<MemCacheData> MemCachedPayloads; std::vector<ReferenceIndex> FirstReferenceIndex; IndexMap Index; { @@ -1719,18 +2152,25 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); - CompactState(Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); + CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); } GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end())); } }); - std::span<const IoHash> ExpiredCacheKeySpan = GcCtx.ExpiredCacheKeys(m_BucketDir.string()); + std::span<const IoHash> ExpiredCacheKeySpan = GcCtx.ExpiredCacheKeys(m_BucketDir.string()); + if (ExpiredCacheKeySpan.empty()) + { + return; + } + + m_SlogFile.Flush(); + std::unordered_set<IoHash, IoHash::Hasher> ExpiredCacheKeys(ExpiredCacheKeySpan.begin(), ExpiredCacheKeySpan.end()); std::vector<DiskIndexEntry> ExpiredStandaloneEntries; - IndexMap Index; - std::vector<BucketPayload> Payloads; + IndexMap IndexSnapshot; + std::vector<BucketPayload> PayloadsSnapshot; BlockStore::ReclaimSnapshotState BlockStoreState; { bool Expected = false; @@ -1741,7 +2181,6 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) } auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); - std::vector<AccessTime> AccessTimes; { ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::State"); RwLock::SharedLockScope IndexLock(m_IndexLock); @@ -1755,23 +2194,23 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); - Payloads = m_Payloads; - AccessTimes = m_AccessTimes; - Index = m_Index; - for (const IoHash& Key : ExpiredCacheKeys) { - if (auto It = Index.find(Key); It != Index.end()) + if (auto It = m_Index.find(Key); It != m_Index.end()) { - const BucketPayload& Payload = Payloads[It->second]; - DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location}; - if (Entry.Location.Flags & DiskLocation::kStandaloneFile) + const BucketPayload& Payload = m_Payloads[It->second]; + if (Payload.Location.Flags & DiskLocation::kStandaloneFile) { + DiskIndexEntry Entry = {.Key = Key, .Location = Payload.Location}; Entry.Location.Flags |= DiskLocation::kTombStone; ExpiredStandaloneEntries.push_back(Entry); } } } + + PayloadsSnapshot = m_Payloads; + IndexSnapshot = m_Index; + if (GcCtx.IsDeletionMode()) { IndexLock.ReleaseNow(); @@ -1836,7 +2275,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) } } - TotalChunkCount = Index.size(); + TotalChunkCount = IndexSnapshot.size(); std::vector<BlockStoreLocation> ChunkLocations; BlockStore::ChunkIndexArray KeepChunkIndexes; @@ -1846,10 +2285,10 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) ChunkIndexToChunkHash.reserve(TotalChunkCount); { TotalChunkCount = 0; - for (const auto& Entry : Index) + for (const auto& Entry : IndexSnapshot) { size_t EntryIndex = Entry.second; - const DiskLocation& DiskLocation = Payloads[EntryIndex].Location; + const DiskLocation& DiskLocation = PayloadsSnapshot[EntryIndex].Location; if (DiskLocation.Flags & DiskLocation::kStandaloneFile) { @@ -1894,7 +2333,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) std::vector<DiskIndexEntry> LogEntries; LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); { - RwLock::ExclusiveLockScope __(m_IndexLock); + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); Stopwatch Timer; const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); @@ -1908,7 +2347,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; size_t EntryIndex = m_Index[ChunkHash]; BucketPayload& Payload = m_Payloads[EntryIndex]; - if (Payloads[Index[ChunkHash]].Location != m_Payloads[EntryIndex].Location) + if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != m_Payloads[EntryIndex].Location) { // Entry has been updated while GC was running, ignore the move continue; @@ -1921,7 +2360,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; size_t EntryIndex = m_Index[ChunkHash]; BucketPayload& Payload = m_Payloads[EntryIndex]; - if (Payloads[Index[ChunkHash]].Location != Payload.Location) + if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != Payload.Location) { // Entry has been updated while GC was running, ignore the delete continue; @@ -1932,8 +2371,8 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) m_Configuration.PayloadAlignment, OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); - RemoveMemCachedData(Payload); - RemoveMetaData(Payload); + RemoveMemCachedData(IndexLock, Payload); + RemoveMetaData(IndexLock, Payload); m_Index.erase(ChunkHash); DeletedChunks.insert(ChunkHash); @@ -1970,7 +2409,7 @@ ZenCacheDiskLayer::CacheBucket::EntryCount() const } CacheValueDetails::ValueDetails -ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, PayloadIndex Index) const +ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLock, const IoHash& Key, PayloadIndex Index) const { std::vector<IoHash> Attachments; const BucketPayload& Payload = m_Payloads[Index]; @@ -1982,7 +2421,7 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, PayloadIndex CbObjectView Obj(Value.GetData()); Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); }); } - BucketMetaData MetaData = GetMetaData(Payload); + BucketMetaData MetaData = GetMetaData(IndexLock, Payload); return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(), .RawSize = MetaData.RawSize, .RawHash = MetaData.RawHash, @@ -1992,7 +2431,7 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, PayloadIndex } CacheValueDetails::BucketDetails -ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilter) const +ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLock, const std::string_view ValueFilter) const { CacheValueDetails::BucketDetails Details; RwLock::SharedLockScope _(m_IndexLock); @@ -2001,7 +2440,7 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilt Details.Values.reserve(m_Index.size()); for (const auto& It : m_Index) { - Details.Values.insert_or_assign(It.first, GetValueDetails(It.first, It.second)); + Details.Values.insert_or_assign(It.first, GetValueDetails(IndexLock, It.first, It.second)); } } else @@ -2009,7 +2448,7 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilt IoHash Key = IoHash::FromHexString(ValueFilter); if (auto It = m_Index.find(Key); It != m_Index.end()) { - Details.Values.insert_or_assign(It->first, GetValueDetails(It->first, It->second)); + Details.Values.insert_or_assign(It->first, GetValueDetails(IndexLock, It->first, It->second)); } } return Details; @@ -2019,10 +2458,10 @@ void ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents( std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const { - RwLock::SharedLockScope _(m_IndexLock); + RwLock::SharedLockScope IndexLock(m_IndexLock); for (const auto& It : m_Index) { - CacheValueDetails::ValueDetails Vd = GetValueDetails(It.first, It.second); + CacheValueDetails::ValueDetails Vd = GetValueDetails(IndexLock, It.first, It.second); Fn(It.first, Vd); } @@ -2046,7 +2485,10 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) { Bucket->CollectGarbage(GcCtx); } - MemCacheTrim(Buckets, GcCtx.CacheExpireTime()); + if (!m_IsMemCacheTrimming) + { + MemCacheTrim(Buckets, GcCtx.CacheExpireTime()); + } } void @@ -2166,6 +2608,10 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c RwLock::ExclusiveLockScope IndexLock(m_IndexLock); ValueLock.ReleaseNow(); + if (m_UpdatedKeys) + { + m_UpdatedKeys->insert(HashKey); + } PayloadIndex EntryIndex = {}; if (auto It = m_Index.find(HashKey); It == m_Index.end()) @@ -2193,16 +2639,16 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References); } m_AccessTimes[EntryIndex] = GcClock::TickCount(); - RemoveMemCachedData(Payload); + RemoveMemCachedData(IndexLock, Payload); m_StandaloneSize.fetch_sub(OldSize, std::memory_order::relaxed); } if (Value.RawSize != 0 || Value.RawHash != IoHash::Zero) { - SetMetaData(m_Payloads[EntryIndex], {.RawSize = Value.RawSize, .RawHash = Value.RawHash}); + SetMetaData(IndexLock, m_Payloads[EntryIndex], {.RawSize = Value.RawSize, .RawHash = Value.RawHash}); } else { - RemoveMetaData(m_Payloads[EntryIndex]); + RemoveMetaData(IndexLock, m_Payloads[EntryIndex]); } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); @@ -2210,7 +2656,9 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c } void -ZenCacheDiskLayer::CacheBucket::SetMetaData(BucketPayload& Payload, const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData) +ZenCacheDiskLayer::CacheBucket::SetMetaData(RwLock::ExclusiveLockScope&, + BucketPayload& Payload, + const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData) { if (Payload.MetaData) { @@ -2233,7 +2681,7 @@ ZenCacheDiskLayer::CacheBucket::SetMetaData(BucketPayload& Payload, const ZenCac } void -ZenCacheDiskLayer::CacheBucket::RemoveMetaData(BucketPayload& Payload) +ZenCacheDiskLayer::CacheBucket::RemoveMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload) { if (Payload.MetaData) { @@ -2243,17 +2691,18 @@ ZenCacheDiskLayer::CacheBucket::RemoveMetaData(BucketPayload& Payload) } void -ZenCacheDiskLayer::CacheBucket::SetMemCachedData(BucketPayload& Payload, IoBuffer& MemCachedData) +ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData) { - uint64_t PayloadSize = MemCachedData.GetSize(); + BucketPayload& Payload = m_Payloads[PayloadIndex]; + uint64_t PayloadSize = MemCachedData.GetSize(); ZEN_ASSERT(PayloadSize != 0); if (m_FreeMemCachedPayloads.empty()) { if (m_MemCachedPayloads.size() != std::numeric_limits<uint32_t>::max()) { Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(m_MemCachedPayloads.size())); - m_MemCachedPayloads.push_back(MemCachedData); - AddMemCacheUsage(PayloadSize); + m_MemCachedPayloads.emplace_back(MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex}); + AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemoryWriteCount++; } } @@ -2261,20 +2710,20 @@ ZenCacheDiskLayer::CacheBucket::SetMemCachedData(BucketPayload& Payload, IoBuffe { Payload.MemCached = m_FreeMemCachedPayloads.back(); m_FreeMemCachedPayloads.pop_back(); - m_MemCachedPayloads[Payload.MemCached] = MemCachedData; - AddMemCacheUsage(PayloadSize); + m_MemCachedPayloads[Payload.MemCached] = MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex}; + AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); m_MemoryWriteCount++; } } size_t -ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(BucketPayload& Payload) +ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload) { if (Payload.MemCached) { - size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].GetSize(); - RemoveMemCacheUsage(PayloadSize); - m_MemCachedPayloads[Payload.MemCached] = IoBuffer{}; + size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].Payload.GetSize(); + RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); + m_MemCachedPayloads[Payload.MemCached] = {}; m_FreeMemCachedPayloads.push_back(Payload.MemCached); Payload.MemCached = {}; return PayloadSize; @@ -2283,7 +2732,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(BucketPayload& Payload) } ZenCacheDiskLayer::CacheBucket::BucketMetaData -ZenCacheDiskLayer::CacheBucket::GetMetaData(const BucketPayload& Payload) const +ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const BucketPayload& Payload) const { if (Payload.MetaData) { @@ -2316,14 +2765,18 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const m_SlogFile.Append({.Key = HashKey, .Location = Location}); RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + if (m_UpdatedKeys) + { + m_UpdatedKeys->insert(HashKey); + } if (auto It = m_Index.find(HashKey); It != m_Index.end()) { PayloadIndex EntryIndex = It.value(); ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size())); BucketPayload& Payload = m_Payloads[EntryIndex]; - RemoveMemCachedData(Payload); - RemoveMetaData(Payload); + RemoveMemCachedData(IndexLock, Payload); + RemoveMetaData(IndexLock, Payload); Payload = (BucketPayload{.Location = Location}); m_AccessTimes[EntryIndex] = GcClock::TickCount(); @@ -2354,12 +2807,246 @@ ZenCacheDiskLayer::CacheBucket::GetGcName(GcCtx&) return fmt::format("cachebucket:'{}'", m_BucketDir.string()); } -void -ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcReferencerStats& Stats) +class DiskBucketStoreCompactor : public GcStoreCompactor { - size_t TotalEntries = 0; - tsl::robin_set<IoHash, IoHash::Hasher> ExpiredInlineKeys; - std::vector<std::pair<IoHash, uint64_t>> ExpiredStandaloneKeys; +public: + DiskBucketStoreCompactor(ZenCacheDiskLayer::CacheBucket& Bucket, std::vector<std::pair<IoHash, uint64_t>>&& ExpiredStandaloneKeys) + : m_Bucket(Bucket) + , m_ExpiredStandaloneKeys(std::move(ExpiredStandaloneKeys)) + { + m_ExpiredStandaloneKeys.shrink_to_fit(); + } + + virtual ~DiskBucketStoreCompactor() {} + + virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override + { + ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactStore"); + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + Reset(m_ExpiredStandaloneKeys); + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': RemovedDisk: {} in {}", + m_Bucket.m_BucketDir, + NiceBytes(Stats.RemovedDisk), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + if (!m_ExpiredStandaloneKeys.empty()) + { + // Compact standalone items + size_t Skipped = 0; + ExtendablePathBuilder<256> Path; + for (const std::pair<IoHash, uint64_t>& ExpiredKey : m_ExpiredStandaloneKeys) + { + if (Ctx.IsCancelledFlag.load()) + { + return; + } + Path.Reset(); + m_Bucket.BuildPath(Path, ExpiredKey.first); + fs::path FilePath = Path.ToPath(); + + RwLock::SharedLockScope IndexLock(m_Bucket.m_IndexLock); + if (m_Bucket.m_Index.contains(ExpiredKey.first)) + { + // Someone added it back, let the file on disk be + ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipping z$ delete standalone of file '{}' FAILED, it has been added back", + m_Bucket.m_BucketDir, + Path.ToUtf8()); + continue; + } + + if (Ctx.Settings.IsDeleteMode) + { + RwLock::ExclusiveLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first)); + IndexLock.ReleaseNow(); + ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': deleting standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8()); + + std::error_code Ec; + if (!fs::remove(FilePath, Ec)) + { + continue; + } + if (Ec) + { + ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': delete expired z$ standalone file '{}' FAILED, reason: '{}'", + m_Bucket.m_BucketDir, + Path.ToUtf8(), + Ec.message()); + continue; + } + Stats.RemovedDisk += ExpiredKey.second; + } + else + { + RwLock::SharedLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first)); + IndexLock.ReleaseNow(); + ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': checking standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8()); + + std::error_code Ec; + bool Existed = std::filesystem::is_regular_file(FilePath, Ec); + if (Ec) + { + ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': failed checking cache payload file '{}'. Reason '{}'", + m_Bucket.m_BucketDir, + FilePath, + Ec.message()); + continue; + } + if (!Existed) + { + continue; + } + Skipped++; + } + } + if (Skipped > 0) + { + ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipped deleting of {} eligible files", m_Bucket.m_BucketDir, Skipped); + } + } + + if (Ctx.Settings.CollectSmallObjects) + { + m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys = std::make_unique<HashSet>(); }); + auto __ = MakeGuard([&]() { m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys.reset(); }); }); + + size_t InlineEntryCount = 0; + BlockStore::BlockUsageMap BlockUsage; + { + RwLock::SharedLockScope ___(m_Bucket.m_IndexLock); + for (const auto& Entry : m_Bucket.m_Index) + { + ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second; + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[Index]; + const DiskLocation& Loc = Payload.Location; + + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + continue; + } + InlineEntryCount++; + uint32_t BlockIndex = Loc.Location.BlockLocation.GetBlockIndex(); + uint64_t ChunkSize = RoundUp(Loc.Size(), m_Bucket.m_Configuration.PayloadAlignment); + if (auto It = BlockUsage.find(BlockIndex); It != BlockUsage.end()) + { + It->second.EntryCount++; + It->second.DiskUsage += ChunkSize; + } + else + { + BlockUsage.insert_or_assign(BlockIndex, BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1}); + } + } + } + + { + BlockStoreCompactState BlockCompactState; + std::vector<IoHash> BlockCompactStateKeys; + BlockCompactStateKeys.reserve(InlineEntryCount); + + BlockStore::BlockEntryCountMap BlocksToCompact = + m_Bucket.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent); + BlockCompactState.IncludeBlocks(BlocksToCompact); + + if (BlocksToCompact.size() > 0) + { + { + RwLock::SharedLockScope ___(m_Bucket.m_IndexLock); + for (const auto& Entry : m_Bucket.m_Index) + { + ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second; + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[Index]; + const DiskLocation& Loc = Payload.Location; + + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + continue; + } + if (!BlockCompactState.AddKeepLocation(Loc.GetBlockLocation(m_Bucket.m_Configuration.PayloadAlignment))) + { + continue; + } + BlockCompactStateKeys.push_back(Entry.first); + } + } + + if (Ctx.Settings.IsDeleteMode) + { + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': compacting {} blocks", + m_Bucket.m_BucketDir, + BlocksToCompact.size()); + } + + m_Bucket.m_BlockStore.CompactBlocks( + BlockCompactState, + m_Bucket.m_Configuration.PayloadAlignment, + [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { + std::vector<DiskIndexEntry> MovedEntries; + MovedEntries.reserve(MovedArray.size()); + RwLock::ExclusiveLockScope _(m_Bucket.m_IndexLock); + for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray) + { + size_t ChunkIndex = Moved.first; + const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; + + if (m_Bucket.m_UpdatedKeys->contains(Key)) + { + continue; + } + + if (auto It = m_Bucket.m_Index.find(Key); It != m_Bucket.m_Index.end()) + { + ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[It->second]; + const BlockStoreLocation& NewLocation = Moved.second; + Payload.Location = DiskLocation(NewLocation, + m_Bucket.m_Configuration.PayloadAlignment, + Payload.Location.GetFlags()); + MovedEntries.push_back({.Key = Key, .Location = Payload.Location}); + } + } + m_Bucket.m_SlogFile.Append(MovedEntries); + Stats.RemovedDisk += FreedDiskSpace; + if (Ctx.IsCancelledFlag.load()) + { + return false; + } + return true; + }, + ClaimDiskReserveCallback); + } + else + { + if (Ctx.Settings.Verbose) + { + ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': skipped compacting of {} eligible blocks", + m_Bucket.m_BucketDir, + BlocksToCompact.size()); + } + } + } + } + } + } + +private: + ZenCacheDiskLayer::CacheBucket& m_Bucket; + std::vector<std::pair<IoHash, uint64_t>> m_ExpiredStandaloneKeys; +}; + +GcStoreCompactor* +ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) +{ + ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveExpiredData"); + + size_t TotalEntries = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -2367,36 +3054,38 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcReferencerStats& { return; } - ZEN_INFO("GCV2: cachebucket [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, RemovedDisk: {}, RemovedMemory: {} in {}", + ZEN_INFO("GCV2: cachebucket [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, FreedMemory: {} in {}", m_BucketDir, - Stats.Count, - Stats.Expired, - Stats.Deleted, - NiceBytes(Stats.RemovedDisk), - NiceBytes(Stats.RemovedMemory), + Stats.CheckedCount, + Stats.FoundCount, + Stats.DeletedCount, + NiceBytes(Stats.FreedMemory), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); const GcClock::Tick ExpireTicks = Ctx.Settings.CacheExpireTime.time_since_epoch().count(); - BlockStoreCompactState BlockCompactState; - BlockStore::ReclaimSnapshotState BlockSnapshotState; - std::vector<IoHash> BlockCompactStateKeys; - std::vector<DiskIndexEntry> ExpiredEntries; - uint64_t RemovedStandaloneSize = 0; + std::vector<DiskIndexEntry> ExpiredEntries; + std::vector<std::pair<IoHash, uint64_t>> ExpiredStandaloneKeys; + uint64_t RemovedStandaloneSize = 0; { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - if (Ctx.Settings.CollectSmallObjects) + if (Ctx.IsCancelledFlag.load()) + { + return nullptr; + } + if (m_Index.empty()) { - BlockSnapshotState = m_BlockStore.GetReclaimSnapshotState(); + return nullptr; } + TotalEntries = m_Index.size(); - // Find out expired keys and affected blocks + // Find out expired keys for (const auto& Entry : m_Index) { const IoHash& Key = Entry.first; - size_t EntryIndex = Entry.second; + PayloadIndex EntryIndex = Entry.second; GcClock::Tick AccessTime = m_AccessTimes[EntryIndex]; if (AccessTime >= ExpireTicks) { @@ -2415,40 +3104,16 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcReferencerStats& } else if (Ctx.Settings.CollectSmallObjects) { - ExpiredInlineKeys.insert(Key); - uint32_t BlockIndex = Payload.Location.Location.BlockLocation.GetBlockIndex(); - bool IsActiveWriteBlock = BlockSnapshotState.m_ActiveWriteBlocks.contains(BlockIndex); - if (!IsActiveWriteBlock) - { - BlockCompactState.IncludeBlock(BlockIndex); - } ExpiredEntries.push_back(ExpiredEntry); } } - Stats.Expired += ExpiredStandaloneKeys.size() + ExpiredInlineKeys.size(); + Stats.CheckedCount += TotalEntries; + Stats.FoundCount += ExpiredEntries.size(); - // Get all locations we need to keep for affected blocks - if (Ctx.Settings.CollectSmallObjects && !ExpiredInlineKeys.empty()) + if (Ctx.IsCancelledFlag.load()) { - for (const auto& Entry : m_Index) - { - const IoHash& Key = Entry.first; - if (ExpiredInlineKeys.contains(Key)) - { - continue; - } - size_t EntryIndex = Entry.second; - const BucketPayload& Payload = m_Payloads[EntryIndex]; - if (Payload.Location.Flags & DiskLocation::kStandaloneFile) - { - continue; - } - if (BlockCompactState.AddKeepLocation(Payload.Location.GetBlockLocation(m_Configuration.PayloadAlignment))) - { - BlockCompactStateKeys.push_back(Key); - } - } + return nullptr; } if (Ctx.Settings.IsDeleteMode) @@ -2458,132 +3123,291 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcReferencerStats& auto It = m_Index.find(Entry.Key); ZEN_ASSERT(It != m_Index.end()); BucketPayload& Payload = m_Payloads[It->second]; - RemoveMetaData(Payload); - Stats.RemovedMemory += RemoveMemCachedData(Payload); + RemoveMetaData(IndexLock, Payload); + Stats.FreedMemory += RemoveMemCachedData(IndexLock, Payload); m_Index.erase(It); + Stats.DeletedCount++; } m_SlogFile.Append(ExpiredEntries); m_StandaloneSize.fetch_sub(RemovedStandaloneSize, std::memory_order::relaxed); } } - Stats.Count += TotalEntries; - if (ExpiredEntries.empty()) + if (Ctx.Settings.IsDeleteMode && !ExpiredEntries.empty()) { - return; + std::vector<BucketPayload> Payloads; + std::vector<AccessTime> AccessTimes; + std::vector<BucketMetaData> MetaDatas; + std::vector<MemCacheData> MemCachedPayloads; + std::vector<ReferenceIndex> FirstReferenceIndex; + IndexMap Index; + { + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); + CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); + } } - if (!Ctx.Settings.IsDeleteMode) + if (Ctx.IsCancelledFlag.load()) { - return; + return nullptr; } - Stats.Deleted += ExpiredEntries.size(); - - // Compact standalone items - ExtendablePathBuilder<256> Path; - for (const std::pair<IoHash, uint64_t>& ExpiredKey : ExpiredStandaloneKeys) - { - Path.Reset(); - BuildPath(Path, ExpiredKey.first); - fs::path FilePath = Path.ToPath(); + return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys)); +} - RwLock::SharedLockScope IndexLock(m_IndexLock); - if (m_Index.contains(ExpiredKey.first)) - { - // Someone added it back, let the file on disk be - ZEN_DEBUG("gc cache bucket '{}': skipping z$ delete standalone of file '{}' FAILED, it has been added back", - m_BucketDir, - Path.ToUtf8()); - continue; - } +class DiskBucketReferenceChecker : public GcReferenceChecker +{ + using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; + using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload; + using CacheBucket = ZenCacheDiskLayer::CacheBucket; + using ReferenceIndex = ZenCacheDiskLayer::CacheBucket::ReferenceIndex; - RwLock::ExclusiveLockScope ValueLock(LockForHash(ExpiredKey.first)); - IndexLock.ReleaseNow(); - ZEN_DEBUG("gc cache bucket '{}': deleting standalone cache file '{}'", m_BucketDir, Path.ToUtf8()); +public: + DiskBucketReferenceChecker(CacheBucket& Owner) : m_CacheBucket(Owner) {} - std::error_code Ec; - if (!fs::remove(FilePath, Ec)) + virtual ~DiskBucketReferenceChecker() + { + try { - continue; + m_IndexLock.reset(); + if (!m_CacheBucket.m_Configuration.EnableReferenceCaching) + { + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); + // If reference caching is not enabled, we temporarily used the data structure for reference caching, lets reset it + m_CacheBucket.ClearReferenceCache(); + } } - if (Ec) + catch (std::exception& Ex) { - ZEN_WARN("gc cache bucket '{}': delete expired z$ standalone file '{}' FAILED, reason: '{}'", - m_BucketDir, - Path.ToUtf8(), - Ec.message()); - continue; + ZEN_ERROR("~DiskBucketReferenceChecker threw exception: '{}'", Ex.what()); } - Stats.RemovedDisk += ExpiredKey.second; } - if (Ctx.Settings.CollectSmallObjects && !ExpiredInlineKeys.empty()) + virtual void PreCache(GcCtx& Ctx) override { - // Compact block store - m_BlockStore.CompactBlocks( - BlockCompactState, - m_Configuration.PayloadAlignment, - [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { - std::vector<DiskIndexEntry> MovedEntries; - RwLock::ExclusiveLockScope _(m_IndexLock); - for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray) - { - size_t ChunkIndex = Moved.first; - const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; + ZEN_TRACE_CPU("Z$::Disk::Bucket::PreCache"); - if (auto It = m_Index.find(Key); It != m_Index.end()) + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: cachebucket [PRECACHE] '{}': found {} references in {}", + m_CacheBucket.m_BucketDir, + m_CacheBucket.m_ReferenceCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + std::vector<IoHash> UpdateKeys; + std::vector<size_t> ReferenceCounts; + std::vector<IoHash> References; + + auto GetAttachments = [&References, &ReferenceCounts](const void* CbObjectData) { + size_t CurrentReferenceCount = References.size(); + CbObjectView Obj(CbObjectData); + Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); }); + ReferenceCounts.push_back(References.size() - CurrentReferenceCount); + }; + + // Refresh cache + { + // If reference caching is enabled the references will be updated at modification for us so we don't need to track modifications + if (!m_CacheBucket.m_Configuration.EnableReferenceCaching) + { + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys = std::make_unique<HashSet>(); }); + } + + std::vector<IoHash> StandaloneKeys; + { + std::vector<IoHash> InlineKeys; + std::unordered_map<uint32_t, std::size_t> BlockIndexToEntriesPerBlockIndex; + struct InlineEntry + { + uint32_t InlineKeyIndex; + uint32_t Offset; + uint32_t Size; + }; + std::vector<std::vector<InlineEntry>> EntriesPerBlock; + size_t UpdateCount = 0; + { + RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock); + for (const auto& Entry : m_CacheBucket.m_Index) { - BucketPayload& Payload = m_Payloads[It->second]; - const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex); - if (Payload.Location.GetBlockLocation(m_Configuration.PayloadAlignment) != OldLocation) + if (Ctx.IsCancelledFlag.load()) + { + IndexLock.ReleaseNow(); + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); + return; + } + + PayloadIndex EntryIndex = Entry.second; + const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex]; + const DiskLocation& Loc = Payload.Location; + + if (!Loc.IsFlagSet(DiskLocation::kStructured)) + { + continue; + } + if (m_CacheBucket.m_Configuration.EnableReferenceCaching && + m_CacheBucket.m_FirstReferenceIndex[EntryIndex] != ReferenceIndex::Unknown()) + { + continue; + } + UpdateCount++; + const IoHash& Key = Entry.first; + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - // Someone has moved our chunk so lets just skip the new location we were provided, it will be GC:d at a later - // time + StandaloneKeys.push_back(Key); continue; } - const BlockStoreLocation& NewLocation = Moved.second; + BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_CacheBucket.m_Configuration.PayloadAlignment); + InlineEntry UpdateEntry = {.InlineKeyIndex = gsl::narrow<uint32_t>(InlineKeys.size()), + .Offset = gsl::narrow<uint32_t>(ChunkLocation.Offset), + .Size = gsl::narrow<uint32_t>(ChunkLocation.Size)}; + InlineKeys.push_back(Key); - Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags()); - MovedEntries.push_back({.Key = Key, .Location = Payload.Location}); + if (auto It = BlockIndexToEntriesPerBlockIndex.find(ChunkLocation.BlockIndex); + It != BlockIndexToEntriesPerBlockIndex.end()) + { + EntriesPerBlock[It->second].emplace_back(UpdateEntry); + } + else + { + BlockIndexToEntriesPerBlockIndex.insert_or_assign(ChunkLocation.BlockIndex, EntriesPerBlock.size()); + EntriesPerBlock.emplace_back(std::vector<InlineEntry>{UpdateEntry}); + } } } - m_SlogFile.Append(MovedEntries); - Stats.RemovedDisk += FreedDiskSpace; - }, - [&]() { return 0; }); - } - std::vector<BucketPayload> Payloads; - std::vector<AccessTime> AccessTimes; - std::vector<BucketMetaData> MetaDatas; - std::vector<IoBuffer> MemCachedPayloads; - std::vector<ReferenceIndex> FirstReferenceIndex; - IndexMap Index; - { - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - CompactState(Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); - } -} + UpdateKeys.reserve(UpdateCount); -class DiskBucketReferenceChecker : public GcReferenceChecker -{ -public: - DiskBucketReferenceChecker(ZenCacheDiskLayer::CacheBucket& Owner) : m_CacheBucket(Owner) {} + for (auto It : BlockIndexToEntriesPerBlockIndex) + { + uint32_t BlockIndex = It.first; + + Ref<BlockStoreFile> BlockFile = m_CacheBucket.m_BlockStore.GetBlockFile(BlockIndex); + if (BlockFile) + { + size_t EntriesPerBlockIndex = It.second; + std::vector<InlineEntry>& InlineEntries = EntriesPerBlock[EntriesPerBlockIndex]; + + std::sort(InlineEntries.begin(), InlineEntries.end(), [&](const InlineEntry& Lhs, const InlineEntry& Rhs) -> bool { + return Lhs.Offset < Rhs.Offset; + }); + + uint64_t BlockFileSize = BlockFile->FileSize(); + BasicFileBuffer BlockBuffer(BlockFile->GetBasicFile(), 32768); + for (const InlineEntry& InlineEntry : InlineEntries) + { + if ((InlineEntry.Offset + InlineEntry.Size) > BlockFileSize) + { + ReferenceCounts.push_back(0); + } + else + { + MemoryView ChunkView = BlockBuffer.MakeView(InlineEntry.Size, InlineEntry.Offset); + if (ChunkView.GetSize() == InlineEntry.Size) + { + GetAttachments(ChunkView.GetData()); + } + else + { + std::vector<uint8_t> Buffer(InlineEntry.Size); + BlockBuffer.Read(Buffer.data(), InlineEntry.Size, InlineEntry.Offset); + GetAttachments(Buffer.data()); + } + } + const IoHash& Key = InlineKeys[InlineEntry.InlineKeyIndex]; + UpdateKeys.push_back(Key); + } + } + } + } + { + for (const IoHash& Key : StandaloneKeys) + { + if (Ctx.IsCancelledFlag.load()) + { + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); + return; + } + + IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(ZenContentType::kCbObject, Key); + if (!Buffer) + { + continue; + } + + GetAttachments(Buffer.GetData()); + UpdateKeys.push_back(Key); + } + } + } - virtual ~DiskBucketReferenceChecker() - { - m_IndexLock.reset(); - if (!m_CacheBucket.m_Configuration.EnableReferenceCaching) { - // If reference caching is not enabled, we temporarily used the data structure for reference caching, lets reset it - m_CacheBucket.ClearReferenceCache(); + size_t ReferenceOffset = 0; + RwLock::ExclusiveLockScope IndexLock(m_CacheBucket.m_IndexLock); + + if (!m_CacheBucket.m_Configuration.EnableReferenceCaching) + { + ZEN_ASSERT(m_CacheBucket.m_FirstReferenceIndex.empty()); + ZEN_ASSERT(m_CacheBucket.m_ReferenceHashes.empty()); + ZEN_ASSERT(m_CacheBucket.m_NextReferenceHashesIndexes.empty()); + ZEN_ASSERT(m_CacheBucket.m_ReferenceCount == 0); + ZEN_ASSERT(m_CacheBucket.m_UpdatedKeys); + + // If reference caching is not enabled, we will resize and use the data structure in place for reference caching when + // we figure out what this bucket references. This will be reset once the DiskBucketReferenceChecker is deleted. + m_CacheBucket.m_FirstReferenceIndex.resize(m_CacheBucket.m_Payloads.size(), ReferenceIndex::Unknown()); + m_CacheBucket.m_ReferenceHashes.reserve(References.size()); + m_CacheBucket.m_NextReferenceHashesIndexes.reserve(References.size()); + } + else + { + ZEN_ASSERT(!m_CacheBucket.m_UpdatedKeys); + } + + for (size_t Index = 0; Index < UpdateKeys.size(); Index++) + { + const IoHash& Key = UpdateKeys[Index]; + size_t ReferenceCount = ReferenceCounts[Index]; + if (auto It = m_CacheBucket.m_Index.find(Key); It != m_CacheBucket.m_Index.end()) + { + PayloadIndex EntryIndex = It->second; + if (m_CacheBucket.m_Configuration.EnableReferenceCaching) + { + if (m_CacheBucket.m_FirstReferenceIndex[EntryIndex] != ReferenceIndex::Unknown()) + { + // The reference data is valid and what we have is old/redundant + continue; + } + } + else if (m_CacheBucket.m_UpdatedKeys->contains(Key)) + { + // Our pre-cache data is invalid + continue; + } + + m_CacheBucket.SetReferences(IndexLock, + m_CacheBucket.m_FirstReferenceIndex[EntryIndex], + std::span<IoHash>{References.data() + ReferenceOffset, ReferenceCount}); + } + ReferenceOffset += ReferenceCount; + } + + if (m_CacheBucket.m_Configuration.EnableReferenceCaching && !UpdateKeys.empty()) + { + m_CacheBucket.CompactReferences(IndexLock); + } } } virtual void LockState(GcCtx& Ctx) override { + ZEN_TRACE_CPU("Z$::Disk::Bucket::LockState"); + Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) @@ -2597,22 +3421,42 @@ public: }); m_IndexLock = std::make_unique<RwLock::SharedLockScope>(m_CacheBucket.m_IndexLock); - - // Rescan to see if any cache items needs refreshing since last pass when we had the lock - for (const auto& Entry : m_CacheBucket.m_Index) + if (Ctx.IsCancelledFlag.load()) { - size_t PayloadIndex = Entry.second; - const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_CacheBucket.m_Payloads[PayloadIndex]; - const DiskLocation& Loc = Payload.Location; + m_UncachedReferences.clear(); + m_IndexLock.reset(); + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); + return; + } - if (!Loc.IsFlagSet(DiskLocation::kStructured)) - { - continue; - } - ZEN_ASSERT(!m_CacheBucket.m_FirstReferenceIndex.empty()); - const IoHash& Key = Entry.first; - if (m_CacheBucket.m_FirstReferenceIndex[PayloadIndex] == ZenCacheDiskLayer::CacheBucket::ReferenceIndex::Unknown()) + if (m_CacheBucket.m_UpdatedKeys) + { + const HashSet& UpdatedKeys(*m_CacheBucket.m_UpdatedKeys); + for (const IoHash& Key : UpdatedKeys) { + if (Ctx.IsCancelledFlag.load()) + { + m_UncachedReferences.clear(); + m_IndexLock.reset(); + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); + return; + } + + auto It = m_CacheBucket.m_Index.find(Key); + if (It == m_CacheBucket.m_Index.end()) + { + continue; + } + + PayloadIndex EntryIndex = It->second; + const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex]; + const DiskLocation& Loc = Payload.Location; + + if (!Loc.IsFlagSet(DiskLocation::kStructured)) + { + continue; + } + IoBuffer Buffer; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { @@ -2633,21 +3477,48 @@ public: } } - virtual void RemoveUsedReferencesFromSet(GcCtx&, HashSet& IoCids) override + virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override { + ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveUsedReferencesFromSet"); + ZEN_ASSERT(m_IndexLock); + size_t InitialCount = IoCids.size(); + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: cachebucket [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", + m_CacheBucket.m_BucketDir, + InitialCount - IoCids.size(), + InitialCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); for (const IoHash& ReferenceHash : m_CacheBucket.m_ReferenceHashes) { - IoCids.erase(ReferenceHash); + if (IoCids.erase(ReferenceHash) == 1) + { + if (IoCids.empty()) + { + return; + } + } } for (const IoHash& ReferenceHash : m_UncachedReferences) { - IoCids.erase(ReferenceHash); + if (IoCids.erase(ReferenceHash) == 1) + { + if (IoCids.empty()) + { + return; + } + } } } - ZenCacheDiskLayer::CacheBucket& m_CacheBucket; + CacheBucket& m_CacheBucket; std::unique_ptr<RwLock::SharedLockScope> m_IndexLock; HashSet m_UncachedReferences; }; @@ -2655,119 +3526,22 @@ public: std::vector<GcReferenceChecker*> ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) { + ZEN_TRACE_CPU("Z$::Disk::Bucket::CreateReferenceCheckers"); + Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } - ZEN_INFO("GCV2: cachebucket [CREATE CHECKERS] '{}': found {} references in {}", - m_BucketDir, - m_ReferenceCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + ZEN_INFO("GCV2: cachebucket [CREATE CHECKERS] '{}': completed in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - std::vector<IoHash> UpdateKeys; - std::vector<IoHash> StandaloneKeys; - std::vector<size_t> ReferenceCounts; - std::vector<IoHash> References; - - // Refresh cache - { - RwLock::SharedLockScope IndexLock(m_IndexLock); - for (const auto& Entry : m_Index) - { - size_t PayloadIndex = Entry.second; - const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Payloads[PayloadIndex]; - const DiskLocation& Loc = Payload.Location; - - if (!Loc.IsFlagSet(DiskLocation::kStructured)) - { - continue; - } - if (m_Configuration.EnableReferenceCaching && - m_FirstReferenceIndex[PayloadIndex] != ZenCacheDiskLayer::CacheBucket::ReferenceIndex::Unknown()) - { - continue; - } - const IoHash& Key = Entry.first; - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - StandaloneKeys.push_back(Key); - continue; - } - IoBuffer Buffer = GetInlineCacheValue(Loc); - if (!Buffer) - { - UpdateKeys.push_back(Key); - ReferenceCounts.push_back(0); - continue; - } - size_t CurrentReferenceCount = References.size(); - { - CbObjectView Obj(Buffer.GetData()); - Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); }); - Buffer = {}; - } - UpdateKeys.push_back(Key); - ReferenceCounts.push_back(References.size() - CurrentReferenceCount); - } - } - { - for (const IoHash& Key : StandaloneKeys) - { - IoBuffer Buffer = GetStandaloneCacheValue(ZenContentType::kCbObject, Key); - if (!Buffer) - { - continue; - } - - size_t CurrentReferenceCount = References.size(); - { - CbObjectView Obj(Buffer.GetData()); - Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); }); - Buffer = {}; - } - UpdateKeys.push_back(Key); - ReferenceCounts.push_back(References.size() - CurrentReferenceCount); - } - } - { - size_t ReferenceOffset = 0; - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - if (!m_Configuration.EnableReferenceCaching) - { - ZEN_ASSERT(m_FirstReferenceIndex.empty()); - ZEN_ASSERT(m_ReferenceHashes.empty()); - ZEN_ASSERT(m_NextReferenceHashesIndexes.empty()); - ZEN_ASSERT(m_ReferenceCount == 0); - // If reference caching is not enabled, we will resize and use the data structure in place for reference caching when - // we figure out what this bucket references. This will be reset once the DiskBucketReferenceChecker is deleted. - m_FirstReferenceIndex.resize(m_Payloads.size()); - } - for (size_t Index = 0; Index < UpdateKeys.size(); Index++) - { - const IoHash& Key = UpdateKeys[Index]; - size_t ReferenceCount = ReferenceCounts[Index]; - auto It = m_Index.find(Key); - if (It == m_Index.end()) - { - ReferenceOffset += ReferenceCount; - continue; - } - if (m_FirstReferenceIndex[It->second] != ReferenceIndex::Unknown()) - { - continue; - } - SetReferences(IndexLock, - m_FirstReferenceIndex[It->second], - std::span<IoHash>{References.data() + ReferenceOffset, ReferenceCount}); - ReferenceOffset += ReferenceCount; - } - if (m_Configuration.EnableReferenceCaching) + RwLock::SharedLockScope __(m_IndexLock); + if (m_Index.empty()) { - CompactReferences(IndexLock); + return {}; } } @@ -2777,6 +3551,8 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) void ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&) { + ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactReferences"); + std::vector<ReferenceIndex> FirstReferenceIndex; std::vector<IoHash> NewReferenceHashes; std::vector<ReferenceIndex> NewNextReferenceHashesIndexes; @@ -2813,7 +3589,9 @@ ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&) } m_FirstReferenceIndex.swap(FirstReferenceIndex); m_ReferenceHashes.swap(NewReferenceHashes); + m_ReferenceHashes.shrink_to_fit(); m_NextReferenceHashesIndexes.swap(NewNextReferenceHashesIndexes); + m_NextReferenceHashesIndexes.shrink_to_fit(); m_ReferenceCount = m_ReferenceHashes.size(); } @@ -2940,24 +3718,24 @@ void ZenCacheDiskLayer::CacheBucket::ClearReferenceCache() { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - m_FirstReferenceIndex.clear(); - m_FirstReferenceIndex.shrink_to_fit(); - m_ReferenceHashes.clear(); - m_ReferenceHashes.shrink_to_fit(); - m_NextReferenceHashesIndexes.clear(); - m_NextReferenceHashesIndexes.shrink_to_fit(); + Reset(m_FirstReferenceIndex); + Reset(m_ReferenceHashes); + Reset(m_NextReferenceHashesIndexes); m_ReferenceCount = 0; } void -ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloads, +ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, + std::vector<BucketPayload>& Payloads, std::vector<AccessTime>& AccessTimes, std::vector<BucketMetaData>& MetaDatas, - std::vector<IoBuffer>& MemCachedPayloads, + std::vector<MemCacheData>& MemCachedPayloads, std::vector<ReferenceIndex>& FirstReferenceIndex, IndexMap& Index, RwLock::ExclusiveLockScope& IndexLock) { + ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactState"); + size_t EntryCount = m_Index.size(); Payloads.reserve(EntryCount); AccessTimes.reserve(EntryCount); @@ -2966,6 +3744,8 @@ ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloa FirstReferenceIndex.reserve(EntryCount); } Index.reserve(EntryCount); + Index.min_load_factor(IndexMinLoadFactor); + Index.max_load_factor(IndexMaxLoadFactor); for (auto It : m_Index) { PayloadIndex EntryIndex = PayloadIndex(Payloads.size()); @@ -2975,11 +3755,12 @@ ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloa if (Payload.MetaData) { MetaDatas.push_back(m_MetaDatas[Payload.MetaData]); - Payload.MetaData = MetaDataIndex(m_MetaDatas.size() - 1); + Payload.MetaData = MetaDataIndex(MetaDatas.size() - 1); } if (Payload.MemCached) { - MemCachedPayloads.push_back(std::move(m_MemCachedPayloads[Payload.MemCached])); + MemCachedPayloads.emplace_back( + MemCacheData{.Payload = std::move(m_MemCachedPayloads[Payload.MemCached].Payload), .OwnerIndex = EntryIndex}); Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(MemCachedPayloads.size() - 1)); } if (m_Configuration.EnableReferenceCaching) @@ -2992,11 +3773,9 @@ ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloa m_Payloads.swap(Payloads); m_AccessTimes.swap(AccessTimes); m_MetaDatas.swap(MetaDatas); - m_FreeMetaDatas.clear(); - m_FreeMetaDatas.shrink_to_fit(); + Reset(m_FreeMetaDatas); m_MemCachedPayloads.swap(MemCachedPayloads); - m_FreeMemCachedPayloads.clear(); - m_FreeMetaDatas.shrink_to_fit(); + Reset(m_FreeMemCachedPayloads); if (m_Configuration.EnableReferenceCaching) { m_FirstReferenceIndex.swap(FirstReferenceIndex); @@ -3031,124 +3810,99 @@ ZenCacheDiskLayer::ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const st ZenCacheDiskLayer::~ZenCacheDiskLayer() { -} - -bool -ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) -{ - ZEN_TRACE_CPU("Z$::Disk::Get"); - - const auto BucketName = std::string(InBucket); - CacheBucket* Bucket = nullptr; - + try { - RwLock::SharedLockScope _(m_Lock); - - auto It = m_Buckets.find(BucketName); - - if (It != m_Buckets.end()) { - Bucket = It->second.get(); + RwLock::ExclusiveLockScope _(m_Lock); + for (auto& It : m_Buckets) + { + m_DroppedBuckets.emplace_back(std::move(It.second)); + } + m_Buckets.clear(); } + // We destroy the buckets without holding a lock since destructor calls GcManager::RemoveGcReferencer which takes an exclusive lock. + // This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock + m_DroppedBuckets.clear(); } - - if (Bucket == nullptr) + catch (std::exception& Ex) { - // Bucket needs to be opened/created + ZEN_ERROR("~ZenCacheDiskLayer() failed. Reason: '{}'", Ex.what()); + } +} - RwLock::ExclusiveLockScope _(m_Lock); +ZenCacheDiskLayer::CacheBucket* +ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) +{ + ZEN_TRACE_CPU("Z$::Disk::GetOrCreateBucket"); + const auto BucketName = std::string(InBucket); + { + RwLock::SharedLockScope SharedLock(m_Lock); if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { - Bucket = It->second.get(); - } - else - { - auto InsertResult = - m_Buckets.emplace(BucketName, - std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); - Bucket = InsertResult.first->second.get(); - - std::filesystem::path BucketPath = m_RootDir; - BucketPath /= BucketName; - - if (!Bucket->OpenOrCreate(BucketPath)) - { - m_Buckets.erase(InsertResult.first); - return false; - } + return It->second.get(); } } - ZEN_ASSERT(Bucket != nullptr); - if (Bucket->Get(HashKey, OutValue)) + // We create the bucket without holding a lock since contructor calls GcManager::AddGcReferencer which takes an exclusive lock. + // This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock + std::unique_ptr<CacheBucket> Bucket( + std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); + + RwLock::ExclusiveLockScope Lock(m_Lock); + if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { - TryMemCacheTrim(); - return true; + return It->second.get(); } - return false; -} - -void -ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) -{ - ZEN_TRACE_CPU("Z$::Disk::Put"); - - const auto BucketName = std::string(InBucket); - CacheBucket* Bucket = nullptr; + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= BucketName; + try { - RwLock::SharedLockScope _(m_Lock); - - auto It = m_Buckets.find(BucketName); - - if (It != m_Buckets.end()) + if (!Bucket->OpenOrCreate(BucketPath)) { - Bucket = It->second.get(); + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); + return nullptr; } } - - if (Bucket == nullptr) + catch (const std::exception& Err) { - // New bucket needs to be created + ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); + throw; + } - RwLock::ExclusiveLockScope _(m_Lock); + CacheBucket* Result = Bucket.get(); + m_Buckets.emplace(BucketName, std::move(Bucket)); - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - else - { - auto InsertResult = - m_Buckets.emplace(BucketName, - std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); - Bucket = InsertResult.first->second.get(); + return Result; +} - std::filesystem::path BucketPath = m_RootDir; - BucketPath /= BucketName; +bool +ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) +{ + ZEN_TRACE_CPU("Z$::Disk::Get"); - try - { - if (!Bucket->OpenOrCreate(BucketPath)) - { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); - m_Buckets.erase(InsertResult.first); - return; - } - } - catch (const std::exception& Err) - { - ZEN_WARN("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); - throw; - } + if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) + { + if (Bucket->Get(HashKey, OutValue)) + { + TryMemCacheTrim(); + return true; } } + return false; +} - ZEN_ASSERT(Bucket != nullptr); +void +ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) +{ + ZEN_TRACE_CPU("Z$::Disk::Put"); - Bucket->Put(HashKey, Value, References); - TryMemCacheTrim(); + if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) + { + Bucket->Put(HashKey, Value, References); + TryMemCacheTrim(); + } } void @@ -3208,11 +3962,8 @@ ZenCacheDiskLayer::DiscoverBuckets() RwLock SyncLock; - const size_t MaxHwTreadUse = std::thread::hardware_concurrency(); - const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, FoundBucketDirectories.size())); - - WorkerThreadPool Pool(WorkerThreadPoolCount); - Latch WorkLatch(1); + WorkerThreadPool& Pool = GetLargeWorkerPool(); + Latch WorkLatch(1); for (auto& BucketPath : FoundBucketDirectories) { WorkLatch.AddCount(1); @@ -3301,13 +4052,17 @@ void ZenCacheDiskLayer::Flush() { std::vector<CacheBucket*> Buckets; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (Buckets.empty()) + { + return; + } + ZEN_INFO("Flushed {} buckets at '{}' in {}", Buckets.size(), m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); { - RwLock::SharedLockScope _(m_Lock); - if (m_Buckets.empty()) - { - return; - } + RwLock::SharedLockScope __(m_Lock); Buckets.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { @@ -3315,28 +4070,29 @@ ZenCacheDiskLayer::Flush() Buckets.push_back(Bucket); } } - const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u); - const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, Buckets.size())); - - WorkerThreadPool Pool(WorkerThreadPoolCount); - Latch WorkLatch(1); - for (auto& Bucket : Buckets) { - WorkLatch.AddCount(1); - Pool.ScheduleWork([&]() { - auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); - Bucket->Flush(); - }); + WorkerThreadPool& Pool = GetSmallWorkerPool(); + Latch WorkLatch(1); + for (auto& Bucket : Buckets) + { + WorkLatch.AddCount(1); + Pool.ScheduleWork([&]() { + auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); + Bucket->Flush(); + }); + } + WorkLatch.CountDown(); + while (!WorkLatch.Wait(1000)) + { + ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir); + } } - WorkLatch.CountDown(); - WorkLatch.Wait(); } void ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) { RwLock::SharedLockScope _(m_Lock); - { std::vector<std::future<void>> Results; Results.reserve(m_Buckets.size()); @@ -3457,19 +4213,21 @@ ZenCacheDiskLayer::EnumerateBucketContents(std::string_view CacheValueDetails::NamespaceDetails ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const { - RwLock::SharedLockScope _(m_Lock); CacheValueDetails::NamespaceDetails Details; - if (BucketFilter.empty()) { - Details.Buckets.reserve(BucketFilter.empty() ? m_Buckets.size() : 1); - for (auto& Kv : m_Buckets) + RwLock::SharedLockScope IndexLock(m_Lock); + if (BucketFilter.empty()) { - Details.Buckets[Kv.first] = Kv.second->GetValueDetails(ValueFilter); + Details.Buckets.reserve(BucketFilter.empty() ? m_Buckets.size() : 1); + for (auto& Kv : m_Buckets) + { + Details.Buckets[Kv.first] = Kv.second->GetValueDetails(IndexLock, ValueFilter); + } + } + else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end()) + { + Details.Buckets[It->first] = It->second->GetValueDetails(IndexLock, ValueFilter); } - } - else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end()) - { - Details.Buckets[It->first] = It->second->GetValueDetails(ValueFilter); } return Details; } @@ -3480,17 +4238,8 @@ ZenCacheDiskLayer::MemCacheTrim() ZEN_TRACE_CPU("Z$::Disk::MemCacheTrim"); ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0); - - const GcClock::TimePoint Now = GcClock::Now(); - - const GcClock::Tick NowTick = Now.time_since_epoch().count(); - const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); - GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim; - const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count(); - if (NowTick < NextAllowedTrimTick) - { - return; - } + ZEN_ASSERT(m_Configuration.MemCacheMaxAgeSeconds != 0); + ZEN_ASSERT(m_Configuration.MemCacheTrimIntervalSeconds != 0); bool Expected = false; if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true)) @@ -3498,75 +4247,90 @@ ZenCacheDiskLayer::MemCacheTrim() return; } - // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong - const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); - m_LastTickMemCacheTrim.store(NextTrimTick); + try + { + m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this](JobContext&) { + ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]"); + + const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); + uint64_t TrimmedSize = 0; + Stopwatch Timer; + const auto Guard = MakeGuard([&] { + ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", + NiceBytes(TrimmedSize), + NiceBytes(m_TotalMemCachedSize), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + + const GcClock::Tick NowTick = GcClock::TickCount(); + const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); + m_NextAllowedTrimTick.store(NextTrimTick); + m_IsMemCacheTrimming.store(false); + }); - m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this, Now, TrimInterval](JobContext&) { - ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]"); + const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds); - uint64_t StartSize = m_TotalMemCachedSize.load(); - Stopwatch Timer; - const auto Guard = MakeGuard([&] { - uint64_t EndSize = m_TotalMemCachedSize.load(); - ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", - NiceBytes(StartSize > EndSize ? StartSize - EndSize : 0), - NiceBytes(m_TotalMemCachedSize), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - m_IsMemCacheTrimming.store(false); - }); - - const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds); + static const size_t UsageSlotCount = 2048; + std::vector<uint64_t> UsageSlots; + UsageSlots.reserve(UsageSlotCount); - std::vector<uint64_t> UsageSlots; - UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count()); + std::vector<CacheBucket*> Buckets; + { + RwLock::SharedLockScope __(m_Lock); + Buckets.reserve(m_Buckets.size()); + for (auto& Kv : m_Buckets) + { + Buckets.push_back(Kv.second.get()); + } + } - std::vector<CacheBucket*> Buckets; - { - RwLock::SharedLockScope __(m_Lock); - Buckets.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) + const GcClock::TimePoint Now = GcClock::Now(); { - Buckets.push_back(Kv.second.get()); + ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim GetUsageByAccess"); + for (CacheBucket* Bucket : Buckets) + { + Bucket->GetUsageByAccess(Now, MaxAge, UsageSlots); + } } - } - for (CacheBucket* Bucket : Buckets) - { - Bucket->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots); - } - uint64_t TotalSize = 0; - for (size_t Index = 0; Index < UsageSlots.size(); ++Index) - { - TotalSize += UsageSlots[Index]; - if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes) + uint64_t TotalSize = 0; + for (size_t Index = 0; Index < UsageSlots.size(); ++Index) { - GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index); - MemCacheTrim(Buckets, ExpireTime); - break; + TotalSize += UsageSlots[Index]; + if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes) + { + GcClock::TimePoint ExpireTime = Now - ((GcClock::Duration(MaxAge) * Index) / UsageSlotCount); + TrimmedSize = MemCacheTrim(Buckets, ExpireTime); + break; + } } - } - }); + }); + } + catch (std::exception& Ex) + { + ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. Reason: '{}'", Ex.what()); + m_IsMemCacheTrimming.store(false); + } } -void +uint64_t ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime) { if (m_Configuration.MemCacheTargetFootprintBytes == 0) { - return; + return 0; } - RwLock::SharedLockScope __(m_Lock); + uint64_t TrimmedSize = 0; for (CacheBucket* Bucket : Buckets) { - Bucket->MemCacheTrim(ExpireTime); + TrimmedSize += Bucket->MemCacheTrim(ExpireTime); } const GcClock::TimePoint Now = GcClock::Now(); const GcClock::Tick NowTick = Now.time_since_epoch().count(); const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); - GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim; + GcClock::Tick LastTrimTick = m_NextAllowedTrimTick; const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); - m_LastTickMemCacheTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); + m_NextAllowedTrimTick.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); + return TrimmedSize; } #if ZEN_WITH_TESTS |