author    Stefan Boberg <[email protected]>  2023-12-11 13:09:03 +0100
committer Stefan Boberg <[email protected]>  2023-12-11 13:09:03 +0100
commit    93afeddbc7a5b5df390a29407f5515acd5a70fc1 (patch)
tree      6f85ee551aabe20dece64a750c0b2d5d2c5d2d5d /src/zenserver/cache/cachedisklayer.cpp
parent    removed unnecessary SHA1 references (diff)
parent    Make sure that PathFromHandle don't hide true error when throwing exceptions ... (diff)
Merge branch 'main' of https://github.com/EpicGames/zen
Diffstat (limited to 'src/zenserver/cache/cachedisklayer.cpp')
-rw-r--r--  src/zenserver/cache/cachedisklayer.cpp | 2728
1 file changed, 1746 insertions(+), 982 deletions(-)
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index 9bb75480e..0987cd0f1 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -14,6 +14,7 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zencore/xxhash.h>
+#include <zenutil/workerpools.h>
#include <future>
@@ -25,12 +26,6 @@ namespace {
#pragma pack(push)
#pragma pack(1)
- // We use this to indicate if a on disk bucket needs wiping
- // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scrable the references
- // to block items.
- // See: https://github.com/EpicGames/zen/pull/299
- static const uint32_t CurrentDiskBucketVersion = 1;
-
struct CacheBucketIndexHeader
{
static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
@@ -48,23 +43,94 @@ namespace {
{
return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
}
+
+ bool IsValid() const
+ {
+ if (Magic != ExpectedMagic)
+ {
+ return false;
+ }
+
+ if (Checksum != ComputeChecksum(*this))
+ {
+ return false;
+ }
+
+ if (PayloadAlignment == 0)
+ {
+ return false;
+ }
+
+ return true;
+ }
};
static_assert(sizeof(CacheBucketIndexHeader) == 32);
+ struct BucketMetaHeader
+ {
+ static constexpr uint32_t ExpectedMagic = 0x61'74'65'6d; // 'meta';
+ static constexpr uint32_t Version1 = 1;
+ static constexpr uint32_t CurrentVersion = Version1;
+
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+ uint64_t EntryCount = 0;
+ uint64_t LogPosition = 0;
+ uint32_t Padding = 0;
+ uint32_t Checksum = 0;
+
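+ // The checksum covers every field up to (but not including) the trailing
+ // Checksum member, so Checksum must remain the last field of the struct.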
+ static uint32_t ComputeChecksum(const BucketMetaHeader& Header)
+ {
+ return XXH32(&Header.Magic, sizeof(BucketMetaHeader) - sizeof(uint32_t), 0xC0C0'BABA);
+ }
+
+ bool IsValid() const
+ {
+ if (Magic != ExpectedMagic)
+ {
+ return false;
+ }
+
+ if (Checksum != ComputeChecksum(*this))
+ {
+ return false;
+ }
+
+ if (Padding != 0)
+ {
+ return false;
+ }
+
+ return true;
+ }
+ };
+
+ static_assert(sizeof(BucketMetaHeader) == 32);
+
#pragma pack(pop)
+ //////////////////////////////////////////////////////////////////////////
+
+ template<typename T>
+ void Reset(T& V)
+ {
+ T Tmp;
+ V.swap(Tmp);
+ }
+
const char* IndexExtension = ".uidx";
const char* LogExtension = ".slog";
+ const char* MetaExtension = ".meta";
std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
{
return BucketDir / (BucketName + IndexExtension);
}
- std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ std::filesystem::path GetMetaPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
{
- return BucketDir / (BucketName + ".tmp");
+ return BucketDir / (BucketName + MetaExtension);
}
std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
@@ -72,6 +138,12 @@ namespace {
return BucketDir / (BucketName + LogExtension);
}
+ std::filesystem::path GetManifestPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ ZEN_UNUSED(BucketName);
+ return BucketDir / "zen_manifest";
+ }
+
bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason)
{
if (Entry.Key == IoHash::Zero)
@@ -140,26 +212,458 @@ namespace {
} // namespace
namespace fs = std::filesystem;
+using namespace std::literals;
-static CbObject
-LoadCompactBinaryObject(const fs::path& Path)
+class BucketManifestSerializer
{
- FileContents Result = ReadFile(Path);
+ using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex;
+ using BucketMetaData = ZenCacheDiskLayer::CacheBucket::BucketMetaData;
+
+ using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex;
+ using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload;
+
+public:
+ // We use this to indicate whether an on-disk bucket needs wiping.
+ // In versions 0.2.5 -> 0.2.11 there was a GC corruption bug that would scramble the references
+ // to block items.
+ // See: https://github.com/EpicGames/zen/pull/299
+ static inline const uint32_t CurrentDiskBucketVersion = 1;
- if (!Result.ErrorCode)
+ bool Open(std::filesystem::path ManifestPath)
{
- IoBuffer Buffer = Result.Flatten();
- if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None)
+ Manifest = LoadCompactBinaryObject(ManifestPath);
+ return !!Manifest;
+ }
+
+ Oid GetBucketId() const { return Manifest["BucketId"sv].AsObjectId(); }
+
+ bool IsCurrentVersion(uint32_t& OutVersion) const
+ {
+ OutVersion = Manifest["Version"sv].AsUInt32(0);
+ return OutVersion == CurrentDiskBucketVersion;
+ }
+
+ void ParseManifest(RwLock::ExclusiveLockScope& BucketLock,
+ ZenCacheDiskLayer::CacheBucket& Bucket,
+ std::filesystem::path ManifestPath,
+ ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
+ std::vector<AccessTime>& AccessTimes,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads);
+
+ Oid GenerateNewManifest(std::filesystem::path ManifestPath);
+
+ IoBuffer MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount);
+ uint64_t GetSidecarSize() const { return m_ManifestEntryCount * sizeof(ManifestData); }
+ void WriteSidecarFile(RwLock::SharedLockScope& BucketLock,
+ const std::filesystem::path& SidecarPath,
+ uint64_t SnapshotLogPosition,
+ const ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
+ const std::vector<AccessTime>& AccessTimes,
+ const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads,
+ const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas);
+ bool ReadSidecarFile(RwLock::ExclusiveLockScope& BucketLock,
+ ZenCacheDiskLayer::CacheBucket& Bucket,
+ std::filesystem::path SidecarPath,
+ ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
+ std::vector<AccessTime>& AccessTimes,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads);
+
+ IoBuffer MakeManifest(const Oid& BucketId,
+ ZenCacheDiskLayer::CacheBucket::IndexMap&& Index,
+ std::vector<AccessTime>&& AccessTimes,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>&& Payloads,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>&& MetaDatas);
+
+ CbObject Manifest;
+
+private:
+ CbObject LoadCompactBinaryObject(const fs::path& Path)
+ {
+ FileContents Result = ReadFile(Path);
+
+ if (!Result.ErrorCode)
{
- return LoadCompactBinaryObject(Buffer);
+ IoBuffer Buffer = Result.Flatten();
+ if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None)
+ {
+ return zen::LoadCompactBinaryObject(Buffer);
+ }
}
+
+ return CbObject();
}
- return CbObject();
+ uint64_t m_ManifestEntryCount = 0;
+
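+ // Fixed 64-byte on-disk record; the explicit padding fields keep the layout
+ // stable so entries can be read back in place via MakeView.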
+ struct ManifestData
+ {
+ IoHash Key; // 20
+ AccessTime Timestamp; // 4
+ IoHash RawHash; // 20
+ uint32_t Padding_0; // 4
+ size_t RawSize; // 8
+ uint64_t Padding_1; // 8
+ };
+
+ static_assert(sizeof(ManifestData) == 64);
+};
+
+void
+BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& BucketLock,
+ ZenCacheDiskLayer::CacheBucket& Bucket,
+ std::filesystem::path ManifestPath,
+ ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
+ std::vector<AccessTime>& AccessTimes,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads)
+{
+ if (Manifest["UsingMetaFile"sv].AsBool())
+ {
+ ReadSidecarFile(BucketLock, Bucket, GetMetaPath(Bucket.m_BucketDir, Bucket.m_BucketName), Index, AccessTimes, Payloads);
+
+ return;
+ }
+
+ ZEN_TRACE_CPU("Z$::ParseManifest");
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] { ZEN_INFO("parsed store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+
+ const uint64_t Count = Manifest["Count"sv].AsUInt64(0);
+ std::vector<PayloadIndex> KeysIndexes;
+ KeysIndexes.reserve(Count);
+
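+ // Resolve each manifest key to its current index slot up front; keys that
+ // are no longer present map to an invalid PayloadIndex and are skipped below.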
+ CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView();
+ for (CbFieldView& KeyView : KeyArray)
+ {
+ if (auto It = Index.find(KeyView.AsHash()); It != Index.end())
+ {
+ KeysIndexes.push_back(It.value());
+ }
+ else
+ {
+ KeysIndexes.push_back(PayloadIndex());
+ }
+ }
+
+ size_t KeyIndexOffset = 0;
+ CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView();
+ for (CbFieldView& TimeStampView : TimeStampArray)
+ {
+ const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
+ if (KeyIndex)
+ {
+ AccessTimes[KeyIndex] = TimeStampView.AsInt64();
+ }
+ }
+
+ KeyIndexOffset = 0;
+ CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView();
+ CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView();
+ if (RawHashArray.Num() == RawSizeArray.Num())
+ {
+ auto RawHashIt = RawHashArray.CreateViewIterator();
+ auto RawSizeIt = RawSizeArray.CreateViewIterator();
+ while (RawHashIt != CbFieldViewIterator())
+ {
+ const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
+
+ if (KeyIndex)
+ {
+ uint64_t RawSize = RawSizeIt.AsUInt64();
+ IoHash RawHash = RawHashIt.AsHash();
+ if (RawSize != 0 || RawHash != IoHash::Zero)
+ {
+ BucketPayload& Payload = Payloads[KeyIndex];
+ Bucket.SetMetaData(BucketLock, Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash});
+ }
+ }
+
+ RawHashIt++;
+ RawSizeIt++;
+ }
+ }
+ else
+ {
+ ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath);
+ }
+}
+
+Oid
+BucketManifestSerializer::GenerateNewManifest(std::filesystem::path ManifestPath)
+{
+ const Oid BucketId = Oid::NewOid();
+
+ CbObjectWriter Writer;
+ Writer << "BucketId"sv << BucketId;
+ Writer << "Version"sv << CurrentDiskBucketVersion;
+ Manifest = Writer.Save();
+ WriteFile(ManifestPath, Manifest.GetBuffer().AsIoBuffer());
+
+ return BucketId;
+}
+
+IoBuffer
+BucketManifestSerializer::MakeManifest(const Oid& BucketId,
+ ZenCacheDiskLayer::CacheBucket::IndexMap&& Index,
+ std::vector<AccessTime>&& AccessTimes,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>&& Payloads,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>&& MetaDatas)
+{
+ using namespace std::literals;
+
+ ZEN_TRACE_CPU("Z$::MakeManifest");
+
+ size_t ItemCount = Index.size();
+
+ // This tends to overestimate a little, but it is still far more accurate than what
+ // we get with exponential growth, and we avoid reallocating the underlying buffer in almost every case
+ const size_t EstimatedSizePerItem = 54u;
+ const size_t ReserveSize = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128);
+ CbObjectWriter Writer(ReserveSize);
+
+ Writer << "BucketId"sv << BucketId;
+ Writer << "Version"sv << CurrentDiskBucketVersion;
+
+ if (!Index.empty())
+ {
+ Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(Index.size()));
+ Writer.BeginArray("Keys"sv);
+ for (auto& Kv : Index)
+ {
+ const IoHash& Key = Kv.first;
+ Writer.AddHash(Key);
+ }
+ Writer.EndArray();
+
+ Writer.BeginArray("Timestamps"sv);
+ for (auto& Kv : Index)
+ {
+ GcClock::Tick AccessTime = AccessTimes[Kv.second];
+ Writer.AddInteger(AccessTime);
+ }
+ Writer.EndArray();
+
+ if (!MetaDatas.empty())
+ {
+ Writer.BeginArray("RawHash"sv);
+ for (auto& Kv : Index)
+ {
+ const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second];
+ if (Payload.MetaData)
+ {
+ Writer.AddHash(MetaDatas[Payload.MetaData].RawHash);
+ }
+ else
+ {
+ Writer.AddHash(IoHash::Zero);
+ }
+ }
+ Writer.EndArray();
+
+ Writer.BeginArray("RawSize"sv);
+ for (auto& Kv : Index)
+ {
+ const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second];
+ if (Payload.MetaData)
+ {
+ Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize);
+ }
+ else
+ {
+ Writer.AddInteger(0);
+ }
+ }
+ Writer.EndArray();
+ }
+ }
+
+ Manifest = Writer.Save();
+ return Manifest.GetBuffer().AsIoBuffer();
+}
+
+IoBuffer
+BucketManifestSerializer::MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount)
+{
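+ // Remember the entry count so GetSidecarSize() and the subsequent
+ // WriteSidecarFile() agree on the sidecar's on-disk size.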
+ m_ManifestEntryCount = EntryCount;
+
+ CbObjectWriter Writer;
+ Writer << "BucketId"sv << BucketId;
+ Writer << "Version"sv << CurrentDiskBucketVersion;
+ Writer << "Count"sv << EntryCount;
+ Writer << "UsingMetaFile"sv << true;
+ Manifest = Writer.Save();
+
+ return Manifest.GetBuffer().AsIoBuffer();
+}
+
+bool
+BucketManifestSerializer::ReadSidecarFile(RwLock::ExclusiveLockScope& BucketLock,
+ ZenCacheDiskLayer::CacheBucket& Bucket,
+ std::filesystem::path SidecarPath,
+ ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
+ std::vector<AccessTime>& AccessTimes,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads)
+{
+ ZEN_ASSERT(AccessTimes.size() == Payloads.size());
+
+ std::error_code Ec;
+
+ BasicFile SidecarFile;
+ SidecarFile.Open(SidecarPath, BasicFile::Mode::kRead, Ec);
+
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("failed to open sidecar file '{}'", SidecarPath));
+ }
+
+ uint64_t FileSize = SidecarFile.FileSize();
+
+ auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid sidecar file '{}'", SidecarPath); });
+
+ if (FileSize < sizeof(BucketMetaHeader))
+ {
+ return false;
+ }
+
+ BasicFileBuffer Sidecar(SidecarFile, 128 * 1024);
+
+ BucketMetaHeader Header;
+ Sidecar.Read(&Header, sizeof Header, 0);
+
+ if (!Header.IsValid())
+ {
+ return false;
+ }
+
+ if (Header.Version != BucketMetaHeader::Version1)
+ {
+ return false;
+ }
+
+ const uint64_t ExpectedEntryCount = (FileSize - sizeof(BucketMetaHeader)) / sizeof(ManifestData);
+ if (Header.EntryCount > ExpectedEntryCount)
+ {
+ return false;
+ }
+
+ InvalidGuard.Dismiss();
+
+ uint64_t RemainingEntryCount = Header.EntryCount;
+ uint64_t EntryCount = 0;
+ uint64_t CurrentReadOffset = sizeof(Header);
+
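+ // Entries are stored back-to-back after the header; view each fixed-size
+ // record in place through the buffered file instead of copying it out.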
+ while (RemainingEntryCount--)
+ {
+ const ManifestData* Entry = Sidecar.MakeView<ManifestData>(CurrentReadOffset);
+ CurrentReadOffset += sizeof(ManifestData);
+
+ if (auto It = Index.find(Entry->Key); It != Index.end())
+ {
+ PayloadIndex PlIndex = It.value();
+
+ ZEN_ASSERT(size_t(PlIndex) < Payloads.size());
+
+ ZenCacheDiskLayer::CacheBucket::BucketPayload& PayloadEntry = Payloads[PlIndex];
+
+ AccessTimes[PlIndex] = Entry->Timestamp;
+
+ if (Entry->RawSize && Entry->RawHash != IoHash::Zero)
+ {
+ Bucket.SetMetaData(BucketLock, PayloadEntry, BucketMetaData{.RawSize = Entry->RawSize, .RawHash = Entry->RawHash});
+ }
+ }
+
+ EntryCount++;
+ }
+
+ ZEN_ASSERT(EntryCount == Header.EntryCount);
+
+ return true;
+}
+
+void
+BucketManifestSerializer::WriteSidecarFile(RwLock::SharedLockScope&,
+ const std::filesystem::path& SidecarPath,
+ uint64_t SnapshotLogPosition,
+ const ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
+ const std::vector<AccessTime>& AccessTimes,
+ const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads,
+ const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas)
+{
+ BucketMetaHeader Header;
+ Header.EntryCount = m_ManifestEntryCount;
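+ // Record the log position this snapshot corresponds to, mirroring the
+ // LogPosition field of the index snapshot header.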
+ Header.LogPosition = SnapshotLogPosition;
+ Header.Checksum = BucketMetaHeader::ComputeChecksum(Header);
+
+ std::error_code Ec;
+
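+ // Write into a temporary file and atomically move it into place at the end,
+ // so a failure mid-write never leaves a torn sidecar behind.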
+ TemporaryFile SidecarFile;
+ SidecarFile.CreateTemporary(SidecarPath.parent_path(), Ec);
+
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("failed creating '{}'", SidecarFile.GetPath()));
+ }
+
+ SidecarFile.Write(&Header, sizeof Header, 0);
+
+ // TODO: make this batched for better performance
+ {
+ uint64_t WriteOffset = sizeof Header;
+
+ // BasicFileWriter SidecarWriter(SidecarFile, 128 * 1024);
+
+ std::vector<ManifestData> ManifestDataBuffer;
+ const size_t MaxManifestDataBufferCount = Min(Index.size(), 4096u); // 4096 entries * 64 bytes = 256 KiB
+ ManifestDataBuffer.reserve(MaxManifestDataBufferCount);
+ for (auto& Kv : Index)
+ {
+ const IoHash& Key = Kv.first;
+ const PayloadIndex PlIndex = Kv.second;
+
+ IoHash RawHash = IoHash::Zero;
+ uint64_t RawSize = 0;
+
+ if (const MetaDataIndex MetaIndex = Payloads[PlIndex].MetaData)
+ {
+ RawHash = MetaDatas[MetaIndex].RawHash;
+ RawSize = MetaDatas[MetaIndex].RawSize;
+ }
+
+ ManifestDataBuffer.emplace_back(ManifestData{.Key = Key,
+ .Timestamp = AccessTimes[PlIndex],
+ .RawHash = RawHash,
+ .Padding_0 = 0,
+ .RawSize = RawSize,
+ .Padding_1 = 0});
+ if (ManifestDataBuffer.size() == MaxManifestDataBufferCount)
+ {
+ const uint64_t WriteSize = sizeof(ManifestData) * ManifestDataBuffer.size();
+ SidecarFile.Write(ManifestDataBuffer.data(), WriteSize, WriteOffset);
+ WriteOffset += WriteSize;
+ ManifestDataBuffer.clear();
+ ManifestDataBuffer.reserve(MaxManifestDataBufferCount);
+ }
+ }
+ if (ManifestDataBuffer.size() > 0)
+ {
+ SidecarFile.Write(ManifestDataBuffer.data(), sizeof(ManifestData) * ManifestDataBuffer.size(), WriteOffset);
+ }
+ }
+
+ SidecarFile.MoveTemporaryIntoPlace(SidecarPath, Ec);
+
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("failed to move '{}' into '{}'", SidecarFile.GetPath(), SidecarPath));
+ }
}
//////////////////////////////////////////////////////////////////////////
+static const float IndexMinLoadFactor = 0.2f;
+static const float IndexMaxLoadFactor = 0.7f;
+
ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
std::atomic_uint64_t& OuterCacheMemoryUsage,
std::string BucketName,
@@ -170,6 +674,9 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
, m_Configuration(Config)
, m_BucketId(Oid::Zero)
{
+ m_Index.min_load_factor(IndexMinLoadFactor);
+ m_Index.max_load_factor(IndexMaxLoadFactor);
+
if (m_BucketName.starts_with(std::string_view("legacy")) || m_BucketName.ends_with(std::string_view("shadermap")))
{
const uint64_t LegacyOverrideSize = 16 * 1024 * 1024;
@@ -192,6 +699,10 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
using namespace std::literals;
ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenOrCreate");
+ ZEN_ASSERT(m_IsFlushing.load());
+
+ // We want to take the lock here since we register as a GC referencer at construction
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
ZEN_LOG_SCOPE("opening cache bucket '{}'", BucketDir);
@@ -200,169 +711,72 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
CreateDirectories(m_BucketDir);
- std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"};
+ std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName);
bool IsNew = false;
- CbObject Manifest = LoadCompactBinaryObject(ManifestPath);
+ BucketManifestSerializer ManifestReader;
- if (Manifest)
+ if (ManifestReader.Open(ManifestPath))
{
- m_BucketId = Manifest["BucketId"sv].AsObjectId();
+ m_BucketId = ManifestReader.GetBucketId();
if (m_BucketId == Oid::Zero)
{
return false;
}
- const uint32_t Version = Manifest["Version"sv].AsUInt32(0);
- if (Version != CurrentDiskBucketVersion)
+
+ uint32_t Version = 0;
+ if (ManifestReader.IsCurrentVersion(/* out */ Version) == false)
{
- ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion);
+ ZEN_INFO("Wiping bucket '{}', found version {}, required version {}",
+ BucketDir,
+ Version,
+ BucketManifestSerializer::CurrentDiskBucketVersion);
IsNew = true;
}
}
else if (AllowCreate)
{
- m_BucketId.Generate();
-
- CbObjectWriter Writer;
- Writer << "BucketId"sv << m_BucketId;
- Writer << "Version"sv << CurrentDiskBucketVersion;
- Manifest = Writer.Save();
- WriteFile(m_BucketDir / "zen_manifest", Manifest.GetBuffer().AsIoBuffer());
- IsNew = true;
+ m_BucketId = ManifestReader.GenerateNewManifest(ManifestPath);
+ IsNew = true;
}
else
{
return false;
}
- OpenLog(IsNew);
-
- if (!IsNew)
- {
- ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenOrCreate::Manifest");
-
- Stopwatch Timer;
- const auto _ =
- MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
-
- const uint64_t kInvalidIndex = ~(0ull);
+ InitializeIndexFromDisk(IndexLock, IsNew);
- const uint64_t Count = Manifest["Count"sv].AsUInt64(0);
- if (Count != 0)
- {
- std::vector<size_t> KeysIndexes;
- KeysIndexes.reserve(Count);
- CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView();
- for (CbFieldView& KeyView : KeyArray)
- {
- if (auto It = m_Index.find(KeyView.AsHash()); It != m_Index.end())
- {
- KeysIndexes.push_back(It.value());
- }
- else
- {
- KeysIndexes.push_back(kInvalidIndex);
- }
- }
- size_t KeyIndexOffset = 0;
- CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView();
- for (CbFieldView& TimeStampView : TimeStampArray)
- {
- const size_t KeyIndex = KeysIndexes[KeyIndexOffset++];
- if (KeyIndex != kInvalidIndex)
- {
- m_AccessTimes[KeyIndex] = TimeStampView.AsInt64();
- }
- }
- KeyIndexOffset = 0;
- CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView();
- CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView();
- if (RawHashArray.Num() == RawSizeArray.Num())
- {
- auto RawHashIt = RawHashArray.CreateViewIterator();
- auto RawSizeIt = RawSizeArray.CreateViewIterator();
- while (RawHashIt != CbFieldViewIterator())
- {
- const size_t KeyIndex = KeysIndexes[KeyIndexOffset++];
-
- if (KeyIndex != kInvalidIndex)
- {
- uint64_t RawSize = RawSizeIt.AsUInt64();
- IoHash RawHash = RawHashIt.AsHash();
- if (RawSize != 0 || RawHash != IoHash::Zero)
- {
- BucketPayload& Payload = m_Payloads[KeyIndex];
- SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash});
- }
- }
-
- RawHashIt++;
- RawSizeIt++;
- }
- }
- else
- {
- ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath);
- }
- }
-
- ////// Legacy format read
- {
- for (CbFieldView Entry : Manifest["Timestamps"sv])
- {
- const CbObjectView Obj = Entry.AsObjectView();
- const IoHash Key = Obj["Key"sv].AsHash();
-
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
- m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64();
- }
- }
- for (CbFieldView Entry : Manifest["RawInfo"sv])
- {
- const CbObjectView Obj = Entry.AsObjectView();
- const IoHash Key = Obj["Key"sv].AsHash();
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
-
- const IoHash RawHash = Obj["RawHash"sv].AsHash();
- const uint64_t RawSize = Obj["RawSize"sv].AsUInt64();
-
- if (RawHash == IoHash::Zero || RawSize == 0)
- {
- ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex);
- }
+ auto _ = MakeGuard([&]() {
+ // We are now initialized, allow flushing when we exit
+ m_IsFlushing.store(false);
+ });
- BucketPayload& Payload = m_Payloads[EntryIndex];
- SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash});
- }
- }
- }
+ if (IsNew)
+ {
+ return true;
}
+ ManifestReader.ParseManifest(IndexLock, *this, ManifestPath, m_Index, m_AccessTimes, m_Payloads);
+
return true;
}
void
-ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc)
+ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(const std::function<uint64_t()>& ClaimDiskReserveFunc)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeIndexSnapshot");
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::WriteIndexSnapshot");
- uint64_t LogCount = m_SlogFile.GetLogCount();
+ const uint64_t LogCount = m_SlogFile.GetLogCount();
if (m_LogFlushPosition == LogCount)
{
return;
}
ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir);
- uint64_t EntryCount = 0;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
+ const uint64_t EntryCount = m_Index.size();
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
m_BucketDir,
EntryCount,
@@ -371,42 +785,11 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function<uint64_t()
namespace fs = std::filesystem;
- fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
- fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName);
-
- // Move index away, we keep it if something goes wrong
- if (fs::is_regular_file(STmpIndexPath))
- {
- std::error_code Ec;
- if (!fs::remove(STmpIndexPath, Ec) || Ec)
- {
- ZEN_WARN("snapshot failed to clean up temp snapshot at {}, reason: '{}'", STmpIndexPath, Ec.message());
- return;
- }
- }
+ fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
try
{
- if (fs::is_regular_file(IndexPath))
- {
- fs::rename(IndexPath, STmpIndexPath);
- }
-
- // Write the current state of the location map to a new index state
- std::vector<DiskIndexEntry> Entries;
- Entries.resize(m_Index.size());
-
- {
- uint64_t EntryIndex = 0;
- for (auto& Entry : m_Index)
- {
- DiskIndexEntry& IndexEntry = Entries[EntryIndex++];
- IndexEntry.Key = Entry.first;
- IndexEntry.Location = m_Payloads[Entry.second].Location;
- }
- }
-
- uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + Entries.size() * sizeof(DiskIndexEntry);
+ const uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry);
std::error_code Error;
DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
if (Error)
@@ -426,185 +809,230 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function<uint64_t()
fmt::format("not enough free disk space in '{}' to save index of size {}", m_BucketDir, NiceBytes(IndexSize)));
}
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
- CacheBucketIndexHeader Header = {.EntryCount = Entries.size(),
- .LogPosition = LogCount,
- .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)};
+ TemporaryFile ObjectIndexFile;
+ std::error_code Ec;
+ ObjectIndexFile.CreateTemporary(m_BucketDir, Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir));
+ }
+
+ {
+ // This is in a separate scope so that IndexWriter is destroyed before the
+ // file is flushed and closed, guaranteeing all buffered data reaches the file
+ BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024);
- Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
- ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0);
- ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
- ObjectIndexFile.Flush();
- ObjectIndexFile.Close();
- EntryCount = Entries.size();
- m_LogFlushPosition = LogCount;
- }
- catch (std::exception& Err)
- {
- ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what());
+ CacheBucketIndexHeader Header = {.EntryCount = EntryCount,
+ .LogPosition = LogCount,
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)};
+
+ Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
+ IndexWriter.Write(&Header, sizeof(CacheBucketIndexHeader), 0);
+
+ uint64_t IndexWriteOffset = sizeof(CacheBucketIndexHeader);
- // Restore any previous snapshot
+ for (auto& Entry : m_Index)
+ {
+ DiskIndexEntry IndexEntry;
+ IndexEntry.Key = Entry.first;
+ IndexEntry.Location = m_Payloads[Entry.second].Location;
+ IndexWriter.Write(&IndexEntry, sizeof(DiskIndexEntry), IndexWriteOffset);
+
+ IndexWriteOffset += sizeof(DiskIndexEntry);
+ }
+
+ IndexWriter.Flush();
+ }
- if (fs::is_regular_file(STmpIndexPath))
+ ObjectIndexFile.Flush();
+ ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec);
+ if (Ec)
{
- std::error_code Ec;
- fs::remove(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless
- fs::rename(STmpIndexPath, IndexPath, Ec);
- if (Ec)
+ std::filesystem::path TempFilePath = ObjectIndexFile.GetPath();
+ ZEN_WARN("snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", TempFilePath, IndexPath, Ec.message());
+
+ if (std::filesystem::is_regular_file(TempFilePath))
{
- ZEN_WARN("snapshot failed to restore old snapshot from {}, reason: '{}'", STmpIndexPath, Ec.message());
+ if (!std::filesystem::remove(TempFilePath, Ec) || Ec)
+ {
+ ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", TempFilePath, Ec.message());
+ }
}
}
- }
- if (fs::is_regular_file(STmpIndexPath))
- {
- std::error_code Ec;
- if (!fs::remove(STmpIndexPath, Ec) || Ec)
+ else
{
- ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", STmpIndexPath, Ec.message());
+ // We must only update the log flush position once the snapshot write succeeds
+ m_LogFlushPosition = LogCount;
}
}
+ catch (std::exception& Err)
+ {
+ ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what());
+ }
}
uint64_t
-ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion)
+ZenCacheDiskLayer::CacheBucket::ReadIndexFile(RwLock::ExclusiveLockScope&, const std::filesystem::path& IndexPath, uint32_t& OutVersion)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadIndexFile");
- if (std::filesystem::is_regular_file(IndexPath))
+ if (!std::filesystem::is_regular_file(IndexPath))
{
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
- uint64_t Size = ObjectIndexFile.FileSize();
- if (Size >= sizeof(CacheBucketIndexHeader))
- {
- CacheBucketIndexHeader Header;
- ObjectIndexFile.Read(&Header, sizeof(Header), 0);
- if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) &&
- (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0))
- {
- switch (Header.Version)
- {
- case CacheBucketIndexHeader::Version2:
- {
- uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
- if (Header.EntryCount > ExpectedEntryCount)
- {
- break;
- }
- size_t EntryCount = 0;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("read store '{}' index containing {} entries in {}",
- IndexPath,
- EntryCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
+ return 0;
+ }
- m_Configuration.PayloadAlignment = Header.PayloadAlignment;
+ auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid index file '{}'", IndexPath); });
- std::vector<DiskIndexEntry> Entries;
- Entries.resize(Header.EntryCount);
- ObjectIndexFile.Read(Entries.data(),
- Header.EntryCount * sizeof(DiskIndexEntry),
- sizeof(CacheBucketIndexHeader));
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+ uint64_t FileSize = ObjectIndexFile.FileSize();
+ if (FileSize < sizeof(CacheBucketIndexHeader))
+ {
+ return 0;
+ }
- m_Payloads.reserve(Header.EntryCount);
- m_Index.reserve(Header.EntryCount);
+ CacheBucketIndexHeader Header;
+ ObjectIndexFile.Read(&Header, sizeof(Header), 0);
- std::string InvalidEntryReason;
- for (const DiskIndexEntry& Entry : Entries)
- {
- if (!ValidateCacheBucketIndexEntry(Entry, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
- continue;
- }
- PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
- m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location});
- m_Index.insert_or_assign(Entry.Key, EntryIndex);
- EntryCount++;
- }
- m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount()));
- if (m_Configuration.EnableReferenceCaching)
- {
- m_FirstReferenceIndex.resize(m_Payloads.size());
- }
- OutVersion = CacheBucketIndexHeader::Version2;
- return Header.LogPosition;
- }
- break;
- default:
- break;
- }
- }
+ if (!Header.IsValid())
+ {
+ return 0;
+ }
+
+ if (Header.Version != CacheBucketIndexHeader::Version2)
+ {
+ return 0;
+ }
+
+ const uint64_t ExpectedEntryCount = (FileSize - sizeof(CacheBucketIndexHeader)) / sizeof(DiskIndexEntry);
+ if (Header.EntryCount > ExpectedEntryCount)
+ {
+ return 0;
+ }
+
+ InvalidGuard.Dismiss();
+
+ size_t EntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ m_Configuration.PayloadAlignment = Header.PayloadAlignment;
+
+ m_Payloads.reserve(Header.EntryCount);
+ m_Index.reserve(Header.EntryCount);
+
+ BasicFileBuffer FileBuffer(ObjectIndexFile, 128 * 1024);
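+ // Stream the entries through a 128 KiB read buffer instead of materializing
+ // the whole entry array as the previous implementation did.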
+
+ uint64_t CurrentReadOffset = sizeof(CacheBucketIndexHeader);
+ uint64_t RemainingEntryCount = Header.EntryCount;
+
+ std::string InvalidEntryReason;
+ while (RemainingEntryCount--)
+ {
+ const DiskIndexEntry* Entry = FileBuffer.MakeView<DiskIndexEntry>(CurrentReadOffset);
+ CurrentReadOffset += sizeof(DiskIndexEntry);
+
+ if (!ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+ continue;
}
- ZEN_WARN("skipping invalid index file '{}'", IndexPath);
+
+ const PayloadIndex EntryIndex = PayloadIndex(EntryCount);
+ m_Payloads.emplace_back(BucketPayload{.Location = Entry->Location});
+ m_Index.insert_or_assign(Entry->Key, EntryIndex);
+
+ EntryCount++;
}
- return 0;
+
+ ZEN_ASSERT(EntryCount == m_Payloads.size());
+
+ m_AccessTimes.resize(EntryCount, AccessTime(GcClock::TickCount()));
+
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.resize(EntryCount);
+ }
+
+ OutVersion = CacheBucketIndexHeader::Version2;
+ return Header.LogPosition;
}
uint64_t
-ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
+ZenCacheDiskLayer::CacheBucket::ReadLog(RwLock::ExclusiveLockScope&, const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadLog");
- if (std::filesystem::is_regular_file(LogPath))
+ if (!std::filesystem::is_regular_file(LogPath))
{
- uint64_t LogEntryCount = 0;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
- TCasLogFile<DiskIndexEntry> CasLog;
- CasLog.Open(LogPath, CasLogFile::Mode::kRead);
- if (CasLog.Initialize())
- {
- uint64_t EntryCount = CasLog.GetLogCount();
- if (EntryCount < SkipEntryCount)
- {
- ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
- SkipEntryCount = 0;
- }
- LogEntryCount = EntryCount - SkipEntryCount;
- uint64_t InvalidEntryCount = 0;
- CasLog.Replay(
- [&](const DiskIndexEntry& Record) {
- std::string InvalidEntryReason;
- if (Record.Location.Flags & DiskLocation::kTombStone)
- {
- m_Index.erase(Record.Key);
- return;
- }
- if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
- ++InvalidEntryCount;
- return;
- }
- PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
- m_Payloads.emplace_back(BucketPayload{.Location = Record.Location});
- m_Index.insert_or_assign(Record.Key, EntryIndex);
- },
- SkipEntryCount);
- m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount()));
- if (m_Configuration.EnableReferenceCaching)
+ return 0;
+ }
+
+ uint64_t LogEntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ TCasLogFile<DiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ if (!CasLog.Initialize())
+ {
+ return 0;
+ }
+
+ const uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+ SkipEntryCount = 0;
+ }
+
+ LogEntryCount = EntryCount - SkipEntryCount;
+ uint64_t InvalidEntryCount = 0;
+
+ CasLog.Replay(
+ [&](const DiskIndexEntry& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Location.Flags & DiskLocation::kTombStone)
{
- m_FirstReferenceIndex.resize(m_Payloads.size());
+ // Note: this leaves m_Payloads and other arrays with 'holes' in them
+ m_Index.erase(Record.Key);
+ return;
}
- if (InvalidEntryCount)
+
+ if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason))
{
- ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir);
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+ ++InvalidEntryCount;
+ return;
}
- return LogEntryCount;
- }
+ PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
+ m_Payloads.emplace_back(BucketPayload{.Location = Record.Location});
+ m_Index.insert_or_assign(Record.Key, EntryIndex);
+ },
+ SkipEntryCount);
+
+ m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount()));
+
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.resize(m_Payloads.size());
}
- return 0;
+
+ if (InvalidEntryCount)
+ {
+ ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir);
+ }
+
+ return LogEntryCount;
};
void
-ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
+ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockScope& IndexLock, const bool IsNew)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenLog");
@@ -639,7 +1067,7 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
if (std::filesystem::is_regular_file(IndexPath))
{
uint32_t IndexVersion = 0;
- m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion);
+ m_LogFlushPosition = ReadIndexFile(IndexLock, IndexPath, IndexVersion);
if (IndexVersion == 0)
{
ZEN_WARN("removing invalid index file at '{}'", IndexPath);
@@ -652,19 +1080,18 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
{
if (TCasLogFile<DiskIndexEntry>::IsValid(LogPath))
{
- LogEntryCount = ReadLog(LogPath, m_LogFlushPosition);
+ LogEntryCount = ReadLog(IndexLock, LogPath, m_LogFlushPosition);
}
else if (fs::is_regular_file(LogPath))
{
- ZEN_WARN("removing invalid cas log at '{}'", LogPath);
+ ZEN_WARN("removing invalid log at '{}'", LogPath);
std::filesystem::remove(LogPath);
}
}
m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
- std::vector<BlockStoreLocation> KnownLocations;
- KnownLocations.reserve(m_Index.size());
+ BlockStore::BlockIndexSet KnownBlocks;
for (const auto& Entry : m_Index)
{
size_t EntryIndex = Entry.second;
@@ -674,19 +1101,19 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
{
m_StandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed);
- continue;
}
- const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment);
- KnownLocations.push_back(BlockLocation);
+ else
+ {
+ const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment);
+ KnownBlocks.Add(BlockLocation.BlockIndex);
+ }
}
-
- m_BlockStore.SyncExistingBlocksOnDisk(KnownLocations);
+ m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
if (IsNew || LogEntryCount > 0)
{
- MakeIndexSnapshot();
+ WriteIndexSnapshot(IndexLock);
}
- // TODO: should validate integrity of container files here
}
void
@@ -759,7 +1186,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
return false;
}
- size_t EntryIndex = It.value();
+ PayloadIndex EntryIndex = It.value();
m_AccessTimes[EntryIndex] = GcClock::TickCount();
DiskLocation Location = m_Payloads[EntryIndex].Location;
@@ -776,7 +1203,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
if (Payload->MemCached)
{
- OutValue.Value = m_MemCachedPayloads[Payload->MemCached];
+ OutValue.Value = m_MemCachedPayloads[Payload->MemCached].Payload;
Payload = nullptr;
IndexLock.ReleaseNow();
m_MemoryHitCount++;
@@ -803,14 +1230,14 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::Get::MemCache");
OutValue.Value = IoBufferBuilder::ReadFromFileMaybe(OutValue.Value);
- RwLock::ExclusiveLockScope _(m_IndexLock);
+ RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
if (auto UpdateIt = m_Index.find(HashKey); UpdateIt != m_Index.end())
{
- BucketPayload& WritePayload = m_Payloads[EntryIndex];
+ BucketPayload& WritePayload = m_Payloads[UpdateIt->second];
// Only update if it has not already been updated by other thread
if (!WritePayload.MemCached)
{
- SetMemCachedData(WritePayload, OutValue.Value);
+ SetMemCachedData(UpdateIndexLock, UpdateIt->second, OutValue.Value);
}
}
}
@@ -835,7 +1262,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
OutValue.RawHash = IoHash::HashBuffer(OutValue.Value);
OutValue.RawSize = OutValue.Value.GetSize();
}
- RwLock::ExclusiveLockScope __(m_IndexLock);
+ RwLock::ExclusiveLockScope UpdateIndexLock(m_IndexLock);
if (auto WriteIt = m_Index.find(HashKey); WriteIt != m_Index.end())
{
BucketPayload& WritePayload = m_Payloads[WriteIt.value()];
@@ -843,7 +1270,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
// Only set if no other path has already updated the meta data
if (!WritePayload.MetaData)
{
- SetMetaData(WritePayload, {.RawSize = OutValue.RawSize, .RawHash = OutValue.RawHash});
+ SetMetaData(UpdateIndexLock, WritePayload, {.RawSize = OutValue.RawSize, .RawHash = OutValue.RawHash});
}
}
}
@@ -877,48 +1304,84 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
m_DiskWriteCount++;
}
-void
+uint64_t
ZenCacheDiskLayer::CacheBucket::MemCacheTrim(GcClock::TimePoint ExpireTime)
{
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::MemCacheTrim");
+
+ uint64_t Trimmed = 0;
GcClock::Tick ExpireTicks = ExpireTime.time_since_epoch().count();
- RwLock::ExclusiveLockScope _(m_IndexLock);
- for (const auto& Kv : m_Index)
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size());
+ if (MemCachedCount == 0)
{
- if (m_AccessTimes[Kv.second] < ExpireTicks)
+ return 0;
+ }
+
+ uint32_t WriteIndex = 0;
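+ // Compact in place: expired payloads are released, survivors are slid down
+ // to WriteIndex and their owning payload's MemCached index is re-pointed.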
+ for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
+ {
+ MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
+ if (!Data.Payload)
{
- BucketPayload& Payload = m_Payloads[Kv.second];
- RemoveMemCachedData(Payload);
+ continue;
}
+ PayloadIndex Index = Data.OwnerIndex;
+ ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
+ GcClock::Tick AccessTime = m_AccessTimes[Index];
+ if (AccessTime < ExpireTicks)
+ {
+ size_t PayloadSize = Data.Payload.GetSize();
+ RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
+ Data = {};
+ m_Payloads[Index].MemCached = {};
+ Trimmed += PayloadSize;
+ continue;
+ }
+ if (ReadIndex > WriteIndex)
+ {
+ m_MemCachedPayloads[WriteIndex] = MemCacheData{.Payload = std::move(Data.Payload), .OwnerIndex = Index};
+ m_Payloads[Index].MemCached = MemCachedIndex(WriteIndex);
+ }
+ WriteIndex++;
}
+ m_MemCachedPayloads.resize(WriteIndex);
+ m_MemCachedPayloads.shrink_to_fit();
+ zen::Reset(m_FreeMemCachedPayloads);
+ return Trimmed;
}
void
-ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint TickStart,
- GcClock::Duration SectionLength,
- std::vector<uint64_t>& InOutUsageSlots)
+ZenCacheDiskLayer::CacheBucket::GetUsageByAccess(GcClock::TimePoint Now, GcClock::Duration MaxAge, std::vector<uint64_t>& InOutUsageSlots)
{
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::GetUsageByAccess");
+
+ size_t SlotCount = InOutUsageSlots.capacity();
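+ // The caller pre-sizes the histogram via capacity(); ages map proportionally
+ // onto [0, SlotCount), with the last slot absorbing anything at or beyond MaxAge.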
RwLock::SharedLockScope _(m_IndexLock);
- for (const auto& It : m_Index)
+ uint32_t MemCachedCount = gsl::narrow<uint32_t>(m_MemCachedPayloads.size());
+ if (MemCachedCount == 0)
{
- size_t Index = It.second;
- BucketPayload& Payload = m_Payloads[Index];
- if (!Payload.MemCached)
+ return;
+ }
+ for (uint32_t ReadIndex = 0; ReadIndex < MemCachedCount; ++ReadIndex)
+ {
+ MemCacheData& Data = m_MemCachedPayloads[ReadIndex];
+ if (!Data.Payload)
{
continue;
}
+ PayloadIndex Index = Data.OwnerIndex;
+ ZEN_ASSERT_SLOW(m_Payloads[Index].MemCached == MemCachedIndex(ReadIndex));
GcClock::TimePoint ItemAccessTime = GcClock::TimePointFromTick(GcClock::Tick(m_AccessTimes[Index]));
- GcClock::Duration Age = TickStart.time_since_epoch() - ItemAccessTime.time_since_epoch();
- uint64_t Slot = gsl::narrow<uint64_t>(Age.count() > 0 ? Age.count() / SectionLength.count() : 0);
- if (Slot >= InOutUsageSlots.capacity())
+ GcClock::Duration Age = Now > ItemAccessTime ? Now - ItemAccessTime : GcClock::Duration(0);
+ size_t Slot = Age < MaxAge ? gsl::narrow<size_t>((Age.count() * SlotCount) / MaxAge.count()) : (SlotCount - 1);
+ ZEN_ASSERT_SLOW(Slot < SlotCount);
+ if (Slot >= InOutUsageSlots.size())
{
- Slot = InOutUsageSlots.capacity() - 1;
+ InOutUsageSlots.resize(Slot + 1, 0);
}
- if (Slot > InOutUsageSlots.size())
- {
- InOutUsageSlots.resize(uint64_t(Slot + 1), 0);
- }
- InOutUsageSlots[Slot] += m_MemCachedPayloads[Payload.MemCached].GetSize();
+ InOutUsageSlots[Slot] += EstimateMemCachePayloadMemory(Data.Payload.GetSize());
}
}
@@ -976,20 +1439,7 @@ ZenCacheDiskLayer::CacheBucket::Flush()
m_BlockStore.Flush(/*ForceNewBlock*/ false);
m_SlogFile.Flush();
- std::vector<AccessTime> AccessTimes;
- std::vector<BucketPayload> Payloads;
- std::vector<BucketMetaData> MetaDatas;
- IndexMap Index;
-
- {
- RwLock::SharedLockScope IndexLock(m_IndexLock);
- MakeIndexSnapshot();
- Index = m_Index;
- Payloads = m_Payloads;
- AccessTimes = m_AccessTimes;
- MetaDatas = m_MetaDatas;
- }
- SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), Payloads, MetaDatas));
+ SaveSnapshot();
}
catch (std::exception& Ex)
{
@@ -998,113 +1448,108 @@ ZenCacheDiskLayer::CacheBucket::Flush()
}
void
-ZenCacheDiskLayer::CacheBucket::SaveManifest(CbObject&& Manifest, const std::function<uint64_t()>& ClaimDiskReserveFunc)
+ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::SaveManifest");
try
{
- IoBuffer Buffer = Manifest.GetBuffer().AsIoBuffer();
-
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
- if (Error)
- {
- ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message());
- return;
- }
- bool EnoughSpace = Space.Free >= Buffer.GetSize() + 1024 * 512;
- if (!EnoughSpace)
- {
- uint64_t ReclaimedSpace = ClaimDiskReserveFunc();
- EnoughSpace = (Space.Free + ReclaimedSpace) >= Buffer.GetSize() + 1024 * 512;
- }
- if (!EnoughSpace)
- {
- ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", m_BucketDir, NiceBytes(Buffer.GetSize()));
- return;
- }
- WriteFile(m_BucketDir / "zen_manifest", Buffer);
- }
- catch (std::exception& Err)
- {
- ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what());
- }
-}
+ bool UseLegacyScheme = false;
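+ // The legacy scheme serializes the whole index into a single compact binary
+ // manifest; the default scheme writes a small manifest plus a fixed-record
+ // .meta sidecar (see WriteSidecarFile below).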
-CbObject
-ZenCacheDiskLayer::CacheBucket::MakeManifest(IndexMap&& Index,
- std::vector<AccessTime>&& AccessTimes,
- const std::vector<BucketPayload>& Payloads,
- const std::vector<BucketMetaData>& MetaDatas)
-{
- using namespace std::literals;
+ IoBuffer Buffer;
+ BucketManifestSerializer ManifestWriter;
- ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeManifest");
-
- size_t ItemCount = Index.size();
+ if (UseLegacyScheme)
+ {
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketPayload> Payloads;
+ std::vector<BucketMetaData> MetaDatas;
+ IndexMap Index;
- // This tends to overestimate a little bit but it is still way more accurate than what we get with exponential growth
- // And we don't need to reallocate theunderying buffer in almost every case
- const size_t EstimatedSizePerItem = 54u;
- const size_t ReserveSize = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128);
- CbObjectWriter Writer(ReserveSize);
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ WriteIndexSnapshot(IndexLock);
+ // Note: this copy could be eliminated on shutdown to
+ // reduce memory usage and execution time
+ Index = m_Index;
+ Payloads = m_Payloads;
+ AccessTimes = m_AccessTimes;
+ MetaDatas = m_MetaDatas;
+ }
- Writer << "BucketId"sv << m_BucketId;
- Writer << "Version"sv << CurrentDiskBucketVersion;
+ Buffer = ManifestWriter.MakeManifest(m_BucketId,
+ std::move(Index),
+ std::move(AccessTimes),
+ std::move(Payloads),
+ std::move(MetaDatas));
+ const uint64_t RequiredSpace = Buffer.GetSize() + 1024 * 512;
- if (!Index.empty())
- {
- Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(Index.size()));
- Writer.BeginArray("Keys"sv);
- for (auto& Kv : Index)
- {
- const IoHash& Key = Kv.first;
- Writer.AddHash(Key);
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
+ if (Error)
+ {
+ ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message());
+ return;
+ }
+ bool EnoughSpace = Space.Free >= RequiredSpace;
+ if (!EnoughSpace)
+ {
+ uint64_t ReclaimedSpace = ClaimDiskReserveFunc();
+ EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace;
+ }
+ if (!EnoughSpace)
+ {
+ ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}",
+ m_BucketDir,
+ NiceBytes(Buffer.GetSize()));
+ return;
+ }
}
- Writer.EndArray();
-
- Writer.BeginArray("Timestamps"sv);
- for (auto& Kv : Index)
+ else
{
- GcClock::Tick AccessTime = AccessTimes[Kv.second];
- Writer.AddInteger(AccessTime);
- }
- Writer.EndArray();
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ WriteIndexSnapshot(IndexLock);
+ const uint64_t EntryCount = m_Index.size();
+ Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount);
+ uint64_t SidecarSize = ManifestWriter.GetSidecarSize();
- if (!MetaDatas.empty())
- {
- Writer.BeginArray("RawHash"sv);
- for (auto& Kv : Index)
+ const uint64_t RequiredSpace = SidecarSize + Buffer.GetSize() + 1024 * 512;
+
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
+ if (Error)
{
- const BucketPayload& Payload = Payloads[Kv.second];
- if (Payload.MetaData)
- {
- Writer.AddHash(MetaDatas[Payload.MetaData].RawHash);
- }
- else
- {
- Writer.AddHash(IoHash::Zero);
- }
+ ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message());
+ return;
}
- Writer.EndArray();
-
- Writer.BeginArray("RawSize"sv);
- for (auto& Kv : Index)
+ bool EnoughSpace = Space.Free >= RequiredSpace;
+ if (!EnoughSpace)
{
- const BucketPayload& Payload = Payloads[Kv.second];
- if (Payload.MetaData)
- {
- Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize);
- }
- else
- {
- Writer.AddInteger(0);
- }
+ uint64_t ReclaimedSpace = ClaimDiskReserveFunc();
+ EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace;
}
- Writer.EndArray();
+ if (!EnoughSpace)
+ {
+ ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}",
+ m_BucketDir,
+ NiceBytes(Buffer.GetSize()));
+ return;
+ }
+
+ ManifestWriter.WriteSidecarFile(IndexLock,
+ GetMetaPath(m_BucketDir, m_BucketName),
+ m_LogFlushPosition,
+ m_Index,
+ m_AccessTimes,
+ m_Payloads,
+ m_MetaDatas);
}
+
+ std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName);
+ WriteFile(ManifestPath, Buffer);
+ }
+ catch (std::exception& Err)
+ {
+ ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what());
}
- return Writer.Save();
}
IoHash
@@ -1364,8 +1809,8 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
m_StandaloneSize.fetch_sub(Location.Size(), std::memory_order::relaxed);
}
- RemoveMemCachedData(Payload);
- RemoveMetaData(Payload);
+ RemoveMemCachedData(IndexLock, Payload);
+ RemoveMetaData(IndexLock, Payload);
Location.Flags |= DiskLocation::kTombStone;
LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location});
@@ -1395,13 +1840,13 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
std::vector<BucketPayload> Payloads;
std::vector<AccessTime> AccessTimes;
std::vector<BucketMetaData> MetaDatas;
- std::vector<IoBuffer> MemCachedPayloads;
+ std::vector<MemCacheData> MemCachedPayloads;
std::vector<ReferenceIndex> FirstReferenceIndex;
IndexMap Index;
{
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- CompactState(Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock);
+ CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock);
}
}
}
@@ -1463,6 +1908,10 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
});
#endif // CALCULATE_BLOCKING_TIME
+ if (m_Index.empty())
+ {
+ return;
+ }
Index = m_Index;
AccessTimes = m_AccessTimes;
Payloads = m_Payloads;
@@ -1542,10 +1991,9 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
for (const auto& Entry : StructuredItemsWithUnknownAttachments)
{
- const IoHash& Key = Entry.first;
- size_t PayloadIndex = Entry.second;
- BucketPayload& Payload = Payloads[PayloadIndex];
- const DiskLocation& Loc = Payload.Location;
+ const IoHash& Key = Entry.first;
+ BucketPayload& Payload = Payloads[Entry.second];
+ const DiskLocation& Loc = Payload.Location;
{
IoBuffer Buffer;
if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
@@ -1568,10 +2016,10 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
#endif // CALCULATE_BLOCKING_TIME
if (auto It = m_Index.find(Key); It != m_Index.end())
{
- const BucketPayload& CachedPayload = Payloads[PayloadIndex];
+ const BucketPayload& CachedPayload = Payloads[It->second];
if (CachedPayload.MemCached)
{
- Buffer = m_MemCachedPayloads[CachedPayload.MemCached];
+ Buffer = m_MemCachedPayloads[CachedPayload.MemCached].Payload;
ZEN_ASSERT_SLOW(Buffer);
}
else
@@ -1678,20 +2126,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
try
{
- std::vector<AccessTime> AccessTimes;
- std::vector<BucketPayload> Payloads;
- std::vector<BucketMetaData> MetaDatas;
- IndexMap Index;
- {
- RwLock::SharedLockScope IndexLock(m_IndexLock);
- MakeIndexSnapshot([&]() { return GcCtx.ClaimGCReserve(); });
- Index = m_Index;
- Payloads = m_Payloads;
- AccessTimes = m_AccessTimes;
- MetaDatas = m_MetaDatas;
- }
- SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), Payloads, MetaDatas),
- [&]() { return GcCtx.ClaimGCReserve(); });
+ SaveSnapshot([&]() { return GcCtx.ClaimGCReserve(); });
}
catch (std::exception& Ex)
{
@@ -1699,8 +2134,6 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
}
});
- m_SlogFile.Flush();
-
auto __ = MakeGuard([&]() {
if (!DeletedChunks.empty())
{
@@ -1708,7 +2141,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
std::vector<BucketPayload> Payloads;
std::vector<AccessTime> AccessTimes;
std::vector<BucketMetaData> MetaDatas;
- std::vector<IoBuffer> MemCachedPayloads;
+ std::vector<MemCacheData> MemCachedPayloads;
std::vector<ReferenceIndex> FirstReferenceIndex;
IndexMap Index;
{
@@ -1719,18 +2152,25 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
WriteBlockTimeUs += ElapsedUs;
WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
});
- CompactState(Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock);
+ CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock);
}
GcCtx.AddDeletedCids(std::vector<IoHash>(DeletedChunks.begin(), DeletedChunks.end()));
}
});
- std::span<const IoHash> ExpiredCacheKeySpan = GcCtx.ExpiredCacheKeys(m_BucketDir.string());
+ std::span<const IoHash> ExpiredCacheKeySpan = GcCtx.ExpiredCacheKeys(m_BucketDir.string());
+ if (ExpiredCacheKeySpan.empty())
+ {
+ return;
+ }
+
+ m_SlogFile.Flush();
+
std::unordered_set<IoHash, IoHash::Hasher> ExpiredCacheKeys(ExpiredCacheKeySpan.begin(), ExpiredCacheKeySpan.end());
std::vector<DiskIndexEntry> ExpiredStandaloneEntries;
- IndexMap Index;
- std::vector<BucketPayload> Payloads;
+ IndexMap IndexSnapshot;
+ std::vector<BucketPayload> PayloadsSnapshot;
BlockStore::ReclaimSnapshotState BlockStoreState;
{
bool Expected = false;
@@ -1741,7 +2181,6 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
}
auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
- std::vector<AccessTime> AccessTimes;
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::State");
RwLock::SharedLockScope IndexLock(m_IndexLock);
@@ -1755,23 +2194,23 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
- Payloads = m_Payloads;
- AccessTimes = m_AccessTimes;
- Index = m_Index;
-
for (const IoHash& Key : ExpiredCacheKeys)
{
- if (auto It = Index.find(Key); It != Index.end())
+ if (auto It = m_Index.find(Key); It != m_Index.end())
{
- const BucketPayload& Payload = Payloads[It->second];
- DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location};
- if (Entry.Location.Flags & DiskLocation::kStandaloneFile)
+ const BucketPayload& Payload = m_Payloads[It->second];
+ if (Payload.Location.Flags & DiskLocation::kStandaloneFile)
{
+ DiskIndexEntry Entry = {.Key = Key, .Location = Payload.Location};
Entry.Location.Flags |= DiskLocation::kTombStone;
ExpiredStandaloneEntries.push_back(Entry);
}
}
}
+
+ PayloadsSnapshot = m_Payloads;
+ IndexSnapshot = m_Index;
+
if (GcCtx.IsDeletionMode())
{
IndexLock.ReleaseNow();
@@ -1836,7 +2275,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
}
}
- TotalChunkCount = Index.size();
+ TotalChunkCount = IndexSnapshot.size();
std::vector<BlockStoreLocation> ChunkLocations;
BlockStore::ChunkIndexArray KeepChunkIndexes;
@@ -1846,10 +2285,10 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
ChunkIndexToChunkHash.reserve(TotalChunkCount);
{
TotalChunkCount = 0;
- for (const auto& Entry : Index)
+ for (const auto& Entry : IndexSnapshot)
{
size_t EntryIndex = Entry.second;
- const DiskLocation& DiskLocation = Payloads[EntryIndex].Location;
+ const DiskLocation& DiskLocation = PayloadsSnapshot[EntryIndex].Location;
if (DiskLocation.Flags & DiskLocation::kStandaloneFile)
{
@@ -1894,7 +2333,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
std::vector<DiskIndexEntry> LogEntries;
LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
{
- RwLock::ExclusiveLockScope __(m_IndexLock);
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
Stopwatch Timer;
const auto ____ = MakeGuard([&] {
uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
@@ -1908,7 +2347,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
size_t EntryIndex = m_Index[ChunkHash];
BucketPayload& Payload = m_Payloads[EntryIndex];
- if (Payloads[Index[ChunkHash]].Location != m_Payloads[EntryIndex].Location)
+ if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != m_Payloads[EntryIndex].Location)
{
// Entry has been updated while GC was running, ignore the move
continue;
@@ -1921,7 +2360,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
size_t EntryIndex = m_Index[ChunkHash];
BucketPayload& Payload = m_Payloads[EntryIndex];
- if (Payloads[Index[ChunkHash]].Location != Payload.Location)
+ if (PayloadsSnapshot[IndexSnapshot[ChunkHash]].Location != Payload.Location)
{
// Entry has been updated while GC was running, ignore the delete
continue;
@@ -1932,8 +2371,8 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
m_Configuration.PayloadAlignment,
OldDiskLocation.GetFlags() | DiskLocation::kTombStone)});
- RemoveMemCachedData(Payload);
- RemoveMetaData(Payload);
+ RemoveMemCachedData(IndexLock, Payload);
+ RemoveMetaData(IndexLock, Payload);
m_Index.erase(ChunkHash);
DeletedChunks.insert(ChunkHash);
@@ -1970,7 +2409,7 @@ ZenCacheDiskLayer::CacheBucket::EntryCount() const
}
CacheValueDetails::ValueDetails
-ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, PayloadIndex Index) const
+ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLock, const IoHash& Key, PayloadIndex Index) const
{
std::vector<IoHash> Attachments;
const BucketPayload& Payload = m_Payloads[Index];
@@ -1982,7 +2421,7 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, PayloadIndex
CbObjectView Obj(Value.GetData());
Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); });
}
- BucketMetaData MetaData = GetMetaData(Payload);
+ BucketMetaData MetaData = GetMetaData(IndexLock, Payload);
return CacheValueDetails::ValueDetails{.Size = Payload.Location.Size(),
.RawSize = MetaData.RawSize,
.RawHash = MetaData.RawHash,
@@ -1992,7 +2431,7 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(const IoHash& Key, PayloadIndex
}
CacheValueDetails::BucketDetails
-ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilter) const
+ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLock, const std::string_view ValueFilter) const
{
CacheValueDetails::BucketDetails Details;
RwLock::SharedLockScope _(m_IndexLock);
@@ -2001,7 +2440,7 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilt
Details.Values.reserve(m_Index.size());
for (const auto& It : m_Index)
{
- Details.Values.insert_or_assign(It.first, GetValueDetails(It.first, It.second));
+ Details.Values.insert_or_assign(It.first, GetValueDetails(IndexLock, It.first, It.second));
}
}
else
@@ -2009,7 +2448,7 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilt
IoHash Key = IoHash::FromHexString(ValueFilter);
if (auto It = m_Index.find(Key); It != m_Index.end())
{
- Details.Values.insert_or_assign(It->first, GetValueDetails(It->first, It->second));
+ Details.Values.insert_or_assign(It->first, GetValueDetails(IndexLock, It->first, It->second));
}
}
return Details;
@@ -2019,10 +2458,10 @@ void
ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents(
std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const
{
- RwLock::SharedLockScope _(m_IndexLock);
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
for (const auto& It : m_Index)
{
- CacheValueDetails::ValueDetails Vd = GetValueDetails(It.first, It.second);
+ CacheValueDetails::ValueDetails Vd = GetValueDetails(IndexLock, It.first, It.second);
Fn(It.first, Vd);
}
@@ -2046,7 +2485,10 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
{
Bucket->CollectGarbage(GcCtx);
}
- MemCacheTrim(Buckets, GcCtx.CacheExpireTime());
+ if (!m_IsMemCacheTrimming)
+ {
+ MemCacheTrim(Buckets, GcCtx.CacheExpireTime());
+ }
}
void
@@ -2166,6 +2608,10 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
ValueLock.ReleaseNow();
+ if (m_UpdatedKeys)
+ {
+ m_UpdatedKeys->insert(HashKey);
+ }
PayloadIndex EntryIndex = {};
if (auto It = m_Index.find(HashKey); It == m_Index.end())
@@ -2193,16 +2639,16 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
SetReferences(IndexLock, m_FirstReferenceIndex[EntryIndex], References);
}
m_AccessTimes[EntryIndex] = GcClock::TickCount();
- RemoveMemCachedData(Payload);
+ RemoveMemCachedData(IndexLock, Payload);
m_StandaloneSize.fetch_sub(OldSize, std::memory_order::relaxed);
}
if (Value.RawSize != 0 || Value.RawHash != IoHash::Zero)
{
- SetMetaData(m_Payloads[EntryIndex], {.RawSize = Value.RawSize, .RawHash = Value.RawHash});
+ SetMetaData(IndexLock, m_Payloads[EntryIndex], {.RawSize = Value.RawSize, .RawHash = Value.RawHash});
}
else
{
- RemoveMetaData(m_Payloads[EntryIndex]);
+ RemoveMetaData(IndexLock, m_Payloads[EntryIndex]);
}
m_SlogFile.Append({.Key = HashKey, .Location = Loc});
@@ -2210,7 +2656,9 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
}
void
-ZenCacheDiskLayer::CacheBucket::SetMetaData(BucketPayload& Payload, const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData)
+ZenCacheDiskLayer::CacheBucket::SetMetaData(RwLock::ExclusiveLockScope&,
+ BucketPayload& Payload,
+ const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData)
{
if (Payload.MetaData)
{
@@ -2233,7 +2681,7 @@ ZenCacheDiskLayer::CacheBucket::SetMetaData(BucketPayload& Payload, const ZenCac
}
void
-ZenCacheDiskLayer::CacheBucket::RemoveMetaData(BucketPayload& Payload)
+ZenCacheDiskLayer::CacheBucket::RemoveMetaData(RwLock::ExclusiveLockScope&, BucketPayload& Payload)
{
if (Payload.MetaData)
{
@@ -2243,17 +2691,18 @@ ZenCacheDiskLayer::CacheBucket::RemoveMetaData(BucketPayload& Payload)
}
void
-ZenCacheDiskLayer::CacheBucket::SetMemCachedData(BucketPayload& Payload, IoBuffer& MemCachedData)
+ZenCacheDiskLayer::CacheBucket::SetMemCachedData(RwLock::ExclusiveLockScope&, PayloadIndex PayloadIndex, IoBuffer& MemCachedData)
{
- uint64_t PayloadSize = MemCachedData.GetSize();
+ BucketPayload& Payload = m_Payloads[PayloadIndex];
+ uint64_t PayloadSize = MemCachedData.GetSize();
ZEN_ASSERT(PayloadSize != 0);
if (m_FreeMemCachedPayloads.empty())
{
if (m_MemCachedPayloads.size() != std::numeric_limits<uint32_t>::max())
{
Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(m_MemCachedPayloads.size()));
- m_MemCachedPayloads.push_back(MemCachedData);
- AddMemCacheUsage(PayloadSize);
+ m_MemCachedPayloads.emplace_back(MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex});
+ AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
m_MemoryWriteCount++;
}
}
@@ -2261,20 +2710,20 @@ ZenCacheDiskLayer::CacheBucket::SetMemCachedData(BucketPayload& Payload, IoBuffe
{
Payload.MemCached = m_FreeMemCachedPayloads.back();
m_FreeMemCachedPayloads.pop_back();
- m_MemCachedPayloads[Payload.MemCached] = MemCachedData;
- AddMemCacheUsage(PayloadSize);
+ m_MemCachedPayloads[Payload.MemCached] = MemCacheData{.Payload = MemCachedData, .OwnerIndex = PayloadIndex};
+ AddMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
m_MemoryWriteCount++;
}
}
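
The two functions above keep freed mem-cache slots on a free list (m_FreeMemCachedPayloads) and let m_MemCachedPayloads grow only when no slot can be recycled. A minimal standalone sketch of that slot-reuse pattern, using a hypothetical SlotStore rather than the real zen types:

    #include <cstdint>
    #include <vector>

    // Hypothetical sketch of free-list slot reuse: freed slots are recycled
    // before the backing vector is allowed to grow.
    struct SlotStore
    {
        std::vector<std::vector<uint8_t>> Slots;     // stand-in for m_MemCachedPayloads
        std::vector<uint32_t>             FreeSlots; // stand-in for m_FreeMemCachedPayloads

        uint32_t Store(std::vector<uint8_t> Payload)
        {
            if (!FreeSlots.empty())
            {
                uint32_t Index = FreeSlots.back();
                FreeSlots.pop_back();
                Slots[Index] = std::move(Payload); // reuse a previously freed slot
                return Index;
            }
            Slots.push_back(std::move(Payload));   // grow only when nothing is free
            return static_cast<uint32_t>(Slots.size() - 1);
        }

        void Remove(uint32_t Index)
        {
            Slots[Index] = {};          // drop the payload but keep the slot
            FreeSlots.push_back(Index); // make the slot available for reuse
        }
    };
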
size_t
-ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(BucketPayload& Payload)
+ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(RwLock::ExclusiveLockScope&, BucketPayload& Payload)
{
if (Payload.MemCached)
{
- size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].GetSize();
- RemoveMemCacheUsage(PayloadSize);
- m_MemCachedPayloads[Payload.MemCached] = IoBuffer{};
+ size_t PayloadSize = m_MemCachedPayloads[Payload.MemCached].Payload.GetSize();
+ RemoveMemCacheUsage(EstimateMemCachePayloadMemory(PayloadSize));
+ m_MemCachedPayloads[Payload.MemCached] = {};
m_FreeMemCachedPayloads.push_back(Payload.MemCached);
Payload.MemCached = {};
return PayloadSize;
@@ -2283,7 +2732,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveMemCachedData(BucketPayload& Payload)
}
ZenCacheDiskLayer::CacheBucket::BucketMetaData
-ZenCacheDiskLayer::CacheBucket::GetMetaData(const BucketPayload& Payload) const
+ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const BucketPayload& Payload) const
{
if (Payload.MetaData)
{
@@ -2316,14 +2765,18 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const
m_SlogFile.Append({.Key = HashKey, .Location = Location});
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ if (m_UpdatedKeys)
+ {
+ m_UpdatedKeys->insert(HashKey);
+ }
if (auto It = m_Index.find(HashKey); It != m_Index.end())
{
PayloadIndex EntryIndex = It.value();
ZEN_ASSERT_SLOW(EntryIndex < PayloadIndex(m_AccessTimes.size()));
BucketPayload& Payload = m_Payloads[EntryIndex];
- RemoveMemCachedData(Payload);
- RemoveMetaData(Payload);
+ RemoveMemCachedData(IndexLock, Payload);
+ RemoveMetaData(IndexLock, Payload);
Payload = (BucketPayload{.Location = Location});
m_AccessTimes[EntryIndex] = GcClock::TickCount();
@@ -2354,12 +2807,246 @@ ZenCacheDiskLayer::CacheBucket::GetGcName(GcCtx&)
return fmt::format("cachebucket:'{}'", m_BucketDir.string());
}
-void
-ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcReferencerStats& Stats)
+class DiskBucketStoreCompactor : public GcStoreCompactor
{
- size_t TotalEntries = 0;
- tsl::robin_set<IoHash, IoHash::Hasher> ExpiredInlineKeys;
- std::vector<std::pair<IoHash, uint64_t>> ExpiredStandaloneKeys;
+public:
+ DiskBucketStoreCompactor(ZenCacheDiskLayer::CacheBucket& Bucket, std::vector<std::pair<IoHash, uint64_t>>&& ExpiredStandaloneKeys)
+ : m_Bucket(Bucket)
+ , m_ExpiredStandaloneKeys(std::move(ExpiredStandaloneKeys))
+ {
+ m_ExpiredStandaloneKeys.shrink_to_fit();
+ }
+
+ virtual ~DiskBucketStoreCompactor() {}
+
+ virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override
+ {
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactStore");
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ Reset(m_ExpiredStandaloneKeys);
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': RemovedDisk: {} in {}",
+ m_Bucket.m_BucketDir,
+ NiceBytes(Stats.RemovedDisk),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ if (!m_ExpiredStandaloneKeys.empty())
+ {
+ // Compact standalone items
+ size_t Skipped = 0;
+ ExtendablePathBuilder<256> Path;
+ for (const std::pair<IoHash, uint64_t>& ExpiredKey : m_ExpiredStandaloneKeys)
+ {
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return;
+ }
+ Path.Reset();
+ m_Bucket.BuildPath(Path, ExpiredKey.first);
+ fs::path FilePath = Path.ToPath();
+
+ RwLock::SharedLockScope IndexLock(m_Bucket.m_IndexLock);
+ if (m_Bucket.m_Index.contains(ExpiredKey.first))
+ {
+ // Someone added it back, let the file on disk be
+ ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipping z$ delete standalone of file '{}' FAILED, it has been added back",
+ m_Bucket.m_BucketDir,
+ Path.ToUtf8());
+ continue;
+ }
+
+ if (Ctx.Settings.IsDeleteMode)
+ {
+ RwLock::ExclusiveLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first));
+ IndexLock.ReleaseNow();
+ ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': deleting standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8());
+
+ std::error_code Ec;
+ if (!fs::remove(FilePath, Ec))
+ {
+ continue;
+ }
+ if (Ec)
+ {
+ ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': delete expired z$ standalone file '{}' FAILED, reason: '{}'",
+ m_Bucket.m_BucketDir,
+ Path.ToUtf8(),
+ Ec.message());
+ continue;
+ }
+ Stats.RemovedDisk += ExpiredKey.second;
+ }
+ else
+ {
+ RwLock::SharedLockScope ValueLock(m_Bucket.LockForHash(ExpiredKey.first));
+ IndexLock.ReleaseNow();
+ ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': checking standalone cache file '{}'", m_Bucket.m_BucketDir, Path.ToUtf8());
+
+ std::error_code Ec;
+ bool Existed = std::filesystem::is_regular_file(FilePath, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("GCV2: cachebucket [COMPACT] '{}': failed checking cache payload file '{}'. Reason '{}'",
+ m_Bucket.m_BucketDir,
+ FilePath,
+ Ec.message());
+ continue;
+ }
+ if (!Existed)
+ {
+ continue;
+ }
+ Skipped++;
+ }
+ }
+ if (Skipped > 0)
+ {
+ ZEN_DEBUG("GCV2: cachebucket [COMPACT] '{}': skipped deleting of {} eligible files", m_Bucket.m_BucketDir, Skipped);
+ }
+ }
+
+ if (Ctx.Settings.CollectSmallObjects)
+ {
+ m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys = std::make_unique<HashSet>(); });
+ auto __ = MakeGuard([&]() { m_Bucket.m_IndexLock.WithExclusiveLock([&]() { m_Bucket.m_UpdatedKeys.reset(); }); });
+
+ size_t InlineEntryCount = 0;
+ BlockStore::BlockUsageMap BlockUsage;
+ {
+ RwLock::SharedLockScope ___(m_Bucket.m_IndexLock);
+ for (const auto& Entry : m_Bucket.m_Index)
+ {
+ ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second;
+ const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[Index];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ continue;
+ }
+ InlineEntryCount++;
+ uint32_t BlockIndex = Loc.Location.BlockLocation.GetBlockIndex();
+ uint64_t ChunkSize = RoundUp(Loc.Size(), m_Bucket.m_Configuration.PayloadAlignment);
+ if (auto It = BlockUsage.find(BlockIndex); It != BlockUsage.end())
+ {
+ It->second.EntryCount++;
+ It->second.DiskUsage += ChunkSize;
+ }
+ else
+ {
+ BlockUsage.insert_or_assign(BlockIndex, BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1});
+ }
+ }
+ }
+
+ {
+ BlockStoreCompactState BlockCompactState;
+ std::vector<IoHash> BlockCompactStateKeys;
+ BlockCompactStateKeys.reserve(InlineEntryCount);
+
+ BlockStore::BlockEntryCountMap BlocksToCompact =
+ m_Bucket.m_BlockStore.GetBlocksToCompact(BlockUsage, Ctx.Settings.CompactBlockUsageThresholdPercent);
+ BlockCompactState.IncludeBlocks(BlocksToCompact);
+
+ if (BlocksToCompact.size() > 0)
+ {
+ {
+ RwLock::SharedLockScope ___(m_Bucket.m_IndexLock);
+ for (const auto& Entry : m_Bucket.m_Index)
+ {
+ ZenCacheDiskLayer::CacheBucket::PayloadIndex Index = Entry.second;
+ const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[Index];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ continue;
+ }
+ if (!BlockCompactState.AddKeepLocation(Loc.GetBlockLocation(m_Bucket.m_Configuration.PayloadAlignment)))
+ {
+ continue;
+ }
+ BlockCompactStateKeys.push_back(Entry.first);
+ }
+ }
+
+ if (Ctx.Settings.IsDeleteMode)
+ {
+ if (Ctx.Settings.Verbose)
+ {
+ ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': compacting {} blocks",
+ m_Bucket.m_BucketDir,
+ BlocksToCompact.size());
+ }
+
+ m_Bucket.m_BlockStore.CompactBlocks(
+ BlockCompactState,
+ m_Bucket.m_Configuration.PayloadAlignment,
+ [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) {
+ std::vector<DiskIndexEntry> MovedEntries;
+ MovedEntries.reserve(MovedArray.size());
+ RwLock::ExclusiveLockScope _(m_Bucket.m_IndexLock);
+ for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray)
+ {
+ size_t ChunkIndex = Moved.first;
+ const IoHash& Key = BlockCompactStateKeys[ChunkIndex];
+
+ if (m_Bucket.m_UpdatedKeys->contains(Key))
+ {
+ continue;
+ }
+
+ if (auto It = m_Bucket.m_Index.find(Key); It != m_Bucket.m_Index.end())
+ {
+ ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Bucket.m_Payloads[It->second];
+ const BlockStoreLocation& NewLocation = Moved.second;
+ Payload.Location = DiskLocation(NewLocation,
+ m_Bucket.m_Configuration.PayloadAlignment,
+ Payload.Location.GetFlags());
+ MovedEntries.push_back({.Key = Key, .Location = Payload.Location});
+ }
+ }
+ m_Bucket.m_SlogFile.Append(MovedEntries);
+ Stats.RemovedDisk += FreedDiskSpace;
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return false;
+ }
+ return true;
+ },
+ ClaimDiskReserveCallback);
+ }
+ else
+ {
+ if (Ctx.Settings.Verbose)
+ {
+ ZEN_INFO("GCV2: cachebucket [COMPACT] '{}': skipped compacting of {} eligible blocks",
+ m_Bucket.m_BucketDir,
+ BlocksToCompact.size());
+ }
+ }
+ }
+ }
+ }
+ }
+
+private:
+ ZenCacheDiskLayer::CacheBucket& m_Bucket;
+ std::vector<std::pair<IoHash, uint64_t>> m_ExpiredStandaloneKeys;
+};
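
DiskBucketStoreCompactor relies on m_UpdatedKeys to detect entries rewritten while compaction ran outside the lock: any relocation whose key appears in that set is dropped, so concurrent writers always win. A hedged sketch of that conflict check, with hypothetical types standing in for the zen index:

    #include <cstdint>
    #include <string>
    #include <unordered_map>
    #include <unordered_set>
    #include <vector>

    struct Relocation { std::string Key; uint64_t NewOffset; };

    // Hypothetical sketch: apply relocations computed outside the lock, but
    // discard any move whose key was rewritten while compaction was running.
    void ApplyMoves(std::unordered_map<std::string, uint64_t>& Index,
                    const std::unordered_set<std::string>& UpdatedKeys,
                    const std::vector<Relocation>& Moves)
    {
        for (const Relocation& Move : Moves)
        {
            if (UpdatedKeys.contains(Move.Key))
            {
                continue; // a writer replaced this entry mid-compaction; its location wins
            }
            if (auto It = Index.find(Move.Key); It != Index.end())
            {
                It->second = Move.NewOffset; // safe: unchanged since the snapshot
            }
        }
    }
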
+
+GcStoreCompactor*
+ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
+{
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveExpiredData");
+
+ size_t TotalEntries = 0;
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -2367,36 +3054,38 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcReferencerStats&
{
return;
}
- ZEN_INFO("GCV2: cachebucket [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, RemovedDisk: {}, RemovedMemory: {} in {}",
+ ZEN_INFO("GCV2: cachebucket [REMOVE EXPIRED] '{}': Count: {}, Expired: {}, Deleted: {}, FreedMemory: {} in {}",
m_BucketDir,
- Stats.Count,
- Stats.Expired,
- Stats.Deleted,
- NiceBytes(Stats.RemovedDisk),
- NiceBytes(Stats.RemovedMemory),
+ Stats.CheckedCount,
+ Stats.FoundCount,
+ Stats.DeletedCount,
+ NiceBytes(Stats.FreedMemory),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
const GcClock::Tick ExpireTicks = Ctx.Settings.CacheExpireTime.time_since_epoch().count();
- BlockStoreCompactState BlockCompactState;
- BlockStore::ReclaimSnapshotState BlockSnapshotState;
- std::vector<IoHash> BlockCompactStateKeys;
- std::vector<DiskIndexEntry> ExpiredEntries;
- uint64_t RemovedStandaloneSize = 0;
+ std::vector<DiskIndexEntry> ExpiredEntries;
+ std::vector<std::pair<IoHash, uint64_t>> ExpiredStandaloneKeys;
+ uint64_t RemovedStandaloneSize = 0;
{
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- if (Ctx.Settings.CollectSmallObjects)
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return nullptr;
+ }
+ if (m_Index.empty())
{
- BlockSnapshotState = m_BlockStore.GetReclaimSnapshotState();
+ return nullptr;
}
+
TotalEntries = m_Index.size();
- // Find out expired keys and affected blocks
+ // Find out expired keys
for (const auto& Entry : m_Index)
{
const IoHash& Key = Entry.first;
- size_t EntryIndex = Entry.second;
+ PayloadIndex EntryIndex = Entry.second;
GcClock::Tick AccessTime = m_AccessTimes[EntryIndex];
if (AccessTime >= ExpireTicks)
{
@@ -2415,40 +3104,16 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcReferencerStats&
}
else if (Ctx.Settings.CollectSmallObjects)
{
- ExpiredInlineKeys.insert(Key);
- uint32_t BlockIndex = Payload.Location.Location.BlockLocation.GetBlockIndex();
- bool IsActiveWriteBlock = BlockSnapshotState.m_ActiveWriteBlocks.contains(BlockIndex);
- if (!IsActiveWriteBlock)
- {
- BlockCompactState.IncludeBlock(BlockIndex);
- }
ExpiredEntries.push_back(ExpiredEntry);
}
}
- Stats.Expired += ExpiredStandaloneKeys.size() + ExpiredInlineKeys.size();
+ Stats.CheckedCount += TotalEntries;
+ Stats.FoundCount += ExpiredEntries.size();
- // Get all locations we need to keep for affected blocks
- if (Ctx.Settings.CollectSmallObjects && !ExpiredInlineKeys.empty())
+ if (Ctx.IsCancelledFlag.load())
{
- for (const auto& Entry : m_Index)
- {
- const IoHash& Key = Entry.first;
- if (ExpiredInlineKeys.contains(Key))
- {
- continue;
- }
- size_t EntryIndex = Entry.second;
- const BucketPayload& Payload = m_Payloads[EntryIndex];
- if (Payload.Location.Flags & DiskLocation::kStandaloneFile)
- {
- continue;
- }
- if (BlockCompactState.AddKeepLocation(Payload.Location.GetBlockLocation(m_Configuration.PayloadAlignment)))
- {
- BlockCompactStateKeys.push_back(Key);
- }
- }
+ return nullptr;
}
if (Ctx.Settings.IsDeleteMode)
@@ -2458,132 +3123,291 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcReferencerStats&
auto It = m_Index.find(Entry.Key);
ZEN_ASSERT(It != m_Index.end());
BucketPayload& Payload = m_Payloads[It->second];
- RemoveMetaData(Payload);
- Stats.RemovedMemory += RemoveMemCachedData(Payload);
+ RemoveMetaData(IndexLock, Payload);
+ Stats.FreedMemory += RemoveMemCachedData(IndexLock, Payload);
m_Index.erase(It);
+ Stats.DeletedCount++;
}
m_SlogFile.Append(ExpiredEntries);
m_StandaloneSize.fetch_sub(RemovedStandaloneSize, std::memory_order::relaxed);
}
}
- Stats.Count += TotalEntries;
- if (ExpiredEntries.empty())
+ if (Ctx.Settings.IsDeleteMode && !ExpiredEntries.empty())
{
- return;
+ std::vector<BucketPayload> Payloads;
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketMetaData> MetaDatas;
+ std::vector<MemCacheData> MemCachedPayloads;
+ std::vector<ReferenceIndex> FirstReferenceIndex;
+ IndexMap Index;
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
+ CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock);
+ }
}
- if (!Ctx.Settings.IsDeleteMode)
+ if (Ctx.IsCancelledFlag.load())
{
- return;
+ return nullptr;
}
- Stats.Deleted += ExpiredEntries.size();
-
- // Compact standalone items
- ExtendablePathBuilder<256> Path;
- for (const std::pair<IoHash, uint64_t>& ExpiredKey : ExpiredStandaloneKeys)
- {
- Path.Reset();
- BuildPath(Path, ExpiredKey.first);
- fs::path FilePath = Path.ToPath();
+ return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys));
+}
- RwLock::SharedLockScope IndexLock(m_IndexLock);
- if (m_Index.contains(ExpiredKey.first))
- {
- // Someone added it back, let the file on disk be
- ZEN_DEBUG("gc cache bucket '{}': skipping z$ delete standalone of file '{}' FAILED, it has been added back",
- m_BucketDir,
- Path.ToUtf8());
- continue;
- }
+class DiskBucketReferenceChecker : public GcReferenceChecker
+{
+ using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex;
+ using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload;
+ using CacheBucket = ZenCacheDiskLayer::CacheBucket;
+ using ReferenceIndex = ZenCacheDiskLayer::CacheBucket::ReferenceIndex;
- RwLock::ExclusiveLockScope ValueLock(LockForHash(ExpiredKey.first));
- IndexLock.ReleaseNow();
- ZEN_DEBUG("gc cache bucket '{}': deleting standalone cache file '{}'", m_BucketDir, Path.ToUtf8());
+public:
+ DiskBucketReferenceChecker(CacheBucket& Owner) : m_CacheBucket(Owner) {}
- std::error_code Ec;
- if (!fs::remove(FilePath, Ec))
+ virtual ~DiskBucketReferenceChecker()
+ {
+ try
{
- continue;
+ m_IndexLock.reset();
+ if (!m_CacheBucket.m_Configuration.EnableReferenceCaching)
+ {
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
+ // If reference caching is not enabled, we temporarily used the data structures for reference caching, so reset them here
+ m_CacheBucket.ClearReferenceCache();
+ }
}
- if (Ec)
+ catch (std::exception& Ex)
{
- ZEN_WARN("gc cache bucket '{}': delete expired z$ standalone file '{}' FAILED, reason: '{}'",
- m_BucketDir,
- Path.ToUtf8(),
- Ec.message());
- continue;
+ ZEN_ERROR("~DiskBucketReferenceChecker threw exception: '{}'", Ex.what());
}
- Stats.RemovedDisk += ExpiredKey.second;
}
- if (Ctx.Settings.CollectSmallObjects && !ExpiredInlineKeys.empty())
+ virtual void PreCache(GcCtx& Ctx) override
{
- // Compact block store
- m_BlockStore.CompactBlocks(
- BlockCompactState,
- m_Configuration.PayloadAlignment,
- [&](const BlockStore::MovedChunksArray& MovedArray, uint64_t FreedDiskSpace) {
- std::vector<DiskIndexEntry> MovedEntries;
- RwLock::ExclusiveLockScope _(m_IndexLock);
- for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray)
- {
- size_t ChunkIndex = Moved.first;
- const IoHash& Key = BlockCompactStateKeys[ChunkIndex];
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::PreCache");
- if (auto It = m_Index.find(Key); It != m_Index.end())
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: cachebucket [PRECACHE] '{}': found {} references in {}",
+ m_CacheBucket.m_BucketDir,
+ m_CacheBucket.m_ReferenceCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ std::vector<IoHash> UpdateKeys;
+ std::vector<size_t> ReferenceCounts;
+ std::vector<IoHash> References;
+
+ auto GetAttachments = [&References, &ReferenceCounts](const void* CbObjectData) {
+ size_t CurrentReferenceCount = References.size();
+ CbObjectView Obj(CbObjectData);
+ Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); });
+ ReferenceCounts.push_back(References.size() - CurrentReferenceCount);
+ };
+
+ // Refresh cache
+ {
+ // If reference caching is enabled, the references are updated on modification for us, so we don't need to track modifications
+ if (!m_CacheBucket.m_Configuration.EnableReferenceCaching)
+ {
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys = std::make_unique<HashSet>(); });
+ }
+
+ std::vector<IoHash> StandaloneKeys;
+ {
+ std::vector<IoHash> InlineKeys;
+ std::unordered_map<uint32_t, std::size_t> BlockIndexToEntriesPerBlockIndex;
+ struct InlineEntry
+ {
+ uint32_t InlineKeyIndex;
+ uint32_t Offset;
+ uint32_t Size;
+ };
+ std::vector<std::vector<InlineEntry>> EntriesPerBlock;
+ size_t UpdateCount = 0;
+ {
+ RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock);
+ for (const auto& Entry : m_CacheBucket.m_Index)
{
- BucketPayload& Payload = m_Payloads[It->second];
- const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex);
- if (Payload.Location.GetBlockLocation(m_Configuration.PayloadAlignment) != OldLocation)
+ if (Ctx.IsCancelledFlag.load())
+ {
+ IndexLock.ReleaseNow();
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
+ return;
+ }
+
+ PayloadIndex EntryIndex = Entry.second;
+ const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (!Loc.IsFlagSet(DiskLocation::kStructured))
+ {
+ continue;
+ }
+ if (m_CacheBucket.m_Configuration.EnableReferenceCaching &&
+ m_CacheBucket.m_FirstReferenceIndex[EntryIndex] != ReferenceIndex::Unknown())
+ {
+ continue;
+ }
+ UpdateCount++;
+ const IoHash& Key = Entry.first;
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
- // Someone has moved our chunk so lets just skip the new location we were provided, it will be GC:d at a later
- // time
+ StandaloneKeys.push_back(Key);
continue;
}
- const BlockStoreLocation& NewLocation = Moved.second;
+ BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_CacheBucket.m_Configuration.PayloadAlignment);
+ InlineEntry UpdateEntry = {.InlineKeyIndex = gsl::narrow<uint32_t>(InlineKeys.size()),
+ .Offset = gsl::narrow<uint32_t>(ChunkLocation.Offset),
+ .Size = gsl::narrow<uint32_t>(ChunkLocation.Size)};
+ InlineKeys.push_back(Key);
- Payload.Location = DiskLocation(NewLocation, m_Configuration.PayloadAlignment, Payload.Location.GetFlags());
- MovedEntries.push_back({.Key = Key, .Location = Payload.Location});
+ if (auto It = BlockIndexToEntriesPerBlockIndex.find(ChunkLocation.BlockIndex);
+ It != BlockIndexToEntriesPerBlockIndex.end())
+ {
+ EntriesPerBlock[It->second].emplace_back(UpdateEntry);
+ }
+ else
+ {
+ BlockIndexToEntriesPerBlockIndex.insert_or_assign(ChunkLocation.BlockIndex, EntriesPerBlock.size());
+ EntriesPerBlock.emplace_back(std::vector<InlineEntry>{UpdateEntry});
+ }
}
}
- m_SlogFile.Append(MovedEntries);
- Stats.RemovedDisk += FreedDiskSpace;
- },
- [&]() { return 0; });
- }
- std::vector<BucketPayload> Payloads;
- std::vector<AccessTime> AccessTimes;
- std::vector<BucketMetaData> MetaDatas;
- std::vector<IoBuffer> MemCachedPayloads;
- std::vector<ReferenceIndex> FirstReferenceIndex;
- IndexMap Index;
- {
- RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- CompactState(Payloads, AccessTimes, MetaDatas, MemCachedPayloads, FirstReferenceIndex, Index, IndexLock);
- }
-}
+ UpdateKeys.reserve(UpdateCount);
-class DiskBucketReferenceChecker : public GcReferenceChecker
-{
-public:
- DiskBucketReferenceChecker(ZenCacheDiskLayer::CacheBucket& Owner) : m_CacheBucket(Owner) {}
+ for (auto It : BlockIndexToEntriesPerBlockIndex)
+ {
+ uint32_t BlockIndex = It.first;
+
+ Ref<BlockStoreFile> BlockFile = m_CacheBucket.m_BlockStore.GetBlockFile(BlockIndex);
+ if (BlockFile)
+ {
+ size_t EntriesPerBlockIndex = It.second;
+ std::vector<InlineEntry>& InlineEntries = EntriesPerBlock[EntriesPerBlockIndex];
+
+ std::sort(InlineEntries.begin(), InlineEntries.end(), [&](const InlineEntry& Lhs, const InlineEntry& Rhs) -> bool {
+ return Lhs.Offset < Rhs.Offset;
+ });
+
+ uint64_t BlockFileSize = BlockFile->FileSize();
+ BasicFileBuffer BlockBuffer(BlockFile->GetBasicFile(), 32768);
+ for (const InlineEntry& InlineEntry : InlineEntries)
+ {
+ if ((InlineEntry.Offset + InlineEntry.Size) > BlockFileSize)
+ {
+ ReferenceCounts.push_back(0);
+ }
+ else
+ {
+ MemoryView ChunkView = BlockBuffer.MakeView(InlineEntry.Size, InlineEntry.Offset);
+ if (ChunkView.GetSize() == InlineEntry.Size)
+ {
+ GetAttachments(ChunkView.GetData());
+ }
+ else
+ {
+ std::vector<uint8_t> Buffer(InlineEntry.Size);
+ BlockBuffer.Read(Buffer.data(), InlineEntry.Size, InlineEntry.Offset);
+ GetAttachments(Buffer.data());
+ }
+ }
+ const IoHash& Key = InlineKeys[InlineEntry.InlineKeyIndex];
+ UpdateKeys.push_back(Key);
+ }
+ }
+ }
+ }
+ {
+ for (const IoHash& Key : StandaloneKeys)
+ {
+ if (Ctx.IsCancelledFlag.load())
+ {
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
+ return;
+ }
+
+ IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(ZenContentType::kCbObject, Key);
+ if (!Buffer)
+ {
+ continue;
+ }
+
+ GetAttachments(Buffer.GetData());
+ UpdateKeys.push_back(Key);
+ }
+ }
+ }
- virtual ~DiskBucketReferenceChecker()
- {
- m_IndexLock.reset();
- if (!m_CacheBucket.m_Configuration.EnableReferenceCaching)
{
- // If reference caching is not enabled, we temporarily used the data structure for reference caching, lets reset it
- m_CacheBucket.ClearReferenceCache();
+ size_t ReferenceOffset = 0;
+ RwLock::ExclusiveLockScope IndexLock(m_CacheBucket.m_IndexLock);
+
+ if (!m_CacheBucket.m_Configuration.EnableReferenceCaching)
+ {
+ ZEN_ASSERT(m_CacheBucket.m_FirstReferenceIndex.empty());
+ ZEN_ASSERT(m_CacheBucket.m_ReferenceHashes.empty());
+ ZEN_ASSERT(m_CacheBucket.m_NextReferenceHashesIndexes.empty());
+ ZEN_ASSERT(m_CacheBucket.m_ReferenceCount == 0);
+ ZEN_ASSERT(m_CacheBucket.m_UpdatedKeys);
+
+ // If reference caching is not enabled, we resize the in-place data structures and use them for reference caching while
+ // we figure out what this bucket references. They are reset once the DiskBucketReferenceChecker is destroyed.
+ m_CacheBucket.m_FirstReferenceIndex.resize(m_CacheBucket.m_Payloads.size(), ReferenceIndex::Unknown());
+ m_CacheBucket.m_ReferenceHashes.reserve(References.size());
+ m_CacheBucket.m_NextReferenceHashesIndexes.reserve(References.size());
+ }
+ else
+ {
+ ZEN_ASSERT(!m_CacheBucket.m_UpdatedKeys);
+ }
+
+ for (size_t Index = 0; Index < UpdateKeys.size(); Index++)
+ {
+ const IoHash& Key = UpdateKeys[Index];
+ size_t ReferenceCount = ReferenceCounts[Index];
+ if (auto It = m_CacheBucket.m_Index.find(Key); It != m_CacheBucket.m_Index.end())
+ {
+ PayloadIndex EntryIndex = It->second;
+ if (m_CacheBucket.m_Configuration.EnableReferenceCaching)
+ {
+ if (m_CacheBucket.m_FirstReferenceIndex[EntryIndex] != ReferenceIndex::Unknown())
+ {
+ // The reference data is valid and what we have is old/redundant
+ continue;
+ }
+ }
+ else if (m_CacheBucket.m_UpdatedKeys->contains(Key))
+ {
+ // Our pre-cache data is invalid
+ continue;
+ }
+
+ m_CacheBucket.SetReferences(IndexLock,
+ m_CacheBucket.m_FirstReferenceIndex[EntryIndex],
+ std::span<IoHash>{References.data() + ReferenceOffset, ReferenceCount});
+ }
+ ReferenceOffset += ReferenceCount;
+ }
+
+ if (m_CacheBucket.m_Configuration.EnableReferenceCaching && !UpdateKeys.empty())
+ {
+ m_CacheBucket.CompactReferences(IndexLock);
+ }
}
}
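
PreCache above avoids random I/O by bucketing inline entries per block file, sorting each bucket by offset, and streaming through each block once. A small sketch of that grouping-and-sort step, with hypothetical Entry/Read stand-ins for the zen block-store types:

    #include <algorithm>
    #include <cstdint>
    #include <unordered_map>
    #include <vector>

    struct Entry { uint32_t BlockIndex; uint32_t Offset; uint32_t Size; };

    // Hypothetical sketch: visit entries block by block, in ascending offset
    // order, so each block file is read roughly sequentially (as PreCache does).
    template<typename ReadChunk> // callable as void(const Entry&)
    void VisitByBlock(const std::vector<Entry>& Entries, ReadChunk&& Read)
    {
        std::unordered_map<uint32_t, std::vector<Entry>> PerBlock;
        for (const Entry& E : Entries)
        {
            PerBlock[E.BlockIndex].push_back(E);
        }
        for (auto& [BlockIndex, BlockEntries] : PerBlock)
        {
            std::sort(BlockEntries.begin(), BlockEntries.end(),
                      [](const Entry& L, const Entry& R) { return L.Offset < R.Offset; });
            for (const Entry& E : BlockEntries)
            {
                Read(E); // sequential-ish reads within one block file
            }
        }
    }
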
virtual void LockState(GcCtx& Ctx) override
{
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::LockState");
+
Stopwatch Timer;
const auto _ = MakeGuard([&] {
if (!Ctx.Settings.Verbose)
@@ -2597,22 +3421,42 @@ public:
});
m_IndexLock = std::make_unique<RwLock::SharedLockScope>(m_CacheBucket.m_IndexLock);
-
- // Rescan to see if any cache items needs refreshing since last pass when we had the lock
- for (const auto& Entry : m_CacheBucket.m_Index)
+ if (Ctx.IsCancelledFlag.load())
{
- size_t PayloadIndex = Entry.second;
- const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_CacheBucket.m_Payloads[PayloadIndex];
- const DiskLocation& Loc = Payload.Location;
+ m_UncachedReferences.clear();
+ m_IndexLock.reset();
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
+ return;
+ }
- if (!Loc.IsFlagSet(DiskLocation::kStructured))
- {
- continue;
- }
- ZEN_ASSERT(!m_CacheBucket.m_FirstReferenceIndex.empty());
- const IoHash& Key = Entry.first;
- if (m_CacheBucket.m_FirstReferenceIndex[PayloadIndex] == ZenCacheDiskLayer::CacheBucket::ReferenceIndex::Unknown())
+ if (m_CacheBucket.m_UpdatedKeys)
+ {
+ const HashSet& UpdatedKeys(*m_CacheBucket.m_UpdatedKeys);
+ for (const IoHash& Key : UpdatedKeys)
{
+ if (Ctx.IsCancelledFlag.load())
+ {
+ m_UncachedReferences.clear();
+ m_IndexLock.reset();
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_UpdatedKeys.reset(); });
+ return;
+ }
+
+ auto It = m_CacheBucket.m_Index.find(Key);
+ if (It == m_CacheBucket.m_Index.end())
+ {
+ continue;
+ }
+
+ PayloadIndex EntryIndex = It->second;
+ const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (!Loc.IsFlagSet(DiskLocation::kStructured))
+ {
+ continue;
+ }
+
IoBuffer Buffer;
if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
@@ -2633,21 +3477,48 @@ public:
}
}
- virtual void RemoveUsedReferencesFromSet(GcCtx&, HashSet& IoCids) override
+ virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override
{
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::RemoveUsedReferencesFromSet");
+
ZEN_ASSERT(m_IndexLock);
+ size_t InitialCount = IoCids.size();
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: cachebucket [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}",
+ m_CacheBucket.m_BucketDir,
+ InitialCount - IoCids.size(),
+ InitialCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
for (const IoHash& ReferenceHash : m_CacheBucket.m_ReferenceHashes)
{
- IoCids.erase(ReferenceHash);
+ if (IoCids.erase(ReferenceHash) == 1)
+ {
+ if (IoCids.empty())
+ {
+ return;
+ }
+ }
}
for (const IoHash& ReferenceHash : m_UncachedReferences)
{
- IoCids.erase(ReferenceHash);
+ if (IoCids.erase(ReferenceHash) == 1)
+ {
+ if (IoCids.empty())
+ {
+ return;
+ }
+ }
}
}
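
The filter above now returns as soon as the candidate set drains instead of walking the remaining references. A trivial sketch of the erase-with-early-exit loop, with std::string standing in for IoHash:

    #include <string>
    #include <unordered_set>
    #include <vector>

    // Hypothetical sketch: remove referenced ids from a candidate set and bail
    // out the moment the set is empty, since further erases cannot change it.
    void FilterUsed(std::unordered_set<std::string>& Candidates,
                    const std::vector<std::string>& References)
    {
        for (const std::string& Ref : References)
        {
            if (Candidates.erase(Ref) == 1 && Candidates.empty())
            {
                return; // nothing left to filter; skip the remaining references
            }
        }
    }
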
- ZenCacheDiskLayer::CacheBucket& m_CacheBucket;
+ CacheBucket& m_CacheBucket;
std::unique_ptr<RwLock::SharedLockScope> m_IndexLock;
HashSet m_UncachedReferences;
};
@@ -2655,119 +3526,22 @@ public:
std::vector<GcReferenceChecker*>
ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx)
{
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::CreateReferenceCheckers");
+
Stopwatch Timer;
const auto _ = MakeGuard([&] {
if (!Ctx.Settings.Verbose)
{
return;
}
- ZEN_INFO("GCV2: cachebucket [CREATE CHECKERS] '{}': found {} references in {}",
- m_BucketDir,
- m_ReferenceCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ ZEN_INFO("GCV2: cachebucket [CREATE CHECKERS] '{}': completed in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
- std::vector<IoHash> UpdateKeys;
- std::vector<IoHash> StandaloneKeys;
- std::vector<size_t> ReferenceCounts;
- std::vector<IoHash> References;
-
- // Refresh cache
- {
- RwLock::SharedLockScope IndexLock(m_IndexLock);
- for (const auto& Entry : m_Index)
- {
- size_t PayloadIndex = Entry.second;
- const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = m_Payloads[PayloadIndex];
- const DiskLocation& Loc = Payload.Location;
-
- if (!Loc.IsFlagSet(DiskLocation::kStructured))
- {
- continue;
- }
- if (m_Configuration.EnableReferenceCaching &&
- m_FirstReferenceIndex[PayloadIndex] != ZenCacheDiskLayer::CacheBucket::ReferenceIndex::Unknown())
- {
- continue;
- }
- const IoHash& Key = Entry.first;
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- StandaloneKeys.push_back(Key);
- continue;
- }
- IoBuffer Buffer = GetInlineCacheValue(Loc);
- if (!Buffer)
- {
- UpdateKeys.push_back(Key);
- ReferenceCounts.push_back(0);
- continue;
- }
- size_t CurrentReferenceCount = References.size();
- {
- CbObjectView Obj(Buffer.GetData());
- Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); });
- Buffer = {};
- }
- UpdateKeys.push_back(Key);
- ReferenceCounts.push_back(References.size() - CurrentReferenceCount);
- }
- }
- {
- for (const IoHash& Key : StandaloneKeys)
- {
- IoBuffer Buffer = GetStandaloneCacheValue(ZenContentType::kCbObject, Key);
- if (!Buffer)
- {
- continue;
- }
-
- size_t CurrentReferenceCount = References.size();
- {
- CbObjectView Obj(Buffer.GetData());
- Obj.IterateAttachments([&References](CbFieldView Field) { References.emplace_back(Field.AsAttachment()); });
- Buffer = {};
- }
- UpdateKeys.push_back(Key);
- ReferenceCounts.push_back(References.size() - CurrentReferenceCount);
- }
- }
-
{
- size_t ReferenceOffset = 0;
- RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- if (!m_Configuration.EnableReferenceCaching)
- {
- ZEN_ASSERT(m_FirstReferenceIndex.empty());
- ZEN_ASSERT(m_ReferenceHashes.empty());
- ZEN_ASSERT(m_NextReferenceHashesIndexes.empty());
- ZEN_ASSERT(m_ReferenceCount == 0);
- // If reference caching is not enabled, we will resize and use the data structure in place for reference caching when
- // we figure out what this bucket references. This will be reset once the DiskBucketReferenceChecker is deleted.
- m_FirstReferenceIndex.resize(m_Payloads.size());
- }
- for (size_t Index = 0; Index < UpdateKeys.size(); Index++)
- {
- const IoHash& Key = UpdateKeys[Index];
- size_t ReferenceCount = ReferenceCounts[Index];
- auto It = m_Index.find(Key);
- if (It == m_Index.end())
- {
- ReferenceOffset += ReferenceCount;
- continue;
- }
- if (m_FirstReferenceIndex[It->second] != ReferenceIndex::Unknown())
- {
- continue;
- }
- SetReferences(IndexLock,
- m_FirstReferenceIndex[It->second],
- std::span<IoHash>{References.data() + ReferenceOffset, ReferenceCount});
- ReferenceOffset += ReferenceCount;
- }
- if (m_Configuration.EnableReferenceCaching)
+ RwLock::SharedLockScope __(m_IndexLock);
+ if (m_Index.empty())
{
- CompactReferences(IndexLock);
+ return {};
}
}
@@ -2777,6 +3551,8 @@ ZenCacheDiskLayer::CacheBucket::CreateReferenceCheckers(GcCtx& Ctx)
void
ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&)
{
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactReferences");
+
std::vector<ReferenceIndex> FirstReferenceIndex;
std::vector<IoHash> NewReferenceHashes;
std::vector<ReferenceIndex> NewNextReferenceHashesIndexes;
@@ -2813,7 +3589,9 @@ ZenCacheDiskLayer::CacheBucket::CompactReferences(RwLock::ExclusiveLockScope&)
}
m_FirstReferenceIndex.swap(FirstReferenceIndex);
m_ReferenceHashes.swap(NewReferenceHashes);
+ m_ReferenceHashes.shrink_to_fit();
m_NextReferenceHashesIndexes.swap(NewNextReferenceHashesIndexes);
+ m_NextReferenceHashesIndexes.shrink_to_fit();
m_ReferenceCount = m_ReferenceHashes.size();
}
@@ -2940,24 +3718,24 @@ void
ZenCacheDiskLayer::CacheBucket::ClearReferenceCache()
{
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
- m_FirstReferenceIndex.clear();
- m_FirstReferenceIndex.shrink_to_fit();
- m_ReferenceHashes.clear();
- m_ReferenceHashes.shrink_to_fit();
- m_NextReferenceHashesIndexes.clear();
- m_NextReferenceHashesIndexes.shrink_to_fit();
+ Reset(m_FirstReferenceIndex);
+ Reset(m_ReferenceHashes);
+ Reset(m_NextReferenceHashesIndexes);
m_ReferenceCount = 0;
}
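
The Reset calls above release a container's allocation by swapping with a default-constructed temporary, which clear() plus shrink_to_fit() only achieves as a non-binding request. A one-function sketch of the idiom (ResetContainer is a hypothetical stand-in name):

    #include <vector>

    // Hypothetical sketch of the swap-with-temporary idiom: the temporary takes
    // ownership of the old allocation and frees it when it goes out of scope.
    template<typename T>
    void ResetContainer(T& Value)
    {
        T Empty;
        Value.swap(Empty); // Value is now empty; Empty's destructor frees the memory
    }

    int main()
    {
        std::vector<int> V(1'000'000, 42);
        ResetContainer(V); // on mainstream implementations capacity drops to zero
    }
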
void
-ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloads,
+ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&,
+ std::vector<BucketPayload>& Payloads,
std::vector<AccessTime>& AccessTimes,
std::vector<BucketMetaData>& MetaDatas,
- std::vector<IoBuffer>& MemCachedPayloads,
+ std::vector<MemCacheData>& MemCachedPayloads,
std::vector<ReferenceIndex>& FirstReferenceIndex,
IndexMap& Index,
RwLock::ExclusiveLockScope& IndexLock)
{
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::CompactState");
+
size_t EntryCount = m_Index.size();
Payloads.reserve(EntryCount);
AccessTimes.reserve(EntryCount);
@@ -2966,6 +3744,8 @@ ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloa
FirstReferenceIndex.reserve(EntryCount);
}
Index.reserve(EntryCount);
+ Index.min_load_factor(IndexMinLoadFactor);
+ Index.max_load_factor(IndexMaxLoadFactor);
for (auto It : m_Index)
{
PayloadIndex EntryIndex = PayloadIndex(Payloads.size());
@@ -2975,11 +3755,12 @@ ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloa
if (Payload.MetaData)
{
MetaDatas.push_back(m_MetaDatas[Payload.MetaData]);
- Payload.MetaData = MetaDataIndex(m_MetaDatas.size() - 1);
+ Payload.MetaData = MetaDataIndex(MetaDatas.size() - 1);
}
if (Payload.MemCached)
{
- MemCachedPayloads.push_back(std::move(m_MemCachedPayloads[Payload.MemCached]));
+ MemCachedPayloads.emplace_back(
+ MemCacheData{.Payload = std::move(m_MemCachedPayloads[Payload.MemCached].Payload), .OwnerIndex = EntryIndex});
Payload.MemCached = MemCachedIndex(gsl::narrow<uint32_t>(MemCachedPayloads.size() - 1));
}
if (m_Configuration.EnableReferenceCaching)
@@ -2992,11 +3773,9 @@ ZenCacheDiskLayer::CacheBucket::CompactState(std::vector<BucketPayload>& Payloa
m_Payloads.swap(Payloads);
m_AccessTimes.swap(AccessTimes);
m_MetaDatas.swap(MetaDatas);
- m_FreeMetaDatas.clear();
- m_FreeMetaDatas.shrink_to_fit();
+ Reset(m_FreeMetaDatas);
m_MemCachedPayloads.swap(MemCachedPayloads);
- m_FreeMemCachedPayloads.clear();
- m_FreeMetaDatas.shrink_to_fit();
+ Reset(m_FreeMemCachedPayloads);
if (m_Configuration.EnableReferenceCaching)
{
m_FirstReferenceIndex.swap(FirstReferenceIndex);
@@ -3031,124 +3810,99 @@ ZenCacheDiskLayer::ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const st
ZenCacheDiskLayer::~ZenCacheDiskLayer()
{
-}
-
-bool
-ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
-{
- ZEN_TRACE_CPU("Z$::Disk::Get");
-
- const auto BucketName = std::string(InBucket);
- CacheBucket* Bucket = nullptr;
-
+ try
{
- RwLock::SharedLockScope _(m_Lock);
-
- auto It = m_Buckets.find(BucketName);
-
- if (It != m_Buckets.end())
{
- Bucket = It->second.get();
+ RwLock::ExclusiveLockScope _(m_Lock);
+ for (auto& It : m_Buckets)
+ {
+ m_DroppedBuckets.emplace_back(std::move(It.second));
+ }
+ m_Buckets.clear();
}
+ // We destroy the buckets without holding a lock since the destructor calls GcManager::RemoveGcReferencer, which takes an exclusive lock.
+ // Otherwise this could deadlock: if GC is running, we would block while holding ZenCacheDiskLayer::m_Lock
+ m_DroppedBuckets.clear();
}
-
- if (Bucket == nullptr)
+ catch (std::exception& Ex)
{
- // Bucket needs to be opened/created
+ ZEN_ERROR("~ZenCacheDiskLayer() failed. Reason: '{}'", Ex.what());
+ }
+}
- RwLock::ExclusiveLockScope _(m_Lock);
+ZenCacheDiskLayer::CacheBucket*
+ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
+{
+ ZEN_TRACE_CPU("Z$::Disk::GetOrCreateBucket");
+ const auto BucketName = std::string(InBucket);
+ {
+ RwLock::SharedLockScope SharedLock(m_Lock);
if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
{
- Bucket = It->second.get();
- }
- else
- {
- auto InsertResult =
- m_Buckets.emplace(BucketName,
- std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));
- Bucket = InsertResult.first->second.get();
-
- std::filesystem::path BucketPath = m_RootDir;
- BucketPath /= BucketName;
-
- if (!Bucket->OpenOrCreate(BucketPath))
- {
- m_Buckets.erase(InsertResult.first);
- return false;
- }
+ return It->second.get();
}
}
- ZEN_ASSERT(Bucket != nullptr);
- if (Bucket->Get(HashKey, OutValue))
+ // We create the bucket without holding a lock since the constructor calls GcManager::AddGcReferencer, which takes an exclusive lock.
+ // Otherwise this could deadlock: if GC is running, we would block while holding ZenCacheDiskLayer::m_Lock
+ std::unique_ptr<CacheBucket> Bucket(
+ std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));
+
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
{
- TryMemCacheTrim();
- return true;
+ return It->second.get();
}
- return false;
-}
-
-void
-ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
-{
- ZEN_TRACE_CPU("Z$::Disk::Put");
-
- const auto BucketName = std::string(InBucket);
- CacheBucket* Bucket = nullptr;
+ std::filesystem::path BucketPath = m_RootDir;
+ BucketPath /= BucketName;
+ try
{
- RwLock::SharedLockScope _(m_Lock);
-
- auto It = m_Buckets.find(BucketName);
-
- if (It != m_Buckets.end())
+ if (!Bucket->OpenOrCreate(BucketPath))
{
- Bucket = It->second.get();
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+ return nullptr;
}
}
-
- if (Bucket == nullptr)
+ catch (const std::exception& Err)
{
- // New bucket needs to be created
+ ZEN_WARN("Creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
+ throw;
+ }
- RwLock::ExclusiveLockScope _(m_Lock);
+ CacheBucket* Result = Bucket.get();
+ m_Buckets.emplace(BucketName, std::move(Bucket));
- if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
- {
- Bucket = It->second.get();
- }
- else
- {
- auto InsertResult =
- m_Buckets.emplace(BucketName,
- std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));
- Bucket = InsertResult.first->second.get();
+ return Result;
+}
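
GetOrCreateBucket probes under a shared lock, constructs the bucket with no lock held (its constructor registers with the GC, which takes its own exclusive lock), then re-checks under the exclusive lock before publishing. A hedged sketch of that double-checked create, with hypothetical Bucket/Registry names:

    #include <memory>
    #include <mutex>
    #include <shared_mutex>
    #include <string>
    #include <unordered_map>
    #include <utility>

    struct Bucket
    {
        explicit Bucket(std::string InName) : Name(std::move(InName)) {}
        std::string Name;
    };

    // Hypothetical sketch: the expensive constructor runs unlocked, and a
    // losing racer simply discards its instance when the re-check finds an
    // existing entry.
    class Registry
    {
    public:
        Bucket* GetOrCreate(const std::string& Name)
        {
            {
                std::shared_lock Lock(m_Mutex); // fast path: read-only probe
                if (auto It = m_Buckets.find(Name); It != m_Buckets.end())
                {
                    return It->second.get();
                }
            }
            auto Candidate = std::make_unique<Bucket>(Name); // constructed unlocked

            std::unique_lock Lock(m_Mutex);
            if (auto It = m_Buckets.find(Name); It != m_Buckets.end())
            {
                return It->second.get(); // someone beat us; drop our candidate
            }
            Bucket* Result = Candidate.get();
            m_Buckets.emplace(Name, std::move(Candidate));
            return Result;
        }

    private:
        std::shared_mutex m_Mutex;
        std::unordered_map<std::string, std::unique_ptr<Bucket>> m_Buckets;
    };
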
- std::filesystem::path BucketPath = m_RootDir;
- BucketPath /= BucketName;
+bool
+ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
+{
+ ZEN_TRACE_CPU("Z$::Disk::Get");
- try
- {
- if (!Bucket->OpenOrCreate(BucketPath))
- {
- ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
- m_Buckets.erase(InsertResult.first);
- return;
- }
- }
- catch (const std::exception& Err)
- {
- ZEN_WARN("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
- throw;
- }
+ if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
+ {
+ if (Bucket->Get(HashKey, OutValue))
+ {
+ TryMemCacheTrim();
+ return true;
}
}
+ return false;
+}
- ZEN_ASSERT(Bucket != nullptr);
+void
+ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References)
+{
+ ZEN_TRACE_CPU("Z$::Disk::Put");
- Bucket->Put(HashKey, Value, References);
- TryMemCacheTrim();
+ if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
+ {
+ Bucket->Put(HashKey, Value, References);
+ TryMemCacheTrim();
+ }
}
void
@@ -3208,11 +3962,8 @@ ZenCacheDiskLayer::DiscoverBuckets()
RwLock SyncLock;
- const size_t MaxHwTreadUse = std::thread::hardware_concurrency();
- const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, FoundBucketDirectories.size()));
-
- WorkerThreadPool Pool(WorkerThreadPoolCount);
- Latch WorkLatch(1);
+ WorkerThreadPool& Pool = GetLargeWorkerPool();
+ Latch WorkLatch(1);
for (auto& BucketPath : FoundBucketDirectories)
{
WorkLatch.AddCount(1);
@@ -3301,13 +4052,17 @@ void
ZenCacheDiskLayer::Flush()
{
std::vector<CacheBucket*> Buckets;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (Buckets.empty())
+ {
+ return;
+ }
+ ZEN_INFO("Flushed {} buckets at '{}' in {}", Buckets.size(), m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
{
- RwLock::SharedLockScope _(m_Lock);
- if (m_Buckets.empty())
- {
- return;
- }
+ RwLock::SharedLockScope __(m_Lock);
Buckets.reserve(m_Buckets.size());
for (auto& Kv : m_Buckets)
{
@@ -3315,28 +4070,29 @@ ZenCacheDiskLayer::Flush()
Buckets.push_back(Bucket);
}
}
- const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u);
- const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, Buckets.size()));
-
- WorkerThreadPool Pool(WorkerThreadPoolCount);
- Latch WorkLatch(1);
- for (auto& Bucket : Buckets)
{
- WorkLatch.AddCount(1);
- Pool.ScheduleWork([&]() {
- auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
- Bucket->Flush();
- });
+ WorkerThreadPool& Pool = GetSmallWorkerPool();
+ Latch WorkLatch(1);
+ for (auto& Bucket : Buckets)
+ {
+ WorkLatch.AddCount(1);
+ Pool.ScheduleWork([&]() {
+ auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
+ Bucket->Flush();
+ });
+ }
+ WorkLatch.CountDown();
+ while (!WorkLatch.Wait(1000))
+ {
+ ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir);
+ }
}
- WorkLatch.CountDown();
- WorkLatch.Wait();
}
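
Flush fans the per-bucket work out on a shared worker pool and blocks on a latch, polling with a timeout so it can log progress while waiting. A compact sketch of the fan-out-and-wait shape using standard C++20 primitives (std::latch and std::jthread stand in for zen's Latch and WorkerThreadPool; note std::latch has no timed wait, hence the polling in the real code):

    #include <cstddef>
    #include <latch>
    #include <thread>
    #include <vector>

    // Hypothetical sketch: schedule one task per bucket, then block until
    // every task has counted the latch down.
    void FlushAll(const std::vector<int>& Buckets)
    {
        std::latch Done(static_cast<std::ptrdiff_t>(Buckets.size()));
        std::vector<std::jthread> Workers;
        Workers.reserve(Buckets.size());
        for (int Bucket : Buckets)
        {
            Workers.emplace_back([Bucket, &Done] {
                (void)Bucket; // a real worker would flush this bucket here
                Done.count_down();
            });
        }
        Done.wait(); // returns once all buckets have flushed
    }
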
void
ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
{
RwLock::SharedLockScope _(m_Lock);
-
{
std::vector<std::future<void>> Results;
Results.reserve(m_Buckets.size());
@@ -3457,19 +4213,21 @@ ZenCacheDiskLayer::EnumerateBucketContents(std::string_view
CacheValueDetails::NamespaceDetails
ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const
{
- RwLock::SharedLockScope _(m_Lock);
CacheValueDetails::NamespaceDetails Details;
- if (BucketFilter.empty())
{
- Details.Buckets.reserve(BucketFilter.empty() ? m_Buckets.size() : 1);
- for (auto& Kv : m_Buckets)
+ RwLock::SharedLockScope IndexLock(m_Lock);
+ if (BucketFilter.empty())
{
- Details.Buckets[Kv.first] = Kv.second->GetValueDetails(ValueFilter);
+ Details.Buckets.reserve(BucketFilter.empty() ? m_Buckets.size() : 1);
+ for (auto& Kv : m_Buckets)
+ {
+ Details.Buckets[Kv.first] = Kv.second->GetValueDetails(IndexLock, ValueFilter);
+ }
+ }
+ else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end())
+ {
+ Details.Buckets[It->first] = It->second->GetValueDetails(IndexLock, ValueFilter);
}
- }
- else if (auto It = m_Buckets.find(std::string(BucketFilter)); It != m_Buckets.end())
- {
- Details.Buckets[It->first] = It->second->GetValueDetails(ValueFilter);
}
return Details;
}
@@ -3480,17 +4238,8 @@ ZenCacheDiskLayer::MemCacheTrim()
ZEN_TRACE_CPU("Z$::Disk::MemCacheTrim");
ZEN_ASSERT(m_Configuration.MemCacheTargetFootprintBytes != 0);
-
- const GcClock::TimePoint Now = GcClock::Now();
-
- const GcClock::Tick NowTick = Now.time_since_epoch().count();
- const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
- GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim;
- const GcClock::Tick NextAllowedTrimTick = LastTrimTick + GcClock::Duration(TrimInterval).count();
- if (NowTick < NextAllowedTrimTick)
- {
- return;
- }
+ ZEN_ASSERT(m_Configuration.MemCacheMaxAgeSeconds != 0);
+ ZEN_ASSERT(m_Configuration.MemCacheTrimIntervalSeconds != 0);
bool Expected = false;
if (!m_IsMemCacheTrimming.compare_exchange_strong(Expected, true))
@@ -3498,75 +4247,90 @@ ZenCacheDiskLayer::MemCacheTrim()
return;
}
- // Bump time forward so we don't keep trying to do m_IsTrimming.compare_exchange_strong
- const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
- m_LastTickMemCacheTrim.store(NextTrimTick);
+ try
+ {
+ m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this](JobContext&) {
+ ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]");
+
+ const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
+ uint64_t TrimmedSize = 0;
+ Stopwatch Timer;
+ const auto Guard = MakeGuard([&] {
+ ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
+ NiceBytes(TrimmedSize),
+ NiceBytes(m_TotalMemCachedSize),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+
+ const GcClock::Tick NowTick = GcClock::TickCount();
+ const GcClock::Tick NextTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
+ m_NextAllowedTrimTick.store(NextTrimTick);
+ m_IsMemCacheTrimming.store(false);
+ });
- m_JobQueue.QueueJob("ZenCacheDiskLayer::MemCacheTrim", [this, Now, TrimInterval](JobContext&) {
- ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim [Async]");
+ const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds);
- uint64_t StartSize = m_TotalMemCachedSize.load();
- Stopwatch Timer;
- const auto Guard = MakeGuard([&] {
- uint64_t EndSize = m_TotalMemCachedSize.load();
- ZEN_INFO("trimmed {} (remaining {}), from memory cache in {}",
- NiceBytes(StartSize > EndSize ? StartSize - EndSize : 0),
- NiceBytes(m_TotalMemCachedSize),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- m_IsMemCacheTrimming.store(false);
- });
-
- const std::chrono::seconds MaxAge = std::chrono::seconds(m_Configuration.MemCacheMaxAgeSeconds);
+ static const size_t UsageSlotCount = 2048;
+ std::vector<uint64_t> UsageSlots;
+ UsageSlots.reserve(UsageSlotCount);
- std::vector<uint64_t> UsageSlots;
- UsageSlots.reserve(std::chrono::seconds(MaxAge / TrimInterval).count());
+ std::vector<CacheBucket*> Buckets;
+ {
+ RwLock::SharedLockScope __(m_Lock);
+ Buckets.reserve(m_Buckets.size());
+ for (auto& Kv : m_Buckets)
+ {
+ Buckets.push_back(Kv.second.get());
+ }
+ }
- std::vector<CacheBucket*> Buckets;
- {
- RwLock::SharedLockScope __(m_Lock);
- Buckets.reserve(m_Buckets.size());
- for (auto& Kv : m_Buckets)
+ const GcClock::TimePoint Now = GcClock::Now();
{
- Buckets.push_back(Kv.second.get());
+ ZEN_TRACE_CPU("Z$::ZenCacheDiskLayer::MemCacheTrim GetUsageByAccess");
+ for (CacheBucket* Bucket : Buckets)
+ {
+ Bucket->GetUsageByAccess(Now, MaxAge, UsageSlots);
+ }
}
- }
- for (CacheBucket* Bucket : Buckets)
- {
- Bucket->GetUsageByAccess(Now, GcClock::Duration(TrimInterval), UsageSlots);
- }
- uint64_t TotalSize = 0;
- for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
- {
- TotalSize += UsageSlots[Index];
- if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes)
+ uint64_t TotalSize = 0;
+ for (size_t Index = 0; Index < UsageSlots.size(); ++Index)
{
- GcClock::TimePoint ExpireTime = Now - (TrimInterval * Index);
- MemCacheTrim(Buckets, ExpireTime);
- break;
+ TotalSize += UsageSlots[Index];
+ if (TotalSize >= m_Configuration.MemCacheTargetFootprintBytes)
+ {
+ GcClock::TimePoint ExpireTime = Now - ((GcClock::Duration(MaxAge) * Index) / UsageSlotCount);
+ TrimmedSize = MemCacheTrim(Buckets, ExpireTime);
+ break;
+ }
}
- }
- });
+ });
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. Reason: '{}'", Ex.what());
+ m_IsMemCacheTrimming.store(false);
+ }
}
-void
+uint64_t
ZenCacheDiskLayer::MemCacheTrim(std::vector<CacheBucket*>& Buckets, GcClock::TimePoint ExpireTime)
{
if (m_Configuration.MemCacheTargetFootprintBytes == 0)
{
- return;
+ return 0;
}
- RwLock::SharedLockScope __(m_Lock);
+ uint64_t TrimmedSize = 0;
for (CacheBucket* Bucket : Buckets)
{
- Bucket->MemCacheTrim(ExpireTime);
+ TrimmedSize += Bucket->MemCacheTrim(ExpireTime);
}
const GcClock::TimePoint Now = GcClock::Now();
const GcClock::Tick NowTick = Now.time_since_epoch().count();
const std::chrono::seconds TrimInterval = std::chrono::seconds(m_Configuration.MemCacheTrimIntervalSeconds);
- GcClock::Tick LastTrimTick = m_LastTickMemCacheTrim;
+ GcClock::Tick LastTrimTick = m_NextAllowedTrimTick;
const GcClock::Tick NextAllowedTrimTick = NowTick + GcClock::Duration(TrimInterval).count();
- m_LastTickMemCacheTrim.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
+ m_NextAllowedTrimTick.compare_exchange_strong(LastTrimTick, NextAllowedTrimTick);
+ return TrimmedSize;
}
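
The trim path builds a histogram of mem-cache bytes bucketed by last-access age, then walks from newest to oldest accumulating bytes until the target footprint is reached; the matching slot's age becomes the eviction cutoff. A small sketch of that cutoff computation, using hypothetical names:

    #include <chrono>
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // Hypothetical sketch: UsageSlots[i] holds bytes last accessed in age
    // slot i (slot 0 = most recent). Walk newest-to-oldest until the running
    // total reaches the target; everything older than the cutoff is evicted.
    std::chrono::seconds ComputeExpireAge(const std::vector<uint64_t>& UsageSlots,
                                          uint64_t TargetFootprintBytes,
                                          std::chrono::seconds MaxAge)
    {
        const int64_t SlotCount = static_cast<int64_t>(UsageSlots.size());
        uint64_t Total = 0;
        for (int64_t Index = 0; Index < SlotCount; ++Index)
        {
            Total += UsageSlots[static_cast<std::size_t>(Index)];
            if (Total >= TargetFootprintBytes)
            {
                return (MaxAge * Index) / SlotCount; // age of the cutoff slot
            }
        }
        return MaxAge; // under budget: nothing needs to expire
    }
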
#if ZEN_WITH_TESTS