aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-05-02 10:18:31 +0200
committerDan Engelbrecht <[email protected]>2022-05-02 10:18:31 +0200
commitc89190f7fabf8a08cda2255937dc99ca35972210 (patch)
treef67248118b6dc47f5f3665ba09f7745bd69b0f5a
parentcleanup (diff)
downloadzen-c89190f7fabf8a08cda2255937dc99ca35972210.tar.xz
zen-c89190f7fabf8a08cda2255937dc99ca35972210.zip
Move bulk of MigrateLegacyData to blockstore.cpp
-rw-r--r--zenserver/cache/structuredcachestore.cpp1
-rw-r--r--zenstore/blockstore.cpp219
-rw-r--r--zenstore/compactcas.cpp268
-rw-r--r--zenstore/include/zenstore/blockstore.h23
4 files changed, 281 insertions, 230 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 9ae5b0f17..5cebaa948 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -1836,7 +1836,6 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
m_PayloadAlignment,
false,
[this, &DeletedChunks, &ChunkIndexToChunkHash, &Index, &ReadBlockTimeUs, &ReadBlockLongestTimeUs](
- uint32_t,
const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks,
const std::vector<size_t>& RemovedChunks) {
std::vector<DiskIndexEntry> LogEntries;
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp
index 9961e734d..6f5578be8 100644
--- a/zenstore/blockstore.cpp
+++ b/zenstore/blockstore.cpp
@@ -437,7 +437,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
{
DeletedSize += ChunkLocations[DeleteIndex].Size;
}
- Callback(BlockIndex, {}, DeleteMap);
+ Callback({}, DeleteMap);
DeletedCount += DeleteMap.size();
{
RwLock::ExclusiveLockScope _i(m_InsertLock);
@@ -477,7 +477,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
NewBlockFile->Flush();
}
{
- Callback(0xfffffffful, MovedChunks, {});
+ Callback(MovedChunks, {});
MovedCount += KeepMap.size();
MovedChunks.clear();
RwLock::ExclusiveLockScope __(m_InsertLock);
@@ -558,7 +558,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
DeletedSize += ChunkLocations[DeleteIndex].Size;
}
- Callback(BlockIndex, MovedChunks, DeleteMap);
+ Callback(MovedChunks, DeleteMap);
MovedCount += KeepMap.size();
DeletedCount += DeleteMap.size();
MovedChunks.clear();
@@ -599,7 +599,7 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
IoBuffer ReadBuffer{WindowSize};
void* BufferBase = ReadBuffer.MutableData();
- RwLock::SharedLockScope _(m_InsertLock); // TODO: Refactor so we don't have to keep m_InsertLock all the time?
+ RwLock::SharedLockScope _(m_InsertLock);
for (const auto& Block : m_ChunkBlocks)
{
@@ -659,6 +659,217 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
}
}
+bool
+BlockStore::Split(const std::vector<BlockStoreLocation>& ChunkLocations,
+ const std::filesystem::path& SourceBlockFilePath,
+ const std::filesystem::path& BlocksBasePath,
+ uint64_t MaxBlockSize,
+ uint64_t MaxBlockCount,
+ size_t PayloadAlignment,
+ bool CleanSource,
+ const SplitCallback& Callback)
+{
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(BlocksBasePath.parent_path(), Error);
+ if (Error)
+ {
+ ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", BlocksBasePath, Error.message());
+ return false;
+ }
+
+ if (Space.Free < MaxBlockSize)
+ {
+ ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}",
+ BlocksBasePath,
+ MaxBlockSize,
+ NiceBytes(Space.Free));
+ return false;
+ }
+
+ size_t TotalSize = 0;
+ for (const BlockStoreLocation& Location : ChunkLocations)
+ {
+ TotalSize += Location.Size;
+ }
+ size_t ChunkCount = ChunkLocations.size();
+ uint64_t RequiredDiskSpace = TotalSize + ((PayloadAlignment - 1) * ChunkCount);
+ uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, MaxBlockSize) / MaxBlockSize;
+ if (MaxRequiredBlockCount > MaxBlockCount)
+ {
+ ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}",
+ BlocksBasePath,
+ MaxRequiredBlockCount,
+ MaxBlockCount);
+ return false;
+ }
+
+ constexpr const uint64_t DiskReserve = 1ul << 28;
+
+ if (CleanSource)
+ {
+ if (Space.Free < (MaxBlockSize + DiskReserve))
+ {
+ ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
+ BlocksBasePath,
+ NiceBytes(MaxBlockSize + DiskReserve),
+ NiceBytes(Space.Free));
+ return false;
+ }
+ }
+ else
+ {
+ if (Space.Free < (RequiredDiskSpace + DiskReserve))
+ {
+ ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
+ BlocksBasePath,
+ NiceBytes(RequiredDiskSpace + DiskReserve),
+ NiceBytes(Space.Free));
+ return false;
+ }
+ }
+
+ uint32_t WriteBlockIndex = 0;
+ while (std::filesystem::exists(BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex)))
+ {
+ ++WriteBlockIndex;
+ }
+
+ BasicFile BlockFile;
+ BlockFile.Open(SourceBlockFilePath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
+
+ if (CleanSource && (MaxRequiredBlockCount < 2))
+ {
+ std::vector<std::pair<size_t, BlockStoreLocation>> Chunks;
+ Chunks.reserve(ChunkCount);
+ for (size_t Index = 0; Index < ChunkCount; ++Index)
+ {
+ const BlockStoreLocation& ChunkLocation = ChunkLocations[Index];
+ Chunks.push_back({Index, {.BlockIndex = WriteBlockIndex, .Offset = ChunkLocation.Offset, .Size = ChunkLocation.Size}});
+ }
+ std::filesystem::path BlockPath = BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex);
+ CreateDirectories(BlockPath.parent_path());
+ BlockFile.Close();
+ std::filesystem::rename(SourceBlockFilePath, BlockPath);
+ Callback(Chunks);
+ return true;
+ }
+
+ std::vector<size_t> ChunkIndexes;
+ ChunkIndexes.reserve(ChunkCount);
+ for (size_t Index = 0; Index < ChunkCount; ++Index)
+ {
+ ChunkIndexes.push_back(Index);
+ }
+
+ std::sort(begin(ChunkIndexes), end(ChunkIndexes), [&ChunkLocations](size_t Lhs, size_t Rhs) {
+ const BlockStoreLocation& LhsLocation = ChunkLocations[Lhs];
+ const BlockStoreLocation& RhsLocation = ChunkLocations[Rhs];
+ return LhsLocation.Offset < RhsLocation.Offset;
+ });
+
+ uint64_t BlockSize = 0;
+ uint64_t BlockOffset = 0;
+ std::vector<BlockStoreLocation> NewLocations;
+ struct BlockData
+ {
+ std::vector<std::pair<size_t, BlockStoreLocation>> Chunks;
+ uint64_t BlockOffset;
+ uint64_t BlockSize;
+ uint32_t BlockIndex;
+ };
+
+ std::vector<BlockData> BlockRanges;
+ std::vector<std::pair<size_t, BlockStoreLocation>> Chunks;
+ BlockRanges.reserve(MaxRequiredBlockCount);
+ for (const size_t& ChunkIndex : ChunkIndexes)
+ {
+ const BlockStoreLocation& LegacyChunkLocation = ChunkLocations[ChunkIndex];
+
+ uint64_t ChunkOffset = LegacyChunkLocation.Offset;
+ uint64_t ChunkSize = LegacyChunkLocation.Size;
+ uint64_t ChunkEnd = ChunkOffset + ChunkSize;
+
+ if (BlockSize == 0)
+ {
+ BlockOffset = ChunkOffset;
+ }
+ if ((ChunkEnd - BlockOffset) > MaxBlockSize)
+ {
+ BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex};
+ BlockRange.Chunks.swap(Chunks);
+ BlockRanges.push_back(BlockRange);
+
+ WriteBlockIndex++;
+ while (std::filesystem::exists(BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex)))
+ {
+ ++WriteBlockIndex;
+ }
+ BlockOffset = ChunkOffset;
+ BlockSize = 0;
+ }
+ BlockSize = RoundUp(BlockSize, PayloadAlignment);
+ BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize};
+ Chunks.push_back({ChunkIndex, ChunkLocation});
+ BlockSize = ChunkEnd - BlockOffset;
+ }
+ if (BlockSize > 0)
+ {
+ BlockRanges.push_back(
+ {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex});
+ }
+
+ Stopwatch WriteBlockTimer;
+
+ std::reverse(BlockRanges.begin(), BlockRanges.end());
+ std::vector<std::uint8_t> Buffer(1 << 28);
+ for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx)
+ {
+ const BlockData& BlockRange = BlockRanges[Idx];
+ if (Idx > 0)
+ {
+ uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize;
+ uint64_t Completed = BlockOffset + BlockSize - Remaining;
+ uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed;
+
+ ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}",
+ BlocksBasePath,
+ Idx,
+ BlockRanges.size(),
+ NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize),
+ NiceBytes(BlockOffset + BlockSize),
+ NiceTimeSpanMs(ETA));
+ }
+
+ std::filesystem::path BlockPath = BlockStore::GetBlockPath(BlocksBasePath, BlockRange.BlockIndex);
+ BlockStoreFile ChunkBlock(BlockPath);
+ ChunkBlock.Create(BlockRange.BlockSize);
+ uint64_t Offset = 0;
+ while (Offset < BlockRange.BlockSize)
+ {
+ uint64_t Size = BlockRange.BlockSize - Offset;
+ if (Size > Buffer.size())
+ {
+ Size = Buffer.size();
+ }
+ BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset);
+ ChunkBlock.Write(Buffer.data(), Size, Offset);
+ Offset += Size;
+ }
+ ChunkBlock.Truncate(Offset);
+ ChunkBlock.Flush();
+
+ Callback(BlockRange.Chunks);
+
+ if (CleanSource)
+ {
+ BlockFile.SetFileSize(BlockRange.BlockOffset);
+ }
+ }
+ BlockFile.Close();
+
+ return true;
+}
+
const char*
BlockStore::GetBlockFileExtension()
{
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index a79928fba..8d90ba186 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -493,7 +493,6 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
m_PayloadAlignment,
false,
[this, &DeletedChunks, &ChunkIndexToChunkHash, &LocationMap, &ReadBlockTimeUs, &ReadBlockLongestTimeUs](
- uint32_t,
const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks,
const std::vector<size_t>& RemovedChunks) {
std::vector<CasDiskIndexEntry> LogEntries;
@@ -753,32 +752,13 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
NiceBytes(TotalSize));
});
- uint32_t WriteBlockIndex = 0;
- while (std::filesystem::exists(BlockStore::GetBlockPath(m_BlocksBasePath, WriteBlockIndex)))
+ uint64_t BlockFileSize = 0;
{
- ++WriteBlockIndex;
+ BasicFile BlockFile;
+ BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
+ BlockFileSize = BlockFile.FileSize();
}
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error);
- if (Error)
- {
- ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", m_Config.RootDirectory, Error.message());
- return 0;
- }
-
- if (Space.Free < m_MaxBlockSize)
- {
- ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- m_MaxBlockSize,
- NiceBytes(Space.Free));
- return 0;
- }
-
- BasicFile BlockFile;
- BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
-
std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
uint64_t InvalidEntryCount = 0;
@@ -814,7 +794,6 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
0);
std::vector<IoHash> BadEntries;
- uint64_t BlockFileSize = BlockFile.FileSize();
for (const auto& Entry : LegacyDiskIndex)
{
const LegacyCasDiskIndexEntry& Record(Entry.second);
@@ -840,7 +819,6 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
if (LegacyDiskIndex.empty())
{
- BlockFile.Close();
LegacyCasLog.Close();
if (CleanSource)
{
@@ -859,219 +837,75 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
return 0;
}
- for (const auto& Entry : LegacyDiskIndex)
- {
- const LegacyCasDiskIndexEntry& Record(Entry.second);
- TotalSize += Record.Location.GetSize();
- }
-
- uint64_t RequiredDiskSpace = TotalSize + ((m_PayloadAlignment - 1) * LegacyDiskIndex.size());
- uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, m_MaxBlockSize) / m_MaxBlockSize;
- if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex)
- {
- ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- MaxRequiredBlockCount,
- BlockStoreDiskLocation::MaxBlockIndex);
- return 0;
- }
-
- constexpr const uint64_t DiskReserve = 1ul << 28;
-
- if (CleanSource)
- {
- if (Space.Free < (m_MaxBlockSize + DiskReserve))
- {
- ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
- m_Config.RootDirectory / m_ContainerBaseName,
- NiceBytes(m_MaxBlockSize + DiskReserve),
- NiceBytes(Space.Free));
- return 0;
- }
- }
- else
- {
- if (Space.Free < (RequiredDiskSpace + DiskReserve))
- {
- ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
- m_Config.RootDirectory / m_ContainerBaseName,
- NiceBytes(RequiredDiskSpace + DiskReserve),
- NiceBytes(Space.Free));
- return 0;
- }
- }
-
std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
CreateDirectories(LogPath.parent_path());
TCasLogFile<CasDiskIndexEntry> CasLog;
CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
- if (CleanSource && (MaxRequiredBlockCount < 2))
+ std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash;
+ std::vector<BlockStoreLocation> ChunkLocations;
+ ChunkIndexToChunkHash.reserve(LegacyDiskIndex.size());
+ ChunkLocations.reserve(LegacyDiskIndex.size());
+ for (const auto& Entry : LegacyDiskIndex)
{
- std::vector<CasDiskIndexEntry> LogEntries;
- LogEntries.reserve(LegacyDiskIndex.size());
-
- // We can use the block as is, just move it and add the blocks to our new log
- for (auto& Entry : LegacyDiskIndex)
- {
- const LegacyCasDiskIndexEntry& Record(Entry.second);
-
- BlockStoreLocation NewChunkLocation{WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize()};
- BlockStoreDiskLocation NewLocation(NewChunkLocation, m_PayloadAlignment);
- LogEntries.push_back(
- {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags});
- }
- std::filesystem::path BlockPath = BlockStore::GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
- CreateDirectories(BlockPath.parent_path());
- BlockFile.Close();
- std::filesystem::rename(LegacyDataPath, BlockPath);
- CasLog.Append(LogEntries);
- for (const CasDiskIndexEntry& Entry : LogEntries)
- {
- m_LocationMap.insert_or_assign(Entry.Key, Entry.Location);
- }
-
- MigratedChunkCount += LogEntries.size();
- MigratedBlockCount++;
+ const LegacyCasDiskLocation& Location = Entry.second.Location;
+ const IoHash& ChunkHash = Entry.first;
+ size_t ChunkIndex = ChunkLocations.size();
+ ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.GetOffset(), .Size = Location.GetSize()});
+ ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
+ TotalSize += Location.GetSize();
}
- else
- {
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(LegacyDiskIndex.size());
- for (const auto& Entry : LegacyDiskIndex)
- {
- ChunkHashes.push_back(Entry.first);
- }
-
- std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) {
- auto LhsKeyIt = LegacyDiskIndex.find(Lhs);
- auto RhsKeyIt = LegacyDiskIndex.find(Rhs);
- return LhsKeyIt->second.Location.GetOffset() < RhsKeyIt->second.Location.GetOffset();
- });
-
- uint64_t BlockSize = 0;
- uint64_t BlockOffset = 0;
- std::vector<BlockStoreLocation> NewLocations;
- struct BlockData
- {
- std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
- uint64_t BlockOffset;
- uint64_t BlockSize;
- uint32_t BlockIndex;
- };
-
- std::vector<BlockData> BlockRanges;
- std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
- BlockRanges.reserve(MaxRequiredBlockCount);
- for (const IoHash& ChunkHash : ChunkHashes)
- {
- const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash];
- const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location;
-
- uint64_t ChunkOffset = LegacyChunkLocation.GetOffset();
- uint64_t ChunkSize = LegacyChunkLocation.GetSize();
- uint64_t ChunkEnd = ChunkOffset + ChunkSize;
-
- if (BlockSize == 0)
- {
- BlockOffset = ChunkOffset;
- }
- if ((ChunkEnd - BlockOffset) > m_MaxBlockSize)
- {
- BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex};
- BlockRange.Chunks.swap(Chunks);
- BlockRanges.push_back(BlockRange);
-
- WriteBlockIndex++;
- while (std::filesystem::exists(BlockStore::GetBlockPath(m_BlocksBasePath, WriteBlockIndex)))
- {
- ++WriteBlockIndex;
- }
- BlockOffset = ChunkOffset;
- BlockSize = 0;
- }
- BlockSize = RoundUp(BlockSize, m_PayloadAlignment);
- BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize};
- Chunks.push_back({ChunkHash, ChunkLocation});
- BlockSize = ChunkEnd - BlockOffset;
- }
- if (BlockSize > 0)
- {
- BlockRanges.push_back(
- {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex});
- }
- Stopwatch WriteBlockTimer;
-
- std::reverse(BlockRanges.begin(), BlockRanges.end());
- std::vector<std::uint8_t> Buffer(1 << 28);
- for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx)
- {
- const BlockData& BlockRange = BlockRanges[Idx];
- if (Idx > 0)
- {
- uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize;
- uint64_t Completed = BlockOffset + BlockSize - Remaining;
- uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed;
-
- ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- Idx,
- BlockRanges.size(),
- NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize),
- NiceBytes(BlockOffset + BlockSize),
- NiceTimeSpanMs(ETA));
- }
-
- std::filesystem::path BlockPath = BlockStore::GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex);
- BlockStoreFile ChunkBlock(BlockPath);
- ChunkBlock.Create(BlockRange.BlockSize);
- uint64_t Offset = 0;
- while (Offset < BlockRange.BlockSize)
- {
- uint64_t Size = BlockRange.BlockSize - Offset;
- if (Size > Buffer.size())
- {
- Size = Buffer.size();
- }
- BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset);
- ChunkBlock.Write(Buffer.data(), Size, Offset);
- Offset += Size;
- }
- ChunkBlock.Truncate(Offset);
- ChunkBlock.Flush();
-
+ m_BlockStore.Split(
+ ChunkLocations,
+ LegacyDataPath,
+ m_BlocksBasePath,
+ m_MaxBlockSize,
+ BlockStoreDiskLocation::MaxBlockIndex + 1,
+ m_PayloadAlignment,
+ CleanSource,
+ [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount](
+ const std::vector<std::pair<size_t, BlockStoreLocation>>& MovedChunks) {
std::vector<CasDiskIndexEntry> LogEntries;
- LogEntries.reserve(BlockRange.Chunks.size());
- for (const auto& Entry : BlockRange.Chunks)
+ LogEntries.reserve(MovedChunks.size());
+ for (const auto& Entry : MovedChunks)
{
- const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first];
- BlockStoreDiskLocation Location(Entry.second, m_PayloadAlignment);
- LogEntries.push_back(
- {.Key = Entry.first, .Location = Location, .ContentType = LegacyEntry.ContentType, .Flags = LegacyEntry.Flags});
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
+ LogEntries.push_back({.Key = ChunkHash,
+ .Location = {NewLocation, m_PayloadAlignment},
+ .ContentType = OldEntry.ContentType,
+ .Flags = OldEntry.Flags});
}
- CasLog.Append(LogEntries);
for (const CasDiskIndexEntry& Entry : LogEntries)
{
m_LocationMap.insert_or_assign(Entry.Key, Entry.Location);
}
- MigratedChunkCount += LogEntries.size();
- MigratedBlockCount++;
-
+ CasLog.Append(LogEntries);
+ CasLog.Flush();
if (CleanSource)
{
std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries;
- LegacyLogEntries.reserve(BlockRange.Chunks.size());
- for (const auto& Entry : BlockRange.Chunks)
+ LegacyLogEntries.reserve(MovedChunks.size());
+ for (const auto& Entry : MovedChunks)
{
- LegacyLogEntries.push_back({.Key = Entry.first, .Flags = LegacyCasDiskIndexEntry::kTombstone});
+ size_t ChunkIndex = Entry.first;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
+ LegacyLogEntries.push_back(
+ LegacyCasDiskIndexEntry{.Key = ChunkHash,
+ .Location = OldEntry.Location,
+ .ContentType = OldEntry.ContentType,
+ .Flags = (uint8_t)(OldEntry.Flags | LegacyCasDiskIndexEntry::kTombstone)});
}
LegacyCasLog.Append(LegacyLogEntries);
- BlockFile.SetFileSize(BlockRange.BlockOffset);
+ LegacyCasLog.Flush();
}
- }
- }
+ MigratedBlockCount++;
+ MigratedChunkCount += MovedChunks.size();
+ });
- BlockFile.Close();
LegacyCasLog.Close();
CasLog.Close();
diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h
index 21c02d389..0cef7600f 100644
--- a/zenstore/include/zenstore/blockstore.h
+++ b/zenstore/include/zenstore/blockstore.h
@@ -117,14 +117,14 @@ public:
std::unordered_set<uint32_t> ExcludeBlockIndexes;
size_t BlockCount;
};
- typedef std::function<void(uint32_t BlockIndex,
- const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks,
- const std::vector<size_t>& RemovedChunks)>
+ typedef std::function<void(const std::unordered_map<size_t, BlockStoreLocation>& MovedChunks, const std::vector<size_t>& RemovedChunks)>
ReclaimCallback;
typedef std::function<void(const BlockStoreLocation& Location)> WriteChunkCallback;
typedef std::function<void(size_t ChunkIndex, const void* Data, uint64_t Size)> IterateChunksSmallSizeCallback;
typedef std::function<void(size_t ChunkIndex, BasicFile& BlockFile, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback;
+ typedef std::function<void(const std::vector<std::pair<size_t, BlockStoreLocation>>& MovedChunks)> SplitCallback;
+
void Initialize(const std::filesystem::path& BlocksBasePath,
uint64_t MaxBlockSize,
uint64_t MaxBlockCount,
@@ -144,11 +144,18 @@ public:
const std::vector<size_t>& KeepChunkIndexes,
uint64_t PayloadAlignment,
bool DryRun,
- const ReclaimCallback& Callback = [](uint32_t, const std::unordered_map<size_t, BlockStoreLocation>&, const std::vector<size_t>&) {
- });
- void IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
- IterateChunksSmallSizeCallback SmallSizeCallback,
- IterateChunksLargeSizeCallback LargeSizeCallback);
+ const ReclaimCallback& Callback = [](const std::unordered_map<size_t, BlockStoreLocation>&, const std::vector<size_t>&) {});
+ void IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
+ IterateChunksSmallSizeCallback SmallSizeCallback,
+ IterateChunksLargeSizeCallback LargeSizeCallback);
+ static bool Split(const std::vector<BlockStoreLocation>& ChunkLocations,
+ const std::filesystem::path& SourceBlockFilePath,
+ const std::filesystem::path& BlocksBasePath,
+ uint64_t MaxBlockSize,
+ uint64_t MaxBlockCount,
+ size_t PayloadAlignment,
+ bool CleanSource,
+ const SplitCallback& Callback);
static const char* GetBlockFileExtension();
static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex);