diff options
| author | Dan Engelbrecht <[email protected]> | 2022-05-02 10:18:31 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-05-02 10:18:31 +0200 |
| commit | c89190f7fabf8a08cda2255937dc99ca35972210 (patch) | |
| tree | f67248118b6dc47f5f3665ba09f7745bd69b0f5a | |
| parent | cleanup (diff) | |
| download | zen-c89190f7fabf8a08cda2255937dc99ca35972210.tar.xz zen-c89190f7fabf8a08cda2255937dc99ca35972210.zip | |
Move bulk of MigrateLegacyData to blockstore.cpp
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 1 | ||||
| -rw-r--r-- | zenstore/blockstore.cpp | 219 | ||||
| -rw-r--r-- | zenstore/compactcas.cpp | 268 | ||||
| -rw-r--r-- | zenstore/include/zenstore/blockstore.h | 23 |
4 files changed, 281 insertions, 230 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 9ae5b0f17..5cebaa948 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1836,7 +1836,6 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) m_PayloadAlignment, false, [this, &DeletedChunks, &ChunkIndexToChunkHash, &Index, &ReadBlockTimeUs, &ReadBlockLongestTimeUs]( - uint32_t, const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks, const std::vector<size_t>& RemovedChunks) { std::vector<DiskIndexEntry> LogEntries; diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp index 9961e734d..6f5578be8 100644 --- a/zenstore/blockstore.cpp +++ b/zenstore/blockstore.cpp @@ -437,7 +437,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, { DeletedSize += ChunkLocations[DeleteIndex].Size; } - Callback(BlockIndex, {}, DeleteMap); + Callback({}, DeleteMap); DeletedCount += DeleteMap.size(); { RwLock::ExclusiveLockScope _i(m_InsertLock); @@ -477,7 +477,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, NewBlockFile->Flush(); } { - Callback(0xfffffffful, MovedChunks, {}); + Callback(MovedChunks, {}); MovedCount += KeepMap.size(); MovedChunks.clear(); RwLock::ExclusiveLockScope __(m_InsertLock); @@ -558,7 +558,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, DeletedSize += ChunkLocations[DeleteIndex].Size; } - Callback(BlockIndex, MovedChunks, DeleteMap); + Callback(MovedChunks, DeleteMap); MovedCount += KeepMap.size(); DeletedCount += DeleteMap.size(); MovedChunks.clear(); @@ -599,7 +599,7 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, IoBuffer ReadBuffer{WindowSize}; void* BufferBase = ReadBuffer.MutableData(); - RwLock::SharedLockScope _(m_InsertLock); // TODO: Refactor so we don't have to keep m_InsertLock all the time? + RwLock::SharedLockScope _(m_InsertLock); for (const auto& Block : m_ChunkBlocks) { @@ -659,6 +659,217 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, } } +bool +BlockStore::Split(const std::vector<BlockStoreLocation>& ChunkLocations, + const std::filesystem::path& SourceBlockFilePath, + const std::filesystem::path& BlocksBasePath, + uint64_t MaxBlockSize, + uint64_t MaxBlockCount, + size_t PayloadAlignment, + bool CleanSource, + const SplitCallback& Callback) +{ + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(BlocksBasePath.parent_path(), Error); + if (Error) + { + ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", BlocksBasePath, Error.message()); + return false; + } + + if (Space.Free < MaxBlockSize) + { + ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}", + BlocksBasePath, + MaxBlockSize, + NiceBytes(Space.Free)); + return false; + } + + size_t TotalSize = 0; + for (const BlockStoreLocation& Location : ChunkLocations) + { + TotalSize += Location.Size; + } + size_t ChunkCount = ChunkLocations.size(); + uint64_t RequiredDiskSpace = TotalSize + ((PayloadAlignment - 1) * ChunkCount); + uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, MaxBlockSize) / MaxBlockSize; + if (MaxRequiredBlockCount > MaxBlockCount) + { + ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", + BlocksBasePath, + MaxRequiredBlockCount, + MaxBlockCount); + return false; + } + + constexpr const uint64_t DiskReserve = 1ul << 28; + + if (CleanSource) + { + if (Space.Free < (MaxBlockSize + DiskReserve)) + { + ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", + BlocksBasePath, + NiceBytes(MaxBlockSize + DiskReserve), + NiceBytes(Space.Free)); + return false; + } + } + else + { + if (Space.Free < (RequiredDiskSpace + DiskReserve)) + { + ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", + BlocksBasePath, + NiceBytes(RequiredDiskSpace + DiskReserve), + NiceBytes(Space.Free)); + return false; + } + } + + uint32_t WriteBlockIndex = 0; + while (std::filesystem::exists(BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex))) + { + ++WriteBlockIndex; + } + + BasicFile BlockFile; + BlockFile.Open(SourceBlockFilePath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); + + if (CleanSource && (MaxRequiredBlockCount < 2)) + { + std::vector<std::pair<size_t, BlockStoreLocation>> Chunks; + Chunks.reserve(ChunkCount); + for (size_t Index = 0; Index < ChunkCount; ++Index) + { + const BlockStoreLocation& ChunkLocation = ChunkLocations[Index]; + Chunks.push_back({Index, {.BlockIndex = WriteBlockIndex, .Offset = ChunkLocation.Offset, .Size = ChunkLocation.Size}}); + } + std::filesystem::path BlockPath = BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex); + CreateDirectories(BlockPath.parent_path()); + BlockFile.Close(); + std::filesystem::rename(SourceBlockFilePath, BlockPath); + Callback(Chunks); + return true; + } + + std::vector<size_t> ChunkIndexes; + ChunkIndexes.reserve(ChunkCount); + for (size_t Index = 0; Index < ChunkCount; ++Index) + { + ChunkIndexes.push_back(Index); + } + + std::sort(begin(ChunkIndexes), end(ChunkIndexes), [&ChunkLocations](size_t Lhs, size_t Rhs) { + const BlockStoreLocation& LhsLocation = ChunkLocations[Lhs]; + const BlockStoreLocation& RhsLocation = ChunkLocations[Rhs]; + return LhsLocation.Offset < RhsLocation.Offset; + }); + + uint64_t BlockSize = 0; + uint64_t BlockOffset = 0; + std::vector<BlockStoreLocation> NewLocations; + struct BlockData + { + std::vector<std::pair<size_t, BlockStoreLocation>> Chunks; + uint64_t BlockOffset; + uint64_t BlockSize; + uint32_t BlockIndex; + }; + + std::vector<BlockData> BlockRanges; + std::vector<std::pair<size_t, BlockStoreLocation>> Chunks; + BlockRanges.reserve(MaxRequiredBlockCount); + for (const size_t& ChunkIndex : ChunkIndexes) + { + const BlockStoreLocation& LegacyChunkLocation = ChunkLocations[ChunkIndex]; + + uint64_t ChunkOffset = LegacyChunkLocation.Offset; + uint64_t ChunkSize = LegacyChunkLocation.Size; + uint64_t ChunkEnd = ChunkOffset + ChunkSize; + + if (BlockSize == 0) + { + BlockOffset = ChunkOffset; + } + if ((ChunkEnd - BlockOffset) > MaxBlockSize) + { + BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}; + BlockRange.Chunks.swap(Chunks); + BlockRanges.push_back(BlockRange); + + WriteBlockIndex++; + while (std::filesystem::exists(BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex))) + { + ++WriteBlockIndex; + } + BlockOffset = ChunkOffset; + BlockSize = 0; + } + BlockSize = RoundUp(BlockSize, PayloadAlignment); + BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize}; + Chunks.push_back({ChunkIndex, ChunkLocation}); + BlockSize = ChunkEnd - BlockOffset; + } + if (BlockSize > 0) + { + BlockRanges.push_back( + {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}); + } + + Stopwatch WriteBlockTimer; + + std::reverse(BlockRanges.begin(), BlockRanges.end()); + std::vector<std::uint8_t> Buffer(1 << 28); + for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx) + { + const BlockData& BlockRange = BlockRanges[Idx]; + if (Idx > 0) + { + uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize; + uint64_t Completed = BlockOffset + BlockSize - Remaining; + uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed; + + ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}", + BlocksBasePath, + Idx, + BlockRanges.size(), + NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize), + NiceBytes(BlockOffset + BlockSize), + NiceTimeSpanMs(ETA)); + } + + std::filesystem::path BlockPath = BlockStore::GetBlockPath(BlocksBasePath, BlockRange.BlockIndex); + BlockStoreFile ChunkBlock(BlockPath); + ChunkBlock.Create(BlockRange.BlockSize); + uint64_t Offset = 0; + while (Offset < BlockRange.BlockSize) + { + uint64_t Size = BlockRange.BlockSize - Offset; + if (Size > Buffer.size()) + { + Size = Buffer.size(); + } + BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset); + ChunkBlock.Write(Buffer.data(), Size, Offset); + Offset += Size; + } + ChunkBlock.Truncate(Offset); + ChunkBlock.Flush(); + + Callback(BlockRange.Chunks); + + if (CleanSource) + { + BlockFile.SetFileSize(BlockRange.BlockOffset); + } + } + BlockFile.Close(); + + return true; +} + const char* BlockStore::GetBlockFileExtension() { diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index a79928fba..8d90ba186 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -493,7 +493,6 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) m_PayloadAlignment, false, [this, &DeletedChunks, &ChunkIndexToChunkHash, &LocationMap, &ReadBlockTimeUs, &ReadBlockLongestTimeUs]( - uint32_t, const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks, const std::vector<size_t>& RemovedChunks) { std::vector<CasDiskIndexEntry> LogEntries; @@ -753,32 +752,13 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) NiceBytes(TotalSize)); }); - uint32_t WriteBlockIndex = 0; - while (std::filesystem::exists(BlockStore::GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) + uint64_t BlockFileSize = 0; { - ++WriteBlockIndex; + BasicFile BlockFile; + BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); + BlockFileSize = BlockFile.FileSize(); } - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); - if (Error) - { - ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", m_Config.RootDirectory, Error.message()); - return 0; - } - - if (Space.Free < m_MaxBlockSize) - { - ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}", - m_Config.RootDirectory / m_ContainerBaseName, - m_MaxBlockSize, - NiceBytes(Space.Free)); - return 0; - } - - BasicFile BlockFile; - BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); - std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex; uint64_t InvalidEntryCount = 0; @@ -814,7 +794,6 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) 0); std::vector<IoHash> BadEntries; - uint64_t BlockFileSize = BlockFile.FileSize(); for (const auto& Entry : LegacyDiskIndex) { const LegacyCasDiskIndexEntry& Record(Entry.second); @@ -840,7 +819,6 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) if (LegacyDiskIndex.empty()) { - BlockFile.Close(); LegacyCasLog.Close(); if (CleanSource) { @@ -859,219 +837,75 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) return 0; } - for (const auto& Entry : LegacyDiskIndex) - { - const LegacyCasDiskIndexEntry& Record(Entry.second); - TotalSize += Record.Location.GetSize(); - } - - uint64_t RequiredDiskSpace = TotalSize + ((m_PayloadAlignment - 1) * LegacyDiskIndex.size()); - uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, m_MaxBlockSize) / m_MaxBlockSize; - if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex) - { - ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", - m_Config.RootDirectory / m_ContainerBaseName, - MaxRequiredBlockCount, - BlockStoreDiskLocation::MaxBlockIndex); - return 0; - } - - constexpr const uint64_t DiskReserve = 1ul << 28; - - if (CleanSource) - { - if (Space.Free < (m_MaxBlockSize + DiskReserve)) - { - ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", - m_Config.RootDirectory / m_ContainerBaseName, - NiceBytes(m_MaxBlockSize + DiskReserve), - NiceBytes(Space.Free)); - return 0; - } - } - else - { - if (Space.Free < (RequiredDiskSpace + DiskReserve)) - { - ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", - m_Config.RootDirectory / m_ContainerBaseName, - NiceBytes(RequiredDiskSpace + DiskReserve), - NiceBytes(Space.Free)); - return 0; - } - } - std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); CreateDirectories(LogPath.parent_path()); TCasLogFile<CasDiskIndexEntry> CasLog; CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - if (CleanSource && (MaxRequiredBlockCount < 2)) + std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash; + std::vector<BlockStoreLocation> ChunkLocations; + ChunkIndexToChunkHash.reserve(LegacyDiskIndex.size()); + ChunkLocations.reserve(LegacyDiskIndex.size()); + for (const auto& Entry : LegacyDiskIndex) { - std::vector<CasDiskIndexEntry> LogEntries; - LogEntries.reserve(LegacyDiskIndex.size()); - - // We can use the block as is, just move it and add the blocks to our new log - for (auto& Entry : LegacyDiskIndex) - { - const LegacyCasDiskIndexEntry& Record(Entry.second); - - BlockStoreLocation NewChunkLocation{WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize()}; - BlockStoreDiskLocation NewLocation(NewChunkLocation, m_PayloadAlignment); - LogEntries.push_back( - {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags}); - } - std::filesystem::path BlockPath = BlockStore::GetBlockPath(m_BlocksBasePath, WriteBlockIndex); - CreateDirectories(BlockPath.parent_path()); - BlockFile.Close(); - std::filesystem::rename(LegacyDataPath, BlockPath); - CasLog.Append(LogEntries); - for (const CasDiskIndexEntry& Entry : LogEntries) - { - m_LocationMap.insert_or_assign(Entry.Key, Entry.Location); - } - - MigratedChunkCount += LogEntries.size(); - MigratedBlockCount++; + const LegacyCasDiskLocation& Location = Entry.second.Location; + const IoHash& ChunkHash = Entry.first; + size_t ChunkIndex = ChunkLocations.size(); + ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.GetOffset(), .Size = Location.GetSize()}); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; + TotalSize += Location.GetSize(); } - else - { - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(LegacyDiskIndex.size()); - for (const auto& Entry : LegacyDiskIndex) - { - ChunkHashes.push_back(Entry.first); - } - - std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) { - auto LhsKeyIt = LegacyDiskIndex.find(Lhs); - auto RhsKeyIt = LegacyDiskIndex.find(Rhs); - return LhsKeyIt->second.Location.GetOffset() < RhsKeyIt->second.Location.GetOffset(); - }); - - uint64_t BlockSize = 0; - uint64_t BlockOffset = 0; - std::vector<BlockStoreLocation> NewLocations; - struct BlockData - { - std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; - uint64_t BlockOffset; - uint64_t BlockSize; - uint32_t BlockIndex; - }; - - std::vector<BlockData> BlockRanges; - std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; - BlockRanges.reserve(MaxRequiredBlockCount); - for (const IoHash& ChunkHash : ChunkHashes) - { - const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash]; - const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location; - - uint64_t ChunkOffset = LegacyChunkLocation.GetOffset(); - uint64_t ChunkSize = LegacyChunkLocation.GetSize(); - uint64_t ChunkEnd = ChunkOffset + ChunkSize; - - if (BlockSize == 0) - { - BlockOffset = ChunkOffset; - } - if ((ChunkEnd - BlockOffset) > m_MaxBlockSize) - { - BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}; - BlockRange.Chunks.swap(Chunks); - BlockRanges.push_back(BlockRange); - - WriteBlockIndex++; - while (std::filesystem::exists(BlockStore::GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) - { - ++WriteBlockIndex; - } - BlockOffset = ChunkOffset; - BlockSize = 0; - } - BlockSize = RoundUp(BlockSize, m_PayloadAlignment); - BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize}; - Chunks.push_back({ChunkHash, ChunkLocation}); - BlockSize = ChunkEnd - BlockOffset; - } - if (BlockSize > 0) - { - BlockRanges.push_back( - {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}); - } - Stopwatch WriteBlockTimer; - - std::reverse(BlockRanges.begin(), BlockRanges.end()); - std::vector<std::uint8_t> Buffer(1 << 28); - for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx) - { - const BlockData& BlockRange = BlockRanges[Idx]; - if (Idx > 0) - { - uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize; - uint64_t Completed = BlockOffset + BlockSize - Remaining; - uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed; - - ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}", - m_Config.RootDirectory / m_ContainerBaseName, - Idx, - BlockRanges.size(), - NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize), - NiceBytes(BlockOffset + BlockSize), - NiceTimeSpanMs(ETA)); - } - - std::filesystem::path BlockPath = BlockStore::GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex); - BlockStoreFile ChunkBlock(BlockPath); - ChunkBlock.Create(BlockRange.BlockSize); - uint64_t Offset = 0; - while (Offset < BlockRange.BlockSize) - { - uint64_t Size = BlockRange.BlockSize - Offset; - if (Size > Buffer.size()) - { - Size = Buffer.size(); - } - BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset); - ChunkBlock.Write(Buffer.data(), Size, Offset); - Offset += Size; - } - ChunkBlock.Truncate(Offset); - ChunkBlock.Flush(); - + m_BlockStore.Split( + ChunkLocations, + LegacyDataPath, + m_BlocksBasePath, + m_MaxBlockSize, + BlockStoreDiskLocation::MaxBlockIndex + 1, + m_PayloadAlignment, + CleanSource, + [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount]( + const std::vector<std::pair<size_t, BlockStoreLocation>>& MovedChunks) { std::vector<CasDiskIndexEntry> LogEntries; - LogEntries.reserve(BlockRange.Chunks.size()); - for (const auto& Entry : BlockRange.Chunks) + LogEntries.reserve(MovedChunks.size()); + for (const auto& Entry : MovedChunks) { - const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first]; - BlockStoreDiskLocation Location(Entry.second, m_PayloadAlignment); - LogEntries.push_back( - {.Key = Entry.first, .Location = Location, .ContentType = LegacyEntry.ContentType, .Flags = LegacyEntry.Flags}); + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; + LogEntries.push_back({.Key = ChunkHash, + .Location = {NewLocation, m_PayloadAlignment}, + .ContentType = OldEntry.ContentType, + .Flags = OldEntry.Flags}); } - CasLog.Append(LogEntries); for (const CasDiskIndexEntry& Entry : LogEntries) { m_LocationMap.insert_or_assign(Entry.Key, Entry.Location); } - MigratedChunkCount += LogEntries.size(); - MigratedBlockCount++; - + CasLog.Append(LogEntries); + CasLog.Flush(); if (CleanSource) { std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries; - LegacyLogEntries.reserve(BlockRange.Chunks.size()); - for (const auto& Entry : BlockRange.Chunks) + LegacyLogEntries.reserve(MovedChunks.size()); + for (const auto& Entry : MovedChunks) { - LegacyLogEntries.push_back({.Key = Entry.first, .Flags = LegacyCasDiskIndexEntry::kTombstone}); + size_t ChunkIndex = Entry.first; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; + LegacyLogEntries.push_back( + LegacyCasDiskIndexEntry{.Key = ChunkHash, + .Location = OldEntry.Location, + .ContentType = OldEntry.ContentType, + .Flags = (uint8_t)(OldEntry.Flags | LegacyCasDiskIndexEntry::kTombstone)}); } LegacyCasLog.Append(LegacyLogEntries); - BlockFile.SetFileSize(BlockRange.BlockOffset); + LegacyCasLog.Flush(); } - } - } + MigratedBlockCount++; + MigratedChunkCount += MovedChunks.size(); + }); - BlockFile.Close(); LegacyCasLog.Close(); CasLog.Close(); diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h index 21c02d389..0cef7600f 100644 --- a/zenstore/include/zenstore/blockstore.h +++ b/zenstore/include/zenstore/blockstore.h @@ -117,14 +117,14 @@ public: std::unordered_set<uint32_t> ExcludeBlockIndexes; size_t BlockCount; }; - typedef std::function<void(uint32_t BlockIndex, - const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks, - const std::vector<size_t>& RemovedChunks)> + typedef std::function<void(const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks, const std::vector<size_t>& RemovedChunks)> ReclaimCallback; typedef std::function<void(const BlockStoreLocation& Location)> WriteChunkCallback; typedef std::function<void(size_t ChunkIndex, const void* Data, uint64_t Size)> IterateChunksSmallSizeCallback; typedef std::function<void(size_t ChunkIndex, BasicFile& BlockFile, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback; + typedef std::function<void(const std::vector<std::pair<size_t, BlockStoreLocation>>& MovedChunks)> SplitCallback; + void Initialize(const std::filesystem::path& BlocksBasePath, uint64_t MaxBlockSize, uint64_t MaxBlockCount, @@ -144,11 +144,18 @@ public: const std::vector<size_t>& KeepChunkIndexes, uint64_t PayloadAlignment, bool DryRun, - const ReclaimCallback& Callback = [](uint32_t, const std::unordered_map<size_t, BlockStoreLocation>&, const std::vector<size_t>&) { - }); - void IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, - IterateChunksSmallSizeCallback SmallSizeCallback, - IterateChunksLargeSizeCallback LargeSizeCallback); + const ReclaimCallback& Callback = [](const std::unordered_map<size_t, BlockStoreLocation>&, const std::vector<size_t>&) {}); + void IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, + IterateChunksSmallSizeCallback SmallSizeCallback, + IterateChunksLargeSizeCallback LargeSizeCallback); + static bool Split(const std::vector<BlockStoreLocation>& ChunkLocations, + const std::filesystem::path& SourceBlockFilePath, + const std::filesystem::path& BlocksBasePath, + uint64_t MaxBlockSize, + uint64_t MaxBlockCount, + size_t PayloadAlignment, + bool CleanSource, + const SplitCallback& Callback); static const char* GetBlockFileExtension(); static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex); |