diff options
| author | Stefan Boberg <[email protected]> | 2021-09-20 23:11:35 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-09-20 23:11:35 +0200 |
| commit | aaefc249d49d8f4ce30cd4a5230f6208ac5b5b73 (patch) | |
| tree | ae76c8f57c30d93ce8e196114f74a324c4fdfdbc | |
| parent | Made mimalloc enable/disable visible in xmake file (diff) | |
| download | zen-aaefc249d49d8f4ce30cd4a5230f6208ac5b5b73.tar.xz zen-aaefc249d49d8f4ce30cd4a5230f6208ac5b5b73.zip | |
Implemented CID store scrubbing
Added support for tombstones in commit log to support scrubbing
| -rw-r--r-- | zenstore/cidstore.cpp | 75 |
1 files changed, 73 insertions, 2 deletions
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp index 6dd33efb2..5e266f9d3 100644 --- a/zenstore/cidstore.cpp +++ b/zenstore/cidstore.cpp @@ -73,6 +73,8 @@ struct CidStore::CidState bool ContainsChunk(const IoHash& DecompressedId) { RwLock::SharedLockScope _(m_Lock); + // Note that we do not check CAS here. This is optimistic but usually + // what we want. return m_CidMap.find(DecompressedId) != m_CidMap.end(); } @@ -85,13 +87,82 @@ struct CidStore::CidState m_LogFile.Open(SlogPath, IsNew); - m_LogFile.Replay([&](const IndexEntry& Ie) { m_CidMap.insert_or_assign(Ie.Uncompressed, Ie.Compressed); }); + m_LogFile.Replay([&](const IndexEntry& Ie) { + if (Ie.Compressed != IoHash::Zero) + { + // Update + m_CidMap.insert_or_assign(Ie.Uncompressed, Ie.Compressed); + } + else + { + // Tombstone + m_CidMap.erase(Ie.Uncompressed); + } + }); ZEN_DEBUG("CID index initialized: {} entries found", m_CidMap.size()); } void Flush() { m_LogFile.Flush(); } - void Scrub(ScrubContext& Ctx) { ZEN_UNUSED(Ctx); } + + void Scrub(ScrubContext& Ctx) + { + CasChunkSet ChunkSet; + + { + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_CidMap) + { + ChunkSet.AddChunk(Kv.second); + } + } + + m_CasStore.FilterChunks(ChunkSet); + + if (ChunkSet.IsEmpty()) + { + // All good - we have all the chunks + return; + } + + ZEN_ERROR("Scrubbing found that {} cid mappings mapped to non-existent CAS chunks", ChunkSet.GetChunkSet().size()); + + // Erase all mappings to chunks which are not present in the underlying CAS store + // we do this by removing mappings from the in-memory lookup structure and also + // by emitting tombstone records to the commit log + + const auto& MissingChunks = ChunkSet.GetChunkSet(); + std::vector<IoHash> BadChunks; + { + RwLock::SharedLockScope _(m_Lock); + + for (auto It = begin(m_CidMap), ItEnd = end(m_CidMap); It != ItEnd;) + { + if (auto MissingIt = MissingChunks.find(It->second); MissingIt != MissingChunks.end()) + { + const IoHash& BadHash = It->first; + + // Log a tombstone record + m_LogFile.Append({.Uncompressed = BadHash, .Compressed = IoHash::Zero}); + + BadChunks.push_back(BadHash); + + It = m_CidMap.erase(It); + } + else + { + ++It; + } + } + } + + m_LogFile.Flush(); + + // TODO: Should compute a snapshot index here + + Ctx.ReportBadChunks(BadChunks); + } }; ////////////////////////////////////////////////////////////////////////// |