diff options
| author | Martin Ridgers <[email protected]> | 2021-09-22 22:46:21 +0200 |
|---|---|---|
| committer | Martin Ridgers <[email protected]> | 2021-09-22 22:46:21 +0200 |
| commit | 1623f7471b9b0c09b7e9c98652c280d1f6559ca1 (patch) | |
| tree | 78518774fe79ddf3e266a75c699b697971f52f4b /zenstore | |
| parent | Merged main into linux-mac (diff) | |
| parent | Clang format fix. (diff) | |
| download | zen-1623f7471b9b0c09b7e9c98652c280d1f6559ca1.tar.xz zen-1623f7471b9b0c09b7e9c98652c280d1f6559ca1.zip | |
Merge main
Diffstat (limited to 'zenstore')
| -rw-r--r-- | zenstore/CAS.cpp | 49 | ||||
| -rw-r--r-- | zenstore/cidstore.cpp | 57 | ||||
| -rw-r--r-- | zenstore/compactcas.cpp | 30 | ||||
| -rw-r--r-- | zenstore/compactcas.h | 1 | ||||
| -rw-r--r-- | zenstore/filecas.cpp | 126 | ||||
| -rw-r--r-- | zenstore/filecas.h | 20 | ||||
| -rw-r--r-- | zenstore/include/zenstore/CAS.h | 26 | ||||
| -rw-r--r-- | zenstore/include/zenstore/cidstore.h | 4 |
8 files changed, 207 insertions, 106 deletions
diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp index 916e7f709..eaf72cb41 100644 --- a/zenstore/CAS.cpp +++ b/zenstore/CAS.cpp @@ -26,6 +26,39 @@ namespace zen { void +CasChunkSet::AddChunkToSet(const IoHash& HashToAdd) +{ + m_ChunkSet.insert(HashToAdd); +} + +void +CasChunkSet::RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate) +{ + for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;) + { + if (Predicate(*It)) + { + It = m_ChunkSet.erase(It); + } + else + { + ++It; + } + } +} + +void +CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback) +{ + for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;) + { + Callback(*It); + } +} + +////////////////////////////////////////////////////////////////////////// + +void ScrubContext::ReportBadChunks(std::span<IoHash> BadChunks) { ZEN_UNUSED(BadChunks); @@ -111,9 +144,6 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig) m_TinyStrategy.Initialize("tobs", 16, IsNewStore); m_SmallStrategy.Initialize("sobs", 4096, IsNewStore); - - ScrubContext Ctx; - Scrub(Ctx); } CasStore::InsertResult @@ -176,6 +206,13 @@ CasImpl::Flush() void CasImpl::Scrub(ScrubContext& Ctx) { + if (m_LastScrubTime == Ctx.ScrubTimestamp()) + { + return; + } + + m_LastScrubTime = Ctx.ScrubTimestamp(); + m_SmallStrategy.Scrub(Ctx); m_TinyStrategy.Scrub(Ctx); m_LargeStrategy.Scrub(Ctx); @@ -222,11 +259,11 @@ TEST_CASE("CasStore") CHECK(Result2.New); CasChunkSet ChunkSet; - ChunkSet.AddChunk(Hash1); - ChunkSet.AddChunk(Hash2); + ChunkSet.AddChunkToSet(Hash1); + ChunkSet.AddChunkToSet(Hash2); Store->FilterChunks(ChunkSet); - CHECK(ChunkSet.GetChunkSet().size() == 0); + CHECK(ChunkSet.IsEmpty()); IoBuffer Lookup1 = Store->FindChunk(Hash1); CHECK(Lookup1); diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp index 5e266f9d3..08a3192ff 100644 --- a/zenstore/cidstore.cpp +++ b/zenstore/cidstore.cpp @@ -12,9 +12,9 @@ namespace zen { -struct CidStore::CidState +struct CidStore::Impl { - CidState(CasStore& InCasStore) : m_CasStore(InCasStore) {} + Impl(CasStore& InCasStore) : m_CasStore(InCasStore) {} struct IndexEntry { @@ -42,18 +42,26 @@ struct CidStore::CidState void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed) { + ZEN_ASSERT(Compressed != IoHash::Zero); + RwLock::ExclusiveLockScope _(m_Lock); m_CidMap.insert_or_assign(DecompressedId, Compressed); // TODO: it's pretty wasteful to log even idempotent updates // however we can't simply use the boolean returned by insert_or_assign // since there's not a 1:1 mapping between compressed and uncompressed // so if we want a last-write-wins policy then we have to log each update + LogMapping(DecompressedId, Compressed); + } + + void LogMapping(const IoHash& DecompressedId, const IoHash& Compressed) + { m_LogFile.Append({.Uncompressed = DecompressedId, .Compressed = Compressed}); } IoBuffer FindChunkByCid(const IoHash& DecompressedId) { IoHash CompressedHash; + { RwLock::SharedLockScope _(m_Lock); if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end()) @@ -62,12 +70,9 @@ struct CidStore::CidState } } - if (CompressedHash != IoHash::Zero) - { - return m_CasStore.FindChunk(CompressedHash); - } + ZEN_ASSERT(CompressedHash != IoHash::Zero); - return IoBuffer(); + return m_CasStore.FindChunk(CompressedHash); } bool ContainsChunk(const IoHash& DecompressedId) @@ -75,7 +80,17 @@ struct CidStore::CidState RwLock::SharedLockScope _(m_Lock); // Note that we do not check CAS here. This is optimistic but usually // what we want. - return m_CidMap.find(DecompressedId) != m_CidMap.end(); + auto It = m_CidMap.find(DecompressedId); + + if (It == m_CidMap.end()) + { + // Not in map, or tombstone + return false; + } + + ZEN_ASSERT(It->second != IoHash::Zero); + + return true; } void InitializeIndex(const std::filesystem::path& RootDir) @@ -87,6 +102,8 @@ struct CidStore::CidState m_LogFile.Open(SlogPath, IsNew); + uint64_t TombstoneCount = 0; + m_LogFile.Replay([&](const IndexEntry& Ie) { if (Ie.Compressed != IoHash::Zero) { @@ -97,16 +114,24 @@ struct CidStore::CidState { // Tombstone m_CidMap.erase(Ie.Uncompressed); + ++TombstoneCount; } }); - ZEN_DEBUG("CID index initialized: {} entries found", m_CidMap.size()); + ZEN_INFO("CID index initialized: {} entries found ({} tombstones)", m_CidMap.size(), TombstoneCount); } void Flush() { m_LogFile.Flush(); } void Scrub(ScrubContext& Ctx) { + if (Ctx.ScrubTimestamp() == m_LastScrubTime) + { + return; + } + + m_LastScrubTime = Ctx.ScrubTimestamp(); + CasChunkSet ChunkSet; { @@ -114,7 +139,7 @@ struct CidStore::CidState for (auto& Kv : m_CidMap) { - ChunkSet.AddChunk(Kv.second); + ChunkSet.AddChunkToSet(Kv.second); } } @@ -126,20 +151,22 @@ struct CidStore::CidState return; } - ZEN_ERROR("Scrubbing found that {} cid mappings mapped to non-existent CAS chunks", ChunkSet.GetChunkSet().size()); + ZEN_ERROR("Scrubbing found that {} cid mappings (out of {}) mapped to non-existent CAS chunks. These mappings will be removed", + ChunkSet.GetSize(), + m_CidMap.size()); // Erase all mappings to chunks which are not present in the underlying CAS store // we do this by removing mappings from the in-memory lookup structure and also // by emitting tombstone records to the commit log - const auto& MissingChunks = ChunkSet.GetChunkSet(); std::vector<IoHash> BadChunks; + { RwLock::SharedLockScope _(m_Lock); for (auto It = begin(m_CidMap), ItEnd = end(m_CidMap); It != ItEnd;) { - if (auto MissingIt = MissingChunks.find(It->second); MissingIt != MissingChunks.end()) + if (ChunkSet.ContainsChunk(It->second)) { const IoHash& BadHash = It->first; @@ -163,11 +190,13 @@ struct CidStore::CidState Ctx.ReportBadChunks(BadChunks); } + + uint64_t m_LastScrubTime = 0; }; ////////////////////////////////////////////////////////////////////////// -CidStore::CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir) : m_Impl(std::make_unique<CidState>(InCasStore)) +CidStore::CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir) : m_Impl(std::make_unique<Impl>(InCasStore)) { m_Impl->InitializeIndex(RootDir); } diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index fe38f0fde..5fc3ac356 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -5,6 +5,7 @@ #include "CompactCas.h" #include <zencore/except.h> +#include <zencore/logging.h> #include <zencore/memory.h> #include <zencore/string.h> #include <zencore/thread.h> @@ -32,7 +33,9 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint6 ZEN_ASSERT(IsPow2(Alignment)); ZEN_ASSERT(!m_IsInitialized); - m_PayloadAlignment = Alignment; + m_ContainerBaseName = ContainerBaseName; + m_PayloadAlignment = Alignment; + std::string BaseName(ContainerBaseName); std::filesystem::path SobsPath = m_Config.RootDirectory / (BaseName + ".ucas"); std::filesystem::path SidxPath = m_Config.RootDirectory / (BaseName + ".uidx"); @@ -144,20 +147,7 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks) // we're likely to already have a large proportion of the // chunks in the set - std::unordered_set<IoHash> HaveSet; - - for (const IoHash& Hash : InOutChunks.GetChunkSet()) - { - if (HaveChunk(Hash)) - { - HaveSet.insert(Hash); - } - } - - for (const IoHash& Hash : HaveSet) - { - InOutChunks.RemoveIfPresent(Hash); - } + InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); } void @@ -210,7 +200,8 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) continue; } - const IoHash ComputedHash = IoHash::HashBuffer(BufferBase, Entry.second.Size); + const IoHash ComputedHash = + IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart, Entry.second.Size); if (Entry.first != ComputedHash) { @@ -242,6 +233,13 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) } } + if (BadChunks.empty()) + { + return; + } + + ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_ContainerBaseName); + // Deal with bad chunks by removing them from our lookup map std::vector<IoHash> BadChunkHashes; diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h index 101e6b1b7..a512c3d93 100644 --- a/zenstore/compactcas.h +++ b/zenstore/compactcas.h @@ -69,6 +69,7 @@ private: BasicFile m_SmallObjectFile; BasicFile m_SmallObjectIndex; TCasLogFile<CasDiskIndexEntry> m_CasLog; + std::string m_ContainerBaseName; RwLock m_LocationMapLock; std::unordered_map<IoHash, CasDiskLocation, IoHash::Hasher> m_LocationMap; diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index 968c9f3a0..c036efd35 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -31,18 +31,12 @@ namespace zen { using namespace fmt::literals; -FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config) : m_Config(Config) +FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash) { -} - -FileCasStrategy::~FileCasStrategy() -{ -} + ShardedPath.Append(RootPath.c_str()); + ShardedPath.Append(std::filesystem::path::preferred_separator); -WideStringBuilderBase& -FileCasStrategy::MakeShardedPath(WideStringBuilderBase& ShardedPath, const IoHash& ChunkHash, size_t& OutShard2len) -{ - ExtendableStringBuilder<96> HashString; + ExtendableStringBuilder<64> HashString; ChunkHash.ToHexString(HashString); const char* str = HashString.c_str(); @@ -53,20 +47,31 @@ FileCasStrategy::MakeShardedPath(WideStringBuilderBase& ShardedPath, const IoHas // This results in a maximum of 4096 * 256 directories // // The numbers have been chosen somewhat arbitrarily but are large to scale - // to very large chunk repositories. It may or may not make sense to make - // this a configurable policy, and it would probably be a good idea to - // measure performance for different policies and chunk counts + // to very large chunk repositories without creating too many directories + // on a single level since NTFS does not deal very well with this. + // + // It may or may not make sense to make this a configurable policy, and it + // would probably be a good idea to measure performance for different + // policies and chunk counts ShardedPath.AppendAsciiRange(str, str + 3); - ShardedPath.Append('\\'); + ShardedPath.Append(std::filesystem::path::preferred_separator); ShardedPath.AppendAsciiRange(str + 3, str + 5); - OutShard2len = ShardedPath.Size(); + Shard2len = ShardedPath.Size(); - ShardedPath.Append('\\'); + ShardedPath.Append(std::filesystem::path::preferred_separator); ShardedPath.AppendAsciiRange(str + 5, str + 40); +} + +////////////////////////////////////////////////////////////////////////// - return ShardedPath; +FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config) : m_Config(Config), m_Log(logging::Get("filecas")) +{ +} + +FileCasStrategy::~FileCasStrategy() +{ } CasStore::InsertResult @@ -78,11 +83,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) IoBufferFileReference FileRef; if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef)) { - size_t Shard2len = 0; - ExtendableWideStringBuilder<128> ShardedPath; - ShardedPath.Append(m_Config.RootDirectory.c_str()); - ShardedPath.Append(std::filesystem::path::preferred_separator); - MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); auto DeletePayloadFileOnClose = [&] { // This will cause the file to be deleted when the last handle to it is closed @@ -105,7 +106,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) { CAtlFile PayloadFile; - if (HRESULT hRes = PayloadFile.Create(ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes)) + if (HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes)) { // If we succeeded in opening the target file then we don't need to do anything else because it already exists // and should contain the content we were about to insert @@ -118,7 +119,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) } } - std::filesystem::path FullPath(ShardedPath.c_str()); + std::filesystem::path FullPath(Name.ShardedPath.c_str()); std::filesystem::path FilePath = FullPath.parent_path(); std::wstring FileName = FullPath.native(); @@ -194,11 +195,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) CasStore::InsertResult FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash) { - size_t Shard2len = 0; - ExtendableWideStringBuilder<128> ShardedPath; - ShardedPath.Append(m_Config.RootDirectory.c_str()); - ShardedPath.Append(std::filesystem::path::preferred_separator); - MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); // See if file already exists // @@ -206,7 +203,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize CAtlFile PayloadFile; - HRESULT hRes = PayloadFile.Create(ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); + HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); if (SUCCEEDED(hRes)) { @@ -221,7 +218,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize // For now, use double-checked locking to see if someone else was first - hRes = PayloadFile.Create(ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); + hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); if (SUCCEEDED(hRes)) { @@ -235,7 +232,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize ZEN_WARN("Unexpected error code when opening shard file for read: {:#x}", uint32_t(hRes)); } - auto InternalCreateFile = [&] { return PayloadFile.Create(ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); }; + auto InternalCreateFile = [&] { return PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); }; hRes = InternalCreateFile(); @@ -243,14 +240,14 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize { // Ensure parent directories exist and retry file creation - std::filesystem::create_directories(std::wstring_view(ShardedPath.c_str(), Shard2len)); + std::filesystem::create_directories(std::wstring_view(Name.ShardedPath.c_str(), Name.Shard2len)); hRes = InternalCreateFile(); } if (FAILED(hRes)) { - ThrowSystemException(hRes, "Failed to open shard file '{}'"_format(WideToUtf8(ShardedPath))); + ThrowSystemException(hRes, "Failed to open shard file '{}'"_format(WideToUtf8(Name.ShardedPath))); } size_t ChunkRemain = ChunkSize; @@ -276,36 +273,37 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize IoBuffer FileCasStrategy::FindChunk(const IoHash& ChunkHash) { - size_t Shard2len = 0; - ExtendableWideStringBuilder<128> ShardedPath; - ShardedPath.Append(m_Config.RootDirectory.c_str()); - ShardedPath.Append(std::filesystem::path::preferred_separator); - MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); - return IoBufferBuilder::MakeFromFile(ShardedPath.c_str()); + return IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); } bool FileCasStrategy::HaveChunk(const IoHash& ChunkHash) { - size_t Shard2len = 0; - ExtendableWideStringBuilder<128> ShardedPath; - ShardedPath.Append(m_Config.RootDirectory.c_str()); - ShardedPath.Append(std::filesystem::path::preferred_separator); - MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); std::error_code Ec; - if (std::filesystem::exists(ShardedPath.c_str(), Ec)) + if (std::filesystem::exists(Name.ShardedPath.c_str(), Ec)) { return true; } return false; } +void +FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) +{ + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); + + ZEN_DEBUG("deleting CAS payload file '{}'", WideToUtf8(Name.ShardedPath)); + + std::filesystem::remove(Name.ShardedPath.c_str(), Ec); +} void FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) @@ -318,20 +316,7 @@ FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) // a caller, this is something which needs to be taken into account by anyone consuming // this functionality in any case - std::unordered_set<IoHash> HaveSet; - - for (const IoHash& Hash : InOutChunks.GetChunkSet()) - { - if (HaveChunk(Hash)) - { - HaveSet.insert(Hash); - } - } - - for (const IoHash& Hash : HaveSet) - { - InOutChunks.RemoveIfPresent(Hash); - } + InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); } void @@ -421,6 +406,27 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) } }); + if (!BadHashes.empty()) + { + ZEN_ERROR("file CAS scrubbing: {} bad chunks found", BadHashes.size()); + + if (Ctx.RunRecovery()) + { + ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadHashes.size()); + + for (const IoHash& Hash : BadHashes) + { + std::error_code Ec; + DeleteChunk(Hash, Ec); + + if (Ec) + { + ZEN_WARN("failed to delete file for chunk {}", Hash); + } + } + } + } + Ctx.ReportBadChunks(BadHashes); } diff --git a/zenstore/filecas.h b/zenstore/filecas.h index 18102968a..db21502c6 100644 --- a/zenstore/filecas.h +++ b/zenstore/filecas.h @@ -12,6 +12,10 @@ #include <functional> +namespace spdlog { +class logger; +} + namespace zen { class BasicFile; @@ -37,10 +41,20 @@ private: const CasStoreConfiguration& m_Config; RwLock m_Lock; RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines + spdlog::logger& m_Log; + spdlog::logger& Log() { return m_Log; } + + inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; } + void IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback); + void DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec); + + struct ShardingHelper + { + ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash); - inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; } - static WideStringBuilderBase& MakeShardedPath(WideStringBuilderBase& ShardedPath, const IoHash& ChunkHash, size_t& OutShard2len); - void IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback); + size_t Shard2len = 0; + ExtendableWideStringBuilder<128> ShardedPath; + }; }; } // namespace zen diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h index bb310b179..93454ca6f 100644 --- a/zenstore/include/zenstore/CAS.h +++ b/zenstore/include/zenstore/CAS.h @@ -8,8 +8,11 @@ #include <zencore/iobuffer.h> #include <zencore/iohash.h> #include <zencore/refcount.h> +#include <zencore/timer.h> + #include <atomic> #include <filesystem> +#include <functional> #include <memory> #include <string> #include <unordered_set> @@ -37,21 +40,33 @@ public: private: }; +/** Context object for data scrubbing + * + * Data scrubbing is when we traverse stored data to validate it and + * optionally correct/recover + */ + class ScrubContext { public: - virtual void ReportBadChunks(std::span<IoHash> BadChunks); + virtual void ReportBadChunks(std::span<IoHash> BadChunks); + inline uint64_t ScrubTimestamp() const { return m_ScrubTime; } + inline bool RunRecovery() const { return m_Recover; } private: + uint64_t m_ScrubTime = GetHifreqTimerValue(); + bool m_Recover = true; }; class CasChunkSet { public: - void AddChunk(const IoHash& HashToAdd) { m_ChunkSet.insert(HashToAdd); } - bool RemoveIfPresent(const IoHash& HashToRemove) { return 0 != m_ChunkSet.erase(HashToRemove); } - const std::unordered_set<IoHash>& GetChunkSet() const { return m_ChunkSet; } - bool IsEmpty() const { return m_ChunkSet.empty(); } + void AddChunkToSet(const IoHash& HashToAdd); + void RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate); + void IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback); + inline [[nodiscard]] bool ContainsChunk(const IoHash& Hash) const { return m_ChunkSet.find(Hash) != m_ChunkSet.end(); } + inline [[nodiscard]] bool IsEmpty() const { return m_ChunkSet.empty(); } + inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); } private: std::unordered_set<IoHash> m_ChunkSet; @@ -78,6 +93,7 @@ public: protected: CasStoreConfiguration m_Config; + uint64_t m_LastScrubTime = 0; }; ZENCORE_API CasStore* CreateCasStore(); diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h index 49f2bf99a..f4439e083 100644 --- a/zenstore/include/zenstore/cidstore.h +++ b/zenstore/include/zenstore/cidstore.h @@ -50,8 +50,8 @@ public: // TODO: add batch filter support private: - struct CidState; - std::unique_ptr<CidState> m_Impl; + struct Impl; + std::unique_ptr<Impl> m_Impl; }; } // namespace zen |