From 4d95b578350ebfbbf6d54407c9403547b01cac4c Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Mon, 27 Nov 2023 14:32:19 +0100 Subject: optimized index snapshot reading/writing (#561) The previous implementation of in-memory index snapshots serialises data to memory before writing to disk, and vice versa when reading. This leads to some memory spikes which end up pushing useful data out of the system cache and also cause stalls on I/O operations. This change moves more code to a streaming serialisation approach which scales better from a memory usage perspective and also performs much better. --- src/zenstore/compactcas.cpp | 53 ++++++++++++++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 18 deletions(-) (limited to 'src/zenstore/compactcas.cpp') diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index f28601771..95198fd59 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -945,10 +945,10 @@ CasContainerStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint { ZEN_TRACE_CPU("CasContainer::ReadIndexFile"); - std::vector Entries; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + uint64_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); BasicFile ObjectIndexFile; @@ -963,21 +963,38 @@ CasContainerStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && (Header.EntryCount <= ExpectedEntryCount)) { - Entries.resize(Header.EntryCount); - ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); m_PayloadAlignment = Header.PayloadAlignment; - 
std::string InvalidEntryReason; - for (const CasDiskIndexEntry& Entry : Entries) + m_Locations.reserve(ExpectedEntryCount); + m_LocationMap.reserve(ExpectedEntryCount); + + std::vector Entries; + Entries.resize(128 * 1024 / sizeof(CasDiskIndexEntry)); + + uint64_t RemainingEntries = Header.EntryCount; + + do { - if (!ValidateEntry(Entry, InvalidEntryReason)) + const uint64_t NumToRead = Min(RemainingEntries, Entries.size()); + Entries.resize(NumToRead); + + ObjectIndexFile.Read(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); + + std::string InvalidEntryReason; + for (const CasDiskIndexEntry& Entry : Entries) { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); - continue; + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; + } + m_LocationMap[Entry.Key] = m_Locations.size(); + m_Locations.push_back(Entry.Location); + ++EntryCount; } - m_LocationMap[Entry.Key] = m_Locations.size(); - m_Locations.push_back(Entry.Location); - } + + RemainingEntries -= NumToRead; + } while (RemainingEntries); OutVersion = CasDiskIndexHeader::CurrentVersion; return Header.LogPosition; @@ -1097,16 +1114,16 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - std::vector KnownLocations; - KnownLocations.reserve(m_LocationMap.size()); + BlockStore::BlockIndexSet KnownBlocks; + for (const auto& Entry : m_LocationMap) { const BlockStoreDiskLocation& DiskLocation = m_Locations[Entry.second]; BlockStoreLocation BlockLocation = DiskLocation.Get(m_PayloadAlignment); - KnownLocations.emplace_back(std::move(BlockLocation)); + KnownBlocks.Add(BlockLocation.BlockIndex); } - m_BlockStore.SyncExistingBlocksOnDisk(KnownLocations); + m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); if (IsNewStore || (LogEntryCount > 0)) { -- cgit v1.2.3