aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/cache/cachedisklayer.cpp')
-rw-r--r--src/zenserver/cache/cachedisklayer.cpp1213
1 files changed, 772 insertions, 441 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index 6ab3c7746..0767086ce 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -25,12 +25,6 @@ namespace {
#pragma pack(push)
#pragma pack(1)
- // We use this to indicate if a on disk bucket needs wiping
- // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scrable the references
- // to block items.
- // See: https://github.com/EpicGames/zen/pull/299
- static const uint32_t CurrentDiskBucketVersion = 1;
-
struct CacheBucketIndexHeader
{
static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
@@ -48,12 +42,75 @@ namespace {
{
return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
}
+
+ bool IsValid() const
+ {
+ if (Magic != ExpectedMagic)
+ {
+ return false;
+ }
+
+ if (Checksum != ComputeChecksum(*this))
+ {
+ return false;
+ }
+
+ if (PayloadAlignment == 0)
+ {
+ return false;
+ }
+
+ return true;
+ }
};
static_assert(sizeof(CacheBucketIndexHeader) == 32);
+ struct BucketMetaHeader
+ {
+ static constexpr uint32_t ExpectedMagic = 0x61'74'65'6d; // 'meta';
+ static constexpr uint32_t Version1 = 1;
+ static constexpr uint32_t CurrentVersion = Version1;
+
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+ uint64_t EntryCount = 0;
+ uint64_t LogPosition = 0;
+ uint32_t Padding = 0;
+ uint32_t Checksum = 0;
+
+ static uint32_t ComputeChecksum(const BucketMetaHeader& Header)
+ {
+ return XXH32(&Header.Magic, sizeof(BucketMetaHeader) - sizeof(uint32_t), 0xC0C0'BABA);
+ }
+
+ bool IsValid() const
+ {
+ if (Magic != ExpectedMagic)
+ {
+ return false;
+ }
+
+ if (Checksum != ComputeChecksum(*this))
+ {
+ return false;
+ }
+
+ if (Padding != 0)
+ {
+ return false;
+ }
+
+ return true;
+ }
+ };
+
+ static_assert(sizeof(BucketMetaHeader) == 32);
+
#pragma pack(pop)
+ //////////////////////////////////////////////////////////////////////////
+
template<typename T>
void Reset(T& V)
{
@@ -63,15 +120,16 @@ namespace {
const char* IndexExtension = ".uidx";
const char* LogExtension = ".slog";
+ const char* MetaExtension = ".meta";
std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
{
return BucketDir / (BucketName + IndexExtension);
}
- std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ std::filesystem::path GetMetaPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
{
- return BucketDir / (BucketName + ".tmp");
+ return BucketDir / (BucketName + MetaExtension);
}
std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
@@ -79,6 +137,12 @@ namespace {
return BucketDir / (BucketName + LogExtension);
}
+ std::filesystem::path GetManifestPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
+ {
+ ZEN_UNUSED(BucketName);
+ return BucketDir / "zen_manifest";
+ }
+
bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason)
{
if (Entry.Key == IoHash::Zero)
@@ -147,22 +211,430 @@ namespace {
} // namespace
namespace fs = std::filesystem;
+using namespace std::literals;
+
+class BucketManifestSerializer
+{
+ using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex;
+ using BucketMetaData = ZenCacheDiskLayer::CacheBucket::BucketMetaData;
+
+ using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex;
+ using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload;
+
+public:
+ // We use this to indicate if a on disk bucket needs wiping
+ // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scramble the references
+ // to block items.
+ // See: https://github.com/EpicGames/zen/pull/299
+ static inline const uint32_t CurrentDiskBucketVersion = 1;
+
+ bool Open(std::filesystem::path ManifestPath)
+ {
+ Manifest = LoadCompactBinaryObject(ManifestPath);
+ return !!Manifest;
+ }
+
+ Oid GetBucketId() const { return Manifest["BucketId"sv].AsObjectId(); }
+
+ bool IsCurrentVersion(uint32_t& OutVersion) const
+ {
+ OutVersion = Manifest["Version"sv].AsUInt32(0);
+ return OutVersion == CurrentDiskBucketVersion;
+ }
+
+ void ParseManifest(ZenCacheDiskLayer::CacheBucket& Bucket,
+ std::filesystem::path ManifestPath,
+ ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
+ std::vector<AccessTime>& AccessTimes,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads);
+
+ Oid GenerateNewManifest(std::filesystem::path ManifestPath);
+
+ IoBuffer MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount);
+ uint64_t GetSidecarSize() const { return m_ManifestEntryCount * sizeof(ManifestData); }
+ void WriteSidecarFile(const std::filesystem::path& SidecarPath,
+ uint64_t SnapshotLogPosition,
+ ZenCacheDiskLayer::CacheBucket::IndexMap&& Index,
+ std::vector<AccessTime>&& AccessTimes,
+ const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads,
+ const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas);
+ bool ReadSidecarFile(ZenCacheDiskLayer::CacheBucket& Bucket,
+ std::filesystem::path SidecarPath,
+ ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
+ std::vector<AccessTime>& AccessTimes,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads);
+
+ IoBuffer MakeManifest(const Oid& BucketId,
+ ZenCacheDiskLayer::CacheBucket::IndexMap&& Index,
+ std::vector<AccessTime>&& AccessTimes,
+ const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads,
+ const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas);
+
+ CbObject Manifest;
+
+private:
+ CbObject LoadCompactBinaryObject(const fs::path& Path)
+ {
+ FileContents Result = ReadFile(Path);
+
+ if (!Result.ErrorCode)
+ {
+ IoBuffer Buffer = Result.Flatten();
+ if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None)
+ {
+ return zen::LoadCompactBinaryObject(Buffer);
+ }
+ }
+
+ return CbObject();
+ }
+
+ uint64_t m_ManifestEntryCount = 0;
+
+ struct ManifestData
+ {
+ IoHash Key; // 20
+ AccessTime Timestamp; // 4
+ IoHash RawHash; // 20
+ uint32_t Padding_0; // 4
+ size_t RawSize; // 8
+ uint64_t Padding_1; // 8
+ };
+
+ static_assert(sizeof(ManifestData) == 64);
+};
-static CbObject
-LoadCompactBinaryObject(const fs::path& Path)
+void
+BucketManifestSerializer::ParseManifest(ZenCacheDiskLayer::CacheBucket& Bucket,
+ std::filesystem::path ManifestPath,
+ ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
+ std::vector<AccessTime>& AccessTimes,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads)
{
- FileContents Result = ReadFile(Path);
+ if (Manifest["UsingMetaFile"sv].AsBool())
+ {
+ ReadSidecarFile(Bucket, GetMetaPath(Bucket.m_BucketDir, Bucket.m_BucketName), Index, AccessTimes, Payloads);
+
+ return;
+ }
+
+ ZEN_TRACE_CPU("Z$::ParseManifest");
+
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] { ZEN_INFO("parsed store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+
+ const uint64_t Count = Manifest["Count"sv].AsUInt64(0);
+ std::vector<PayloadIndex> KeysIndexes;
+ KeysIndexes.reserve(Count);
- if (!Result.ErrorCode)
+ CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView();
+ for (CbFieldView& KeyView : KeyArray)
{
- IoBuffer Buffer = Result.Flatten();
- if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None)
+ if (auto It = Index.find(KeyView.AsHash()); It != Index.end())
{
- return LoadCompactBinaryObject(Buffer);
+ KeysIndexes.push_back(It.value());
+ }
+ else
+ {
+ KeysIndexes.push_back(PayloadIndex());
+ }
+ }
+
+ size_t KeyIndexOffset = 0;
+ CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView();
+ for (CbFieldView& TimeStampView : TimeStampArray)
+ {
+ const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
+ if (KeyIndex)
+ {
+ AccessTimes[KeyIndex] = TimeStampView.AsInt64();
+ }
+ }
+
+ KeyIndexOffset = 0;
+ CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView();
+ CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView();
+ if (RawHashArray.Num() == RawSizeArray.Num())
+ {
+ auto RawHashIt = RawHashArray.CreateViewIterator();
+ auto RawSizeIt = RawSizeArray.CreateViewIterator();
+ while (RawHashIt != CbFieldViewIterator())
+ {
+ const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
+
+ if (KeyIndex)
+ {
+ uint64_t RawSize = RawSizeIt.AsUInt64();
+ IoHash RawHash = RawHashIt.AsHash();
+ if (RawSize != 0 || RawHash != IoHash::Zero)
+ {
+ BucketPayload& Payload = Payloads[KeyIndex];
+ Bucket.SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash});
+ }
+ }
+
+ RawHashIt++;
+ RawSizeIt++;
}
}
+ else
+ {
+ ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath);
+ }
+}
- return CbObject();
+Oid
+BucketManifestSerializer::GenerateNewManifest(std::filesystem::path ManifestPath)
+{
+ const Oid BucketId = Oid::NewOid();
+
+ CbObjectWriter Writer;
+ Writer << "BucketId"sv << BucketId;
+ Writer << "Version"sv << CurrentDiskBucketVersion;
+ Manifest = Writer.Save();
+ WriteFile(ManifestPath, Manifest.GetBuffer().AsIoBuffer());
+
+ return BucketId;
+}
+
+IoBuffer
+BucketManifestSerializer::MakeManifest(const Oid& BucketId,
+ ZenCacheDiskLayer::CacheBucket::IndexMap&& Index,
+ std::vector<AccessTime>&& AccessTimes,
+ const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads,
+ const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas)
+{
+ using namespace std::literals;
+
+ ZEN_TRACE_CPU("Z$::MakeManifest");
+
+ size_t ItemCount = Index.size();
+
+ // This tends to overestimate a little bit but it is still way more accurate than what we get with exponential growth
+ // And we don't need to reallocate the underlying buffer in almost every case
+ const size_t EstimatedSizePerItem = 54u;
+ const size_t ReserveSize = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128);
+ CbObjectWriter Writer(ReserveSize);
+
+ Writer << "BucketId"sv << BucketId;
+ Writer << "Version"sv << CurrentDiskBucketVersion;
+
+ if (!Index.empty())
+ {
+ Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(Index.size()));
+ Writer.BeginArray("Keys"sv);
+ for (auto& Kv : Index)
+ {
+ const IoHash& Key = Kv.first;
+ Writer.AddHash(Key);
+ }
+ Writer.EndArray();
+
+ Writer.BeginArray("Timestamps"sv);
+ for (auto& Kv : Index)
+ {
+ GcClock::Tick AccessTime = AccessTimes[Kv.second];
+ Writer.AddInteger(AccessTime);
+ }
+ Writer.EndArray();
+
+ if (!MetaDatas.empty())
+ {
+ Writer.BeginArray("RawHash"sv);
+ for (auto& Kv : Index)
+ {
+ const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second];
+ if (Payload.MetaData)
+ {
+ Writer.AddHash(MetaDatas[Payload.MetaData].RawHash);
+ }
+ else
+ {
+ Writer.AddHash(IoHash::Zero);
+ }
+ }
+ Writer.EndArray();
+
+ Writer.BeginArray("RawSize"sv);
+ for (auto& Kv : Index)
+ {
+ const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second];
+ if (Payload.MetaData)
+ {
+ Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize);
+ }
+ else
+ {
+ Writer.AddInteger(0);
+ }
+ }
+ Writer.EndArray();
+ }
+ }
+
+ Manifest = Writer.Save();
+ return Manifest.GetBuffer().AsIoBuffer();
+}
+
+IoBuffer
+BucketManifestSerializer::MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount)
+{
+ m_ManifestEntryCount = EntryCount;
+
+ CbObjectWriter Writer;
+ Writer << "BucketId"sv << BucketId;
+ Writer << "Version"sv << CurrentDiskBucketVersion;
+ Writer << "Count"sv << EntryCount;
+ Writer << "UsingMetaFile"sv << true;
+ Manifest = Writer.Save();
+
+ return Manifest.GetBuffer().AsIoBuffer();
+}
+
+bool
+BucketManifestSerializer::ReadSidecarFile(ZenCacheDiskLayer::CacheBucket& Bucket,
+ std::filesystem::path SidecarPath,
+ ZenCacheDiskLayer::CacheBucket::IndexMap& Index,
+ std::vector<AccessTime>& AccessTimes,
+ std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads)
+{
+ ZEN_ASSERT(AccessTimes.size() == Payloads.size());
+
+ std::error_code Ec;
+
+ BasicFile SidecarFile;
+ SidecarFile.Open(SidecarPath, BasicFile::Mode::kRead, Ec);
+
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("failed to open sidecar file '{}'", SidecarPath));
+ }
+
+ uint64_t FileSize = SidecarFile.FileSize();
+
+ auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid sidecar file '{}'", SidecarPath); });
+
+ if (FileSize < sizeof(BucketMetaHeader))
+ {
+ return false;
+ }
+
+ BasicFileBuffer Sidecar(SidecarFile, 128 * 1024);
+
+ BucketMetaHeader Header;
+ Sidecar.Read(&Header, sizeof Header, 0);
+
+ if (!Header.IsValid())
+ {
+ return false;
+ }
+
+ if (Header.Version != BucketMetaHeader::Version1)
+ {
+ return false;
+ }
+
+ const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(BucketMetaHeader))) / sizeof(ManifestData);
+ if (Header.EntryCount > ExpectedEntryCount)
+ {
+ return false;
+ }
+
+ InvalidGuard.Dismiss();
+
+ uint64_t RemainingEntryCount = ExpectedEntryCount;
+ uint64_t EntryCount = 0;
+ uint64_t CurrentReadOffset = sizeof(Header);
+
+ while (RemainingEntryCount--)
+ {
+ const ManifestData* Entry = Sidecar.MakeView<ManifestData>(CurrentReadOffset);
+ CurrentReadOffset += sizeof(ManifestData);
+
+ if (auto It = Index.find(Entry->Key); It != Index.end())
+ {
+ PayloadIndex PlIndex = It.value();
+
+ ZEN_ASSERT(size_t(PlIndex) <= Payloads.size());
+
+ ZenCacheDiskLayer::CacheBucket::BucketPayload& PayloadEntry = Payloads[PlIndex];
+
+ AccessTimes[PlIndex] = Entry->Timestamp;
+
+ if (Entry->RawSize && Entry->RawHash != IoHash::Zero)
+ {
+ Bucket.SetMetaData(PayloadEntry, BucketMetaData{.RawSize = Entry->RawSize, .RawHash = Entry->RawHash});
+ }
+ }
+
+ EntryCount++;
+ }
+
+ ZEN_ASSERT(EntryCount == ExpectedEntryCount);
+
+ return true;
+}
+
+void
+BucketManifestSerializer::WriteSidecarFile(const std::filesystem::path& SidecarPath,
+ uint64_t SnapshotLogPosition,
+ ZenCacheDiskLayer::CacheBucket::IndexMap&& Index,
+ std::vector<AccessTime>&& AccessTimes,
+ const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads,
+ const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas)
+{
+ BucketMetaHeader Header;
+ Header.EntryCount = m_ManifestEntryCount;
+ Header.LogPosition = SnapshotLogPosition;
+ Header.Checksum = Header.ComputeChecksum(Header);
+
+ std::error_code Ec;
+
+ TemporaryFile SidecarFile;
+ SidecarFile.CreateTemporary(SidecarPath.parent_path(), Ec);
+
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("failed creating '{}'", SidecarFile.GetPath()));
+ }
+
+ SidecarFile.Write(&Header, sizeof Header, 0);
+
+ // TODO: make this batching for better performance
+ {
+ uint64_t WriteOffset = sizeof Header;
+
+ BasicFileWriter SidecarWriter(SidecarFile, 128 * 1024);
+
+ for (auto& Kv : Index)
+ {
+ const IoHash& Key = Kv.first;
+ const PayloadIndex PlIndex = Kv.second;
+
+ IoHash RawHash = IoHash::Zero;
+ uint64_t RawSize = 0;
+
+ if (const MetaDataIndex MetaIndex = Payloads[PlIndex].MetaData)
+ {
+ RawHash = MetaDatas[MetaIndex].RawHash;
+ RawSize = MetaDatas[MetaIndex].RawSize;
+ }
+
+ ManifestData ManifestEntry =
+ {.Key = Key, .Timestamp = AccessTimes[PlIndex], .RawHash = RawHash, .Padding_0 = 0, .RawSize = RawSize, .Padding_1 = 0};
+
+ SidecarWriter.Write(&ManifestEntry, sizeof ManifestEntry, WriteOffset);
+
+ WriteOffset += sizeof ManifestEntry;
+ }
+ }
+
+ SidecarFile.MoveTemporaryIntoPlace(SidecarPath, Ec);
+
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("failed to move '{}' into '{}'", SidecarFile.GetPath(), SidecarPath));
+ }
}
//////////////////////////////////////////////////////////////////////////
@@ -207,167 +679,67 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
CreateDirectories(m_BucketDir);
- std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"};
+ std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName);
bool IsNew = false;
- CbObject Manifest = LoadCompactBinaryObject(ManifestPath);
+ BucketManifestSerializer ManifestReader;
- if (Manifest)
+ if (ManifestReader.Open(ManifestPath))
{
- m_BucketId = Manifest["BucketId"sv].AsObjectId();
+ m_BucketId = ManifestReader.GetBucketId();
if (m_BucketId == Oid::Zero)
{
return false;
}
- const uint32_t Version = Manifest["Version"sv].AsUInt32(0);
- if (Version != CurrentDiskBucketVersion)
+
+ uint32_t Version = 0;
+ if (ManifestReader.IsCurrentVersion(/* out */ Version) == false)
{
- ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion);
+ ZEN_INFO("Wiping bucket '{}', found version {}, required version {}",
+ BucketDir,
+ Version,
+ BucketManifestSerializer::CurrentDiskBucketVersion);
IsNew = true;
}
}
else if (AllowCreate)
{
- m_BucketId.Generate();
-
- CbObjectWriter Writer;
- Writer << "BucketId"sv << m_BucketId;
- Writer << "Version"sv << CurrentDiskBucketVersion;
- Manifest = Writer.Save();
- WriteFile(m_BucketDir / "zen_manifest", Manifest.GetBuffer().AsIoBuffer());
- IsNew = true;
+ m_BucketId = ManifestReader.GenerateNewManifest(ManifestPath);
+ IsNew = true;
}
else
{
return false;
}
- OpenLog(IsNew);
+ InitializeIndexFromDisk(IsNew);
- if (!IsNew)
+ if (IsNew)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenOrCreate::Manifest");
-
- Stopwatch Timer;
- const auto _ =
- MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
-
- const uint64_t Count = Manifest["Count"sv].AsUInt64(0);
- if (Count != 0)
- {
- std::vector<PayloadIndex> KeysIndexes;
- KeysIndexes.reserve(Count);
- CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView();
- for (CbFieldView& KeyView : KeyArray)
- {
- if (auto It = m_Index.find(KeyView.AsHash()); It != m_Index.end())
- {
- KeysIndexes.push_back(It.value());
- }
- else
- {
- KeysIndexes.push_back(PayloadIndex());
- }
- }
- size_t KeyIndexOffset = 0;
- CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView();
- for (CbFieldView& TimeStampView : TimeStampArray)
- {
- const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
- if (KeyIndex)
- {
- m_AccessTimes[KeyIndex] = TimeStampView.AsInt64();
- }
- }
- KeyIndexOffset = 0;
- CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView();
- CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView();
- if (RawHashArray.Num() == RawSizeArray.Num())
- {
- auto RawHashIt = RawHashArray.CreateViewIterator();
- auto RawSizeIt = RawSizeArray.CreateViewIterator();
- while (RawHashIt != CbFieldViewIterator())
- {
- const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++];
-
- if (KeyIndex)
- {
- uint64_t RawSize = RawSizeIt.AsUInt64();
- IoHash RawHash = RawHashIt.AsHash();
- if (RawSize != 0 || RawHash != IoHash::Zero)
- {
- BucketPayload& Payload = m_Payloads[KeyIndex];
- SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash});
- }
- }
-
- RawHashIt++;
- RawSizeIt++;
- }
- }
- else
- {
- ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath);
- }
- }
-
- ////// Legacy format read
- {
- for (CbFieldView Entry : Manifest["Timestamps"sv])
- {
- const CbObjectView Obj = Entry.AsObjectView();
- const IoHash Key = Obj["Key"sv].AsHash();
-
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
- m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64();
- }
- }
- for (CbFieldView Entry : Manifest["RawInfo"sv])
- {
- const CbObjectView Obj = Entry.AsObjectView();
- const IoHash Key = Obj["Key"sv].AsHash();
- if (auto It = m_Index.find(Key); It != m_Index.end())
- {
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
-
- const IoHash RawHash = Obj["RawHash"sv].AsHash();
- const uint64_t RawSize = Obj["RawSize"sv].AsUInt64();
-
- if (RawHash == IoHash::Zero || RawSize == 0)
- {
- ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex);
- }
-
- BucketPayload& Payload = m_Payloads[EntryIndex];
- SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash});
- }
- }
- }
+ return true;
}
+ ManifestReader.ParseManifest(*this, ManifestPath, m_Index, m_AccessTimes, m_Payloads);
+
return true;
}
void
-ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc)
+ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeIndexSnapshot");
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::WriteIndexSnapshot");
- uint64_t LogCount = m_SlogFile.GetLogCount();
+ const uint64_t LogCount = m_SlogFile.GetLogCount();
if (m_LogFlushPosition == LogCount)
{
return;
}
ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir);
- uint64_t EntryCount = 0;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
+ const uint64_t EntryCount = m_Index.size();
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
m_BucketDir,
EntryCount,
@@ -376,42 +748,11 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function<uint64_t()
namespace fs = std::filesystem;
- fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
- fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName);
-
- // Move index away, we keep it if something goes wrong
- if (fs::is_regular_file(STmpIndexPath))
- {
- std::error_code Ec;
- if (!fs::remove(STmpIndexPath, Ec) || Ec)
- {
- ZEN_WARN("snapshot failed to clean up temp snapshot at {}, reason: '{}'", STmpIndexPath, Ec.message());
- return;
- }
- }
+ fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
try
{
- if (fs::is_regular_file(IndexPath))
- {
- fs::rename(IndexPath, STmpIndexPath);
- }
-
- // Write the current state of the location map to a new index state
- std::vector<DiskIndexEntry> Entries;
- Entries.resize(m_Index.size());
-
- {
- uint64_t EntryIndex = 0;
- for (auto& Entry : m_Index)
- {
- DiskIndexEntry& IndexEntry = Entries[EntryIndex++];
- IndexEntry.Key = Entry.first;
- IndexEntry.Location = m_Payloads[Entry.second].Location;
- }
- }
-
- uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + Entries.size() * sizeof(DiskIndexEntry);
+ const uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry);
std::error_code Error;
DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
if (Error)
@@ -431,45 +772,67 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function<uint64_t()
fmt::format("not enough free disk space in '{}' to save index of size {}", m_BucketDir, NiceBytes(IndexSize)));
}
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
- CacheBucketIndexHeader Header = {.EntryCount = Entries.size(),
- .LogPosition = LogCount,
- .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)};
+ TemporaryFile ObjectIndexFile;
+ std::error_code Ec;
+ ObjectIndexFile.CreateTemporary(m_BucketDir, Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir));
+ }
- Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
- ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0);
- ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
- ObjectIndexFile.Flush();
- ObjectIndexFile.Close();
- EntryCount = Entries.size();
- m_LogFlushPosition = LogCount;
- }
- catch (std::exception& Err)
- {
- ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what());
+ {
+ // This is in a separate scope just to ensure IndexWriter goes out
+ // of scope before the file is flushed/closed, in order to ensure
+ // all data is written to the file
+ BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024);
+
+ CacheBucketIndexHeader Header = {.EntryCount = EntryCount,
+ .LogPosition = LogCount,
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)};
+
+ Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header);
+ IndexWriter.Write(&Header, sizeof(CacheBucketIndexHeader), 0);
+
+ uint64_t IndexWriteOffset = sizeof(CacheBucketIndexHeader);
- // Restore any previous snapshot
+ for (auto& Entry : m_Index)
+ {
+ DiskIndexEntry IndexEntry;
+ IndexEntry.Key = Entry.first;
+ IndexEntry.Location = m_Payloads[Entry.second].Location;
+ IndexWriter.Write(&IndexEntry, sizeof(DiskIndexEntry), IndexWriteOffset);
- if (fs::is_regular_file(STmpIndexPath))
+ IndexWriteOffset += sizeof(DiskIndexEntry);
+ }
+
+ IndexWriter.Flush();
+ }
+
+ ObjectIndexFile.Flush();
+ ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec);
+ if (Ec)
{
- std::error_code Ec;
- fs::remove(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless
- fs::rename(STmpIndexPath, IndexPath, Ec);
- if (Ec)
+ std::filesystem::path TempFilePath = ObjectIndexFile.GetPath();
+ ZEN_WARN("snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", TempFilePath, IndexPath, Ec.message());
+
+ if (std::filesystem::is_regular_file(TempFilePath))
{
- ZEN_WARN("snapshot failed to restore old snapshot from {}, reason: '{}'", STmpIndexPath, Ec.message());
+ if (!std::filesystem::remove(TempFilePath, Ec) || Ec)
+ {
+ ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", TempFilePath, Ec.message());
+ }
}
}
- }
- if (fs::is_regular_file(STmpIndexPath))
- {
- std::error_code Ec;
- if (!fs::remove(STmpIndexPath, Ec) || Ec)
+ else
{
- ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", STmpIndexPath, Ec.message());
+ // We must only update the log flush position once the snapshot write succeeds
+ m_LogFlushPosition = LogCount;
}
}
+ catch (std::exception& Err)
+ {
+ ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what());
+ }
}
uint64_t
@@ -477,77 +840,88 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadIndexFile");
- if (std::filesystem::is_regular_file(IndexPath))
+ if (!std::filesystem::is_regular_file(IndexPath))
{
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
- uint64_t Size = ObjectIndexFile.FileSize();
- if (Size >= sizeof(CacheBucketIndexHeader))
- {
- CacheBucketIndexHeader Header;
- ObjectIndexFile.Read(&Header, sizeof(Header), 0);
- if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) &&
- (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0))
- {
- switch (Header.Version)
- {
- case CacheBucketIndexHeader::Version2:
- {
- uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
- if (Header.EntryCount > ExpectedEntryCount)
- {
- break;
- }
- size_t EntryCount = 0;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("read store '{}' index containing {} entries in {}",
- IndexPath,
- EntryCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
+ return 0;
+ }
- m_Configuration.PayloadAlignment = Header.PayloadAlignment;
+ auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid index file '{}'", IndexPath); });
- std::vector<DiskIndexEntry> Entries;
- Entries.resize(Header.EntryCount);
- ObjectIndexFile.Read(Entries.data(),
- Header.EntryCount * sizeof(DiskIndexEntry),
- sizeof(CacheBucketIndexHeader));
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+ uint64_t FileSize = ObjectIndexFile.FileSize();
+ if (FileSize < sizeof(CacheBucketIndexHeader))
+ {
+ return 0;
+ }
- m_Payloads.reserve(Header.EntryCount);
- m_Index.reserve(Header.EntryCount);
+ CacheBucketIndexHeader Header;
+ ObjectIndexFile.Read(&Header, sizeof(Header), 0);
- std::string InvalidEntryReason;
- for (const DiskIndexEntry& Entry : Entries)
- {
- if (!ValidateCacheBucketIndexEntry(Entry, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
- continue;
- }
- PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
- m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location});
- m_Index.insert_or_assign(Entry.Key, EntryIndex);
- EntryCount++;
- }
- m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount()));
- if (m_Configuration.EnableReferenceCaching)
- {
- m_FirstReferenceIndex.resize(m_Payloads.size());
- }
- OutVersion = CacheBucketIndexHeader::Version2;
- return Header.LogPosition;
- }
- break;
- default:
- break;
- }
- }
+ if (!Header.IsValid())
+ {
+ return 0;
+ }
+
+ if (Header.Version != CacheBucketIndexHeader::Version2)
+ {
+ return 0;
+ }
+
+ const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
+ if (Header.EntryCount > ExpectedEntryCount)
+ {
+ return 0;
+ }
+
+ InvalidGuard.Dismiss();
+
+ size_t EntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ m_Configuration.PayloadAlignment = Header.PayloadAlignment;
+
+ m_Payloads.reserve(Header.EntryCount);
+ m_Index.reserve(Header.EntryCount);
+
+ BasicFileBuffer FileBuffer(ObjectIndexFile, 128 * 1024);
+
+ uint64_t CurrentReadOffset = sizeof(CacheBucketIndexHeader);
+ uint64_t RemainingEntryCount = Header.EntryCount;
+
+ std::string InvalidEntryReason;
+ while (RemainingEntryCount--)
+ {
+ const DiskIndexEntry* Entry = FileBuffer.MakeView<DiskIndexEntry>(CurrentReadOffset);
+ CurrentReadOffset += sizeof(DiskIndexEntry);
+
+ if (!ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+ continue;
}
- ZEN_WARN("skipping invalid index file '{}'", IndexPath);
+
+ const PayloadIndex EntryIndex = PayloadIndex(EntryCount);
+ m_Payloads.emplace_back(BucketPayload{.Location = Entry->Location});
+ m_Index.insert_or_assign(Entry->Key, EntryIndex);
+
+ EntryCount++;
}
- return 0;
+
+ ZEN_ASSERT(EntryCount == m_Payloads.size());
+
+ m_AccessTimes.resize(EntryCount, AccessTime(GcClock::TickCount()));
+
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.resize(EntryCount);
+ }
+
+ OutVersion = CacheBucketIndexHeader::Version2;
+ return Header.LogPosition;
}
uint64_t
@@ -555,61 +929,73 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadLog");
- if (std::filesystem::is_regular_file(LogPath))
+ if (!std::filesystem::is_regular_file(LogPath))
{
- uint64_t LogEntryCount = 0;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
- TCasLogFile<DiskIndexEntry> CasLog;
- CasLog.Open(LogPath, CasLogFile::Mode::kRead);
- if (CasLog.Initialize())
- {
- uint64_t EntryCount = CasLog.GetLogCount();
- if (EntryCount < SkipEntryCount)
- {
- ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
- SkipEntryCount = 0;
- }
- LogEntryCount = EntryCount - SkipEntryCount;
- uint64_t InvalidEntryCount = 0;
- CasLog.Replay(
- [&](const DiskIndexEntry& Record) {
- std::string InvalidEntryReason;
- if (Record.Location.Flags & DiskLocation::kTombStone)
- {
- m_Index.erase(Record.Key);
- return;
- }
- if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
- ++InvalidEntryCount;
- return;
- }
- PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
- m_Payloads.emplace_back(BucketPayload{.Location = Record.Location});
- m_Index.insert_or_assign(Record.Key, EntryIndex);
- },
- SkipEntryCount);
- m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount()));
- if (m_Configuration.EnableReferenceCaching)
+ return 0;
+ }
+
+ uint64_t LogEntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ TCasLogFile<DiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ if (!CasLog.Initialize())
+ {
+ return 0;
+ }
+
+ const uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+ SkipEntryCount = 0;
+ }
+
+ LogEntryCount = EntryCount - SkipEntryCount;
+ uint64_t InvalidEntryCount = 0;
+
+ CasLog.Replay(
+ [&](const DiskIndexEntry& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Location.Flags & DiskLocation::kTombStone)
{
- m_FirstReferenceIndex.resize(m_Payloads.size());
+ // Note: this leaves m_Payloads and other arrays with 'holes' in them
+ m_Index.erase(Record.Key);
+ return;
}
- if (InvalidEntryCount)
+
+ if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason))
{
- ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir);
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+ ++InvalidEntryCount;
+ return;
}
- return LogEntryCount;
- }
+ PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size());
+ m_Payloads.emplace_back(BucketPayload{.Location = Record.Location});
+ m_Index.insert_or_assign(Record.Key, EntryIndex);
+ },
+ SkipEntryCount);
+
+ m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount()));
+
+ if (m_Configuration.EnableReferenceCaching)
+ {
+ m_FirstReferenceIndex.resize(m_Payloads.size());
}
- return 0;
+
+ if (InvalidEntryCount)
+ {
+ ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir);
+ }
+
+ return LogEntryCount;
};
void
-ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
+ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(const bool IsNew)
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenLog");
@@ -661,15 +1047,14 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
}
else if (fs::is_regular_file(LogPath))
{
- ZEN_WARN("removing invalid cas log at '{}'", LogPath);
+ ZEN_WARN("removing invalid log at '{}'", LogPath);
std::filesystem::remove(LogPath);
}
}
m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
- std::vector<BlockStoreLocation> KnownLocations;
- KnownLocations.reserve(m_Index.size());
+ BlockStore::BlockIndexSet KnownBlocks;
for (const auto& Entry : m_Index)
{
size_t EntryIndex = Entry.second;
@@ -679,19 +1064,19 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
{
m_StandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed);
- continue;
}
- const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment);
- KnownLocations.push_back(BlockLocation);
+ else
+ {
+ const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment);
+ KnownBlocks.Add(BlockLocation.BlockIndex);
+ }
}
-
- m_BlockStore.SyncExistingBlocksOnDisk(KnownLocations);
+ m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
if (IsNew || LogEntryCount > 0)
{
- MakeIndexSnapshot();
+ WriteIndexSnapshot();
}
- // TODO: should validate integrity of container files here
}
void
@@ -981,20 +1366,7 @@ ZenCacheDiskLayer::CacheBucket::Flush()
m_BlockStore.Flush(/*ForceNewBlock*/ false);
m_SlogFile.Flush();
- std::vector<AccessTime> AccessTimes;
- std::vector<BucketPayload> Payloads;
- std::vector<BucketMetaData> MetaDatas;
- IndexMap Index;
-
- {
- RwLock::SharedLockScope IndexLock(m_IndexLock);
- MakeIndexSnapshot();
- Index = m_Index;
- Payloads = m_Payloads;
- AccessTimes = m_AccessTimes;
- MetaDatas = m_MetaDatas;
- }
- SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), Payloads, MetaDatas));
+ SaveSnapshot();
}
catch (std::exception& Ex)
{
@@ -1003,113 +1375,85 @@ ZenCacheDiskLayer::CacheBucket::Flush()
}
void
-ZenCacheDiskLayer::CacheBucket::SaveManifest(CbObject&& Manifest, const std::function<uint64_t()>& ClaimDiskReserveFunc)
+ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc)
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::SaveManifest");
try
{
- IoBuffer Buffer = Manifest.GetBuffer().AsIoBuffer();
-
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
- if (Error)
- {
- ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message());
- return;
- }
- bool EnoughSpace = Space.Free >= Buffer.GetSize() + 1024 * 512;
- if (!EnoughSpace)
- {
- uint64_t ReclaimedSpace = ClaimDiskReserveFunc();
- EnoughSpace = (Space.Free + ReclaimedSpace) >= Buffer.GetSize() + 1024 * 512;
- }
- if (!EnoughSpace)
- {
- ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", m_BucketDir, NiceBytes(Buffer.GetSize()));
- return;
- }
- WriteFile(m_BucketDir / "zen_manifest", Buffer);
- }
- catch (std::exception& Err)
- {
- ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what());
- }
-}
+ bool UseLegacyScheme = false;
+ uint64_t SidecarSize = 0;
-CbObject
-ZenCacheDiskLayer::CacheBucket::MakeManifest(IndexMap&& Index,
- std::vector<AccessTime>&& AccessTimes,
- const std::vector<BucketPayload>& Payloads,
- const std::vector<BucketMetaData>& MetaDatas)
-{
- using namespace std::literals;
+ IoBuffer Buffer;
+ BucketManifestSerializer ManifestWriter;
- ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeManifest");
-
- size_t ItemCount = Index.size();
-
- // This tends to overestimate a little bit but it is still way more accurate than what we get with exponential growth
- // And we don't need to reallocate theunderying buffer in almost every case
- const size_t EstimatedSizePerItem = 54u;
- const size_t ReserveSize = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128);
- CbObjectWriter Writer(ReserveSize);
+ {
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketPayload> Payloads;
+ std::vector<BucketMetaData> MetaDatas;
+ IndexMap Index;
- Writer << "BucketId"sv << m_BucketId;
- Writer << "Version"sv << CurrentDiskBucketVersion;
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ WriteIndexSnapshot();
+ // Note: this copy could be eliminated on shutdown to
+ // reduce memory usage and execution time
+ Index = m_Index;
+ Payloads = m_Payloads;
+ AccessTimes = m_AccessTimes;
+ MetaDatas = m_MetaDatas;
+ }
- if (!Index.empty())
- {
- Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(Index.size()));
- Writer.BeginArray("Keys"sv);
- for (auto& Kv : Index)
- {
- const IoHash& Key = Kv.first;
- Writer.AddHash(Key);
- }
- Writer.EndArray();
+ if (UseLegacyScheme)
+ {
+ Buffer = ManifestWriter.MakeManifest(m_BucketId, std::move(Index), std::move(AccessTimes), Payloads, MetaDatas);
+ }
+ else
+ {
+ const uint64_t EntryCount = Index.size();
+ Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount);
+ SidecarSize = ManifestWriter.GetSidecarSize();
+ }
- Writer.BeginArray("Timestamps"sv);
- for (auto& Kv : Index)
- {
- GcClock::Tick AccessTime = AccessTimes[Kv.second];
- Writer.AddInteger(AccessTime);
- }
- Writer.EndArray();
+ const uint64_t RequiredSpace = SidecarSize + Buffer.GetSize() + 1024 * 512;
- if (!MetaDatas.empty())
- {
- Writer.BeginArray("RawHash"sv);
- for (auto& Kv : Index)
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
+ if (Error)
{
- const BucketPayload& Payload = Payloads[Kv.second];
- if (Payload.MetaData)
- {
- Writer.AddHash(MetaDatas[Payload.MetaData].RawHash);
- }
- else
- {
- Writer.AddHash(IoHash::Zero);
- }
+ ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message());
+ return;
+ }
+ bool EnoughSpace = Space.Free >= RequiredSpace;
+ if (!EnoughSpace)
+ {
+ uint64_t ReclaimedSpace = ClaimDiskReserveFunc();
+ EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace;
+ }
+ if (!EnoughSpace)
+ {
+ ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}",
+ m_BucketDir,
+ NiceBytes(Buffer.GetSize()));
+ return;
}
- Writer.EndArray();
- Writer.BeginArray("RawSize"sv);
- for (auto& Kv : Index)
+ if (!UseLegacyScheme)
{
- const BucketPayload& Payload = Payloads[Kv.second];
- if (Payload.MetaData)
- {
- Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize);
- }
- else
- {
- Writer.AddInteger(0);
- }
+ ManifestWriter.WriteSidecarFile(GetMetaPath(m_BucketDir, m_BucketName),
+ m_LogFlushPosition,
+ std::move(Index),
+ std::move(AccessTimes),
+ Payloads,
+ MetaDatas);
}
- Writer.EndArray();
}
+
+ std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName);
+ WriteFile(ManifestPath, Buffer);
+ }
+ catch (std::exception& Err)
+ {
+ ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what());
}
- return Writer.Save();
}
IoHash
@@ -1683,20 +2027,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
try
{
- std::vector<AccessTime> AccessTimes;
- std::vector<BucketPayload> Payloads;
- std::vector<BucketMetaData> MetaDatas;
- IndexMap Index;
- {
- RwLock::SharedLockScope IndexLock(m_IndexLock);
- MakeIndexSnapshot([&]() { return GcCtx.ClaimGCReserve(); });
- Index = m_Index;
- Payloads = m_Payloads;
- AccessTimes = m_AccessTimes;
- MetaDatas = m_MetaDatas;
- }
- SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), Payloads, MetaDatas),
- [&]() { return GcCtx.ClaimGCReserve(); });
+ SaveSnapshot([&]() { return GcCtx.ClaimGCReserve(); });
}
catch (std::exception& Ex)
{