aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcachestore.cpp
diff options
context:
space:
mode:
author Dan Engelbrecht <[email protected]> 2022-05-02 10:48:57 +0200
committer Dan Engelbrecht <[email protected]> 2022-05-02 10:48:57 +0200
commit 48f2e3af59e2a06c81e37170db95e432b148e5e8 (patch)
tree c91aef099acb9b2f7555c60ad84ae9c2650db3dc /zenserver/cache/structuredcachestore.cpp
parent Move bulk of MigrateLegacyData to blockstore.cpp (diff)
download zen-48f2e3af59e2a06c81e37170db95e432b148e5e8.tar.xz
zen-48f2e3af59e2a06c81e37170db95e432b148e5e8.zip
refactor structured cache to use blockstore migrate
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
-rw-r--r-- zenserver/cache/structuredcachestore.cpp 309
1 files changed, 66 insertions, 243 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 5cebaa948..e9c051f88 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -877,35 +877,17 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource)
NiceBytes(TotalSize));
});
- uint32_t WriteBlockIndex = 0;
- while (std::filesystem::exists(BlockStore ::GetBlockPath(m_BlocksBasePath, WriteBlockIndex)))
+ uint64_t BlockFileSize = 0;
{
- ++WriteBlockIndex;
+ BasicFile BlockFile;
+ BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
+ BlockFileSize = BlockFile.FileSize();
}
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error);
- if (Error)
- {
- ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message());
- return 0;
- }
-
- if (Space.Free < MaxBlockSize)
- {
- ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}",
- m_BucketDir / m_BucketName,
- MaxBlockSize,
- NiceBytes(Space.Free));
- return 0;
- }
-
- BasicFile BlockFile;
- BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
-
std::unordered_map<IoHash, LegacyDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
uint64_t InvalidEntryCount = 0;
+ size_t BlockChunkCount = 0;
TCasLogFile<LegacyDiskIndexEntry> LegacyCasLog;
LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead);
{
@@ -942,7 +924,6 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource)
0);
std::vector<IoHash> BadEntries;
- uint64_t BlockFileSize = BlockFile.FileSize();
for (const auto& Entry : LegacyDiskIndex)
{
const LegacyDiskIndexEntry& Record(Entry.second);
@@ -952,6 +933,7 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource)
}
if (Record.Location.Offset() + Record.Location.Size() <= BlockFileSize)
{
+ BlockChunkCount++;
continue;
}
ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath);
@@ -972,7 +954,6 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource)
if (LegacyDiskIndex.empty())
{
LegacyCasLog.Close();
- BlockFile.Close();
if (CleanSource)
{
// Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find
@@ -988,250 +969,92 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource)
return 0;
}
- uint64_t BlockChunkCount = 0;
- uint64_t BlockTotalSize = 0;
- for (const auto& Entry : LegacyDiskIndex)
- {
- const LegacyDiskIndexEntry& Record(Entry.second);
- if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
- {
- continue;
- }
- BlockChunkCount++;
- BlockTotalSize += Record.Location.Size();
- }
-
- uint64_t RequiredDiskSpace = BlockTotalSize + ((m_PayloadAlignment - 1) * BlockChunkCount);
- uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, MaxBlockSize) / MaxBlockSize;
- if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex)
- {
- ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}",
- m_BucketDir / m_BucketName,
- MaxRequiredBlockCount,
- BlockStoreDiskLocation::MaxBlockIndex);
- return 0;
- }
-
- constexpr const uint64_t DiskReserve = 1ul << 28;
-
- if (CleanSource)
- {
- if (Space.Free < (MaxBlockSize + DiskReserve))
- {
- ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
- m_BucketDir / m_BucketName,
- NiceBytes(MaxBlockSize + DiskReserve),
- NiceBytes(Space.Free));
- return 0;
- }
- }
- else
- {
- if (Space.Free < (RequiredDiskSpace + DiskReserve))
- {
- ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
- m_BucketDir / m_BucketName,
- NiceBytes(RequiredDiskSpace + DiskReserve),
- NiceBytes(Space.Free));
- return 0;
- }
- }
-
std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
CreateDirectories(LogPath.parent_path());
TCasLogFile<DiskIndexEntry> CasLog;
CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
- if (CleanSource && (MaxRequiredBlockCount < 2))
- {
- std::vector<DiskIndexEntry> LogEntries;
- LogEntries.reserve(LegacyDiskIndex.size());
+ std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash;
+ std::vector<BlockStoreLocation> ChunkLocations;
+ ChunkIndexToChunkHash.reserve(BlockChunkCount);
+ ChunkLocations.reserve(BlockChunkCount);
- // We can use the block as is, just move it and add the blocks to our new log
- for (auto& Entry : LegacyDiskIndex)
- {
- const LegacyDiskIndexEntry& Record(Entry.second);
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(LegacyDiskIndex.size() - BlockChunkCount);
- DiskLocation NewLocation;
- uint8_t Flags = 0xff & (Record.Location.Flags() >> 56);
- if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
- {
- NewLocation = DiskLocation(Record.Location.Size(), Flags);
- }
- else
- {
- BlockStoreLocation NewChunkLocation(WriteBlockIndex, Record.Location.Offset(), Record.Location.Size());
- NewLocation = DiskLocation(NewChunkLocation, m_PayloadAlignment, Flags);
- }
- LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation});
- }
- std::filesystem::path BlockPath = BlockStore ::GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
- CreateDirectories(BlockPath.parent_path());
- BlockFile.Close();
- std::filesystem::rename(LegacyDataPath, BlockPath);
- CasLog.Append(LogEntries);
- for (const DiskIndexEntry& Entry : LogEntries)
+ for (const auto& Entry : LegacyDiskIndex)
+ {
+ const IoHash& ChunkHash = Entry.first;
+ const LegacyDiskLocation& Location = Entry.second.Location;
+ if (Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
{
- m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
+ uint8_t Flags = 0xff & (Location.Flags() >> 56);
+ DiskLocation NewLocation = DiskLocation(Location.Size(), Flags);
+ LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation});
+ continue;
}
-
- MigratedChunkCount += LogEntries.size();
- MigratedBlockCount++;
+ size_t ChunkIndex = ChunkLocations.size();
+ ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.Offset(), .Size = Location.Size()});
+ ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
+ TotalSize += Location.Size();
}
- else
+ for (const DiskIndexEntry& Entry : LogEntries)
{
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(LegacyDiskIndex.size());
- for (const auto& Entry : LegacyDiskIndex)
- {
- ChunkHashes.push_back(Entry.first);
- }
-
- std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) {
- auto LhsKeyIt = LegacyDiskIndex.find(Lhs);
- auto RhsKeyIt = LegacyDiskIndex.find(Rhs);
- return LhsKeyIt->second.Location.Offset() < RhsKeyIt->second.Location.Offset();
- });
-
- uint64_t BlockSize = 0;
- uint64_t BlockOffset = 0;
- std::vector<BlockStoreLocation> NewLocations;
- struct BlockData
- {
- std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
- uint64_t BlockOffset;
- uint64_t BlockSize;
- uint32_t BlockIndex;
- };
-
- std::vector<BlockData> BlockRanges;
- std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
- BlockRanges.reserve(MaxRequiredBlockCount);
- for (const IoHash& ChunkHash : ChunkHashes)
- {
- const LegacyDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash];
- const LegacyDiskLocation& LegacyChunkLocation = LegacyEntry.Location;
-
- if (LegacyChunkLocation.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
- {
- // For standalone files we just store the chunk hash an use the size from the legacy index as is
- Chunks.push_back({ChunkHash, {}});
- continue;
- }
-
- uint64_t ChunkOffset = LegacyChunkLocation.Offset();
- uint64_t ChunkSize = LegacyChunkLocation.Size();
- uint64_t ChunkEnd = ChunkOffset + ChunkSize;
-
- if (BlockSize == 0)
- {
- BlockOffset = ChunkOffset;
- }
- if ((ChunkEnd - BlockOffset) > MaxBlockSize)
- {
- BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex};
- BlockRange.Chunks.swap(Chunks);
- BlockRanges.push_back(BlockRange);
-
- WriteBlockIndex++;
- while (std::filesystem::exists(BlockStore ::GetBlockPath(m_BlocksBasePath, WriteBlockIndex)))
- {
- ++WriteBlockIndex;
- }
- BlockOffset = ChunkOffset;
- BlockSize = 0;
- }
- BlockSize = RoundUp(BlockSize, m_PayloadAlignment);
- BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize};
- Chunks.push_back({ChunkHash, ChunkLocation});
- BlockSize = ChunkEnd - BlockOffset;
- }
- if (BlockSize > 0)
- {
- BlockRanges.push_back(
- {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex});
- }
- Stopwatch WriteBlockTimer;
-
- std::reverse(BlockRanges.begin(), BlockRanges.end());
- std::vector<std::uint8_t> Buffer(1 << 28);
- for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx)
- {
- const BlockData& BlockRange = BlockRanges[Idx];
- if (Idx > 0)
- {
- uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize;
- uint64_t Completed = BlockOffset + BlockSize - Remaining;
- uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed;
-
- ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}",
- m_BucketDir / m_BucketDir,
- Idx,
- BlockRanges.size(),
- NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize),
- NiceBytes(BlockOffset + BlockSize),
- NiceTimeSpanMs(ETA));
- }
-
- std::filesystem::path BlockPath = BlockStore ::GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex);
- BlockStoreFile ChunkBlock(BlockPath);
- ChunkBlock.Create(BlockRange.BlockSize);
- uint64_t Offset = 0;
- while (Offset < BlockRange.BlockSize)
- {
- uint64_t Size = BlockRange.BlockSize - Offset;
- if (Size > Buffer.size())
- {
- Size = Buffer.size();
- }
- BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset);
- ChunkBlock.Write(Buffer.data(), Size, Offset);
- Offset += Size;
- }
- ChunkBlock.Truncate(Offset);
- ChunkBlock.Flush();
+ m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
+ }
+ CasLog.Append(LogEntries);
+ m_BlockStore.Split(
+ ChunkLocations,
+ LegacyDataPath,
+ m_BlocksBasePath,
+ MaxBlockSize,
+ BlockStoreDiskLocation::MaxBlockIndex + 1,
+ m_PayloadAlignment,
+ CleanSource,
+ [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount](
+ const std::vector<std::pair<size_t, BlockStoreLocation>>& MovedChunks) {
std::vector<DiskIndexEntry> LogEntries;
- LogEntries.reserve(BlockRange.Chunks.size());
- for (const auto& Entry : BlockRange.Chunks)
+ LogEntries.reserve(MovedChunks.size());
+ for (const auto& Entry : MovedChunks)
{
- const LegacyDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first];
-
- DiskLocation NewLocation;
- uint8_t Flags = 0xff & (LegacyEntry.Location.Flags() >> 56);
- if (LegacyEntry.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
- {
- NewLocation = DiskLocation(LegacyEntry.Location.Size(), Flags);
- }
- else
- {
- NewLocation = DiskLocation(Entry.second, m_PayloadAlignment, Flags);
- }
- LogEntries.push_back({.Key = Entry.first, .Location = NewLocation});
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
+ const LegacyDiskLocation& OldLocation = OldEntry.Location;
+ uint8_t Flags = 0xff & (OldLocation.Flags() >> 56);
+ LogEntries.push_back({.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, Flags)});
}
- CasLog.Append(LogEntries);
for (const DiskIndexEntry& Entry : LogEntries)
{
m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
}
- MigratedChunkCount += LogEntries.size();
- MigratedBlockCount++;
-
+ CasLog.Append(LogEntries);
+ CasLog.Flush();
if (CleanSource)
{
std::vector<LegacyDiskIndexEntry> LegacyLogEntries;
- LegacyLogEntries.reserve(BlockRange.Chunks.size());
- for (const auto& Entry : BlockRange.Chunks)
+ LegacyLogEntries.reserve(MovedChunks.size());
+ for (const auto& Entry : MovedChunks)
{
- LegacyLogEntries.push_back(
- {.Key = Entry.first, .Location = LegacyDiskLocation(0, 0, 0, LegacyDiskLocation::kTombStone)});
+ size_t ChunkIndex = Entry.first;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
+ const LegacyDiskLocation& OldLocation = OldEntry.Location;
+ LegacyDiskLocation NewLocation(OldLocation.Offset(),
+ OldLocation.Size(),
+ 0,
+ OldLocation.Flags() | LegacyDiskLocation::kTombStone);
+ LegacyLogEntries.push_back(LegacyDiskIndexEntry(ChunkHash, NewLocation));
}
LegacyCasLog.Append(LegacyLogEntries);
- BlockFile.SetFileSize(BlockRange.BlockOffset);
+ LegacyCasLog.Flush();
}
- }
- }
- BlockFile.Close();
+ MigratedBlockCount++;
+ MigratedChunkCount += MovedChunks.size();
+ });
+
LegacyCasLog.Close();
CasLog.Close();