aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-05-03 12:15:35 +0200
committerDan Engelbrecht <[email protected]>2022-05-03 12:15:59 +0200
commit5dddf5f993dff479fbc429d10cbcc93601af90c9 (patch)
treefcb3329c35491acf793fdd4c23fa2992077b3675
parentmore tests for block store (diff)
downloadzen-5dddf5f993dff479fbc429d10cbcc93601af90c9.tar.xz
zen-5dddf5f993dff479fbc429d10cbcc93601af90c9.zip
threading test for blockstore
-rw-r--r--zenstore/blockstore.cpp92
1 file changed, 91 insertions(+), 1 deletion(-)
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp
index f469e3746..0992662c2 100644
--- a/zenstore/blockstore.cpp
+++ b/zenstore/blockstore.cpp
@@ -11,6 +11,7 @@
# include <zencore/compactbinarybuilder.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
# include <algorithm>
# include <random>
#endif
@@ -209,6 +210,11 @@ BlockStore::Close()
BlockStoreLocation
BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment)
{
+ ZEN_ASSERT(Data != nullptr);
+ ZEN_ASSERT(Size > 0u);
+ ZEN_ASSERT(Size <= m_MaxBlockSize);
+ ZEN_ASSERT(Alignment > 0u);
+
RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
@@ -1185,7 +1191,7 @@ TEST_CASE("blockstore.iterate.chunks")
auto RootDirectory = TempDir.Path();
BlockStore Store;
- Store.Initialize(RootDirectory / "store", 128, 1024, {});
+ Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024, {});
Ref<BlockStoreFile> BadChunk = Store.GetChunkBlock({.BlockIndex = 0, .Offset = 0, .Size = 512});
CHECK(!BadChunk);
@@ -1194,6 +1200,7 @@ TEST_CASE("blockstore.iterate.chunks")
std::string SecondChunkData = "This is the data for the second chunk that we will write";
BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);
+ Store.Flush();
std::string VeryLargeChunk(ScrubSmallChunkWindowSize * 2, 'L');
BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4);
@@ -1345,6 +1352,89 @@ TEST_CASE("blockstore.reclaim.space")
CHECK(DeletedChunkCount == ChunkCount - DeleteChunkCount);
}
+TEST_CASE("blockstore.thread.read.write")
+{
+ ScopedTemporaryDirectory TempDir;
+ auto RootDirectory = TempDir.Path();
+
+ BlockStore Store;
+ Store.Initialize(RootDirectory / "store", 1088, 1024, {});
+
+ constexpr size_t ChunkCount = 1000;
+ constexpr size_t Alignment = 8;
+ std::vector<IoBuffer> Chunks;
+ std::vector<IoHash> ChunkHashes;
+ Chunks.reserve(ChunkCount);
+ ChunkHashes.reserve(ChunkCount);
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+ {
+ IoBuffer Chunk = CreateChunk(57 + ChunkIndex / 2);
+ Chunks.push_back(Chunk);
+ ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+ }
+
+ std::vector<BlockStoreLocation> ChunkLocations;
+ ChunkLocations.resize(ChunkCount);
+
+ WorkerThreadPool WorkerPool(8);
+ std::atomic<size_t> WorkCompleted = 0;
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+ {
+ WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() {
+ IoBuffer& Chunk = Chunks[ChunkIndex];
+ ChunkLocations[ChunkIndex] = Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment);
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < Chunks.size())
+ {
+ Sleep(1);
+ }
+
+ WorkCompleted = 0;
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+ {
+ WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
+ Ref<BlockStoreFile> ChunkBlock = Store.GetChunkBlock(ChunkLocations[ChunkIndex]);
+ CHECK(ChunkBlock);
+ IoBuffer VerifyChunk = ChunkBlock->GetChunk(ChunkLocations[ChunkIndex].Offset, ChunkLocations[ChunkIndex].Size);
+ CHECK(VerifyChunk);
+ IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
+ CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < Chunks.size())
+ {
+ Sleep(1);
+ }
+
+ std::vector<BlockStoreLocation> SecondChunkLocations;
+ SecondChunkLocations.resize(ChunkCount);
+ WorkCompleted = 0;
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+ {
+ WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() {
+ IoBuffer& Chunk = Chunks[ChunkIndex];
+ SecondChunkLocations[ChunkIndex] = Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment);
+ WorkCompleted.fetch_add(1);
+ });
+ WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
+ Ref<BlockStoreFile> ChunkBlock = Store.GetChunkBlock(ChunkLocations[ChunkIndex]);
+ CHECK(ChunkBlock);
+ IoBuffer VerifyChunk = ChunkBlock->GetChunk(ChunkLocations[ChunkIndex].Offset, ChunkLocations[ChunkIndex].Size);
+ CHECK(VerifyChunk);
+ IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
+ CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
+ WorkCompleted.fetch_add(1);
+ });
+ }
+ while (WorkCompleted < Chunks.size() * 2)
+ {
+ Sleep(1);
+ }
+}
+
#endif
void