diff options
| -rw-r--r-- | zenstore/chunkbundler.cpp | 689 |
1 files changed, 398 insertions, 291 deletions
diff --git a/zenstore/chunkbundler.cpp b/zenstore/chunkbundler.cpp index 2c1924303..5b9242eec 100644 --- a/zenstore/chunkbundler.cpp +++ b/zenstore/chunkbundler.cpp @@ -26,18 +26,6 @@ AlignPositon(uint64_t Offset, uint64_t Alignment) return (Offset + Alignment - 1) & ~(Alignment - 1); } -template<typename T> -class reverse -{ -private: - T& iterable_; - -public: - explicit reverse(T& iterable) : iterable_{iterable} {} - auto begin() const { return std::rbegin(iterable_); } - auto end() const { return std::rend(iterable_); } -}; - ChunkBundler::ChunkBundler(std::filesystem::path RootDirectory, ChunkBundlerValidator* Validator) : m_Validator(Validator) , m_Log(logging::Get("chunkbundler")) @@ -63,51 +51,7 @@ ChunkBundler::Initialize(const std::string_view ContainerBaseName, uint64_t MaxB std::filesystem::path SobsPath = m_RootDirectory / (m_ContainerBaseName + ".ucas"); std::filesystem::path SlogPath = m_RootDirectory / (m_ContainerBaseName + ".ulog"); - CreateDirectories(m_RootDirectory / "ucas"); - for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_RootDirectory / "ucas")) - { - if (Entry.is_regular_file()) - { - if (IsNewStore) - { - std::filesystem::remove(Entry.path()); - continue; - } - if (Entry.path().extension() == ".ucas") - { - try - { - std::string stem = Entry.path().stem().string(); - uint16_t fileIndex = static_cast<uint16_t>(std::stoi(stem)); - auto SmallObjectFile = std::make_shared<BasicFile>(); - SmallObjectFile->Open(Entry.path(), false); - m_OpenBlocks[fileIndex] = SmallObjectFile; - m_CurrentFileIndex = std::max<uint16_t>(m_CurrentFileIndex, fileIndex); - } - catch(const std::invalid_argument&) - { - // Non-valid file, skip it - } - } - } - } - if (m_OpenBlocks.empty()) - { - std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentFileIndex) + ".ucas"); - auto SmallObjectFile = std::make_shared<BasicFile>(); - SmallObjectFile->Open(path, true); - m_OpenBlocks[m_CurrentFileIndex] = SmallObjectFile; - m_CurrentInsertOffset = 0; - } - - m_CurrentBlock = m_OpenBlocks[m_CurrentFileIndex]; - m_CurrentInsertOffset = static_cast<uint32_t>(AlignPositon(m_CurrentBlock.lock()->FileSize(), m_PayloadAlignment)); - m_OpLog.Open(SlogPath, IsNewStore); - - // TODO: should validate integrity of container files here - - m_CurrentInsertOffset = 0; - m_TotalSize = 0; + m_TotalSize = 0; m_LocationMap.clear(); @@ -127,6 +71,7 @@ ChunkBundler::Initialize(const std::string_view ContainerBaseName, uint64_t MaxB SmallObjectIndex.Close(); } + m_OpLog.Open(SlogPath, IsNewStore); m_OpLog.Replay([&](const CompactDiskIndexEntry& Record) { if (Record.Flags & CompactDiskIndexEntry::kTombstone) { @@ -138,12 +83,71 @@ ChunkBundler::Initialize(const std::string_view ContainerBaseName, uint64_t MaxB } }); + std::unordered_set<uint16_t> ReferencedBlockIndexes; for (const auto& Entry : m_LocationMap) { const auto& Location = Entry.second; m_TotalSize.fetch_add(Location.Size); + ReferencedBlockIndexes.insert(Location.BlockIndex); + } + + uint32_t SmallestBlockSize = 0xffffffffu; + CreateDirectories(m_RootDirectory / "ucas"); + for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_RootDirectory / "ucas")) + { + if (Entry.is_regular_file()) + { + if (IsNewStore) + { + std::filesystem::remove(Entry.path()); + continue; + } + if (Entry.path().extension() == ".ucas") + { + try + { + std::string FileName = Entry.path().stem().string(); + uint16_t BlockIndex = static_cast<uint16_t>(std::stoi(FileName)); + if (!ReferencedBlockIndexes.contains(BlockIndex)) + { + // Clear out unused blocks + std::filesystem::remove(Entry.path()); + continue; + } + auto SmallObjectFile = std::make_shared<BasicFile>(); + SmallObjectFile->Open(Entry.path(), false); + m_OpenBlocks[BlockIndex] = SmallObjectFile; + if (SmallObjectFile->FileSize() < SmallestBlockSize) + { + m_CurrentBlockIndex = BlockIndex; + SmallestBlockSize = gsl::narrow<std::uint32_t>(SmallObjectFile->FileSize()); + } + } + catch (const std::invalid_argument&) + { + // Non-valid file, skip it (or should we remove it?) + } + } + } + } + if (m_OpenBlocks.empty()) + { + std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentBlockIndex) + ".ucas"); + auto SmallObjectFile = std::make_shared<BasicFile>(); + SmallObjectFile->Open(path, true); + m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile; + m_CurrentBlock = SmallObjectFile; + m_CurrentInsertOffset = 0; + + } + else + { + m_CurrentBlock = m_OpenBlocks[m_CurrentBlockIndex]; + m_CurrentInsertOffset = static_cast<uint32_t>(AlignPositon(m_CurrentBlock.lock()->FileSize(), m_PayloadAlignment)); } + // TODO: should validate integrity of container files here + m_IsInitialized = true; } @@ -168,19 +172,28 @@ ChunkBundler::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& if (CurrentBlockSize + m_CurrentInsertOffset > m_MaxBlockSize) { RwLock::ExclusiveLockScope __(m_LocationMapLock); - m_CurrentFileIndex++; - std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentFileIndex) + ".ucas"); + uint16_t NewBlockIndex = m_CurrentBlockIndex + 1; + while (m_OpenBlocks.contains(NewBlockIndex)) + { + NewBlockIndex++; + if (NewBlockIndex == m_CurrentBlockIndex) + { + throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName)); + } + } + m_CurrentBlockIndex = NewBlockIndex; + std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentBlockIndex) + ".ucas"); auto SmallObjectFile = std::make_shared<BasicFile>(); SmallObjectFile->Open(path, true); - m_OpenBlocks[m_CurrentFileIndex] = SmallObjectFile; - m_CurrentBlock = SmallObjectFile; - m_CurrentInsertOffset = 0; + m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile; + m_CurrentBlock = SmallObjectFile; + m_CurrentInsertOffset = 0; } const uint32_t InsertOffset = m_CurrentInsertOffset; m_CurrentBlock.lock()->Write(ChunkData, ChunkSize, InsertOffset); m_CurrentInsertOffset = static_cast<uint32_t>(AlignPositon(InsertOffset + ChunkSize, m_PayloadAlignment)); - const CompactDiskLocation Location{m_CurrentFileIndex, InsertOffset, static_cast<uint32_t>(ChunkSize)}; + const CompactDiskLocation Location{m_CurrentBlockIndex, InsertOffset, static_cast<uint32_t>(ChunkSize)}; CompactDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location}; RwLock::ExclusiveLockScope __(m_LocationMapLock); @@ -249,7 +262,7 @@ ChunkBundler::Flush() void ChunkBundler::Scrub(ScrubContext& Ctx) { - const uint64_t WindowSize = 4 * 1024 * 1024; + const uint64_t WindowSize = 4 * 1024 * 1024; std::vector<CompactDiskIndexEntry> BigChunks; std::vector<CompactDiskIndexEntry> BadChunks; @@ -267,8 +280,8 @@ ChunkBundler::Scrub(ScrubContext& Ctx) for (const auto& Block : m_OpenBlocks) { - uint64_t WindowStart = 0; - uint64_t WindowEnd = WindowSize; + uint64_t WindowStart = 0; + uint64_t WindowEnd = WindowSize; auto& SmallObjectFile = *Block.second; const uint64_t FileSize = SmallObjectFile.FileSize(); @@ -292,11 +305,11 @@ ChunkBundler::Scrub(ScrubContext& Ctx) continue; } - if (m_Validator && !m_Validator->ValidateChunk( - IoBuffer(IoBuffer::Wrap, - reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart, - Entry.second.Size), - Entry.first)) + if (m_Validator && + !m_Validator->ValidateChunk(IoBuffer(IoBuffer::Wrap, + reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart, + Entry.second.Size), + Entry.first)) { // Hash mismatch BadChunks.push_back({.Key = Entry.first, .Location = Entry.second}); @@ -327,7 +340,7 @@ ChunkBundler::Scrub(ScrubContext& Ctx) } */ } -#endif // 0 +#endif // 0 } if (BadChunks.empty()) @@ -361,31 +374,40 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx) { namespace fs = std::filesystem; - // Garbage collection will first remove any chunks that are flushed from the index. - // It then tries to compact the existing small object file, it does this by - // collecting all chunks that should be kept and sort them in position order. - // It then steps from chunk to chunk and checks if there is space to move the last - // chunk before the current chunk. It repeats this until it can't fit the last chunk - // or the last chunk is the current chunk. - // After this it check to see if there is space to move the current chunk closer to - // the preceeding chunk (or beginning of file if there is no preceeding chunk). - // It updates the new write position for any new chunks and rewrites the cas log - // to match the new content of the store. + // It collects all the blocks that we want to delete chunks from. For each such + // block we keep a list of chunks to retain. + // + // It will first remove any chunks that are flushed from the m_LocationMap. + // + // It then checks to see if we want to purge any chunks that are in the currently + // active block. If so, we break off the current block and start on a new block, + // otherwise we just let the active block be. // - // It currently grabs a full lock during the GC operation but the compacting is - // done gradually and can be stopped after each chunk if the GC operation needs to - // be time limited. This will leave holes in the small object file that will not - // be reclaimed unless a GC operation is executed again, but the state of the - // cas store is intact. + // Next it will iterate over all blocks that we want to remove chunks from. + // If the block is empty after removal of chunks we mark the block as pending + // delete - we want to delete it as soon as there are no IoBuffers using the + // block file. // - // It is also possible to more fine-grained locking of GC operation when moving - // blocks but that requires more work and additional checking if new blocks are - // added betwen each move of a block. + // If the block is non-empty we write out the chunks we want to keep to a new + // block file (creating new block files as needed). + // + // We update the index as we complete each new block file. This makes it possible + // to break the GC if we want to limit time for execution. + // + // GC can fairly parallell to regular operation - it will block while figuring + // out which chunks to remove and what blocks to rewrite but the actual + // reading and writing of data to new block files does not block regular operation. + // + // While moving blocks it will do a blocking operation and update the m_LocationMap + // after each new block is written and it will also block when figuring out the + // path to the next new block. + ZEN_INFO("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName); - std::unordered_map<uint16_t, std::unordered_map<IoHash, CompactDiskLocation, IoHash::Hasher>> KeepChunksPerBlock; - std::vector<IoHash> DeletedChunks; - std::unordered_set<uint16_t> BlocksToReWrite; + std::unordered_map<uint64_t, size_t> BlockIndexToKeepChunksMap; + std::vector<std::unordered_map<IoHash, CompactDiskLocation, IoHash::Hasher>> KeepChunks; + std::vector<IoHash> DeletedChunks; + std::unordered_set<uint16_t> BlocksToReWrite; { RwLock::ExclusiveLockScope _i(m_InsertLock); RwLock::ExclusiveLockScope _l(m_LocationMapLock); @@ -409,12 +431,15 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx) for (const auto& Entry : m_LocationMap) { TotalChunkHashes.push_back(Entry.first); + if (BlockIndexToKeepChunksMap.contains(Entry.second.BlockIndex)) + { + continue; + } + BlockIndexToKeepChunksMap[Entry.second.BlockIndex] = KeepChunks.size(); + KeepChunks.resize(KeepChunks.size() + 1); } - //std::vector<IoHash> ChunkHashes; // Same sort order as ChunkLocations - //ChunkHashes.reserve(m_LocationMap.size()); - - const bool CollectSmallObjects = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); uint64_t NewTotalSize = 0; GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { @@ -422,7 +447,8 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx) { auto KeyIt = m_LocationMap.find(ChunkHash); const auto& ChunkLocation = KeyIt->second; - KeepChunksPerBlock[ChunkLocation.BlockIndex][ChunkHash] = ChunkLocation; + auto& ChunkMap = KeepChunks[BlockIndexToKeepChunksMap[ChunkLocation.BlockIndex]]; + ChunkMap[ChunkHash] = ChunkLocation; NewTotalSize += ChunkLocation.Size; } else @@ -430,36 +456,8 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx) DeletedChunks.push_back(ChunkHash); } }); - /* - if (ChunkHashes.size() == TotalChunkCount) - { - ZEN_INFO("garbage collect DONE, scanned #{} {} chunks from '{}', nothing to delete", - TotalChunkCount, - NiceBytes(TotalSize), - m_RootDirectory / m_ContainerBaseName); - return; - } - - const uint64_t ChunkCount = ChunkHashes.size(); - - std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) { - auto LhsKeyIt = m_LocationMap.find(Lhs); - auto RhsKeyIt = m_LocationMap.find(Rhs); - return LhsKeyIt->second.Offset < RhsKeyIt->second.Offset; - }); - uint64_t NewTotalSize = 0; - std::vector<CompactDiskLocation> ChunkLocations; - ChunkLocations.reserve(ChunkHashes.size()); - for (auto ChunkHash : ChunkHashes) - { - auto KeyIt = m_LocationMap.find(ChunkHash); - const auto& ChunkLocation = KeyIt->second; - ChunkLocations.push_back(ChunkLocation); - NewTotalSize += ChunkLocation.Size; - } - */ - if (!CollectSmallObjects) + if (!PerformDelete) { ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", m_RootDirectory / m_ContainerBaseName, @@ -480,174 +478,151 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx) m_TotalSize.fetch_sub(static_cast<uint64_t>(ChunkLocation.Size)); } - if (BlocksToReWrite.contains(m_CurrentFileIndex)) - { - m_CurrentFileIndex++; - std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentFileIndex) + ".ucas"); - auto SmallObjectFile = std::make_shared<BasicFile>(); - SmallObjectFile->Open(path, true); - m_OpenBlocks[m_CurrentFileIndex] = SmallObjectFile; - m_CurrentBlock = SmallObjectFile; - m_CurrentInsertOffset = 0; - } - } - - { - // Rewrite all BlocksToReWrite - for (auto BlockIndex : BlocksToReWrite) + if (BlocksToReWrite.contains(m_CurrentBlockIndex)) { - std::shared_ptr<BasicFile> BlockFile; - { - RwLock::ExclusiveLockScope _i(m_InsertLock); - BlockFile = m_OpenBlocks[BlockIndex]; - } - std::filesystem::path BlockPath = m_RootDirectory / "ucas" / (std::to_string(BlockIndex) + ".ucas"); - auto& KeepChunksForBlock = KeepChunksPerBlock[BlockIndex]; - if (KeepChunksForBlock.empty()) - { - RwLock::ExclusiveLockScope _i(m_InsertLock); - BlockFile = m_OpenBlocks[BlockIndex]; - BlockFile->Close(); // TODO: We can't know that someone isn't holding a IoBuffer for this block at this point! - m_OpenBlocks.erase(BlockIndex); - fs::remove(BlockPath); - } - else + uint16_t NewBlockIndex = m_CurrentBlockIndex + 1; + while (m_OpenBlocks.contains(NewBlockIndex)) { - std::filesystem::path TmpBlockPath = m_RootDirectory / "ucas" / (std::to_string(BlockIndex) + ".gc.ucas"); - auto TmpBlock = std::make_shared<BasicFile>(); - TmpBlock->Open(TmpBlockPath, true); - std::vector<uint8_t> Chunk; - uint64_t WriteOffset = 0; - - for (auto& Entry : KeepChunksForBlock) - { - const CompactDiskLocation& ChunkLocation = Entry.second; - Chunk.resize(ChunkLocation.Size); - BlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); - CompactDiskLocation NewChunkLocation(ChunkLocation.BlockIndex, - gsl::narrow<uint32_t>(WriteOffset), - gsl::narrow<uint32_t>(Chunk.size())); - TmpBlock->Write(Chunk.data(), Chunk.size(), NewChunkLocation.Offset); - Entry.second = NewChunkLocation; - WriteOffset = AlignPositon(WriteOffset + Chunk.size(), m_PayloadAlignment); - } - TmpBlock->Close(); - - RwLock::ExclusiveLockScope _i(m_InsertLock); - RwLock::ExclusiveLockScope _l(m_LocationMapLock); - for (const auto& Entry : KeepChunksForBlock) + NewBlockIndex++; + if (NewBlockIndex == m_CurrentBlockIndex) { - m_LocationMap[Entry.first] = Entry.second; - m_OpLog.Append({.Key = Entry.first, .Location = Entry.second}); + ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded", + m_ContainerBaseName, + std::numeric_limits<uint16_t>::max() + 1); + return; } - BlockFile->Close(); // TODO: We can't know that someone isn't holding a IoBuffer for this block at this point! - fs::remove(BlockPath); - fs::rename(TmpBlockPath, BlockPath); - BlockFile->Open(BlockPath, false); } + m_CurrentBlockIndex = NewBlockIndex; + std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentBlockIndex) + ".ucas"); + auto SmallObjectFile = std::make_shared<BasicFile>(); + SmallObjectFile->Open(path, true); + m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile; + m_CurrentBlock = SmallObjectFile; + m_CurrentInsertOffset = 0; } + } -#if 0 - // We can break here if we only want to remove items without compacting of space + // Move all chunks in blocks that have chunks removed to new blocks - std::vector<IoHash> MovedChunks; + std::shared_ptr<BasicFile> NewBlockFile; + uint64_t WriteOffset = {}; + uint16_t NewBlockIndex = {}; + std::unordered_map<IoHash, CompactDiskLocation> MovedBlocks; - uint64_t WriteOffset{}; - uint64_t ChunkIndex{}; - while (ChunkIndex < ChunkHashes.size()) + for (auto BlockIndex : BlocksToReWrite) + { + auto& ChunkMap = KeepChunks[BlockIndexToKeepChunksMap[BlockIndex]]; + if (ChunkMap.empty()) { - IoHash ChunkHash = ChunkHashes[ChunkIndex]; - const auto& ChunkLocation = ChunkLocations[ChunkIndex]; - - uint64_t NextChunkOffset = AlignPositon(ChunkLocation.GetOffset() + ChunkLocation.GetSize(), m_PayloadAlignment); + // The block has no references to it, it should be removed as soon as no references is held on the file + // TODO: We currently don't know if someone is holding a IoBuffer for this block at this point! + + //std::filesystem::path BlockPath = m_RootDirectory / "ucas" / (std::to_string(BlockIndex) + ".ucas"); + //RwLock::ExclusiveLockScope _i(m_InsertLock); + //auto BlockFile = m_OpenBlocks[BlockIndex]; + //m_OpenBlocks.erase(BlockIndex); + //BlockFile->Close(); + //fs::remove(BlockPath); + continue; + } - uint64_t FreeChunkSize = ChunkLocation.GetOffset() - WriteOffset; + std::shared_ptr<BasicFile> BlockFile; + { + RwLock::ExclusiveLockScope _i(m_InsertLock); + BlockFile = m_OpenBlocks[BlockIndex]; + } - // TODO: We could keep some wiggle room here, only try to find the last keep block if there is a reasonable amount of space free - while (FreeChunkSize >= m_PayloadAlignment) + { + std::vector<uint8_t> Chunk; + for (auto& Entry : ChunkMap) { - // We should move as many keep chunk at the end as we can possibly fit - uint64_t LastKeepChunkIndex = ChunkHashes.size() - 1; - if (LastKeepChunkIndex == ChunkIndex) - { - break; - } + const CompactDiskLocation& ChunkLocation = Entry.second; + Chunk.resize(ChunkLocation.Size); + BlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); - IoHash LastChunkHash = ChunkHashes[LastKeepChunkIndex]; - const auto& LastChunkLocation = ChunkLocations[LastKeepChunkIndex]; - if (LastChunkLocation.GetSize() > FreeChunkSize) + if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) { - break; - } - - // Move the last chunk to our write location - std::vector<uint8_t> Chunk; - Chunk.resize(LastChunkLocation.GetSize()); - m_SmallObjectFile.Read(Chunk.data(), Chunk.size(), LastChunkLocation.GetOffset()); - CompactDiskLocation NewChunkLocation(WriteOffset, Chunk.size()); - m_SmallObjectFile.Write(Chunk.data(), Chunk.size(), NewChunkLocation.GetOffset()); + { + RwLock::ExclusiveLockScope _i(m_InsertLock); + if (NewBlockFile) + { + m_OpenBlocks[NewBlockIndex] = NewBlockFile; + RwLock::ExclusiveLockScope _l(m_LocationMapLock); + for (const auto& MovedEntry : MovedBlocks) + { + m_LocationMap[MovedEntry.first] = MovedEntry.second; + m_OpLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second}); + } + } + NewBlockIndex = m_CurrentBlockIndex + 1; + while (m_OpenBlocks.contains(NewBlockIndex)) + { + NewBlockIndex++; + if (NewBlockIndex == m_CurrentBlockIndex) + { + ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded", m_ContainerBaseName, std::numeric_limits<uint16_t>::max()+1); + return; + } + } + m_OpenBlocks[NewBlockIndex] = std::shared_ptr<BasicFile>(); // Make sure nobody steals this slot + } - CompactDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = NewChunkLocation}; - m_OpLog.Append(IndexEntry); - m_LocationMap[LastChunkHash] = NewChunkLocation; - ChunkHashes.pop_back(); + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_RootDirectory, Error); + if (Error) + { + ZEN_ERROR("get disk space in {} FAILED, reason '{}'", m_ContainerBaseName, Error.message()); + return; + } - WriteOffset = AlignPositon(WriteOffset + Chunk.size(), m_PayloadAlignment); - FreeChunkSize = ChunkLocation.GetOffset() - WriteOffset; - MovedChunks.push_back(LastChunkHash); + if (Space.Free < (m_MaxBlockSize * 2)) // Never let GC steal the last block space + { + ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}", + m_RootDirectory / m_ContainerBaseName, + m_MaxBlockSize * m_MaxBlockSize, + NiceBytes(Space.Free)); + RwLock::ExclusiveLockScope _i(m_InsertLock); + m_OpenBlocks.erase(NewBlockIndex); + return; + } - uint64_t LastChunkNextChunkOffset = AlignPositon(LastChunkLocation.GetOffset() + Chunk.size(), m_PayloadAlignment); - if (m_CurrentInsertOffset == LastChunkNextChunkOffset) - { - m_CurrentInsertOffset = LastChunkLocation.GetOffset(); + std::filesystem::path NewBlockPath = m_RootDirectory / "ucas" / (std::to_string(NewBlockIndex) + ".ucas"); + NewBlockFile = std::make_shared<BasicFile>(); + NewBlockFile->Open(NewBlockPath, true); + MovedBlocks.clear(); + WriteOffset = 0; } - } - // TODO: We could keep some wiggle room here, don't move chunk if we only move it a very small amount - if (FreeChunkSize > m_PayloadAlignment) - { - std::vector<uint8_t> Chunk; - Chunk.resize(ChunkLocation.GetSize()); - m_SmallObjectFile.Read(Chunk.data(), Chunk.size(), ChunkLocation.GetOffset()); - CompactDiskLocation NewChunkLocation(WriteOffset, Chunk.size()); - m_SmallObjectFile.Write(Chunk.data(), Chunk.size(), NewChunkLocation.GetOffset()); - - CompactDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = NewChunkLocation}; - m_OpLog.Append(IndexEntry); - m_LocationMap[ChunkHash] = NewChunkLocation; - - MovedChunks.push_back(ChunkHash); - WriteOffset = AlignPositon(NewChunkLocation.GetOffset() + Chunk.size(), m_PayloadAlignment); - } - else - { - WriteOffset = NextChunkOffset; + NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); + CompactDiskLocation NewChunkLocation(NewBlockIndex, + gsl::narrow<uint32_t>(WriteOffset), + gsl::narrow<uint32_t>(Chunk.size())); + Entry.second = {.BlockIndex = NewBlockIndex, + .Offset = gsl::narrow<uint32_t>(WriteOffset), + .Size = gsl::narrow<uint32_t>(Chunk.size())}; + MovedBlocks[Entry.first] = Entry.second; + WriteOffset = AlignPositon(WriteOffset + Chunk.size(), m_PayloadAlignment); } + Chunk.clear(); - // Update insert location if this is the last chunk in the file - if (m_CurrentInsertOffset == NextChunkOffset) + // Remap moved chunks to the new block file + RwLock::ExclusiveLockScope _i(m_InsertLock); + if (NewBlockFile) { - m_CurrentInsertOffset = WriteOffset; + m_OpenBlocks[NewBlockIndex] = NewBlockFile; + RwLock::ExclusiveLockScope _l(m_LocationMapLock); + for (const auto& MovedEntry : MovedBlocks) + { + m_LocationMap[MovedEntry.first] = MovedEntry.second; + m_OpLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second}); + } } - - // We can break here if we want to do incremental GC - - ChunkIndex++; - } - - if (ChunkCount == 0) - { - m_CurrentInsertOffset = 0; } -#endif // 0 + } - GcCtx.DeletedCas(DeletedChunks); + GcCtx.DeletedCas(DeletedChunks); - ZEN_INFO("garbage collection complete '{}', deleted {} chunks", - m_RootDirectory / m_ContainerBaseName, - DeletedChunks.size()); - // TODO: Should we truncate the file or just keep the size of the file and reuse the space? - } + ZEN_INFO("garbage collection complete '{}', deleted {} chunks", m_RootDirectory / m_ContainerBaseName, DeletedChunks.size()); MakeIndexSnapshot(); } @@ -931,29 +906,22 @@ TEST_CASE("chunkbundler.gc.compact") ScopedTemporaryDirectory TempDir; ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr); - ChunkBundlerStore.Initialize("cb", 65536, 1 << 4, true); + ChunkBundlerStore.Initialize("cb", 2048, 1 << 4, true); uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5}; - IoBuffer Chunks[9] = {CreateChunk(ChunkSizes[0]), - CreateChunk(ChunkSizes[1]), - CreateChunk(ChunkSizes[2]), - CreateChunk(ChunkSizes[3]), - CreateChunk(ChunkSizes[4]), - CreateChunk(ChunkSizes[5]), - CreateChunk(ChunkSizes[6]), - CreateChunk(ChunkSizes[7]), - CreateChunk(ChunkSizes[8])}; - IoHash ChunkHashes[9] = { - IoHash::HashBuffer(Chunks[0].Data(), Chunks[0].Size()), - IoHash::HashBuffer(Chunks[1].Data(), Chunks[1].Size()), - IoHash::HashBuffer(Chunks[2].Data(), Chunks[2].Size()), - IoHash::HashBuffer(Chunks[3].Data(), Chunks[3].Size()), - IoHash::HashBuffer(Chunks[4].Data(), Chunks[4].Size()), - IoHash::HashBuffer(Chunks[5].Data(), Chunks[5].Size()), - IoHash::HashBuffer(Chunks[6].Data(), Chunks[6].Size()), - IoHash::HashBuffer(Chunks[7].Data(), Chunks[7].Size()), - IoHash::HashBuffer(Chunks[8].Data(), Chunks[8].Size()), - }; + std::vector<IoBuffer> Chunks; + Chunks.reserve(9); + for (const auto& Size : ChunkSizes) + { + Chunks.push_back(CreateChunk(Size)); + } + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(9); + for (const auto& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } CHECK(ChunkBundlerStore.InsertChunk(Chunks[0], ChunkHashes[0]).New); CHECK(ChunkBundlerStore.InsertChunk(Chunks[1], ChunkHashes[1]).New); @@ -1110,6 +1078,42 @@ TEST_CASE("chunkbundler.gc.compact") ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]); } + // Keep every other + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[0]); + KeepChunks.push_back(ChunkHashes[2]); + KeepChunks.push_back(ChunkHashes[4]); + KeepChunks.push_back(ChunkHashes[6]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + ChunkBundlerStore.CollectGarbage(GcCtx); + + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[0])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[1])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[2])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[3])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[4])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[5])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[6])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[7])); + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[0] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[2] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[2]))); + CHECK(ChunkHashes[4] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[6] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[8]))); + + ChunkBundlerStore.InsertChunk(Chunks[1], ChunkHashes[1]); + ChunkBundlerStore.InsertChunk(Chunks[3], ChunkHashes[3]); + ChunkBundlerStore.InsertChunk(Chunks[5], ChunkHashes[5]); + ChunkBundlerStore.InsertChunk(Chunks[7], ChunkHashes[7]); + } + // Verify that we nicely appended blocks even after all GC operations CHECK(ChunkHashes[0] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[0]))); CHECK(ChunkHashes[1] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[1]))); @@ -1125,8 +1129,111 @@ TEST_CASE("chunkbundler.gc.compact") CHECK(InitialSize == FinalSize); } -void -chunkbundler_forcelink() +TEST_CASE("chunkbundler.gc.deleteblockonopen") +{ + ScopedTemporaryDirectory TempDir; + + uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; + std::vector<IoBuffer> Chunks; + Chunks.reserve(20); + for (const auto& Size : ChunkSizes) + { + Chunks.push_back(CreateChunk(Size)); + } + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(20); + for (const auto& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + { + ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr); + ChunkBundlerStore.Initialize("cb", 1024, 1 << 4, true); + + for (size_t i = 0; i < 20; i++) + { + CHECK(ChunkBundlerStore.InsertChunk(Chunks[i], ChunkHashes[i]).New); + } + + // GC every other block + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + for (size_t i = 0; i < 20; i += 2) + { + KeepChunks.push_back(ChunkHashes[i]); + } + GcCtx.ContributeCas(KeepChunks); + + ChunkBundlerStore.CollectGarbage(GcCtx); + + for (size_t i = 0; i < 20; i += 2) + { + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[i])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[i+1])); + CHECK(ChunkHashes[i] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[i]))); + } + } + } + { + // Re-open + ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr); + ChunkBundlerStore.Initialize("cb", 1024, 1 << 4, false); + for (size_t i = 0; i < 20; i += 2) + { + CHECK(ChunkBundlerStore.HaveChunk(ChunkHashes[i])); + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[i + 1])); + CHECK(ChunkHashes[i] == IoHash::HashBuffer(ChunkBundlerStore.FindChunk(ChunkHashes[i]))); + } + } +} + +TEST_CASE("chunkbundler.gc.handleopeniobuffer") +{ + ScopedTemporaryDirectory TempDir; + + uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; + std::vector<IoBuffer> Chunks; + Chunks.reserve(20); + for (const auto& Size : ChunkSizes) + { + Chunks.push_back(CreateChunk(Size)); + } + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(20); + for (const auto& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr); + ChunkBundlerStore.Initialize("cb", 1024, 1 << 4, true); + + for (size_t i = 0; i < 20; i++) + { + CHECK(ChunkBundlerStore.InsertChunk(Chunks[i], ChunkHashes[i]).New); + } + + auto RetainChunk = ChunkBundlerStore.FindChunk(ChunkHashes[5]); + + // GC everything + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + ChunkBundlerStore.CollectGarbage(GcCtx); + + for (size_t i = 0; i < 20; i++) + { + CHECK(!ChunkBundlerStore.HaveChunk(ChunkHashes[i])); + } + + CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk)); +} + +void chunkbundler_forcelink() { } |