diff options
| author | Dan Engelbrecht <[email protected]> | 2022-05-03 12:15:35 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-05-03 12:15:59 +0200 |
| commit | 5dddf5f993dff479fbc429d10cbcc93601af90c9 (patch) | |
| tree | fcb3329c35491acf793fdd4c23fa2992077b3675 /zenstore/blockstore.cpp | |
| parent | more tests for block store (diff) | |
| download | zen-5dddf5f993dff479fbc429d10cbcc93601af90c9.tar.xz zen-5dddf5f993dff479fbc429d10cbcc93601af90c9.zip | |
threading test for blockstore
Diffstat (limited to 'zenstore/blockstore.cpp')
| -rw-r--r-- | zenstore/blockstore.cpp | 92 |
1 files changed, 91 insertions, 1 deletions
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp index f469e3746..0992662c2 100644 --- a/zenstore/blockstore.cpp +++ b/zenstore/blockstore.cpp @@ -11,6 +11,7 @@ # include <zencore/compactbinarybuilder.h> # include <zencore/testing.h> # include <zencore/testutils.h> +# include <zencore/workthreadpool.h> # include <algorithm> # include <random> #endif @@ -209,6 +210,11 @@ BlockStore::Close() BlockStoreLocation BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment) { + ZEN_ASSERT(Data != nullptr); + ZEN_ASSERT(Size > 0u); + ZEN_ASSERT(Size <= m_MaxBlockSize); + ZEN_ASSERT(Alignment > 0u); + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); @@ -1185,7 +1191,7 @@ TEST_CASE("blockstore.iterate.chunks") auto RootDirectory = TempDir.Path(); BlockStore Store; - Store.Initialize(RootDirectory / "store", 128, 1024, {}); + Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024, {}); Ref<BlockStoreFile> BadChunk = Store.GetChunkBlock({.BlockIndex = 0, .Offset = 0, .Size = 512}); CHECK(!BadChunk); @@ -1194,6 +1200,7 @@ TEST_CASE("blockstore.iterate.chunks") std::string SecondChunkData = "This is the data for the second chunk that we will write"; BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); + Store.Flush(); std::string VeryLargeChunk(ScrubSmallChunkWindowSize * 2, 'L'); BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4); @@ -1345,6 +1352,89 @@ TEST_CASE("blockstore.reclaim.space") CHECK(DeletedChunkCount == ChunkCount - DeleteChunkCount); } +TEST_CASE("blockstore.thread.read.write") +{ + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", 1088, 1024, {}); + + constexpr size_t ChunkCount = 1000; + constexpr size_t Alignment = 8; + std::vector<IoBuffer> Chunks; + std::vector<IoHash> ChunkHashes; + Chunks.reserve(ChunkCount); + ChunkHashes.reserve(ChunkCount); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + IoBuffer Chunk = CreateChunk(57 + ChunkIndex / 2); + Chunks.push_back(Chunk); + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + std::vector<BlockStoreLocation> ChunkLocations; + ChunkLocations.resize(ChunkCount); + + WorkerThreadPool WorkerPool(8); + std::atomic<size_t> WorkCompleted = 0; + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() { + IoBuffer& Chunk = Chunks[ChunkIndex]; + ChunkLocations[ChunkIndex] = Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + + WorkCompleted = 0; + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { + Ref<BlockStoreFile> ChunkBlock = Store.GetChunkBlock(ChunkLocations[ChunkIndex]); + CHECK(ChunkBlock); + IoBuffer VerifyChunk = ChunkBlock->GetChunk(ChunkLocations[ChunkIndex].Offset, ChunkLocations[ChunkIndex].Size); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + + std::vector<BlockStoreLocation> SecondChunkLocations; + SecondChunkLocations.resize(ChunkCount); + WorkCompleted = 0; + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() { + IoBuffer& Chunk = Chunks[ChunkIndex]; + SecondChunkLocations[ChunkIndex] = Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment); + WorkCompleted.fetch_add(1); + }); + WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { + Ref<BlockStoreFile> ChunkBlock = Store.GetChunkBlock(ChunkLocations[ChunkIndex]); + CHECK(ChunkBlock); + IoBuffer VerifyChunk = ChunkBlock->GetChunk(ChunkLocations[ChunkIndex].Offset, ChunkLocations[ChunkIndex].Size); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size() * 2) + { + Sleep(1); + } +} + #endif void |