aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Dan Engelbrecht <[email protected]> 2022-05-01 10:17:35 +0200
committer Dan Engelbrecht <[email protected]> 2022-05-01 10:17:35 +0200
commit 7dc31ec99aa3fc2f40000258e45d5d6381403ff8 (patch)
tree 9c5a986c506128405fe63df3acfbdede4c2a2995
parent first pass at generic block store with gc (diff)
download zen-7dc31ec99aa3fc2f40000258e45d5d6381403ff8.tar.xz
zen-7dc31ec99aa3fc2f40000258e45d5d6381403ff8.zip
threading issues resolved
-rw-r--r-- zenstore/blockstore.cpp 113
-rw-r--r-- zenstore/compactcas.cpp 53
-rw-r--r-- zenstore/include/zenstore/blockstore.h 54
3 files changed, 120 insertions, 100 deletions
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp
index a897ed902..4cf3c6486 100644
--- a/zenstore/blockstore.cpp
+++ b/zenstore/blockstore.cpp
@@ -209,8 +209,8 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath,
}
}
-BlockStoreLocation
-BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment)
+void
+BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, WriteCompleteCallback Callback)
{
RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
@@ -243,24 +243,30 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment)
uint64_t InsertOffset = m_CurrentInsertOffset;
m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment);
Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
+ m_ActiveWriteBlockIndexes.push_back(WriteBlockIndex);
InsertLock.ReleaseNow();
- BlockStoreLocation Location{.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size};
WriteBlock->Write(Data, Size, InsertOffset);
- return Location;
+ Callback({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size});
+
+ RwLock::ExclusiveLockScope _(m_InsertLock);
+ m_ActiveWriteBlockIndexes.erase(std::find(m_ActiveWriteBlockIndexes.begin(), m_ActiveWriteBlockIndexes.end(), WriteBlockIndex));
}
-/*
-IoBuffer
-BlockStore::ReadChunk(const BlockStoreLocation& Location)
+BlockStore::ReclaimSnapshotState
+BlockStore::GetReclaimSnapshotState()
{
- RwLock::SharedLockScope InsertLock(m_InsertLock);
- Ref<BlockStoreFile> ChunkBlock = m_ChunkBlocks[Location.BlockIndex];
- InsertLock.ReleaseNow();
- return ChunkBlock->GetChunk(Location.Offset, Location.Size);
+ ReclaimSnapshotState State;
+ RwLock::ExclusiveLockScope _(m_InsertLock);
+ for (uint32_t BlockIndex : m_ActiveWriteBlockIndexes)
+ {
+ State.ExcludeBlockIndexes.insert(BlockIndex);
+ }
+ State.BlockCount = m_ChunkBlocks.size();
+ _.ReleaseNow();
+ return State;
}
-*/
Ref<BlockStoreFile>
BlockStore::GetChunkBlock(const BlockStoreLocation& Location)
@@ -283,9 +289,9 @@ BlockStore::Flush()
}
}
-// TODO: Almost there - some bug remain and API might need tweaking
void
-BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations,
+BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
+ const std::vector<BlockStoreLocation>& ChunkLocations,
const std::vector<size_t>& KeepChunkIndexes,
uint64_t PayloadAlignment,
bool DryRun,
@@ -336,41 +342,22 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations,
NiceBytes(OldTotalSize));
});
- size_t BlockCount = 0;
- uint64_t ExcludeBlockIndex = 0x800000000ull;
- {
- RwLock::ExclusiveLockScope __(m_InsertLock);
- if (m_WriteBlock)
- {
- ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
- }
- BlockCount = m_ChunkBlocks.size();
- }
-
- std::unordered_map<size_t, BlockStoreLocation> LocationLookup;
- LocationLookup.reserve(TotalChunkCount);
+ size_t BlockCount = Snapshot.BlockCount;
std::unordered_set<size_t> KeepChunkMap;
KeepChunkMap.reserve(KeepChunkIndexes.size());
for (size_t KeepChunkIndex : KeepChunkIndexes)
{
- const BlockStoreLocation& Location = ChunkLocations[KeepChunkIndex];
- if (Location.BlockIndex == ExcludeBlockIndex)
- {
- continue;
- }
KeepChunkMap.insert(KeepChunkIndex);
}
- std::unordered_set<size_t> DeleteChunkMap;
- DeleteChunkMap.reserve(ChunkLocations.size() - KeepChunkIndexes.size());
std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex;
- std::vector<std::vector<size_t>> KeepChunks;
- std::vector<std::vector<size_t>> DeleteChunks;
+ std::vector<std::vector<size_t>> BlockKeepChunks;
+ std::vector<std::vector<size_t>> BlockDeleteChunks;
BlockIndexToChunkMapIndex.reserve(BlockCount);
- KeepChunks.reserve(BlockCount);
- DeleteChunks.reserve(BlockCount);
+ BlockKeepChunks.reserve(BlockCount);
+ BlockDeleteChunks.reserve(BlockCount);
size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2;
size_t DeleteCount = 0;
@@ -378,8 +365,7 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations,
for (size_t Index = 0; Index < TotalChunkCount; ++Index)
{
const BlockStoreLocation& Location = ChunkLocations[Index];
- LocationLookup[Index] = Location;
- if (Location.BlockIndex == ExcludeBlockIndex)
+ if (Snapshot.ExcludeBlockIndexes.contains(Location.BlockIndex))
{
continue;
}
@@ -388,12 +374,12 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations,
size_t ChunkMapIndex = 0;
if (BlockIndexPtr == BlockIndexToChunkMapIndex.end())
{
- ChunkMapIndex = KeepChunks.size();
+ ChunkMapIndex = BlockKeepChunks.size();
BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex;
- KeepChunks.resize(ChunkMapIndex + 1);
- KeepChunks.back().reserve(GuesstimateCountPerBlock);
- DeleteChunks.resize(ChunkMapIndex + 1);
- DeleteChunks.back().reserve(GuesstimateCountPerBlock);
+ BlockKeepChunks.resize(ChunkMapIndex + 1);
+ BlockKeepChunks.back().reserve(GuesstimateCountPerBlock);
+ BlockDeleteChunks.resize(ChunkMapIndex + 1);
+ BlockDeleteChunks.back().reserve(GuesstimateCountPerBlock);
}
else
{
@@ -402,12 +388,12 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations,
if (KeepChunkMap.contains(Index))
{
- std::vector<size_t>& IndexMap = KeepChunks[ChunkMapIndex];
+ std::vector<size_t>& IndexMap = BlockKeepChunks[ChunkMapIndex];
IndexMap.push_back(Index);
NewTotalSize += Location.Size;
continue;
}
- std::vector<size_t>& IndexMap = DeleteChunks[ChunkMapIndex];
+ std::vector<size_t>& IndexMap = BlockDeleteChunks[ChunkMapIndex];
IndexMap.push_back(Index);
DeleteCount++;
}
@@ -418,7 +404,7 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations,
{
uint32_t BlockIndex = Entry.first;
size_t ChunkMapIndex = Entry.second;
- const std::vector<size_t>& ChunkMap = DeleteChunks[ChunkMapIndex];
+ const std::vector<size_t>& ChunkMap = BlockDeleteChunks[ChunkMapIndex];
if (ChunkMap.empty())
{
continue;
@@ -438,9 +424,6 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations,
return;
}
- std::unordered_map<size_t, BlockStoreLocation> MovedChunks;
- std::vector<size_t> RemovedChunks;
-
Ref<BlockStoreFile> NewBlockFile;
uint64_t WriteOffset = 0;
uint32_t NewBlockIndex = 0;
@@ -456,19 +439,16 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations,
ZEN_ASSERT(OldBlockFile);
}
- const std::vector<size_t>& KeepMap = KeepChunks[ChunkMapIndex];
+ const std::vector<size_t>& KeepMap = BlockKeepChunks[ChunkMapIndex];
if (KeepMap.empty())
{
- const std::vector<size_t>& DeleteMap = DeleteChunks[ChunkMapIndex];
+ const std::vector<size_t>& DeleteMap = BlockDeleteChunks[ChunkMapIndex];
for (size_t DeleteIndex : DeleteMap)
{
- RemovedChunks.push_back(DeleteIndex);
DeletedSize += ChunkLocations[DeleteIndex].Size;
- DeletedCount++;
}
- Callback(MovedChunks, RemovedChunks);
- MovedChunks.clear();
- RemovedChunks.clear();
+ Callback(BlockIndex, {}, DeleteMap);
+ DeletedCount += DeleteMap.size();
{
RwLock::ExclusiveLockScope _i(m_InsertLock);
Stopwatch Timer;
@@ -489,7 +469,8 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations,
continue;
}
- std::vector<uint8_t> Chunk;
+ std::unordered_map<size_t, BlockStoreLocation> MovedChunks;
+ std::vector<uint8_t> Chunk;
for (const size_t& ChunkIndex : KeepMap)
{
const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex];
@@ -506,9 +487,9 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations,
NewBlockFile->Flush();
}
{
- Callback(MovedChunks, RemovedChunks);
+ Callback(0xfffffffful, MovedChunks, {});
+ MovedCount += KeepMap.size();
MovedChunks.clear();
- RemovedChunks.clear();
RwLock::ExclusiveLockScope __(m_InsertLock);
Stopwatch Timer;
const auto ___ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
@@ -572,7 +553,6 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations,
NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
MovedChunks[ChunkIndex] = {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()};
WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment);
- MovedCount++;
}
Chunk.clear();
if (NewBlockFile)
@@ -582,17 +562,16 @@ BlockStore::ReclaimSpace(const std::vector<BlockStoreLocation>& ChunkLocations,
NewBlockFile = {};
}
- const std::vector<size_t>& DeleteMap = DeleteChunks[ChunkMapIndex];
+ const std::vector<size_t>& DeleteMap = BlockDeleteChunks[ChunkMapIndex];
for (size_t DeleteIndex : DeleteMap)
{
- RemovedChunks.push_back(DeleteIndex);
DeletedSize += ChunkLocations[DeleteIndex].Size;
- DeletedCount++;
}
- Callback(MovedChunks, RemovedChunks);
+ Callback(BlockIndex, MovedChunks, DeleteMap);
+ MovedCount += KeepMap.size();
+ DeletedCount += DeleteMap.size();
MovedChunks.clear();
- RemovedChunks.clear();
{
RwLock::ExclusiveLockScope __(m_InsertLock);
Stopwatch Timer;
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 2b48eb143..84019d7aa 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -283,15 +283,16 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
// This should be a rare occasion and the current flow reduces the time we block for
// reads, insert and GC.
- BlockStoreLocation Location = m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment);
- BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment);
- const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation};
- m_CasLog.Append(IndexEntry);
- {
- RwLock::ExclusiveLockScope _(m_LocationMapLock);
- m_LocationMap.emplace(ChunkHash, DiskLocation);
- }
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_seq_cst);
+ m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, [this, &ChunkHash, ChunkSize](const BlockStoreLocation& Location) {
+ BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment);
+ const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation};
+ m_CasLog.Append(IndexEntry);
+ {
+ RwLock::ExclusiveLockScope _(m_LocationMapLock);
+ m_LocationMap.emplace(ChunkHash, DiskLocation);
+ }
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_seq_cst);
+ });
return CasStore::InsertResult{.New = true};
}
@@ -311,10 +312,15 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
return IoBuffer();
}
- BlockStoreLocation Location = KeyIt->second.Get(m_PayloadAlignment);
- Ref<BlockStoreFile> ChunkBlock = m_BlockStore.GetChunkBlock(Location); // m_ChunkBlocks[Location.BlockIndex];
+ BlockStoreLocation Location = KeyIt->second.Get(m_PayloadAlignment);
_.ReleaseNow();
+ Ref<BlockStoreFile> ChunkBlock = m_BlockStore.GetChunkBlock(Location); // m_ChunkBlocks[Location.BlockIndex];
+ if (!ChunkBlock)
+ {
+ return IoBuffer();
+ }
+
return ChunkBlock->GetChunk(Location.Offset, Location.Size);
}
@@ -476,7 +482,8 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
uint64_t ReadBlockTimeUs = 0;
uint64_t ReadBlockLongestTimeUs = 0;
- LocationMap_t LocationMap;
+ LocationMap_t LocationMap;
+ BlockStore::ReclaimSnapshotState BlockStoreState;
{
RwLock::SharedLockScope ___(m_LocationMapLock);
Stopwatch Timer;
@@ -486,6 +493,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
});
LocationMap = m_LocationMap;
+ BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
}
uint64_t TotalChunkCount = LocationMap.size();
@@ -521,18 +529,23 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
if (!PerformDelete)
{
- m_BlockStore.ReclaimSpace(ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
+ m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
return;
}
+
+ auto GetChunkLocations = [] {};
+
std::vector<IoHash> DeletedChunks;
m_BlockStore.ReclaimSpace(
+ BlockStoreState,
ChunkLocations,
KeepChunkIndexes,
m_PayloadAlignment,
false,
[this, &DeletedChunks, &ChunkIndexToChunkHash, &LocationMap, &ReadBlockTimeUs, &ReadBlockLongestTimeUs](
+ uint32_t BlockIndex,
const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks,
- const std::vector<size_t> RemovedChunks) {
+ const std::vector<size_t>& RemovedChunks) {
std::vector<CasDiskIndexEntry> LogEntries;
LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
for (const auto& Entry : MovedChunks)
@@ -572,6 +585,10 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
}
m_LocationMap[Entry.Key] = Entry.Location;
}
+ for (const auto& Entry : m_LocationMap)
+ {
+ ZEN_ASSERT(Entry.second.GetBlockIndex() != BlockIndex);
+ }
}
});
@@ -2093,7 +2110,13 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
{
ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() {
CHECK(Cas.HaveChunk(ChunkHash));
- CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
+ if (ChunkHash != IoHash::HashBuffer(Cas.FindChunk(ChunkHash)))
+ {
+ IoBuffer Buffer = Cas.FindChunk(ChunkHash);
+ CHECK(Buffer);
+ IoHash BufferHash = IoHash::HashBuffer(Buffer);
+ CHECK(ChunkHash == BufferHash);
+ }
WorkCompleted.fetch_add(1);
});
}
diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h
index 4dd6e5289..084142636 100644
--- a/zenstore/include/zenstore/blockstore.h
+++ b/zenstore/include/zenstore/blockstore.h
@@ -6,6 +6,8 @@
#include <zencore/zencore.h>
#include <zenstore/basicfile.h>
+#include <unordered_set>
+
namespace zen {
//////////////////////////////////////////////////////////////////////////
@@ -108,31 +110,47 @@ private:
class BlockStore
{
public:
- void Initialize(const std::filesystem::path& BlocksBasePath,
- uint64_t MaxBlockSize,
- uint64_t MaxBlockCount,
- const std::vector<BlockStoreLocation>& KnownLocations);
- BlockStoreLocation WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment);
+ struct ReclaimSnapshotState
+ {
+ std::unordered_set<uint32_t> ExcludeBlockIndexes;
+ size_t BlockCount;
+ };
+ typedef std::function<void(uint32_t BlockIndex,
+ const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks,
+ const std::vector<size_t>& RemovedChunks)>
+ ReclaimCallback;
+ typedef std::function<void(const BlockStoreLocation& Location)> WriteCompleteCallback;
+
+ void Initialize(const std::filesystem::path& BlocksBasePath,
+ uint64_t MaxBlockSize,
+ uint64_t MaxBlockCount,
+ const std::vector<BlockStoreLocation>& KnownLocations);
+ void WriteChunk(
+ const void* Data,
+ uint64_t Size,
+ uint64_t Alignment,
+ WriteCompleteCallback Callback = [](const BlockStoreLocation&) {});
Ref<BlockStoreFile> GetChunkBlock(const BlockStoreLocation& Location);
void Flush();
- typedef std::function<void(const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks, const std::vector<size_t> RemovedChunks)>
- ReclaimCallback;
-
- void ReclaimSpace(
- const std::vector<BlockStoreLocation>& ChunkLocations,
- const std::vector<size_t>& KeepChunkIndexes,
- uint64_t PayloadAlignment,
- bool DryRun,
- const ReclaimCallback& Callback = [](const std::unordered_map<size_t, BlockStoreLocation>&, const std::vector<size_t>&) {});
+ ReclaimSnapshotState GetReclaimSnapshotState();
+ void ReclaimSpace(
+ const ReclaimSnapshotState& Snapshot,
+ const std::vector<BlockStoreLocation>& ChunkLocations,
+ const std::vector<size_t>& KeepChunkIndexes,
+ uint64_t PayloadAlignment,
+ bool DryRun,
+ const ReclaimCallback& Callback = [](uint32_t, const std::unordered_map<size_t, BlockStoreLocation>&, const std::vector<size_t>&) {
+ });
private:
std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks;
- RwLock m_InsertLock; // used to serialize inserts
- Ref<BlockStoreFile> m_WriteBlock;
- std::uint64_t m_CurrentInsertOffset = 0;
- std::atomic_uint32_t m_WriteBlockIndex{};
+ RwLock m_InsertLock; // used to serialize inserts
+ Ref<BlockStoreFile> m_WriteBlock;
+ std::uint64_t m_CurrentInsertOffset = 0;
+ std::atomic_uint32_t m_WriteBlockIndex{};
+ std::vector<uint32_t> m_ActiveWriteBlockIndexes;
uint64_t m_MaxBlockSize = 1u << 28;
uint64_t m_MaxBlockCount = BlockStoreDiskLocation::MaxBlockIndex + 1;