diff options
| author | Dan Engelbrecht <[email protected]> | 2022-03-22 18:33:06 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-03-31 11:29:26 +0200 |
| commit | 6c7ba22c9a287bf4bd2c06d59f17753b6a5280ea (patch) | |
| tree | 56db8565952ddca186ca12eb98d3715d509946eb /zenstore/compactcas.cpp | |
| parent | Add Flush to workthreadpool (diff) | |
| download | zen-6c7ba22c9a287bf4bd2c06d59f17753b6a5280ea.tar.xz zen-6c7ba22c9a287bf4bd2c06d59f17753b6a5280ea.zip | |
Don't GC currently writing block, reduce lock contention during GC
Diffstat (limited to 'zenstore/compactcas.cpp')
| -rw-r--r-- | zenstore/compactcas.cpp | 174 |
1 files changed, 60 insertions, 114 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 8571be065..faf54c106 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -267,13 +267,6 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const { RwLock::SharedLockScope _l(m_LocationMapLock); -// for (const auto& Entry : m_LocationMap) -// { -// uint32_t CheckBlockIndex = Entry.second.Get(m_PayloadAlignment).BlockIndex; -// CHECK(m_ChunkBlocks.contains(CheckBlockIndex)); -// CHECK(m_ChunkBlocks[CheckBlockIndex]); -// } - auto KeyIt = m_LocationMap.find(ChunkHash); if (KeyIt != m_LocationMap.end()) @@ -290,13 +283,6 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const { { RwLock::ExclusiveLockScope __(m_LocationMapLock); -// for (const auto& Entry : m_LocationMap) -// { -// uint32_t CheckBlockIndex = Entry.second.Get(m_PayloadAlignment).BlockIndex; -// CHECK(m_ChunkBlocks.contains(CheckBlockIndex)); -// CHECK(m_ChunkBlocks[CheckBlockIndex]); -// } - if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex) { throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName)); @@ -310,13 +296,6 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const WriteBlock = std::make_shared<ChunkBlock>(BlockPath); m_ChunkBlocks[WriteBlockIndex] = WriteBlock; m_WriteBlockIndex.store(WriteBlockIndex); - -// for (const auto& Entry : m_LocationMap) -// { -// uint32_t CheckBlockIndex = Entry.second.Get(m_PayloadAlignment).BlockIndex; -// CHECK(m_ChunkBlocks.contains(CheckBlockIndex)); -// CHECK(m_ChunkBlocks[CheckBlockIndex]); -// } } m_WriteBlock = WriteBlock; m_CurrentInsertOffset = 0; @@ -351,12 +330,6 @@ IoBuffer CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); -// for (const auto& Entry : m_LocationMap) -// { -// uint32_t CheckBlockIndex = Entry.second.Get(m_PayloadAlignment).BlockIndex; -// CHECK(m_ChunkBlocks.contains(CheckBlockIndex)); -// CHECK(m_ChunkBlocks[CheckBlockIndex]); -// } if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) { @@ -564,21 +537,13 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) // path to the next new block. ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName); - std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex; - std::vector<std::unordered_set<IoHash, IoHash::Hasher>> KeepChunks; - std::vector<std::unordered_set<IoHash, IoHash::Hasher>> DeleteChunks; - std::vector<IoHash> DeletedChunks; - std::vector<IoHash> PendingDeletedChunks; - std::unordered_set<uint32_t> BlocksToReWrite; -// std::uint64_t CurrentWritePosition; -// std::uint64_t CurrentWriteBlock; - std::unordered_map<IoHash, CasLocation, IoHash::Hasher> LocationMap; - size_t BlockCount; + std::unordered_map<IoHash, CasLocation, IoHash::Hasher> LocationMap; + size_t BlockCount; { RwLock::SharedLockScope _i(m_InsertLock); RwLock::SharedLockScope _l(m_LocationMapLock); LocationMap.reserve(m_LocationMap.size()); - bool IsWriting = !m_WriteBlock.expired(); + bool IsWriting = !m_WriteBlock.expired(); uint32_t WritingBlock = m_WriteBlockIndex; for (const auto& Entry : m_LocationMap) { @@ -600,11 +565,16 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) const uint64_t TotalChunkCount = LocationMap.size(); + std::unordered_set<uint32_t> BlocksToReWrite; + std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex; + std::vector<std::unordered_set<IoHash, IoHash::Hasher>> KeepChunks; + std::vector<std::unordered_set<IoHash, IoHash::Hasher>> DeleteChunks; + BlocksToReWrite.reserve(BlockCount); BlockIndexToChunkMapIndex.reserve(BlockCount); KeepChunks.reserve(BlockCount); DeleteChunks.reserve(BlockCount); - size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount; + size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2; std::vector<IoHash> TotalChunkHashes; TotalChunkHashes.reserve(TotalChunkCount); @@ -627,9 +597,9 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) uint64_t NewTotalSize = 0; GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { - auto KeyIt = LocationMap.find(ChunkHash); + auto KeyIt = LocationMap.find(ChunkHash); const CasLocation& ChunkLocation = KeyIt->second; - size_t ChunkMapIndex = BlockIndexToChunkMapIndex[ChunkLocation.BlockIndex]; + size_t ChunkMapIndex = BlockIndexToChunkMapIndex[ChunkLocation.BlockIndex]; if (Keep) { auto& ChunkMap = KeepChunks[ChunkMapIndex]; @@ -649,38 +619,30 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) { uint64_t TotalSize = m_TotalSize.load(); ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", - m_Config.RootDirectory / m_ContainerBaseName, - DeleteCount, - NiceBytes(TotalSize - NewTotalSize), - TotalChunkCount, - NiceBytes(TotalSize)); + m_Config.RootDirectory / m_ContainerBaseName, + DeleteCount, + NiceBytes(TotalSize - NewTotalSize), + TotalChunkCount, + NiceBytes(TotalSize)); return; } - DeletedChunks.reserve(DeleteCount); + // { + // RwLock::ExclusiveLockScope _i(m_InsertLock); + // m_CasLog.Flush(); + // } // Move all chunks in blocks that have chunks removed to new blocks - { - RwLock::ExclusiveLockScope _i(m_InsertLock); - m_CasLog.Flush(); - } - std::shared_ptr<ChunkBlock> NewBlockFile; uint64_t WriteOffset = {}; uint32_t NewBlockIndex = m_WriteBlockIndex.load(); - std::unordered_map<IoHash, CasDiskLocation> MovedBlocks; + std::unordered_map<IoHash, CasDiskLocation> MovedChunks; + std::vector<IoHash> DeletedChunks; + DeletedChunks.reserve(DeleteCount); for (auto BlockIndex : BlocksToReWrite) { -// { -// RwLock::SharedLockScope _i(m_InsertLock); -// if (m_WriteBlockIndex == BlockIndex) -// { -// continue; -// } -// } - const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; const auto& KeepMap = KeepChunks[ChunkMapIndex]; if (KeepMap.empty()) @@ -688,25 +650,17 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) std::shared_ptr<ChunkBlock> BlockFile; { RwLock::ExclusiveLockScope _i(m_LocationMapLock); - const auto& DeleteMap = DeleteChunks[ChunkMapIndex]; + const auto& DeleteMap = DeleteChunks[ChunkMapIndex]; for (const auto& ChunkHash : DeleteMap) { - auto KeyIt = m_LocationMap.find(ChunkHash); - CHECK(KeyIt != m_LocationMap.end()); - m_LocationMap.erase(KeyIt); + auto KeyIt = m_LocationMap.find(ChunkHash); const CasLocation& DeleteChunkLocation = KeyIt->second.Get(m_PayloadAlignment); m_CasLog.Append({.Key = ChunkHash, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone}); m_TotalSize.fetch_sub(static_cast<uint64_t>(DeleteChunkLocation.Size)); + m_LocationMap.erase(KeyIt); } DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end()); m_ChunkBlocks[BlockIndex].swap(BlockFile); - -// for (const auto& Entry : m_LocationMap) -// { -// uint32_t CheckBlockIndex = Entry.second.Get(m_PayloadAlignment).BlockIndex; -// CHECK(m_ChunkBlocks.contains(CheckBlockIndex)); -// CHECK(m_ChunkBlocks[CheckBlockIndex]); -// } } ZEN_DEBUG("marking cas store file for delete {}, block {}", m_ContainerBaseName, std::to_string(BlockIndex)); std::error_code Ec; @@ -729,7 +683,6 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) Chunks.emplace(KeyIt->first, KeyIt->second.Get(m_PayloadAlignment)); } BlockFile = m_ChunkBlocks[BlockIndex]; - CHECK(BlockFile); BlockFile->Open(); } @@ -748,7 +701,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) RwLock::ExclusiveLockScope _l(m_LocationMapLock); if (NewBlockFile) { - for (const auto& MovedEntry : MovedBlocks) + for (const auto& MovedEntry : MovedChunks) { m_LocationMap[MovedEntry.first] = MovedEntry.second; m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second}); @@ -756,8 +709,8 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex) { ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded", - m_ContainerBaseName, - static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); + m_ContainerBaseName, + static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); return; } } @@ -772,12 +725,6 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) auto NewBlockPath = BuildUcasPath(m_BlocksBasePath, NextBlockIndex); NewBlockFile = std::make_shared<ChunkBlock>(NewBlockPath); m_ChunkBlocks[NextBlockIndex] = NewBlockFile; -// for (const auto& CheckEntry : m_LocationMap) -// { -// uint32_t CheckBlockIndex = CheckEntry.second.Get(m_PayloadAlignment).BlockIndex; -// CHECK(m_ChunkBlocks.contains(CheckBlockIndex)); -// CHECK(m_ChunkBlocks[CheckBlockIndex]); -// } } std::error_code Error; @@ -794,23 +741,17 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) if (!std::filesystem::is_regular_file(GCReservePath)) { ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}", - m_Config.RootDirectory / m_ContainerBaseName, - m_MaxBlockSize, - NiceBytes(Space.Free)); + m_Config.RootDirectory / m_ContainerBaseName, + m_MaxBlockSize, + NiceBytes(Space.Free)); RwLock::ExclusiveLockScope _l(m_LocationMapLock); m_ChunkBlocks.erase(NextBlockIndex); -// for (const auto& CheckEntry : m_LocationMap) -// { -// uint32_t CheckBlockIndex = CheckEntry.second.Get(m_PayloadAlignment).BlockIndex; -// CHECK(m_ChunkBlocks.contains(CheckBlockIndex)); -// CHECK(m_ChunkBlocks[CheckBlockIndex]); -// } return; } ZEN_INFO("using gc reserve for '{}', disk free {}", - m_Config.RootDirectory / m_ContainerBaseName, - NiceBytes(Space.Free)); + m_Config.RootDirectory / m_ContainerBaseName, + NiceBytes(Space.Free)); auto NewBlockPath = BuildUcasPath(m_BlocksBasePath, NextBlockIndex); std::filesystem::rename(GCReservePath, NewBlockPath); NewBlockFile->Open(); @@ -820,13 +761,13 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) NewBlockFile->Create(m_MaxBlockSize); } NewBlockIndex = NextBlockIndex; - MovedBlocks.clear(); + MovedChunks.clear(); WriteOffset = 0; } NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); CasLocation NewChunkLocation(NewBlockIndex, WriteOffset, Chunk.size()); - MovedBlocks.emplace(Entry.first, CasDiskLocation(NewChunkLocation, m_PayloadAlignment)); + MovedChunks.emplace(Entry.first, CasDiskLocation(NewChunkLocation, m_PayloadAlignment)); WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment); } Chunk.clear(); @@ -835,7 +776,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) RwLock::ExclusiveLockScope _l(m_LocationMapLock); if (NewBlockFile) { - for (const auto& MovedEntry : MovedBlocks) + for (const auto& MovedEntry : MovedChunks) { m_LocationMap[MovedEntry.first] = MovedEntry.second; m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second}); @@ -844,13 +785,11 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) const auto& DeleteMap = DeleteChunks[ChunkMapIndex]; for (const auto& ChunkHash : DeleteMap) { - auto KeyIt = m_LocationMap.find(ChunkHash); - CHECK(KeyIt != m_LocationMap.end()); - - m_LocationMap.erase(KeyIt); + auto KeyIt = m_LocationMap.find(ChunkHash); const CasLocation& DeleteChunkLocation = KeyIt->second.Get(m_PayloadAlignment); m_CasLog.Append({.Key = ChunkHash, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone}); m_TotalSize.fetch_sub(static_cast<uint64_t>(DeleteChunkLocation.Size)); + m_LocationMap.erase(KeyIt); } DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end()); } @@ -862,20 +801,14 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", BlockFile->GetPath(), Ec.message()); } BlockFile.reset(); - -// for (const auto& Entry : m_LocationMap) -// { -// uint32_t CheckBlockIndex = Entry.second.Get(m_PayloadAlignment).BlockIndex; -// CHECK(m_ChunkBlocks.contains(CheckBlockIndex)); -// CHECK(m_ChunkBlocks[CheckBlockIndex]); -// } } GcCtx.DeletedCas(DeletedChunks); - ZEN_INFO("garbage collection complete '{}', deleted {} chunks", m_Config.RootDirectory / m_ContainerBaseName, DeletedChunks.size()); - - MakeIndexSnapshot(); + ZEN_INFO("garbage collection complete '{}', deleted {} and moved {} chunks", + m_Config.RootDirectory / m_ContainerBaseName, + DeletedChunks.size(), + MovedChunks.size()); } void @@ -1053,6 +986,8 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) std::filesystem::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog"); std::filesystem::path LegacySobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas"); + bool CasLogEmpty = true; + if (IsNewStore) { if (std::filesystem::is_regular_file(LegacySobsPath)) @@ -1187,6 +1122,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) std::filesystem::remove(LegacySobsPath); ZEN_INFO("migrated store {} to {} to chunks", m_Config.RootDirectory / m_ContainerBaseName, NewBlockIndex + 1); + CasLogEmpty = false; } if (std::filesystem::is_regular_file(SidxPath)) @@ -1224,6 +1160,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) { m_LocationMap[Record.Key] = Record.Location; } + CasLogEmpty = false; }); } @@ -1324,6 +1261,11 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) } } + if (!CasLogEmpty) + { + MakeIndexSnapshot(); + } + // TODO: should validate integrity of container files here } @@ -2026,13 +1968,17 @@ TEST_CASE("compactcas.legacyconversion") std::filesystem::remove(SidxPath); } - std::filesystem::path SlogPath = CasConfig.RootDirectory / "test.ulog"; + std::filesystem::path SlogPath = CasConfig.RootDirectory / "test.ulog"; + { + TCasLogFile<CasDiskIndexEntry> CasLog; + CasLog.Open(SlogPath, false); + CasLog.Replay([&](const CasDiskIndexEntry& Record) { LogEntries.push_back(Record); }); + } TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog; LegacyCasLog.Open(SlogPath, true); for (const auto& Entry : LogEntries) { - CasLocation Location = Entry.Location.Get(16); - CHECK(Location.BlockIndex == 1); + CasLocation Location = Entry.Location.Get(16); LegacyCasDiskLocation LegacyLocation(Location.Offset, Location.Size); LegacyCasDiskIndexEntry LegacyEntry = {.Key = Entry.Key, .Location = LegacyLocation, @@ -2058,7 +2004,7 @@ TEST_CASE("compactcas.legacyconversion") TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true)) { -// for (uint32_t i = 0; i < 100; ++i) + // for (uint32_t i = 0; i < 100; ++i) { ScopedTemporaryDirectory TempDir; |