diff options
| author | Dan Engelbrecht <[email protected]> | 2022-03-27 22:15:18 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-03-31 11:29:28 +0200 |
| commit | cae295366b42882678625927df1066dc5418e93a (patch) | |
| tree | dc9546f651d16d0feaf9f879e6c7de09ce4c6f02 /zenstore/compactcas.cpp | |
| parent | Remove redundant lock in BlockStoreFile (diff) | |
| download | zen-cae295366b42882678625927df1066dc5418e93a.tar.xz zen-cae295366b42882678625927df1066dc5418e93a.zip | |
Switch from std::shared_ptr<> to Ref<>
Remove a bunch of 'auto' with explicit type
Diffstat (limited to 'zenstore/compactcas.cpp')
| -rw-r--r-- | zenstore/compactcas.cpp | 160 |
1 files changed, 79 insertions, 81 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 521546e10..7b7062df6 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -397,7 +397,7 @@ namespace { Result.push_back( {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags}); } - auto BlockPath = GetBlockPath(BlocksBasePath, WriteBlockIndex); + std::filesystem::path BlockPath = GetBlockPath(BlocksBasePath, WriteBlockIndex); CreateDirectories(BlockPath.parent_path()); BlockFile.Close(); std::filesystem::rename(LegacySobsPath, BlockPath); @@ -436,8 +436,8 @@ namespace { BlockRanges.reserve(MaxRequiredBlockCount); for (const IoHash& ChunkHash : ChunkHashes) { - const auto& LegacyEntry = LegacyDiskIndex[ChunkHash]; - const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location; + const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash]; + const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location; uint64_t ChunkOffset = LegacyChunkLocation.GetOffset(); uint64_t ChunkSize = LegacyChunkLocation.GetSize(); @@ -488,8 +488,8 @@ namespace { NiceBytes(TotalSize), NiceTimeSpanMs(ETA)); - auto BlockPath = GetBlockPath(BlocksBasePath, BlockRange.BlockIndex); - BlockStoreFile ChunkBlock(BlockPath); + std::filesystem::path BlockPath = GetBlockPath(BlocksBasePath, BlockRange.BlockIndex); + BlockStoreFile ChunkBlock(BlockPath); ChunkBlock.Create(BlockRange.BlockSize); uint64_t Offset = 0; while (Offset < BlockRange.BlockSize) @@ -580,9 +580,9 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint3 CasStore::InsertResult CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash) { - uint32_t WriteBlockIndex; - std::shared_ptr<BlockStoreFile> WriteBlock; - uint64_t InsertOffset; + uint32_t WriteBlockIndex; + Ref<BlockStoreFile> WriteBlock; + uint64_t InsertOffset; { RwLock::ExclusiveLockScope _(m_InsertLock); @@ -602,7 +602,7 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const { if (m_WriteBlock) { - m_WriteBlock.reset(); + m_WriteBlock = nullptr; } { RwLock::ExclusiveLockScope __(m_LocationMapLock); @@ -615,9 +615,9 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const { WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; } - auto BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); - m_WriteBlock = std::make_shared<BlockStoreFile>(BlockPath); - m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock; + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); + m_WriteBlock = new BlockStoreFile(BlockPath); + m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock; m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); } m_CurrentInsertOffset = 0; @@ -628,15 +628,16 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const WriteBlock = m_WriteBlock; } - // We can end up in a situation that two InsertChunk writes the same chunk - // data in different locations. + // We can end up in a situation that InsertChunk writes the same chunk data in + // different locations. // We release the insert lock once we have the correct WriteBlock ready and we know - // where to write the data. If a new InsertChunk requests comes in before we update - // m_LocationMap below we will have a race. - // The outcome of that is that we will occupy space more than once but the hash will - // only point to one of the chunks. We will in that case waste space until the next - // GC operation. - // This would be a rare occasion and the current flow reduces the time we block for + // where to write the data. If a new InsertChunk request for the same chunk hash/data + // comes in before we update m_LocationMap below we will have a race. + // The outcome of that is that we will write the chunk data in more than one location + // but the chunk hash will only point to one of the chunks. + // We will in that case waste space until the next GC operation. + // + // This should be a rare occasion and the current flow reduces the time we block for // reads, insert and GC. BlockStoreDiskLocation Location({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = ChunkSize}, m_PayloadAlignment); @@ -663,8 +664,8 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) IoBuffer CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { - std::shared_ptr<BlockStoreFile> ChunkBlock; - BlockStoreLocation Location; + Ref<BlockStoreFile> ChunkBlock; + BlockStoreLocation Location; { RwLock::SharedLockScope _(m_LocationMapLock); auto KeyIt = m_LocationMap.find(ChunkHash); @@ -673,12 +674,7 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash) return IoBuffer(); } Location = KeyIt->second.Get(m_PayloadAlignment); - auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); - if (BlockIt == m_ChunkBlocks.end()) - { - return IoBuffer(); - } - ChunkBlock = BlockIt->second; + ChunkBlock = m_ChunkBlocks[Location.BlockIndex]; } return ChunkBlock->GetChunk(Location.Offset, Location.Size); } @@ -712,7 +708,7 @@ CasContainerStrategy::Flush() { uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; - m_WriteBlock.reset(); + m_WriteBlock = nullptr; m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); m_CurrentInsertOffset = 0; } @@ -741,16 +737,16 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) for (const auto& Block : m_ChunkBlocks) { - uint64_t WindowStart = 0; - uint64_t WindowEnd = WindowSize; - auto& BlockFile = *Block.second; - BlockFile.Open(); - const uint64_t FileSize = BlockFile.FileSize(); + uint64_t WindowStart = 0; + uint64_t WindowEnd = WindowSize; + const Ref<BlockStoreFile>& BlockFile = Block.second; + BlockFile->Open(); + const uint64_t FileSize = BlockFile->FileSize(); do { const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart); - BlockFile.Read(BufferBase, ChunkSize, WindowStart); + BlockFile->Read(BufferBase, ChunkSize, WindowStart); for (auto& Entry : m_LocationMap) { @@ -788,10 +784,10 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) for (const CasDiskIndexEntry& Entry : BigChunks) { - IoHashStream Hasher; - const BlockStoreLocation Location = Entry.Location.Get(m_PayloadAlignment); - auto& BlockFile = *m_ChunkBlocks[Location.BlockIndex]; - BlockFile.StreamByteRange(Location.Offset, Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); + IoHashStream Hasher; + const BlockStoreLocation Location = Entry.Location.Get(m_PayloadAlignment); + const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[Location.BlockIndex]; + BlockFile->StreamByteRange(Location.Offset, Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); IoHash ComputedHash = Hasher.GetHash(); if (Entry.Key != ComputedHash) @@ -978,13 +974,13 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) } if (Keep) { - auto& ChunkMap = KeepChunks[ChunkMapIndex]; + std::vector<IoHash>& ChunkMap = KeepChunks[ChunkMapIndex]; ChunkMap.push_back(ChunkHash); NewTotalSize += Location.GetSize(); } else { - auto& ChunkMap = DeleteChunks[ChunkMapIndex]; + std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex]; ChunkMap.push_back(ChunkHash); DeleteCount++; } @@ -994,9 +990,9 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); for (const auto& Entry : BlockIndexToChunkMapIndex) { - uint32_t BlockIndex = Entry.first; - size_t ChunkMapIndex = Entry.second; - const auto& ChunkMap = DeleteChunks[ChunkMapIndex]; + uint32_t BlockIndex = Entry.first; + size_t ChunkMapIndex = Entry.second; + const std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex]; if (ChunkMap.empty()) { continue; @@ -1019,9 +1015,9 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) // Move all chunks in blocks that have chunks removed to new blocks - std::shared_ptr<BlockStoreFile> NewBlockFile; - uint64_t WriteOffset = 0; - uint32_t NewBlockIndex = 0; + Ref<BlockStoreFile> NewBlockFile; + uint64_t WriteOffset = 0; + uint32_t NewBlockIndex = 0; DeletedChunks.reserve(DeleteCount); std::unordered_map<IoHash, BlockStoreDiskLocation> MovedBlockChunks; @@ -1029,24 +1025,24 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) { const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; - std::shared_ptr<BlockStoreFile> OldBlockFile; + Ref<BlockStoreFile> OldBlockFile; { RwLock::SharedLockScope _i(m_LocationMapLock); OldBlockFile = m_ChunkBlocks[BlockIndex]; } - const auto& KeepMap = KeepChunks[ChunkMapIndex]; + const std::vector<IoHash>& KeepMap = KeepChunks[ChunkMapIndex]; if (KeepMap.empty()) { - const auto& DeleteMap = DeleteChunks[ChunkMapIndex]; - auto LogEntries = MakeCasDiskEntries({}, DeleteMap); + const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex]; + std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries({}, DeleteMap); m_CasLog.Append(LogEntries); { RwLock::ExclusiveLockScope _i(m_LocationMapLock); Stopwatch Timer; const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs] { ReadBlockTimeUs += Timer.GetElapsedTimeUs(); }); UpdateLocations(LogEntries); - m_ChunkBlocks[BlockIndex].reset(); + m_ChunkBlocks[BlockIndex] = nullptr; } DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end()); ZEN_DEBUG("marking cas store file for delete {}, block {}", m_ContainerBaseName, std::to_string(BlockIndex)); @@ -1069,8 +1065,8 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) { - uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order::memory_order_relaxed); - auto LogEntries = MakeCasDiskEntries(MovedBlockChunks, {}); + uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order::memory_order_relaxed); + std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, {}); m_CasLog.Append(LogEntries); { @@ -1089,9 +1085,9 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) { NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; } - auto NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); - NewBlockFile = std::make_shared<BlockStoreFile>(NewBlockPath); - m_ChunkBlocks[NextBlockIndex] = NewBlockFile; + std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); + NewBlockFile = new BlockStoreFile(NewBlockPath); + m_ChunkBlocks[NextBlockIndex] = NewBlockFile; } MovedCount += MovedBlockChunks.size(); @@ -1123,7 +1119,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) ZEN_INFO("using gc reserve for '{}', disk free {}", m_Config.RootDirectory / m_ContainerBaseName, NiceBytes(Space.Free)); - auto NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); + std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); std::filesystem::rename(GCReservePath, NewBlockPath); NewBlockFile->Open(); } @@ -1143,15 +1139,15 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) } Chunk.clear(); - const auto& DeleteMap = DeleteChunks[ChunkMapIndex]; - auto LogEntries = MakeCasDiskEntries(MovedBlockChunks, DeleteMap); + const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex]; + std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, DeleteMap); m_CasLog.Append(LogEntries); { RwLock::ExclusiveLockScope __(m_LocationMapLock); Stopwatch Timer; const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs] { ReadBlockTimeUs += Timer.GetElapsedTimeUs(); }); UpdateLocations(LogEntries); - m_ChunkBlocks[BlockIndex].reset(); + m_ChunkBlocks[BlockIndex] = nullptr; } MovedCount += MovedBlockChunks.size(); DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end()); @@ -1164,7 +1160,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) { ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); } - OldBlockFile.reset(); + OldBlockFile = nullptr; } for (const IoHash& ChunkHash : DeletedChunks) @@ -1348,7 +1344,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) { std::vector<CasDiskIndexEntry> IndexEntries = ReadIndexFile(m_Config.RootDirectory, m_ContainerBaseName, m_PayloadAlignment); - for (const auto& Entry : IndexEntries) + for (const CasDiskIndexEntry& Entry : IndexEntries) { m_LocationMap[Entry.Key] = Entry.Location; } @@ -1357,7 +1353,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) bool MakeSnapshot = false; { std::vector<CasDiskIndexEntry> LogEntries = ReadLog(m_Config.RootDirectory, m_ContainerBaseName); - for (const auto& Entry : LogEntries) + for (const CasDiskIndexEntry& Entry : LogEntries) { if (Entry.Flags & CasDiskIndexEntry::kTombstone) { @@ -1379,7 +1375,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) } std::vector<CasDiskIndexEntry> LegacyEntries = MigrateLegacyData(m_Config.RootDirectory, m_ContainerBaseName, m_MaxBlockSize, m_PayloadAlignment, true, ExistingChunks); - for (const auto& Entry : LegacyEntries) + for (const CasDiskIndexEntry& Entry : LegacyEntries) { m_LocationMap[Entry.Key] = Entry.Location; } @@ -1438,8 +1434,8 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) std::filesystem::remove(Path); continue; } - auto BlockPath = GetBlockPath(m_BlocksBasePath, BlockIndex); - auto BlockFile = std::make_shared<BlockStoreFile>(BlockPath); + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockIndex); + Ref<BlockStoreFile> BlockFile = new BlockStoreFile(BlockPath); BlockFile->Open(); m_ChunkBlocks[BlockIndex] = BlockFile; } @@ -1651,9 +1647,9 @@ TEST_CASE("compactcas.compact.totalsize") for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { - IoBuffer Chunk = CreateChunk(kChunkSize); - const IoHash Hash = HashBuffer(Chunk); - auto InsertResult = Cas.InsertChunk(Chunk, Hash); + IoBuffer Chunk = CreateChunk(kChunkSize); + const IoHash Hash = HashBuffer(Chunk); + CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); ZEN_ASSERT(InsertResult.New); } @@ -1687,7 +1683,7 @@ TEST_CASE("compactcas.gc.basic") IoBuffer Chunk = CreateChunk(128); IoHash ChunkHash = IoHash::HashBuffer(Chunk); - const auto InsertResult = Cas.InsertChunk(Chunk, ChunkHash); + const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); CHECK(InsertResult.New); Cas.Flush(); @@ -1714,9 +1710,9 @@ TEST_CASE("compactcas.gc.removefile") CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("cb", 65536, 1 << 4, true); - const auto InsertResult = Cas.InsertChunk(Chunk, ChunkHash); + const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); CHECK(InsertResult.New); - const auto InsertResultDup = Cas.InsertChunk(Chunk, ChunkHash); + const CasStore::InsertResult InsertResultDup = Cas.InsertChunk(Chunk, ChunkHash); CHECK(!InsertResultDup.New); Cas.Flush(); } @@ -2139,8 +2135,8 @@ TEST_CASE("compactcas.legacyconversion") Gc.CollectGarbage(GcCtx); } - auto CasPath = GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1); - auto LegacyCasPath = GetLegacyUcasPath(CasConfig.RootDirectory, "test"); + std::filesystem::path CasPath = GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1); + std::filesystem::path LegacyCasPath = GetLegacyUcasPath(CasConfig.RootDirectory, "test"); std::filesystem::rename(CasPath, LegacyCasPath); std::vector<CasDiskIndexEntry> LogEntries; @@ -2240,7 +2236,7 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true)) const IoBuffer& Chunk = Chunks[Idx]; const IoHash& Hash = ChunkHashes[Idx]; ThreadPool.ScheduleWork([&Cas, Chunk, Hash]() { - auto InsertResult = Cas.InsertChunk(Chunk, Hash); + CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); ZEN_ASSERT(InsertResult.New); }); } @@ -2288,7 +2284,7 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true)) const IoBuffer& Chunk = NewChunks[Idx]; const IoHash& Hash = NewChunkHashes[Idx]; ThreadPool.ScheduleWork([&Cas, Chunk, Hash, &AddedChunkCount]() { - auto InsertResult = Cas.InsertChunk(Chunk, Hash); + CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); ZEN_ASSERT(InsertResult.New); AddedChunkCount.fetch_add(1); }); @@ -2411,15 +2407,17 @@ TEST_CASE("compactcas.migrate.large.data" * doctest::skip(true)) std::filesystem::path SobsBasePath = GetBasePath(BigDataPath, "sobs"); std::filesystem::remove_all(TobsBasePath); std::filesystem::remove_all(SobsBasePath); - uint64_t TobsPayloadAlignment = 16; - uint64_t TobsBlockSize = 1u << 28; - auto TobsMigratedChunks = MigrateLegacyData(BigDataPath, "tobs", TobsBlockSize, TobsPayloadAlignment, false, {}); + uint64_t TobsPayloadAlignment = 16; + uint64_t TobsBlockSize = 1u << 28; + std::vector<CasDiskIndexEntry> TobsMigratedChunks = + MigrateLegacyData(BigDataPath, "tobs", TobsBlockSize, TobsPayloadAlignment, false, {}); CHECK(TobsMigratedChunks.size() > 0); uint64_t SobsPayloadAlignment = 4096; uint64_t SobsBlockSize = 1u << 30; - auto SobsMigratedChunks = MigrateLegacyData(BigDataPath, "sobs", SobsBlockSize, SobsPayloadAlignment, false, {}); + std::vector<CasDiskIndexEntry> SobsMigratedChunks = + MigrateLegacyData(BigDataPath, "sobs", SobsBlockSize, SobsPayloadAlignment, false, {}); CHECK(SobsMigratedChunks.size() > 0); CasStoreConfiguration CasConfig; |