diff options
| author | Dan Engelbrecht <[email protected]> | 2022-04-30 01:29:24 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-04-30 09:41:24 +0200 |
| commit | de6057de814a4dc16654bdda84f697476b2ebef5 (patch) | |
| tree | 7f5fac98b79a68e98b3effc91162379438682c8d /zenstore/blockstore.cpp | |
| parent | Merge remote-tracking branch 'origin/main' into de/cache-with-block-store (diff) | |
| download | zen-de6057de814a4dc16654bdda84f697476b2ebef5.tar.xz zen-de6057de814a4dc16654bdda84f697476b2ebef5.zip | |
first pass at generic block store with gc
Diffstat (limited to 'zenstore/blockstore.cpp')
| -rw-r--r-- | zenstore/blockstore.cpp | 508 |
1 files changed, 508 insertions, 0 deletions
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp index 1eb859d5a..a897ed902 100644 --- a/zenstore/blockstore.cpp +++ b/zenstore/blockstore.cpp @@ -2,6 +2,9 @@ #include "compactcas.h" +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> #include <zenstore/blockstore.h> #if ZEN_WITH_TESTS @@ -108,6 +111,511 @@ BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::functio m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun)); } +namespace { + const char* DataExtension = ".ucas"; + + std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex) + { + ExtendablePathBuilder<256> Path; + + char BlockHexString[9]; + ToHexNumber(BlockIndex, BlockHexString); + + Path.Append(BlocksBasePath); + Path.AppendSeparator(); + Path.AppendAsciiRange(BlockHexString, BlockHexString + 4); + Path.AppendSeparator(); + Path.Append(BlockHexString); + Path.Append(DataExtension); + return Path.ToPath(); + } +} // namespace + +void +BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, + uint64_t MaxBlockSize, + uint64_t MaxBlockCount, + const std::vector<BlockStoreLocation>& KnownLocations) +{ + ZEN_ASSERT(MaxBlockSize > 0); + ZEN_ASSERT(MaxBlockCount > 0); + ZEN_ASSERT(IsPow2(MaxBlockCount)); + + m_BlocksBasePath = BlocksBasePath; + m_MaxBlockSize = MaxBlockSize; + + m_TotalSize = 0; + m_ChunkBlocks.clear(); + + std::unordered_set<uint32_t> KnownBlocks; + for (const auto& Entry : KnownLocations) + { + m_TotalSize.fetch_add(Entry.Size, std::memory_order_seq_cst); + KnownBlocks.insert(Entry.BlockIndex); + } + + if (std::filesystem::is_directory(m_BlocksBasePath)) + { + std::vector<std::filesystem::path> FoldersToScan; + FoldersToScan.push_back(m_BlocksBasePath); + size_t FolderOffset = 0; + while (FolderOffset < FoldersToScan.size()) + { + for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset])) + { + if (Entry.is_directory()) + { + FoldersToScan.push_back(Entry.path()); + continue; + } + if (Entry.is_regular_file()) + { + const std::filesystem::path Path = Entry.path(); + if (Path.extension() != DataExtension) + { + continue; + } + std::string FileName = Path.stem().string(); + uint32_t BlockIndex; + bool OK = ParseHexNumber(FileName, BlockIndex); + if (!OK) + { + continue; + } + if (!KnownBlocks.contains(BlockIndex)) + { + // Log removing unreferenced block + // Clear out unused blocks + ZEN_INFO("removing unused block for '{}' at '{}'", m_BlocksBasePath, Path); + std::error_code Ec; + std::filesystem::remove(Path, Ec); + if (Ec) + { + ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message()); + } + continue; + } + Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path); + BlockFile->Open(); + m_ChunkBlocks[BlockIndex] = BlockFile; + } + } + ++FolderOffset; + } + } + else + { + CreateDirectories(m_BlocksBasePath); + } +} + +BlockStoreLocation +BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment) +{ + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); + + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + bool IsWriting = m_WriteBlock != nullptr; + if (!IsWriting || (m_CurrentInsertOffset + Size) > m_MaxBlockSize) + { + if (m_WriteBlock) + { + m_WriteBlock = nullptr; + } + { + if (m_ChunkBlocks.size() == m_MaxBlockCount) + { + throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath)); + } + WriteBlockIndex += IsWriting ? 1 : 0; + while (m_ChunkBlocks.contains(WriteBlockIndex)) + { + WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1); + } + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); + m_WriteBlock = new BlockStoreFile(BlockPath); + m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + } + m_CurrentInsertOffset = 0; + m_WriteBlock->Create(m_MaxBlockSize); + } + uint64_t InsertOffset = m_CurrentInsertOffset; + m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment); + Ref<BlockStoreFile> WriteBlock = m_WriteBlock; + InsertLock.ReleaseNow(); + + BlockStoreLocation Location{.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size}; + WriteBlock->Write(Data, Size, InsertOffset); + + return Location; +} + +/* +IoBuffer +BlockStore::ReadChunk(const BlockStoreLocation& Location) +{ + RwLock::SharedLockScope InsertLock(m_InsertLock); + Ref<BlockStoreFile> ChunkBlock = m_ChunkBlocks[Location.BlockIndex]; + InsertLock.ReleaseNow(); + return ChunkBlock->GetChunk(Location.Offset, Location.Size); +} +*/ + +Ref<BlockStoreFile> +BlockStore::GetChunkBlock(const BlockStoreLocation& Location) +{ + RwLock::SharedLockScope InsertLock(m_InsertLock); + return m_ChunkBlocks[Location.BlockIndex]; +} + +void +BlockStore::Flush() +{ + RwLock::ExclusiveLockScope _(m_InsertLock); + if (m_CurrentInsertOffset > 0) + { + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1); + m_WriteBlock = nullptr; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + m_CurrentInsertOffset = 0; + } +} + +// TODO: Almost there - some bug remain and API might need tweaking +void +BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations, + const std::vector<size_t>& KeepChunkIndexes, + uint64_t PayloadAlignment, + bool DryRun, + const ReclaimCallback& Callback) +{ + if (ChunkLocations.empty()) + { + return; + } + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + uint64_t TotalChunkCount = ChunkLocations.size(); + uint64_t DeletedSize = 0; + uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); + + uint64_t MovedCount = 0; + uint64_t DeletedCount = 0; + + Stopwatch TotalTimer; + const auto _ = MakeGuard([this, + &TotalTimer, + &WriteBlockTimeUs, + &WriteBlockLongestTimeUs, + &ReadBlockTimeUs, + &ReadBlockLongestTimeUs, + &TotalChunkCount, + &DeletedCount, + &MovedCount, + &DeletedSize, + OldTotalSize] { + ZEN_INFO( + "garbage collect for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved " + "#{} " + "of #{} " + "chunks ({}).", + m_BlocksBasePath, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs), + NiceBytes(DeletedSize), + DeletedCount, + MovedCount, + TotalChunkCount, + NiceBytes(OldTotalSize)); + }); + + size_t BlockCount = 0; + uint64_t ExcludeBlockIndex = 0x800000000ull; + { + RwLock::ExclusiveLockScope __(m_InsertLock); + if (m_WriteBlock) + { + ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + } + BlockCount = m_ChunkBlocks.size(); + } + + std::unordered_map<size_t, BlockStoreLocation> LocationLookup; + LocationLookup.reserve(TotalChunkCount); + + std::unordered_set<size_t> KeepChunkMap; + KeepChunkMap.reserve(KeepChunkIndexes.size()); + for (size_t KeepChunkIndex : KeepChunkIndexes) + { + const BlockStoreLocation& Location = ChunkLocations[KeepChunkIndex]; + if (Location.BlockIndex == ExcludeBlockIndex) + { + continue; + } + KeepChunkMap.insert(KeepChunkIndex); + } + std::unordered_set<size_t> DeleteChunkMap; + DeleteChunkMap.reserve(ChunkLocations.size() - KeepChunkIndexes.size()); + + std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex; + std::vector<std::vector<size_t>> KeepChunks; + std::vector<std::vector<size_t>> DeleteChunks; + + BlockIndexToChunkMapIndex.reserve(BlockCount); + KeepChunks.reserve(BlockCount); + DeleteChunks.reserve(BlockCount); + size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2; + + size_t DeleteCount = 0; + uint64_t NewTotalSize = 0; + for (size_t Index = 0; Index < TotalChunkCount; ++Index) + { + const BlockStoreLocation& Location = ChunkLocations[Index]; + LocationLookup[Index] = Location; + if (Location.BlockIndex == ExcludeBlockIndex) + { + continue; + } + + auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex); + size_t ChunkMapIndex = 0; + if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) + { + ChunkMapIndex = KeepChunks.size(); + BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex; + KeepChunks.resize(ChunkMapIndex + 1); + KeepChunks.back().reserve(GuesstimateCountPerBlock); + DeleteChunks.resize(ChunkMapIndex + 1); + DeleteChunks.back().reserve(GuesstimateCountPerBlock); + } + else + { + ChunkMapIndex = BlockIndexPtr->second; + } + + if (KeepChunkMap.contains(Index)) + { + std::vector<size_t>& IndexMap = KeepChunks[ChunkMapIndex]; + IndexMap.push_back(Index); + NewTotalSize += Location.Size; + continue; + } + std::vector<size_t>& IndexMap = DeleteChunks[ChunkMapIndex]; + IndexMap.push_back(Index); + DeleteCount++; + } + + std::unordered_set<uint32_t> BlocksToReWrite; + BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); + for (const auto& Entry : BlockIndexToChunkMapIndex) + { + uint32_t BlockIndex = Entry.first; + size_t ChunkMapIndex = Entry.second; + const std::vector<size_t>& ChunkMap = DeleteChunks[ChunkMapIndex]; + if (ChunkMap.empty()) + { + continue; + } + BlocksToReWrite.insert(BlockIndex); + } + + if (DryRun) + { + uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed); + ZEN_INFO("garbage collect for '{}' DISABLED, found #{} {} chunks of total #{} {}", + m_BlocksBasePath, + DeleteCount, + NiceBytes(TotalSize - NewTotalSize), + TotalChunkCount, + NiceBytes(TotalSize)); + return; + } + + std::unordered_map<size_t, BlockStoreLocation> MovedChunks; + std::vector<size_t> RemovedChunks; + + Ref<BlockStoreFile> NewBlockFile; + uint64_t WriteOffset = 0; + uint32_t NewBlockIndex = 0; + + for (uint32_t BlockIndex : BlocksToReWrite) + { + const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; + + Ref<BlockStoreFile> OldBlockFile; + { + RwLock::SharedLockScope _i(m_InsertLock); + OldBlockFile = m_ChunkBlocks[BlockIndex]; + ZEN_ASSERT(OldBlockFile); + } + + const std::vector<size_t>& KeepMap = KeepChunks[ChunkMapIndex]; + if (KeepMap.empty()) + { + const std::vector<size_t>& DeleteMap = DeleteChunks[ChunkMapIndex]; + for (size_t DeleteIndex : DeleteMap) + { + RemovedChunks.push_back(DeleteIndex); + DeletedSize += ChunkLocations[DeleteIndex].Size; + DeletedCount++; + } + Callback(MovedChunks, RemovedChunks); + MovedChunks.clear(); + RemovedChunks.clear(); + { + RwLock::ExclusiveLockScope _i(m_InsertLock); + Stopwatch Timer; + const auto __ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + m_ChunkBlocks[BlockIndex] = nullptr; + } + ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", m_BlocksBasePath, BlockIndex, OldBlockFile->GetPath()); + std::error_code Ec; + OldBlockFile->MarkAsDeleteOnClose(Ec); + if (Ec) + { + ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); + } + continue; + } + + std::vector<uint8_t> Chunk; + for (const size_t& ChunkIndex : KeepMap) + { + const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; + Chunk.resize(ChunkLocation.Size); + OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); + + if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) + { + uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); + + if (NewBlockFile) + { + NewBlockFile->Truncate(WriteOffset); + NewBlockFile->Flush(); + } + { + Callback(MovedChunks, RemovedChunks); + MovedChunks.clear(); + RemovedChunks.clear(); + RwLock::ExclusiveLockScope __(m_InsertLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (m_ChunkBlocks.size() == m_MaxBlockCount) + { + ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", + m_BlocksBasePath, + static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); + return; + } + while (m_ChunkBlocks.contains(NextBlockIndex)) + { + NextBlockIndex = (NextBlockIndex + 1) & (m_MaxBlockCount - 1); + } + std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); + NewBlockFile = new BlockStoreFile(NewBlockPath); + m_ChunkBlocks[NextBlockIndex] = NewBlockFile; + } + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error); + if (Error) + { + ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message()); + return; + } + if (Space.Free < m_MaxBlockSize) + { + uint64_t ReclaimedSpace = 0; // GcCtx.ClaimGCReserve(); + if (Space.Free + ReclaimedSpace < m_MaxBlockSize) + { + ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", + m_BlocksBasePath, + m_MaxBlockSize, + NiceBytes(Space.Free + ReclaimedSpace)); + RwLock::ExclusiveLockScope _l(m_InsertLock); + Stopwatch Timer; + const auto __ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + m_ChunkBlocks.erase(NextBlockIndex); + return; + } + + ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", + m_BlocksBasePath, + ReclaimedSpace, + NiceBytes(Space.Free + ReclaimedSpace)); + } + NewBlockFile->Create(m_MaxBlockSize); + NewBlockIndex = NextBlockIndex; + WriteOffset = 0; + } + + NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); + MovedChunks[ChunkIndex] = {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}; + WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment); + MovedCount++; + } + Chunk.clear(); + if (NewBlockFile) + { + NewBlockFile->Truncate(WriteOffset); + NewBlockFile->Flush(); + NewBlockFile = {}; + } + + const std::vector<size_t>& DeleteMap = DeleteChunks[ChunkMapIndex]; + for (size_t DeleteIndex : DeleteMap) + { + RemovedChunks.push_back(DeleteIndex); + DeletedSize += ChunkLocations[DeleteIndex].Size; + DeletedCount++; + } + + Callback(MovedChunks, RemovedChunks); + MovedChunks.clear(); + RemovedChunks.clear(); + { + RwLock::ExclusiveLockScope __(m_InsertLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + m_ChunkBlocks[BlockIndex] = nullptr; + } + ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", m_BlocksBasePath, BlockIndex, OldBlockFile->GetPath()); + std::error_code Ec; + OldBlockFile->MarkAsDeleteOnClose(Ec); + if (Ec) + { + ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); + } + OldBlockFile = nullptr; + } + + return; +} + #if ZEN_WITH_TESTS static bool |