// Copyright Epic Games, Inc. All Rights Reserved. #include #include "compactcas.h" #include "filecas.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include ////////////////////////////////////////////////////////////////////////// namespace zen { void CasChunkSet::AddChunkToSet(const IoHash& HashToAdd) { m_ChunkSet.insert(HashToAdd); } void CasChunkSet::AddChunksToSet(std::span HashesToAdd) { m_ChunkSet.insert(HashesToAdd.begin(), HashesToAdd.end()); } void CasChunkSet::RemoveChunksIf(std::function&& Predicate) { for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;) { if (Predicate(*It)) { It = m_ChunkSet.erase(It); } else { ++It; } } } void CasChunkSet::IterateChunks(std::function&& Callback) { for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd; ++It) { Callback(*It); } } ////////////////////////////////////////////////////////////////////////// void ScrubContext::ReportBadCasChunks(std::span BadCasChunks) { m_BadCas.AddChunksToSet(BadCasChunks); } void ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes) { m_ChunkCount.fetch_add(ChunkCount); m_ByteCount.fetch_add(ChunkBytes); } /** * CAS store implementation * * Uses a basic strategy of splitting payloads by size, to improve ability to reclaim space * quickly for unused large chunks and to maintain locality for small chunks which are * frequently accessed together. * */ class CasImpl : public CasStore { public: CasImpl(CasGc& Gc); virtual ~CasImpl(); virtual void Initialize(const CasStoreConfiguration& InConfig) override; virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) override; virtual IoBuffer FindChunk(const IoHash& ChunkHash) override; virtual bool ContainsChunk(const IoHash& ChunkHash) override; virtual void FilterChunks(CasChunkSet& InOutChunks) override; virtual void Flush() override; virtual void Scrub(ScrubContext& Ctx) override; virtual void GarbageCollect(GcContext& GcCtx) override; virtual CasStoreSize TotalSize() const override; private: CasContainerStrategy m_TinyStrategy; CasContainerStrategy m_SmallStrategy; FileCasStrategy m_LargeStrategy; CbObject m_ManifestObject; enum class StorageScheme { Legacy = 0, WithCbManifest = 1 }; StorageScheme m_StorageScheme = StorageScheme::Legacy; bool OpenOrCreateManifest(); void UpdateManifest(); }; CasImpl::CasImpl(CasGc& Gc) : m_TinyStrategy(m_Config, Gc), m_SmallStrategy(m_Config, Gc), m_LargeStrategy(m_Config, Gc) { } CasImpl::~CasImpl() { } void CasImpl::Initialize(const CasStoreConfiguration& InConfig) { m_Config = InConfig; ZEN_INFO("initializing CAS pool at '{}'", m_Config.RootDirectory); // Ensure root directory exists - create if it doesn't exist already std::filesystem::create_directories(m_Config.RootDirectory); // Open or create manifest const bool IsNewStore = OpenOrCreateManifest(); // Initialize payload storage m_LargeStrategy.Initialize(IsNewStore); m_TinyStrategy.Initialize("tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block m_SmallStrategy.Initialize("sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block } bool CasImpl::OpenOrCreateManifest() { bool IsNewStore = false; std::filesystem::path ManifestPath = m_Config.RootDirectory; ManifestPath /= ".ucas_root"; std::error_code Ec; BasicFile ManifestFile; ManifestFile.Open(ManifestPath.c_str(), BasicFile::Mode::kRead, Ec); bool ManifestIsOk = false; if (Ec) { if (Ec == std::errc::no_such_file_or_directory) { IsNewStore = true; } } else { IoBuffer ManifestBuffer = ManifestFile.ReadAll(); ManifestFile.Close(); if (ManifestBuffer.Size() > 0 && ManifestBuffer.Data()[0] == '#') { // Old-style manifest, does not contain any useful information, so we may as well update it } else { CbObject Manifest{SharedBuffer(ManifestBuffer)}; CbValidateError ValidationResult = ValidateCompactBinary(ManifestBuffer, CbValidateMode::All); if (ValidationResult == CbValidateError::None) { if (Manifest["id"]) { ManifestIsOk = true; } } else { ZEN_ERROR("Store manifest validation failed: {:#x}, will generate new manifest to recover", uint32_t(ValidationResult)); } if (ManifestIsOk) { m_ManifestObject = std::move(Manifest); } } } if (!ManifestIsOk) { UpdateManifest(); } return IsNewStore; } void CasImpl::UpdateManifest() { if (!m_ManifestObject) { CbObjectWriter Cbo; Cbo << "id" << zen::Oid::NewOid() << "created" << DateTime::Now(); m_ManifestObject = Cbo.Save(); } // Write manifest to file std::filesystem::path ManifestPath = m_Config.RootDirectory; ManifestPath /= ".ucas_root"; // This will throw on failure ZEN_TRACE("Writing new manifest to '{}'", ManifestPath); BasicFile Marker; Marker.Open(ManifestPath.c_str(), BasicFile::Mode::kTruncate); Marker.Write(m_ManifestObject.GetBuffer(), 0); } CasStore::InsertResult CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) { ZEN_TRACE_CPU("CAS::InsertChunk"); const uint64_t ChunkSize = Chunk.Size(); if (ChunkSize < m_Config.TinyValueThreshold) { ZEN_ASSERT(ChunkSize); return m_TinyStrategy.InsertChunk(Chunk, ChunkHash); } else if (ChunkSize < m_Config.HugeValueThreshold) { return m_SmallStrategy.InsertChunk(Chunk, ChunkHash); } return m_LargeStrategy.InsertChunk(Chunk, ChunkHash); } IoBuffer CasImpl::FindChunk(const IoHash& ChunkHash) { ZEN_TRACE_CPU("CAS::FindChunk"); if (IoBuffer Found = m_SmallStrategy.FindChunk(ChunkHash)) { return Found; } if (IoBuffer Found = m_TinyStrategy.FindChunk(ChunkHash)) { return Found; } if (IoBuffer Found = m_LargeStrategy.FindChunk(ChunkHash)) { return Found; } // Not found return IoBuffer{}; } bool CasImpl::ContainsChunk(const IoHash& ChunkHash) { return m_SmallStrategy.HaveChunk(ChunkHash) || m_TinyStrategy.HaveChunk(ChunkHash) || m_LargeStrategy.HaveChunk(ChunkHash); } void CasImpl::FilterChunks(CasChunkSet& InOutChunks) { m_SmallStrategy.FilterChunks(InOutChunks); m_TinyStrategy.FilterChunks(InOutChunks); m_LargeStrategy.FilterChunks(InOutChunks); } void CasImpl::Flush() { m_SmallStrategy.Flush(); m_TinyStrategy.Flush(); m_LargeStrategy.Flush(); } void CasImpl::Scrub(ScrubContext& Ctx) { if (m_LastScrubTime == Ctx.ScrubTimestamp()) { return; } m_LastScrubTime = Ctx.ScrubTimestamp(); m_SmallStrategy.Scrub(Ctx); m_TinyStrategy.Scrub(Ctx); m_LargeStrategy.Scrub(Ctx); } void CasImpl::GarbageCollect(GcContext& GcCtx) { m_SmallStrategy.CollectGarbage(GcCtx); m_TinyStrategy.CollectGarbage(GcCtx); m_LargeStrategy.CollectGarbage(GcCtx); } CasStoreSize CasImpl::TotalSize() const { const uint64_t Tiny = m_TinyStrategy.StorageSize().DiskSize; const uint64_t Small = m_SmallStrategy.StorageSize().DiskSize; const uint64_t Large = m_LargeStrategy.StorageSize().DiskSize; return {.TinySize = Tiny, .SmallSize = Small, .LargeSize = Large, .TotalSize = Tiny + Small + Large}; } ////////////////////////////////////////////////////////////////////////// std::unique_ptr CreateCasStore(CasGc& Gc) { return std::make_unique(Gc); } ////////////////////////////////////////////////////////////////////////// // // Testing related code follows... // #if ZEN_WITH_TESTS TEST_CASE("CasStore") { ScopedTemporaryDirectory TempDir; CasStoreConfiguration config; config.RootDirectory = TempDir.Path(); CasGc Gc; std::unique_ptr Store = CreateCasStore(Gc); Store->Initialize(config); ScrubContext Ctx; Store->Scrub(Ctx); IoBuffer Value1{16}; memcpy(Value1.MutableData(), "1234567890123456", 16); IoHash Hash1 = IoHash::HashBuffer(Value1.Data(), Value1.Size()); CasStore::InsertResult Result1 = Store->InsertChunk(Value1, Hash1); CHECK(Result1.New); IoBuffer Value2{16}; memcpy(Value2.MutableData(), "ABCDEFGHIJKLMNOP", 16); IoHash Hash2 = IoHash::HashBuffer(Value2.Data(), Value2.Size()); CasStore::InsertResult Result2 = Store->InsertChunk(Value2, Hash2); CHECK(Result2.New); CasChunkSet ChunkSet; ChunkSet.AddChunkToSet(Hash1); ChunkSet.AddChunkToSet(Hash2); Store->FilterChunks(ChunkSet); CHECK(ChunkSet.IsEmpty()); IoBuffer Lookup1 = Store->FindChunk(Hash1); CHECK(Lookup1); IoBuffer Lookup2 = Store->FindChunk(Hash2); CHECK(Lookup2); } void CAS_forcelink() { } #endif } // namespace zen