aboutsummaryrefslogtreecommitdiff
path: root/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-03-27 22:15:18 +0200
committerDan Engelbrecht <[email protected]>2022-03-31 11:29:28 +0200
commitcae295366b42882678625927df1066dc5418e93a (patch)
treedc9546f651d16d0feaf9f879e6c7de09ce4c6f02 /zenstore/compactcas.cpp
parentRemove redundant lock in BlockStoreFile (diff)
downloadzen-cae295366b42882678625927df1066dc5418e93a.tar.xz
zen-cae295366b42882678625927df1066dc5418e93a.zip
Switch from std::shared_ptr<> to Ref<>
Replace a bunch of 'auto' declarations with explicit types
Diffstat (limited to 'zenstore/compactcas.cpp')
-rw-r--r--zenstore/compactcas.cpp160
1 file changed, 79 insertions, 81 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 521546e10..7b7062df6 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -397,7 +397,7 @@ namespace {
Result.push_back(
{.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags});
}
- auto BlockPath = GetBlockPath(BlocksBasePath, WriteBlockIndex);
+ std::filesystem::path BlockPath = GetBlockPath(BlocksBasePath, WriteBlockIndex);
CreateDirectories(BlockPath.parent_path());
BlockFile.Close();
std::filesystem::rename(LegacySobsPath, BlockPath);
@@ -436,8 +436,8 @@ namespace {
BlockRanges.reserve(MaxRequiredBlockCount);
for (const IoHash& ChunkHash : ChunkHashes)
{
- const auto& LegacyEntry = LegacyDiskIndex[ChunkHash];
- const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location;
+ const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash];
+ const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location;
uint64_t ChunkOffset = LegacyChunkLocation.GetOffset();
uint64_t ChunkSize = LegacyChunkLocation.GetSize();
@@ -488,8 +488,8 @@ namespace {
NiceBytes(TotalSize),
NiceTimeSpanMs(ETA));
- auto BlockPath = GetBlockPath(BlocksBasePath, BlockRange.BlockIndex);
- BlockStoreFile ChunkBlock(BlockPath);
+ std::filesystem::path BlockPath = GetBlockPath(BlocksBasePath, BlockRange.BlockIndex);
+ BlockStoreFile ChunkBlock(BlockPath);
ChunkBlock.Create(BlockRange.BlockSize);
uint64_t Offset = 0;
while (Offset < BlockRange.BlockSize)
@@ -580,9 +580,9 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint3
CasStore::InsertResult
CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash)
{
- uint32_t WriteBlockIndex;
- std::shared_ptr<BlockStoreFile> WriteBlock;
- uint64_t InsertOffset;
+ uint32_t WriteBlockIndex;
+ Ref<BlockStoreFile> WriteBlock;
+ uint64_t InsertOffset;
{
RwLock::ExclusiveLockScope _(m_InsertLock);
@@ -602,7 +602,7 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
{
if (m_WriteBlock)
{
- m_WriteBlock.reset();
+ m_WriteBlock = nullptr;
}
{
RwLock::ExclusiveLockScope __(m_LocationMapLock);
@@ -615,9 +615,9 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
{
WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
}
- auto BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
- m_WriteBlock = std::make_shared<BlockStoreFile>(BlockPath);
- m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock;
+ std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
+ m_WriteBlock = new BlockStoreFile(BlockPath);
+ m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock;
m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
}
m_CurrentInsertOffset = 0;
@@ -628,15 +628,16 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
WriteBlock = m_WriteBlock;
}
- // We can end up in a situation that two InsertChunk writes the same chunk
- // data in different locations.
+ // We can end up in a situation that InsertChunk writes the same chunk data in
+ // different locations.
// We release the insert lock once we have the correct WriteBlock ready and we know
- // where to write the data. If a new InsertChunk requests comes in before we update
- // m_LocationMap below we will have a race.
- // The outcome of that is that we will occupy space more than once but the hash will
- // only point to one of the chunks. We will in that case waste space until the next
- // GC operation.
- // This would be a rare occasion and the current flow reduces the time we block for
+ // where to write the data. If a new InsertChunk request for the same chunk hash/data
+ // comes in before we update m_LocationMap below we will have a race.
+ // The outcome of that is that we will write the chunk data in more than one location
+ // but the chunk hash will only point to one of the chunks.
+ // We will in that case waste space until the next GC operation.
+ //
+ // This should be a rare occasion and the current flow reduces the time we block for
// reads, insert and GC.
BlockStoreDiskLocation Location({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = ChunkSize}, m_PayloadAlignment);
@@ -663,8 +664,8 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
IoBuffer
CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
- std::shared_ptr<BlockStoreFile> ChunkBlock;
- BlockStoreLocation Location;
+ Ref<BlockStoreFile> ChunkBlock;
+ BlockStoreLocation Location;
{
RwLock::SharedLockScope _(m_LocationMapLock);
auto KeyIt = m_LocationMap.find(ChunkHash);
@@ -673,12 +674,7 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
return IoBuffer();
}
Location = KeyIt->second.Get(m_PayloadAlignment);
- auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex);
- if (BlockIt == m_ChunkBlocks.end())
- {
- return IoBuffer();
- }
- ChunkBlock = BlockIt->second;
+ ChunkBlock = m_ChunkBlocks[Location.BlockIndex];
}
return ChunkBlock->GetChunk(Location.Offset, Location.Size);
}
@@ -712,7 +708,7 @@ CasContainerStrategy::Flush()
{
uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
- m_WriteBlock.reset();
+ m_WriteBlock = nullptr;
m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
m_CurrentInsertOffset = 0;
}
@@ -741,16 +737,16 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
for (const auto& Block : m_ChunkBlocks)
{
- uint64_t WindowStart = 0;
- uint64_t WindowEnd = WindowSize;
- auto& BlockFile = *Block.second;
- BlockFile.Open();
- const uint64_t FileSize = BlockFile.FileSize();
+ uint64_t WindowStart = 0;
+ uint64_t WindowEnd = WindowSize;
+ const Ref<BlockStoreFile>& BlockFile = Block.second;
+ BlockFile->Open();
+ const uint64_t FileSize = BlockFile->FileSize();
do
{
const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart);
- BlockFile.Read(BufferBase, ChunkSize, WindowStart);
+ BlockFile->Read(BufferBase, ChunkSize, WindowStart);
for (auto& Entry : m_LocationMap)
{
@@ -788,10 +784,10 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
for (const CasDiskIndexEntry& Entry : BigChunks)
{
- IoHashStream Hasher;
- const BlockStoreLocation Location = Entry.Location.Get(m_PayloadAlignment);
- auto& BlockFile = *m_ChunkBlocks[Location.BlockIndex];
- BlockFile.StreamByteRange(Location.Offset, Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); });
+ IoHashStream Hasher;
+ const BlockStoreLocation Location = Entry.Location.Get(m_PayloadAlignment);
+ const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[Location.BlockIndex];
+ BlockFile->StreamByteRange(Location.Offset, Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); });
IoHash ComputedHash = Hasher.GetHash();
if (Entry.Key != ComputedHash)
@@ -978,13 +974,13 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
}
if (Keep)
{
- auto& ChunkMap = KeepChunks[ChunkMapIndex];
+ std::vector<IoHash>& ChunkMap = KeepChunks[ChunkMapIndex];
ChunkMap.push_back(ChunkHash);
NewTotalSize += Location.GetSize();
}
else
{
- auto& ChunkMap = DeleteChunks[ChunkMapIndex];
+ std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex];
ChunkMap.push_back(ChunkHash);
DeleteCount++;
}
@@ -994,9 +990,9 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size());
for (const auto& Entry : BlockIndexToChunkMapIndex)
{
- uint32_t BlockIndex = Entry.first;
- size_t ChunkMapIndex = Entry.second;
- const auto& ChunkMap = DeleteChunks[ChunkMapIndex];
+ uint32_t BlockIndex = Entry.first;
+ size_t ChunkMapIndex = Entry.second;
+ const std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex];
if (ChunkMap.empty())
{
continue;
@@ -1019,9 +1015,9 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
// Move all chunks in blocks that have chunks removed to new blocks
- std::shared_ptr<BlockStoreFile> NewBlockFile;
- uint64_t WriteOffset = 0;
- uint32_t NewBlockIndex = 0;
+ Ref<BlockStoreFile> NewBlockFile;
+ uint64_t WriteOffset = 0;
+ uint32_t NewBlockIndex = 0;
DeletedChunks.reserve(DeleteCount);
std::unordered_map<IoHash, BlockStoreDiskLocation> MovedBlockChunks;
@@ -1029,24 +1025,24 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
{
const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex];
- std::shared_ptr<BlockStoreFile> OldBlockFile;
+ Ref<BlockStoreFile> OldBlockFile;
{
RwLock::SharedLockScope _i(m_LocationMapLock);
OldBlockFile = m_ChunkBlocks[BlockIndex];
}
- const auto& KeepMap = KeepChunks[ChunkMapIndex];
+ const std::vector<IoHash>& KeepMap = KeepChunks[ChunkMapIndex];
if (KeepMap.empty())
{
- const auto& DeleteMap = DeleteChunks[ChunkMapIndex];
- auto LogEntries = MakeCasDiskEntries({}, DeleteMap);
+ const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex];
+ std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries({}, DeleteMap);
m_CasLog.Append(LogEntries);
{
RwLock::ExclusiveLockScope _i(m_LocationMapLock);
Stopwatch Timer;
const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs] { ReadBlockTimeUs += Timer.GetElapsedTimeUs(); });
UpdateLocations(LogEntries);
- m_ChunkBlocks[BlockIndex].reset();
+ m_ChunkBlocks[BlockIndex] = nullptr;
}
DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end());
ZEN_DEBUG("marking cas store file for delete {}, block {}", m_ContainerBaseName, std::to_string(BlockIndex));
@@ -1069,8 +1065,8 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize))
{
- uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order::memory_order_relaxed);
- auto LogEntries = MakeCasDiskEntries(MovedBlockChunks, {});
+ uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order::memory_order_relaxed);
+ std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, {});
m_CasLog.Append(LogEntries);
{
@@ -1089,9 +1085,9 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
{
NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
}
- auto NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex);
- NewBlockFile = std::make_shared<BlockStoreFile>(NewBlockPath);
- m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
+ std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex);
+ NewBlockFile = new BlockStoreFile(NewBlockPath);
+ m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
}
MovedCount += MovedBlockChunks.size();
@@ -1123,7 +1119,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
ZEN_INFO("using gc reserve for '{}', disk free {}",
m_Config.RootDirectory / m_ContainerBaseName,
NiceBytes(Space.Free));
- auto NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex);
+ std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex);
std::filesystem::rename(GCReservePath, NewBlockPath);
NewBlockFile->Open();
}
@@ -1143,15 +1139,15 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
}
Chunk.clear();
- const auto& DeleteMap = DeleteChunks[ChunkMapIndex];
- auto LogEntries = MakeCasDiskEntries(MovedBlockChunks, DeleteMap);
+ const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex];
+ std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, DeleteMap);
m_CasLog.Append(LogEntries);
{
RwLock::ExclusiveLockScope __(m_LocationMapLock);
Stopwatch Timer;
const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs] { ReadBlockTimeUs += Timer.GetElapsedTimeUs(); });
UpdateLocations(LogEntries);
- m_ChunkBlocks[BlockIndex].reset();
+ m_ChunkBlocks[BlockIndex] = nullptr;
}
MovedCount += MovedBlockChunks.size();
DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end());
@@ -1164,7 +1160,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
{
ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message());
}
- OldBlockFile.reset();
+ OldBlockFile = nullptr;
}
for (const IoHash& ChunkHash : DeletedChunks)
@@ -1348,7 +1344,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
{
std::vector<CasDiskIndexEntry> IndexEntries = ReadIndexFile(m_Config.RootDirectory, m_ContainerBaseName, m_PayloadAlignment);
- for (const auto& Entry : IndexEntries)
+ for (const CasDiskIndexEntry& Entry : IndexEntries)
{
m_LocationMap[Entry.Key] = Entry.Location;
}
@@ -1357,7 +1353,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
bool MakeSnapshot = false;
{
std::vector<CasDiskIndexEntry> LogEntries = ReadLog(m_Config.RootDirectory, m_ContainerBaseName);
- for (const auto& Entry : LogEntries)
+ for (const CasDiskIndexEntry& Entry : LogEntries)
{
if (Entry.Flags & CasDiskIndexEntry::kTombstone)
{
@@ -1379,7 +1375,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
}
std::vector<CasDiskIndexEntry> LegacyEntries =
MigrateLegacyData(m_Config.RootDirectory, m_ContainerBaseName, m_MaxBlockSize, m_PayloadAlignment, true, ExistingChunks);
- for (const auto& Entry : LegacyEntries)
+ for (const CasDiskIndexEntry& Entry : LegacyEntries)
{
m_LocationMap[Entry.Key] = Entry.Location;
}
@@ -1438,8 +1434,8 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
std::filesystem::remove(Path);
continue;
}
- auto BlockPath = GetBlockPath(m_BlocksBasePath, BlockIndex);
- auto BlockFile = std::make_shared<BlockStoreFile>(BlockPath);
+ std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockIndex);
+ Ref<BlockStoreFile> BlockFile = new BlockStoreFile(BlockPath);
BlockFile->Open();
m_ChunkBlocks[BlockIndex] = BlockFile;
}
@@ -1651,9 +1647,9 @@ TEST_CASE("compactcas.compact.totalsize")
for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
{
- IoBuffer Chunk = CreateChunk(kChunkSize);
- const IoHash Hash = HashBuffer(Chunk);
- auto InsertResult = Cas.InsertChunk(Chunk, Hash);
+ IoBuffer Chunk = CreateChunk(kChunkSize);
+ const IoHash Hash = HashBuffer(Chunk);
+ CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
ZEN_ASSERT(InsertResult.New);
}
@@ -1687,7 +1683,7 @@ TEST_CASE("compactcas.gc.basic")
IoBuffer Chunk = CreateChunk(128);
IoHash ChunkHash = IoHash::HashBuffer(Chunk);
- const auto InsertResult = Cas.InsertChunk(Chunk, ChunkHash);
+ const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash);
CHECK(InsertResult.New);
Cas.Flush();
@@ -1714,9 +1710,9 @@ TEST_CASE("compactcas.gc.removefile")
CasContainerStrategy Cas(CasConfig, Gc);
Cas.Initialize("cb", 65536, 1 << 4, true);
- const auto InsertResult = Cas.InsertChunk(Chunk, ChunkHash);
+ const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash);
CHECK(InsertResult.New);
- const auto InsertResultDup = Cas.InsertChunk(Chunk, ChunkHash);
+ const CasStore::InsertResult InsertResultDup = Cas.InsertChunk(Chunk, ChunkHash);
CHECK(!InsertResultDup.New);
Cas.Flush();
}
@@ -2139,8 +2135,8 @@ TEST_CASE("compactcas.legacyconversion")
Gc.CollectGarbage(GcCtx);
}
- auto CasPath = GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1);
- auto LegacyCasPath = GetLegacyUcasPath(CasConfig.RootDirectory, "test");
+ std::filesystem::path CasPath = GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1);
+ std::filesystem::path LegacyCasPath = GetLegacyUcasPath(CasConfig.RootDirectory, "test");
std::filesystem::rename(CasPath, LegacyCasPath);
std::vector<CasDiskIndexEntry> LogEntries;
@@ -2240,7 +2236,7 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
const IoBuffer& Chunk = Chunks[Idx];
const IoHash& Hash = ChunkHashes[Idx];
ThreadPool.ScheduleWork([&Cas, Chunk, Hash]() {
- auto InsertResult = Cas.InsertChunk(Chunk, Hash);
+ CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
ZEN_ASSERT(InsertResult.New);
});
}
@@ -2288,7 +2284,7 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
const IoBuffer& Chunk = NewChunks[Idx];
const IoHash& Hash = NewChunkHashes[Idx];
ThreadPool.ScheduleWork([&Cas, Chunk, Hash, &AddedChunkCount]() {
- auto InsertResult = Cas.InsertChunk(Chunk, Hash);
+ CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
ZEN_ASSERT(InsertResult.New);
AddedChunkCount.fetch_add(1);
});
@@ -2411,15 +2407,17 @@ TEST_CASE("compactcas.migrate.large.data" * doctest::skip(true))
std::filesystem::path SobsBasePath = GetBasePath(BigDataPath, "sobs");
std::filesystem::remove_all(TobsBasePath);
std::filesystem::remove_all(SobsBasePath);
- uint64_t TobsPayloadAlignment = 16;
- uint64_t TobsBlockSize = 1u << 28;
- auto TobsMigratedChunks = MigrateLegacyData(BigDataPath, "tobs", TobsBlockSize, TobsPayloadAlignment, false, {});
+ uint64_t TobsPayloadAlignment = 16;
+ uint64_t TobsBlockSize = 1u << 28;
+ std::vector<CasDiskIndexEntry> TobsMigratedChunks =
+ MigrateLegacyData(BigDataPath, "tobs", TobsBlockSize, TobsPayloadAlignment, false, {});
CHECK(TobsMigratedChunks.size() > 0);
uint64_t SobsPayloadAlignment = 4096;
uint64_t SobsBlockSize = 1u << 30;
- auto SobsMigratedChunks = MigrateLegacyData(BigDataPath, "sobs", SobsBlockSize, SobsPayloadAlignment, false, {});
+ std::vector<CasDiskIndexEntry> SobsMigratedChunks =
+ MigrateLegacyData(BigDataPath, "sobs", SobsBlockSize, SobsPayloadAlignment, false, {});
CHECK(SobsMigratedChunks.size() > 0);
CasStoreConfiguration CasConfig;