aboutsummaryrefslogtreecommitdiff
path: root/zenstore/compactcas.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenstore/compactcas.cpp')
-rw-r--r--zenstore/compactcas.cpp395
1 files changed, 360 insertions, 35 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 612f87c7c..f3fcbca28 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -4,22 +4,39 @@
#include "CompactCas.h"
+#include <zencore/compactbinarybuilder.h>
#include <zencore/except.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/memory.h>
#include <zencore/string.h>
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
#include <zencore/thread.h>
#include <zencore/uid.h>
+#include <zenstore/gc.h>
+
#include <filesystem>
#include <functional>
#include <gsl/gsl-lite.hpp>
+#if ZEN_WITH_TESTS
+# include <algorithm>
+# include <random>
+#endif
+
//////////////////////////////////////////////////////////////////////////
namespace zen {
-CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config) : m_Config(Config)
+using namespace fmt::literals;
+
+CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc)
+: GcStorage(Gc)
+, m_Config(Config)
+, m_Log(logging::Get("containercas"))
{
}
@@ -36,35 +53,9 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint6
m_ContainerBaseName = ContainerBaseName;
m_PayloadAlignment = Alignment;
- std::string BaseName(ContainerBaseName);
- std::filesystem::path SobsPath = m_Config.RootDirectory / (BaseName + ".ucas");
- std::filesystem::path SidxPath = m_Config.RootDirectory / (BaseName + ".uidx");
- std::filesystem::path SlogPath = m_Config.RootDirectory / (BaseName + ".ulog");
-
- m_SmallObjectFile.Open(SobsPath, IsNewStore);
- m_SmallObjectIndex.Open(SidxPath, IsNewStore);
- m_CasLog.Open(SlogPath, IsNewStore);
-
- // TODO: should validate integrity of container files here
-
- uint64_t MaxFileOffset = 0;
-
- {
- // This is not technically necessary (nobody should be accessing us from
- // another thread at this stage) but may help static analysis
-
- RwLock::ExclusiveLockScope _(m_LocationMapLock);
+ OpenContainer(IsNewStore);
- m_CasLog.Replay([&](const CasDiskIndexEntry& Record) {
- m_LocationMap[Record.Key] = Record.Location;
-
- MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset + Record.Location.Size);
- });
- }
-
- m_CurrentInsertOffset = (MaxFileOffset + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1);
- m_CurrentIndexOffset = m_SmallObjectIndex.FileSize();
- m_IsInitialized = true;
+ m_IsInitialized = true;
}
CasStore::InsertResult
@@ -91,12 +82,13 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
RwLock::ExclusiveLockScope __(m_LocationMapLock);
- CasDiskLocation Location{.Offset = InsertOffset, .Size = /* TODO FIX */ uint32_t(ChunkSize)};
+ const CasDiskLocation Location{InsertOffset, ChunkSize};
m_LocationMap[ChunkHash] = Location;
CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location};
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize));
m_CasLog.Append(IndexEntry);
return CasStore::InsertResult{.New = true};
@@ -116,7 +108,8 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
{
const CasDiskLocation& Location = KeyIt->second;
- return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.Offset, Location.Size);
+
+ return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.GetOffset(), Location.GetSize());
}
// Not found
@@ -187,11 +180,11 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
for (auto& Entry : m_LocationMap)
{
- const uint64_t EntryOffset = Entry.second.Offset;
+ const uint64_t EntryOffset = Entry.second.GetOffset();
if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd))
{
- const uint64_t EntryEnd = EntryOffset + Entry.second.Size;
+ const uint64_t EntryEnd = EntryOffset + Entry.second.GetSize();
if (EntryEnd >= WindowEnd)
{
@@ -201,7 +194,8 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
}
const IoHash ComputedHash =
- IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart, Entry.second.Size);
+ IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.GetOffset() - WindowStart,
+ Entry.second.GetSize());
if (Entry.first != ComputedHash)
{
@@ -222,7 +216,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
for (const CasDiskIndexEntry& Entry : BigChunks)
{
IoHashStream Hasher;
- m_SmallObjectFile.StreamByteRange(Entry.Location.Offset, Entry.Location.Size, [&](const void* Data, uint64_t Size) {
+ m_SmallObjectFile.StreamByteRange(Entry.Location.GetOffset(), Entry.Location.GetSize(), [&](const void* Data, uint64_t Size) {
Hasher.Append(Data, Size);
});
IoHash ComputedHash = Hasher.GetHash();
@@ -247,6 +241,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
for (const CasDiskIndexEntry& Entry : BadChunks)
{
BadChunkHashes.push_back(Entry.Key);
+ m_CasLog.Append({.Key = Entry.Key, .Location = Entry.Location, .Flags = CasDiskIndexEntry::kTombstone});
m_LocationMap.erase(Entry.Key);
}
@@ -258,6 +253,156 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
}
void
+CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
+{
+ namespace fs = std::filesystem;
+
+ // A naive garbage collection implementation that just copies evicted chunks
+ // into a new container file. We probably need to partition the container file
+ // into several parts to prevent needing to keep the entire container file during GC.
+
+ ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName);
+
+ RwLock::ExclusiveLockScope _(m_LocationMapLock);
+
+ Flush();
+
+ std::vector<IoHash> Candidates;
+ std::vector<IoHash> Keep;
+ const uint64_t ChunkCount = m_LocationMap.size();
+ uint64_t TotalSize{};
+
+ Candidates.reserve(m_LocationMap.size());
+
+ for (auto& Entry : m_LocationMap)
+ {
+ Candidates.push_back(Entry.first);
+ TotalSize += Entry.second.GetSize();
+ }
+
+ Keep.reserve(Candidates.size());
+ GcCtx.FilterCas(Candidates, [&](const IoHash& Hash) { Keep.push_back(Hash); });
+
+ if (m_LocationMap.empty() || Keep.size() == m_LocationMap.size())
+ {
+ ZEN_INFO("garbage collect DONE, scanned #{} {} chunks from '{}', nothing to delete",
+ ChunkCount,
+ NiceBytes(TotalSize),
+ m_Config.RootDirectory / m_ContainerBaseName);
+ return;
+ }
+
+ const uint64_t NewChunkCount = Keep.size();
+ uint64_t NewTotalSize = 0;
+
+ for (const IoHash& Key : Keep)
+ {
+ const CasDiskLocation& Loc = m_LocationMap[Key];
+ NewTotalSize += Loc.GetSize();
+ }
+
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error);
+ if (Error)
+ {
+ ZEN_ERROR("get disk space FAILED, reason '{}'", Error.message());
+ return;
+ }
+
+ if (Space.Free < NewTotalSize + (64 << 20))
+ {
+ ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ NiceBytes(NewTotalSize),
+ NiceBytes(Space.Free));
+ return;
+ }
+
+ const bool CollectSmallObjects = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
+
+ if (!CollectSmallObjects)
+ {
+ ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ ChunkCount - NewChunkCount,
+ NiceBytes(TotalSize - NewTotalSize),
+ ChunkCount,
+ NiceBytes(TotalSize));
+ return;
+ }
+
+ fs::path TmpSobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.ucas");
+ fs::path TmpSlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.ulog");
+
+ {
+ ZEN_DEBUG("creating temporary container cas '{}'...", TmpSobsPath);
+
+ TCasLogFile<CasDiskIndexEntry> TmpLog;
+ BasicFile TmpObjectFile;
+ bool IsNew = true;
+
+ TmpLog.Open(TmpSlogPath, IsNew);
+ TmpObjectFile.Open(TmpSobsPath, IsNew);
+
+ std::vector<uint8_t> Chunk;
+ uint64_t NextInsertOffset{};
+
+ for (const IoHash& Key : Keep)
+ {
+ const auto Entry = m_LocationMap.find(Key);
+ const auto& Loc = Entry->second;
+
+ Chunk.resize(Loc.GetSize());
+ m_SmallObjectFile.Read(Chunk.data(), Chunk.size(), Loc.GetOffset());
+
+ const uint64_t InsertOffset = NextInsertOffset;
+ TmpObjectFile.Write(Chunk.data(), Chunk.size(), InsertOffset);
+ TmpLog.Append({.Key = Key, .Location = {InsertOffset, Chunk.size()}});
+
+ NextInsertOffset = (NextInsertOffset + Chunk.size() + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1);
+ }
+ }
+
+ try
+ {
+ CloseContainer();
+
+ fs::path SobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas");
+ fs::path SidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".uidx");
+ fs::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog");
+
+ fs::remove(SobsPath);
+ fs::remove(SidxPath);
+ fs::remove(SlogPath);
+
+ fs::rename(TmpSobsPath, SobsPath);
+ fs::rename(TmpSlogPath, SlogPath);
+
+ {
+ // Create a new empty index file
+ BasicFile SidxFile;
+ SidxFile.Open(SidxPath, true);
+ }
+
+ OpenContainer(false /* IsNewStore */);
+
+ ZEN_INFO("garbage collect from '{}' DONE, collected #{} {} chunks of total #{} {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ ChunkCount - NewChunkCount,
+ NiceBytes(TotalSize - NewTotalSize),
+ ChunkCount,
+ NiceBytes(TotalSize));
+ }
+ catch (std::exception& Err)
+ {
+ ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what());
+
+ // Something went wrong, try create a new container
+ OpenContainer(true /* IsNewStore */);
+ }
+}
+
+void
CasContainerStrategy::MakeSnapshot()
{
RwLock::SharedLockScope _(m_LocationMapLock);
@@ -275,4 +420,184 @@ CasContainerStrategy::MakeSnapshot()
m_SmallObjectIndex.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), 0);
}
+void
+CasContainerStrategy::OpenContainer(bool IsNewStore)
+{
+ std::filesystem::path SobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas");
+ std::filesystem::path SidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".uidx");
+ std::filesystem::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog");
+
+ m_SmallObjectFile.Open(SobsPath, IsNewStore);
+ m_SmallObjectIndex.Open(SidxPath, IsNewStore);
+ m_CasLog.Open(SlogPath, IsNewStore);
+
+ // TODO: should validate integrity of container files here
+
+ m_CurrentInsertOffset = 0;
+ m_CurrentIndexOffset = 0;
+ m_TotalSize = 0;
+
+ m_LocationMap.clear();
+
+ uint64_t MaxFileOffset = 0;
+
+ m_CasLog.Replay([&](const CasDiskIndexEntry& Record) {
+ if (Record.Flags & CasDiskIndexEntry::kTombstone)
+ {
+ m_TotalSize.fetch_sub(Record.Location.GetSize());
+ }
+ else
+ {
+ m_TotalSize.fetch_add(Record.Location.GetSize());
+ m_LocationMap[Record.Key] = Record.Location;
+ MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.GetOffset() + Record.Location.GetSize());
+ }
+ });
+
+ m_CurrentInsertOffset = (MaxFileOffset + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1);
+ m_CurrentIndexOffset = m_SmallObjectIndex.FileSize();
+}
+
+void
+CasContainerStrategy::CloseContainer()
+{
+ m_SmallObjectFile.Close();
+ m_SmallObjectIndex.Close();
+ m_CasLog.Close();
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+TEST_CASE("cas.compact.gc")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+
+ CreateDirectories(CasConfig.RootDirectory);
+
+ const int kIterationCount = 1000;
+
+ std::vector<IoHash> Keys(kIterationCount);
+
+ {
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 16, true);
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ CbObjectWriter Cbo;
+ Cbo << "id" << i;
+ CbObject Obj = Cbo.Save();
+
+ IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer();
+ const IoHash Hash = HashBuffer(ObjBuffer);
+
+ Cas.InsertChunk(ObjBuffer, Hash);
+
+ Keys[i] = Hash;
+ }
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ IoBuffer Chunk = Cas.FindChunk(Keys[i]);
+
+ CHECK(!!Chunk);
+
+ CbObject Value = LoadCompactBinaryObject(Chunk);
+
+ CHECK_EQ(Value["id"].AsInt32(), i);
+ }
+ }
+
+ // Validate that we can still read the inserted data after closing
+ // the original cas store
+
+ {
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 16, false);
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ IoBuffer Chunk = Cas.FindChunk(Keys[i]);
+
+ CHECK(!!Chunk);
+
+ CbObject Value = LoadCompactBinaryObject(Chunk);
+
+ CHECK_EQ(Value["id"].AsInt32(), i);
+ }
+
+ GcContext Ctx;
+ Cas.CollectGarbage(Ctx);
+ }
+}
+
+TEST_CASE("cas.compact.totalsize")
+{
+ std::random_device rd;
+ std::mt19937 g(rd());
+
+ const auto CreateChunk = [&](uint64_t Size) -> IoBuffer {
+ const size_t Count = static_cast<size_t>(Size / sizeof(uint32_t));
+ std::vector<uint32_t> Values;
+ Values.resize(Count);
+ for (size_t Idx = 0; Idx < Count; ++Idx)
+ {
+ Values[Idx] = static_cast<uint32_t>(Idx);
+ }
+ std::shuffle(Values.begin(), Values.end(), g);
+
+ return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size() * sizeof(uint32_t));
+ };
+
+ ScopedTemporaryDirectory TempDir;
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+
+ CreateDirectories(CasConfig.RootDirectory);
+
+ const uint64_t kChunkSize = 1024;
+ const int32_t kChunkCount = 16;
+
+ {
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 16, true);
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ IoBuffer Chunk = CreateChunk(kChunkSize);
+ const IoHash Hash = HashBuffer(Chunk);
+ auto InsertResult = Cas.InsertChunk(Chunk, Hash);
+ ZEN_ASSERT(InsertResult.New);
+ }
+
+ const uint64_t TotalSize = Cas.TotalSize();
+ CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
+ }
+
+ {
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 16, false);
+
+ const uint64_t TotalSize = Cas.TotalSize();
+ CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
+ }
+}
+
+#endif
+
+void
+compactcas_forcelink()
+{
+}
+
} // namespace zen