aboutsummaryrefslogtreecommitdiff
path: root/zenstore/cas.cpp
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-10-29 15:10:28 +0200
committerMartin Ridgers <[email protected]>2021-10-29 15:49:27 +0200
commita0bd0821e7a23f1ff3476151c0702be3691dc582 (patch)
tree3910d5d41e32be1a9b48ffdbf612dc99615e604a /zenstore/cas.cpp
parentDisabled SpawnServer() on POSIX for time being (diff)
downloadzen-a0bd0821e7a23f1ff3476151c0702be3691dc582.tar.xz
zen-a0bd0821e7a23f1ff3476151c0702be3691dc582.zip
CAS.cpp/h -> cas.cpp/h to keep Zen's file casing consistent
Diffstat (limited to 'zenstore/cas.cpp')
-rw-r--r--zenstore/cas.cpp325
1 files changed, 325 insertions, 0 deletions
diff --git a/zenstore/cas.cpp b/zenstore/cas.cpp
new file mode 100644
index 000000000..4268e314b
--- /dev/null
+++ b/zenstore/cas.cpp
@@ -0,0 +1,325 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenstore/cas.h>
+
+#include "compactcas.h"
+#include "filecas.h"
+
+#include <zencore/except.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/memory.h>
+#include <zencore/string.h>
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
+#include <zencore/thread.h>
+#include <zencore/uid.h>
+
+#include <gsl/gsl-lite.hpp>
+
+#include <filesystem>
+#include <functional>
+#include <unordered_map>
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen {
+
+void
+CasChunkSet::AddChunkToSet(const IoHash& HashToAdd)
+{
+ m_ChunkSet.insert(HashToAdd);
+}
+
+void
+CasChunkSet::AddChunksToSet(std::span<const IoHash> HashesToAdd)
+{
+ for (const IoHash& Hash : HashesToAdd)
+ {
+ m_ChunkSet.insert(Hash);
+ }
+}
+
+void
+CasChunkSet::RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate)
+{
+ for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;)
+ {
+ if (Predicate(*It))
+ {
+ It = m_ChunkSet.erase(It);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+}
+
+void
+CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback)
+{
+ for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd; ++It)
+ {
+ Callback(*It);
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+struct GcContext::GcState
+{
+ CasChunkSet m_CasChunks;
+ CasChunkSet m_CidChunks;
+};
+
+GcContext::GcContext() : m_State(std::make_unique<GcState>())
+{
+}
+
+GcContext::~GcContext()
+{
+}
+
+void
+GcContext::ContributeCids(std::span<const IoHash> Cids)
+{
+ m_State->m_CidChunks.AddChunksToSet(Cids);
+}
+
+void
+GcContext::ContributeCas(std::span<const IoHash> Cas)
+{
+ m_State->m_CasChunks.AddChunksToSet(Cas);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+ScrubContext::ReportBadCasChunks(std::span<IoHash> BadCasChunks)
+{
+ m_BadCas.AddChunksToSet(BadCasChunks);
+}
+
+void
+ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes)
+{
+ m_ChunkCount.fetch_add(ChunkCount);
+ m_ByteCount.fetch_add(ChunkBytes);
+}
+
+/**
+ * CAS store implementation
+ *
+ * Uses a basic strategy of splitting payloads by size, to improve ability to reclaim space
+ * quickly for unused large chunks and to maintain locality for small chunks which are
+ * frequently accessed together.
+ *
+ */
+class CasImpl : public CasStore
+{
+public:
+ CasImpl();
+ virtual ~CasImpl();
+
+ virtual void Initialize(const CasStoreConfiguration& InConfig) override;
+ virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) override;
+ virtual IoBuffer FindChunk(const IoHash& ChunkHash) override;
+ virtual void FilterChunks(CasChunkSet& InOutChunks) override;
+ virtual void Flush() override;
+ virtual void Scrub(ScrubContext& Ctx) override;
+
+private:
+ CasContainerStrategy m_TinyStrategy;
+ CasContainerStrategy m_SmallStrategy;
+ FileCasStrategy m_LargeStrategy;
+};
+
+CasImpl::CasImpl() : m_TinyStrategy(m_Config), m_SmallStrategy(m_Config), m_LargeStrategy(m_Config)
+{
+}
+
+CasImpl::~CasImpl()
+{
+}
+
+void
+CasImpl::Initialize(const CasStoreConfiguration& InConfig)
+{
+ m_Config = InConfig;
+
+ ZEN_INFO("initializing CAS pool at '{}'", m_Config.RootDirectory);
+
+ // Ensure root directory exists - create if it doesn't exist already
+
+ std::filesystem::create_directories(m_Config.RootDirectory);
+
+ // Open or create manifest
+ //
+ // The manifest is not currently fully implemented. The goal is to
+ // use it for recovery and configuration
+
+ bool IsNewStore = false;
+
+ {
+ std::filesystem::path ManifestPath = m_Config.RootDirectory;
+ ManifestPath /= ".ucas_root";
+
+ std::error_code Ec;
+ BasicFile Marker;
+ Marker.Open(ManifestPath.c_str(), /* IsCreate */ false, Ec);
+
+ if (Ec)
+ {
+ IsNewStore = true;
+
+ ExtendableStringBuilder<128> manifest;
+ manifest.Append("#CAS_ROOT\n");
+ manifest.Append("ID=");
+ zen::Oid id = zen::Oid::NewOid();
+ id.ToString(manifest);
+
+ Marker.Open(ManifestPath.c_str(), /* IsCreate */ true);
+ Marker.Write(manifest.c_str(), uint32_t(manifest.Size()), 0);
+ }
+ }
+
+ // Initialize payload storage
+
+ m_TinyStrategy.Initialize("tobs", 16, IsNewStore);
+ m_SmallStrategy.Initialize("sobs", 4096, IsNewStore);
+}
+
+CasStore::InsertResult
+CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
+{
+ const uint64_t ChunkSize = Chunk.Size();
+
+ if (ChunkSize < m_Config.TinyValueThreshold)
+ {
+ ZEN_ASSERT(ChunkSize);
+
+ return m_TinyStrategy.InsertChunk(Chunk, ChunkHash);
+ }
+ else if (ChunkSize < m_Config.HugeValueThreshold)
+ {
+ return m_SmallStrategy.InsertChunk(Chunk, ChunkHash);
+ }
+
+ return m_LargeStrategy.InsertChunk(Chunk, ChunkHash);
+}
+
+IoBuffer
+CasImpl::FindChunk(const IoHash& ChunkHash)
+{
+ if (IoBuffer Found = m_SmallStrategy.FindChunk(ChunkHash))
+ {
+ return Found;
+ }
+
+ if (IoBuffer Found = m_TinyStrategy.FindChunk(ChunkHash))
+ {
+ return Found;
+ }
+
+ if (IoBuffer Found = m_LargeStrategy.FindChunk(ChunkHash))
+ {
+ return Found;
+ }
+
+ // Not found
+ return IoBuffer{};
+}
+
+void
+CasImpl::FilterChunks(CasChunkSet& InOutChunks)
+{
+ m_SmallStrategy.FilterChunks(InOutChunks);
+ m_TinyStrategy.FilterChunks(InOutChunks);
+ m_LargeStrategy.FilterChunks(InOutChunks);
+}
+
+void
+CasImpl::Flush()
+{
+ m_SmallStrategy.Flush();
+ m_TinyStrategy.Flush();
+ m_LargeStrategy.Flush();
+}
+
+void
+CasImpl::Scrub(ScrubContext& Ctx)
+{
+ if (m_LastScrubTime == Ctx.ScrubTimestamp())
+ {
+ return;
+ }
+
+ m_LastScrubTime = Ctx.ScrubTimestamp();
+
+ m_SmallStrategy.Scrub(Ctx);
+ m_TinyStrategy.Scrub(Ctx);
+ m_LargeStrategy.Scrub(Ctx);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+CasStore*
+CreateCasStore()
+{
+ return new CasImpl();
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Testing related code follows...
+//
+
+#if ZEN_WITH_TESTS
+
+TEST_CASE("CasStore")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ CasStoreConfiguration config;
+ config.RootDirectory = TempDir.Path();
+
+ std::unique_ptr<CasStore> Store{CreateCasStore()};
+ Store->Initialize(config);
+
+ ScrubContext Ctx;
+ Store->Scrub(Ctx);
+
+ IoBuffer Value1{16};
+ memcpy(Value1.MutableData(), "1234567890123456", 16);
+ IoHash Hash1 = IoHash::HashBuffer(Value1.Data(), Value1.Size());
+ CasStore::InsertResult Result1 = Store->InsertChunk(Value1, Hash1);
+ CHECK(Result1.New);
+
+ IoBuffer Value2{16};
+ memcpy(Value2.MutableData(), "ABCDEFGHIJKLMNOP", 16);
+ IoHash Hash2 = IoHash::HashBuffer(Value2.Data(), Value2.Size());
+ CasStore::InsertResult Result2 = Store->InsertChunk(Value2, Hash2);
+ CHECK(Result2.New);
+
+ CasChunkSet ChunkSet;
+ ChunkSet.AddChunkToSet(Hash1);
+ ChunkSet.AddChunkToSet(Hash2);
+
+ Store->FilterChunks(ChunkSet);
+ CHECK(ChunkSet.IsEmpty());
+
+ IoBuffer Lookup1 = Store->FindChunk(Hash1);
+ CHECK(Lookup1);
+ IoBuffer Lookup2 = Store->FindChunk(Hash2);
+ CHECK(Lookup2);
+}
+
+void
+CAS_forcelink()
+{
+}
+
+#endif
+
+} // namespace zen