diff options
| author | Stefan Boberg <[email protected]> | 2021-10-21 21:13:13 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-10-21 21:13:13 +0200 |
| commit | cb1a2f52e37d9d92a908a8761a7f69d6d33cc4b1 (patch) | |
| tree | a96e89cfc076d8e8a2f1ceca5d3985cdd3ba0c37 /zenstore | |
| parent | Removed accidentally committed test code (diff) | |
| download | zen-cb1a2f52e37d9d92a908a8761a7f69d6d33cc4b1.tar.xz zen-cb1a2f52e37d9d92a908a8761a7f69d6d33cc4b1.zip | |
filecas: Added commit log, chunk gc
Diffstat (limited to 'zenstore')
| -rw-r--r-- | zenstore/CAS.cpp | 1 | ||||
| -rw-r--r-- | zenstore/caslog.cpp | 14 | ||||
| -rw-r--r-- | zenstore/filecas.cpp | 95 | ||||
| -rw-r--r-- | zenstore/filecas.h | 23 | ||||
| -rw-r--r-- | zenstore/gc.cpp | 18 | ||||
| -rw-r--r-- | zenstore/include/zenstore/CAS.h | 12 | ||||
| -rw-r--r-- | zenstore/include/zenstore/caslog.h | 3 | ||||
| -rw-r--r-- | zenstore/include/zenstore/gc.h | 3 |
8 files changed, 151 insertions, 18 deletions
diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp index 09e13a702..ec8e8f570 100644 --- a/zenstore/CAS.cpp +++ b/zenstore/CAS.cpp @@ -149,6 +149,7 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig) // Initialize payload storage + m_LargeStrategy.Initialize(IsNewStore); m_TinyStrategy.Initialize("tobs", 16, IsNewStore); m_SmallStrategy.Initialize("sobs", 4096, IsNewStore); } diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp index 2bac6affd..af03e0391 100644 --- a/zenstore/caslog.cpp +++ b/zenstore/caslog.cpp @@ -55,7 +55,7 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat uint64_t AppendOffset = 0; - if (IsCreate) + if (IsCreate || (m_File.FileSize() < sizeof(FileHeader))) { // Initialize log by writing header FileHeader Header = {.RecordSize = gsl::narrow<uint32_t>(RecordSize), .LogId = Oid::NewOid(), .ValidatedTail = 0}; @@ -76,11 +76,17 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat if ((0 != memcmp(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic)) || (Header.Checksum != Header.ComputeChecksum())) { - // TODO: provide more context! - throw std::runtime_error("Mangled log header"); + throw std::runtime_error("Mangled log header (invalid header magic) in '{}'"_format(FileName)); } AppendOffset = m_File.FileSize(); + + // Adjust the offset to ensure we end up on a good boundary, in case there is some garbage appended + + AppendOffset -= sizeof Header; + AppendOffset -= AppendOffset % RecordSize; + AppendOffset += sizeof Header; + m_Header = Header; } @@ -125,6 +131,8 @@ CasLogFile::Replay(std::function<void(const void*)>&& Handler) { Handler(ReadBuffer.data() + (i * m_RecordSize)); } + + m_AppendOffset = LogBaseOffset + (LogFileSize * LogEntryCount); } void diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index 9cb6e5c79..8c4df4029 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -70,7 +70,10 @@ FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& Roo ////////////////////////////////////////////////////////////////////////// -FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc) : GcStorage(Gc), m_Config(Config), m_Log(logging::Get("filecas")) +FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc) +: GcStorage(Gc) +, m_Config(Config) +, m_Log(logging::Get("filecas")) { } @@ -78,9 +81,23 @@ FileCasStrategy::~FileCasStrategy() { } +void +FileCasStrategy::Initialize(bool IsNewStore) +{ + m_IsInitialized = true; + + CreateDirectories(m_Config.RootDirectory); + + m_CasLog.Open(m_Config.RootDirectory / "cas.ulog", IsNewStore); + + m_CasLog.Replay([&](const FileCasIndexEntry& Entry) {}); +} + CasStore::InsertResult FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) { + ZEN_ASSERT(m_IsInitialized); + // File-based chunks have special case handling whereby we move the file into // place in the file store directory, thus avoiding unnecessary copying @@ -212,6 +229,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) if (Success) { + m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()}); + return CasStore::InsertResult{.New = true}; } @@ -237,6 +256,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) CasStore::InsertResult FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash) { + ZEN_ASSERT(m_IsInitialized); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); // See if file already exists @@ -309,12 +330,16 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize // *after* the lock is released due to the initialization order PayloadFile.Close(); + m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize}); + return {.New = true}; } IoBuffer FileCasStrategy::FindChunk(const IoHash& ChunkHash) { + ZEN_ASSERT(m_IsInitialized); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); @@ -325,6 +350,8 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash) bool FileCasStrategy::HaveChunk(const IoHash& ChunkHash) { + ZEN_ASSERT(m_IsInitialized); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); @@ -345,11 +372,18 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) ZEN_DEBUG("deleting CAS payload file '{}'", WideToUtf8(Name.ShardedPath)); std::filesystem::remove(Name.ShardedPath.c_str(), Ec); + + if (!Ec) + { + m_CasLog.Append({.Key = ChunkHash, .Size = ~(0ull)}); + } } void FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) { + ZEN_ASSERT(m_IsInitialized); + // NOTE: it's not a problem now, but in the future if a GC should happen while this // is in flight, the result could be wrong since chunks could go away in the meantime. // @@ -364,6 +398,8 @@ FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) void FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback) { + ZEN_ASSERT(m_IsInitialized); + struct Visitor : public FileSystemTraversal::TreeVisitor { Visitor(const std::filesystem::path& RootDir) : RootDirectory(RootDir) {} @@ -435,6 +471,8 @@ FileCasStrategy::Flush() void FileCasStrategy::Scrub(ScrubContext& Ctx) { + ZEN_ASSERT(m_IsInitialized); + std::vector<IoHash> BadHashes; std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0}; @@ -483,7 +521,53 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) void FileCasStrategy::CollectGarbage(GcContext& GcCtx) { - ZEN_UNUSED(GcCtx); + ZEN_ASSERT(m_IsInitialized); + + ZEN_INFO("collecting garbage from {}", m_Config.RootDirectory); + + std::vector<IoHash> ChunksToDelete; + std::atomic<uint64_t> ChunksToDeleteBytes{0}; + std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0}; + + std::vector<IoHash> CandidateCas; + + IterateChunks([&](const IoHash& Hash, BasicFile& Payload) { + bool KeepThis = false; + CandidateCas.clear(); + CandidateCas.push_back(Hash); + GcCtx.FilterCas(CandidateCas, [&](const IoHash& Hash) { KeepThis = true; }); + + const uint64_t FileSize = Payload.FileSize(); + + if (!KeepThis) + { + ChunksToDelete.push_back(Hash); + ChunksToDeleteBytes.fetch_add(FileSize); + } + + ++ChunkCount; + ChunkBytes.fetch_add(FileSize); + }); + + ZEN_INFO("file CAS gc scanned: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes)); + + if (ChunksToDelete.empty()) + { + return; + } + + ZEN_INFO("deleting file CAS garbage: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunksToDeleteBytes)); + + for (const IoHash& Hash : ChunksToDelete) + { + std::error_code Ec; + DeleteChunk(Hash, Ec); + + if (Ec) + { + ZEN_WARN("failed to delete file for chunk {}: '{}'", Hash, Ec.message()); + } + } } ////////////////////////////////////////////////////////////////////////// @@ -503,6 +587,7 @@ TEST_CASE("cas.file.move") CasConfig.RootDirectory = TempDir.Path() / "cas"; FileCasStrategy FileCas(CasConfig, Gc); + FileCas.Initialize(/* IsNewStore */true); { std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"}; @@ -577,12 +662,12 @@ TEST_CASE("cas.file.gc") // specifying an absolute path here can be helpful when using procmon to dig into things ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; - CasGc Gc; - CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path() / "cas"; + CasGc Gc; FileCasStrategy FileCas(CasConfig, Gc); + FileCas.Initialize(/* IsNewStore */ true); for (int i = 0; i < 1000; ++i) { @@ -594,11 +679,9 @@ TEST_CASE("cas.file.gc") IoHash Hash = HashBuffer(ObjBuffer); FileCas.InsertChunk(ObjBuffer, Hash); - ; } GcContext Ctx; - FileCas.CollectGarbage(Ctx); } diff --git a/zenstore/filecas.h b/zenstore/filecas.h index bbba9733e..686fd2ea8 100644 --- a/zenstore/filecas.h +++ b/zenstore/filecas.h @@ -9,6 +9,7 @@ #include <zencore/string.h> #include <zencore/thread.h> #include <zenstore/cas.h> +#include <zenstore/caslog.h> #include <zenstore/gc.h> #include <functional> @@ -21,6 +22,15 @@ namespace zen { class BasicFile; +struct FileCasIndexEntry +{ + IoHash Key; + uint32_t Pad = 0; + uint64_t Size = 0; +}; + +static_assert(sizeof(FileCasIndexEntry) == 32); + /** CAS storage strategy using a file-per-chunk storage strategy */ @@ -29,6 +39,7 @@ struct FileCasStrategy : public GcStorage FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc); ~FileCasStrategy(); + void Initialize(bool IsNewStore); CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); IoBuffer FindChunk(const IoHash& ChunkHash); @@ -39,11 +50,13 @@ struct FileCasStrategy : public GcStorage void Scrub(ScrubContext& Ctx); private: - const CasStoreConfiguration& m_Config; - RwLock m_Lock; - RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines - spdlog::logger& m_Log; - spdlog::logger& Log() { return m_Log; } + const CasStoreConfiguration& m_Config; + RwLock m_Lock; + RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines + spdlog::logger& m_Log; + spdlog::logger& Log() { return m_Log; } + TCasLogFile<FileCasIndexEntry> m_CasLog; + bool m_IsInitialized = false; inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; } void IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback); diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp index 79c646db2..cb03f72ff 100644 --- a/zenstore/gc.cpp +++ b/zenstore/gc.cpp @@ -1,7 +1,7 @@ // Copyright Epic Games, Inc. All Rights Reserved. -#include <zenstore/gc.h> #include <zenstore/CAS.h> +#include <zenstore/gc.h> namespace zen { @@ -33,6 +33,18 @@ GcContext::ContributeCas(std::span<const IoHash> Cas) m_State->m_CasChunks.AddChunksToSet(Cas); } +void +GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc) +{ + m_State->m_CidChunks.FilterChunks(Cid, [&](const IoHash& Hash) { KeepFunc(Hash); }); +} + +void +GcContext::FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc) +{ + m_State->m_CasChunks.FilterChunks(Cas, [&](const IoHash& Hash) { KeepFunc(Hash); }); +} + ////////////////////////////////////////////////////////////////////////// GcContributor::GcContributor(CasGc& Gc) : m_Gc(Gc) @@ -67,14 +79,14 @@ CasGc::~CasGc() { } -void +void CasGc::AddGcContributor(GcContributor* Contributor) { RwLock::ExclusiveLockScope _(m_Lock); m_GcContribs.push_back(Contributor); } -void +void CasGc::RemoveGcContributor(GcContributor* Contributor) { RwLock::ExclusiveLockScope _(m_Lock); diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h index a387b905e..5b508baa0 100644 --- a/zenstore/include/zenstore/CAS.h +++ b/zenstore/include/zenstore/CAS.h @@ -11,6 +11,7 @@ #include <zencore/timer.h> #include <atomic> +#include <concepts> #include <filesystem> #include <functional> #include <memory> @@ -48,6 +49,17 @@ public: inline [[nodiscard]] bool IsEmpty() const { return m_ChunkSet.empty(); } inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); } + inline void FilterChunks(std::span<const IoHash> Candidates, std::invocable<const IoHash&> auto MatchFunc) + { + for (const IoHash& Candidate : Candidates) + { + if (ContainsChunk(Candidate)) + { + MatchFunc(Candidate); + } + } + } + private: // Q: should we protect this with a lock, or is that a higher level concern? std::unordered_set<IoHash> m_ChunkSet; diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h index 00b987383..065a74b25 100644 --- a/zenstore/include/zenstore/caslog.h +++ b/zenstore/include/zenstore/caslog.h @@ -57,6 +57,8 @@ template<typename T> class TCasLogFile : public CasLogFile { public: + void Open(std::filesystem::path FileName, bool IsCreate) { CasLogFile::Open(FileName, sizeof(T), IsCreate); } + // This should be called before the Replay() is called to do some basic sanity checking bool Initialize() { return true; } @@ -76,7 +78,6 @@ public: CasLogFile::Append(&Record, sizeof Record); } - void Open(std::filesystem::path FileName, bool IsCreate) { CasLogFile::Open(FileName, sizeof(T), IsCreate); } }; } // namespace zen diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h index d51925a0c..ef62158ce 100644 --- a/zenstore/include/zenstore/gc.h +++ b/zenstore/include/zenstore/gc.h @@ -27,6 +27,9 @@ public: void ContributeCids(std::span<const IoHash> Cid); void ContributeCas(std::span<const IoHash> Hash); + void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc); + void FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc); + private: struct GcState; |