aboutsummaryrefslogtreecommitdiff
path: root/zenstore/blockstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-04-30 01:29:24 +0200
committerDan Engelbrecht <[email protected]>2022-04-30 09:41:24 +0200
commitde6057de814a4dc16654bdda84f697476b2ebef5 (patch)
tree7f5fac98b79a68e98b3effc91162379438682c8d /zenstore/blockstore.cpp
parentMerge remote-tracking branch 'origin/main' into de/cache-with-block-store (diff)
downloadzen-de6057de814a4dc16654bdda84f697476b2ebef5.tar.xz
zen-de6057de814a4dc16654bdda84f697476b2ebef5.zip
first pass at generic block store with gc
Diffstat (limited to 'zenstore/blockstore.cpp')
-rw-r--r--zenstore/blockstore.cpp508
1 files changed, 508 insertions, 0 deletions
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp
index 1eb859d5a..a897ed902 100644
--- a/zenstore/blockstore.cpp
+++ b/zenstore/blockstore.cpp
@@ -2,6 +2,9 @@
#include "compactcas.h"
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
#include <zenstore/blockstore.h>
#if ZEN_WITH_TESTS
@@ -108,6 +111,511 @@ BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::functio
m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun));
}
+namespace {
+ const char* DataExtension = ".ucas";
+
+ std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex)
+ {
+ ExtendablePathBuilder<256> Path;
+
+ char BlockHexString[9];
+ ToHexNumber(BlockIndex, BlockHexString);
+
+ Path.Append(BlocksBasePath);
+ Path.AppendSeparator();
+ Path.AppendAsciiRange(BlockHexString, BlockHexString + 4);
+ Path.AppendSeparator();
+ Path.Append(BlockHexString);
+ Path.Append(DataExtension);
+ return Path.ToPath();
+ }
+} // namespace
+
+void
+BlockStore::Initialize(const std::filesystem::path& BlocksBasePath,
+ uint64_t MaxBlockSize,
+ uint64_t MaxBlockCount,
+ const std::vector<BlockStoreLocation>& KnownLocations)
+{
+ ZEN_ASSERT(MaxBlockSize > 0);
+ ZEN_ASSERT(MaxBlockCount > 0);
+ ZEN_ASSERT(IsPow2(MaxBlockCount));
+
+ m_BlocksBasePath = BlocksBasePath;
+ m_MaxBlockSize = MaxBlockSize;
+
+ m_TotalSize = 0;
+ m_ChunkBlocks.clear();
+
+ std::unordered_set<uint32_t> KnownBlocks;
+ for (const auto& Entry : KnownLocations)
+ {
+ m_TotalSize.fetch_add(Entry.Size, std::memory_order_seq_cst);
+ KnownBlocks.insert(Entry.BlockIndex);
+ }
+
+ if (std::filesystem::is_directory(m_BlocksBasePath))
+ {
+ std::vector<std::filesystem::path> FoldersToScan;
+ FoldersToScan.push_back(m_BlocksBasePath);
+ size_t FolderOffset = 0;
+ while (FolderOffset < FoldersToScan.size())
+ {
+ for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset]))
+ {
+ if (Entry.is_directory())
+ {
+ FoldersToScan.push_back(Entry.path());
+ continue;
+ }
+ if (Entry.is_regular_file())
+ {
+ const std::filesystem::path Path = Entry.path();
+ if (Path.extension() != DataExtension)
+ {
+ continue;
+ }
+ std::string FileName = Path.stem().string();
+ uint32_t BlockIndex;
+ bool OK = ParseHexNumber(FileName, BlockIndex);
+ if (!OK)
+ {
+ continue;
+ }
+ if (!KnownBlocks.contains(BlockIndex))
+ {
+ // Log removing unreferenced block
+ // Clear out unused blocks
+ ZEN_INFO("removing unused block for '{}' at '{}'", m_BlocksBasePath, Path);
+ std::error_code Ec;
+ std::filesystem::remove(Path, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message());
+ }
+ continue;
+ }
+ Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path);
+ BlockFile->Open();
+ m_ChunkBlocks[BlockIndex] = BlockFile;
+ }
+ }
+ ++FolderOffset;
+ }
+ }
+ else
+ {
+ CreateDirectories(m_BlocksBasePath);
+ }
+}
+
+BlockStoreLocation
+BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment)
+{
+ RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
+
+ uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ bool IsWriting = m_WriteBlock != nullptr;
+ if (!IsWriting || (m_CurrentInsertOffset + Size) > m_MaxBlockSize)
+ {
+ if (m_WriteBlock)
+ {
+ m_WriteBlock = nullptr;
+ }
+ {
+ if (m_ChunkBlocks.size() == m_MaxBlockCount)
+ {
+ throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath));
+ }
+ WriteBlockIndex += IsWriting ? 1 : 0;
+ while (m_ChunkBlocks.contains(WriteBlockIndex))
+ {
+ WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1);
+ }
+ std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
+ m_WriteBlock = new BlockStoreFile(BlockPath);
+ m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+ }
+ m_CurrentInsertOffset = 0;
+ m_WriteBlock->Create(m_MaxBlockSize);
+ }
+ uint64_t InsertOffset = m_CurrentInsertOffset;
+ m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment);
+ Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
+ InsertLock.ReleaseNow();
+
+ BlockStoreLocation Location{.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size};
+ WriteBlock->Write(Data, Size, InsertOffset);
+
+ return Location;
+}
+
+/*
+IoBuffer
+BlockStore::ReadChunk(const BlockStoreLocation& Location)
+{
+ RwLock::SharedLockScope InsertLock(m_InsertLock);
+ Ref<BlockStoreFile> ChunkBlock = m_ChunkBlocks[Location.BlockIndex];
+ InsertLock.ReleaseNow();
+ return ChunkBlock->GetChunk(Location.Offset, Location.Size);
+}
+*/
+
+Ref<BlockStoreFile>
+BlockStore::GetChunkBlock(const BlockStoreLocation& Location)
+{
+ RwLock::SharedLockScope InsertLock(m_InsertLock);
+ return m_ChunkBlocks[Location.BlockIndex];
+}
+
+void
+BlockStore::Flush()
+{
+ RwLock::ExclusiveLockScope _(m_InsertLock);
+ if (m_CurrentInsertOffset > 0)
+ {
+ uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1);
+ m_WriteBlock = nullptr;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+ m_CurrentInsertOffset = 0;
+ }
+}
+
+// TODO: Almost there - some bug remain and API might need tweaking
+void
+BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations,
+ const std::vector<size_t>& KeepChunkIndexes,
+ uint64_t PayloadAlignment,
+ bool DryRun,
+ const ReclaimCallback& Callback)
+{
+ if (ChunkLocations.empty())
+ {
+ return;
+ }
+ uint64_t WriteBlockTimeUs = 0;
+ uint64_t WriteBlockLongestTimeUs = 0;
+ uint64_t ReadBlockTimeUs = 0;
+ uint64_t ReadBlockLongestTimeUs = 0;
+ uint64_t TotalChunkCount = ChunkLocations.size();
+ uint64_t DeletedSize = 0;
+ uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
+
+ uint64_t MovedCount = 0;
+ uint64_t DeletedCount = 0;
+
+ Stopwatch TotalTimer;
+ const auto _ = MakeGuard([this,
+ &TotalTimer,
+ &WriteBlockTimeUs,
+ &WriteBlockLongestTimeUs,
+ &ReadBlockTimeUs,
+ &ReadBlockLongestTimeUs,
+ &TotalChunkCount,
+ &DeletedCount,
+ &MovedCount,
+ &DeletedSize,
+ OldTotalSize] {
+ ZEN_INFO(
+ "garbage collect for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved "
+ "#{} "
+ "of #{} "
+ "chunks ({}).",
+ m_BlocksBasePath,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ NiceLatencyNs(WriteBlockTimeUs),
+ NiceLatencyNs(WriteBlockLongestTimeUs),
+ NiceLatencyNs(ReadBlockTimeUs),
+ NiceLatencyNs(ReadBlockLongestTimeUs),
+ NiceBytes(DeletedSize),
+ DeletedCount,
+ MovedCount,
+ TotalChunkCount,
+ NiceBytes(OldTotalSize));
+ });
+
+ size_t BlockCount = 0;
+ uint64_t ExcludeBlockIndex = 0x800000000ull;
+ {
+ RwLock::ExclusiveLockScope __(m_InsertLock);
+ if (m_WriteBlock)
+ {
+ ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ }
+ BlockCount = m_ChunkBlocks.size();
+ }
+
+ std::unordered_map<size_t, BlockStoreLocation> LocationLookup;
+ LocationLookup.reserve(TotalChunkCount);
+
+ std::unordered_set<size_t> KeepChunkMap;
+ KeepChunkMap.reserve(KeepChunkIndexes.size());
+ for (size_t KeepChunkIndex : KeepChunkIndexes)
+ {
+ const BlockStoreLocation& Location = ChunkLocations[KeepChunkIndex];
+ if (Location.BlockIndex == ExcludeBlockIndex)
+ {
+ continue;
+ }
+ KeepChunkMap.insert(KeepChunkIndex);
+ }
+ std::unordered_set<size_t> DeleteChunkMap;
+ DeleteChunkMap.reserve(ChunkLocations.size() - KeepChunkIndexes.size());
+
+ std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex;
+ std::vector<std::vector<size_t>> KeepChunks;
+ std::vector<std::vector<size_t>> DeleteChunks;
+
+ BlockIndexToChunkMapIndex.reserve(BlockCount);
+ KeepChunks.reserve(BlockCount);
+ DeleteChunks.reserve(BlockCount);
+ size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2;
+
+ size_t DeleteCount = 0;
+ uint64_t NewTotalSize = 0;
+ for (size_t Index = 0; Index < TotalChunkCount; ++Index)
+ {
+ const BlockStoreLocation& Location = ChunkLocations[Index];
+ LocationLookup[Index] = Location;
+ if (Location.BlockIndex == ExcludeBlockIndex)
+ {
+ continue;
+ }
+
+ auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex);
+ size_t ChunkMapIndex = 0;
+ if (BlockIndexPtr == BlockIndexToChunkMapIndex.end())
+ {
+ ChunkMapIndex = KeepChunks.size();
+ BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex;
+ KeepChunks.resize(ChunkMapIndex + 1);
+ KeepChunks.back().reserve(GuesstimateCountPerBlock);
+ DeleteChunks.resize(ChunkMapIndex + 1);
+ DeleteChunks.back().reserve(GuesstimateCountPerBlock);
+ }
+ else
+ {
+ ChunkMapIndex = BlockIndexPtr->second;
+ }
+
+ if (KeepChunkMap.contains(Index))
+ {
+ std::vector<size_t>& IndexMap = KeepChunks[ChunkMapIndex];
+ IndexMap.push_back(Index);
+ NewTotalSize += Location.Size;
+ continue;
+ }
+ std::vector<size_t>& IndexMap = DeleteChunks[ChunkMapIndex];
+ IndexMap.push_back(Index);
+ DeleteCount++;
+ }
+
+ std::unordered_set<uint32_t> BlocksToReWrite;
+ BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size());
+ for (const auto& Entry : BlockIndexToChunkMapIndex)
+ {
+ uint32_t BlockIndex = Entry.first;
+ size_t ChunkMapIndex = Entry.second;
+ const std::vector<size_t>& ChunkMap = DeleteChunks[ChunkMapIndex];
+ if (ChunkMap.empty())
+ {
+ continue;
+ }
+ BlocksToReWrite.insert(BlockIndex);
+ }
+
+ if (DryRun)
+ {
+ uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed);
+ ZEN_INFO("garbage collect for '{}' DISABLED, found #{} {} chunks of total #{} {}",
+ m_BlocksBasePath,
+ DeleteCount,
+ NiceBytes(TotalSize - NewTotalSize),
+ TotalChunkCount,
+ NiceBytes(TotalSize));
+ return;
+ }
+
+ std::unordered_map<size_t, BlockStoreLocation> MovedChunks;
+ std::vector<size_t> RemovedChunks;
+
+ Ref<BlockStoreFile> NewBlockFile;
+ uint64_t WriteOffset = 0;
+ uint32_t NewBlockIndex = 0;
+
+ for (uint32_t BlockIndex : BlocksToReWrite)
+ {
+ const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex];
+
+ Ref<BlockStoreFile> OldBlockFile;
+ {
+ RwLock::SharedLockScope _i(m_InsertLock);
+ OldBlockFile = m_ChunkBlocks[BlockIndex];
+ ZEN_ASSERT(OldBlockFile);
+ }
+
+ const std::vector<size_t>& KeepMap = KeepChunks[ChunkMapIndex];
+ if (KeepMap.empty())
+ {
+ const std::vector<size_t>& DeleteMap = DeleteChunks[ChunkMapIndex];
+ for (size_t DeleteIndex : DeleteMap)
+ {
+ RemovedChunks.push_back(DeleteIndex);
+ DeletedSize += ChunkLocations[DeleteIndex].Size;
+ DeletedCount++;
+ }
+ Callback(MovedChunks, RemovedChunks);
+ MovedChunks.clear();
+ RemovedChunks.clear();
+ {
+ RwLock::ExclusiveLockScope _i(m_InsertLock);
+ Stopwatch Timer;
+ const auto __ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ m_ChunkBlocks[BlockIndex] = nullptr;
+ }
+ ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", m_BlocksBasePath, BlockIndex, OldBlockFile->GetPath());
+ std::error_code Ec;
+ OldBlockFile->MarkAsDeleteOnClose(Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message());
+ }
+ continue;
+ }
+
+ std::vector<uint8_t> Chunk;
+ for (const size_t& ChunkIndex : KeepMap)
+ {
+ const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex];
+ Chunk.resize(ChunkLocation.Size);
+ OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
+
+ if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize))
+ {
+ uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
+
+ if (NewBlockFile)
+ {
+ NewBlockFile->Truncate(WriteOffset);
+ NewBlockFile->Flush();
+ }
+ {
+ Callback(MovedChunks, RemovedChunks);
+ MovedChunks.clear();
+ RemovedChunks.clear();
+ RwLock::ExclusiveLockScope __(m_InsertLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ if (m_ChunkBlocks.size() == m_MaxBlockCount)
+ {
+ ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded",
+ m_BlocksBasePath,
+ static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
+ return;
+ }
+ while (m_ChunkBlocks.contains(NextBlockIndex))
+ {
+ NextBlockIndex = (NextBlockIndex + 1) & (m_MaxBlockCount - 1);
+ }
+ std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex);
+ NewBlockFile = new BlockStoreFile(NewBlockPath);
+ m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
+ }
+
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error);
+ if (Error)
+ {
+ ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message());
+ return;
+ }
+ if (Space.Free < m_MaxBlockSize)
+ {
+ uint64_t ReclaimedSpace = 0; // GcCtx.ClaimGCReserve();
+ if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
+ {
+ ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}",
+ m_BlocksBasePath,
+ m_MaxBlockSize,
+ NiceBytes(Space.Free + ReclaimedSpace));
+ RwLock::ExclusiveLockScope _l(m_InsertLock);
+ Stopwatch Timer;
+ const auto __ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ m_ChunkBlocks.erase(NextBlockIndex);
+ return;
+ }
+
+ ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
+ m_BlocksBasePath,
+ ReclaimedSpace,
+ NiceBytes(Space.Free + ReclaimedSpace));
+ }
+ NewBlockFile->Create(m_MaxBlockSize);
+ NewBlockIndex = NextBlockIndex;
+ WriteOffset = 0;
+ }
+
+ NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
+ MovedChunks[ChunkIndex] = {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()};
+ WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment);
+ MovedCount++;
+ }
+ Chunk.clear();
+ if (NewBlockFile)
+ {
+ NewBlockFile->Truncate(WriteOffset);
+ NewBlockFile->Flush();
+ NewBlockFile = {};
+ }
+
+ const std::vector<size_t>& DeleteMap = DeleteChunks[ChunkMapIndex];
+ for (size_t DeleteIndex : DeleteMap)
+ {
+ RemovedChunks.push_back(DeleteIndex);
+ DeletedSize += ChunkLocations[DeleteIndex].Size;
+ DeletedCount++;
+ }
+
+ Callback(MovedChunks, RemovedChunks);
+ MovedChunks.clear();
+ RemovedChunks.clear();
+ {
+ RwLock::ExclusiveLockScope __(m_InsertLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ m_ChunkBlocks[BlockIndex] = nullptr;
+ }
+ ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", m_BlocksBasePath, BlockIndex, OldBlockFile->GetPath());
+ std::error_code Ec;
+ OldBlockFile->MarkAsDeleteOnClose(Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message());
+ }
+ OldBlockFile = nullptr;
+ }
+
+ return;
+}
+
#if ZEN_WITH_TESTS
static bool