diff options
| author | Dan Engelbrecht <[email protected]> | 2022-05-02 10:18:31 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-05-02 10:18:31 +0200 |
| commit | c89190f7fabf8a08cda2255937dc99ca35972210 (patch) | |
| tree | f67248118b6dc47f5f3665ba09f7745bd69b0f5a /zenstore/compactcas.cpp | |
| parent | cleanup (diff) | |
| download | zen-c89190f7fabf8a08cda2255937dc99ca35972210.tar.xz zen-c89190f7fabf8a08cda2255937dc99ca35972210.zip | |
Move bulk of MigrateLegacyData to blockstore.cpp
Diffstat (limited to 'zenstore/compactcas.cpp')
| -rw-r--r-- | zenstore/compactcas.cpp | 268 |
1 file changed, 51 insertions, 217 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index a79928fba..8d90ba186 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -493,7 +493,6 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) m_PayloadAlignment, false, [this, &DeletedChunks, &ChunkIndexToChunkHash, &LocationMap, &ReadBlockTimeUs, &ReadBlockLongestTimeUs]( - uint32_t, const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks, const std::vector<size_t>& RemovedChunks) { std::vector<CasDiskIndexEntry> LogEntries; @@ -753,32 +752,13 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) NiceBytes(TotalSize)); }); - uint32_t WriteBlockIndex = 0; - while (std::filesystem::exists(BlockStore::GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) + uint64_t BlockFileSize = 0; { - ++WriteBlockIndex; + BasicFile BlockFile; + BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); + BlockFileSize = BlockFile.FileSize(); } - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); - if (Error) - { - ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", m_Config.RootDirectory, Error.message()); - return 0; - } - - if (Space.Free < m_MaxBlockSize) - { - ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}", - m_Config.RootDirectory / m_ContainerBaseName, - m_MaxBlockSize, - NiceBytes(Space.Free)); - return 0; - } - - BasicFile BlockFile; - BlockFile.Open(LegacyDataPath, CleanSource ? 
BasicFile::Mode::kWrite : BasicFile::Mode::kRead); - std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex; uint64_t InvalidEntryCount = 0; @@ -814,7 +794,6 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) 0); std::vector<IoHash> BadEntries; - uint64_t BlockFileSize = BlockFile.FileSize(); for (const auto& Entry : LegacyDiskIndex) { const LegacyCasDiskIndexEntry& Record(Entry.second); @@ -840,7 +819,6 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) if (LegacyDiskIndex.empty()) { - BlockFile.Close(); LegacyCasLog.Close(); if (CleanSource) { @@ -859,219 +837,75 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) return 0; } - for (const auto& Entry : LegacyDiskIndex) - { - const LegacyCasDiskIndexEntry& Record(Entry.second); - TotalSize += Record.Location.GetSize(); - } - - uint64_t RequiredDiskSpace = TotalSize + ((m_PayloadAlignment - 1) * LegacyDiskIndex.size()); - uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, m_MaxBlockSize) / m_MaxBlockSize; - if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex) - { - ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", - m_Config.RootDirectory / m_ContainerBaseName, - MaxRequiredBlockCount, - BlockStoreDiskLocation::MaxBlockIndex); - return 0; - } - - constexpr const uint64_t DiskReserve = 1ul << 28; - - if (CleanSource) - { - if (Space.Free < (m_MaxBlockSize + DiskReserve)) - { - ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", - m_Config.RootDirectory / m_ContainerBaseName, - NiceBytes(m_MaxBlockSize + DiskReserve), - NiceBytes(Space.Free)); - return 0; - } - } - else - { - if (Space.Free < (RequiredDiskSpace + DiskReserve)) - { - ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", - m_Config.RootDirectory / m_ContainerBaseName, - NiceBytes(RequiredDiskSpace + DiskReserve), - NiceBytes(Space.Free)); - return 0; - 
} - } - std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); CreateDirectories(LogPath.parent_path()); TCasLogFile<CasDiskIndexEntry> CasLog; CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - if (CleanSource && (MaxRequiredBlockCount < 2)) + std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash; + std::vector<BlockStoreLocation> ChunkLocations; + ChunkIndexToChunkHash.reserve(LegacyDiskIndex.size()); + ChunkLocations.reserve(LegacyDiskIndex.size()); + for (const auto& Entry : LegacyDiskIndex) { - std::vector<CasDiskIndexEntry> LogEntries; - LogEntries.reserve(LegacyDiskIndex.size()); - - // We can use the block as is, just move it and add the blocks to our new log - for (auto& Entry : LegacyDiskIndex) - { - const LegacyCasDiskIndexEntry& Record(Entry.second); - - BlockStoreLocation NewChunkLocation{WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize()}; - BlockStoreDiskLocation NewLocation(NewChunkLocation, m_PayloadAlignment); - LogEntries.push_back( - {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags}); - } - std::filesystem::path BlockPath = BlockStore::GetBlockPath(m_BlocksBasePath, WriteBlockIndex); - CreateDirectories(BlockPath.parent_path()); - BlockFile.Close(); - std::filesystem::rename(LegacyDataPath, BlockPath); - CasLog.Append(LogEntries); - for (const CasDiskIndexEntry& Entry : LogEntries) - { - m_LocationMap.insert_or_assign(Entry.Key, Entry.Location); - } - - MigratedChunkCount += LogEntries.size(); - MigratedBlockCount++; + const LegacyCasDiskLocation& Location = Entry.second.Location; + const IoHash& ChunkHash = Entry.first; + size_t ChunkIndex = ChunkLocations.size(); + ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.GetOffset(), .Size = Location.GetSize()}); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; + TotalSize += Location.GetSize(); } - else - { - std::vector<IoHash> ChunkHashes; - 
ChunkHashes.reserve(LegacyDiskIndex.size()); - for (const auto& Entry : LegacyDiskIndex) - { - ChunkHashes.push_back(Entry.first); - } - - std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) { - auto LhsKeyIt = LegacyDiskIndex.find(Lhs); - auto RhsKeyIt = LegacyDiskIndex.find(Rhs); - return LhsKeyIt->second.Location.GetOffset() < RhsKeyIt->second.Location.GetOffset(); - }); - - uint64_t BlockSize = 0; - uint64_t BlockOffset = 0; - std::vector<BlockStoreLocation> NewLocations; - struct BlockData - { - std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; - uint64_t BlockOffset; - uint64_t BlockSize; - uint32_t BlockIndex; - }; - - std::vector<BlockData> BlockRanges; - std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; - BlockRanges.reserve(MaxRequiredBlockCount); - for (const IoHash& ChunkHash : ChunkHashes) - { - const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash]; - const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location; - - uint64_t ChunkOffset = LegacyChunkLocation.GetOffset(); - uint64_t ChunkSize = LegacyChunkLocation.GetSize(); - uint64_t ChunkEnd = ChunkOffset + ChunkSize; - - if (BlockSize == 0) - { - BlockOffset = ChunkOffset; - } - if ((ChunkEnd - BlockOffset) > m_MaxBlockSize) - { - BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}; - BlockRange.Chunks.swap(Chunks); - BlockRanges.push_back(BlockRange); - - WriteBlockIndex++; - while (std::filesystem::exists(BlockStore::GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) - { - ++WriteBlockIndex; - } - BlockOffset = ChunkOffset; - BlockSize = 0; - } - BlockSize = RoundUp(BlockSize, m_PayloadAlignment); - BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize}; - Chunks.push_back({ChunkHash, ChunkLocation}); - BlockSize = ChunkEnd - BlockOffset; - } - if (BlockSize > 0) - { - BlockRanges.push_back( - {.Chunks = 
std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}); - } - Stopwatch WriteBlockTimer; - - std::reverse(BlockRanges.begin(), BlockRanges.end()); - std::vector<std::uint8_t> Buffer(1 << 28); - for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx) - { - const BlockData& BlockRange = BlockRanges[Idx]; - if (Idx > 0) - { - uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize; - uint64_t Completed = BlockOffset + BlockSize - Remaining; - uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed; - - ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}", - m_Config.RootDirectory / m_ContainerBaseName, - Idx, - BlockRanges.size(), - NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize), - NiceBytes(BlockOffset + BlockSize), - NiceTimeSpanMs(ETA)); - } - - std::filesystem::path BlockPath = BlockStore::GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex); - BlockStoreFile ChunkBlock(BlockPath); - ChunkBlock.Create(BlockRange.BlockSize); - uint64_t Offset = 0; - while (Offset < BlockRange.BlockSize) - { - uint64_t Size = BlockRange.BlockSize - Offset; - if (Size > Buffer.size()) - { - Size = Buffer.size(); - } - BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset); - ChunkBlock.Write(Buffer.data(), Size, Offset); - Offset += Size; - } - ChunkBlock.Truncate(Offset); - ChunkBlock.Flush(); - + m_BlockStore.Split( + ChunkLocations, + LegacyDataPath, + m_BlocksBasePath, + m_MaxBlockSize, + BlockStoreDiskLocation::MaxBlockIndex + 1, + m_PayloadAlignment, + CleanSource, + [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount]( + const std::vector<std::pair<size_t, BlockStoreLocation>>& MovedChunks) { std::vector<CasDiskIndexEntry> LogEntries; - LogEntries.reserve(BlockRange.Chunks.size()); - for (const auto& Entry : BlockRange.Chunks) + LogEntries.reserve(MovedChunks.size()); + for (const 
auto& Entry : MovedChunks) { - const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first]; - BlockStoreDiskLocation Location(Entry.second, m_PayloadAlignment); - LogEntries.push_back( - {.Key = Entry.first, .Location = Location, .ContentType = LegacyEntry.ContentType, .Flags = LegacyEntry.Flags}); + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; + LogEntries.push_back({.Key = ChunkHash, + .Location = {NewLocation, m_PayloadAlignment}, + .ContentType = OldEntry.ContentType, + .Flags = OldEntry.Flags}); } - CasLog.Append(LogEntries); for (const CasDiskIndexEntry& Entry : LogEntries) { m_LocationMap.insert_or_assign(Entry.Key, Entry.Location); } - MigratedChunkCount += LogEntries.size(); - MigratedBlockCount++; - + CasLog.Append(LogEntries); + CasLog.Flush(); if (CleanSource) { std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries; - LegacyLogEntries.reserve(BlockRange.Chunks.size()); - for (const auto& Entry : BlockRange.Chunks) + LegacyLogEntries.reserve(MovedChunks.size()); + for (const auto& Entry : MovedChunks) { - LegacyLogEntries.push_back({.Key = Entry.first, .Flags = LegacyCasDiskIndexEntry::kTombstone}); + size_t ChunkIndex = Entry.first; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; + LegacyLogEntries.push_back( + LegacyCasDiskIndexEntry{.Key = ChunkHash, + .Location = OldEntry.Location, + .ContentType = OldEntry.ContentType, + .Flags = (uint8_t)(OldEntry.Flags | LegacyCasDiskIndexEntry::kTombstone)}); } LegacyCasLog.Append(LegacyLogEntries); - BlockFile.SetFileSize(BlockRange.BlockOffset); + LegacyCasLog.Flush(); } - } - } + MigratedBlockCount++; + MigratedChunkCount += MovedChunks.size(); + }); - BlockFile.Close(); LegacyCasLog.Close(); CasLog.Close(); |