diff options
| author | Dan Engelbrecht <[email protected]> | 2022-06-17 07:06:21 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-06-17 07:06:21 -0700 |
| commit | c7e22a4ef1cce7103b9afbeec487461cb32f8dbe (patch) | |
| tree | 8b99d51bf496c96f82161c18fbdcfd5c6f8f31fd /zenstore | |
| parent | fixed merge mistake which caused a build error (diff) | |
| download | zen-0.1.4-pre6.tar.xz zen-0.1.4-pre6.zip | |
Make cas storage an hidden implementation detail of CidStore (#130)v0.1.4-pre6v0.1.4-pre5
- Bumped ZEN_SCHEMA_VERSION
- CasStore no longer a public API, it is hidden behind CidStore
- Moved cas.h from public header folder
- CidStore no longer maps from Cid -> Cas, we store entries in Cas under RawHash
- CasStore now decompresses data to validate content (matching against RawHash)
- CasChunkSet renames to HashKeySet and put in separate header/cpp file
- Disabled "Chunk" command for now as it relied on CAS being exposed as a service
- Changed CAS http service to Cid http server
- Moved "Run" command completely inside ZEN_WITH_EXEC_SERVICES define
- Removed "cas.basic" test
- Uncommented ".exec.basic" test and added return-skip at start of test
- Moved ScrubContext to separate header file
- Renamed CasGC to GcManager
- Cleaned up configuration passing in cas store classes
- Removed CAS stuff from GcContext and clarified naming in class
- Remove migration code
Diffstat (limited to 'zenstore')
| -rw-r--r-- | zenstore/cas.cpp | 90 | ||||
| -rw-r--r-- | zenstore/cas.h | 61 | ||||
| -rw-r--r-- | zenstore/caslog.cpp | 2 | ||||
| -rw-r--r-- | zenstore/cidstore.cpp | 261 | ||||
| -rw-r--r-- | zenstore/compactcas.cpp | 745 | ||||
| -rw-r--r-- | zenstore/compactcas.h | 16 | ||||
| -rw-r--r-- | zenstore/filecas.cpp | 111 | ||||
| -rw-r--r-- | zenstore/filecas.h | 23 | ||||
| -rw-r--r-- | zenstore/gc.cpp | 174 | ||||
| -rw-r--r-- | zenstore/hashkeyset.cpp | 60 | ||||
| -rw-r--r-- | zenstore/include/zenstore/cas.h | 144 | ||||
| -rw-r--r-- | zenstore/include/zenstore/caslog.h | 2 | ||||
| -rw-r--r-- | zenstore/include/zenstore/cidstore.h | 57 | ||||
| -rw-r--r-- | zenstore/include/zenstore/gc.h | 44 | ||||
| -rw-r--r-- | zenstore/include/zenstore/hashkeyset.h | 54 | ||||
| -rw-r--r-- | zenstore/include/zenstore/scrubcontext.h | 40 | ||||
| -rw-r--r-- | zenstore/zenstore.cpp | 5 |
17 files changed, 620 insertions, 1269 deletions
diff --git a/zenstore/cas.cpp b/zenstore/cas.cpp index 0e1d5b242..54e8cb11c 100644 --- a/zenstore/cas.cpp +++ b/zenstore/cas.cpp @@ -1,6 +1,6 @@ // Copyright Epic Games, Inc. All Rights Reserved. -#include <zenstore/cas.h> +#include "cas.h" #include "compactcas.h" #include "filecas.h" @@ -18,7 +18,9 @@ #include <zencore/thread.h> #include <zencore/trace.h> #include <zencore/uid.h> +#include <zenstore/cidstore.h> #include <zenstore/gc.h> +#include <zenstore/scrubcontext.h> #include <gsl/gsl-lite.hpp> @@ -30,58 +32,6 @@ namespace zen { -void -CasChunkSet::AddChunkToSet(const IoHash& HashToAdd) -{ - m_ChunkSet.insert(HashToAdd); -} - -void -CasChunkSet::AddChunksToSet(std::span<const IoHash> HashesToAdd) -{ - m_ChunkSet.insert(HashesToAdd.begin(), HashesToAdd.end()); -} - -void -CasChunkSet::RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate) -{ - for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;) - { - if (Predicate(*It)) - { - It = m_ChunkSet.erase(It); - } - else - { - ++It; - } - } -} - -void -CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback) -{ - for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd; ++It) - { - Callback(*It); - } -} - -////////////////////////////////////////////////////////////////////////// - -void -ScrubContext::ReportBadCasChunks(std::span<IoHash> BadCasChunks) -{ - m_BadCas.AddChunksToSet(BadCasChunks); -} - -void -ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes) -{ - m_ChunkCount.fetch_add(ChunkCount); - m_ByteCount.fetch_add(ChunkBytes); -} - /** * CAS store implementation * @@ -93,18 +43,18 @@ ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes) class CasImpl : public CasStore { public: - CasImpl(CasGc& Gc); + CasImpl(GcManager& Gc); virtual ~CasImpl(); - virtual void Initialize(const CasStoreConfiguration& InConfig) override; + virtual void Initialize(const CidStoreConfiguration& InConfig) override; virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) override; virtual IoBuffer FindChunk(const IoHash& ChunkHash) override; virtual bool ContainsChunk(const IoHash& ChunkHash) override; - virtual void FilterChunks(CasChunkSet& InOutChunks) override; + virtual void FilterChunks(HashKeySet& InOutChunks) override; virtual void Flush() override; virtual void Scrub(ScrubContext& Ctx) override; virtual void GarbageCollect(GcContext& GcCtx) override; - virtual CasStoreSize TotalSize() const override; + virtual CidStoreSize TotalSize() const override; private: CasContainerStrategy m_TinyStrategy; @@ -124,7 +74,7 @@ private: void UpdateManifest(); }; -CasImpl::CasImpl(CasGc& Gc) : m_TinyStrategy(m_Config, Gc), m_SmallStrategy(m_Config, Gc), m_LargeStrategy(m_Config, Gc) +CasImpl::CasImpl(GcManager& Gc) : m_TinyStrategy(Gc), m_SmallStrategy(Gc), m_LargeStrategy(Gc) { } @@ -133,7 +83,7 @@ CasImpl::~CasImpl() } void -CasImpl::Initialize(const CasStoreConfiguration& InConfig) +CasImpl::Initialize(const CidStoreConfiguration& InConfig) { m_Config = InConfig; @@ -149,9 +99,9 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig) // Initialize payload storage - m_LargeStrategy.Initialize(IsNewStore); - m_TinyStrategy.Initialize("tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block - m_SmallStrategy.Initialize("sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block + m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore); + m_TinyStrategy.Initialize(m_Config.RootDirectory, "tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block + m_SmallStrategy.Initialize(m_Config.RootDirectory, "sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block } bool @@ -292,7 +242,7 @@ CasImpl::ContainsChunk(const IoHash& ChunkHash) } void -CasImpl::FilterChunks(CasChunkSet& InOutChunks) +CasImpl::FilterChunks(HashKeySet& InOutChunks) { m_SmallStrategy.FilterChunks(InOutChunks); m_TinyStrategy.FilterChunks(InOutChunks); @@ -330,7 +280,7 @@ CasImpl::GarbageCollect(GcContext& GcCtx) m_LargeStrategy.CollectGarbage(GcCtx); } -CasStoreSize +CidStoreSize CasImpl::TotalSize() const { const uint64_t Tiny = m_TinyStrategy.StorageSize().DiskSize; @@ -343,7 +293,7 @@ CasImpl::TotalSize() const ////////////////////////////////////////////////////////////////////////// std::unique_ptr<CasStore> -CreateCasStore(CasGc& Gc) +CreateCasStore(GcManager& Gc) { return std::make_unique<CasImpl>(Gc); } @@ -359,10 +309,10 @@ TEST_CASE("CasStore") { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration config; + CidStoreConfiguration config; config.RootDirectory = TempDir.Path(); - CasGc Gc; + GcManager Gc; std::unique_ptr<CasStore> Store = CreateCasStore(Gc); Store->Initialize(config); @@ -382,9 +332,9 @@ TEST_CASE("CasStore") CasStore::InsertResult Result2 = Store->InsertChunk(Value2, Hash2); CHECK(Result2.New); - CasChunkSet ChunkSet; - ChunkSet.AddChunkToSet(Hash1); - ChunkSet.AddChunkToSet(Hash2); + HashKeySet ChunkSet; + ChunkSet.AddHashToSet(Hash1); + ChunkSet.AddHashToSet(Hash2); Store->FilterChunks(ChunkSet); CHECK(ChunkSet.IsEmpty()); diff --git a/zenstore/cas.h b/zenstore/cas.h new file mode 100644 index 000000000..2ad160d28 --- /dev/null +++ b/zenstore/cas.h @@ -0,0 +1,61 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/blake3.h> +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/refcount.h> +#include <zencore/timer.h> +#include <zenstore/cidstore.h> +#include <zenstore/hashkeyset.h> + +#include <atomic> +#include <filesystem> +#include <functional> +#include <memory> +#include <string> +#include <unordered_set> + +namespace zen { + +class GcContext; +class GcManager; +class ScrubContext; + +/** Content Addressable Storage interface + + */ + +class CasStore +{ +public: + virtual ~CasStore() = default; + + const CidStoreConfiguration& Config() { return m_Config; } + + struct InsertResult + { + bool New = false; + }; + + virtual void Initialize(const CidStoreConfiguration& Config) = 0; + virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash) = 0; + virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0; + virtual bool ContainsChunk(const IoHash& ChunkHash) = 0; + virtual void FilterChunks(HashKeySet& InOutChunks) = 0; + virtual void Flush() = 0; + virtual void Scrub(ScrubContext& Ctx) = 0; + virtual void GarbageCollect(GcContext& GcCtx) = 0; + virtual CidStoreSize TotalSize() const = 0; + +protected: + CidStoreConfiguration m_Config; + uint64_t m_LastScrubTime = 0; +}; + +ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(GcManager& Gc); + +void CAS_forcelink(); + +} // namespace zen diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp index 03a56f010..9c5258bce 100644 --- a/zenstore/caslog.cpp +++ b/zenstore/caslog.cpp @@ -1,6 +1,6 @@ // Copyright Epic Games, Inc. All Rights Reserved. -#include <zenstore/cas.h> +#include <zenstore/caslog.h> #include "compactcas.h" diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp index 01eda4697..5a079dbed 100644 --- a/zenstore/cidstore.cpp +++ b/zenstore/cidstore.cpp @@ -7,8 +7,9 @@ #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/string.h> -#include <zenstore/cas.h> -#include <zenstore/caslog.h> +#include <zenstore/scrubcontext.h> + +#include "cas.h" #include <filesystem> @@ -18,153 +19,32 @@ struct CidStore::Impl { Impl(CasStore& InCasStore) : m_CasStore(InCasStore) {} - struct IndexEntry - { - IoHash Uncompressed; - IoHash Compressed; - }; - - CasStore& m_CasStore; - TCasLogFile<IndexEntry> m_LogFile; + CasStore& m_CasStore; - RwLock m_Lock; - tsl::robin_map<IoHash, IoHash> m_CidMap; + void Initialize(const CidStoreConfiguration& Config) { m_CasStore.Initialize(Config); } - CidStore::InsertResult AddChunk(CompressedBuffer& ChunkData) + CidStore::InsertResult AddChunk(const CompressedBuffer& ChunkData) { const IoHash DecompressedId = IoHash::FromBLAKE3(ChunkData.GetRawHash()); IoBuffer Payload = ChunkData.GetCompressed().Flatten().AsIoBuffer(); - IoHash CompressedHash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); Payload.SetContentType(ZenContentType::kCompressedBinary); - CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, CompressedHash); - AddCompressedCid(DecompressedId, CompressedHash); + CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, DecompressedId); - return {.DecompressedId = DecompressedId, .CompressedHash = CompressedHash, .New = Result.New}; + return {.New = Result.New}; } - void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed) - { - ZEN_ASSERT(Compressed != IoHash::Zero); - - RwLock::ExclusiveLockScope _(m_Lock); + IoBuffer FindChunkByCid(const IoHash& DecompressedId) { return m_CasStore.FindChunk(DecompressedId); } - auto It = m_CidMap.try_emplace(DecompressedId, Compressed); - if (!It.second) - { - if (It.first.value() != Compressed) - { - It.first.value() = Compressed; - } - else - { - // No point logging an update that won't change anything - return; - } - } + bool ContainsChunk(const IoHash& DecompressedId) { return m_CasStore.ContainsChunk(DecompressedId); } - // It's not ideal to do this while holding the lock in case - // we end up in blocking I/O but that's for later - LogMapping(DecompressedId, Compressed); - } - - void LogMapping(const IoHash& DecompressedId, const IoHash& CompressedHash) + void FilterChunks(HashKeySet& InOutChunks) { - ZEN_ASSERT(DecompressedId != CompressedHash); - m_LogFile.Append({.Uncompressed = DecompressedId, .Compressed = CompressedHash}); + InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return ContainsChunk(Hash); }); } - IoHash RemapCid(const IoHash& DecompressedId) - { - RwLock::SharedLockScope _(m_Lock); - if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end()) - { - return It->second; - } - - return IoHash::Zero; - } - - IoBuffer FindChunkByCid(const IoHash& DecompressedId) - { - IoHash CompressedHash; - - { - RwLock::SharedLockScope _(m_Lock); - if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end()) - { - CompressedHash = It->second; - } - else - { - return {}; - } - } - - ZEN_ASSERT(CompressedHash != IoHash::Zero); - - return m_CasStore.FindChunk(CompressedHash); - } - - bool ContainsChunk(const IoHash& DecompressedId) - { - IoHash CasHash = IoHash::Zero; - - { - RwLock::SharedLockScope _(m_Lock); - if (const auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end()) - { - CasHash = It->second; - } - } - - return CasHash != IoHash::Zero ? m_CasStore.ContainsChunk(CasHash) : false; - } - - void InitializeIndex(const std::filesystem::path& RootDir) - { - CreateDirectories(RootDir); - std::filesystem::path SlogPath{RootDir / "cid.slog"}; - - bool IsNew = !std::filesystem::exists(SlogPath); - - m_LogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); - - ZEN_DEBUG("Initializing index from '{}' ({})", SlogPath, NiceBytes(m_LogFile.GetLogSize())); - - uint64_t TombstoneCount = 0; - uint64_t InvalidCount = 0; - - m_LogFile.Replay( - [&](const IndexEntry& Entry) { - if (Entry.Compressed != IoHash::Zero) - { - // Update - m_CidMap.insert_or_assign(Entry.Uncompressed, Entry.Compressed); - } - else - { - if (Entry.Uncompressed != IoHash::Zero) - { - // Tombstone - m_CidMap.erase(Entry.Uncompressed); - ++TombstoneCount; - } - else - { - // Completely uninitialized entry with both hashes set to zero indicates a - // problem. Might be an unwritten page due to BSOD or some other problem - ++InvalidCount; - } - } - }, - 0); - - ZEN_INFO("CID index initialized: {} entries found ({} tombstones, {} invalid)", m_CidMap.size(), TombstoneCount, InvalidCount); - } - - void Flush() { m_LogFile.Flush(); } + void Flush() { m_CasStore.Flush(); } void Scrub(ScrubContext& Ctx) { @@ -175,83 +55,7 @@ struct CidStore::Impl m_LastScrubTime = Ctx.ScrubTimestamp(); - CasChunkSet ChunkSet; - - { - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_CidMap) - { - ChunkSet.AddChunkToSet(Kv.second); - } - } - - m_CasStore.FilterChunks(ChunkSet); - - if (ChunkSet.IsEmpty()) - { - // All good - we have all the chunks - return; - } - - ZEN_ERROR("Scrubbing found that {} cid mappings (out of {}) mapped to non-existent CAS chunks. These mappings will be removed", - ChunkSet.GetSize(), - m_CidMap.size()); - - // Erase all mappings to chunks which are not present in the underlying CAS store - // we do this by removing mappings from the in-memory lookup structure and also - // by emitting tombstone records to the commit log - - std::vector<IoHash> BadChunks; - - { - RwLock::SharedLockScope _(m_Lock); - - for (auto It = begin(m_CidMap), ItEnd = end(m_CidMap); It != ItEnd;) - { - if (ChunkSet.ContainsChunk(It->second)) - { - const IoHash& BadHash = It->first; - - // Log a tombstone record - LogMapping(BadHash, IoHash::Zero); - - BadChunks.push_back(BadHash); - - It = m_CidMap.erase(It); - } - else - { - ++It; - } - } - } - - m_LogFile.Flush(); - - // TODO: Should compute a snapshot index here - - Ctx.ReportBadCasChunks(BadChunks); - } - - void RemoveCids(CasChunkSet& CasChunks) - { - std::vector<IndexEntry> RemovedEntries; - RemovedEntries.reserve(CasChunks.GetSize()); - { - RwLock::ExclusiveLockScope _(m_Lock); - for (auto It = m_CidMap.begin(), End = m_CidMap.end(); It != End;) - { - if (CasChunks.ContainsChunk(It->second)) - { - RemovedEntries.push_back({It->first, IoHash::Zero}); - It = m_CidMap.erase(It); - continue; - } - ++It; - } - } - m_LogFile.Append(RemovedEntries); + m_CasStore.Scrub(Ctx); } uint64_t m_LastScrubTime = 0; @@ -259,25 +63,24 @@ struct CidStore::Impl ////////////////////////////////////////////////////////////////////////// -CidStore::CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir) : m_Impl(std::make_unique<Impl>(InCasStore)) +CidStore::CidStore(GcManager& Gc) : m_CasStore(CreateCasStore(Gc)), m_Impl(std::make_unique<Impl>(*m_CasStore)) { - m_Impl->InitializeIndex(RootDir); } CidStore::~CidStore() { } -CidStore::InsertResult -CidStore::AddChunk(CompressedBuffer& ChunkData) +void +CidStore::Initialize(const CidStoreConfiguration& Config) { - return m_Impl->AddChunk(ChunkData); + m_Impl->Initialize(Config); } -void -CidStore::AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed) +CidStore::InsertResult +CidStore::AddChunk(const CompressedBuffer& ChunkData) { - m_Impl->AddCompressedCid(DecompressedId, Compressed); + return m_Impl->AddChunk(ChunkData); } IoBuffer @@ -286,12 +89,6 @@ CidStore::FindChunkByCid(const IoHash& DecompressedId) return m_Impl->FindChunkByCid(DecompressedId); } -IoHash -CidStore::RemapCid(const IoHash& DecompressedId) -{ - return m_Impl->RemapCid(DecompressedId); -} - bool CidStore::ContainsChunk(const IoHash& DecompressedId) { @@ -299,25 +96,25 @@ CidStore::ContainsChunk(const IoHash& DecompressedId) } void -CidStore::Flush() +CidStore::FilterChunks(HashKeySet& InOutChunks) { - m_Impl->Flush(); + return m_Impl->FilterChunks(InOutChunks); } void -CidStore::Scrub(ScrubContext& Ctx) +CidStore::Flush() { - m_Impl->Scrub(Ctx); + m_Impl->Flush(); } void -CidStore::RemoveCids(CasChunkSet& CasChunks) +CidStore::Scrub(ScrubContext& Ctx) { - m_Impl->RemoveCids(CasChunks); + m_Impl->Scrub(Ctx); } -CasStoreSize -CidStore::CasSize() const +CidStoreSize +CidStore::TotalSize() const { return m_Impl->m_CasStore.TotalSize(); } diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 5aed02e7f..a7fdfa1f5 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -2,13 +2,16 @@ #include "compactcas.h" -#include <zenstore/cas.h> +#include "cas.h" +#include <zencore/compress.h> #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> +#include <zenstore/scrubcontext.h> + #include <gsl/gsl-lite.hpp> #include <xxhash.h> @@ -76,94 +79,6 @@ namespace { return GetBasePath(RootPath, ContainerBaseName) / "blocks"; } - std::filesystem::path GetLegacyLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) - { - return RootPath / (ContainerBaseName + LogExtension); - } - - std::filesystem::path GetLegacyDataPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) - { - return RootPath / (ContainerBaseName + ".ucas"); - } - - std::filesystem::path GetLegacyIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) - { - return RootPath / (ContainerBaseName + IndexExtension); - } - - struct LegacyCasDiskLocation - { - LegacyCasDiskLocation(uint64_t InOffset, uint64_t InSize) - { - ZEN_ASSERT(InOffset <= 0xff'ffff'ffff); - ZEN_ASSERT(InSize <= 0xff'ffff'ffff); - - memcpy(&m_Offset[0], &InOffset, sizeof m_Offset); - memcpy(&m_Size[0], &InSize, sizeof m_Size); - } - - LegacyCasDiskLocation() = default; - - inline uint64_t GetOffset() const - { - uint64_t Offset = 0; - memcpy(&Offset, &m_Offset, sizeof m_Offset); - return Offset; - } - - inline uint64_t GetSize() const - { - uint64_t Size = 0; - memcpy(&Size, &m_Size, sizeof m_Size); - return Size; - } - - private: - uint8_t m_Offset[5]; - uint8_t m_Size[5]; - }; - - struct LegacyCasDiskIndexEntry - { - static const uint8_t kTombstone = 0x01; - - IoHash Key; - LegacyCasDiskLocation Location; - ZenContentType ContentType = ZenContentType::kUnknownContentType; - uint8_t Flags = 0; - }; - - bool ValidateLegacyEntry(const LegacyCasDiskIndexEntry& Entry, std::string& OutReason) - { - if (Entry.Key == IoHash::Zero) - { - OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); - return false; - } - if ((Entry.Flags & ~LegacyCasDiskIndexEntry::kTombstone) != 0) - { - OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()); - return false; - } - if (Entry.Flags & LegacyCasDiskIndexEntry::kTombstone) - { - return true; - } - if (Entry.ContentType != ZenContentType::kUnknownContentType) - { - OutReason = - fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString()); - return false; - } - uint64_t Size = Entry.Location.GetSize(); - if (Size == 0) - { - OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); - return false; - } - return true; - } - bool ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) @@ -199,10 +114,7 @@ namespace { ////////////////////////////////////////////////////////////////////////// -CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc) -: GcStorage(Gc) -, m_Config(Config) -, m_Log(logging::Get("containercas")) +CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("containercas")) { } @@ -211,16 +123,21 @@ CasContainerStrategy::~CasContainerStrategy() } void -CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore) +CasContainerStrategy::Initialize(const std::filesystem::path& RootDirectory, + const std::string_view ContainerBaseName, + uint32_t MaxBlockSize, + uint64_t Alignment, + bool IsNewStore) { ZEN_ASSERT(IsPow2(Alignment)); ZEN_ASSERT(!m_IsInitialized); ZEN_ASSERT(MaxBlockSize > 0); + m_RootDirectory = RootDirectory; m_ContainerBaseName = ContainerBaseName; m_PayloadAlignment = Alignment; m_MaxBlockSize = MaxBlockSize; - m_BlocksBasePath = GetBlocksBasePath(m_Config.RootDirectory, m_ContainerBaseName); + m_BlocksBasePath = GetBlocksBasePath(m_RootDirectory, m_ContainerBaseName); OpenContainer(IsNewStore); @@ -267,6 +184,9 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const CasStore::InsertResult CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) { +#if !ZEN_WITH_TESTS + ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); +#endif return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); } @@ -293,7 +213,7 @@ CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) } void -CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks) +CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) { // This implementation is good enough for relatively small // chunk sets (in terms of chunk identifiers), but would @@ -302,7 +222,7 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks) // we're likely to already have a large proportion of the // chunks in the set - InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); + InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); } void @@ -316,6 +236,7 @@ void CasContainerStrategy::Scrub(ScrubContext& Ctx) { std::vector<IoHash> BadKeys; + uint64_t ChunkCount{0}, ChunkBytes{0}; std::vector<BlockStoreLocation> ChunkLocations; std::vector<IoHash> ChunkIndexToChunkHash; @@ -337,6 +258,9 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) } const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + ++ChunkCount; + ChunkBytes += Size; + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; if (!Data) { @@ -344,66 +268,97 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) BadKeys.push_back(Hash); return; } - const IoHash ComputedHash = IoHash::HashBuffer(Data, Size); - if (ComputedHash != Hash) + + IoBuffer Buffer(IoBuffer::Wrap, Data, Size); + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); Compressed) + { + if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Hash) + { + // Hash mismatch + BadKeys.push_back(Hash); + return; + } + return; + } +#if ZEN_WITH_TESTS + IoHash ComputedHash = IoHash::HashBuffer(Data, Size); + if (ComputedHash == Hash) { - // Hash mismatch - BadKeys.push_back(Hash); return; } +#endif + BadKeys.push_back(Hash); }; const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + ++ChunkCount; + ChunkBytes += Size; + + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); + // TODO: Add API to verify compressed buffer without having to memorymap the whole file + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); Compressed) + { + if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Hash) + { + // Hash mismatch + BadKeys.push_back(Hash); + return; + } + return; + } +#if ZEN_WITH_TESTS IoHashStream Hasher; - File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); - IoHash ComputedHash = Hasher.GetHash(); - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - if (ComputedHash != Hash) + File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); + IoHash ComputedHash = Hasher.GetHash(); + if (ComputedHash == Hash) { - // Hash mismatch - BadKeys.push_back(Hash); return; } +#endif + BadKeys.push_back(Hash); }; m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); _.ReleaseNow(); - if (BadKeys.empty()) - { - return; - } - - ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_Config.RootDirectory / m_ContainerBaseName); + Ctx.ReportScrubbed(ChunkCount, ChunkBytes); - if (Ctx.RunRecovery()) + if (!BadKeys.empty()) { - // Deal with bad chunks by removing them from our lookup map + ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName); - std::vector<CasDiskIndexEntry> LogEntries; - LogEntries.reserve(BadKeys.size()); + if (Ctx.RunRecovery()) { - RwLock::ExclusiveLockScope __(m_LocationMapLock); - for (const IoHash& ChunkHash : BadKeys) + // Deal with bad chunks by removing them from our lookup map + + std::vector<CasDiskIndexEntry> LogEntries; + LogEntries.reserve(BadKeys.size()); { - const auto KeyIt = m_LocationMap.find(ChunkHash); - if (KeyIt == m_LocationMap.end()) + RwLock::ExclusiveLockScope __(m_LocationMapLock); + for (const IoHash& ChunkHash : BadKeys) { - // Might have been GC'd - continue; + const auto KeyIt = m_LocationMap.find(ChunkHash); + if (KeyIt == m_LocationMap.end()) + { + // Might have been GC'd + continue; + } + LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone}); + m_LocationMap.erase(KeyIt); } - LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone}); - m_LocationMap.erase(KeyIt); } + m_CasLog.Append(LogEntries); } - m_CasLog.Append(LogEntries); } // Let whomever it concerns know about the bad chunks. This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do - Ctx.ReportBadCasChunks(BadKeys); + Ctx.ReportBadCidChunks(BadKeys); + + ZEN_INFO("compact cas scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); } void @@ -432,7 +387,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) // do a blocking operation and update the m_LocationMap after each new block is // written and figuring out the path to the next new block. - ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName); + ZEN_INFO("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName); uint64_t WriteBlockTimeUs = 0; uint64_t WriteBlockLongestTimeUs = 0; @@ -468,7 +423,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) ChunkLocations.reserve(TotalChunkCount); ChunkIndexToChunkHash.reserve(TotalChunkCount); - GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { + GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { auto KeyIt = LocationMap.find(ChunkHash); const BlockStoreDiskLocation& DiskLocation = KeyIt->second; BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); @@ -539,26 +494,26 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) }, [&GcCtx]() { return GcCtx.CollectSmallObjects(); }); - GcCtx.DeletedCas(DeletedChunks); + GcCtx.AddDeletedCids(DeletedChunks); } void CasContainerStrategy::MakeIndexSnapshot() { - ZEN_INFO("write store snapshot for '{}'", m_Config.RootDirectory / m_ContainerBaseName); + ZEN_INFO("write store snapshot for '{}'", m_RootDirectory / m_ContainerBaseName); uint64_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}", - m_Config.RootDirectory / m_ContainerBaseName, + m_RootDirectory / m_ContainerBaseName, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); namespace fs = std::filesystem; - fs::path IndexPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName); - fs::path TempIndexPath = GetTempIndexPath(m_Config.RootDirectory, m_ContainerBaseName); + fs::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName); + fs::path TempIndexPath = GetTempIndexPath(m_RootDirectory, m_ContainerBaseName); // Move index away, we keep it if something goes wrong if (fs::is_regular_file(TempIndexPath)) @@ -629,13 +584,13 @@ uint64_t CasContainerStrategy::ReadIndexFile() { std::vector<CasDiskIndexEntry> Entries; - std::filesystem::path IndexPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName); + std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName); if (std::filesystem::is_regular_file(IndexPath)) { Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' index containing #{} entries in {}", - m_Config.RootDirectory / m_ContainerBaseName, + m_RootDirectory / m_ContainerBaseName, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); @@ -682,13 +637,13 @@ uint64_t CasContainerStrategy::ReadLog(uint64_t SkipEntryCount) { std::vector<CasDiskIndexEntry> Entries; - std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); + std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName); if (std::filesystem::is_regular_file(LogPath)) { Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' log containing #{} entries in {}", - m_Config.RootDirectory / m_ContainerBaseName, + m_RootDirectory / m_ContainerBaseName, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); @@ -727,208 +682,6 @@ CasContainerStrategy::ReadLog(uint64_t SkipEntryCount) return 0; } -uint64_t -CasContainerStrategy::MigrateLegacyData(bool CleanSource) -{ - std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName); - - if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0) - { - return 0; - } - - ZEN_INFO("migrating store '{}'", m_Config.RootDirectory / m_ContainerBaseName); - - std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_Config.RootDirectory, m_ContainerBaseName); - std::filesystem::path LegacyIndexPath = GetLegacyIndexPath(m_Config.RootDirectory, m_ContainerBaseName); - - uint64_t MigratedChunkCount = 0; - uint32_t MigratedBlockCount = 0; - Stopwatch MigrationTimer; - uint64_t TotalSize = 0; - const auto _ = MakeGuard([&] { - ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})", - m_Config.RootDirectory / m_ContainerBaseName, - MigratedChunkCount, - MigratedBlockCount, - NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()), - NiceBytes(TotalSize)); - }); - - uint64_t BlockFileSize = 0; - { - BasicFile BlockFile; - BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); - BlockFileSize = BlockFile.FileSize(); - } - - std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex; - uint64_t InvalidEntryCount = 0; - - TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog; - LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead); - { - Stopwatch Timer; - const auto __ = MakeGuard([&] { - ZEN_INFO("read store '{}' legacy log containing #{} entries in {}", - m_Config.RootDirectory / m_ContainerBaseName, - LegacyDiskIndex.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - if (LegacyCasLog.Initialize()) - { - LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount()); - LegacyCasLog.Replay( - [&](const LegacyCasDiskIndexEntry& Record) { - std::string InvalidEntryReason; - if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone) - { - LegacyDiskIndex.erase(Record.Key); - return; - } - if (!ValidateLegacyEntry(Record, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason); - InvalidEntryCount++; - return; - } - LegacyDiskIndex.insert_or_assign(Record.Key, Record); - }, - 0); - - std::vector<IoHash> BadEntries; - for (const auto& Entry : LegacyDiskIndex) - { - const LegacyCasDiskIndexEntry& Record(Entry.second); - if (Record.Location.GetOffset() + Record.Location.GetSize() <= BlockFileSize) - { - continue; - } - ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath); - BadEntries.push_back(Entry.first); - } - for (const IoHash& BadHash : BadEntries) - { - LegacyDiskIndex.erase(BadHash); - } - InvalidEntryCount += BadEntries.size(); - } - } - - if (InvalidEntryCount) - { - ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_Config.RootDirectory / m_ContainerBaseName); - } - - if (LegacyDiskIndex.empty()) - { - LegacyCasLog.Close(); - if (CleanSource) - { - // Older versions of CasContainerStrategy expects the legacy files to exist if it can find - // a CAS manifest and crashes on startup if they don't. - // In order to not break startup when switching back an older version, lets just reset - // the legacy data files to zero length. - - BasicFile LegacyLog; - LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); - BasicFile LegacySobs; - LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); - BasicFile LegacySidx; - LegacySidx.Open(LegacyIndexPath, BasicFile::Mode::kTruncate); - } - return 0; - } - - std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); - CreateDirectories(LogPath.parent_path()); - TCasLogFile<CasDiskIndexEntry> CasLog; - CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - - std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash; - std::vector<BlockStoreLocation> ChunkLocations; - ChunkIndexToChunkHash.reserve(LegacyDiskIndex.size()); - ChunkLocations.reserve(LegacyDiskIndex.size()); - for (const auto& Entry : LegacyDiskIndex) - { - const LegacyCasDiskLocation& Location = Entry.second.Location; - const IoHash& ChunkHash = Entry.first; - size_t ChunkIndex = ChunkLocations.size(); - ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.GetOffset(), .Size = Location.GetSize()}); - ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; - TotalSize += Location.GetSize(); - } - m_BlockStore.Split( - ChunkLocations, - LegacyDataPath, - m_BlocksBasePath, - m_MaxBlockSize, - BlockStoreDiskLocation::MaxBlockIndex + 1, - m_PayloadAlignment, - CleanSource, - [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount]( - const BlockStore::MovedChunksArray& MovedChunks) { - std::vector<CasDiskIndexEntry> LogEntries; - LogEntries.reserve(MovedChunks.size()); - for (const auto& Entry : MovedChunks) - { - size_t ChunkIndex = Entry.first; - const BlockStoreLocation& NewLocation = Entry.second; - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; - LogEntries.push_back({.Key = ChunkHash, - .Location = {NewLocation, m_PayloadAlignment}, - .ContentType = OldEntry.ContentType, - .Flags = OldEntry.Flags}); - } - for (const CasDiskIndexEntry& Entry : LogEntries) - { - m_LocationMap.insert_or_assign(Entry.Key, Entry.Location); - } - CasLog.Append(LogEntries); - CasLog.Flush(); - if (CleanSource) - { - std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries; - LegacyLogEntries.reserve(MovedChunks.size()); - for (const auto& Entry : MovedChunks) - { - size_t ChunkIndex = Entry.first; - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; - LegacyLogEntries.push_back( - LegacyCasDiskIndexEntry{.Key = ChunkHash, - .Location = OldEntry.Location, - .ContentType = OldEntry.ContentType, - .Flags = (uint8_t)(OldEntry.Flags | LegacyCasDiskIndexEntry::kTombstone)}); - } - LegacyCasLog.Append(LegacyLogEntries); - LegacyCasLog.Flush(); - } - MigratedBlockCount++; - MigratedChunkCount += MovedChunks.size(); - }); - - LegacyCasLog.Close(); - CasLog.Close(); - - if (CleanSource) - { - // Older versions of CasContainerStrategy expects the legacy files to exist if it can find - // a CAS manifest and crashes on startup if they don't. - // In order to not break startup when switching back an older version, lets just reset - // the legacy data files to zero length. - - BasicFile LegacyLog; - LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); - BasicFile LegacySobs; - LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); - BasicFile LegacySidx; - LegacySidx.Open(LegacyIndexPath, BasicFile::Mode::kTruncate); - } - return MigratedChunkCount; -} - void CasContainerStrategy::OpenContainer(bool IsNewStore) { @@ -937,25 +690,19 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) m_LocationMap.clear(); - std::filesystem::path BasePath = GetBasePath(m_Config.RootDirectory, m_ContainerBaseName); + std::filesystem::path BasePath = GetBasePath(m_RootDirectory, m_ContainerBaseName); if (IsNewStore) { - std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_Config.RootDirectory, m_ContainerBaseName); - std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName); - - std::filesystem::remove(LegacyLogPath); - std::filesystem::remove(LegacyDataPath); std::filesystem::remove_all(BasePath); } - uint64_t LogPosition = ReadIndexFile(); - uint64_t LogEntryCount = ReadLog(LogPosition); - uint64_t LegacyLogEntryCount = MigrateLegacyData(true); + uint64_t LogPosition = ReadIndexFile(); + uint64_t LogEntryCount = ReadLog(LogPosition); CreateDirectories(BasePath); - std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); + std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName); m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); std::vector<BlockStoreLocation> KnownLocations; @@ -969,7 +716,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); - if (IsNewStore || ((LogEntryCount + LegacyLogEntryCount) > 0)) + if (IsNewStore || (LogEntryCount > 0)) { MakeIndexSnapshot(); } @@ -1040,18 +787,14 @@ TEST_CASE("compactcas.compact.gc") { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - CreateDirectories(CasConfig.RootDirectory); - const int kIterationCount = 1000; std::vector<IoHash> Keys(kIterationCount); { - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 65536, 16, true); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 65536, 16, true); for (int i = 0; i < kIterationCount; ++i) { @@ -1083,9 +826,9 @@ TEST_CASE("compactcas.compact.gc") // the original cas store { - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 65536, 16, false); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); for (int i = 0; i < kIterationCount; ++i) { @@ -1109,18 +852,13 @@ TEST_CASE("compactcas.compact.totalsize") { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - - CreateDirectories(CasConfig.RootDirectory); - const uint64_t kChunkSize = 1024; const int32_t kChunkCount = 16; { - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 65536, 16, true); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 65536, 16, true); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { @@ -1135,9 +873,9 @@ TEST_CASE("compactcas.compact.totalsize") } { - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 65536, 16, false); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); @@ -1145,9 +883,9 @@ TEST_CASE("compactcas.compact.totalsize") // Re-open again, this time we should have a snapshot { - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 65536, 16, false); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); @@ -1159,13 +897,9 @@ TEST_CASE("compactcas.gc.basic") { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - CreateDirectories(CasConfig.RootDirectory); - - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("cb", 65536, 1 << 4, true); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true); IoBuffer Chunk = CreateChunk(128); IoHash ChunkHash = IoHash::HashBuffer(Chunk); @@ -1186,16 +920,12 @@ TEST_CASE("compactcas.gc.removefile") { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - CreateDirectories(CasConfig.RootDirectory); - IoBuffer Chunk = CreateChunk(128); IoHash ChunkHash = IoHash::HashBuffer(Chunk); { - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("cb", 65536, 1 << 4, true); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true); const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); CHECK(InsertResult.New); @@ -1204,9 +934,9 @@ TEST_CASE("compactcas.gc.removefile") Cas.Flush(); } - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("cb", 65536, 1 << 4, false); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, false); GcContext GcCtx; GcCtx.CollectSmallObjects(true); @@ -1222,13 +952,9 @@ TEST_CASE("compactcas.gc.compact") { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - CreateDirectories(CasConfig.RootDirectory); - - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("cb", 2048, 1 << 4, true); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "cb", 2048, 1 << 4, true); uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5}; std::vector<IoBuffer> Chunks; @@ -1275,7 +1001,7 @@ TEST_CASE("compactcas.gc.compact") std::vector<IoHash> KeepChunks; KeepChunks.push_back(ChunkHashes[0]); KeepChunks.push_back(ChunkHashes[8]); - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); @@ -1308,7 +1034,7 @@ TEST_CASE("compactcas.gc.compact") GcCtx.CollectSmallObjects(true); std::vector<IoHash> KeepChunks; KeepChunks.push_back(ChunkHashes[8]); - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); @@ -1342,7 +1068,7 @@ TEST_CASE("compactcas.gc.compact") KeepChunks.push_back(ChunkHashes[1]); KeepChunks.push_back(ChunkHashes[4]); KeepChunks.push_back(ChunkHashes[7]); - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); @@ -1377,7 +1103,7 @@ TEST_CASE("compactcas.gc.compact") KeepChunks.push_back(ChunkHashes[6]); KeepChunks.push_back(ChunkHashes[7]); KeepChunks.push_back(ChunkHashes[8]); - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); @@ -1414,7 +1140,7 @@ TEST_CASE("compactcas.gc.compact") KeepChunks.push_back(ChunkHashes[4]); KeepChunks.push_back(ChunkHashes[6]); KeepChunks.push_back(ChunkHashes[8]); - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); @@ -1476,13 +1202,10 @@ TEST_CASE("compactcas.gc.deleteblockonopen") ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - CreateDirectories(CasConfig.RootDirectory); { - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 1024, 16, true); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 1024, 16, true); for (size_t i = 0; i < 20; i++) { @@ -1498,7 +1221,7 @@ TEST_CASE("compactcas.gc.deleteblockonopen") { KeepChunks.push_back(ChunkHashes[i]); } - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); @@ -1513,9 +1236,9 @@ TEST_CASE("compactcas.gc.deleteblockonopen") } { // Re-open - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 1024, 16, false); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 1024, 16, false); for (size_t i = 0; i < 20; i += 2) { @@ -1545,13 +1268,9 @@ TEST_CASE("compactcas.gc.handleopeniobuffer") ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - CreateDirectories(CasConfig.RootDirectory); - - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 1024, 16, true); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 1024, 16, true); for (size_t i = 0; i < 20; i++) { @@ -1574,131 +1293,12 @@ TEST_CASE("compactcas.gc.handleopeniobuffer") CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk)); } -TEST_CASE("compactcas.legacyconversion") -{ - ScopedTemporaryDirectory TempDir; - - uint64_t ChunkSizes[] = {2041, 1123, 1223, 1239, 341, 1412, 912, 774, 341, 431, 554, 1098, 2048, 339, 561, 16, 16, 2048, 2048}; - size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t); - size_t SingleBlockSize = 0; - std::vector<IoBuffer> Chunks; - Chunks.reserve(ChunkCount); - for (uint64_t Size : ChunkSizes) - { - Chunks.push_back(CreateChunk(Size)); - SingleBlockSize += Size; - } - - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(ChunkCount); - for (const IoBuffer& Chunk : Chunks) - { - ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); - } - - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - CreateDirectories(CasConfig.RootDirectory); - - { - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", gsl::narrow<uint32_t>(SingleBlockSize * 2), 16, true); - - for (size_t i = 0; i < ChunkCount; i++) - { - CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); - } - - std::vector<IoHash> KeepChunks; - for (size_t i = 0; i < ChunkCount; i += 2) - { - KeepChunks.push_back(ChunkHashes[i]); - } - GcContext GcCtx; - GcCtx.CollectSmallObjects(true); - GcCtx.ContributeCas(KeepChunks); - Cas.Flush(); - Gc.CollectGarbage(GcCtx); - } - - std::filesystem::path BlockPath = BlockStore::GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1); - std::filesystem::path LegacyDataPath = GetLegacyDataPath(CasConfig.RootDirectory, "test"); - std::filesystem::rename(BlockPath, LegacyDataPath); - - std::vector<CasDiskIndexEntry> LogEntries; - std::filesystem::path IndexPath = GetIndexPath(CasConfig.RootDirectory, "test"); - if (std::filesystem::is_regular_file(IndexPath)) - { - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); - uint64_t Size = ObjectIndexFile.FileSize(); - if (Size >= sizeof(CasDiskIndexHeader)) - { - uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry); - CasDiskIndexHeader Header; - ObjectIndexFile.Read(&Header, sizeof(Header), 0); - if (Header.Magic == CasDiskIndexHeader::ExpectedMagic && Header.Version == CasDiskIndexHeader::CurrentVersion && - Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount) - { - LogEntries.resize(Header.EntryCount); - ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); - } - } - ObjectIndexFile.Close(); - std::filesystem::remove(IndexPath); - } - - std::filesystem::path LogPath = GetLogPath(CasConfig.RootDirectory, "test"); - { - TCasLogFile<CasDiskIndexEntry> CasLog; - CasLog.Open(LogPath, CasLogFile::Mode::kRead); - LogEntries.reserve(CasLog.GetLogCount()); - CasLog.Replay([&](const CasDiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0); - } - TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog; - std::filesystem::path LegacylogPath = GetLegacyLogPath(CasConfig.RootDirectory, "test"); - LegacyCasLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate); - - for (const CasDiskIndexEntry& Entry : LogEntries) - { - BlockStoreLocation Location = Entry.Location.Get(16); - LegacyCasDiskLocation LegacyLocation(Location.Offset, Location.Size); - LegacyCasDiskIndexEntry LegacyEntry = {.Key = Entry.Key, - .Location = LegacyLocation, - .ContentType = Entry.ContentType, - .Flags = Entry.Flags}; - LegacyCasLog.Append(LegacyEntry); - } - LegacyCasLog.Close(); - - std::filesystem::remove_all(CasConfig.RootDirectory / "test"); - - { - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 2048, 16, false); - - for (size_t i = 0; i < ChunkCount; i += 2) - { - CHECK(Cas.HaveChunk(ChunkHashes[i])); - CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); - CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); - } - } -} - TEST_CASE("compactcas.threadedinsert") { // for (uint32_t i = 0; i < 100; ++i) { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - - CreateDirectories(CasConfig.RootDirectory); - const uint64_t kChunkSize = 1048; const int32_t kChunkCount = 4096; uint64_t ExpectedSize = 0; @@ -1724,9 +1324,9 @@ TEST_CASE("compactcas.threadedinsert") std::atomic<size_t> WorkCompleted = 0; WorkerThreadPool ThreadPool(4); - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 32768, 16, true); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 32768, 16, true); { for (const auto& Chunk : Chunks) { @@ -1838,10 +1438,10 @@ TEST_CASE("compactcas.threadedinsert") GcContext GcCtx; GcCtx.CollectSmallObjects(true); - GcCtx.ContributeCas(KeepHashes); + GcCtx.AddRetainedCids(KeepHashes); Cas.CollectGarbage(GcCtx); - CasChunkSet& Deleted = GcCtx.DeletedCas(); - Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + const HashKeySet& Deleted = GcCtx.DeletedCids(); + Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } while (WorkCompleted < NewChunks.size() + Chunks.size()) @@ -1879,10 +1479,10 @@ TEST_CASE("compactcas.threadedinsert") GcContext GcCtx; GcCtx.CollectSmallObjects(true); - GcCtx.ContributeCas(KeepHashes); + GcCtx.AddRetainedCids(KeepHashes); Cas.CollectGarbage(GcCtx); - CasChunkSet& Deleted = GcCtx.DeletedCas(); - Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + const HashKeySet& Deleted = GcCtx.DeletedCids(); + Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } { WorkCompleted = 0; @@ -1902,53 +1502,6 @@ TEST_CASE("compactcas.threadedinsert") } } -TEST_CASE("compactcas.migrate.large.data") // * doctest::skip(true)) -{ - if (true) - { - return; - } - const char* BigDataPath = "D:\\zen-data\\dc4-zen-cache-t\\cas"; - std::filesystem::path TobsBasePath = GetBasePath(BigDataPath, "tobs"); - std::filesystem::path SobsBasePath = GetBasePath(BigDataPath, "sobs"); - std::filesystem::remove_all(TobsBasePath); - std::filesystem::remove_all(SobsBasePath); - - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = BigDataPath; - uint64_t TObsSize = 0; - { - CasGc TobsCasGc; - CasContainerStrategy TobsCas(CasConfig, TobsCasGc); - TobsCas.Initialize("tobs", 1u << 28, 16, false); - TObsSize = TobsCas.StorageSize().DiskSize; - CHECK(TObsSize > 0); - } - - uint64_t SObsSize = 0; - { - CasGc SobsCasGc; - CasContainerStrategy SobsCas(CasConfig, SobsCasGc); - SobsCas.Initialize("sobs", 1u << 30, 4096, false); - SObsSize = SobsCas.StorageSize().DiskSize; - CHECK(SObsSize > 0); - } - - CasGc TobsCasGc; - CasContainerStrategy TobsCas(CasConfig, TobsCasGc); - TobsCas.Initialize("tobs", 1u << 28, 16, false); - GcContext TobsGcCtx; - TobsCas.CollectGarbage(TobsGcCtx); - CHECK(TobsCas.StorageSize().DiskSize == TObsSize); - - CasGc SobsCasGc; - CasContainerStrategy SobsCas(CasConfig, SobsCasGc); - SobsCas.Initialize("sobs", 1u << 30, 4096, false); - GcContext SobsGcCtx; - SobsCas.CollectGarbage(SobsGcCtx); - CHECK(SobsCas.StorageSize().DiskSize == SObsSize); -} - #endif void diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h index 114a6a48c..2acac7ca3 100644 --- a/zenstore/compactcas.h +++ b/zenstore/compactcas.h @@ -4,10 +4,11 @@ #include <zencore/zencore.h> #include <zenstore/blockstore.h> -#include <zenstore/cas.h> #include <zenstore/caslog.h> #include <zenstore/gc.h> +#include "cas.h" + #include <atomic> #include <limits> #include <unordered_map> @@ -47,14 +48,18 @@ static_assert(sizeof(CasDiskIndexEntry) == 32); struct CasContainerStrategy final : public GcStorage { - CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc); + CasContainerStrategy(GcManager& Gc); ~CasContainerStrategy(); CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); IoBuffer FindChunk(const IoHash& ChunkHash); bool HaveChunk(const IoHash& ChunkHash); - void FilterChunks(CasChunkSet& InOutChunks); - void Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore); + void FilterChunks(HashKeySet& InOutChunks); + void Initialize(const std::filesystem::path& RootDirectory, + const std::string_view ContainerBaseName, + uint32_t MaxBlockSize, + uint64_t Alignment, + bool IsNewStore); void Flush(); void Scrub(ScrubContext& Ctx); virtual void CollectGarbage(GcContext& GcCtx) override; @@ -65,12 +70,11 @@ private: void MakeIndexSnapshot(); uint64_t ReadIndexFile(); uint64_t ReadLog(uint64_t SkipEntryCount); - uint64_t MigrateLegacyData(bool CleanSource); void OpenContainer(bool IsNewStore); spdlog::logger& Log() { return m_Log; } - const CasStoreConfiguration& m_Config; + std::filesystem::path m_RootDirectory; spdlog::logger& m_Log; uint64_t m_PayloadAlignment = 1u << 4; uint64_t m_MaxBlockSize = 1u << 28; diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index d074a906f..23e3f4cd8 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -2,6 +2,7 @@ #include "filecas.h" +#include <zencore/compress.h> #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> @@ -16,6 +17,7 @@ #include <zencore/uid.h> #include <zenstore/basicfile.h> #include <zenstore/gc.h> +#include <zenstore/scrubcontext.h> #if ZEN_WITH_TESTS # include <zencore/compactbinarybuilder.h> @@ -71,10 +73,7 @@ FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& Roo ////////////////////////////////////////////////////////////////////////// -FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc) -: GcStorage(Gc) -, m_Config(Config) -, m_Log(logging::Get("filecas")) +FileCasStrategy::FileCasStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("filecas")) { } @@ -83,17 +82,19 @@ FileCasStrategy::~FileCasStrategy() } void -FileCasStrategy::Initialize(bool IsNewStore) +FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore) { m_IsInitialized = true; - CreateDirectories(m_Config.RootDirectory); + m_RootDirectory = RootDirectory; - m_CasLog.Open(m_Config.RootDirectory / "cas.ulog", IsNewStore ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); + CreateDirectories(m_RootDirectory); + + m_CasLog.Open(m_RootDirectory / "cas.ulog", IsNewStore ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); Stopwatch Timer; const auto _ = MakeGuard([&] { - ZEN_INFO("read log {} containing {}", m_Config.RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed))); + ZEN_INFO("read log {} containing {}", m_RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed))); }); std::unordered_set<IoHash> FoundEntries; @@ -127,13 +128,17 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) { ZEN_ASSERT(m_IsInitialized); +#if !ZEN_WITH_TESTS + ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); +#endif + // File-based chunks have special case handling whereby we move the file into // place in the file store directory, thus avoiding unnecessary copying IoBufferFileReference FileRef; if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef)) { - ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); RwLock::ExclusiveLockScope _(LockForHash(ChunkHash)); @@ -340,7 +345,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize { ZEN_ASSERT(m_IsInitialized); - ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); // See if file already exists // @@ -485,7 +490,7 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash) { ZEN_ASSERT(m_IsInitialized); - ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); @@ -497,7 +502,7 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash) { ZEN_ASSERT(m_IsInitialized); - ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); @@ -513,7 +518,7 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash) void FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) { - ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); uint64_t FileSize = static_cast<uint64_t>(std::filesystem::file_size(Name.ShardedPath.c_str(), Ec)); if (Ec) @@ -534,7 +539,7 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) } void -FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) +FileCasStrategy::FilterChunks(HashKeySet& InOutChunks) { ZEN_ASSERT(m_IsInitialized); @@ -546,7 +551,7 @@ FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) // a caller, this is something which needs to be taken into account by anyone consuming // this functionality in any case - InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); + InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); } void @@ -602,12 +607,12 @@ FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, BasicFile& const std::filesystem::path& RootDirectory; std::function<void(const IoHash& Hash, BasicFile& PayloadFile)> Callback; - } CasVisitor{m_Config.RootDirectory}; + } CasVisitor{m_RootDirectory}; CasVisitor.Callback = std::move(Callback); FileSystemTraversal Traversal; - Traversal.TraverseFileSystem(m_Config.RootDirectory, CasVisitor); + Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor); } void @@ -630,21 +635,34 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) { ZEN_ASSERT(m_IsInitialized); - std::vector<IoHash> BadHashes; - std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0}; + std::vector<IoHash> BadHashes; + uint64_t ChunkCount{0}, ChunkBytes{0}; IterateChunks([&](const IoHash& Hash, BasicFile& Payload) { + ++ChunkCount; + ChunkBytes += Payload.FileSize(); + + IoBuffer Buffer(IoBuffer::BorrowedFile, Payload.Handle(), 0, Payload.FileSize()); + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); Compressed) + { + if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Hash) + { + // Hash mismatch + BadHashes.push_back(Hash); + return; + } + return; + } +#if ZEN_WITH_TESTS IoHashStream Hasher; - Payload.StreamFile([&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); + Payload.StreamByteRange(0, Payload.FileSize(), [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); IoHash ComputedHash = Hasher.GetHash(); - - if (ComputedHash != Hash) + if (ComputedHash == Hash) { - BadHashes.push_back(Hash); + return; } - - ++ChunkCount; - ChunkBytes.fetch_add(Payload.FileSize()); +#endif + BadHashes.push_back(Hash); }); Ctx.ReportScrubbed(ChunkCount, ChunkBytes); @@ -670,9 +688,12 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) } } - Ctx.ReportBadCasChunks(BadHashes); + // Let whomever it concerns know about the bad chunks. This could + // be used to invalidate higher level data structures more efficiently + // than a full validation pass might be able to do + Ctx.ReportBadCidChunks(BadHashes); - ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes)); + ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); } void @@ -680,7 +701,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) { ZEN_ASSERT(m_IsInitialized); - ZEN_INFO("collecting garbage from {}", m_Config.RootDirectory); + ZEN_INFO("collecting garbage from {}", m_RootDirectory); std::vector<IoHash> ChunksToDelete; std::atomic<uint64_t> ChunksToDeleteBytes{0}; @@ -694,7 +715,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) Stopwatch TotalTimer; const auto _ = MakeGuard([&] { ZEN_INFO("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}", - m_Config.RootDirectory, + m_RootDirectory, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), DeletedCount, ChunkCount, @@ -706,7 +727,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) bool KeepThis = false; CandidateCas.clear(); CandidateCas.push_back(Hash); - GcCtx.FilterCas(CandidateCas, [&](const IoHash& Hash) { + GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) { ZEN_UNUSED(Hash); KeepThis = true; }); @@ -725,12 +746,12 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) if (ChunksToDelete.empty()) { - ZEN_INFO("gc for '{}' SKIPPED, nothing to delete", m_Config.RootDirectory); + ZEN_INFO("gc for '{}' SKIPPED, nothing to delete", m_RootDirectory); return; } ZEN_INFO("deleting file CAS garbage for '{}': {} out of {} chunks ({})", - m_Config.RootDirectory, + m_RootDirectory, ChunksToDelete.size(), ChunkCount.load(), NiceBytes(ChunksToDeleteBytes)); @@ -751,13 +772,13 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) if (Ec) { - ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_Config.RootDirectory, Hash, Ec.message()); + ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_RootDirectory, Hash, Ec.message()); continue; } DeletedCount++; } - GcCtx.DeletedCas(ChunksToDelete); + GcCtx.AddDeletedCids(ChunksToDelete); } ////////////////////////////////////////////////////////////////////////// @@ -769,13 +790,10 @@ TEST_CASE("cas.file.move") // specifying an absolute path here can be helpful when using procmon to dig into things ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; - CasGc Gc; + GcManager Gc; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path() / "cas"; - - FileCasStrategy FileCas(CasConfig, Gc); - FileCas.Initialize(/* IsNewStore */ true); + FileCasStrategy FileCas(Gc); + FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); { std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"}; @@ -850,12 +868,9 @@ TEST_CASE("cas.file.gc") // specifying an absolute path here can be helpful when using procmon to dig into things ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path() / "cas"; - - CasGc Gc; - FileCasStrategy FileCas(CasConfig, Gc); - FileCas.Initialize(/* IsNewStore */ true); + GcManager Gc; + FileCasStrategy FileCas(Gc); + FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); const int kIterationCount = 1000; std::vector<IoHash> Keys{kIterationCount}; @@ -903,7 +918,7 @@ TEST_CASE("cas.file.gc") { if (Key.Hash[0] & 1) { - Ctx.ContributeCas(std::vector<IoHash>{Key}); + Ctx.AddRetainedCids(std::vector<IoHash>{Key}); } } diff --git a/zenstore/filecas.h b/zenstore/filecas.h index ef67ae9eb..f14e5d057 100644 --- a/zenstore/filecas.h +++ b/zenstore/filecas.h @@ -8,10 +8,11 @@ #include <zencore/iobuffer.h> #include <zencore/iohash.h> #include <zencore/thread.h> -#include <zenstore/cas.h> #include <zenstore/caslog.h> #include <zenstore/gc.h> +#include "cas.h" + #include <atomic> #include <functional> @@ -28,28 +29,28 @@ class BasicFile; struct FileCasStrategy final : public GcStorage { - FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc); + FileCasStrategy(GcManager& Gc); ~FileCasStrategy(); - void Initialize(bool IsNewStore); + void Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore); CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); IoBuffer FindChunk(const IoHash& ChunkHash); bool HaveChunk(const IoHash& ChunkHash); - void FilterChunks(CasChunkSet& InOutChunks); + void FilterChunks(HashKeySet& InOutChunks); void Flush(); void Scrub(ScrubContext& Ctx); virtual void CollectGarbage(GcContext& GcCtx) override; virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)}; } private: - const CasStoreConfiguration& m_Config; - RwLock m_Lock; - RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines - spdlog::logger& m_Log; - spdlog::logger& Log() { return m_Log; } - std::atomic_uint64_t m_TotalSize{}; - bool m_IsInitialized = false; + std::filesystem::path m_RootDirectory; + RwLock m_Lock; + RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines + spdlog::logger& m_Log; + spdlog::logger& Log() { return m_Log; } + std::atomic_uint64_t m_TotalSize{}; + bool m_IsInitialized = false; struct FileCasIndexEntry { diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp index bb03b9751..0902abf4a 100644 --- a/zenstore/gc.cpp +++ b/zenstore/gc.cpp @@ -14,9 +14,10 @@ #include <zencore/testing.h> #include <zencore/testutils.h> #include <zencore/timer.h> -#include <zenstore/cas.h> #include <zenstore/cidstore.h> +#include "cas.h" + #include <fmt/format.h> #include <filesystem> @@ -173,9 +174,8 @@ struct GcContext::GcState using CacheKeyContexts = std::unordered_map<std::string, std::vector<IoHash>>; CacheKeyContexts m_ExpiredCacheKeys; - CasChunkSet m_CasChunks; - CasChunkSet m_DeletedCasChunks; - CasChunkSet m_CidChunks; + HashKeySet m_RetainedCids; + HashKeySet m_DeletedCids; GcClock::TimePoint m_GcTime; GcClock::Duration m_MaxCacheDuration = std::chrono::hours(24); bool m_DeletionMode = true; @@ -194,19 +194,13 @@ GcContext::~GcContext() } void -GcContext::ContributeCids(std::span<const IoHash> Cids) -{ - m_State->m_CidChunks.AddChunksToSet(Cids); -} - -void -GcContext::ContributeCas(std::span<const IoHash> Cas) +GcContext::AddRetainedCids(std::span<const IoHash> Cids) { - m_State->m_CasChunks.AddChunksToSet(Cas); + m_State->m_RetainedCids.AddHashesToSet(Cids); } void -GcContext::ContributeCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys) +GcContext::SetExpiredCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys) { m_State->m_ExpiredCacheKeys[CacheKeyContext] = std::move(ExpiredKeys); } @@ -214,37 +208,31 @@ GcContext::ContributeCacheKeys(const std::string& CacheKeyContext, std::vector<I void GcContext::IterateCids(std::function<void(const IoHash&)> Callback) { - m_State->m_CidChunks.IterateChunks([&](const IoHash& Hash) { Callback(Hash); }); + m_State->m_RetainedCids.IterateHashes([&](const IoHash& Hash) { Callback(Hash); }); } void GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc) { - m_State->m_CidChunks.FilterChunks(Cid, [&](const IoHash& Hash) { KeepFunc(Hash); }); + m_State->m_RetainedCids.FilterHashes(Cid, [&](const IoHash& Hash) { KeepFunc(Hash); }); } void -GcContext::FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc) +GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&, bool)>&& FilterFunc) { - m_State->m_CasChunks.FilterChunks(Cas, [&](const IoHash& Hash) { KeepFunc(Hash); }); + m_State->m_RetainedCids.FilterHashes(Cid, std::move(FilterFunc)); } void -GcContext::FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&, bool)>&& FilterFunc) +GcContext::AddDeletedCids(std::span<const IoHash> Cas) { - m_State->m_CasChunks.FilterChunks(Cas, std::move(FilterFunc)); + m_State->m_DeletedCids.AddHashesToSet(Cas); } -void -GcContext::DeletedCas(std::span<const IoHash> Cas) +const HashKeySet& +GcContext::DeletedCids() { - m_State->m_DeletedCasChunks.AddChunksToSet(Cas); -} - -CasChunkSet& -GcContext::DeletedCas() -{ - return m_State->m_DeletedCasChunks; + return m_State->m_DeletedCids; } std::span<const IoHash> @@ -318,7 +306,7 @@ GcContext::ClaimGCReserve() ////////////////////////////////////////////////////////////////////////// -GcContributor::GcContributor(CasGc& Gc) : m_Gc(Gc) +GcContributor::GcContributor(GcManager& Gc) : m_Gc(Gc) { m_Gc.AddGcContributor(this); } @@ -330,7 +318,7 @@ GcContributor::~GcContributor() ////////////////////////////////////////////////////////////////////////// -GcStorage::GcStorage(CasGc& Gc) : m_Gc(Gc) +GcStorage::GcStorage(GcManager& Gc) : m_Gc(Gc) { m_Gc.AddGcStorage(this); } @@ -342,30 +330,30 @@ GcStorage::~GcStorage() ////////////////////////////////////////////////////////////////////////// -CasGc::CasGc() +GcManager::GcManager() { } -CasGc::~CasGc() +GcManager::~GcManager() { } void -CasGc::AddGcContributor(GcContributor* Contributor) +GcManager::AddGcContributor(GcContributor* Contributor) { RwLock::ExclusiveLockScope _(m_Lock); m_GcContribs.push_back(Contributor); } void -CasGc::RemoveGcContributor(GcContributor* Contributor) +GcManager::RemoveGcContributor(GcContributor* Contributor) { RwLock::ExclusiveLockScope _(m_Lock); std::erase_if(m_GcContribs, [&](GcContributor* $) { return $ == Contributor; }); } void -CasGc::AddGcStorage(GcStorage* Storage) +GcManager::AddGcStorage(GcStorage* Storage) { ZEN_ASSERT(Storage != nullptr); RwLock::ExclusiveLockScope _(m_Lock); @@ -373,14 +361,14 @@ CasGc::AddGcStorage(GcStorage* Storage) } void -CasGc::RemoveGcStorage(GcStorage* Storage) +GcManager::RemoveGcStorage(GcStorage* Storage) { RwLock::ExclusiveLockScope _(m_Lock); std::erase_if(m_GcStorage, [&](GcStorage* $) { return $ == Storage; }); } void -CasGc::CollectGarbage(GcContext& GcCtx) +GcManager::CollectGarbage(GcContext& GcCtx) { RwLock::SharedLockScope _(m_Lock); @@ -394,36 +382,6 @@ CasGc::CollectGarbage(GcContext& GcCtx) } } - // Cache records reference CAS chunks with the uncompressed - // raw hash (Cid). Map the content ID to CAS hash to enable - // the CAS storage backends to filter valid chunks. - - if (CidStore* CidStore = m_CidStore) - { - std::vector<IoHash> CasHashes; - uint64_t UnknownChunks = 0; - - GcCtx.IterateCids([&](const IoHash& Cid) { - IoHash Cas = CidStore->RemapCid(Cid); - - if (Cas == IoHash::Zero) - { - ++UnknownChunks; - } - else - { - CasHashes.push_back(Cas); - } - }); - - if (UnknownChunks) - { - ZEN_WARN("found {} unknown CIDs", UnknownChunks); - } - - GcCtx.ContributeCas(CasHashes); - } - // Then trim storage { @@ -434,61 +392,48 @@ CasGc::CollectGarbage(GcContext& GcCtx) Storage->CollectGarbage(GcCtx); } } +} + +GcStorageSize +GcManager::TotalStorageSize() const +{ + RwLock::SharedLockScope _(m_Lock); - // Remove Cid to CAS hash mappings. Scrub? + GcStorageSize TotalSize; - if (CidStore* CidStore = m_CidStore) + for (GcStorage* Storage : m_GcStorage) { - Stopwatch Timer; - const auto Guard = MakeGuard([&] { ZEN_INFO("clean up deleted content ids in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - CidStore->RemoveCids(GcCtx.DeletedCas()); + const auto Size = Storage->StorageSize(); + TotalSize.DiskSize += Size.DiskSize; + TotalSize.MemorySize += Size.MemorySize; } -} -void -CasGc::SetCidStore(CidStore* Cids) -{ - m_CidStore = Cids; + return TotalSize; } +#if ZEN_USE_REF_TRACKING void -CasGc::OnNewCidReferences(std::span<IoHash> Hashes) +GcManager::OnNewCidReferences(std::span<IoHash> Hashes) { ZEN_UNUSED(Hashes); } void -CasGc::OnCommittedCidReferences(std::span<IoHash> Hashes) +GcManager::OnCommittedCidReferences(std::span<IoHash> Hashes) { ZEN_UNUSED(Hashes); } void -CasGc::OnDroppedCidReferences(std::span<IoHash> Hashes) +GcManager::OnDroppedCidReferences(std::span<IoHash> Hashes) { ZEN_UNUSED(Hashes); } - -GcStorageSize -CasGc::TotalStorageSize() const -{ - RwLock::SharedLockScope _(m_Lock); - - GcStorageSize TotalSize; - - for (GcStorage* Storage : m_GcStorage) - { - const auto Size = Storage->StorageSize(); - TotalSize.DiskSize += Size.DiskSize; - TotalSize.MemorySize += Size.MemorySize; - } - - return TotalSize; -} +#endif ////////////////////////////////////////////////////////////////////////// -GcScheduler::GcScheduler(CasGc& CasGc) : m_Log(logging::Get("gc")), m_CasGc(CasGc) +GcScheduler::GcScheduler(GcManager& GcManager) : m_Log(logging::Get("gc")), m_GcManager(GcManager) { } @@ -606,7 +551,7 @@ GcScheduler::SchedulerThread() { std::error_code Ec; DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec); - GcStorageSize TotalSize = m_CasGc.TotalStorageSize(); + GcStorageSize TotalSize = m_GcManager.TotalStorageSize(); std::chrono::seconds RemaingTime = std::chrono::duration_cast<std::chrono::seconds>(m_NextGcTime - GcClock::Now()); if (RemaingTime < std::chrono::seconds::zero()) @@ -668,7 +613,7 @@ GcScheduler::SchedulerThread() Stopwatch Timer; const auto __ = MakeGuard([&] { ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - m_CasGc.CollectGarbage(GcCtx); + m_GcManager.CollectGarbage(GcCtx); m_LastGcTime = GcClock::Now(); m_NextGcTime = NextGcTime(m_LastGcTime); @@ -745,38 +690,37 @@ TEST_CASE("gc.basic") { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration CasConfig; + CidStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path() / "cas"; - CasGc Gc; - std::unique_ptr<CasStore> CasStore = CreateCasStore(Gc); - CidStore CidStore{*CasStore, TempDir.Path() / "cid"}; + GcManager Gc; + CidStore CidStore(Gc); - CasStore->Initialize(CasConfig); - Gc.SetCidStore(&CidStore); + CidStore.Initialize(CasConfig); IoBuffer Chunk = CreateChunk(128); auto CompressedChunk = Compress(Chunk); const auto InsertResult = CidStore.AddChunk(CompressedChunk); + CHECK(InsertResult.New); GcContext GcCtx; GcCtx.CollectSmallObjects(true); - CasStore->Flush(); + CidStore.Flush(); Gc.CollectGarbage(GcCtx); - CHECK(!CidStore.ContainsChunk(InsertResult.DecompressedId)); + CHECK(!CidStore.ContainsChunk(IoHash::FromBLAKE3(CompressedChunk.GetRawHash()))); } TEST_CASE("gc.full") { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration CasConfig; + CidStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path() / "cas"; - CasGc Gc; + GcManager Gc; std::unique_ptr<CasStore> CasStore = CreateCasStore(Gc); CasStore->Initialize(CasConfig); @@ -813,7 +757,7 @@ TEST_CASE("gc.full") CasStore->InsertChunk(Chunks[7], ChunkHashes[7]); CasStore->InsertChunk(Chunks[8], ChunkHashes[8]); - CasStoreSize InitialSize = CasStore->TotalSize(); + CidStoreSize InitialSize = CasStore->TotalSize(); // Keep first and last { @@ -823,7 +767,7 @@ TEST_CASE("gc.full") std::vector<IoHash> KeepChunks; KeepChunks.push_back(ChunkHashes[0]); KeepChunks.push_back(ChunkHashes[8]); - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); CasStore->Flush(); Gc.CollectGarbage(GcCtx); @@ -856,7 +800,7 @@ TEST_CASE("gc.full") GcCtx.CollectSmallObjects(true); std::vector<IoHash> KeepChunks; KeepChunks.push_back(ChunkHashes[8]); - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); CasStore->Flush(); Gc.CollectGarbage(GcCtx); @@ -890,7 +834,7 @@ TEST_CASE("gc.full") KeepChunks.push_back(ChunkHashes[1]); KeepChunks.push_back(ChunkHashes[4]); KeepChunks.push_back(ChunkHashes[7]); - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); CasStore->Flush(); Gc.CollectGarbage(GcCtx); @@ -925,7 +869,7 @@ TEST_CASE("gc.full") KeepChunks.push_back(ChunkHashes[6]); KeepChunks.push_back(ChunkHashes[7]); KeepChunks.push_back(ChunkHashes[8]); - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); CasStore->Flush(); Gc.CollectGarbage(GcCtx); diff --git a/zenstore/hashkeyset.cpp b/zenstore/hashkeyset.cpp new file mode 100644 index 000000000..a5436f5cb --- /dev/null +++ b/zenstore/hashkeyset.cpp @@ -0,0 +1,60 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenstore/hashkeyset.h> + +////////////////////////////////////////////////////////////////////////// + +namespace zen { + +void +HashKeySet::AddHashToSet(const IoHash& HashToAdd) +{ + m_HashSet.insert(HashToAdd); +} + +void +HashKeySet::AddHashesToSet(std::span<const IoHash> HashesToAdd) +{ + m_HashSet.insert(HashesToAdd.begin(), HashesToAdd.end()); +} + +void +HashKeySet::RemoveHashesIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate) +{ + for (auto It = begin(m_HashSet), ItEnd = end(m_HashSet); It != ItEnd;) + { + if (Predicate(*It)) + { + It = m_HashSet.erase(It); + } + else + { + ++It; + } + } +} + +void +HashKeySet::IterateHashes(std::function<void(const IoHash& Hash)>&& Callback) const +{ + for (auto It = begin(m_HashSet), ItEnd = end(m_HashSet); It != ItEnd; ++It) + { + Callback(*It); + } +} + +////////////////////////////////////////////////////////////////////////// +// +// Testing related code follows... +// + +#if ZEN_WITH_TESTS + +void +hashkeyset_forcelink() +{ +} + +#endif + +} // namespace zen diff --git a/zenstore/include/zenstore/cas.h b/zenstore/include/zenstore/cas.h deleted file mode 100644 index 5592fbd0a..000000000 --- a/zenstore/include/zenstore/cas.h +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include "zenstore.h" - -#include <zencore/blake3.h> -#include <zencore/iobuffer.h> -#include <zencore/iohash.h> -#include <zencore/refcount.h> -#include <zencore/timer.h> - -#include <atomic> -#include <filesystem> -#include <functional> -#include <memory> -#include <string> -#include <unordered_set> - -namespace zen { - -class GcContext; -class CasGc; - -struct CasStoreConfiguration -{ - // Root directory for CAS store - std::filesystem::path RootDirectory; - - // Threshold below which values are considered 'tiny' and managed using the 'tiny values' strategy - uint64_t TinyValueThreshold = 1024; - - // Threshold above which values are considered 'huge' and managed using the 'huge values' strategy - uint64_t HugeValueThreshold = 1024 * 1024; -}; - -/** Manage a set of IoHash values - */ - -class CasChunkSet -{ -public: - void AddChunkToSet(const IoHash& HashToAdd); - void AddChunksToSet(std::span<const IoHash> HashesToAdd); - void RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate); - void IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback); - [[nodiscard]] inline bool ContainsChunk(const IoHash& Hash) const { return m_ChunkSet.find(Hash) != m_ChunkSet.end(); } - [[nodiscard]] inline bool IsEmpty() const { return m_ChunkSet.empty(); } - [[nodiscard]] inline size_t GetSize() const { return m_ChunkSet.size(); } - - inline void FilterChunks(std::span<const IoHash> Candidates, Invocable<const IoHash&> auto MatchFunc) - { - for (const IoHash& Candidate : Candidates) - { - if (ContainsChunk(Candidate)) - { - MatchFunc(Candidate); - } - } - } - - inline void FilterChunks(std::span<const IoHash> Candidates, Invocable<const IoHash&, bool> auto MatchFunc) - { - for (const IoHash& Candidate : Candidates) - { - MatchFunc(Candidate, ContainsChunk(Candidate)); - } - } - -private: - // Q: should we protect this with a lock, or is that a higher level concern? - std::unordered_set<IoHash, IoHash::Hasher> m_ChunkSet; -}; - -/** Context object for data scrubbing - * - * Data scrubbing is when we traverse stored data to validate it and - * optionally correct/recover - */ - -class ScrubContext -{ -public: - virtual void ReportBadCasChunks(std::span<IoHash> BadCasChunks); - inline uint64_t ScrubTimestamp() const { return m_ScrubTime; } - inline bool RunRecovery() const { return m_Recover; } - void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes); - - inline uint64_t ScrubbedChunks() const { return m_ChunkCount; } - inline uint64_t ScrubbedBytes() const { return m_ByteCount; } - -private: - uint64_t m_ScrubTime = GetHifreqTimerValue(); - bool m_Recover = true; - std::atomic<uint64_t> m_ChunkCount{0}; - std::atomic<uint64_t> m_ByteCount{0}; - CasChunkSet m_BadCas; - CasChunkSet m_BadCid; -}; - -struct CasStoreSize -{ - uint64_t TinySize{}; - uint64_t SmallSize{}; - uint64_t LargeSize{}; - uint64_t TotalSize{}; -}; - -/** Content Addressable Storage interface - - */ - -class CasStore -{ -public: - virtual ~CasStore() = default; - - const CasStoreConfiguration& Config() { return m_Config; } - - struct InsertResult - { - bool New = false; - }; - - virtual void Initialize(const CasStoreConfiguration& Config) = 0; - virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash) = 0; - virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0; - virtual bool ContainsChunk(const IoHash& ChunkHash) = 0; - virtual void FilterChunks(CasChunkSet& InOutChunks) = 0; - virtual void Flush() = 0; - virtual void Scrub(ScrubContext& Ctx) = 0; - virtual void GarbageCollect(GcContext& GcCtx) = 0; - virtual CasStoreSize TotalSize() const = 0; - -protected: - CasStoreConfiguration m_Config; - uint64_t m_LastScrubTime = 0; -}; - -ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(CasGc& Gc); - -void CAS_forcelink(); - -} // namespace zen diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h index 4b93a708f..c56b653fc 100644 --- a/zenstore/include/zenstore/caslog.h +++ b/zenstore/include/zenstore/caslog.h @@ -2,8 +2,6 @@ #pragma once -#include "zenstore.h" - #include <zencore/uid.h> #include <zenstore/basicfile.h> diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h index b0252a2a6..21e3c3160 100644 --- a/zenstore/include/zenstore/cidstore.h +++ b/zenstore/include/zenstore/cidstore.h @@ -5,7 +5,7 @@ #include "zenstore.h" #include <zencore/iohash.h> -#include <zenstore/cas.h> +#include <zenstore/hashkeyset.h> ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> @@ -15,53 +15,68 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { +class GcManager; class CasStore; class CompressedBuffer; class IoBuffer; +class ScrubContext; /** Content Store * - * Data in the content store is referenced by content identifiers (CIDs), rather than their - * literal hash. This class maps uncompressed hashes to compressed hashes and may + * Data in the content store is referenced by content identifiers (CIDs), it works + * with compressed buffers so the CID is expected to be the RAW hash. It stores the + * chunk directly under the RAW hash. + * This class maps uncompressed hashes (CIDs) to compressed hashes and may * be used to deal with other kinds of indirections in the future. For example, if we want * to support chunking then a CID may represent a list of chunks which could be concatenated * to form the referenced chunk. * - * It would likely be possible to implement this mapping in a more efficient way if we - * integrate it into the CAS store itself, so we can avoid maintaining copies of large - * hashes in multiple locations. This would also allow us to consolidate commit logs etc - * which would be more resilient than the current split log scheme - * */ + +struct CidStoreSize +{ + uint64_t TinySize = 0; + uint64_t SmallSize = 0; + uint64_t LargeSize = 0; + uint64_t TotalSize = 0; +}; + +struct CidStoreConfiguration +{ + // Root directory for CAS store + std::filesystem::path RootDirectory; + + // Threshold below which values are considered 'tiny' and managed using the 'tiny values' strategy + uint64_t TinyValueThreshold = 1024; + + // Threshold above which values are considered 'huge' and managed using the 'huge values' strategy + uint64_t HugeValueThreshold = 1024 * 1024; +}; + class CidStore { public: - CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir); + CidStore(GcManager& Gc); ~CidStore(); struct InsertResult { - IoHash DecompressedId; - IoHash CompressedHash; - bool New = false; + bool New = false; }; - InsertResult AddChunk(CompressedBuffer& ChunkData); - void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed); + void Initialize(const CidStoreConfiguration& Config); + InsertResult AddChunk(const CompressedBuffer& ChunkData); IoBuffer FindChunkByCid(const IoHash& DecompressedId); bool ContainsChunk(const IoHash& DecompressedId); + void FilterChunks(HashKeySet& InOutChunks); void Flush(); void Scrub(ScrubContext& Ctx); - void RemoveCids(CasChunkSet& CasChunks); - CasStoreSize CasSize() const; - - // TODO: add batch filter support - - IoHash RemapCid(const IoHash& DecompressedId); + CidStoreSize TotalSize() const; private: struct Impl; - std::unique_ptr<Impl> m_Impl; + std::unique_ptr<CasStore> m_CasStore; + std::unique_ptr<Impl> m_Impl; }; } // namespace zen diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h index 398025181..656e594af 100644 --- a/zenstore/include/zenstore/gc.h +++ b/zenstore/include/zenstore/gc.h @@ -22,8 +22,8 @@ class logger; namespace zen { -class CasChunkSet; -class CasGc; +class HashKeySet; +class GcManager; class CidStore; struct IoHash; @@ -50,18 +50,16 @@ public: GcContext(GcClock::TimePoint Time = GcClock::Now()); ~GcContext(); - void ContributeCids(std::span<const IoHash> Cid); - void ContributeCas(std::span<const IoHash> Hash); - void ContributeCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys); + void AddRetainedCids(std::span<const IoHash> Cid); + void SetExpiredCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys); void IterateCids(std::function<void(const IoHash&)> Callback); void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc); - void FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc); - void FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&, bool)>&& FilterFunc); + void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&, bool)>&& FilterFunc); - void DeletedCas(std::span<const IoHash> Cas); - CasChunkSet& DeletedCas(); + void AddDeletedCids(std::span<const IoHash> Cas); + const HashKeySet& DeletedCids(); std::span<const IoHash> ExpiredCacheKeys(const std::string& CacheKeyContext) const; @@ -97,13 +95,13 @@ private: class GcContributor { public: - GcContributor(CasGc& Gc); + GcContributor(GcManager& Gc); ~GcContributor(); virtual void GatherReferences(GcContext& GcCtx) = 0; protected: - CasGc& m_Gc; + GcManager& m_Gc; }; struct GcStorageSize @@ -117,23 +115,23 @@ struct GcStorageSize class GcStorage { public: - GcStorage(CasGc& Gc); + GcStorage(GcManager& Gc); ~GcStorage(); virtual void CollectGarbage(GcContext& GcCtx) = 0; virtual GcStorageSize StorageSize() const = 0; private: - CasGc& m_Gc; + GcManager& m_Gc; }; /** GC orchestrator */ -class CasGc +class GcManager { public: - CasGc(); - ~CasGc(); + GcManager(); + ~GcManager(); void AddGcContributor(GcContributor* Contributor); void RemoveGcContributor(GcContributor* Contributor); @@ -143,12 +141,14 @@ public: void CollectGarbage(GcContext& GcCtx); - void SetCidStore(CidStore* Cids); - void OnNewCidReferences(std::span<IoHash> Hashes); - void OnCommittedCidReferences(std::span<IoHash> Hashes); - void OnDroppedCidReferences(std::span<IoHash> Hashes); GcStorageSize TotalStorageSize() const; +#if ZEN_USE_REF_TRACKING + void OnNewCidReferences(std::span<IoHash> Hashes); + void OnCommittedCidReferences(std::span<IoHash> Hashes); + void OnDroppedCidReferences(std::span<IoHash> Hashes); +#endif + private: mutable RwLock m_Lock; std::vector<GcContributor*> m_GcContribs; @@ -180,7 +180,7 @@ struct GcSchedulerConfig class GcScheduler { public: - GcScheduler(CasGc& CasGc); + GcScheduler(GcManager& GcManager); ~GcScheduler(); void Initialize(const GcSchedulerConfig& Config); @@ -201,7 +201,7 @@ private: spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; - CasGc& m_CasGc; + GcManager& m_GcManager; GcSchedulerConfig m_Config; GcClock::TimePoint m_LastGcTime{}; GcClock::TimePoint m_NextGcTime{}; diff --git a/zenstore/include/zenstore/hashkeyset.h b/zenstore/include/zenstore/hashkeyset.h new file mode 100644 index 000000000..411a6256e --- /dev/null +++ b/zenstore/include/zenstore/hashkeyset.h @@ -0,0 +1,54 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "zenstore.h" + +#include <zencore/iohash.h> + +#include <functional> +#include <unordered_set> + +namespace zen { + +/** Manage a set of IoHash values + */ + +class HashKeySet +{ +public: + void AddHashToSet(const IoHash& HashToAdd); + void AddHashesToSet(std::span<const IoHash> HashesToAdd); + void RemoveHashesIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate); + void IterateHashes(std::function<void(const IoHash& Hash)>&& Callback) const; + [[nodiscard]] inline bool ContainsHash(const IoHash& Hash) const { return m_HashSet.find(Hash) != m_HashSet.end(); } + [[nodiscard]] inline bool IsEmpty() const { return m_HashSet.empty(); } + [[nodiscard]] inline size_t GetSize() const { return m_HashSet.size(); } + + inline void FilterHashes(std::span<const IoHash> Candidates, Invocable<const IoHash&> auto MatchFunc) const + { + for (const IoHash& Candidate : Candidates) + { + if (ContainsHash(Candidate)) + { + MatchFunc(Candidate); + } + } + } + + inline void FilterHashes(std::span<const IoHash> Candidates, Invocable<const IoHash&, bool> auto MatchFunc) const + { + for (const IoHash& Candidate : Candidates) + { + MatchFunc(Candidate, ContainsHash(Candidate)); + } + } + +private: + // Q: should we protect this with a lock, or is that a higher level concern? + std::unordered_set<IoHash, IoHash::Hasher> m_HashSet; +}; + +void hashkeyset_forcelink(); + +} // namespace zen diff --git a/zenstore/include/zenstore/scrubcontext.h b/zenstore/include/zenstore/scrubcontext.h new file mode 100644 index 000000000..bf906492c --- /dev/null +++ b/zenstore/include/zenstore/scrubcontext.h @@ -0,0 +1,40 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/timer.h> + +namespace zen { + +/** Context object for data scrubbing + * + * Data scrubbing is when we traverse stored data to validate it and + * optionally correct/recover + */ + +class ScrubContext +{ +public: + virtual void ReportBadCidChunks(std::span<IoHash> BadCasChunks) { m_BadCid.AddHashesToSet(BadCasChunks); } + inline uint64_t ScrubTimestamp() const { return m_ScrubTime; } + inline bool RunRecovery() const { return m_Recover; } + void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes) + { + m_ChunkCount.fetch_add(ChunkCount); + m_ByteCount.fetch_add(ChunkBytes); + } + + inline uint64_t ScrubbedChunks() const { return m_ChunkCount; } + inline uint64_t ScrubbedBytes() const { return m_ByteCount; } + + const HashKeySet BadCids() const { return m_BadCid; } + +private: + uint64_t m_ScrubTime = GetHifreqTimerValue(); + bool m_Recover = true; + std::atomic<uint64_t> m_ChunkCount{0}; + std::atomic<uint64_t> m_ByteCount{0}; + HashKeySet m_BadCid; +}; + +} // namespace zen diff --git a/zenstore/zenstore.cpp b/zenstore/zenstore.cpp index 5f40b7f60..836cdf691 100644 --- a/zenstore/zenstore.cpp +++ b/zenstore/zenstore.cpp @@ -4,8 +4,10 @@ #include <zenstore/basicfile.h> #include <zenstore/blockstore.h> -#include <zenstore/cas.h> #include <zenstore/gc.h> +#include <zenstore/hashkeyset.h> + +#include "cas.h" #include "compactcas.h" #include "filecas.h" @@ -20,6 +22,7 @@ zenstore_forcelinktests() blockstore_forcelink(); compactcas_forcelink(); gc_forcelink(); + hashkeyset_forcelink(); } } // namespace zen |