aboutsummaryrefslogtreecommitdiff
path: root/zenstore/blockstore.cpp
diff options
context:
space:
mode:
author    Dan Engelbrecht <[email protected]>  2022-05-03 23:04:45 +0200
committer Dan Engelbrecht <[email protected]>  2022-05-03 23:04:45 +0200
commit a19eee841d7ce0c9c868dced40a6380f55cdb9bd (patch)
tree   7dc1a81d9ba159588e845c94c10eb7e391cfed9b /zenstore/blockstore.cpp
parent   unused variable in test fix (diff)
download zen-a19eee841d7ce0c9c868dced40a6380f55cdb9bd.tar.xz
download zen-a19eee841d7ce0c9c868dced40a6380f55cdb9bd.zip
handle that more than one block can be written to in parallel
Diffstat (limited to 'zenstore/blockstore.cpp')
-rw-r--r--zenstore/blockstore.cpp36
1 file changed, 24 insertions(+), 12 deletions(-)
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp
index cb22551b9..54a8eb9df 100644
--- a/zenstore/blockstore.cpp
+++ b/zenstore/blockstore.cpp
@@ -207,8 +207,8 @@ BlockStore::Close()
m_BlocksBasePath.clear();
}
-BlockStoreLocation
-BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment)
+void
+BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, WriteChunkCallback Callback)
{
ZEN_ASSERT(Data != nullptr);
ZEN_ASSERT(Size > 0u);
@@ -246,11 +246,17 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment)
uint64_t InsertOffset = m_CurrentInsertOffset;
m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment);
Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
+ m_ActiveWriteBlocks.push_back(WriteBlockIndex);
InsertLock.ReleaseNow();
WriteBlock->Write(Data, Size, InsertOffset);
- return {.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size};
+ Callback({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size});
+
+ {
+ RwLock::ExclusiveLockScope _(m_InsertLock);
+ m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex));
+ }
}
BlockStore::ReclaimSnapshotState
@@ -258,8 +264,11 @@ BlockStore::GetReclaimSnapshotState()
{
ReclaimSnapshotState State;
RwLock::ExclusiveLockScope _(m_InsertLock);
- State.ExcludeBlockIndex = m_WriteBlock ? m_WriteBlockIndex.load(std::memory_order_acquire) : 0xffffffffu;
- State.BlockCount = m_ChunkBlocks.size();
+ for (uint32_t BlockIndex : m_ActiveWriteBlocks)
+ {
+ State.m_ActiveWriteBlocks.insert(BlockIndex);
+ }
+ State.BlockCount = m_ChunkBlocks.size();
_.ReleaseNow();
return State;
}
@@ -357,7 +366,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
{
const BlockStoreLocation& Location = ChunkLocations[Index];
OldTotalSize += Location.Size;
- if (Location.BlockIndex == Snapshot.ExcludeBlockIndex)
+ if (Snapshot.m_ActiveWriteBlocks.contains(Location.BlockIndex))
{
continue;
}
@@ -1041,7 +1050,8 @@ TEST_CASE("blockstore.blockfile")
namespace {
BlockStoreLocation WriteStringAsChunk(BlockStore& Store, std::string_view String, size_t PayloadAlignment)
{
- BlockStoreLocation Location = Store.WriteChunk(String.data(), String.length(), PayloadAlignment);
+ BlockStoreLocation Location;
+ Store.WriteChunk(String.data(), String.length(), PayloadAlignment, [&](const BlockStoreLocation& L) { Location = L; });
CHECK(Location.Size == String.length());
return Location;
};
@@ -1254,7 +1264,7 @@ TEST_CASE("blockstore.reclaim.space")
for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
{
IoBuffer Chunk = CreateChunk(57 + ChunkIndex);
- ChunkLocations.push_back(Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment));
+ Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations.push_back(L); });
ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
}
@@ -1381,8 +1391,8 @@ TEST_CASE("blockstore.thread.read.write")
for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
{
WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() {
- IoBuffer& Chunk = Chunks[ChunkIndex];
- ChunkLocations[ChunkIndex] = Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment);
+ IoBuffer& Chunk = Chunks[ChunkIndex];
+ Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; });
WorkCompleted.fetch_add(1);
});
}
@@ -1415,8 +1425,10 @@ TEST_CASE("blockstore.thread.read.write")
for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
{
WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() {
- IoBuffer& Chunk = Chunks[ChunkIndex];
- SecondChunkLocations[ChunkIndex] = Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment);
+ IoBuffer& Chunk = Chunks[ChunkIndex];
+ Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) {
+ SecondChunkLocations[ChunkIndex] = L;
+ });
WorkCompleted.fetch_add(1);
});
WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {