diff options
| author | Dan Engelbrecht <[email protected]> | 2022-03-23 11:23:24 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-03-31 11:29:27 +0200 |
| commit | 4dd1ae392ff26a7ba27f1cf8f0bb3fcbb4a04d73 (patch) | |
| tree | 75fac9f50f2d6f64fc435e62913b3d494f739972 /zenstore/compactcas.cpp | |
| parent | Add separate blockstore.h/.cpp (diff) | |
| download | zen-4dd1ae392ff26a7ba27f1cf8f0bb3fcbb4a04d73.tar.xz zen-4dd1ae392ff26a7ba27f1cf8f0bb3fcbb4a04d73.zip | |
Use blockstore in compactcas
Diffstat (limited to 'zenstore/compactcas.cpp')
| -rw-r--r-- | zenstore/compactcas.cpp | 272 |
1 files changed, 45 insertions, 227 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 1f49dd7c2..fbdac491e 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -58,148 +58,6 @@ namespace { } // namespace -struct CasContainerStrategy::ChunkBlock -{ - explicit ChunkBlock(const std::filesystem::path& BlockPath); - ~ChunkBlock(); - const std::filesystem::path& GetPath() const; - void Open(); - void Create(uint64_t InitialSize); - void MarkAsDeleteOnClose(std::error_code& Ec); - uint64_t FileSize(); - IoBuffer GetChunk(uint64_t Offset, uint64_t Size); - void Read(void* Data, uint64_t Size, uint64_t FileOffset); - void Write(const void* Data, uint64_t Size, uint64_t FileOffset); - void Flush(); - void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); - -private: - void InternalOpen(); - const std::filesystem::path m_Path; - RwLock m_OpenLock; - BasicFile m_File; - IoBuffer m_IoBuffer; -}; - -CasContainerStrategy::ChunkBlock::ChunkBlock(const std::filesystem::path& BlockPath) : m_Path(BlockPath) -{ -} - -CasContainerStrategy::ChunkBlock::~ChunkBlock() -{ - RwLock::ExclusiveLockScope _(m_OpenLock); - m_File.Detach(); -} - -const std::filesystem::path& -CasContainerStrategy::ChunkBlock::GetPath() const -{ - return m_Path; -} - -void -CasContainerStrategy::ChunkBlock::InternalOpen() -{ - if (m_File.Handle()) - { - return; - } - m_File.Open(m_Path, false); - void* FileHandle = m_File.Handle(); - m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize()); -} - -void -CasContainerStrategy::ChunkBlock::Open() -{ - RwLock::ExclusiveLockScope _(m_OpenLock); - InternalOpen(); -} - -void -CasContainerStrategy::ChunkBlock::Create(uint64_t InitialSize) -{ - RwLock::ExclusiveLockScope _(m_OpenLock); - - auto ParentPath = m_Path.parent_path(); - if (!std::filesystem::is_directory(ParentPath)) - { - CreateDirectories(ParentPath); - } - - m_File.Open(m_Path, true); - if (InitialSize > 0) - { - m_File.SetFileSize(InitialSize); - } - void* FileHandle = m_File.Handle(); - m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize); -} - -uint64_t -CasContainerStrategy::ChunkBlock::FileSize() -{ - RwLock::SharedLockScope _(m_OpenLock); - return m_File.FileSize(); -} - -void -CasContainerStrategy::ChunkBlock::MarkAsDeleteOnClose(std::error_code& Ec) -{ - RwLock::ExclusiveLockScope _(m_OpenLock); - if (m_File.Handle()) - { - m_File.MarkAsDeleteOnClose(Ec); - return; - } - if (std::filesystem::is_regular_file(m_Path)) - { - Ec.clear(); - std::filesystem::remove(m_Path, Ec); - } -} - -IoBuffer -CasContainerStrategy::ChunkBlock::GetChunk(uint64_t Offset, uint64_t Size) -{ - InternalOpen(); - return IoBuffer(m_IoBuffer, Offset, Size); -} - -void -CasContainerStrategy::ChunkBlock::Read(void* Data, uint64_t Size, uint64_t FileOffset) -{ - RwLock::SharedLockScope _(m_OpenLock); - m_File.Read(Data, Size, FileOffset); -} - -void -CasContainerStrategy::ChunkBlock::Write(const void* Data, uint64_t Size, uint64_t FileOffset) -{ - RwLock::SharedLockScope _(m_OpenLock); - m_File.Write(Data, Size, FileOffset); -} - -void -CasContainerStrategy::ChunkBlock::Flush() -{ - RwLock::ExclusiveLockScope _(m_OpenLock); - if (!m_File.Handle()) - { - return; - } - m_File.Flush(); -} - -void -CasContainerStrategy::ChunkBlock::StreamByteRange(uint64_t FileOffset, - uint64_t Size, - std::function<void(const void* Data, uint64_t Size)>&& ChunkFun) -{ - RwLock::SharedLockScope _(m_OpenLock); - m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun)); -} - ////////////////////////////////////////////////////////////////////////// CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc) @@ -233,9 +91,9 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint3 CasStore::InsertResult CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash) { - uint32_t WriteBlockIndex; - std::shared_ptr<ChunkBlock> WriteBlock; - uint64_t InsertOffset; + uint32_t WriteBlockIndex; + std::shared_ptr<BlockStoreFile> WriteBlock; + uint64_t InsertOffset; { RwLock::ExclusiveLockScope _i(m_InsertLock); @@ -261,17 +119,17 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const } { RwLock::ExclusiveLockScope __(m_LocationMapLock); - if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex) + if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) { throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName)); } WriteBlockIndex += WriteBlock ? 1 : 0; while (m_ChunkBlocks.contains(WriteBlockIndex)) { - WriteBlockIndex = (WriteBlockIndex + 1) & CasDiskLocation::MaxBlockIndex; + WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; } auto BlockPath = BuildUcasPath(m_BlocksBasePath, WriteBlockIndex); - WriteBlock = std::make_shared<ChunkBlock>(BlockPath); + WriteBlock = std::make_shared<BlockStoreFile>(BlockPath); m_ChunkBlocks[WriteBlockIndex] = WriteBlock; m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); } @@ -289,13 +147,13 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const WriteBlock->Write(ChunkData, ChunkSize, InsertOffset); - const CasLocation Location(WriteBlockIndex, InsertOffset, ChunkSize); - CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = CasDiskLocation(Location, m_PayloadAlignment)}; + const BlockStoreLocation Location(WriteBlockIndex, InsertOffset, ChunkSize); + CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = BlockStoreDiskLocation(Location, m_PayloadAlignment)}; m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize)); { RwLock::ExclusiveLockScope __(m_LocationMapLock); - m_LocationMap.emplace(ChunkHash, CasDiskLocation(Location, m_PayloadAlignment)); + m_LocationMap.emplace(ChunkHash, BlockStoreDiskLocation(Location, m_PayloadAlignment)); m_CasLog.Append(IndexEntry); } @@ -311,8 +169,8 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) IoBuffer CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { - std::shared_ptr<ChunkBlock> ChunkBlock; - CasLocation Location; + std::shared_ptr<BlockStoreFile> ChunkBlock; + BlockStoreLocation Location; { RwLock::SharedLockScope _(m_LocationMapLock); auto KeyIt = m_LocationMap.find(ChunkHash); @@ -363,7 +221,7 @@ CasContainerStrategy::Flush() WriteBlockIndex++; while (m_ChunkBlocks.contains(WriteBlockIndex)) { - WriteBlockIndex = (WriteBlockIndex + 1) & CasDiskLocation::MaxBlockIndex; + WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; } WriteBlock->Flush(); m_WriteBlock.reset(); @@ -409,8 +267,8 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) for (auto& Entry : m_LocationMap) { - const CasLocation Location = Entry.second.Get(m_PayloadAlignment); - const uint64_t EntryOffset = Location.Offset; + const BlockStoreLocation Location = Entry.second.Get(m_PayloadAlignment); + const uint64_t EntryOffset = Location.Offset; if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) { @@ -443,9 +301,9 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) for (const CasDiskIndexEntry& Entry : BigChunks) { - IoHashStream Hasher; - const CasLocation Location = Entry.Location.Get(m_PayloadAlignment); - auto& BlockFile = *m_ChunkBlocks[Location.BlockIndex]; + IoHashStream Hasher; + const BlockStoreLocation Location = Entry.Location.Get(m_PayloadAlignment); + auto& BlockFile = *m_ChunkBlocks[Location.BlockIndex]; BlockFile.StreamByteRange(Location.Offset, Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); IoHash ComputedHash = Hasher.GetHash(); @@ -516,8 +374,8 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) // path to the next new block. ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName); - std::unordered_map<IoHash, CasDiskLocation, IoHash::Hasher> LocationMap; - size_t BlockCount; + std::unordered_map<IoHash, BlockStoreDiskLocation, IoHash::Hasher> LocationMap; + size_t BlockCount; { RwLock::SharedLockScope _i(m_InsertLock); RwLock::SharedLockScope _l(m_LocationMapLock); @@ -609,22 +467,22 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) // Move all chunks in blocks that have chunks removed to new blocks - std::shared_ptr<ChunkBlock> NewBlockFile; - uint64_t WriteOffset = {}; - uint32_t NewBlockIndex = {}; - std::vector<IoHash> DeletedChunks; + std::shared_ptr<BlockStoreFile> NewBlockFile; + uint64_t WriteOffset = {}; + uint32_t NewBlockIndex = {}; + std::vector<IoHash> DeletedChunks; DeletedChunks.reserve(DeleteCount); std::vector<IoHash> MovedChunks; DeletedChunks.reserve(MoveCount); - std::unordered_map<IoHash, CasDiskLocation> MovedBlockChunks; + std::unordered_map<IoHash, BlockStoreDiskLocation> MovedBlockChunks; for (auto BlockIndex : BlocksToReWrite) { const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; const auto& KeepMap = KeepChunks[ChunkMapIndex]; if (KeepMap.empty()) { - std::shared_ptr<ChunkBlock> BlockFile; + std::shared_ptr<BlockStoreFile> BlockFile; { RwLock::ExclusiveLockScope _i(m_LocationMapLock); const auto& DeleteMap = DeleteChunks[ChunkMapIndex]; @@ -649,7 +507,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) continue; } - std::shared_ptr<ChunkBlock> OldBlockFile; + std::shared_ptr<BlockStoreFile> OldBlockFile; { RwLock::SharedLockScope _i(m_LocationMapLock); OldBlockFile = m_ChunkBlocks[BlockIndex]; @@ -660,8 +518,8 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) std::vector<uint8_t> Chunk; for (const auto& ChunkHash : KeepMap) { - auto KeyIt = LocationMap.find(ChunkHash); - const CasLocation ChunkLocation = KeyIt->second.Get(m_PayloadAlignment); + auto KeyIt = LocationMap.find(ChunkHash); + const BlockStoreLocation ChunkLocation = KeyIt->second.Get(m_PayloadAlignment); Chunk.resize(ChunkLocation.Size); OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); @@ -678,7 +536,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second}); MovedChunks.push_back(MovedEntry.first); } - if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex) + if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) { ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded", m_ContainerBaseName, @@ -686,16 +544,16 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) return; } } - if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex) + if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) { throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName)); } while (m_ChunkBlocks.contains(NextBlockIndex)) { - NextBlockIndex = (NextBlockIndex + 1) & CasDiskLocation::MaxBlockIndex; + NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; } auto NewBlockPath = BuildUcasPath(m_BlocksBasePath, NextBlockIndex); - NewBlockFile = std::make_shared<ChunkBlock>(NewBlockPath); + NewBlockFile = std::make_shared<BlockStoreFile>(NewBlockPath); m_ChunkBlocks[NextBlockIndex] = NewBlockFile; } @@ -738,8 +596,8 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) } NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); - CasLocation NewChunkLocation(NewBlockIndex, WriteOffset, Chunk.size()); - MovedBlockChunks.emplace(ChunkHash, CasDiskLocation(NewChunkLocation, m_PayloadAlignment)); + BlockStoreLocation NewChunkLocation(NewBlockIndex, WriteOffset, Chunk.size()); + MovedBlockChunks.emplace(ChunkHash, BlockStoreDiskLocation(NewChunkLocation, m_PayloadAlignment)); WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment); } Chunk.clear(); @@ -1065,12 +923,12 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) BlockFile.SetFileSize(MaxUsedSize); uint64_t MaxRequiredChunkCount = MaxUsedSize / m_MaxBlockSize; - if (MaxRequiredChunkCount > CasDiskLocation::MaxBlockIndex) + if (MaxRequiredChunkCount > BlockStoreDiskLocation::MaxBlockIndex) { ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", m_Config.RootDirectory / m_ContainerBaseName, MaxRequiredChunkCount, - CasDiskLocation::MaxBlockIndex); + BlockStoreDiskLocation::MaxBlockIndex); return; } @@ -1091,9 +949,9 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) m_CasLog.Open(SlogPath, true); - std::unique_ptr<ChunkBlock> NewBlockFile; - uint64_t WriteOffset = {}; - uint32_t NewBlockIndex = {}; + std::unique_ptr<BlockStoreFile> NewBlockFile; + uint64_t WriteOffset = {}; + uint32_t NewBlockIndex = {}; std::vector<uint8_t> Chunk; for (const auto& ChunkHash : ChunkHashes) @@ -1105,7 +963,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) if (!NewBlockFile) { auto BlockPath = BuildUcasPath(m_BlocksBasePath, NewBlockIndex); - NewBlockFile = std::make_unique<ChunkBlock>(BlockPath); + NewBlockFile = std::make_unique<BlockStoreFile>(BlockPath); NewBlockFile->Create(m_MaxBlockSize); } else if (WriteOffset + Chunk.size() > m_MaxBlockSize) @@ -1114,13 +972,13 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) BlockFile.SetFileSize(ChunkEnd); NewBlockIndex = NewBlockIndex + 1; auto BlockPath = BuildUcasPath(m_BlocksBasePath, NewBlockIndex); - NewBlockFile = std::make_unique<ChunkBlock>(BlockPath); + NewBlockFile = std::make_unique<BlockStoreFile>(BlockPath); NewBlockFile->Create(m_MaxBlockSize); WriteOffset = 0; } NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); - CasLocation NewChunkLocation(NewBlockIndex, WriteOffset, Chunk.size()); - m_CasLog.Append({.Key = ChunkHash, .Location = CasDiskLocation(NewChunkLocation, m_PayloadAlignment)}); + BlockStoreLocation NewChunkLocation(NewBlockIndex, WriteOffset, Chunk.size()); + m_CasLog.Append({.Key = ChunkHash, .Location = BlockStoreDiskLocation(NewChunkLocation, m_PayloadAlignment)}); WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment); } m_CasLog.Close(); @@ -1219,7 +1077,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) continue; } auto BlockPath = BuildUcasPath(m_BlocksBasePath, BlockIndex); - auto BlockFile = std::make_shared<ChunkBlock>(BlockPath); + auto BlockFile = std::make_shared<BlockStoreFile>(BlockPath); m_ChunkBlocks[BlockIndex] = BlockFile; } } @@ -1303,46 +1161,6 @@ namespace { } } // namespace -bool -operator==(const CasLocation& Lhs, const CasLocation& Rhs) -{ - return Lhs.BlockIndex == Rhs.BlockIndex && Lhs.Offset == Rhs.Offset && Lhs.Size == Rhs.Size; -} - -TEST_CASE("compactcas.casdisklocation") -{ - CasLocation Zero = CasLocation{.BlockIndex = 0, .Offset = 0, .Size = 0}; - CHECK(Zero == CasDiskLocation(Zero, 4).Get(4)); - - CasLocation MaxBlockIndex = CasLocation{.BlockIndex = CasDiskLocation::MaxBlockIndex, .Offset = 0, .Size = 0}; - CHECK(MaxBlockIndex == CasDiskLocation(MaxBlockIndex, 4).Get(4)); - - CasLocation MaxOffset = CasLocation{.BlockIndex = 0, .Offset = CasDiskLocation::MaxOffset * 4, .Size = 0}; - CHECK(MaxOffset == CasDiskLocation(MaxOffset, 4).Get(4)); - - CasLocation MaxSize = CasLocation{.BlockIndex = 0, .Offset = 0, .Size = std::numeric_limits<uint32_t>::max()}; - CHECK(MaxSize == CasDiskLocation(MaxSize, 4).Get(4)); - - CasLocation MaxBlockIndexAndOffset = - CasLocation{.BlockIndex = CasDiskLocation::MaxBlockIndex, .Offset = CasDiskLocation::MaxOffset * 4, .Size = 0}; - CHECK(MaxBlockIndexAndOffset == CasDiskLocation(MaxBlockIndexAndOffset, 4).Get(4)); - - CasLocation MaxAll = CasLocation{.BlockIndex = CasDiskLocation::MaxBlockIndex, - .Offset = CasDiskLocation::MaxOffset * 4, - .Size = std::numeric_limits<uint32_t>::max()}; - CHECK(MaxAll == CasDiskLocation(MaxAll, 4).Get(4)); - - CasLocation MaxAll4096 = CasLocation{.BlockIndex = CasDiskLocation::MaxBlockIndex, - .Offset = CasDiskLocation::MaxOffset * 4096, - .Size = std::numeric_limits<uint32_t>::max()}; - CHECK(MaxAll4096 == CasDiskLocation(MaxAll4096, 4096).Get(4096)); - - CasLocation Middle = CasLocation{.BlockIndex = (CasDiskLocation::MaxBlockIndex) / 2, - .Offset = ((CasDiskLocation::MaxOffset) / 2) * 4, - .Size = std::numeric_limits<uint32_t>::max() / 2}; - CHECK(Middle == CasDiskLocation(Middle, 4).Get(4)); -} - TEST_CASE("compactcas.hex") { uint32_t Value; @@ -1990,7 +1808,7 @@ TEST_CASE("compactcas.legacyconversion") LegacyCasLog.Open(SlogPath, true); for (const auto& Entry : LogEntries) { - CasLocation Location = Entry.Location.Get(16); + BlockStoreLocation Location = Entry.Location.Get(16); LegacyCasDiskLocation LegacyLocation(Location.Offset, Location.Size); LegacyCasDiskIndexEntry LegacyEntry = {.Key = Entry.Key, .Location = LegacyLocation, |