aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-11-27 14:32:19 +0100
committerGitHub <[email protected]>2023-11-27 14:32:19 +0100
commit4d95b578350ebfbbf6d54407c9403547b01cac4c (patch)
tree9f8df5d934a6a62fdcebeac94dffe52139d3ea6b /src/zenserver/cache/cachedisklayer.cpp
parentgc stop command (#569) (diff)
downloadzen-4d95b578350ebfbbf6d54407c9403547b01cac4c.tar.xz
zen-4d95b578350ebfbbf6d54407c9403547b01cac4c.zip
optimized index snapshot reading/writing (#561)
The previous implementation of in-memory index snapshots serialised data to memory before writing it to disk, and vice versa when reading. This led to memory spikes which ended up pushing useful data out of the system cache and also caused stalls on I/O operations. This change moves more of the code to a streaming serialisation approach, which scales better from a memory usage perspective and also performs much better.
Diffstat (limited to 'src/zenserver/cache/cachedisklayer.cpp')
-rw-r--r--src/zenserver/cache/cachedisklayer.cpp1213
1 files changed, 772 insertions, 441 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index 6ab3c7746..0767086ce 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -25,12 +25,6 @@ namespace {
#pragma pack(push)
#pragma pack(1)
- // We use this to indicate if a on disk bucket needs wiping
- // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scrable the references
- // to block items.
- // See: https://github.com/EpicGames/zen/pull/299
- static const uint32_t CurrentDiskBucketVersion = 1;
-
struct CacheBucketIndexHeader
{
static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
@@ -48,12 +42,75 @@ namespace {
{
return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
}
+
+ bool IsValid() const
+ {
+ if (Magic != ExpectedMagic)
+ {
+ return false;
+ }
+
+ if (Checksum != ComputeChecksum(*this))
+ {
+ return false;
+ }
+
+ if (PayloadAlignment == 0)
+ {
+ return false;
+ }
+
+ return true;
+ }
};
static_assert(sizeof(CacheBucketIndexHeader) == 32);
+ // Fixed-size header stored at the start of a bucket sidecar ('.meta') file.
+ // It is declared inside the surrounding pack(1) region and its size is pinned
+ // to 32 bytes by the static_assert that follows the struct.
+ struct BucketMetaHeader
+ {
+     // 0x6174656d reads as the bytes "meta" when stored in little-endian order
+     static constexpr uint32_t ExpectedMagic  = 0x61'74'65'6d;
+     static constexpr uint32_t Version1       = 1;
+     static constexpr uint32_t CurrentVersion = Version1;
+
+     uint32_t Magic       = ExpectedMagic;
+     uint32_t Version     = CurrentVersion;
+     uint64_t EntryCount  = 0;  // number of records following the header
+     uint64_t LogPosition = 0;  // log position the snapshot was taken at
+     uint32_t Padding     = 0;  // must stay zero, see IsValid()
+     uint32_t Checksum    = 0;  // XXH32 over all preceding fields
+
+     // Hashes every byte of the header that precedes the trailing Checksum
+     // member, starting at Magic, using the seed 0xC0C0'BABA.
+     static uint32_t ComputeChecksum(const BucketMetaHeader& Header)
+     {
+         constexpr size_t HashedByteCount = sizeof(BucketMetaHeader) - sizeof(uint32_t);
+         return XXH32(&Header.Magic, HashedByteCount, 0xC0C0'BABA);
+     }
+
+     // A header is accepted when the magic matches, the stored checksum agrees
+     // with the recomputed one and the padding is zero. The checks short-circuit
+     // in that order. The version is not inspected here.
+     bool IsValid() const
+     {
+         return (Magic == ExpectedMagic) && (Checksum == ComputeChecksum(*this)) && (Padding == 0);
+     }
+ };
+
+ static_assert(sizeof(BucketMetaHeader) == 32);
+
#pragma pack(pop)
+ //////////////////////////////////////////////////////////////////////////
+
template<typename T>
void Reset(T& V)
{
@@ -63,15 +120,16 @@ namespace {
const char* IndexExtension = ".uidx";
const char* LogExtension = ".slog";
+ const char* MetaExtension = ".meta";
std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
{
return BucketDir / (BucketName + IndexExtension);
}
- std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ std::filesystem::path GetMetaPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
{
- return BucketDir / (BucketName + ".tmp");
+ return BucketDir / (BucketName + MetaExtension);
}
std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
@@ -79,6 +137,12 @@ namespace {
return BucketDir / (BucketName + LogExtension);
}
+ // Returns the location of the bucket manifest inside BucketDir. The manifest
+ // uses the fixed file name "zen_manifest", so BucketName does not take part in
+ // the result; it is only accepted to mirror the sibling path helpers.
+ std::filesystem::path GetManifestPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+     ZEN_UNUSED(BucketName);
+
+     std::filesystem::path ManifestPath = BucketDir;
+     ManifestPath /= "zen_manifest";
+     return ManifestPath;
+ }
+
bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason)
{
if (Entry.Key == IoHash::Zero)
@@ -147,22 +211,430 @@ namespace {
} // namespace
namespace fs = std::filesystem;
+using namespace std::literals;
+
+// Loads, interprets and produces the per-bucket manifest (a compact binary
+// object) and the optional fixed-record sidecar file that carries access times
+// and raw hash/size meta data for the entries of a bucket.
+class BucketManifestSerializer
+{
+    using MetaDataIndex  = ZenCacheDiskLayer::CacheBucket::MetaDataIndex;
+    using BucketMetaData = ZenCacheDiskLayer::CacheBucket::BucketMetaData;
+
+    using PayloadIndex  = ZenCacheDiskLayer::CacheBucket::PayloadIndex;
+    using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload;
+
+public:
+    // Used to indicate whether an on-disk bucket needs wiping.
+    // In versions 0.2.5 -> 0.2.11 there was a GC corruption bug that would
+    // scramble the references to block items.
+    // See: https://github.com/EpicGames/zen/pull/299
+    static inline const uint32_t CurrentDiskBucketVersion = 1;
+
+    // Loads the manifest stored at ManifestPath into Manifest.
+    // Returns false when no valid manifest object could be loaded.
+    bool Open(std::filesystem::path ManifestPath)
+    {
+        Manifest = LoadCompactBinaryObject(ManifestPath);
+
+        const bool HasManifest = !!Manifest;
+        return HasManifest;
+    }
+
+    // Bucket id recorded in the loaded manifest
+    Oid GetBucketId() const { return Manifest["BucketId"sv].AsObjectId(); }
+
+    // Reports the manifest version through OutVersion (0 is the fallback passed
+    // to AsUInt32) and returns whether it equals CurrentDiskBucketVersion.
+    bool IsCurrentVersion(uint32_t& OutVersion) const
+    {
+        const uint32_t FoundVersion = Manifest["Version"sv].AsUInt32(0);
+        OutVersion                  = FoundVersion;
+        return FoundVersion == CurrentDiskBucketVersion;
+    }
+
+    // Applies access times and meta data from the loaded manifest (or from the
+    // sidecar file when the manifest has "UsingMetaFile" set) to the given
+    // index structures.
+    void ParseManifest(ZenCacheDiskLayer::CacheBucket&           Bucket,
+                       std::filesystem::path                     ManifestPath,
+                       ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
+                       std::vector<AccessTime>&                  AccessTimes,
+                       std::vector<BucketPayload>&               Payloads);
+
+    // Writes a manifest holding a new bucket id and the current version to
+    // ManifestPath and returns the new id.
+    Oid GenerateNewManifest(std::filesystem::path ManifestPath);
+
+    // Builds the small manifest that accompanies a sidecar file and remembers
+    // EntryCount for GetSidecarSize / WriteSidecarFile.
+    IoBuffer MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount);
+
+    // Size in bytes of the record area for the entry count last passed to
+    // MakeSidecarManifest; the sidecar header is not included.
+    uint64_t GetSidecarSize() const { return m_ManifestEntryCount * sizeof(ManifestData); }
+
+    // Streams one record per index entry to a temporary file and moves it to
+    // SidecarPath.
+    void WriteSidecarFile(const std::filesystem::path&                SidecarPath,
+                          uint64_t                                    SnapshotLogPosition,
+                          ZenCacheDiskLayer::CacheBucket::IndexMap&&  Index,
+                          std::vector<AccessTime>&&                   AccessTimes,
+                          const std::vector<BucketPayload>&           Payloads,
+                          const std::vector<BucketMetaData>&          MetaDatas);
+
+    // Reads the sidecar file at SidecarPath and applies its records.
+    // Returns false when the file is rejected as invalid.
+    bool ReadSidecarFile(ZenCacheDiskLayer::CacheBucket&           Bucket,
+                         std::filesystem::path                     SidecarPath,
+                         ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
+                         std::vector<AccessTime>&                  AccessTimes,
+                         std::vector<BucketPayload>&               Payloads);
+
+    // Builds a manifest with keys, timestamps and meta data embedded as arrays
+    IoBuffer MakeManifest(const Oid&                                 BucketId,
+                          ZenCacheDiskLayer::CacheBucket::IndexMap&& Index,
+                          std::vector<AccessTime>&&                  AccessTimes,
+                          const std::vector<BucketPayload>&          Payloads,
+                          const std::vector<BucketMetaData>&         MetaDatas);
+
+    // Most recently loaded or generated manifest object
+    CbObject Manifest;
+
+private:
+    // Reads Path and returns its content as a compact binary object. An empty
+    // object is returned when the read reports an error or when validation of
+    // the buffer does not succeed.
+    CbObject LoadCompactBinaryObject(const fs::path& Path)
+    {
+        FileContents Result = ReadFile(Path);
+        if (Result.ErrorCode)
+        {
+            return CbObject();
+        }
+
+        IoBuffer Buffer = Result.Flatten();
+        if (ValidateCompactBinary(Buffer, CbValidateMode::All) != CbValidateError::None)
+        {
+            return CbObject();
+        }
+
+        return zen::LoadCompactBinaryObject(Buffer);
+    }
+
+    // Entry count recorded by MakeSidecarManifest
+    uint64_t m_ManifestEntryCount = 0;
+
+    // One fixed-size sidecar record, written by WriteSidecarFile and read back
+    // by ReadSidecarFile. The trailing comments give the field sizes in bytes.
+    struct ManifestData
+    {
+        IoHash     Key;        // 20
+        AccessTime Timestamp;  // 4
+        IoHash     RawHash;    // 20
+        uint32_t   Padding_0;  // 4
+        size_t     RawSize;    // 8
+        uint64_t   Padding_1;  // 8
+    };
+
+    static_assert(sizeof(ManifestData) == 64);
+};
-static CbObject
-LoadCompactBinaryObject(const fs::path& Path)
+// Restores per-entry access times and raw hash/size meta data from the loaded
+// manifest into index structures that have already been populated.
+//
+//  Bucket       - receives meta data through SetMetaData and supplies the
+//                 directory/name used to locate the sidecar file
+//  ManifestPath - only used in log messages
+//  Index        - key -> payload index lookup; keys missing from it are skipped
+//  AccessTimes  - updated in place, addressed by payload index
+//  Payloads     - updated in place, addressed by payload index
+//
+// When the manifest has "UsingMetaFile" set, everything is read from the
+// sidecar file instead of from arrays embedded in the manifest.
+void
+BucketManifestSerializer::ParseManifest(ZenCacheDiskLayer::CacheBucket&                             Bucket,
+                                        std::filesystem::path                                       ManifestPath,
+                                        ZenCacheDiskLayer::CacheBucket::IndexMap&                   Index,
+                                        std::vector<AccessTime>&                                    AccessTimes,
+                                        std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads)
{
- FileContents Result = ReadFile(Path);
+    if (Manifest["UsingMetaFile"sv].AsBool())
+    {
+        // ReadSidecarFile logs a warning itself when it rejects the file
+        ReadSidecarFile(Bucket, GetMetaPath(Bucket.m_BucketDir, Bucket.m_BucketName), Index, AccessTimes, Payloads);
+
+        return;
+    }
+
+    ZEN_TRACE_CPU("Z$::ParseManifest");
+
+    Stopwatch  Timer;
+    const auto _ = MakeGuard([&] { ZEN_INFO("parsed store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+
+    CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView();
+
+    // Size the lookup table from the array that is actually present instead of
+    // the stored "Count" field, which is an arbitrary integer read from disk
+    std::vector<PayloadIndex> KeysIndexes;
+    KeysIndexes.reserve(KeyArray.Num());
- if (!Result.ErrorCode)
+    for (CbFieldView& KeyView : KeyArray)
{
- IoBuffer Buffer = Result.Flatten();
- if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None)
+        if (auto It = Index.find(KeyView.AsHash()); It != Index.end())
{
- return LoadCompactBinaryObject(Buffer);
+            KeysIndexes.push_back(It.value());
+        }
+        else
+        {
+            // Unknown key: store a default index so that the following arrays
+            // stay aligned with 'Keys' position by position
+            KeysIndexes.push_back(PayloadIndex());
+        }
+    }
+
+    size_t      KeyIndexOffset = 0;
+    CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView();
+    for (CbFieldView& TimeStampView : TimeStampArray)
+    {
+        // A damaged manifest may carry more timestamps than keys; never index
+        // past the end of the lookup table
+        if (KeyIndexOffset >= KeysIndexes.size())
+        {
+            ZEN_WARN("'Timestamps' array has more entries than 'Keys' in {}, ignoring the excess", ManifestPath);
+            break;
+        }
+
+        const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
+        if (KeyIndex)
+        {
+            AccessTimes[KeyIndex] = TimeStampView.AsInt64();
+        }
+    }
+
+    KeyIndexOffset           = 0;
+    CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView();
+    CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView();
+    if (RawHashArray.Num() == RawSizeArray.Num())
+    {
+        auto RawHashIt = RawHashArray.CreateViewIterator();
+        auto RawSizeIt = RawSizeArray.CreateViewIterator();
+        while (RawHashIt != CbFieldViewIterator())
+        {
+            // Same protection as for the timestamps above
+            if (KeyIndexOffset >= KeysIndexes.size())
+            {
+                ZEN_WARN("'RawHash' array has more entries than 'Keys' in {}, ignoring the excess", ManifestPath);
+                break;
+            }
+
+            const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
+
+            if (KeyIndex)
+            {
+                uint64_t RawSize = RawSizeIt.AsUInt64();
+                IoHash   RawHash = RawHashIt.AsHash();
+                // An all-zero pair is what MakeManifest writes for entries without meta data
+                if (RawSize != 0 || RawHash != IoHash::Zero)
+                {
+                    BucketPayload& Payload = Payloads[KeyIndex];
+                    Bucket.SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash});
+                }
+            }
+
+            RawHashIt++;
+            RawSizeIt++;
}
}
+    else
+    {
+        ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath);
+    }
+}
- return CbObject();
+// Creates a manifest that holds a freshly generated bucket id together with
+// the current disk bucket version, keeps it in Manifest and writes it to
+// ManifestPath. Returns the generated id.
+Oid
+BucketManifestSerializer::GenerateNewManifest(std::filesystem::path ManifestPath)
+{
+    const Oid NewBucketId = Oid::NewOid();
+
+    {
+        CbObjectWriter ManifestWriter;
+        ManifestWriter << "BucketId"sv << NewBucketId;
+        ManifestWriter << "Version"sv << CurrentDiskBucketVersion;
+        Manifest = ManifestWriter.Save();
+    }
+
+    WriteFile(ManifestPath, Manifest.GetBuffer().AsIoBuffer());
+
+    return NewBucketId;
+}
+
+// Builds a manifest object that embeds the bucket state as parallel arrays:
+// "Keys", "Timestamps" and, when any meta data exists, "RawHash" and
+// "RawSize". All arrays are emitted by iterating Index, so position N in each
+// of them describes the same entry. The result is kept in Manifest and also
+// returned as a buffer.
+IoBuffer
+BucketManifestSerializer::MakeManifest(const Oid&                                                         BucketId,
+                                       ZenCacheDiskLayer::CacheBucket::IndexMap&&                         Index,
+                                       std::vector<AccessTime>&&                                          AccessTimes,
+                                       const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>&  Payloads,
+                                       const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas)
+{
+    using namespace std::literals;
+
+    ZEN_TRACE_CPU("Z$::MakeManifest");
+
+    const size_t EntryTotal = Index.size();
+
+    // The estimate runs slightly high, but it is far closer than exponential
+    // growth would be and it avoids reallocating the underlying buffer in
+    // almost every case
+    const size_t BytesPerEntryEstimate = 54u;
+    size_t       InitialCapacity       = 48u;
+    if (EntryTotal != 0)
+    {
+        InitialCapacity = RoundUp(32u + (EntryTotal * BytesPerEntryEstimate), 128);
+    }
+    CbObjectWriter Writer(InitialCapacity);
+
+    Writer << "BucketId"sv << BucketId;
+    Writer << "Version"sv << CurrentDiskBucketVersion;
+
+    if (EntryTotal != 0)
+    {
+        // Meta data attached to a payload slot, or nullptr when there is none
+        const auto FindMetaData = [&](PayloadIndex PlIndex) -> const BucketMetaData* {
+            const BucketPayload& Payload = Payloads[PlIndex];
+            return Payload.MetaData ? &MetaDatas[Payload.MetaData] : nullptr;
+        };
+
+        Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(EntryTotal));
+
+        Writer.BeginArray("Keys"sv);
+        for (const auto& Entry : Index)
+        {
+            Writer.AddHash(Entry.first);
+        }
+        Writer.EndArray();
+
+        Writer.BeginArray("Timestamps"sv);
+        for (const auto& Entry : Index)
+        {
+            const GcClock::Tick LastAccess = AccessTimes[Entry.second];
+            Writer.AddInteger(LastAccess);
+        }
+        Writer.EndArray();
+
+        if (!MetaDatas.empty())
+        {
+            // Entries without meta data are written as a zero hash / zero size pair
+            Writer.BeginArray("RawHash"sv);
+            for (const auto& Entry : Index)
+            {
+                if (const BucketMetaData* Meta = FindMetaData(Entry.second))
+                {
+                    Writer.AddHash(Meta->RawHash);
+                }
+                else
+                {
+                    Writer.AddHash(IoHash::Zero);
+                }
+            }
+            Writer.EndArray();
+
+            Writer.BeginArray("RawSize"sv);
+            for (const auto& Entry : Index)
+            {
+                if (const BucketMetaData* Meta = FindMetaData(Entry.second))
+                {
+                    Writer.AddInteger(Meta->RawSize);
+                }
+                else
+                {
+                    Writer.AddInteger(0);
+                }
+            }
+            Writer.EndArray();
+        }
+    }
+
+    Manifest = Writer.Save();
+    return Manifest.GetBuffer().AsIoBuffer();
+}
+
+// Builds the compact manifest that accompanies a sidecar file. It records the
+// bucket id, the disk bucket version, the entry count and the "UsingMetaFile"
+// marker that makes ParseManifest read the sidecar. EntryCount is remembered
+// in m_ManifestEntryCount; the object is kept in Manifest and returned as a
+// buffer.
+IoBuffer
+BucketManifestSerializer::MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount)
+{
+    m_ManifestEntryCount = EntryCount;
+
+    const bool UsesMetaFile = true;
+
+    CbObjectWriter SidecarManifestWriter;
+    SidecarManifestWriter << "BucketId"sv << BucketId;
+    SidecarManifestWriter << "Version"sv << CurrentDiskBucketVersion;
+    SidecarManifestWriter << "Count"sv << EntryCount;
+    SidecarManifestWriter << "UsingMetaFile"sv << UsesMetaFile;
+
+    Manifest = SidecarManifestWriter.Save();
+    return Manifest.GetBuffer().AsIoBuffer();
+}
+
+// Reads the sidecar file at SidecarPath and applies every record whose key is
+// present in Index: the access time is stored in AccessTimes and, when the
+// record carries a non-zero raw size and raw hash, the meta data is attached to
+// the payload through Bucket.SetMetaData.
+//
+// Returns true when the file was accepted and processed. Returns false (after
+// logging a warning) when the file is too small, the header is invalid, the
+// version is unknown or the header announces more records than the file can
+// hold. Throws std::system_error when the file cannot be opened.
+//
+// AccessTimes and Payloads must have the same size.
+bool
+BucketManifestSerializer::ReadSidecarFile(ZenCacheDiskLayer::CacheBucket&                             Bucket,
+                                          std::filesystem::path                                       SidecarPath,
+                                          ZenCacheDiskLayer::CacheBucket::IndexMap&                   Index,
+                                          std::vector<AccessTime>&                                    AccessTimes,
+                                          std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads)
+{
+    ZEN_ASSERT(AccessTimes.size() == Payloads.size());
+
+    std::error_code Ec;
+
+    BasicFile SidecarFile;
+    SidecarFile.Open(SidecarPath, BasicFile::Mode::kRead, Ec);
+
+    if (Ec)
+    {
+        throw std::system_error(Ec, fmt::format("failed to open sidecar file '{}'", SidecarPath));
+    }
+
+    const uint64_t FileSize = SidecarFile.FileSize();
+
+    // Emits the warning on each early 'return false' below; dismissed as soon
+    // as the header has been accepted
+    auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid sidecar file '{}'", SidecarPath); });
+
+    if (FileSize < sizeof(BucketMetaHeader))
+    {
+        return false;
+    }
+
+    BasicFileBuffer Sidecar(SidecarFile, 128 * 1024);
+
+    BucketMetaHeader Header;
+    Sidecar.Read(&Header, sizeof Header, 0);
+
+    if (!Header.IsValid())
+    {
+        return false;
+    }
+
+    if (Header.Version != BucketMetaHeader::Version1)
+    {
+        return false;
+    }
+
+    // Number of whole records that fit behind the header. The header size must
+    // be subtracted here; 'sizeof(sizeof(...))' would only subtract the size
+    // of a size_t and overestimate the capacity.
+    const uint64_t MaxEntryCount = (FileSize - sizeof(BucketMetaHeader)) / sizeof(ManifestData);
+    if (Header.EntryCount > MaxEntryCount)
+    {
+        return false;
+    }
+
+    InvalidGuard.Dismiss();
+
+    uint64_t EntryCount        = 0;
+    uint64_t CurrentReadOffset = sizeof(Header);
+
+    // Only the records announced by the header are consumed; any trailing
+    // bytes are not interpreted as records
+    for (uint64_t Remaining = Header.EntryCount; Remaining != 0; --Remaining)
+    {
+        const ManifestData* Entry = Sidecar.MakeView<ManifestData>(CurrentReadOffset);
+        CurrentReadOffset += sizeof(ManifestData);
+
+        if (auto It = Index.find(Entry->Key); It != Index.end())
+        {
+            PayloadIndex PlIndex = It.value();
+
+            // The index is used to address Payloads/AccessTimes directly, so it
+            // has to be strictly smaller than the container size
+            ZEN_ASSERT(size_t(PlIndex) < Payloads.size());
+
+            ZenCacheDiskLayer::CacheBucket::BucketPayload& PayloadEntry = Payloads[PlIndex];
+
+            AccessTimes[PlIndex] = Entry->Timestamp;
+
+            if (Entry->RawSize && Entry->RawHash != IoHash::Zero)
+            {
+                Bucket.SetMetaData(PayloadEntry, BucketMetaData{.RawSize = Entry->RawSize, .RawHash = Entry->RawHash});
+            }
+        }
+
+        EntryCount++;
+    }
+
+    ZEN_ASSERT(EntryCount == Header.EntryCount);
+
+    return true;
+}
+
+// Writes the sidecar file for a snapshot: a BucketMetaHeader followed by one
+// ManifestData record per entry of Index. The data is first written to a
+// temporary file created next to SidecarPath and then moved into place.
+//
+//  SnapshotLogPosition - stored in the header's LogPosition field
+//  Index / AccessTimes - only read here; nothing is moved out of them
+//  Payloads, MetaDatas - source of the raw hash/size of each entry; entries
+//                        without meta data get a zero hash and zero size
+//
+// Throws std::system_error when the temporary file cannot be created or moved.
+void
+BucketManifestSerializer::WriteSidecarFile(const std::filesystem::path&                                       SidecarPath,
+                                           uint64_t                                                           SnapshotLogPosition,
+                                           ZenCacheDiskLayer::CacheBucket::IndexMap&&                         Index,
+                                           std::vector<AccessTime>&&                                          AccessTimes,
+                                           const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>&  Payloads,
+                                           const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas)
+{
+    BucketMetaHeader Header;
+    // Record the number of entries that are really written below, so that the
+    // header can never disagree with the content of the file
+    Header.EntryCount  = Index.size();
+    Header.LogPosition = SnapshotLogPosition;
+    Header.Checksum    = Header.ComputeChecksum(Header);
+
+    std::error_code Ec;
+
+    TemporaryFile SidecarFile;
+    SidecarFile.CreateTemporary(SidecarPath.parent_path(), Ec);
+
+    if (Ec)
+    {
+        throw std::system_error(Ec, fmt::format("failed creating '{}'", SidecarFile.GetPath()));
+    }
+
+    SidecarFile.Write(&Header, sizeof Header, 0);
+
+    {
+        // Separate scope so that the buffered writer is gone before the file
+        // is flushed and moved, as WriteIndexSnapshot does for its index writer
+        uint64_t WriteOffset = sizeof Header;
+
+        BasicFileWriter SidecarWriter(SidecarFile, 128 * 1024);
+
+        for (auto& Kv : Index)
+        {
+            const IoHash&      Key     = Kv.first;
+            const PayloadIndex PlIndex = Kv.second;
+
+            IoHash   RawHash = IoHash::Zero;
+            uint64_t RawSize = 0;
+
+            if (const MetaDataIndex MetaIndex = Payloads[PlIndex].MetaData)
+            {
+                RawHash = MetaDatas[MetaIndex].RawHash;
+                RawSize = MetaDatas[MetaIndex].RawSize;
+            }
+
+            ManifestData ManifestEntry =
+                {.Key = Key, .Timestamp = AccessTimes[PlIndex], .RawHash = RawHash, .Padding_0 = 0, .RawSize = RawSize, .Padding_1 = 0};
+
+            SidecarWriter.Write(&ManifestEntry, sizeof ManifestEntry, WriteOffset);
+
+            WriteOffset += sizeof ManifestEntry;
+        }
+
+        SidecarWriter.Flush();
+    }
+
+    // Make sure everything has reached the file before it replaces the old sidecar
+    SidecarFile.Flush();
+    SidecarFile.MoveTemporaryIntoPlace(SidecarPath, Ec);
+
+    if (Ec)
+    {
+        throw std::system_error(Ec, fmt::format("failed to move '{}' into '{}'", SidecarFile.GetPath(), SidecarPath));
+    }
}
//////////////////////////////////////////////////////////////////////////
@@ -207,167 +679,67 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
CreateDirectories(m_BucketDir);
- std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"};
+ std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName);
bool IsNew = false;
- CbObject Manifest = LoadCompactBinaryObject(ManifestPath);
+ BucketManifestSerializer ManifestReader;
- if (Manifest)
+ if (ManifestReader.Open(ManifestPath))
{
- m_BucketId = Manifest["BucketId"sv].AsObjectId();
+ m_BucketId = ManifestReader.GetBucketId();
if (m_BucketId == Oid::Zero)
{
return false;
}
- const uint32_t Version = Manifest["Version"sv].AsUInt32(0);
- if (Version != CurrentDiskBucketVersion)
+
+ uint32_t Version = 0;
+ if (ManifestReader.IsCurrentVersion(/* out */ Version) == false)
{
- ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion);
+ ZEN_INFO("Wiping bucket '{}', found version {}, required version {}",
+ BucketDir,
+ Version,
+ BucketManifestSerializer::CurrentDiskBucketVersion);
IsNew = true;
}
}
else if (AllowCreate)
{
- m_BucketId.Generate();
-
- CbObjectWriter Writer;
- Writer << "BucketId"sv << m_BucketId;
- Writer << "Version"sv << CurrentDiskBucketVersion;
- Manifest = Writer.Save();
- WriteFile(m_BucketDir / "zen_manifest", Manifest.GetBuffer().AsIoBuffer());
- IsNew = true;
+ m_BucketId = ManifestReader.GenerateNewManifest(ManifestPath);
+ IsNew = true;
}
else
{
return false;
}
- OpenLog(IsNew);
+ InitializeIndexFromDisk(IsNew);
- if (!IsNew)
+ if (IsNew)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenOrCreate::Manifest");
-
- Stopwatch Timer;
- const auto _ =
- MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
-
- const uint64_t Count = Manifest["Count"sv].AsUInt64(0);
- if (Count != 0)
- {
- std::vector<PayloadIndex> KeysIndexes;
- KeysIndexes.reserve(Count);
- CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView();
- for (CbFieldView& KeyView : KeyArray)
- {
- if (auto It = m_Index.find(KeyView.AsHash()); It != m_Index.end())
- {
- KeysIndexes.push_back(It.value());
- }
- else
- {
- KeysIndexes.push_back(PayloadIndex());
- }
- }
- size_t KeyIndexOffset = 0;
- CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView();
- for (CbFieldView& TimeStampView : TimeStampArray)
- {
- const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
- if (KeyIndex)
- {
- m_AccessTimes[KeyIndex] = TimeStampView.AsInt64();
- }
- }
- KeyIndexOffset = 0;
- CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView();
- CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView();
- if (RawHashArray.Num() == RawSizeArray.Num())
- {
- auto RawHashIt = RawHashArray.CreateViewIterator();
- auto RawSizeIt = RawSizeArray.CreateViewIterator();
- while (RawHashIt != CbFieldViewIterator())
- {
- const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
-
- if (KeyIndex)
- {
- uint64_t RawSize = RawSizeIt.AsUInt64();
- IoHash RawHash = RawHashIt.AsHash();
- if (RawSize != 0 || RawHash != IoHash::Zero)
- {
- BucketPayload& Payload = m_Payloads[KeyIndex];
- SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash});
- }
- }
-
- RawHashIt++;
- RawSizeIt++;
- }
- }
- else
- {
- ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath);
- }
- }
-
- ////// Legacy format read
- {
- for (CbFieldView Entry : Manifest["Timestamps"sv])
- {
- const CbObjectView Obj = Entry.AsObjectView();
- const IoHash Key = Obj["Key"sv].AsHash();
-
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
- m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64();
- }
- }
- for (CbFieldView Entry : Manifest["RawInfo"sv])
- {
- const CbObjectView Obj = Entry.AsObjectView();
- const IoHash Key = Obj["Key"sv].AsHash();
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
-
- const IoHash RawHash = Obj["RawHash"sv].AsHash();
- const uint64_t RawSize = Obj["RawSize"sv].AsUInt64();
-
- if (RawHash == IoHash::Zero || RawSize == 0)
- {
- ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex);
- }
-
- BucketPayload& Payload = m_Payloads[EntryIndex];
- SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash});
- }
- }
- }
+ return true;
}
+ ManifestReader.ParseManifest(*this, ManifestPath, m_Index, m_AccessTimes, m_Payloads);
+
return true;
}
void
-ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc)
+ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeIndexSnapshot");
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::WriteIndexSnapshot");
- uint64_t LogCount = m_SlogFile.GetLogCount();
+ const uint64_t LogCount = m_SlogFile.GetLogCount();
if (m_LogFlushPosition == LogCount)
{
return;
}
ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir);
- uint64_t EntryCount = 0;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
+ const uint64_t EntryCount = m_Index.size();
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
m_BucketDir,
EntryCount,
@@ -376,42 +748,11 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function<uint64_t()
namespace fs = std::filesystem;
- fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
- fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName);
-
- // Move index away, we keep it if something goes wrong
- if (fs::is_regular_file(STmpIndexPath))
- {
- std::error_code Ec;
- if (!fs::remove(STmpIndexPath, Ec) || Ec)
- {
- ZEN_WARN("snapshot failed to clean up temp snapshot at {}, reason: '{}'", STmpIndexPath, Ec.message());
- return;
- }
- }
+ fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
try
{
- if (fs::is_regular_file(IndexPath))
- {
- fs::rename(IndexPath, STmpIndexPath);
- }
-
- // Write the current state of the location map to a new index state
- std::vector<DiskIndexEntry> Entries;
- Entries.resize(m_Index.size());
-
- {
- uint64_t EntryIndex = 0;
- for (auto& Entry : m_Index)
- {
- DiskIndexEntry& IndexEntry = Entries[EntryIndex++];
- IndexEntry.Key = Entry.first;
- IndexEntry.Location = m_Payloads[Entry.second].Location;
- }
- }
-
- uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + Entries.size() * sizeof(DiskIndexEntry);
+ const uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry);
std::error_code Error;
DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
if (Error)
@@ -431,45 +772,67 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function<uint64_t()
fmt::format("not enough free disk space in '{}' to save index of size {}", m_BucketDir, NiceBytes(IndexSize)));
}
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
- CacheBucketIndexHeader Header = {.EntryCount = Entries.size(),
- .LogPosition = LogCount,
- .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)};
+ TemporaryFile ObjectIndexFile;
+ std::error_code Ec;
+ ObjectIndexFile.CreateTemporary(m_BucketDir, Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir));
+ }
- Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
- ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0);
- ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
- ObjectIndexFile.Flush();
- ObjectIndexFile.Close();
- EntryCount = Entries.size();
- m_LogFlushPosition = LogCount;
- }
- catch (std::exception& Err)
- {
- ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what());
+ {
+ // This is in a separate scope just to ensure IndexWriter goes out
+ // of scope before the file is flushed/closed, in order to ensure
+ // all data is written to the file
+ BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024);
+
+ CacheBucketIndexHeader Header = {.EntryCount = EntryCount,
+ .LogPosition = LogCount,
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)};
+
+ Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
+ IndexWriter.Write(&Header, sizeof(CacheBucketIndexHeader), 0);
+
+ uint64_t IndexWriteOffset = sizeof(CacheBucketIndexHeader);
- // Restore any previous snapshot
+ for (auto& Entry : m_Index)
+ {
+ DiskIndexEntry IndexEntry;
+ IndexEntry.Key = Entry.first;
+ IndexEntry.Location = m_Payloads[Entry.second].Location;
+ IndexWriter.Write(&IndexEntry, sizeof(DiskIndexEntry), IndexWriteOffset);
- if (fs::is_regular_file(STmpIndexPath))
+ IndexWriteOffset += sizeof(DiskIndexEntry);
+ }
+
+ IndexWriter.Flush();
+ }
+
+ ObjectIndexFile.Flush();
+ ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec);
+ if (Ec)
{
- std::error_code Ec;
- fs::remove(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless
- fs::rename(STmpIndexPath, IndexPath, Ec);
- if (Ec)
+ std::filesystem::path TempFilePath = ObjectIndexFile.GetPath();
+ ZEN_WARN("snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", TempFilePath, IndexPath, Ec.message());
+
+ if (std::filesystem::is_regular_file(TempFilePath))
{
- ZEN_WARN("snapshot failed to restore old snapshot from {}, reason: '{}'", STmpIndexPath, Ec.message());
+ if (!std::filesystem::remove(TempFilePath, Ec) || Ec)
+ {
+ ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", TempFilePath, Ec.message());
+ }
}
}
- }
- if (fs::is_regular_file(STmpIndexPath))
- {
- std::error_code Ec;
- if (!fs::remove(STmpIndexPath, Ec) || Ec)
+ else
{
- ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", STmpIndexPath, Ec.message());
+ // We must only update the log flush position once the snapshot write succeeds
+ m_LogFlushPosition = LogCount;
}
}
+ catch (std::exception& Err)
+ {
+ ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what());
+ }
}
uint64_t
@@ -477,77 +840,88 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadIndexFile");
- if (std::filesystem::is_regular_file(IndexPath))
+ if (!std::filesystem::is_regular_file(IndexPath))
{
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
- uint64_t Size = ObjectIndexFile.FileSize();
- if (Size >= sizeof(CacheBucketIndexHeader))
- {
- CacheBucketIndexHeader Header;
- ObjectIndexFile.Read(&Header, sizeof(Header), 0);
- if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) &&
- (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0))
- {
- switch (Header.Version)
- {
- case CacheBucketIndexHeader::Version2:
- {
- uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
- if (Header.EntryCount > ExpectedEntryCount)
- {
- break;
- }
- size_t EntryCount = 0;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("read store '{}' index containing {} entries in {}",
- IndexPath,
- EntryCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
+ return 0;
+ }
- m_Configuration.PayloadAlignment = Header.PayloadAlignment;
+ auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid index file '{}'", IndexPath); });
- std::vector<DiskIndexEntry> Entries;
- Entries.resize(Header.EntryCount);
- ObjectIndexFile.Read(Entries.data(),
- Header.EntryCount * sizeof(DiskIndexEntry),
- sizeof(CacheBucketIndexHeader));
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+ uint64_t FileSize = ObjectIndexFile.FileSize();
+ if (FileSize < sizeof(CacheBucketIndexHeader))
+ {
+ return 0;
+ }
- m_Payloads.reserve(Header.EntryCount);
- m_Index.reserve(Header.EntryCount);
+ CacheBucketIndexHeader Header;
+ ObjectIndexFile.Read(&Header, sizeof(Header), 0);
- std::string InvalidEntryReason;
- for (const DiskIndexEntry& Entry : Entries)
- {
- if (!ValidateCacheBucketIndexEntry(Entry, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
- continue;
- }
- PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
- m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location});
- m_Index.insert_or_assign(Entry.Key, EntryIndex);
- EntryCount++;
- }
- m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount()));
- if (m_Configuration.EnableReferenceCaching)
- {
- m_FirstReferenceIndex.resize(m_Payloads.size());
- }
- OutVersion = CacheBucketIndexHeader::Version2;
- return Header.LogPosition;
- }
- break;
- default:
- break;
- }
- }
+ if (!Header.IsValid())
+ {
+ return 0;
+ }
+
+ if (Header.Version != CacheBucketIndexHeader::Version2)
+ {
+ return 0;
+ }
+
+ const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
+ if (Header.EntryCount > ExpectedEntryCount)
+ {
+ return 0;
+ }
+
+ InvalidGuard.Dismiss();
+
+ size_t EntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ m_Configuration.PayloadAlignment = Header.PayloadAlignment;
+
+ m_Payloads.reserve(Header.EntryCount);
+ m_Index.reserve(Header.EntryCount);
+
+ BasicFileBuffer FileBuffer(ObjectIndexFile, 128 * 1024);
+
+ uint64_t CurrentReadOffset = sizeof(CacheBucketIndexHeader);
+ uint64_t RemainingEntryCount = Header.EntryCount;
+
+ std::string InvalidEntryReason;
+ while (RemainingEntryCount--)
+ {
+ const DiskIndexEntry* Entry = FileBuffer.MakeView<DiskIndexEntry>(CurrentReadOffset);
+ CurrentReadOffset += sizeof(DiskIndexEntry);
+
+ if (!ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+ continue;
}
- ZEN_WARN("skipping invalid index file '{}'", IndexPath);
+
+ const PayloadIndex EntryIndex = PayloadIndex(EntryCount);
+ m_Payloads.emplace_back(BucketPayload{.Location = Entry->Location});
+ m_Index.insert_or_assign(Entry->Key, EntryIndex);
+
+ EntryCount++;
}
- return 0;
+
+ ZEN_ASSERT(EntryCount == m_Payloads.size());
+
+ m_AccessTimes.resize(EntryCount, AccessTime(GcClock::TickCount()));
+
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.resize(EntryCount);
+ }
+
+ OutVersion = CacheBucketIndexHeader::Version2;
+ return Header.LogPosition;
}
uint64_t
@@ -555,61 +929,73 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadLog");
- if (std::filesystem::is_regular_file(LogPath))
+ if (!std::filesystem::is_regular_file(LogPath))
{
- uint64_t LogEntryCount = 0;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
- TCasLogFile<DiskIndexEntry> CasLog;
- CasLog.Open(LogPath, CasLogFile::Mode::kRead);
- if (CasLog.Initialize())
- {
- uint64_t EntryCount = CasLog.GetLogCount();
- if (EntryCount < SkipEntryCount)
- {
- ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
- SkipEntryCount = 0;
- }
- LogEntryCount = EntryCount - SkipEntryCount;
- uint64_t InvalidEntryCount = 0;
- CasLog.Replay(
- [&](const DiskIndexEntry& Record) {
- std::string InvalidEntryReason;
- if (Record.Location.Flags & DiskLocation::kTombStone)
- {
- m_Index.erase(Record.Key);
- return;
- }
- if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
- ++InvalidEntryCount;
- return;
- }
- PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
- m_Payloads.emplace_back(BucketPayload{.Location = Record.Location});
- m_Index.insert_or_assign(Record.Key, EntryIndex);
- },
- SkipEntryCount);
- m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount()));
- if (m_Configuration.EnableReferenceCaching)
+ return 0;
+ }
+
+ uint64_t LogEntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ TCasLogFile<DiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ if (!CasLog.Initialize())
+ {
+ return 0;
+ }
+
+ const uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+ SkipEntryCount = 0;
+ }
+
+ LogEntryCount = EntryCount - SkipEntryCount;
+ uint64_t InvalidEntryCount = 0;
+
+ CasLog.Replay(
+ [&](const DiskIndexEntry& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Location.Flags & DiskLocation::kTombStone)
{
- m_FirstReferenceIndex.resize(m_Payloads.size());
+ // Note: this leaves m_Payloads and other arrays with 'holes' in them
+ m_Index.erase(Record.Key);
+ return;
}
- if (InvalidEntryCount)
+
+ if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason))
{
- ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir);
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+ ++InvalidEntryCount;
+ return;
}
- return LogEntryCount;
- }
+ PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
+ m_Payloads.emplace_back(BucketPayload{.Location = Record.Location});
+ m_Index.insert_or_assign(Record.Key, EntryIndex);
+ },
+ SkipEntryCount);
+
+ m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount()));
+
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.resize(m_Payloads.size());
}
- return 0;
+
+ if (InvalidEntryCount)
+ {
+ ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir);
+ }
+
+ return LogEntryCount;
};
void
-ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
+ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(const bool IsNew)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenLog");
@@ -661,15 +1047,14 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
}
else if (fs::is_regular_file(LogPath))
{
- ZEN_WARN("removing invalid cas log at '{}'", LogPath);
+ ZEN_WARN("removing invalid log at '{}'", LogPath);
std::filesystem::remove(LogPath);
}
}
m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
- std::vector<BlockStoreLocation> KnownLocations;
- KnownLocations.reserve(m_Index.size());
+ BlockStore::BlockIndexSet KnownBlocks;
for (const auto& Entry : m_Index)
{
size_t EntryIndex = Entry.second;
@@ -679,19 +1064,19 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
{
m_StandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed);
- continue;
}
- const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment);
- KnownLocations.push_back(BlockLocation);
+ else
+ {
+ const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment);
+ KnownBlocks.Add(BlockLocation.BlockIndex);
+ }
}
-
- m_BlockStore.SyncExistingBlocksOnDisk(KnownLocations);
+ m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
if (IsNew || LogEntryCount > 0)
{
- MakeIndexSnapshot();
+ WriteIndexSnapshot();
}
- // TODO: should validate integrity of container files here
}
void
@@ -981,20 +1366,7 @@ ZenCacheDiskLayer::CacheBucket::Flush()
m_BlockStore.Flush(/*ForceNewBlock*/ false);
m_SlogFile.Flush();
- std::vector<AccessTime> AccessTimes;
- std::vector<BucketPayload> Payloads;
- std::vector<BucketMetaData> MetaDatas;
- IndexMap Index;
-
- {
- RwLock::SharedLockScope IndexLock(m_IndexLock);
- MakeIndexSnapshot();
- Index = m_Index;
- Payloads = m_Payloads;
- AccessTimes = m_AccessTimes;
- MetaDatas = m_MetaDatas;
- }
- SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), Payloads, MetaDatas));
+ SaveSnapshot();
}
catch (std::exception& Ex)
{
@@ -1003,113 +1375,85 @@ ZenCacheDiskLayer::CacheBucket::Flush()
}
void
-ZenCacheDiskLayer::CacheBucket::SaveManifest(CbObject&& Manifest, const std::function<uint64_t()>& ClaimDiskReserveFunc)
+ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::SaveManifest");
try
{
- IoBuffer Buffer = Manifest.GetBuffer().AsIoBuffer();
-
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
- if (Error)
- {
- ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message());
- return;
- }
- bool EnoughSpace = Space.Free >= Buffer.GetSize() + 1024 * 512;
- if (!EnoughSpace)
- {
- uint64_t ReclaimedSpace = ClaimDiskReserveFunc();
- EnoughSpace = (Space.Free + ReclaimedSpace) >= Buffer.GetSize() + 1024 * 512;
- }
- if (!EnoughSpace)
- {
- ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", m_BucketDir, NiceBytes(Buffer.GetSize()));
- return;
- }
- WriteFile(m_BucketDir / "zen_manifest", Buffer);
- }
- catch (std::exception& Err)
- {
- ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what());
- }
-}
+ bool UseLegacyScheme = false;
+ uint64_t SidecarSize = 0;
-CbObject
-ZenCacheDiskLayer::CacheBucket::MakeManifest(IndexMap&& Index,
- std::vector<AccessTime>&& AccessTimes,
- const std::vector<BucketPayload>& Payloads,
- const std::vector<BucketMetaData>& MetaDatas)
-{
- using namespace std::literals;
+ IoBuffer Buffer;
+ BucketManifestSerializer ManifestWriter;
- ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeManifest");
-
- size_t ItemCount = Index.size();
-
- // This tends to overestimate a little bit but it is still way more accurate than what we get with exponential growth
- // And we don't need to reallocate theunderying buffer in almost every case
- const size_t EstimatedSizePerItem = 54u;
- const size_t ReserveSize = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128);
- CbObjectWriter Writer(ReserveSize);
+ {
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketPayload> Payloads;
+ std::vector<BucketMetaData> MetaDatas;
+ IndexMap Index;
- Writer << "BucketId"sv << m_BucketId;
- Writer << "Version"sv << CurrentDiskBucketVersion;
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ WriteIndexSnapshot(ClaimDiskReserveFunc);  // forward the reserve callback so a low-disk GC run can still reclaim space (assumes WriteIndexSnapshot kept MakeIndexSnapshot's parameter - confirm)
+ // Note: this copy could be eliminated on shutdown to
+ // reduce memory usage and execution time
+ Index = m_Index;
+ Payloads = m_Payloads;
+ AccessTimes = m_AccessTimes;
+ MetaDatas = m_MetaDatas;
+ }
- if (!Index.empty())
- {
- Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(Index.size()));
- Writer.BeginArray("Keys"sv);
- for (auto& Kv : Index)
- {
- const IoHash& Key = Kv.first;
- Writer.AddHash(Key);
- }
- Writer.EndArray();
+ if (UseLegacyScheme)
+ {
+ Buffer = ManifestWriter.MakeManifest(m_BucketId, std::move(Index), std::move(AccessTimes), Payloads, MetaDatas);
+ }
+ else
+ {
+ const uint64_t EntryCount = Index.size();
+ Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount);
+ SidecarSize = ManifestWriter.GetSidecarSize();
+ }
- Writer.BeginArray("Timestamps"sv);
- for (auto& Kv : Index)
- {
- GcClock::Tick AccessTime = AccessTimes[Kv.second];
- Writer.AddInteger(AccessTime);
- }
- Writer.EndArray();
+ const uint64_t RequiredSpace = SidecarSize + Buffer.GetSize() + 1024 * 512;
- if (!MetaDatas.empty())
- {
- Writer.BeginArray("RawHash"sv);
- for (auto& Kv : Index)
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
+ if (Error)
{
- const BucketPayload& Payload = Payloads[Kv.second];
- if (Payload.MetaData)
- {
- Writer.AddHash(MetaDatas[Payload.MetaData].RawHash);
- }
- else
- {
- Writer.AddHash(IoHash::Zero);
- }
+ ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message());
+ return;
+ }
+ bool EnoughSpace = Space.Free >= RequiredSpace;
+ if (!EnoughSpace)
+ {
+ uint64_t ReclaimedSpace = ClaimDiskReserveFunc();
+ EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace;
+ }
+ if (!EnoughSpace)
+ {
+ ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}",
+ m_BucketDir,
+ NiceBytes(SidecarSize + Buffer.GetSize()));  // report manifest plus sidecar, the payload RequiredSpace accounts for
+ return;
+ }
- Writer.EndArray();
- Writer.BeginArray("RawSize"sv);
- for (auto& Kv : Index)
+ if (!UseLegacyScheme)
{
- const BucketPayload& Payload = Payloads[Kv.second];
- if (Payload.MetaData)
- {
- Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize);
- }
- else
- {
- Writer.AddInteger(0);
- }
+ ManifestWriter.WriteSidecarFile(GetMetaPath(m_BucketDir, m_BucketName),
+ m_LogFlushPosition,
+ std::move(Index),
+ std::move(AccessTimes),
+ Payloads,
+ MetaDatas);
}
- Writer.EndArray();
}
+
+ std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName);
+ WriteFile(ManifestPath, Buffer);
+ }
+ catch (std::exception& Err)
+ {
+ ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what());
}
- return Writer.Save();
}
IoHash
@@ -1683,20 +2027,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
try
{
- std::vector<AccessTime> AccessTimes;
- std::vector<BucketPayload> Payloads;
- std::vector<BucketMetaData> MetaDatas;
- IndexMap Index;
- {
- RwLock::SharedLockScope IndexLock(m_IndexLock);
- MakeIndexSnapshot([&]() { return GcCtx.ClaimGCReserve(); });
- Index = m_Index;
- Payloads = m_Payloads;
- AccessTimes = m_AccessTimes;
- MetaDatas = m_MetaDatas;
- }
- SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), Payloads, MetaDatas),
- [&]() { return GcCtx.ClaimGCReserve(); });
+ SaveSnapshot([&]() { return GcCtx.ClaimGCReserve(); });
}
catch (std::exception& Ex)
{