aboutsummaryrefslogtreecommitdiff
path: root/zenstore/compactcas.cpp
diff options
context:
space:
mode:
author: Dan Engelbrecht <[email protected]> 2022-04-04 14:16:27 +0200
committer: Dan Engelbrecht <[email protected]> 2022-04-04 16:57:34 +0200
commit: 5967b39ae928539346990009638dee1af5ab15fa (patch)
tree: 6018423b2cb2290bdb47cd85104a7f460b47cb26 /zenstore/compactcas.cpp
parent: remove GetTempLogPath (diff)
download: zen-5967b39ae928539346990009638dee1af5ab15fa.tar.xz
zen-5967b39ae928539346990009638dee1af5ab15fa.zip
Make index/log reading non-static member functions
Diffstat (limited to 'zenstore/compactcas.cpp')
-rw-r--r-- zenstore/compactcas.cpp 954
1 files changed, 469 insertions, 485 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 8078bc56f..174f59ac5 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -166,49 +166,6 @@ namespace {
uint8_t Flags = 0;
};
- std::vector<CasDiskIndexEntry> ReadIndexFile(const std::filesystem::path& RootDirectory,
- const std::string& ContainerBaseName,
- uint64_t& InOutPayloadAlignment,
- uint64_t& OutLogPosition)
- {
- std::vector<CasDiskIndexEntry> Entries;
- std::filesystem::path SidxPath = GetIndexPath(RootDirectory, ContainerBaseName);
- if (std::filesystem::is_regular_file(SidxPath))
- {
- Stopwatch Timer;
- const auto _ = MakeGuard([RootDirectory, ContainerBaseName, &Entries, &Timer] {
- ZEN_INFO("read store '{}' index containing #{} entries in {}",
- RootDirectory / ContainerBaseName,
- Entries.size(),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(SidxPath, BasicFile::Mode::kRead);
- uint64_t Size = ObjectIndexFile.FileSize();
- if (Size >= sizeof(CasDiskIndexHeader))
- {
- uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry);
- CasDiskIndexHeader Header;
- ObjectIndexFile.Read(&Header, sizeof(Header), 0);
- if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) &&
- (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) &&
- (Header.EntryCount <= ExpectedEntryCount))
- {
- Entries.resize(Header.EntryCount);
- ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
- InOutPayloadAlignment = Header.PayloadAlignment;
- OutLogPosition = Header.LogPosition;
- }
- else
- {
- ZEN_WARN("skipping invalid index file '{}'", SidxPath);
- }
- }
- }
- return Entries;
- }
-
bool ValidateLegacyEntry(const LegacyCasDiskIndexEntry& Entry, std::string& OutReason)
{
if (Entry.Key == IoHash::Zero)
@@ -271,377 +228,6 @@ namespace {
return true;
}
- std::vector<CasDiskIndexEntry> ReadLog(const std::filesystem::path& RootDirectory,
- const std::string& ContainerBaseName,
- uint64_t SkipEntryCount)
- {
- std::vector<CasDiskIndexEntry> Entries;
- std::filesystem::path SlogPath = GetLogPath(RootDirectory, ContainerBaseName);
- if (std::filesystem::is_regular_file(SlogPath))
- {
- Stopwatch Timer;
- const auto _ = MakeGuard([RootDirectory, ContainerBaseName, &Entries, &Timer] {
- ZEN_INFO("read store '{}' log containing #{} entries in {}",
- RootDirectory / ContainerBaseName,
- Entries.size(),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- TCasLogFile<CasDiskIndexEntry> CasLog;
- CasLog.Open(SlogPath, CasLogFile::Mode::kRead);
- if (CasLog.Initialize())
- {
- uint64_t EntryCount = CasLog.GetLogCount();
- if (EntryCount < SkipEntryCount)
- {
- ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", SlogPath);
- SkipEntryCount = 0;
- }
- Entries.reserve(EntryCount - SkipEntryCount);
- CasLog.Replay([&](const CasDiskIndexEntry& Record) { Entries.push_back(Record); }, SkipEntryCount);
- }
- }
- return Entries;
- }
-
- std::vector<CasDiskIndexEntry> MigrateLegacyData(const std::filesystem::path& RootPath,
- const std::string& ContainerBaseName,
- uint64_t MaxBlockSize,
- uint64_t PayloadAlignment,
- bool CleanSource,
- const std::unordered_set<IoHash, IoHash::Hasher>& ExistingChunks)
- {
- ZEN_INFO("migrating store '{}'", RootPath / ContainerBaseName);
-
- std::filesystem::path BlocksBasePath = GetBlocksBasePath(RootPath, ContainerBaseName);
- std::filesystem::path LegacyLogPath = GetLegacyLogPath(RootPath, ContainerBaseName);
- std::filesystem::path LegacySobsPath = GetLegacyUcasPath(RootPath, ContainerBaseName);
- std::filesystem::path LegacySidxPath = GetLegacyUidxPath(RootPath, ContainerBaseName);
-
- uint64_t MigratedChunkCount = 0;
- uint32_t MigratedBlockCount = 0;
- Stopwatch MigrationTimer;
- uint64_t TotalSize = 0;
- const auto _ = MakeGuard([RootPath, ContainerBaseName, &MigrationTimer, &MigratedChunkCount, &MigratedBlockCount, &TotalSize] {
- ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})",
- RootPath / ContainerBaseName,
- MigratedChunkCount,
- MigratedBlockCount,
- NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()),
- NiceBytes(TotalSize));
- });
-
- std::vector<CasDiskIndexEntry> Result;
-
- uint32_t WriteBlockIndex = 0;
- while (std::filesystem::exists(GetBlockPath(BlocksBasePath, WriteBlockIndex)))
- {
- ++WriteBlockIndex;
- }
-
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(RootPath, Error);
- if (Error)
- {
- ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", ContainerBaseName, Error.message());
- return Result;
- }
-
- if (Space.Free < MaxBlockSize)
- {
- ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}",
- RootPath / ContainerBaseName,
- MaxBlockSize,
- NiceBytes(Space.Free));
- return Result;
- }
-
- BasicFile BlockFile;
- BlockFile.Open(LegacySobsPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
-
- std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
-
- TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog;
- LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead);
- {
- Stopwatch Timer;
- const auto __ = MakeGuard([RootPath, ContainerBaseName, &LegacyDiskIndex, &Timer] {
- ZEN_INFO("read store '{}' legacy index containing #{} entries in {}",
- RootPath / ContainerBaseName,
- LegacyDiskIndex.size(),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
- if (LegacyCasLog.Initialize())
- {
- LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount());
- LegacyCasLog.Replay(
- [&](const LegacyCasDiskIndexEntry& Record) {
- std::string InvalidEntryReason;
- if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone)
- {
- LegacyDiskIndex.erase(Record.Key);
- return;
- }
- if (!ValidateLegacyEntry(Record, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason);
- return;
- }
- if (ExistingChunks.contains(Record.Key))
- {
- return;
- }
- LegacyDiskIndex[Record.Key] = Record;
- },
- 0);
- }
- }
-
- if (LegacyDiskIndex.empty())
- {
- BlockFile.Close();
- LegacyCasLog.Close();
- if (CleanSource)
- {
- // Older versions of CasContainerStrategy expects the legacy files to exist if it can find
- // a CAS manifest and crashes on startup if they don't.
- // In order to not break startup when switching back an older version, lets just reset
- // the legacy data files to zero length.
-
- BasicFile LegacyLog;
- LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
- BasicFile LegacySobs;
- LegacySobs.Open(LegacySobsPath, BasicFile::Mode::kTruncate);
- BasicFile LegacySidx;
- LegacySidx.Open(LegacySidxPath, BasicFile::Mode::kTruncate);
- }
- return Result;
- }
-
- for (const auto& Entry : LegacyDiskIndex)
- {
- const LegacyCasDiskIndexEntry& Record(Entry.second);
- TotalSize += Record.Location.GetSize();
- }
-
- uint64_t RequiredDiskSpace = TotalSize + ((PayloadAlignment - 1) * LegacyDiskIndex.size());
- uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, MaxBlockSize) / MaxBlockSize;
- if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex)
- {
- ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}",
- RootPath / ContainerBaseName,
- MaxRequiredBlockCount,
- BlockStoreDiskLocation::MaxBlockIndex);
- return Result;
- }
-
- constexpr const uint64_t DiskReserve = 1ul << 28;
-
- if (CleanSource)
- {
- if (Space.Free < (MaxBlockSize + DiskReserve))
- {
- ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
- RootPath / ContainerBaseName,
- NiceBytes(MaxBlockSize + DiskReserve),
- NiceBytes(Space.Free));
- return Result;
- }
- }
- else
- {
- if (Space.Free < (RequiredDiskSpace + DiskReserve))
- {
- ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
- RootPath / ContainerBaseName,
- NiceBytes(RequiredDiskSpace + DiskReserve),
- NiceBytes(Space.Free));
- return Result;
- }
- }
-
- std::filesystem::path SlogPath = GetLogPath(RootPath, ContainerBaseName);
- CreateDirectories(SlogPath.parent_path());
- TCasLogFile<CasDiskIndexEntry> CasLog;
- CasLog.Open(SlogPath, CasLogFile::Mode::kWrite);
-
- if (CleanSource && (MaxRequiredBlockCount < 2))
- {
- Result.reserve(LegacyDiskIndex.size());
-
- // We can use the block as is, just move it and add the blocks to our new log
- for (auto& Entry : LegacyDiskIndex)
- {
- const LegacyCasDiskIndexEntry& Record(Entry.second);
-
- BlockStoreLocation NewChunkLocation(WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize());
- BlockStoreDiskLocation NewLocation(NewChunkLocation, PayloadAlignment);
- Result.push_back(
- {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags});
- }
- std::filesystem::path BlockPath = GetBlockPath(BlocksBasePath, WriteBlockIndex);
- CreateDirectories(BlockPath.parent_path());
- BlockFile.Close();
- std::filesystem::rename(LegacySobsPath, BlockPath);
- CasLog.Append(Result);
- MigratedChunkCount += Result.size();
- MigratedBlockCount++;
- }
- else
- {
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(LegacyDiskIndex.size());
- for (const auto& Entry : LegacyDiskIndex)
- {
- ChunkHashes.push_back(Entry.first);
- }
-
- std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) {
- auto LhsKeyIt = LegacyDiskIndex.find(Lhs);
- auto RhsKeyIt = LegacyDiskIndex.find(Rhs);
- return LhsKeyIt->second.Location.GetOffset() < RhsKeyIt->second.Location.GetOffset();
- });
-
- uint64_t BlockSize = 0;
- uint64_t BlockOffset = 0;
- std::vector<BlockStoreLocation> NewLocations;
- struct BlockData
- {
- std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
- uint64_t BlockOffset;
- uint64_t BlockSize;
- uint32_t BlockIndex;
- };
-
- std::vector<BlockData> BlockRanges;
- std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
- BlockRanges.reserve(MaxRequiredBlockCount);
- for (const IoHash& ChunkHash : ChunkHashes)
- {
- const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash];
- const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location;
-
- uint64_t ChunkOffset = LegacyChunkLocation.GetOffset();
- uint64_t ChunkSize = LegacyChunkLocation.GetSize();
- uint64_t ChunkEnd = ChunkOffset + ChunkSize;
-
- if (BlockSize == 0)
- {
- BlockOffset = ChunkOffset;
- }
- if ((ChunkEnd - BlockOffset) > MaxBlockSize)
- {
- BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex};
- BlockRange.Chunks.swap(Chunks);
- BlockRanges.push_back(BlockRange);
-
- WriteBlockIndex++;
- while (std::filesystem::exists(GetBlockPath(BlocksBasePath, WriteBlockIndex)))
- {
- ++WriteBlockIndex;
- }
- BlockOffset = ChunkOffset;
- BlockSize = 0;
- }
- BlockSize = RoundUp(BlockSize, PayloadAlignment);
- BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize};
- Chunks.push_back({ChunkHash, ChunkLocation});
- BlockSize = ChunkEnd - BlockOffset;
- }
- if (BlockSize > 0)
- {
- BlockRanges.push_back(
- {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex});
- }
- Stopwatch WriteBlockTimer;
-
- std::reverse(BlockRanges.begin(), BlockRanges.end());
- std::vector<std::uint8_t> Buffer(1 << 28);
- for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx)
- {
- const BlockData& BlockRange = BlockRanges[Idx];
- if (Idx > 0)
- {
- uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize;
- uint64_t Completed = BlockOffset + BlockSize - Remaining;
- uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed;
-
- ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}",
- RootPath / ContainerBaseName,
- Idx,
- BlockRanges.size(),
- NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize),
- NiceBytes(BlockOffset + BlockSize),
- NiceTimeSpanMs(ETA));
- }
-
- std::filesystem::path BlockPath = GetBlockPath(BlocksBasePath, BlockRange.BlockIndex);
- BlockStoreFile ChunkBlock(BlockPath);
- ChunkBlock.Create(BlockRange.BlockSize);
- uint64_t Offset = 0;
- while (Offset < BlockRange.BlockSize)
- {
- uint64_t Size = BlockRange.BlockSize - Offset;
- if (Size > Buffer.size())
- {
- Size = Buffer.size();
- }
- BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset);
- ChunkBlock.Write(Buffer.data(), Size, Offset);
- Offset += Size;
- }
- ChunkBlock.Truncate(Offset);
- ChunkBlock.Flush();
-
- std::vector<CasDiskIndexEntry> LogEntries;
- LogEntries.reserve(BlockRange.Chunks.size());
- for (const auto& Entry : BlockRange.Chunks)
- {
- const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first];
- BlockStoreDiskLocation Location(Entry.second, PayloadAlignment);
- LogEntries.push_back(
- {.Key = Entry.first, .Location = Location, .ContentType = LegacyEntry.ContentType, .Flags = LegacyEntry.Flags});
- }
- CasLog.Append(LogEntries);
- Result.insert(Result.end(), LogEntries.begin(), LogEntries.end());
- MigratedChunkCount += LogEntries.size();
- MigratedBlockCount++;
-
- if (CleanSource)
- {
- std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries;
- LegacyLogEntries.reserve(BlockRange.Chunks.size());
- for (const auto& Entry : BlockRange.Chunks)
- {
- LegacyLogEntries.push_back({.Key = Entry.first, .Flags = LegacyCasDiskIndexEntry::kTombstone});
- }
- LegacyCasLog.Append(LegacyLogEntries);
- BlockFile.SetFileSize(BlockRange.BlockOffset);
- }
- }
- }
-
- BlockFile.Close();
- LegacyCasLog.Close();
- CasLog.Close();
-
- if (CleanSource)
- {
- // Older versions of CasContainerStrategy expects the legacy files to exist if it can find
- // a CAS manifest and crashes on startup if they don't.
- // In order to not break startup when switching back an older version, lets just reset
- // the legacy data files to zero length.
-
- BasicFile LegacyLog;
- LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
- BasicFile LegacySobs;
- LegacySobs.Open(LegacySobsPath, BasicFile::Mode::kTruncate);
- BasicFile LegacySidx;
- LegacySidx.Open(LegacySidxPath, BasicFile::Mode::kTruncate);
- }
- return Result;
- }
-
} // namespace
//////////////////////////////////////////////////////////////////////////
@@ -1399,84 +985,483 @@ CasContainerStrategy::MakeIndexSnapshot()
}
}
-void
-CasContainerStrategy::OpenContainer(bool IsNewStore)
+// Loads the index snapshot of this container, if one exists, into m_LocationMap.
+//
+// The snapshot is accepted only when its header has the expected magic and version,
+// its checksum matches, PayloadAlignment is non-zero and the stored entry count fits
+// in the part of the file that follows the header. When accepted, m_PayloadAlignment
+// is taken from the header and every entry that passes ValidateEntry() is stored in
+// m_LocationMap; entries that fail validation are logged and skipped.
+//
+// Returns the log position recorded in the snapshot header, or 0 when no usable
+// snapshot was read (no file, file smaller than a header, or invalid header).
+uint64_t
+CasContainerStrategy::ReadIndexFile()
{
- // Add .running file and delete on clean on close to detect bad termination
- m_TotalSize = 0;
+ std::vector<CasDiskIndexEntry> Entries;
+ std::filesystem::path SidxPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
+ if (std::filesystem::is_regular_file(SidxPath))
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([this, &Entries, &Timer] {
+ ZEN_INFO("read store '{}' index containing #{} entries in {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ Entries.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
- m_LocationMap.clear();
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(SidxPath, BasicFile::Mode::kRead);
+ uint64_t Size = ObjectIndexFile.FileSize();
+ if (Size >= sizeof(CasDiskIndexHeader))
+ {
+ // Only the bytes after the header can hold entries. This bound is what keeps a
+ // corrupt EntryCount from driving the resize/read below past the end of the file.
+ uint64_t ExpectedEntryCount = (Size - sizeof(CasDiskIndexHeader)) / sizeof(CasDiskIndexEntry);
+ CasDiskIndexHeader Header;
+ ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+ if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) &&
+ (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) &&
+ (Header.EntryCount <= ExpectedEntryCount))
+ {
+ Entries.resize(Header.EntryCount);
+ ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
+ m_PayloadAlignment = Header.PayloadAlignment;
+
+ std::string InvalidEntryReason;
+ for (const CasDiskIndexEntry& Entry : Entries)
+ {
+ if (!ValidateEntry(Entry, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", SidxPath, InvalidEntryReason);
+ continue;
+ }
+ m_LocationMap[Entry.Key] = Entry.Location;
+ }
- std::filesystem::path BasePath = GetBasePath(m_Config.RootDirectory, m_ContainerBaseName);
+ return Header.LogPosition;
+ }
+ else
+ {
+ ZEN_WARN("skipping invalid index file '{}'", SidxPath);
+ }
+ }
+ }
+ return 0;
+}
+
+// Replays the container log on top of m_LocationMap, starting after the first
+// SkipEntryCount records.
+//
+// Tombstone records erase their key from m_LocationMap; records that fail
+// ValidateEntry() are logged and skipped; every other record overwrites the location
+// stored for its key. If SkipEntryCount is larger than the number of records in the
+// log, a warning is logged and the whole log is replayed.
+//
+// Returns the number of log records covered by the replay (record count minus the
+// skipped prefix), or 0 when the log file does not exist or fails to initialize.
+uint64_t
+CasContainerStrategy::ReadLog(uint64_t SkipEntryCount)
+{
+ // Number of records replayed; reported by the timing log and returned to the caller.
+ uint64_t ReadCount = 0;
+ std::filesystem::path SlogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
+ if (std::filesystem::is_regular_file(SlogPath))
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([this, &ReadCount, &Timer] {
+ ZEN_INFO("read store '{}' log containing #{} entries in {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ ReadCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ TCasLogFile<CasDiskIndexEntry> CasLog;
+ CasLog.Open(SlogPath, CasLogFile::Mode::kRead);
+ if (CasLog.Initialize())
+ {
+ uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", SlogPath);
+ SkipEntryCount = 0;
+ }
+ ReadCount = EntryCount - SkipEntryCount;
+ CasLog.Replay(
+ [&](const CasDiskIndexEntry& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Flags & CasDiskIndexEntry::kTombstone)
+ {
+ m_LocationMap.erase(Record.Key);
+ return;
+ }
+ if (!ValidateEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", SlogPath, InvalidEntryReason);
+ return;
+ }
+ m_LocationMap[Record.Key] = Record.Location;
+ },
+ SkipEntryCount);
+ }
+ }
+ return ReadCount;
+}
+
+// Migrates chunks from the legacy single-file store of this container into block files.
+//
+// Does nothing and returns 0 when the legacy log is missing or empty. Otherwise the
+// legacy log is replayed into a temporary index (tombstoned keys, invalid entries and
+// keys already present in m_LocationMap are left out) and the surviving chunks are
+// moved over:
+//  - when CleanSource is set and everything fits in one block, the legacy data file
+//    is renamed into place as a block file;
+//  - otherwise the legacy data is split into ranges of at most m_MaxBlockSize and each
+//    range is copied into a new block file, last range first. With CleanSource set,
+//    tombstones are appended to the legacy log and the legacy data file is shrunk
+//    after every copied range.
+// Every migrated chunk is appended to the container log and stored in m_LocationMap.
+// With CleanSource set, the legacy log/data/index files are finally reopened in
+// truncate mode so that they still exist for older versions.
+//
+// Returns the number of migrated chunks; 0 when there was nothing to migrate or the
+// migration was refused (disk space query failed, not enough free space, too many
+// blocks required).
+uint64_t
+CasContainerStrategy::MigrateLegacyData(bool CleanSource)
+{
std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName);
- bool MakeSnapshot = IsNewStore;
+ if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0)
+ {
+ return 0;
+ }
- if (IsNewStore)
+ ZEN_INFO("migrating store '{}'", m_Config.RootDirectory / m_ContainerBaseName);
+
+ std::filesystem::path BlocksBasePath = GetBlocksBasePath(m_Config.RootDirectory, m_ContainerBaseName);
+ std::filesystem::path LegacySobsPath = GetLegacyUcasPath(m_Config.RootDirectory, m_ContainerBaseName);
+ std::filesystem::path LegacySidxPath = GetLegacyUidxPath(m_Config.RootDirectory, m_ContainerBaseName);
+
+ uint64_t MigratedChunkCount = 0;
+ uint32_t MigratedBlockCount = 0;
+ Stopwatch MigrationTimer;
+ uint64_t TotalSize = 0;
+ const auto _ = MakeGuard([this, &MigrationTimer, &MigratedChunkCount, &MigratedBlockCount, &TotalSize] {
+ ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ MigratedChunkCount,
+ MigratedBlockCount,
+ NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()),
+ NiceBytes(TotalSize));
+ });
+
+ // Start writing at the first block index that has no file on disk yet.
+ uint32_t WriteBlockIndex = 0;
+ while (std::filesystem::exists(GetBlockPath(BlocksBasePath, WriteBlockIndex)))
{
- std::filesystem::path LegacySobsPath = GetLegacyUcasPath(m_Config.RootDirectory, m_ContainerBaseName);
- std::filesystem::remove(LegacyLogPath);
- std::filesystem::remove(LegacySobsPath);
- std::filesystem::remove_all(BasePath);
+ ++WriteBlockIndex;
+ }
+
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error);
+ if (Error)
+ {
+ ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", m_Config.RootDirectory, Error.message());
+ return 0;
}
- uint64_t LogPosition = 0;
+ if (Space.Free < m_MaxBlockSize)
{
- std::vector<CasDiskIndexEntry> IndexEntries =
- ReadIndexFile(m_Config.RootDirectory, m_ContainerBaseName, m_PayloadAlignment, LogPosition);
- std::string InvalidEntryReason;
- for (const CasDiskIndexEntry& Entry : IndexEntries)
+ ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ m_MaxBlockSize,
+ NiceBytes(Space.Free));
+ return 0;
+ }
+
+ BasicFile BlockFile;
+ BlockFile.Open(LegacySobsPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
+
+ std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
+
+ // Replay the legacy log to find the chunks that still need to be migrated.
+ TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog;
+ LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead);
+ {
+ Stopwatch Timer;
+ const auto __ = MakeGuard([this, &LegacyDiskIndex, &Timer] {
+ ZEN_INFO("read store '{}' legacy index containing #{} entries in {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ LegacyDiskIndex.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ if (LegacyCasLog.Initialize())
{
- if (!ValidateEntry(Entry, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'",
- GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName),
- InvalidEntryReason);
- continue;
- }
- m_LocationMap[Entry.Key] = Entry.Location;
+ LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount());
+ LegacyCasLog.Replay(
+ [&](const LegacyCasDiskIndexEntry& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone)
+ {
+ LegacyDiskIndex.erase(Record.Key);
+ return;
+ }
+ if (!ValidateLegacyEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason);
+ return;
+ }
+ // Chunks already known to the container are not migrated again.
+ if (m_LocationMap.contains(Record.Key))
+ {
+ return;
+ }
+ LegacyDiskIndex[Record.Key] = Record;
+ },
+ 0);
+ }
+ }
+
+ if (LegacyDiskIndex.empty())
+ {
+ BlockFile.Close();
+ LegacyCasLog.Close();
+ if (CleanSource)
+ {
+ // Older versions of CasContainerStrategy expect the legacy files to exist if they can find
+ // a CAS manifest and crash on startup if they don't.
+ // In order to not break startup when switching back to an older version, let's just reset
+ // the legacy data files to zero length.
+
+ BasicFile LegacyLog;
+ LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySobs;
+ LegacySobs.Open(LegacySobsPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySidx;
+ LegacySidx.Open(LegacySidxPath, BasicFile::Mode::kTruncate);
}
+ return 0;
+ }
+
+ for (const auto& Entry : LegacyDiskIndex)
+ {
+ const LegacyCasDiskIndexEntry& Record(Entry.second);
+ TotalSize += Record.Location.GetSize();
+ }
+
+ // Upper bound: payload bytes plus worst-case alignment padding per chunk.
+ uint64_t RequiredDiskSpace = TotalSize + ((m_PayloadAlignment - 1) * LegacyDiskIndex.size());
+ uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, m_MaxBlockSize) / m_MaxBlockSize;
+ if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex)
+ {
+ ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ MaxRequiredBlockCount,
+ BlockStoreDiskLocation::MaxBlockIndex);
+ return 0;
}
+ constexpr const uint64_t DiskReserve = 1ul << 28;
+
+ if (CleanSource)
{
- std::vector<CasDiskIndexEntry> LogEntries = ReadLog(m_Config.RootDirectory, m_ContainerBaseName, LogPosition);
- std::string InvalidEntryReason;
+ if (Space.Free < (m_MaxBlockSize + DiskReserve))
+ {
+ ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ NiceBytes(m_MaxBlockSize + DiskReserve),
+ NiceBytes(Space.Free));
+ return 0;
+ }
+ }
+ else
+ {
+ if (Space.Free < (RequiredDiskSpace + DiskReserve))
+ {
+ ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ NiceBytes(RequiredDiskSpace + DiskReserve),
+ NiceBytes(Space.Free));
+ return 0;
+ }
+ }
+
+ std::filesystem::path SlogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
+ CreateDirectories(SlogPath.parent_path());
+ TCasLogFile<CasDiskIndexEntry> CasLog;
+ CasLog.Open(SlogPath, CasLogFile::Mode::kWrite);
+
+ if (CleanSource && (MaxRequiredBlockCount < 2))
+ {
+ std::vector<CasDiskIndexEntry> LogEntries;
+ LogEntries.reserve(LegacyDiskIndex.size());
+
+ // We can use the block as is, just move it and add the blocks to our new log
+ for (auto& Entry : LegacyDiskIndex)
+ {
+ const LegacyCasDiskIndexEntry& Record(Entry.second);
+
+ BlockStoreLocation NewChunkLocation(WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize());
+ BlockStoreDiskLocation NewLocation(NewChunkLocation, m_PayloadAlignment);
+ LogEntries.push_back(
+ {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags});
+ }
+ std::filesystem::path BlockPath = GetBlockPath(BlocksBasePath, WriteBlockIndex);
+ CreateDirectories(BlockPath.parent_path());
+ BlockFile.Close();
+ std::filesystem::rename(LegacySobsPath, BlockPath);
+ CasLog.Append(LogEntries);
for (const CasDiskIndexEntry& Entry : LogEntries)
{
- if (Entry.Flags & CasDiskIndexEntry::kTombstone)
+ m_LocationMap.insert_or_assign(Entry.Key, Entry.Location);
+ }
+
+ MigratedChunkCount += LogEntries.size();
+ MigratedBlockCount++;
+ }
+ else
+ {
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(LegacyDiskIndex.size());
+ for (const auto& Entry : LegacyDiskIndex)
+ {
+ ChunkHashes.push_back(Entry.first);
+ }
+
+ // Order the chunks by their offset in the legacy data file so they can be grouped into contiguous ranges.
+ std::sort(begin(ChunkHashes), end(ChunkHashes), [&](const IoHash& Lhs, const IoHash& Rhs) {
+ auto LhsKeyIt = LegacyDiskIndex.find(Lhs);
+ auto RhsKeyIt = LegacyDiskIndex.find(Rhs);
+ return LhsKeyIt->second.Location.GetOffset() < RhsKeyIt->second.Location.GetOffset();
+ });
+
+ uint64_t BlockSize = 0;
+ uint64_t BlockOffset = 0;
+ struct BlockData
+ {
+ std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
+ uint64_t BlockOffset;
+ uint64_t BlockSize;
+ uint32_t BlockIndex;
+ };
+
+ // Plan the blocks: each range covers consecutive legacy chunks spanning at most m_MaxBlockSize bytes.
+ std::vector<BlockData> BlockRanges;
+ std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
+ BlockRanges.reserve(MaxRequiredBlockCount);
+ for (const IoHash& ChunkHash : ChunkHashes)
+ {
+ // Every hash in ChunkHashes came from LegacyDiskIndex, so at() never inserts and never throws here.
+ const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex.at(ChunkHash);
+ const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location;
+
+ uint64_t ChunkOffset = LegacyChunkLocation.GetOffset();
+ uint64_t ChunkSize = LegacyChunkLocation.GetSize();
+ uint64_t ChunkEnd = ChunkOffset + ChunkSize;
+
+ if (BlockSize == 0)
{
- m_LocationMap.erase(Entry.Key);
- continue;
+ BlockOffset = ChunkOffset;
}
- if (!ValidateEntry(Entry, InvalidEntryReason))
+ if ((ChunkEnd - BlockOffset) > m_MaxBlockSize)
{
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'",
- GetLogPath(m_Config.RootDirectory, m_ContainerBaseName),
- InvalidEntryReason);
- continue;
+ BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex};
+ BlockRange.Chunks.swap(Chunks);
+ BlockRanges.push_back(BlockRange);
+
+ WriteBlockIndex++;
+ while (std::filesystem::exists(GetBlockPath(BlocksBasePath, WriteBlockIndex)))
+ {
+ ++WriteBlockIndex;
+ }
+ BlockOffset = ChunkOffset;
+ BlockSize = 0;
}
- m_LocationMap[Entry.Key] = Entry.Location;
+ // NOTE(review): the rounded value is overwritten two statements below before it is read - confirm
+ // whether alignment was meant to influence the chunk offset.
+ BlockSize = RoundUp(BlockSize, m_PayloadAlignment);
+ BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize};
+ Chunks.push_back({ChunkHash, ChunkLocation});
+ BlockSize = ChunkEnd - BlockOffset;
}
- MakeSnapshot = !LogEntries.empty();
- }
-
- if (std::filesystem::is_regular_file(LegacyLogPath) && std::filesystem::file_size(LegacyLogPath) > 0)
- {
- std::unordered_set<IoHash, IoHash::Hasher> ExistingChunks;
- ExistingChunks.reserve(m_LocationMap.size());
- for (const auto& Entry : m_LocationMap)
+ if (BlockSize > 0)
{
- ExistingChunks.insert(Entry.first);
+ BlockRanges.push_back(
+ {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex});
}
- std::vector<CasDiskIndexEntry> LegacyEntries =
- MigrateLegacyData(m_Config.RootDirectory, m_ContainerBaseName, m_MaxBlockSize, m_PayloadAlignment, true, ExistingChunks);
- for (const CasDiskIndexEntry& Entry : LegacyEntries)
+ Stopwatch WriteBlockTimer;
+
+ // Copy the range at the end of the legacy file first; with CleanSource the legacy file is shrunk
+ // to the start of each range once that range has been copied.
+ std::reverse(BlockRanges.begin(), BlockRanges.end());
+ std::vector<std::uint8_t> Buffer(1 << 28);
+ for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx)
{
- m_LocationMap[Entry.Key] = Entry.Location;
+ const BlockData& BlockRange = BlockRanges[Idx];
+ if (Idx > 0)
+ {
+ // BlockOffset + BlockSize still describes the last planned range, i.e. the end of the migrated data.
+ uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize;
+ uint64_t Completed = BlockOffset + BlockSize - Remaining;
+ uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed;
+
+ ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ Idx,
+ BlockRanges.size(),
+ NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize),
+ NiceBytes(BlockOffset + BlockSize),
+ NiceTimeSpanMs(ETA));
+ }
+
+ std::filesystem::path BlockPath = GetBlockPath(BlocksBasePath, BlockRange.BlockIndex);
+ BlockStoreFile ChunkBlock(BlockPath);
+ ChunkBlock.Create(BlockRange.BlockSize);
+ uint64_t Offset = 0;
+ while (Offset < BlockRange.BlockSize)
+ {
+ uint64_t Size = BlockRange.BlockSize - Offset;
+ if (Size > Buffer.size())
+ {
+ Size = Buffer.size();
+ }
+ BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset);
+ ChunkBlock.Write(Buffer.data(), Size, Offset);
+ Offset += Size;
+ }
+ ChunkBlock.Truncate(Offset);
+ ChunkBlock.Flush();
+
+ std::vector<CasDiskIndexEntry> LogEntries;
+ LogEntries.reserve(BlockRange.Chunks.size());
+ for (const auto& Entry : BlockRange.Chunks)
+ {
+ const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex.at(Entry.first);
+ BlockStoreDiskLocation Location(Entry.second, m_PayloadAlignment);
+ LogEntries.push_back(
+ {.Key = Entry.first, .Location = Location, .ContentType = LegacyEntry.ContentType, .Flags = LegacyEntry.Flags});
+ }
+ CasLog.Append(LogEntries);
+ for (const CasDiskIndexEntry& Entry : LogEntries)
+ {
+ m_LocationMap.insert_or_assign(Entry.Key, Entry.Location);
+ }
+ MigratedChunkCount += LogEntries.size();
+ MigratedBlockCount++;
+
+ if (CleanSource)
+ {
+ // Record the move in the legacy log and drop the copied tail of the legacy data file.
+ std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries;
+ LegacyLogEntries.reserve(BlockRange.Chunks.size());
+ for (const auto& Entry : BlockRange.Chunks)
+ {
+ LegacyLogEntries.push_back({.Key = Entry.first, .Flags = LegacyCasDiskIndexEntry::kTombstone});
+ }
+ LegacyCasLog.Append(LegacyLogEntries);
+ BlockFile.SetFileSize(BlockRange.BlockOffset);
+ }
}
- MakeSnapshot |= !LegacyEntries.empty();
}
+ BlockFile.Close();
+ LegacyCasLog.Close();
+ CasLog.Close();
+
+ if (CleanSource)
+ {
+ // Older versions of CasContainerStrategy expect the legacy files to exist if they can find
+ // a CAS manifest and crash on startup if they don't.
+ // In order to not break startup when switching back to an older version, let's just reset
+ // the legacy data files to zero length.
+
+ BasicFile LegacyLog;
+ LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySobs;
+ LegacySobs.Open(LegacySobsPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySidx;
+ LegacySidx.Open(LegacySidxPath, BasicFile::Mode::kTruncate);
+ }
+ return MigratedChunkCount;
+}
+
+void
+CasContainerStrategy::OpenContainer(bool IsNewStore)
+{
+ // Add .running file and delete on clean on close to detect bad termination
+ m_TotalSize = 0;
+
+ m_LocationMap.clear();
+
+ std::filesystem::path BasePath = GetBasePath(m_Config.RootDirectory, m_ContainerBaseName);
+
+ if (IsNewStore)
+ {
+ std::filesystem::path LegacySobsPath = GetLegacyUcasPath(m_Config.RootDirectory, m_ContainerBaseName);
+ std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName);
+
+ std::filesystem::remove(LegacyLogPath);
+ std::filesystem::remove(LegacySobsPath);
+ std::filesystem::remove_all(BasePath);
+ }
+
+ uint64_t LogPosition = ReadIndexFile();
+ uint64_t LogEntryCount = ReadLog(LogPosition);
+ uint64_t LegacyLogEntryCount = MigrateLegacyData(true);
+
CreateDirectories(BasePath);
std::filesystem::path SlogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
@@ -1544,7 +1529,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
CreateDirectories(m_BlocksBasePath);
}
- if (MakeSnapshot)
+ if (IsNewStore || ((LogEntryCount + LegacyLogEntryCount) > 0))
{
MakeIndexSnapshot();
}
@@ -2473,41 +2458,40 @@ TEST_CASE("compactcas.migrate.large.data" * doctest::skip(true))
std::filesystem::path SobsBasePath = GetBasePath(BigDataPath, "sobs");
std::filesystem::remove_all(TobsBasePath);
std::filesystem::remove_all(SobsBasePath);
- uint64_t TobsPayloadAlignment = 16;
- uint64_t TobsBlockSize = 1u << 28;
- std::vector<CasDiskIndexEntry> TobsMigratedChunks =
- MigrateLegacyData(BigDataPath, "tobs", TobsBlockSize, TobsPayloadAlignment, false, {});
- CHECK(TobsMigratedChunks.size() > 0);
-
- uint64_t SobsPayloadAlignment = 4096;
- uint64_t SobsBlockSize = 1u << 30;
-
- std::vector<CasDiskIndexEntry> SobsMigratedChunks =
- MigrateLegacyData(BigDataPath, "sobs", SobsBlockSize, SobsPayloadAlignment, false, {});
- CHECK(SobsMigratedChunks.size() > 0);
CasStoreConfiguration CasConfig;
CasConfig.RootDirectory = BigDataPath;
+ uint64_t TObsSize = 0;
+ {
+ CasGc TobsCasGc;
+ CasContainerStrategy TobsCas(CasConfig, TobsCasGc);
+ TobsCas.Initialize("tobs", 1u << 28, 16, false);
+ TObsSize = TobsCas.StorageSize().DiskSize;
+ CHECK(TObsSize > 0);
+ }
+
+ uint64_t SObsSize = 0;
+ {
+ CasGc SobsCasGc;
+ CasContainerStrategy SobsCas(CasConfig, SobsCasGc);
+ SobsCas.Initialize("sobs", 1u << 30, 4096, false);
+ SObsSize = SobsCas.StorageSize().DiskSize;
+ CHECK(SObsSize > 0);
+ }
CasGc TobsCasGc;
CasContainerStrategy TobsCas(CasConfig, TobsCasGc);
TobsCas.Initialize("tobs", 1u << 28, 16, false);
GcContext TobsGcCtx;
TobsCas.CollectGarbage(TobsGcCtx);
- for (const CasDiskIndexEntry& Entry : TobsMigratedChunks)
- {
- CHECK(TobsCas.HaveChunk(Entry.Key));
- }
+ CHECK(TobsCas.StorageSize().DiskSize == TObsSize);
CasGc SobsCasGc;
CasContainerStrategy SobsCas(CasConfig, SobsCasGc);
SobsCas.Initialize("sobs", 1u << 30, 4096, false);
GcContext SobsGcCtx;
SobsCas.CollectGarbage(SobsGcCtx);
- for (const CasDiskIndexEntry& Entry : SobsMigratedChunks)
- {
- CHECK(SobsCas.HaveChunk(Entry.Key));
- }
+ CHECK(SobsCas.StorageSize().DiskSize == SObsSize);
}
#endif