diff options
| author | Dan Engelbrecht <[email protected]> | 2022-05-02 10:48:57 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-05-02 10:48:57 +0200 |
| commit | 48f2e3af59e2a06c81e37170db95e432b148e5e8 (patch) | |
| tree | c91aef099acb9b2f7555c60ad84ae9c2650db3dc /zenserver/cache/structuredcachestore.cpp | |
| parent | Move bulk of MigrateLegacyData to blockstore.cpp (diff) | |
| download | zen-48f2e3af59e2a06c81e37170db95e432b148e5e8.tar.xz zen-48f2e3af59e2a06c81e37170db95e432b148e5e8.zip | |
refactor structured cache to use blockstore migrate
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 309 |
1 files changed, 66 insertions, 243 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 5cebaa948..e9c051f88 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -877,35 +877,17 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) NiceBytes(TotalSize)); }); - uint32_t WriteBlockIndex = 0; - while (std::filesystem::exists(BlockStore ::GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) + uint64_t BlockFileSize = 0; { - ++WriteBlockIndex; + BasicFile BlockFile; + BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); + BlockFileSize = BlockFile.FileSize(); } - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); - if (Error) - { - ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); - return 0; - } - - if (Space.Free < MaxBlockSize) - { - ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}", - m_BucketDir / m_BucketName, - MaxBlockSize, - NiceBytes(Space.Free)); - return 0; - } - - BasicFile BlockFile; - BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); - std::unordered_map<IoHash, LegacyDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex; uint64_t InvalidEntryCount = 0; + size_t BlockChunkCount = 0; TCasLogFile<LegacyDiskIndexEntry> LegacyCasLog; LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead); { @@ -942,7 +924,6 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) 0); std::vector<IoHash> BadEntries; - uint64_t BlockFileSize = BlockFile.FileSize(); for (const auto& Entry : LegacyDiskIndex) { const LegacyDiskIndexEntry& Record(Entry.second); @@ -952,6 +933,7 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) } if (Record.Location.Offset() + Record.Location.Size() <= BlockFileSize) { + BlockChunkCount++; continue; } ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath); @@ -972,7 +954,6 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) if (LegacyDiskIndex.empty()) { LegacyCasLog.Close(); - BlockFile.Close(); if (CleanSource) { // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find @@ -988,250 +969,92 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) return 0; } - uint64_t BlockChunkCount = 0; - uint64_t BlockTotalSize = 0; - for (const auto& Entry : LegacyDiskIndex) - { - const LegacyDiskIndexEntry& Record(Entry.second); - if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) - { - continue; - } - BlockChunkCount++; - BlockTotalSize += Record.Location.Size(); - } - - uint64_t RequiredDiskSpace = BlockTotalSize + ((m_PayloadAlignment - 1) * BlockChunkCount); - uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, MaxBlockSize) / MaxBlockSize; - if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex) - { - ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", - m_BucketDir / m_BucketName, - MaxRequiredBlockCount, - BlockStoreDiskLocation::MaxBlockIndex); - return 0; - } - - constexpr const uint64_t DiskReserve = 1ul << 28; - - if (CleanSource) - { - if (Space.Free < (MaxBlockSize + DiskReserve)) - { - ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", - m_BucketDir / m_BucketName, - NiceBytes(MaxBlockSize + DiskReserve), - NiceBytes(Space.Free)); - return 0; - } - } - else - { - if (Space.Free < (RequiredDiskSpace + DiskReserve)) - { - ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", - m_BucketDir / m_BucketName, - NiceBytes(RequiredDiskSpace + DiskReserve), - NiceBytes(Space.Free)); - return 0; - } - } - std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); CreateDirectories(LogPath.parent_path()); TCasLogFile<DiskIndexEntry> CasLog; CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - if (CleanSource && (MaxRequiredBlockCount < 2)) - { - std::vector<DiskIndexEntry> LogEntries; - LogEntries.reserve(LegacyDiskIndex.size()); + std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash; + std::vector<BlockStoreLocation> ChunkLocations; + ChunkIndexToChunkHash.reserve(BlockChunkCount); + ChunkLocations.reserve(BlockChunkCount); - // We can use the block as is, just move it and add the blocks to our new log - for (auto& Entry : LegacyDiskIndex) - { - const LegacyDiskIndexEntry& Record(Entry.second); + std::vector<DiskIndexEntry> LogEntries; + LogEntries.reserve(LegacyDiskIndex.size() - BlockChunkCount); - DiskLocation NewLocation; - uint8_t Flags = 0xff & (Record.Location.Flags() >> 56); - if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) - { - NewLocation = DiskLocation(Record.Location.Size(), Flags); - } - else - { - BlockStoreLocation NewChunkLocation(WriteBlockIndex, Record.Location.Offset(), Record.Location.Size()); - NewLocation = DiskLocation(NewChunkLocation, m_PayloadAlignment, Flags); - } - LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation}); - } - std::filesystem::path BlockPath = BlockStore ::GetBlockPath(m_BlocksBasePath, WriteBlockIndex); - CreateDirectories(BlockPath.parent_path()); - BlockFile.Close(); - std::filesystem::rename(LegacyDataPath, BlockPath); - CasLog.Append(LogEntries); - for (const DiskIndexEntry& Entry : LogEntries) + for (const auto& Entry : LegacyDiskIndex) + { + const IoHash& ChunkHash = Entry.first; + const LegacyDiskLocation& Location = Entry.second.Location; + if (Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) { - m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); + uint8_t Flags = 0xff & (Location.Flags() >> 56); + DiskLocation NewLocation = DiskLocation(Location.Size(), Flags); + LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation}); + continue; } - - MigratedChunkCount += LogEntries.size(); - MigratedBlockCount++; + size_t ChunkIndex = ChunkLocations.size(); + ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.Offset(), .Size = Location.Size()}); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; + TotalSize += Location.Size(); } - else + for (const DiskIndexEntry& Entry : LogEntries) { - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(LegacyDiskIndex.size()); - for (const auto& Entry : LegacyDiskIndex) - { - ChunkHashes.push_back(Entry.first); - } - - std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) { - auto LhsKeyIt = LegacyDiskIndex.find(Lhs); - auto RhsKeyIt = LegacyDiskIndex.find(Rhs); - return LhsKeyIt->second.Location.Offset() < RhsKeyIt->second.Location.Offset(); - }); - - uint64_t BlockSize = 0; - uint64_t BlockOffset = 0; - std::vector<BlockStoreLocation> NewLocations; - struct BlockData - { - std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; - uint64_t BlockOffset; - uint64_t BlockSize; - uint32_t BlockIndex; - }; - - std::vector<BlockData> BlockRanges; - std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; - BlockRanges.reserve(MaxRequiredBlockCount); - for (const IoHash& ChunkHash : ChunkHashes) - { - const LegacyDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash]; - const LegacyDiskLocation& LegacyChunkLocation = LegacyEntry.Location; - - if (LegacyChunkLocation.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) - { - // For standalone files we just store the chunk hash an use the size from the legacy index as is - Chunks.push_back({ChunkHash, {}}); - continue; - } - - uint64_t ChunkOffset = LegacyChunkLocation.Offset(); - uint64_t ChunkSize = LegacyChunkLocation.Size(); - uint64_t ChunkEnd = ChunkOffset + ChunkSize; - - if (BlockSize == 0) - { - BlockOffset = ChunkOffset; - } - if ((ChunkEnd - BlockOffset) > MaxBlockSize) - { - BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}; - BlockRange.Chunks.swap(Chunks); - BlockRanges.push_back(BlockRange); - - WriteBlockIndex++; - while (std::filesystem::exists(BlockStore ::GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) - { - ++WriteBlockIndex; - } - BlockOffset = ChunkOffset; - BlockSize = 0; - } - BlockSize = RoundUp(BlockSize, m_PayloadAlignment); - BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize}; - Chunks.push_back({ChunkHash, ChunkLocation}); - BlockSize = ChunkEnd - BlockOffset; - } - if (BlockSize > 0) - { - BlockRanges.push_back( - {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}); - } - Stopwatch WriteBlockTimer; - - std::reverse(BlockRanges.begin(), BlockRanges.end()); - std::vector<std::uint8_t> Buffer(1 << 28); - for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx) - { - const BlockData& BlockRange = BlockRanges[Idx]; - if (Idx > 0) - { - uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize; - uint64_t Completed = BlockOffset + BlockSize - Remaining; - uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed; - - ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}", - m_BucketDir / m_BucketDir, - Idx, - BlockRanges.size(), - NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize), - NiceBytes(BlockOffset + BlockSize), - NiceTimeSpanMs(ETA)); - } - - std::filesystem::path BlockPath = BlockStore ::GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex); - BlockStoreFile ChunkBlock(BlockPath); - ChunkBlock.Create(BlockRange.BlockSize); - uint64_t Offset = 0; - while (Offset < BlockRange.BlockSize) - { - uint64_t Size = BlockRange.BlockSize - Offset; - if (Size > Buffer.size()) - { - Size = Buffer.size(); - } - BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset); - ChunkBlock.Write(Buffer.data(), Size, Offset); - Offset += Size; - } - ChunkBlock.Truncate(Offset); - ChunkBlock.Flush(); + m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); + } + CasLog.Append(LogEntries); + m_BlockStore.Split( + ChunkLocations, + LegacyDataPath, + m_BlocksBasePath, + MaxBlockSize, + BlockStoreDiskLocation::MaxBlockIndex + 1, + m_PayloadAlignment, + CleanSource, + [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount]( + const std::vector<std::pair<size_t, BlockStoreLocation>>& MovedChunks) { std::vector<DiskIndexEntry> LogEntries; - LogEntries.reserve(BlockRange.Chunks.size()); - for (const auto& Entry : BlockRange.Chunks) + LogEntries.reserve(MovedChunks.size()); + for (const auto& Entry : MovedChunks) { - const LegacyDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first]; - - DiskLocation NewLocation; - uint8_t Flags = 0xff & (LegacyEntry.Location.Flags() >> 56); - if (LegacyEntry.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) - { - NewLocation = DiskLocation(LegacyEntry.Location.Size(), Flags); - } - else - { - NewLocation = DiskLocation(Entry.second, m_PayloadAlignment, Flags); - } - LogEntries.push_back({.Key = Entry.first, .Location = NewLocation}); + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; + const LegacyDiskLocation& OldLocation = OldEntry.Location; + uint8_t Flags = 0xff & (OldLocation.Flags() >> 56); + LogEntries.push_back({.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, Flags)}); } - CasLog.Append(LogEntries); for (const DiskIndexEntry& Entry : LogEntries) { m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); } - MigratedChunkCount += LogEntries.size(); - MigratedBlockCount++; - + CasLog.Append(LogEntries); + CasLog.Flush(); if (CleanSource) { std::vector<LegacyDiskIndexEntry> LegacyLogEntries; - LegacyLogEntries.reserve(BlockRange.Chunks.size()); - for (const auto& Entry : BlockRange.Chunks) + LegacyLogEntries.reserve(MovedChunks.size()); + for (const auto& Entry : MovedChunks) { - LegacyLogEntries.push_back( - {.Key = Entry.first, .Location = LegacyDiskLocation(0, 0, 0, LegacyDiskLocation::kTombStone)}); + size_t ChunkIndex = Entry.first; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; + const LegacyDiskLocation& OldLocation = OldEntry.Location; + LegacyDiskLocation NewLocation(OldLocation.Offset(), + OldLocation.Size(), + 0, + OldLocation.Flags() | LegacyDiskLocation::kTombStone); + LegacyLogEntries.push_back(LegacyDiskIndexEntry(ChunkHash, NewLocation)); } LegacyCasLog.Append(LegacyLogEntries); - BlockFile.SetFileSize(BlockRange.BlockOffset); + LegacyCasLog.Flush(); } - } - } - BlockFile.Close(); + MigratedBlockCount++; + MigratedChunkCount += MovedChunks.size(); + }); + LegacyCasLog.Close(); CasLog.Close(); |