diff options
| author | Dan Engelbrecht <[email protected]> | 2022-03-18 12:40:28 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-03-31 11:28:32 +0200 |
| commit | 567db5817c15e8e1cf4415cb38642fe8bcd533a0 (patch) | |
| tree | e29019f39f271595ee8d78fc004eb737497842af /zenstore/compactcas.cpp | |
| parent | rename compact cas test cases (diff) | |
| download | zen-567db5817c15e8e1cf4415cb38642fe8bcd533a0.tar.xz zen-567db5817c15e8e1cf4415cb38642fe8bcd533a0.zip | |
Clean up thread locking
Diffstat (limited to 'zenstore/compactcas.cpp')
| -rw-r--r-- | zenstore/compactcas.cpp | 154 |
1 files changed, 66 insertions, 88 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 5261a89e4..28f989bfb 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -240,32 +240,38 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const // New entry - if ((m_CurrentInsertOffset + ChunkSize) > m_MaxBlockSize) + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(); + auto WriteBlock = m_WriteBlock.lock(); + if (!WriteBlock || (m_CurrentInsertOffset + ChunkSize) > m_MaxBlockSize) { - RwLock::ExclusiveLockScope __(m_LocationMapLock); - uint32_t NewBlockIndex = m_CurrentBlockIndex + 1; - while (m_OpenBlocks.contains(NewBlockIndex)) { - NewBlockIndex++; - if (NewBlockIndex == m_CurrentBlockIndex) + RwLock::ExclusiveLockScope __(m_LocationMapLock); + if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex) { throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName)); } + WriteBlockIndex += WriteBlock ? 1 : 0; + while (m_ChunkBlocks.contains(WriteBlockIndex)) + { + WriteBlockIndex++; + } + WriteBlock = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, WriteBlockIndex); + m_ChunkBlocks[WriteBlockIndex] = WriteBlock; + m_WriteBlockIndex.store(WriteBlockIndex); } - m_CurrentBlockIndex = NewBlockIndex; - auto SmallObjectFile = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex); - SmallObjectFile->Create(m_MaxBlockSize); - m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile; - m_CurrentBlock = SmallObjectFile; - m_CurrentInsertOffset = 0; + m_WriteBlock = WriteBlock; + m_CurrentInsertOffset = 0; + WriteBlock->Create(m_MaxBlockSize); + } + else + { + WriteBlock->Open(); } const uint64_t InsertOffset = m_CurrentInsertOffset; - auto CurrentBlock = m_CurrentBlock.lock(); - CurrentBlock->Open(); - CurrentBlock->Write(ChunkData, ChunkSize, InsertOffset); + WriteBlock->Write(ChunkData, ChunkSize, InsertOffset); m_CurrentInsertOffset = AlignPositon(InsertOffset + ChunkSize, m_PayloadAlignment); - const CasLocation Location(m_CurrentBlockIndex, InsertOffset, ChunkSize); + const CasLocation Location(WriteBlockIndex, InsertOffset, ChunkSize); CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = CasDiskLocation(Location, m_PayloadAlignment)}; RwLock::ExclusiveLockScope __(m_LocationMapLock); @@ -291,7 +297,7 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { CasLocation Location = KeyIt->second.Get(m_PayloadAlignment); - if (auto BlockIt = m_OpenBlocks.find(Location.BlockIndex); BlockIt != m_OpenBlocks.end()) + if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end()) { return BlockIt->second->GetRange(Location.Offset, Location.Size); } @@ -325,9 +331,12 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks) void CasContainerStrategy::Flush() { - RwLock::SharedLockScope _(m_InsertLock); + RwLock::ExclusiveLockScope _i(m_InsertLock); m_CasLog.Flush(); - m_CurrentBlock.lock()->Flush(); + if (auto WriteBlock = m_WriteBlock.lock()) + { + WriteBlock->Flush(); + } } void @@ -350,7 +359,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) RwLock::SharedLockScope _(m_InsertLock); // TODO: Refactor so we don't have to keep m_InsertLock all the time? RwLock::SharedLockScope __(m_LocationMapLock); - for (const auto& Block : m_OpenBlocks) + for (const auto& Block : m_ChunkBlocks) { uint64_t WindowStart = 0; uint64_t WindowEnd = WindowSize; @@ -402,7 +411,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) { IoHashStream Hasher; const CasLocation Location = Entry.Location.Get(m_PayloadAlignment); - auto& SmallObjectFile = *m_OpenBlocks[Location.BlockIndex]; + auto& SmallObjectFile = *m_ChunkBlocks[Location.BlockIndex]; SmallObjectFile.StreamByteRange(Location.Offset, Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); @@ -485,9 +494,12 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) RwLock::ExclusiveLockScope _l(m_LocationMapLock); m_CasLog.Flush(); - m_CurrentBlock.lock()->Flush(); + if (auto WriteBlock = m_WriteBlock.lock()) + { + WriteBlock->Flush(); + } - BlocksToReWrite.reserve(m_OpenBlocks.size()); + BlocksToReWrite.reserve(m_ChunkBlocks.size()); if (m_LocationMap.empty()) { @@ -553,26 +565,9 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) // TODO: Be smarter about terminating current block - we should probably not rewrite if there is just // a small amount of bytes to gain. - if (BlocksToReWrite.contains(m_CurrentBlockIndex)) + if (BlocksToReWrite.contains(m_WriteBlockIndex.load())) { - uint32_t NewBlockIndex = m_CurrentBlockIndex + 1; - while (m_OpenBlocks.contains(NewBlockIndex)) - { - NewBlockIndex++; - if (NewBlockIndex == m_CurrentBlockIndex) - { - ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded", - m_ContainerBaseName, - static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); - return; - } - } - m_CurrentBlockIndex = NewBlockIndex; - auto SmallObjectFile = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex); - SmallObjectFile->Create(m_MaxBlockSize); - m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile; - m_CurrentBlock = SmallObjectFile; - m_CurrentInsertOffset = 0; + m_WriteBlock.reset(); } } @@ -580,7 +575,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) std::shared_ptr<ChunkBlock> NewBlockFile; uint64_t WriteOffset = {}; - uint32_t NewBlockIndex = {}; + uint32_t NewBlockIndex = m_WriteBlockIndex.load(); std::unordered_map<IoHash, CasDiskLocation> MovedBlocks; for (auto BlockIndex : BlocksToReWrite) @@ -597,18 +592,18 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) // Can we create a Sub-IoBuffer from our main buffer even if the size has grown past the initial // size when creating it? - RwLock::ExclusiveLockScope _i(m_InsertLock); - auto BlockFile = m_OpenBlocks[BlockIndex]; + RwLock::ExclusiveLockScope _i(m_LocationMapLock); + auto BlockFile = m_ChunkBlocks[BlockIndex]; ZEN_INFO("marking cas store file for delete {}, block {}", m_ContainerBaseName, std::to_string(BlockIndex)); BlockFile->MarkAsDeleteOnClose(); - m_OpenBlocks.erase(BlockIndex); + BlockFile.reset(); continue; } std::shared_ptr<ChunkBlock> BlockFile; { - RwLock::ExclusiveLockScope _i(m_InsertLock); - BlockFile = m_OpenBlocks[BlockIndex]; + RwLock::SharedLockScope _i(m_LocationMapLock); + BlockFile = m_ChunkBlocks[BlockIndex]; BlockFile->Open(); } @@ -622,23 +617,17 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) { + NewBlockIndex = m_WriteBlockIndex.load(); { - RwLock::ExclusiveLockScope _i(m_InsertLock); + RwLock::ExclusiveLockScope _l(m_LocationMapLock); if (NewBlockFile) { - m_OpenBlocks[NewBlockIndex] = NewBlockFile; - RwLock::ExclusiveLockScope _l(m_LocationMapLock); for (const auto& MovedEntry : MovedBlocks) { m_LocationMap[MovedEntry.first] = MovedEntry.second; m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second}); } - } - NewBlockIndex = m_CurrentBlockIndex + 1; - while (m_OpenBlocks.contains(NewBlockIndex)) - { - NewBlockIndex++; - if (NewBlockIndex == m_CurrentBlockIndex) + if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex) { ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded", m_ContainerBaseName, @@ -646,9 +635,12 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) return; } } - m_OpenBlocks[NewBlockIndex] = std::make_shared<ChunkBlock>(m_Config.RootDirectory, - m_ContainerBaseName, - NewBlockIndex); // Make sure nobody steals this slot + while (m_ChunkBlocks.contains(NewBlockIndex)) + { + NewBlockIndex++; + } + NewBlockFile = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, NewBlockIndex); + m_ChunkBlocks[NewBlockIndex] = NewBlockFile; } std::error_code Error; @@ -659,18 +651,17 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) return; } - if (Space.Free < (m_MaxBlockSize * 2)) // Never let GC steal the last block space + if (Space.Free < m_MaxBlockSize) { ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}", m_Config.RootDirectory / m_ContainerBaseName, m_MaxBlockSize * m_MaxBlockSize, NiceBytes(Space.Free)); - RwLock::ExclusiveLockScope _i(m_InsertLock); - m_OpenBlocks.erase(NewBlockIndex); + RwLock::ExclusiveLockScope _l(m_LocationMapLock); + m_ChunkBlocks.erase(NewBlockIndex); return; } - NewBlockFile = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, NewBlockIndex); NewBlockFile->Create(m_MaxBlockSize); MovedBlocks.clear(); WriteOffset = 0; @@ -685,11 +676,9 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) Chunk.clear(); // Remap moved chunks to the new block file - RwLock::ExclusiveLockScope _i(m_InsertLock); + RwLock::ExclusiveLockScope _l(m_LocationMapLock); if (NewBlockFile) { - m_OpenBlocks[NewBlockIndex] = NewBlockFile; - RwLock::ExclusiveLockScope _l(m_LocationMapLock); for (const auto& MovedEntry : MovedBlocks) { m_LocationMap[MovedEntry.first] = MovedEntry.second; @@ -698,7 +687,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) } ZEN_INFO("marking cas store file for delete {}, block index {}", m_ContainerBaseName, BlockIndex); BlockFile->MarkAsDeleteOnClose(); - m_OpenBlocks.erase(BlockIndex); + BlockFile.reset(); } } @@ -1088,7 +1077,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) } auto SmallObjectFile = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, BlockIndex); SmallObjectFile->Open(); - m_OpenBlocks[BlockIndex] = SmallObjectFile; + m_ChunkBlocks[BlockIndex] = SmallObjectFile; } catch (const std::invalid_argument&) { @@ -1099,31 +1088,20 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) uint64_t LargestSizeToUse = m_MaxBlockSize - m_PayloadAlignment; uint64_t SmallestBlockSize = LargestSizeToUse; - bool CreateNewBlock = m_OpenBlocks.empty(); - if (!CreateNewBlock) + bool OpenExistingBlock = false; + for (const auto& Entry : BlockUsage) { - for (const auto& Entry : BlockUsage) + if (Entry.second < SmallestBlockSize) { - if (Entry.second < SmallestBlockSize) - { - SmallestBlockSize = Entry.second; - m_CurrentBlockIndex = Entry.first; - CreateNewBlock = false; - } + SmallestBlockSize = Entry.second; + m_WriteBlockIndex = Entry.first; + OpenExistingBlock = true; } } - if (CreateNewBlock) - { - auto SmallObjectFile = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex); - SmallObjectFile->Create(m_MaxBlockSize); - m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile; - m_CurrentBlock = SmallObjectFile; - m_CurrentInsertOffset = 0; - } - else + if (OpenExistingBlock) { - m_CurrentBlock = m_OpenBlocks[m_CurrentBlockIndex]; + m_WriteBlock = m_ChunkBlocks[m_WriteBlockIndex]; m_CurrentInsertOffset = AlignPositon(SmallestBlockSize, m_PayloadAlignment); } |