aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Stefan Boberg <[email protected]> 2021-10-21 21:13:13 +0200
committer Stefan Boberg <[email protected]> 2021-10-21 21:13:13 +0200
commit cb1a2f52e37d9d92a908a8761a7f69d6d33cc4b1 (patch)
tree a96e89cfc076d8e8a2f1ceca5d3985cdd3ba0c37
parent Removed accidentally committed test code (diff)
download zen-cb1a2f52e37d9d92a908a8761a7f69d6d33cc4b1.tar.xz
zen-cb1a2f52e37d9d92a908a8761a7f69d6d33cc4b1.zip
filecas: Added commit log, chunk gc
-rw-r--r-- zenstore/CAS.cpp 1
-rw-r--r-- zenstore/caslog.cpp 14
-rw-r--r-- zenstore/filecas.cpp 95
-rw-r--r-- zenstore/filecas.h 23
-rw-r--r-- zenstore/gc.cpp 18
-rw-r--r-- zenstore/include/zenstore/CAS.h 12
-rw-r--r-- zenstore/include/zenstore/caslog.h 3
-rw-r--r-- zenstore/include/zenstore/gc.h 3
8 files changed, 151 insertions, 18 deletions
diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp
index 09e13a702..ec8e8f570 100644
--- a/zenstore/CAS.cpp
+++ b/zenstore/CAS.cpp
@@ -149,6 +149,7 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig)
// Initialize payload storage
+ m_LargeStrategy.Initialize(IsNewStore);
m_TinyStrategy.Initialize("tobs", 16, IsNewStore);
m_SmallStrategy.Initialize("sobs", 4096, IsNewStore);
}
diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp
index 2bac6affd..af03e0391 100644
--- a/zenstore/caslog.cpp
+++ b/zenstore/caslog.cpp
@@ -55,7 +55,7 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat
uint64_t AppendOffset = 0;
- if (IsCreate)
+ if (IsCreate || (m_File.FileSize() < sizeof(FileHeader)))
{
// Initialize log by writing header
FileHeader Header = {.RecordSize = gsl::narrow<uint32_t>(RecordSize), .LogId = Oid::NewOid(), .ValidatedTail = 0};
@@ -76,11 +76,17 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat
if ((0 != memcmp(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic)) || (Header.Checksum != Header.ComputeChecksum()))
{
- // TODO: provide more context!
- throw std::runtime_error("Mangled log header");
+ throw std::runtime_error("Mangled log header (invalid header magic) in '{}'"_format(FileName));
}
AppendOffset = m_File.FileSize();
+
+ // Adjust the offset to ensure we end up on a good boundary, in case there is some garbage appended
+
+ AppendOffset -= sizeof Header;
+ AppendOffset -= AppendOffset % RecordSize;
+ AppendOffset += sizeof Header;
+
m_Header = Header;
}
@@ -125,6 +131,8 @@ CasLogFile::Replay(std::function<void(const void*)>&& Handler)
{
Handler(ReadBuffer.data() + (i * m_RecordSize));
}
+
+ m_AppendOffset = LogBaseOffset + (LogFileSize * LogEntryCount);
}
void
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index 9cb6e5c79..8c4df4029 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -70,7 +70,10 @@ FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& Roo
//////////////////////////////////////////////////////////////////////////
-FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc) : GcStorage(Gc), m_Config(Config), m_Log(logging::Get("filecas"))
+FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc)
+: GcStorage(Gc)
+, m_Config(Config)
+, m_Log(logging::Get("filecas"))
{
}
@@ -78,9 +81,23 @@ FileCasStrategy::~FileCasStrategy()
{
}
+/** Prepare the strategy for use: create the root directory and open the chunk log.
+ *
+ * Must be called before the chunk operations that assert on m_IsInitialized.
+ *
+ * @param IsNewStore  forwarded to the log's Open() as its IsCreate argument
+ */
+void
+FileCasStrategy::Initialize(bool IsNewStore)
+{
+    CreateDirectories(m_Config.RootDirectory);
+
+    m_CasLog.Open(m_Config.RootDirectory / "cas.ulog", IsNewStore);
+
+    // Log entries are not consumed yet, so the handler is intentionally empty.
+    // NOTE(review): the replay is presumably still needed so the log settles on
+    // its append position -- confirm against CasLogFile::Replay.
+    m_CasLog.Replay([&](const FileCasIndexEntry&) {});
+
+    // Set the flag last: opening the log can throw, and a failed initialization
+    // must not leave the object claiming to be usable.
+    m_IsInitialized = true;
+}
+
CasStore::InsertResult
FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
{
+ ZEN_ASSERT(m_IsInitialized);
+
// File-based chunks have special case handling whereby we move the file into
// place in the file store directory, thus avoiding unnecessary copying
@@ -212,6 +229,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
if (Success)
{
+ m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()});
+
return CasStore::InsertResult{.New = true};
}
@@ -237,6 +256,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
CasStore::InsertResult
FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash)
{
+ ZEN_ASSERT(m_IsInitialized);
+
ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
// See if file already exists
@@ -309,12 +330,16 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
// *after* the lock is released due to the initialization order
PayloadFile.Close();
+ m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize});
+
return {.New = true};
}
IoBuffer
FileCasStrategy::FindChunk(const IoHash& ChunkHash)
{
+ ZEN_ASSERT(m_IsInitialized);
+
ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
RwLock::SharedLockScope _(LockForHash(ChunkHash));
@@ -325,6 +350,8 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash)
bool
FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
{
+ ZEN_ASSERT(m_IsInitialized);
+
ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
RwLock::SharedLockScope _(LockForHash(ChunkHash));
@@ -345,11 +372,18 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
ZEN_DEBUG("deleting CAS payload file '{}'", WideToUtf8(Name.ShardedPath));
std::filesystem::remove(Name.ShardedPath.c_str(), Ec);
+
+ if (!Ec)
+ {
+ m_CasLog.Append({.Key = ChunkHash, .Size = ~(0ull)});
+ }
}
void
FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks)
{
+ ZEN_ASSERT(m_IsInitialized);
+
// NOTE: it's not a problem now, but in the future if a GC should happen while this
// is in flight, the result could be wrong since chunks could go away in the meantime.
//
@@ -364,6 +398,8 @@ FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks)
void
FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback)
{
+ ZEN_ASSERT(m_IsInitialized);
+
struct Visitor : public FileSystemTraversal::TreeVisitor
{
Visitor(const std::filesystem::path& RootDir) : RootDirectory(RootDir) {}
@@ -435,6 +471,8 @@ FileCasStrategy::Flush()
void
FileCasStrategy::Scrub(ScrubContext& Ctx)
{
+ ZEN_ASSERT(m_IsInitialized);
+
std::vector<IoHash> BadHashes;
std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0};
@@ -483,7 +521,53 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
+/** Delete every payload file whose hash is not retained by the GC context.
+ *
+ * Visits all chunks through IterateChunks(), uses GcCtx.FilterCas() to decide
+ * whether each hash is kept, and afterwards calls DeleteChunk() for the rest.
+ * A failed deletion is logged as a warning and does not stop the sweep.
+ *
+ * @param GcCtx  context holding the set of CAS hashes that must be kept
+ */
void
FileCasStrategy::CollectGarbage(GcContext& GcCtx)
{
- ZEN_UNUSED(GcCtx);
+    ZEN_ASSERT(m_IsInitialized);
+
+    ZEN_INFO("collecting garbage from {}", m_Config.RootDirectory);
+
+    std::vector<IoHash>   ChunksToDelete;
+    std::atomic<uint64_t> ChunksToDeleteBytes{0};
+    std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0};
+
+    // Scratch buffer reused for every chunk so the filter call sees a one-element span
+    std::vector<IoHash> CandidateCas;
+
+    IterateChunks([&](const IoHash& Hash, BasicFile& Payload) {
+        bool KeepThis = false;
+        CandidateCas.clear();
+        CandidateCas.push_back(Hash);
+
+        // The callback fires only for hashes the context wants to keep
+        GcCtx.FilterCas(CandidateCas, [&](const IoHash&) { KeepThis = true; });
+
+        const uint64_t FileSize = Payload.FileSize();
+
+        if (!KeepThis)
+        {
+            ChunksToDelete.push_back(Hash);
+            ChunksToDeleteBytes.fetch_add(FileSize);
+        }
+
+        ++ChunkCount;
+        ChunkBytes.fetch_add(FileSize);
+    });
+
+    ZEN_INFO("file CAS gc scanned: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes));
+
+    if (ChunksToDelete.empty())
+    {
+        return;
+    }
+
+    // Report the number of chunks actually scheduled for deletion, not the total scanned
+    ZEN_INFO("deleting file CAS garbage: {} chunks ({})", ChunksToDelete.size(), NiceBytes(ChunksToDeleteBytes));
+
+    // Deletion happens after the scan completes rather than from inside the iteration callback
+    for (const IoHash& Hash : ChunksToDelete)
+    {
+        std::error_code Ec;
+        DeleteChunk(Hash, Ec);
+
+        if (Ec)
+        {
+            ZEN_WARN("failed to delete file for chunk {}: '{}'", Hash, Ec.message());
+        }
+    }
}
//////////////////////////////////////////////////////////////////////////
@@ -503,6 +587,7 @@ TEST_CASE("cas.file.move")
CasConfig.RootDirectory = TempDir.Path() / "cas";
FileCasStrategy FileCas(CasConfig, Gc);
+ FileCas.Initialize(/* IsNewStore */true);
{
std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"};
@@ -577,12 +662,12 @@ TEST_CASE("cas.file.gc")
// specifying an absolute path here can be helpful when using procmon to dig into things
ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"};
- CasGc Gc;
-
CasStoreConfiguration CasConfig;
CasConfig.RootDirectory = TempDir.Path() / "cas";
+ CasGc Gc;
FileCasStrategy FileCas(CasConfig, Gc);
+ FileCas.Initialize(/* IsNewStore */ true);
for (int i = 0; i < 1000; ++i)
{
@@ -594,11 +679,9 @@ TEST_CASE("cas.file.gc")
IoHash Hash = HashBuffer(ObjBuffer);
FileCas.InsertChunk(ObjBuffer, Hash);
- ;
}
GcContext Ctx;
-
FileCas.CollectGarbage(Ctx);
}
diff --git a/zenstore/filecas.h b/zenstore/filecas.h
index bbba9733e..686fd2ea8 100644
--- a/zenstore/filecas.h
+++ b/zenstore/filecas.h
@@ -9,6 +9,7 @@
#include <zencore/string.h>
#include <zencore/thread.h>
#include <zenstore/cas.h>
+#include <zenstore/caslog.h>
#include <zenstore/gc.h>
#include <functional>
@@ -21,6 +22,15 @@ namespace zen {
class BasicFile;
+// Fixed-size record stored in the file CAS log (FileCasStrategy::m_CasLog is a
+// TCasLogFile<FileCasIndexEntry>).
+//
+// InsertChunk appends one record per newly stored chunk with Size set to the chunk
+// size; DeleteChunk appends a record with Size set to ~0ull after a successful delete.
+struct FileCasIndexEntry
+{
+ // Hash identifying the chunk
+ IoHash Key;
+ // Explicit filler, always zero-initialized
+ // NOTE(review): presumably makes the padding before Size explicit -- confirm sizeof(IoHash)
+ uint32_t Pad = 0;
+ // Chunk size as passed by InsertChunk, or ~0ull for a deletion record
+ uint64_t Size = 0;
+};
+
+// The record size is handed to the log as sizeof(T), so pin it at compile time
+static_assert(sizeof(FileCasIndexEntry) == 32);
+
/** CAS storage strategy using a file-per-chunk storage strategy
*/
@@ -29,6 +39,7 @@ struct FileCasStrategy : public GcStorage
FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc);
~FileCasStrategy();
+ void Initialize(bool IsNewStore);
CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash);
IoBuffer FindChunk(const IoHash& ChunkHash);
@@ -39,11 +50,13 @@ struct FileCasStrategy : public GcStorage
void Scrub(ScrubContext& Ctx);
private:
- const CasStoreConfiguration& m_Config;
- RwLock m_Lock;
- RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines
- spdlog::logger& m_Log;
- spdlog::logger& Log() { return m_Log; }
+ const CasStoreConfiguration& m_Config;
+ RwLock m_Lock;
+ RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines
+ spdlog::logger& m_Log;
+ spdlog::logger& Log() { return m_Log; }
+ TCasLogFile<FileCasIndexEntry> m_CasLog;
+ bool m_IsInitialized = false;
inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; }
void IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback);
diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp
index 79c646db2..cb03f72ff 100644
--- a/zenstore/gc.cpp
+++ b/zenstore/gc.cpp
@@ -1,7 +1,7 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include <zenstore/gc.h>
#include <zenstore/CAS.h>
+#include <zenstore/gc.h>
namespace zen {
@@ -33,6 +33,18 @@ GcContext::ContributeCas(std::span<const IoHash> Cas)
m_State->m_CasChunks.AddChunksToSet(Cas);
}
+// Forwards the candidate hashes in Cid to m_State->m_CidChunks.FilterChunks() and
+// relays every hash reported by that call to KeepFunc.
+// NOTE(review): presumably FilterChunks reports the candidates contained in the
+// contributed Cid set -- confirm against the chunk set implementation.
+void
+GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc)
+{
+ // The wrapping lambda captures KeepFunc by reference, so FilterChunks receives a small closure
+ m_State->m_CidChunks.FilterChunks(Cid, [&](const IoHash& Hash) { KeepFunc(Hash); });
+}
+
+// Forwards the candidate hashes in Cas to m_State->m_CasChunks.FilterChunks() and
+// relays every hash reported by that call to KeepFunc.
+// NOTE(review): presumably FilterChunks reports the candidates contained in the
+// contributed Cas set -- confirm against the chunk set implementation.
+void
+GcContext::FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc)
+{
+ // The wrapping lambda captures KeepFunc by reference, so FilterChunks receives a small closure
+ m_State->m_CasChunks.FilterChunks(Cas, [&](const IoHash& Hash) { KeepFunc(Hash); });
+}
+
//////////////////////////////////////////////////////////////////////////
GcContributor::GcContributor(CasGc& Gc) : m_Gc(Gc)
@@ -67,14 +79,14 @@ CasGc::~CasGc()
{
}
-void
+void
CasGc::AddGcContributor(GcContributor* Contributor)
{
RwLock::ExclusiveLockScope _(m_Lock);
m_GcContribs.push_back(Contributor);
}
-void
+void
CasGc::RemoveGcContributor(GcContributor* Contributor)
{
RwLock::ExclusiveLockScope _(m_Lock);
diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h
index a387b905e..5b508baa0 100644
--- a/zenstore/include/zenstore/CAS.h
+++ b/zenstore/include/zenstore/CAS.h
@@ -11,6 +11,7 @@
#include <zencore/timer.h>
#include <atomic>
+#include <concepts>
#include <filesystem>
#include <functional>
#include <memory>
@@ -48,6 +49,17 @@ public:
inline [[nodiscard]] bool IsEmpty() const { return m_ChunkSet.empty(); }
inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); }
+    /** Invoke MatchFunc for each candidate hash that ContainsChunk() accepts.
+     *
+     * Candidates are visited in span order; hashes that are not contained are skipped.
+     *
+     * @param Candidates  hashes to test
+     * @param MatchFunc   callable invoked with each matching hash
+     */
+    inline void FilterChunks(std::span<const IoHash> Candidates, std::invocable<const IoHash&> auto MatchFunc)
+    {
+        for (const IoHash& Hash : Candidates)
+        {
+            if (!ContainsChunk(Hash))
+            {
+                continue;
+            }
+
+            MatchFunc(Hash);
+        }
+    }
+
private:
// Q: should we protect this with a lock, or is that a higher level concern?
std::unordered_set<IoHash> m_ChunkSet;
diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h
index 00b987383..065a74b25 100644
--- a/zenstore/include/zenstore/caslog.h
+++ b/zenstore/include/zenstore/caslog.h
@@ -57,6 +57,8 @@ template<typename T>
class TCasLogFile : public CasLogFile
{
public:
+ void Open(std::filesystem::path FileName, bool IsCreate) { CasLogFile::Open(FileName, sizeof(T), IsCreate); }
+
// This should be called before the Replay() is called to do some basic sanity checking
bool Initialize() { return true; }
@@ -76,7 +78,6 @@ public:
CasLogFile::Append(&Record, sizeof Record);
}
- void Open(std::filesystem::path FileName, bool IsCreate) { CasLogFile::Open(FileName, sizeof(T), IsCreate); }
};
} // namespace zen
diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h
index d51925a0c..ef62158ce 100644
--- a/zenstore/include/zenstore/gc.h
+++ b/zenstore/include/zenstore/gc.h
@@ -27,6 +27,9 @@ public:
void ContributeCids(std::span<const IoHash> Cid);
void ContributeCas(std::span<const IoHash> Hash);
+ void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc);
+ void FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc);
+
private:
struct GcState;