aboutsummaryrefslogtreecommitdiff
path: root/zenstore/cidstore.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenstore/cidstore.cpp')
-rw-r--r--zenstore/cidstore.cpp261
1 files changed, 29 insertions, 232 deletions
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp
index 01eda4697..5a079dbed 100644
--- a/zenstore/cidstore.cpp
+++ b/zenstore/cidstore.cpp
@@ -7,8 +7,9 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/string.h>
-#include <zenstore/cas.h>
-#include <zenstore/caslog.h>
+#include <zenstore/scrubcontext.h>
+
+#include "cas.h"
#include <filesystem>
@@ -18,153 +19,32 @@ struct CidStore::Impl
{
Impl(CasStore& InCasStore) : m_CasStore(InCasStore) {}
- struct IndexEntry
- {
- IoHash Uncompressed;
- IoHash Compressed;
- };
-
- CasStore& m_CasStore;
- TCasLogFile<IndexEntry> m_LogFile;
+ CasStore& m_CasStore;
- RwLock m_Lock;
- tsl::robin_map<IoHash, IoHash> m_CidMap;
+ void Initialize(const CidStoreConfiguration& Config) { m_CasStore.Initialize(Config); }
- CidStore::InsertResult AddChunk(CompressedBuffer& ChunkData)
+ CidStore::InsertResult AddChunk(const CompressedBuffer& ChunkData)
{
const IoHash DecompressedId = IoHash::FromBLAKE3(ChunkData.GetRawHash());
IoBuffer Payload = ChunkData.GetCompressed().Flatten().AsIoBuffer();
- IoHash CompressedHash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
Payload.SetContentType(ZenContentType::kCompressedBinary);
- CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, CompressedHash);
- AddCompressedCid(DecompressedId, CompressedHash);
+ CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, DecompressedId);
- return {.DecompressedId = DecompressedId, .CompressedHash = CompressedHash, .New = Result.New};
+ return {.New = Result.New};
}
- void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed)
- {
- ZEN_ASSERT(Compressed != IoHash::Zero);
-
- RwLock::ExclusiveLockScope _(m_Lock);
+ IoBuffer FindChunkByCid(const IoHash& DecompressedId) { return m_CasStore.FindChunk(DecompressedId); }
- auto It = m_CidMap.try_emplace(DecompressedId, Compressed);
- if (!It.second)
- {
- if (It.first.value() != Compressed)
- {
- It.first.value() = Compressed;
- }
- else
- {
- // No point logging an update that won't change anything
- return;
- }
- }
+ bool ContainsChunk(const IoHash& DecompressedId) { return m_CasStore.ContainsChunk(DecompressedId); }
- // It's not ideal to do this while holding the lock in case
- // we end up in blocking I/O but that's for later
- LogMapping(DecompressedId, Compressed);
- }
-
- void LogMapping(const IoHash& DecompressedId, const IoHash& CompressedHash)
+ void FilterChunks(HashKeySet& InOutChunks)
{
- ZEN_ASSERT(DecompressedId != CompressedHash);
- m_LogFile.Append({.Uncompressed = DecompressedId, .Compressed = CompressedHash});
+ InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return ContainsChunk(Hash); });
}
- IoHash RemapCid(const IoHash& DecompressedId)
- {
- RwLock::SharedLockScope _(m_Lock);
- if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end())
- {
- return It->second;
- }
-
- return IoHash::Zero;
- }
-
- IoBuffer FindChunkByCid(const IoHash& DecompressedId)
- {
- IoHash CompressedHash;
-
- {
- RwLock::SharedLockScope _(m_Lock);
- if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end())
- {
- CompressedHash = It->second;
- }
- else
- {
- return {};
- }
- }
-
- ZEN_ASSERT(CompressedHash != IoHash::Zero);
-
- return m_CasStore.FindChunk(CompressedHash);
- }
-
- bool ContainsChunk(const IoHash& DecompressedId)
- {
- IoHash CasHash = IoHash::Zero;
-
- {
- RwLock::SharedLockScope _(m_Lock);
- if (const auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end())
- {
- CasHash = It->second;
- }
- }
-
- return CasHash != IoHash::Zero ? m_CasStore.ContainsChunk(CasHash) : false;
- }
-
- void InitializeIndex(const std::filesystem::path& RootDir)
- {
- CreateDirectories(RootDir);
- std::filesystem::path SlogPath{RootDir / "cid.slog"};
-
- bool IsNew = !std::filesystem::exists(SlogPath);
-
- m_LogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
-
- ZEN_DEBUG("Initializing index from '{}' ({})", SlogPath, NiceBytes(m_LogFile.GetLogSize()));
-
- uint64_t TombstoneCount = 0;
- uint64_t InvalidCount = 0;
-
- m_LogFile.Replay(
- [&](const IndexEntry& Entry) {
- if (Entry.Compressed != IoHash::Zero)
- {
- // Update
- m_CidMap.insert_or_assign(Entry.Uncompressed, Entry.Compressed);
- }
- else
- {
- if (Entry.Uncompressed != IoHash::Zero)
- {
- // Tombstone
- m_CidMap.erase(Entry.Uncompressed);
- ++TombstoneCount;
- }
- else
- {
- // Completely uninitialized entry with both hashes set to zero indicates a
- // problem. Might be an unwritten page due to BSOD or some other problem
- ++InvalidCount;
- }
- }
- },
- 0);
-
- ZEN_INFO("CID index initialized: {} entries found ({} tombstones, {} invalid)", m_CidMap.size(), TombstoneCount, InvalidCount);
- }
-
- void Flush() { m_LogFile.Flush(); }
+ void Flush() { m_CasStore.Flush(); }
void Scrub(ScrubContext& Ctx)
{
@@ -175,83 +55,7 @@ struct CidStore::Impl
m_LastScrubTime = Ctx.ScrubTimestamp();
- CasChunkSet ChunkSet;
-
- {
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto& Kv : m_CidMap)
- {
- ChunkSet.AddChunkToSet(Kv.second);
- }
- }
-
- m_CasStore.FilterChunks(ChunkSet);
-
- if (ChunkSet.IsEmpty())
- {
- // All good - we have all the chunks
- return;
- }
-
- ZEN_ERROR("Scrubbing found that {} cid mappings (out of {}) mapped to non-existent CAS chunks. These mappings will be removed",
- ChunkSet.GetSize(),
- m_CidMap.size());
-
- // Erase all mappings to chunks which are not present in the underlying CAS store
- // we do this by removing mappings from the in-memory lookup structure and also
- // by emitting tombstone records to the commit log
-
- std::vector<IoHash> BadChunks;
-
- {
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto It = begin(m_CidMap), ItEnd = end(m_CidMap); It != ItEnd;)
- {
- if (ChunkSet.ContainsChunk(It->second))
- {
- const IoHash& BadHash = It->first;
-
- // Log a tombstone record
- LogMapping(BadHash, IoHash::Zero);
-
- BadChunks.push_back(BadHash);
-
- It = m_CidMap.erase(It);
- }
- else
- {
- ++It;
- }
- }
- }
-
- m_LogFile.Flush();
-
- // TODO: Should compute a snapshot index here
-
- Ctx.ReportBadCasChunks(BadChunks);
- }
-
- void RemoveCids(CasChunkSet& CasChunks)
- {
- std::vector<IndexEntry> RemovedEntries;
- RemovedEntries.reserve(CasChunks.GetSize());
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- for (auto It = m_CidMap.begin(), End = m_CidMap.end(); It != End;)
- {
- if (CasChunks.ContainsChunk(It->second))
- {
- RemovedEntries.push_back({It->first, IoHash::Zero});
- It = m_CidMap.erase(It);
- continue;
- }
- ++It;
- }
- }
- m_LogFile.Append(RemovedEntries);
+ m_CasStore.Scrub(Ctx);
}
uint64_t m_LastScrubTime = 0;
@@ -259,25 +63,24 @@ struct CidStore::Impl
//////////////////////////////////////////////////////////////////////////
-CidStore::CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir) : m_Impl(std::make_unique<Impl>(InCasStore))
+CidStore::CidStore(GcManager& Gc) : m_CasStore(CreateCasStore(Gc)), m_Impl(std::make_unique<Impl>(*m_CasStore))
{
- m_Impl->InitializeIndex(RootDir);
}
CidStore::~CidStore()
{
}
-CidStore::InsertResult
-CidStore::AddChunk(CompressedBuffer& ChunkData)
+void
+CidStore::Initialize(const CidStoreConfiguration& Config)
{
- return m_Impl->AddChunk(ChunkData);
+ m_Impl->Initialize(Config);
}
-void
-CidStore::AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed)
+CidStore::InsertResult
+CidStore::AddChunk(const CompressedBuffer& ChunkData)
{
- m_Impl->AddCompressedCid(DecompressedId, Compressed);
+ return m_Impl->AddChunk(ChunkData);
}
IoBuffer
@@ -286,12 +89,6 @@ CidStore::FindChunkByCid(const IoHash& DecompressedId)
return m_Impl->FindChunkByCid(DecompressedId);
}
-IoHash
-CidStore::RemapCid(const IoHash& DecompressedId)
-{
- return m_Impl->RemapCid(DecompressedId);
-}
-
bool
CidStore::ContainsChunk(const IoHash& DecompressedId)
{
@@ -299,25 +96,25 @@ CidStore::ContainsChunk(const IoHash& DecompressedId)
}
void
-CidStore::Flush()
+CidStore::FilterChunks(HashKeySet& InOutChunks)
{
- m_Impl->Flush();
+ return m_Impl->FilterChunks(InOutChunks);
}
void
-CidStore::Scrub(ScrubContext& Ctx)
+CidStore::Flush()
{
- m_Impl->Scrub(Ctx);
+ m_Impl->Flush();
}
void
-CidStore::RemoveCids(CasChunkSet& CasChunks)
+CidStore::Scrub(ScrubContext& Ctx)
{
- m_Impl->RemoveCids(CasChunks);
+ m_Impl->Scrub(Ctx);
}
-CasStoreSize
-CidStore::CasSize() const
+CidStoreSize
+CidStore::TotalSize() const
{
return m_Impl->m_CasStore.TotalSize();
}