diff options
Diffstat (limited to 'src/zenserver/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 1213 |
1 files changed, 772 insertions, 441 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 6ab3c7746..0767086ce 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -25,12 +25,6 @@ namespace { #pragma pack(push) #pragma pack(1) - // We use this to indicate if a on disk bucket needs wiping - // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scrable the references - // to block items. - // See: https://github.com/EpicGames/zen/pull/299 - static const uint32_t CurrentDiskBucketVersion = 1; - struct CacheBucketIndexHeader { static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; @@ -48,12 +42,75 @@ namespace { { return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); } + + bool IsValid() const + { + if (Magic != ExpectedMagic) + { + return false; + } + + if (Checksum != ComputeChecksum(*this)) + { + return false; + } + + if (PayloadAlignment == 0) + { + return false; + } + + return true; + } }; static_assert(sizeof(CacheBucketIndexHeader) == 32); + struct BucketMetaHeader + { + static constexpr uint32_t ExpectedMagic = 0x61'74'65'6d; // 'meta'; + static constexpr uint32_t Version1 = 1; + static constexpr uint32_t CurrentVersion = Version1; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t Padding = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const BucketMetaHeader& Header) + { + return XXH32(&Header.Magic, sizeof(BucketMetaHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } + + bool IsValid() const + { + if (Magic != ExpectedMagic) + { + return false; + } + + if (Checksum != ComputeChecksum(*this)) + { + return false; + } + + if (Padding != 0) + { + return false; + } + + return true; + } + }; + + static_assert(sizeof(BucketMetaHeader) == 32); + #pragma pack(pop) + ////////////////////////////////////////////////////////////////////////// + template<typename T> void Reset(T& V) { @@ -63,15 +120,16 @@ namespace { const char* IndexExtension = ".uidx"; const char* LogExtension = ".slog"; + const char* MetaExtension = ".meta"; std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + IndexExtension); } - std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + std::filesystem::path GetMetaPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { - return BucketDir / (BucketName + ".tmp"); + return BucketDir / (BucketName + MetaExtension); } std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) @@ -79,6 +137,12 @@ namespace { return BucketDir / (BucketName + LogExtension); } + std::filesystem::path GetManifestPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + ZEN_UNUSED(BucketName); + return BucketDir / "zen_manifest"; + } + bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) @@ -147,22 +211,430 @@ namespace { } // namespace namespace fs = std::filesystem; +using namespace std::literals; + +class BucketManifestSerializer +{ + using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex; + using BucketMetaData = ZenCacheDiskLayer::CacheBucket::BucketMetaData; + + using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; + using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload; + +public: + // We use this to indicate if a on disk bucket needs wiping + // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scramble the references + // to block items. + // See: https://github.com/EpicGames/zen/pull/299 + static inline const uint32_t CurrentDiskBucketVersion = 1; + + bool Open(std::filesystem::path ManifestPath) + { + Manifest = LoadCompactBinaryObject(ManifestPath); + return !!Manifest; + } + + Oid GetBucketId() const { return Manifest["BucketId"sv].AsObjectId(); } + + bool IsCurrentVersion(uint32_t& OutVersion) const + { + OutVersion = Manifest["Version"sv].AsUInt32(0); + return OutVersion == CurrentDiskBucketVersion; + } + + void ParseManifest(ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path ManifestPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector<AccessTime>& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads); + + Oid GenerateNewManifest(std::filesystem::path ManifestPath); + + IoBuffer MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount); + uint64_t GetSidecarSize() const { return m_ManifestEntryCount * sizeof(ManifestData); } + void WriteSidecarFile(const std::filesystem::path& SidecarPath, + uint64_t SnapshotLogPosition, + ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, + std::vector<AccessTime>&& AccessTimes, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas); + bool ReadSidecarFile(ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path SidecarPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector<AccessTime>& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads); + + IoBuffer MakeManifest(const Oid& BucketId, + ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, + std::vector<AccessTime>&& AccessTimes, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas); + + CbObject Manifest; + +private: + CbObject LoadCompactBinaryObject(const fs::path& Path) + { + FileContents Result = ReadFile(Path); + + if (!Result.ErrorCode) + { + IoBuffer Buffer = Result.Flatten(); + if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) + { + return zen::LoadCompactBinaryObject(Buffer); + } + } + + return CbObject(); + } + + uint64_t m_ManifestEntryCount = 0; + + struct ManifestData + { + IoHash Key; // 20 + AccessTime Timestamp; // 4 + IoHash RawHash; // 20 + uint32_t Padding_0; // 4 + size_t RawSize; // 8 + uint64_t Padding_1; // 8 + }; + + static_assert(sizeof(ManifestData) == 64); +}; -static CbObject -LoadCompactBinaryObject(const fs::path& Path) +void +BucketManifestSerializer::ParseManifest(ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path ManifestPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector<AccessTime>& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads) { - FileContents Result = ReadFile(Path); + if (Manifest["UsingMetaFile"sv].AsBool()) + { + ReadSidecarFile(Bucket, GetMetaPath(Bucket.m_BucketDir, Bucket.m_BucketName), Index, AccessTimes, Payloads); + + return; + } + + ZEN_TRACE_CPU("Z$::ParseManifest"); + + Stopwatch Timer; + const auto _ = MakeGuard([&] { ZEN_INFO("parsed store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + + const uint64_t Count = Manifest["Count"sv].AsUInt64(0); + std::vector<PayloadIndex> KeysIndexes; + KeysIndexes.reserve(Count); - if (!Result.ErrorCode) + CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); + for (CbFieldView& KeyView : KeyArray) { - IoBuffer Buffer = Result.Flatten(); - if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) + if (auto It = Index.find(KeyView.AsHash()); It != Index.end()) { - return LoadCompactBinaryObject(Buffer); + KeysIndexes.push_back(It.value()); + } + else + { + KeysIndexes.push_back(PayloadIndex()); + } + } + + size_t KeyIndexOffset = 0; + CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); + for (CbFieldView& TimeStampView : TimeStampArray) + { + const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; + if (KeyIndex) + { + AccessTimes[KeyIndex] = TimeStampView.AsInt64(); + } + } + + KeyIndexOffset = 0; + CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView(); + CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView(); + if (RawHashArray.Num() == RawSizeArray.Num()) + { + auto RawHashIt = RawHashArray.CreateViewIterator(); + auto RawSizeIt = RawSizeArray.CreateViewIterator(); + while (RawHashIt != CbFieldViewIterator()) + { + const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; + + if (KeyIndex) + { + uint64_t RawSize = RawSizeIt.AsUInt64(); + IoHash RawHash = RawHashIt.AsHash(); + if (RawSize != 0 || RawHash != IoHash::Zero) + { + BucketPayload& Payload = Payloads[KeyIndex]; + Bucket.SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash}); + } + } + + RawHashIt++; + RawSizeIt++; } } + else + { + ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath); + } +} - return CbObject(); +Oid +BucketManifestSerializer::GenerateNewManifest(std::filesystem::path ManifestPath) +{ + const Oid BucketId = Oid::NewOid(); + + CbObjectWriter Writer; + Writer << "BucketId"sv << BucketId; + Writer << "Version"sv << CurrentDiskBucketVersion; + Manifest = Writer.Save(); + WriteFile(ManifestPath, Manifest.GetBuffer().AsIoBuffer()); + + return BucketId; +} + +IoBuffer +BucketManifestSerializer::MakeManifest(const Oid& BucketId, + ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, + std::vector<AccessTime>&& AccessTimes, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas) +{ + using namespace std::literals; + + ZEN_TRACE_CPU("Z$::MakeManifest"); + + size_t ItemCount = Index.size(); + + // This tends to overestimate a little bit but it is still way more accurate than what we get with exponential growth + // And we don't need to reallocate the underlying buffer in almost every case + const size_t EstimatedSizePerItem = 54u; + const size_t ReserveSize = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128); + CbObjectWriter Writer(ReserveSize); + + Writer << "BucketId"sv << BucketId; + Writer << "Version"sv << CurrentDiskBucketVersion; + + if (!Index.empty()) + { + Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(Index.size())); + Writer.BeginArray("Keys"sv); + for (auto& Kv : Index) + { + const IoHash& Key = Kv.first; + Writer.AddHash(Key); + } + Writer.EndArray(); + + Writer.BeginArray("Timestamps"sv); + for (auto& Kv : Index) + { + GcClock::Tick AccessTime = AccessTimes[Kv.second]; + Writer.AddInteger(AccessTime); + } + Writer.EndArray(); + + if (!MetaDatas.empty()) + { + Writer.BeginArray("RawHash"sv); + for (auto& Kv : Index) + { + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second]; + if (Payload.MetaData) + { + Writer.AddHash(MetaDatas[Payload.MetaData].RawHash); + } + else + { + Writer.AddHash(IoHash::Zero); + } + } + Writer.EndArray(); + + Writer.BeginArray("RawSize"sv); + for (auto& Kv : Index) + { + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second]; + if (Payload.MetaData) + { + Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize); + } + else + { + Writer.AddInteger(0); + } + } + Writer.EndArray(); + } + } + + Manifest = Writer.Save(); + return Manifest.GetBuffer().AsIoBuffer(); +} + +IoBuffer +BucketManifestSerializer::MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount) +{ + m_ManifestEntryCount = EntryCount; + + CbObjectWriter Writer; + Writer << "BucketId"sv << BucketId; + Writer << "Version"sv << CurrentDiskBucketVersion; + Writer << "Count"sv << EntryCount; + Writer << "UsingMetaFile"sv << true; + Manifest = Writer.Save(); + + return Manifest.GetBuffer().AsIoBuffer(); +} + +bool +BucketManifestSerializer::ReadSidecarFile(ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path SidecarPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector<AccessTime>& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads) +{ + ZEN_ASSERT(AccessTimes.size() == Payloads.size()); + + std::error_code Ec; + + BasicFile SidecarFile; + SidecarFile.Open(SidecarPath, BasicFile::Mode::kRead, Ec); + + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed to open sidecar file '{}'", SidecarPath)); + } + + uint64_t FileSize = SidecarFile.FileSize(); + + auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid sidecar file '{}'", SidecarPath); }); + + if (FileSize < sizeof(BucketMetaHeader)) + { + return false; + } + + BasicFileBuffer Sidecar(SidecarFile, 128 * 1024); + + BucketMetaHeader Header; + Sidecar.Read(&Header, sizeof Header, 0); + + if (!Header.IsValid()) + { + return false; + } + + if (Header.Version != BucketMetaHeader::Version1) + { + return false; + } + + const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(BucketMetaHeader))) / sizeof(ManifestData); + if (Header.EntryCount > ExpectedEntryCount) + { + return false; + } + + InvalidGuard.Dismiss(); + + uint64_t RemainingEntryCount = ExpectedEntryCount; + uint64_t EntryCount = 0; + uint64_t CurrentReadOffset = sizeof(Header); + + while (RemainingEntryCount--) + { + const ManifestData* Entry = Sidecar.MakeView<ManifestData>(CurrentReadOffset); + CurrentReadOffset += sizeof(ManifestData); + + if (auto It = Index.find(Entry->Key); It != Index.end()) + { + PayloadIndex PlIndex = It.value(); + + ZEN_ASSERT(size_t(PlIndex) <= Payloads.size()); + + ZenCacheDiskLayer::CacheBucket::BucketPayload& PayloadEntry = Payloads[PlIndex]; + + AccessTimes[PlIndex] = Entry->Timestamp; + + if (Entry->RawSize && Entry->RawHash != IoHash::Zero) + { + Bucket.SetMetaData(PayloadEntry, BucketMetaData{.RawSize = Entry->RawSize, .RawHash = Entry->RawHash}); + } + } + + EntryCount++; + } + + ZEN_ASSERT(EntryCount == ExpectedEntryCount); + + return true; +} + +void +BucketManifestSerializer::WriteSidecarFile(const std::filesystem::path& SidecarPath, + uint64_t SnapshotLogPosition, + ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, + std::vector<AccessTime>&& AccessTimes, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas) +{ + BucketMetaHeader Header; + Header.EntryCount = m_ManifestEntryCount; + Header.LogPosition = SnapshotLogPosition; + Header.Checksum = Header.ComputeChecksum(Header); + + std::error_code Ec; + + TemporaryFile SidecarFile; + SidecarFile.CreateTemporary(SidecarPath.parent_path(), Ec); + + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed creating '{}'", SidecarFile.GetPath())); + } + + SidecarFile.Write(&Header, sizeof Header, 0); + + // TODO: make this batching for better performance + { + uint64_t WriteOffset = sizeof Header; + + BasicFileWriter SidecarWriter(SidecarFile, 128 * 1024); + + for (auto& Kv : Index) + { + const IoHash& Key = Kv.first; + const PayloadIndex PlIndex = Kv.second; + + IoHash RawHash = IoHash::Zero; + uint64_t RawSize = 0; + + if (const MetaDataIndex MetaIndex = Payloads[PlIndex].MetaData) + { + RawHash = MetaDatas[MetaIndex].RawHash; + RawSize = MetaDatas[MetaIndex].RawSize; + } + + ManifestData ManifestEntry = + {.Key = Key, .Timestamp = AccessTimes[PlIndex], .RawHash = RawHash, .Padding_0 = 0, .RawSize = RawSize, .Padding_1 = 0}; + + SidecarWriter.Write(&ManifestEntry, sizeof ManifestEntry, WriteOffset); + + WriteOffset += sizeof ManifestEntry; + } + } + + SidecarFile.MoveTemporaryIntoPlace(SidecarPath, Ec); + + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed to move '{}' into '{}'", SidecarFile.GetPath(), SidecarPath)); + } } ////////////////////////////////////////////////////////////////////////// @@ -207,167 +679,67 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo CreateDirectories(m_BucketDir); - std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"}; + std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); bool IsNew = false; - CbObject Manifest = LoadCompactBinaryObject(ManifestPath); + BucketManifestSerializer ManifestReader; - if (Manifest) + if (ManifestReader.Open(ManifestPath)) { - m_BucketId = Manifest["BucketId"sv].AsObjectId(); + m_BucketId = ManifestReader.GetBucketId(); if (m_BucketId == Oid::Zero) { return false; } - const uint32_t Version = Manifest["Version"sv].AsUInt32(0); - if (Version != CurrentDiskBucketVersion) + + uint32_t Version = 0; + if (ManifestReader.IsCurrentVersion(/* out */ Version) == false) { - ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion); + ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", + BucketDir, + Version, + BucketManifestSerializer::CurrentDiskBucketVersion); IsNew = true; } } else if (AllowCreate) { - m_BucketId.Generate(); - - CbObjectWriter Writer; - Writer << "BucketId"sv << m_BucketId; - Writer << "Version"sv << CurrentDiskBucketVersion; - Manifest = Writer.Save(); - WriteFile(m_BucketDir / "zen_manifest", Manifest.GetBuffer().AsIoBuffer()); - IsNew = true; + m_BucketId = ManifestReader.GenerateNewManifest(ManifestPath); + IsNew = true; } else { return false; } - OpenLog(IsNew); + InitializeIndexFromDisk(IsNew); - if (!IsNew) + if (IsNew) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenOrCreate::Manifest"); - - Stopwatch Timer; - const auto _ = - MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - - const uint64_t Count = Manifest["Count"sv].AsUInt64(0); - if (Count != 0) - { - std::vector<PayloadIndex> KeysIndexes; - KeysIndexes.reserve(Count); - CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); - for (CbFieldView& KeyView : KeyArray) - { - if (auto It = m_Index.find(KeyView.AsHash()); It != m_Index.end()) - { - KeysIndexes.push_back(It.value()); - } - else - { - KeysIndexes.push_back(PayloadIndex()); - } - } - size_t KeyIndexOffset = 0; - CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); - for (CbFieldView& TimeStampView : TimeStampArray) - { - const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; - if (KeyIndex) - { - m_AccessTimes[KeyIndex] = TimeStampView.AsInt64(); - } - } - KeyIndexOffset = 0; - CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView(); - CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView(); - if (RawHashArray.Num() == RawSizeArray.Num()) - { - auto RawHashIt = RawHashArray.CreateViewIterator(); - auto RawSizeIt = RawSizeArray.CreateViewIterator(); - while (RawHashIt != CbFieldViewIterator()) - { - const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; - - if (KeyIndex) - { - uint64_t RawSize = RawSizeIt.AsUInt64(); - IoHash RawHash = RawHashIt.AsHash(); - if (RawSize != 0 || RawHash != IoHash::Zero) - { - BucketPayload& Payload = m_Payloads[KeyIndex]; - SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash}); - } - } - - RawHashIt++; - RawSizeIt++; - } - } - else - { - ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath); - } - } - - ////// Legacy format read - { - for (CbFieldView Entry : Manifest["Timestamps"sv]) - { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64(); - } - } - for (CbFieldView Entry : Manifest["RawInfo"sv]) - { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - - const IoHash RawHash = Obj["RawHash"sv].AsHash(); - const uint64_t RawSize = Obj["RawSize"sv].AsUInt64(); - - if (RawHash == IoHash::Zero || RawSize == 0) - { - ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex); - } - - BucketPayload& Payload = m_Payloads[EntryIndex]; - SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash}); - } - } - } + return true; } + ManifestReader.ParseManifest(*this, ManifestPath, m_Index, m_AccessTimes, m_Payloads); + return true; } void -ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc) +ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeIndexSnapshot"); + ZEN_TRACE_CPU("Z$::Disk::Bucket::WriteIndexSnapshot"); - uint64_t LogCount = m_SlogFile.GetLogCount(); + const uint64_t LogCount = m_SlogFile.GetLogCount(); if (m_LogFlushPosition == LogCount) { return; } ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir); - uint64_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { + const uint64_t EntryCount = m_Index.size(); + Stopwatch Timer; + const auto _ = MakeGuard([&] { ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", m_BucketDir, EntryCount, @@ -376,42 +748,11 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function<uint64_t() namespace fs = std::filesystem; - fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); - fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName); - - // Move index away, we keep it if something goes wrong - if (fs::is_regular_file(STmpIndexPath)) - { - std::error_code Ec; - if (!fs::remove(STmpIndexPath, Ec) || Ec) - { - ZEN_WARN("snapshot failed to clean up temp snapshot at {}, reason: '{}'", STmpIndexPath, Ec.message()); - return; - } - } + fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); try { - if (fs::is_regular_file(IndexPath)) - { - fs::rename(IndexPath, STmpIndexPath); - } - - // Write the current state of the location map to a new index state - std::vector<DiskIndexEntry> Entries; - Entries.resize(m_Index.size()); - - { - uint64_t EntryIndex = 0; - for (auto& Entry : m_Index) - { - DiskIndexEntry& IndexEntry = Entries[EntryIndex++]; - IndexEntry.Key = Entry.first; - IndexEntry.Location = m_Payloads[Entry.second].Location; - } - } - - uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + Entries.size() * sizeof(DiskIndexEntry); + const uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry); std::error_code Error; DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); if (Error) @@ -431,45 +772,67 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function<uint64_t() fmt::format("not enough free disk space in '{}' to save index of size {}", m_BucketDir, NiceBytes(IndexSize))); } - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); - CacheBucketIndexHeader Header = {.EntryCount = Entries.size(), - .LogPosition = LogCount, - .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)}; + TemporaryFile ObjectIndexFile; + std::error_code Ec; + ObjectIndexFile.CreateTemporary(m_BucketDir, Ec); + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir)); + } - Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); - ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0); - ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); - ObjectIndexFile.Flush(); - ObjectIndexFile.Close(); - EntryCount = Entries.size(); - m_LogFlushPosition = LogCount; - } - catch (std::exception& Err) - { - ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); + { + // This is in a separate scope just to ensure IndexWriter goes out + // of scope before the file is flushed/closed, in order to ensure + // all data is written to the file + BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024); + + CacheBucketIndexHeader Header = {.EntryCount = EntryCount, + .LogPosition = LogCount, + .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)}; + + Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); + IndexWriter.Write(&Header, sizeof(CacheBucketIndexHeader), 0); + + uint64_t IndexWriteOffset = sizeof(CacheBucketIndexHeader); - // Restore any previous snapshot + for (auto& Entry : m_Index) + { + DiskIndexEntry IndexEntry; + IndexEntry.Key = Entry.first; + IndexEntry.Location = m_Payloads[Entry.second].Location; + IndexWriter.Write(&IndexEntry, sizeof(DiskIndexEntry), IndexWriteOffset); - if (fs::is_regular_file(STmpIndexPath)) + IndexWriteOffset += sizeof(DiskIndexEntry); + } + + IndexWriter.Flush(); + } + + ObjectIndexFile.Flush(); + ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); + if (Ec) { - std::error_code Ec; - fs::remove(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless - fs::rename(STmpIndexPath, IndexPath, Ec); - if (Ec) + std::filesystem::path TempFilePath = ObjectIndexFile.GetPath(); + ZEN_WARN("snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", TempFilePath, IndexPath, Ec.message()); + + if (std::filesystem::is_regular_file(TempFilePath)) { - ZEN_WARN("snapshot failed to restore old snapshot from {}, reason: '{}'", STmpIndexPath, Ec.message()); + if (!std::filesystem::remove(TempFilePath, Ec) || Ec) + { + ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", TempFilePath, Ec.message()); + } } } - } - if (fs::is_regular_file(STmpIndexPath)) - { - std::error_code Ec; - if (!fs::remove(STmpIndexPath, Ec) || Ec) + else { - ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", STmpIndexPath, Ec.message()); + // We must only update the log flush position once the snapshot write succeeds + m_LogFlushPosition = LogCount; } } + catch (std::exception& Err) + { + ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); + } } uint64_t @@ -477,77 +840,88 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index { ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadIndexFile"); - if (std::filesystem::is_regular_file(IndexPath)) + if (!std::filesystem::is_regular_file(IndexPath)) { - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); - uint64_t Size = ObjectIndexFile.FileSize(); - if (Size >= sizeof(CacheBucketIndexHeader)) - { - CacheBucketIndexHeader Header; - ObjectIndexFile.Read(&Header, sizeof(Header), 0); - if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && - (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0)) - { - switch (Header.Version) - { - case CacheBucketIndexHeader::Version2: - { - uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); - if (Header.EntryCount > ExpectedEntryCount) - { - break; - } - size_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' index containing {} entries in {}", - IndexPath, - EntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); + return 0; + } - m_Configuration.PayloadAlignment = Header.PayloadAlignment; + auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid index file '{}'", IndexPath); }); - std::vector<DiskIndexEntry> Entries; - Entries.resize(Header.EntryCount); - ObjectIndexFile.Read(Entries.data(), - Header.EntryCount * sizeof(DiskIndexEntry), - sizeof(CacheBucketIndexHeader)); + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t FileSize = ObjectIndexFile.FileSize(); + if (FileSize < sizeof(CacheBucketIndexHeader)) + { + return 0; + } - m_Payloads.reserve(Header.EntryCount); - m_Index.reserve(Header.EntryCount); + CacheBucketIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); - std::string InvalidEntryReason; - for (const DiskIndexEntry& Entry : Entries) - { - if (!ValidateCacheBucketIndexEntry(Entry, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); - continue; - } - PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); - m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location}); - m_Index.insert_or_assign(Entry.Key, EntryIndex); - EntryCount++; - } - m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); - if (m_Configuration.EnableReferenceCaching) - { - m_FirstReferenceIndex.resize(m_Payloads.size()); - } - OutVersion = CacheBucketIndexHeader::Version2; - return Header.LogPosition; - } - break; - default: - break; - } - } + if (!Header.IsValid()) + { + return 0; + } + + if (Header.Version != CacheBucketIndexHeader::Version2) + { + return 0; + } + + const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); + if (Header.EntryCount > ExpectedEntryCount) + { + return 0; + } + + InvalidGuard.Dismiss(); + + size_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + m_Configuration.PayloadAlignment = Header.PayloadAlignment; + + m_Payloads.reserve(Header.EntryCount); + m_Index.reserve(Header.EntryCount); + + BasicFileBuffer FileBuffer(ObjectIndexFile, 128 * 1024); + + uint64_t CurrentReadOffset = sizeof(CacheBucketIndexHeader); + uint64_t RemainingEntryCount = Header.EntryCount; + + std::string InvalidEntryReason; + while (RemainingEntryCount--) + { + const DiskIndexEntry* Entry = FileBuffer.MakeView<DiskIndexEntry>(CurrentReadOffset); + CurrentReadOffset += sizeof(DiskIndexEntry); + + if (!ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; } - ZEN_WARN("skipping invalid index file '{}'", IndexPath); + + const PayloadIndex EntryIndex = PayloadIndex(EntryCount); + m_Payloads.emplace_back(BucketPayload{.Location = Entry->Location}); + m_Index.insert_or_assign(Entry->Key, EntryIndex); + + EntryCount++; } - return 0; + + ZEN_ASSERT(EntryCount == m_Payloads.size()); + + m_AccessTimes.resize(EntryCount, AccessTime(GcClock::TickCount())); + + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.resize(EntryCount); + } + + OutVersion = CacheBucketIndexHeader::Version2; + return Header.LogPosition; } uint64_t @@ -555,61 +929,73 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui { ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadLog"); - if (std::filesystem::is_regular_file(LogPath)) + if (!std::filesystem::is_regular_file(LogPath)) { - uint64_t LogEntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - TCasLogFile<DiskIndexEntry> CasLog; - CasLog.Open(LogPath, CasLogFile::Mode::kRead); - if (CasLog.Initialize()) - { - uint64_t EntryCount = CasLog.GetLogCount(); - if (EntryCount < SkipEntryCount) - { - ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); - SkipEntryCount = 0; - } - LogEntryCount = EntryCount - SkipEntryCount; - uint64_t InvalidEntryCount = 0; - CasLog.Replay( - [&](const DiskIndexEntry& Record) { - std::string InvalidEntryReason; - if (Record.Location.Flags & DiskLocation::kTombStone) - { - m_Index.erase(Record.Key); - return; - } - if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); - ++InvalidEntryCount; - return; - } - PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); - m_Payloads.emplace_back(BucketPayload{.Location = Record.Location}); - m_Index.insert_or_assign(Record.Key, EntryIndex); - }, - SkipEntryCount); - m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); - if (m_Configuration.EnableReferenceCaching) + return 0; + } + + uint64_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + TCasLogFile<DiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (!CasLog.Initialize()) + { + return 0; + } + + const uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + + LogEntryCount = EntryCount - SkipEntryCount; + uint64_t InvalidEntryCount = 0; + + CasLog.Replay( + [&](const DiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Location.Flags & DiskLocation::kTombStone) { - m_FirstReferenceIndex.resize(m_Payloads.size()); + // Note: this leaves m_Payloads and other arrays with 'holes' in them + m_Index.erase(Record.Key); + return; } - if (InvalidEntryCount) + + if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) { - ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; } - return LogEntryCount; - } + PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); + m_Payloads.emplace_back(BucketPayload{.Location = Record.Location}); + m_Index.insert_or_assign(Record.Key, EntryIndex); + }, + SkipEntryCount); + + m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); + + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.resize(m_Payloads.size()); } - return 0; + + if (InvalidEntryCount) + { + ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); + } + + return LogEntryCount; }; void -ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) +ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(const bool IsNew) { ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenLog"); @@ -661,15 +1047,14 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) } else if (fs::is_regular_file(LogPath)) { - ZEN_WARN("removing invalid cas log at '{}'", LogPath); + ZEN_WARN("removing invalid log at '{}'", LogPath); std::filesystem::remove(LogPath); } } m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); - std::vector<BlockStoreLocation> KnownLocations; - KnownLocations.reserve(m_Index.size()); + BlockStore::BlockIndexSet KnownBlocks; for (const auto& Entry : m_Index) { size_t EntryIndex = Entry.second; @@ -679,19 +1064,19 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { m_StandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed); - continue; } - const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment); - KnownLocations.push_back(BlockLocation); + else + { + const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment); + KnownBlocks.Add(BlockLocation.BlockIndex); + } } - - m_BlockStore.SyncExistingBlocksOnDisk(KnownLocations); + m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); if (IsNew || LogEntryCount > 0) { - MakeIndexSnapshot(); + WriteIndexSnapshot(); } - // TODO: should validate integrity of container files here } void @@ -981,20 +1366,7 @@ ZenCacheDiskLayer::CacheBucket::Flush() m_BlockStore.Flush(/*ForceNewBlock*/ false); m_SlogFile.Flush(); - std::vector<AccessTime> AccessTimes; - std::vector<BucketPayload> Payloads; - std::vector<BucketMetaData> MetaDatas; - IndexMap Index; - - { - RwLock::SharedLockScope IndexLock(m_IndexLock); - MakeIndexSnapshot(); - Index = m_Index; - Payloads = m_Payloads; - AccessTimes = m_AccessTimes; - MetaDatas = m_MetaDatas; - } - SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), Payloads, MetaDatas)); + SaveSnapshot(); } catch (std::exception& Ex) { @@ -1003,113 +1375,85 @@ ZenCacheDiskLayer::CacheBucket::Flush() } void -ZenCacheDiskLayer::CacheBucket::SaveManifest(CbObject&& Manifest, const std::function<uint64_t()>& ClaimDiskReserveFunc) +ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::SaveManifest"); try { - IoBuffer Buffer = Manifest.GetBuffer().AsIoBuffer(); - - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); - if (Error) - { - ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); - return; - } - bool EnoughSpace = Space.Free >= Buffer.GetSize() + 1024 * 512; - if (!EnoughSpace) - { - uint64_t ReclaimedSpace = ClaimDiskReserveFunc(); - EnoughSpace = (Space.Free + ReclaimedSpace) >= Buffer.GetSize() + 1024 * 512; - } - if (!EnoughSpace) - { - ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", m_BucketDir, NiceBytes(Buffer.GetSize())); - return; - } - WriteFile(m_BucketDir / "zen_manifest", Buffer); - } - catch (std::exception& Err) - { - ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what()); - } -} + bool UseLegacyScheme = false; + uint64_t SidecarSize = 0; -CbObject -ZenCacheDiskLayer::CacheBucket::MakeManifest(IndexMap&& Index, - std::vector<AccessTime>&& AccessTimes, - const std::vector<BucketPayload>& Payloads, - const std::vector<BucketMetaData>& MetaDatas) -{ - using namespace std::literals; + IoBuffer Buffer; + BucketManifestSerializer ManifestWriter; - ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeManifest"); - - size_t ItemCount = Index.size(); - - // This tends to overestimate a little bit but it is still way more accurate than what we get with exponential growth - // And we don't need to reallocate theunderying buffer in almost every case - const size_t EstimatedSizePerItem = 54u; - const size_t ReserveSize = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128); - CbObjectWriter Writer(ReserveSize); + { + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; + std::vector<BucketMetaData> MetaDatas; + IndexMap Index; - Writer << "BucketId"sv << m_BucketId; - Writer << "Version"sv << CurrentDiskBucketVersion; + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + WriteIndexSnapshot(); + // Note: this copy could be eliminated on shutdown to + // reduce memory usage and execution time + Index = m_Index; + Payloads = m_Payloads; + AccessTimes = m_AccessTimes; + MetaDatas = m_MetaDatas; + } - if (!Index.empty()) - { - Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(Index.size())); - Writer.BeginArray("Keys"sv); - for (auto& Kv : Index) - { - const IoHash& Key = Kv.first; - Writer.AddHash(Key); - } - Writer.EndArray(); + if (UseLegacyScheme) + { + Buffer = ManifestWriter.MakeManifest(m_BucketId, std::move(Index), std::move(AccessTimes), Payloads, MetaDatas); + } + else + { + const uint64_t EntryCount = Index.size(); + Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount); + SidecarSize = ManifestWriter.GetSidecarSize(); + } - Writer.BeginArray("Timestamps"sv); - for (auto& Kv : Index) - { - GcClock::Tick AccessTime = AccessTimes[Kv.second]; - Writer.AddInteger(AccessTime); - } - Writer.EndArray(); + const uint64_t RequiredSpace = SidecarSize + Buffer.GetSize() + 1024 * 512; - if (!MetaDatas.empty()) - { - Writer.BeginArray("RawHash"sv); - for (auto& Kv : Index) + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); + if (Error) { - const BucketPayload& Payload = Payloads[Kv.second]; - if (Payload.MetaData) - { - Writer.AddHash(MetaDatas[Payload.MetaData].RawHash); - } - else - { - Writer.AddHash(IoHash::Zero); - } + ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); + return; + } + bool EnoughSpace = Space.Free >= RequiredSpace; + if (!EnoughSpace) + { + uint64_t ReclaimedSpace = ClaimDiskReserveFunc(); + EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace; + } + if (!EnoughSpace) + { + ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", + m_BucketDir, + NiceBytes(Buffer.GetSize())); + return; } - Writer.EndArray(); - Writer.BeginArray("RawSize"sv); - for (auto& Kv : Index) + if (!UseLegacyScheme) { - const BucketPayload& Payload = Payloads[Kv.second]; - if (Payload.MetaData) - { - Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize); - } - else - { - Writer.AddInteger(0); - } + ManifestWriter.WriteSidecarFile(GetMetaPath(m_BucketDir, m_BucketName), + m_LogFlushPosition, + std::move(Index), + std::move(AccessTimes), + Payloads, + MetaDatas); } - Writer.EndArray(); } + + std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); + WriteFile(ManifestPath, Buffer); + } + catch (std::exception& Err) + { + ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what()); } - return Writer.Save(); } IoHash @@ -1683,20 +2027,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) try { - std::vector<AccessTime> AccessTimes; - std::vector<BucketPayload> Payloads; - std::vector<BucketMetaData> MetaDatas; - IndexMap Index; - { - RwLock::SharedLockScope IndexLock(m_IndexLock); - MakeIndexSnapshot([&]() { return GcCtx.ClaimGCReserve(); }); - Index = m_Index; - Payloads = m_Payloads; - AccessTimes = m_AccessTimes; - MetaDatas = m_MetaDatas; - } - SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), Payloads, MetaDatas), - [&]() { return GcCtx.ClaimGCReserve(); }); + SaveSnapshot([&]() { return GcCtx.ClaimGCReserve(); }); } catch (std::exception& Ex) { |