diff options
Diffstat (limited to 'zenstore/cidstore.cpp')
| -rw-r--r-- | zenstore/cidstore.cpp | 261 |
1 files changed, 29 insertions, 232 deletions
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp index 01eda4697..5a079dbed 100644 --- a/zenstore/cidstore.cpp +++ b/zenstore/cidstore.cpp @@ -7,8 +7,9 @@ #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/string.h> -#include <zenstore/cas.h> -#include <zenstore/caslog.h> +#include <zenstore/scrubcontext.h> + +#include "cas.h" #include <filesystem> @@ -18,153 +19,32 @@ struct CidStore::Impl { Impl(CasStore& InCasStore) : m_CasStore(InCasStore) {} - struct IndexEntry - { - IoHash Uncompressed; - IoHash Compressed; - }; - - CasStore& m_CasStore; - TCasLogFile<IndexEntry> m_LogFile; + CasStore& m_CasStore; - RwLock m_Lock; - tsl::robin_map<IoHash, IoHash> m_CidMap; + void Initialize(const CidStoreConfiguration& Config) { m_CasStore.Initialize(Config); } - CidStore::InsertResult AddChunk(CompressedBuffer& ChunkData) + CidStore::InsertResult AddChunk(const CompressedBuffer& ChunkData) { const IoHash DecompressedId = IoHash::FromBLAKE3(ChunkData.GetRawHash()); IoBuffer Payload = ChunkData.GetCompressed().Flatten().AsIoBuffer(); - IoHash CompressedHash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); Payload.SetContentType(ZenContentType::kCompressedBinary); - CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, CompressedHash); - AddCompressedCid(DecompressedId, CompressedHash); + CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, DecompressedId); - return {.DecompressedId = DecompressedId, .CompressedHash = CompressedHash, .New = Result.New}; + return {.New = Result.New}; } - void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed) - { - ZEN_ASSERT(Compressed != IoHash::Zero); - - RwLock::ExclusiveLockScope _(m_Lock); + IoBuffer FindChunkByCid(const IoHash& DecompressedId) { return m_CasStore.FindChunk(DecompressedId); } - auto It = m_CidMap.try_emplace(DecompressedId, Compressed); - if (!It.second) - { - if (It.first.value() != Compressed) - { - It.first.value() = Compressed; - } - else - { - // No point logging an update that won't change anything - return; - } - } + bool ContainsChunk(const IoHash& DecompressedId) { return m_CasStore.ContainsChunk(DecompressedId); } - // It's not ideal to do this while holding the lock in case - // we end up in blocking I/O but that's for later - LogMapping(DecompressedId, Compressed); - } - - void LogMapping(const IoHash& DecompressedId, const IoHash& CompressedHash) + void FilterChunks(HashKeySet& InOutChunks) { - ZEN_ASSERT(DecompressedId != CompressedHash); - m_LogFile.Append({.Uncompressed = DecompressedId, .Compressed = CompressedHash}); + InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return ContainsChunk(Hash); }); } - IoHash RemapCid(const IoHash& DecompressedId) - { - RwLock::SharedLockScope _(m_Lock); - if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end()) - { - return It->second; - } - - return IoHash::Zero; - } - - IoBuffer FindChunkByCid(const IoHash& DecompressedId) - { - IoHash CompressedHash; - - { - RwLock::SharedLockScope _(m_Lock); - if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end()) - { - CompressedHash = It->second; - } - else - { - return {}; - } - } - - ZEN_ASSERT(CompressedHash != IoHash::Zero); - - return m_CasStore.FindChunk(CompressedHash); - } - - bool ContainsChunk(const IoHash& DecompressedId) - { - IoHash CasHash = IoHash::Zero; - - { - RwLock::SharedLockScope _(m_Lock); - if (const auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end()) - { - CasHash = It->second; - } - } - - return CasHash != IoHash::Zero ? m_CasStore.ContainsChunk(CasHash) : false; - } - - void InitializeIndex(const std::filesystem::path& RootDir) - { - CreateDirectories(RootDir); - std::filesystem::path SlogPath{RootDir / "cid.slog"}; - - bool IsNew = !std::filesystem::exists(SlogPath); - - m_LogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); - - ZEN_DEBUG("Initializing index from '{}' ({})", SlogPath, NiceBytes(m_LogFile.GetLogSize())); - - uint64_t TombstoneCount = 0; - uint64_t InvalidCount = 0; - - m_LogFile.Replay( - [&](const IndexEntry& Entry) { - if (Entry.Compressed != IoHash::Zero) - { - // Update - m_CidMap.insert_or_assign(Entry.Uncompressed, Entry.Compressed); - } - else - { - if (Entry.Uncompressed != IoHash::Zero) - { - // Tombstone - m_CidMap.erase(Entry.Uncompressed); - ++TombstoneCount; - } - else - { - // Completely uninitialized entry with both hashes set to zero indicates a - // problem. Might be an unwritten page due to BSOD or some other problem - ++InvalidCount; - } - } - }, - 0); - - ZEN_INFO("CID index initialized: {} entries found ({} tombstones, {} invalid)", m_CidMap.size(), TombstoneCount, InvalidCount); - } - - void Flush() { m_LogFile.Flush(); } + void Flush() { m_CasStore.Flush(); } void Scrub(ScrubContext& Ctx) { @@ -175,83 +55,7 @@ struct CidStore::Impl m_LastScrubTime = Ctx.ScrubTimestamp(); - CasChunkSet ChunkSet; - - { - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_CidMap) - { - ChunkSet.AddChunkToSet(Kv.second); - } - } - - m_CasStore.FilterChunks(ChunkSet); - - if (ChunkSet.IsEmpty()) - { - // All good - we have all the chunks - return; - } - - ZEN_ERROR("Scrubbing found that {} cid mappings (out of {}) mapped to non-existent CAS chunks. These mappings will be removed", - ChunkSet.GetSize(), - m_CidMap.size()); - - // Erase all mappings to chunks which are not present in the underlying CAS store - // we do this by removing mappings from the in-memory lookup structure and also - // by emitting tombstone records to the commit log - - std::vector<IoHash> BadChunks; - - { - RwLock::SharedLockScope _(m_Lock); - - for (auto It = begin(m_CidMap), ItEnd = end(m_CidMap); It != ItEnd;) - { - if (ChunkSet.ContainsChunk(It->second)) - { - const IoHash& BadHash = It->first; - - // Log a tombstone record - LogMapping(BadHash, IoHash::Zero); - - BadChunks.push_back(BadHash); - - It = m_CidMap.erase(It); - } - else - { - ++It; - } - } - } - - m_LogFile.Flush(); - - // TODO: Should compute a snapshot index here - - Ctx.ReportBadCasChunks(BadChunks); - } - - void RemoveCids(CasChunkSet& CasChunks) - { - std::vector<IndexEntry> RemovedEntries; - RemovedEntries.reserve(CasChunks.GetSize()); - { - RwLock::ExclusiveLockScope _(m_Lock); - for (auto It = m_CidMap.begin(), End = m_CidMap.end(); It != End;) - { - if (CasChunks.ContainsChunk(It->second)) - { - RemovedEntries.push_back({It->first, IoHash::Zero}); - It = m_CidMap.erase(It); - continue; - } - ++It; - } - } - m_LogFile.Append(RemovedEntries); + m_CasStore.Scrub(Ctx); } uint64_t m_LastScrubTime = 0; @@ -259,25 +63,24 @@ struct CidStore::Impl ////////////////////////////////////////////////////////////////////////// -CidStore::CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir) : m_Impl(std::make_unique<Impl>(InCasStore)) +CidStore::CidStore(GcManager& Gc) : m_CasStore(CreateCasStore(Gc)), m_Impl(std::make_unique<Impl>(*m_CasStore)) { - m_Impl->InitializeIndex(RootDir); } CidStore::~CidStore() { } -CidStore::InsertResult -CidStore::AddChunk(CompressedBuffer& ChunkData) +void +CidStore::Initialize(const CidStoreConfiguration& Config) { - return m_Impl->AddChunk(ChunkData); + m_Impl->Initialize(Config); } -void -CidStore::AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed) +CidStore::InsertResult +CidStore::AddChunk(const CompressedBuffer& ChunkData) { - m_Impl->AddCompressedCid(DecompressedId, Compressed); + return m_Impl->AddChunk(ChunkData); } IoBuffer @@ -286,12 +89,6 @@ CidStore::FindChunkByCid(const IoHash& DecompressedId) return m_Impl->FindChunkByCid(DecompressedId); } -IoHash -CidStore::RemapCid(const IoHash& DecompressedId) -{ - return m_Impl->RemapCid(DecompressedId); -} - bool CidStore::ContainsChunk(const IoHash& DecompressedId) { @@ -299,25 +96,25 @@ CidStore::ContainsChunk(const IoHash& DecompressedId) } void -CidStore::Flush() +CidStore::FilterChunks(HashKeySet& InOutChunks) { - m_Impl->Flush(); + return m_Impl->FilterChunks(InOutChunks); } void -CidStore::Scrub(ScrubContext& Ctx) +CidStore::Flush() { - m_Impl->Scrub(Ctx); + m_Impl->Flush(); } void -CidStore::RemoveCids(CasChunkSet& CasChunks) +CidStore::Scrub(ScrubContext& Ctx) { - m_Impl->RemoveCids(CasChunks); + m_Impl->Scrub(Ctx); } -CasStoreSize -CidStore::CasSize() const +CidStoreSize +CidStore::TotalSize() const { return m_Impl->m_CasStore.TotalSize(); } |