aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-11-27 14:32:19 +0100
committerGitHub <[email protected]>2023-11-27 14:32:19 +0100
commit4d95b578350ebfbbf6d54407c9403547b01cac4c (patch)
tree9f8df5d934a6a62fdcebeac94dffe52139d3ea6b /src/zenstore/compactcas.cpp
parentgc stop command (#569) (diff)
downloadzen-4d95b578350ebfbbf6d54407c9403547b01cac4c.tar.xz
zen-4d95b578350ebfbbf6d54407c9403547b01cac4c.zip
optimized index snapshot reading/writing (#561)
the previous implementation of in-memory index snapshots serialise data to memory before writing to disk and vice versa when reading. This leads to some memory spikes which end up pushing useful data out of system cache and also cause stalls on I/O operations. this change moves more code to a streaming serialisation approach which scales better from a memory usage perspective and also performs much better
Diffstat (limited to 'src/zenstore/compactcas.cpp')
-rw-r--r--src/zenstore/compactcas.cpp53
1 files changed, 35 insertions, 18 deletions
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index f28601771..95198fd59 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -945,10 +945,10 @@ CasContainerStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint
{
ZEN_TRACE_CPU("CasContainer::ReadIndexFile");
- std::vector<CasDiskIndexEntry> Entries;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ uint64_t EntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
BasicFile ObjectIndexFile;
@@ -963,21 +963,38 @@ CasContainerStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint
(Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) &&
(Header.EntryCount <= ExpectedEntryCount))
{
- Entries.resize(Header.EntryCount);
- ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
m_PayloadAlignment = Header.PayloadAlignment;
- std::string InvalidEntryReason;
- for (const CasDiskIndexEntry& Entry : Entries)
+ m_Locations.reserve(ExpectedEntryCount);
+ m_LocationMap.reserve(ExpectedEntryCount);
+
+ std::vector<CasDiskIndexEntry> Entries;
+ Entries.resize(128 * 1024 / sizeof(CasDiskIndexEntry));
+
+ uint64_t RemainingEntries = Header.EntryCount;
+
+ do
{
- if (!ValidateEntry(Entry, InvalidEntryReason))
+ const uint64_t NumToRead = Min(RemainingEntries, Entries.size());
+ Entries.resize(NumToRead);
+
+ ObjectIndexFile.Read(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
+
+ std::string InvalidEntryReason;
+ for (const CasDiskIndexEntry& Entry : Entries)
{
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
- continue;
+ if (!ValidateEntry(Entry, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+ continue;
+ }
+ m_LocationMap[Entry.Key] = m_Locations.size();
+ m_Locations.push_back(Entry.Location);
+ ++EntryCount;
}
- m_LocationMap[Entry.Key] = m_Locations.size();
- m_Locations.push_back(Entry.Location);
- }
+
+ RemainingEntries -= NumToRead;
+ } while (RemainingEntries);
OutVersion = CasDiskIndexHeader::CurrentVersion;
return Header.LogPosition;
@@ -1097,16 +1114,16 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
- std::vector<BlockStoreLocation> KnownLocations;
- KnownLocations.reserve(m_LocationMap.size());
+ BlockStore::BlockIndexSet KnownBlocks;
+
for (const auto& Entry : m_LocationMap)
{
const BlockStoreDiskLocation& DiskLocation = m_Locations[Entry.second];
BlockStoreLocation BlockLocation = DiskLocation.Get(m_PayloadAlignment);
- KnownLocations.emplace_back(std::move(BlockLocation));
+ KnownBlocks.Add(BlockLocation.BlockIndex);
}
- m_BlockStore.SyncExistingBlocksOnDisk(KnownLocations);
+ m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
if (IsNewStore || (LogEntryCount > 0))
{