diff options
| author | Dan Engelbrecht <[email protected]> | 2022-05-01 10:17:35 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-05-01 10:17:35 +0200 |
| commit | 7dc31ec99aa3fc2f40000258e45d5d6381403ff8 (patch) | |
| tree | 9c5a986c506128405fe63df3acfbdede4c2a2995 /zenstore/compactcas.cpp | |
| parent | first pass at generic block store with gc (diff) | |
| download | zen-7dc31ec99aa3fc2f40000258e45d5d6381403ff8.tar.xz zen-7dc31ec99aa3fc2f40000258e45d5d6381403ff8.zip | |
threading issues resolved
Diffstat (limited to 'zenstore/compactcas.cpp')
| -rw-r--r-- | zenstore/compactcas.cpp | 53 |
1 files changed, 38 insertions, 15 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 2b48eb143..84019d7aa 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -283,15 +283,16 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const // This should be a rare occasion and the current flow reduces the time we block for // reads, insert and GC. - BlockStoreLocation Location = m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment); - BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); - const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; - m_CasLog.Append(IndexEntry); - { - RwLock::ExclusiveLockScope _(m_LocationMapLock); - m_LocationMap.emplace(ChunkHash, DiskLocation); - } - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_seq_cst); + m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, [this, &ChunkHash, ChunkSize](const BlockStoreLocation& Location) { + BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); + const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; + m_CasLog.Append(IndexEntry); + { + RwLock::ExclusiveLockScope _(m_LocationMapLock); + m_LocationMap.emplace(ChunkHash, DiskLocation); + } + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_seq_cst); + }); return CasStore::InsertResult{.New = true}; } @@ -311,10 +312,15 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { return IoBuffer(); } - BlockStoreLocation Location = KeyIt->second.Get(m_PayloadAlignment); - Ref<BlockStoreFile> ChunkBlock = m_BlockStore.GetChunkBlock(Location); // m_ChunkBlocks[Location.BlockIndex]; + BlockStoreLocation Location = KeyIt->second.Get(m_PayloadAlignment); _.ReleaseNow(); + Ref<BlockStoreFile> ChunkBlock = m_BlockStore.GetChunkBlock(Location); // m_ChunkBlocks[Location.BlockIndex]; + if (!ChunkBlock) + { + return IoBuffer(); + } + return ChunkBlock->GetChunk(Location.Offset, Location.Size); } @@ -476,7 +482,8 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) uint64_t ReadBlockTimeUs = 0; uint64_t ReadBlockLongestTimeUs = 0; - LocationMap_t LocationMap; + LocationMap_t LocationMap; + BlockStore::ReclaimSnapshotState BlockStoreState; { RwLock::SharedLockScope ___(m_LocationMapLock); Stopwatch Timer; @@ -486,6 +493,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); LocationMap = m_LocationMap; + BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); } uint64_t TotalChunkCount = LocationMap.size(); @@ -521,18 +529,23 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); if (!PerformDelete) { - m_BlockStore.ReclaimSpace(ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); + m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); return; } + + auto GetChunkLocations = [] {}; + std::vector<IoHash> DeletedChunks; m_BlockStore.ReclaimSpace( + BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, false, [this, &DeletedChunks, &ChunkIndexToChunkHash, &LocationMap, &ReadBlockTimeUs, &ReadBlockLongestTimeUs]( + uint32_t BlockIndex, const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks, - const std::vector<size_t> RemovedChunks) { + const std::vector<size_t>& RemovedChunks) { std::vector<CasDiskIndexEntry> LogEntries; LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); for (const auto& Entry : MovedChunks) @@ -572,6 +585,10 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) } m_LocationMap[Entry.Key] = Entry.Location; } + for (const auto& Entry : m_LocationMap) + { + ZEN_ASSERT(Entry.second.GetBlockIndex() != BlockIndex); + } } }); @@ -2093,7 +2110,13 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true)) { ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() { CHECK(Cas.HaveChunk(ChunkHash)); - CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); + if (ChunkHash != IoHash::HashBuffer(Cas.FindChunk(ChunkHash))) + { + IoBuffer Buffer = Cas.FindChunk(ChunkHash); + CHECK(Buffer); + IoHash BufferHash = IoHash::HashBuffer(Buffer); + CHECK(ChunkHash == BufferHash); + } WorkCompleted.fetch_add(1); }); } |