aboutsummaryrefslogtreecommitdiff
path: root/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-03-18 12:40:28 +0100
committerDan Engelbrecht <[email protected]>2022-03-31 11:28:32 +0200
commit567db5817c15e8e1cf4415cb38642fe8bcd533a0 (patch)
treee29019f39f271595ee8d78fc004eb737497842af /zenstore/compactcas.cpp
parentrename compact cas test cases (diff)
downloadzen-567db5817c15e8e1cf4415cb38642fe8bcd533a0.tar.xz
zen-567db5817c15e8e1cf4415cb38642fe8bcd533a0.zip
Clean up thread locking
Diffstat (limited to 'zenstore/compactcas.cpp')
-rw-r--r--zenstore/compactcas.cpp154
1 files changed, 66 insertions, 88 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 5261a89e4..28f989bfb 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -240,32 +240,38 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
// New entry
- if ((m_CurrentInsertOffset + ChunkSize) > m_MaxBlockSize)
+ uint32_t WriteBlockIndex = m_WriteBlockIndex.load();
+ auto WriteBlock = m_WriteBlock.lock();
+ if (!WriteBlock || (m_CurrentInsertOffset + ChunkSize) > m_MaxBlockSize)
{
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- uint32_t NewBlockIndex = m_CurrentBlockIndex + 1;
- while (m_OpenBlocks.contains(NewBlockIndex))
{
- NewBlockIndex++;
- if (NewBlockIndex == m_CurrentBlockIndex)
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
+ if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex)
{
throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName));
}
+ WriteBlockIndex += WriteBlock ? 1 : 0;
+ while (m_ChunkBlocks.contains(WriteBlockIndex))
+ {
+ WriteBlockIndex++;
+ }
+ WriteBlock = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, WriteBlockIndex);
+ m_ChunkBlocks[WriteBlockIndex] = WriteBlock;
+ m_WriteBlockIndex.store(WriteBlockIndex);
}
- m_CurrentBlockIndex = NewBlockIndex;
- auto SmallObjectFile = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex);
- SmallObjectFile->Create(m_MaxBlockSize);
- m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile;
- m_CurrentBlock = SmallObjectFile;
- m_CurrentInsertOffset = 0;
+ m_WriteBlock = WriteBlock;
+ m_CurrentInsertOffset = 0;
+ WriteBlock->Create(m_MaxBlockSize);
+ }
+ else
+ {
+ WriteBlock->Open();
}
const uint64_t InsertOffset = m_CurrentInsertOffset;
- auto CurrentBlock = m_CurrentBlock.lock();
- CurrentBlock->Open();
- CurrentBlock->Write(ChunkData, ChunkSize, InsertOffset);
+ WriteBlock->Write(ChunkData, ChunkSize, InsertOffset);
m_CurrentInsertOffset = AlignPositon(InsertOffset + ChunkSize, m_PayloadAlignment);
- const CasLocation Location(m_CurrentBlockIndex, InsertOffset, ChunkSize);
+ const CasLocation Location(WriteBlockIndex, InsertOffset, ChunkSize);
CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = CasDiskLocation(Location, m_PayloadAlignment)};
RwLock::ExclusiveLockScope __(m_LocationMapLock);
@@ -291,7 +297,7 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
CasLocation Location = KeyIt->second.Get(m_PayloadAlignment);
- if (auto BlockIt = m_OpenBlocks.find(Location.BlockIndex); BlockIt != m_OpenBlocks.end())
+ if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end())
{
return BlockIt->second->GetRange(Location.Offset, Location.Size);
}
@@ -325,9 +331,12 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks)
void
CasContainerStrategy::Flush()
{
- RwLock::SharedLockScope _(m_InsertLock);
+ RwLock::ExclusiveLockScope _i(m_InsertLock);
m_CasLog.Flush();
- m_CurrentBlock.lock()->Flush();
+ if (auto WriteBlock = m_WriteBlock.lock())
+ {
+ WriteBlock->Flush();
+ }
}
void
@@ -350,7 +359,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
RwLock::SharedLockScope _(m_InsertLock); // TODO: Refactor so we don't have to keep m_InsertLock all the time?
RwLock::SharedLockScope __(m_LocationMapLock);
- for (const auto& Block : m_OpenBlocks)
+ for (const auto& Block : m_ChunkBlocks)
{
uint64_t WindowStart = 0;
uint64_t WindowEnd = WindowSize;
@@ -402,7 +411,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
{
IoHashStream Hasher;
const CasLocation Location = Entry.Location.Get(m_PayloadAlignment);
- auto& SmallObjectFile = *m_OpenBlocks[Location.BlockIndex];
+ auto& SmallObjectFile = *m_ChunkBlocks[Location.BlockIndex];
SmallObjectFile.StreamByteRange(Location.Offset, Location.Size, [&](const void* Data, uint64_t Size) {
Hasher.Append(Data, Size);
});
@@ -485,9 +494,12 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
RwLock::ExclusiveLockScope _l(m_LocationMapLock);
m_CasLog.Flush();
- m_CurrentBlock.lock()->Flush();
+ if (auto WriteBlock = m_WriteBlock.lock())
+ {
+ WriteBlock->Flush();
+ }
- BlocksToReWrite.reserve(m_OpenBlocks.size());
+ BlocksToReWrite.reserve(m_ChunkBlocks.size());
if (m_LocationMap.empty())
{
@@ -553,26 +565,9 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
// TODO: Be smarter about terminating current block - we should probably not rewrite if there is just
// a small amount of bytes to gain.
- if (BlocksToReWrite.contains(m_CurrentBlockIndex))
+ if (BlocksToReWrite.contains(m_WriteBlockIndex.load()))
{
- uint32_t NewBlockIndex = m_CurrentBlockIndex + 1;
- while (m_OpenBlocks.contains(NewBlockIndex))
- {
- NewBlockIndex++;
- if (NewBlockIndex == m_CurrentBlockIndex)
- {
- ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded",
- m_ContainerBaseName,
- static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
- return;
- }
- }
- m_CurrentBlockIndex = NewBlockIndex;
- auto SmallObjectFile = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex);
- SmallObjectFile->Create(m_MaxBlockSize);
- m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile;
- m_CurrentBlock = SmallObjectFile;
- m_CurrentInsertOffset = 0;
+ m_WriteBlock.reset();
}
}
@@ -580,7 +575,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
std::shared_ptr<ChunkBlock> NewBlockFile;
uint64_t WriteOffset = {};
- uint32_t NewBlockIndex = {};
+ uint32_t NewBlockIndex = m_WriteBlockIndex.load();
std::unordered_map<IoHash, CasDiskLocation> MovedBlocks;
for (auto BlockIndex : BlocksToReWrite)
@@ -597,18 +592,18 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
// Can we create a Sub-IoBuffer from our main buffer even if the size has grown past the initial
// size when creating it?
- RwLock::ExclusiveLockScope _i(m_InsertLock);
- auto BlockFile = m_OpenBlocks[BlockIndex];
+ RwLock::ExclusiveLockScope _i(m_LocationMapLock);
+ auto BlockFile = m_ChunkBlocks[BlockIndex];
ZEN_INFO("marking cas store file for delete {}, block {}", m_ContainerBaseName, std::to_string(BlockIndex));
BlockFile->MarkAsDeleteOnClose();
- m_OpenBlocks.erase(BlockIndex);
+ BlockFile.reset();
continue;
}
std::shared_ptr<ChunkBlock> BlockFile;
{
- RwLock::ExclusiveLockScope _i(m_InsertLock);
- BlockFile = m_OpenBlocks[BlockIndex];
+ RwLock::SharedLockScope _i(m_LocationMapLock);
+ BlockFile = m_ChunkBlocks[BlockIndex];
BlockFile->Open();
}
@@ -622,23 +617,17 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize))
{
+ NewBlockIndex = m_WriteBlockIndex.load();
{
- RwLock::ExclusiveLockScope _i(m_InsertLock);
+ RwLock::ExclusiveLockScope _l(m_LocationMapLock);
if (NewBlockFile)
{
- m_OpenBlocks[NewBlockIndex] = NewBlockFile;
- RwLock::ExclusiveLockScope _l(m_LocationMapLock);
for (const auto& MovedEntry : MovedBlocks)
{
m_LocationMap[MovedEntry.first] = MovedEntry.second;
m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second});
}
- }
- NewBlockIndex = m_CurrentBlockIndex + 1;
- while (m_OpenBlocks.contains(NewBlockIndex))
- {
- NewBlockIndex++;
- if (NewBlockIndex == m_CurrentBlockIndex)
+ if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex)
{
ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded",
m_ContainerBaseName,
@@ -646,9 +635,12 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
return;
}
}
- m_OpenBlocks[NewBlockIndex] = std::make_shared<ChunkBlock>(m_Config.RootDirectory,
- m_ContainerBaseName,
- NewBlockIndex); // Make sure nobody steals this slot
+ while (m_ChunkBlocks.contains(NewBlockIndex))
+ {
+ NewBlockIndex++;
+ }
+ NewBlockFile = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, NewBlockIndex);
+ m_ChunkBlocks[NewBlockIndex] = NewBlockFile;
}
std::error_code Error;
@@ -659,18 +651,17 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
return;
}
- if (Space.Free < (m_MaxBlockSize * 2)) // Never let GC steal the last block space
+ if (Space.Free < m_MaxBlockSize)
{
ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}",
m_Config.RootDirectory / m_ContainerBaseName,
m_MaxBlockSize * m_MaxBlockSize,
NiceBytes(Space.Free));
- RwLock::ExclusiveLockScope _i(m_InsertLock);
- m_OpenBlocks.erase(NewBlockIndex);
+ RwLock::ExclusiveLockScope _l(m_LocationMapLock);
+ m_ChunkBlocks.erase(NewBlockIndex);
return;
}
- NewBlockFile = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, NewBlockIndex);
NewBlockFile->Create(m_MaxBlockSize);
MovedBlocks.clear();
WriteOffset = 0;
@@ -685,11 +676,9 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
Chunk.clear();
// Remap moved chunks to the new block file
- RwLock::ExclusiveLockScope _i(m_InsertLock);
+ RwLock::ExclusiveLockScope _l(m_LocationMapLock);
if (NewBlockFile)
{
- m_OpenBlocks[NewBlockIndex] = NewBlockFile;
- RwLock::ExclusiveLockScope _l(m_LocationMapLock);
for (const auto& MovedEntry : MovedBlocks)
{
m_LocationMap[MovedEntry.first] = MovedEntry.second;
@@ -698,7 +687,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
}
ZEN_INFO("marking cas store file for delete {}, block index {}", m_ContainerBaseName, BlockIndex);
BlockFile->MarkAsDeleteOnClose();
- m_OpenBlocks.erase(BlockIndex);
+ BlockFile.reset();
}
}
@@ -1088,7 +1077,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
}
auto SmallObjectFile = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, BlockIndex);
SmallObjectFile->Open();
- m_OpenBlocks[BlockIndex] = SmallObjectFile;
+ m_ChunkBlocks[BlockIndex] = SmallObjectFile;
}
catch (const std::invalid_argument&)
{
@@ -1099,31 +1088,20 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
uint64_t LargestSizeToUse = m_MaxBlockSize - m_PayloadAlignment;
uint64_t SmallestBlockSize = LargestSizeToUse;
- bool CreateNewBlock = m_OpenBlocks.empty();
- if (!CreateNewBlock)
+ bool OpenExistingBlock = false;
+ for (const auto& Entry : BlockUsage)
{
- for (const auto& Entry : BlockUsage)
+ if (Entry.second < SmallestBlockSize)
{
- if (Entry.second < SmallestBlockSize)
- {
- SmallestBlockSize = Entry.second;
- m_CurrentBlockIndex = Entry.first;
- CreateNewBlock = false;
- }
+ SmallestBlockSize = Entry.second;
+ m_WriteBlockIndex = Entry.first;
+ OpenExistingBlock = true;
}
}
- if (CreateNewBlock)
- {
- auto SmallObjectFile = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex);
- SmallObjectFile->Create(m_MaxBlockSize);
- m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile;
- m_CurrentBlock = SmallObjectFile;
- m_CurrentInsertOffset = 0;
- }
- else
+ if (OpenExistingBlock)
{
- m_CurrentBlock = m_OpenBlocks[m_CurrentBlockIndex];
+ m_WriteBlock = m_ChunkBlocks[m_WriteBlockIndex];
m_CurrentInsertOffset = AlignPositon(SmallestBlockSize, m_PayloadAlignment);
}