aboutsummaryrefslogtreecommitdiff
path: root/zenstore/cas.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-12-14 12:34:47 +0100
committerPer Larsson <[email protected]>2021-12-14 12:34:47 +0100
commitb6c6568e1618f10d2160d836b65e35586e3c740f (patch)
treef6a929cf918850bbba87d0ee67cd3482b2d50e24 /zenstore/cas.cpp
parentFixed bug in z$ service returning partial cache records and enable small obje... (diff)
parentPartial revert b363c5b (diff)
downloadzen-b6c6568e1618f10d2160d836b65e35586e3c740f.tar.xz
zen-b6c6568e1618f10d2160d836b65e35586e3c740f.zip
Merged main.
Diffstat (limited to 'zenstore/cas.cpp')
-rw-r--r--zenstore/cas.cpp403
1 files changed, 403 insertions, 0 deletions
diff --git a/zenstore/cas.cpp b/zenstore/cas.cpp
new file mode 100644
index 000000000..d4023bdc4
--- /dev/null
+++ b/zenstore/cas.cpp
@@ -0,0 +1,403 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenstore/cas.h>
+
+#include "compactcas.h"
+#include "filecas.h"
+
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/except.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/memory.h>
+#include <zencore/string.h>
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
+#include <zencore/thread.h>
+#include <zencore/uid.h>
+#include <zenstore/gc.h>
+
+#include <gsl/gsl-lite.hpp>
+
+#include <filesystem>
+#include <functional>
+#include <unordered_map>
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen {
+
+void
+CasChunkSet::AddChunkToSet(const IoHash& HashToAdd)
+{
+ m_ChunkSet.insert(HashToAdd);
+}
+
+void
+CasChunkSet::AddChunksToSet(std::span<const IoHash> HashesToAdd)
+{
+ for (const IoHash& Hash : HashesToAdd)
+ {
+ m_ChunkSet.insert(Hash);
+ }
+}
+
+void
+CasChunkSet::RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate)
+{
+ for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;)
+ {
+ if (Predicate(*It))
+ {
+ It = m_ChunkSet.erase(It);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+}
+
+void
+CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback)
+{
+ for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd; ++It)
+ {
+ Callback(*It);
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+ScrubContext::ReportBadCasChunks(std::span<IoHash> BadCasChunks)
+{
+ m_BadCas.AddChunksToSet(BadCasChunks);
+}
+
+void
+ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes)
+{
+ m_ChunkCount.fetch_add(ChunkCount);
+ m_ByteCount.fetch_add(ChunkBytes);
+}
+
+/**
+ * CAS store implementation
+ *
+ * Uses a basic strategy of splitting payloads by size, to improve ability to reclaim space
+ * quickly for unused large chunks and to maintain locality for small chunks which are
+ * frequently accessed together.
+ *
+ */
+class CasImpl : public CasStore
+{
+public:
+ CasImpl(CasGc& Gc);
+ virtual ~CasImpl();
+
+ virtual void Initialize(const CasStoreConfiguration& InConfig) override;
+ virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) override;
+ virtual IoBuffer FindChunk(const IoHash& ChunkHash) override;
+ virtual bool ContainsChunk(const IoHash& ChunkHash) override;
+ virtual void FilterChunks(CasChunkSet& InOutChunks) override;
+ virtual void Flush() override;
+ virtual void Scrub(ScrubContext& Ctx) override;
+ virtual void GarbageCollect(GcContext& GcCtx) override;
+ virtual CasStoreSize TotalSize() const override;
+
+private:
+ CasContainerStrategy m_TinyStrategy;
+ CasContainerStrategy m_SmallStrategy;
+ FileCasStrategy m_LargeStrategy;
+ CbObject m_ManifestObject;
+
+ enum class StorageScheme
+ {
+ Legacy = 0,
+ WithCbManifest = 1
+ };
+
+ StorageScheme m_StorageScheme = StorageScheme::Legacy;
+
+ bool OpenOrCreateManifest();
+ void UpdateManifest();
+};
+
+CasImpl::CasImpl(CasGc& Gc) : m_TinyStrategy(m_Config, Gc), m_SmallStrategy(m_Config, Gc), m_LargeStrategy(m_Config, Gc)
+{
+}
+
+CasImpl::~CasImpl()
+{
+}
+
+void
+CasImpl::Initialize(const CasStoreConfiguration& InConfig)
+{
+ m_Config = InConfig;
+
+ ZEN_INFO("initializing CAS pool at '{}'", m_Config.RootDirectory);
+
+ // Ensure root directory exists - create if it doesn't exist already
+
+ std::filesystem::create_directories(m_Config.RootDirectory);
+
+ // Open or create manifest
+
+ const bool IsNewStore = OpenOrCreateManifest();
+
+ // Initialize payload storage
+
+ m_LargeStrategy.Initialize(IsNewStore);
+ m_TinyStrategy.Initialize("tobs", 16, IsNewStore);
+ m_SmallStrategy.Initialize("sobs", 4096, IsNewStore);
+}
+
+bool
+CasImpl::OpenOrCreateManifest()
+{
+ bool IsNewStore = false;
+
+ std::filesystem::path ManifestPath = m_Config.RootDirectory;
+ ManifestPath /= ".ucas_root";
+
+ std::error_code Ec;
+ BasicFile ManifestFile;
+ ManifestFile.Open(ManifestPath.c_str(), /* IsCreate */ false, Ec);
+
+ bool ManifestIsOk = false;
+
+ if (Ec)
+ {
+ if (Ec == std::errc::no_such_file_or_directory)
+ {
+ IsNewStore = true;
+ }
+ }
+ else
+ {
+ IoBuffer ManifestBuffer = ManifestFile.ReadAll();
+ ManifestFile.Close();
+
+ if (ManifestBuffer.Size() > 0 && ManifestBuffer.Data<uint8_t>()[0] == '#')
+ {
+ // Old-style manifest, does not contain any useful information, so we may as well update it
+ }
+ else
+ {
+ CbObject Manifest{SharedBuffer(ManifestBuffer)};
+ CbValidateError ValidationResult = ValidateCompactBinary(ManifestBuffer, CbValidateMode::All);
+
+ if (ValidationResult == CbValidateError::None)
+ {
+ if (Manifest["id"])
+ {
+ ManifestIsOk = true;
+ }
+ }
+ else
+ {
+ ZEN_ERROR("Store manifest validation failed: {:#x}, will generate new manifest to recover", ValidationResult);
+ }
+
+ if (ManifestIsOk)
+ {
+ m_ManifestObject = std::move(Manifest);
+ }
+ }
+ }
+
+ if (!ManifestIsOk)
+ {
+ UpdateManifest();
+ }
+
+ return IsNewStore;
+}
+
+void
+CasImpl::UpdateManifest()
+{
+ if (!m_ManifestObject)
+ {
+ CbObjectWriter Cbo;
+ Cbo << "id" << zen::Oid::NewOid() << "created" << DateTime::Now();
+ m_ManifestObject = Cbo.Save();
+ }
+
+ // Write manifest to file
+
+ std::filesystem::path ManifestPath = m_Config.RootDirectory;
+ ManifestPath /= ".ucas_root";
+
+ // This will throw on failure
+
+ ZEN_TRACE("Writing new manifest to '{}'", ManifestPath);
+
+ BasicFile Marker;
+ Marker.Open(ManifestPath.c_str(), /* IsCreate */ true);
+ Marker.Write(m_ManifestObject.GetBuffer(), 0);
+}
+
+CasStore::InsertResult
+CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
+{
+ const uint64_t ChunkSize = Chunk.Size();
+
+ if (ChunkSize < m_Config.TinyValueThreshold)
+ {
+ ZEN_ASSERT(ChunkSize);
+
+ return m_TinyStrategy.InsertChunk(Chunk, ChunkHash);
+ }
+ else if (ChunkSize < m_Config.HugeValueThreshold)
+ {
+ return m_SmallStrategy.InsertChunk(Chunk, ChunkHash);
+ }
+
+ return m_LargeStrategy.InsertChunk(Chunk, ChunkHash);
+}
+
+IoBuffer
+CasImpl::FindChunk(const IoHash& ChunkHash)
+{
+ if (IoBuffer Found = m_SmallStrategy.FindChunk(ChunkHash))
+ {
+ return Found;
+ }
+
+ if (IoBuffer Found = m_TinyStrategy.FindChunk(ChunkHash))
+ {
+ return Found;
+ }
+
+ if (IoBuffer Found = m_LargeStrategy.FindChunk(ChunkHash))
+ {
+ return Found;
+ }
+
+ // Not found
+ return IoBuffer{};
+}
+
+bool
+CasImpl::ContainsChunk(const IoHash& ChunkHash)
+{
+ return m_SmallStrategy.HaveChunk(ChunkHash) || m_TinyStrategy.HaveChunk(ChunkHash) || m_LargeStrategy.HaveChunk(ChunkHash);
+}
+
+void
+CasImpl::FilterChunks(CasChunkSet& InOutChunks)
+{
+ m_SmallStrategy.FilterChunks(InOutChunks);
+ m_TinyStrategy.FilterChunks(InOutChunks);
+ m_LargeStrategy.FilterChunks(InOutChunks);
+}
+
+void
+CasImpl::Flush()
+{
+ m_SmallStrategy.Flush();
+ m_TinyStrategy.Flush();
+ m_LargeStrategy.Flush();
+}
+
+void
+CasImpl::Scrub(ScrubContext& Ctx)
+{
+ if (m_LastScrubTime == Ctx.ScrubTimestamp())
+ {
+ return;
+ }
+
+ m_LastScrubTime = Ctx.ScrubTimestamp();
+
+ m_SmallStrategy.Scrub(Ctx);
+ m_TinyStrategy.Scrub(Ctx);
+ m_LargeStrategy.Scrub(Ctx);
+}
+
+void
+CasImpl::GarbageCollect(GcContext& GcCtx)
+{
+ m_SmallStrategy.CollectGarbage(GcCtx);
+ m_TinyStrategy.CollectGarbage(GcCtx);
+ m_LargeStrategy.CollectGarbage(GcCtx);
+}
+
+CasStoreSize
+CasImpl::TotalSize() const
+{
+ const uint64_t Tiny = m_TinyStrategy.StorageSize().DiskSize;
+ const uint64_t Small = m_SmallStrategy.StorageSize().DiskSize;
+ const uint64_t Large = m_LargeStrategy.StorageSize().DiskSize;
+
+ return {.TinySize = Tiny, .SmallSize = Small, .LargeSize = Large, .TotalSize = Tiny + Small + Large};
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+std::unique_ptr<CasStore>
+CreateCasStore(CasGc& Gc)
+{
+ return std::make_unique<CasImpl>(Gc);
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Testing related code follows...
+//
+
+#if ZEN_WITH_TESTS
+
+TEST_CASE("CasStore")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ CasStoreConfiguration config;
+ config.RootDirectory = TempDir.Path();
+
+ CasGc Gc;
+
+ std::unique_ptr<CasStore> Store = CreateCasStore(Gc);
+ Store->Initialize(config);
+
+ ScrubContext Ctx;
+ Store->Scrub(Ctx);
+
+ IoBuffer Value1{16};
+ memcpy(Value1.MutableData(), "1234567890123456", 16);
+ IoHash Hash1 = IoHash::HashBuffer(Value1.Data(), Value1.Size());
+ CasStore::InsertResult Result1 = Store->InsertChunk(Value1, Hash1);
+ CHECK(Result1.New);
+
+ IoBuffer Value2{16};
+ memcpy(Value2.MutableData(), "ABCDEFGHIJKLMNOP", 16);
+ IoHash Hash2 = IoHash::HashBuffer(Value2.Data(), Value2.Size());
+ CasStore::InsertResult Result2 = Store->InsertChunk(Value2, Hash2);
+ CHECK(Result2.New);
+
+ CasChunkSet ChunkSet;
+ ChunkSet.AddChunkToSet(Hash1);
+ ChunkSet.AddChunkToSet(Hash2);
+
+ Store->FilterChunks(ChunkSet);
+ CHECK(ChunkSet.IsEmpty());
+
+ IoBuffer Lookup1 = Store->FindChunk(Hash1);
+ CHECK(Lookup1);
+ IoBuffer Lookup2 = Store->FindChunk(Hash2);
+ CHECK(Lookup2);
+}
+
+void
+CAS_forcelink()
+{
+}
+
+#endif
+
+} // namespace zen