diff options
| author | Dan Engelbrecht <[email protected]> | 2022-03-22 17:34:11 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-03-31 11:28:33 +0200 |
| commit | f4d040e2c0f602c34af47614907fecd8f6229316 (patch) | |
| tree | 987b8e864d46d5851cabe01ab6179d5d78ff425e /zenstore/compactcas.cpp | |
| parent | Reduce lock contention when garbage collecting (diff) | |
| download | zen-f4d040e2c0f602c34af47614907fecd8f6229316.tar.xz zen-f4d040e2c0f602c34af47614907fecd8f6229316.zip | |
WIP
Diffstat (limited to 'zenstore/compactcas.cpp')
| -rw-r--r-- | zenstore/compactcas.cpp | 789 |
1 files changed, 431 insertions, 358 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 4138d12ce..8571be065 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -129,13 +129,13 @@ CasContainerStrategy::ChunkBlock::Open() { // Open can have a race if multiple requests wants to read the same block // Create or ~ChunkBlock() can not have a race so we only need to guard Open() - if (m_IsOpened.load()) + if (m_IsOpened.load(std::memory_order::memory_order_acquire)) { return; } RwLock::ExclusiveLockScope _(m_OpenLock); - if (m_IsOpened.load()) + if (m_IsOpened.load(std::memory_order::memory_order_acquire)) { return; } @@ -178,7 +178,7 @@ void CasContainerStrategy::ChunkBlock::MarkAsDeleteOnClose(std::error_code& Ec) { RwLock::ExclusiveLockScope _(m_OpenLock); - if (m_IsOpened.load()) + if (m_IsOpened.load(std::memory_order::memory_order_acquire)) { m_File.MarkAsDeleteOnClose(Ec); return; @@ -267,6 +267,13 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const { RwLock::SharedLockScope _l(m_LocationMapLock); +// for (const auto& Entry : m_LocationMap) +// { +// uint32_t CheckBlockIndex = Entry.second.Get(m_PayloadAlignment).BlockIndex; +// CHECK(m_ChunkBlocks.contains(CheckBlockIndex)); +// CHECK(m_ChunkBlocks[CheckBlockIndex]); +// } + auto KeyIt = m_LocationMap.find(ChunkHash); if (KeyIt != m_LocationMap.end()) @@ -283,6 +290,13 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const { { RwLock::ExclusiveLockScope __(m_LocationMapLock); +// for (const auto& Entry : m_LocationMap) +// { +// uint32_t CheckBlockIndex = Entry.second.Get(m_PayloadAlignment).BlockIndex; +// CHECK(m_ChunkBlocks.contains(CheckBlockIndex)); +// CHECK(m_ChunkBlocks[CheckBlockIndex]); +// } + if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex) { throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName)); @@ -296,6 +310,13 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const WriteBlock = std::make_shared<ChunkBlock>(BlockPath); m_ChunkBlocks[WriteBlockIndex] = WriteBlock; m_WriteBlockIndex.store(WriteBlockIndex); + +// for (const auto& Entry : m_LocationMap) +// { +// uint32_t CheckBlockIndex = Entry.second.Get(m_PayloadAlignment).BlockIndex; +// CHECK(m_ChunkBlocks.contains(CheckBlockIndex)); +// CHECK(m_ChunkBlocks[CheckBlockIndex]); +// } } m_WriteBlock = WriteBlock; m_CurrentInsertOffset = 0; @@ -330,6 +351,12 @@ IoBuffer CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); +// for (const auto& Entry : m_LocationMap) +// { +// uint32_t CheckBlockIndex = Entry.second.Get(m_PayloadAlignment).BlockIndex; +// CHECK(m_ChunkBlocks.contains(CheckBlockIndex)); +// CHECK(m_ChunkBlocks[CheckBlockIndex]); +// } if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) { @@ -373,12 +400,24 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks) void CasContainerStrategy::Flush() { - RwLock::ExclusiveLockScope _(m_InsertLock); - m_CasLog.Flush(); - if (auto WriteBlock = m_WriteBlock.lock()) { - WriteBlock->Flush(); + RwLock::ExclusiveLockScope _(m_InsertLock); + if (m_CurrentInsertOffset > 0) + { + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(); + auto WriteBlock = m_WriteBlock.lock(); + WriteBlockIndex++; + while (m_ChunkBlocks.contains(WriteBlockIndex)) + { + WriteBlockIndex = (WriteBlockIndex + 1) & CasDiskLocation::MaxBlockIndex; + } + WriteBlock->Flush(); + m_WriteBlock.reset(); + m_WriteBlockIndex = WriteBlockIndex; + m_CurrentInsertOffset = 0; + } } + MakeIndexSnapshot(); } void @@ -525,114 +564,108 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) // path to the next new block. ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName); - std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex; - std::vector<std::unordered_map<IoHash, CasDiskLocation, IoHash::Hasher>> KeepChunks; + std::vector<std::unordered_set<IoHash, IoHash::Hasher>> KeepChunks; std::vector<std::unordered_set<IoHash, IoHash::Hasher>> DeleteChunks; std::vector<IoHash> DeletedChunks; std::vector<IoHash> PendingDeletedChunks; std::unordered_set<uint32_t> BlocksToReWrite; - std::uint64_t CurrentWritePosition; - std::uint64_t CurrentWriteBlock; +// std::uint64_t CurrentWritePosition; +// std::uint64_t CurrentWriteBlock; + std::unordered_map<IoHash, CasLocation, IoHash::Hasher> LocationMap; + size_t BlockCount; { + RwLock::SharedLockScope _i(m_InsertLock); RwLock::SharedLockScope _l(m_LocationMapLock); - - const auto& LocationMap = m_LocationMap; - - if (LocationMap.empty()) - { - ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_Config.RootDirectory / m_ContainerBaseName); - return; - } - - const uint64_t TotalChunkCount = LocationMap.size(); - - BlocksToReWrite.reserve(m_ChunkBlocks.size()); - BlockIndexToChunkMapIndex.reserve(m_ChunkBlocks.size()); - KeepChunks.reserve(m_ChunkBlocks.size()); - DeleteChunks.reserve(m_ChunkBlocks.size()); - size_t GuesstimateCountPerBlock = TotalChunkCount / m_ChunkBlocks.size(); - - std::vector<IoHash> TotalChunkHashes; - TotalChunkHashes.reserve(TotalChunkCount); - for (const auto& Entry : LocationMap) + LocationMap.reserve(m_LocationMap.size()); + bool IsWriting = !m_WriteBlock.expired(); + uint32_t WritingBlock = m_WriteBlockIndex; + for (const auto& Entry : m_LocationMap) { - TotalChunkHashes.push_back(Entry.first); - const CasLocation Location = Entry.second.Get(m_PayloadAlignment); - if (BlockIndexToChunkMapIndex.contains(Location.BlockIndex)) + CasLocation Location = Entry.second.Get(m_PayloadAlignment); + if (IsWriting && Location.BlockIndex == WritingBlock) { continue; } - BlockIndexToChunkMapIndex[Location.BlockIndex] = KeepChunks.size(); - KeepChunks.resize(KeepChunks.size() + 1); - KeepChunks.back().reserve(GuesstimateCountPerBlock); - DeleteChunks.resize(DeleteChunks.size() + 1); - DeleteChunks.back().reserve(GuesstimateCountPerBlock); + LocationMap.emplace(Entry.first, Location); } + BlockCount = m_ChunkBlocks.size(); + } - const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); - uint64_t DeleteCount = {}; + if (LocationMap.empty()) + { + ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_Config.RootDirectory / m_ContainerBaseName); + return; + } - uint64_t NewTotalSize = 0; - GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { - auto KeyIt = LocationMap.find(ChunkHash); - const CasLocation ChunkLocation = KeyIt->second.Get(m_PayloadAlignment); - size_t ChunkMapIndex = BlockIndexToChunkMapIndex[ChunkLocation.BlockIndex]; - if (Keep) - { - auto& ChunkMap = KeepChunks[ChunkMapIndex]; - ChunkMap.emplace(ChunkHash, KeyIt->second); - NewTotalSize += ChunkLocation.Size; - } - else - { - auto& ChunkMap = DeleteChunks[ChunkMapIndex]; - ChunkMap.insert(ChunkHash); - DeleteCount++; - BlocksToReWrite.insert(ChunkLocation.BlockIndex); - } - }); + const uint64_t TotalChunkCount = LocationMap.size(); + + BlocksToReWrite.reserve(BlockCount); + BlockIndexToChunkMapIndex.reserve(BlockCount); + KeepChunks.reserve(BlockCount); + DeleteChunks.reserve(BlockCount); + size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount; - if (!PerformDelete) + std::vector<IoHash> TotalChunkHashes; + TotalChunkHashes.reserve(TotalChunkCount); + for (const auto& Entry : LocationMap) + { + TotalChunkHashes.push_back(Entry.first); + if (BlockIndexToChunkMapIndex.contains(Entry.second.BlockIndex)) + { + continue; + } + BlockIndexToChunkMapIndex[Entry.second.BlockIndex] = KeepChunks.size(); + KeepChunks.resize(KeepChunks.size() + 1); + KeepChunks.back().reserve(GuesstimateCountPerBlock); + DeleteChunks.resize(DeleteChunks.size() + 1); + DeleteChunks.back().reserve(GuesstimateCountPerBlock); + } + + const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + uint64_t DeleteCount = {}; + + uint64_t NewTotalSize = 0; + GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { + auto KeyIt = LocationMap.find(ChunkHash); + const CasLocation& ChunkLocation = KeyIt->second; + size_t ChunkMapIndex = BlockIndexToChunkMapIndex[ChunkLocation.BlockIndex]; + if (Keep) + { + auto& ChunkMap = KeepChunks[ChunkMapIndex]; + ChunkMap.insert(ChunkHash); + NewTotalSize += ChunkLocation.Size; + } + else { - uint64_t TotalSize = m_TotalSize.load(); - ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", - m_Config.RootDirectory / m_ContainerBaseName, - DeleteCount, - NiceBytes(TotalSize - NewTotalSize), - TotalChunkCount, - NiceBytes(TotalSize)); - return; + auto& ChunkMap = DeleteChunks[ChunkMapIndex]; + ChunkMap.insert(ChunkHash); + DeleteCount++; + BlocksToReWrite.insert(ChunkLocation.BlockIndex); } + }); - DeletedChunks.reserve(DeleteCount); - CurrentWritePosition = m_CurrentInsertOffset; - CurrentWriteBlock = m_WriteBlockIndex.load(); + if (!PerformDelete) + { + uint64_t TotalSize = m_TotalSize.load(); + ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", + m_Config.RootDirectory / m_ContainerBaseName, + DeleteCount, + NiceBytes(TotalSize - NewTotalSize), + TotalChunkCount, + NiceBytes(TotalSize)); + return; } + DeletedChunks.reserve(DeleteCount); + + // Move all chunks in blocks that have chunks removed to new blocks + { RwLock::ExclusiveLockScope _i(m_InsertLock); m_CasLog.Flush(); - - // Did someone write into the current block while we released the m_InsertLock? - uint32_t WriteBlockIndex = m_WriteBlockIndex.load(); - if (BlocksToReWrite.contains(WriteBlockIndex)) - { - // No - we can safely terminate appending to block and rewrite it - if (CurrentWritePosition == m_CurrentInsertOffset) - { - m_WriteBlock.reset(); - } - // Yes - don't touch the block! - else - { - BlocksToReWrite.erase(WriteBlockIndex); - } - } } - // Move all chunks in blocks that have chunks removed to new blocks - std::shared_ptr<ChunkBlock> NewBlockFile; uint64_t WriteOffset = {}; uint32_t NewBlockIndex = m_WriteBlockIndex.load(); @@ -640,6 +673,14 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) for (auto BlockIndex : BlocksToReWrite) { +// { +// RwLock::SharedLockScope _i(m_InsertLock); +// if (m_WriteBlockIndex == BlockIndex) +// { +// continue; +// } +// } + const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; const auto& KeepMap = KeepChunks[ChunkMapIndex]; if (KeepMap.empty()) @@ -647,7 +688,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) std::shared_ptr<ChunkBlock> BlockFile; { RwLock::ExclusiveLockScope _i(m_LocationMapLock); - const auto& DeleteMap = DeleteChunks[ChunkMapIndex]; + const auto& DeleteMap = DeleteChunks[ChunkMapIndex]; for (const auto& ChunkHash : DeleteMap) { auto KeyIt = m_LocationMap.find(ChunkHash); @@ -659,6 +700,13 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) } DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end()); m_ChunkBlocks[BlockIndex].swap(BlockFile); + +// for (const auto& Entry : m_LocationMap) +// { +// uint32_t CheckBlockIndex = Entry.second.Get(m_PayloadAlignment).BlockIndex; +// CHECK(m_ChunkBlocks.contains(CheckBlockIndex)); +// CHECK(m_ChunkBlocks[CheckBlockIndex]); +// } } ZEN_DEBUG("marking cas store file for delete {}, block {}", m_ContainerBaseName, std::to_string(BlockIndex)); std::error_code Ec; @@ -670,132 +718,157 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) continue; } - std::shared_ptr<ChunkBlock> BlockFile; + std::shared_ptr<ChunkBlock> BlockFile; + std::unordered_map<IoHash, CasLocation, IoHash::Hasher> Chunks; + Chunks.reserve(KeepMap.size()); { RwLock::SharedLockScope _i(m_LocationMapLock); + for (const auto& ChunkHash : KeepMap) + { + auto KeyIt = m_LocationMap.find(ChunkHash); + Chunks.emplace(KeyIt->first, KeyIt->second.Get(m_PayloadAlignment)); + } BlockFile = m_ChunkBlocks[BlockIndex]; + CHECK(BlockFile); BlockFile->Open(); } + std::vector<uint8_t> Chunk; + for (auto& Entry : Chunks) { - std::vector<uint8_t> Chunk; - for (auto& Entry : KeepMap) + const CasLocation ChunkLocation = Entry.second; + Chunk.resize(ChunkLocation.Size); + BlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); + + if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) { - const CasLocation ChunkLocation = Entry.second.Get(m_PayloadAlignment); - Chunk.resize(ChunkLocation.Size); - BlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); + uint32_t NextBlockIndex = m_WriteBlockIndex.load(); - if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) { - uint32_t NextBlockIndex = m_WriteBlockIndex.load(); - + RwLock::ExclusiveLockScope _l(m_LocationMapLock); + if (NewBlockFile) { - RwLock::ExclusiveLockScope _l(m_LocationMapLock); - if (NewBlockFile) + for (const auto& MovedEntry : MovedBlocks) { - for (const auto& MovedEntry : MovedBlocks) - { - m_LocationMap[MovedEntry.first] = MovedEntry.second; - m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second}); - } - if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex) - { - ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded", - m_ContainerBaseName, - static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); - return; - } + m_LocationMap[MovedEntry.first] = MovedEntry.second; + m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second}); } if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex) { - throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName)); - } - while (m_ChunkBlocks.contains(NextBlockIndex)) - { - NextBlockIndex = (NextBlockIndex + 1) & CasDiskLocation::MaxBlockIndex; + ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded", + m_ContainerBaseName, + static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); + return; } - auto NewBlockPath = BuildUcasPath(m_BlocksBasePath, NextBlockIndex); - NewBlockFile = std::make_shared<ChunkBlock>(NewBlockPath); - m_ChunkBlocks[NextBlockIndex] = NewBlockFile; } - - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); - if (Error) - { - ZEN_ERROR("get disk space in {} FAILED, reason '{}'", m_ContainerBaseName, Error.message()); - return; - } - - if (Space.Free < m_MaxBlockSize) + if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex) { - std::filesystem::path GCReservePath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.reserve.ucas"); - if (!std::filesystem::is_regular_file(GCReservePath)) - { - ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}", - m_Config.RootDirectory / m_ContainerBaseName, - m_MaxBlockSize, - NiceBytes(Space.Free)); - RwLock::ExclusiveLockScope _l(m_LocationMapLock); - m_ChunkBlocks.erase(NextBlockIndex); - return; - } - - ZEN_INFO("using gc reserve for '{}', disk free {}", - m_Config.RootDirectory / m_ContainerBaseName, - NiceBytes(Space.Free)); - auto NewBlockPath = BuildUcasPath(m_BlocksBasePath, NextBlockIndex); - std::filesystem::rename(GCReservePath, NewBlockPath); - NewBlockFile->Open(); + throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName)); } - else + while (m_ChunkBlocks.contains(NextBlockIndex)) { - NewBlockFile->Create(m_MaxBlockSize); + NextBlockIndex = (NextBlockIndex + 1) & CasDiskLocation::MaxBlockIndex; } - NewBlockIndex = NextBlockIndex; - MovedBlocks.clear(); - WriteOffset = 0; + auto NewBlockPath = BuildUcasPath(m_BlocksBasePath, NextBlockIndex); + NewBlockFile = std::make_shared<ChunkBlock>(NewBlockPath); + m_ChunkBlocks[NextBlockIndex] = NewBlockFile; +// for (const auto& CheckEntry : m_LocationMap) +// { +// uint32_t CheckBlockIndex = CheckEntry.second.Get(m_PayloadAlignment).BlockIndex; +// CHECK(m_ChunkBlocks.contains(CheckBlockIndex)); +// CHECK(m_ChunkBlocks[CheckBlockIndex]); +// } } - NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); - CasLocation NewChunkLocation(NewBlockIndex, WriteOffset, Chunk.size()); - MovedBlocks.emplace(Entry.first, CasDiskLocation(NewChunkLocation, m_PayloadAlignment)); - WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment); - } - Chunk.clear(); - - // Remap moved chunks to the new block file - RwLock::ExclusiveLockScope _l(m_LocationMapLock); - if (NewBlockFile) - { - for (const auto& MovedEntry : MovedBlocks) + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); + if (Error) { - m_LocationMap[MovedEntry.first] = MovedEntry.second; - m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second}); + ZEN_ERROR("get disk space in {} FAILED, reason '{}'", m_ContainerBaseName, Error.message()); + return; } - const auto& DeleteMap = DeleteChunks[ChunkMapIndex]; - for (const auto& ChunkHash : DeleteMap) + if (Space.Free < m_MaxBlockSize) { - auto KeyIt = m_LocationMap.find(ChunkHash); - CHECK(KeyIt != m_LocationMap.end()); + std::filesystem::path GCReservePath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.reserve.ucas"); + if (!std::filesystem::is_regular_file(GCReservePath)) + { + ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}", + m_Config.RootDirectory / m_ContainerBaseName, + m_MaxBlockSize, + NiceBytes(Space.Free)); + RwLock::ExclusiveLockScope _l(m_LocationMapLock); + m_ChunkBlocks.erase(NextBlockIndex); +// for (const auto& CheckEntry : m_LocationMap) +// { +// uint32_t CheckBlockIndex = CheckEntry.second.Get(m_PayloadAlignment).BlockIndex; +// CHECK(m_ChunkBlocks.contains(CheckBlockIndex)); +// CHECK(m_ChunkBlocks[CheckBlockIndex]); +// } + return; + } - m_LocationMap.erase(KeyIt); - const CasLocation& DeleteChunkLocation = KeyIt->second.Get(m_PayloadAlignment); - m_CasLog.Append({.Key = ChunkHash, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone}); - m_TotalSize.fetch_sub(static_cast<uint64_t>(DeleteChunkLocation.Size)); + ZEN_INFO("using gc reserve for '{}', disk free {}", + m_Config.RootDirectory / m_ContainerBaseName, + NiceBytes(Space.Free)); + auto NewBlockPath = BuildUcasPath(m_BlocksBasePath, NextBlockIndex); + std::filesystem::rename(GCReservePath, NewBlockPath); + NewBlockFile->Open(); } - DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end()); + else + { + NewBlockFile->Create(m_MaxBlockSize); + } + NewBlockIndex = NextBlockIndex; + MovedBlocks.clear(); + WriteOffset = 0; } - ZEN_DEBUG("marking cas store file for delete {}, block index {}", m_ContainerBaseName, BlockIndex); - std::error_code Ec; - BlockFile->MarkAsDeleteOnClose(Ec); - if (Ec) + + NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); + CasLocation NewChunkLocation(NewBlockIndex, WriteOffset, Chunk.size()); + MovedBlocks.emplace(Entry.first, CasDiskLocation(NewChunkLocation, m_PayloadAlignment)); + WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment); + } + Chunk.clear(); + + // Remap moved chunks to the new block file + RwLock::ExclusiveLockScope _l(m_LocationMapLock); + if (NewBlockFile) + { + for (const auto& MovedEntry : MovedBlocks) { - ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", BlockFile->GetPath(), Ec.message()); + m_LocationMap[MovedEntry.first] = MovedEntry.second; + m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second}); } - BlockFile.reset(); + + const auto& DeleteMap = DeleteChunks[ChunkMapIndex]; + for (const auto& ChunkHash : DeleteMap) + { + auto KeyIt = m_LocationMap.find(ChunkHash); + CHECK(KeyIt != m_LocationMap.end()); + + m_LocationMap.erase(KeyIt); + const CasLocation& DeleteChunkLocation = KeyIt->second.Get(m_PayloadAlignment); + m_CasLog.Append({.Key = ChunkHash, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone}); + m_TotalSize.fetch_sub(static_cast<uint64_t>(DeleteChunkLocation.Size)); + } + DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end()); + } + ZEN_DEBUG("marking cas store file for delete {}, block index {}", m_ContainerBaseName, BlockIndex); + std::error_code Ec; + BlockFile->MarkAsDeleteOnClose(Ec); + if (Ec) + { + ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", BlockFile->GetPath(), Ec.message()); } + BlockFile.reset(); + +// for (const auto& Entry : m_LocationMap) +// { +// uint32_t CheckBlockIndex = Entry.second.Get(m_PayloadAlignment).BlockIndex; +// CHECK(m_ChunkBlocks.contains(CheckBlockIndex)); +// CHECK(m_ChunkBlocks[CheckBlockIndex]); +// } } GcCtx.DeletedCas(DeletedChunks); @@ -818,25 +891,28 @@ CasContainerStrategy::MakeIndexSnapshot() fs::path STmpSidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".tmp.uidx"); fs::path SRecoveredlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".recover.ulog"); - // Move cas and index away, we keep them if something goes wrong, any new chunks will be added to the new log + // Index away, we keep it if something goes wrong + if (fs::is_regular_file(STmpSidxPath)) { - RwLock::ExclusiveLockScope _(m_LocationMapLock); + fs::remove(STmpSidxPath); + } + if (fs::is_regular_file(SidxPath)) + { + fs::rename(SidxPath, STmpSidxPath); + } + + // Move cas away, we keep it if something goes wrong, any new chunks will be added to the new log + { + RwLock::ExclusiveLockScope _(m_InsertLock); + RwLock::ExclusiveLockScope __(m_LocationMapLock); m_CasLog.Close(); if (fs::is_regular_file(STmplogPath)) { fs::remove(STmplogPath); } - if (fs::is_regular_file(STmpSidxPath)) - { - fs::remove(STmpSidxPath); - } fs::rename(SlogPath, STmplogPath); - if (fs::is_regular_file(SidxPath)) - { - fs::rename(SidxPath, STmpSidxPath); - } // Open an new log m_CasLog.Open(SlogPath, true); @@ -1151,22 +1227,12 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) }); } - std::unordered_map<uint32_t, uint64_t> BlockUsage; + std::unordered_set<uint32_t> BlockUsage; for (const auto& Entry : m_LocationMap) { const CasLocation Location = Entry.second.Get(m_PayloadAlignment); m_TotalSize.fetch_add(Location.Size); - uint64_t NextBlockStart = Location.Offset + Location.Size; - auto It = BlockUsage.find(Location.BlockIndex); - if (It == BlockUsage.end()) - { - BlockUsage[Location.BlockIndex] = NextBlockStart; - continue; - } - if (It->second < NextBlockStart) - { - It->second = NextBlockStart; - } + BlockUsage.insert(Location.BlockIndex); } if (std::filesystem::is_directory(m_BlocksBasePath)) @@ -1217,25 +1283,6 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) } } - uint64_t LargestSizeToUse = m_MaxBlockSize - m_PayloadAlignment; - uint64_t SmallestBlockSize = LargestSizeToUse; - bool OpenExistingBlock = false; - for (const auto& Entry : BlockUsage) - { - if (Entry.second < SmallestBlockSize) - { - SmallestBlockSize = Entry.second; - m_WriteBlockIndex = Entry.first; - OpenExistingBlock = true; - } - } - - if (OpenExistingBlock) - { - m_WriteBlock = m_ChunkBlocks[m_WriteBlockIndex]; - m_CurrentInsertOffset = RoundUp(SmallestBlockSize, m_PayloadAlignment); - } - // Create GC reserve file if possible std::filesystem::path GCReservePath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.reserve.ucas"); @@ -1505,6 +1552,7 @@ TEST_CASE("compactcas.gc.basic") const auto InsertResult = Cas.InsertChunk(Chunk, ChunkHash); CHECK(InsertResult.New); + Cas.Flush(); GcContext GcCtx; GcCtx.CollectSmallObjects(true); @@ -1533,6 +1581,7 @@ TEST_CASE("compactcas.gc.removefile") CHECK(InsertResult.New); const auto InsertResultDup = Cas.InsertChunk(Chunk, ChunkHash); CHECK(!InsertResultDup.New); + Cas.Flush(); } CasGc Gc; @@ -1606,6 +1655,7 @@ TEST_CASE("compactcas.gc.compact") KeepChunks.push_back(ChunkHashes[8]); GcCtx.ContributeCas(KeepChunks); + Cas.Flush(); Cas.CollectGarbage(GcCtx); CHECK(Cas.HaveChunk(ChunkHashes[0])); @@ -1638,6 +1688,7 @@ TEST_CASE("compactcas.gc.compact") KeepChunks.push_back(ChunkHashes[8]); GcCtx.ContributeCas(KeepChunks); + Cas.Flush(); Cas.CollectGarbage(GcCtx); CHECK(!Cas.HaveChunk(ChunkHashes[0])); @@ -1671,6 +1722,7 @@ TEST_CASE("compactcas.gc.compact") KeepChunks.push_back(ChunkHashes[7]); GcCtx.ContributeCas(KeepChunks); + Cas.Flush(); Cas.CollectGarbage(GcCtx); CHECK(!Cas.HaveChunk(ChunkHashes[0])); @@ -1705,6 +1757,7 @@ TEST_CASE("compactcas.gc.compact") KeepChunks.push_back(ChunkHashes[8]); GcCtx.ContributeCas(KeepChunks); + Cas.Flush(); Cas.CollectGarbage(GcCtx); CHECK(!Cas.HaveChunk(ChunkHashes[0])); @@ -1741,6 +1794,7 @@ TEST_CASE("compactcas.gc.compact") KeepChunks.push_back(ChunkHashes[8]); GcCtx.ContributeCas(KeepChunks); + Cas.Flush(); Cas.CollectGarbage(GcCtx); CHECK(Cas.HaveChunk(ChunkHashes[0])); @@ -1823,6 +1877,7 @@ TEST_CASE("compactcas.gc.deleteblockonopen") } GcCtx.ContributeCas(KeepChunks); + Cas.Flush(); Cas.CollectGarbage(GcCtx); for (size_t i = 0; i < 20; i += 2) @@ -1881,6 +1936,7 @@ TEST_CASE("compactcas.gc.handleopeniobuffer") } auto RetainChunk = Cas.FindChunk(ChunkHashes[5]); + Cas.Flush(); // GC everything GcContext GcCtx; @@ -1939,6 +1995,7 @@ TEST_CASE("compactcas.legacyconversion") GcContext GcCtx; GcCtx.CollectSmallObjects(true); GcCtx.ContributeCas(KeepChunks); + Cas.Flush(); Gc.CollectGarbage(GcCtx); } @@ -1999,176 +2056,192 @@ TEST_CASE("compactcas.legacyconversion") } } -TEST_CASE("compactcas.threadedinsert") +TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true)) { - ScopedTemporaryDirectory TempDir; - - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); +// for (uint32_t i = 0; i < 100; ++i) + { + ScopedTemporaryDirectory TempDir; - CreateDirectories(CasConfig.RootDirectory); + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); - const uint64_t kChunkSize = 1048; - const int32_t kChunkCount = 8192; + CreateDirectories(CasConfig.RootDirectory); - std::unordered_set<IoHash, IoHash::Hasher> ChunkHashes; - ChunkHashes.reserve(kChunkCount); + const uint64_t kChunkSize = 1048; + const int32_t kChunkCount = 8192; - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 32768, 16, true); - { - WorkerThreadPool ThreadPool(std::thread::hardware_concurrency() + 2); // Flood it a little - RwLock ChunkHashesLock; + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(kChunkCount); + std::vector<IoBuffer> Chunks; + Chunks.reserve(kChunkCount); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { - ThreadPool.ScheduleWork([&Cas, kChunkSize, &ChunkHashesLock, &ChunkHashes]() { - IoBuffer Chunk = CreateChunk(kChunkSize); - const IoHash Hash = HashBuffer(Chunk); - auto InsertResult = Cas.InsertChunk(Chunk, Hash); - ZEN_ASSERT(InsertResult.New); - RwLock::ExclusiveLockScope _(ChunkHashesLock); - ChunkHashes.insert(Hash); - }); + IoBuffer Chunk = CreateChunk(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + ChunkHashes.emplace_back(Hash); + Chunks.emplace_back(Chunk); } - ThreadPool.Flush(); - } - - const uint64_t TotalSize = Cas.StorageSize().DiskSize; - CHECK_EQ(kChunkSize * kChunkCount, TotalSize); - - { - WorkerThreadPool ThreadPool(std::thread::hardware_concurrency() + 2); // Flood it a little - std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 32768, 16, true); { - ThreadPool.ScheduleWork([&Cas, &OldChunkHashes, Idx]() { - auto ChunkHash = OldChunkHashes[Idx]; - auto Chunk = Cas.FindChunk(ChunkHash); - auto Hash = IoHash::HashBuffer(Chunk); - CHECK(ChunkHash == Hash); - }); - } - ThreadPool.Flush(); - } + WorkerThreadPool ThreadPool(4 /* std::thread::hardware_concurrency() + 2*/); // Flood it a little - std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); - { - std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); - std::vector<IoHash> NewChunkHashes; - NewChunkHashes.reserve(kChunkCount); + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + const IoBuffer& Chunk = Chunks[Idx]; + const IoHash& Hash = ChunkHashes[Idx]; + ThreadPool.ScheduleWork([&Cas, Chunk, Hash]() { + auto InsertResult = Cas.InsertChunk(Chunk, Hash); + ZEN_ASSERT(InsertResult.New); + }); + } + ThreadPool.Flush(); + } - WorkerThreadPool ThreadPool(std::thread::hardware_concurrency() + 2); // Flood it a little - RwLock ChunkHashesLock; - std::atomic_uint32_t AddedChunkCount; + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { - ThreadPool.ScheduleWork([&Cas, kChunkSize, &ChunkHashesLock, &NewChunkHashes, &AddedChunkCount]() { - IoBuffer Chunk = CreateChunk(kChunkSize); - const IoHash Hash = HashBuffer(Chunk); - auto InsertResult = Cas.InsertChunk(Chunk, Hash); - ZEN_ASSERT(InsertResult.New); - { - RwLock::ExclusiveLockScope _(ChunkHashesLock); - NewChunkHashes.emplace_back(Hash); - } - AddedChunkCount.fetch_add(1); - }); - ThreadPool.ScheduleWork([&Cas, &ChunkHashesLock, &OldChunkHashes, Idx]() { - IoHash ChunkHash = OldChunkHashes[Idx]; - auto Chunk = Cas.FindChunk(OldChunkHashes[Idx]); - if (Chunk) - { - CHECK(ChunkHash == IoHash::HashBuffer(Chunk)); - } - }); + WorkerThreadPool ThreadPool(4 /* std::thread::hardware_concurrency() + 2*/); // Flood it a little + + std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + ThreadPool.ScheduleWork([&Cas, &OldChunkHashes, Idx]() { + auto ChunkHash = OldChunkHashes[Idx]; + auto Chunk = Cas.FindChunk(ChunkHash); + auto Hash = IoHash::HashBuffer(Chunk); + CHECK(ChunkHash == Hash); + }); + } + ThreadPool.Flush(); } - while (AddedChunkCount.load() < kChunkCount) + std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); { - std::vector<IoHash> AddedHashes; + std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + std::vector<IoHash> NewChunkHashes; + NewChunkHashes.reserve(kChunkCount); + std::vector<IoBuffer> NewChunks; + NewChunks.reserve(kChunkCount); + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { - RwLock::ExclusiveLockScope _(ChunkHashesLock); - AddedHashes.swap(NewChunkHashes); + IoBuffer Chunk = CreateChunk(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + NewChunkHashes.emplace_back(Hash); + NewChunks.emplace_back(Chunk); } - // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (auto ChunkHash : AddedHashes) + + WorkerThreadPool ThreadPool(4 /* std::thread::hardware_concurrency() + 2*/); // Flood it a little + RwLock ChunkHashesLock; + std::atomic_uint32_t AddedChunkCount; + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { - if (Cas.HaveChunk(ChunkHash)) - { - GcChunkHashes.emplace(ChunkHash); - } + const IoBuffer& Chunk = NewChunks[Idx]; + const IoHash& Hash = NewChunkHashes[Idx]; + ThreadPool.ScheduleWork([&Cas, Chunk, Hash, &AddedChunkCount]() { + auto InsertResult = Cas.InsertChunk(Chunk, Hash); + ZEN_ASSERT(InsertResult.New); + AddedChunkCount.fetch_add(1); + }); + ThreadPool.ScheduleWork([&Cas, &ChunkHashesLock, &OldChunkHashes, Idx]() { + IoHash ChunkHash = OldChunkHashes[Idx]; + auto Chunk = Cas.FindChunk(OldChunkHashes[Idx]); + if (Chunk) + { + CHECK(ChunkHash == IoHash::HashBuffer(Chunk)); + } + }); } - std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); - int32_t C = 0; - while (C < KeepHashes.size()) + + while (AddedChunkCount.load() < kChunkCount) { - if (C % 3 == 0 && C < KeepHashes.size() - 1) + std::vector<IoHash> AddedHashes; { - KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); + RwLock::ExclusiveLockScope _(ChunkHashesLock); + AddedHashes.swap(NewChunkHashes); + } + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (auto ChunkHash : AddedHashes) + { + if (Cas.HaveChunk(ChunkHash)) + { + GcChunkHashes.emplace(ChunkHash); + } + } + std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); + int32_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 3 == 0 && C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + C++; } - C++; - } - GcContext GcCtx; - GcCtx.CollectSmallObjects(true); - GcCtx.ContributeCas(KeepHashes); - Cas.CollectGarbage(GcCtx); - CasChunkSet& Deleted = GcCtx.DeletedCas(); - Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); - } + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepHashes); + Cas.CollectGarbage(GcCtx); + CasChunkSet& Deleted = GcCtx.DeletedCas(); + Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } - ThreadPool.Flush(); + ThreadPool.Flush(); - { - std::vector<IoHash> AddedHashes; - { - RwLock::ExclusiveLockScope _(ChunkHashesLock); - AddedHashes.swap(NewChunkHashes); - } - // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (auto ChunkHash : AddedHashes) { - if (Cas.HaveChunk(ChunkHash)) + std::vector<IoHash> AddedHashes; { - GcChunkHashes.emplace(ChunkHash); + RwLock::ExclusiveLockScope _(ChunkHashesLock); + AddedHashes.swap(NewChunkHashes); } - } - std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); - int32_t C = 0; - while (C < KeepHashes.size()) - { - if (C % 3 == 0 && C < KeepHashes.size() - 1) + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (auto ChunkHash : AddedHashes) { - KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); + if (Cas.HaveChunk(ChunkHash)) + { + GcChunkHashes.emplace(ChunkHash); + } + } + std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); + int32_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 3 == 0 && C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + C++; } - C++; - } - GcContext GcCtx; - GcCtx.CollectSmallObjects(true); - GcCtx.ContributeCas(KeepHashes); - Cas.CollectGarbage(GcCtx); - CasChunkSet& Deleted = GcCtx.DeletedCas(); - Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepHashes); + Cas.CollectGarbage(GcCtx); + CasChunkSet& Deleted = GcCtx.DeletedCas(); + Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } } - } - { - WorkerThreadPool ThreadPool(std::thread::hardware_concurrency() + 2); // Flood it a little - - for (IoHash ChunkHash : GcChunkHashes) { - ThreadPool.ScheduleWork([&Cas, ChunkHash]() { - CHECK(Cas.HaveChunk(ChunkHash)); - CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); - }); + WorkerThreadPool ThreadPool(4 /* std::thread::hardware_concurrency() + 2*/); // Flood it a little + + for (const auto& ChunkHash : GcChunkHashes) + { + ThreadPool.ScheduleWork([&Cas, ChunkHash]() { + CHECK(Cas.HaveChunk(ChunkHash)); + CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); + }); + } + ThreadPool.Flush(); } - ThreadPool.Flush(); } } |