aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-11-27 14:32:19 +0100
committerGitHub <[email protected]>2023-11-27 14:32:19 +0100
commit4d95b578350ebfbbf6d54407c9403547b01cac4c (patch)
tree9f8df5d934a6a62fdcebeac94dffe52139d3ea6b /src/zenstore
parentgc stop command (#569) (diff)
downloadzen-4d95b578350ebfbbf6d54407c9403547b01cac4c.tar.xz
zen-4d95b578350ebfbbf6d54407c9403547b01cac4c.zip
optimized index snapshot reading/writing (#561)
The previous implementation of in-memory index snapshots serialised data to memory before writing it to disk, and vice versa when reading. This led to memory spikes which ended up pushing useful data out of the system cache and also caused stalls on I/O operations. This change moves more code to a streaming serialisation approach, which scales better from a memory-usage perspective and also performs much better.
Diffstat (limited to 'src/zenstore')
-rw-r--r--src/zenstore/blockstore.cpp31
-rw-r--r--src/zenstore/caslog.cpp26
-rw-r--r--src/zenstore/compactcas.cpp53
-rw-r--r--src/zenstore/include/zenstore/blockstore.h11
-rw-r--r--src/zenstore/zenstore.cpp2
5 files changed, 88 insertions, 35 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index e4a66daf4..89774f26d 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -227,7 +227,17 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max
}
void
-BlockStore::SyncExistingBlocksOnDisk(const std::vector<BlockStoreLocation>& KnownLocations)
+BlockStore::BlockIndexSet::Add(uint32_t BlockIndex)  // Inserts BlockIndex into the sorted BlockIndexes vector unless it is already present
+{
+ auto It = std::lower_bound(begin(BlockIndexes), end(BlockIndexes), BlockIndex);  // one search yields both the membership answer and the insert position
+ if (It == end(BlockIndexes) || *It != BlockIndex)  // not found: It is where the value belongs to keep the vector sorted
+ {
+ BlockIndexes.insert(It, BlockIndex);
+ }
+}
+
+void
+BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations)
{
ZEN_TRACE_CPU("BlockStore::SyncExistingBlocksOnDisk");
@@ -240,14 +250,18 @@ BlockStore::SyncExistingBlocksOnDisk(const std::vector<BlockStoreLocation>& Know
{
DeleteBlocks.insert(It.first);
}
- for (const auto& Entry : KnownLocations)
+
+ for (const uint32_t BlockIndex : KnownLocations.GetBlockIndices())
{
- DeleteBlocks.erase(Entry.BlockIndex);
- if (auto It = m_ChunkBlocks.find(Entry.BlockIndex); It != m_ChunkBlocks.end() && !It->second.IsNull())
+ DeleteBlocks.erase(BlockIndex);
+ if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end() && !It->second.IsNull())
{
continue;
}
- MissingBlocks.insert(Entry.BlockIndex);
+ else
+ {
+ MissingBlocks.insert(BlockIndex);
+ }
}
for (std::uint32_t BlockIndex : MissingBlocks)
{
@@ -1473,7 +1487,12 @@ TEST_CASE("blockstore.clean.stray.blocks")
CHECK(!ThirdChunk);
// Recreate a fake block for a missing chunk location
- Store.SyncExistingBlocksOnDisk({FirstChunkLocation, SecondChunkLocation, ThirdChunkLocation});
+ BlockStore::BlockIndexSet KnownBlocks;
+ KnownBlocks.Add(FirstChunkLocation.BlockIndex);
+ KnownBlocks.Add(SecondChunkLocation.BlockIndex);
+ KnownBlocks.Add(ThirdChunkLocation.BlockIndex);
+ Store.SyncExistingBlocksOnDisk(KnownBlocks);
+
// We create a fake block for the location - we should still not be able to get the chunk
CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 2);
ThirdChunk = Store.TryGetChunk(ThirdChunkLocation);
diff --git a/src/zenstore/caslog.cpp b/src/zenstore/caslog.cpp
index c04324fbc..cf3bd76da 100644
--- a/src/zenstore/caslog.cpp
+++ b/src/zenstore/caslog.cpp
@@ -188,20 +188,30 @@ CasLogFile::Replay(std::function<void(const void*)>&& Handler, uint64_t SkipEntr
LogBaseOffset += SkipEntryCount * m_RecordSize;
LogEntryCount -= SkipEntryCount;
- // This should really be streaming the data rather than just
- // reading it into memory, though we don't tend to get very
- // large logs so it may not matter
+ const uint64_t LogDataSize = LogEntryCount * m_RecordSize;
+ uint64_t LogDataRemain = LogDataSize;
- const uint64_t LogDataSize = LogEntryCount * m_RecordSize;
+ const uint64_t MaxBufferSize = 1024 * 1024;
std::vector<uint8_t> ReadBuffer;
- ReadBuffer.resize(LogDataSize);
+ ReadBuffer.resize((Min(LogDataSize, MaxBufferSize) / m_RecordSize) * m_RecordSize);
- m_File.Read(ReadBuffer.data(), LogDataSize, LogBaseOffset);
+ uint64_t ReadOffset = 0;
- for (int i = 0; i < int(LogEntryCount); ++i)
+ while (LogDataRemain)
{
- Handler(ReadBuffer.data() + (i * m_RecordSize));
+ const uint64_t BytesToRead = Min(ReadBuffer.size(), LogDataRemain);
+ const uint64_t EntriesToRead = BytesToRead / m_RecordSize;
+
+ m_File.Read(ReadBuffer.data(), BytesToRead, LogBaseOffset + ReadOffset);
+
+ for (int i = 0; i < int(EntriesToRead); ++i)
+ {
+ Handler(ReadBuffer.data() + (i * m_RecordSize));
+ }
+
+ LogDataRemain -= BytesToRead;
+ ReadOffset += BytesToRead;
}
m_AppendOffset = LogBaseOffset + (m_RecordSize * LogEntryCount);
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index f28601771..95198fd59 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -945,10 +945,10 @@ CasContainerStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint
{
ZEN_TRACE_CPU("CasContainer::ReadIndexFile");
- std::vector<CasDiskIndexEntry> Entries;
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ uint64_t EntryCount = 0;
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
BasicFile ObjectIndexFile;
@@ -963,21 +963,38 @@ CasContainerStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint
(Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) &&
(Header.EntryCount <= ExpectedEntryCount))
{
- Entries.resize(Header.EntryCount);
- ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
m_PayloadAlignment = Header.PayloadAlignment;
- std::string InvalidEntryReason;
- for (const CasDiskIndexEntry& Entry : Entries)
+ m_Locations.reserve(ExpectedEntryCount);
+ m_LocationMap.reserve(ExpectedEntryCount);
+
+ std::vector<CasDiskIndexEntry> Entries;
+ Entries.resize(128 * 1024 / sizeof(CasDiskIndexEntry));
+
+ uint64_t RemainingEntries = Header.EntryCount;
+
+ do  // streams the index in fixed-size chunks; each pass must read from a fresh file offset
{
- if (!ValidateEntry(Entry, InvalidEntryReason))
+ const uint64_t NumToRead = Min(RemainingEntries, Entries.size());
+ Entries.resize(NumToRead);
+ const uint64_t ReadOffset = sizeof(CasDiskIndexHeader) + (Header.EntryCount - RemainingEntries) * sizeof(CasDiskIndexEntry);  // skip the header plus every entry consumed by earlier passes
+ ObjectIndexFile.Read(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), ReadOffset);
+
+ std::string InvalidEntryReason;
+ for (const CasDiskIndexEntry& Entry : Entries)
{
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
- continue;
+ if (!ValidateEntry(Entry, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+ continue;
+ }
+ m_LocationMap[Entry.Key] = m_Locations.size();
+ m_Locations.push_back(Entry.Location);
+ ++EntryCount;
}
- m_LocationMap[Entry.Key] = m_Locations.size();
- m_Locations.push_back(Entry.Location);
- }
+
+ RemainingEntries -= NumToRead;
+ } while (RemainingEntries);
OutVersion = CasDiskIndexHeader::CurrentVersion;
return Header.LogPosition;
@@ -1097,16 +1114,16 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
- std::vector<BlockStoreLocation> KnownLocations;
- KnownLocations.reserve(m_LocationMap.size());
+ BlockStore::BlockIndexSet KnownBlocks;
+
for (const auto& Entry : m_LocationMap)
{
const BlockStoreDiskLocation& DiskLocation = m_Locations[Entry.second];
BlockStoreLocation BlockLocation = DiskLocation.Get(m_PayloadAlignment);
- KnownLocations.emplace_back(std::move(BlockLocation));
+ KnownBlocks.Add(BlockLocation.BlockIndex);
}
- m_BlockStore.SyncExistingBlocksOnDisk(KnownLocations);
+ m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
if (IsNewStore || (LogEntryCount > 0))
{
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index b748fc8f6..82e1c71c6 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -134,9 +134,18 @@ public:
void Initialize(const std::filesystem::path& BlocksBasePath, uint64_t MaxBlockSize, uint64_t MaxBlockCount);
+ struct BlockIndexSet  // Collection of block indexes passed to SyncExistingBlocksOnDisk
+ {
+ void Add(uint32_t BlockIndex);  // Adds BlockIndex; the out-of-line definition keeps BlockIndexes sorted and skips values already present
+ std::span<const uint32_t> GetBlockIndices() const { return BlockIndexes; }  // Read-only view over the stored indexes
+
+ private:
+ std::vector<uint32_t> BlockIndexes;  // Backing storage returned by GetBlockIndices()
+ };
+
// Ask the store to create empty blocks for all locations that does not have a block
// Remove any block that is not referenced
- void SyncExistingBlocksOnDisk(const std::vector<BlockStoreLocation>& KnownLocations);
+ void SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations);
std::vector<uint32_t> GetBlocksToCompact(const std::unordered_map<uint32_t, uint64_t>& BlockUsage, uint32_t BlockUsageThresholdPercent);
void Close();
diff --git a/src/zenstore/zenstore.cpp b/src/zenstore/zenstore.cpp
index d87652fde..60dabe31f 100644
--- a/src/zenstore/zenstore.cpp
+++ b/src/zenstore/zenstore.cpp
@@ -7,7 +7,6 @@
# include <zenstore/blockstore.h>
# include <zenstore/gc.h>
# include <zenstore/hashkeyset.h>
-# include <zenutil/basicfile.h>
# include "cas.h"
# include "compactcas.h"
@@ -18,7 +17,6 @@ namespace zen {
void
zenstore_forcelinktests()
{
- basicfile_forcelink();
CAS_forcelink();
filecas_forcelink();
blockstore_forcelink();