diff options
| author | Dan Engelbrecht <[email protected]> | 2022-05-03 23:04:45 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-05-03 23:04:45 +0200 |
| commit | a19eee841d7ce0c9c868dced40a6380f55cdb9bd (patch) | |
| tree | 7dc1a81d9ba159588e845c94c10eb7e391cfed9b | |
| parent | unused variable in test fix (diff) | |
| download | zen-a19eee841d7ce0c9c868dced40a6380f55cdb9bd.tar.xz zen-a19eee841d7ce0c9c868dced40a6380f55cdb9bd.zip | |
handle that more than one block can be written to in parallel
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 39 | ||||
| -rw-r--r-- | zenstore/blockstore.cpp | 36 | ||||
| -rw-r--r-- | zenstore/compactcas.cpp | 29 | ||||
| -rw-r--r-- | zenstore/include/zenstore/blockstore.h | 16 |
4 files changed, 65 insertions, 55 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 3ba75cd9c..2869191fd 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1832,25 +1832,26 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const EntryFlags |= DiskLocation::kCompressed; } - BlockStoreLocation BlockStoreLocation = m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment); - DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); - const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; - m_SlogFile.Append(DiskIndexEntry); - m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order::relaxed); - RwLock::ExclusiveLockScope __(m_IndexLock); - if (auto It = m_Index.find(HashKey); It != m_Index.end()) - { - // TODO: should check if write is idempotent and bail out if it is? - // this would requiring comparing contents on disk unless we add a - // content hash to the index entry - IndexEntry& Entry = It.value(); - Entry.Location = Location; - Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); - } - else - { - m_Index.insert({HashKey, {Location, GcClock::TickCount()}}); - } + m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](BlockStoreLocation BlockStoreLocation) { + DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); + const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; + m_SlogFile.Append(DiskIndexEntry); + m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order::relaxed); + RwLock::ExclusiveLockScope __(m_IndexLock); + if (auto It = m_Index.find(HashKey); It != m_Index.end()) + { + // TODO: should check if write is idempotent and bail out if it is? + // this would requiring comparing contents on disk unless we add a + // content hash to the index entry + IndexEntry& Entry = It.value(); + Entry.Location = Location; + Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); + } + else + { + m_Index.insert({HashKey, {Location, GcClock::TickCount()}}); + } + }); } ////////////////////////////////////////////////////////////////////////// diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp index cb22551b9..54a8eb9df 100644 --- a/zenstore/blockstore.cpp +++ b/zenstore/blockstore.cpp @@ -207,8 +207,8 @@ BlockStore::Close() m_BlocksBasePath.clear(); } -BlockStoreLocation -BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment) +void +BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, WriteChunkCallback Callback) { ZEN_ASSERT(Data != nullptr); ZEN_ASSERT(Size > 0u); @@ -246,11 +246,17 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment) uint64_t InsertOffset = m_CurrentInsertOffset; m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment); Ref<BlockStoreFile> WriteBlock = m_WriteBlock; + m_ActiveWriteBlocks.push_back(WriteBlockIndex); InsertLock.ReleaseNow(); WriteBlock->Write(Data, Size, InsertOffset); - return {.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size}; + Callback({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size}); + + { + RwLock::ExclusiveLockScope _(m_InsertLock); + m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex)); + } } BlockStore::ReclaimSnapshotState @@ -258,8 +264,11 @@ BlockStore::GetReclaimSnapshotState() { ReclaimSnapshotState State; RwLock::ExclusiveLockScope _(m_InsertLock); - State.ExcludeBlockIndex = m_WriteBlock ? m_WriteBlockIndex.load(std::memory_order_acquire) : 0xffffffffu; - State.BlockCount = m_ChunkBlocks.size(); + for (uint32_t BlockIndex : m_ActiveWriteBlocks) + { + State.m_ActiveWriteBlocks.insert(BlockIndex); + } + State.BlockCount = m_ChunkBlocks.size(); _.ReleaseNow(); return State; } @@ -357,7 +366,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, { const BlockStoreLocation& Location = ChunkLocations[Index]; OldTotalSize += Location.Size; - if (Location.BlockIndex == Snapshot.ExcludeBlockIndex) + if (Snapshot.m_ActiveWriteBlocks.contains(Location.BlockIndex)) { continue; } @@ -1041,7 +1050,8 @@ TEST_CASE("blockstore.blockfile") namespace { BlockStoreLocation WriteStringAsChunk(BlockStore& Store, std::string_view String, size_t PayloadAlignment) { - BlockStoreLocation Location = Store.WriteChunk(String.data(), String.length(), PayloadAlignment); + BlockStoreLocation Location; + Store.WriteChunk(String.data(), String.length(), PayloadAlignment, [&](const BlockStoreLocation& L) { Location = L; }); CHECK(Location.Size == String.length()); return Location; }; @@ -1254,7 +1264,7 @@ TEST_CASE("blockstore.reclaim.space") for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { IoBuffer Chunk = CreateChunk(57 + ChunkIndex); - ChunkLocations.push_back(Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment)); + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations.push_back(L); }); ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } @@ -1381,8 +1391,8 @@ TEST_CASE("blockstore.thread.read.write") for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() { - IoBuffer& Chunk = Chunks[ChunkIndex]; - ChunkLocations[ChunkIndex] = Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment); + IoBuffer& Chunk = Chunks[ChunkIndex]; + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; }); WorkCompleted.fetch_add(1); }); } @@ -1415,8 +1425,10 @@ TEST_CASE("blockstore.thread.read.write") for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) { WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() { - IoBuffer& Chunk = Chunks[ChunkIndex]; - SecondChunkLocations[ChunkIndex] = Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment); + IoBuffer& Chunk = Chunks[ChunkIndex]; + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { + SecondChunkLocations[ChunkIndex] = L; + }); WorkCompleted.fetch_add(1); }); WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 7cc742beb..cc0e2241c 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -250,15 +250,16 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const // This should be a rare occasion and the current flow reduces the time we block for // reads, insert and GC. - BlockStoreLocation Location = m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment); - BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); - const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; - m_CasLog.Append(IndexEntry); - { - RwLock::ExclusiveLockScope _(m_LocationMapLock); - m_LocationMap.emplace(ChunkHash, DiskLocation); - } - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, [&](const BlockStoreLocation& Location) { + BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); + const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; + m_CasLog.Append(IndexEntry); + { + RwLock::ExclusiveLockScope _(m_LocationMapLock); + m_LocationMap.emplace(ChunkHash, DiskLocation); + } + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + }); return CasStore::InsertResult{.New = true}; } @@ -1685,7 +1686,7 @@ TEST_CASE("compactcas.legacyconversion") } } -TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true)) +TEST_CASE("compactcas.threadedinsert") { // for (uint32_t i = 0; i < 100; ++i) { @@ -1887,13 +1888,7 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true)) { ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() { CHECK(Cas.HaveChunk(ChunkHash)); - if (ChunkHash != IoHash::HashBuffer(Cas.FindChunk(ChunkHash))) - { - IoBuffer Buffer = Cas.FindChunk(ChunkHash); - CHECK(Buffer); - IoHash BufferHash = IoHash::HashBuffer(Buffer); - CHECK(ChunkHash == BufferHash); - } + CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); WorkCompleted.fetch_add(1); }); } diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h index e330cc080..9edfc36e8 100644 --- a/zenstore/include/zenstore/blockstore.h +++ b/zenstore/include/zenstore/blockstore.h @@ -110,8 +110,8 @@ class BlockStore public: struct ReclaimSnapshotState { - size_t ExcludeBlockIndex; - size_t BlockCount; + std::unordered_set<uint32_t> m_ActiveWriteBlocks; + size_t BlockCount; }; typedef std::vector<std::pair<size_t, BlockStoreLocation>> MovedChunksArray; @@ -123,6 +123,7 @@ public: typedef std::function<void(size_t ChunkIndex, Ref<BlockStoreFile> BlockFile, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback; typedef std::function<void(const MovedChunksArray& MovedChunks)> SplitCallback; + typedef std::function<void(const BlockStoreLocation& Location)> WriteChunkCallback; void Initialize(const std::filesystem::path& BlocksBasePath, uint64_t MaxBlockSize, @@ -130,7 +131,7 @@ public: const std::vector<BlockStoreLocation>& KnownLocations); void Close(); - BlockStoreLocation WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment); + void WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, WriteChunkCallback Callback); Ref<BlockStoreFile> GetChunkBlock(const BlockStoreLocation& Location); void Flush(); @@ -164,10 +165,11 @@ public: private: std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks; - RwLock m_InsertLock; // used to serialize inserts - Ref<BlockStoreFile> m_WriteBlock; - std::uint64_t m_CurrentInsertOffset = 0; - std::atomic_uint32_t m_WriteBlockIndex{}; + RwLock m_InsertLock; // used to serialize inserts + Ref<BlockStoreFile> m_WriteBlock; + std::uint64_t m_CurrentInsertOffset = 0; + std::atomic_uint32_t m_WriteBlockIndex{}; + std::vector<uint32_t> m_ActiveWriteBlocks; uint64_t m_MaxBlockSize = 1u << 28; uint64_t m_MaxBlockCount = BlockStoreDiskLocation::MaxBlockIndex + 1; |