diff options
Diffstat (limited to 'zenstore')
| -rw-r--r-- | zenstore/CAS.cpp | 166 | ||||
| -rw-r--r-- | zenstore/caslog.cpp | 18 | ||||
| -rw-r--r-- | zenstore/compactcas.cpp | 105 | ||||
| -rw-r--r-- | zenstore/compactcas.h | 38 | ||||
| -rw-r--r-- | zenstore/filecas.cpp | 135 | ||||
| -rw-r--r-- | zenstore/filecas.h | 21 | ||||
| -rw-r--r-- | zenstore/gc.cpp | 111 | ||||
| -rw-r--r-- | zenstore/include/zenstore/CAS.h | 36 | ||||
| -rw-r--r-- | zenstore/include/zenstore/caslog.h | 3 | ||||
| -rw-r--r-- | zenstore/include/zenstore/cidstore.h | 5 | ||||
| -rw-r--r-- | zenstore/include/zenstore/gc.h | 79 | ||||
| -rw-r--r-- | zenstore/zenstore.cpp | 2 | ||||
| -rw-r--r-- | zenstore/zenstore.vcxproj | 2 |
13 files changed, 614 insertions, 107 deletions
diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp index a4bbfa340..86c6eb849 100644 --- a/zenstore/CAS.cpp +++ b/zenstore/CAS.cpp @@ -5,6 +5,9 @@ #include "compactcas.h" #include "filecas.h" +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> #include <zencore/except.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> @@ -14,6 +17,7 @@ #include <zencore/testutils.h> #include <zencore/thread.h> #include <zencore/uid.h> +#include <zenstore/gc.h> #include <gsl/gsl-lite.hpp> @@ -67,34 +71,6 @@ CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callba ////////////////////////////////////////////////////////////////////////// -struct GcContext::GcState -{ - CasChunkSet m_CasChunks; - CasChunkSet m_CidChunks; -}; - -GcContext::GcContext() : m_State(std::make_unique<GcState>()) -{ -} - -GcContext::~GcContext() -{ -} - -void -GcContext::ContributeCids(std::span<const IoHash> Cids) -{ - m_State->m_CidChunks.AddChunksToSet(Cids); -} - -void -GcContext::ContributeCas(std::span<const IoHash> Cas) -{ - m_State->m_CasChunks.AddChunksToSet(Cas); -} - -////////////////////////////////////////////////////////////////////////// - void ScrubContext::ReportBadCasChunks(std::span<IoHash> BadCasChunks) { @@ -119,7 +95,7 @@ ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes) class CasImpl : public CasStore { public: - CasImpl(); + CasImpl(CasGc& Gc); virtual ~CasImpl(); virtual void Initialize(const CasStoreConfiguration& InConfig) override; @@ -128,14 +104,27 @@ public: virtual void FilterChunks(CasChunkSet& InOutChunks) override; virtual void Flush() override; virtual void Scrub(ScrubContext& Ctx) override; + virtual void GarbageCollect(GcContext& GcCtx) override; private: CasContainerStrategy m_TinyStrategy; CasContainerStrategy m_SmallStrategy; FileCasStrategy m_LargeStrategy; + CbObject m_ManifestObject; + + enum class StorageScheme + { + Legacy = 0, + WithCbManifest = 1 + }; + + StorageScheme m_StorageScheme = StorageScheme::Legacy; + + bool OpenOrCreateManifest(); + void UpdateManifest(); }; -CasImpl::CasImpl() : m_TinyStrategy(m_Config), m_SmallStrategy(m_Config), m_LargeStrategy(m_Config) +CasImpl::CasImpl(CasGc& Gc) : m_TinyStrategy(m_Config), m_SmallStrategy(m_Config), m_LargeStrategy(m_Config, Gc) { } @@ -155,39 +144,100 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig) std::filesystem::create_directories(m_Config.RootDirectory); // Open or create manifest - // - // The manifest is not currently fully implemented. The goal is to - // use it for recovery and configuration + const bool IsNewStore = OpenOrCreateManifest(); + + // Initialize payload storage + + m_LargeStrategy.Initialize(IsNewStore); + m_TinyStrategy.Initialize("tobs", 16, IsNewStore); + m_SmallStrategy.Initialize("sobs", 4096, IsNewStore); +} + +bool +CasImpl::OpenOrCreateManifest() +{ bool IsNewStore = false; - { - std::filesystem::path ManifestPath = m_Config.RootDirectory; - ManifestPath /= ".ucas_root"; + std::filesystem::path ManifestPath = m_Config.RootDirectory; + ManifestPath /= ".ucas_root"; - std::error_code Ec; - BasicFile Marker; - Marker.Open(ManifestPath.c_str(), /* IsCreate */ false, Ec); + std::error_code Ec; + BasicFile ManifestFile; + ManifestFile.Open(ManifestPath.c_str(), /* IsCreate */ false, Ec); - if (Ec) + bool ManifestIsOk = false; + + if (Ec) + { + if (Ec == std::errc::no_such_file_or_directory) { IsNewStore = true; + } + } + else + { + IoBuffer ManifestBuffer = ManifestFile.ReadAll(); + ManifestFile.Close(); - ExtendableStringBuilder<128> manifest; - manifest.Append("#CAS_ROOT\n"); - manifest.Append("ID="); - zen::Oid id = zen::Oid::NewOid(); - id.ToString(manifest); - - Marker.Open(ManifestPath.c_str(), /* IsCreate */ true); - Marker.Write(manifest.c_str(), (DWORD)manifest.Size(), 0); + if (ManifestBuffer.Size() > 0 && ManifestBuffer.Data<uint8_t>()[0] == '#') + { + // Old-style manifest, does not contain any useful information, so we may as well update it + } + else + { + CbObject Manifest{SharedBuffer(ManifestBuffer)}; + CbValidateError ValidationResult = ValidateCompactBinary(ManifestBuffer, CbValidateMode::All); + + if (ValidationResult == CbValidateError::None) + { + if (Manifest["id"]) + { + ManifestIsOk = true; + } + } + else + { + ZEN_ERROR("Store manifest validation failed: {:#x}, will generate new manifest to recover", ValidationResult); + } + + if (ManifestIsOk) + { + m_ManifestObject = std::move(Manifest); + } } } - // Initialize payload storage + if (!ManifestIsOk) + { + UpdateManifest(); + } - m_TinyStrategy.Initialize("tobs", 16, IsNewStore); - m_SmallStrategy.Initialize("sobs", 4096, IsNewStore); + return IsNewStore; +} + +void +CasImpl::UpdateManifest() +{ + if (!m_ManifestObject) + { + CbObjectWriter Cbo; + Cbo << "id" << zen::Oid::NewOid() << "created" << DateTime::Now(); + m_ManifestObject = Cbo.Save(); + } + + // Write manifest to file + + std::filesystem::path ManifestPath = m_Config.RootDirectory; + ManifestPath /= ".ucas_root"; + + // This will throw on failure + + ZEN_TRACE("Writing new manifest to '{}'", ManifestPath); + + BasicFile Marker; + Marker.Open(ManifestPath.c_str(), /* IsCreate */ true); + Marker.Write(m_ManifestObject.GetBuffer(), 0); } CasStore::InsertResult @@ -262,12 +312,18 @@ CasImpl::Scrub(ScrubContext& Ctx) m_LargeStrategy.Scrub(Ctx); } +void +CasImpl::GarbageCollect(GcContext& GcCtx) +{ + m_LargeStrategy.CollectGarbage(GcCtx); +} + ////////////////////////////////////////////////////////////////////////// CasStore* -CreateCasStore() +CreateCasStore(CasGc& Gc) { - return new CasImpl(); + return new CasImpl(Gc); } ////////////////////////////////////////////////////////////////////////// @@ -284,7 +340,9 @@ TEST_CASE("CasStore") CasStoreConfiguration config; config.RootDirectory = TempDir.Path(); - std::unique_ptr<CasStore> Store{CreateCasStore()}; + CasGc Gc; + + std::unique_ptr<CasStore> Store{CreateCasStore(Gc)}; Store->Initialize(config); ScrubContext Ctx; diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp index 2bac6affd..38d0f818e 100644 --- a/zenstore/caslog.cpp +++ b/zenstore/caslog.cpp @@ -46,7 +46,7 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat m_RecordSize = RecordSize; std::error_code Ec; - m_File.Open(FileName, IsCreate); + m_File.Open(FileName, IsCreate, Ec); if (Ec) { @@ -55,7 +55,7 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat uint64_t AppendOffset = 0; - if (IsCreate) + if (IsCreate || (m_File.FileSize() < sizeof(FileHeader))) { // Initialize log by writing header FileHeader Header = {.RecordSize = gsl::narrow<uint32_t>(RecordSize), .LogId = Oid::NewOid(), .ValidatedTail = 0}; @@ -76,12 +76,18 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat if ((0 != memcmp(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic)) || (Header.Checksum != Header.ComputeChecksum())) { - // TODO: provide more context! - throw std::runtime_error("Mangled log header"); + throw std::runtime_error("Mangled log header (invalid header magic) in '{}'"_format(FileName)); } AppendOffset = m_File.FileSize(); - m_Header = Header; + + // Adjust the offset to ensure we end up on a good boundary, in case there is some garbage appended + + AppendOffset -= sizeof Header; + AppendOffset -= AppendOffset % RecordSize; + AppendOffset += sizeof Header; + + m_Header = Header; } m_AppendOffset = AppendOffset; @@ -125,6 +131,8 @@ CasLogFile::Replay(std::function<void(const void*)>&& Handler) { Handler(ReadBuffer.data() + (i * m_RecordSize)); } + + m_AppendOffset = LogBaseOffset + (LogFileSize * LogEntryCount); } void diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 612f87c7c..dbe5572b9 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -4,13 +4,19 @@ #include "CompactCas.h" +#include <zencore/compactbinarybuilder.h> #include <zencore/except.h> +#include <zencore/filesystem.h> #include <zencore/logging.h> #include <zencore/memory.h> #include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> #include <zencore/thread.h> #include <zencore/uid.h> +#include <zenstore/gc.h> + #include <filesystem> #include <functional> #include <gsl/gsl-lite.hpp> @@ -58,7 +64,7 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint6 m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { m_LocationMap[Record.Key] = Record.Location; - MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset + Record.Location.Size); + MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.GetOffset() + Record.Location.GetSize()); }); } @@ -91,7 +97,7 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const RwLock::ExclusiveLockScope __(m_LocationMapLock); - CasDiskLocation Location{.Offset = InsertOffset, .Size = /* TODO FIX */ uint32_t(ChunkSize)}; + const CasDiskLocation Location{InsertOffset, ChunkSize}; m_LocationMap[ChunkHash] = Location; @@ -116,7 +122,8 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash) if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) { const CasDiskLocation& Location = KeyIt->second; - return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.Offset, Location.Size); + + return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.GetOffset(), Location.GetSize()); } // Not found @@ -187,11 +194,11 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) for (auto& Entry : m_LocationMap) { - const uint64_t EntryOffset = Entry.second.Offset; + const uint64_t EntryOffset = Entry.second.GetOffset(); if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) { - const uint64_t EntryEnd = EntryOffset + Entry.second.Size; + const uint64_t EntryEnd = EntryOffset + Entry.second.GetSize(); if (EntryEnd >= WindowEnd) { @@ -201,7 +208,8 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) } const IoHash ComputedHash = - IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart, Entry.second.Size); + IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.GetOffset() - WindowStart, + Entry.second.GetSize()); if (Entry.first != ComputedHash) { @@ -222,7 +230,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) for (const CasDiskIndexEntry& Entry : BigChunks) { IoHashStream Hasher; - m_SmallObjectFile.StreamByteRange(Entry.Location.Offset, Entry.Location.Size, [&](const void* Data, uint64_t Size) { + m_SmallObjectFile.StreamByteRange(Entry.Location.GetOffset(), Entry.Location.GetSize(), [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); IoHash ComputedHash = Hasher.GetHash(); @@ -258,6 +266,12 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) } void +CasContainerStrategy::CollectGarbage(GcContext& GcCtx) +{ + ZEN_UNUSED(GcCtx); +} + +void CasContainerStrategy::MakeSnapshot() { RwLock::SharedLockScope _(m_LocationMapLock); @@ -275,4 +289,81 @@ CasContainerStrategy::MakeSnapshot() m_SmallObjectIndex.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), 0); } +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +TEST_CASE("cas.compact.gc") +{ + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + + CreateDirectories(CasConfig.RootDirectory); + + const int kIterationCount = 1000; + + std::vector<IoHash> Keys(kIterationCount); + + { + CasContainerStrategy Cas(CasConfig); + Cas.Initialize("test", 16, true); + + for (int i = 0; i < kIterationCount; ++i) + { + CbObjectWriter Cbo; + Cbo << "id" << i; + CbObject Obj = Cbo.Save(); + + IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); + const IoHash Hash = HashBuffer(ObjBuffer); + + Cas.InsertChunk(ObjBuffer, Hash); + + Keys[i] = Hash; + } + + for (int i = 0; i < kIterationCount; ++i) + { + IoBuffer Chunk = Cas.FindChunk(Keys[i]); + + CHECK(!!Chunk); + + CbObject Value = LoadCompactBinaryObject(Chunk); + + CHECK_EQ(Value["id"].AsInt32(), i); + } + } + + // Validate that we can still read the inserted data after closing + // the original cas store + + { + CasContainerStrategy Cas(CasConfig); + Cas.Initialize("test", 16, false); + + for (int i = 0; i < kIterationCount; ++i) + { + IoBuffer Chunk = Cas.FindChunk(Keys[i]); + + CHECK(!!Chunk); + + CbObject Value = LoadCompactBinaryObject(Chunk); + + CHECK_EQ(Value["id"].AsInt32(), i); + } + + GcContext Ctx; + Cas.CollectGarbage(Ctx); + } +} + +#endif + +void +compactcas_forcelink() +{ +} + } // namespace zen diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h index a512c3d93..a3f3121e6 100644 --- a/zenstore/compactcas.h +++ b/zenstore/compactcas.h @@ -23,17 +23,42 @@ namespace zen { struct CasDiskLocation { - uint64_t Offset; - // If we wanted to be able to store larger chunks using this storage mechanism then - // we could make this more like the IoStore index so we can store larger chunks. - // I.e use five bytes for size and seven for offset - uint32_t Size; + CasDiskLocation(uint64_t InOffset, uint64_t InSize) + { + ZEN_ASSERT(InOffset <= 0xff'ffff'ffff); + ZEN_ASSERT(InSize <= 0xff'ffff'ffff); + + memcpy(&m_Offset[0], &InOffset, sizeof m_Offset); + memcpy(&m_Size[0], &InSize, sizeof m_Size); + } + + CasDiskLocation() = default; + + inline uint64_t GetOffset() const + { + uint64_t Offset = 0; + memcpy(&Offset, &m_Offset, sizeof m_Offset); + return Offset; + } + + inline uint64_t GetSize() const + { + uint64_t Size = 0; + memcpy(&Size, &m_Size, sizeof m_Size); + return Size; + } + +private: + uint8_t m_Offset[5]; + uint8_t m_Size[5]; }; struct CasDiskIndexEntry { IoHash Key; CasDiskLocation Location; + ZenContentType ContentType = ZenContentType::kUnknownContentType; + uint8_t Flags = 0; }; #pragma pack(pop) @@ -61,6 +86,7 @@ struct CasContainerStrategy void Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore); void Flush(); void Scrub(ScrubContext& Ctx); + void CollectGarbage(GcContext& GcCtx); private: const CasStoreConfiguration& m_Config; @@ -80,4 +106,6 @@ private: void MakeSnapshot(); }; +void compactcas_forcelink(); + } // namespace zen diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index a37450cd8..0714637c6 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -14,6 +14,11 @@ #include <zencore/thread.h> #include <zencore/uid.h> #include <zenstore/basicfile.h> +#include <zenstore/gc.h> + +#if ZEN_WITH_TESTS +# include <zencore/compactbinarybuilder.h> +#endif #include <gsl/gsl-lite.hpp> @@ -65,7 +70,10 @@ FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& Roo ////////////////////////////////////////////////////////////////////////// -FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config) : m_Config(Config), m_Log(logging::Get("filecas")) +FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc) +: GcStorage(Gc) +, m_Config(Config) +, m_Log(logging::Get("filecas")) { } @@ -73,9 +81,23 @@ FileCasStrategy::~FileCasStrategy() { } +void +FileCasStrategy::Initialize(bool IsNewStore) +{ + m_IsInitialized = true; + + CreateDirectories(m_Config.RootDirectory); + + m_CasLog.Open(m_Config.RootDirectory / "cas.ulog", IsNewStore); + + m_CasLog.Replay([&](const FileCasIndexEntry& Entry) { ZEN_UNUSED(Entry); }); +} + CasStore::InsertResult FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) { + ZEN_ASSERT(m_IsInitialized); + // File-based chunks have special case handling whereby we move the file into // place in the file store directory, thus avoiding unnecessary copying @@ -207,6 +229,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) if (Success) { + m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()}); + return CasStore::InsertResult{.New = true}; } @@ -232,6 +256,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) CasStore::InsertResult FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash) { + ZEN_ASSERT(m_IsInitialized); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); // See if file already exists @@ -304,12 +330,16 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize // *after* the lock is released due to the initialization order PayloadFile.Close(); + m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize}); + return {.New = true}; } IoBuffer FileCasStrategy::FindChunk(const IoHash& ChunkHash) { + ZEN_ASSERT(m_IsInitialized); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); @@ -320,6 +350,8 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash) bool FileCasStrategy::HaveChunk(const IoHash& ChunkHash) { + ZEN_ASSERT(m_IsInitialized); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); @@ -332,6 +364,7 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash) return false; } + void FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) { @@ -340,11 +373,18 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) ZEN_DEBUG("deleting CAS payload file '{}'", WideToUtf8(Name.ShardedPath)); std::filesystem::remove(Name.ShardedPath.c_str(), Ec); + + if (!Ec) + { + m_CasLog.Append({.Key = ChunkHash, .Size = ~(0ull)}); + } } void FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) { + ZEN_ASSERT(m_IsInitialized); + // NOTE: it's not a problem now, but in the future if a GC should happen while this // is in flight, the result could be wrong since chunks could go away in the meantime. // @@ -359,6 +399,8 @@ FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) void FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback) { + ZEN_ASSERT(m_IsInitialized); + struct Visitor : public FileSystemTraversal::TreeVisitor { Visitor(const std::filesystem::path& RootDir) : RootDirectory(RootDir) {} @@ -430,6 +472,8 @@ FileCasStrategy::Flush() void FileCasStrategy::Scrub(ScrubContext& Ctx) { + ZEN_ASSERT(m_IsInitialized); + std::vector<IoHash> BadHashes; std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0}; @@ -476,9 +520,58 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) } void -FileCasStrategy::GarbageCollect(GcContext& GcCtx) +FileCasStrategy::CollectGarbage(GcContext& GcCtx) { - ZEN_UNUSED(GcCtx); + ZEN_ASSERT(m_IsInitialized); + + ZEN_INFO("collecting garbage from {}", m_Config.RootDirectory); + + std::vector<IoHash> ChunksToDelete; + std::atomic<uint64_t> ChunksToDeleteBytes{0}; + std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0}; + + std::vector<IoHash> CandidateCas; + + IterateChunks([&](const IoHash& Hash, BasicFile& Payload) { + bool KeepThis = false; + CandidateCas.clear(); + CandidateCas.push_back(Hash); + GcCtx.FilterCas(CandidateCas, [&](const IoHash& Hash) { + ZEN_UNUSED(Hash); + KeepThis = true; + }); + + const uint64_t FileSize = Payload.FileSize(); + + if (!KeepThis) + { + ChunksToDelete.push_back(Hash); + ChunksToDeleteBytes.fetch_add(FileSize); + } + + ++ChunkCount; + ChunkBytes.fetch_add(FileSize); + }); + + ZEN_INFO("file CAS gc scanned: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes)); + + if (ChunksToDelete.empty()) + { + return; + } + + ZEN_INFO("deleting file CAS garbage: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunksToDeleteBytes)); + + for (const IoHash& Hash : ChunksToDelete) + { + std::error_code Ec; + DeleteChunk(Hash, Ec); + + if (Ec) + { + ZEN_WARN("failed to delete file for chunk {}: '{}'", Hash, Ec.message()); + } + } } ////////////////////////////////////////////////////////////////////////// @@ -489,12 +582,16 @@ TEST_CASE("cas.file.move") { using namespace fmt::literals; - ScopedTemporaryDirectory TempDir{"d:\\filecas_testdir"}; + // specifying an absolute path here can be helpful when using procmon to dig into things + ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; + + CasGc Gc; CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path() / "cas"; - FileCasStrategy FileCas(CasConfig); + FileCasStrategy FileCas(CasConfig, Gc); + FileCas.Initialize(/* IsNewStore */ true); { std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"}; @@ -564,6 +661,34 @@ TEST_CASE("cas.file.move") # endif } +TEST_CASE("cas.file.gc") +{ + // specifying an absolute path here can be helpful when using procmon to dig into things + ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path() / "cas"; + + CasGc Gc; + FileCasStrategy FileCas(CasConfig, Gc); + FileCas.Initialize(/* IsNewStore */ true); + + for (int i = 0; i < 1000; ++i) + { + CbObjectWriter Cbo; + Cbo << "id" << i; + CbObject Obj = Cbo.Save(); + + IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); + IoHash Hash = HashBuffer(ObjBuffer); + + FileCas.InsertChunk(ObjBuffer, Hash); + } + + GcContext Ctx; + FileCas.CollectGarbage(Ctx); +} + #endif void diff --git a/zenstore/filecas.h b/zenstore/filecas.h index 14314ce52..ec2ca3f31 100644 --- a/zenstore/filecas.h +++ b/zenstore/filecas.h @@ -9,6 +9,8 @@ #include <zencore/string.h> #include <zencore/thread.h> #include <zenstore/cas.h> +#include <zenstore/caslog.h> +#include <zenstore/gc.h> #include <functional> @@ -23,18 +25,19 @@ class BasicFile; /** CAS storage strategy using a file-per-chunk storage strategy */ -struct FileCasStrategy +struct FileCasStrategy : public GcStorage { - FileCasStrategy(const CasStoreConfiguration& Config); + FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc); ~FileCasStrategy(); + void Initialize(bool IsNewStore); CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); IoBuffer FindChunk(const IoHash& ChunkHash); bool HaveChunk(const IoHash& ChunkHash); void FilterChunks(CasChunkSet& InOutChunks); void Flush(); - void GarbageCollect(GcContext& GcCtx); + virtual void CollectGarbage(GcContext& GcCtx) override; void Scrub(ScrubContext& Ctx); private: @@ -43,6 +46,18 @@ private: RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines spdlog::logger& m_Log; spdlog::logger& Log() { return m_Log; } + bool m_IsInitialized = false; + + struct FileCasIndexEntry + { + IoHash Key; + uint32_t Pad = 0; + uint64_t Size = 0; + }; + + static_assert(sizeof(FileCasIndexEntry) == 32); + + TCasLogFile<FileCasIndexEntry> m_CasLog; inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; } void IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback); diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp index bfb8f015e..cb03f72ff 100644 --- a/zenstore/gc.cpp +++ b/zenstore/gc.cpp @@ -1,10 +1,77 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include <zenstore/CAS.h> #include <zenstore/gc.h> namespace zen { -CasGc::CasGc(CasStore& Store) : m_CasStore(Store) +////////////////////////////////////////////////////////////////////////// + +struct GcContext::GcState +{ + CasChunkSet m_CasChunks; + CasChunkSet m_CidChunks; +}; + +GcContext::GcContext() : m_State(std::make_unique<GcState>()) +{ +} + +GcContext::~GcContext() +{ +} + +void +GcContext::ContributeCids(std::span<const IoHash> Cids) +{ + m_State->m_CidChunks.AddChunksToSet(Cids); +} + +void +GcContext::ContributeCas(std::span<const IoHash> Cas) +{ + m_State->m_CasChunks.AddChunksToSet(Cas); +} + +void +GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc) +{ + m_State->m_CidChunks.FilterChunks(Cid, [&](const IoHash& Hash) { KeepFunc(Hash); }); +} + +void +GcContext::FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc) +{ + m_State->m_CasChunks.FilterChunks(Cas, [&](const IoHash& Hash) { KeepFunc(Hash); }); +} + +////////////////////////////////////////////////////////////////////////// + +GcContributor::GcContributor(CasGc& Gc) : m_Gc(Gc) +{ + m_Gc.AddGcContributor(this); +} + +GcContributor::~GcContributor() +{ + m_Gc.RemoveGcContributor(this); +} + +////////////////////////////////////////////////////////////////////////// + +GcStorage::GcStorage(CasGc& Gc) : m_Gc(Gc) +{ + m_Gc.AddGcStorage(this); +} + +GcStorage::~GcStorage() +{ + m_Gc.AddGcStorage(this); +} + +////////////////////////////////////////////////////////////////////////// + +CasGc::CasGc() { } @@ -13,12 +80,52 @@ CasGc::~CasGc() } void +CasGc::AddGcContributor(GcContributor* Contributor) +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_GcContribs.push_back(Contributor); +} + +void +CasGc::RemoveGcContributor(GcContributor* Contributor) +{ + RwLock::ExclusiveLockScope _(m_Lock); + std::erase_if(m_GcContribs, [&](GcContributor* $) { return $ == Contributor; }); +} + +void +CasGc::AddGcStorage(GcStorage* Storage) +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_GcStorage.push_back(Storage); +} + +void +CasGc::RemoveGcStorage(GcStorage* Storage) +{ + RwLock::ExclusiveLockScope _(m_Lock); + std::erase_if(m_GcStorage, [&](GcStorage* $) { return $ == Storage; }); +} + +void CasGc::CollectGarbage() { } void -CasGc::OnNewReferences(std::span<IoHash> Hashes) +CasGc::OnNewCidReferences(std::span<IoHash> Hashes) +{ + ZEN_UNUSED(Hashes); +} + +void +CasGc::OnCommittedCidReferences(std::span<IoHash> Hashes) +{ + ZEN_UNUSED(Hashes); +} + +void +CasGc::OnDroppedCidReferences(std::span<IoHash> Hashes) { ZEN_UNUSED(Hashes); } diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h index 86e7e78d9..5b508baa0 100644 --- a/zenstore/include/zenstore/CAS.h +++ b/zenstore/include/zenstore/CAS.h @@ -11,6 +11,7 @@ #include <zencore/timer.h> #include <atomic> +#include <concepts> #include <filesystem> #include <functional> #include <memory> @@ -19,6 +20,9 @@ namespace zen { +class GcContext; +class CasGc; + struct CasStoreConfiguration { // Root directory for CAS store @@ -45,29 +49,22 @@ public: inline [[nodiscard]] bool IsEmpty() const { return m_ChunkSet.empty(); } inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); } + inline void FilterChunks(std::span<const IoHash> Candidates, std::invocable<const IoHash&> auto MatchFunc) + { + for (const IoHash& Candidate : Candidates) + { + if (ContainsChunk(Candidate)) + { + MatchFunc(Candidate); + } + } + } + private: // Q: should we protect this with a lock, or is that a higher level concern? std::unordered_set<IoHash> m_ChunkSet; }; -/** Garbage Collection context object - */ - -class GcContext -{ -public: - GcContext(); - ~GcContext(); - - void ContributeCids(std::span<const IoHash> Cid); - void ContributeCas(std::span<const IoHash> Hash); - -private: - struct GcState; - - std::unique_ptr<GcState> m_State; -}; - /** Context object for data scrubbing * * Data scrubbing is when we traverse stored data to validate it and @@ -116,13 +113,14 @@ public: virtual void FilterChunks(CasChunkSet& InOutChunks) = 0; virtual void Flush() = 0; virtual void Scrub(ScrubContext& Ctx) = 0; + virtual void GarbageCollect(GcContext& GcCtx) = 0; protected: CasStoreConfiguration m_Config; uint64_t m_LastScrubTime = 0; }; -ZENCORE_API CasStore* CreateCasStore(); +ZENCORE_API CasStore* CreateCasStore(CasGc& Gc); void CAS_forcelink(); diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h index 00b987383..065a74b25 100644 --- a/zenstore/include/zenstore/caslog.h +++ b/zenstore/include/zenstore/caslog.h @@ -57,6 +57,8 @@ template<typename T> class TCasLogFile : public CasLogFile { public: + void Open(std::filesystem::path FileName, bool IsCreate) { CasLogFile::Open(FileName, sizeof(T), IsCreate); } + // This should be called before the Replay() is called to do some basic sanity checking bool Initialize() { return true; } @@ -76,7 +78,6 @@ public: CasLogFile::Append(&Record, sizeof Record); } - void Open(std::filesystem::path FileName, bool IsCreate) { CasLogFile::Open(FileName, sizeof(T), IsCreate); } }; } // namespace zen diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h index 5f567e7fc..acfedbc64 100644 --- a/zenstore/include/zenstore/cidstore.h +++ b/zenstore/include/zenstore/cidstore.h @@ -4,10 +4,13 @@ #include "zenstore.h" -#include <tsl/robin_map.h> #include <zencore/iohash.h> #include <zenstore/CAS.h> +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + namespace std::filesystem { class path; } diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h index 055843547..ef62158ce 100644 --- a/zenstore/include/zenstore/gc.h +++ b/zenstore/include/zenstore/gc.h @@ -3,26 +3,99 @@ #pragma once #include <zencore/iohash.h> +#include <zencore/thread.h> #include <span> +#define ZEN_USE_REF_TRACKING 0 // This is not currently functional + namespace zen { class CasStore; +class CasGc; struct IoHash; +/** Garbage Collection context object + */ + +class GcContext +{ +public: + GcContext(); + ~GcContext(); + + void ContributeCids(std::span<const IoHash> Cid); + void ContributeCas(std::span<const IoHash> Hash); + + void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc); + void FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc); + +private: + struct GcState; + + std::unique_ptr<GcState> m_State; +}; + +/** GC root contributor + + Higher level data structures provide roots for the garbage collector, + which ultimately determine what is garbage and what data we need to + retain. + + */ + +class GcContributor +{ +public: + GcContributor(CasGc& Gc); + ~GcContributor(); + + virtual void GatherReferences(GcContext& GcCtx) = 0; + +protected: + CasGc& m_Gc; +}; + +/** GC storage provider + */ + +class GcStorage +{ +public: + GcStorage(CasGc& Gc); + ~GcStorage(); + + virtual void CollectGarbage(GcContext& GcCtrx) = 0; + +private: + CasGc& m_Gc; +}; + +/** GC orchestrator + */ + class CasGc { public: - CasGc(CasStore& Store); + CasGc(); ~CasGc(); + void AddGcContributor(GcContributor* Contributor); + void RemoveGcContributor(GcContributor* Contributor); + + void AddGcStorage(GcStorage* Contributor); + void RemoveGcStorage(GcStorage* Contributor); + void CollectGarbage(); - void OnNewReferences(std::span<IoHash> Hashes); + void OnNewCidReferences(std::span<IoHash> Hashes); + void OnCommittedCidReferences(std::span<IoHash> Hashes); + void OnDroppedCidReferences(std::span<IoHash> Hashes); private: - CasStore& m_CasStore; + RwLock m_Lock; + std::vector<GcContributor*> m_GcContribs; + std::vector<GcStorage*> m_GcStorage; }; } // namespace zen diff --git a/zenstore/zenstore.cpp b/zenstore/zenstore.cpp index d852fa64b..9fdf2dccf 100644 --- a/zenstore/zenstore.cpp +++ b/zenstore/zenstore.cpp @@ -4,6 +4,7 @@ #include <zenstore/CAS.h> #include <zenstore/basicfile.h> +#include "compactcas.h" #include "filecas.h" namespace zen { @@ -14,6 +15,7 @@ zenstore_forcelinktests() basicfile_forcelink(); CAS_forcelink(); filecas_forcelink(); + compactcas_forcelink(); } } // namespace zen diff --git a/zenstore/zenstore.vcxproj b/zenstore/zenstore.vcxproj index eb2ecd02b..832ea8159 100644 --- a/zenstore/zenstore.vcxproj +++ b/zenstore/zenstore.vcxproj @@ -97,7 +97,6 @@ </PropertyGroup> <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> <ClCompile> - <WarningLevel>Level3</WarningLevel> <SDLCheck>true</SDLCheck> <PreprocessorDefinitions>_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> <ConformanceMode>true</ConformanceMode> @@ -111,7 +110,6 @@ </ItemDefinitionGroup> <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> <ClCompile> - <WarningLevel>Level3</WarningLevel> <FunctionLevelLinking>true</FunctionLevelLinking> <IntrinsicFunctions>true</IntrinsicFunctions> <SDLCheck>true</SDLCheck> |