aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/blockstore.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore/blockstore.cpp')
-rw-r--r--src/zenstore/blockstore.cpp818
1 files changed, 124 insertions, 694 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 00a38c3b6..3974fb989 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -26,10 +26,20 @@ ZEN_THIRD_PARTY_INCLUDES_END
# include <random>
#endif
+#include <zencore/memory/llm.h>
+
//////////////////////////////////////////////////////////////////////////
namespace zen {
+const FLLMTag&
+GetBlocksTag()
+{
+ static FLLMTag _("blocks");
+
+ return _;
+}
+
//////////////////////////////////////////////////////////////////////////
BlockStoreFile::BlockStoreFile(const std::filesystem::path& BlockPath) : m_Path(BlockPath)
@@ -254,7 +264,6 @@ BlockStoreFile::GetMetaPath() const
////////////////////////////////////////////////////////
constexpr uint64_t DefaultIterateSmallChunkWindowSize = 2 * 1024 * 1024;
-constexpr uint64_t IterateSmallChunkMaxGapSize = 4 * 1024;
BlockStore::BlockStore()
{
@@ -267,6 +276,7 @@ BlockStore::~BlockStore()
void
BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t MaxBlockSize, uint64_t MaxBlockCount)
{
+ ZEN_MEMSCOPE(GetBlocksTag());
ZEN_TRACE_CPU("BlockStore::Initialize");
ZEN_ASSERT(MaxBlockSize > 0);
@@ -331,18 +341,9 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max
}
void
-BlockStore::BlockIndexSet::Add(uint32_t BlockIndex)
-{
- if (!std::binary_search(begin(BlockIndexes), end(BlockIndexes), BlockIndex))
- {
- auto It = std::lower_bound(begin(BlockIndexes), end(BlockIndexes), BlockIndex);
- BlockIndexes.insert(It, BlockIndex);
- }
-}
-
-void
-BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations)
+BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks)
{
+ ZEN_MEMSCOPE(GetBlocksTag());
ZEN_TRACE_CPU("BlockStore::SyncExistingBlocksOnDisk");
RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
@@ -355,7 +356,7 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations)
DeleteBlocks.insert(It.first);
}
- for (const uint32_t BlockIndex : KnownLocations.GetBlockIndices())
+ for (const uint32_t BlockIndex : KnownBlocks)
{
DeleteBlocks.erase(BlockIndex);
if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end() && !It->second.IsNull())
@@ -389,10 +390,16 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations)
BlockStore::BlockEntryCountMap
BlockStore::GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent)
{
+ ZEN_MEMSCOPE(GetBlocksTag());
ZEN_TRACE_CPU("BlockStoreFile::GetBlocksToCompact");
BlockEntryCountMap Result;
{
RwLock::SharedLockScope InsertLock(m_InsertLock);
+
+ const uint64_t SmallBlockLimit = m_MaxBlockSize / 2;
+
+ std::vector<uint32_t> SmallBlockIndexes;
+
for (const auto& It : m_ChunkBlocks)
{
uint32_t BlockIndex = It.first;
@@ -413,35 +420,53 @@ BlockStore::GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUs
UsedCount = UsageIt->second.EntryCount;
}
- uint64_t BlockSize = It.second ? It.second->FileSize() : 0u;
- if (BlockSize == 0)
+ uint64_t PhysicalSize = It.second ? It.second->FileSize() : 0u;
+ if (PhysicalSize == 0)
{
Result.insert_or_assign(BlockIndex, UsedCount);
continue;
}
+ bool IsBelowUnusedLimit = false;
+
if (BlockUsageThresholdPercent == 100)
{
- if (UsedSize < BlockSize)
+ if (UsedSize < PhysicalSize)
{
- Result.insert_or_assign(BlockIndex, UsedCount);
+ IsBelowUnusedLimit = true;
}
}
else if (BlockUsageThresholdPercent == 0)
{
if (UsedSize == 0)
{
- Result.insert_or_assign(BlockIndex, UsedCount);
+ IsBelowUnusedLimit = true;
}
}
else
{
- const uint32_t UsedPercent = UsedSize < BlockSize ? gsl::narrow<uint32_t>((100 * UsedSize) / BlockSize) : 100u;
+ const uint32_t UsedPercent = UsedSize < PhysicalSize ? gsl::narrow<uint32_t>((100 * UsedSize) / PhysicalSize) : 100u;
if (UsedPercent < BlockUsageThresholdPercent)
{
- Result.insert_or_assign(BlockIndex, UsedCount);
+ IsBelowUnusedLimit = true;
}
}
+
+ if (IsBelowUnusedLimit)
+ {
+ Result.insert_or_assign(BlockIndex, UsedCount);
+ }
+ else if (PhysicalSize < SmallBlockLimit)
+ {
+ Result.insert_or_assign(BlockIndex, UsedCount);
+ SmallBlockIndexes.push_back(BlockIndex);
+ }
+ }
+
+ // If we only find one small block to compact, let it be.
+ if (SmallBlockIndexes.size() == 1 && Result.size() == 1)
+ {
+ Result.erase(SmallBlockIndexes[0]);
}
}
return Result;
@@ -495,6 +520,7 @@ BlockStore::GetFreeBlockIndex(uint32_t ProbeIndex, RwLock::ExclusiveLockScope&,
void
BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, const WriteChunkCallback& Callback)
{
+ ZEN_MEMSCOPE(GetBlocksTag());
ZEN_TRACE_CPU("BlockStore::WriteChunk");
ZEN_ASSERT(Data != nullptr);
@@ -539,21 +565,21 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, cons
Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
m_ActiveWriteBlocks.push_back(WriteBlockIndex);
InsertLock.ReleaseNow();
+ auto _ = MakeGuard([this, WriteBlockIndex]() {
+ RwLock::ExclusiveLockScope _(m_InsertLock);
+ m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex));
+ });
WriteBlock->Write(Data, ChunkSize, AlignedInsertOffset);
m_TotalSize.fetch_add(AlignedWriteSize, std::memory_order::relaxed);
Callback({.BlockIndex = WriteBlockIndex, .Offset = AlignedInsertOffset, .Size = ChunkSize});
-
- {
- RwLock::ExclusiveLockScope _(m_InsertLock);
- m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex));
- }
}
void
BlockStore::WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback)
{
+ ZEN_MEMSCOPE(GetBlocksTag());
ZEN_TRACE_CPU("BlockStore::WriteChunks");
ZEN_ASSERT(!Datas.empty());
@@ -615,6 +641,10 @@ BlockStore::WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const Wri
Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
m_ActiveWriteBlocks.push_back(WriteBlockIndex);
InsertLock.ReleaseNow();
+ auto _ = MakeGuard([this, WriteBlockIndex]() {
+ RwLock::ExclusiveLockScope _(m_InsertLock);
+ m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex));
+ });
{
MutableMemoryView WriteBuffer(Buffer.data(), RangeSize);
@@ -639,35 +669,10 @@ BlockStore::WriteChunks(std::span<IoBuffer> Datas, uint32_t Alignment, const Wri
}
Callback(Locations);
- {
- RwLock::ExclusiveLockScope _(m_InsertLock);
- m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex));
- }
-
Offset += Count;
}
}
-BlockStore::ReclaimSnapshotState
-BlockStore::GetReclaimSnapshotState()
-{
- ReclaimSnapshotState State;
- RwLock::SharedLockScope _(m_InsertLock);
- for (uint32_t BlockIndex : m_ActiveWriteBlocks)
- {
- State.m_ActiveWriteBlocks.insert(BlockIndex);
- }
- if (m_WriteBlock)
- {
- State.m_ActiveWriteBlocks.insert(m_WriteBlockIndex);
- }
- for (auto It : m_ChunkBlocks)
- {
- State.m_BlockIndexes.insert(It.first);
- }
- return State;
-}
-
IoBuffer
BlockStore::TryGetChunk(const BlockStoreLocation& Location) const
{
@@ -690,6 +695,7 @@ BlockStore::TryGetChunk(const BlockStoreLocation& Location) const
void
BlockStore::Flush(bool ForceNewBlock)
{
+ ZEN_MEMSCOPE(GetBlocksTag());
ZEN_TRACE_CPU("BlockStore::Flush");
if (ForceNewBlock)
@@ -713,429 +719,6 @@ BlockStore::Flush(bool ForceNewBlock)
}
}
-void
-BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
- const std::vector<BlockStoreLocation>& ChunkLocations,
- const ChunkIndexArray& KeepChunkIndexes,
- uint32_t PayloadAlignment,
- bool DryRun,
- const ReclaimCallback& ChangeCallback,
- const ClaimDiskReserveCallback& DiskReserveCallback)
-{
- ZEN_TRACE_CPU("BlockStore::ReclaimSpace");
-
- uint64_t WriteBlockTimeUs = 0;
- uint64_t WriteBlockLongestTimeUs = 0;
- uint64_t ReadBlockTimeUs = 0;
- uint64_t ReadBlockLongestTimeUs = 0;
- uint64_t TotalChunkCount = ChunkLocations.size();
- uint64_t DeletedSize = 0;
- uint64_t OldTotalSize = 0;
- uint64_t NewTotalSize = 0;
-
- uint64_t MovedCount = 0;
- uint64_t DeletedCount = 0;
-
- Stopwatch TotalTimer;
- const auto _ = MakeGuard([&] {
- ZEN_DEBUG(
- "reclaim space for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved "
- "{} "
- "of {} "
- "chunks ({}).",
- m_BlocksBasePath,
- NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
- NiceLatencyNs(WriteBlockTimeUs),
- NiceLatencyNs(WriteBlockLongestTimeUs),
- NiceLatencyNs(ReadBlockTimeUs),
- NiceLatencyNs(ReadBlockLongestTimeUs),
- NiceBytes(DeletedSize),
- DeletedCount,
- MovedCount,
- TotalChunkCount,
- NiceBytes(OldTotalSize));
- });
-
- size_t BlockCount = Snapshot.m_BlockIndexes.size();
- if (BlockCount == 0)
- {
- ZEN_DEBUG("garbage collect for '{}' SKIPPED, no blocks to process", m_BlocksBasePath);
- return;
- }
-
- tsl::robin_set<size_t> KeepChunkMap;
- KeepChunkMap.reserve(KeepChunkIndexes.size());
- for (size_t KeepChunkIndex : KeepChunkIndexes)
- {
- KeepChunkMap.insert(KeepChunkIndex);
- }
-
- tsl::robin_map<uint32_t, size_t> BlockIndexToChunkMapIndex;
- std::vector<ChunkIndexArray> BlockKeepChunks;
- std::vector<ChunkIndexArray> BlockDeleteChunks;
-
- BlockIndexToChunkMapIndex.reserve(BlockCount);
- BlockKeepChunks.reserve(BlockCount);
- BlockDeleteChunks.reserve(BlockCount);
- size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2;
-
- size_t DeleteCount = 0;
- for (size_t Index = 0; Index < TotalChunkCount; ++Index)
- {
- const BlockStoreLocation& Location = ChunkLocations[Index];
- if (!Snapshot.m_BlockIndexes.contains(Location.BlockIndex))
- {
- // We did not know about the block when we took the snapshot, don't touch it
- continue;
- }
- OldTotalSize += Location.Size;
- auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex);
- size_t ChunkMapIndex = 0;
- if (BlockIndexPtr == BlockIndexToChunkMapIndex.end())
- {
- ChunkMapIndex = BlockKeepChunks.size();
- BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex;
- BlockKeepChunks.resize(ChunkMapIndex + 1);
- BlockKeepChunks.back().reserve(GuesstimateCountPerBlock);
- BlockDeleteChunks.resize(ChunkMapIndex + 1);
- BlockDeleteChunks.back().reserve(GuesstimateCountPerBlock);
- }
- else
- {
- ChunkMapIndex = BlockIndexPtr->second;
- }
-
- if (KeepChunkMap.contains(Index))
- {
- ChunkIndexArray& IndexMap = BlockKeepChunks[ChunkMapIndex];
- IndexMap.push_back(Index);
- NewTotalSize += Location.Size;
- continue;
- }
- ChunkIndexArray& IndexMap = BlockDeleteChunks[ChunkMapIndex];
- IndexMap.push_back(Index);
- DeleteCount++;
- }
-
- std::vector<uint32_t> BlocksToReWrite;
- BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size());
- for (const auto& Entry : BlockIndexToChunkMapIndex)
- {
- uint32_t BlockIndex = Entry.first;
- size_t ChunkMapIndex = Entry.second;
- const ChunkIndexArray& ChunkMap = BlockDeleteChunks[ChunkMapIndex];
- if (ChunkMap.empty())
- {
- continue;
- }
- BlocksToReWrite.push_back(BlockIndex);
- }
-
- {
- // Any known block not referenced should be added as well
- RwLock::SharedLockScope __(m_InsertLock);
- for (std::uint32_t BlockIndex : Snapshot.m_BlockIndexes)
- {
- if (!m_ChunkBlocks.contains(BlockIndex))
- {
- continue;
- }
- bool WasActiveWriteBlock = Snapshot.m_ActiveWriteBlocks.contains(BlockIndex);
- if (WasActiveWriteBlock)
- {
- continue;
- }
- if (BlockIndexToChunkMapIndex.contains(BlockIndex))
- {
- continue;
- }
- size_t ChunkMapIndex = ChunkMapIndex = BlockKeepChunks.size();
- BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex;
- BlockKeepChunks.resize(ChunkMapIndex + 1);
- BlockDeleteChunks.resize(ChunkMapIndex + 1);
- BlocksToReWrite.push_back(BlockIndex);
- }
- }
-
- if (DryRun)
- {
- ZEN_DEBUG("garbage collect for '{}' DISABLED, found {} {} chunks of total {} {}",
- m_BlocksBasePath,
- DeleteCount,
- NiceBytes(OldTotalSize - NewTotalSize),
- TotalChunkCount,
- OldTotalSize);
- return;
- }
-
- try
- {
- ZEN_TRACE_CPU("BlockStore::ReclaimSpace::Compact");
- Ref<BlockStoreFile> NewBlockFile;
- auto NewBlockFileGuard = MakeGuard([&]() {
- if (NewBlockFile && NewBlockFile->IsOpen())
- {
- ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath());
- m_TotalSize.fetch_sub(NewBlockFile->FileSize(), std::memory_order::relaxed);
- ZEN_ASSERT_SLOW(NewBlockFile->MetaSize() == 0);
- NewBlockFile->MarkAsDeleteOnClose();
- }
- });
-
- uint64_t WriteOffset = 0;
- uint32_t NewBlockIndex = 0;
- for (uint32_t BlockIndex : BlocksToReWrite)
- {
- bool IsActiveWriteBlock = Snapshot.m_ActiveWriteBlocks.contains(BlockIndex);
-
- const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex];
-
- Ref<BlockStoreFile> OldBlockFile;
- if (!IsActiveWriteBlock)
- {
- RwLock::SharedLockScope _i(m_InsertLock);
- Stopwatch Timer;
- const auto __ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end())
- {
- OldBlockFile = It->second;
- }
- }
-
- ChunkIndexArray& KeepMap = BlockKeepChunks[ChunkMapIndex];
- if (KeepMap.empty())
- {
- ZEN_TRACE_CPU("BlockStore::ReclaimSpace::DeleteBlock");
-
- const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex];
- for (size_t DeleteIndex : DeleteMap)
- {
- DeletedSize += ChunkLocations[DeleteIndex].Size;
- }
- ChangeCallback({}, DeleteMap);
- DeletedCount += DeleteMap.size();
- if (OldBlockFile)
- {
- RwLock::ExclusiveLockScope _i(m_InsertLock);
- Stopwatch Timer;
- const auto __ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
- ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile);
- m_ChunkBlocks.erase(BlockIndex);
- m_TotalSize.fetch_sub(OldBlockFile->TotalSize(), std::memory_order::relaxed);
- OldBlockFile->MarkAsDeleteOnClose();
- }
- continue;
- }
- else if (!OldBlockFile && !IsActiveWriteBlock)
- {
- // If the block file pointed to does not exist, move any keep chunk them to deleted list
- ZEN_ERROR("Expected to find block {} in {} - this should never happen, marking {} entries as deleted.",
- BlockIndex,
- m_BlocksBasePath,
- KeepMap.size());
-
- BlockDeleteChunks[ChunkMapIndex].insert(BlockDeleteChunks[ChunkMapIndex].end(), KeepMap.begin(), KeepMap.end());
- KeepMap.clear();
- }
- else if (OldBlockFile && (OldBlockFile->FileSize() == 0))
- {
- // Block created to accommodate missing blocks
- ZEN_WARN("Missing block {} in {} - backing data for locations is missing, marking {} entries as deleted.",
- BlockIndex,
- m_BlocksBasePath,
- KeepMap.size());
-
- BlockDeleteChunks[ChunkMapIndex].insert(BlockDeleteChunks[ChunkMapIndex].end(), KeepMap.begin(), KeepMap.end());
- KeepMap.clear();
- }
-
- MovedChunksArray MovedChunks;
- if (OldBlockFile)
- {
- ZEN_TRACE_CPU("BlockStore::ReclaimSpace::MoveBlock");
-
- ZEN_INFO("Moving {} chunks from '{}' to new block", KeepMap.size(), GetBlockPath(m_BlocksBasePath, BlockIndex));
-
- uint64_t OldBlockSize = OldBlockFile->FileSize();
- std::vector<uint8_t> Chunk;
- for (const size_t& ChunkIndex : KeepMap)
- {
- const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex];
- if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize)
- {
- ZEN_WARN(
- "ReclaimSpace skipping chunk outside of block range in '{}', Chunk start {}, Chunk size {} in Block {}, Block "
- "size {}",
- m_BlocksBasePath,
- ChunkLocation.Offset,
- ChunkLocation.Size,
- OldBlockFile->GetPath(),
- OldBlockSize);
- continue;
- }
- Chunk.resize(ChunkLocation.Size);
- OldBlockFile->Read(Chunk.data(), ChunkLocation.Size, ChunkLocation.Offset);
-
- if (!NewBlockFile || (WriteOffset + ChunkLocation.Size > m_MaxBlockSize))
- {
- uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
-
- if (NewBlockFile)
- {
- ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
- NewBlockFile->Flush();
- NewBlockFile = nullptr;
- }
- {
- ChangeCallback(MovedChunks, {});
- MovedCount += KeepMap.size();
- MovedChunks.clear();
- RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
-
- std::filesystem::path NewBlockPath;
- NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath);
- if (NextBlockIndex == (uint32_t)m_MaxBlockCount)
- {
- ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded",
- m_BlocksBasePath,
- static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
- return;
- }
-
- NewBlockFile = new BlockStoreFile(NewBlockPath);
- m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
- }
-
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error);
- if (Error)
- {
- ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message());
- return;
- }
- if (Space.Free < m_MaxBlockSize)
- {
- uint64_t ReclaimedSpace = DiskReserveCallback();
- if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
- {
- ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}",
- m_BlocksBasePath,
- m_MaxBlockSize,
- NiceBytes(Space.Free + ReclaimedSpace));
- RwLock::ExclusiveLockScope _l(m_InsertLock);
- Stopwatch Timer;
- const auto __ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
- ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile);
- ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen());
- m_ChunkBlocks.erase(NextBlockIndex);
- return;
- }
-
- ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
- m_BlocksBasePath,
- ReclaimedSpace,
- NiceBytes(Space.Free + ReclaimedSpace));
- }
- NewBlockFile->Create(m_MaxBlockSize);
- NewBlockIndex = NextBlockIndex;
- WriteOffset = 0;
- }
-
- NewBlockFile->Write(Chunk.data(), ChunkLocation.Size, WriteOffset);
- MovedChunks.push_back(
- {ChunkIndex,
- {.BlockIndex = NewBlockIndex, .Offset = gsl::narrow<uint32_t>(WriteOffset), .Size = ChunkLocation.Size}});
- uint64_t OldOffset = WriteOffset;
- WriteOffset = RoundUp(WriteOffset + ChunkLocation.Size, PayloadAlignment);
- m_TotalSize.fetch_add(WriteOffset - OldOffset, std::memory_order::relaxed);
- }
- Chunk.clear();
- if (NewBlockFile)
- {
- ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
- NewBlockFile->Flush();
- }
- }
-
- const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex];
- for (size_t DeleteIndex : DeleteMap)
- {
- DeletedSize += ChunkLocations[DeleteIndex].Size;
- }
-
- ChangeCallback(MovedChunks, DeleteMap);
- MovedCount += MovedChunks.size();
- DeletedCount += DeleteMap.size();
- MovedChunks.clear();
-
- if (OldBlockFile)
- {
- RwLock::ExclusiveLockScope __(m_InsertLock);
- Stopwatch Timer;
- const auto ___ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
- });
-
- ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
- ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile);
- m_ChunkBlocks.erase(BlockIndex);
- m_TotalSize.fetch_sub(OldBlockFile->TotalSize(), std::memory_order::relaxed);
- OldBlockFile->MarkAsDeleteOnClose();
- }
- }
- if (NewBlockFile)
- {
- ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
- NewBlockFile->Flush();
- NewBlockFile = nullptr;
- }
- }
- catch (const std::system_error& SystemError)
- {
- if (IsOOM(SystemError.code()))
- {
- ZEN_WARN("reclaiming space for '{}' ran out of memory: '{}'", m_BlocksBasePath, SystemError.what());
- }
- else if (IsOOD(SystemError.code()))
- {
- ZEN_WARN("reclaiming space for '{}' ran out of disk space: '{}'", m_BlocksBasePath, SystemError.what());
- }
- else
- {
- ZEN_ERROR("reclaiming space for '{}' failed with system error exception: '{}'", m_BlocksBasePath, SystemError.what());
- }
- }
- catch (const std::bad_alloc& BadAlloc)
- {
- ZEN_WARN("reclaiming space for '{}' ran out of memory: '{}'", m_BlocksBasePath, BadAlloc.what());
- }
- catch (const std::exception& ex)
- {
- ZEN_ERROR("reclaiming space for '{}' failed with: '{}'", m_BlocksBasePath, ex.what());
- }
-}
-
bool
BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
std::span<const size_t> InChunkIndexes,
@@ -1143,6 +726,7 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
const IterateChunksLargeSizeCallback& LargeSizeCallback,
uint64_t LargeSizeLimit)
{
+ ZEN_MEMSCOPE(GetBlocksTag());
ZEN_TRACE_CPU("BlockStore::IterateBlock");
if (InChunkIndexes.empty())
@@ -1150,26 +734,27 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
return true;
}
- uint64_t IterateSmallChunkWindowSize = Max(DefaultIterateSmallChunkWindowSize, LargeSizeLimit);
- if (LargeSizeLimit == 0u)
+ if (LargeSizeLimit == 0)
{
- LargeSizeLimit = IterateSmallChunkWindowSize;
- }
- else
- {
- IterateSmallChunkWindowSize =
- Min((LargeSizeLimit + IterateSmallChunkMaxGapSize) * ChunkLocations.size(), IterateSmallChunkWindowSize);
+ LargeSizeLimit = DefaultIterateSmallChunkWindowSize;
}
+ uint64_t IterateSmallChunkWindowSize = Max(DefaultIterateSmallChunkWindowSize, LargeSizeLimit);
+
+ const uint64_t IterateSmallChunkMaxGapSize = Max(2048u, IterateSmallChunkWindowSize / 512u);
+
+ IterateSmallChunkWindowSize = Min((LargeSizeLimit + IterateSmallChunkMaxGapSize) * ChunkLocations.size(), IterateSmallChunkWindowSize);
+
uint32_t BlockIndex = ChunkLocations[InChunkIndexes[0]].BlockIndex;
std::vector<size_t> ChunkIndexes(InChunkIndexes.begin(), InChunkIndexes.end());
std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool {
return ChunkLocations[IndexA].Offset < ChunkLocations[IndexB].Offset;
});
- auto GetNextRange = [LargeSizeLimit, IterateSmallChunkWindowSize, &ChunkLocations](uint64_t BlockFileSize,
- std::span<const size_t> ChunkIndexes,
- size_t StartIndexOffset) -> size_t {
+ auto GetNextRange = [LargeSizeLimit,
+ IterateSmallChunkWindowSize,
+ IterateSmallChunkMaxGapSize,
+ &ChunkLocations](uint64_t BlockFileSize, std::span<const size_t> ChunkIndexes, size_t StartIndexOffset) -> size_t {
size_t ChunkCount = 0;
size_t StartIndex = ChunkIndexes[StartIndexOffset];
const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex];
@@ -1224,8 +809,8 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
ZEN_ASSERT(BlockFile);
InsertLock.ReleaseNow();
- IoBuffer ReadBuffer{IterateSmallChunkWindowSize};
- void* BufferBase = ReadBuffer.MutableData();
+ IoBuffer ReadBuffer;
+ void* BufferBase = nullptr;
size_t LocationIndexOffset = 0;
while (LocationIndexOffset < ChunkIndexes.size())
@@ -1235,11 +820,16 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
const size_t BlockSize = BlockFile->FileSize();
const size_t RangeCount = GetNextRange(BlockSize, ChunkIndexes, LocationIndexOffset);
- if (RangeCount > 0)
+ if (RangeCount > 1)
{
size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1];
const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex];
uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset;
+ if (ReadBuffer.GetSize() < Size)
+ {
+ ReadBuffer = IoBuffer(Min(Size * 2, IterateSmallChunkWindowSize));
+ BufferBase = ReadBuffer.MutableData();
+ }
BlockFile->Read(BufferBase, Size, FirstLocation.Offset);
for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex)
{
@@ -1293,6 +883,7 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
bool
BlockStore::IterateChunks(const std::span<const BlockStoreLocation>& ChunkLocations, const IterateChunksCallback& Callback)
{
+ ZEN_MEMSCOPE(GetBlocksTag());
ZEN_TRACE_CPU("BlockStore::IterateChunks");
Stopwatch Timer;
@@ -1302,31 +893,39 @@ BlockStore::IterateChunks(const std::span<const BlockStoreLocation>& ChunkLocati
ZEN_LOG_SCOPE("iterating chunks from '{}'", m_BlocksBasePath);
- tsl::robin_map<uint32_t, size_t> BlockIndexToBlockChunks;
- std::vector<std::vector<size_t>> BlocksChunks;
-
+ std::vector<size_t> ChunkOrder(ChunkLocations.size());
for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex)
{
- const BlockStoreLocation& Location = ChunkLocations[ChunkIndex];
- if (auto It = BlockIndexToBlockChunks.find(Location.BlockIndex); It != BlockIndexToBlockChunks.end())
- {
- BlocksChunks[It->second].push_back(ChunkIndex);
- }
- else
+ ChunkOrder[ChunkIndex] = ChunkIndex;
+ }
+
+ std::sort(ChunkOrder.begin(), ChunkOrder.end(), [&ChunkLocations](const size_t Lhs, const size_t Rhs) {
+ return ChunkLocations[Lhs].BlockIndex < ChunkLocations[Rhs].BlockIndex;
+ });
+ size_t RangeStart = 0;
+ size_t RangeEnd = 0;
+ const std::span<size_t> ChunkIndexRange(ChunkOrder);
+ while (RangeStart < ChunkOrder.size())
+ {
+ const size_t ChunkIndex = ChunkOrder[RangeStart];
+ const uint32_t BlockIndex = ChunkLocations[ChunkIndex].BlockIndex;
+ RangeEnd++;
+ while (RangeEnd < ChunkOrder.size())
{
- BlockIndexToBlockChunks.insert(std::make_pair(Location.BlockIndex, BlocksChunks.size()));
- BlocksChunks.push_back(std::vector<size_t>({ChunkIndex}));
+ const size_t NextChunkIndex = ChunkOrder[RangeEnd];
+ if (ChunkLocations[NextChunkIndex].BlockIndex != BlockIndex)
+ {
+ break;
+ }
+ ++RangeEnd;
}
- }
- for (auto& BlockChunks : BlocksChunks)
- {
- ZEN_ASSERT(!BlockChunks.empty());
- uint32_t BlockIndex = ChunkLocations[BlockChunks[0]].BlockIndex;
- if (!Callback(BlockIndex, BlockChunks))
+ if (!Callback(BlockIndex, ChunkIndexRange.subspan(RangeStart, RangeEnd - RangeStart)))
{
return false;
}
+
+ RangeStart = RangeEnd;
}
return true;
}
@@ -1338,6 +937,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
const ClaimDiskReserveCallback& DiskReserveCallback,
std::string_view LogPrefix)
{
+ ZEN_MEMSCOPE(GetBlocksTag());
ZEN_TRACE_CPU("BlockStore::CompactBlocks");
uint64_t DeletedSize = 0;
@@ -1581,7 +1181,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
KeepChunkIndexes.size(),
NiceBytes(MovedFromBlock),
GetBlockPath(m_BlocksBasePath, BlockIndex).filename(),
- NiceBytes(OldBlockSize - MovedFromBlock));
+ OldBlockSize > MovedFromBlock ? NiceBytes(OldBlockSize - MovedFromBlock) : 0);
}
if (TargetFileBuffer)
{
@@ -1934,53 +1534,6 @@ TEST_CASE("blockstore.chunks")
CHECK(ReadChunkAsString(Store, ThirdChunkLocation) == ThirdChunkData);
}
-TEST_CASE("blockstore.clean.stray.blocks")
-{
- using namespace blockstore::impl;
-
- ScopedTemporaryDirectory TempDir;
- auto RootDirectory = TempDir.Path();
-
- BlockStore Store;
- Store.Initialize(RootDirectory / "store", 128, 1024);
-
- std::string FirstChunkData = "This is the data of the first chunk that we will write";
- BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4);
- std::string SecondChunkData = "This is the data for the second chunk that we will write";
- BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);
- std::string ThirdChunkData =
- "This is a much longer string that will not fit in the first block so it should be placed in the second block";
- BlockStoreLocation ThirdChunkLocation = WriteStringAsChunk(Store, ThirdChunkData, 4);
-
- Store.Close();
-
- Store.Initialize(RootDirectory / "store", 128, 1024);
- CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 2);
- IoBuffer ThirdChunk = Store.TryGetChunk(ThirdChunkLocation);
- CHECK(ThirdChunk);
-
- // Reclaim space should delete unreferenced block
- Store.ReclaimSpace(Store.GetReclaimSnapshotState(), {FirstChunkLocation, SecondChunkLocation}, {0, 1}, 4, false);
- // Block lives on as long as we reference it via ThirdChunk
- CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 2);
- ThirdChunk = {};
- CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 1);
- ThirdChunk = Store.TryGetChunk(ThirdChunkLocation);
- CHECK(!ThirdChunk);
-
- // Recreate a fake block for a missing chunk location
- BlockStore::BlockIndexSet KnownBlocks;
- KnownBlocks.Add(FirstChunkLocation.BlockIndex);
- KnownBlocks.Add(SecondChunkLocation.BlockIndex);
- KnownBlocks.Add(ThirdChunkLocation.BlockIndex);
- Store.SyncExistingBlocksOnDisk(KnownBlocks);
-
- // We create a fake block for the location - we should still not be able to get the chunk
- CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 2);
- ThirdChunk = Store.TryGetChunk(ThirdChunkLocation);
- CHECK(!ThirdChunk);
-}
-
TEST_CASE("blockstore.flush.force.new.block")
{
using namespace blockstore::impl;
@@ -2113,7 +1666,8 @@ TEST_CASE("blockstore.iterate.chunks")
break;
}
return true;
- });
+ },
+ 0);
CHECK(Continue);
});
return true;
@@ -2122,125 +1676,6 @@ TEST_CASE("blockstore.iterate.chunks")
WorkLatch.Wait();
}
-TEST_CASE("blockstore.reclaim.space")
-{
- using namespace blockstore::impl;
-
- ScopedTemporaryDirectory TempDir;
- auto RootDirectory = TempDir.Path();
-
- BlockStore Store;
- Store.Initialize(RootDirectory / "store", 512, 1024);
-
- constexpr size_t ChunkCount = 200;
- constexpr size_t Alignment = 8;
- std::vector<BlockStoreLocation> ChunkLocations;
- std::vector<IoHash> ChunkHashes;
- ChunkLocations.reserve(ChunkCount);
- ChunkHashes.reserve(ChunkCount);
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
- {
- IoBuffer Chunk = CreateRandomBlob(57 + ChunkIndex);
-
- Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations.push_back(L); });
- ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
- }
-
- std::vector<size_t> ChunksToKeep;
- ChunksToKeep.reserve(ChunkLocations.size());
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
- {
- ChunksToKeep.push_back(ChunkIndex);
- }
-
- Store.Flush(/*ForceNewBlock*/ false);
- BlockStore::ReclaimSnapshotState State1 = Store.GetReclaimSnapshotState();
- Store.ReclaimSpace(State1, ChunkLocations, ChunksToKeep, Alignment, true);
-
- // If we keep all the chunks we should not get any callbacks on moved/deleted stuff
- Store.ReclaimSpace(
- State1,
- ChunkLocations,
- ChunksToKeep,
- Alignment,
- false,
- [](const BlockStore::MovedChunksArray&, const BlockStore::ChunkIndexArray&) { CHECK(false); },
- []() {
- CHECK(false);
- return 0;
- });
-
- size_t DeleteChunkCount = 38;
- ChunksToKeep.clear();
- for (size_t ChunkIndex = DeleteChunkCount; ChunkIndex < ChunkCount; ++ChunkIndex)
- {
- ChunksToKeep.push_back(ChunkIndex);
- }
-
- std::vector<BlockStoreLocation> NewChunkLocations = ChunkLocations;
- size_t MovedChunkCount = 0;
- size_t DeletedChunkCount = 0;
- Store.ReclaimSpace(
- State1,
- ChunkLocations,
- ChunksToKeep,
- Alignment,
- false,
- [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) {
- for (const auto& MovedChunk : MovedChunks)
- {
- CHECK(MovedChunk.first >= DeleteChunkCount);
- NewChunkLocations[MovedChunk.first] = MovedChunk.second;
- }
- MovedChunkCount += MovedChunks.size();
- for (size_t DeletedIndex : DeletedChunks)
- {
- CHECK(DeletedIndex < DeleteChunkCount);
- }
- DeletedChunkCount += DeletedChunks.size();
- },
- []() {
- CHECK(false);
- return 0;
- });
- CHECK(MovedChunkCount <= DeleteChunkCount);
- CHECK(DeletedChunkCount == DeleteChunkCount);
- ChunkLocations = std::vector<BlockStoreLocation>(NewChunkLocations.begin() + DeleteChunkCount, NewChunkLocations.end());
-
- for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
- {
- IoBuffer ChunkBlock = Store.TryGetChunk(NewChunkLocations[ChunkIndex]);
- if (ChunkIndex >= DeleteChunkCount)
- {
- IoBuffer VerifyChunk = Store.TryGetChunk(NewChunkLocations[ChunkIndex]);
- CHECK(VerifyChunk);
- IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
- CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
- }
- }
-
- // We need to take a new state since reclaim space add new block when compacting
- BlockStore::ReclaimSnapshotState State2 = Store.GetReclaimSnapshotState();
- NewChunkLocations = ChunkLocations;
- MovedChunkCount = 0;
- DeletedChunkCount = 0;
- Store.ReclaimSpace(
- State2,
- ChunkLocations,
- {},
- Alignment,
- false,
- [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) {
- CHECK(MovedChunks.empty());
- DeletedChunkCount += DeletedChunks.size();
- },
- []() {
- CHECK(false);
- return 0;
- });
- CHECK(DeletedChunkCount == ChunkCount - DeleteChunkCount);
-}
-
TEST_CASE("blockstore.thread.read.write")
{
using namespace blockstore::impl;
@@ -2400,12 +1835,11 @@ TEST_CASE("blockstore.compact.blocks")
}
SUBCASE("keep current write block")
{
- uint64_t PreSize = Store.TotalSize();
- BlockStoreCompactState State;
- BlockStore::ReclaimSnapshotState SnapshotState = Store.GetReclaimSnapshotState();
+ uint64_t PreSize = Store.TotalSize();
+ BlockStoreCompactState State;
for (const BlockStoreLocation& Location : ChunkLocations)
{
- if (SnapshotState.m_ActiveWriteBlocks.contains(Location.BlockIndex))
+ if (Store.IsWriting(Location.BlockIndex))
{
continue;
}
@@ -2429,9 +1863,8 @@ TEST_CASE("blockstore.compact.blocks")
{
Store.Flush(true);
- uint64_t PreSize = Store.TotalSize();
- BlockStoreCompactState State;
- BlockStore::ReclaimSnapshotState SnapshotState = Store.GetReclaimSnapshotState();
+ uint64_t PreSize = Store.TotalSize();
+ BlockStoreCompactState State;
for (const BlockStoreLocation& Location : ChunkLocations)
{
State.AddKeepLocation(Location);
@@ -2451,11 +1884,10 @@ TEST_CASE("blockstore.compact.blocks")
}
SUBCASE("drop first block")
{
- uint64_t PreSize = Store.TotalSize();
- BlockStoreCompactState State;
- BlockStore::ReclaimSnapshotState SnapshotState = Store.GetReclaimSnapshotState();
+ uint64_t PreSize = Store.TotalSize();
+ BlockStoreCompactState State;
- CHECK(!SnapshotState.m_ActiveWriteBlocks.contains(0));
+ CHECK(!Store.IsWriting(0));
State.IncludeBlock(0);
uint64_t FirstBlockSize = 0;
@@ -2485,11 +1917,10 @@ TEST_CASE("blockstore.compact.blocks")
}
SUBCASE("compact first block")
{
- uint64_t PreSize = Store.TotalSize();
- BlockStoreCompactState State;
- BlockStore::ReclaimSnapshotState SnapshotState = Store.GetReclaimSnapshotState();
+ uint64_t PreSize = Store.TotalSize();
+ BlockStoreCompactState State;
- CHECK(!SnapshotState.m_ActiveWriteBlocks.contains(0));
+ CHECK(!Store.IsWriting(0));
State.IncludeBlock(0);
uint64_t SkipChunkCount = 2;
@@ -2544,14 +1975,13 @@ TEST_CASE("blockstore.compact.blocks")
}
SUBCASE("compact every other item")
{
- uint64_t PreSize = Store.TotalSize();
- BlockStoreCompactState State;
- BlockStore::ReclaimSnapshotState SnapshotState = Store.GetReclaimSnapshotState();
- bool SkipFlag = false;
+ uint64_t PreSize = Store.TotalSize();
+ BlockStoreCompactState State;
+ bool SkipFlag = false;
for (const BlockStoreLocation& Location : ChunkLocations)
{
- if (SnapshotState.m_ActiveWriteBlocks.contains(Location.BlockIndex))
+ if (Store.IsWriting(Location.BlockIndex))
{
continue;
}
@@ -2568,7 +1998,7 @@ TEST_CASE("blockstore.compact.blocks")
std::vector<BlockStoreLocation> DroppedLocations;
for (const BlockStoreLocation& Location : ChunkLocations)
{
- if (SnapshotState.m_ActiveWriteBlocks.contains(Location.BlockIndex))
+ if (Store.IsWriting(Location.BlockIndex))
{
continue;
}
@@ -2605,7 +2035,7 @@ TEST_CASE("blockstore.compact.blocks")
for (size_t Index = 0; Index < ChunkLocations.size(); Index++)
{
const BlockStoreLocation& Location = ChunkLocations[Index];
- if (SkipFlag && !SnapshotState.m_ActiveWriteBlocks.contains(Location.BlockIndex))
+ if (SkipFlag && !Store.IsWriting(Location.BlockIndex))
{
CHECK(std::find(DroppedLocations.begin(), DroppedLocations.end(), Location) != DroppedLocations.end());
CHECK(!Store.TryGetChunk(Location));