diff options
| author | Dan Engelbrecht <[email protected]> | 2022-05-03 23:04:45 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-05-03 23:04:45 +0200 |
| commit | a19eee841d7ce0c9c868dced40a6380f55cdb9bd (patch) | |
| tree | 7dc1a81d9ba159588e845c94c10eb7e391cfed9b /zenstore/blockstore.cpp | |
| parent | unused variable in test fix (diff) | |
| download | zen-a19eee841d7ce0c9c868dced40a6380f55cdb9bd.tar.xz zen-a19eee841d7ce0c9c868dced40a6380f55cdb9bd.zip | |
handle that more than one block can be written to in parallel
Diffstat (limited to 'zenstore/blockstore.cpp')
| -rw-r--r-- | zenstore/blockstore.cpp | 36 |
1 files changed, 24 insertions, 12 deletions
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp index cb22551b9..54a8eb9df 100644 --- a/zenstore/blockstore.cpp +++ b/zenstore/blockstore.cpp @@ -207,8 +207,8 @@ BlockStore::Close() m_BlocksBasePath.clear(); } -BlockStoreLocation -BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment) +void +BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, WriteChunkCallback Callback) { ZEN_ASSERT(Data != nullptr); ZEN_ASSERT(Size > 0u); @@ -246,11 +246,17 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment) uint64_t InsertOffset = m_CurrentInsertOffset; m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment); Ref<BlockStoreFile> WriteBlock = m_WriteBlock; + m_ActiveWriteBlocks.push_back(WriteBlockIndex); InsertLock.ReleaseNow(); WriteBlock->Write(Data, Size, InsertOffset); - return {.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size}; + Callback({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size}); + + { + RwLock::ExclusiveLockScope _(m_InsertLock); + m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex)); + } } BlockStore::ReclaimSnapshotState @@ -258,8 +264,11 @@ BlockStore::GetReclaimSnapshotState() { ReclaimSnapshotState State; RwLock::ExclusiveLockScope _(m_InsertLock); - State.ExcludeBlockIndex = m_WriteBlock ? m_WriteBlockIndex.load(std::memory_order_acquire) : 0xffffffffu; - State.BlockCount = m_ChunkBlocks.size(); + for (uint32_t BlockIndex : m_ActiveWriteBlocks) + { + State.m_ActiveWriteBlocks.insert(BlockIndex); + } + State.BlockCount = m_ChunkBlocks.size(); _.ReleaseNow(); return State; } @@ -357,7 +366,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, { const BlockStoreLocation& Location = ChunkLocations[Index]; OldTotalSize += Location.Size; - if (Location.BlockIndex == Snapshot.ExcludeBlockIndex) + if (Snapshot.m_ActiveWriteBlocks.contains(Location.BlockIndex)) { continue; } @@ -1041,7 +1050,8 @@ TEST_CASE("blockstore.blockfile") namespace { BlockStoreLocation WriteStringAsChunk(BlockStore& Store, std::string_view String, size_t PayloadAlignment) { - BlockStoreLocation Location = Store.WriteChunk(String.data(), String.length(), PayloadAlignment); + BlockStoreLocation Location; + Store.WriteChunk(String.data(), String.length(), PayloadAlignment, [&](const BlockStoreLocation& L) { Location = L; }); CHECK(Location.Size == String.length()); return Location; }; @@ -1254,7 +1264,7 @@ TEST_CASE("blockstore.reclaim.space") for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { IoBuffer Chunk = CreateChunk(57 + ChunkIndex); - ChunkLocations.push_back(Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment)); + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations.push_back(L); }); ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } @@ -1381,8 +1391,8 @@ TEST_CASE("blockstore.thread.read.write") for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() { - IoBuffer& Chunk = Chunks[ChunkIndex]; - ChunkLocations[ChunkIndex] = Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment); + IoBuffer& Chunk = Chunks[ChunkIndex]; + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; }); WorkCompleted.fetch_add(1); }); } @@ -1415,8 +1425,10 @@ TEST_CASE("blockstore.thread.read.write") for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() { - IoBuffer& Chunk = Chunks[ChunkIndex]; - SecondChunkLocations[ChunkIndex] = Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment); + IoBuffer& Chunk = Chunks[ChunkIndex]; + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { + SecondChunkLocations[ChunkIndex] = L; + }); WorkCompleted.fetch_add(1); }); WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { |