aboutsummaryrefslogtreecommitdiff
path: root/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-05-01 10:17:35 +0200
committerDan Engelbrecht <[email protected]>2022-05-01 10:17:35 +0200
commit7dc31ec99aa3fc2f40000258e45d5d6381403ff8 (patch)
tree9c5a986c506128405fe63df3acfbdede4c2a2995 /zenstore/compactcas.cpp
parentfirst pass at generic block store with gc (diff)
downloadzen-7dc31ec99aa3fc2f40000258e45d5d6381403ff8.tar.xz
zen-7dc31ec99aa3fc2f40000258e45d5d6381403ff8.zip
threading issues resolved
Diffstat (limited to 'zenstore/compactcas.cpp')
-rw-r--r--zenstore/compactcas.cpp53
1 files changed, 38 insertions, 15 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 2b48eb143..84019d7aa 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -283,15 +283,16 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
// This should be a rare occasion and the current flow reduces the time we block for
// reads, insert and GC.
- BlockStoreLocation Location = m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment);
- BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment);
- const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation};
- m_CasLog.Append(IndexEntry);
- {
- RwLock::ExclusiveLockScope _(m_LocationMapLock);
- m_LocationMap.emplace(ChunkHash, DiskLocation);
- }
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_seq_cst);
+ m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, [this, &ChunkHash, ChunkSize](const BlockStoreLocation& Location) {
+ BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment);
+ const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation};
+ m_CasLog.Append(IndexEntry);
+ {
+ RwLock::ExclusiveLockScope _(m_LocationMapLock);
+ m_LocationMap.emplace(ChunkHash, DiskLocation);
+ }
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_seq_cst);
+ });
return CasStore::InsertResult{.New = true};
}
@@ -311,10 +312,15 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
return IoBuffer();
}
- BlockStoreLocation Location = KeyIt->second.Get(m_PayloadAlignment);
- Ref<BlockStoreFile> ChunkBlock = m_BlockStore.GetChunkBlock(Location); // m_ChunkBlocks[Location.BlockIndex];
+ BlockStoreLocation Location = KeyIt->second.Get(m_PayloadAlignment);
_.ReleaseNow();
+ Ref<BlockStoreFile> ChunkBlock = m_BlockStore.GetChunkBlock(Location); // m_ChunkBlocks[Location.BlockIndex];
+ if (!ChunkBlock)
+ {
+ return IoBuffer();
+ }
+
return ChunkBlock->GetChunk(Location.Offset, Location.Size);
}
@@ -476,7 +482,8 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
uint64_t ReadBlockTimeUs = 0;
uint64_t ReadBlockLongestTimeUs = 0;
- LocationMap_t LocationMap;
+ LocationMap_t LocationMap;
+ BlockStore::ReclaimSnapshotState BlockStoreState;
{
RwLock::SharedLockScope ___(m_LocationMapLock);
Stopwatch Timer;
@@ -486,6 +493,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
});
LocationMap = m_LocationMap;
+ BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
}
uint64_t TotalChunkCount = LocationMap.size();
@@ -521,18 +529,23 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
if (!PerformDelete)
{
- m_BlockStore.ReclaimSpace(ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
+ m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
return;
}
+
+ auto GetChunkLocations = [] {};
+
std::vector<IoHash> DeletedChunks;
m_BlockStore.ReclaimSpace(
+ BlockStoreState,
ChunkLocations,
KeepChunkIndexes,
m_PayloadAlignment,
false,
[this, &DeletedChunks, &ChunkIndexToChunkHash, &LocationMap, &ReadBlockTimeUs, &ReadBlockLongestTimeUs](
+ uint32_t BlockIndex,
const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks,
- const std::vector<size_t> RemovedChunks) {
+ const std::vector<size_t>& RemovedChunks) {
std::vector<CasDiskIndexEntry> LogEntries;
LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
for (const auto& Entry : MovedChunks)
@@ -572,6 +585,10 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
}
m_LocationMap[Entry.Key] = Entry.Location;
}
+ for (const auto& Entry : m_LocationMap)
+ {
+ ZEN_ASSERT(Entry.second.GetBlockIndex() != BlockIndex);
+ }
}
});
@@ -2093,7 +2110,13 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
{
ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() {
CHECK(Cas.HaveChunk(ChunkHash));
- CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
+ if (ChunkHash != IoHash::HashBuffer(Cas.FindChunk(ChunkHash)))
+ {
+ IoBuffer Buffer = Cas.FindChunk(ChunkHash);
+ CHECK(Buffer);
+ IoHash BufferHash = IoHash::HashBuffer(Buffer);
+ CHECK(ChunkHash == BufferHash);
+ }
WorkCompleted.fetch_add(1);
});
}