aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-05-03 23:04:45 +0200
committerDan Engelbrecht <[email protected]>2022-05-03 23:04:45 +0200
commita19eee841d7ce0c9c868dced40a6380f55cdb9bd (patch)
tree7dc1a81d9ba159588e845c94c10eb7e391cfed9b
parentunused variable in test fix (diff)
downloadzen-a19eee841d7ce0c9c868dced40a6380f55cdb9bd.tar.xz
zen-a19eee841d7ce0c9c868dced40a6380f55cdb9bd.zip
handle that more than one block can be written to in parallel
-rw-r--r--zenserver/cache/structuredcachestore.cpp39
-rw-r--r--zenstore/blockstore.cpp36
-rw-r--r--zenstore/compactcas.cpp29
-rw-r--r--zenstore/include/zenstore/blockstore.h16
4 files changed, 65 insertions, 55 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 3ba75cd9c..2869191fd 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -1832,25 +1832,26 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const
EntryFlags |= DiskLocation::kCompressed;
}
- BlockStoreLocation BlockStoreLocation = m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment);
- DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags);
- const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location};
- m_SlogFile.Append(DiskIndexEntry);
- m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order::relaxed);
- RwLock::ExclusiveLockScope __(m_IndexLock);
- if (auto It = m_Index.find(HashKey); It != m_Index.end())
- {
- // TODO: should check if write is idempotent and bail out if it is?
- // this would require comparing contents on disk unless we add a
- // content hash to the index entry
- IndexEntry& Entry = It.value();
- Entry.Location = Location;
- Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed);
- }
- else
- {
- m_Index.insert({HashKey, {Location, GcClock::TickCount()}});
- }
+ m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](BlockStoreLocation BlockStoreLocation) {
+ DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags);
+ const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location};
+ m_SlogFile.Append(DiskIndexEntry);
+ m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order::relaxed);
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ if (auto It = m_Index.find(HashKey); It != m_Index.end())
+ {
+ // TODO: should check if write is idempotent and bail out if it is?
+ // this would require comparing contents on disk unless we add a
+ // content hash to the index entry
+ IndexEntry& Entry = It.value();
+ Entry.Location = Location;
+ Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed);
+ }
+ else
+ {
+ m_Index.insert({HashKey, {Location, GcClock::TickCount()}});
+ }
+ });
}
//////////////////////////////////////////////////////////////////////////
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp
index cb22551b9..54a8eb9df 100644
--- a/zenstore/blockstore.cpp
+++ b/zenstore/blockstore.cpp
@@ -207,8 +207,8 @@ BlockStore::Close()
m_BlocksBasePath.clear();
}
-BlockStoreLocation
-BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment)
+void
+BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, WriteChunkCallback Callback)
{
ZEN_ASSERT(Data != nullptr);
ZEN_ASSERT(Size > 0u);
@@ -246,11 +246,17 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment)
uint64_t InsertOffset = m_CurrentInsertOffset;
m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment);
Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
+ m_ActiveWriteBlocks.push_back(WriteBlockIndex);
InsertLock.ReleaseNow();
WriteBlock->Write(Data, Size, InsertOffset);
- return {.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size};
+ Callback({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size});
+
+ {
+ RwLock::ExclusiveLockScope _(m_InsertLock);
+ m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex));
+ }
}
BlockStore::ReclaimSnapshotState
@@ -258,8 +264,11 @@ BlockStore::GetReclaimSnapshotState()
{
ReclaimSnapshotState State;
RwLock::ExclusiveLockScope _(m_InsertLock);
- State.ExcludeBlockIndex = m_WriteBlock ? m_WriteBlockIndex.load(std::memory_order_acquire) : 0xffffffffu;
- State.BlockCount = m_ChunkBlocks.size();
+ for (uint32_t BlockIndex : m_ActiveWriteBlocks)
+ {
+ State.m_ActiveWriteBlocks.insert(BlockIndex);
+ }
+ State.BlockCount = m_ChunkBlocks.size();
_.ReleaseNow();
return State;
}
@@ -357,7 +366,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
{
const BlockStoreLocation& Location = ChunkLocations[Index];
OldTotalSize += Location.Size;
- if (Location.BlockIndex == Snapshot.ExcludeBlockIndex)
+ if (Snapshot.m_ActiveWriteBlocks.contains(Location.BlockIndex))
{
continue;
}
@@ -1041,7 +1050,8 @@ TEST_CASE("blockstore.blockfile")
namespace {
BlockStoreLocation WriteStringAsChunk(BlockStore& Store, std::string_view String, size_t PayloadAlignment)
{
- BlockStoreLocation Location = Store.WriteChunk(String.data(), String.length(), PayloadAlignment);
+ BlockStoreLocation Location;
+ Store.WriteChunk(String.data(), String.length(), PayloadAlignment, [&](const BlockStoreLocation& L) { Location = L; });
CHECK(Location.Size == String.length());
return Location;
};
@@ -1254,7 +1264,7 @@ TEST_CASE("blockstore.reclaim.space")
for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
{
IoBuffer Chunk = CreateChunk(57 + ChunkIndex);
- ChunkLocations.push_back(Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment));
+ Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations.push_back(L); });
ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
}
@@ -1381,8 +1391,8 @@ TEST_CASE("blockstore.thread.read.write")
for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
{
WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() {
- IoBuffer& Chunk = Chunks[ChunkIndex];
- ChunkLocations[ChunkIndex] = Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment);
+ IoBuffer& Chunk = Chunks[ChunkIndex];
+ Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; });
WorkCompleted.fetch_add(1);
});
}
@@ -1415,8 +1425,10 @@ TEST_CASE("blockstore.thread.read.write")
for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
{
WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() {
- IoBuffer& Chunk = Chunks[ChunkIndex];
- SecondChunkLocations[ChunkIndex] = Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment);
+ IoBuffer& Chunk = Chunks[ChunkIndex];
+ Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) {
+ SecondChunkLocations[ChunkIndex] = L;
+ });
WorkCompleted.fetch_add(1);
});
WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 7cc742beb..cc0e2241c 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -250,15 +250,16 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
// This should be a rare occasion and the current flow reduces the time we block for
// reads, insert and GC.
- BlockStoreLocation Location = m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment);
- BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment);
- const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation};
- m_CasLog.Append(IndexEntry);
- {
- RwLock::ExclusiveLockScope _(m_LocationMapLock);
- m_LocationMap.emplace(ChunkHash, DiskLocation);
- }
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
+ m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, [&](const BlockStoreLocation& Location) {
+ BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment);
+ const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation};
+ m_CasLog.Append(IndexEntry);
+ {
+ RwLock::ExclusiveLockScope _(m_LocationMapLock);
+ m_LocationMap.emplace(ChunkHash, DiskLocation);
+ }
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
+ });
return CasStore::InsertResult{.New = true};
}
@@ -1685,7 +1686,7 @@ TEST_CASE("compactcas.legacyconversion")
}
}
-TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
+TEST_CASE("compactcas.threadedinsert")
{
// for (uint32_t i = 0; i < 100; ++i)
{
@@ -1887,13 +1888,7 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
{
ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() {
CHECK(Cas.HaveChunk(ChunkHash));
- if (ChunkHash != IoHash::HashBuffer(Cas.FindChunk(ChunkHash)))
- {
- IoBuffer Buffer = Cas.FindChunk(ChunkHash);
- CHECK(Buffer);
- IoHash BufferHash = IoHash::HashBuffer(Buffer);
- CHECK(ChunkHash == BufferHash);
- }
+ CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
WorkCompleted.fetch_add(1);
});
}
diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h
index e330cc080..9edfc36e8 100644
--- a/zenstore/include/zenstore/blockstore.h
+++ b/zenstore/include/zenstore/blockstore.h
@@ -110,8 +110,8 @@ class BlockStore
public:
struct ReclaimSnapshotState
{
- size_t ExcludeBlockIndex;
- size_t BlockCount;
+ std::unordered_set<uint32_t> m_ActiveWriteBlocks;
+ size_t BlockCount;
};
typedef std::vector<std::pair<size_t, BlockStoreLocation>> MovedChunksArray;
@@ -123,6 +123,7 @@ public:
typedef std::function<void(size_t ChunkIndex, Ref<BlockStoreFile> BlockFile, uint64_t Offset, uint64_t Size)>
IterateChunksLargeSizeCallback;
typedef std::function<void(const MovedChunksArray& MovedChunks)> SplitCallback;
+ typedef std::function<void(const BlockStoreLocation& Location)> WriteChunkCallback;
void Initialize(const std::filesystem::path& BlocksBasePath,
uint64_t MaxBlockSize,
@@ -130,7 +131,7 @@ public:
const std::vector<BlockStoreLocation>& KnownLocations);
void Close();
- BlockStoreLocation WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment);
+ void WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, WriteChunkCallback Callback);
Ref<BlockStoreFile> GetChunkBlock(const BlockStoreLocation& Location);
void Flush();
@@ -164,10 +165,11 @@ public:
private:
std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks;
- RwLock m_InsertLock; // used to serialize inserts
- Ref<BlockStoreFile> m_WriteBlock;
- std::uint64_t m_CurrentInsertOffset = 0;
- std::atomic_uint32_t m_WriteBlockIndex{};
+ RwLock m_InsertLock; // used to serialize inserts
+ Ref<BlockStoreFile> m_WriteBlock;
+ std::uint64_t m_CurrentInsertOffset = 0;
+ std::atomic_uint32_t m_WriteBlockIndex{};
+ std::vector<uint32_t> m_ActiveWriteBlocks;
uint64_t m_MaxBlockSize = 1u << 28;
uint64_t m_MaxBlockCount = BlockStoreDiskLocation::MaxBlockIndex + 1;