diff options
| author | Per Larsson <[email protected]> | 2021-12-14 12:34:47 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-12-14 12:34:47 +0100 |
| commit | b6c6568e1618f10d2160d836b65e35586e3c740f (patch) | |
| tree | f6a929cf918850bbba87d0ee67cd3482b2d50e24 /zenstore/cas.cpp | |
| parent | Fixed bug in z$ service returning partial cache records and enable small obje... (diff) | |
| parent | Partial revert b363c5b (diff) | |
| download | zen-b6c6568e1618f10d2160d836b65e35586e3c740f.tar.xz zen-b6c6568e1618f10d2160d836b65e35586e3c740f.zip | |
Merged main.
Diffstat (limited to 'zenstore/cas.cpp')
| -rw-r--r-- | zenstore/cas.cpp | 403 |
1 files changed, 403 insertions, 0 deletions
diff --git a/zenstore/cas.cpp b/zenstore/cas.cpp new file mode 100644 index 000000000..d4023bdc4 --- /dev/null +++ b/zenstore/cas.cpp @@ -0,0 +1,403 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenstore/cas.h> + +#include "compactcas.h" +#include "filecas.h" + +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/except.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/memory.h> +#include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> +#include <zencore/thread.h> +#include <zencore/uid.h> +#include <zenstore/gc.h> + +#include <gsl/gsl-lite.hpp> + +#include <filesystem> +#include <functional> +#include <unordered_map> + +////////////////////////////////////////////////////////////////////////// + +namespace zen { + +void +CasChunkSet::AddChunkToSet(const IoHash& HashToAdd) +{ + m_ChunkSet.insert(HashToAdd); +} + +void +CasChunkSet::AddChunksToSet(std::span<const IoHash> HashesToAdd) +{ + for (const IoHash& Hash : HashesToAdd) + { + m_ChunkSet.insert(Hash); + } +} + +void +CasChunkSet::RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate) +{ + for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;) + { + if (Predicate(*It)) + { + It = m_ChunkSet.erase(It); + } + else + { + ++It; + } + } +} + +void +CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback) +{ + for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd; ++It) + { + Callback(*It); + } +} + +////////////////////////////////////////////////////////////////////////// + +void +ScrubContext::ReportBadCasChunks(std::span<IoHash> BadCasChunks) +{ + m_BadCas.AddChunksToSet(BadCasChunks); +} + +void +ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes) +{ + m_ChunkCount.fetch_add(ChunkCount); + m_ByteCount.fetch_add(ChunkBytes); +} + +/** + * CAS store implementation + * + * Uses a basic strategy of splitting payloads by size, to improve ability to reclaim space + * quickly for unused large chunks and to maintain locality for small chunks which are + * frequently accessed together. + * + */ +class CasImpl : public CasStore +{ +public: + CasImpl(CasGc& Gc); + virtual ~CasImpl(); + + virtual void Initialize(const CasStoreConfiguration& InConfig) override; + virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) override; + virtual IoBuffer FindChunk(const IoHash& ChunkHash) override; + virtual bool ContainsChunk(const IoHash& ChunkHash) override; + virtual void FilterChunks(CasChunkSet& InOutChunks) override; + virtual void Flush() override; + virtual void Scrub(ScrubContext& Ctx) override; + virtual void GarbageCollect(GcContext& GcCtx) override; + virtual CasStoreSize TotalSize() const override; + +private: + CasContainerStrategy m_TinyStrategy; + CasContainerStrategy m_SmallStrategy; + FileCasStrategy m_LargeStrategy; + CbObject m_ManifestObject; + + enum class StorageScheme + { + Legacy = 0, + WithCbManifest = 1 + }; + + StorageScheme m_StorageScheme = StorageScheme::Legacy; + + bool OpenOrCreateManifest(); + void UpdateManifest(); +}; + +CasImpl::CasImpl(CasGc& Gc) : m_TinyStrategy(m_Config, Gc), m_SmallStrategy(m_Config, Gc), m_LargeStrategy(m_Config, Gc) +{ +} + +CasImpl::~CasImpl() +{ +} + +void +CasImpl::Initialize(const CasStoreConfiguration& InConfig) +{ + m_Config = InConfig; + + ZEN_INFO("initializing CAS pool at '{}'", m_Config.RootDirectory); + + // Ensure root directory exists - create if it doesn't exist already + + std::filesystem::create_directories(m_Config.RootDirectory); + + // Open or create manifest + + const bool IsNewStore = OpenOrCreateManifest(); + + // Initialize payload storage + + m_LargeStrategy.Initialize(IsNewStore); + m_TinyStrategy.Initialize("tobs", 16, IsNewStore); + m_SmallStrategy.Initialize("sobs", 4096, IsNewStore); +} + +bool +CasImpl::OpenOrCreateManifest() +{ + bool IsNewStore = false; + + std::filesystem::path ManifestPath = m_Config.RootDirectory; + ManifestPath /= ".ucas_root"; + + std::error_code Ec; + BasicFile ManifestFile; + ManifestFile.Open(ManifestPath.c_str(), /* IsCreate */ false, Ec); + + bool ManifestIsOk = false; + + if (Ec) + { + if (Ec == std::errc::no_such_file_or_directory) + { + IsNewStore = true; + } + } + else + { + IoBuffer ManifestBuffer = ManifestFile.ReadAll(); + ManifestFile.Close(); + + if (ManifestBuffer.Size() > 0 && ManifestBuffer.Data<uint8_t>()[0] == '#') + { + // Old-style manifest, does not contain any useful information, so we may as well update it + } + else + { + CbObject Manifest{SharedBuffer(ManifestBuffer)}; + CbValidateError ValidationResult = ValidateCompactBinary(ManifestBuffer, CbValidateMode::All); + + if (ValidationResult == CbValidateError::None) + { + if (Manifest["id"]) + { + ManifestIsOk = true; + } + } + else + { + ZEN_ERROR("Store manifest validation failed: {:#x}, will generate new manifest to recover", ValidationResult); + } + + if (ManifestIsOk) + { + m_ManifestObject = std::move(Manifest); + } + } + } + + if (!ManifestIsOk) + { + UpdateManifest(); + } + + return IsNewStore; +} + +void +CasImpl::UpdateManifest() +{ + if (!m_ManifestObject) + { + CbObjectWriter Cbo; + Cbo << "id" << zen::Oid::NewOid() << "created" << DateTime::Now(); + m_ManifestObject = Cbo.Save(); + } + + // Write manifest to file + + std::filesystem::path ManifestPath = m_Config.RootDirectory; + ManifestPath /= ".ucas_root"; + + // This will throw on failure + + ZEN_TRACE("Writing new manifest to '{}'", ManifestPath); + + BasicFile Marker; + Marker.Open(ManifestPath.c_str(), /* IsCreate */ true); + Marker.Write(m_ManifestObject.GetBuffer(), 0); +} + +CasStore::InsertResult +CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) +{ + const uint64_t ChunkSize = Chunk.Size(); + + if (ChunkSize < m_Config.TinyValueThreshold) + { + ZEN_ASSERT(ChunkSize); + + return m_TinyStrategy.InsertChunk(Chunk, ChunkHash); + } + else if (ChunkSize < m_Config.HugeValueThreshold) + { + return m_SmallStrategy.InsertChunk(Chunk, ChunkHash); + } + + return m_LargeStrategy.InsertChunk(Chunk, ChunkHash); +} + +IoBuffer +CasImpl::FindChunk(const IoHash& ChunkHash) +{ + if (IoBuffer Found = m_SmallStrategy.FindChunk(ChunkHash)) + { + return Found; + } + + if (IoBuffer Found = m_TinyStrategy.FindChunk(ChunkHash)) + { + return Found; + } + + if (IoBuffer Found = m_LargeStrategy.FindChunk(ChunkHash)) + { + return Found; + } + + // Not found + return IoBuffer{}; +} + +bool +CasImpl::ContainsChunk(const IoHash& ChunkHash) +{ + return m_SmallStrategy.HaveChunk(ChunkHash) || m_TinyStrategy.HaveChunk(ChunkHash) || m_LargeStrategy.HaveChunk(ChunkHash); +} + +void +CasImpl::FilterChunks(CasChunkSet& InOutChunks) +{ + m_SmallStrategy.FilterChunks(InOutChunks); + m_TinyStrategy.FilterChunks(InOutChunks); + m_LargeStrategy.FilterChunks(InOutChunks); +} + +void +CasImpl::Flush() +{ + m_SmallStrategy.Flush(); + m_TinyStrategy.Flush(); + m_LargeStrategy.Flush(); +} + +void +CasImpl::Scrub(ScrubContext& Ctx) +{ + if (m_LastScrubTime == Ctx.ScrubTimestamp()) + { + return; + } + + m_LastScrubTime = Ctx.ScrubTimestamp(); + + m_SmallStrategy.Scrub(Ctx); + m_TinyStrategy.Scrub(Ctx); + m_LargeStrategy.Scrub(Ctx); +} + +void +CasImpl::GarbageCollect(GcContext& GcCtx) +{ + m_SmallStrategy.CollectGarbage(GcCtx); + m_TinyStrategy.CollectGarbage(GcCtx); + m_LargeStrategy.CollectGarbage(GcCtx); +} + +CasStoreSize +CasImpl::TotalSize() const +{ + const uint64_t Tiny = m_TinyStrategy.StorageSize().DiskSize; + const uint64_t Small = m_SmallStrategy.StorageSize().DiskSize; + const uint64_t Large = m_LargeStrategy.StorageSize().DiskSize; + + return {.TinySize = Tiny, .SmallSize = Small, .LargeSize = Large, .TotalSize = Tiny + Small + Large}; +} + +////////////////////////////////////////////////////////////////////////// + +std::unique_ptr<CasStore> +CreateCasStore(CasGc& Gc) +{ + return std::make_unique<CasImpl>(Gc); +} + +////////////////////////////////////////////////////////////////////////// +// +// Testing related code follows... +// + +#if ZEN_WITH_TESTS + +TEST_CASE("CasStore") +{ + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration config; + config.RootDirectory = TempDir.Path(); + + CasGc Gc; + + std::unique_ptr<CasStore> Store = CreateCasStore(Gc); + Store->Initialize(config); + + ScrubContext Ctx; + Store->Scrub(Ctx); + + IoBuffer Value1{16}; + memcpy(Value1.MutableData(), "1234567890123456", 16); + IoHash Hash1 = IoHash::HashBuffer(Value1.Data(), Value1.Size()); + CasStore::InsertResult Result1 = Store->InsertChunk(Value1, Hash1); + CHECK(Result1.New); + + IoBuffer Value2{16}; + memcpy(Value2.MutableData(), "ABCDEFGHIJKLMNOP", 16); + IoHash Hash2 = IoHash::HashBuffer(Value2.Data(), Value2.Size()); + CasStore::InsertResult Result2 = Store->InsertChunk(Value2, Hash2); + CHECK(Result2.New); + + CasChunkSet ChunkSet; + ChunkSet.AddChunkToSet(Hash1); + ChunkSet.AddChunkToSet(Hash2); + + Store->FilterChunks(ChunkSet); + CHECK(ChunkSet.IsEmpty()); + + IoBuffer Lookup1 = Store->FindChunk(Hash1); + CHECK(Lookup1); + IoBuffer Lookup2 = Store->FindChunk(Hash2); + CHECK(Lookup2); +} + +void +CAS_forcelink() +{ +} + +#endif + +} // namespace zen |