// Copyright Epic Games, Inc. All Rights Reserved. #include "zenstore/cidstore.h" #include #include #include #include #include #include #include #include "cas.h" #include namespace zen { struct CidStore::Impl { explicit Impl(CasStore& InCasStore) : m_CasStore(InCasStore) {} CasStore& m_CasStore; void Initialize(const CidStoreConfiguration& Config) { m_CasStore.Initialize(Config); } CidStore::InsertResult AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, CidStore::InsertMode Mode) { metrics::RequestStats::Scope $(m_AddChunkOps, ChunkData.GetSize()); #if ZEN_BUILD_DEBUG IoHash VerifyRawHash; uint64_t _; ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _, /*OutOptionalTotalCompressedSize*/ nullptr) && RawHash == VerifyRawHash); #endif IoBuffer Payload(ChunkData); Payload.SetContentType(ZenContentType::kCompressedBinary); CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, RawHash, static_cast(Mode)); if (Result.New) { m_WriteCount++; } return {.New = Result.New}; } std::vector AddChunks(std::span ChunkDatas, std::span RawHashes, CidStore::InsertMode Mode) { if (ChunkDatas.size() == 1) { std::vector Result(1); Result[0] = AddChunk(ChunkDatas[0], RawHashes[0], Mode); return Result; } ZEN_ASSERT(ChunkDatas.size() == RawHashes.size()); std::vector Chunks; Chunks.reserve(ChunkDatas.size()); #if ZEN_BUILD_DEBUG size_t Offset = 0; #endif uint64_t TotalSize = 0; for (const IoBuffer& ChunkData : ChunkDatas) { TotalSize += ChunkData.GetSize(); #if ZEN_BUILD_DEBUG IoHash VerifyRawHash; uint64_t _; ZEN_ASSERT( CompressedBuffer::ValidateCompressedHeader(ChunkData, VerifyRawHash, _, /*OutOptionalTotalCompressedSize*/ nullptr) && RawHashes[Offset++] == VerifyRawHash); #endif Chunks.push_back(ChunkData); Chunks.back().SetContentType(ZenContentType::kCompressedBinary); } metrics::RequestStats::Scope $(m_AddChunkOps, TotalSize); std::vector CasResults = m_CasStore.InsertChunks(Chunks, RawHashes, static_cast(Mode)); ZEN_ASSERT(CasResults.size() == ChunkDatas.size()); std::vector Result; for (const CasStore::InsertResult& CasResult : CasResults) { if (CasResult.New) { m_WriteCount++; } Result.emplace_back(CidStore::InsertResult{.New = CasResult.New}); } return Result; } IoBuffer FindChunkByCid(const IoHash& DecompressedId) { metrics::RequestStats::Scope StatsScope(m_FindChunkOps, 0); IoBuffer Result = m_CasStore.FindChunk(DecompressedId); if (Result) { m_HitCount++; StatsScope.SetBytes(Result.GetSize()); } else { m_MissCount++; } return Result; } bool ContainsChunk(const IoHash& DecompressedId) { // metrics::RequestStats::Scope $(m_ContainChunkOps); return m_CasStore.ContainsChunk(DecompressedId); } void FilterChunks(HashKeySet& InOutChunks) { InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return ContainsChunk(Hash); }); } bool IterateChunks(std::span DecompressedIds, const std::function& AsyncCallback, WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit) { return m_CasStore.IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool, LargeSizeLimit); } void Flush() { m_CasStore.Flush(); } #if ZEN_WITH_TESTS void Scrub(ScrubContext& Ctx) { m_CasStore.Scrub(Ctx); } #endif // ZEN_WITH_TESTS CidStoreStats Stats() { return CidStoreStats{ .HitCount = m_HitCount, .MissCount = m_MissCount, .WriteCount = m_WriteCount, .AddChunkOps = m_AddChunkOps.Snapshot(), .FindChunkOps = m_FindChunkOps.Snapshot() // .ContainChunkOps = m_ContainChunkOps.Snapshot() }; } void ReportMetrics(StatsMetrics& Statsd) { const CidStoreStats Now = Stats(); const CidStoreStats& Old = m_LastReportedMetrics; Statsd.Meter("zen.cas_hits", Now.HitCount - Old.HitCount); Statsd.Meter("zen.cas_misses", Now.MissCount - Old.MissCount); Statsd.Meter("zen.cas_writes", Now.WriteCount - Old.WriteCount); m_LastReportedMetrics = Now; } std::atomic_uint64_t m_HitCount{}; std::atomic_uint64_t m_MissCount{}; std::atomic_uint64_t m_WriteCount{}; metrics::RequestStats m_AddChunkOps; metrics::RequestStats m_FindChunkOps; CidStoreStats m_LastReportedMetrics; uint64_t m_LastScrubTime = 0; }; ////////////////////////////////////////////////////////////////////////// CidStore::CidStore(GcManager& Gc) : m_CasStore(CreateCasStore(Gc)), m_Impl(std::make_unique(*m_CasStore)) { } CidStore::~CidStore() { } void CidStore::Initialize(const CidStoreConfiguration& Config) { m_Impl->Initialize(Config); } CidStore::InsertResult CidStore::AddChunk(const IoBuffer& ChunkData, const IoHash& RawHash, InsertMode Mode) { return m_Impl->AddChunk(ChunkData, RawHash, Mode); } std::vector CidStore::AddChunks(std::span ChunkDatas, std::span RawHashes, InsertMode Mode) { return m_Impl->AddChunks(ChunkDatas, RawHashes, Mode); } IoBuffer CidStore::FindChunkByCid(const IoHash& DecompressedId) { return m_Impl->FindChunkByCid(DecompressedId); } bool CidStore::ContainsChunk(const IoHash& DecompressedId) { return m_Impl->ContainsChunk(DecompressedId); } bool CidStore::IterateChunks(std::span DecompressedIds, const std::function& AsyncCallback, WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit) { return m_Impl->IterateChunks(DecompressedIds, AsyncCallback, OptionalWorkerPool, LargeSizeLimit); } void CidStore::FilterChunks(HashKeySet& InOutChunks) { return m_Impl->FilterChunks(InOutChunks); } void CidStore::Flush() { m_Impl->Flush(); } #if ZEN_WITH_TESTS void CidStore::Scrub(ScrubContext& Ctx) { m_Impl->Scrub(Ctx); } #endif // ZEN_WITH_TESTS CidStoreSize CidStore::TotalSize() const { return m_Impl->m_CasStore.TotalSize(); } CidStoreStats CidStore::Stats() const { return m_Impl->Stats(); } void CidStore::ReportMetrics(StatsMetrics& Statsd) { return m_Impl->ReportMetrics(Statsd); } } // namespace zen