diff options
| author | Dan Engelbrecht <[email protected]> | 2022-05-01 10:17:35 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-05-01 10:17:35 +0200 |
| commit | 7dc31ec99aa3fc2f40000258e45d5d6381403ff8 (patch) | |
| tree | 9c5a986c506128405fe63df3acfbdede4c2a2995 | |
| parent | first pass at generic block store with gc (diff) | |
| download | zen-7dc31ec99aa3fc2f40000258e45d5d6381403ff8.tar.xz zen-7dc31ec99aa3fc2f40000258e45d5d6381403ff8.zip | |
threading issues resolved
| -rw-r--r-- | zenstore/blockstore.cpp | 113 | ||||
| -rw-r--r-- | zenstore/compactcas.cpp | 53 | ||||
| -rw-r--r-- | zenstore/include/zenstore/blockstore.h | 54 |
3 files changed, 120 insertions, 100 deletions
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp index a897ed902..4cf3c6486 100644 --- a/zenstore/blockstore.cpp +++ b/zenstore/blockstore.cpp @@ -209,8 +209,8 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, } } -BlockStoreLocation -BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment) +void +BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, WriteCompleteCallback Callback) { RwLock::ExclusiveLockScope InsertLock(m_InsertLock); @@ -243,24 +243,30 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment) uint64_t InsertOffset = m_CurrentInsertOffset; m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment); Ref<BlockStoreFile> WriteBlock = m_WriteBlock; + m_ActiveWriteBlockIndexes.push_back(WriteBlockIndex); InsertLock.ReleaseNow(); - BlockStoreLocation Location{.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size}; WriteBlock->Write(Data, Size, InsertOffset); - return Location; + Callback({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size}); + + RwLock::ExclusiveLockScope _(m_InsertLock); + m_ActiveWriteBlockIndexes.erase(std::find(m_ActiveWriteBlockIndexes.begin(), m_ActiveWriteBlockIndexes.end(), WriteBlockIndex)); } -/* -IoBuffer -BlockStore::ReadChunk(const BlockStoreLocation& Location) +BlockStore::ReclaimSnapshotState +BlockStore::GetReclaimSnapshotState() { - RwLock::SharedLockScope InsertLock(m_InsertLock); - Ref<BlockStoreFile> ChunkBlock = m_ChunkBlocks[Location.BlockIndex]; - InsertLock.ReleaseNow(); - return ChunkBlock->GetChunk(Location.Offset, Location.Size); + ReclaimSnapshotState State; + RwLock::ExclusiveLockScope _(m_InsertLock); + for (uint32_t BlockIndex : m_ActiveWriteBlockIndexes) + { + State.ExcludeBlockIndexes.insert(BlockIndex); + } + State.BlockCount = m_ChunkBlocks.size(); + _.ReleaseNow(); + return State; } -*/ Ref<BlockStoreFile> BlockStore::GetChunkBlock(const BlockStoreLocation& Location) @@ -283,9 +289,9 @@ BlockStore::Flush() } } -// TODO: Almost there - some bug remain and API might need tweaking void -BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations, +BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, + const std::vector<BlockStoreLocation>& ChunkLocations, const std::vector<size_t>& KeepChunkIndexes, uint64_t PayloadAlignment, bool DryRun, @@ -336,41 +342,22 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations, NiceBytes(OldTotalSize)); }); - size_t BlockCount = 0; - uint64_t ExcludeBlockIndex = 0x800000000ull; - { - RwLock::ExclusiveLockScope __(m_InsertLock); - if (m_WriteBlock) - { - ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); - } - BlockCount = m_ChunkBlocks.size(); - } - - std::unordered_map<size_t, BlockStoreLocation> LocationLookup; - LocationLookup.reserve(TotalChunkCount); + size_t BlockCount = Snapshot.BlockCount; std::unordered_set<size_t> KeepChunkMap; KeepChunkMap.reserve(KeepChunkIndexes.size()); for (size_t KeepChunkIndex : KeepChunkIndexes) { - const BlockStoreLocation& Location = ChunkLocations[KeepChunkIndex]; - if (Location.BlockIndex == ExcludeBlockIndex) - { - continue; - } KeepChunkMap.insert(KeepChunkIndex); } - std::unordered_set<size_t> DeleteChunkMap; - DeleteChunkMap.reserve(ChunkLocations.size() - KeepChunkIndexes.size()); std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex; - std::vector<std::vector<size_t>> KeepChunks; - std::vector<std::vector<size_t>> DeleteChunks; + std::vector<std::vector<size_t>> BlockKeepChunks; + std::vector<std::vector<size_t>> BlockDeleteChunks; BlockIndexToChunkMapIndex.reserve(BlockCount); - KeepChunks.reserve(BlockCount); - DeleteChunks.reserve(BlockCount); + BlockKeepChunks.reserve(BlockCount); + BlockDeleteChunks.reserve(BlockCount); size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2; size_t DeleteCount = 0; @@ -378,8 +365,7 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations, for (size_t Index = 0; Index < TotalChunkCount; ++Index) { const BlockStoreLocation& Location = ChunkLocations[Index]; - LocationLookup[Index] = Location; - if (Location.BlockIndex == ExcludeBlockIndex) + if (Snapshot.ExcludeBlockIndexes.contains(Location.BlockIndex)) { continue; } @@ -388,12 +374,12 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations, size_t ChunkMapIndex = 0; if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) { - ChunkMapIndex = KeepChunks.size(); + ChunkMapIndex = BlockKeepChunks.size(); BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex; - KeepChunks.resize(ChunkMapIndex + 1); - KeepChunks.back().reserve(GuesstimateCountPerBlock); - DeleteChunks.resize(ChunkMapIndex + 1); - DeleteChunks.back().reserve(GuesstimateCountPerBlock); + BlockKeepChunks.resize(ChunkMapIndex + 1); + BlockKeepChunks.back().reserve(GuesstimateCountPerBlock); + BlockDeleteChunks.resize(ChunkMapIndex + 1); + BlockDeleteChunks.back().reserve(GuesstimateCountPerBlock); } else { @@ -402,12 +388,12 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations, if (KeepChunkMap.contains(Index)) { - std::vector<size_t>& IndexMap = KeepChunks[ChunkMapIndex]; + std::vector<size_t>& IndexMap = BlockKeepChunks[ChunkMapIndex]; IndexMap.push_back(Index); NewTotalSize += Location.Size; continue; } - std::vector<size_t>& IndexMap = DeleteChunks[ChunkMapIndex]; + std::vector<size_t>& IndexMap = BlockDeleteChunks[ChunkMapIndex]; IndexMap.push_back(Index); DeleteCount++; } @@ -418,7 +404,7 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations, { uint32_t BlockIndex = Entry.first; size_t ChunkMapIndex = Entry.second; - const std::vector<size_t>& ChunkMap = DeleteChunks[ChunkMapIndex]; + const std::vector<size_t>& ChunkMap = BlockDeleteChunks[ChunkMapIndex]; if (ChunkMap.empty()) { continue; @@ -438,9 +424,6 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations, return; } - std::unordered_map<size_t, BlockStoreLocation> MovedChunks; - std::vector<size_t> RemovedChunks; - Ref<BlockStoreFile> NewBlockFile; uint64_t WriteOffset = 0; uint32_t NewBlockIndex = 0; @@ -456,19 +439,16 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations, ZEN_ASSERT(OldBlockFile); } - const std::vector<size_t>& KeepMap = KeepChunks[ChunkMapIndex]; + const std::vector<size_t>& KeepMap = BlockKeepChunks[ChunkMapIndex]; if (KeepMap.empty()) { - const std::vector<size_t>& DeleteMap = DeleteChunks[ChunkMapIndex]; + const std::vector<size_t>& DeleteMap = BlockDeleteChunks[ChunkMapIndex]; for (size_t DeleteIndex : DeleteMap) { - RemovedChunks.push_back(DeleteIndex); DeletedSize += ChunkLocations[DeleteIndex].Size; - DeletedCount++; } - Callback(MovedChunks, RemovedChunks); - MovedChunks.clear(); - RemovedChunks.clear(); + Callback(BlockIndex, {}, DeleteMap); + DeletedCount += DeleteMap.size(); { RwLock::ExclusiveLockScope _i(m_InsertLock); Stopwatch Timer; @@ -489,7 +469,8 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations, continue; } - std::vector<uint8_t> Chunk; + std::unordered_map<size_t, BlockStoreLocation> MovedChunks; + std::vector<uint8_t> Chunk; for (const size_t& ChunkIndex : KeepMap) { const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; @@ -506,9 +487,9 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations, NewBlockFile->Flush(); } { - Callback(MovedChunks, RemovedChunks); + Callback(0xfffffffful, MovedChunks, {}); + MovedCount += KeepMap.size(); MovedChunks.clear(); - RemovedChunks.clear(); RwLock::ExclusiveLockScope __(m_InsertLock); Stopwatch Timer; const auto ___ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { @@ -572,7 +553,6 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations, NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); MovedChunks[ChunkIndex] = {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}; WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment); - MovedCount++; } Chunk.clear(); if (NewBlockFile) @@ -582,17 +562,16 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations, NewBlockFile = {}; } - const std::vector<size_t>& DeleteMap = DeleteChunks[ChunkMapIndex]; + const std::vector<size_t>& DeleteMap = BlockDeleteChunks[ChunkMapIndex]; for (size_t DeleteIndex : DeleteMap) { - RemovedChunks.push_back(DeleteIndex); DeletedSize += ChunkLocations[DeleteIndex].Size; - DeletedCount++; } - Callback(MovedChunks, RemovedChunks); + Callback(BlockIndex, MovedChunks, DeleteMap); + MovedCount += KeepMap.size(); + DeletedCount += DeleteMap.size(); MovedChunks.clear(); - RemovedChunks.clear(); { RwLock::ExclusiveLockScope __(m_InsertLock); Stopwatch Timer; diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 2b48eb143..84019d7aa 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -283,15 +283,16 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const // This should be a rare occasion and the current flow reduces the time we block for // reads, insert and GC. - BlockStoreLocation Location = m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment); - BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); - const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; - m_CasLog.Append(IndexEntry); - { - RwLock::ExclusiveLockScope _(m_LocationMapLock); - m_LocationMap.emplace(ChunkHash, DiskLocation); - } - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_seq_cst); + m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, [this, &ChunkHash, ChunkSize](const BlockStoreLocation& Location) { + BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); + const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; + m_CasLog.Append(IndexEntry); + { + RwLock::ExclusiveLockScope _(m_LocationMapLock); + m_LocationMap.emplace(ChunkHash, DiskLocation); + } + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_seq_cst); + }); return CasStore::InsertResult{.New = true}; } @@ -311,10 +312,15 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { return IoBuffer(); } - BlockStoreLocation Location = KeyIt->second.Get(m_PayloadAlignment); - Ref<BlockStoreFile> ChunkBlock = m_BlockStore.GetChunkBlock(Location); // m_ChunkBlocks[Location.BlockIndex]; + BlockStoreLocation Location = KeyIt->second.Get(m_PayloadAlignment); _.ReleaseNow(); + Ref<BlockStoreFile> ChunkBlock = m_BlockStore.GetChunkBlock(Location); // m_ChunkBlocks[Location.BlockIndex]; + if (!ChunkBlock) + { + return IoBuffer(); + } + return ChunkBlock->GetChunk(Location.Offset, Location.Size); } @@ -476,7 +482,8 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) uint64_t ReadBlockTimeUs = 0; uint64_t ReadBlockLongestTimeUs = 0; - LocationMap_t LocationMap; + LocationMap_t LocationMap; + BlockStore::ReclaimSnapshotState BlockStoreState; { RwLock::SharedLockScope ___(m_LocationMapLock); Stopwatch Timer; @@ -486,6 +493,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); LocationMap = m_LocationMap; + BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); } uint64_t TotalChunkCount = LocationMap.size(); @@ -521,18 +529,23 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); if (!PerformDelete) { - m_BlockStore.ReclaimSpace(ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); + m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); return; } + + auto GetChunkLocations = [] {}; + std::vector<IoHash> DeletedChunks; m_BlockStore.ReclaimSpace( + BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, false, [this, &DeletedChunks, &ChunkIndexToChunkHash, &LocationMap, &ReadBlockTimeUs, &ReadBlockLongestTimeUs]( + uint32_t BlockIndex, const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks, - const std::vector<size_t> RemovedChunks) { + const std::vector<size_t>& RemovedChunks) { std::vector<CasDiskIndexEntry> LogEntries; LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); for (const auto& Entry : MovedChunks) @@ -572,6 +585,10 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) } m_LocationMap[Entry.Key] = Entry.Location; } + for (const auto& Entry : m_LocationMap) + { + ZEN_ASSERT(Entry.second.GetBlockIndex() != BlockIndex); + } } }); @@ -2093,7 +2110,13 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true)) { ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() { CHECK(Cas.HaveChunk(ChunkHash)); - CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); + if (ChunkHash != IoHash::HashBuffer(Cas.FindChunk(ChunkHash))) + { + IoBuffer Buffer = Cas.FindChunk(ChunkHash); + CHECK(Buffer); + IoHash BufferHash = IoHash::HashBuffer(Buffer); + CHECK(ChunkHash == BufferHash); + } WorkCompleted.fetch_add(1); }); } diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h index 4dd6e5289..084142636 100644 --- a/zenstore/include/zenstore/blockstore.h +++ b/zenstore/include/zenstore/blockstore.h @@ -6,6 +6,8 @@ #include <zencore/zencore.h> #include <zenstore/basicfile.h> +#include <unordered_set> + namespace zen { ////////////////////////////////////////////////////////////////////////// @@ -108,31 +110,47 @@ private: class BlockStore { public: - void Initialize(const std::filesystem::path& BlocksBasePath, - uint64_t MaxBlockSize, - uint64_t MaxBlockCount, - const std::vector<BlockStoreLocation>& KnownLocations); - BlockStoreLocation WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment); + struct ReclaimSnapshotState + { + std::unordered_set<uint32_t> ExcludeBlockIndexes; + size_t BlockCount; + }; + typedef std::function<void(uint32_t BlockIndex, + const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks, + const std::vector<size_t>& RemovedChunks)> + ReclaimCallback; + typedef std::function<void(const BlockStoreLocation& Location)> WriteCompleteCallback; + + void Initialize(const std::filesystem::path& BlocksBasePath, + uint64_t MaxBlockSize, + uint64_t MaxBlockCount, + const std::vector<BlockStoreLocation>& KnownLocations); + void WriteChunk( + const void* Data, + uint64_t Size, + uint64_t Alignment, + WriteCompleteCallback Callback = [](const BlockStoreLocation&) {}); Ref<BlockStoreFile> GetChunkBlock(const BlockStoreLocation& Location); void Flush(); - typedef std::function<void(const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks, const std::vector<size_t> RemovedChunks)> - ReclaimCallback; - - void ReclaimSpace( - const std::vector<BlockStoreLocation>& ChunkLocations, - const std::vector<size_t>& KeepChunkIndexes, - uint64_t PayloadAlignment, - bool DryRun, - const ReclaimCallback& Callback = [](const std::unordered_map<size_t, BlockStoreLocation>&, const std::vector<size_t>&) {}); + ReclaimSnapshotState GetReclaimSnapshotState(); + void ReclaimSpace( + const ReclaimSnapshotState& Snapshot, + const std::vector<BlockStoreLocation>& ChunkLocations, + const std::vector<size_t>& KeepChunkIndexes, + uint64_t PayloadAlignment, + bool DryRun, + const ReclaimCallback& Callback = [](uint32_t, const std::unordered_map<size_t, BlockStoreLocation>&, const std::vector<size_t>&) { + }); private: std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks; - RwLock m_InsertLock; // used to serialize inserts - Ref<BlockStoreFile> m_WriteBlock; - std::uint64_t m_CurrentInsertOffset = 0; - std::atomic_uint32_t m_WriteBlockIndex{}; + RwLock m_InsertLock; // used to serialize inserts + Ref<BlockStoreFile> m_WriteBlock; + std::uint64_t m_CurrentInsertOffset = 0; + std::atomic_uint32_t m_WriteBlockIndex{}; + std::vector<uint32_t> m_ActiveWriteBlockIndexes; uint64_t m_MaxBlockSize = 1u << 28; uint64_t m_MaxBlockCount = BlockStoreDiskLocation::MaxBlockIndex + 1; |