about summary refs log tree commit diff
path: root/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-03-22 22:11:49 +0100
committerDan Engelbrecht <[email protected]>2022-03-31 11:29:26 +0200
commitf1b449696d903e273b0a0c04160846543d2830a7 (patch)
treef4e0c59dd165f7b0d0adf7f30ee14e9172a3236c /zenstore/compactcas.cpp
parentDon't GC currently writing block, reduce lock contention during GC (diff)
downloadzen-f1b449696d903e273b0a0c04160846543d2830a7.tar.xz
zen-f1b449696d903e273b0a0c04160846543d2830a7.zip
memory order for atomic values
Diffstat (limited to 'zenstore/compactcas.cpp')
-rw-r--r--  zenstore/compactcas.cpp | 115
1 file changed, 63 insertions(+), 52 deletions(-)
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index faf54c106..170687600 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -112,7 +112,7 @@ CasContainerStrategy::ChunkBlock::ChunkBlock(const std::filesystem::path& BlockP
CasContainerStrategy::ChunkBlock::~ChunkBlock()
{
- if (m_IsOpened.load())
+ if (m_IsOpened.load(std::memory_order_acquire))
{
m_File.Detach();
}
@@ -129,13 +129,13 @@ CasContainerStrategy::ChunkBlock::Open()
{
// Open can have a race if multiple requests wants to read the same block
// Create or ~ChunkBlock() can not have a race so we only need to guard Open()
- if (m_IsOpened.load(std::memory_order::memory_order_acquire))
+ if (m_IsOpened.load(std::memory_order_acquire))
{
return;
}
RwLock::ExclusiveLockScope _(m_OpenLock);
- if (m_IsOpened.load(std::memory_order::memory_order_acquire))
+ if (m_IsOpened.load(std::memory_order_acquire))
{
return;
}
@@ -143,13 +143,13 @@ CasContainerStrategy::ChunkBlock::Open()
m_File.Open(m_Path, false);
void* FileHandle = m_File.Handle();
m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize());
- m_IsOpened.store(true);
+ m_IsOpened.store(true, std::memory_order_release);
}
void
CasContainerStrategy::ChunkBlock::Create(uint64_t InitialSize)
{
- ZEN_ASSERT(!m_IsOpened.load());
+ ZEN_ASSERT(!m_IsOpened.load(std::memory_order_acquire));
auto ParentPath = m_Path.parent_path();
if (!std::filesystem::is_directory(ParentPath))
@@ -164,13 +164,13 @@ CasContainerStrategy::ChunkBlock::Create(uint64_t InitialSize)
}
void* FileHandle = m_File.Handle();
m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize);
- m_IsOpened.store(true);
+ m_IsOpened.store(true, std::memory_order_release);
}
uint64_t
CasContainerStrategy::ChunkBlock::FileSize()
{
- ZEN_ASSERT(m_IsOpened.load());
+ ZEN_ASSERT(m_IsOpened.load(std::memory_order_acquire));
return m_File.FileSize();
}
@@ -200,21 +200,21 @@ CasContainerStrategy::ChunkBlock::GetChunk(uint64_t Offset, uint64_t Size)
void
CasContainerStrategy::ChunkBlock::Read(void* Data, uint64_t Size, uint64_t FileOffset)
{
- ZEN_ASSERT(m_IsOpened.load());
+ ZEN_ASSERT(m_IsOpened.load(std::memory_order_acquire));
m_File.Read(Data, Size, FileOffset);
}
void
CasContainerStrategy::ChunkBlock::Write(const void* Data, uint64_t Size, uint64_t FileOffset)
{
- ZEN_ASSERT(m_IsOpened.load());
+ ZEN_ASSERT(m_IsOpened.load(std::memory_order_acquire));
m_File.Write(Data, Size, FileOffset);
}
void
CasContainerStrategy::ChunkBlock::Flush()
{
- if (!m_IsOpened.load())
+ if (!m_IsOpened.load(std::memory_order_acquire))
{
return;
}
@@ -226,7 +226,7 @@ CasContainerStrategy::ChunkBlock::StreamByteRange(uint64_t FileOffse
uint64_t Size,
std::function<void(const void* Data, uint64_t Size)>&& ChunkFun)
{
- ZEN_ASSERT(m_IsOpened.load());
+ ZEN_ASSERT(m_IsOpened.load(std::memory_order_acquire));
m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun));
}
@@ -263,59 +263,71 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint3
CasStore::InsertResult
CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash)
{
- RwLock::ExclusiveLockScope _i(m_InsertLock);
-
+ uint32_t WriteBlockIndex;
+ std::shared_ptr<ChunkBlock> WriteBlock;
+ uint64_t InsertOffset;
{
- RwLock::SharedLockScope _l(m_LocationMapLock);
- auto KeyIt = m_LocationMap.find(ChunkHash);
+ RwLock::ExclusiveLockScope _i(m_InsertLock);
- if (KeyIt != m_LocationMap.end())
{
- return CasStore::InsertResult{.New = false};
+ RwLock::SharedLockScope _l(m_LocationMapLock);
+ auto KeyIt = m_LocationMap.find(ChunkHash);
+
+ if (KeyIt != m_LocationMap.end())
+ {
+ return CasStore::InsertResult{.New = false};
+ }
}
- }
- // New entry
+ // New entry
- uint32_t WriteBlockIndex = m_WriteBlockIndex.load();
- auto WriteBlock = m_WriteBlock.lock();
- if (!WriteBlock || (m_CurrentInsertOffset + ChunkSize) > m_MaxBlockSize)
- {
+ WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ WriteBlock = m_WriteBlock.lock();
+ if (!WriteBlock || (m_CurrentInsertOffset + ChunkSize) > m_MaxBlockSize)
{
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex)
+ if (WriteBlock)
{
- throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName));
+ WriteBlock->Flush();
}
- WriteBlockIndex += WriteBlock ? 1 : 0;
- while (m_ChunkBlocks.contains(WriteBlockIndex))
{
- WriteBlockIndex = (WriteBlockIndex + 1) & CasDiskLocation::MaxBlockIndex;
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
+ if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex)
+ {
+ throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName));
+ }
+ WriteBlockIndex += WriteBlock ? 1 : 0;
+ while (m_ChunkBlocks.contains(WriteBlockIndex))
+ {
+ WriteBlockIndex = (WriteBlockIndex + 1) & CasDiskLocation::MaxBlockIndex;
+ }
+ auto BlockPath = BuildUcasPath(m_BlocksBasePath, WriteBlockIndex);
+ WriteBlock = std::make_shared<ChunkBlock>(BlockPath);
+ m_ChunkBlocks[WriteBlockIndex] = WriteBlock;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
}
- auto BlockPath = BuildUcasPath(m_BlocksBasePath, WriteBlockIndex);
- WriteBlock = std::make_shared<ChunkBlock>(BlockPath);
- m_ChunkBlocks[WriteBlockIndex] = WriteBlock;
- m_WriteBlockIndex.store(WriteBlockIndex);
+ m_WriteBlock = WriteBlock;
+ m_CurrentInsertOffset = 0;
+ WriteBlock->Create(m_MaxBlockSize);
}
- m_WriteBlock = WriteBlock;
- m_CurrentInsertOffset = 0;
- WriteBlock->Create(m_MaxBlockSize);
- }
- else
- {
- WriteBlock->Open();
+ else
+ {
+ WriteBlock->Open();
+ }
+ InsertOffset = m_CurrentInsertOffset;
+ m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment);
}
- const uint64_t InsertOffset = m_CurrentInsertOffset;
+
WriteBlock->Write(ChunkData, ChunkSize, InsertOffset);
- m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment);
const CasLocation Location(WriteBlockIndex, InsertOffset, ChunkSize);
CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = CasDiskLocation(Location, m_PayloadAlignment)};
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- m_LocationMap.emplace(ChunkHash, CasDiskLocation(Location, m_PayloadAlignment));
m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize));
- m_CasLog.Append(IndexEntry);
+ {
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
+ m_LocationMap.emplace(ChunkHash, CasDiskLocation(Location, m_PayloadAlignment));
+ m_CasLog.Append(IndexEntry);
+ }
return CasStore::InsertResult{.New = true};
}
@@ -377,7 +389,7 @@ CasContainerStrategy::Flush()
RwLock::ExclusiveLockScope _(m_InsertLock);
if (m_CurrentInsertOffset > 0)
{
- uint32_t WriteBlockIndex = m_WriteBlockIndex.load();
+ uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
auto WriteBlock = m_WriteBlock.lock();
WriteBlockIndex++;
while (m_ChunkBlocks.contains(WriteBlockIndex))
@@ -386,7 +398,7 @@ CasContainerStrategy::Flush()
}
WriteBlock->Flush();
m_WriteBlock.reset();
- m_WriteBlockIndex = WriteBlockIndex;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
m_CurrentInsertOffset = 0;
}
}
@@ -544,7 +556,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
RwLock::SharedLockScope _l(m_LocationMapLock);
LocationMap.reserve(m_LocationMap.size());
bool IsWriting = !m_WriteBlock.expired();
- uint32_t WritingBlock = m_WriteBlockIndex;
+ uint32_t WritingBlock = m_WriteBlockIndex.load(std::memory_order_acquire);
for (const auto& Entry : m_LocationMap)
{
CasLocation Location = Entry.second.Get(m_PayloadAlignment);
@@ -617,7 +629,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
if (!PerformDelete)
{
- uint64_t TotalSize = m_TotalSize.load();
+ uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed);
ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}",
m_Config.RootDirectory / m_ContainerBaseName,
DeleteCount,
@@ -636,7 +648,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
std::shared_ptr<ChunkBlock> NewBlockFile;
uint64_t WriteOffset = {};
- uint32_t NewBlockIndex = m_WriteBlockIndex.load();
+ uint32_t NewBlockIndex = {};
std::unordered_map<IoHash, CasDiskLocation> MovedChunks;
std::vector<IoHash> DeletedChunks;
DeletedChunks.reserve(DeleteCount);
@@ -695,8 +707,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize))
{
- uint32_t NextBlockIndex = m_WriteBlockIndex.load();
-
+ uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order::memory_order_relaxed);
{
RwLock::ExclusiveLockScope _l(m_LocationMapLock);
if (NewBlockFile)