diff options
| author | Per Larsson <[email protected]> | 2021-12-01 16:17:30 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-12-01 16:17:30 +0100 |
| commit | 9045ebeb0f1bf4290013749482a8ee8f9c007088 (patch) | |
| tree | b9f7345ea400e8e8c3d1d493dd23299442593cb6 /zenstore/compactcas.cpp | |
| parent | Added CacheStore and CAS store sizes to status endpoint. (diff) | |
| download | zen-9045ebeb0f1bf4290013749482a8ee8f9c007088.tar.xz zen-9045ebeb0f1bf4290013749482a8ee8f9c007088.zip | |
Added naive container CAS GC support.
Diffstat (limited to 'zenstore/compactcas.cpp')
| -rw-r--r-- | zenstore/compactcas.cpp | 228 |
1 files changed, 187 insertions, 41 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 584db496b..fd4b9441e 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -7,6 +7,7 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/except.h> #include <zencore/filesystem.h> +#include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/memory.h> #include <zencore/string.h> @@ -30,7 +31,12 @@ namespace zen { -CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config) : m_Config(Config) +using namespace fmt::literals; + +CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc) /* forwards Gc to the GcStorage base, stores Config in m_Config and fetches the logger named "containercas" */ +: GcStorage(Gc) +, m_Config(Config) +, m_Log(logging::Get("containercas")) { } @@ -47,42 +53,9 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint6 m_ContainerBaseName = ContainerBaseName; m_PayloadAlignment = Alignment; - std::string BaseName(ContainerBaseName); - std::filesystem::path SobsPath = m_Config.RootDirectory / (BaseName + ".ucas"); - std::filesystem::path SidxPath = m_Config.RootDirectory / (BaseName + ".uidx"); - std::filesystem::path SlogPath = m_Config.RootDirectory / (BaseName + ".ulog"); - - m_SmallObjectFile.Open(SobsPath, IsNewStore); - m_SmallObjectIndex.Open(SidxPath, IsNewStore); - m_CasLog.Open(SlogPath, IsNewStore); - - // TODO: should validate integrity of container files here + OpenContainer(IsNewStore); /* replaces the inline open-and-replay code removed in this hunk; note the removed code took m_LocationMapLock around the replay */ - uint64_t MaxFileOffset = 0; - - { - // This is not technically necessary (nobody should be accessing us from - // another thread at this stage) but may help static analysis - - RwLock::ExclusiveLockScope _(m_LocationMapLock); - - m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { - if (Record.Flags & CasDiskIndexEntry::kTombstone) - { - m_TotalSize.fetch_sub(Record.Location.GetSize()); - } - else - { - m_TotalSize.fetch_add(Record.Location.GetSize()); - m_LocationMap[Record.Key] = Record.Location; - MaxFileOffset = std::max<uint64_t>(MaxFileOffset, 
Record.Location.GetOffset() + Record.Location.GetSize());
-			}
-		});
-	}
-
-	m_CurrentInsertOffset = (MaxFileOffset + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1);
-	m_CurrentIndexOffset = m_SmallObjectIndex.FileSize();
-	m_IsInitialized = true;
+	m_IsInitialized = true;
 }
 
 CasStore::InsertResult
@@ -282,7 +255,130 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
 void
 CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
 {
-	ZEN_UNUSED(GcCtx);
+	namespace fs = std::filesystem;
+	// Compacts the container: chunks that GcCtx.FilterCas() hands back are copied to a new store, the rest are dropped.
+	ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName);
+	// The location map stays exclusively locked until this function returns.
+	RwLock::ExclusiveLockScope _(m_LocationMapLock);
+
+	Flush();
+
+	std::vector<IoHash> Candidates;
+	std::vector<IoHash> Keep;
+	const uint64_t ChunkCount = m_LocationMap.size();
+	uint64_t TotalSize{};
+
+	Candidates.reserve(m_LocationMap.size());
+
+	for (auto& Entry : m_LocationMap)
+	{
+		Candidates.push_back(Entry.first);
+		TotalSize += Entry.second.GetSize();
+	}
+	// Every hash passed to the FilterCas callback is collected in Keep and survives the rewrite below.
+	Keep.reserve(Candidates.size());
+	GcCtx.FilterCas(Candidates, [&](const IoHash& Hash) { Keep.push_back(Hash); });
+
+	if (m_LocationMap.empty() || Keep.size() == m_LocationMap.size())
+	{
+		ZEN_INFO("garbage collect DONE, scanned #{} {} chunks from '{}', nothing to delete",
+				 ChunkCount,
+				 NiceBytes(TotalSize),
+				 m_Config.RootDirectory / m_ContainerBaseName);
+		return;
+	}
+
+	const uint64_t NewChunkCount = Keep.size();
+	uint64_t NewTotalSize = 0;
+
+	for (const IoHash& Key : Keep)
+	{
+		const CasDiskLocation& Loc = m_LocationMap[Key];
+		NewTotalSize += Loc.GetSize();
+	}
+
+	const bool GcEnabled = GcCtx.IsDeletionMode() && GcCtx.IsContainerGcEnabled();
+	// Report-only unless deletion mode AND container GC are both enabled; the files must not be touched otherwise.
+	if (!GcEnabled)
+	{
+		ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}",
+				 m_Config.RootDirectory / m_ContainerBaseName,
+				 ChunkCount - NewChunkCount,
+				 NiceBytes(TotalSize - NewTotalSize),
+				 ChunkCount,
+				 NiceBytes(TotalSize));
+		return;
+	}
+
+	fs::path TmpSobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.ucas");
+	fs::path TmpSlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.ulog");
+	// Copy every retained chunk into a temporary container, packed at m_PayloadAlignment boundaries.
+	{
+		ZEN_DEBUG("creating temporary container cas '{}'...", TmpSobsPath);
+
+		TCasLogFile<CasDiskIndexEntry> TmpLog;
+		BasicFile TmpObjectFile;
+		bool IsNew = true;
+
+		TmpLog.Open(TmpSlogPath, IsNew);
+		TmpObjectFile.Open(TmpSobsPath, IsNew);
+
+		std::vector<uint8_t> Chunk;
+		uint64_t NextInsertOffset{};
+
+		for (const IoHash& Key : Keep)
+		{
+			const auto Entry = m_LocationMap.find(Key);
+			const auto& Loc = Entry->second;
+
+			Chunk.resize(Loc.GetSize());
+			m_SmallObjectFile.Read(Chunk.data(), Chunk.size(), Loc.GetOffset());
+
+			const uint64_t InsertOffset = NextInsertOffset;
+			TmpObjectFile.Write(Chunk.data(), Chunk.size(), InsertOffset);
+			TmpLog.Append({.Key = Key, .Location = {InsertOffset, Chunk.size()}});
+
+			NextInsertOffset = (NextInsertOffset + Chunk.size() + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1);
+		}
+	}
+	// Swap the temporary files in place of the live container and reopen it (the log replay rebuilds the map).
+	try
+	{
+		CloseContainer();
+
+		fs::path SobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas");
+		fs::path SidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".uidx");
+		fs::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog");
+
+		fs::remove(SobsPath);
+		fs::remove(SidxPath);
+		fs::remove(SlogPath);
+
+		fs::rename(TmpSobsPath, SobsPath);
+		fs::rename(TmpSlogPath, SlogPath);
+
+		{
+			// Create a new empty index file
+			BasicFile SidxFile;
+			SidxFile.Open(SidxPath, true);
+		}
+
+		OpenContainer(false /* IsNewStore */);
+
+		ZEN_INFO("garbage collect from '{}' DONE, collected #{} {} chunks of total #{} {}",
+				 m_Config.RootDirectory / m_ContainerBaseName,
+				 ChunkCount - NewChunkCount,
+				 NiceBytes(TotalSize - NewTotalSize),
+				 ChunkCount,
+				 NiceBytes(TotalSize));
+	}
+	catch (const std::exception& Err)
+	{
+		ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what());
+
+		// Something went wrong, try create a new container
+		OpenContainer(true /* IsNewStore */);
+	}
 }
 
 void
@@ -303,6 +399,52 @@ 
CasContainerStrategy::MakeSnapshot() m_SmallObjectIndex.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), 0); } +void +CasContainerStrategy::OpenContainer(bool IsNewStore) /* Opens the .ucas/.uidx/.ulog files for m_ContainerBaseName, resets the insert/index offsets, total size and location map, then rebuilds them by replaying the CAS log. Takes no lock itself. */ +{ + std::filesystem::path SobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas"); + std::filesystem::path SidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".uidx"); + std::filesystem::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog"); + + m_SmallObjectFile.Open(SobsPath, IsNewStore); + m_SmallObjectIndex.Open(SidxPath, IsNewStore); + m_CasLog.Open(SlogPath, IsNewStore); + + // TODO: should validate integrity of container files here + + m_CurrentInsertOffset = 0; + m_CurrentIndexOffset = 0; + m_TotalSize = 0; + + m_LocationMap.clear(); + + uint64_t MaxFileOffset = 0; + + m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { + if (Record.Flags & CasDiskIndexEntry::kTombstone) /* tombstone records only reduce the running total; the location map is not modified for them */ + { + m_TotalSize.fetch_sub(Record.Location.GetSize()); + } + else + { + m_TotalSize.fetch_add(Record.Location.GetSize()); + m_LocationMap[Record.Key] = Record.Location; + MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.GetOffset() + Record.Location.GetSize()); + } + }); + + m_CurrentInsertOffset = (MaxFileOffset + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1); /* end of the furthest payload rounded up to m_PayloadAlignment; the mask form assumes a power-of-two alignment — TODO confirm */ + m_CurrentIndexOffset = m_SmallObjectIndex.FileSize(); +} + +void +CasContainerStrategy::CloseContainer() /* Closes the object file, the index file and the CAS log, in that order. */ +{ + m_SmallObjectFile.Close(); + m_SmallObjectIndex.Close(); + m_CasLog.Close(); +} + ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS @@ -321,7 +463,8 @@ TEST_CASE("cas.compact.gc") std::vector<IoHash> Keys(kIterationCount); { - CasContainerStrategy Cas(CasConfig); + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 16, true); for (int i = 0; i < kIterationCount; ++i) @@ -354,7 +497,8 @@ TEST_CASE("cas.compact.gc") // the original cas store { - CasContainerStrategy Cas(CasConfig); + CasGc Gc; + 
CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 16, false); for (int i = 0; i < kIterationCount; ++i) @@ -402,7 +546,8 @@ TEST_CASE("cas.compact.totalsize") const int32_t kChunkCount = 16; { - CasContainerStrategy Cas(CasConfig); + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 16, true); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) @@ -418,7 +563,8 @@ TEST_CASE("cas.compact.totalsize") } { - CasContainerStrategy Cas(CasConfig); + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 16, false); const uint64_t TotalSize = Cas.TotalSize(); |