diff options
| author | Martin Ridgers <[email protected]> | 2021-10-29 15:10:28 +0200 |
|---|---|---|
| committer | Martin Ridgers <[email protected]> | 2021-10-29 15:49:27 +0200 |
| commit | a0bd0821e7a23f1ff3476151c0702be3691dc582 (patch) | |
| tree | 3910d5d41e32be1a9b48ffdbf612dc99615e604a /zenstore/cas.cpp | |
| parent | Disabled SpawnServer() on POSIX for time being (diff) | |
| download | zen-a0bd0821e7a23f1ff3476151c0702be3691dc582.tar.xz zen-a0bd0821e7a23f1ff3476151c0702be3691dc582.zip | |
CAS.cpp/h -> cas.cpp/h to keep Zen's file casing consistent
Diffstat (limited to 'zenstore/cas.cpp')
| -rw-r--r-- | zenstore/cas.cpp | 325 |
1 files changed, 325 insertions, 0 deletions
diff --git a/zenstore/cas.cpp b/zenstore/cas.cpp new file mode 100644 index 000000000..4268e314b --- /dev/null +++ b/zenstore/cas.cpp @@ -0,0 +1,325 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenstore/cas.h> + +#include "compactcas.h" +#include "filecas.h" + +#include <zencore/except.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/memory.h> +#include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> +#include <zencore/thread.h> +#include <zencore/uid.h> + +#include <gsl/gsl-lite.hpp> + +#include <filesystem> +#include <functional> +#include <unordered_map> + +////////////////////////////////////////////////////////////////////////// + +namespace zen { + +void +CasChunkSet::AddChunkToSet(const IoHash& HashToAdd) +{ + m_ChunkSet.insert(HashToAdd); +} + +void +CasChunkSet::AddChunksToSet(std::span<const IoHash> HashesToAdd) +{ + for (const IoHash& Hash : HashesToAdd) + { + m_ChunkSet.insert(Hash); + } +} + +void +CasChunkSet::RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate) +{ + for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;) + { + if (Predicate(*It)) + { + It = m_ChunkSet.erase(It); + } + else + { + ++It; + } + } +} + +void +CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback) +{ + for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd; ++It) + { + Callback(*It); + } +} + +////////////////////////////////////////////////////////////////////////// + +struct GcContext::GcState +{ + CasChunkSet m_CasChunks; + CasChunkSet m_CidChunks; +}; + +GcContext::GcContext() : m_State(std::make_unique<GcState>()) +{ +} + +GcContext::~GcContext() +{ +} + +void +GcContext::ContributeCids(std::span<const IoHash> Cids) +{ + m_State->m_CidChunks.AddChunksToSet(Cids); +} + +void +GcContext::ContributeCas(std::span<const IoHash> Cas) +{ + m_State->m_CasChunks.AddChunksToSet(Cas); +} + +////////////////////////////////////////////////////////////////////////// + +void +ScrubContext::ReportBadCasChunks(std::span<IoHash> BadCasChunks) +{ + m_BadCas.AddChunksToSet(BadCasChunks); +} + +void +ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes) +{ + m_ChunkCount.fetch_add(ChunkCount); + m_ByteCount.fetch_add(ChunkBytes); +} + +/** + * CAS store implementation + * + * Uses a basic strategy of splitting payloads by size, to improve ability to reclaim space + * quickly for unused large chunks and to maintain locality for small chunks which are + * frequently accessed together. + * + */ +class CasImpl : public CasStore +{ +public: + CasImpl(); + virtual ~CasImpl(); + + virtual void Initialize(const CasStoreConfiguration& InConfig) override; + virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) override; + virtual IoBuffer FindChunk(const IoHash& ChunkHash) override; + virtual void FilterChunks(CasChunkSet& InOutChunks) override; + virtual void Flush() override; + virtual void Scrub(ScrubContext& Ctx) override; + +private: + CasContainerStrategy m_TinyStrategy; + CasContainerStrategy m_SmallStrategy; + FileCasStrategy m_LargeStrategy; +}; + +CasImpl::CasImpl() : m_TinyStrategy(m_Config), m_SmallStrategy(m_Config), m_LargeStrategy(m_Config) +{ +} + +CasImpl::~CasImpl() +{ +} + +void +CasImpl::Initialize(const CasStoreConfiguration& InConfig) +{ + m_Config = InConfig; + + ZEN_INFO("initializing CAS pool at '{}'", m_Config.RootDirectory); + + // Ensure root directory exists - create if it doesn't exist already + + std::filesystem::create_directories(m_Config.RootDirectory); + + // Open or create manifest + // + // The manifest is not currently fully implemented. The goal is to + // use it for recovery and configuration + + bool IsNewStore = false; + + { + std::filesystem::path ManifestPath = m_Config.RootDirectory; + ManifestPath /= ".ucas_root"; + + std::error_code Ec; + BasicFile Marker; + Marker.Open(ManifestPath.c_str(), /* IsCreate */ false, Ec); + + if (Ec) + { + IsNewStore = true; + + ExtendableStringBuilder<128> manifest; + manifest.Append("#CAS_ROOT\n"); + manifest.Append("ID="); + zen::Oid id = zen::Oid::NewOid(); + id.ToString(manifest); + + Marker.Open(ManifestPath.c_str(), /* IsCreate */ true); + Marker.Write(manifest.c_str(), uint32_t(manifest.Size()), 0); + } + } + + // Initialize payload storage + + m_TinyStrategy.Initialize("tobs", 16, IsNewStore); + m_SmallStrategy.Initialize("sobs", 4096, IsNewStore); +} + +CasStore::InsertResult +CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) +{ + const uint64_t ChunkSize = Chunk.Size(); + + if (ChunkSize < m_Config.TinyValueThreshold) + { + ZEN_ASSERT(ChunkSize); + + return m_TinyStrategy.InsertChunk(Chunk, ChunkHash); + } + else if (ChunkSize < m_Config.HugeValueThreshold) + { + return m_SmallStrategy.InsertChunk(Chunk, ChunkHash); + } + + return m_LargeStrategy.InsertChunk(Chunk, ChunkHash); +} + +IoBuffer +CasImpl::FindChunk(const IoHash& ChunkHash) +{ + if (IoBuffer Found = m_SmallStrategy.FindChunk(ChunkHash)) + { + return Found; + } + + if (IoBuffer Found = m_TinyStrategy.FindChunk(ChunkHash)) + { + return Found; + } + + if (IoBuffer Found = m_LargeStrategy.FindChunk(ChunkHash)) + { + return Found; + } + + // Not found + return IoBuffer{}; +} + +void +CasImpl::FilterChunks(CasChunkSet& InOutChunks) +{ + m_SmallStrategy.FilterChunks(InOutChunks); + m_TinyStrategy.FilterChunks(InOutChunks); + m_LargeStrategy.FilterChunks(InOutChunks); +} + +void +CasImpl::Flush() +{ + m_SmallStrategy.Flush(); + m_TinyStrategy.Flush(); + m_LargeStrategy.Flush(); +} + +void +CasImpl::Scrub(ScrubContext& Ctx) +{ + if (m_LastScrubTime == Ctx.ScrubTimestamp()) + { + return; + } + + m_LastScrubTime = Ctx.ScrubTimestamp(); + + m_SmallStrategy.Scrub(Ctx); + m_TinyStrategy.Scrub(Ctx); + m_LargeStrategy.Scrub(Ctx); +} + +////////////////////////////////////////////////////////////////////////// + +CasStore* +CreateCasStore() +{ + return new CasImpl(); +} + +////////////////////////////////////////////////////////////////////////// +// +// Testing related code follows... +// + +#if ZEN_WITH_TESTS + +TEST_CASE("CasStore") +{ + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration config; + config.RootDirectory = TempDir.Path(); + + std::unique_ptr<CasStore> Store{CreateCasStore()}; + Store->Initialize(config); + + ScrubContext Ctx; + Store->Scrub(Ctx); + + IoBuffer Value1{16}; + memcpy(Value1.MutableData(), "1234567890123456", 16); + IoHash Hash1 = IoHash::HashBuffer(Value1.Data(), Value1.Size()); + CasStore::InsertResult Result1 = Store->InsertChunk(Value1, Hash1); + CHECK(Result1.New); + + IoBuffer Value2{16}; + memcpy(Value2.MutableData(), "ABCDEFGHIJKLMNOP", 16); + IoHash Hash2 = IoHash::HashBuffer(Value2.Data(), Value2.Size()); + CasStore::InsertResult Result2 = Store->InsertChunk(Value2, Hash2); + CHECK(Result2.New); + + CasChunkSet ChunkSet; + ChunkSet.AddChunkToSet(Hash1); + ChunkSet.AddChunkToSet(Hash2); + + Store->FilterChunks(ChunkSet); + CHECK(ChunkSet.IsEmpty()); + + IoBuffer Lookup1 = Store->FindChunk(Hash1); + CHECK(Lookup1); + IoBuffer Lookup2 = Store->FindChunk(Hash2); + CHECK(Lookup2); +} + +void +CAS_forcelink() +{ +} + +#endif + +} // namespace zen |