aboutsummaryrefslogtreecommitdiff
path: root/zenstore/compactcas.cpp
diff options
context:
space:
mode:
author: Dan Engelbrecht <[email protected]> 2022-05-02 10:18:31 +0200
committer: Dan Engelbrecht <[email protected]> 2022-05-02 10:18:31 +0200
commit: c89190f7fabf8a08cda2255937dc99ca35972210 (patch)
tree: f67248118b6dc47f5f3665ba09f7745bd69b0f5a /zenstore/compactcas.cpp
parent: cleanup (diff)
download: zen-c89190f7fabf8a08cda2255937dc99ca35972210.tar.xz
zen-c89190f7fabf8a08cda2255937dc99ca35972210.zip
Move bulk of MigrateLegacyData to blockstore.cpp
Diffstat (limited to 'zenstore/compactcas.cpp')
-rw-r--r-- zenstore/compactcas.cpp 268
1 files changed, 51 insertions, 217 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index a79928fba..8d90ba186 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -493,7 +493,6 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
m_PayloadAlignment,
false,
[this, &DeletedChunks, &ChunkIndexToChunkHash, &LocationMap, &ReadBlockTimeUs, &ReadBlockLongestTimeUs](
- uint32_t,
const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks,
const std::vector<size_t>& RemovedChunks) {
std::vector<CasDiskIndexEntry> LogEntries;
@@ -753,32 +752,13 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
NiceBytes(TotalSize));
});
- uint32_t WriteBlockIndex = 0;
- while (std::filesystem::exists(BlockStore::GetBlockPath(m_BlocksBasePath, WriteBlockIndex)))
+ uint64_t BlockFileSize = 0;
{
- ++WriteBlockIndex;
+ BasicFile BlockFile;
+ BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
+ BlockFileSize = BlockFile.FileSize();
}
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error);
- if (Error)
- {
- ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", m_Config.RootDirectory, Error.message());
- return 0;
- }
-
- if (Space.Free < m_MaxBlockSize)
- {
- ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- m_MaxBlockSize,
- NiceBytes(Space.Free));
- return 0;
- }
-
- BasicFile BlockFile;
- BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
-
std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
uint64_t InvalidEntryCount = 0;
@@ -814,7 +794,6 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
0);
std::vector<IoHash> BadEntries;
- uint64_t BlockFileSize = BlockFile.FileSize();
for (const auto& Entry : LegacyDiskIndex)
{
const LegacyCasDiskIndexEntry& Record(Entry.second);
@@ -840,7 +819,6 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
if (LegacyDiskIndex.empty())
{
- BlockFile.Close();
LegacyCasLog.Close();
if (CleanSource)
{
@@ -859,219 +837,75 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
return 0;
}
- for (const auto& Entry : LegacyDiskIndex)
- {
- const LegacyCasDiskIndexEntry& Record(Entry.second);
- TotalSize += Record.Location.GetSize();
- }
-
- uint64_t RequiredDiskSpace = TotalSize + ((m_PayloadAlignment - 1) * LegacyDiskIndex.size());
- uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, m_MaxBlockSize) / m_MaxBlockSize;
- if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex)
- {
- ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- MaxRequiredBlockCount,
- BlockStoreDiskLocation::MaxBlockIndex);
- return 0;
- }
-
- constexpr const uint64_t DiskReserve = 1ul << 28;
-
- if (CleanSource)
- {
- if (Space.Free < (m_MaxBlockSize + DiskReserve))
- {
- ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
- m_Config.RootDirectory / m_ContainerBaseName,
- NiceBytes(m_MaxBlockSize + DiskReserve),
- NiceBytes(Space.Free));
- return 0;
- }
- }
- else
- {
- if (Space.Free < (RequiredDiskSpace + DiskReserve))
- {
- ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
- m_Config.RootDirectory / m_ContainerBaseName,
- NiceBytes(RequiredDiskSpace + DiskReserve),
- NiceBytes(Space.Free));
- return 0;
- }
- }
-
std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
CreateDirectories(LogPath.parent_path());
TCasLogFile<CasDiskIndexEntry> CasLog;
CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
- if (CleanSource && (MaxRequiredBlockCount < 2))
+ std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash;
+ std::vector<BlockStoreLocation> ChunkLocations;
+ ChunkIndexToChunkHash.reserve(LegacyDiskIndex.size());
+ ChunkLocations.reserve(LegacyDiskIndex.size());
+ for (const auto& Entry : LegacyDiskIndex)
{
- std::vector<CasDiskIndexEntry> LogEntries;
- LogEntries.reserve(LegacyDiskIndex.size());
-
- // We can use the block as is, just move it and add the blocks to our new log
- for (auto& Entry : LegacyDiskIndex)
- {
- const LegacyCasDiskIndexEntry& Record(Entry.second);
-
- BlockStoreLocation NewChunkLocation{WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize()};
- BlockStoreDiskLocation NewLocation(NewChunkLocation, m_PayloadAlignment);
- LogEntries.push_back(
- {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags});
- }
- std::filesystem::path BlockPath = BlockStore::GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
- CreateDirectories(BlockPath.parent_path());
- BlockFile.Close();
- std::filesystem::rename(LegacyDataPath, BlockPath);
- CasLog.Append(LogEntries);
- for (const CasDiskIndexEntry& Entry : LogEntries)
- {
- m_LocationMap.insert_or_assign(Entry.Key, Entry.Location);
- }
-
- MigratedChunkCount += LogEntries.size();
- MigratedBlockCount++;
+ const LegacyCasDiskLocation& Location = Entry.second.Location;
+ const IoHash& ChunkHash = Entry.first;
+ size_t ChunkIndex = ChunkLocations.size();
+ ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.GetOffset(), .Size = Location.GetSize()});
+ ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
+ TotalSize += Location.GetSize();
}
- else
- {
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(LegacyDiskIndex.size());
- for (const auto& Entry : LegacyDiskIndex)
- {
- ChunkHashes.push_back(Entry.first);
- }
-
- std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) {
- auto LhsKeyIt = LegacyDiskIndex.find(Lhs);
- auto RhsKeyIt = LegacyDiskIndex.find(Rhs);
- return LhsKeyIt->second.Location.GetOffset() < RhsKeyIt->second.Location.GetOffset();
- });
-
- uint64_t BlockSize = 0;
- uint64_t BlockOffset = 0;
- std::vector<BlockStoreLocation> NewLocations;
- struct BlockData
- {
- std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
- uint64_t BlockOffset;
- uint64_t BlockSize;
- uint32_t BlockIndex;
- };
-
- std::vector<BlockData> BlockRanges;
- std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
- BlockRanges.reserve(MaxRequiredBlockCount);
- for (const IoHash& ChunkHash : ChunkHashes)
- {
- const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash];
- const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location;
-
- uint64_t ChunkOffset = LegacyChunkLocation.GetOffset();
- uint64_t ChunkSize = LegacyChunkLocation.GetSize();
- uint64_t ChunkEnd = ChunkOffset + ChunkSize;
-
- if (BlockSize == 0)
- {
- BlockOffset = ChunkOffset;
- }
- if ((ChunkEnd - BlockOffset) > m_MaxBlockSize)
- {
- BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex};
- BlockRange.Chunks.swap(Chunks);
- BlockRanges.push_back(BlockRange);
-
- WriteBlockIndex++;
- while (std::filesystem::exists(BlockStore::GetBlockPath(m_BlocksBasePath, WriteBlockIndex)))
- {
- ++WriteBlockIndex;
- }
- BlockOffset = ChunkOffset;
- BlockSize = 0;
- }
- BlockSize = RoundUp(BlockSize, m_PayloadAlignment);
- BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize};
- Chunks.push_back({ChunkHash, ChunkLocation});
- BlockSize = ChunkEnd - BlockOffset;
- }
- if (BlockSize > 0)
- {
- BlockRanges.push_back(
- {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex});
- }
- Stopwatch WriteBlockTimer;
-
- std::reverse(BlockRanges.begin(), BlockRanges.end());
- std::vector<std::uint8_t> Buffer(1 << 28);
- for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx)
- {
- const BlockData& BlockRange = BlockRanges[Idx];
- if (Idx > 0)
- {
- uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize;
- uint64_t Completed = BlockOffset + BlockSize - Remaining;
- uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed;
-
- ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- Idx,
- BlockRanges.size(),
- NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize),
- NiceBytes(BlockOffset + BlockSize),
- NiceTimeSpanMs(ETA));
- }
-
- std::filesystem::path BlockPath = BlockStore::GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex);
- BlockStoreFile ChunkBlock(BlockPath);
- ChunkBlock.Create(BlockRange.BlockSize);
- uint64_t Offset = 0;
- while (Offset < BlockRange.BlockSize)
- {
- uint64_t Size = BlockRange.BlockSize - Offset;
- if (Size > Buffer.size())
- {
- Size = Buffer.size();
- }
- BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset);
- ChunkBlock.Write(Buffer.data(), Size, Offset);
- Offset += Size;
- }
- ChunkBlock.Truncate(Offset);
- ChunkBlock.Flush();
-
+ m_BlockStore.Split(
+ ChunkLocations,
+ LegacyDataPath,
+ m_BlocksBasePath,
+ m_MaxBlockSize,
+ BlockStoreDiskLocation::MaxBlockIndex + 1,
+ m_PayloadAlignment,
+ CleanSource,
+ [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount](
+ const std::vector<std::pair<size_t, BlockStoreLocation>>& MovedChunks) {
std::vector<CasDiskIndexEntry> LogEntries;
- LogEntries.reserve(BlockRange.Chunks.size());
- for (const auto& Entry : BlockRange.Chunks)
+ LogEntries.reserve(MovedChunks.size());
+ for (const auto& Entry : MovedChunks)
{
- const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first];
- BlockStoreDiskLocation Location(Entry.second, m_PayloadAlignment);
- LogEntries.push_back(
- {.Key = Entry.first, .Location = Location, .ContentType = LegacyEntry.ContentType, .Flags = LegacyEntry.Flags});
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
+ LogEntries.push_back({.Key = ChunkHash,
+ .Location = {NewLocation, m_PayloadAlignment},
+ .ContentType = OldEntry.ContentType,
+ .Flags = OldEntry.Flags});
}
- CasLog.Append(LogEntries);
for (const CasDiskIndexEntry& Entry : LogEntries)
{
m_LocationMap.insert_or_assign(Entry.Key, Entry.Location);
}
- MigratedChunkCount += LogEntries.size();
- MigratedBlockCount++;
-
+ CasLog.Append(LogEntries);
+ CasLog.Flush();
if (CleanSource)
{
std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries;
- LegacyLogEntries.reserve(BlockRange.Chunks.size());
- for (const auto& Entry : BlockRange.Chunks)
+ LegacyLogEntries.reserve(MovedChunks.size());
+ for (const auto& Entry : MovedChunks)
{
- LegacyLogEntries.push_back({.Key = Entry.first, .Flags = LegacyCasDiskIndexEntry::kTombstone});
+ size_t ChunkIndex = Entry.first;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
+ LegacyLogEntries.push_back(
+ LegacyCasDiskIndexEntry{.Key = ChunkHash,
+ .Location = OldEntry.Location,
+ .ContentType = OldEntry.ContentType,
+ .Flags = (uint8_t)(OldEntry.Flags | LegacyCasDiskIndexEntry::kTombstone)});
}
LegacyCasLog.Append(LegacyLogEntries);
- BlockFile.SetFileSize(BlockRange.BlockOffset);
+ LegacyCasLog.Flush();
}
- }
- }
+ MigratedBlockCount++;
+ MigratedChunkCount += MovedChunks.size();
+ });
- BlockFile.Close();
LegacyCasLog.Close();
CasLog.Close();