aboutsummaryrefslogtreecommitdiff
path: root/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-03-22 18:33:06 +0100
committerDan Engelbrecht <[email protected]>2022-03-31 11:29:26 +0200
commit6c7ba22c9a287bf4bd2c06d59f17753b6a5280ea (patch)
tree56db8565952ddca186ca12eb98d3715d509946eb /zenstore/compactcas.cpp
parentAdd Flush to workthreadpool (diff)
downloadzen-6c7ba22c9a287bf4bd2c06d59f17753b6a5280ea.tar.xz
zen-6c7ba22c9a287bf4bd2c06d59f17753b6a5280ea.zip
Don't GC currently writing block, reduce lock contention during GC
Diffstat (limited to 'zenstore/compactcas.cpp')
-rw-r--r--zenstore/compactcas.cpp174
1 files changed, 60 insertions, 114 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 8571be065..faf54c106 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -267,13 +267,6 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
{
RwLock::SharedLockScope _l(m_LocationMapLock);
-// for (const auto& Entry : m_LocationMap)
-// {
-// uint32_t CheckBlockIndex = Entry.second.Get(m_PayloadAlignment).BlockIndex;
-// CHECK(m_ChunkBlocks.contains(CheckBlockIndex));
-// CHECK(m_ChunkBlocks[CheckBlockIndex]);
-// }
-
auto KeyIt = m_LocationMap.find(ChunkHash);
if (KeyIt != m_LocationMap.end())
@@ -290,13 +283,6 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
{
{
RwLock::ExclusiveLockScope __(m_LocationMapLock);
-// for (const auto& Entry : m_LocationMap)
-// {
-// uint32_t CheckBlockIndex = Entry.second.Get(m_PayloadAlignment).BlockIndex;
-// CHECK(m_ChunkBlocks.contains(CheckBlockIndex));
-// CHECK(m_ChunkBlocks[CheckBlockIndex]);
-// }
-
if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex)
{
throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName));
@@ -310,13 +296,6 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
WriteBlock = std::make_shared<ChunkBlock>(BlockPath);
m_ChunkBlocks[WriteBlockIndex] = WriteBlock;
m_WriteBlockIndex.store(WriteBlockIndex);
-
-// for (const auto& Entry : m_LocationMap)
-// {
-// uint32_t CheckBlockIndex = Entry.second.Get(m_PayloadAlignment).BlockIndex;
-// CHECK(m_ChunkBlocks.contains(CheckBlockIndex));
-// CHECK(m_ChunkBlocks[CheckBlockIndex]);
-// }
}
m_WriteBlock = WriteBlock;
m_CurrentInsertOffset = 0;
@@ -351,12 +330,6 @@ IoBuffer
CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
RwLock::SharedLockScope _(m_LocationMapLock);
-// for (const auto& Entry : m_LocationMap)
-// {
-// uint32_t CheckBlockIndex = Entry.second.Get(m_PayloadAlignment).BlockIndex;
-// CHECK(m_ChunkBlocks.contains(CheckBlockIndex));
-// CHECK(m_ChunkBlocks[CheckBlockIndex]);
-// }
if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
{
@@ -564,21 +537,13 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
// path to the next new block.
ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName);
- std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex;
- std::vector<std::unordered_set<IoHash, IoHash::Hasher>> KeepChunks;
- std::vector<std::unordered_set<IoHash, IoHash::Hasher>> DeleteChunks;
- std::vector<IoHash> DeletedChunks;
- std::vector<IoHash> PendingDeletedChunks;
- std::unordered_set<uint32_t> BlocksToReWrite;
-// std::uint64_t CurrentWritePosition;
-// std::uint64_t CurrentWriteBlock;
- std::unordered_map<IoHash, CasLocation, IoHash::Hasher> LocationMap;
- size_t BlockCount;
+ std::unordered_map<IoHash, CasLocation, IoHash::Hasher> LocationMap;
+ size_t BlockCount;
{
RwLock::SharedLockScope _i(m_InsertLock);
RwLock::SharedLockScope _l(m_LocationMapLock);
LocationMap.reserve(m_LocationMap.size());
- bool IsWriting = !m_WriteBlock.expired();
+ bool IsWriting = !m_WriteBlock.expired();
uint32_t WritingBlock = m_WriteBlockIndex;
for (const auto& Entry : m_LocationMap)
{
@@ -600,11 +565,16 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
const uint64_t TotalChunkCount = LocationMap.size();
+ std::unordered_set<uint32_t> BlocksToReWrite;
+ std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex;
+ std::vector<std::unordered_set<IoHash, IoHash::Hasher>> KeepChunks;
+ std::vector<std::unordered_set<IoHash, IoHash::Hasher>> DeleteChunks;
+
BlocksToReWrite.reserve(BlockCount);
BlockIndexToChunkMapIndex.reserve(BlockCount);
KeepChunks.reserve(BlockCount);
DeleteChunks.reserve(BlockCount);
- size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount;
+ size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2;
std::vector<IoHash> TotalChunkHashes;
TotalChunkHashes.reserve(TotalChunkCount);
@@ -627,9 +597,9 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
uint64_t NewTotalSize = 0;
GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
- auto KeyIt = LocationMap.find(ChunkHash);
+ auto KeyIt = LocationMap.find(ChunkHash);
const CasLocation& ChunkLocation = KeyIt->second;
- size_t ChunkMapIndex = BlockIndexToChunkMapIndex[ChunkLocation.BlockIndex];
+ size_t ChunkMapIndex = BlockIndexToChunkMapIndex[ChunkLocation.BlockIndex];
if (Keep)
{
auto& ChunkMap = KeepChunks[ChunkMapIndex];
@@ -649,38 +619,30 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
{
uint64_t TotalSize = m_TotalSize.load();
ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- DeleteCount,
- NiceBytes(TotalSize - NewTotalSize),
- TotalChunkCount,
- NiceBytes(TotalSize));
+ m_Config.RootDirectory / m_ContainerBaseName,
+ DeleteCount,
+ NiceBytes(TotalSize - NewTotalSize),
+ TotalChunkCount,
+ NiceBytes(TotalSize));
return;
}
- DeletedChunks.reserve(DeleteCount);
+ // {
+ // RwLock::ExclusiveLockScope _i(m_InsertLock);
+ // m_CasLog.Flush();
+ // }
// Move all chunks in blocks that have chunks removed to new blocks
- {
- RwLock::ExclusiveLockScope _i(m_InsertLock);
- m_CasLog.Flush();
- }
-
std::shared_ptr<ChunkBlock> NewBlockFile;
uint64_t WriteOffset = {};
uint32_t NewBlockIndex = m_WriteBlockIndex.load();
- std::unordered_map<IoHash, CasDiskLocation> MovedBlocks;
+ std::unordered_map<IoHash, CasDiskLocation> MovedChunks;
+ std::vector<IoHash> DeletedChunks;
+ DeletedChunks.reserve(DeleteCount);
for (auto BlockIndex : BlocksToReWrite)
{
-// {
-// RwLock::SharedLockScope _i(m_InsertLock);
-// if (m_WriteBlockIndex == BlockIndex)
-// {
-// continue;
-// }
-// }
-
const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex];
const auto& KeepMap = KeepChunks[ChunkMapIndex];
if (KeepMap.empty())
@@ -688,25 +650,17 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
std::shared_ptr<ChunkBlock> BlockFile;
{
RwLock::ExclusiveLockScope _i(m_LocationMapLock);
- const auto& DeleteMap = DeleteChunks[ChunkMapIndex];
+ const auto& DeleteMap = DeleteChunks[ChunkMapIndex];
for (const auto& ChunkHash : DeleteMap)
{
- auto KeyIt = m_LocationMap.find(ChunkHash);
- CHECK(KeyIt != m_LocationMap.end());
- m_LocationMap.erase(KeyIt);
+ auto KeyIt = m_LocationMap.find(ChunkHash);
const CasLocation& DeleteChunkLocation = KeyIt->second.Get(m_PayloadAlignment);
m_CasLog.Append({.Key = ChunkHash, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone});
m_TotalSize.fetch_sub(static_cast<uint64_t>(DeleteChunkLocation.Size));
+ m_LocationMap.erase(KeyIt);
}
DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end());
m_ChunkBlocks[BlockIndex].swap(BlockFile);
-
-// for (const auto& Entry : m_LocationMap)
-// {
-// uint32_t CheckBlockIndex = Entry.second.Get(m_PayloadAlignment).BlockIndex;
-// CHECK(m_ChunkBlocks.contains(CheckBlockIndex));
-// CHECK(m_ChunkBlocks[CheckBlockIndex]);
-// }
}
ZEN_DEBUG("marking cas store file for delete {}, block {}", m_ContainerBaseName, std::to_string(BlockIndex));
std::error_code Ec;
@@ -729,7 +683,6 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
Chunks.emplace(KeyIt->first, KeyIt->second.Get(m_PayloadAlignment));
}
BlockFile = m_ChunkBlocks[BlockIndex];
- CHECK(BlockFile);
BlockFile->Open();
}
@@ -748,7 +701,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
RwLock::ExclusiveLockScope _l(m_LocationMapLock);
if (NewBlockFile)
{
- for (const auto& MovedEntry : MovedBlocks)
+ for (const auto& MovedEntry : MovedChunks)
{
m_LocationMap[MovedEntry.first] = MovedEntry.second;
m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second});
@@ -756,8 +709,8 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex)
{
ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded",
- m_ContainerBaseName,
- static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
+ m_ContainerBaseName,
+ static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
return;
}
}
@@ -772,12 +725,6 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
auto NewBlockPath = BuildUcasPath(m_BlocksBasePath, NextBlockIndex);
NewBlockFile = std::make_shared<ChunkBlock>(NewBlockPath);
m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
-// for (const auto& CheckEntry : m_LocationMap)
-// {
-// uint32_t CheckBlockIndex = CheckEntry.second.Get(m_PayloadAlignment).BlockIndex;
-// CHECK(m_ChunkBlocks.contains(CheckBlockIndex));
-// CHECK(m_ChunkBlocks[CheckBlockIndex]);
-// }
}
std::error_code Error;
@@ -794,23 +741,17 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
if (!std::filesystem::is_regular_file(GCReservePath))
{
ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- m_MaxBlockSize,
- NiceBytes(Space.Free));
+ m_Config.RootDirectory / m_ContainerBaseName,
+ m_MaxBlockSize,
+ NiceBytes(Space.Free));
RwLock::ExclusiveLockScope _l(m_LocationMapLock);
m_ChunkBlocks.erase(NextBlockIndex);
-// for (const auto& CheckEntry : m_LocationMap)
-// {
-// uint32_t CheckBlockIndex = CheckEntry.second.Get(m_PayloadAlignment).BlockIndex;
-// CHECK(m_ChunkBlocks.contains(CheckBlockIndex));
-// CHECK(m_ChunkBlocks[CheckBlockIndex]);
-// }
return;
}
ZEN_INFO("using gc reserve for '{}', disk free {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- NiceBytes(Space.Free));
+ m_Config.RootDirectory / m_ContainerBaseName,
+ NiceBytes(Space.Free));
auto NewBlockPath = BuildUcasPath(m_BlocksBasePath, NextBlockIndex);
std::filesystem::rename(GCReservePath, NewBlockPath);
NewBlockFile->Open();
@@ -820,13 +761,13 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
NewBlockFile->Create(m_MaxBlockSize);
}
NewBlockIndex = NextBlockIndex;
- MovedBlocks.clear();
+ MovedChunks.clear();
WriteOffset = 0;
}
NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
CasLocation NewChunkLocation(NewBlockIndex, WriteOffset, Chunk.size());
- MovedBlocks.emplace(Entry.first, CasDiskLocation(NewChunkLocation, m_PayloadAlignment));
+ MovedChunks.emplace(Entry.first, CasDiskLocation(NewChunkLocation, m_PayloadAlignment));
WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment);
}
Chunk.clear();
@@ -835,7 +776,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
RwLock::ExclusiveLockScope _l(m_LocationMapLock);
if (NewBlockFile)
{
- for (const auto& MovedEntry : MovedBlocks)
+ for (const auto& MovedEntry : MovedChunks)
{
m_LocationMap[MovedEntry.first] = MovedEntry.second;
m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second});
@@ -844,13 +785,11 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
const auto& DeleteMap = DeleteChunks[ChunkMapIndex];
for (const auto& ChunkHash : DeleteMap)
{
- auto KeyIt = m_LocationMap.find(ChunkHash);
- CHECK(KeyIt != m_LocationMap.end());
-
- m_LocationMap.erase(KeyIt);
+ auto KeyIt = m_LocationMap.find(ChunkHash);
const CasLocation& DeleteChunkLocation = KeyIt->second.Get(m_PayloadAlignment);
m_CasLog.Append({.Key = ChunkHash, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone});
m_TotalSize.fetch_sub(static_cast<uint64_t>(DeleteChunkLocation.Size));
+ m_LocationMap.erase(KeyIt);
}
DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end());
}
@@ -862,20 +801,14 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", BlockFile->GetPath(), Ec.message());
}
BlockFile.reset();
-
-// for (const auto& Entry : m_LocationMap)
-// {
-// uint32_t CheckBlockIndex = Entry.second.Get(m_PayloadAlignment).BlockIndex;
-// CHECK(m_ChunkBlocks.contains(CheckBlockIndex));
-// CHECK(m_ChunkBlocks[CheckBlockIndex]);
-// }
}
GcCtx.DeletedCas(DeletedChunks);
- ZEN_INFO("garbage collection complete '{}', deleted {} chunks", m_Config.RootDirectory / m_ContainerBaseName, DeletedChunks.size());
-
- MakeIndexSnapshot();
+ ZEN_INFO("garbage collection complete '{}', deleted {} and moved {} chunks",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ DeletedChunks.size(),
+ MovedChunks.size());
}
void
@@ -1053,6 +986,8 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
std::filesystem::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog");
std::filesystem::path LegacySobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas");
+ bool CasLogEmpty = true;
+
if (IsNewStore)
{
if (std::filesystem::is_regular_file(LegacySobsPath))
@@ -1187,6 +1122,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
std::filesystem::remove(LegacySobsPath);
ZEN_INFO("migrated store {} to {} to chunks", m_Config.RootDirectory / m_ContainerBaseName, NewBlockIndex + 1);
+ CasLogEmpty = false;
}
if (std::filesystem::is_regular_file(SidxPath))
@@ -1224,6 +1160,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
{
m_LocationMap[Record.Key] = Record.Location;
}
+ CasLogEmpty = false;
});
}
@@ -1324,6 +1261,11 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
}
}
+ if (!CasLogEmpty)
+ {
+ MakeIndexSnapshot();
+ }
+
// TODO: should validate integrity of container files here
}
@@ -2026,13 +1968,17 @@ TEST_CASE("compactcas.legacyconversion")
std::filesystem::remove(SidxPath);
}
- std::filesystem::path SlogPath = CasConfig.RootDirectory / "test.ulog";
+ std::filesystem::path SlogPath = CasConfig.RootDirectory / "test.ulog";
+ {
+ TCasLogFile<CasDiskIndexEntry> CasLog;
+ CasLog.Open(SlogPath, false);
+ CasLog.Replay([&](const CasDiskIndexEntry& Record) { LogEntries.push_back(Record); });
+ }
TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog;
LegacyCasLog.Open(SlogPath, true);
for (const auto& Entry : LogEntries)
{
- CasLocation Location = Entry.Location.Get(16);
- CHECK(Location.BlockIndex == 1);
+ CasLocation Location = Entry.Location.Get(16);
LegacyCasDiskLocation LegacyLocation(Location.Offset, Location.Size);
LegacyCasDiskIndexEntry LegacyEntry = {.Key = Entry.Key,
.Location = LegacyLocation,
@@ -2058,7 +2004,7 @@ TEST_CASE("compactcas.legacyconversion")
TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
{
-// for (uint32_t i = 0; i < 100; ++i)
+ // for (uint32_t i = 0; i < 100; ++i)
{
ScopedTemporaryDirectory TempDir;