aboutsummaryrefslogtreecommitdiff
path: root/zenstore
diff options
context:
space:
mode:
author: Dan Engelbrecht <[email protected]> 2022-06-17 07:06:21 -0700
committer: GitHub <[email protected]> 2022-06-17 07:06:21 -0700
commit: c7e22a4ef1cce7103b9afbeec487461cb32f8dbe (patch)
tree: 8b99d51bf496c96f82161c18fbdcfd5c6f8f31fd /zenstore
parent: fixed merge mistake which caused a build error (diff)
download: zen-0.1.4-pre6.tar.xz
          zen-0.1.4-pre6.zip
Make cas storage a hidden implementation detail of CidStore (#130) [tags: v0.1.4-pre6, v0.1.4-pre5]
- Bumped ZEN_SCHEMA_VERSION
- CasStore no longer a public API, it is hidden behind CidStore
- Moved cas.h from public header folder
- CidStore no longer maps from Cid -> Cas, we store entries in Cas under RawHash
- CasStore now decompresses data to validate content (matching against RawHash)
- CasChunkSet renamed to HashKeySet and put in a separate header/cpp file
- Disabled "Chunk" command for now as it relied on CAS being exposed as a service
- Changed CAS http service to Cid http server
- Moved "Run" command completely inside ZEN_WITH_EXEC_SERVICES define
- Removed "cas.basic" test
- Uncommented ".exec.basic" test and added return-skip at start of test
- Moved ScrubContext to separate header file
- Renamed CasGC to GcManager
- Cleaned up configuration passing in cas store classes
- Removed CAS stuff from GcContext and clarified naming in class
- Removed migration code
Diffstat (limited to 'zenstore')
-rw-r--r--zenstore/cas.cpp90
-rw-r--r--zenstore/cas.h61
-rw-r--r--zenstore/caslog.cpp2
-rw-r--r--zenstore/cidstore.cpp261
-rw-r--r--zenstore/compactcas.cpp745
-rw-r--r--zenstore/compactcas.h16
-rw-r--r--zenstore/filecas.cpp111
-rw-r--r--zenstore/filecas.h23
-rw-r--r--zenstore/gc.cpp174
-rw-r--r--zenstore/hashkeyset.cpp60
-rw-r--r--zenstore/include/zenstore/cas.h144
-rw-r--r--zenstore/include/zenstore/caslog.h2
-rw-r--r--zenstore/include/zenstore/cidstore.h57
-rw-r--r--zenstore/include/zenstore/gc.h44
-rw-r--r--zenstore/include/zenstore/hashkeyset.h54
-rw-r--r--zenstore/include/zenstore/scrubcontext.h40
-rw-r--r--zenstore/zenstore.cpp5
17 files changed, 620 insertions, 1269 deletions
diff --git a/zenstore/cas.cpp b/zenstore/cas.cpp
index 0e1d5b242..54e8cb11c 100644
--- a/zenstore/cas.cpp
+++ b/zenstore/cas.cpp
@@ -1,6 +1,6 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include <zenstore/cas.h>
+#include "cas.h"
#include "compactcas.h"
#include "filecas.h"
@@ -18,7 +18,9 @@
#include <zencore/thread.h>
#include <zencore/trace.h>
#include <zencore/uid.h>
+#include <zenstore/cidstore.h>
#include <zenstore/gc.h>
+#include <zenstore/scrubcontext.h>
#include <gsl/gsl-lite.hpp>
@@ -30,58 +32,6 @@
namespace zen {
-void
-CasChunkSet::AddChunkToSet(const IoHash& HashToAdd)
-{
- m_ChunkSet.insert(HashToAdd);
-}
-
-void
-CasChunkSet::AddChunksToSet(std::span<const IoHash> HashesToAdd)
-{
- m_ChunkSet.insert(HashesToAdd.begin(), HashesToAdd.end());
-}
-
-void
-CasChunkSet::RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate)
-{
- for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;)
- {
- if (Predicate(*It))
- {
- It = m_ChunkSet.erase(It);
- }
- else
- {
- ++It;
- }
- }
-}
-
-void
-CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback)
-{
- for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd; ++It)
- {
- Callback(*It);
- }
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-void
-ScrubContext::ReportBadCasChunks(std::span<IoHash> BadCasChunks)
-{
- m_BadCas.AddChunksToSet(BadCasChunks);
-}
-
-void
-ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes)
-{
- m_ChunkCount.fetch_add(ChunkCount);
- m_ByteCount.fetch_add(ChunkBytes);
-}
-
/**
* CAS store implementation
*
@@ -93,18 +43,18 @@ ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes)
class CasImpl : public CasStore
{
public:
- CasImpl(CasGc& Gc);
+ CasImpl(GcManager& Gc);
virtual ~CasImpl();
- virtual void Initialize(const CasStoreConfiguration& InConfig) override;
+ virtual void Initialize(const CidStoreConfiguration& InConfig) override;
virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) override;
virtual IoBuffer FindChunk(const IoHash& ChunkHash) override;
virtual bool ContainsChunk(const IoHash& ChunkHash) override;
- virtual void FilterChunks(CasChunkSet& InOutChunks) override;
+ virtual void FilterChunks(HashKeySet& InOutChunks) override;
virtual void Flush() override;
virtual void Scrub(ScrubContext& Ctx) override;
virtual void GarbageCollect(GcContext& GcCtx) override;
- virtual CasStoreSize TotalSize() const override;
+ virtual CidStoreSize TotalSize() const override;
private:
CasContainerStrategy m_TinyStrategy;
@@ -124,7 +74,7 @@ private:
void UpdateManifest();
};
-CasImpl::CasImpl(CasGc& Gc) : m_TinyStrategy(m_Config, Gc), m_SmallStrategy(m_Config, Gc), m_LargeStrategy(m_Config, Gc)
+CasImpl::CasImpl(GcManager& Gc) : m_TinyStrategy(Gc), m_SmallStrategy(Gc), m_LargeStrategy(Gc)
{
}
@@ -133,7 +83,7 @@ CasImpl::~CasImpl()
}
void
-CasImpl::Initialize(const CasStoreConfiguration& InConfig)
+CasImpl::Initialize(const CidStoreConfiguration& InConfig)
{
m_Config = InConfig;
@@ -149,9 +99,9 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig)
// Initialize payload storage
- m_LargeStrategy.Initialize(IsNewStore);
- m_TinyStrategy.Initialize("tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block
- m_SmallStrategy.Initialize("sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block
+ m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore);
+ m_TinyStrategy.Initialize(m_Config.RootDirectory, "tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block
+ m_SmallStrategy.Initialize(m_Config.RootDirectory, "sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block
}
bool
@@ -292,7 +242,7 @@ CasImpl::ContainsChunk(const IoHash& ChunkHash)
}
void
-CasImpl::FilterChunks(CasChunkSet& InOutChunks)
+CasImpl::FilterChunks(HashKeySet& InOutChunks)
{
m_SmallStrategy.FilterChunks(InOutChunks);
m_TinyStrategy.FilterChunks(InOutChunks);
@@ -330,7 +280,7 @@ CasImpl::GarbageCollect(GcContext& GcCtx)
m_LargeStrategy.CollectGarbage(GcCtx);
}
-CasStoreSize
+CidStoreSize
CasImpl::TotalSize() const
{
const uint64_t Tiny = m_TinyStrategy.StorageSize().DiskSize;
@@ -343,7 +293,7 @@ CasImpl::TotalSize() const
//////////////////////////////////////////////////////////////////////////
std::unique_ptr<CasStore>
-CreateCasStore(CasGc& Gc)
+CreateCasStore(GcManager& Gc)
{
return std::make_unique<CasImpl>(Gc);
}
@@ -359,10 +309,10 @@ TEST_CASE("CasStore")
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration config;
+ CidStoreConfiguration config;
config.RootDirectory = TempDir.Path();
- CasGc Gc;
+ GcManager Gc;
std::unique_ptr<CasStore> Store = CreateCasStore(Gc);
Store->Initialize(config);
@@ -382,9 +332,9 @@ TEST_CASE("CasStore")
CasStore::InsertResult Result2 = Store->InsertChunk(Value2, Hash2);
CHECK(Result2.New);
- CasChunkSet ChunkSet;
- ChunkSet.AddChunkToSet(Hash1);
- ChunkSet.AddChunkToSet(Hash2);
+ HashKeySet ChunkSet;
+ ChunkSet.AddHashToSet(Hash1);
+ ChunkSet.AddHashToSet(Hash2);
Store->FilterChunks(ChunkSet);
CHECK(ChunkSet.IsEmpty());
diff --git a/zenstore/cas.h b/zenstore/cas.h
new file mode 100644
index 000000000..2ad160d28
--- /dev/null
+++ b/zenstore/cas.h
@@ -0,0 +1,61 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/blake3.h>
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+#include <zencore/refcount.h>
+#include <zencore/timer.h>
+#include <zenstore/cidstore.h>
+#include <zenstore/hashkeyset.h>
+
+#include <atomic>
+#include <filesystem>
+#include <functional>
+#include <memory>
+#include <string>
+#include <unordered_set>
+
+namespace zen {
+
+class GcContext;
+class GcManager;
+class ScrubContext;
+
+/** Content Addressable Storage interface
+
+ */
+
+class CasStore
+{
+public:
+ virtual ~CasStore() = default;
+
+ const CidStoreConfiguration& Config() { return m_Config; }
+
+ struct InsertResult
+ {
+ bool New = false;
+ };
+
+ virtual void Initialize(const CidStoreConfiguration& Config) = 0;
+ virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash) = 0;
+ virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0;
+ virtual bool ContainsChunk(const IoHash& ChunkHash) = 0;
+ virtual void FilterChunks(HashKeySet& InOutChunks) = 0;
+ virtual void Flush() = 0;
+ virtual void Scrub(ScrubContext& Ctx) = 0;
+ virtual void GarbageCollect(GcContext& GcCtx) = 0;
+ virtual CidStoreSize TotalSize() const = 0;
+
+protected:
+ CidStoreConfiguration m_Config;
+ uint64_t m_LastScrubTime = 0;
+};
+
+ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(GcManager& Gc);
+
+void CAS_forcelink();
+
+} // namespace zen
diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp
index 03a56f010..9c5258bce 100644
--- a/zenstore/caslog.cpp
+++ b/zenstore/caslog.cpp
@@ -1,6 +1,6 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include <zenstore/cas.h>
+#include <zenstore/caslog.h>
#include "compactcas.h"
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp
index 01eda4697..5a079dbed 100644
--- a/zenstore/cidstore.cpp
+++ b/zenstore/cidstore.cpp
@@ -7,8 +7,9 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/string.h>
-#include <zenstore/cas.h>
-#include <zenstore/caslog.h>
+#include <zenstore/scrubcontext.h>
+
+#include "cas.h"
#include <filesystem>
@@ -18,153 +19,32 @@ struct CidStore::Impl
{
Impl(CasStore& InCasStore) : m_CasStore(InCasStore) {}
- struct IndexEntry
- {
- IoHash Uncompressed;
- IoHash Compressed;
- };
-
- CasStore& m_CasStore;
- TCasLogFile<IndexEntry> m_LogFile;
+ CasStore& m_CasStore;
- RwLock m_Lock;
- tsl::robin_map<IoHash, IoHash> m_CidMap;
+ void Initialize(const CidStoreConfiguration& Config) { m_CasStore.Initialize(Config); }
- CidStore::InsertResult AddChunk(CompressedBuffer& ChunkData)
+ CidStore::InsertResult AddChunk(const CompressedBuffer& ChunkData)
{
const IoHash DecompressedId = IoHash::FromBLAKE3(ChunkData.GetRawHash());
IoBuffer Payload = ChunkData.GetCompressed().Flatten().AsIoBuffer();
- IoHash CompressedHash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
Payload.SetContentType(ZenContentType::kCompressedBinary);
- CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, CompressedHash);
- AddCompressedCid(DecompressedId, CompressedHash);
+ CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, DecompressedId);
- return {.DecompressedId = DecompressedId, .CompressedHash = CompressedHash, .New = Result.New};
+ return {.New = Result.New};
}
- void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed)
- {
- ZEN_ASSERT(Compressed != IoHash::Zero);
-
- RwLock::ExclusiveLockScope _(m_Lock);
+ IoBuffer FindChunkByCid(const IoHash& DecompressedId) { return m_CasStore.FindChunk(DecompressedId); }
- auto It = m_CidMap.try_emplace(DecompressedId, Compressed);
- if (!It.second)
- {
- if (It.first.value() != Compressed)
- {
- It.first.value() = Compressed;
- }
- else
- {
- // No point logging an update that won't change anything
- return;
- }
- }
+ bool ContainsChunk(const IoHash& DecompressedId) { return m_CasStore.ContainsChunk(DecompressedId); }
- // It's not ideal to do this while holding the lock in case
- // we end up in blocking I/O but that's for later
- LogMapping(DecompressedId, Compressed);
- }
-
- void LogMapping(const IoHash& DecompressedId, const IoHash& CompressedHash)
+ void FilterChunks(HashKeySet& InOutChunks)
{
- ZEN_ASSERT(DecompressedId != CompressedHash);
- m_LogFile.Append({.Uncompressed = DecompressedId, .Compressed = CompressedHash});
+ InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return ContainsChunk(Hash); });
}
- IoHash RemapCid(const IoHash& DecompressedId)
- {
- RwLock::SharedLockScope _(m_Lock);
- if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end())
- {
- return It->second;
- }
-
- return IoHash::Zero;
- }
-
- IoBuffer FindChunkByCid(const IoHash& DecompressedId)
- {
- IoHash CompressedHash;
-
- {
- RwLock::SharedLockScope _(m_Lock);
- if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end())
- {
- CompressedHash = It->second;
- }
- else
- {
- return {};
- }
- }
-
- ZEN_ASSERT(CompressedHash != IoHash::Zero);
-
- return m_CasStore.FindChunk(CompressedHash);
- }
-
- bool ContainsChunk(const IoHash& DecompressedId)
- {
- IoHash CasHash = IoHash::Zero;
-
- {
- RwLock::SharedLockScope _(m_Lock);
- if (const auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end())
- {
- CasHash = It->second;
- }
- }
-
- return CasHash != IoHash::Zero ? m_CasStore.ContainsChunk(CasHash) : false;
- }
-
- void InitializeIndex(const std::filesystem::path& RootDir)
- {
- CreateDirectories(RootDir);
- std::filesystem::path SlogPath{RootDir / "cid.slog"};
-
- bool IsNew = !std::filesystem::exists(SlogPath);
-
- m_LogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
-
- ZEN_DEBUG("Initializing index from '{}' ({})", SlogPath, NiceBytes(m_LogFile.GetLogSize()));
-
- uint64_t TombstoneCount = 0;
- uint64_t InvalidCount = 0;
-
- m_LogFile.Replay(
- [&](const IndexEntry& Entry) {
- if (Entry.Compressed != IoHash::Zero)
- {
- // Update
- m_CidMap.insert_or_assign(Entry.Uncompressed, Entry.Compressed);
- }
- else
- {
- if (Entry.Uncompressed != IoHash::Zero)
- {
- // Tombstone
- m_CidMap.erase(Entry.Uncompressed);
- ++TombstoneCount;
- }
- else
- {
- // Completely uninitialized entry with both hashes set to zero indicates a
- // problem. Might be an unwritten page due to BSOD or some other problem
- ++InvalidCount;
- }
- }
- },
- 0);
-
- ZEN_INFO("CID index initialized: {} entries found ({} tombstones, {} invalid)", m_CidMap.size(), TombstoneCount, InvalidCount);
- }
-
- void Flush() { m_LogFile.Flush(); }
+ void Flush() { m_CasStore.Flush(); }
void Scrub(ScrubContext& Ctx)
{
@@ -175,83 +55,7 @@ struct CidStore::Impl
m_LastScrubTime = Ctx.ScrubTimestamp();
- CasChunkSet ChunkSet;
-
- {
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto& Kv : m_CidMap)
- {
- ChunkSet.AddChunkToSet(Kv.second);
- }
- }
-
- m_CasStore.FilterChunks(ChunkSet);
-
- if (ChunkSet.IsEmpty())
- {
- // All good - we have all the chunks
- return;
- }
-
- ZEN_ERROR("Scrubbing found that {} cid mappings (out of {}) mapped to non-existent CAS chunks. These mappings will be removed",
- ChunkSet.GetSize(),
- m_CidMap.size());
-
- // Erase all mappings to chunks which are not present in the underlying CAS store
- // we do this by removing mappings from the in-memory lookup structure and also
- // by emitting tombstone records to the commit log
-
- std::vector<IoHash> BadChunks;
-
- {
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto It = begin(m_CidMap), ItEnd = end(m_CidMap); It != ItEnd;)
- {
- if (ChunkSet.ContainsChunk(It->second))
- {
- const IoHash& BadHash = It->first;
-
- // Log a tombstone record
- LogMapping(BadHash, IoHash::Zero);
-
- BadChunks.push_back(BadHash);
-
- It = m_CidMap.erase(It);
- }
- else
- {
- ++It;
- }
- }
- }
-
- m_LogFile.Flush();
-
- // TODO: Should compute a snapshot index here
-
- Ctx.ReportBadCasChunks(BadChunks);
- }
-
- void RemoveCids(CasChunkSet& CasChunks)
- {
- std::vector<IndexEntry> RemovedEntries;
- RemovedEntries.reserve(CasChunks.GetSize());
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- for (auto It = m_CidMap.begin(), End = m_CidMap.end(); It != End;)
- {
- if (CasChunks.ContainsChunk(It->second))
- {
- RemovedEntries.push_back({It->first, IoHash::Zero});
- It = m_CidMap.erase(It);
- continue;
- }
- ++It;
- }
- }
- m_LogFile.Append(RemovedEntries);
+ m_CasStore.Scrub(Ctx);
}
uint64_t m_LastScrubTime = 0;
@@ -259,25 +63,24 @@ struct CidStore::Impl
//////////////////////////////////////////////////////////////////////////
-CidStore::CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir) : m_Impl(std::make_unique<Impl>(InCasStore))
+CidStore::CidStore(GcManager& Gc) : m_CasStore(CreateCasStore(Gc)), m_Impl(std::make_unique<Impl>(*m_CasStore))
{
- m_Impl->InitializeIndex(RootDir);
}
CidStore::~CidStore()
{
}
-CidStore::InsertResult
-CidStore::AddChunk(CompressedBuffer& ChunkData)
+void
+CidStore::Initialize(const CidStoreConfiguration& Config)
{
- return m_Impl->AddChunk(ChunkData);
+ m_Impl->Initialize(Config);
}
-void
-CidStore::AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed)
+CidStore::InsertResult
+CidStore::AddChunk(const CompressedBuffer& ChunkData)
{
- m_Impl->AddCompressedCid(DecompressedId, Compressed);
+ return m_Impl->AddChunk(ChunkData);
}
IoBuffer
@@ -286,12 +89,6 @@ CidStore::FindChunkByCid(const IoHash& DecompressedId)
return m_Impl->FindChunkByCid(DecompressedId);
}
-IoHash
-CidStore::RemapCid(const IoHash& DecompressedId)
-{
- return m_Impl->RemapCid(DecompressedId);
-}
-
bool
CidStore::ContainsChunk(const IoHash& DecompressedId)
{
@@ -299,25 +96,25 @@ CidStore::ContainsChunk(const IoHash& DecompressedId)
}
void
-CidStore::Flush()
+CidStore::FilterChunks(HashKeySet& InOutChunks)
{
- m_Impl->Flush();
+ return m_Impl->FilterChunks(InOutChunks);
}
void
-CidStore::Scrub(ScrubContext& Ctx)
+CidStore::Flush()
{
- m_Impl->Scrub(Ctx);
+ m_Impl->Flush();
}
void
-CidStore::RemoveCids(CasChunkSet& CasChunks)
+CidStore::Scrub(ScrubContext& Ctx)
{
- m_Impl->RemoveCids(CasChunks);
+ m_Impl->Scrub(Ctx);
}
-CasStoreSize
-CidStore::CasSize() const
+CidStoreSize
+CidStore::TotalSize() const
{
return m_Impl->m_CasStore.TotalSize();
}
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 5aed02e7f..a7fdfa1f5 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -2,13 +2,16 @@
#include "compactcas.h"
-#include <zenstore/cas.h>
+#include "cas.h"
+#include <zencore/compress.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
+#include <zenstore/scrubcontext.h>
+
#include <gsl/gsl-lite.hpp>
#include <xxhash.h>
@@ -76,94 +79,6 @@ namespace {
return GetBasePath(RootPath, ContainerBaseName) / "blocks";
}
- std::filesystem::path GetLegacyLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
- {
- return RootPath / (ContainerBaseName + LogExtension);
- }
-
- std::filesystem::path GetLegacyDataPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
- {
- return RootPath / (ContainerBaseName + ".ucas");
- }
-
- std::filesystem::path GetLegacyIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
- {
- return RootPath / (ContainerBaseName + IndexExtension);
- }
-
- struct LegacyCasDiskLocation
- {
- LegacyCasDiskLocation(uint64_t InOffset, uint64_t InSize)
- {
- ZEN_ASSERT(InOffset <= 0xff'ffff'ffff);
- ZEN_ASSERT(InSize <= 0xff'ffff'ffff);
-
- memcpy(&m_Offset[0], &InOffset, sizeof m_Offset);
- memcpy(&m_Size[0], &InSize, sizeof m_Size);
- }
-
- LegacyCasDiskLocation() = default;
-
- inline uint64_t GetOffset() const
- {
- uint64_t Offset = 0;
- memcpy(&Offset, &m_Offset, sizeof m_Offset);
- return Offset;
- }
-
- inline uint64_t GetSize() const
- {
- uint64_t Size = 0;
- memcpy(&Size, &m_Size, sizeof m_Size);
- return Size;
- }
-
- private:
- uint8_t m_Offset[5];
- uint8_t m_Size[5];
- };
-
- struct LegacyCasDiskIndexEntry
- {
- static const uint8_t kTombstone = 0x01;
-
- IoHash Key;
- LegacyCasDiskLocation Location;
- ZenContentType ContentType = ZenContentType::kUnknownContentType;
- uint8_t Flags = 0;
- };
-
- bool ValidateLegacyEntry(const LegacyCasDiskIndexEntry& Entry, std::string& OutReason)
- {
- if (Entry.Key == IoHash::Zero)
- {
- OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
- return false;
- }
- if ((Entry.Flags & ~LegacyCasDiskIndexEntry::kTombstone) != 0)
- {
- OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString());
- return false;
- }
- if (Entry.Flags & LegacyCasDiskIndexEntry::kTombstone)
- {
- return true;
- }
- if (Entry.ContentType != ZenContentType::kUnknownContentType)
- {
- OutReason =
- fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString());
- return false;
- }
- uint64_t Size = Entry.Location.GetSize();
- if (Size == 0)
- {
- OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
- return false;
- }
- return true;
- }
-
bool ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason)
{
if (Entry.Key == IoHash::Zero)
@@ -199,10 +114,7 @@ namespace {
//////////////////////////////////////////////////////////////////////////
-CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc)
-: GcStorage(Gc)
-, m_Config(Config)
-, m_Log(logging::Get("containercas"))
+CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("containercas"))
{
}
@@ -211,16 +123,21 @@ CasContainerStrategy::~CasContainerStrategy()
}
void
-CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore)
+CasContainerStrategy::Initialize(const std::filesystem::path& RootDirectory,
+ const std::string_view ContainerBaseName,
+ uint32_t MaxBlockSize,
+ uint64_t Alignment,
+ bool IsNewStore)
{
ZEN_ASSERT(IsPow2(Alignment));
ZEN_ASSERT(!m_IsInitialized);
ZEN_ASSERT(MaxBlockSize > 0);
+ m_RootDirectory = RootDirectory;
m_ContainerBaseName = ContainerBaseName;
m_PayloadAlignment = Alignment;
m_MaxBlockSize = MaxBlockSize;
- m_BlocksBasePath = GetBlocksBasePath(m_Config.RootDirectory, m_ContainerBaseName);
+ m_BlocksBasePath = GetBlocksBasePath(m_RootDirectory, m_ContainerBaseName);
OpenContainer(IsNewStore);
@@ -267,6 +184,9 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
CasStore::InsertResult
CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
{
+#if !ZEN_WITH_TESTS
+ ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
+#endif
return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash);
}
@@ -293,7 +213,7 @@ CasContainerStrategy::HaveChunk(const IoHash& ChunkHash)
}
void
-CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks)
+CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks)
{
// This implementation is good enough for relatively small
// chunk sets (in terms of chunk identifiers), but would
@@ -302,7 +222,7 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks)
// we're likely to already have a large proportion of the
// chunks in the set
- InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); });
+ InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); });
}
void
@@ -316,6 +236,7 @@ void
CasContainerStrategy::Scrub(ScrubContext& Ctx)
{
std::vector<IoHash> BadKeys;
+ uint64_t ChunkCount{0}, ChunkBytes{0};
std::vector<BlockStoreLocation> ChunkLocations;
std::vector<IoHash> ChunkIndexToChunkHash;
@@ -337,6 +258,9 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
}
const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ ++ChunkCount;
+ ChunkBytes += Size;
+
const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
if (!Data)
{
@@ -344,66 +268,97 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
BadKeys.push_back(Hash);
return;
}
- const IoHash ComputedHash = IoHash::HashBuffer(Data, Size);
- if (ComputedHash != Hash)
+
+ IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); Compressed)
+ {
+ if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Hash)
+ {
+ // Hash mismatch
+ BadKeys.push_back(Hash);
+ return;
+ }
+ return;
+ }
+#if ZEN_WITH_TESTS
+ IoHash ComputedHash = IoHash::HashBuffer(Data, Size);
+ if (ComputedHash == Hash)
{
- // Hash mismatch
- BadKeys.push_back(Hash);
return;
}
+#endif
+ BadKeys.push_back(Hash);
};
const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ ++ChunkCount;
+ ChunkBytes += Size;
+
+ const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
+ IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
+ // TODO: Add API to verify compressed buffer without having to memorymap the whole file
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); Compressed)
+ {
+ if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Hash)
+ {
+ // Hash mismatch
+ BadKeys.push_back(Hash);
+ return;
+ }
+ return;
+ }
+#if ZEN_WITH_TESTS
IoHashStream Hasher;
- File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); });
- IoHash ComputedHash = Hasher.GetHash();
- const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- if (ComputedHash != Hash)
+ File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
+ IoHash ComputedHash = Hasher.GetHash();
+ if (ComputedHash == Hash)
{
- // Hash mismatch
- BadKeys.push_back(Hash);
return;
}
+#endif
+ BadKeys.push_back(Hash);
};
m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
_.ReleaseNow();
- if (BadKeys.empty())
- {
- return;
- }
-
- ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_Config.RootDirectory / m_ContainerBaseName);
+ Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
- if (Ctx.RunRecovery())
+ if (!BadKeys.empty())
{
- // Deal with bad chunks by removing them from our lookup map
+ ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName);
- std::vector<CasDiskIndexEntry> LogEntries;
- LogEntries.reserve(BadKeys.size());
+ if (Ctx.RunRecovery())
{
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- for (const IoHash& ChunkHash : BadKeys)
+ // Deal with bad chunks by removing them from our lookup map
+
+ std::vector<CasDiskIndexEntry> LogEntries;
+ LogEntries.reserve(BadKeys.size());
{
- const auto KeyIt = m_LocationMap.find(ChunkHash);
- if (KeyIt == m_LocationMap.end())
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
+ for (const IoHash& ChunkHash : BadKeys)
{
- // Might have been GC'd
- continue;
+ const auto KeyIt = m_LocationMap.find(ChunkHash);
+ if (KeyIt == m_LocationMap.end())
+ {
+ // Might have been GC'd
+ continue;
+ }
+ LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone});
+ m_LocationMap.erase(KeyIt);
}
- LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone});
- m_LocationMap.erase(KeyIt);
}
+ m_CasLog.Append(LogEntries);
}
- m_CasLog.Append(LogEntries);
}
// Let whomever it concerns know about the bad chunks. This could
// be used to invalidate higher level data structures more efficiently
// than a full validation pass might be able to do
- Ctx.ReportBadCasChunks(BadKeys);
+ Ctx.ReportBadCidChunks(BadKeys);
+
+ ZEN_INFO("compact cas scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
}
void
@@ -432,7 +387,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
// do a blocking operation and update the m_LocationMap after each new block is
// written and figuring out the path to the next new block.
- ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName);
+ ZEN_INFO("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName);
uint64_t WriteBlockTimeUs = 0;
uint64_t WriteBlockLongestTimeUs = 0;
@@ -468,7 +423,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
ChunkLocations.reserve(TotalChunkCount);
ChunkIndexToChunkHash.reserve(TotalChunkCount);
- GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
+ GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
auto KeyIt = LocationMap.find(ChunkHash);
const BlockStoreDiskLocation& DiskLocation = KeyIt->second;
BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);
@@ -539,26 +494,26 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
},
[&GcCtx]() { return GcCtx.CollectSmallObjects(); });
- GcCtx.DeletedCas(DeletedChunks);
+ GcCtx.AddDeletedCids(DeletedChunks);
}
void
CasContainerStrategy::MakeIndexSnapshot()
{
- ZEN_INFO("write store snapshot for '{}'", m_Config.RootDirectory / m_ContainerBaseName);
+ ZEN_INFO("write store snapshot for '{}'", m_RootDirectory / m_ContainerBaseName);
uint64_t EntryCount = 0;
Stopwatch Timer;
const auto _ = MakeGuard([&] {
ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}",
- m_Config.RootDirectory / m_ContainerBaseName,
+ m_RootDirectory / m_ContainerBaseName,
EntryCount,
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
namespace fs = std::filesystem;
- fs::path IndexPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
- fs::path TempIndexPath = GetTempIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
+ fs::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName);
+ fs::path TempIndexPath = GetTempIndexPath(m_RootDirectory, m_ContainerBaseName);
// Move index away, we keep it if something goes wrong
if (fs::is_regular_file(TempIndexPath))
@@ -629,13 +584,13 @@ uint64_t
CasContainerStrategy::ReadIndexFile()
{
std::vector<CasDiskIndexEntry> Entries;
- std::filesystem::path IndexPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
+ std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName);
if (std::filesystem::is_regular_file(IndexPath))
{
Stopwatch Timer;
const auto _ = MakeGuard([&] {
ZEN_INFO("read store '{}' index containing #{} entries in {}",
- m_Config.RootDirectory / m_ContainerBaseName,
+ m_RootDirectory / m_ContainerBaseName,
Entries.size(),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
@@ -682,13 +637,13 @@ uint64_t
CasContainerStrategy::ReadLog(uint64_t SkipEntryCount)
{
std::vector<CasDiskIndexEntry> Entries;
- std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
+ std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName);
if (std::filesystem::is_regular_file(LogPath))
{
Stopwatch Timer;
const auto _ = MakeGuard([&] {
ZEN_INFO("read store '{}' log containing #{} entries in {}",
- m_Config.RootDirectory / m_ContainerBaseName,
+ m_RootDirectory / m_ContainerBaseName,
Entries.size(),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
@@ -727,208 +682,6 @@ CasContainerStrategy::ReadLog(uint64_t SkipEntryCount)
return 0;
}
-uint64_t
-CasContainerStrategy::MigrateLegacyData(bool CleanSource)
-{
- std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName);
-
- if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0)
- {
- return 0;
- }
-
- ZEN_INFO("migrating store '{}'", m_Config.RootDirectory / m_ContainerBaseName);
-
- std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_Config.RootDirectory, m_ContainerBaseName);
- std::filesystem::path LegacyIndexPath = GetLegacyIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
-
- uint64_t MigratedChunkCount = 0;
- uint32_t MigratedBlockCount = 0;
- Stopwatch MigrationTimer;
- uint64_t TotalSize = 0;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})",
- m_Config.RootDirectory / m_ContainerBaseName,
- MigratedChunkCount,
- MigratedBlockCount,
- NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()),
- NiceBytes(TotalSize));
- });
-
- uint64_t BlockFileSize = 0;
- {
- BasicFile BlockFile;
- BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
- BlockFileSize = BlockFile.FileSize();
- }
-
- std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
- uint64_t InvalidEntryCount = 0;
-
- TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog;
- LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead);
- {
- Stopwatch Timer;
- const auto __ = MakeGuard([&] {
- ZEN_INFO("read store '{}' legacy log containing #{} entries in {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- LegacyDiskIndex.size(),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
- if (LegacyCasLog.Initialize())
- {
- LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount());
- LegacyCasLog.Replay(
- [&](const LegacyCasDiskIndexEntry& Record) {
- std::string InvalidEntryReason;
- if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone)
- {
- LegacyDiskIndex.erase(Record.Key);
- return;
- }
- if (!ValidateLegacyEntry(Record, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason);
- InvalidEntryCount++;
- return;
- }
- LegacyDiskIndex.insert_or_assign(Record.Key, Record);
- },
- 0);
-
- std::vector<IoHash> BadEntries;
- for (const auto& Entry : LegacyDiskIndex)
- {
- const LegacyCasDiskIndexEntry& Record(Entry.second);
- if (Record.Location.GetOffset() + Record.Location.GetSize() <= BlockFileSize)
- {
- continue;
- }
- ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath);
- BadEntries.push_back(Entry.first);
- }
- for (const IoHash& BadHash : BadEntries)
- {
- LegacyDiskIndex.erase(BadHash);
- }
- InvalidEntryCount += BadEntries.size();
- }
- }
-
- if (InvalidEntryCount)
- {
- ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_Config.RootDirectory / m_ContainerBaseName);
- }
-
- if (LegacyDiskIndex.empty())
- {
- LegacyCasLog.Close();
- if (CleanSource)
- {
- // Older versions of CasContainerStrategy expects the legacy files to exist if it can find
- // a CAS manifest and crashes on startup if they don't.
- // In order to not break startup when switching back an older version, lets just reset
- // the legacy data files to zero length.
-
- BasicFile LegacyLog;
- LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
- BasicFile LegacySobs;
- LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
- BasicFile LegacySidx;
- LegacySidx.Open(LegacyIndexPath, BasicFile::Mode::kTruncate);
- }
- return 0;
- }
-
- std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
- CreateDirectories(LogPath.parent_path());
- TCasLogFile<CasDiskIndexEntry> CasLog;
- CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
-
- std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash;
- std::vector<BlockStoreLocation> ChunkLocations;
- ChunkIndexToChunkHash.reserve(LegacyDiskIndex.size());
- ChunkLocations.reserve(LegacyDiskIndex.size());
- for (const auto& Entry : LegacyDiskIndex)
- {
- const LegacyCasDiskLocation& Location = Entry.second.Location;
- const IoHash& ChunkHash = Entry.first;
- size_t ChunkIndex = ChunkLocations.size();
- ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.GetOffset(), .Size = Location.GetSize()});
- ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
- TotalSize += Location.GetSize();
- }
- m_BlockStore.Split(
- ChunkLocations,
- LegacyDataPath,
- m_BlocksBasePath,
- m_MaxBlockSize,
- BlockStoreDiskLocation::MaxBlockIndex + 1,
- m_PayloadAlignment,
- CleanSource,
- [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount](
- const BlockStore::MovedChunksArray& MovedChunks) {
- std::vector<CasDiskIndexEntry> LogEntries;
- LogEntries.reserve(MovedChunks.size());
- for (const auto& Entry : MovedChunks)
- {
- size_t ChunkIndex = Entry.first;
- const BlockStoreLocation& NewLocation = Entry.second;
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
- LogEntries.push_back({.Key = ChunkHash,
- .Location = {NewLocation, m_PayloadAlignment},
- .ContentType = OldEntry.ContentType,
- .Flags = OldEntry.Flags});
- }
- for (const CasDiskIndexEntry& Entry : LogEntries)
- {
- m_LocationMap.insert_or_assign(Entry.Key, Entry.Location);
- }
- CasLog.Append(LogEntries);
- CasLog.Flush();
- if (CleanSource)
- {
- std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries;
- LegacyLogEntries.reserve(MovedChunks.size());
- for (const auto& Entry : MovedChunks)
- {
- size_t ChunkIndex = Entry.first;
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
- LegacyLogEntries.push_back(
- LegacyCasDiskIndexEntry{.Key = ChunkHash,
- .Location = OldEntry.Location,
- .ContentType = OldEntry.ContentType,
- .Flags = (uint8_t)(OldEntry.Flags | LegacyCasDiskIndexEntry::kTombstone)});
- }
- LegacyCasLog.Append(LegacyLogEntries);
- LegacyCasLog.Flush();
- }
- MigratedBlockCount++;
- MigratedChunkCount += MovedChunks.size();
- });
-
- LegacyCasLog.Close();
- CasLog.Close();
-
- if (CleanSource)
- {
- // Older versions of CasContainerStrategy expects the legacy files to exist if it can find
- // a CAS manifest and crashes on startup if they don't.
- // In order to not break startup when switching back an older version, lets just reset
- // the legacy data files to zero length.
-
- BasicFile LegacyLog;
- LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
- BasicFile LegacySobs;
- LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
- BasicFile LegacySidx;
- LegacySidx.Open(LegacyIndexPath, BasicFile::Mode::kTruncate);
- }
- return MigratedChunkCount;
-}
-
void
CasContainerStrategy::OpenContainer(bool IsNewStore)
{
@@ -937,25 +690,19 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
m_LocationMap.clear();
- std::filesystem::path BasePath = GetBasePath(m_Config.RootDirectory, m_ContainerBaseName);
+ std::filesystem::path BasePath = GetBasePath(m_RootDirectory, m_ContainerBaseName);
if (IsNewStore)
{
- std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_Config.RootDirectory, m_ContainerBaseName);
- std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName);
-
- std::filesystem::remove(LegacyLogPath);
- std::filesystem::remove(LegacyDataPath);
std::filesystem::remove_all(BasePath);
}
- uint64_t LogPosition = ReadIndexFile();
- uint64_t LogEntryCount = ReadLog(LogPosition);
- uint64_t LegacyLogEntryCount = MigrateLegacyData(true);
+ uint64_t LogPosition = ReadIndexFile();
+ uint64_t LogEntryCount = ReadLog(LogPosition);
CreateDirectories(BasePath);
- std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
+ std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName);
m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
std::vector<BlockStoreLocation> KnownLocations;
@@ -969,7 +716,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);
- if (IsNewStore || ((LogEntryCount + LegacyLogEntryCount) > 0))
+ if (IsNewStore || (LogEntryCount > 0))
{
MakeIndexSnapshot();
}
@@ -1040,18 +787,14 @@ TEST_CASE("compactcas.compact.gc")
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
- CreateDirectories(CasConfig.RootDirectory);
-
const int kIterationCount = 1000;
std::vector<IoHash> Keys(kIterationCount);
{
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 65536, 16, true);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "test", 65536, 16, true);
for (int i = 0; i < kIterationCount; ++i)
{
@@ -1083,9 +826,9 @@ TEST_CASE("compactcas.compact.gc")
// the original cas store
{
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 65536, 16, false);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);
for (int i = 0; i < kIterationCount; ++i)
{
@@ -1109,18 +852,13 @@ TEST_CASE("compactcas.compact.totalsize")
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
-
- CreateDirectories(CasConfig.RootDirectory);
-
const uint64_t kChunkSize = 1024;
const int32_t kChunkCount = 16;
{
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 65536, 16, true);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "test", 65536, 16, true);
for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
{
@@ -1135,9 +873,9 @@ TEST_CASE("compactcas.compact.totalsize")
}
{
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 65536, 16, false);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);
const uint64_t TotalSize = Cas.StorageSize().DiskSize;
CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
@@ -1145,9 +883,9 @@ TEST_CASE("compactcas.compact.totalsize")
// Re-open again, this time we should have a snapshot
{
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 65536, 16, false);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);
const uint64_t TotalSize = Cas.StorageSize().DiskSize;
CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
@@ -1159,13 +897,9 @@ TEST_CASE("compactcas.gc.basic")
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
- CreateDirectories(CasConfig.RootDirectory);
-
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("cb", 65536, 1 << 4, true);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true);
IoBuffer Chunk = CreateChunk(128);
IoHash ChunkHash = IoHash::HashBuffer(Chunk);
@@ -1186,16 +920,12 @@ TEST_CASE("compactcas.gc.removefile")
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
- CreateDirectories(CasConfig.RootDirectory);
-
IoBuffer Chunk = CreateChunk(128);
IoHash ChunkHash = IoHash::HashBuffer(Chunk);
{
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("cb", 65536, 1 << 4, true);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true);
const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash);
CHECK(InsertResult.New);
@@ -1204,9 +934,9 @@ TEST_CASE("compactcas.gc.removefile")
Cas.Flush();
}
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("cb", 65536, 1 << 4, false);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, false);
GcContext GcCtx;
GcCtx.CollectSmallObjects(true);
@@ -1222,13 +952,9 @@ TEST_CASE("compactcas.gc.compact")
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
- CreateDirectories(CasConfig.RootDirectory);
-
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("cb", 2048, 1 << 4, true);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "cb", 2048, 1 << 4, true);
uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5};
std::vector<IoBuffer> Chunks;
@@ -1275,7 +1001,7 @@ TEST_CASE("compactcas.gc.compact")
std::vector<IoHash> KeepChunks;
KeepChunks.push_back(ChunkHashes[0]);
KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
Cas.Flush();
Cas.CollectGarbage(GcCtx);
@@ -1308,7 +1034,7 @@ TEST_CASE("compactcas.gc.compact")
GcCtx.CollectSmallObjects(true);
std::vector<IoHash> KeepChunks;
KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
Cas.Flush();
Cas.CollectGarbage(GcCtx);
@@ -1342,7 +1068,7 @@ TEST_CASE("compactcas.gc.compact")
KeepChunks.push_back(ChunkHashes[1]);
KeepChunks.push_back(ChunkHashes[4]);
KeepChunks.push_back(ChunkHashes[7]);
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
Cas.Flush();
Cas.CollectGarbage(GcCtx);
@@ -1377,7 +1103,7 @@ TEST_CASE("compactcas.gc.compact")
KeepChunks.push_back(ChunkHashes[6]);
KeepChunks.push_back(ChunkHashes[7]);
KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
Cas.Flush();
Cas.CollectGarbage(GcCtx);
@@ -1414,7 +1140,7 @@ TEST_CASE("compactcas.gc.compact")
KeepChunks.push_back(ChunkHashes[4]);
KeepChunks.push_back(ChunkHashes[6]);
KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
Cas.Flush();
Cas.CollectGarbage(GcCtx);
@@ -1476,13 +1202,10 @@ TEST_CASE("compactcas.gc.deleteblockonopen")
ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
}
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
- CreateDirectories(CasConfig.RootDirectory);
{
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 1024, 16, true);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "test", 1024, 16, true);
for (size_t i = 0; i < 20; i++)
{
@@ -1498,7 +1221,7 @@ TEST_CASE("compactcas.gc.deleteblockonopen")
{
KeepChunks.push_back(ChunkHashes[i]);
}
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
Cas.Flush();
Cas.CollectGarbage(GcCtx);
@@ -1513,9 +1236,9 @@ TEST_CASE("compactcas.gc.deleteblockonopen")
}
{
// Re-open
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 1024, 16, false);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "test", 1024, 16, false);
for (size_t i = 0; i < 20; i += 2)
{
@@ -1545,13 +1268,9 @@ TEST_CASE("compactcas.gc.handleopeniobuffer")
ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
}
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
- CreateDirectories(CasConfig.RootDirectory);
-
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 1024, 16, true);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "test", 1024, 16, true);
for (size_t i = 0; i < 20; i++)
{
@@ -1574,131 +1293,12 @@ TEST_CASE("compactcas.gc.handleopeniobuffer")
CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk));
}
-TEST_CASE("compactcas.legacyconversion")
-{
- ScopedTemporaryDirectory TempDir;
-
- uint64_t ChunkSizes[] = {2041, 1123, 1223, 1239, 341, 1412, 912, 774, 341, 431, 554, 1098, 2048, 339, 561, 16, 16, 2048, 2048};
- size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t);
- size_t SingleBlockSize = 0;
- std::vector<IoBuffer> Chunks;
- Chunks.reserve(ChunkCount);
- for (uint64_t Size : ChunkSizes)
- {
- Chunks.push_back(CreateChunk(Size));
- SingleBlockSize += Size;
- }
-
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(ChunkCount);
- for (const IoBuffer& Chunk : Chunks)
- {
- ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
- }
-
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
- CreateDirectories(CasConfig.RootDirectory);
-
- {
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", gsl::narrow<uint32_t>(SingleBlockSize * 2), 16, true);
-
- for (size_t i = 0; i < ChunkCount; i++)
- {
- CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New);
- }
-
- std::vector<IoHash> KeepChunks;
- for (size_t i = 0; i < ChunkCount; i += 2)
- {
- KeepChunks.push_back(ChunkHashes[i]);
- }
- GcContext GcCtx;
- GcCtx.CollectSmallObjects(true);
- GcCtx.ContributeCas(KeepChunks);
- Cas.Flush();
- Gc.CollectGarbage(GcCtx);
- }
-
- std::filesystem::path BlockPath = BlockStore::GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1);
- std::filesystem::path LegacyDataPath = GetLegacyDataPath(CasConfig.RootDirectory, "test");
- std::filesystem::rename(BlockPath, LegacyDataPath);
-
- std::vector<CasDiskIndexEntry> LogEntries;
- std::filesystem::path IndexPath = GetIndexPath(CasConfig.RootDirectory, "test");
- if (std::filesystem::is_regular_file(IndexPath))
- {
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
- uint64_t Size = ObjectIndexFile.FileSize();
- if (Size >= sizeof(CasDiskIndexHeader))
- {
- uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry);
- CasDiskIndexHeader Header;
- ObjectIndexFile.Read(&Header, sizeof(Header), 0);
- if (Header.Magic == CasDiskIndexHeader::ExpectedMagic && Header.Version == CasDiskIndexHeader::CurrentVersion &&
- Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount)
- {
- LogEntries.resize(Header.EntryCount);
- ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
- }
- }
- ObjectIndexFile.Close();
- std::filesystem::remove(IndexPath);
- }
-
- std::filesystem::path LogPath = GetLogPath(CasConfig.RootDirectory, "test");
- {
- TCasLogFile<CasDiskIndexEntry> CasLog;
- CasLog.Open(LogPath, CasLogFile::Mode::kRead);
- LogEntries.reserve(CasLog.GetLogCount());
- CasLog.Replay([&](const CasDiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0);
- }
- TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog;
- std::filesystem::path LegacylogPath = GetLegacyLogPath(CasConfig.RootDirectory, "test");
- LegacyCasLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate);
-
- for (const CasDiskIndexEntry& Entry : LogEntries)
- {
- BlockStoreLocation Location = Entry.Location.Get(16);
- LegacyCasDiskLocation LegacyLocation(Location.Offset, Location.Size);
- LegacyCasDiskIndexEntry LegacyEntry = {.Key = Entry.Key,
- .Location = LegacyLocation,
- .ContentType = Entry.ContentType,
- .Flags = Entry.Flags};
- LegacyCasLog.Append(LegacyEntry);
- }
- LegacyCasLog.Close();
-
- std::filesystem::remove_all(CasConfig.RootDirectory / "test");
-
- {
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 2048, 16, false);
-
- for (size_t i = 0; i < ChunkCount; i += 2)
- {
- CHECK(Cas.HaveChunk(ChunkHashes[i]));
- CHECK(!Cas.HaveChunk(ChunkHashes[i + 1]));
- CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i])));
- }
- }
-}
-
TEST_CASE("compactcas.threadedinsert")
{
// for (uint32_t i = 0; i < 100; ++i)
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
-
- CreateDirectories(CasConfig.RootDirectory);
-
const uint64_t kChunkSize = 1048;
const int32_t kChunkCount = 4096;
uint64_t ExpectedSize = 0;
@@ -1724,9 +1324,9 @@ TEST_CASE("compactcas.threadedinsert")
std::atomic<size_t> WorkCompleted = 0;
WorkerThreadPool ThreadPool(4);
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 32768, 16, true);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "test", 32768, 16, true);
{
for (const auto& Chunk : Chunks)
{
@@ -1838,10 +1438,10 @@ TEST_CASE("compactcas.threadedinsert")
GcContext GcCtx;
GcCtx.CollectSmallObjects(true);
- GcCtx.ContributeCas(KeepHashes);
+ GcCtx.AddRetainedCids(KeepHashes);
Cas.CollectGarbage(GcCtx);
- CasChunkSet& Deleted = GcCtx.DeletedCas();
- Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ const HashKeySet& Deleted = GcCtx.DeletedCids();
+ Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
while (WorkCompleted < NewChunks.size() + Chunks.size())
@@ -1879,10 +1479,10 @@ TEST_CASE("compactcas.threadedinsert")
GcContext GcCtx;
GcCtx.CollectSmallObjects(true);
- GcCtx.ContributeCas(KeepHashes);
+ GcCtx.AddRetainedCids(KeepHashes);
Cas.CollectGarbage(GcCtx);
- CasChunkSet& Deleted = GcCtx.DeletedCas();
- Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ const HashKeySet& Deleted = GcCtx.DeletedCids();
+ Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
{
WorkCompleted = 0;
@@ -1902,53 +1502,6 @@ TEST_CASE("compactcas.threadedinsert")
}
}
-TEST_CASE("compactcas.migrate.large.data") // * doctest::skip(true))
-{
- if (true)
- {
- return;
- }
- const char* BigDataPath = "D:\\zen-data\\dc4-zen-cache-t\\cas";
- std::filesystem::path TobsBasePath = GetBasePath(BigDataPath, "tobs");
- std::filesystem::path SobsBasePath = GetBasePath(BigDataPath, "sobs");
- std::filesystem::remove_all(TobsBasePath);
- std::filesystem::remove_all(SobsBasePath);
-
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = BigDataPath;
- uint64_t TObsSize = 0;
- {
- CasGc TobsCasGc;
- CasContainerStrategy TobsCas(CasConfig, TobsCasGc);
- TobsCas.Initialize("tobs", 1u << 28, 16, false);
- TObsSize = TobsCas.StorageSize().DiskSize;
- CHECK(TObsSize > 0);
- }
-
- uint64_t SObsSize = 0;
- {
- CasGc SobsCasGc;
- CasContainerStrategy SobsCas(CasConfig, SobsCasGc);
- SobsCas.Initialize("sobs", 1u << 30, 4096, false);
- SObsSize = SobsCas.StorageSize().DiskSize;
- CHECK(SObsSize > 0);
- }
-
- CasGc TobsCasGc;
- CasContainerStrategy TobsCas(CasConfig, TobsCasGc);
- TobsCas.Initialize("tobs", 1u << 28, 16, false);
- GcContext TobsGcCtx;
- TobsCas.CollectGarbage(TobsGcCtx);
- CHECK(TobsCas.StorageSize().DiskSize == TObsSize);
-
- CasGc SobsCasGc;
- CasContainerStrategy SobsCas(CasConfig, SobsCasGc);
- SobsCas.Initialize("sobs", 1u << 30, 4096, false);
- GcContext SobsGcCtx;
- SobsCas.CollectGarbage(SobsGcCtx);
- CHECK(SobsCas.StorageSize().DiskSize == SObsSize);
-}
-
#endif
void
diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h
index 114a6a48c..2acac7ca3 100644
--- a/zenstore/compactcas.h
+++ b/zenstore/compactcas.h
@@ -4,10 +4,11 @@
#include <zencore/zencore.h>
#include <zenstore/blockstore.h>
-#include <zenstore/cas.h>
#include <zenstore/caslog.h>
#include <zenstore/gc.h>
+#include "cas.h"
+
#include <atomic>
#include <limits>
#include <unordered_map>
@@ -47,14 +48,18 @@ static_assert(sizeof(CasDiskIndexEntry) == 32);
struct CasContainerStrategy final : public GcStorage
{
- CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc);
+ CasContainerStrategy(GcManager& Gc);
~CasContainerStrategy();
CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash);
IoBuffer FindChunk(const IoHash& ChunkHash);
bool HaveChunk(const IoHash& ChunkHash);
- void FilterChunks(CasChunkSet& InOutChunks);
- void Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore);
+ void FilterChunks(HashKeySet& InOutChunks);
+ void Initialize(const std::filesystem::path& RootDirectory,
+ const std::string_view ContainerBaseName,
+ uint32_t MaxBlockSize,
+ uint64_t Alignment,
+ bool IsNewStore);
void Flush();
void Scrub(ScrubContext& Ctx);
virtual void CollectGarbage(GcContext& GcCtx) override;
@@ -65,12 +70,11 @@ private:
void MakeIndexSnapshot();
uint64_t ReadIndexFile();
uint64_t ReadLog(uint64_t SkipEntryCount);
- uint64_t MigrateLegacyData(bool CleanSource);
void OpenContainer(bool IsNewStore);
spdlog::logger& Log() { return m_Log; }
- const CasStoreConfiguration& m_Config;
+ std::filesystem::path m_RootDirectory;
spdlog::logger& m_Log;
uint64_t m_PayloadAlignment = 1u << 4;
uint64_t m_MaxBlockSize = 1u << 28;
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index d074a906f..23e3f4cd8 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -2,6 +2,7 @@
#include "filecas.h"
+#include <zencore/compress.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
@@ -16,6 +17,7 @@
#include <zencore/uid.h>
#include <zenstore/basicfile.h>
#include <zenstore/gc.h>
+#include <zenstore/scrubcontext.h>
#if ZEN_WITH_TESTS
# include <zencore/compactbinarybuilder.h>
@@ -71,10 +73,7 @@ FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& Roo
//////////////////////////////////////////////////////////////////////////
-FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc)
-: GcStorage(Gc)
-, m_Config(Config)
-, m_Log(logging::Get("filecas"))
+FileCasStrategy::FileCasStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("filecas"))
{
}
@@ -83,17 +82,19 @@ FileCasStrategy::~FileCasStrategy()
}
void
-FileCasStrategy::Initialize(bool IsNewStore)
+FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore)
{
m_IsInitialized = true;
- CreateDirectories(m_Config.RootDirectory);
+ m_RootDirectory = RootDirectory;
- m_CasLog.Open(m_Config.RootDirectory / "cas.ulog", IsNewStore ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
+ CreateDirectories(m_RootDirectory);
+
+ m_CasLog.Open(m_RootDirectory / "cas.ulog", IsNewStore ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
- ZEN_INFO("read log {} containing {}", m_Config.RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed)));
+ ZEN_INFO("read log {} containing {}", m_RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed)));
});
std::unordered_set<IoHash> FoundEntries;
@@ -127,13 +128,17 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
{
ZEN_ASSERT(m_IsInitialized);
+#if !ZEN_WITH_TESTS
+ ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
+#endif
+
// File-based chunks have special case handling whereby we move the file into
// place in the file store directory, thus avoiding unnecessary copying
IoBufferFileReference FileRef;
if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef))
{
- ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
RwLock::ExclusiveLockScope _(LockForHash(ChunkHash));
@@ -340,7 +345,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
{
ZEN_ASSERT(m_IsInitialized);
- ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
// See if file already exists
//
@@ -485,7 +490,7 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash)
{
ZEN_ASSERT(m_IsInitialized);
- ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
RwLock::SharedLockScope _(LockForHash(ChunkHash));
@@ -497,7 +502,7 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
{
ZEN_ASSERT(m_IsInitialized);
- ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
RwLock::SharedLockScope _(LockForHash(ChunkHash));
@@ -513,7 +518,7 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
void
FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
{
- ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
uint64_t FileSize = static_cast<uint64_t>(std::filesystem::file_size(Name.ShardedPath.c_str(), Ec));
if (Ec)
@@ -534,7 +539,7 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
}
void
-FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks)
+FileCasStrategy::FilterChunks(HashKeySet& InOutChunks)
{
ZEN_ASSERT(m_IsInitialized);
@@ -546,7 +551,7 @@ FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks)
// a caller, this is something which needs to be taken into account by anyone consuming
// this functionality in any case
- InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); });
+ InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); });
}
void
@@ -602,12 +607,12 @@ FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, BasicFile&
const std::filesystem::path& RootDirectory;
std::function<void(const IoHash& Hash, BasicFile& PayloadFile)> Callback;
- } CasVisitor{m_Config.RootDirectory};
+ } CasVisitor{m_RootDirectory};
CasVisitor.Callback = std::move(Callback);
FileSystemTraversal Traversal;
- Traversal.TraverseFileSystem(m_Config.RootDirectory, CasVisitor);
+ Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor);
}
void
@@ -630,21 +635,34 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
{
ZEN_ASSERT(m_IsInitialized);
- std::vector<IoHash> BadHashes;
- std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0};
+ std::vector<IoHash> BadHashes;
+ uint64_t ChunkCount{0}, ChunkBytes{0};
IterateChunks([&](const IoHash& Hash, BasicFile& Payload) {
+ ++ChunkCount;
+ ChunkBytes += Payload.FileSize();
+
+ IoBuffer Buffer(IoBuffer::BorrowedFile, Payload.Handle(), 0, Payload.FileSize());
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); Compressed)
+ {
+ if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Hash)
+ {
+ // Hash mismatch
+ BadHashes.push_back(Hash);
+ return;
+ }
+ return;
+ }
+#if ZEN_WITH_TESTS
IoHashStream Hasher;
- Payload.StreamFile([&](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
+ Payload.StreamByteRange(0, Payload.FileSize(), [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
IoHash ComputedHash = Hasher.GetHash();
-
- if (ComputedHash != Hash)
+ if (ComputedHash == Hash)
{
- BadHashes.push_back(Hash);
+ return;
}
-
- ++ChunkCount;
- ChunkBytes.fetch_add(Payload.FileSize());
+#endif
+ BadHashes.push_back(Hash);
});
Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
@@ -670,9 +688,12 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
}
}
- Ctx.ReportBadCasChunks(BadHashes);
+ // Let whomever it concerns know about the bad chunks. This could
+ // be used to invalidate higher level data structures more efficiently
+ // than a full validation pass might be able to do
+ Ctx.ReportBadCidChunks(BadHashes);
- ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes));
+ ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
}
void
@@ -680,7 +701,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
{
ZEN_ASSERT(m_IsInitialized);
- ZEN_INFO("collecting garbage from {}", m_Config.RootDirectory);
+ ZEN_INFO("collecting garbage from {}", m_RootDirectory);
std::vector<IoHash> ChunksToDelete;
std::atomic<uint64_t> ChunksToDeleteBytes{0};
@@ -694,7 +715,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
Stopwatch TotalTimer;
const auto _ = MakeGuard([&] {
ZEN_INFO("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}",
- m_Config.RootDirectory,
+ m_RootDirectory,
NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
DeletedCount,
ChunkCount,
@@ -706,7 +727,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
bool KeepThis = false;
CandidateCas.clear();
CandidateCas.push_back(Hash);
- GcCtx.FilterCas(CandidateCas, [&](const IoHash& Hash) {
+ GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) {
ZEN_UNUSED(Hash);
KeepThis = true;
});
@@ -725,12 +746,12 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
if (ChunksToDelete.empty())
{
- ZEN_INFO("gc for '{}' SKIPPED, nothing to delete", m_Config.RootDirectory);
+ ZEN_INFO("gc for '{}' SKIPPED, nothing to delete", m_RootDirectory);
return;
}
ZEN_INFO("deleting file CAS garbage for '{}': {} out of {} chunks ({})",
- m_Config.RootDirectory,
+ m_RootDirectory,
ChunksToDelete.size(),
ChunkCount.load(),
NiceBytes(ChunksToDeleteBytes));
@@ -751,13 +772,13 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
if (Ec)
{
- ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_Config.RootDirectory, Hash, Ec.message());
+ ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_RootDirectory, Hash, Ec.message());
continue;
}
DeletedCount++;
}
- GcCtx.DeletedCas(ChunksToDelete);
+ GcCtx.AddDeletedCids(ChunksToDelete);
}
//////////////////////////////////////////////////////////////////////////
@@ -769,13 +790,10 @@ TEST_CASE("cas.file.move")
// specifying an absolute path here can be helpful when using procmon to dig into things
ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"};
- CasGc Gc;
+ GcManager Gc;
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path() / "cas";
-
- FileCasStrategy FileCas(CasConfig, Gc);
- FileCas.Initialize(/* IsNewStore */ true);
+ FileCasStrategy FileCas(Gc);
+ FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);
{
std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"};
@@ -850,12 +868,9 @@ TEST_CASE("cas.file.gc")
// specifying an absolute path here can be helpful when using procmon to dig into things
ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"};
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path() / "cas";
-
- CasGc Gc;
- FileCasStrategy FileCas(CasConfig, Gc);
- FileCas.Initialize(/* IsNewStore */ true);
+ GcManager Gc;
+ FileCasStrategy FileCas(Gc);
+ FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);
const int kIterationCount = 1000;
std::vector<IoHash> Keys{kIterationCount};
@@ -903,7 +918,7 @@ TEST_CASE("cas.file.gc")
{
if (Key.Hash[0] & 1)
{
- Ctx.ContributeCas(std::vector<IoHash>{Key});
+ Ctx.AddRetainedCids(std::vector<IoHash>{Key});
}
}
diff --git a/zenstore/filecas.h b/zenstore/filecas.h
index ef67ae9eb..f14e5d057 100644
--- a/zenstore/filecas.h
+++ b/zenstore/filecas.h
@@ -8,10 +8,11 @@
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <zencore/thread.h>
-#include <zenstore/cas.h>
#include <zenstore/caslog.h>
#include <zenstore/gc.h>
+#include "cas.h"
+
#include <atomic>
#include <functional>
@@ -28,28 +29,28 @@ class BasicFile;
struct FileCasStrategy final : public GcStorage
{
- FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc);
+ FileCasStrategy(GcManager& Gc);
~FileCasStrategy();
- void Initialize(bool IsNewStore);
+ void Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore);
CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash);
IoBuffer FindChunk(const IoHash& ChunkHash);
bool HaveChunk(const IoHash& ChunkHash);
- void FilterChunks(CasChunkSet& InOutChunks);
+ void FilterChunks(HashKeySet& InOutChunks);
void Flush();
void Scrub(ScrubContext& Ctx);
virtual void CollectGarbage(GcContext& GcCtx) override;
virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)}; }
private:
- const CasStoreConfiguration& m_Config;
- RwLock m_Lock;
- RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines
- spdlog::logger& m_Log;
- spdlog::logger& Log() { return m_Log; }
- std::atomic_uint64_t m_TotalSize{};
- bool m_IsInitialized = false;
+ std::filesystem::path m_RootDirectory;
+ RwLock m_Lock;
+ RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines
+ spdlog::logger& m_Log;
+ spdlog::logger& Log() { return m_Log; }
+ std::atomic_uint64_t m_TotalSize{};
+ bool m_IsInitialized = false;
struct FileCasIndexEntry
{
diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp
index bb03b9751..0902abf4a 100644
--- a/zenstore/gc.cpp
+++ b/zenstore/gc.cpp
@@ -14,9 +14,10 @@
#include <zencore/testing.h>
#include <zencore/testutils.h>
#include <zencore/timer.h>
-#include <zenstore/cas.h>
#include <zenstore/cidstore.h>
+#include "cas.h"
+
#include <fmt/format.h>
#include <filesystem>
@@ -173,9 +174,8 @@ struct GcContext::GcState
using CacheKeyContexts = std::unordered_map<std::string, std::vector<IoHash>>;
CacheKeyContexts m_ExpiredCacheKeys;
- CasChunkSet m_CasChunks;
- CasChunkSet m_DeletedCasChunks;
- CasChunkSet m_CidChunks;
+ HashKeySet m_RetainedCids;
+ HashKeySet m_DeletedCids;
GcClock::TimePoint m_GcTime;
GcClock::Duration m_MaxCacheDuration = std::chrono::hours(24);
bool m_DeletionMode = true;
@@ -194,19 +194,13 @@ GcContext::~GcContext()
}
void
-GcContext::ContributeCids(std::span<const IoHash> Cids)
-{
- m_State->m_CidChunks.AddChunksToSet(Cids);
-}
-
-void
-GcContext::ContributeCas(std::span<const IoHash> Cas)
+GcContext::AddRetainedCids(std::span<const IoHash> Cids)
{
- m_State->m_CasChunks.AddChunksToSet(Cas);
+ m_State->m_RetainedCids.AddHashesToSet(Cids);
}
void
-GcContext::ContributeCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys)
+GcContext::SetExpiredCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys)
{
m_State->m_ExpiredCacheKeys[CacheKeyContext] = std::move(ExpiredKeys);
}
@@ -214,37 +208,31 @@ GcContext::ContributeCacheKeys(const std::string& CacheKeyContext, std::vector<I
void
GcContext::IterateCids(std::function<void(const IoHash&)> Callback)
{
- m_State->m_CidChunks.IterateChunks([&](const IoHash& Hash) { Callback(Hash); });
+ m_State->m_RetainedCids.IterateHashes([&](const IoHash& Hash) { Callback(Hash); });
}
void
GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc)
{
- m_State->m_CidChunks.FilterChunks(Cid, [&](const IoHash& Hash) { KeepFunc(Hash); });
+ m_State->m_RetainedCids.FilterHashes(Cid, [&](const IoHash& Hash) { KeepFunc(Hash); });
}
void
-GcContext::FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc)
+GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&, bool)>&& FilterFunc)
{
- m_State->m_CasChunks.FilterChunks(Cas, [&](const IoHash& Hash) { KeepFunc(Hash); });
+ m_State->m_RetainedCids.FilterHashes(Cid, std::move(FilterFunc));
}
void
-GcContext::FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&, bool)>&& FilterFunc)
+GcContext::AddDeletedCids(std::span<const IoHash> Cas)
{
- m_State->m_CasChunks.FilterChunks(Cas, std::move(FilterFunc));
+ m_State->m_DeletedCids.AddHashesToSet(Cas);
}
-void
-GcContext::DeletedCas(std::span<const IoHash> Cas)
+const HashKeySet&
+GcContext::DeletedCids()
{
- m_State->m_DeletedCasChunks.AddChunksToSet(Cas);
-}
-
-CasChunkSet&
-GcContext::DeletedCas()
-{
- return m_State->m_DeletedCasChunks;
+ return m_State->m_DeletedCids;
}
std::span<const IoHash>
@@ -318,7 +306,7 @@ GcContext::ClaimGCReserve()
//////////////////////////////////////////////////////////////////////////
-GcContributor::GcContributor(CasGc& Gc) : m_Gc(Gc)
+GcContributor::GcContributor(GcManager& Gc) : m_Gc(Gc)
{
m_Gc.AddGcContributor(this);
}
@@ -330,7 +318,7 @@ GcContributor::~GcContributor()
//////////////////////////////////////////////////////////////////////////
-GcStorage::GcStorage(CasGc& Gc) : m_Gc(Gc)
+GcStorage::GcStorage(GcManager& Gc) : m_Gc(Gc)
{
m_Gc.AddGcStorage(this);
}
@@ -342,30 +330,30 @@ GcStorage::~GcStorage()
//////////////////////////////////////////////////////////////////////////
-CasGc::CasGc()
+GcManager::GcManager()
{
}
-CasGc::~CasGc()
+GcManager::~GcManager()
{
}
void
-CasGc::AddGcContributor(GcContributor* Contributor)
+GcManager::AddGcContributor(GcContributor* Contributor)
{
RwLock::ExclusiveLockScope _(m_Lock);
m_GcContribs.push_back(Contributor);
}
void
-CasGc::RemoveGcContributor(GcContributor* Contributor)
+GcManager::RemoveGcContributor(GcContributor* Contributor)
{
RwLock::ExclusiveLockScope _(m_Lock);
std::erase_if(m_GcContribs, [&](GcContributor* $) { return $ == Contributor; });
}
void
-CasGc::AddGcStorage(GcStorage* Storage)
+GcManager::AddGcStorage(GcStorage* Storage)
{
ZEN_ASSERT(Storage != nullptr);
RwLock::ExclusiveLockScope _(m_Lock);
@@ -373,14 +361,14 @@ CasGc::AddGcStorage(GcStorage* Storage)
}
void
-CasGc::RemoveGcStorage(GcStorage* Storage)
+GcManager::RemoveGcStorage(GcStorage* Storage)
{
RwLock::ExclusiveLockScope _(m_Lock);
std::erase_if(m_GcStorage, [&](GcStorage* $) { return $ == Storage; });
}
void
-CasGc::CollectGarbage(GcContext& GcCtx)
+GcManager::CollectGarbage(GcContext& GcCtx)
{
RwLock::SharedLockScope _(m_Lock);
@@ -394,36 +382,6 @@ CasGc::CollectGarbage(GcContext& GcCtx)
}
}
- // Cache records reference CAS chunks with the uncompressed
- // raw hash (Cid). Map the content ID to CAS hash to enable
- // the CAS storage backends to filter valid chunks.
-
- if (CidStore* CidStore = m_CidStore)
- {
- std::vector<IoHash> CasHashes;
- uint64_t UnknownChunks = 0;
-
- GcCtx.IterateCids([&](const IoHash& Cid) {
- IoHash Cas = CidStore->RemapCid(Cid);
-
- if (Cas == IoHash::Zero)
- {
- ++UnknownChunks;
- }
- else
- {
- CasHashes.push_back(Cas);
- }
- });
-
- if (UnknownChunks)
- {
- ZEN_WARN("found {} unknown CIDs", UnknownChunks);
- }
-
- GcCtx.ContributeCas(CasHashes);
- }
-
// Then trim storage
{
@@ -434,61 +392,48 @@ CasGc::CollectGarbage(GcContext& GcCtx)
Storage->CollectGarbage(GcCtx);
}
}
+}
+
+GcStorageSize
+GcManager::TotalStorageSize() const
+{
+ RwLock::SharedLockScope _(m_Lock);
- // Remove Cid to CAS hash mappings. Scrub?
+ GcStorageSize TotalSize;
- if (CidStore* CidStore = m_CidStore)
+ for (GcStorage* Storage : m_GcStorage)
{
- Stopwatch Timer;
- const auto Guard = MakeGuard([&] { ZEN_INFO("clean up deleted content ids in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
- CidStore->RemoveCids(GcCtx.DeletedCas());
+ const auto Size = Storage->StorageSize();
+ TotalSize.DiskSize += Size.DiskSize;
+ TotalSize.MemorySize += Size.MemorySize;
}
-}
-void
-CasGc::SetCidStore(CidStore* Cids)
-{
- m_CidStore = Cids;
+ return TotalSize;
}
+#if ZEN_USE_REF_TRACKING
void
-CasGc::OnNewCidReferences(std::span<IoHash> Hashes)
+GcManager::OnNewCidReferences(std::span<IoHash> Hashes)
{
ZEN_UNUSED(Hashes);
}
void
-CasGc::OnCommittedCidReferences(std::span<IoHash> Hashes)
+GcManager::OnCommittedCidReferences(std::span<IoHash> Hashes)
{
ZEN_UNUSED(Hashes);
}
void
-CasGc::OnDroppedCidReferences(std::span<IoHash> Hashes)
+GcManager::OnDroppedCidReferences(std::span<IoHash> Hashes)
{
ZEN_UNUSED(Hashes);
}
-
-GcStorageSize
-CasGc::TotalStorageSize() const
-{
- RwLock::SharedLockScope _(m_Lock);
-
- GcStorageSize TotalSize;
-
- for (GcStorage* Storage : m_GcStorage)
- {
- const auto Size = Storage->StorageSize();
- TotalSize.DiskSize += Size.DiskSize;
- TotalSize.MemorySize += Size.MemorySize;
- }
-
- return TotalSize;
-}
+#endif
//////////////////////////////////////////////////////////////////////////
-GcScheduler::GcScheduler(CasGc& CasGc) : m_Log(logging::Get("gc")), m_CasGc(CasGc)
+GcScheduler::GcScheduler(GcManager& GcManager) : m_Log(logging::Get("gc")), m_GcManager(GcManager)
{
}
@@ -606,7 +551,7 @@ GcScheduler::SchedulerThread()
{
std::error_code Ec;
DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec);
- GcStorageSize TotalSize = m_CasGc.TotalStorageSize();
+ GcStorageSize TotalSize = m_GcManager.TotalStorageSize();
std::chrono::seconds RemaingTime = std::chrono::duration_cast<std::chrono::seconds>(m_NextGcTime - GcClock::Now());
if (RemaingTime < std::chrono::seconds::zero())
@@ -668,7 +613,7 @@ GcScheduler::SchedulerThread()
Stopwatch Timer;
const auto __ = MakeGuard([&] { ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
- m_CasGc.CollectGarbage(GcCtx);
+ m_GcManager.CollectGarbage(GcCtx);
m_LastGcTime = GcClock::Now();
m_NextGcTime = NextGcTime(m_LastGcTime);
@@ -745,38 +690,37 @@ TEST_CASE("gc.basic")
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration CasConfig;
+ CidStoreConfiguration CasConfig;
CasConfig.RootDirectory = TempDir.Path() / "cas";
- CasGc Gc;
- std::unique_ptr<CasStore> CasStore = CreateCasStore(Gc);
- CidStore CidStore{*CasStore, TempDir.Path() / "cid"};
+ GcManager Gc;
+ CidStore CidStore(Gc);
- CasStore->Initialize(CasConfig);
- Gc.SetCidStore(&CidStore);
+ CidStore.Initialize(CasConfig);
IoBuffer Chunk = CreateChunk(128);
auto CompressedChunk = Compress(Chunk);
const auto InsertResult = CidStore.AddChunk(CompressedChunk);
+ CHECK(InsertResult.New);
GcContext GcCtx;
GcCtx.CollectSmallObjects(true);
- CasStore->Flush();
+ CidStore.Flush();
Gc.CollectGarbage(GcCtx);
- CHECK(!CidStore.ContainsChunk(InsertResult.DecompressedId));
+ CHECK(!CidStore.ContainsChunk(IoHash::FromBLAKE3(CompressedChunk.GetRawHash())));
}
TEST_CASE("gc.full")
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration CasConfig;
+ CidStoreConfiguration CasConfig;
CasConfig.RootDirectory = TempDir.Path() / "cas";
- CasGc Gc;
+ GcManager Gc;
std::unique_ptr<CasStore> CasStore = CreateCasStore(Gc);
CasStore->Initialize(CasConfig);
@@ -813,7 +757,7 @@ TEST_CASE("gc.full")
CasStore->InsertChunk(Chunks[7], ChunkHashes[7]);
CasStore->InsertChunk(Chunks[8], ChunkHashes[8]);
- CasStoreSize InitialSize = CasStore->TotalSize();
+ CidStoreSize InitialSize = CasStore->TotalSize();
// Keep first and last
{
@@ -823,7 +767,7 @@ TEST_CASE("gc.full")
std::vector<IoHash> KeepChunks;
KeepChunks.push_back(ChunkHashes[0]);
KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
CasStore->Flush();
Gc.CollectGarbage(GcCtx);
@@ -856,7 +800,7 @@ TEST_CASE("gc.full")
GcCtx.CollectSmallObjects(true);
std::vector<IoHash> KeepChunks;
KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
CasStore->Flush();
Gc.CollectGarbage(GcCtx);
@@ -890,7 +834,7 @@ TEST_CASE("gc.full")
KeepChunks.push_back(ChunkHashes[1]);
KeepChunks.push_back(ChunkHashes[4]);
KeepChunks.push_back(ChunkHashes[7]);
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
CasStore->Flush();
Gc.CollectGarbage(GcCtx);
@@ -925,7 +869,7 @@ TEST_CASE("gc.full")
KeepChunks.push_back(ChunkHashes[6]);
KeepChunks.push_back(ChunkHashes[7]);
KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
CasStore->Flush();
Gc.CollectGarbage(GcCtx);
diff --git a/zenstore/hashkeyset.cpp b/zenstore/hashkeyset.cpp
new file mode 100644
index 000000000..a5436f5cb
--- /dev/null
+++ b/zenstore/hashkeyset.cpp
@@ -0,0 +1,60 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenstore/hashkeyset.h>
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen {
+
+// Insert a single hash. The backing container is a set, so adding a hash
+// that is already present leaves it unchanged.
+void
+HashKeySet::AddHashToSet(const IoHash& HashToAdd)
+{
+    m_HashSet.emplace(HashToAdd);
+}
+
+// Insert every hash in HashesToAdd; hashes already in the set are ignored.
+void
+HashKeySet::AddHashesToSet(std::span<const IoHash> HashesToAdd)
+{
+    for (const IoHash& Hash : HashesToAdd)
+    {
+        m_HashSet.insert(Hash);
+    }
+}
+
+// Erase every hash for which Predicate returns true. Predicate is invoked
+// exactly once per element currently in the set.
+void
+HashKeySet::RemoveHashesIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate)
+{
+    // Wrap the predicate in a reference-capturing lambda so std::erase_if
+    // does not copy the std::function object.
+    std::erase_if(m_HashSet, [&Predicate](const IoHash& Candidate) { return Predicate(Candidate); });
+}
+
+// Invoke Callback once for each hash currently in the set. The visiting
+// order is whatever order the underlying unordered set iterates in.
+void
+HashKeySet::IterateHashes(std::function<void(const IoHash& Hash)>&& Callback) const
+{
+    for (const IoHash& Entry : m_HashSet)
+    {
+        Callback(Entry);
+    }
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Testing related code follows...
+//
+
+#if ZEN_WITH_TESTS
+
+// Intentionally empty. zenstore_forcelinktests() calls this function;
+// NOTE(review): presumably referencing it keeps this translation unit from
+// being dropped by the linker in test builds - confirm.
+void
+hashkeyset_forcelink()
+{
+}
+
+#endif
+
+} // namespace zen
diff --git a/zenstore/include/zenstore/cas.h b/zenstore/include/zenstore/cas.h
deleted file mode 100644
index 5592fbd0a..000000000
--- a/zenstore/include/zenstore/cas.h
+++ /dev/null
@@ -1,144 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include "zenstore.h"
-
-#include <zencore/blake3.h>
-#include <zencore/iobuffer.h>
-#include <zencore/iohash.h>
-#include <zencore/refcount.h>
-#include <zencore/timer.h>
-
-#include <atomic>
-#include <filesystem>
-#include <functional>
-#include <memory>
-#include <string>
-#include <unordered_set>
-
-namespace zen {
-
-class GcContext;
-class CasGc;
-
-struct CasStoreConfiguration
-{
- // Root directory for CAS store
- std::filesystem::path RootDirectory;
-
- // Threshold below which values are considered 'tiny' and managed using the 'tiny values' strategy
- uint64_t TinyValueThreshold = 1024;
-
- // Threshold above which values are considered 'huge' and managed using the 'huge values' strategy
- uint64_t HugeValueThreshold = 1024 * 1024;
-};
-
-/** Manage a set of IoHash values
- */
-
-class CasChunkSet
-{
-public:
- void AddChunkToSet(const IoHash& HashToAdd);
- void AddChunksToSet(std::span<const IoHash> HashesToAdd);
- void RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate);
- void IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback);
- [[nodiscard]] inline bool ContainsChunk(const IoHash& Hash) const { return m_ChunkSet.find(Hash) != m_ChunkSet.end(); }
- [[nodiscard]] inline bool IsEmpty() const { return m_ChunkSet.empty(); }
- [[nodiscard]] inline size_t GetSize() const { return m_ChunkSet.size(); }
-
- inline void FilterChunks(std::span<const IoHash> Candidates, Invocable<const IoHash&> auto MatchFunc)
- {
- for (const IoHash& Candidate : Candidates)
- {
- if (ContainsChunk(Candidate))
- {
- MatchFunc(Candidate);
- }
- }
- }
-
- inline void FilterChunks(std::span<const IoHash> Candidates, Invocable<const IoHash&, bool> auto MatchFunc)
- {
- for (const IoHash& Candidate : Candidates)
- {
- MatchFunc(Candidate, ContainsChunk(Candidate));
- }
- }
-
-private:
- // Q: should we protect this with a lock, or is that a higher level concern?
- std::unordered_set<IoHash, IoHash::Hasher> m_ChunkSet;
-};
-
-/** Context object for data scrubbing
- *
- * Data scrubbing is when we traverse stored data to validate it and
- * optionally correct/recover
- */
-
-class ScrubContext
-{
-public:
- virtual void ReportBadCasChunks(std::span<IoHash> BadCasChunks);
- inline uint64_t ScrubTimestamp() const { return m_ScrubTime; }
- inline bool RunRecovery() const { return m_Recover; }
- void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes);
-
- inline uint64_t ScrubbedChunks() const { return m_ChunkCount; }
- inline uint64_t ScrubbedBytes() const { return m_ByteCount; }
-
-private:
- uint64_t m_ScrubTime = GetHifreqTimerValue();
- bool m_Recover = true;
- std::atomic<uint64_t> m_ChunkCount{0};
- std::atomic<uint64_t> m_ByteCount{0};
- CasChunkSet m_BadCas;
- CasChunkSet m_BadCid;
-};
-
-struct CasStoreSize
-{
- uint64_t TinySize{};
- uint64_t SmallSize{};
- uint64_t LargeSize{};
- uint64_t TotalSize{};
-};
-
-/** Content Addressable Storage interface
-
- */
-
-class CasStore
-{
-public:
- virtual ~CasStore() = default;
-
- const CasStoreConfiguration& Config() { return m_Config; }
-
- struct InsertResult
- {
- bool New = false;
- };
-
- virtual void Initialize(const CasStoreConfiguration& Config) = 0;
- virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash) = 0;
- virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0;
- virtual bool ContainsChunk(const IoHash& ChunkHash) = 0;
- virtual void FilterChunks(CasChunkSet& InOutChunks) = 0;
- virtual void Flush() = 0;
- virtual void Scrub(ScrubContext& Ctx) = 0;
- virtual void GarbageCollect(GcContext& GcCtx) = 0;
- virtual CasStoreSize TotalSize() const = 0;
-
-protected:
- CasStoreConfiguration m_Config;
- uint64_t m_LastScrubTime = 0;
-};
-
-ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(CasGc& Gc);
-
-void CAS_forcelink();
-
-} // namespace zen
diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h
index 4b93a708f..c56b653fc 100644
--- a/zenstore/include/zenstore/caslog.h
+++ b/zenstore/include/zenstore/caslog.h
@@ -2,8 +2,6 @@
#pragma once
-#include "zenstore.h"
-
#include <zencore/uid.h>
#include <zenstore/basicfile.h>
diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h
index b0252a2a6..21e3c3160 100644
--- a/zenstore/include/zenstore/cidstore.h
+++ b/zenstore/include/zenstore/cidstore.h
@@ -5,7 +5,7 @@
#include "zenstore.h"
#include <zencore/iohash.h>
-#include <zenstore/cas.h>
+#include <zenstore/hashkeyset.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_map.h>
@@ -15,53 +15,68 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
+class GcManager;
class CasStore;
class CompressedBuffer;
class IoBuffer;
+class ScrubContext;
/** Content Store
*
- * Data in the content store is referenced by content identifiers (CIDs), rather than their
- * literal hash. This class maps uncompressed hashes to compressed hashes and may
+ * Data in the content store is referenced by content identifiers (CIDs). It works
+ * with compressed buffers, so the CID is expected to be the RAW hash, and the chunk
+ * is stored directly under that RAW hash (there is no separate CID -> CAS hash mapping).
+ * Addressing chunks by CID rather than by a storage-level hash means this class may
* be used to deal with other kinds of indirections in the future. For example, if we want
* to support chunking then a CID may represent a list of chunks which could be concatenated
* to form the referenced chunk.
*
- * It would likely be possible to implement this mapping in a more efficient way if we
- * integrate it into the CAS store itself, so we can avoid maintaining copies of large
- * hashes in multiple locations. This would also allow us to consolidate commit logs etc
- * which would be more resilient than the current split log scheme
- *
*/
+
+// Size totals as returned by CidStore::TotalSize().
+// NOTE(review): presumably byte counts per storage strategy (tiny/small/large)
+// with TotalSize as the overall figure - confirm against the CAS implementation.
+struct CidStoreSize
+{
+ uint64_t TinySize = 0;
+ uint64_t SmallSize = 0;
+ uint64_t LargeSize = 0;
+ uint64_t TotalSize = 0;
+};
+
+// Settings handed to CidStore::Initialize().
+struct CidStoreConfiguration
+{
+ // Root directory for CAS store
+ std::filesystem::path RootDirectory;
+
+ // Threshold below which values are considered 'tiny' and managed using the 'tiny values' strategy
+ // NOTE(review): units are presumably bytes (default 1024) - confirm
+ uint64_t TinyValueThreshold = 1024;
+
+ // Threshold above which values are considered 'huge' and managed using the 'huge values' strategy
+ // NOTE(review): units are presumably bytes (default 1024 * 1024) - confirm
+ uint64_t HugeValueThreshold = 1024 * 1024;
+};
+
class CidStore
{
public:
- CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir);
+ CidStore(GcManager& Gc);
~CidStore();
struct InsertResult
{
- IoHash DecompressedId;
- IoHash CompressedHash;
- bool New = false;
+ bool New = false;
};
- InsertResult AddChunk(CompressedBuffer& ChunkData);
- void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed);
+ void Initialize(const CidStoreConfiguration& Config);
+ InsertResult AddChunk(const CompressedBuffer& ChunkData);
IoBuffer FindChunkByCid(const IoHash& DecompressedId);
bool ContainsChunk(const IoHash& DecompressedId);
+ void FilterChunks(HashKeySet& InOutChunks);
void Flush();
void Scrub(ScrubContext& Ctx);
- void RemoveCids(CasChunkSet& CasChunks);
- CasStoreSize CasSize() const;
-
- // TODO: add batch filter support
-
- IoHash RemapCid(const IoHash& DecompressedId);
+ CidStoreSize TotalSize() const;
private:
struct Impl;
- std::unique_ptr<Impl> m_Impl;
+ std::unique_ptr<CasStore> m_CasStore;
+ std::unique_ptr<Impl> m_Impl;
};
} // namespace zen
diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h
index 398025181..656e594af 100644
--- a/zenstore/include/zenstore/gc.h
+++ b/zenstore/include/zenstore/gc.h
@@ -22,8 +22,8 @@ class logger;
namespace zen {
-class CasChunkSet;
-class CasGc;
+class HashKeySet;
+class GcManager;
class CidStore;
struct IoHash;
@@ -50,18 +50,16 @@ public:
GcContext(GcClock::TimePoint Time = GcClock::Now());
~GcContext();
- void ContributeCids(std::span<const IoHash> Cid);
- void ContributeCas(std::span<const IoHash> Hash);
- void ContributeCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys);
+ void AddRetainedCids(std::span<const IoHash> Cid);
+ void SetExpiredCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys);
void IterateCids(std::function<void(const IoHash&)> Callback);
void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc);
- void FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc);
- void FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&, bool)>&& FilterFunc);
+ void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&, bool)>&& FilterFunc);
- void DeletedCas(std::span<const IoHash> Cas);
- CasChunkSet& DeletedCas();
+ void AddDeletedCids(std::span<const IoHash> Cas);
+ const HashKeySet& DeletedCids();
std::span<const IoHash> ExpiredCacheKeys(const std::string& CacheKeyContext) const;
@@ -97,13 +95,13 @@ private:
class GcContributor
{
public:
- GcContributor(CasGc& Gc);
+ GcContributor(GcManager& Gc);
~GcContributor();
virtual void GatherReferences(GcContext& GcCtx) = 0;
protected:
- CasGc& m_Gc;
+ GcManager& m_Gc;
};
struct GcStorageSize
@@ -117,23 +115,23 @@ struct GcStorageSize
class GcStorage
{
public:
- GcStorage(CasGc& Gc);
+ GcStorage(GcManager& Gc);
~GcStorage();
virtual void CollectGarbage(GcContext& GcCtx) = 0;
virtual GcStorageSize StorageSize() const = 0;
private:
- CasGc& m_Gc;
+ GcManager& m_Gc;
};
/** GC orchestrator
*/
-class CasGc
+class GcManager
{
public:
- CasGc();
- ~CasGc();
+ GcManager();
+ ~GcManager();
void AddGcContributor(GcContributor* Contributor);
void RemoveGcContributor(GcContributor* Contributor);
@@ -143,12 +141,14 @@ public:
void CollectGarbage(GcContext& GcCtx);
- void SetCidStore(CidStore* Cids);
- void OnNewCidReferences(std::span<IoHash> Hashes);
- void OnCommittedCidReferences(std::span<IoHash> Hashes);
- void OnDroppedCidReferences(std::span<IoHash> Hashes);
GcStorageSize TotalStorageSize() const;
+#if ZEN_USE_REF_TRACKING
+ void OnNewCidReferences(std::span<IoHash> Hashes);
+ void OnCommittedCidReferences(std::span<IoHash> Hashes);
+ void OnDroppedCidReferences(std::span<IoHash> Hashes);
+#endif
+
private:
mutable RwLock m_Lock;
std::vector<GcContributor*> m_GcContribs;
@@ -180,7 +180,7 @@ struct GcSchedulerConfig
class GcScheduler
{
public:
- GcScheduler(CasGc& CasGc);
+ GcScheduler(GcManager& GcManager);
~GcScheduler();
void Initialize(const GcSchedulerConfig& Config);
@@ -201,7 +201,7 @@ private:
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
- CasGc& m_CasGc;
+ GcManager& m_GcManager;
GcSchedulerConfig m_Config;
GcClock::TimePoint m_LastGcTime{};
GcClock::TimePoint m_NextGcTime{};
diff --git a/zenstore/include/zenstore/hashkeyset.h b/zenstore/include/zenstore/hashkeyset.h
new file mode 100644
index 000000000..411a6256e
--- /dev/null
+++ b/zenstore/include/zenstore/hashkeyset.h
@@ -0,0 +1,54 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zenstore.h"
+
+#include <zencore/iohash.h>
+
+#include <functional>
+#include <unordered_set>
+
+namespace zen {
+
+/** Manage a set of IoHash values
+ */
+
+class HashKeySet
+{
+public:
+    // Add one hash, or every hash in a span, to the set
+    void AddHashToSet(const IoHash& HashToAdd);
+    void AddHashesToSet(std::span<const IoHash> HashesToAdd);
+
+    // Remove every hash for which Predicate returns true
+    void RemoveHashesIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate);
+
+    // Invoke Callback once per hash currently in the set
+    void IterateHashes(std::function<void(const IoHash& Hash)>&& Callback) const;
+
+    [[nodiscard]] inline bool   ContainsHash(const IoHash& Hash) const { return m_HashSet.contains(Hash); }
+    [[nodiscard]] inline bool   IsEmpty() const { return m_HashSet.empty(); }
+    [[nodiscard]] inline size_t GetSize() const { return m_HashSet.size(); }
+
+    // Invoke MatchFunc(Candidate) for each candidate that is present in the set;
+    // candidates that are not in the set are skipped
+    inline void FilterHashes(std::span<const IoHash> Candidates, Invocable<const IoHash&> auto MatchFunc) const
+    {
+        for (const IoHash& Candidate : Candidates)
+        {
+            if (!ContainsHash(Candidate))
+            {
+                continue;
+            }
+            MatchFunc(Candidate);
+        }
+    }
+
+    // Invoke MatchFunc(Candidate, IsInSet) for every candidate, where the second
+    // argument tells the callback whether the candidate is present in the set
+    inline void FilterHashes(std::span<const IoHash> Candidates, Invocable<const IoHash&, bool> auto MatchFunc) const
+    {
+        for (const IoHash& Candidate : Candidates)
+        {
+            const bool IsInSet = ContainsHash(Candidate);
+            MatchFunc(Candidate, IsInSet);
+        }
+    }
+
+private:
+    // Q: should we protect this with a lock, or is that a higher level concern?
+    std::unordered_set<IoHash, IoHash::Hasher> m_HashSet;
+};
+
+void hashkeyset_forcelink();
+
+} // namespace zen
diff --git a/zenstore/include/zenstore/scrubcontext.h b/zenstore/include/zenstore/scrubcontext.h
new file mode 100644
index 000000000..bf906492c
--- /dev/null
+++ b/zenstore/include/zenstore/scrubcontext.h
@@ -0,0 +1,40 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/timer.h>
+#include <zenstore/hashkeyset.h>
+
+#include <atomic>
+#include <span>
+
+namespace zen {
+
+/** Context object for data scrubbing
+ *
+ * Data scrubbing is when we traverse stored data to validate it and
+ * optionally correct/recover
+ */
+
+class ScrubContext
+{
+public:
+    // The class has a virtual member, so give it a virtual destructor to keep
+    // destruction through a ScrubContext pointer well-defined
+    virtual ~ScrubContext() = default;
+
+    // Record the hashes of chunks that a scrub pass found to be bad
+    virtual void ReportBadCidChunks(std::span<IoHash> BadCasChunks) { m_BadCid.AddHashesToSet(BadCasChunks); }
+
+    // Timer value captured when this context was constructed
+    inline uint64_t ScrubTimestamp() const { return m_ScrubTime; }
+
+    // Value of the recovery flag (defaults to true)
+    inline bool RunRecovery() const { return m_Recover; }
+
+    // Add to the running totals of scrubbed chunks and bytes
+    void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes)
+    {
+        m_ChunkCount.fetch_add(ChunkCount);
+        m_ByteCount.fetch_add(ChunkBytes);
+    }
+
+    // Running totals accumulated through ReportScrubbed
+    inline uint64_t ScrubbedChunks() const { return m_ChunkCount; }
+    inline uint64_t ScrubbedBytes() const { return m_ByteCount; }
+
+    // Hashes accumulated by ReportBadCidChunks. Returned by const reference so
+    // callers do not pay for a copy of the whole set on every call
+    const HashKeySet& BadCids() const { return m_BadCid; }
+
+private:
+    uint64_t              m_ScrubTime = GetHifreqTimerValue();
+    bool                  m_Recover   = true;
+    std::atomic<uint64_t> m_ChunkCount{0};
+    std::atomic<uint64_t> m_ByteCount{0};
+    HashKeySet            m_BadCid;
+};
+
+} // namespace zen
diff --git a/zenstore/zenstore.cpp b/zenstore/zenstore.cpp
index 5f40b7f60..836cdf691 100644
--- a/zenstore/zenstore.cpp
+++ b/zenstore/zenstore.cpp
@@ -4,8 +4,10 @@
#include <zenstore/basicfile.h>
#include <zenstore/blockstore.h>
-#include <zenstore/cas.h>
#include <zenstore/gc.h>
+#include <zenstore/hashkeyset.h>
+
+#include "cas.h"
#include "compactcas.h"
#include "filecas.h"
@@ -20,6 +22,7 @@ zenstore_forcelinktests()
blockstore_forcelink();
compactcas_forcelink();
gc_forcelink();
+ hashkeyset_forcelink();
}
} // namespace zen