diff options
| author | Stefan Boberg <[email protected]> | 2023-12-19 21:49:55 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-12-19 21:49:55 +0100 |
| commit | 8a90a40b4517d198855c1e52740a7e7fb21ecc20 (patch) | |
| tree | ffd8524bcebf8841b69725934f31d438a03e7746 /src/zenserver/cache/cachedisklayer.cpp | |
| parent | 0.2.38 (diff) | |
| download | zen-8a90a40b4517d198855c1e52740a7e7fb21ecc20.tar.xz zen-8a90a40b4517d198855c1e52740a7e7fb21ecc20.zip | |
move cachedisklayer and structuredcachestore into zenstore (#624)
Diffstat (limited to 'src/zenserver/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 4381 |
1 files changed, 0 insertions, 4381 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp deleted file mode 100644 index 8d046105d..000000000 --- a/src/zenserver/cache/cachedisklayer.cpp +++ /dev/null @@ -1,4381 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "cachedisklayer.h" - -#include <zencore/compactbinary.h> -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinaryvalidation.h> -#include <zencore/compress.h> -#include <zencore/except.h> -#include <zencore/fmtutils.h> -#include <zencore/jobqueue.h> -#include <zencore/logging.h> -#include <zencore/scopeguard.h> -#include <zencore/trace.h> -#include <zencore/workthreadpool.h> -#include <zencore/xxhash.h> -#include <zenutil/workerpools.h> - -#include <future> - -////////////////////////////////////////////////////////////////////////// - -namespace zen { -namespace { - -#pragma pack(push) -#pragma pack(1) - - struct CacheBucketIndexHeader - { - static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; - static constexpr uint32_t Version2 = 2; - static constexpr uint32_t CurrentVersion = Version2; - - uint32_t Magic = ExpectedMagic; - uint32_t Version = CurrentVersion; - uint64_t EntryCount = 0; - uint64_t LogPosition = 0; - uint32_t PayloadAlignment = 0; - uint32_t Checksum = 0; - - static uint32_t ComputeChecksum(const CacheBucketIndexHeader& Header) - { - return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); - } - - bool IsValid() const - { - if (Magic != ExpectedMagic) - { - return false; - } - - if (Checksum != ComputeChecksum(*this)) - { - return false; - } - - if (PayloadAlignment == 0) - { - return false; - } - - return true; - } - }; - - static_assert(sizeof(CacheBucketIndexHeader) == 32); - - struct BucketMetaHeader - { - static constexpr uint32_t ExpectedMagic = 0x61'74'65'6d; // 'meta'; - static constexpr uint32_t Version1 = 1; - static constexpr uint32_t CurrentVersion = Version1; - - uint32_t Magic = ExpectedMagic; - uint32_t Version = CurrentVersion; - uint64_t EntryCount = 0; - uint64_t LogPosition = 0; - uint32_t Padding = 0; - uint32_t Checksum = 0; - - static uint32_t ComputeChecksum(const BucketMetaHeader& Header) - { - return XXH32(&Header.Magic, sizeof(BucketMetaHeader) - sizeof(uint32_t), 0xC0C0'BABA); - } - - bool IsValid() const - { - if (Magic != ExpectedMagic) - { - return false; - } - - if (Checksum != ComputeChecksum(*this)) - { - return false; - } - - if (Padding != 0) - { - return false; - } - - return true; - } - }; - - static_assert(sizeof(BucketMetaHeader) == 32); - -#pragma pack(pop) - - ////////////////////////////////////////////////////////////////////////// - - template<typename T> - void Reset(T& V) - { - T Tmp; - V.swap(Tmp); - } - - const char* IndexExtension = ".uidx"; - const char* LogExtension = ".slog"; - const char* MetaExtension = ".meta"; - - std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) - { - return BucketDir / (BucketName + IndexExtension); - } - - std::filesystem::path GetMetaPath(const std::filesystem::path& BucketDir, const std::string& BucketName) - { - return BucketDir / (BucketName + MetaExtension); - } - - std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) - { - return BucketDir / (BucketName + LogExtension); - } - - std::filesystem::path GetManifestPath(const std::filesystem::path& BucketDir, const std::string& BucketName) - { - ZEN_UNUSED(BucketName); - return BucketDir / "zen_manifest"; - } - - bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason) - { - if (Entry.Key == IoHash::Zero) - { - OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); - return false; - } - if (Entry.Location.Reserved != 0) - { - OutReason = fmt::format("Reserved field non-zero ({}) for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); - return false; - } - if (Entry.Location.GetFlags() & - ~(DiskLocation::kStandaloneFile | DiskLocation::kStructured | DiskLocation::kTombStone | DiskLocation::kCompressed)) - { - OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.GetFlags(), Entry.Key.ToHexString()); - return false; - } - if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) - { - return true; - } - if (Entry.Location.Reserved != 0) - { - OutReason = fmt::format("Invalid reserved field {} for entry {}", Entry.Location.Reserved, Entry.Key.ToHexString()); - return false; - } - uint64_t Size = Entry.Location.Size(); - if (Size == 0) - { - OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); - return false; - } - return true; - } - - bool MoveAndDeleteDirectory(const std::filesystem::path& Dir) - { - int DropIndex = 0; - do - { - if (!std::filesystem::exists(Dir)) - { - return false; - } - - std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); - std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; - if (std::filesystem::exists(DroppedBucketPath)) - { - DropIndex++; - continue; - } - - std::error_code Ec; - std::filesystem::rename(Dir, DroppedBucketPath, Ec); - if (!Ec) - { - DeleteDirectories(DroppedBucketPath); - return true; - } - // TODO: Do we need to bail at some point? - zen::Sleep(100); - } while (true); - } -} // namespace - -namespace fs = std::filesystem; -using namespace std::literals; - -class BucketManifestSerializer -{ - using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex; - using BucketMetaData = ZenCacheDiskLayer::CacheBucket::BucketMetaData; - - using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; - using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload; - -public: - // We use this to indicate if a on disk bucket needs wiping - // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scramble the references - // to block items. - // See: https://github.com/EpicGames/zen/pull/299 - static inline const uint32_t CurrentDiskBucketVersion = 1; - - bool Open(std::filesystem::path ManifestPath) - { - Manifest = LoadCompactBinaryObject(ManifestPath); - return !!Manifest; - } - - Oid GetBucketId() const { return Manifest["BucketId"sv].AsObjectId(); } - - bool IsCurrentVersion(uint32_t& OutVersion) const - { - OutVersion = Manifest["Version"sv].AsUInt32(0); - return OutVersion == CurrentDiskBucketVersion; - } - - void ParseManifest(RwLock::ExclusiveLockScope& BucketLock, - ZenCacheDiskLayer::CacheBucket& Bucket, - std::filesystem::path ManifestPath, - ZenCacheDiskLayer::CacheBucket::IndexMap& Index, - std::vector<AccessTime>& AccessTimes, - std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads); - - Oid GenerateNewManifest(std::filesystem::path ManifestPath); - - IoBuffer MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount); - uint64_t GetSidecarSize() const { return m_ManifestEntryCount * sizeof(ManifestData); } - void WriteSidecarFile(RwLock::SharedLockScope& BucketLock, - const std::filesystem::path& SidecarPath, - uint64_t SnapshotLogPosition, - const ZenCacheDiskLayer::CacheBucket::IndexMap& Index, - const std::vector<AccessTime>& AccessTimes, - const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads, - const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas); - bool ReadSidecarFile(RwLock::ExclusiveLockScope& BucketLock, - ZenCacheDiskLayer::CacheBucket& Bucket, - std::filesystem::path SidecarPath, - ZenCacheDiskLayer::CacheBucket::IndexMap& Index, - std::vector<AccessTime>& AccessTimes, - std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads); - - IoBuffer MakeManifest(const Oid& BucketId, - ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, - std::vector<AccessTime>&& AccessTimes, - std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>&& Payloads, - std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>&& MetaDatas); - - CbObject Manifest; - -private: - CbObject LoadCompactBinaryObject(const fs::path& Path) - { - FileContents Result = ReadFile(Path); - - if (!Result.ErrorCode) - { - IoBuffer Buffer = Result.Flatten(); - if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) - { - return zen::LoadCompactBinaryObject(Buffer); - } - } - - return CbObject(); - } - - uint64_t m_ManifestEntryCount = 0; - - struct ManifestData - { - IoHash Key; // 20 - AccessTime Timestamp; // 4 - IoHash RawHash; // 20 - uint32_t Padding_0; // 4 - size_t RawSize; // 8 - uint64_t Padding_1; // 8 - }; - - static_assert(sizeof(ManifestData) == 64); -}; - -void -BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& BucketLock, - ZenCacheDiskLayer::CacheBucket& Bucket, - std::filesystem::path ManifestPath, - ZenCacheDiskLayer::CacheBucket::IndexMap& Index, - std::vector<AccessTime>& AccessTimes, - std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads) -{ - if (Manifest["UsingMetaFile"sv].AsBool()) - { - ReadSidecarFile(BucketLock, Bucket, GetMetaPath(Bucket.m_BucketDir, Bucket.m_BucketName), Index, AccessTimes, Payloads); - - return; - } - - ZEN_TRACE_CPU("Z$::ParseManifest"); - - Stopwatch Timer; - const auto _ = MakeGuard([&] { ZEN_INFO("parsed store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - - const uint64_t Count = Manifest["Count"sv].AsUInt64(0); - std::vector<PayloadIndex> KeysIndexes; - KeysIndexes.reserve(Count); - - CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); - for (CbFieldView& KeyView : KeyArray) - { - if (auto It = Index.find(KeyView.AsHash()); It != Index.end()) - { - KeysIndexes.push_back(It.value()); - } - else - { - KeysIndexes.push_back(PayloadIndex()); - } - } - - size_t KeyIndexOffset = 0; - CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); - for (CbFieldView& TimeStampView : TimeStampArray) - { - const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; - if (KeyIndex) - { - AccessTimes[KeyIndex] = TimeStampView.AsInt64(); - } - } - - KeyIndexOffset = 0; - CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView(); - CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView(); - if (RawHashArray.Num() == RawSizeArray.Num()) - { - auto RawHashIt = RawHashArray.CreateViewIterator(); - auto RawSizeIt = RawSizeArray.CreateViewIterator(); - while (RawHashIt != CbFieldViewIterator()) - { - const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; - - if (KeyIndex) - { - uint64_t RawSize = RawSizeIt.AsUInt64(); - IoHash RawHash = RawHashIt.AsHash(); - if (RawSize != 0 || RawHash != IoHash::Zero) - { - BucketPayload& Payload = Payloads[KeyIndex]; - Bucket.SetMetaData(BucketLock, Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash}); - } - } - - RawHashIt++; - RawSizeIt++; - } - } - else - { - ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath); - } -} - -Oid -BucketManifestSerializer::GenerateNewManifest(std::filesystem::path ManifestPath) -{ - const Oid BucketId = Oid::NewOid(); - - CbObjectWriter Writer; - Writer << "BucketId"sv << BucketId; - Writer << "Version"sv << CurrentDiskBucketVersion; - Manifest = Writer.Save(); - WriteFile(ManifestPath, Manifest.GetBuffer().AsIoBuffer()); - - return BucketId; -} - -IoBuffer -BucketManifestSerializer::MakeManifest(const Oid& BucketId, - ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, - std::vector<AccessTime>&& AccessTimes, - std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>&& Payloads, - std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>&& MetaDatas) -{ - using namespace std::literals; - - ZEN_TRACE_CPU("Z$::MakeManifest"); - - size_t ItemCount = Index.size(); - - // This tends to overestimate a little bit but it is still way more accurate than what we get with exponential growth - // And we don't need to reallocate the underlying buffer in almost every case - const size_t EstimatedSizePerItem = 54u; - const size_t ReserveSize = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128); - CbObjectWriter Writer(ReserveSize); - - Writer << "BucketId"sv << BucketId; - Writer << "Version"sv << CurrentDiskBucketVersion; - - if (!Index.empty()) - { - Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(Index.size())); - Writer.BeginArray("Keys"sv); - for (auto& Kv : Index) - { - const IoHash& Key = Kv.first; - Writer.AddHash(Key); - } - Writer.EndArray(); - - Writer.BeginArray("Timestamps"sv); - for (auto& Kv : Index) - { - GcClock::Tick AccessTime = AccessTimes[Kv.second]; - Writer.AddInteger(AccessTime); - } - Writer.EndArray(); - - if (!MetaDatas.empty()) - { - Writer.BeginArray("RawHash"sv); - for (auto& Kv : Index) - { - const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second]; - if (Payload.MetaData) - { - Writer.AddHash(MetaDatas[Payload.MetaData].RawHash); - } - else - { - Writer.AddHash(IoHash::Zero); - } - } - Writer.EndArray(); - - Writer.BeginArray("RawSize"sv); - for (auto& Kv : Index) - { - const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second]; - if (Payload.MetaData) - { - Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize); - } - else - { - Writer.AddInteger(0); - } - } - Writer.EndArray(); - } - } - - Manifest = Writer.Save(); - return Manifest.GetBuffer().AsIoBuffer(); -} - -IoBuffer -BucketManifestSerializer::MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount) -{ - m_ManifestEntryCount = EntryCount; - - CbObjectWriter Writer; - Writer << "BucketId"sv << BucketId; - Writer << "Version"sv << CurrentDiskBucketVersion; - Writer << "Count"sv << EntryCount; - Writer << "UsingMetaFile"sv << true; - Manifest = Writer.Save(); - - return Manifest.GetBuffer().AsIoBuffer(); -} - -bool -BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& BucketLock, - ZenCacheDiskLayer::CacheBucket& Bucket, - std::filesystem::path SidecarPath, - ZenCacheDiskLayer::CacheBucket::IndexMap& Index, - std::vector<AccessTime>& AccessTimes, - std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads) -{ - ZEN_TRACE_CPU("Z$::ReadSidecarFile"); - - ZEN_ASSERT(AccessTimes.size() == Payloads.size()); - - std::error_code Ec; - - BasicFile SidecarFile; - SidecarFile.Open(SidecarPath, BasicFile::Mode::kRead, Ec); - - if (Ec) - { - throw std::system_error(Ec, fmt::format("failed to open sidecar file '{}'", SidecarPath)); - } - - uint64_t FileSize = SidecarFile.FileSize(); - - auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid sidecar file '{}'", SidecarPath); }); - - if (FileSize < sizeof(BucketMetaHeader)) - { - return false; - } - - BasicFileBuffer Sidecar(SidecarFile, 128 * 1024); - - BucketMetaHeader Header; - Sidecar.Read(&Header, sizeof Header, 0); - - if (!Header.IsValid()) - { - return false; - } - - if (Header.Version != BucketMetaHeader::Version1) - { - return false; - } - - const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(BucketMetaHeader))) / sizeof(ManifestData); - if (Header.EntryCount > ExpectedEntryCount) - { - return false; - } - - InvalidGuard.Dismiss(); - - uint64_t RemainingEntryCount = ExpectedEntryCount; - uint64_t EntryCount = 0; - uint64_t CurrentReadOffset = sizeof(Header); - - while (RemainingEntryCount--) - { - const ManifestData* Entry = Sidecar.MakeView<ManifestData>(CurrentReadOffset); - CurrentReadOffset += sizeof(ManifestData); - - if (auto It = Index.find(Entry->Key); It != Index.end()) - { - PayloadIndex PlIndex = It.value(); - - ZEN_ASSERT(size_t(PlIndex) <= Payloads.size()); - - ZenCacheDiskLayer::CacheBucket::BucketPayload& PayloadEntry = Payloads[PlIndex]; - - AccessTimes[PlIndex] = Entry->Timestamp; - - if (Entry->RawSize && Entry->RawHash != IoHash::Zero) - { - Bucket.SetMetaData(BucketLock, PayloadEntry, BucketMetaData{.RawSize = Entry->RawSize, .RawHash = Entry->RawHash}); - } - } - - EntryCount++; - } - - ZEN_ASSERT(EntryCount == ExpectedEntryCount); - - return true; -} - -void -BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&, - const std::filesystem::path& SidecarPath, - uint64_t SnapshotLogPosition, - const ZenCacheDiskLayer::CacheBucket::IndexMap& Index, - const std::vector<AccessTime>& AccessTimes, - const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads, - const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas) -{ - ZEN_TRACE_CPU("Z$::WriteSidecarFile"); - - BucketMetaHeader Header; - Header.EntryCount = m_ManifestEntryCount; - Header.LogPosition = SnapshotLogPosition; - Header.Checksum = Header.ComputeChecksum(Header); - - std::error_code Ec; - - TemporaryFile SidecarFile; - SidecarFile.CreateTemporary(SidecarPath.parent_path(), Ec); - - if (Ec) - { - throw std::system_error(Ec, fmt::format("failed creating '{}'", SidecarFile.GetPath())); - } - - SidecarFile.Write(&Header, sizeof Header, 0); - - // TODO: make this batching for better performance - { - uint64_t WriteOffset = sizeof Header; - - // BasicFileWriter SidecarWriter(SidecarFile, 128 * 1024); - - std::vector<ManifestData> ManifestDataBuffer; - const size_t MaxManifestDataBufferCount = Min(Index.size(), 4096u); // 256 Kb - ManifestDataBuffer.reserve(MaxManifestDataBufferCount); - for (auto& Kv : Index) - { - const IoHash& Key = Kv.first; - const PayloadIndex PlIndex = Kv.second; - - IoHash RawHash = IoHash::Zero; - uint64_t RawSize = 0; - - if (const MetaDataIndex MetaIndex = Payloads[PlIndex].MetaData) - { - RawHash = MetaDatas[MetaIndex].RawHash; - RawSize = MetaDatas[MetaIndex].RawSize; - } - - ManifestDataBuffer.emplace_back(ManifestData{.Key = Key, - .Timestamp = AccessTimes[PlIndex], - .RawHash = RawHash, - .Padding_0 = 0, - .RawSize = RawSize, - .Padding_1 = 0}); - if (ManifestDataBuffer.size() == MaxManifestDataBufferCount) - { - const uint64_t WriteSize = sizeof(ManifestData) * ManifestDataBuffer.size(); - SidecarFile.Write(ManifestDataBuffer.data(), WriteSize, WriteOffset); - WriteOffset += WriteSize; - ManifestDataBuffer.clear(); - ManifestDataBuffer.reserve(MaxManifestDataBufferCount); - } - } - if (ManifestDataBuffer.size() > 0) - { - SidecarFile.Write(ManifestDataBuffer.data(), sizeof(ManifestData) * ManifestDataBuffer.size(), WriteOffset); - } - } - - SidecarFile.MoveTemporaryIntoPlace(SidecarPath, Ec); - - if (Ec) - { - throw std::system_error(Ec, fmt::format("failed to move '{}' into '{}'", SidecarFile.GetPath(), SidecarPath)); - } -} - -////////////////////////////////////////////////////////////////////////// - -static const float IndexMinLoadFactor = 0.2f; -static const float IndexMaxLoadFactor = 0.7f; - -ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, - std::atomic_uint64_t& OuterCacheMemoryUsage, - std::string BucketName, - const BucketConfiguration& Config) -: m_Gc(Gc) -, m_OuterCacheMemoryUsage(OuterCacheMemoryUsage) -, m_BucketName(std::move(BucketName)) -, m_Configuration(Config) -, m_BucketId(Oid::Zero) -{ - m_Index.min_load_factor(IndexMinLoadFactor); - m_Index.max_load_factor(IndexMaxLoadFactor); - - if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap"))) - { - const uint64_t LegacyOverrideSize = 16 * 1024 * 1024; - - // This is pretty ad hoc but in order to avoid too many individual files - // it makes sense to have a different strategy for legacy values - m_Configuration.LargeObjectThreshold = Max(m_Configuration.LargeObjectThreshold, LegacyOverrideSize); - } - m_Gc.AddGcReferencer(*this); -} - -ZenCacheDiskLayer::CacheBucket::~CacheBucket() -{ - m_Gc.RemoveGcReferencer(*this); -} - -bool -ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate) -{ - using namespace std::literals; - - ZEN_TRACE_CPU("Z$::Bucket::OpenOrCreate"); - ZEN_ASSERT(m_IsFlushing.load()); - - // We want to take the lock here since we register as a GC referencer a construction - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - - ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir); - - m_BlocksBasePath = BucketDir / "blocks"; - m_BucketDir = BucketDir; - - CreateDirectories(m_BucketDir); - - std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); - - bool IsNew = false; - - BucketManifestSerializer ManifestReader; - - if (ManifestReader.Open(ManifestPath)) - { - m_BucketId = ManifestReader.GetBucketId(); - if (m_BucketId == Oid::Zero) - { - return false; - } - - uint32_t Version = 0; - if (ManifestReader.IsCurrentVersion(/* out */ Version) == false) - { - ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", - BucketDir, - Version, - BucketManifestSerializer::CurrentDiskBucketVersion); - IsNew = true; - } - } - else if (AllowCreate) - { - m_BucketId = ManifestReader.GenerateNewManifest(ManifestPath); - IsNew = true; - } - else - { - return false; - } - - InitializeIndexFromDisk(IndexLock, IsNew); - - auto _ = MakeGuard([&]() { - // We are now initialized, allow flushing when we exit - m_IsFlushing.store(false); - }); - - if (IsNew) - { - return true; - } - - ManifestReader.ParseManifest(IndexLock, *this, ManifestPath, m_Index, m_AccessTimes, m_Payloads); - - return true; -} - -void -ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(const std::function<uint64_t()>& ClaimDiskReserveFunc) -{ - ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot"); - - const uint64_t LogCount = m_SlogFile.GetLogCount(); - if (m_LogFlushPosition == LogCount) - { - return; - } - - ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir); - const uint64_t EntryCount = m_Index.size(); - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", - m_BucketDir, - EntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - namespace fs = std::filesystem; - - fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); - - try - { - const uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry); - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); - if (Error) - { - throw std::system_error(Error, fmt::format("get disk space in '{}' FAILED", m_BucketDir)); - } - - bool EnoughSpace = Space.Free >= IndexSize + 1024 * 512; - if (!EnoughSpace) - { - uint64_t ReclaimedSpace = ClaimDiskReserveFunc(); - EnoughSpace = (Space.Free + ReclaimedSpace) >= IndexSize + 1024 * 512; - } - if (!EnoughSpace) - { - throw std::runtime_error( - fmt::format("not enough free disk space in '{}' to save index of size {}", m_BucketDir, NiceBytes(IndexSize))); - } - - TemporaryFile ObjectIndexFile; - std::error_code Ec; - ObjectIndexFile.CreateTemporary(m_BucketDir, Ec); - if (Ec) - { - throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir)); - } - - { - // This is in a separate scope just to ensure IndexWriter goes out - // of scope before the file is flushed/closed, in order to ensure - // all data is written to the file - BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024); - - CacheBucketIndexHeader Header = {.EntryCount = EntryCount, - .LogPosition = LogCount, - .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)}; - - Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); - IndexWriter.Write(&Header, sizeof(CacheBucketIndexHeader), 0); - - uint64_t IndexWriteOffset = sizeof(CacheBucketIndexHeader); - - for (auto& Entry : m_Index) - { - DiskIndexEntry IndexEntry; - IndexEntry.Key = Entry.first; - IndexEntry.Location = m_Payloads[Entry.second].Location; - IndexWriter.Write(&IndexEntry, sizeof(DiskIndexEntry), IndexWriteOffset); - - IndexWriteOffset += sizeof(DiskIndexEntry); - } - - IndexWriter.Flush(); - } - - ObjectIndexFile.Flush(); - ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); - if (Ec) - { - std::filesystem::path TempFilePath = ObjectIndexFile.GetPath(); - ZEN_WARN("snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", TempFilePath, IndexPath, Ec.message()); - - if (std::filesystem::is_regular_file(TempFilePath)) - { - if (!std::filesystem::remove(TempFilePath, Ec) || Ec) - { - ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", TempFilePath, Ec.message()); - } - } - } - else - { - // We must only update the log flush position once the snapshot write succeeds - m_LogFlushPosition = LogCount; - } - } - catch (std::exception& Err) - { - ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); - } -} - -uint64_t -ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const std::filesystem::path& IndexPath, uint32_t& OutVersion) -{ - ZEN_TRACE_CPU("Z$::Bucket::ReadIndexFile"); - - if (!std::filesystem::is_regular_file(IndexPath)) - { - return 0; - } - - auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid index file '{}'", IndexPath); }); - - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); - uint64_t FileSize = ObjectIndexFile.FileSize(); - if (FileSize < sizeof(CacheBucketIndexHeader)) - { - return 0; - } - - CacheBucketIndexHeader Header; - ObjectIndexFile.Read(&Header, sizeof(Header), 0); - - if (!Header.IsValid()) - { - return 0; - } - - if (Header.Version != CacheBucketIndexHeader::Version2) - { - return 0; - } - - const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); - if (Header.EntryCount > ExpectedEntryCount) - { - return 0; - } - - InvalidGuard.Dismiss(); - - size_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - m_Configuration.PayloadAlignment = Header.PayloadAlignment; - - m_Payloads.reserve(Header.EntryCount); - m_Index.reserve(Header.EntryCount); - - BasicFileBuffer FileBuffer(ObjectIndexFile, 128 * 1024); - - uint64_t CurrentReadOffset = sizeof(CacheBucketIndexHeader); - uint64_t RemainingEntryCount = Header.EntryCount; - - std::string InvalidEntryReason; - while (RemainingEntryCount--) - { - const DiskIndexEntry* Entry = FileBuffer.MakeView<DiskIndexEntry>(CurrentReadOffset); - CurrentReadOffset += sizeof(DiskIndexEntry); - - if (!ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); - continue; - } - - const PayloadIndex EntryIndex = PayloadIndex(EntryCount); - m_Payloads.emplace_back(BucketPayload{.Location = Entry->Location}); - m_Index.insert_or_assign(Entry->Key, EntryIndex); - - EntryCount++; - } - - ZEN_ASSERT(EntryCount == m_Payloads.size()); - - m_AccessTimes.resize(EntryCount, AccessTime(GcClock::TickCount())); - - if (m_Configuration.EnableReferenceCaching) - { - m_FirstReferenceIndex.resize(EntryCount); - } - - OutVersion = CacheBucketIndexHeader::Version2; - return Header.LogPosition; -} - -uint64_t -ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount) -{ - ZEN_TRACE_CPU("Z$::Bucket::ReadLog"); - - if (!std::filesystem::is_regular_file(LogPath)) - { - return 0; - } - - uint64_t LogEntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - TCasLogFile<DiskIndexEntry> CasLog; - CasLog.Open(LogPath, CasLogFile::Mode::kRead); - if (!CasLog.Initialize()) - { - return 0; - } - - const uint64_t EntryCount = CasLog.GetLogCount(); - if (EntryCount < SkipEntryCount) - { - ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); - SkipEntryCount = 0; - } - - LogEntryCount = EntryCount - SkipEntryCount; - uint64_t InvalidEntryCount = 0; - - CasLog.Replay( - [&](const DiskIndexEntry& Record) { - std::string InvalidEntryReason; - if (Record.Location.Flags & DiskLocation::kTombStone) - { - // Note: this leaves m_Payloads and other arrays with 'holes' in them - m_Index.erase(Record.Key); - return; - } - - if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); - ++InvalidEntryCount; - return; - } - PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); - m_Payloads.emplace_back(BucketPayload{.Location = Record.Location}); - m_Index.insert_or_assign(Record.Key, EntryIndex); - }, - SkipEntryCount); - - m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); - - if (m_Configuration.EnableReferenceCaching) - { - m_FirstReferenceIndex.resize(m_Payloads.size()); - } - - if (InvalidEntryCount) - { - ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); - } - - return LogEntryCount; -}; - -void -ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockScope& IndexLock, const bool IsNew) -{ - ZEN_TRACE_CPU("Z$::Bucket::Initialize"); - - m_StandaloneSize = 0; - - m_Index.clear(); - m_Payloads.clear(); - m_AccessTimes.clear(); - m_MetaDatas.clear(); - m_FreeMetaDatas.clear(); - m_MemCachedPayloads.clear(); - m_FreeMemCachedPayloads.clear(); - m_FirstReferenceIndex.clear(); - m_ReferenceHashes.clear(); - m_NextReferenceHashesIndexes.clear(); - m_ReferenceCount = 0; - - std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); - std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); - - if (IsNew) - { - fs::remove(LogPath); - fs::remove(IndexPath); - fs::remove_all(m_BlocksBasePath); - } - - CreateDirectories(m_BucketDir); - - m_BlockStore.Initialize(m_BlocksBasePath, m_Configuration.MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1); - - if (std::filesystem::is_regular_file(IndexPath)) - { - uint32_t IndexVersion = 0; - m_LogFlushPosition = ReadIndexFile(IndexLock, IndexPath, IndexVersion); - if (IndexVersion == 0) - { - ZEN_WARN("removing invalid index file at '{}'", IndexPath); - std::filesystem::remove(IndexPath); - } - } - - uint64_t LogEntryCount = 0; - if (std::filesystem::is_regular_file(LogPath)) - { - if (TCasLogFile<DiskIndexEntry>::IsValid(LogPath)) - { - LogEntryCount = ReadLog(IndexLock, LogPath, m_LogFlushPosition); - } - else if (fs::is_regular_file(LogPath)) - { - ZEN_WARN("removing invalid log at '{}'", LogPath); - std::filesystem::remove(LogPath); - } - } - - m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); - - BlockStore::BlockIndexSet KnownBlocks; - for (const auto& Entry : m_Index) - { - size_t EntryIndex = Entry.second; - const BucketPayload& Payload = m_Payloads[EntryIndex]; - const DiskLocation& Location = Payload.Location; - - if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) - { - m_StandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed); - } - else - { - const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment); - KnownBlocks.Add(BlockLocation.BlockIndex); - } - } - m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); - - if (IsNew || LogEntryCount > 0) - { - WriteIndexSnapshot(IndexLock); - } -} - -void -ZenCacheDiskLayer::CacheBucket::BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const -{ - char HexString[sizeof(HashKey.Hash) * 2]; - ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString); - - Path.Append(m_BucketDir); - Path.AppendSeparator(); - Path.Append(L"blob"); - Path.AppendSeparator(); - Path.AppendAsciiRange(HexString, HexString + 3); - Path.AppendSeparator(); - Path.AppendAsciiRange(HexString + 3, HexString + 5); - Path.AppendSeparator(); - Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString)); -} - -IoBuffer -ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) const -{ - ZEN_TRACE_CPU("Z$::Bucket::GetInlineCacheValue"); - - BlockStoreLocation Location = Loc.GetBlockLocation(m_Configuration.PayloadAlignment); - - IoBuffer Value = m_BlockStore.TryGetChunk(Location); - if (Value) - { - Value.SetContentType(Loc.GetContentType()); - } - - return Value; -} - -IoBuffer -ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const -{ - ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue"); - - ExtendablePathBuilder<256> DataFilePath; - BuildPath(DataFilePath, HashKey); - - RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); - - if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath())) - { - Data.SetContentType(ContentType); - - return Data; - } - - return {}; -} - -bool -ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) -{ - ZEN_TRACE_CPU("Z$::Bucket::Get"); - - metrics::RequestStats::Scope StatsScope(m_GetOps, 0); - - RwLock::SharedLockScope IndexLock(m_IndexLock); - auto It = m_Index.find(HashKey); - if (It == m_Index.end()) - { - m_DiskMissCount++; - if (m_Configuration.MemCacheSizeThreshold > 0) - { - m_MemoryMissCount++; - } - return false; - } - - PayloadIndex EntryIndex = It.value(); - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - DiskLocation Location = m_Payloads[EntryIndex].Location; - - bool FillRawHashAndRawSize = (!Location.IsFlagSet(DiskLocation::kStructured)) && (Location.Size() > 0); - - const BucketPayload* Payload = &m_Payloads[EntryIndex]; - if (Payload->MetaData) - { - const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData]; - OutValue.RawHash = MetaData.RawHash; - OutValue.RawSize = MetaData.RawSize; - FillRawHashAndRawSize = false; - } - - if (Payload->MemCached) - { - OutValue.Value = m_MemCachedPayloads[Payload->MemCached].Payload; - Payload = nullptr; - IndexLock.ReleaseNow(); - m_MemoryHitCount++; - } - else - { - Payload = nullptr; - IndexLock.ReleaseNow(); - if (m_Configuration.MemCacheSizeThreshold > 0) - { - m_MemoryMissCount++; - } - if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) - { - OutValue.Value = GetStandaloneCacheValue(Location.GetContentType(), HashKey); - } - else - { - OutValue.Value = GetInlineCacheValue(Location); - if (m_Configuration.MemCacheSizeThreshold > 0) - { - size_t ValueSize = OutValue.Value.GetSize(); - if (OutValue.Value && ValueSize <= m_Configuration.MemCacheSizeThreshold) - { - ZEN_TRACE_CPU("Z$::Bucket::Get::MemCache"); - OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value); - RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock); - if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end()) - { - BucketPayload& WritePayload = m_Payloads[UpdateIt->second]; - // Only update if it has not already been updated by other thread - if (!WritePayload.MemCached) - { - SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value); - } - } - } - } - } - } - - if (FillRawHashAndRawSize) - { - ZEN_TRACE_CPU("Z$::Bucket::Get::MetaData"); - if (Location.IsFlagSet(DiskLocation::kCompressed)) - { - if (!CompressedBuffer::ValidateCompressedHeader(OutValue.Value, OutValue.RawHash, OutValue.RawSize)) - { - OutValue = ZenCacheValue{}; - m_DiskMissCount++; - return false; - } - } - else - { - OutValue.RawHash = IoHash::HashBuffer(OutValue.Value); - OutValue.RawSize = OutValue.Value.GetSize(); - } - RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock); - if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end()) - { - BucketPayload& WritePayload = m_Payloads[WriteIt.value()]; - - // Only set if no other path has already updated the meta data - if (!WritePayload.MetaData) - { - SetMetaData(UpdateIndexLock, WritePayload, {.RawSize = OutValue.RawSize, .RawHash = OutValue.RawHash}); - } - } - } - if (OutValue.Value) - { - m_DiskHitCount++; - StatsScope.SetBytes(OutValue.Value.GetSize()); - return true; - } - else - { - m_DiskMissCount++; - return false; - } -} - -void -ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) -{ - ZEN_TRACE_CPU("Z$::Bucket::Put"); - - metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); - - if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) - { - PutStandaloneCacheValue(HashKey, Value, References); - } - else - { - PutInlineCacheValue(HashKey, Value, References); - } - - m_DiskWriteCount++; -} - -uint64_t -ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime) -{ - ZEN_TRACE_CPU("Z$::Bucket::MemCacheTrim"); - - uint64_t Trimmed = 0; - GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); - - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size()); - if (MemCachedCount == 0) - { - return 0; - } - - uint32_t WriteIndex = 0; - for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex) - { - MemCacheData& Data = m_MemCachedPayloads[ReadIndex]; - if (!Data.Payload) - { - continue; - } - PayloadIndex Index = Data.OwnerIndex; - ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex)); - GcClock::Tick AccessTime = m_AccessTimes[Index]; - if (AccessTime < ExpireTicks) - { - size_t PayloadSize = Data.Payload.GetSize(); - RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); - Data = {}; - m_Payloads[Index].MemCached = {}; - Trimmed += PayloadSize; - continue; - } - if (ReadIndex > WriteIndex) - { - m_MemCachedPayloads[WriteIndex] = MemCacheData{.Payload = std::move(Data.Payload), .OwnerIndex = Index}; - m_Payloads[Index].MemCached = MemCachedIndex(WriteIndex); - } - WriteIndex++; - } - m_MemCachedPayloads.resize(WriteIndex); - m_MemCachedPayloads.shrink_to_fit(); - zen::Reset(m_FreeMemCachedPayloads); - return Trimmed; -} - -void -ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots) -{ - ZEN_TRACE_CPU("Z$::Bucket::GetUsageByAccess"); - - size_t SlotCount = InOutUsageSlots.capacity(); - RwLock::SharedLockScope _(m_IndexLock); - uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size()); - if (MemCachedCount == 0) - { - return; - } - for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex) - { - MemCacheData& Data = m_MemCachedPayloads[ReadIndex]; - if (!Data.Payload) - { - continue; - } - PayloadIndex Index = Data.OwnerIndex; - ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex)); - GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index])); - GcClock::Duration Age = Now > ItemAccessTime ? Now - ItemAccessTime : GcClock::Duration(0); - size_t Slot = Age < MaxAge ? gsl::narrow<size_t>((Age.count() * SlotCount) / MaxAge.count()) : (SlotCount - 1); - ZEN_ASSERT_SLOW(Slot < SlotCount); - if (Slot >= InOutUsageSlots.size()) - { - InOutUsageSlots.resize(Slot + 1, 0); - } - InOutUsageSlots[Slot] += EstimateMemCachePayloadMemory(Data.Payload.GetSize()); - } -} - -bool -ZenCacheDiskLayer::CacheBucket::Drop() -{ - ZEN_TRACE_CPU("Z$::Bucket::Drop"); - - RwLock::ExclusiveLockScope _(m_IndexLock); - - std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks; - ShardLocks.reserve(256); - for (RwLock& Lock : m_ShardedLocks) - { - ShardLocks.push_back(std::make_unique<RwLock::ExclusiveLockScope>(Lock)); - } - m_BlockStore.Close(); - m_SlogFile.Close(); - - bool Deleted = MoveAndDeleteDirectory(m_BucketDir); - - m_Index.clear(); - m_Payloads.clear(); - m_AccessTimes.clear(); - m_MetaDatas.clear(); - m_FreeMetaDatas.clear(); - m_MemCachedPayloads.clear(); - m_FreeMemCachedPayloads.clear(); - m_FirstReferenceIndex.clear(); - m_ReferenceHashes.clear(); - m_NextReferenceHashesIndexes.clear(); - m_ReferenceCount = 0; - m_StandaloneSize.store(0); - m_OuterCacheMemoryUsage.fetch_sub(m_MemCachedSize.load()); - m_MemCachedSize.store(0); - - return Deleted; -} - -void -ZenCacheDiskLayer::CacheBucket::Flush() -{ - ZEN_TRACE_CPU("Z$::Bucket::Flush"); - bool Expected = false; - if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) - { - return; - } - auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); - - ZEN_INFO("Flushing bucket {}", m_BucketDir); - - try - { - m_BlockStore.Flush(/*ForceNewBlock*/ false); - m_SlogFile.Flush(); - - SaveSnapshot(); - } - catch (std::exception& Ex) - { - ZEN_WARN("Failed to flush bucket in '{}'. Reason: '{}'", m_BucketDir, Ex.what()); - } -} - -void -ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc) -{ - ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot"); - try - { - bool UseLegacyScheme = false; - - IoBuffer Buffer; - BucketManifestSerializer ManifestWriter; - - if (UseLegacyScheme) - { - std::vector<AccessTime> AccessTimes; - std::vector<BucketPayload> Payloads; - std::vector<BucketMetaData> MetaDatas; - IndexMap Index; - - { - RwLock::SharedLockScope IndexLock(m_IndexLock); - WriteIndexSnapshot(IndexLock); - // Note: this copy could be eliminated on shutdown to - // reduce memory usage and execution time - Index = m_Index; - Payloads = m_Payloads; - AccessTimes = m_AccessTimes; - MetaDatas = m_MetaDatas; - } - - Buffer = ManifestWriter.MakeManifest(m_BucketId, - std::move(Index), - std::move(AccessTimes), - std::move(Payloads), - std::move(MetaDatas)); - const uint64_t RequiredSpace = Buffer.GetSize() + 1024 * 512; - - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); - if (Error) - { - ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); - return; - } - bool EnoughSpace = Space.Free >= RequiredSpace; - if (!EnoughSpace) - { - uint64_t ReclaimedSpace = ClaimDiskReserveFunc(); - EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace; - } - if (!EnoughSpace) - { - ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", - m_BucketDir, - NiceBytes(Buffer.GetSize())); - return; - } - } - else - { - RwLock::SharedLockScope IndexLock(m_IndexLock); - WriteIndexSnapshot(IndexLock); - const uint64_t EntryCount = m_Index.size(); - Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount); - uint64_t SidecarSize = ManifestWriter.GetSidecarSize(); - - const uint64_t RequiredSpace = SidecarSize + Buffer.GetSize() + 1024 * 512; - - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); - if (Error) - { - ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); - return; - } - bool EnoughSpace = Space.Free >= RequiredSpace; - if (!EnoughSpace) - { - uint64_t ReclaimedSpace = ClaimDiskReserveFunc(); - EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace; - } - if (!EnoughSpace) - { - ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", - m_BucketDir, - NiceBytes(Buffer.GetSize())); - return; - } - - ManifestWriter.WriteSidecarFile(IndexLock, - GetMetaPath(m_BucketDir, m_BucketName), - m_LogFlushPosition, - m_Index, - m_AccessTimes, - m_Payloads, - m_MetaDatas); - } - - std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); - WriteFile(ManifestPath, Buffer); - } - catch (std::exception& Err) - { - ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what()); - } -} - -IoHash -HashBuffer(const CompositeBuffer& Buffer) -{ - IoHashStream Hasher; - - for (const SharedBuffer& Segment : Buffer.GetSegments()) - { - Hasher.Append(Segment.GetView()); - } - - return Hasher.GetHash(); -} - -bool -ValidateCacheBucketEntryValue(ZenContentType ContentType, IoBuffer Buffer) -{ - ZEN_ASSERT_SLOW(Buffer.GetContentType() == ContentType); - - if (ContentType == ZenContentType::kCbObject) - { - CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); - - if (Error == CbValidateError::None) - { - return true; - } - - ZEN_SCOPED_ERROR("compact binary validation failed: '{}'", ToString(Error)); - - return false; - } - else if (ContentType == ZenContentType::kCompressedBinary) - { - IoBuffer MemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Buffer); - - IoHash HeaderRawHash; - uint64_t RawSize = 0; - if (!CompressedBuffer::ValidateCompressedHeader(MemoryBuffer, /* out */ HeaderRawHash, /* out */ RawSize)) - { - ZEN_SCOPED_ERROR("compressed buffer header validation failed"); - - return false; - } - - CompressedBuffer Compressed = - CompressedBuffer::FromCompressed(SharedBuffer(MemoryBuffer), /* out */ HeaderRawHash, /* out */ RawSize); - CompositeBuffer Decompressed = Compressed.DecompressToComposite(); - IoHash DecompressedHash = HashBuffer(Decompressed); - - if (HeaderRawHash != DecompressedHash) - { - ZEN_SCOPED_ERROR("decompressed hash {} differs from header hash {}", DecompressedHash, HeaderRawHash); - - return false; - } - } - else - { - // No way to verify this kind of content (what is it exactly?) - - static int Once = [&] { - ZEN_WARN("ValidateCacheBucketEntryValue called with unknown content type ({})", ToString(ContentType)); - return 42; - }(); - } - - return true; -}; - -void -ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) -{ - ZEN_TRACE_CPU("Z$::Bucket::Scrub"); - - ZEN_INFO("scrubbing '{}'", m_BucketDir); - - Stopwatch Timer; - uint64_t ChunkCount = 0; - uint64_t VerifiedChunkBytes = 0; - - auto LogStats = MakeGuard([&] { - const uint32_t DurationMs = gsl::narrow<uint32_t>(Timer.GetElapsedTimeMs()); - - ZEN_INFO("cache bucket '{}' scrubbed {}B in {} from {} chunks ({})", - m_BucketName, - NiceBytes(VerifiedChunkBytes), - NiceTimeSpanMs(DurationMs), - ChunkCount, - NiceRate(VerifiedChunkBytes, DurationMs)); - }); - - std::vector<IoHash> BadKeys; - auto ReportBadKey = [&](const IoHash& Key) { BadKeys.push_back(Key); }; - - try - { - std::vector<BlockStoreLocation> ChunkLocations; - std::vector<IoHash> ChunkIndexToChunkHash; - - RwLock::SharedLockScope _(m_IndexLock); - - const size_t BlockChunkInitialCount = m_Index.size() / 4; - ChunkLocations.reserve(BlockChunkInitialCount); - ChunkIndexToChunkHash.reserve(BlockChunkInitialCount); - - // Do a pass over the index and verify any standalone file values straight away - // all other storage classes are gathered and verified in bulk in order to enable - // more efficient I/O scheduling - - for (auto& Kv : m_Index) - { - const IoHash& HashKey = Kv.first; - const BucketPayload& Payload = m_Payloads[Kv.second]; - const DiskLocation& Loc = Payload.Location; - - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - Ctx.ThrowIfDeadlineExpired(); - - ++ChunkCount; - VerifiedChunkBytes += Loc.Size(); - - if (Loc.GetContentType() == ZenContentType::kBinary) - { - // Blob cache value, not much we can do about data integrity checking - // here since there's no hash available - ExtendablePathBuilder<256> DataFilePath; - BuildPath(DataFilePath, HashKey); - - RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); - - std::error_code Ec; - uintmax_t size = std::filesystem::file_size(DataFilePath.ToPath(), Ec); - if (Ec) - { - ReportBadKey(HashKey); - } - if (size != Loc.Size()) - { - ReportBadKey(HashKey); - } - continue; - } - else - { - // Structured cache value - IoBuffer Buffer = GetStandaloneCacheValue(Loc.GetContentType(), HashKey); - if (!Buffer) - { - ReportBadKey(HashKey); - continue; - } - if (!ValidateCacheBucketEntryValue(Loc.GetContentType(), Buffer)) - { - ReportBadKey(HashKey); - continue; - } - } - } - else - { - ChunkLocations.emplace_back(Loc.GetBlockLocation(m_Configuration.PayloadAlignment)); - ChunkIndexToChunkHash.push_back(HashKey); - continue; - } - } - - const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) -> void { - ++ChunkCount; - VerifiedChunkBytes += Size; - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - if (!Data) - { - // ChunkLocation out of range of stored blocks - ReportBadKey(Hash); - return; - } - if (!Size) - { - ReportBadKey(Hash); - return; - } - IoBuffer Buffer(IoBuffer::Wrap, Data, Size); - if (!Buffer) - { - ReportBadKey(Hash); - return; - } - const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; - ZenContentType ContentType = Payload.Location.GetContentType(); - Buffer.SetContentType(ContentType); - if (!ValidateCacheBucketEntryValue(ContentType, Buffer)) - { - ReportBadKey(Hash); - return; - } - }; - - const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) -> void { - Ctx.ThrowIfDeadlineExpired(); - - ++ChunkCount; - VerifiedChunkBytes += Size; - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); - if (!Buffer) - { - ReportBadKey(Hash); - return; - } - const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)]; - ZenContentType ContentType = Payload.Location.GetContentType(); - Buffer.SetContentType(ContentType); - if (!ValidateCacheBucketEntryValue(ContentType, Buffer)) - { - ReportBadKey(Hash); - return; - } - }; - - m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); - } - catch (ScrubDeadlineExpiredException&) - { - ZEN_INFO("Scrubbing deadline expired, operation incomplete"); - } - - Ctx.ReportScrubbed(ChunkCount, VerifiedChunkBytes); - - if (!BadKeys.empty()) - { - ZEN_WARN("Scrubbing found {} bad chunks in '{}'", BadKeys.size(), m_BucketDir); - - if (Ctx.RunRecovery()) - { - // Deal with bad chunks by removing them from our lookup map - - std::vector<DiskIndexEntry> LogEntries; - LogEntries.reserve(BadKeys.size()); - - { - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - for (const IoHash& BadKey : BadKeys) - { - // Log a tombstone and delete the in-memory index for the bad entry - const auto It = m_Index.find(BadKey); - BucketPayload& Payload = m_Payloads[It->second]; - if (m_Configuration.EnableReferenceCaching) - { - RemoveReferences(IndexLock, m_FirstReferenceIndex[It->second]); - } - DiskLocation Location = Payload.Location; - if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) - { - m_StandaloneSize.fetch_sub(Location.Size(), std::memory_order::relaxed); - } - - RemoveMemCachedData(IndexLock, Payload); - RemoveMetaData(IndexLock, Payload); - - Location.Flags |= DiskLocation::kTombStone; - LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location}); - m_Index.erase(BadKey); - } - } - for (const DiskIndexEntry& Entry : LogEntries) - { - if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) - { - ExtendablePathBuilder<256> Path; - BuildPath(Path, Entry.Key); - fs::path FilePath = Path.ToPath(); - RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key)); - if (fs::is_regular_file(FilePath)) - { - ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8()); - std::error_code Ec; - fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file... - } - } - } - m_SlogFile.Append(LogEntries); - - // Clean up m_AccessTimes and m_Payloads vectors - { - std::vector<BucketPayload> Payloads; - std::vector<AccessTime> AccessTimes; - std::vector<BucketMetaData> MetaDatas; - std::vector<MemCacheData> MemCachedPayloads; - std::vector<ReferenceIndex> FirstReferenceIndex; - IndexMap Index; - - { - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); - } - } - } - } - - // Let whomever it concerns know about the bad chunks. This could - // be used to invalidate higher level data structures more efficiently - // than a full validation pass might be able to do - if (!BadKeys.empty()) - { - Ctx.ReportBadCidChunks(BadKeys); - } -} - -void -ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("Z$::Bucket::GatherReferences"); - -#define CALCULATE_BLOCKING_TIME 0 - -#if CALCULATE_BLOCKING_TIME - uint64_t WriteBlockTimeUs = 0; - uint64_t WriteBlockLongestTimeUs = 0; - uint64_t ReadBlockTimeUs = 0; - uint64_t ReadBlockLongestTimeUs = 0; -#endif // CALCULATE_BLOCKING_TIME - - Stopwatch TotalTimer; - const auto _ = MakeGuard([&] { -#if CALCULATE_BLOCKING_TIME - ZEN_DEBUG("gathered references from '{}' in {} write lock: {} ({}), read lock: {} ({})", - m_BucketDir, - NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), - NiceLatencyNs(WriteBlockTimeUs), - NiceLatencyNs(WriteBlockLongestTimeUs), - NiceLatencyNs(ReadBlockTimeUs), - NiceLatencyNs(ReadBlockLongestTimeUs)); -#else - ZEN_DEBUG("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs())); -#endif // CALCULATE_BLOCKING_TIME - }); - - const GcClock::TimePoint ExpireTime = GcCtx.CacheExpireTime(); - - const GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count(); - - IndexMap Index; - std::vector<AccessTime> AccessTimes; - std::vector<BucketPayload> Payloads; - std::vector<ReferenceIndex> FirstReferenceIndex; - { - RwLock::SharedLockScope __(m_IndexLock); -#if CALCULATE_BLOCKING_TIME - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); -#endif // CALCULATE_BLOCKING_TIME - if (m_Index.empty()) - { - return; - } - Index = m_Index; - AccessTimes = m_AccessTimes; - Payloads = m_Payloads; - FirstReferenceIndex = m_FirstReferenceIndex; - } - - std::vector<IoHash> ExpiredKeys; - ExpiredKeys.reserve(1024); - - std::vector<IoHash> Cids; - if (!GcCtx.SkipCid()) - { - Cids.reserve(1024); - } - - std::vector<std::pair<IoHash, size_t>> StructuredItemsWithUnknownAttachments; - - for (const auto& Entry : Index) - { - const IoHash& Key = Entry.first; - size_t PayloadIndex = Entry.second; - GcClock::Tick AccessTime = AccessTimes[PayloadIndex]; - if (AccessTime < ExpireTicks) - { - ExpiredKeys.push_back(Key); - continue; - } - - if (GcCtx.SkipCid()) - { - continue; - } - - BucketPayload& Payload = Payloads[PayloadIndex]; - const DiskLocation& Loc = Payload.Location; - - if (!Loc.IsFlagSet(DiskLocation::kStructured)) - { - continue; - } - if (m_Configuration.EnableReferenceCaching) - { - if (FirstReferenceIndex.empty() || FirstReferenceIndex[PayloadIndex] == ReferenceIndex::Unknown()) - { - StructuredItemsWithUnknownAttachments.push_back(Entry); - continue; - } - - bool ReferencesAreKnown = false; - { - RwLock::SharedLockScope IndexLock(m_IndexLock); -#if CALCULATE_BLOCKING_TIME - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); -#endif // CALCULATE_BLOCKING_TIME - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - ReferencesAreKnown = GetReferences(IndexLock, m_FirstReferenceIndex[It->second], Cids); - } - } - if (ReferencesAreKnown) - { - if (Cids.size() >= 1024) - { - GcCtx.AddRetainedCids(Cids); - Cids.clear(); - } - continue; - } - } - StructuredItemsWithUnknownAttachments.push_back(Entry); - } - - for (const auto& Entry : StructuredItemsWithUnknownAttachments) - { - const IoHash& Key = Entry.first; - BucketPayload& Payload = Payloads[Entry.second]; - const DiskLocation& Loc = Payload.Location; - { - IoBuffer Buffer; - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - if (Buffer = GetStandaloneCacheValue(Loc.GetContentType(), Key); !Buffer) - { - continue; - } - } - else - { - RwLock::SharedLockScope IndexLock(m_IndexLock); -#if CALCULATE_BLOCKING_TIME - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); -#endif // CALCULATE_BLOCKING_TIME - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - const BucketPayload& CachedPayload = m_Payloads[It->second]; - if (CachedPayload.MemCached) - { - Buffer = m_MemCachedPayloads[CachedPayload.MemCached].Payload; - ZEN_ASSERT_SLOW(Buffer); - } - else - { - DiskLocation Location = m_Payloads[It->second].Location; - IndexLock.ReleaseNow(); - Buffer = GetInlineCacheValue(Location); - // Don't memcache items when doing GC - } - } - if (!Buffer) - { - continue; - } - } - - ZEN_ASSERT(Buffer); - ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject); - CbObjectView Obj(Buffer.GetData()); - size_t CurrentCidCount = Cids.size(); - Obj.IterateAttachments([&Cids](CbFieldView Field) { Cids.push_back(Field.AsAttachment()); }); - if (m_Configuration.EnableReferenceCaching) - { - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); -#if CALCULATE_BLOCKING_TIME - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); -#endif // CALCULATE_BLOCKING_TIME - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - if (m_FirstReferenceIndex[It->second] == ReferenceIndex::Unknown()) - { - SetReferences(IndexLock, - m_FirstReferenceIndex[It->second], - std::span<IoHash>(Cids.data() + CurrentCidCount, Cids.size() - CurrentCidCount)); - } - else - { - Cids.resize(CurrentCidCount); - (void)GetReferences(IndexLock, m_FirstReferenceIndex[It->second], Cids); - } - } - } - if (Cids.size() >= 1024) - { - GcCtx.AddRetainedCids(Cids); - Cids.clear(); - } - } - } - - GcCtx.AddRetainedCids(Cids); - GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys)); -} - -void -ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage"); - - ZEN_DEBUG("collecting garbage from '{}'", m_BucketDir); - - Stopwatch TotalTimer; - uint64_t WriteBlockTimeUs = 0; - uint64_t WriteBlockLongestTimeUs = 0; - uint64_t ReadBlockTimeUs = 0; - uint64_t ReadBlockLongestTimeUs = 0; - uint64_t TotalChunkCount = 0; - uint64_t DeletedSize = 0; - GcStorageSize OldTotalSize = StorageSize(); - - std::unordered_set<IoHash> DeletedChunks; - uint64_t MovedCount = 0; - - const auto _ = MakeGuard([&] { - ZEN_DEBUG( - "garbage collect from '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved " - "{} " - "of {} " - "entries ({}/{}).", - m_BucketDir, - NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), - NiceLatencyNs(WriteBlockTimeUs), - NiceLatencyNs(WriteBlockLongestTimeUs), - NiceLatencyNs(ReadBlockTimeUs), - NiceLatencyNs(ReadBlockLongestTimeUs), - NiceBytes(DeletedSize), - DeletedChunks.size(), - MovedCount, - TotalChunkCount, - NiceBytes(OldTotalSize.DiskSize), - NiceBytes(OldTotalSize.MemorySize)); - - bool Expected = false; - if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) - { - return; - } - auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); - - try - { - SaveSnapshot([&]() { return GcCtx.ClaimGCReserve(); }); - } - catch (std::exception& Ex) - { - ZEN_WARN("Failed to write index and manifest after GC in '{}'. Reason: '{}'", m_BucketDir, Ex.what()); - } - }); - - auto __ = MakeGuard([&]() { - if (!DeletedChunks.empty()) - { - // Clean up m_AccessTimes and m_Payloads vectors - std::vector<BucketPayload> Payloads; - std::vector<AccessTime> AccessTimes; - std::vector<BucketMetaData> MetaDatas; - std::vector<MemCacheData> MemCachedPayloads; - std::vector<ReferenceIndex> FirstReferenceIndex; - IndexMap Index; - { - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); - } - GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end())); - } - }); - - std::span<const IoHash> ExpiredCacheKeySpan = GcCtx.ExpiredCacheKeys(m_BucketDir.string()); - if (ExpiredCacheKeySpan.empty()) - { - return; - } - - m_SlogFile.Flush(); - - std::unordered_set<IoHash, IoHash::Hasher> ExpiredCacheKeys(ExpiredCacheKeySpan.begin(), ExpiredCacheKeySpan.end()); - - std::vector<DiskIndexEntry> ExpiredStandaloneEntries; - IndexMap IndexSnapshot; - std::vector<BucketPayload> PayloadsSnapshot; - BlockStore::ReclaimSnapshotState BlockStoreState; - { - bool Expected = false; - if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) - { - ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is currently flushing", m_BucketDir); - return; - } - auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); - - { - ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::State"); - RwLock::SharedLockScope IndexLock(m_IndexLock); - - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - - BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); - - for (const IoHash& Key : ExpiredCacheKeys) - { - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - const BucketPayload& Payload = m_Payloads[It->second]; - if (Payload.Location.Flags & DiskLocation::kStandaloneFile) - { - DiskIndexEntry Entry = {.Key = Key, .Location = Payload.Location}; - Entry.Location.Flags |= DiskLocation::kTombStone; - ExpiredStandaloneEntries.push_back(Entry); - } - } - } - - PayloadsSnapshot = m_Payloads; - IndexSnapshot = m_Index; - - if (GcCtx.IsDeletionMode()) - { - IndexLock.ReleaseNow(); - RwLock::ExclusiveLockScope __(m_IndexLock); - for (const auto& Entry : ExpiredStandaloneEntries) - { - if (m_Index.erase(Entry.Key) == 1) - { - m_StandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); - DeletedChunks.insert(Entry.Key); - } - } - m_SlogFile.Append(ExpiredStandaloneEntries); - } - } - } - - if (GcCtx.IsDeletionMode()) - { - ZEN_TRACE_CPU("Z$::Bucket::CollectGarbage::Delete"); - - ExtendablePathBuilder<256> Path; - - for (const auto& Entry : ExpiredStandaloneEntries) - { - const IoHash& Key = Entry.Key; - - Path.Reset(); - BuildPath(Path, Key); - fs::path FilePath = Path.ToPath(); - - { - RwLock::SharedLockScope IndexLock(m_IndexLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - if (m_Index.contains(Key)) - { - // Someone added it back, let the file on disk be - ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); - continue; - } - IndexLock.ReleaseNow(); - - RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); - if (fs::is_regular_file(FilePath)) - { - ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); - std::error_code Ec; - fs::remove(FilePath, Ec); - if (Ec) - { - ZEN_WARN("delete expired z$ standalone file '{}' FAILED, reason: '{}'", Path.ToUtf8(), Ec.message()); - continue; - } - } - } - DeletedSize += Entry.Location.Size(); - } - } - - TotalChunkCount = IndexSnapshot.size(); - - std::vector<BlockStoreLocation> ChunkLocations; - BlockStore::ChunkIndexArray KeepChunkIndexes; - std::vector<IoHash> ChunkIndexToChunkHash; - ChunkLocations.reserve(TotalChunkCount); - ChunkLocations.reserve(TotalChunkCount); - ChunkIndexToChunkHash.reserve(TotalChunkCount); - { - TotalChunkCount = 0; - for (const auto& Entry : IndexSnapshot) - { - size_t EntryIndex = Entry.second; - const DiskLocation& DiskLocation = PayloadsSnapshot[EntryIndex].Location; - - if (DiskLocation.Flags & DiskLocation::kStandaloneFile) - { - continue; - } - const IoHash& Key = Entry.first; - BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment); - size_t ChunkIndex = ChunkLocations.size(); - ChunkLocations.push_back(Location); - ChunkIndexToChunkHash.push_back(Key); - if (ExpiredCacheKeys.contains(Key)) - { - continue; - } - KeepChunkIndexes.push_back(ChunkIndex); - } - } - TotalChunkCount = ChunkLocations.size(); - size_t DeleteCount = TotalChunkCount - KeepChunkIndexes.size(); - - const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); - if (!PerformDelete) - { - m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_Configuration.PayloadAlignment, true); - GcStorageSize CurrentTotalSize = StorageSize(); - ZEN_DEBUG("garbage collect from '{}' DISABLED, found {} chunks of total {} ({}/{})", - m_BucketDir, - DeleteCount, - TotalChunkCount, - NiceBytes(CurrentTotalSize.DiskSize), - NiceBytes(CurrentTotalSize.MemorySize)); - return; - } - - m_BlockStore.ReclaimSpace( - BlockStoreState, - ChunkLocations, - KeepChunkIndexes, - m_Configuration.PayloadAlignment, - false, - [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { - std::vector<DiskIndexEntry> LogEntries; - LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); - { - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - for (const auto& Entry : MovedChunks) - { - size_t ChunkIndex = Entry.first; - const BlockStoreLocation& NewLocation = Entry.second; - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - size_t EntryIndex = m_Index[ChunkHash]; - BucketPayload& Payload = m_Payloads[EntryIndex]; - if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != m_Payloads[EntryIndex].Location) - { - // Entry has been updated while GC was running, ignore the move - continue; - } - Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags()); - LogEntries.push_back({.Key = ChunkHash, .Location = Payload.Location}); - } - for (const size_t ChunkIndex : RemovedChunks) - { - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - size_t EntryIndex = m_Index[ChunkHash]; - BucketPayload& Payload = m_Payloads[EntryIndex]; - if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != Payload.Location) - { - // Entry has been updated while GC was running, ignore the delete - continue; - } - const DiskLocation& OldDiskLocation = Payload.Location; - LogEntries.push_back({.Key = ChunkHash, - .Location = DiskLocation(OldDiskLocation.GetBlockLocation(m_Configuration.PayloadAlignment), - m_Configuration.PayloadAlignment, - OldDiskLocation.GetFlags() | DiskLocation::kTombStone)}); - - RemoveMemCachedData(IndexLock, Payload); - RemoveMetaData(IndexLock, Payload); - - m_Index.erase(ChunkHash); - DeletedChunks.insert(ChunkHash); - } - } - - m_SlogFile.Append(LogEntries); - m_SlogFile.Flush(); - }, - [&]() { return GcCtx.ClaimGCReserve(); }); -} - -ZenCacheDiskLayer::BucketStats -ZenCacheDiskLayer::CacheBucket::Stats() -{ - GcStorageSize Size = StorageSize(); - return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize, - .MemorySize = Size.MemorySize, - .DiskHitCount = m_DiskHitCount, - .DiskMissCount = m_DiskMissCount, - .DiskWriteCount = m_DiskWriteCount, - .MemoryHitCount = m_MemoryHitCount, - .MemoryMissCount = m_MemoryMissCount, - .MemoryWriteCount = m_MemoryWriteCount, - .PutOps = m_PutOps.Snapshot(), - .GetOps = m_GetOps.Snapshot()}; -} - -uint64_t -ZenCacheDiskLayer::CacheBucket::EntryCount() const -{ - RwLock::SharedLockScope _(m_IndexLock); - return static_cast<uint64_t>(m_Index.size()); -} - -CacheValueDetails::ValueDetails -ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLock, const IoHash& Key, PayloadIndex Index) const -{ - std::vector<IoHash> Attachments; - const BucketPayload& Payload = m_Payloads[Index]; - if (Payload.Location.IsFlagSet(DiskLocation::kStructured)) - { - IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) - ? GetStandaloneCacheValue(Payload.Location.GetContentType(), Key) - : GetInlineCacheValue(Payload.Location); - CbObjectView Obj(Value.GetData()); - Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); }); - } - BucketMetaData MetaData = GetMetaData(IndexLock, Payload); - return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(), - .RawSize = MetaData.RawSize, - .RawHash = MetaData.RawHash, - .LastAccess = m_AccessTimes[Index], - .Attachments = std::move(Attachments), - .ContentType = Payload.Location.GetContentType()}; -} - -CacheValueDetails::BucketDetails -ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLock, const std::string_view ValueFilter) const -{ - CacheValueDetails::BucketDetails Details; - RwLock::SharedLockScope _(m_IndexLock); - if (ValueFilter.empty()) - { - Details.Values.reserve(m_Index.size()); - for (const auto& It : m_Index) - { - Details.Values.insert_or_assign(It.first, GetValueDetails(IndexLock, It.first, It.second)); - } - } - else - { - IoHash Key = IoHash::FromHexString(ValueFilter); - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - Details.Values.insert_or_assign(It->first, GetValueDetails(IndexLock, It->first, It->second)); - } - } - return Details; -} - -void -ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents( - std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const -{ - RwLock::SharedLockScope IndexLock(m_IndexLock); - for (const auto& It : m_Index) - { - CacheValueDetails::ValueDetails Vd = GetValueDetails(IndexLock, It.first, It.second); - - Fn(It.first, Vd); - } -} - -void -ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("Z$::CollectGarbage"); - - std::vector<CacheBucket*> Buckets; - { - RwLock::SharedLockScope _(m_Lock); - Buckets.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - Buckets.push_back(Kv.second.get()); - } - } - for (CacheBucket* Bucket : Buckets) - { - Bucket->CollectGarbage(GcCtx); - } - if (!m_IsMemCacheTrimming) - { - MemCacheTrim(Buckets, GcCtx.CacheExpireTime()); - } -} - -void -ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) -{ - ZEN_TRACE_CPU("Z$::Bucket::PutStandaloneCacheValue"); - - uint64_t NewFileSize = Value.Value.Size(); - - TemporaryFile DataFile; - - std::error_code Ec; - DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); - if (Ec) - { - throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir)); - } - - bool CleanUpTempFile = true; - auto __ = MakeGuard([&] { - if (CleanUpTempFile) - { - std::error_code Ec; - std::filesystem::remove(DataFile.GetPath(), Ec); - if (Ec) - { - ZEN_WARN("Failed to clean up temporary file '{}' for put in '{}', reason '{}'", - DataFile.GetPath(), - m_BucketDir, - Ec.message()); - } - } - }); - - DataFile.WriteAll(Value.Value, Ec); - if (Ec) - { - throw std::system_error(Ec, - fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'", - NiceBytes(NewFileSize), - DataFile.GetPath().string(), - m_BucketDir)); - } - - ExtendablePathBuilder<256> DataFilePath; - BuildPath(DataFilePath, HashKey); - std::filesystem::path FsPath{DataFilePath.ToPath()}; - - RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); - - // We do a speculative remove of the file instead of probing with a exists call and check the error code instead - std::filesystem::remove(FsPath, Ec); - if (Ec) - { - if (Ec.value() != ENOENT) - { - ZEN_WARN("Failed to remove file '{}' for put in '{}', reason: '{}', retrying.", FsPath, m_BucketDir, Ec.message()); - Sleep(100); - Ec.clear(); - std::filesystem::remove(FsPath, Ec); - if (Ec && Ec.value() != ENOENT) - { - throw std::system_error(Ec, fmt::format("Failed to remove file '{}' for put in '{}'", FsPath, m_BucketDir)); - } - } - } - - // Assume parent directory exists - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); - if (Ec) - { - CreateDirectories(FsPath.parent_path()); - - // Try again after we or someone else created the directory - Ec.clear(); - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); - - // Retry if we still fail to handle contention to file system - uint32_t RetriesLeft = 3; - while (Ec && RetriesLeft > 0) - { - ZEN_WARN("Failed to finalize file '{}', moving from '{}' for put in '{}', reason: '{}', retries left: {}.", - FsPath, - DataFile.GetPath(), - m_BucketDir, - Ec.message(), - RetriesLeft); - Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms - Ec.clear(); - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); - RetriesLeft--; - } - if (Ec) - { - throw std::system_error( - Ec, - fmt::format("Failed to finalize file '{}', moving from '{}' for put in '{}'", FsPath, DataFile.GetPath(), m_BucketDir)); - } - } - - // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file - // will be disabled as the file handle has already been closed - CleanUpTempFile = false; - - uint8_t EntryFlags = DiskLocation::kStandaloneFile; - - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - EntryFlags |= DiskLocation::kCompressed; - } - - DiskLocation Loc(NewFileSize, EntryFlags); - - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - ValueLock.ReleaseNow(); - if (m_UpdatedKeys) - { - m_UpdatedKeys->insert(HashKey); - } - - PayloadIndex EntryIndex = {}; - if (auto It = m_Index.find(HashKey); It == m_Index.end()) - { - // Previously unknown object - EntryIndex = PayloadIndex(m_Payloads.size()); - m_Payloads.emplace_back(BucketPayload{.Location = Loc}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - if (m_Configuration.EnableReferenceCaching) - { - m_FirstReferenceIndex.emplace_back(ReferenceIndex{}); - SetReferences(IndexLock, m_FirstReferenceIndex.back(), References); - } - m_Index.insert_or_assign(HashKey, EntryIndex); - } - else - { - EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size())); - BucketPayload& Payload = m_Payloads[EntryIndex]; - uint64_t OldSize = Payload.Location.Size(); - Payload = BucketPayload{.Location = Loc}; - if (m_Configuration.EnableReferenceCaching) - { - SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References); - } - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - RemoveMemCachedData(IndexLock, Payload); - m_StandaloneSize.fetch_sub(OldSize, std::memory_order::relaxed); - } - if (Value.RawSize != 0 || Value.RawHash != IoHash::Zero) - { - SetMetaData(IndexLock, m_Payloads[EntryIndex], {.RawSize = Value.RawSize, .RawHash = Value.RawHash}); - } - else - { - RemoveMetaData(IndexLock, m_Payloads[EntryIndex]); - } - - m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_StandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed); -} - -void -ZenCacheDiskLayer::CacheBucket::SetMetaData(RwLock::ExclusiveLockScope&, - BucketPayload& Payload, - const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData) -{ - if (Payload.MetaData) - { - m_MetaDatas[Payload.MetaData] = MetaData; - } - else - { - if (m_FreeMetaDatas.empty()) - { - Payload.MetaData = MetaDataIndex(m_MetaDatas.size()); - m_MetaDatas.emplace_back(MetaData); - } - else - { - Payload.MetaData = m_FreeMetaDatas.back(); - m_FreeMetaDatas.pop_back(); - m_MetaDatas[Payload.MetaData] = MetaData; - } - } -} - -void -ZenCacheDiskLayer::CacheBucket::RemoveMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload) -{ - if (Payload.MetaData) - { - m_FreeMetaDatas.push_back(Payload.MetaData); - Payload.MetaData = {}; - } -} - -void -ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData) -{ - BucketPayload& Payload = m_Payloads[PayloadIndex]; - uint64_t PayloadSize = MemCachedData.GetSize(); - ZEN_ASSERT(PayloadSize != 0); - if (m_FreeMemCachedPayloads.empty()) - { - if (m_MemCachedPayloads.size() != std::numeric_limits<uint32_t>::max()) - { - Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(m_MemCachedPayloads.size())); - m_MemCachedPayloads.emplace_back(MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex}); - AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); - m_MemoryWriteCount++; - } - } - else - { - Payload.MemCached = m_FreeMemCachedPayloads.back(); - m_FreeMemCachedPayloads.pop_back(); - m_MemCachedPayloads[Payload.MemCached] = MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex}; - AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); - m_MemoryWriteCount++; - } -} - -size_t -ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload) -{ - if (Payload.MemCached) - { - size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].Payload.GetSize(); - RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize)); - m_MemCachedPayloads[Payload.MemCached] = {}; - m_FreeMemCachedPayloads.push_back(Payload.MemCached); - Payload.MemCached = {}; - return PayloadSize; - } - return 0; -} - -ZenCacheDiskLayer::CacheBucket::BucketMetaData -ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const BucketPayload& Payload) const -{ - if (Payload.MetaData) - { - return m_MetaDatas[Payload.MetaData]; - } - return {}; -} - -void -ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) -{ - ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue"); - - uint8_t EntryFlags = 0; - - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - EntryFlags |= DiskLocation::kCompressed; - } - - m_BlockStore.WriteChunk(Value.Value.Data(), - Value.Value.Size(), - m_Configuration.PayloadAlignment, - [&](const BlockStoreLocation& BlockStoreLocation) { - DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags); - m_SlogFile.Append({.Key = HashKey, .Location = Location}); - - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - if (m_UpdatedKeys) - { - m_UpdatedKeys->insert(HashKey); - } - if (auto It = m_Index.find(HashKey); It != m_Index.end()) - { - PayloadIndex EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size())); - BucketPayload& Payload = m_Payloads[EntryIndex]; - - RemoveMemCachedData(IndexLock, Payload); - RemoveMetaData(IndexLock, Payload); - - Payload = (BucketPayload{.Location = Location}); - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - - if (m_Configuration.EnableReferenceCaching) - { - SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References); - } - } - else - { - PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); - m_Payloads.emplace_back(BucketPayload{.Location = Location}); - m_AccessTimes.emplace_back(GcClock::TickCount()); - if (m_Configuration.EnableReferenceCaching) - { - m_FirstReferenceIndex.emplace_back(ReferenceIndex{}); - SetReferences(IndexLock, m_FirstReferenceIndex.back(), References); - } - m_Index.insert_or_assign(HashKey, EntryIndex); - } - }); -} - -std::string -ZenCacheDiskLayer::CacheBucket::GetGcName(GcCtx&) -{ - return fmt::format("cachebucket:'{}'", m_BucketDir.string()); -} - -class DiskBucketStoreCompactor : public GcStoreCompactor -{ -public: - DiskBucketStoreCompactor(ZenCacheDiskLayer::CacheBucket& Bucket, std::vector<std::pair<IoHash, uint64_t>>&& ExpiredStandaloneKeys) - : m_Bucket(Bucket) - , m_ExpiredStandaloneKeys(std::move(ExpiredStandaloneKeys)) - { - m_ExpiredStandaloneKeys.shrink_to_fit(); - } - - virtual ~DiskBucketStoreCompactor() {} - - virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override - { - ZEN_TRACE_CPU("Z$::Bucket::CompactStore"); - - Stopwatch Timer; - const auto _ = MakeGuard([&] { - Reset(m_ExpiredStandaloneKeys); - if (!Ctx.Settings.Verbose) - { - return; - } - ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': RemovedDisk: {} in {}", - m_Bucket.m_BucketDir, - NiceBytes(Stats.RemovedDisk), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - if (!m_ExpiredStandaloneKeys.empty()) - { - // Compact standalone items - size_t Skipped = 0; - ExtendablePathBuilder<256> Path; - for (const std::pair<IoHash, uint64_t>& ExpiredKey : m_ExpiredStandaloneKeys) - { - if (Ctx.IsCancelledFlag.load()) - { - return; - } - Path.Reset(); - m_Bucket.BuildPath(Path, ExpiredKey.first); - fs::path FilePath = Path.ToPath(); - - RwLock::SharedLockScope IndexLock(m_Bucket.m_IndexLock); - if (m_Bucket.m_Index.contains(ExpiredKey.first)) - { - // Someone added it back, let the file on disk be - ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipping z$ delete standalone of file '{}' FAILED, it has been added back", - m_Bucket.m_BucketDir, - Path.ToUtf8()); - continue; - } - - if (Ctx.Settings.IsDeleteMode) - { - RwLock::ExclusiveLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first)); - IndexLock.ReleaseNow(); - ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': deleting standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8()); - - std::error_code Ec; - if (!fs::remove(FilePath, Ec)) - { - continue; - } - if (Ec) - { - ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': delete expired z$ standalone file '{}' FAILED, reason: '{}'", - m_Bucket.m_BucketDir, - Path.ToUtf8(), - Ec.message()); - continue; - } - Stats.RemovedDisk += ExpiredKey.second; - } - else - { - RwLock::SharedLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first)); - IndexLock.ReleaseNow(); - ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': checking standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8()); - - std::error_code Ec; - bool Existed = std::filesystem::is_regular_file(FilePath, Ec); - if (Ec) - { - ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': failed checking cache payload file '{}'. Reason '{}'", - m_Bucket.m_BucketDir, - FilePath, - Ec.message()); - continue; - } - if (!Existed) - { - continue; - } - Skipped++; - } - } - if (Skipped > 0) - { - ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipped deleting of {} eligible files", m_Bucket.m_BucketDir, Skipped); - } - } - - if (Ctx.Settings.CollectSmallObjects) - { - m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys = std::make_unique<HashSet>(); }); - auto __ = MakeGuard([&]() { m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys.reset(); }); }); - - size_t InlineEntryCount = 0; - BlockStore::BlockUsageMap BlockUsage; - { - RwLock::SharedLockScope ___(m_Bucket.m_IndexLock); - for (const auto& Entry : m_Bucket.m_Index) - { - ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second; - const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[Index]; - const DiskLocation& Loc = Payload.Location; - - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - continue; - } - InlineEntryCount++; - uint32_t BlockIndex = Loc.Location.BlockLocation.GetBlockIndex(); - uint64_t ChunkSize = RoundUp(Loc.Size(), m_Bucket.m_Configuration.PayloadAlignment); - if (auto It = BlockUsage.find(BlockIndex); It != BlockUsage.end()) - { - It->second.EntryCount++; - It->second.DiskUsage += ChunkSize; - } - else - { - BlockUsage.insert_or_assign(BlockIndex, BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1}); - } - } - } - - { - BlockStoreCompactState BlockCompactState; - std::vector<IoHash> BlockCompactStateKeys; - BlockCompactStateKeys.reserve(InlineEntryCount); - - BlockStore::BlockEntryCountMap BlocksToCompact = - m_Bucket.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent); - BlockCompactState.IncludeBlocks(BlocksToCompact); - - if (BlocksToCompact.size() > 0) - { - { - RwLock::SharedLockScope ___(m_Bucket.m_IndexLock); - for (const auto& Entry : m_Bucket.m_Index) - { - ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second; - const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[Index]; - const DiskLocation& Loc = Payload.Location; - - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - continue; - } - if (!BlockCompactState.AddKeepLocation(Loc.GetBlockLocation(m_Bucket.m_Configuration.PayloadAlignment))) - { - continue; - } - BlockCompactStateKeys.push_back(Entry.first); - } - } - - if (Ctx.Settings.IsDeleteMode) - { - if (Ctx.Settings.Verbose) - { - ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': compacting {} blocks", - m_Bucket.m_BucketDir, - BlocksToCompact.size()); - } - - m_Bucket.m_BlockStore.CompactBlocks( - BlockCompactState, - m_Bucket.m_Configuration.PayloadAlignment, - [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) { - std::vector<DiskIndexEntry> MovedEntries; - MovedEntries.reserve(MovedArray.size()); - RwLock::ExclusiveLockScope _(m_Bucket.m_IndexLock); - for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray) - { - size_t ChunkIndex = Moved.first; - const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; - - if (m_Bucket.m_UpdatedKeys->contains(Key)) - { - continue; - } - - if (auto It = m_Bucket.m_Index.find(Key); It != m_Bucket.m_Index.end()) - { - ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[It->second]; - const BlockStoreLocation& NewLocation = Moved.second; - Payload.Location = DiskLocation(NewLocation, - m_Bucket.m_Configuration.PayloadAlignment, - Payload.Location.GetFlags()); - MovedEntries.push_back({.Key = Key, .Location = Payload.Location}); - } - } - m_Bucket.m_SlogFile.Append(MovedEntries); - Stats.RemovedDisk += FreedDiskSpace; - if (Ctx.IsCancelledFlag.load()) - { - return false; - } - return true; - }, - ClaimDiskReserveCallback); - } - else - { - if (Ctx.Settings.Verbose) - { - ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': skipped compacting of {} eligible blocks", - m_Bucket.m_BucketDir, - BlocksToCompact.size()); - } - } - } - } - } - } - -private: - ZenCacheDiskLayer::CacheBucket& m_Bucket; - std::vector<std::pair<IoHash, uint64_t>> m_ExpiredStandaloneKeys; -}; - -GcStoreCompactor* -ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) -{ - ZEN_TRACE_CPU("Z$::Bucket::RemoveExpiredData"); - - size_t TotalEntries = 0; - - Stopwatch Timer; - const auto _ = MakeGuard([&] { - if (!Ctx.Settings.Verbose) - { - return; - } - ZEN_INFO("GCV2: cachebucket [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, FreedMemory: {} in {}", - m_BucketDir, - Stats.CheckedCount, - Stats.FoundCount, - Stats.DeletedCount, - NiceBytes(Stats.FreedMemory), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - const GcClock::Tick ExpireTicks = Ctx.Settings.CacheExpireTime.time_since_epoch().count(); - - std::vector<DiskIndexEntry> ExpiredEntries; - std::vector<std::pair<IoHash, uint64_t>> ExpiredStandaloneKeys; - uint64_t RemovedStandaloneSize = 0; - { - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - if (Ctx.IsCancelledFlag.load()) - { - return nullptr; - } - if (m_Index.empty()) - { - return nullptr; - } - - TotalEntries = m_Index.size(); - - // Find out expired keys - for (const auto& Entry : m_Index) - { - const IoHash& Key = Entry.first; - PayloadIndex EntryIndex = Entry.second; - GcClock::Tick AccessTime = m_AccessTimes[EntryIndex]; - if (AccessTime >= ExpireTicks) - { - continue; - } - - const BucketPayload& Payload = m_Payloads[EntryIndex]; - DiskIndexEntry ExpiredEntry = {.Key = Key, .Location = Payload.Location}; - ExpiredEntry.Location.Flags |= DiskLocation::kTombStone; - - if (Payload.Location.Flags & DiskLocation::kStandaloneFile) - { - ExpiredStandaloneKeys.push_back({Key, Payload.Location.Size()}); - RemovedStandaloneSize += Payload.Location.Size(); - ExpiredEntries.push_back(ExpiredEntry); - } - else if (Ctx.Settings.CollectSmallObjects) - { - ExpiredEntries.push_back(ExpiredEntry); - } - } - - Stats.CheckedCount += TotalEntries; - Stats.FoundCount += ExpiredEntries.size(); - - if (Ctx.IsCancelledFlag.load()) - { - return nullptr; - } - - if (Ctx.Settings.IsDeleteMode) - { - for (const DiskIndexEntry& Entry : ExpiredEntries) - { - auto It = m_Index.find(Entry.Key); - ZEN_ASSERT(It != m_Index.end()); - BucketPayload& Payload = m_Payloads[It->second]; - RemoveMetaData(IndexLock, Payload); - Stats.FreedMemory += RemoveMemCachedData(IndexLock, Payload); - m_Index.erase(It); - Stats.DeletedCount++; - } - m_SlogFile.Append(ExpiredEntries); - m_StandaloneSize.fetch_sub(RemovedStandaloneSize, std::memory_order::relaxed); - } - } - - if (Ctx.Settings.IsDeleteMode && !ExpiredEntries.empty()) - { - std::vector<BucketPayload> Payloads; - std::vector<AccessTime> AccessTimes; - std::vector<BucketMetaData> MetaDatas; - std::vector<MemCacheData> MemCachedPayloads; - std::vector<ReferenceIndex> FirstReferenceIndex; - IndexMap Index; - { - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock); - } - } - - if (Ctx.IsCancelledFlag.load()) - { - return nullptr; - } - - return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys)); -} - -class DiskBucketReferenceChecker : public GcReferenceChecker -{ - using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; - using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload; - using CacheBucket = ZenCacheDiskLayer::CacheBucket; - using ReferenceIndex = ZenCacheDiskLayer::CacheBucket::ReferenceIndex; - -public: - DiskBucketReferenceChecker(CacheBucket& Owner) : m_CacheBucket(Owner) {} - - virtual ~DiskBucketReferenceChecker() - { - try - { - m_IndexLock.reset(); - if (!m_CacheBucket.m_Configuration.EnableReferenceCaching) - { - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); - // If reference caching is not enabled, we temporarily used the data structure for reference caching, lets reset it - m_CacheBucket.ClearReferenceCache(); - } - } - catch (std::exception& Ex) - { - ZEN_ERROR("~DiskBucketReferenceChecker threw exception: '{}'", Ex.what()); - } - } - - virtual void PreCache(GcCtx& Ctx) override - { - ZEN_TRACE_CPU("Z$::Bucket::PreCache"); - - Stopwatch Timer; - const auto _ = MakeGuard([&] { - if (!Ctx.Settings.Verbose) - { - return; - } - ZEN_INFO("GCV2: cachebucket [PRECACHE] '{}': found {} references in {}", - m_CacheBucket.m_BucketDir, - m_CacheBucket.m_ReferenceCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - std::vector<IoHash> UpdateKeys; - std::vector<size_t> ReferenceCounts; - std::vector<IoHash> References; - - auto GetAttachments = [&References, &ReferenceCounts](const void* CbObjectData) { - size_t CurrentReferenceCount = References.size(); - CbObjectView Obj(CbObjectData); - Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); }); - ReferenceCounts.push_back(References.size() - CurrentReferenceCount); - }; - - // Refresh cache - { - // If reference caching is enabled the references will be updated at modification for us so we don't need to track modifications - if (!m_CacheBucket.m_Configuration.EnableReferenceCaching) - { - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys = std::make_unique<HashSet>(); }); - } - - std::vector<IoHash> StandaloneKeys; - { - std::vector<IoHash> InlineKeys; - std::unordered_map<uint32_t, std::size_t> BlockIndexToEntriesPerBlockIndex; - struct InlineEntry - { - uint32_t InlineKeyIndex; - uint32_t Offset; - uint32_t Size; - }; - std::vector<std::vector<InlineEntry>> EntriesPerBlock; - size_t UpdateCount = 0; - { - RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock); - for (const auto& Entry : m_CacheBucket.m_Index) - { - if (Ctx.IsCancelledFlag.load()) - { - IndexLock.ReleaseNow(); - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); - return; - } - - PayloadIndex EntryIndex = Entry.second; - const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex]; - const DiskLocation& Loc = Payload.Location; - - if (!Loc.IsFlagSet(DiskLocation::kStructured)) - { - continue; - } - if (m_CacheBucket.m_Configuration.EnableReferenceCaching && - m_CacheBucket.m_FirstReferenceIndex[EntryIndex] != ReferenceIndex::Unknown()) - { - continue; - } - UpdateCount++; - const IoHash& Key = Entry.first; - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - StandaloneKeys.push_back(Key); - continue; - } - - BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_CacheBucket.m_Configuration.PayloadAlignment); - InlineEntry UpdateEntry = {.InlineKeyIndex = gsl::narrow<uint32_t>(InlineKeys.size()), - .Offset = gsl::narrow<uint32_t>(ChunkLocation.Offset), - .Size = gsl::narrow<uint32_t>(ChunkLocation.Size)}; - InlineKeys.push_back(Key); - - if (auto It = BlockIndexToEntriesPerBlockIndex.find(ChunkLocation.BlockIndex); - It != BlockIndexToEntriesPerBlockIndex.end()) - { - EntriesPerBlock[It->second].emplace_back(UpdateEntry); - } - else - { - BlockIndexToEntriesPerBlockIndex.insert_or_assign(ChunkLocation.BlockIndex, EntriesPerBlock.size()); - EntriesPerBlock.emplace_back(std::vector<InlineEntry>{UpdateEntry}); - } - } - } - - UpdateKeys.reserve(UpdateCount); - - for (auto It : BlockIndexToEntriesPerBlockIndex) - { - uint32_t BlockIndex = It.first; - - Ref<BlockStoreFile> BlockFile = m_CacheBucket.m_BlockStore.GetBlockFile(BlockIndex); - if (BlockFile) - { - size_t EntriesPerBlockIndex = It.second; - std::vector<InlineEntry>& InlineEntries = EntriesPerBlock[EntriesPerBlockIndex]; - - std::sort(InlineEntries.begin(), InlineEntries.end(), [&](const InlineEntry& Lhs, const InlineEntry& Rhs) -> bool { - return Lhs.Offset < Rhs.Offset; - }); - - uint64_t BlockFileSize = BlockFile->FileSize(); - BasicFileBuffer BlockBuffer(BlockFile->GetBasicFile(), 32768); - for (const InlineEntry& InlineEntry : InlineEntries) - { - if ((InlineEntry.Offset + InlineEntry.Size) > BlockFileSize) - { - ReferenceCounts.push_back(0); - } - else - { - MemoryView ChunkView = BlockBuffer.MakeView(InlineEntry.Size, InlineEntry.Offset); - if (ChunkView.GetSize() == InlineEntry.Size) - { - GetAttachments(ChunkView.GetData()); - } - else - { - std::vector<uint8_t> Buffer(InlineEntry.Size); - BlockBuffer.Read(Buffer.data(), InlineEntry.Size, InlineEntry.Offset); - GetAttachments(Buffer.data()); - } - } - const IoHash& Key = InlineKeys[InlineEntry.InlineKeyIndex]; - UpdateKeys.push_back(Key); - } - } - } - } - { - for (const IoHash& Key : StandaloneKeys) - { - if (Ctx.IsCancelledFlag.load()) - { - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); - return; - } - - IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(ZenContentType::kCbObject, Key); - if (!Buffer) - { - continue; - } - - GetAttachments(Buffer.GetData()); - UpdateKeys.push_back(Key); - } - } - } - - { - size_t ReferenceOffset = 0; - RwLock::ExclusiveLockScope IndexLock(m_CacheBucket.m_IndexLock); - - if (!m_CacheBucket.m_Configuration.EnableReferenceCaching) - { - ZEN_ASSERT(m_CacheBucket.m_FirstReferenceIndex.empty()); - ZEN_ASSERT(m_CacheBucket.m_ReferenceHashes.empty()); - ZEN_ASSERT(m_CacheBucket.m_NextReferenceHashesIndexes.empty()); - ZEN_ASSERT(m_CacheBucket.m_ReferenceCount == 0); - ZEN_ASSERT(m_CacheBucket.m_UpdatedKeys); - - // If reference caching is not enabled, we will resize and use the data structure in place for reference caching when - // we figure out what this bucket references. This will be reset once the DiskBucketReferenceChecker is deleted. - m_CacheBucket.m_FirstReferenceIndex.resize(m_CacheBucket.m_Payloads.size(), ReferenceIndex::Unknown()); - m_CacheBucket.m_ReferenceHashes.reserve(References.size()); - m_CacheBucket.m_NextReferenceHashesIndexes.reserve(References.size()); - } - else - { - ZEN_ASSERT(!m_CacheBucket.m_UpdatedKeys); - } - - for (size_t Index = 0; Index < UpdateKeys.size(); Index++) - { - const IoHash& Key = UpdateKeys[Index]; - size_t ReferenceCount = ReferenceCounts[Index]; - if (auto It = m_CacheBucket.m_Index.find(Key); It != m_CacheBucket.m_Index.end()) - { - PayloadIndex EntryIndex = It->second; - if (m_CacheBucket.m_Configuration.EnableReferenceCaching) - { - if (m_CacheBucket.m_FirstReferenceIndex[EntryIndex] != ReferenceIndex::Unknown()) - { - // The reference data is valid and what we have is old/redundant - continue; - } - } - else if (m_CacheBucket.m_UpdatedKeys->contains(Key)) - { - // Our pre-cache data is invalid - continue; - } - - m_CacheBucket.SetReferences(IndexLock, - m_CacheBucket.m_FirstReferenceIndex[EntryIndex], - std::span<IoHash>{References.data() + ReferenceOffset, ReferenceCount}); - } - ReferenceOffset += ReferenceCount; - } - - if (m_CacheBucket.m_Configuration.EnableReferenceCaching && !UpdateKeys.empty()) - { - m_CacheBucket.CompactReferences(IndexLock); - } - } - } - - virtual void LockState(GcCtx& Ctx) override - { - ZEN_TRACE_CPU("Z$::Bucket::LockState"); - - Stopwatch Timer; - const auto _ = MakeGuard([&] { - if (!Ctx.Settings.Verbose) - { - return; - } - ZEN_INFO("GCV2: cachebucket [LOCKSTATE] '{}': found {} references in {}", - m_CacheBucket.m_BucketDir, - m_CacheBucket.m_ReferenceCount + m_UncachedReferences.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - m_IndexLock = std::make_unique<RwLock::SharedLockScope>(m_CacheBucket.m_IndexLock); - if (Ctx.IsCancelledFlag.load()) - { - m_UncachedReferences.clear(); - m_IndexLock.reset(); - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); - return; - } - - if (m_CacheBucket.m_UpdatedKeys) - { - const HashSet& UpdatedKeys(*m_CacheBucket.m_UpdatedKeys); - for (const IoHash& Key : UpdatedKeys) - { - if (Ctx.IsCancelledFlag.load()) - { - m_UncachedReferences.clear(); - m_IndexLock.reset(); - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); }); - return; - } - - auto It = m_CacheBucket.m_Index.find(Key); - if (It == m_CacheBucket.m_Index.end()) - { - continue; - } - - PayloadIndex EntryIndex = It->second; - const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex]; - const DiskLocation& Loc = Payload.Location; - - if (!Loc.IsFlagSet(DiskLocation::kStructured)) - { - continue; - } - - IoBuffer Buffer; - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - Buffer = m_CacheBucket.GetStandaloneCacheValue(Loc.GetContentType(), Key); - } - else - { - Buffer = m_CacheBucket.GetInlineCacheValue(Loc); - } - - if (Buffer) - { - ZEN_ASSERT(Buffer.GetContentType() == ZenContentType::kCbObject); - CbObjectView Obj(Buffer.GetData()); - Obj.IterateAttachments([this](CbFieldView Field) { m_UncachedReferences.insert(Field.AsAttachment()); }); - } - } - } - } - - virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override - { - ZEN_TRACE_CPU("Z$::Bucket::RemoveUsedReferencesFromSet"); - - ZEN_ASSERT(m_IndexLock); - size_t InitialCount = IoCids.size(); - Stopwatch Timer; - const auto _ = MakeGuard([&] { - if (!Ctx.Settings.Verbose) - { - return; - } - ZEN_INFO("GCV2: cachebucket [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", - m_CacheBucket.m_BucketDir, - InitialCount - IoCids.size(), - InitialCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - for (const IoHash& ReferenceHash : m_CacheBucket.m_ReferenceHashes) - { - if (IoCids.erase(ReferenceHash) == 1) - { - if (IoCids.empty()) - { - return; - } - } - } - - for (const IoHash& ReferenceHash : m_UncachedReferences) - { - if (IoCids.erase(ReferenceHash) == 1) - { - if (IoCids.empty()) - { - return; - } - } - } - } - CacheBucket& m_CacheBucket; - std::unique_ptr<RwLock::SharedLockScope> m_IndexLock; - HashSet m_UncachedReferences; -}; - -std::vector<GcReferenceChecker*> -ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx) -{ - ZEN_TRACE_CPU("Z$::Bucket::CreateReferenceCheckers"); - - Stopwatch Timer; - const auto _ = MakeGuard([&] { - if (!Ctx.Settings.Verbose) - { - return; - } - ZEN_INFO("GCV2: cachebucket [CREATE CHECKERS] '{}': completed in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - { - RwLock::SharedLockScope __(m_IndexLock); - if (m_Index.empty()) - { - return {}; - } - } - - return {new DiskBucketReferenceChecker(*this)}; -} - -void -ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&) -{ - ZEN_TRACE_CPU("Z$::Bucket::CompactReferences"); - - std::vector<ReferenceIndex> FirstReferenceIndex; - std::vector<IoHash> NewReferenceHashes; - std::vector<ReferenceIndex> NewNextReferenceHashesIndexes; - - FirstReferenceIndex.reserve(m_ReferenceCount); - NewReferenceHashes.reserve(m_ReferenceCount); - NewNextReferenceHashesIndexes.reserve(m_ReferenceCount); - - for (const auto& It : m_Index) - { - ReferenceIndex SourceIndex = m_FirstReferenceIndex[It.second]; - if (SourceIndex == ReferenceIndex::Unknown()) - { - FirstReferenceIndex.push_back(ReferenceIndex{}); - continue; - } - if (SourceIndex == ReferenceIndex::None()) - { - FirstReferenceIndex.push_back(ReferenceIndex::None()); - continue; - } - FirstReferenceIndex.push_back(ReferenceIndex{NewNextReferenceHashesIndexes.size()}); - NewReferenceHashes.push_back(m_ReferenceHashes[SourceIndex]); - NewNextReferenceHashesIndexes.push_back(ReferenceIndex::None()); - - SourceIndex = m_NextReferenceHashesIndexes[SourceIndex]; - while (SourceIndex != ReferenceIndex::None()) - { - NewNextReferenceHashesIndexes.back() = ReferenceIndex{NewReferenceHashes.size()}; - NewReferenceHashes.push_back(m_ReferenceHashes[SourceIndex]); - NewNextReferenceHashesIndexes.push_back(ReferenceIndex::None()); - SourceIndex = m_NextReferenceHashesIndexes[SourceIndex]; - } - } - m_FirstReferenceIndex.swap(FirstReferenceIndex); - m_ReferenceHashes.swap(NewReferenceHashes); - m_ReferenceHashes.shrink_to_fit(); - m_NextReferenceHashesIndexes.swap(NewNextReferenceHashesIndexes); - m_NextReferenceHashesIndexes.shrink_to_fit(); - m_ReferenceCount = m_ReferenceHashes.size(); -} - -ZenCacheDiskLayer::CacheBucket::ReferenceIndex -ZenCacheDiskLayer::CacheBucket::AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key) -{ - ReferenceIndex NewIndex = ReferenceIndex{m_ReferenceHashes.size()}; - m_ReferenceHashes.push_back(Key); - m_NextReferenceHashesIndexes.emplace_back(ReferenceIndex::None()); - m_ReferenceCount++; - return NewIndex; -} - -void -ZenCacheDiskLayer::CacheBucket::SetReferences(RwLock::ExclusiveLockScope& Lock, - ReferenceIndex& FirstReferenceIndex, - std::span<IoHash> References) -{ - auto ReferenceIt = References.begin(); - - if (FirstReferenceIndex == ReferenceIndex::Unknown()) - { - FirstReferenceIndex = ReferenceIndex::None(); - } - - ReferenceIndex CurrentIndex = FirstReferenceIndex; - if (CurrentIndex != ReferenceIndex::None()) - { - if (ReferenceIt != References.end()) - { - ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero); - if (CurrentIndex == ReferenceIndex::None()) - { - CurrentIndex = AllocateReferenceEntry(Lock, *ReferenceIt); - FirstReferenceIndex = CurrentIndex; - } - else - { - m_ReferenceHashes[CurrentIndex] = *ReferenceIt; - } - ReferenceIt++; - } - } - else - { - if (ReferenceIt != References.end()) - { - ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero); - CurrentIndex = AllocateReferenceEntry(Lock, *ReferenceIt); - ReferenceIt++; - } - FirstReferenceIndex = CurrentIndex; - } - - while (ReferenceIt != References.end()) - { - ZEN_ASSERT(CurrentIndex != ReferenceIndex::None()); - ZEN_ASSERT_SLOW(*ReferenceIt != IoHash::Zero); - ReferenceIndex NextReferenceIndex = m_NextReferenceHashesIndexes[CurrentIndex]; - if (NextReferenceIndex == ReferenceIndex::None()) - { - NextReferenceIndex = AllocateReferenceEntry(Lock, *ReferenceIt); - m_NextReferenceHashesIndexes[CurrentIndex] = NextReferenceIndex; - } - else - { - m_ReferenceHashes[NextReferenceIndex] = *ReferenceIt; - } - CurrentIndex = NextReferenceIndex; - ReferenceIt++; - } - - while (CurrentIndex != ReferenceIndex::None()) - { - ReferenceIndex NextIndex = m_NextReferenceHashesIndexes[CurrentIndex]; - if (NextIndex != ReferenceIndex::None()) - { - m_ReferenceHashes[CurrentIndex] = IoHash::Zero; - ZEN_ASSERT(m_ReferenceCount > 0); - m_ReferenceCount--; - m_NextReferenceHashesIndexes[CurrentIndex] = ReferenceIndex::None(); - } - CurrentIndex = NextIndex; - } -} - -void -ZenCacheDiskLayer::CacheBucket::RemoveReferences(RwLock::ExclusiveLockScope&, ReferenceIndex& FirstReferenceIndex) -{ - if (FirstReferenceIndex == ReferenceIndex::Unknown()) - { - return; - } - ReferenceIndex CurrentIndex = FirstReferenceIndex; - while (CurrentIndex == ReferenceIndex::None()) - { - m_ReferenceHashes[CurrentIndex] = IoHash::Zero; - ZEN_ASSERT(m_ReferenceCount > 0); - m_ReferenceCount--; - CurrentIndex = m_NextReferenceHashesIndexes[CurrentIndex]; - } - FirstReferenceIndex = {}; -} - -bool -ZenCacheDiskLayer::CacheBucket::LockedGetReferences(ReferenceIndex FirstReferenceIndex, std::vector<IoHash>& OutReferences) const -{ - if (FirstReferenceIndex == ReferenceIndex::Unknown()) - { - return false; - } - - ReferenceIndex CurrentIndex = FirstReferenceIndex; - while (CurrentIndex != ReferenceIndex::None()) - { - ZEN_ASSERT_SLOW(m_ReferenceHashes[CurrentIndex] != IoHash::Zero); - OutReferences.push_back(m_ReferenceHashes[CurrentIndex]); - CurrentIndex = m_NextReferenceHashesIndexes[CurrentIndex]; - } - return true; -} - -void -ZenCacheDiskLayer::CacheBucket::ClearReferenceCache() -{ - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); - Reset(m_FirstReferenceIndex); - Reset(m_ReferenceHashes); - Reset(m_NextReferenceHashesIndexes); - m_ReferenceCount = 0; -} - -void -ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, - std::vector<BucketPayload>& Payloads, - std::vector<AccessTime>& AccessTimes, - std::vector<BucketMetaData>& MetaDatas, - std::vector<MemCacheData>& MemCachedPayloads, - std::vector<ReferenceIndex>& FirstReferenceIndex, - IndexMap& Index, - RwLock::ExclusiveLockScope& IndexLock) -{ - ZEN_TRACE_CPU("Z$::Bucket::CompactState"); - - size_t EntryCount = m_Index.size(); - Payloads.reserve(EntryCount); - AccessTimes.reserve(EntryCount); - if (m_Configuration.EnableReferenceCaching) - { - FirstReferenceIndex.reserve(EntryCount); - } - Index.reserve(EntryCount); - Index.min_load_factor(IndexMinLoadFactor); - Index.max_load_factor(IndexMaxLoadFactor); - for (auto It : m_Index) - { - PayloadIndex EntryIndex = PayloadIndex(Payloads.size()); - Payloads.push_back(m_Payloads[It.second]); - BucketPayload& Payload = Payloads.back(); - AccessTimes.push_back(m_AccessTimes[It.second]); - if (Payload.MetaData) - { - MetaDatas.push_back(m_MetaDatas[Payload.MetaData]); - Payload.MetaData = MetaDataIndex(MetaDatas.size() - 1); - } - if (Payload.MemCached) - { - MemCachedPayloads.emplace_back( - MemCacheData{.Payload = std::move(m_MemCachedPayloads[Payload.MemCached].Payload), .OwnerIndex = EntryIndex}); - Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(MemCachedPayloads.size() - 1)); - } - if (m_Configuration.EnableReferenceCaching) - { - FirstReferenceIndex.push_back(m_FirstReferenceIndex[It.second]); - } - Index.insert({It.first, EntryIndex}); - } - m_Index.swap(Index); - m_Payloads.swap(Payloads); - m_AccessTimes.swap(AccessTimes); - m_MetaDatas.swap(MetaDatas); - Reset(m_FreeMetaDatas); - m_MemCachedPayloads.swap(MemCachedPayloads); - Reset(m_FreeMemCachedPayloads); - if (m_Configuration.EnableReferenceCaching) - { - m_FirstReferenceIndex.swap(FirstReferenceIndex); - CompactReferences(IndexLock); - } -} - -#if ZEN_WITH_TESTS -void -ZenCacheDiskLayer::CacheBucket::SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time) -{ - GcClock::Tick TimeTick = Time.time_since_epoch().count(); - RwLock::SharedLockScope IndexLock(m_IndexLock); - if (auto It = m_Index.find(HashKey); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_AccessTimes[EntryIndex] = TimeTick; - } -} -#endif // ZEN_WITH_TESTS - -////////////////////////////////////////////////////////////////////////// - -ZenCacheDiskLayer::ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config) -: m_Gc(Gc) -, m_JobQueue(JobQueue) -, m_RootDir(RootDir) -, m_Configuration(Config) -{ -} - -ZenCacheDiskLayer::~ZenCacheDiskLayer() -{ - try - { - { - RwLock::ExclusiveLockScope _(m_Lock); - for (auto& It : m_Buckets) - { - m_DroppedBuckets.emplace_back(std::move(It.second)); - } - m_Buckets.clear(); - } - // We destroy the buckets without holding a lock since destructor calls GcManager::RemoveGcReferencer which takes an exclusive lock. - // This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock - m_DroppedBuckets.clear(); - } - catch (std::exception& Ex) - { - ZEN_ERROR("~ZenCacheDiskLayer() failed. Reason: '{}'", Ex.what()); - } -} - -ZenCacheDiskLayer::CacheBucket* -ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) -{ - ZEN_TRACE_CPU("Z$::GetOrCreateBucket"); - const auto BucketName = std::string(InBucket); - - { - RwLock::SharedLockScope SharedLock(m_Lock); - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) - { - return It->second.get(); - } - } - - // We create the bucket without holding a lock since contructor calls GcManager::AddGcReferencer which takes an exclusive lock. - // This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock - std::unique_ptr<CacheBucket> Bucket( - std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); - - RwLock::ExclusiveLockScope Lock(m_Lock); - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) - { - return It->second.get(); - } - - std::filesystem::path BucketPath = m_RootDir; - BucketPath /= BucketName; - try - { - if (!Bucket->OpenOrCreate(BucketPath)) - { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); - return nullptr; - } - } - catch (const std::exception& Err) - { - ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); - throw; - } - - CacheBucket* Result = Bucket.get(); - m_Buckets.emplace(BucketName, std::move(Bucket)); - - return Result; -} - -bool -ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) -{ - ZEN_TRACE_CPU("Z$::Get"); - - if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) - { - if (Bucket->Get(HashKey, OutValue)) - { - TryMemCacheTrim(); - return true; - } - } - return false; -} - -void -ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References) -{ - ZEN_TRACE_CPU("Z$::Put"); - - if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) - { - Bucket->Put(HashKey, Value, References); - TryMemCacheTrim(); - } -} - -void -ZenCacheDiskLayer::DiscoverBuckets() -{ - ZEN_TRACE_CPU("Z$::DiscoverBuckets"); - - DirectoryContent DirContent; - GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent); - - // Initialize buckets - - std::vector<std::filesystem::path> BadBucketDirectories; - std::vector<std::filesystem::path> FoundBucketDirectories; - - RwLock::ExclusiveLockScope _(m_Lock); - - for (const std::filesystem::path& BucketPath : DirContent.Directories) - { - const std::string BucketName = PathToUtf8(BucketPath.stem()); - - if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) - { - continue; - } - - if (IsKnownBadBucketName(BucketName)) - { - BadBucketDirectories.push_back(BucketPath); - - continue; - } - - FoundBucketDirectories.push_back(BucketPath); - ZEN_INFO("Discovered bucket '{}'", BucketName); - } - - for (const std::filesystem::path& BadBucketPath : BadBucketDirectories) - { - bool IsOk = false; - - try - { - IsOk = DeleteDirectories(BadBucketPath); - } - catch (std::exception&) - { - } - - if (IsOk) - { - ZEN_INFO("found bad bucket at '{}', deleted contents", BadBucketPath); - } - else - { - ZEN_WARN("bad bucket delete failed for '{}'", BadBucketPath); - } - } - - RwLock SyncLock; - - WorkerThreadPool& Pool = GetLargeWorkerPool(); - Latch WorkLatch(1); - for (auto& BucketPath : FoundBucketDirectories) - { - WorkLatch.AddCount(1); - Pool.ScheduleWork([&]() { - auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); - - const std::string BucketName = PathToUtf8(BucketPath.stem()); - try - { - std::unique_ptr<CacheBucket> NewBucket = - std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig); - - CacheBucket* Bucket = nullptr; - { - RwLock::ExclusiveLockScope __(SyncLock); - auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket)); - Bucket = InsertResult.first->second.get(); - } - ZEN_ASSERT(Bucket); - - if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false)) - { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); - - { - RwLock::ExclusiveLockScope __(SyncLock); - m_Buckets.erase(BucketName); - } - } - } - catch (const std::exception& Err) - { - ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); - return; - } - }); - } - WorkLatch.CountDown(); - WorkLatch.Wait(); -} - -bool -ZenCacheDiskLayer::DropBucket(std::string_view InBucket) -{ - ZEN_TRACE_CPU("Z$::DropBucket"); - - RwLock::ExclusiveLockScope _(m_Lock); - - auto It = m_Buckets.find(std::string(InBucket)); - - if (It != m_Buckets.end()) - { - CacheBucket& Bucket = *It->second; - m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It); - - return Bucket.Drop(); - } - - // Make sure we remove the folder even if we don't know about the bucket - std::filesystem::path BucketPath = m_RootDir; - BucketPath /= std::string(InBucket); - return MoveAndDeleteDirectory(BucketPath); -} - -bool -ZenCacheDiskLayer::Drop() -{ - ZEN_TRACE_CPU("Z$::Drop"); - - RwLock::ExclusiveLockScope _(m_Lock); - - std::vector<std::unique_ptr<CacheBucket>> Buckets; - Buckets.reserve(m_Buckets.size()); - while (!m_Buckets.empty()) - { - const auto& It = m_Buckets.begin(); - CacheBucket& Bucket = *It->second; - m_DroppedBuckets.push_back(std::move(It->second)); - m_Buckets.erase(It->first); - if (!Bucket.Drop()) - { - return false; - } - } - return MoveAndDeleteDirectory(m_RootDir); -} - -void -ZenCacheDiskLayer::Flush() -{ - ZEN_TRACE_CPU("Z$::Flush"); - - std::vector<CacheBucket*> Buckets; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - if (Buckets.empty()) - { - return; - } - ZEN_INFO("Flushed {} buckets at '{}' in {}", Buckets.size(), m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - { - RwLock::SharedLockScope __(m_Lock); - Buckets.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - CacheBucket* Bucket = Kv.second.get(); - Buckets.push_back(Bucket); - } - } - { - WorkerThreadPool& Pool = GetSmallWorkerPool(); - Latch WorkLatch(1); - for (auto& Bucket : Buckets) - { - WorkLatch.AddCount(1); - Pool.ScheduleWork([&]() { - auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); - Bucket->Flush(); - }); - } - WorkLatch.CountDown(); - while (!WorkLatch.Wait(1000)) - { - ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir); - } - } -} - -void -ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) -{ - ZEN_TRACE_CPU("Z$::ScrubStorage"); - - RwLock::SharedLockScope _(m_Lock); - { - std::vector<std::future<void>> Results; - Results.reserve(m_Buckets.size()); - - for (auto& Kv : m_Buckets) - { -#if 1 - Results.push_back(Ctx.ThreadPool().EnqueueTask( - std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }})); -#else - CacheBucket& Bucket = *Kv.second; - Bucket.ScrubStorage(Ctx); -#endif - } - - for (auto& Result : Results) - { - Result.get(); - } - } -} - -void -ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("Z$::GatherReferences"); - - std::vector<CacheBucket*> Buckets; - { - RwLock::SharedLockScope _(m_Lock); - Buckets.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - Buckets.push_back(Kv.second.get()); - } - } - for (CacheBucket* Bucket : Buckets) - { - Bucket->GatherReferences(GcCtx); - } -} - -GcStorageSize -ZenCacheDiskLayer::StorageSize() const -{ - GcStorageSize StorageSize{}; - - RwLock::SharedLockScope _(m_Lock); - for (auto& Kv : m_Buckets) - { - GcStorageSize BucketSize = Kv.second->StorageSize(); - StorageSize.DiskSize += BucketSize.DiskSize; - StorageSize.MemorySize += BucketSize.MemorySize; - } - - return StorageSize; -} - -ZenCacheDiskLayer::DiskStats -ZenCacheDiskLayer::Stats() const -{ - GcStorageSize Size = StorageSize(); - ZenCacheDiskLayer::DiskStats Stats = {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize}; - { - RwLock::SharedLockScope _(m_Lock); - Stats.BucketStats.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - Stats.BucketStats.emplace_back(NamedBucketStats{.BucketName = Kv.first, .Stats = Kv.second->Stats()}); - } - } - return Stats; -} - -ZenCacheDiskLayer::Info -ZenCacheDiskLayer::GetInfo() const -{ - ZenCacheDiskLayer::Info Info = {.RootDir = m_RootDir, .Config = m_Configuration}; - { - RwLock::SharedLockScope _(m_Lock); - Info.BucketNames.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - Info.BucketNames.push_back(Kv.first); - Info.EntryCount += Kv.second->EntryCount(); - GcStorageSize BucketSize = Kv.second->StorageSize(); - Info.StorageSize.DiskSize += BucketSize.DiskSize; - Info.StorageSize.MemorySize += BucketSize.MemorySize; - } - } - return Info; -} - -std::optional<ZenCacheDiskLayer::BucketInfo> -ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const -{ - RwLock::SharedLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) - { - return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->StorageSize()}; - } - return {}; -} - -void -ZenCacheDiskLayer::EnumerateBucketContents(std::string_view Bucket, - std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const -{ - RwLock::SharedLockScope _(m_Lock); - - if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) - { - It->second->EnumerateBucketContents(Fn); - } -} - -CacheValueDetails::NamespaceDetails -ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const -{ - CacheValueDetails::NamespaceDetails Details; - { - RwLock::SharedLockScope IndexLock(m_Lock); - if (BucketFilter.empty()) - { - Details.Buckets.reserve(BucketFilter.empty() ? m_Buckets.size() : 1); - for (auto& Kv : m_Buckets) - { - Details.Buckets[Kv.first] = Kv.second->GetValueDetails(IndexLock, ValueFilter); - } - } - else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end()) - { - Details.Buckets[It->first] = It->second->GetValueDetails(IndexLock, ValueFilter); - } - } - return Details; -} - -void -ZenCacheDiskLayer::MemCacheTrim() -{ - ZEN_TRACE_CPU("Z$::MemCacheTrim"); - - ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0); - ZEN_ASSERT(m_Configuration.MemCacheMaxAgeSeconds != 0); - ZEN_ASSERT(m_Configuration.MemCacheTrimIntervalSeconds != 0); - - bool Expected = false; - if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true)) - { - return; - } - - try - { - m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this](JobContext&) { - ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]"); - - const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); - uint64_t TrimmedSize = 0; - Stopwatch Timer; - const auto Guard = MakeGuard([&] { - ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}", - NiceBytes(TrimmedSize), - NiceBytes(m_TotalMemCachedSize), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - - const GcClock::Tick NowTick = GcClock::TickCount(); - const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); - m_NextAllowedTrimTick.store(NextTrimTick); - m_IsMemCacheTrimming.store(false); - }); - - const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds); - - static const size_t UsageSlotCount = 2048; - std::vector<uint64_t> UsageSlots; - UsageSlots.reserve(UsageSlotCount); - - std::vector<CacheBucket*> Buckets; - { - RwLock::SharedLockScope __(m_Lock); - Buckets.reserve(m_Buckets.size()); - for (auto& Kv : m_Buckets) - { - Buckets.push_back(Kv.second.get()); - } - } - - const GcClock::TimePoint Now = GcClock::Now(); - { - ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim GetUsageByAccess"); - for (CacheBucket* Bucket : Buckets) - { - Bucket->GetUsageByAccess(Now, MaxAge, UsageSlots); - } - } - - uint64_t TotalSize = 0; - for (size_t Index = 0; Index < UsageSlots.size(); ++Index) - { - TotalSize += UsageSlots[Index]; - if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes) - { - GcClock::TimePoint ExpireTime = Now - ((GcClock::Duration(MaxAge) * Index) / UsageSlotCount); - TrimmedSize = MemCacheTrim(Buckets, ExpireTime); - break; - } - } - }); - } - catch (std::exception& Ex) - { - ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. Reason: '{}'", Ex.what()); - m_IsMemCacheTrimming.store(false); - } -} - -uint64_t -ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime) -{ - if (m_Configuration.MemCacheTargetFootprintBytes == 0) - { - return 0; - } - uint64_t TrimmedSize = 0; - for (CacheBucket* Bucket : Buckets) - { - TrimmedSize += Bucket->MemCacheTrim(ExpireTime); - } - const GcClock::TimePoint Now = GcClock::Now(); - const GcClock::Tick NowTick = Now.time_since_epoch().count(); - const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds); - GcClock::Tick LastTrimTick = m_NextAllowedTrimTick; - const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count(); - m_NextAllowedTrimTick.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick); - return TrimmedSize; -} - -#if ZEN_WITH_TESTS -void -ZenCacheDiskLayer::SetAccessTime(std::string_view InBucket, const IoHash& HashKey, GcClock::TimePoint Time) -{ - const auto BucketName = std::string(InBucket); - CacheBucket* Bucket = nullptr; - - { - RwLock::SharedLockScope _(m_Lock); - - auto It = m_Buckets.find(BucketName); - - if (It != m_Buckets.end()) - { - Bucket = It->second.get(); - } - } - - if (Bucket == nullptr) - { - return; - } - Bucket->SetAccessTime(HashKey, Time); -} -#endif // ZEN_WITH_TESTS - -} // namespace zen |