diff options
Diffstat (limited to 'zenstore/compactcas.cpp')
| -rw-r--r-- | zenstore/compactcas.cpp | 395 |
1 files changed, 360 insertions, 35 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 612f87c7c..f3fcbca28 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -4,22 +4,39 @@ #include "CompactCas.h" +#include <zencore/compactbinarybuilder.h> #include <zencore/except.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/memory.h> #include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> #include <zencore/thread.h> #include <zencore/uid.h> +#include <zenstore/gc.h> + #include <filesystem> #include <functional> #include <gsl/gsl-lite.hpp> +#if ZEN_WITH_TESTS +# include <algorithm> +# include <random> +#endif + ////////////////////////////////////////////////////////////////////////// namespace zen { -CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config) : m_Config(Config) +using namespace fmt::literals; + +CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc) +: GcStorage(Gc) +, m_Config(Config) +, m_Log(logging::Get("containercas")) { } @@ -36,35 +53,9 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint6 m_ContainerBaseName = ContainerBaseName; m_PayloadAlignment = Alignment; - std::string BaseName(ContainerBaseName); - std::filesystem::path SobsPath = m_Config.RootDirectory / (BaseName + ".ucas"); - std::filesystem::path SidxPath = m_Config.RootDirectory / (BaseName + ".uidx"); - std::filesystem::path SlogPath = m_Config.RootDirectory / (BaseName + ".ulog"); - - m_SmallObjectFile.Open(SobsPath, IsNewStore); - m_SmallObjectIndex.Open(SidxPath, IsNewStore); - m_CasLog.Open(SlogPath, IsNewStore); - - // TODO: should validate integrity of container files here - - uint64_t MaxFileOffset = 0; - - { - // This is not technically necessary (nobody should be accessing us from - // another thread at this stage) but may help static analysis - - RwLock::ExclusiveLockScope _(m_LocationMapLock); + OpenContainer(IsNewStore); - m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { - m_LocationMap[Record.Key] = Record.Location; - - MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset + Record.Location.Size); - }); - } - - m_CurrentInsertOffset = (MaxFileOffset + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1); - m_CurrentIndexOffset = m_SmallObjectIndex.FileSize(); - m_IsInitialized = true; + m_IsInitialized = true; } CasStore::InsertResult @@ -91,12 +82,13 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const RwLock::ExclusiveLockScope __(m_LocationMapLock); - CasDiskLocation Location{.Offset = InsertOffset, .Size = /* TODO FIX */ uint32_t(ChunkSize)}; + const CasDiskLocation Location{InsertOffset, ChunkSize}; m_LocationMap[ChunkHash] = Location; CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location}; + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize)); m_CasLog.Append(IndexEntry); return CasStore::InsertResult{.New = true}; @@ -116,7 +108,8 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash) if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) { const CasDiskLocation& Location = KeyIt->second; - return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.Offset, Location.Size); + + return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.GetOffset(), Location.GetSize()); } // Not found @@ -187,11 +180,11 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) for (auto& Entry : m_LocationMap) { - const uint64_t EntryOffset = Entry.second.Offset; + const uint64_t EntryOffset = Entry.second.GetOffset(); if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) { - const uint64_t EntryEnd = EntryOffset + Entry.second.Size; + const uint64_t EntryEnd = EntryOffset + Entry.second.GetSize(); if (EntryEnd >= WindowEnd) { @@ -201,7 +194,8 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) } const IoHash ComputedHash = - IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart, Entry.second.Size); + IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.GetOffset() - WindowStart, + Entry.second.GetSize()); if (Entry.first != ComputedHash) { @@ -222,7 +216,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) for (const CasDiskIndexEntry& Entry : BigChunks) { IoHashStream Hasher; - m_SmallObjectFile.StreamByteRange(Entry.Location.Offset, Entry.Location.Size, [&](const void* Data, uint64_t Size) { + m_SmallObjectFile.StreamByteRange(Entry.Location.GetOffset(), Entry.Location.GetSize(), [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); IoHash ComputedHash = Hasher.GetHash(); @@ -247,6 +241,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) for (const CasDiskIndexEntry& Entry : BadChunks) { BadChunkHashes.push_back(Entry.Key); + m_CasLog.Append({.Key = Entry.Key, .Location = Entry.Location, .Flags = CasDiskIndexEntry::kTombstone}); m_LocationMap.erase(Entry.Key); } @@ -258,6 +253,156 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) } void +CasContainerStrategy::CollectGarbage(GcContext& GcCtx) +{ + namespace fs = std::filesystem; + + // A naive garbage collection implementation that just copies evicted chunks + // into a new container file. We probably need to partition the container file + // into several parts to prevent needing to keep the entire container file during GC. + + ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName); + + RwLock::ExclusiveLockScope _(m_LocationMapLock); + + Flush(); + + std::vector<IoHash> Candidates; + std::vector<IoHash> Keep; + const uint64_t ChunkCount = m_LocationMap.size(); + uint64_t TotalSize{}; + + Candidates.reserve(m_LocationMap.size()); + + for (auto& Entry : m_LocationMap) + { + Candidates.push_back(Entry.first); + TotalSize += Entry.second.GetSize(); + } + + Keep.reserve(Candidates.size()); + GcCtx.FilterCas(Candidates, [&](const IoHash& Hash) { Keep.push_back(Hash); }); + + if (m_LocationMap.empty() || Keep.size() == m_LocationMap.size()) + { + ZEN_INFO("garbage collect DONE, scanned #{} {} chunks from '{}', nothing to delete", + ChunkCount, + NiceBytes(TotalSize), + m_Config.RootDirectory / m_ContainerBaseName); + return; + } + + const uint64_t NewChunkCount = Keep.size(); + uint64_t NewTotalSize = 0; + + for (const IoHash& Key : Keep) + { + const CasDiskLocation& Loc = m_LocationMap[Key]; + NewTotalSize += Loc.GetSize(); + } + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); + if (Error) + { + ZEN_ERROR("get disk space FAILED, reason '{}'", Error.message()); + return; + } + + if (Space.Free < NewTotalSize + (64 << 20)) + { + ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}", + m_Config.RootDirectory / m_ContainerBaseName, + NiceBytes(NewTotalSize), + NiceBytes(Space.Free)); + return; + } + + const bool CollectSmallObjects = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + + if (!CollectSmallObjects) + { + ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", + m_Config.RootDirectory / m_ContainerBaseName, + ChunkCount - NewChunkCount, + NiceBytes(TotalSize - NewTotalSize), + ChunkCount, + NiceBytes(TotalSize)); + return; + } + + fs::path TmpSobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.ucas"); + fs::path TmpSlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.ulog"); + + { + ZEN_DEBUG("creating temporary container cas '{}'...", TmpSobsPath); + + TCasLogFile<CasDiskIndexEntry> TmpLog; + BasicFile TmpObjectFile; + bool IsNew = true; + + TmpLog.Open(TmpSlogPath, IsNew); + TmpObjectFile.Open(TmpSobsPath, IsNew); + + std::vector<uint8_t> Chunk; + uint64_t NextInsertOffset{}; + + for (const IoHash& Key : Keep) + { + const auto Entry = m_LocationMap.find(Key); + const auto& Loc = Entry->second; + + Chunk.resize(Loc.GetSize()); + m_SmallObjectFile.Read(Chunk.data(), Chunk.size(), Loc.GetOffset()); + + const uint64_t InsertOffset = NextInsertOffset; + TmpObjectFile.Write(Chunk.data(), Chunk.size(), InsertOffset); + TmpLog.Append({.Key = Key, .Location = {InsertOffset, Chunk.size()}}); + + NextInsertOffset = (NextInsertOffset + Chunk.size() + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1); + } + } + + try + { + CloseContainer(); + + fs::path SobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas"); + fs::path SidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".uidx"); + fs::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog"); + + fs::remove(SobsPath); + fs::remove(SidxPath); + fs::remove(SlogPath); + + fs::rename(TmpSobsPath, SobsPath); + fs::rename(TmpSlogPath, SlogPath); + + { + // Create a new empty index file + BasicFile SidxFile; + SidxFile.Open(SidxPath, true); + } + + OpenContainer(false /* IsNewStore */); + + ZEN_INFO("garbage collect from '{}' DONE, collected #{} {} chunks of total #{} {}", + m_Config.RootDirectory / m_ContainerBaseName, + ChunkCount - NewChunkCount, + NiceBytes(TotalSize - NewTotalSize), + ChunkCount, + NiceBytes(TotalSize)); + } + catch (std::exception& Err) + { + ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what()); + + // Something went wrong, try create a new container + OpenContainer(true /* IsNewStore */); + } +} + +void CasContainerStrategy::MakeSnapshot() { RwLock::SharedLockScope _(m_LocationMapLock); @@ -275,4 +420,184 @@ CasContainerStrategy::MakeSnapshot() m_SmallObjectIndex.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), 0); } +void +CasContainerStrategy::OpenContainer(bool IsNewStore) +{ + std::filesystem::path SobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas"); + std::filesystem::path SidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".uidx"); + std::filesystem::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog"); + + m_SmallObjectFile.Open(SobsPath, IsNewStore); + m_SmallObjectIndex.Open(SidxPath, IsNewStore); + m_CasLog.Open(SlogPath, IsNewStore); + + // TODO: should validate integrity of container files here + + m_CurrentInsertOffset = 0; + m_CurrentIndexOffset = 0; + m_TotalSize = 0; + + m_LocationMap.clear(); + + uint64_t MaxFileOffset = 0; + + m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { + if (Record.Flags & CasDiskIndexEntry::kTombstone) + { + m_TotalSize.fetch_sub(Record.Location.GetSize()); + } + else + { + m_TotalSize.fetch_add(Record.Location.GetSize()); + m_LocationMap[Record.Key] = Record.Location; + MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.GetOffset() + Record.Location.GetSize()); + } + }); + + m_CurrentInsertOffset = (MaxFileOffset + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1); + m_CurrentIndexOffset = m_SmallObjectIndex.FileSize(); +} + +void +CasContainerStrategy::CloseContainer() +{ + m_SmallObjectFile.Close(); + m_SmallObjectIndex.Close(); + m_CasLog.Close(); +} + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +TEST_CASE("cas.compact.gc") +{ + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + + CreateDirectories(CasConfig.RootDirectory); + + const int kIterationCount = 1000; + + std::vector<IoHash> Keys(kIterationCount); + + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 16, true); + + for (int i = 0; i < kIterationCount; ++i) + { + CbObjectWriter Cbo; + Cbo << "id" << i; + CbObject Obj = Cbo.Save(); + + IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); + const IoHash Hash = HashBuffer(ObjBuffer); + + Cas.InsertChunk(ObjBuffer, Hash); + + Keys[i] = Hash; + } + + for (int i = 0; i < kIterationCount; ++i) + { + IoBuffer Chunk = Cas.FindChunk(Keys[i]); + + CHECK(!!Chunk); + + CbObject Value = LoadCompactBinaryObject(Chunk); + + CHECK_EQ(Value["id"].AsInt32(), i); + } + } + + // Validate that we can still read the inserted data after closing + // the original cas store + + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 16, false); + + for (int i = 0; i < kIterationCount; ++i) + { + IoBuffer Chunk = Cas.FindChunk(Keys[i]); + + CHECK(!!Chunk); + + CbObject Value = LoadCompactBinaryObject(Chunk); + + CHECK_EQ(Value["id"].AsInt32(), i); + } + + GcContext Ctx; + Cas.CollectGarbage(Ctx); + } +} + +TEST_CASE("cas.compact.totalsize") +{ + std::random_device rd; + std::mt19937 g(rd()); + + const auto CreateChunk = [&](uint64_t Size) -> IoBuffer { + const size_t Count = static_cast<size_t>(Size / sizeof(uint32_t)); + std::vector<uint32_t> Values; + Values.resize(Count); + for (size_t Idx = 0; Idx < Count; ++Idx) + { + Values[Idx] = static_cast<uint32_t>(Idx); + } + std::shuffle(Values.begin(), Values.end(), g); + + return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size() * sizeof(uint32_t)); + }; + + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + + CreateDirectories(CasConfig.RootDirectory); + + const uint64_t kChunkSize = 1024; + const int32_t kChunkCount = 16; + + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 16, true); + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + IoBuffer Chunk = CreateChunk(kChunkSize); + const IoHash Hash = HashBuffer(Chunk); + auto InsertResult = Cas.InsertChunk(Chunk, Hash); + ZEN_ASSERT(InsertResult.New); + } + + const uint64_t TotalSize = Cas.TotalSize(); + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + } + + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 16, false); + + const uint64_t TotalSize = Cas.TotalSize(); + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + } +} + +#endif + +void +compactcas_forcelink() +{ +} + } // namespace zen |