aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-09-20 23:11:35 +0200
committerStefan Boberg <[email protected]>2021-09-20 23:11:35 +0200
commitaaefc249d49d8f4ce30cd4a5230f6208ac5b5b73 (patch)
treeae76c8f57c30d93ce8e196114f74a324c4fdfdbc
parentMade mimalloc enable/disable visible in xmake file (diff)
downloadzen-aaefc249d49d8f4ce30cd4a5230f6208ac5b5b73.tar.xz
zen-aaefc249d49d8f4ce30cd4a5230f6208ac5b5b73.zip
Implemented CID store scrubbing
Added support for tombstones in commit log to support scrubbing
-rw-r--r--zenstore/cidstore.cpp75
1 files changed, 73 insertions, 2 deletions
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp
index 6dd33efb2..5e266f9d3 100644
--- a/zenstore/cidstore.cpp
+++ b/zenstore/cidstore.cpp
@@ -73,6 +73,8 @@ struct CidStore::CidState
bool ContainsChunk(const IoHash& DecompressedId)
{
RwLock::SharedLockScope _(m_Lock);
+ // Note that we do not check CAS here. This is optimistic but usually
+ // what we want.
return m_CidMap.find(DecompressedId) != m_CidMap.end();
}
@@ -85,13 +87,82 @@ struct CidStore::CidState
m_LogFile.Open(SlogPath, IsNew);
- m_LogFile.Replay([&](const IndexEntry& Ie) { m_CidMap.insert_or_assign(Ie.Uncompressed, Ie.Compressed); });
+ m_LogFile.Replay([&](const IndexEntry& Ie) {
+ if (Ie.Compressed != IoHash::Zero)
+ {
+ // Update
+ m_CidMap.insert_or_assign(Ie.Uncompressed, Ie.Compressed);
+ }
+ else
+ {
+ // Tombstone
+ m_CidMap.erase(Ie.Uncompressed);
+ }
+ });
ZEN_DEBUG("CID index initialized: {} entries found", m_CidMap.size());
}
void Flush() { m_LogFile.Flush(); }
- void Scrub(ScrubContext& Ctx) { ZEN_UNUSED(Ctx); }
+
+ void Scrub(ScrubContext& Ctx)
+ {
+ CasChunkSet ChunkSet;
+
+ {
+ RwLock::SharedLockScope _(m_Lock);
+
+ for (auto& Kv : m_CidMap)
+ {
+ ChunkSet.AddChunk(Kv.second);
+ }
+ }
+
+ m_CasStore.FilterChunks(ChunkSet);
+
+ if (ChunkSet.IsEmpty())
+ {
+ // All good - we have all the chunks
+ return;
+ }
+
+ ZEN_ERROR("Scrubbing found that {} cid mappings mapped to non-existent CAS chunks", ChunkSet.GetChunkSet().size());
+
+ // Erase all mappings to chunks which are not present in the underlying CAS store
+ // we do this by removing mappings from the in-memory lookup structure and also
+ // by emitting tombstone records to the commit log
+
+ const auto& MissingChunks = ChunkSet.GetChunkSet();
+ std::vector<IoHash> BadChunks;
+ {
+ RwLock::SharedLockScope _(m_Lock);
+
+ for (auto It = begin(m_CidMap), ItEnd = end(m_CidMap); It != ItEnd;)
+ {
+ if (auto MissingIt = MissingChunks.find(It->second); MissingIt != MissingChunks.end())
+ {
+ const IoHash& BadHash = It->first;
+
+ // Log a tombstone record
+ m_LogFile.Append({.Uncompressed = BadHash, .Compressed = IoHash::Zero});
+
+ BadChunks.push_back(BadHash);
+
+ It = m_CidMap.erase(It);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+ }
+
+ m_LogFile.Flush();
+
+ // TODO: Should compute a snapshot index here
+
+ Ctx.ReportBadChunks(BadChunks);
+ }
};
//////////////////////////////////////////////////////////////////////////