diff options
| -rw-r--r-- | zenstore/cas.cpp | 4 | ||||
| -rw-r--r-- | zenstore/chunkbundler.cpp | 1242 | ||||
| -rw-r--r-- | zenstore/chunkbundler.h | 105 | ||||
| -rw-r--r-- | zenstore/compactcas.cpp | 960 | ||||
| -rw-r--r-- | zenstore/compactcas.h | 56 |
5 files changed, 737 insertions, 1630 deletions
diff --git a/zenstore/cas.cpp b/zenstore/cas.cpp index a90e45c04..5b13e27fd 100644 --- a/zenstore/cas.cpp +++ b/zenstore/cas.cpp @@ -150,8 +150,8 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig) // Initialize payload storage m_LargeStrategy.Initialize(IsNewStore); - m_TinyStrategy.Initialize("tobs", 16, IsNewStore); - m_SmallStrategy.Initialize("sobs", 4096, IsNewStore); + m_TinyStrategy.Initialize("tobs", 1L << 30, 16, IsNewStore); + m_SmallStrategy.Initialize("sobs", 1L << 30, 4096, IsNewStore); } bool diff --git a/zenstore/chunkbundler.cpp b/zenstore/chunkbundler.cpp deleted file mode 100644 index 0e81dc8dc..000000000 --- a/zenstore/chunkbundler.cpp +++ /dev/null @@ -1,1242 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "chunkbundler.h" - -#include <zencore/except.h> -#include <zencore/fmtutils.h> -#include <zencore/iobuffer.h> -#include <zencore/logging.h> -#include <zencore/testing.h> - -#if ZEN_WITH_TESTS -# include <zencore/compactbinary.h> -# include <zencore/compactbinarybuilder.h> -# include <zencore/compress.h> -# include <zencore/filesystem.h> -# include <zencore/testutils.h> -# include <algorithm> -# include <random> -#endif - -namespace zen { - -static uint64_t -AlignPositon(uint64_t Offset, uint64_t Alignment) -{ - return (Offset + Alignment - 1) & ~(Alignment - 1); -} - -ChunkBundler::ChunkBundler(std::filesystem::path RootDirectory, ChunkBundlerValidator* Validator) -: m_Validator(Validator) -, m_Log(logging::Get("chunkbundler")) -, m_RootDirectory(RootDirectory) -{ -} - -ChunkBundler::~ChunkBundler() -{ -} - -void -ChunkBundler::Initialize(const std::string_view ContainerBaseName, uint64_t MaxBlockSize, uint64_t Alignment, bool IsNewStore) -{ - ZEN_ASSERT(IsPow2(Alignment)); - ZEN_ASSERT(!m_IsInitialized); - ZEN_ASSERT(MaxBlockSize > 0); - - m_ContainerBaseName = ContainerBaseName; - m_PayloadAlignment = Alignment; - m_MaxBlockSize = MaxBlockSize; - - std::filesystem::path SobsPath = m_RootDirectory / (m_ContainerBaseName + ".ucas"); - std::filesystem::path SlogPath = m_RootDirectory / (m_ContainerBaseName + ".ulog"); - - m_TotalSize = 0; - - m_LocationMap.clear(); - - std::filesystem::path SidxPath = m_RootDirectory / (m_ContainerBaseName + ".uidx"); - if (std::filesystem::exists(SidxPath)) - { - BasicFile SmallObjectIndex; - SmallObjectIndex.Open(SidxPath, false); - uint64_t Size = SmallObjectIndex.FileSize(); - uint64_t EntryCount = Size / sizeof(CompactDiskIndexEntry); - std::vector<CompactDiskIndexEntry> Entries{EntryCount}; - SmallObjectIndex.Read(Entries.data(), Size, 0); - for (const auto& Entry : Entries) - { - m_LocationMap[Entry.Key] = Entry.Location; - } - SmallObjectIndex.Close(); - } - - m_OpLog.Open(SlogPath, IsNewStore); - m_OpLog.Replay([&](const CompactDiskIndexEntry& Record) { - if (Record.Flags & CompactDiskIndexEntry::kTombstone) - { - m_LocationMap.erase(Record.Key); - } - else - { - m_LocationMap[Record.Key] = Record.Location; - } - }); - - std::unordered_set<uint16_t> ReferencedBlockIndexes; - for (const auto& Entry : m_LocationMap) - { - const auto& Location = Entry.second; - m_TotalSize.fetch_add(Location.Size); - ReferencedBlockIndexes.insert(Location.BlockIndex); - } - - uint32_t SmallestBlockSize = 0xffffffffu; - CreateDirectories(m_RootDirectory / "ucas"); - for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_RootDirectory / "ucas")) - { - if (Entry.is_regular_file()) - { - if (IsNewStore) - { - std::filesystem::remove(Entry.path()); - continue; - } - if (Entry.path().extension() == ".ucas") - { - try - { - std::string FileName = Entry.path().stem().string(); - uint16_t BlockIndex = static_cast<uint16_t>(std::stoi(FileName)); - if (!ReferencedBlockIndexes.contains(BlockIndex)) - { - // Clear out unused blocks - std::filesystem::remove(Entry.path()); - continue; - } - auto SmallObjectFile = std::make_shared<BasicFile>(); - SmallObjectFile->Open(Entry.path(), false); - m_OpenBlocks[BlockIndex] = SmallObjectFile; - if (SmallObjectFile->FileSize() < SmallestBlockSize) - { - m_CurrentBlockIndex = BlockIndex; - SmallestBlockSize = gsl::narrow<std::uint32_t>(SmallObjectFile->FileSize()); - } - } - catch (const std::invalid_argument&) - { - // Non-valid file, skip it (or should we remove it?) - } - } - } - } - if (m_OpenBlocks.empty()) - { - std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentBlockIndex) + ".ucas"); - auto SmallObjectFile = std::make_shared<BasicFile>(); - SmallObjectFile->Open(path, true); - m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile; - m_CurrentBlock = SmallObjectFile; - m_CurrentInsertOffset = 0; - } - else - { - m_CurrentBlock = m_OpenBlocks[m_CurrentBlockIndex]; - m_CurrentInsertOffset = static_cast<uint32_t>(AlignPositon(m_CurrentBlock.lock()->FileSize(), m_PayloadAlignment)); - } - - // TODO: should validate integrity of container files here - - m_IsInitialized = true; -} - -ChunkBundler::InsertResult -ChunkBundler::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash) -{ - RwLock::ExclusiveLockScope _i(m_InsertLock); - - { - RwLock::SharedLockScope _l(m_LocationMapLock); - auto KeyIt = m_LocationMap.find(ChunkHash); - - if (KeyIt != m_LocationMap.end()) - { - return InsertResult{.New = false}; - } - } - - // New entry - - uint64_t CurrentBlockSize = m_CurrentBlock.lock()->FileSize(); - if (CurrentBlockSize + m_CurrentInsertOffset > m_MaxBlockSize) - { - RwLock::ExclusiveLockScope __(m_LocationMapLock); - uint16_t NewBlockIndex = m_CurrentBlockIndex + 1; - while (m_OpenBlocks.contains(NewBlockIndex)) - { - NewBlockIndex++; - if (NewBlockIndex == m_CurrentBlockIndex) - { - throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName)); - } - } - m_CurrentBlockIndex = NewBlockIndex; - std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentBlockIndex) + ".ucas"); - auto SmallObjectFile = std::make_shared<BasicFile>(); - SmallObjectFile->Open(path, true); - m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile; - m_CurrentBlock = SmallObjectFile; - m_CurrentInsertOffset = 0; - } - const uint32_t InsertOffset = m_CurrentInsertOffset; - m_CurrentBlock.lock()->Write(ChunkData, ChunkSize, InsertOffset); - m_CurrentInsertOffset = static_cast<uint32_t>(AlignPositon(InsertOffset + ChunkSize, m_PayloadAlignment)); - - const CompactDiskLocation Location{m_CurrentBlockIndex, InsertOffset, static_cast<uint32_t>(ChunkSize)}; - CompactDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location}; - - RwLock::ExclusiveLockScope __(m_LocationMapLock); - m_LocationMap[ChunkHash] = Location; - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize)); - m_OpLog.Append(IndexEntry); - - return InsertResult{.New = true}; -} - -ChunkBundler::InsertResult -ChunkBundler::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) -{ - return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); -} - -IoBuffer -ChunkBundler::FindChunk(const IoHash& ChunkHash) -{ - RwLock::SharedLockScope _(m_LocationMapLock); - - if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) - { - const CompactDiskLocation& Location = KeyIt->second; - - if (auto BlockIt = m_OpenBlocks.find(Location.BlockIndex); BlockIt != m_OpenBlocks.end()) - { - return IoBufferBuilder::MakeFromFileHandle(BlockIt->second->Handle(), Location.Offset, Location.Size); - } - } - - // Not found - - return IoBuffer(); -} - -bool -ChunkBundler::HaveChunk(const IoHash& ChunkHash) -{ - RwLock::SharedLockScope _(m_LocationMapLock); - - return m_LocationMap.contains(ChunkHash); -} - -void -ChunkBundler::FilterChunks(CasChunkSet& InOutChunks) -{ - // This implementation is good enough for relatively small - // chunk sets (in terms of chunk identifiers), but would - // benefit from a better implementation which removes - // items incrementally for large sets, especially when - // we're likely to already have a large proportion of the - // chunks in the set - - InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); -} - -void -ChunkBundler::Flush() -{ - RwLock::ExclusiveLockScope _l(m_InsertLock); - m_OpLog.Flush(); - m_CurrentBlock.lock()->Flush(); -} - -void -ChunkBundler::Scrub(ScrubContext& Ctx) -{ - const uint64_t WindowSize = 4 * 1024 * 1024; - - std::vector<CompactDiskIndexEntry> BigChunks; - std::vector<CompactDiskIndexEntry> BadChunks; - - // We do a read sweep through the payloads file and validate - // any entries that are contained within each segment, with - // the assumption that most entries will be checked in this - // pass. An alternative strategy would be to use memory mapping. - - { - IoBuffer ReadBuffer{WindowSize}; - void* BufferBase = ReadBuffer.MutableData(); - - RwLock::SharedLockScope _(m_LocationMapLock); - - for (const auto& Block : m_OpenBlocks) - { - uint64_t WindowStart = 0; - uint64_t WindowEnd = WindowSize; - auto& SmallObjectFile = *Block.second; - const uint64_t FileSize = SmallObjectFile.FileSize(); - - do - { - const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart); - SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart); - - for (auto& Entry : m_LocationMap) - { - const uint64_t EntryOffset = Entry.second.Offset; - - if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) - { - const uint64_t EntryEnd = EntryOffset + Entry.second.Size; - - if (EntryEnd >= WindowEnd) - { - BigChunks.push_back({.Key = Entry.first, .Location = Entry.second}); - - continue; - } - - if (m_Validator && - !m_Validator->ValidateChunk(IoBuffer(IoBuffer::Wrap, - reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart, - Entry.second.Size), - Entry.first)) - { - // Hash mismatch - BadChunks.push_back({.Key = Entry.first, .Location = Entry.second}); - } - } - } - - WindowStart += WindowSize; - WindowEnd += WindowSize; - } while (WindowStart < FileSize); - } - - // TODO: Figure out API for this - // Deal with large chunks -#if 0 - for (const auto& _ : BigChunks) - { - auto& ___ = *m_OpenBlocks[Entry.Location.BlockIndex]; - /* IoHashStream Hasher; - m_SmallObjectFile.StreamByteRange(Entry.Location.GetOffset(), Entry.Location.GetSize(), [&](const void* Data, uint64_t Size) { - Hasher.Append(Data, Size); - }); - IoHash ComputedHash = Hasher.GetHash(); - - if (Entry.Key != ComputedHash) - { - BadChunks.push_back(Entry); - } - */ - } -#endif // 0 - } - - if (BadChunks.empty()) - { - return; - } - - ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_ContainerBaseName); - - // Deal with bad chunks by removing them from our lookup map - - std::vector<IoHash> BadChunkHashes; - - RwLock::ExclusiveLockScope _(m_LocationMapLock); - for (const CompactDiskIndexEntry& Entry : BadChunks) - { - BadChunkHashes.push_back(Entry.Key); - m_OpLog.Append({.Key = Entry.Key, .Location = Entry.Location, .Flags = CompactDiskIndexEntry::kTombstone}); - m_LocationMap.erase(Entry.Key); - } - - // Let whomever it concerns know about the bad chunks. This could - // be used to invalidate higher level data structures more efficiently - // than a full validation pass might be able to do - - Ctx.ReportBadCasChunks(BadChunkHashes); -} - -void -ChunkBundler::CollectGarbage(GcContext& GcCtx) -{ - namespace fs = std::filesystem; - - // It collects all the blocks that we want to delete chunks from. For each such - // block we keep a list of chunks to retain. - // - // It will first remove any chunks that are flushed from the m_LocationMap. - // - // It then checks to see if we want to purge any chunks that are in the currently - // active block. If so, we break off the current block and start on a new block, - // otherwise we just let the active block be. - // - // Next it will iterate over all blocks that we want to remove chunks from. - // If the block is empty after removal of chunks we mark the block as pending - // delete - we want to delete it as soon as there are no IoBuffers using the - // block file. - // - // If the block is non-empty we write out the chunks we want to keep to a new - // block file (creating new block files as needed). - // - // We update the index as we complete each new block file. This makes it possible - // to break the GC if we want to limit time for execution. - // - // GC can fairly parallell to regular operation - it will block while figuring - // out which chunks to remove and what blocks to rewrite but the actual - // reading and writing of data to new block files does not block regular operation. - // - // While moving blocks it will do a blocking operation and update the m_LocationMap - // after each new block is written and it will also block when figuring out the - // path to the next new block. - - ZEN_INFO("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName); - - std::unordered_map<uint64_t, size_t> BlockIndexToKeepChunksMap; - std::vector<std::unordered_map<IoHash, CompactDiskLocation, IoHash::Hasher>> KeepChunks; - std::vector<IoHash> DeletedChunks; - std::unordered_set<uint16_t> BlocksToReWrite; - { - RwLock::ExclusiveLockScope _i(m_InsertLock); - RwLock::ExclusiveLockScope _l(m_LocationMapLock); - - m_OpLog.Flush(); - m_CurrentBlock.lock()->Flush(); - - BlocksToReWrite.reserve(m_OpenBlocks.size()); - - if (m_LocationMap.empty()) - { - ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_RootDirectory / m_ContainerBaseName); - return; - } - - const uint64_t TotalChunkCount = m_LocationMap.size(); - uint64_t TotalSize = m_TotalSize.load(); - - std::vector<IoHash> TotalChunkHashes; - TotalChunkHashes.reserve(m_LocationMap.size()); - for (const auto& Entry : m_LocationMap) - { - TotalChunkHashes.push_back(Entry.first); - if (BlockIndexToKeepChunksMap.contains(Entry.second.BlockIndex)) - { - continue; - } - BlockIndexToKeepChunksMap[Entry.second.BlockIndex] = KeepChunks.size(); - KeepChunks.resize(KeepChunks.size() + 1); - } - - const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); - - uint64_t NewTotalSize = 0; - GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { - if (Keep) - { - auto KeyIt = m_LocationMap.find(ChunkHash); - const auto& ChunkLocation = KeyIt->second; - auto& ChunkMap = KeepChunks[BlockIndexToKeepChunksMap[ChunkLocation.BlockIndex]]; - ChunkMap[ChunkHash] = ChunkLocation; - NewTotalSize += ChunkLocation.Size; - } - else - { - DeletedChunks.push_back(ChunkHash); - } - }); - - if (!PerformDelete) - { - ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", - m_RootDirectory / m_ContainerBaseName, - DeletedChunks.size(), - NiceBytes(TotalSize - NewTotalSize), - TotalChunkCount, - NiceBytes(TotalSize)); - return; - } - - for (const auto& ChunkHash : DeletedChunks) - { - auto KeyIt = m_LocationMap.find(ChunkHash); - const auto& ChunkLocation = KeyIt->second; - BlocksToReWrite.insert(ChunkLocation.BlockIndex); - m_OpLog.Append({.Key = ChunkHash, .Location = ChunkLocation, .Flags = CompactDiskIndexEntry::kTombstone}); - m_LocationMap.erase(ChunkHash); - m_TotalSize.fetch_sub(static_cast<uint64_t>(ChunkLocation.Size)); - } - - if (BlocksToReWrite.contains(m_CurrentBlockIndex)) - { - uint16_t NewBlockIndex = m_CurrentBlockIndex + 1; - while (m_OpenBlocks.contains(NewBlockIndex)) - { - NewBlockIndex++; - if (NewBlockIndex == m_CurrentBlockIndex) - { - ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded", - m_ContainerBaseName, - std::numeric_limits<uint16_t>::max() + 1); - return; - } - } - m_CurrentBlockIndex = NewBlockIndex; - std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentBlockIndex) + ".ucas"); - auto SmallObjectFile = std::make_shared<BasicFile>(); - SmallObjectFile->Open(path, true); - m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile; - m_CurrentBlock = SmallObjectFile; - m_CurrentInsertOffset = 0; - } - } - - // Move all chunks in blocks that have chunks removed to new blocks - - std::shared_ptr<BasicFile> NewBlockFile; - uint64_t WriteOffset = {}; - uint16_t NewBlockIndex = {}; - std::unordered_map<IoHash, CompactDiskLocation> MovedBlocks; - - for (auto BlockIndex : BlocksToReWrite) - { - auto& ChunkMap = KeepChunks[BlockIndexToKeepChunksMap[BlockIndex]]; - if (ChunkMap.empty()) - { - // The block has no references to it, it should be removed as soon as no references is held on the file - // TODO: We currently don't know if someone is holding a IoBuffer for this block at this point! - - // std::filesystem::path BlockPath = m_RootDirectory / "ucas" / (std::to_string(BlockIndex) + ".ucas"); - // RwLock::ExclusiveLockScope _i(m_InsertLock); - // auto BlockFile = m_OpenBlocks[BlockIndex]; - // m_OpenBlocks.erase(BlockIndex); - // BlockFile->Close(); - // fs::remove(BlockPath); - continue; - } - - std::shared_ptr<BasicFile> BlockFile; - { - RwLock::ExclusiveLockScope _i(m_InsertLock); - BlockFile = m_OpenBlocks[BlockIndex]; - } - - { - std::vector<uint8_t> Chunk; - for (auto& Entry : ChunkMap) - { - const CompactDiskLocation& ChunkLocation = Entry.second; - Chunk.resize(ChunkLocation.Size); - BlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); - - if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) - { - { - RwLock::ExclusiveLockScope _i(m_InsertLock); - if (NewBlockFile) - { - m_OpenBlocks[NewBlockIndex] = NewBlockFile; - RwLock::ExclusiveLockScope _l(m_LocationMapLock); - for (const auto& MovedEntry : MovedBlocks) - { - m_LocationMap[MovedEntry.first] = MovedEntry.second; - m_OpLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second}); - } - } - NewBlockIndex = m_CurrentBlockIndex + 1; - while (m_OpenBlocks.contains(NewBlockIndex)) - { - NewBlockIndex++; - if (NewBlockIndex == m_CurrentBlockIndex) - { - ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded", - m_ContainerBaseName, - std::numeric_limits<uint16_t>::max() + 1); - return; - } - } - m_OpenBlocks[NewBlockIndex] = std::shared_ptr<BasicFile>(); // Make sure nobody steals this slot - } - - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_RootDirectory, Error); - if (Error) - { - ZEN_ERROR("get disk space in {} FAILED, reason '{}'", m_ContainerBaseName, Error.message()); - return; - } - - if (Space.Free < (m_MaxBlockSize * 2)) // Never let GC steal the last block space - { - ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}", - m_RootDirectory / m_ContainerBaseName, - m_MaxBlockSize * m_MaxBlockSize, - NiceBytes(Space.Free)); - RwLock::ExclusiveLockScope _i(m_InsertLock); - m_OpenBlocks.erase(NewBlockIndex); - return; - } - - std::filesystem::path NewBlockPath = m_RootDirectory / "ucas" / (std::to_string(NewBlockIndex) + ".ucas"); - NewBlockFile = std::make_shared<BasicFile>(); - NewBlockFile->Open(NewBlockPath, true); - MovedBlocks.clear(); - WriteOffset = 0; - } - - NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); - CompactDiskLocation NewChunkLocation(NewBlockIndex, - gsl::narrow<uint32_t>(WriteOffset), - gsl::narrow<uint32_t>(Chunk.size())); - Entry.second = {.BlockIndex = NewBlockIndex, - .Offset = gsl::narrow<uint32_t>(WriteOffset), - .Size = gsl::narrow<uint32_t>(Chunk.size())}; - MovedBlocks[Entry.first] = Entry.second; - WriteOffset = AlignPositon(WriteOffset + Chunk.size(), m_PayloadAlignment); - } - Chunk.clear(); - - // Remap moved chunks to the new block file - RwLock::ExclusiveLockScope _i(m_InsertLock); - if (NewBlockFile) - { - m_OpenBlocks[NewBlockIndex] = NewBlockFile; - RwLock::ExclusiveLockScope _l(m_LocationMapLock); - for (const auto& MovedEntry : MovedBlocks) - { - m_LocationMap[MovedEntry.first] = MovedEntry.second; - m_OpLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second}); - } - } - } - } - - GcCtx.DeletedCas(DeletedChunks); - - ZEN_INFO("garbage collection complete '{}', deleted {} chunks", m_RootDirectory / m_ContainerBaseName, DeletedChunks.size()); - - MakeIndexSnapshot(); -} - -void -ChunkBundler::MakeIndexSnapshot() -{ - ZEN_INFO("writing index snapshot for '{}'", m_RootDirectory / m_ContainerBaseName); - - namespace fs = std::filesystem; - - fs::path SlogPath = m_RootDirectory / (m_ContainerBaseName + ".ulog"); - fs::path SidxPath = m_RootDirectory / (m_ContainerBaseName + ".uidx"); - fs::path STmplogPath = m_RootDirectory / (m_ContainerBaseName + ".tmp.ulog"); - fs::path STmpSidxPath = m_RootDirectory / (m_ContainerBaseName + ".tmp.uidx"); - fs::path SRecoveredlogPath = m_RootDirectory / (m_ContainerBaseName + ".recover.ulog"); - - // Move log and index away, we keep them if something goes wrong, any new chunks will be added to the new log - { - RwLock::ExclusiveLockScope _(m_LocationMapLock); - m_OpLog.Close(); - - if (fs::exists(STmplogPath)) - { - fs::remove(STmplogPath); - } - if (fs::exists(STmpSidxPath)) - { - fs::remove(STmpSidxPath); - } - - fs::rename(SlogPath, STmplogPath); - if (fs::exists(SidxPath)) - { - fs::rename(SidxPath, STmpSidxPath); - } - - // Open an new log - m_OpLog.Open(SlogPath, true); - } - - try - { - // Write the current state of the location map to a new index state - std::vector<CompactDiskIndexEntry> Entries; - - { - RwLock::SharedLockScope _l(m_LocationMapLock); - Entries.resize(m_LocationMap.size()); - - uint64_t EntryIndex = 0; - for (auto& Entry : m_LocationMap) - { - CompactDiskIndexEntry& IndexEntry = Entries[EntryIndex++]; - IndexEntry.Key = Entry.first; - IndexEntry.Location = Entry.second; - } - } - - BasicFile SmallObjectIndex; - SmallObjectIndex.Open(SidxPath, true); - SmallObjectIndex.Write(Entries.data(), Entries.size() * sizeof(CompactDiskIndexEntry), 0); - SmallObjectIndex.Close(); - } - catch (std::exception& Err) - { - ZEN_ERROR("snapshot FAILED, reason '{}'", Err.what()); - - // Reconstruct the log from old log and any added log entries - RwLock::ExclusiveLockScope _(m_LocationMapLock); - if (fs::exists(STmplogPath)) - { - std::vector<CompactDiskIndexEntry> Records; - Records.reserve(m_LocationMap.size()); - { - TCasLogFile<CompactDiskIndexEntry> OldOpLog; - OldOpLog.Open(STmplogPath, false); - OldOpLog.Replay([&](const CompactDiskIndexEntry& Record) { Records.push_back(Record); }); - } - { - m_OpLog.Replay([&](const CompactDiskIndexEntry& Record) { Records.push_back(Record); }); - } - - TCasLogFile<CompactDiskIndexEntry> RecoveredOpLog; - RecoveredOpLog.Open(SRecoveredlogPath, true); - for (const auto& Record : Records) - { - RecoveredOpLog.Append(Record); - } - RecoveredOpLog.Close(); - - fs::remove(SlogPath); - fs::rename(SRecoveredlogPath, SlogPath); - fs::remove(STmplogPath); - } - - if (fs::exists(SidxPath)) - { - fs::remove(SidxPath); - } - - // Restore any previous snapshot - if (fs::exists(STmpSidxPath)) - { - fs::remove(SidxPath); - fs::rename(STmpSidxPath, SidxPath); - } - } - if (fs::exists(STmpSidxPath)) - { - fs::remove(STmpSidxPath); - } -} - -GcStorageSize -ChunkBundler::StorageSize() const -{ - return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)}; -} - -////////////////////////////////////////////////////////////////////////// - -#if ZEN_WITH_TESTS - -namespace { - static IoBuffer CreateChunk(uint64_t Size) - { - static std::random_device rd; - static std::mt19937 g(rd()); - - std::vector<uint8_t> Values; - Values.resize(Size); - for (size_t Idx = 0; Idx < Size; ++Idx) - { - Values[Idx] = static_cast<uint8_t>(Idx); - } - std::shuffle(Values.begin(), Values.end(), g); - - return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); - } -} // namespace - -#endif - -TEST_CASE("chunkbundler.reopen") -{ - ScopedTemporaryDirectory TempDir; - - CreateDirectories(TempDir.Path()); - - const int kIterationCount = 1000; - - std::vector<IoHash> Keys(kIterationCount); - - { - ChunkBundler Store(TempDir.Path(), nullptr); - Store.Initialize("test", 65536, 16, true); - - for (int i = 0; i < kIterationCount; ++i) - { - CbObjectWriter Cbo; - Cbo << "id" << i; - CbObject Obj = Cbo.Save(); - - IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); - const IoHash Hash = HashBuffer(ObjBuffer); - - Store.InsertChunk(ObjBuffer, Hash); - - Keys[i] = Hash; - } - - for (int i = 0; i < kIterationCount; ++i) - { - IoBuffer Chunk = Store.FindChunk(Keys[i]); - - CHECK(!!Chunk); - - CbObject Value = LoadCompactBinaryObject(Chunk); - - CHECK_EQ(Value["id"].AsInt32(), i); - } - } - - // Validate that we can still read the inserted data after closing - // the original store - - { - ChunkBundler Store(TempDir.Path(), nullptr); - Store.Initialize("test", 65536, 16, false); - - for (int i = 0; i < kIterationCount; ++i) - { - IoBuffer Chunk = Store.FindChunk(Keys[i]); - - CHECK(!!Chunk); - - CbObject Value = LoadCompactBinaryObject(Chunk); - - CHECK_EQ(Value["id"].AsInt32(), i); - } - - GcContext Ctx; - Store.CollectGarbage(Ctx); - } -} - -TEST_CASE("chunkbundler.totalsize") -{ - std::random_device rd; - std::mt19937 g(rd()); - - const auto CreateChunk = [&](uint64_t Size) -> IoBuffer { - const size_t Count = static_cast<size_t>(Size / sizeof(uint32_t)); - std::vector<uint32_t> Values; - Values.resize(Count); - for (size_t Idx = 0; Idx < Count; ++Idx) - { - Values[Idx] = static_cast<uint32_t>(Idx); - } - std::shuffle(Values.begin(), Values.end(), g); - - return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size() * sizeof(uint32_t)); - }; - - ScopedTemporaryDirectory TempDir; - - CreateDirectories(TempDir.Path()); - - const uint64_t kChunkSize = 1024; - const int32_t kChunkCount = 16; - - { - ChunkBundler Store(TempDir.Path(), nullptr); - Store.Initialize("test", 65536, 16, true); - - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) - { - IoBuffer Chunk = CreateChunk(kChunkSize); - const IoHash Hash = HashBuffer(Chunk); - auto InsertResult = Store.InsertChunk(Chunk, Hash); - ZEN_ASSERT(InsertResult.New); - } - - const uint64_t TotalSize = Store.StorageSize().DiskSize; - CHECK_EQ(kChunkSize * kChunkCount, TotalSize); - } - - { - ChunkBundler Store(TempDir.Path(), nullptr); - Store.Initialize("test", 65536, 16, false); - - const uint64_t TotalSize = Store.StorageSize().DiskSize; - CHECK_EQ(kChunkSize * kChunkCount, TotalSize); - } -} - -TEST_CASE("chunkbundler.gc.basic") -{ - ScopedTemporaryDirectory TempDir; - - ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr); - - ChunkBundlerStore.Initialize("cb", 65536, 1 << 4, true); - - IoBuffer Chunk = CreateChunk(128); - IoHash ChunkHash = IoHash::HashBuffer(Chunk); - - const auto InsertResult = ChunkBundlerStore.InsertChunk(Chunk, ChunkHash); - - GcContext GcCtx; - GcCtx.CollectSmallObjects(true); - - ChunkBundlerStore.CollectGarbage(GcCtx); - - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHash)); -} - -TEST_CASE("chunkbundler.gc.compact") -{ - ScopedTemporaryDirectory TempDir; - - ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr); - ChunkBundlerStore.Initialize("cb", 2048, 1 << 4, true); - - uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5}; - std::vector<IoBuffer> Chunks; - Chunks.reserve(9); - for (const auto& Size : ChunkSizes) - { - Chunks.push_back(CreateChunk(Size)); - } - - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(9); - for (const auto& Chunk : Chunks) - { - ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); - } - - CHECK(ChunkBundlerStore.InsertChunk(Chunks[0], ChunkHashes[0]).New); - CHECK(ChunkBundlerStore.InsertChunk(Chunks[1], ChunkHashes[1]).New); - CHECK(ChunkBundlerStore.InsertChunk(Chunks[2], ChunkHashes[2]).New); - CHECK(ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]).New); - CHECK(ChunkBundlerStore.InsertChunk(Chunks[4], ChunkHashes[4]).New); - CHECK(ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]).New); - CHECK(ChunkBundlerStore.InsertChunk(Chunks[6], ChunkHashes[6]).New); - CHECK(ChunkBundlerStore.InsertChunk(Chunks[7], ChunkHashes[7]).New); - CHECK(ChunkBundlerStore.InsertChunk(Chunks[8], ChunkHashes[8]).New); - - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[0])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[1])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[2])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[3])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[4])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[5])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[6])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[7])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[8])); - - auto InitialSize = ChunkBundlerStore.StorageSize().DiskSize; - - // Keep first and last - { - GcContext GcCtx; - GcCtx.CollectSmallObjects(true); - - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[0]); - KeepChunks.push_back(ChunkHashes[8]); - GcCtx.ContributeCas(KeepChunks); - - ChunkBundlerStore.CollectGarbage(GcCtx); - - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[0])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[1])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[2])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[3])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[4])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[5])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[6])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[7])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[0] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[0]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[8]))); - } - - ChunkBundlerStore.InsertChunk(Chunks[1], ChunkHashes[1]); - ChunkBundlerStore.InsertChunk(Chunks[2], ChunkHashes[2]); - ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]); - ChunkBundlerStore.InsertChunk(Chunks[4], ChunkHashes[4]); - ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]); - ChunkBundlerStore.InsertChunk(Chunks[6], ChunkHashes[6]); - ChunkBundlerStore.InsertChunk(Chunks[7], ChunkHashes[7]); - - // Keep last - { - GcContext GcCtx; - GcCtx.CollectSmallObjects(true); - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[8]); - GcCtx.ContributeCas(KeepChunks); - - ChunkBundlerStore.CollectGarbage(GcCtx); - - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[0])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[1])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[2])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[3])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[4])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[5])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[6])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[7])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[8] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[8]))); - - ChunkBundlerStore.InsertChunk(Chunks[1], ChunkHashes[1]); - ChunkBundlerStore.InsertChunk(Chunks[2], ChunkHashes[2]); - ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]); - ChunkBundlerStore.InsertChunk(Chunks[4], ChunkHashes[4]); - ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]); - ChunkBundlerStore.InsertChunk(Chunks[6], ChunkHashes[6]); - ChunkBundlerStore.InsertChunk(Chunks[7], ChunkHashes[7]); - } - - // Keep mixed - { - GcContext GcCtx; - GcCtx.CollectSmallObjects(true); - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[1]); - KeepChunks.push_back(ChunkHashes[4]); - KeepChunks.push_back(ChunkHashes[7]); - GcCtx.ContributeCas(KeepChunks); - - ChunkBundlerStore.CollectGarbage(GcCtx); - - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[0])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[1])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[2])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[3])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[4])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[5])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[6])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[7])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[1] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[1]))); - CHECK(ChunkHashes[4] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[4]))); - CHECK(ChunkHashes[7] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[7]))); - - ChunkBundlerStore.InsertChunk(Chunks[0], ChunkHashes[0]); - ChunkBundlerStore.InsertChunk(Chunks[2], ChunkHashes[2]); - ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]); - ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]); - ChunkBundlerStore.InsertChunk(Chunks[6], ChunkHashes[6]); - ChunkBundlerStore.InsertChunk(Chunks[8], ChunkHashes[8]); - } - - // Keep multiple at end - { - GcContext GcCtx; - GcCtx.CollectSmallObjects(true); - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[6]); - KeepChunks.push_back(ChunkHashes[7]); - KeepChunks.push_back(ChunkHashes[8]); - GcCtx.ContributeCas(KeepChunks); - - ChunkBundlerStore.CollectGarbage(GcCtx); - - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[0])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[1])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[2])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[3])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[4])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[5])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[6])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[7])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[6] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[6]))); - CHECK(ChunkHashes[7] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[7]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[8]))); - - ChunkBundlerStore.InsertChunk(Chunks[0], ChunkHashes[0]); - ChunkBundlerStore.InsertChunk(Chunks[1], ChunkHashes[1]); - ChunkBundlerStore.InsertChunk(Chunks[2], ChunkHashes[2]); - ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]); - ChunkBundlerStore.InsertChunk(Chunks[4], ChunkHashes[4]); - ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]); - } - - // Keep every other - { - GcContext GcCtx; - GcCtx.CollectSmallObjects(true); - std::vector<IoHash> KeepChunks; - KeepChunks.push_back(ChunkHashes[0]); - KeepChunks.push_back(ChunkHashes[2]); - KeepChunks.push_back(ChunkHashes[4]); - KeepChunks.push_back(ChunkHashes[6]); - KeepChunks.push_back(ChunkHashes[8]); - GcCtx.ContributeCas(KeepChunks); - - ChunkBundlerStore.CollectGarbage(GcCtx); - - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[0])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[1])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[2])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[3])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[4])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[5])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[6])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[7])); - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[8])); - - CHECK(ChunkHashes[0] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[0]))); - CHECK(ChunkHashes[2] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[2]))); - CHECK(ChunkHashes[4] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[4]))); - CHECK(ChunkHashes[6] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[6]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[8]))); - - ChunkBundlerStore.InsertChunk(Chunks[1], ChunkHashes[1]); - ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]); - ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]); - ChunkBundlerStore.InsertChunk(Chunks[7], ChunkHashes[7]); - } - - // Verify that we nicely appended blocks even after all GC operations - CHECK(ChunkHashes[0] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[0]))); - CHECK(ChunkHashes[1] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[1]))); - CHECK(ChunkHashes[2] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[2]))); - CHECK(ChunkHashes[3] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[3]))); - CHECK(ChunkHashes[4] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[4]))); - CHECK(ChunkHashes[5] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[5]))); - CHECK(ChunkHashes[6] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[6]))); - CHECK(ChunkHashes[7] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[7]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[8]))); - - auto FinalSize = ChunkBundlerStore.StorageSize().DiskSize; - CHECK(InitialSize == FinalSize); -} - -TEST_CASE("chunkbundler.gc.deleteblockonopen") -{ - ScopedTemporaryDirectory TempDir; - - uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; - std::vector<IoBuffer> Chunks; - Chunks.reserve(20); - for (const auto& Size : ChunkSizes) - { - Chunks.push_back(CreateChunk(Size)); - } - - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(20); - for (const auto& Chunk : Chunks) - { - ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); - } - - { - ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr); - ChunkBundlerStore.Initialize("cb", 1024, 1 << 4, true); - - for (size_t i = 0; i < 20; i++) - { - CHECK(ChunkBundlerStore.InsertChunk(Chunks[i], ChunkHashes[i]).New); - } - - // GC every other block - { - GcContext GcCtx; - GcCtx.CollectSmallObjects(true); - std::vector<IoHash> KeepChunks; - for (size_t i = 0; i < 20; i += 2) - { - KeepChunks.push_back(ChunkHashes[i]); - } - GcCtx.ContributeCas(KeepChunks); - - ChunkBundlerStore.CollectGarbage(GcCtx); - - for (size_t i = 0; i < 20; i += 2) - { - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[i])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[i + 1])); - CHECK(ChunkHashes[i] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[i]))); - } - } - } - { - // Re-open - ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr); - ChunkBundlerStore.Initialize("cb", 1024, 1 << 4, false); - for (size_t i = 0; i < 20; i += 2) - { - CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[i])); - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[i + 1])); - CHECK(ChunkHashes[i] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[i]))); - } - } -} - -TEST_CASE("chunkbundler.gc.handleopeniobuffer") -{ - ScopedTemporaryDirectory TempDir; - - uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; - std::vector<IoBuffer> Chunks; - Chunks.reserve(20); - for (const auto& Size : ChunkSizes) - { - Chunks.push_back(CreateChunk(Size)); - } - - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(20); - for (const auto& Chunk : Chunks) - { - ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); - } - - ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr); - ChunkBundlerStore.Initialize("cb", 1024, 1 << 4, true); - - for (size_t i = 0; i < 20; i++) - { - CHECK(ChunkBundlerStore.InsertChunk(Chunks[i], ChunkHashes[i]).New); - } - - auto RetainChunk = ChunkBundlerStore.FindChunk(ChunkHashes[5]); - - // GC everything - GcContext GcCtx; - GcCtx.CollectSmallObjects(true); - ChunkBundlerStore.CollectGarbage(GcCtx); - - for (size_t i = 0; i < 20; i++) - { - CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[i])); - } - - CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk)); -} - -void -chunkbundler_forcelink() -{ -} - -} // namespace zen diff --git a/zenstore/chunkbundler.h b/zenstore/chunkbundler.h deleted file mode 100644 index d8913aec6..000000000 --- a/zenstore/chunkbundler.h +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/zencore.h> - -#include <zenstore/basicfile.h> -#include <zenstore/cas.h> -#include <zenstore/caslog.h> -#include <zenstore/gc.h> - -#include <atomic> -#include <unordered_map> - -namespace spdlog { -class logger; -} - -namespace zen { - -class ChunkBundlerValidator -{ -public: - virtual bool ValidateChunk(IoBuffer Buffer, IoHash Key) = 0; -}; - -#pragma pack(push) -#pragma pack(1) - -struct CompactDiskLocation -{ - uint16_t BlockIndex; - uint32_t Offset; - uint32_t Size; -}; - -struct CompactDiskIndexEntry -{ - static const uint8_t kTombstone = 0x01; - - IoHash Key; - CompactDiskLocation Location; - ZenContentType ContentType = ZenContentType::kUnknownContentType; - uint8_t Flags = 0; -}; - -#pragma pack(pop) - -static_assert(sizeof(CompactDiskIndexEntry) == 32); - -class ChunkBundler final -{ -public: - ChunkBundler(std::filesystem::path RootDirectory, ChunkBundlerValidator* Validator); - ~ChunkBundler(); - - struct InsertResult - { - bool New = false; - }; - - void Initialize(const std::string_view ContainerBaseName, uint64_t MaxBlockSize, uint64_t Alignment, bool IsNewStore); - InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); - InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); - IoBuffer FindChunk(const IoHash& ChunkHash); - bool HaveChunk(const IoHash& ChunkHash); - void FilterChunks(CasChunkSet& InOutChunks); - void Flush(); - void Scrub(ScrubContext& Ctx); - void CollectGarbage(GcContext& GcCtx); - GcStorageSize StorageSize() const; - -private: - spdlog::logger& Log() { return m_Log; } - - ChunkBundlerValidator* m_Validator; - spdlog::logger& m_Log; - uint64_t m_PayloadAlignment = 1 << 4; - uint64_t m_MaxBlockSize = 1L << 30; // 1 Gb - bool m_IsInitialized = false; - std::filesystem::path RootDirectory; - TCasLogFile<CompactDiskIndexEntry> m_OpLog; - std::filesystem::path m_RootDirectory; - std::string m_ContainerBaseName; - - RwLock m_LocationMapLock; - std::unordered_map<IoHash, CompactDiskLocation, IoHash::Hasher> m_LocationMap; - std::unordered_map<uint16_t, std::shared_ptr<BasicFile>> m_OpenBlocks; - uint16_t m_CurrentBlockIndex = 0; - - RwLock m_InsertLock; // used to serialize inserts - std::weak_ptr<BasicFile> m_CurrentBlock; - std::atomic_uint32_t m_CurrentInsertOffset{}; - - std::atomic_uint64_t m_CurrentIndexOffset{}; - std::atomic_uint64_t m_TotalSize{}; - - void MakeIndexSnapshot(); -}; - -////////////////////////////////////////////////////////////////////////// - -void chunkbundler_forcelink(); - -} // namespace zen diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 91ee232fe..18dcc7fc8 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -2,27 +2,18 @@ #include <zenstore/cas.h> -#include "compactcas.h" - -#include <zencore/compactbinarybuilder.h> -#include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> -#include <zencore/memory.h> -#include <zencore/string.h> #include <zencore/testing.h> -#include <zencore/testutils.h> -#include <zencore/thread.h> -#include <zencore/uid.h> - #include <zenstore/gc.h> - -#include <filesystem> -#include <functional> #include <gsl/gsl-lite.hpp> +#include "compactcas.h" #if ZEN_WITH_TESTS +# include <zencore/compactbinarybuilder.h> +# include <zencore/testutils.h> +# include <zenstore/cidstore.h> # include <algorithm> # include <random> #endif @@ -31,11 +22,16 @@ namespace zen { -static uint64_t -AlignPositon(uint64_t Offset, uint64_t Alignment) -{ - return (Offset + Alignment - 1) & ~(Alignment - 1); -} +namespace { + uint64_t AlignPositon(uint64_t Offset, uint64_t Alignment) { return (Offset + Alignment - 1) & ~(Alignment - 1); } + + std::filesystem::path BuildUcasPath(const std::filesystem::path& RootDirectory, + const std::string_view ContainerBaseName, + const uint16_t BlockIndex) + { + return RootDirectory / (std::string(ContainerBaseName) + "." + (std::to_string(BlockIndex) + ".ucas")); + } +} // namespace CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc) : GcStorage(Gc) @@ -49,13 +45,15 @@ CasContainerStrategy::~CasContainerStrategy() } void -CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore) +CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore) { ZEN_ASSERT(IsPow2(Alignment)); ZEN_ASSERT(!m_IsInitialized); + ZEN_ASSERT(MaxBlockSize > 0); m_ContainerBaseName = ContainerBaseName; m_PayloadAlignment = Alignment; + m_MaxBlockSize = MaxBlockSize; OpenContainer(IsNewStore); @@ -79,11 +77,32 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const // New entry - const uint64_t InsertOffset = m_CurrentInsertOffset; - m_SmallObjectFile.Write(ChunkData, ChunkSize, InsertOffset); - m_CurrentInsertOffset = AlignPositon(m_CurrentInsertOffset + ChunkSize, m_PayloadAlignment); + uint64_t CurrentBlockSize = m_CurrentBlock.lock()->FileSize(); + if (CurrentBlockSize + m_CurrentInsertOffset > m_MaxBlockSize) + { + RwLock::ExclusiveLockScope __(m_LocationMapLock); + uint16_t NewBlockIndex = m_CurrentBlockIndex + 1; + while (m_OpenBlocks.contains(NewBlockIndex)) + { + NewBlockIndex++; + if (NewBlockIndex == m_CurrentBlockIndex) + { + throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName)); + } + } + m_CurrentBlockIndex = NewBlockIndex; + std::filesystem::path path = BuildUcasPath(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex); + auto SmallObjectFile = std::make_shared<BasicFile>(); + SmallObjectFile->Open(path, true); + m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile; + m_CurrentBlock = SmallObjectFile; + m_CurrentInsertOffset = 0; + } + const uint32_t InsertOffset = m_CurrentInsertOffset; + m_CurrentBlock.lock()->Write(ChunkData, ChunkSize, InsertOffset); + m_CurrentInsertOffset = static_cast<uint32_t>(AlignPositon(InsertOffset + ChunkSize, m_PayloadAlignment)); - const CasDiskLocation Location{InsertOffset, ChunkSize}; + const CasDiskLocation Location{m_CurrentBlockIndex, InsertOffset, static_cast<uint32_t>(ChunkSize)}; CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location}; RwLock::ExclusiveLockScope __(m_LocationMapLock); @@ -109,7 +128,10 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { const CasDiskLocation& Location = KeyIt->second; - return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.GetOffset(), Location.GetSize()); + if (auto BlockIt = m_OpenBlocks.find(Location.BlockIndex); BlockIt != m_OpenBlocks.end()) + { + return IoBufferBuilder::MakeFromFileHandle(BlockIt->second->Handle(), Location.Offset, Location.Size); + } } // Not found @@ -121,13 +143,7 @@ bool CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); - - if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) - { - return true; - } - - return false; + return m_LocationMap.contains(ChunkHash); } void @@ -146,18 +162,15 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks) void CasContainerStrategy::Flush() { - RwLock::SharedLockScope _(m_LocationMapLock); + RwLock::SharedLockScope _(m_InsertLock); m_CasLog.Flush(); - m_SmallObjectFile.Flush(); + m_CurrentBlock.lock()->Flush(); } void CasContainerStrategy::Scrub(ScrubContext& Ctx) { - const uint64_t WindowSize = 4 * 1024 * 1024; - uint64_t WindowStart = 0; - uint64_t WindowEnd = WindowSize; - const uint64_t FileSize = m_SmallObjectFile.FileSize(); + const uint64_t WindowSize = 4 * 1024 * 1024; std::vector<CasDiskIndexEntry> BigChunks; std::vector<CasDiskIndexEntry> BadChunks; @@ -171,51 +184,61 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) IoBuffer ReadBuffer{WindowSize}; void* BufferBase = ReadBuffer.MutableData(); - RwLock::SharedLockScope _(m_LocationMapLock); + RwLock::SharedLockScope _(m_InsertLock); // TODO: Refactor so we don't have to keep m_InsertLock all the time? + RwLock::SharedLockScope __(m_LocationMapLock); - do + for (const auto& Block : m_OpenBlocks) { - const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart); - m_SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart); + uint64_t WindowStart = 0; + uint64_t WindowEnd = WindowSize; + auto& SmallObjectFile = *Block.second; + const uint64_t FileSize = SmallObjectFile.FileSize(); - for (auto& Entry : m_LocationMap) + do { - const uint64_t EntryOffset = Entry.second.GetOffset(); + const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart); + SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart); - if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) + for (auto& Entry : m_LocationMap) { - const uint64_t EntryEnd = EntryOffset + Entry.second.GetSize(); + const uint64_t EntryOffset = Entry.second.Offset; - if (EntryEnd >= WindowEnd) + if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) { - BigChunks.push_back({.Key = Entry.first, .Location = Entry.second}); + const uint64_t EntryEnd = EntryOffset + Entry.second.Size; - continue; - } + if (EntryEnd >= WindowEnd) + { + BigChunks.push_back({.Key = Entry.first, .Location = Entry.second}); - const IoHash ComputedHash = - IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.GetOffset() - WindowStart, - Entry.second.GetSize()); + continue; + } - if (Entry.first != ComputedHash) - { - // Hash mismatch + const IoHash ComputedHash = + IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart, + Entry.second.Size); - BadChunks.push_back({.Key = Entry.first, .Location = Entry.second}); + if (Entry.first != ComputedHash) + { + // Hash mismatch + + BadChunks.push_back({.Key = Entry.first, .Location = Entry.second}); + } } } - } - WindowStart += WindowSize; - WindowEnd += WindowSize; - } while (WindowStart < FileSize); + WindowStart += WindowSize; + WindowEnd += WindowSize; + } while (WindowStart < FileSize); + } // Deal with large chunks for (const CasDiskIndexEntry& Entry : BigChunks) { IoHashStream Hasher; - m_SmallObjectFile.StreamByteRange(Entry.Location.GetOffset(), Entry.Location.GetSize(), [&](const void* Data, uint64_t Size) { + auto& SmallObjectFile = *m_OpenBlocks[Entry.Location.BlockIndex]; + SmallObjectFile.StreamByteRange(Entry.Location.Offset, Entry.Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); IoHash ComputedHash = Hasher.GetHash(); @@ -258,33 +281,48 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) { namespace fs = std::filesystem; - // Garbage collection will first remove any chunks that are flushed from the index. - // It then tries to compact the existing small object file, it does this by - // collecting all chunks that should be kept and sort them in position order. - // It then steps from chunk to chunk and checks if there is space to move the last - // chunk before the current chunk. It repeats this until it can't fit the last chunk - // or the last chunk is the current chunk. - // After this it check to see if there is space to move the current chunk closer to - // the preceeding chunk (or beginning of file if there is no preceeding chunk). - // It updates the new write position for any new chunks and rewrites the cas log - // to match the new content of the store. + // It collects all the blocks that we want to delete chunks from. For each such + // block we keep a list of chunks to retain. // - // It currently grabs a full lock during the GC operation but the compacting is - // done gradually and can be stopped after each chunk if the GC operation needs to - // be time limited. This will leave holes in the small object file that will not - // be reclaimed unless a GC operation is executed again, but the state of the - // cas store is intact. + // It will first remove any chunks that are flushed from the m_LocationMap. // - // It is also possible to more fine-grained locking of GC operation when moving - // blocks but that requires more work and additional checking if new blocks are - // added betwen each move of a block. + // It then checks to see if we want to purge any chunks that are in the currently + // active block. If so, we break off the current block and start on a new block, + // otherwise we just let the active block be. + // + // Next it will iterate over all blocks that we want to remove chunks from. + // If the block is empty after removal of chunks we mark the block as pending + // delete - we want to delete it as soon as there are no IoBuffers using the + // block file. + // + // If the block is non-empty we write out the chunks we want to keep to a new + // block file (creating new block files as needed). + // + // We update the index as we complete each new block file. This makes it possible + // to break the GC if we want to limit time for execution. + // + // GC can fairly parallell to regular operation - it will block while figuring + // out which chunks to remove and what blocks to rewrite but the actual + // reading and writing of data to new block files does not block regular operation. + // + // While moving blocks it will do a blocking operation and update the m_LocationMap + // after each new block is written and it will also block when figuring out the + // path to the next new block. + ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName); + std::unordered_map<uint64_t, size_t> BlockIndexToKeepChunksMap; + std::vector<std::unordered_map<IoHash, CasDiskLocation, IoHash::Hasher>> KeepChunks; + std::vector<IoHash> DeletedChunks; + std::unordered_set<uint16_t> BlocksToReWrite; { RwLock::ExclusiveLockScope _i(m_InsertLock); RwLock::ExclusiveLockScope _l(m_LocationMapLock); - Flush(); + m_CasLog.Flush(); + m_CurrentBlock.lock()->Flush(); + + BlocksToReWrite.reserve(m_OpenBlocks.size()); if (m_LocationMap.empty()) { @@ -297,21 +335,28 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) std::vector<IoHash> TotalChunkHashes; TotalChunkHashes.reserve(m_LocationMap.size()); - for (auto& Entry : m_LocationMap) + for (const auto& Entry : m_LocationMap) { TotalChunkHashes.push_back(Entry.first); + if (BlockIndexToKeepChunksMap.contains(Entry.second.BlockIndex)) + { + continue; + } + BlockIndexToKeepChunksMap[Entry.second.BlockIndex] = KeepChunks.size(); + KeepChunks.resize(KeepChunks.size() + 1); } - std::vector<IoHash> DeletedChunks; - std::vector<IoHash> ChunkHashes; // Same sort order as ChunkLocations - ChunkHashes.reserve(m_LocationMap.size()); - - const bool CollectSmallObjects = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + uint64_t NewTotalSize = 0; GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { if (Keep) { - ChunkHashes.push_back(ChunkHash); + auto KeyIt = m_LocationMap.find(ChunkHash); + const auto& ChunkLocation = KeyIt->second; + auto& ChunkMap = KeepChunks[BlockIndexToKeepChunksMap[ChunkLocation.BlockIndex]]; + ChunkMap[ChunkHash] = ChunkLocation; + NewTotalSize += ChunkLocation.Size; } else { @@ -319,162 +364,172 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) } }); - if (ChunkHashes.size() == TotalChunkCount) - { - ZEN_INFO("garbage collect DONE, scanned #{} {} chunks from '{}', nothing to delete", - TotalChunkCount, - NiceBytes(TotalSize), - m_Config.RootDirectory / m_ContainerBaseName); - return; - } - - const uint64_t ChunkCount = ChunkHashes.size(); - - std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) { - auto LhsKeyIt = m_LocationMap.find(Lhs); - auto RhsKeyIt = m_LocationMap.find(Rhs); - return LhsKeyIt->second.GetOffset() < RhsKeyIt->second.GetOffset(); - }); - - uint64_t NewTotalSize = 0; - std::vector<CasDiskLocation> ChunkLocations; - ChunkLocations.reserve(ChunkHashes.size()); - for (auto Entry : ChunkHashes) - { - auto KeyIt = m_LocationMap.find(Entry); - const auto& ChunkLocation = KeyIt->second; - ChunkLocations.push_back(ChunkLocation); - NewTotalSize += ChunkLocation.GetSize(); - } - - if (!CollectSmallObjects) + if (!PerformDelete) { ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", m_Config.RootDirectory / m_ContainerBaseName, - TotalChunkCount - ChunkCount, + DeletedChunks.size(), NiceBytes(TotalSize - NewTotalSize), TotalChunkCount, NiceBytes(TotalSize)); return; } - for (auto ChunkHash : DeletedChunks) + for (const auto& ChunkHash : DeletedChunks) { - auto KeyIt = m_LocationMap.find(ChunkHash); - const auto& ChunkLocation = KeyIt->second; - uint64_t NextChunkOffset = ChunkLocation.GetOffset() + ChunkLocation.GetSize(); + auto KeyIt = m_LocationMap.find(ChunkHash); + const auto& ChunkLocation = KeyIt->second; + BlocksToReWrite.insert(ChunkLocation.BlockIndex); m_CasLog.Append({.Key = ChunkHash, .Location = ChunkLocation, .Flags = CasDiskIndexEntry::kTombstone}); m_LocationMap.erase(ChunkHash); - if (m_CurrentInsertOffset == NextChunkOffset) + m_TotalSize.fetch_sub(static_cast<uint64_t>(ChunkLocation.Size)); + } + + if (BlocksToReWrite.contains(m_CurrentBlockIndex)) + { + uint16_t NewBlockIndex = m_CurrentBlockIndex + 1; + while (m_OpenBlocks.contains(NewBlockIndex)) { - m_CurrentInsertOffset = ChunkLocation.GetOffset(); + NewBlockIndex++; + if (NewBlockIndex == m_CurrentBlockIndex) + { + ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded", + m_ContainerBaseName, + std::numeric_limits<uint16_t>::max() + 1); + return; + } } - m_TotalSize.fetch_sub(static_cast<uint64_t>(ChunkLocation.GetSize())); + m_CurrentBlockIndex = NewBlockIndex; + std::filesystem::path path = BuildUcasPath(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex); + auto SmallObjectFile = std::make_shared<BasicFile>(); + SmallObjectFile->Open(path, true); + m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile; + m_CurrentBlock = SmallObjectFile; + m_CurrentInsertOffset = 0; } + } - // We can break here if we only want to remove items without compacting of space + // Move all chunks in blocks that have chunks removed to new blocks - std::vector<IoHash> MovedChunks; + std::shared_ptr<BasicFile> NewBlockFile; + uint64_t WriteOffset = {}; + uint16_t NewBlockIndex = {}; + std::unordered_map<IoHash, CasDiskLocation> MovedBlocks; - uint64_t WriteOffset{}; - uint64_t ChunkIndex{}; - while (ChunkIndex < ChunkHashes.size()) + for (auto BlockIndex : BlocksToReWrite) + { + auto& ChunkMap = KeepChunks[BlockIndexToKeepChunksMap[BlockIndex]]; + if (ChunkMap.empty()) { - IoHash ChunkHash = ChunkHashes[ChunkIndex]; - const auto& ChunkLocation = ChunkLocations[ChunkIndex]; - - uint64_t NextChunkOffset = AlignPositon(ChunkLocation.GetOffset() + ChunkLocation.GetSize(), m_PayloadAlignment); + // The block has no references to it, it should be removed as soon as no references is held on the file + // TODO: We currently don't know if someone is holding a IoBuffer for this block at this point! + + // std::filesystem::path BlockPath = BuildUcasPath(m_Config.RootDirectory, m_ContainerBaseName, BlockIndex); + // RwLock::ExclusiveLockScope _i(m_InsertLock); + // auto BlockFile = m_OpenBlocks[BlockIndex]; + // m_OpenBlocks.erase(BlockIndex); + // BlockFile->Close(); + // fs::remove(BlockPath); + continue; + } - uint64_t FreeChunkSize = ChunkLocation.GetOffset() - WriteOffset; + std::shared_ptr<BasicFile> BlockFile; + { + RwLock::ExclusiveLockScope _i(m_InsertLock); + BlockFile = m_OpenBlocks[BlockIndex]; + } - // TODO: We could keep some wiggle room here, only try to find the last keep block if there is a reasonable amount of space free - while (FreeChunkSize >= m_PayloadAlignment) + { + std::vector<uint8_t> Chunk; + for (auto& Entry : ChunkMap) { - // We should move as many keep chunk at the end as we can possibly fit - uint64_t LastKeepChunkIndex = ChunkHashes.size() - 1; - if (LastKeepChunkIndex == ChunkIndex) - { - break; - } + const CasDiskLocation& ChunkLocation = Entry.second; + Chunk.resize(ChunkLocation.Size); + BlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); - IoHash LastChunkHash = ChunkHashes[LastKeepChunkIndex]; - const auto& LastChunkLocation = ChunkLocations[LastKeepChunkIndex]; - if (LastChunkLocation.GetSize() > FreeChunkSize) + if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) { - break; - } - - // Move the last chunk to our write location - std::vector<uint8_t> Chunk; - Chunk.resize(LastChunkLocation.GetSize()); - m_SmallObjectFile.Read(Chunk.data(), Chunk.size(), LastChunkLocation.GetOffset()); - CasDiskLocation NewChunkLocation(WriteOffset, Chunk.size()); - m_SmallObjectFile.Write(Chunk.data(), Chunk.size(), NewChunkLocation.GetOffset()); + { + RwLock::ExclusiveLockScope _i(m_InsertLock); + if (NewBlockFile) + { + m_OpenBlocks[NewBlockIndex] = NewBlockFile; + RwLock::ExclusiveLockScope _l(m_LocationMapLock); + for (const auto& MovedEntry : MovedBlocks) + { + m_LocationMap[MovedEntry.first] = MovedEntry.second; + m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second}); + } + } + NewBlockIndex = m_CurrentBlockIndex + 1; + while (m_OpenBlocks.contains(NewBlockIndex)) + { + NewBlockIndex++; + if (NewBlockIndex == m_CurrentBlockIndex) + { + ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded", + m_ContainerBaseName, + std::numeric_limits<uint16_t>::max() + 1); + return; + } + } + m_OpenBlocks[NewBlockIndex] = std::shared_ptr<BasicFile>(); // Make sure nobody steals this slot + } - CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = NewChunkLocation}; - m_CasLog.Append(IndexEntry); - m_LocationMap[LastChunkHash] = NewChunkLocation; - ChunkHashes.pop_back(); + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); + if (Error) + { + ZEN_ERROR("get disk space in {} FAILED, reason '{}'", m_ContainerBaseName, Error.message()); + return; + } - WriteOffset = AlignPositon(WriteOffset + Chunk.size(), m_PayloadAlignment); - FreeChunkSize = ChunkLocation.GetOffset() - WriteOffset; - MovedChunks.push_back(LastChunkHash); + if (Space.Free < (m_MaxBlockSize * 2)) // Never let GC steal the last block space + { + ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}", + m_Config.RootDirectory / m_ContainerBaseName, + m_MaxBlockSize * m_MaxBlockSize, + NiceBytes(Space.Free)); + RwLock::ExclusiveLockScope _i(m_InsertLock); + m_OpenBlocks.erase(NewBlockIndex); + return; + } - uint64_t LastChunkNextChunkOffset = AlignPositon(LastChunkLocation.GetOffset() + Chunk.size(), m_PayloadAlignment); - if (m_CurrentInsertOffset == LastChunkNextChunkOffset) - { - m_CurrentInsertOffset = LastChunkLocation.GetOffset(); + std::filesystem::path NewBlockPath = BuildUcasPath(m_Config.RootDirectory, m_ContainerBaseName, NewBlockIndex); + NewBlockFile = std::make_shared<BasicFile>(); + NewBlockFile->Open(NewBlockPath, true); + MovedBlocks.clear(); + WriteOffset = 0; } - } - // TODO: We could keep some wiggle room here, don't move chunk if we only move it a very small amount - if (FreeChunkSize > m_PayloadAlignment) - { - std::vector<uint8_t> Chunk; - Chunk.resize(ChunkLocation.GetSize()); - m_SmallObjectFile.Read(Chunk.data(), Chunk.size(), ChunkLocation.GetOffset()); - CasDiskLocation NewChunkLocation(WriteOffset, Chunk.size()); - m_SmallObjectFile.Write(Chunk.data(), Chunk.size(), NewChunkLocation.GetOffset()); - - CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = NewChunkLocation}; - m_CasLog.Append(IndexEntry); - m_LocationMap[ChunkHash] = NewChunkLocation; - - MovedChunks.push_back(ChunkHash); - WriteOffset = AlignPositon(NewChunkLocation.GetOffset() + Chunk.size(), m_PayloadAlignment); - } - else - { - WriteOffset = NextChunkOffset; + NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); + CasDiskLocation NewChunkLocation(NewBlockIndex, gsl::narrow<uint32_t>(WriteOffset), gsl::narrow<uint32_t>(Chunk.size())); + Entry.second = {.BlockIndex = NewBlockIndex, + .Offset = gsl::narrow<uint32_t>(WriteOffset), + .Size = gsl::narrow<uint32_t>(Chunk.size())}; + MovedBlocks[Entry.first] = Entry.second; + WriteOffset = AlignPositon(WriteOffset + Chunk.size(), m_PayloadAlignment); } + Chunk.clear(); - // Update insert location if this is the last chunk in the file - if (m_CurrentInsertOffset == NextChunkOffset) + // Remap moved chunks to the new block file + RwLock::ExclusiveLockScope _i(m_InsertLock); + if (NewBlockFile) { - m_CurrentInsertOffset = WriteOffset; + m_OpenBlocks[NewBlockIndex] = NewBlockFile; + RwLock::ExclusiveLockScope _l(m_LocationMapLock); + for (const auto& MovedEntry : MovedBlocks) + { + m_LocationMap[MovedEntry.first] = MovedEntry.second; + m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second}); + } } - - // We can break here if we want to do incremental GC - - ChunkIndex++; - } - - if (ChunkCount == 0) - { - m_CurrentInsertOffset = 0; } + } - GcCtx.DeletedCas(DeletedChunks); + GcCtx.DeletedCas(DeletedChunks); - uint64_t CurrentSize = m_SmallObjectFile.FileSize(); - ZEN_INFO("garbage collection complete '{}', space {} to {}, moved {} and delete {} chunks", - m_Config.RootDirectory / m_ContainerBaseName, - NiceBytes(CurrentSize), - NiceBytes(m_CurrentInsertOffset), - MovedChunks.size(), - DeletedChunks.size()); - // TODO: Should we truncate the file or just keep the size of the file and reuse the space? - } + ZEN_INFO("garbage collection complete '{}', deleted {} chunks", m_Config.RootDirectory / m_ContainerBaseName, DeletedChunks.size()); MakeIndexSnapshot(); } @@ -592,16 +647,11 @@ CasContainerStrategy::MakeIndexSnapshot() void CasContainerStrategy::OpenContainer(bool IsNewStore) { - std::filesystem::path SobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas"); - std::filesystem::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog"); - - m_SmallObjectFile.Open(SobsPath, IsNewStore); - m_CasLog.Open(SlogPath, IsNewStore); + // TODO: Pick up old Cas store format so we can use it in our store - // TODO: should validate integrity of container files here + std::filesystem::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog"); - m_CurrentInsertOffset = 0; - m_TotalSize = 0; + m_TotalSize = 0; m_LocationMap.clear(); @@ -621,6 +671,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) SmallObjectIndex.Close(); } + m_CasLog.Open(SlogPath, IsNewStore); m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { if (Record.Flags & CasDiskIndexEntry::kTombstone) { @@ -632,28 +683,104 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) } }); - uint64_t MaxFileOffset = 0; + std::unordered_set<uint16_t> ReferencedBlockIndexes; for (const auto& Entry : m_LocationMap) { const auto& Location = Entry.second; - MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Location.GetOffset() + Location.GetSize()); - m_TotalSize.fetch_add(Location.GetSize()); + m_TotalSize.fetch_add(Location.Size); + ReferencedBlockIndexes.insert(Location.BlockIndex); + } + + uint32_t SmallestBlockSize = 0xffffffffu; + for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_Config.RootDirectory)) + { + if (Entry.is_regular_file()) + { + // TODO: Clean up naming/storage structure so we don't have to do this complicated parsing to find our ucas files + if (Entry.path().extension() != ".ucas") + { + continue; + } + std::string FileName = Entry.path().stem().string(); + if (!FileName.starts_with(m_ContainerBaseName)) + { + continue; + } + if (IsNewStore) + { + std::filesystem::remove(Entry.path()); + continue; + } + try + { + uint16_t BlockIndex = static_cast<uint16_t>(std::stoi(FileName.substr(m_ContainerBaseName.length() + 1))); + if (!ReferencedBlockIndexes.contains(BlockIndex)) + { + // Clear out unused blocks + std::filesystem::remove(Entry.path()); + continue; + } + auto SmallObjectFile = std::make_shared<BasicFile>(); + SmallObjectFile->Open(Entry.path(), false); + m_OpenBlocks[BlockIndex] = SmallObjectFile; + if (SmallObjectFile->FileSize() < SmallestBlockSize) + { + m_CurrentBlockIndex = BlockIndex; + SmallestBlockSize = gsl::narrow<std::uint32_t>(SmallObjectFile->FileSize()); + } + } + catch (const std::invalid_argument&) + { + // Non-valid file, skip it (or should we remove it?) + } + } + } + if (m_OpenBlocks.empty()) + { + std::filesystem::path path = BuildUcasPath(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex); + auto SmallObjectFile = std::make_shared<BasicFile>(); + SmallObjectFile->Open(path, true); + m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile; + m_CurrentBlock = SmallObjectFile; + m_CurrentInsertOffset = 0; + } + else + { + m_CurrentBlock = m_OpenBlocks[m_CurrentBlockIndex]; + m_CurrentInsertOffset = static_cast<uint32_t>(AlignPositon(m_CurrentBlock.lock()->FileSize(), m_PayloadAlignment)); } - m_CurrentInsertOffset = AlignPositon(MaxFileOffset, m_PayloadAlignment); + // TODO: should validate integrity of container files here } ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS +namespace { + static IoBuffer CreateChunk(uint64_t Size) + { + static std::random_device rd; + static std::mt19937 g(rd()); + + std::vector<uint8_t> Values; + Values.resize(Size); + for (size_t Idx = 0; Idx < Size; ++Idx) + { + Values[Idx] = static_cast<uint8_t>(Idx); + } + std::shuffle(Values.begin(), Values.end(), g); + + return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); + } +} // namespace + TEST_CASE("cas.compact.gc") { ScopedTemporaryDirectory TempDir; CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); - CreateDirectories(CasConfig.RootDirectory); const int kIterationCount = 1000; @@ -663,7 +790,7 @@ TEST_CASE("cas.compact.gc") { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 16, true); + Cas.Initialize("test", 65536, 16, true); for (int i = 0; i < kIterationCount; ++i) { @@ -697,7 +824,7 @@ TEST_CASE("cas.compact.gc") { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 16, false); + Cas.Initialize("test", 65536, 16, false); for (int i = 0; i < kIterationCount; ++i) { @@ -720,19 +847,6 @@ TEST_CASE("cas.compact.totalsize") std::random_device rd; std::mt19937 g(rd()); - const auto CreateChunk = [&](uint64_t Size) -> IoBuffer { - const size_t Count = static_cast<size_t>(Size / sizeof(uint32_t)); - std::vector<uint32_t> Values; - Values.resize(Count); - for (size_t Idx = 0; Idx < Count; ++Idx) - { - Values[Idx] = static_cast<uint32_t>(Idx); - } - std::shuffle(Values.begin(), Values.end(), g); - - return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size() * sizeof(uint32_t)); - }; - ScopedTemporaryDirectory TempDir; CasStoreConfiguration CasConfig; @@ -746,7 +860,7 @@ TEST_CASE("cas.compact.totalsize") { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 16, true); + Cas.Initialize("test", 65536, 16, true); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { @@ -763,13 +877,385 @@ TEST_CASE("cas.compact.totalsize") { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 16, false); + Cas.Initialize("test", 65536, 16, false); const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); } } +TEST_CASE("cas.gc.basic") +{ + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + CreateDirectories(CasConfig.RootDirectory); + + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("cb", 65536, 1 << 4, true); + + IoBuffer Chunk = CreateChunk(128); + IoHash ChunkHash = IoHash::HashBuffer(Chunk); + + const auto InsertResult = Cas.InsertChunk(Chunk, ChunkHash); + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHash)); +} + +TEST_CASE("cas.gc.compact") +{ + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + CreateDirectories(CasConfig.RootDirectory); + + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("cb", 2048, 1 << 4, true); + + uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5}; + std::vector<IoBuffer> Chunks; + Chunks.reserve(9); + for (const auto& Size : ChunkSizes) + { + Chunks.push_back(CreateChunk(Size)); + } + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(9); + for (const auto& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + CHECK(Cas.InsertChunk(Chunks[0], ChunkHashes[0]).New); + CHECK(Cas.InsertChunk(Chunks[1], ChunkHashes[1]).New); + CHECK(Cas.InsertChunk(Chunks[2], ChunkHashes[2]).New); + CHECK(Cas.InsertChunk(Chunks[3], ChunkHashes[3]).New); + CHECK(Cas.InsertChunk(Chunks[4], ChunkHashes[4]).New); + CHECK(Cas.InsertChunk(Chunks[5], ChunkHashes[5]).New); + CHECK(Cas.InsertChunk(Chunks[6], ChunkHashes[6]).New); + CHECK(Cas.InsertChunk(Chunks[7], ChunkHashes[7]).New); + CHECK(Cas.InsertChunk(Chunks[8], ChunkHashes[8]).New); + + CHECK(Cas.HaveChunk(ChunkHashes[0])); + CHECK(Cas.HaveChunk(ChunkHashes[1])); + CHECK(Cas.HaveChunk(ChunkHashes[2])); + CHECK(Cas.HaveChunk(ChunkHashes[3])); + CHECK(Cas.HaveChunk(ChunkHashes[4])); + CHECK(Cas.HaveChunk(ChunkHashes[5])); + CHECK(Cas.HaveChunk(ChunkHashes[6])); + CHECK(Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + auto InitialSize = Cas.StorageSize().DiskSize; + + // Keep first and last + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[0]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + Cas.CollectGarbage(GcCtx); + + CHECK(Cas.HaveChunk(ChunkHashes[0])); + CHECK(!Cas.HaveChunk(ChunkHashes[1])); + CHECK(!Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(!Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(!Cas.HaveChunk(ChunkHashes[6])); + CHECK(!Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + } + + Cas.InsertChunk(Chunks[1], ChunkHashes[1]); + Cas.InsertChunk(Chunks[2], ChunkHashes[2]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[4], ChunkHashes[4]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + Cas.InsertChunk(Chunks[6], ChunkHashes[6]); + Cas.InsertChunk(Chunks[7], ChunkHashes[7]); + + // Keep last + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHashes[0])); + CHECK(!Cas.HaveChunk(ChunkHashes[1])); + CHECK(!Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(!Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(!Cas.HaveChunk(ChunkHashes[6])); + CHECK(!Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + + Cas.InsertChunk(Chunks[1], ChunkHashes[1]); + Cas.InsertChunk(Chunks[2], ChunkHashes[2]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[4], ChunkHashes[4]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + Cas.InsertChunk(Chunks[6], ChunkHashes[6]); + Cas.InsertChunk(Chunks[7], ChunkHashes[7]); + } + + // Keep mixed + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[1]); + KeepChunks.push_back(ChunkHashes[4]); + KeepChunks.push_back(ChunkHashes[7]); + GcCtx.ContributeCas(KeepChunks); + + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHashes[0])); + CHECK(Cas.HaveChunk(ChunkHashes[1])); + CHECK(!Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(!Cas.HaveChunk(ChunkHashes[6])); + CHECK(Cas.HaveChunk(ChunkHashes[7])); + CHECK(!Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); + CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); + + Cas.InsertChunk(Chunks[0], ChunkHashes[0]); + Cas.InsertChunk(Chunks[2], ChunkHashes[2]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + Cas.InsertChunk(Chunks[6], ChunkHashes[6]); + Cas.InsertChunk(Chunks[8], ChunkHashes[8]); + } + + // Keep multiple at end + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[6]); + KeepChunks.push_back(ChunkHashes[7]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHashes[0])); + CHECK(!Cas.HaveChunk(ChunkHashes[1])); + CHECK(!Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(!Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(Cas.HaveChunk(ChunkHashes[6])); + CHECK(Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + + Cas.InsertChunk(Chunks[0], ChunkHashes[0]); + Cas.InsertChunk(Chunks[1], ChunkHashes[1]); + Cas.InsertChunk(Chunks[2], ChunkHashes[2]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[4], ChunkHashes[4]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + } + + // Keep every other + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[0]); + KeepChunks.push_back(ChunkHashes[2]); + KeepChunks.push_back(ChunkHashes[4]); + KeepChunks.push_back(ChunkHashes[6]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + Cas.CollectGarbage(GcCtx); + + CHECK(Cas.HaveChunk(ChunkHashes[0])); + CHECK(!Cas.HaveChunk(ChunkHashes[1])); + CHECK(Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(Cas.HaveChunk(ChunkHashes[6])); + CHECK(!Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); + CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + + Cas.InsertChunk(Chunks[1], ChunkHashes[1]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + Cas.InsertChunk(Chunks[7], ChunkHashes[7]); + } + + // Verify that we nicely appended blocks even after all GC operations + CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); + CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); + CHECK(ChunkHashes[3] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[3]))); + CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[5] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[5]))); + CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + + auto FinalSize = Cas.StorageSize().DiskSize; + CHECK(InitialSize == FinalSize); +} + +TEST_CASE("cas.gc.deleteblockonopen") +{ + ScopedTemporaryDirectory TempDir; + + uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; + std::vector<IoBuffer> Chunks; + Chunks.reserve(20); + for (const auto& Size : ChunkSizes) + { + Chunks.push_back(CreateChunk(Size)); + } + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(20); + for (const auto& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + CreateDirectories(CasConfig.RootDirectory); + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 1024, 16, true); + + for (size_t i = 0; i < 20; i++) + { + CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); + } + + // GC every other block + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + for (size_t i = 0; i < 20; i += 2) + { + KeepChunks.push_back(ChunkHashes[i]); + } + GcCtx.ContributeCas(KeepChunks); + + Cas.CollectGarbage(GcCtx); + + for (size_t i = 0; i < 20; i += 2) + { + CHECK(Cas.HaveChunk(ChunkHashes[i])); + CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); + CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); + } + } + } + { + // Re-open + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 1024, 16, false); + + for (size_t i = 0; i < 20; i += 2) + { + CHECK(Cas.HaveChunk(ChunkHashes[i])); + CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); + CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); + } + } +} + +TEST_CASE("cas.gc.handleopeniobuffer") +{ + ScopedTemporaryDirectory TempDir; + + uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; + std::vector<IoBuffer> Chunks; + Chunks.reserve(20); + for (const auto& Size : ChunkSizes) + { + Chunks.push_back(CreateChunk(Size)); + } + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(20); + for (const auto& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + CreateDirectories(CasConfig.RootDirectory); + + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 1024, 16, true); + + for (size_t i = 0; i < 20; i++) + { + CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); + } + + auto RetainChunk = Cas.FindChunk(ChunkHashes[5]); + + // GC everything + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + Cas.CollectGarbage(GcCtx); + + for (size_t i = 0; i < 20; i++) + { + CHECK(!Cas.HaveChunk(ChunkHashes[i])); + } + + CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk)); +} #endif void diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h index 204482534..35f118f34 100644 --- a/zenstore/compactcas.h +++ b/zenstore/compactcas.h @@ -3,21 +3,10 @@ #pragma once #include <zencore/zencore.h> - -#include <zencore/iobuffer.h> -#include <zencore/iohash.h> -#include <zencore/string.h> -#include <zencore/thread.h> -#include <zencore/uid.h> #include <zenstore/basicfile.h> -#include <zenstore/cas.h> #include <zenstore/caslog.h> #include <zenstore/gc.h> -#if ZEN_PLATFORM_WINDOWS -# include <zencore/windows.h> -#endif - #include <atomic> #include <unordered_map> @@ -34,34 +23,9 @@ namespace zen { struct CasDiskLocation { - CasDiskLocation(uint64_t InOffset, uint64_t InSize) - { - ZEN_ASSERT(InOffset <= 0xff'ffff'ffff); - ZEN_ASSERT(InSize <= 0xff'ffff'ffff); - - memcpy(&m_Offset[0], &InOffset, sizeof m_Offset); - memcpy(&m_Size[0], &InSize, sizeof m_Size); - } - - CasDiskLocation() = default; - - inline uint64_t GetOffset() const - { - uint64_t Offset = 0; - memcpy(&Offset, &m_Offset, sizeof m_Offset); - return Offset; - } - - inline uint64_t GetSize() const - { - uint64_t Size = 0; - memcpy(&Size, &m_Size, sizeof m_Size); - return Size; - } - -private: - uint8_t m_Offset[5]; - uint8_t m_Size[5]; + uint16_t BlockIndex; + uint32_t Offset; + uint32_t Size; }; struct CasDiskIndexEntry @@ -96,7 +60,7 @@ struct CasContainerStrategy final : public GcStorage IoBuffer FindChunk(const IoHash& ChunkHash); bool HaveChunk(const IoHash& ChunkHash); void FilterChunks(CasChunkSet& InOutChunks); - void Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore); + void Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore); void Flush(); void Scrub(ScrubContext& Ctx); virtual void CollectGarbage(GcContext& GcCtx) override; @@ -109,16 +73,20 @@ private: const CasStoreConfiguration& m_Config; spdlog::logger& m_Log; uint64_t m_PayloadAlignment = 1 << 4; + uint64_t m_MaxBlockSize = 1L << 30; // 1 Gb bool m_IsInitialized = false; - BasicFile m_SmallObjectFile; TCasLogFile<CasDiskIndexEntry> m_CasLog; std::string m_ContainerBaseName; RwLock m_LocationMapLock; std::unordered_map<IoHash, CasDiskLocation, IoHash::Hasher> m_LocationMap; - RwLock m_InsertLock; // used to serialize inserts - std::atomic_uint64_t m_CurrentInsertOffset{}; - std::atomic_uint64_t m_TotalSize{}; + + RwLock m_InsertLock; // used to serialize inserts + std::unordered_map<uint16_t, std::shared_ptr<BasicFile>> m_OpenBlocks; + std::weak_ptr<BasicFile> m_CurrentBlock; + uint16_t m_CurrentBlockIndex = 0; + std::atomic_uint32_t m_CurrentInsertOffset{}; + std::atomic_uint64_t m_TotalSize{}; void MakeIndexSnapshot(); }; |