aboutsummaryrefslogtreecommitdiff
path: root/zenstore
diff options
context:
space:
mode:
Diffstat (limited to 'zenstore')
-rw-r--r--zenstore/CAS.cpp166
-rw-r--r--zenstore/caslog.cpp18
-rw-r--r--zenstore/compactcas.cpp105
-rw-r--r--zenstore/compactcas.h38
-rw-r--r--zenstore/filecas.cpp135
-rw-r--r--zenstore/filecas.h21
-rw-r--r--zenstore/gc.cpp111
-rw-r--r--zenstore/include/zenstore/CAS.h36
-rw-r--r--zenstore/include/zenstore/caslog.h3
-rw-r--r--zenstore/include/zenstore/cidstore.h5
-rw-r--r--zenstore/include/zenstore/gc.h79
-rw-r--r--zenstore/zenstore.cpp2
-rw-r--r--zenstore/zenstore.vcxproj2
13 files changed, 614 insertions, 107 deletions
diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp
index a4bbfa340..86c6eb849 100644
--- a/zenstore/CAS.cpp
+++ b/zenstore/CAS.cpp
@@ -5,6 +5,9 @@
#include "compactcas.h"
#include "filecas.h"
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
#include <zencore/except.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
@@ -14,6 +17,7 @@
#include <zencore/testutils.h>
#include <zencore/thread.h>
#include <zencore/uid.h>
+#include <zenstore/gc.h>
#include <gsl/gsl-lite.hpp>
@@ -67,34 +71,6 @@ CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callba
//////////////////////////////////////////////////////////////////////////
-struct GcContext::GcState
-{
- CasChunkSet m_CasChunks;
- CasChunkSet m_CidChunks;
-};
-
-GcContext::GcContext() : m_State(std::make_unique<GcState>())
-{
-}
-
-GcContext::~GcContext()
-{
-}
-
-void
-GcContext::ContributeCids(std::span<const IoHash> Cids)
-{
- m_State->m_CidChunks.AddChunksToSet(Cids);
-}
-
-void
-GcContext::ContributeCas(std::span<const IoHash> Cas)
-{
- m_State->m_CasChunks.AddChunksToSet(Cas);
-}
-
-//////////////////////////////////////////////////////////////////////////
-
void
ScrubContext::ReportBadCasChunks(std::span<IoHash> BadCasChunks)
{
@@ -119,7 +95,7 @@ ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes)
class CasImpl : public CasStore
{
public:
- CasImpl();
+ CasImpl(CasGc& Gc);
virtual ~CasImpl();
virtual void Initialize(const CasStoreConfiguration& InConfig) override;
@@ -128,14 +104,27 @@ public:
virtual void FilterChunks(CasChunkSet& InOutChunks) override;
virtual void Flush() override;
virtual void Scrub(ScrubContext& Ctx) override;
+ virtual void GarbageCollect(GcContext& GcCtx) override;
private:
CasContainerStrategy m_TinyStrategy;
CasContainerStrategy m_SmallStrategy;
FileCasStrategy m_LargeStrategy;
+ CbObject m_ManifestObject;
+
+ enum class StorageScheme
+ {
+ Legacy = 0,
+ WithCbManifest = 1
+ };
+
+ StorageScheme m_StorageScheme = StorageScheme::Legacy;
+
+ bool OpenOrCreateManifest();
+ void UpdateManifest();
};
-CasImpl::CasImpl() : m_TinyStrategy(m_Config), m_SmallStrategy(m_Config), m_LargeStrategy(m_Config)
+CasImpl::CasImpl(CasGc& Gc) : m_TinyStrategy(m_Config), m_SmallStrategy(m_Config), m_LargeStrategy(m_Config, Gc) // Gc is forwarded only to the file-based (large) strategy
{
}
@@ -155,39 +144,100 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig)
std::filesystem::create_directories(m_Config.RootDirectory);
// Open or create manifest
- //
- // The manifest is not currently fully implemented. The goal is to
- // use it for recovery and configuration
+ const bool IsNewStore = OpenOrCreateManifest();
+
+ // Initialize payload storage
+
+ m_LargeStrategy.Initialize(IsNewStore);
+ m_TinyStrategy.Initialize("tobs", 16, IsNewStore);
+ m_SmallStrategy.Initialize("sobs", 4096, IsNewStore);
+}
+
+bool
+CasImpl::OpenOrCreateManifest() // Loads ".ucas_root" from the store root, rewriting it when missing, old-style or invalid; returns true only when the file did not exist
+{
bool IsNewStore = false;
- {
- std::filesystem::path ManifestPath = m_Config.RootDirectory;
- ManifestPath /= ".ucas_root";
+ std::filesystem::path ManifestPath = m_Config.RootDirectory;
+ ManifestPath /= ".ucas_root";
- std::error_code Ec;
- BasicFile Marker;
- Marker.Open(ManifestPath.c_str(), /* IsCreate */ false, Ec);
+ std::error_code Ec;
+ BasicFile ManifestFile;
+ ManifestFile.Open(ManifestPath.c_str(), /* IsCreate */ false, Ec);
- if (Ec)
+ bool ManifestIsOk = false;
+ // Any open failure leaves ManifestIsOk false; only "file not found" marks the store as new
+ if (Ec)
+ {
+ if (Ec == std::errc::no_such_file_or_directory)
{
IsNewStore = true;
+ }
+ }
+ else
+ {
+ IoBuffer ManifestBuffer = ManifestFile.ReadAll();
+ ManifestFile.Close();
- ExtendableStringBuilder<128> manifest;
- manifest.Append("#CAS_ROOT\n");
- manifest.Append("ID=");
- zen::Oid id = zen::Oid::NewOid();
- id.ToString(manifest);
-
- Marker.Open(ManifestPath.c_str(), /* IsCreate */ true);
- Marker.Write(manifest.c_str(), (DWORD)manifest.Size(), 0);
+ if (ManifestBuffer.Size() > 0 && ManifestBuffer.Data<uint8_t>()[0] == '#')
+ {
+ // Old-style manifest, does not contain any useful information, so we may as well update it
+ }
+ else
+ {
+ // Validate the raw bytes first so no object view is ever built over malformed or empty data
+ CbValidateError ValidationResult = ValidateCompactBinary(ManifestBuffer, CbValidateMode::All);
+
+ if (ValidationResult == CbValidateError::None)
+ {
+ CbObject Manifest{SharedBuffer(ManifestBuffer)};
+
+ if (Manifest["id"])
+ {
+ // A usable manifest must carry an "id" field; adopt the loaded object as-is
+ ManifestIsOk = true;
+ m_ManifestObject = std::move(Manifest);
+ }
+ }
+ else
+ {
+ // Falls through with ManifestIsOk == false so a fresh manifest is written below
+ ZEN_ERROR("Store manifest validation failed: {:#x}, will generate new manifest to recover", ValidationResult);
+ }
}
}
- // Initialize payload storage
+ if (!ManifestIsOk)
+ {
+ UpdateManifest();
+ }
- m_TinyStrategy.Initialize("tobs", 16, IsNewStore);
- m_SmallStrategy.Initialize("sobs", 4096, IsNewStore);
+ return IsNewStore;
+}
+
+void
+CasImpl::UpdateManifest() // Ensures m_ManifestObject exists, then writes it to ".ucas_root" under the store root
+{
+ if (!m_ManifestObject) // nothing loaded: build a manifest with a fresh id and the current time
+ {
+ CbObjectWriter Cbo;
+ Cbo << "id" << zen::Oid::NewOid() << "created" << DateTime::Now();
+ m_ManifestObject = Cbo.Save();
+ }
+
+ // Write manifest to file
+
+ std::filesystem::path ManifestPath = m_Config.RootDirectory;
+ ManifestPath /= ".ucas_root";
+
+ // This will throw on failure
+
+ ZEN_TRACE("Writing new manifest to '{}'", ManifestPath);
+
+ BasicFile Marker;
+ Marker.Open(ManifestPath.c_str(), /* IsCreate */ true);
+ Marker.Write(m_ManifestObject.GetBuffer(), 0); // serialized manifest is written starting at file offset 0
}
CasStore::InsertResult
@@ -262,12 +312,18 @@ CasImpl::Scrub(ScrubContext& Ctx)
m_LargeStrategy.Scrub(Ctx);
}
+void
+CasImpl::GarbageCollect(GcContext& GcCtx) // Forwards the collection request to the file-based strategy
+{
+ m_LargeStrategy.CollectGarbage(GcCtx); // NOTE(review): m_TinyStrategy / m_SmallStrategy are not visited here - confirm this is intentional
+}
+
//////////////////////////////////////////////////////////////////////////
CasStore*
-CreateCasStore()
+CreateCasStore(CasGc& Gc) // Factory for the concrete store; Gc is handed to the CasImpl constructor
{
- return new CasImpl();
+ return new CasImpl(Gc); // raw owning pointer: the caller is responsible for deleting it
}
//////////////////////////////////////////////////////////////////////////
@@ -284,7 +340,9 @@ TEST_CASE("CasStore")
CasStoreConfiguration config;
config.RootDirectory = TempDir.Path();
- std::unique_ptr<CasStore> Store{CreateCasStore()};
+ CasGc Gc;
+
+ std::unique_ptr<CasStore> Store{CreateCasStore(Gc)};
Store->Initialize(config);
ScrubContext Ctx;
diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp
index 2bac6affd..38d0f818e 100644
--- a/zenstore/caslog.cpp
+++ b/zenstore/caslog.cpp
@@ -46,7 +46,7 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat
m_RecordSize = RecordSize;
std::error_code Ec;
- m_File.Open(FileName, IsCreate);
+ m_File.Open(FileName, IsCreate, Ec);
if (Ec)
{
@@ -55,7 +55,7 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat
uint64_t AppendOffset = 0;
- if (IsCreate)
+ if (IsCreate || (m_File.FileSize() < sizeof(FileHeader)))
{
// Initialize log by writing header
FileHeader Header = {.RecordSize = gsl::narrow<uint32_t>(RecordSize), .LogId = Oid::NewOid(), .ValidatedTail = 0};
@@ -76,12 +76,18 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat
if ((0 != memcmp(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic)) || (Header.Checksum != Header.ComputeChecksum()))
{
- // TODO: provide more context!
- throw std::runtime_error("Mangled log header");
+ throw std::runtime_error("Mangled log header (invalid header magic) in '{}'"_format(FileName));
}
AppendOffset = m_File.FileSize();
- m_Header = Header;
+
+ // Adjust the offset to ensure we end up on a good boundary, in case there is some garbage appended
+
+ AppendOffset -= sizeof Header;
+ AppendOffset -= AppendOffset % RecordSize;
+ AppendOffset += sizeof Header;
+
+ m_Header = Header;
}
m_AppendOffset = AppendOffset;
@@ -125,6 +131,8 @@ CasLogFile::Replay(std::function<void(const void*)>&& Handler)
{
Handler(ReadBuffer.data() + (i * m_RecordSize));
}
+
+ m_AppendOffset = LogBaseOffset + (LogFileSize * LogEntryCount);
}
void
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 612f87c7c..dbe5572b9 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -4,13 +4,19 @@
#include "CompactCas.h"
+#include <zencore/compactbinarybuilder.h>
#include <zencore/except.h>
+#include <zencore/filesystem.h>
#include <zencore/logging.h>
#include <zencore/memory.h>
#include <zencore/string.h>
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
#include <zencore/thread.h>
#include <zencore/uid.h>
+#include <zenstore/gc.h>
+
#include <filesystem>
#include <functional>
#include <gsl/gsl-lite.hpp>
@@ -58,7 +64,7 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint6
m_CasLog.Replay([&](const CasDiskIndexEntry& Record) {
m_LocationMap[Record.Key] = Record.Location;
- MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset + Record.Location.Size);
+ MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.GetOffset() + Record.Location.GetSize());
});
}
@@ -91,7 +97,7 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
RwLock::ExclusiveLockScope __(m_LocationMapLock);
- CasDiskLocation Location{.Offset = InsertOffset, .Size = /* TODO FIX */ uint32_t(ChunkSize)};
+ const CasDiskLocation Location{InsertOffset, ChunkSize};
m_LocationMap[ChunkHash] = Location;
@@ -116,7 +122,8 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
{
const CasDiskLocation& Location = KeyIt->second;
- return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.Offset, Location.Size);
+
+ return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.GetOffset(), Location.GetSize());
}
// Not found
@@ -187,11 +194,11 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
for (auto& Entry : m_LocationMap)
{
- const uint64_t EntryOffset = Entry.second.Offset;
+ const uint64_t EntryOffset = Entry.second.GetOffset();
if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd))
{
- const uint64_t EntryEnd = EntryOffset + Entry.second.Size;
+ const uint64_t EntryEnd = EntryOffset + Entry.second.GetSize();
if (EntryEnd >= WindowEnd)
{
@@ -201,7 +208,8 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
}
const IoHash ComputedHash =
- IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart, Entry.second.Size);
+ IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.GetOffset() - WindowStart,
+ Entry.second.GetSize());
if (Entry.first != ComputedHash)
{
@@ -222,7 +230,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
for (const CasDiskIndexEntry& Entry : BigChunks)
{
IoHashStream Hasher;
- m_SmallObjectFile.StreamByteRange(Entry.Location.Offset, Entry.Location.Size, [&](const void* Data, uint64_t Size) {
+ m_SmallObjectFile.StreamByteRange(Entry.Location.GetOffset(), Entry.Location.GetSize(), [&](const void* Data, uint64_t Size) {
Hasher.Append(Data, Size);
});
IoHash ComputedHash = Hasher.GetHash();
@@ -258,6 +266,12 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
}
void
+CasContainerStrategy::CollectGarbage(GcContext& GcCtx) // Placeholder: container storage does not collect anything yet
+{
+ ZEN_UNUSED(GcCtx); // parameter is intentionally ignored
+}
+
+void
CasContainerStrategy::MakeSnapshot()
{
RwLock::SharedLockScope _(m_LocationMapLock);
@@ -275,4 +289,81 @@ CasContainerStrategy::MakeSnapshot()
m_SmallObjectIndex.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), 0);
}
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+TEST_CASE("cas.compact.gc")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+
+ CreateDirectories(CasConfig.RootDirectory);
+
+ const int kIterationCount = 1000;
+
+ std::vector<IoHash> Keys(kIterationCount); // Keys[i] holds the hash of the object whose "id" field is i
+
+ {
+ CasContainerStrategy Cas(CasConfig);
+ Cas.Initialize("test", 16, true); // IsNewStore == true: start from a fresh container
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ CbObjectWriter Cbo;
+ Cbo << "id" << i;
+ CbObject Obj = Cbo.Save();
+
+ IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer();
+ const IoHash Hash = HashBuffer(ObjBuffer);
+
+ Cas.InsertChunk(ObjBuffer, Hash);
+
+ Keys[i] = Hash;
+ }
+ // Read everything back through the same instance that wrote it
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ IoBuffer Chunk = Cas.FindChunk(Keys[i]);
+
+ CHECK(!!Chunk);
+
+ CbObject Value = LoadCompactBinaryObject(Chunk);
+
+ CHECK_EQ(Value["id"].AsInt32(), i);
+ }
+ }
+
+ // Validate that we can still read the inserted data after closing
+ // the original cas store
+
+ {
+ CasContainerStrategy Cas(CasConfig);
+ Cas.Initialize("test", 16, false); // IsNewStore == false: reopen the container written above
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ IoBuffer Chunk = Cas.FindChunk(Keys[i]);
+
+ CHECK(!!Chunk);
+
+ CbObject Value = LoadCompactBinaryObject(Chunk);
+
+ CHECK_EQ(Value["id"].AsInt32(), i);
+ }
+ // No hashes are contributed to Ctx; this only checks that the call can be made (nothing is asserted afterwards)
+ GcContext Ctx;
+ Cas.CollectGarbage(Ctx);
+ }
+}
+
+#endif
+
+void
+compactcas_forcelink() // Intentionally empty; called from zenstore_forcelinktests()
+{
+}
+
} // namespace zen
diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h
index a512c3d93..a3f3121e6 100644
--- a/zenstore/compactcas.h
+++ b/zenstore/compactcas.h
@@ -23,17 +23,42 @@ namespace zen {
struct CasDiskLocation
{
- uint64_t Offset;
- // If we wanted to be able to store larger chunks using this storage mechanism then
- // we could make this more like the IoStore index so we can store larger chunks.
- // I.e use five bytes for size and seven for offset
- uint32_t Size;
+ CasDiskLocation(uint64_t InOffset, uint64_t InSize) // Packs offset and size into 5 bytes (40 bits) each
+ {
+ ZEN_ASSERT(InOffset <= 0xff'ffff'ffff); // must fit in 40 bits
+ ZEN_ASSERT(InSize <= 0xff'ffff'ffff); // must fit in 40 bits
+ // Copies the first 5 bytes of each value's object representation (the low 40 bits on a little-endian host)
+ memcpy(&m_Offset[0], &InOffset, sizeof m_Offset);
+ memcpy(&m_Size[0], &InSize, sizeof m_Size);
+ }
+
+ CasDiskLocation() = default; // defaulted: the byte arrays are not given an initial value here
+
+ inline uint64_t GetOffset() const // Widens the stored 5 bytes back to a uint64_t
+ {
+ uint64_t Offset = 0; // upper 3 bytes stay zero
+ memcpy(&Offset, &m_Offset, sizeof m_Offset);
+ return Offset;
+ }
+
+ inline uint64_t GetSize() const // Widens the stored 5 bytes back to a uint64_t
+ {
+ uint64_t Size = 0; // upper 3 bytes stay zero
+ memcpy(&Size, &m_Size, sizeof m_Size);
+ return Size;
+ }
+
+private:
+ uint8_t m_Offset[5];
+ uint8_t m_Size[5];
};
struct CasDiskIndexEntry
{
IoHash Key;
CasDiskLocation Location;
+ ZenContentType ContentType = ZenContentType::kUnknownContentType; // NOTE(review): new fields change sizeof(CasDiskIndexEntry), which is used as the record size when writing the index - confirm existing files are handled
+ uint8_t Flags = 0; // not read or written by any code visible in this change
};
#pragma pack(pop)
@@ -61,6 +86,7 @@ struct CasContainerStrategy
void Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore);
void Flush();
void Scrub(ScrubContext& Ctx);
+ void CollectGarbage(GcContext& GcCtx);
private:
const CasStoreConfiguration& m_Config;
@@ -80,4 +106,6 @@ private:
void MakeSnapshot();
};
+void compactcas_forcelink();
+
} // namespace zen
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index a37450cd8..0714637c6 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -14,6 +14,11 @@
#include <zencore/thread.h>
#include <zencore/uid.h>
#include <zenstore/basicfile.h>
+#include <zenstore/gc.h>
+
+#if ZEN_WITH_TESTS
+# include <zencore/compactbinarybuilder.h>
+#endif
#include <gsl/gsl-lite.hpp>
@@ -65,7 +70,10 @@ FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& Roo
//////////////////////////////////////////////////////////////////////////
-FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config) : m_Config(Config), m_Log(logging::Get("filecas"))
+FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc)
+: GcStorage(Gc) // the GcStorage base constructor registers this object with Gc
+, m_Config(Config) // stored as a reference to the caller's configuration object
+, m_Log(logging::Get("filecas"))
{
}
@@ -73,9 +81,23 @@ FileCasStrategy::~FileCasStrategy()
{
}
+void
+FileCasStrategy::Initialize(bool IsNewStore) // Prepares the root directory and the "cas.ulog" log; the asserting entry points require this to have run
+{
+ m_IsInitialized = true; // NOTE(review): set before the log is opened, so it stays true even if a later step throws
+
+ CreateDirectories(m_Config.RootDirectory);
+
+ m_CasLog.Open(m_Config.RootDirectory / "cas.ulog", IsNewStore); // IsNewStore is passed as the log's IsCreate flag
+
+ m_CasLog.Replay([&](const FileCasIndexEntry& Entry) { ZEN_UNUSED(Entry); }); // entries are visited but currently discarded
+}
+
CasStore::InsertResult
FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
{
+ ZEN_ASSERT(m_IsInitialized);
+
// File-based chunks have special case handling whereby we move the file into
// place in the file store directory, thus avoiding unnecessary copying
@@ -207,6 +229,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
if (Success)
{
+ m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()});
+
return CasStore::InsertResult{.New = true};
}
@@ -232,6 +256,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
CasStore::InsertResult
FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash)
{
+ ZEN_ASSERT(m_IsInitialized);
+
ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
// See if file already exists
@@ -304,12 +330,16 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
// *after* the lock is released due to the initialization order
PayloadFile.Close();
+ m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize});
+
return {.New = true};
}
IoBuffer
FileCasStrategy::FindChunk(const IoHash& ChunkHash)
{
+ ZEN_ASSERT(m_IsInitialized);
+
ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
RwLock::SharedLockScope _(LockForHash(ChunkHash));
@@ -320,6 +350,8 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash)
bool
FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
{
+ ZEN_ASSERT(m_IsInitialized);
+
ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
RwLock::SharedLockScope _(LockForHash(ChunkHash));
@@ -332,6 +364,7 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
return false;
}
+
void
FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
{
@@ -340,11 +373,18 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
ZEN_DEBUG("deleting CAS payload file '{}'", WideToUtf8(Name.ShardedPath));
std::filesystem::remove(Name.ShardedPath.c_str(), Ec);
+
+ if (!Ec)
+ {
+ m_CasLog.Append({.Key = ChunkHash, .Size = ~(0ull)});
+ }
}
void
FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks)
{
+ ZEN_ASSERT(m_IsInitialized);
+
// NOTE: it's not a problem now, but in the future if a GC should happen while this
// is in flight, the result could be wrong since chunks could go away in the meantime.
//
@@ -359,6 +399,8 @@ FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks)
void
FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback)
{
+ ZEN_ASSERT(m_IsInitialized);
+
struct Visitor : public FileSystemTraversal::TreeVisitor
{
Visitor(const std::filesystem::path& RootDir) : RootDirectory(RootDir) {}
@@ -430,6 +472,8 @@ FileCasStrategy::Flush()
void
FileCasStrategy::Scrub(ScrubContext& Ctx)
{
+ ZEN_ASSERT(m_IsInitialized);
+
std::vector<IoHash> BadHashes;
std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0};
@@ -476,9 +520,58 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
}
void
-FileCasStrategy::GarbageCollect(GcContext& GcCtx)
+FileCasStrategy::CollectGarbage(GcContext& GcCtx) // Deletes every stored payload file whose hash GcCtx does not ask to keep
{
- ZEN_UNUSED(GcCtx);
+ ZEN_ASSERT(m_IsInitialized);
+
+ ZEN_INFO("collecting garbage from {}", m_Config.RootDirectory);
+
+ std::vector<IoHash> ChunksToDelete;
+ std::atomic<uint64_t> ChunksToDeleteBytes{0};
+ std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0};
+
+ std::vector<IoHash> CandidateCas; // one-element scratch list, reused for every chunk
+ // Pass 1: visit every stored payload and remember the ones that nobody asked to keep
+ IterateChunks([&](const IoHash& Hash, BasicFile& Payload) {
+ bool KeepThis = false;
+ CandidateCas.clear();
+ CandidateCas.push_back(Hash);
+ GcCtx.FilterCas(CandidateCas, [&](const IoHash& KeptHash) {
+ ZEN_UNUSED(KeptHash);
+ KeepThis = true; // the callback fires only for hashes present in the context's CAS set
+ });
+
+ const uint64_t FileSize = Payload.FileSize();
+
+ if (!KeepThis)
+ {
+ ChunksToDelete.push_back(Hash);
+ ChunksToDeleteBytes.fetch_add(FileSize);
+ }
+
+ ++ChunkCount;
+ ChunkBytes.fetch_add(FileSize);
+ });
+
+ ZEN_INFO("file CAS gc scanned: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes));
+
+ if (ChunksToDelete.empty())
+ {
+ return;
+ }
+ // Report how many chunks are being removed (previously this printed the number scanned)
+ ZEN_INFO("deleting file CAS garbage: {} chunks ({})", ChunksToDelete.size(), NiceBytes(ChunksToDeleteBytes));
+ // Pass 2: delete the files; a failure is logged and the remaining chunks are still attempted
+ for (const IoHash& Hash : ChunksToDelete)
+ {
+ std::error_code Ec;
+ DeleteChunk(Hash, Ec);
+
+ if (Ec)
+ {
+ ZEN_WARN("failed to delete file for chunk {}: '{}'", Hash, Ec.message());
+ }
+ }
}
//////////////////////////////////////////////////////////////////////////
@@ -489,12 +582,16 @@ TEST_CASE("cas.file.move")
{
using namespace fmt::literals;
- ScopedTemporaryDirectory TempDir{"d:\\filecas_testdir"};
+ // specifying an absolute path here can be helpful when using procmon to dig into things
+ ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"};
+
+ CasGc Gc;
CasStoreConfiguration CasConfig;
CasConfig.RootDirectory = TempDir.Path() / "cas";
- FileCasStrategy FileCas(CasConfig);
+ FileCasStrategy FileCas(CasConfig, Gc);
+ FileCas.Initialize(/* IsNewStore */ true);
{
std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"};
@@ -564,6 +661,34 @@ TEST_CASE("cas.file.move")
# endif
}
+TEST_CASE("cas.file.gc")
+{
+ // specifying an absolute path here can be helpful when using procmon to dig into things
+ ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"};
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path() / "cas";
+
+ CasGc Gc;
+ FileCasStrategy FileCas(CasConfig, Gc);
+ FileCas.Initialize(/* IsNewStore */ true);
+ // Insert 1000 small objects, each differing only in its "id" field
+ for (int i = 0; i < 1000; ++i)
+ {
+ CbObjectWriter Cbo;
+ Cbo << "id" << i;
+ CbObject Obj = Cbo.Save();
+
+ IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer();
+ IoHash Hash = HashBuffer(ObjBuffer);
+
+ FileCas.InsertChunk(ObjBuffer, Hash);
+ }
+ // No hashes are contributed to Ctx before collecting; the test asserts nothing about the outcome
+ GcContext Ctx;
+ FileCas.CollectGarbage(Ctx);
+}
+
#endif
void
diff --git a/zenstore/filecas.h b/zenstore/filecas.h
index 14314ce52..ec2ca3f31 100644
--- a/zenstore/filecas.h
+++ b/zenstore/filecas.h
@@ -9,6 +9,8 @@
#include <zencore/string.h>
#include <zencore/thread.h>
#include <zenstore/cas.h>
+#include <zenstore/caslog.h>
+#include <zenstore/gc.h>
#include <functional>
@@ -23,18 +25,19 @@ class BasicFile;
/** CAS storage strategy using a file-per-chunk storage strategy
*/
-struct FileCasStrategy
+struct FileCasStrategy : public GcStorage
{
- FileCasStrategy(const CasStoreConfiguration& Config);
+ FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc);
~FileCasStrategy();
+ void Initialize(bool IsNewStore);
CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash);
IoBuffer FindChunk(const IoHash& ChunkHash);
bool HaveChunk(const IoHash& ChunkHash);
void FilterChunks(CasChunkSet& InOutChunks);
void Flush();
- void GarbageCollect(GcContext& GcCtx);
+ virtual void CollectGarbage(GcContext& GcCtx) override;
void Scrub(ScrubContext& Ctx);
private:
@@ -43,6 +46,18 @@ private:
RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines
spdlog::logger& m_Log;
spdlog::logger& Log() { return m_Log; }
+ bool m_IsInitialized = false;
+
+ struct FileCasIndexEntry
+ {
+ IoHash Key;
+ uint32_t Pad = 0;
+ uint64_t Size = 0;
+ };
+
+ static_assert(sizeof(FileCasIndexEntry) == 32);
+
+ TCasLogFile<FileCasIndexEntry> m_CasLog;
inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; }
void IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback);
diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp
index bfb8f015e..cb03f72ff 100644
--- a/zenstore/gc.cpp
+++ b/zenstore/gc.cpp
@@ -1,10 +1,77 @@
// Copyright Epic Games, Inc. All Rights Reserved.
+#include <zenstore/CAS.h>
#include <zenstore/gc.h>
namespace zen {
-CasGc::CasGc(CasStore& Store) : m_CasStore(Store)
+//////////////////////////////////////////////////////////////////////////
+
+struct GcContext::GcState // Private state held through GcContext::m_State
+{
+ CasChunkSet m_CasChunks; // filled by ContributeCas, queried by FilterCas
+ CasChunkSet m_CidChunks; // filled by ContributeCids, queried by FilterCids
+};
+
+GcContext::GcContext() : m_State(std::make_unique<GcState>()) // starts with freshly constructed chunk sets
+{
+}
+
+GcContext::~GcContext() // defined in this file, where GcState is a complete type for the unique_ptr member
+{
+}
+
+void
+GcContext::ContributeCids(std::span<const IoHash> Cids) // Adds the given hashes to the CID set consulted by FilterCids
+{
+ m_State->m_CidChunks.AddChunksToSet(Cids);
+}
+
+void
+GcContext::ContributeCas(std::span<const IoHash> Cas) // Adds the given hashes to the CAS set consulted by FilterCas
+{
+ m_State->m_CasChunks.AddChunksToSet(Cas);
+}
+
+void
+GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc) // Calls KeepFunc for each candidate found in the CID set
+{
+ m_State->m_CidChunks.FilterChunks(Cid, [&](const IoHash& Hash) { KeepFunc(Hash); }); // thin lambda forwards to KeepFunc by reference
+}
+
+void
+GcContext::FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc) // Calls KeepFunc for each candidate found in the CAS set
+{
+ m_State->m_CasChunks.FilterChunks(Cas, [&](const IoHash& Hash) { KeepFunc(Hash); }); // thin lambda forwards to KeepFunc by reference
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+GcContributor::GcContributor(CasGc& Gc) : m_Gc(Gc) // keeps a reference to the orchestrator for use in the destructor
+{
+ m_Gc.AddGcContributor(this); // register with the orchestrator on construction
+}
+
+GcContributor::~GcContributor() // undoes the registration made in the constructor
+{
+ m_Gc.RemoveGcContributor(this);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+GcStorage::GcStorage(CasGc& Gc) : m_Gc(Gc) // keeps a reference to the orchestrator for use in the destructor
+{
+ m_Gc.AddGcStorage(this); // register with the orchestrator on construction
+}
+
+GcStorage::~GcStorage() // mirrors the constructor: take this storage off the orchestrator's list
+{
+ m_Gc.RemoveGcStorage(this); // was AddGcStorage, which re-registered a pointer to an object being destroyed
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+CasGc::CasGc() // nothing to set up: contributors and storages register themselves later via Add*
{
}
@@ -13,12 +80,52 @@ CasGc::~CasGc()
}
void
+CasGc::AddGcContributor(GcContributor* Contributor) // Appends Contributor to the registered list (no duplicate check)
+{
+ RwLock::ExclusiveLockScope _(m_Lock); // the list is modified under the exclusive lock
+ m_GcContribs.push_back(Contributor);
+}
+
+void
+CasGc::RemoveGcContributor(GcContributor* Contributor) // Removes every occurrence of Contributor from the registered list
+{
+ RwLock::ExclusiveLockScope _(m_Lock); // the list is modified under the exclusive lock
+ std::erase(m_GcContribs, Contributor); // plain pointer comparison; avoids the non-standard '$' identifier
+}
+
+void
+CasGc::AddGcStorage(GcStorage* Storage) // Appends Storage to the registered list (no duplicate check)
+{
+ RwLock::ExclusiveLockScope _(m_Lock); // the list is modified under the exclusive lock
+ m_GcStorage.push_back(Storage);
+}
+
+void
+CasGc::RemoveGcStorage(GcStorage* Storage) // Removes every occurrence of Storage from the registered list
+{
+ RwLock::ExclusiveLockScope _(m_Lock); // the list is modified under the exclusive lock
+ std::erase(m_GcStorage, Storage); // plain pointer comparison; avoids the non-standard '$' identifier
+}
+
+void
CasGc::CollectGarbage()
{
}
void
-CasGc::OnNewReferences(std::span<IoHash> Hashes)
+CasGc::OnNewCidReferences(std::span<IoHash> Hashes) // Stub: the hashes are currently ignored
+{
+ ZEN_UNUSED(Hashes);
+}
+
+void
+CasGc::OnCommittedCidReferences(std::span<IoHash> Hashes) // Stub: the hashes are currently ignored
+{
+ ZEN_UNUSED(Hashes);
+}
+
+void
+CasGc::OnDroppedCidReferences(std::span<IoHash> Hashes) // Stub: the hashes are currently ignored
{
ZEN_UNUSED(Hashes);
}
diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h
index 86e7e78d9..5b508baa0 100644
--- a/zenstore/include/zenstore/CAS.h
+++ b/zenstore/include/zenstore/CAS.h
@@ -11,6 +11,7 @@
#include <zencore/timer.h>
#include <atomic>
+#include <concepts>
#include <filesystem>
#include <functional>
#include <memory>
@@ -19,6 +20,9 @@
namespace zen {
+class GcContext;
+class CasGc;
+
struct CasStoreConfiguration
{
// Root directory for CAS store
@@ -45,29 +49,22 @@ public:
inline [[nodiscard]] bool IsEmpty() const { return m_ChunkSet.empty(); }
inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); }
+ inline void FilterChunks(std::span<const IoHash> Candidates, std::invocable<const IoHash&> auto MatchFunc) // Invokes MatchFunc, in input order, for every candidate present in this set
+ {
+ for (const IoHash& Candidate : Candidates)
+ {
+ if (ContainsChunk(Candidate)) // candidates not in the set are skipped silently
+ {
+ MatchFunc(Candidate);
+ }
+ }
+ }
+
private:
// Q: should we protect this with a lock, or is that a higher level concern?
std::unordered_set<IoHash> m_ChunkSet;
};
-/** Garbage Collection context object
- */
-
-class GcContext
-{
-public:
- GcContext();
- ~GcContext();
-
- void ContributeCids(std::span<const IoHash> Cid);
- void ContributeCas(std::span<const IoHash> Hash);
-
-private:
- struct GcState;
-
- std::unique_ptr<GcState> m_State;
-};
-
/** Context object for data scrubbing
*
* Data scrubbing is when we traverse stored data to validate it and
@@ -116,13 +113,14 @@ public:
virtual void FilterChunks(CasChunkSet& InOutChunks) = 0;
virtual void Flush() = 0;
virtual void Scrub(ScrubContext& Ctx) = 0;
+ virtual void GarbageCollect(GcContext& GcCtx) = 0;
protected:
CasStoreConfiguration m_Config;
uint64_t m_LastScrubTime = 0;
};
-ZENCORE_API CasStore* CreateCasStore();
+ZENCORE_API CasStore* CreateCasStore(CasGc& Gc);
void CAS_forcelink();
diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h
index 00b987383..065a74b25 100644
--- a/zenstore/include/zenstore/caslog.h
+++ b/zenstore/include/zenstore/caslog.h
@@ -57,6 +57,8 @@ template<typename T>
class TCasLogFile : public CasLogFile
{
public:
+ void Open(std::filesystem::path FileName, bool IsCreate) { CasLogFile::Open(FileName, sizeof(T), IsCreate); } // typed wrapper: the record size passed on is always sizeof(T)
+
// This should be called before the Replay() is called to do some basic sanity checking
bool Initialize() { return true; }
@@ -76,7 +78,6 @@ public:
CasLogFile::Append(&Record, sizeof Record);
}
- void Open(std::filesystem::path FileName, bool IsCreate) { CasLogFile::Open(FileName, sizeof(T), IsCreate); }
};
} // namespace zen
diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h
index 5f567e7fc..acfedbc64 100644
--- a/zenstore/include/zenstore/cidstore.h
+++ b/zenstore/include/zenstore/cidstore.h
@@ -4,10 +4,13 @@
#include "zenstore.h"
-#include <tsl/robin_map.h>
#include <zencore/iohash.h>
#include <zenstore/CAS.h>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
namespace std::filesystem {
class path;
}
diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h
index 055843547..ef62158ce 100644
--- a/zenstore/include/zenstore/gc.h
+++ b/zenstore/include/zenstore/gc.h
@@ -3,26 +3,99 @@
#pragma once
#include <zencore/iohash.h>
+#include <zencore/thread.h>
#include <span>
+#define ZEN_USE_REF_TRACKING 0 // This is not currently functional
+
namespace zen {
class CasStore;
+class CasGc;
struct IoHash;
+/** Garbage Collection context object
+ */
+
+class GcContext
+{
+public:
+ GcContext();
+ ~GcContext();
+ // Contribute*: add hashes to the corresponding chunk set
+ void ContributeCids(std::span<const IoHash> Cid);
+ void ContributeCas(std::span<const IoHash> Hash);
+ // Filter*: KeepFunc is invoked for each candidate that is present in the corresponding set
+ void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc);
+ void FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc);
+
+private:
+ struct GcState; // defined in gc.cpp
+
+ std::unique_ptr<GcState> m_State;
+};
+
+/** GC root contributor
+
+ Higher level data structures provide roots for the garbage collector,
+ which ultimately determine what is garbage and what data we need to
+ retain.
+
+ */
+
+class GcContributor
+{
+public:
+ GcContributor(CasGc& Gc);
+ virtual ~GcContributor(); // virtual: this is a polymorphic base, so destruction through a GcContributor* must reach the derived destructor
+
+ virtual void GatherReferences(GcContext& GcCtx) = 0; // implemented by derived classes; receives the context for the current collection
+
+protected:
+ CasGc& m_Gc; // orchestrator passed to the constructor
+};
+
+/** GC storage provider
+ */
+
+class GcStorage
+{
+public:
+ GcStorage(CasGc& Gc);
+ virtual ~GcStorage(); // virtual: this is a polymorphic base, so destruction through a GcStorage* must reach the derived destructor
+
+ virtual void CollectGarbage(GcContext& GcCtx) = 0; // implemented by derived storage classes; receives the context for the current collection
+
+private:
+ CasGc& m_Gc; // orchestrator passed to the constructor
+};
+
+/** GC orchestrator
+ */
+
class CasGc
{
public:
- CasGc(CasStore& Store);
+ CasGc();
~CasGc();
+ void AddGcContributor(GcContributor* Contributor); // stores the raw pointer; no ownership is taken
+ void RemoveGcContributor(GcContributor* Contributor);
+
+ void AddGcStorage(GcStorage* Storage); // stores the raw pointer; no ownership is taken
+ void RemoveGcStorage(GcStorage* Storage);
+
void CollectGarbage();
- void OnNewReferences(std::span<IoHash> Hashes);
+ void OnNewCidReferences(std::span<IoHash> Hashes);
+ void OnCommittedCidReferences(std::span<IoHash> Hashes);
+ void OnDroppedCidReferences(std::span<IoHash> Hashes);
private:
- CasStore& m_CasStore;
+ RwLock m_Lock; // taken exclusively by the Add*/Remove* functions
+ std::vector<GcContributor*> m_GcContribs; // registered contributors (non-owning)
+ std::vector<GcStorage*> m_GcStorage; // registered storages (non-owning)
};
} // namespace zen
diff --git a/zenstore/zenstore.cpp b/zenstore/zenstore.cpp
index d852fa64b..9fdf2dccf 100644
--- a/zenstore/zenstore.cpp
+++ b/zenstore/zenstore.cpp
@@ -4,6 +4,7 @@
#include <zenstore/CAS.h>
#include <zenstore/basicfile.h>
+#include "compactcas.h"
#include "filecas.h"
namespace zen {
@@ -14,6 +15,7 @@ zenstore_forcelinktests()
basicfile_forcelink();
CAS_forcelink();
filecas_forcelink();
+ compactcas_forcelink();
}
} // namespace zen
diff --git a/zenstore/zenstore.vcxproj b/zenstore/zenstore.vcxproj
index eb2ecd02b..832ea8159 100644
--- a/zenstore/zenstore.vcxproj
+++ b/zenstore/zenstore.vcxproj
@@ -97,7 +97,6 @@
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<ClCompile>
- <WarningLevel>Level3</WarningLevel>
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
@@ -111,7 +110,6 @@
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<ClCompile>
- <WarningLevel>Level3</WarningLevel>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<SDLCheck>true</SDLCheck>