diff options
| author | Dan Engelbrecht <[email protected]> | 2022-03-22 22:11:49 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-03-31 11:29:26 +0200 |
| commit | f1b449696d903e273b0a0c04160846543d2830a7 (patch) | |
| tree | f4e0c59dd165f7b0d0adf7f30ee14e9172a3236c /zenstore/compactcas.cpp | |
| parent | Don't GC currently writing block, reduce lock contention during GC (diff) | |
| download | zen-f1b449696d903e273b0a0c04160846543d2830a7.tar.xz zen-f1b449696d903e273b0a0c04160846543d2830a7.zip | |
memory order for atomic values
Diffstat (limited to 'zenstore/compactcas.cpp')
| -rw-r--r-- | zenstore/compactcas.cpp | 115 |
1 files changed, 63 insertions, 52 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index faf54c106..170687600 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -112,7 +112,7 @@ CasContainerStrategy::ChunkBlock::ChunkBlock(const std::filesystem::path& BlockP CasContainerStrategy::ChunkBlock::~ChunkBlock() { - if (m_IsOpened.load()) + if (m_IsOpened.load(std::memory_order_acquire)) { m_File.Detach(); } @@ -129,13 +129,13 @@ CasContainerStrategy::ChunkBlock::Open() { // Open can have a race if multiple requests wants to read the same block // Create or ~ChunkBlock() can not have a race so we only need to guard Open() - if (m_IsOpened.load(std::memory_order::memory_order_acquire)) + if (m_IsOpened.load(std::memory_order_acquire)) { return; } RwLock::ExclusiveLockScope _(m_OpenLock); - if (m_IsOpened.load(std::memory_order::memory_order_acquire)) + if (m_IsOpened.load(std::memory_order_acquire)) { return; } @@ -143,13 +143,13 @@ CasContainerStrategy::ChunkBlock::Open() m_File.Open(m_Path, false); void* FileHandle = m_File.Handle(); m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize()); - m_IsOpened.store(true); + m_IsOpened.store(true, std::memory_order_release); } void CasContainerStrategy::ChunkBlock::Create(uint64_t InitialSize) { - ZEN_ASSERT(!m_IsOpened.load()); + ZEN_ASSERT(!m_IsOpened.load(std::memory_order_acquire)); auto ParentPath = m_Path.parent_path(); if (!std::filesystem::is_directory(ParentPath)) @@ -164,13 +164,13 @@ CasContainerStrategy::ChunkBlock::Create(uint64_t InitialSize) } void* FileHandle = m_File.Handle(); m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize); - m_IsOpened.store(true); + m_IsOpened.store(true, std::memory_order_release); } uint64_t CasContainerStrategy::ChunkBlock::FileSize() { - ZEN_ASSERT(m_IsOpened.load()); + ZEN_ASSERT(m_IsOpened.load(std::memory_order_acquire)); return m_File.FileSize(); } @@ -200,21 +200,21 @@ CasContainerStrategy::ChunkBlock::GetChunk(uint64_t Offset, uint64_t Size) void CasContainerStrategy::ChunkBlock::Read(void* Data, uint64_t Size, uint64_t FileOffset) { - ZEN_ASSERT(m_IsOpened.load()); + ZEN_ASSERT(m_IsOpened.load(std::memory_order_acquire)); m_File.Read(Data, Size, FileOffset); } void CasContainerStrategy::ChunkBlock::Write(const void* Data, uint64_t Size, uint64_t FileOffset) { - ZEN_ASSERT(m_IsOpened.load()); + ZEN_ASSERT(m_IsOpened.load(std::memory_order_acquire)); m_File.Write(Data, Size, FileOffset); } void CasContainerStrategy::ChunkBlock::Flush() { - if (!m_IsOpened.load()) + if (!m_IsOpened.load(std::memory_order_acquire)) { return; } @@ -226,7 +226,7 @@ CasContainerStrategy::ChunkBlock::StreamByteRange(uint64_t FileOffse uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun) { - ZEN_ASSERT(m_IsOpened.load()); + ZEN_ASSERT(m_IsOpened.load(std::memory_order_acquire)); m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun)); } @@ -263,59 +263,71 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint3 CasStore::InsertResult CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash) { - RwLock::ExclusiveLockScope _i(m_InsertLock); - + uint32_t WriteBlockIndex; + std::shared_ptr<ChunkBlock> WriteBlock; + uint64_t InsertOffset; { - RwLock::SharedLockScope _l(m_LocationMapLock); - auto KeyIt = m_LocationMap.find(ChunkHash); + RwLock::ExclusiveLockScope _i(m_InsertLock); - if (KeyIt != m_LocationMap.end()) { - return CasStore::InsertResult{.New = false}; + RwLock::SharedLockScope _l(m_LocationMapLock); + auto KeyIt = m_LocationMap.find(ChunkHash); + + if (KeyIt != m_LocationMap.end()) + { + return CasStore::InsertResult{.New = false}; + } } - } - // New entry + // New entry - uint32_t WriteBlockIndex = m_WriteBlockIndex.load(); - auto WriteBlock = m_WriteBlock.lock(); - if (!WriteBlock || (m_CurrentInsertOffset + ChunkSize) > m_MaxBlockSize) - { + WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + WriteBlock = m_WriteBlock.lock(); + if (!WriteBlock || (m_CurrentInsertOffset + ChunkSize) > m_MaxBlockSize) { - RwLock::ExclusiveLockScope __(m_LocationMapLock); - if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex) + if (WriteBlock) { - throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName)); + WriteBlock->Flush(); } - WriteBlockIndex += WriteBlock ? 1 : 0; - while (m_ChunkBlocks.contains(WriteBlockIndex)) { - WriteBlockIndex = (WriteBlockIndex + 1) & CasDiskLocation::MaxBlockIndex; + RwLock::ExclusiveLockScope __(m_LocationMapLock); + if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex) + { + throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName)); + } + WriteBlockIndex += WriteBlock ? 1 : 0; + while (m_ChunkBlocks.contains(WriteBlockIndex)) + { + WriteBlockIndex = (WriteBlockIndex + 1) & CasDiskLocation::MaxBlockIndex; + } + auto BlockPath = BuildUcasPath(m_BlocksBasePath, WriteBlockIndex); + WriteBlock = std::make_shared<ChunkBlock>(BlockPath); + m_ChunkBlocks[WriteBlockIndex] = WriteBlock; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); } - auto BlockPath = BuildUcasPath(m_BlocksBasePath, WriteBlockIndex); - WriteBlock = std::make_shared<ChunkBlock>(BlockPath); - m_ChunkBlocks[WriteBlockIndex] = WriteBlock; - m_WriteBlockIndex.store(WriteBlockIndex); + m_WriteBlock = WriteBlock; + m_CurrentInsertOffset = 0; + WriteBlock->Create(m_MaxBlockSize); } - m_WriteBlock = WriteBlock; - m_CurrentInsertOffset = 0; - WriteBlock->Create(m_MaxBlockSize); - } - else - { - WriteBlock->Open(); + else + { + WriteBlock->Open(); + } + InsertOffset = m_CurrentInsertOffset; + m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment); } - const uint64_t InsertOffset = m_CurrentInsertOffset; + WriteBlock->Write(ChunkData, ChunkSize, InsertOffset); - m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment); const CasLocation Location(WriteBlockIndex, InsertOffset, ChunkSize); CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = CasDiskLocation(Location, m_PayloadAlignment)}; - RwLock::ExclusiveLockScope __(m_LocationMapLock); - m_LocationMap.emplace(ChunkHash, CasDiskLocation(Location, m_PayloadAlignment)); m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize)); - m_CasLog.Append(IndexEntry); + { + RwLock::ExclusiveLockScope __(m_LocationMapLock); + m_LocationMap.emplace(ChunkHash, CasDiskLocation(Location, m_PayloadAlignment)); + m_CasLog.Append(IndexEntry); + } return CasStore::InsertResult{.New = true}; } @@ -377,7 +389,7 @@ CasContainerStrategy::Flush() RwLock::ExclusiveLockScope _(m_InsertLock); if (m_CurrentInsertOffset > 0) { - uint32_t WriteBlockIndex = m_WriteBlockIndex.load(); + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); auto WriteBlock = m_WriteBlock.lock(); WriteBlockIndex++; while (m_ChunkBlocks.contains(WriteBlockIndex)) @@ -386,7 +398,7 @@ CasContainerStrategy::Flush() } WriteBlock->Flush(); m_WriteBlock.reset(); - m_WriteBlockIndex = WriteBlockIndex; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); m_CurrentInsertOffset = 0; } } @@ -544,7 +556,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) RwLock::SharedLockScope _l(m_LocationMapLock); LocationMap.reserve(m_LocationMap.size()); bool IsWriting = !m_WriteBlock.expired(); - uint32_t WritingBlock = m_WriteBlockIndex; + uint32_t WritingBlock = m_WriteBlockIndex.load(std::memory_order_acquire); for (const auto& Entry : m_LocationMap) { CasLocation Location = Entry.second.Get(m_PayloadAlignment); @@ -617,7 +629,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) if (!PerformDelete) { - uint64_t TotalSize = m_TotalSize.load(); + uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed); ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", m_Config.RootDirectory / m_ContainerBaseName, DeleteCount, @@ -636,7 +648,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) std::shared_ptr<ChunkBlock> NewBlockFile; uint64_t WriteOffset = {}; - uint32_t NewBlockIndex = m_WriteBlockIndex.load(); + uint32_t NewBlockIndex = {}; std::unordered_map<IoHash, CasDiskLocation> MovedChunks; std::vector<IoHash> DeletedChunks; DeletedChunks.reserve(DeleteCount); @@ -695,8 +707,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) { - uint32_t NextBlockIndex = m_WriteBlockIndex.load(); - + uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order::memory_order_relaxed); { RwLock::ExclusiveLockScope _l(m_LocationMapLock); if (NewBlockFile) |