diff options
| author | Dan Engelbrecht <[email protected]> | 2022-04-04 14:16:27 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-04-04 16:57:34 +0200 |
| commit | 5967b39ae928539346990009638dee1af5ab15fa (patch) | |
| tree | 6018423b2cb2290bdb47cd85104a7f460b47cb26 /zenstore/compactcas.cpp | |
| parent | remove GetTempLogPath (diff) | |
| download | zen-5967b39ae928539346990009638dee1af5ab15fa.tar.xz zen-5967b39ae928539346990009638dee1af5ab15fa.zip | |
Make index/log reading non-static member functions
Diffstat (limited to 'zenstore/compactcas.cpp')
| -rw-r--r-- | zenstore/compactcas.cpp | 954 |
1 files changed, 469 insertions, 485 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 8078bc56f..174f59ac5 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -166,49 +166,6 @@ namespace { uint8_t Flags = 0; }; - std::vector<CasDiskIndexEntry> ReadIndexFile(const std::filesystem::path& RootDirectory, - const std::string& ContainerBaseName, - uint64_t& InOutPayloadAlignment, - uint64_t& OutLogPosition) - { - std::vector<CasDiskIndexEntry> Entries; - std::filesystem::path SidxPath = GetIndexPath(RootDirectory, ContainerBaseName); - if (std::filesystem::is_regular_file(SidxPath)) - { - Stopwatch Timer; - const auto _ = MakeGuard([RootDirectory, ContainerBaseName, &Entries, &Timer] { - ZEN_INFO("read store '{}' index containing #{} entries in {}", - RootDirectory / ContainerBaseName, - Entries.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(SidxPath, BasicFile::Mode::kRead); - uint64_t Size = ObjectIndexFile.FileSize(); - if (Size >= sizeof(CasDiskIndexHeader)) - { - uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry); - CasDiskIndexHeader Header; - ObjectIndexFile.Read(&Header, sizeof(Header), 0); - if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) && - (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && - (Header.EntryCount <= ExpectedEntryCount)) - { - Entries.resize(Header.EntryCount); - ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); - InOutPayloadAlignment = Header.PayloadAlignment; - OutLogPosition = Header.LogPosition; - } - else - { - ZEN_WARN("skipping invalid index file '{}'", SidxPath); - } - } - } - return Entries; - } - bool ValidateLegacyEntry(const LegacyCasDiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) @@ -271,377 +228,6 @@ namespace { return true; } - std::vector<CasDiskIndexEntry> ReadLog(const std::filesystem::path& RootDirectory, - const std::string& ContainerBaseName, - uint64_t SkipEntryCount) - { - std::vector<CasDiskIndexEntry> Entries; - std::filesystem::path SlogPath = GetLogPath(RootDirectory, ContainerBaseName); - if (std::filesystem::is_regular_file(SlogPath)) - { - Stopwatch Timer; - const auto _ = MakeGuard([RootDirectory, ContainerBaseName, &Entries, &Timer] { - ZEN_INFO("read store '{}' log containing #{} entries in {}", - RootDirectory / ContainerBaseName, - Entries.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - TCasLogFile<CasDiskIndexEntry> CasLog; - CasLog.Open(SlogPath, CasLogFile::Mode::kRead); - if (CasLog.Initialize()) - { - uint64_t EntryCount = CasLog.GetLogCount(); - if (EntryCount < SkipEntryCount) - { - ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", SlogPath); - SkipEntryCount = 0; - } - Entries.reserve(EntryCount - SkipEntryCount); - CasLog.Replay([&](const CasDiskIndexEntry& Record) { Entries.push_back(Record); }, SkipEntryCount); - } - } - return Entries; - } - - std::vector<CasDiskIndexEntry> MigrateLegacyData(const std::filesystem::path& RootPath, - const std::string& ContainerBaseName, - uint64_t MaxBlockSize, - uint64_t PayloadAlignment, - bool CleanSource, - const std::unordered_set<IoHash, IoHash::Hasher>& ExistingChunks) - { - ZEN_INFO("migrating store '{}'", RootPath / ContainerBaseName); - - std::filesystem::path BlocksBasePath = GetBlocksBasePath(RootPath, ContainerBaseName); - std::filesystem::path LegacyLogPath = GetLegacyLogPath(RootPath, ContainerBaseName); - std::filesystem::path LegacySobsPath = GetLegacyUcasPath(RootPath, ContainerBaseName); - std::filesystem::path LegacySidxPath = GetLegacyUidxPath(RootPath, ContainerBaseName); - - uint64_t MigratedChunkCount = 0; - uint32_t MigratedBlockCount = 0; - Stopwatch MigrationTimer; - uint64_t TotalSize = 0; - const auto _ = MakeGuard([RootPath, ContainerBaseName, &MigrationTimer, &MigratedChunkCount, &MigratedBlockCount, &TotalSize] { - ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})", - RootPath / ContainerBaseName, - MigratedChunkCount, - MigratedBlockCount, - NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()), - NiceBytes(TotalSize)); - }); - - std::vector<CasDiskIndexEntry> Result; - - uint32_t WriteBlockIndex = 0; - while (std::filesystem::exists(GetBlockPath(BlocksBasePath, WriteBlockIndex))) - { - ++WriteBlockIndex; - } - - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(RootPath, Error); - if (Error) - { - ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", ContainerBaseName, Error.message()); - return Result; - } - - if (Space.Free < MaxBlockSize) - { - ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}", - RootPath / ContainerBaseName, - MaxBlockSize, - NiceBytes(Space.Free)); - return Result; - } - - BasicFile BlockFile; - BlockFile.Open(LegacySobsPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); - - std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex; - - TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog; - LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead); - { - Stopwatch Timer; - const auto __ = MakeGuard([RootPath, ContainerBaseName, &LegacyDiskIndex, &Timer] { - ZEN_INFO("read store '{}' legacy index containing #{} entries in {}", - RootPath / ContainerBaseName, - LegacyDiskIndex.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - if (LegacyCasLog.Initialize()) - { - LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount()); - LegacyCasLog.Replay( - [&](const LegacyCasDiskIndexEntry& Record) { - std::string InvalidEntryReason; - if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone) - { - LegacyDiskIndex.erase(Record.Key); - return; - } - if (!ValidateLegacyEntry(Record, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason); - return; - } - if (ExistingChunks.contains(Record.Key)) - { - return; - } - LegacyDiskIndex[Record.Key] = Record; - }, - 0); - } - } - - if (LegacyDiskIndex.empty()) - { - BlockFile.Close(); - LegacyCasLog.Close(); - if (CleanSource) - { - // Older versions of CasContainerStrategy expects the legacy files to exist if it can find - // a CAS manifest and crashes on startup if they don't. - // In order to not break startup when switching back an older version, lets just reset - // the legacy data files to zero length. - - BasicFile LegacyLog; - LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); - BasicFile LegacySobs; - LegacySobs.Open(LegacySobsPath, BasicFile::Mode::kTruncate); - BasicFile LegacySidx; - LegacySidx.Open(LegacySidxPath, BasicFile::Mode::kTruncate); - } - return Result; - } - - for (const auto& Entry : LegacyDiskIndex) - { - const LegacyCasDiskIndexEntry& Record(Entry.second); - TotalSize += Record.Location.GetSize(); - } - - uint64_t RequiredDiskSpace = TotalSize + ((PayloadAlignment - 1) * LegacyDiskIndex.size()); - uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, MaxBlockSize) / MaxBlockSize; - if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex) - { - ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", - RootPath / ContainerBaseName, - MaxRequiredBlockCount, - BlockStoreDiskLocation::MaxBlockIndex); - return Result; - } - - constexpr const uint64_t DiskReserve = 1ul << 28; - - if (CleanSource) - { - if (Space.Free < (MaxBlockSize + DiskReserve)) - { - ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", - RootPath / ContainerBaseName, - NiceBytes(MaxBlockSize + DiskReserve), - NiceBytes(Space.Free)); - return Result; - } - } - else - { - if (Space.Free < (RequiredDiskSpace + DiskReserve)) - { - ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", - RootPath / ContainerBaseName, - NiceBytes(RequiredDiskSpace + DiskReserve), - NiceBytes(Space.Free)); - return Result; - } - } - - std::filesystem::path SlogPath = GetLogPath(RootPath, ContainerBaseName); - CreateDirectories(SlogPath.parent_path()); - TCasLogFile<CasDiskIndexEntry> CasLog; - CasLog.Open(SlogPath, CasLogFile::Mode::kWrite); - - if (CleanSource && (MaxRequiredBlockCount < 2)) - { - Result.reserve(LegacyDiskIndex.size()); - - // We can use the block as is, just move it and add the blocks to our new log - for (auto& Entry : LegacyDiskIndex) - { - const LegacyCasDiskIndexEntry& Record(Entry.second); - - BlockStoreLocation NewChunkLocation(WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize()); - BlockStoreDiskLocation NewLocation(NewChunkLocation, PayloadAlignment); - Result.push_back( - {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags}); - } - std::filesystem::path BlockPath = GetBlockPath(BlocksBasePath, WriteBlockIndex); - CreateDirectories(BlockPath.parent_path()); - BlockFile.Close(); - std::filesystem::rename(LegacySobsPath, BlockPath); - CasLog.Append(Result); - MigratedChunkCount += Result.size(); - MigratedBlockCount++; - } - else - { - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(LegacyDiskIndex.size()); - for (const auto& Entry : LegacyDiskIndex) - { - ChunkHashes.push_back(Entry.first); - } - - std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) { - auto LhsKeyIt = LegacyDiskIndex.find(Lhs); - auto RhsKeyIt = LegacyDiskIndex.find(Rhs); - return LhsKeyIt->second.Location.GetOffset() < RhsKeyIt->second.Location.GetOffset(); - }); - - uint64_t BlockSize = 0; - uint64_t BlockOffset = 0; - std::vector<BlockStoreLocation> NewLocations; - struct BlockData - { - std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; - uint64_t BlockOffset; - uint64_t BlockSize; - uint32_t BlockIndex; - }; - - std::vector<BlockData> BlockRanges; - std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; - BlockRanges.reserve(MaxRequiredBlockCount); - for (const IoHash& ChunkHash : ChunkHashes) - { - const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash]; - const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location; - - uint64_t ChunkOffset = LegacyChunkLocation.GetOffset(); - uint64_t ChunkSize = LegacyChunkLocation.GetSize(); - uint64_t ChunkEnd = ChunkOffset + ChunkSize; - - if (BlockSize == 0) - { - BlockOffset = ChunkOffset; - } - if ((ChunkEnd - BlockOffset) > MaxBlockSize) - { - BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}; - BlockRange.Chunks.swap(Chunks); - BlockRanges.push_back(BlockRange); - - WriteBlockIndex++; - while (std::filesystem::exists(GetBlockPath(BlocksBasePath, WriteBlockIndex))) - { - ++WriteBlockIndex; - } - BlockOffset = ChunkOffset; - BlockSize = 0; - } - BlockSize = RoundUp(BlockSize, PayloadAlignment); - BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize}; - Chunks.push_back({ChunkHash, ChunkLocation}); - BlockSize = ChunkEnd - BlockOffset; - } - if (BlockSize > 0) - { - BlockRanges.push_back( - {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}); - } - Stopwatch WriteBlockTimer; - - std::reverse(BlockRanges.begin(), BlockRanges.end()); - std::vector<std::uint8_t> Buffer(1 << 28); - for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx) - { - const BlockData& BlockRange = BlockRanges[Idx]; - if (Idx > 0) - { - uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize; - uint64_t Completed = BlockOffset + BlockSize - Remaining; - uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed; - - ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}", - RootPath / ContainerBaseName, - Idx, - BlockRanges.size(), - NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize), - NiceBytes(BlockOffset + BlockSize), - NiceTimeSpanMs(ETA)); - } - - std::filesystem::path BlockPath = GetBlockPath(BlocksBasePath, BlockRange.BlockIndex); - BlockStoreFile ChunkBlock(BlockPath); - ChunkBlock.Create(BlockRange.BlockSize); - uint64_t Offset = 0; - while (Offset < BlockRange.BlockSize) - { - uint64_t Size = BlockRange.BlockSize - Offset; - if (Size > Buffer.size()) - { - Size = Buffer.size(); - } - BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset); - ChunkBlock.Write(Buffer.data(), Size, Offset); - Offset += Size; - } - ChunkBlock.Truncate(Offset); - ChunkBlock.Flush(); - - std::vector<CasDiskIndexEntry> LogEntries; - LogEntries.reserve(BlockRange.Chunks.size()); - for (const auto& Entry : BlockRange.Chunks) - { - const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first]; - BlockStoreDiskLocation Location(Entry.second, PayloadAlignment); - LogEntries.push_back( - {.Key = Entry.first, .Location = Location, .ContentType = LegacyEntry.ContentType, .Flags = LegacyEntry.Flags}); - } - CasLog.Append(LogEntries); - Result.insert(Result.end(), LogEntries.begin(), LogEntries.end()); - MigratedChunkCount += LogEntries.size(); - MigratedBlockCount++; - - if (CleanSource) - { - std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries; - LegacyLogEntries.reserve(BlockRange.Chunks.size()); - for (const auto& Entry : BlockRange.Chunks) - { - LegacyLogEntries.push_back({.Key = Entry.first, .Flags = LegacyCasDiskIndexEntry::kTombstone}); - } - LegacyCasLog.Append(LegacyLogEntries); - BlockFile.SetFileSize(BlockRange.BlockOffset); - } - } - } - - BlockFile.Close(); - LegacyCasLog.Close(); - CasLog.Close(); - - if (CleanSource) - { - // Older versions of CasContainerStrategy expects the legacy files to exist if it can find - // a CAS manifest and crashes on startup if they don't. - // In order to not break startup when switching back an older version, lets just reset - // the legacy data files to zero length. - - BasicFile LegacyLog; - LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); - BasicFile LegacySobs; - LegacySobs.Open(LegacySobsPath, BasicFile::Mode::kTruncate); - BasicFile LegacySidx; - LegacySidx.Open(LegacySidxPath, BasicFile::Mode::kTruncate); - } - return Result; - } - } // namespace ////////////////////////////////////////////////////////////////////////// @@ -1399,84 +985,483 @@ CasContainerStrategy::MakeIndexSnapshot() } } -void -CasContainerStrategy::OpenContainer(bool IsNewStore) +uint64_t +CasContainerStrategy::ReadIndexFile() { - // Add .running file and delete on clean on close to detect bad termination - m_TotalSize = 0; + std::vector<CasDiskIndexEntry> Entries; + std::filesystem::path SidxPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName); + if (std::filesystem::is_regular_file(SidxPath)) + { + Stopwatch Timer; + const auto _ = MakeGuard([this, &Entries, &Timer] { + ZEN_INFO("read store '{}' index containing #{} entries in {}", + m_Config.RootDirectory / m_ContainerBaseName, + Entries.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); - m_LocationMap.clear(); + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(SidxPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(CasDiskIndexHeader)) + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry); + CasDiskIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) && + (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && + (Header.EntryCount <= ExpectedEntryCount)) + { + Entries.resize(Header.EntryCount); + ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); + m_PayloadAlignment = Header.PayloadAlignment; + + std::string InvalidEntryReason; + for (const CasDiskIndexEntry& Entry : Entries) + { + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", + GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName), + InvalidEntryReason); + continue; + } + m_LocationMap[Entry.Key] = Entry.Location; + } - std::filesystem::path BasePath = GetBasePath(m_Config.RootDirectory, m_ContainerBaseName); + return Header.LogPosition; + } + else + { + ZEN_WARN("skipping invalid index file '{}'", SidxPath); + } + } + } + return 0; +} + +uint64_t +CasContainerStrategy::ReadLog(uint64_t SkipEntryCount) +{ + std::vector<CasDiskIndexEntry> Entries; + std::filesystem::path SlogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); + if (std::filesystem::is_regular_file(SlogPath)) + { + Stopwatch Timer; + const auto _ = MakeGuard([this, &Entries, &Timer] { + ZEN_INFO("read store '{}' log containing #{} entries in {}", + m_Config.RootDirectory / m_ContainerBaseName, + Entries.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + TCasLogFile<CasDiskIndexEntry> CasLog; + CasLog.Open(SlogPath, CasLogFile::Mode::kRead); + if (CasLog.Initialize()) + { + uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", SlogPath); + SkipEntryCount = 0; + } + uint64_t ReadCount = EntryCount - SkipEntryCount; + Entries.reserve(ReadCount); + CasLog.Replay( + [&](const CasDiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Flags & CasDiskIndexEntry::kTombstone) + { + m_LocationMap.erase(Record.Key); + return; + } + if (!ValidateEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", + GetLogPath(m_Config.RootDirectory, m_ContainerBaseName), + InvalidEntryReason); + return; + } + m_LocationMap[Record.Key] = Record.Location; + }, + SkipEntryCount); + return ReadCount; + } + } + return 0; +} + +uint64_t +CasContainerStrategy::MigrateLegacyData(bool CleanSource) +{ std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName); - bool MakeSnapshot = IsNewStore; + if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0) + { + return 0; + } - if (IsNewStore) + ZEN_INFO("migrating store '{}'", m_Config.RootDirectory / m_ContainerBaseName); + + std::filesystem::path BlocksBasePath = GetBlocksBasePath(m_Config.RootDirectory, m_ContainerBaseName); + std::filesystem::path LegacySobsPath = GetLegacyUcasPath(m_Config.RootDirectory, m_ContainerBaseName); + std::filesystem::path LegacySidxPath = GetLegacyUidxPath(m_Config.RootDirectory, m_ContainerBaseName); + + uint64_t MigratedChunkCount = 0; + uint32_t MigratedBlockCount = 0; + Stopwatch MigrationTimer; + uint64_t TotalSize = 0; + const auto _ = MakeGuard([this, &MigrationTimer, &MigratedChunkCount, &MigratedBlockCount, &TotalSize] { + ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})", + m_Config.RootDirectory / m_ContainerBaseName, + MigratedChunkCount, + MigratedBlockCount, + NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()), + NiceBytes(TotalSize)); + }); + + uint32_t WriteBlockIndex = 0; + while (std::filesystem::exists(GetBlockPath(BlocksBasePath, WriteBlockIndex))) { - std::filesystem::path LegacySobsPath = GetLegacyUcasPath(m_Config.RootDirectory, m_ContainerBaseName); - std::filesystem::remove(LegacyLogPath); - std::filesystem::remove(LegacySobsPath); - std::filesystem::remove_all(BasePath); + ++WriteBlockIndex; + } + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); + if (Error) + { + ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", m_Config.RootDirectory, Error.message()); + return 0; } - uint64_t LogPosition = 0; + if (Space.Free < m_MaxBlockSize) { - std::vector<CasDiskIndexEntry> IndexEntries = - ReadIndexFile(m_Config.RootDirectory, m_ContainerBaseName, m_PayloadAlignment, LogPosition); - std::string InvalidEntryReason; - for (const CasDiskIndexEntry& Entry : IndexEntries) + ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}", + m_Config.RootDirectory / m_ContainerBaseName, + m_MaxBlockSize, + NiceBytes(Space.Free)); + return 0; + } + + BasicFile BlockFile; + BlockFile.Open(LegacySobsPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); + + std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex; + + TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog; + LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead); + { + Stopwatch Timer; + const auto __ = MakeGuard([this, &LegacyDiskIndex, &Timer] { + ZEN_INFO("read store '{}' legacy index containing #{} entries in {}", + m_Config.RootDirectory / m_ContainerBaseName, + LegacyDiskIndex.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + if (LegacyCasLog.Initialize()) { - if (!ValidateEntry(Entry, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", - GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName), - InvalidEntryReason); - continue; - } - m_LocationMap[Entry.Key] = Entry.Location; + LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount()); + LegacyCasLog.Replay( + [&](const LegacyCasDiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone) + { + LegacyDiskIndex.erase(Record.Key); + return; + } + if (!ValidateLegacyEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason); + return; + } + if (m_LocationMap.contains(Record.Key)) + { + return; + } + LegacyDiskIndex[Record.Key] = Record; + }, + 0); + } + } + + if (LegacyDiskIndex.empty()) + { + BlockFile.Close(); + LegacyCasLog.Close(); + if (CleanSource) + { + // Older versions of CasContainerStrategy expects the legacy files to exist if it can find + // a CAS manifest and crashes on startup if they don't. + // In order to not break startup when switching back an older version, lets just reset + // the legacy data files to zero length. + + BasicFile LegacyLog; + LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); + BasicFile LegacySobs; + LegacySobs.Open(LegacySobsPath, BasicFile::Mode::kTruncate); + BasicFile LegacySidx; + LegacySidx.Open(LegacySidxPath, BasicFile::Mode::kTruncate); } + return 0; + } + + for (const auto& Entry : LegacyDiskIndex) + { + const LegacyCasDiskIndexEntry& Record(Entry.second); + TotalSize += Record.Location.GetSize(); + } + + uint64_t RequiredDiskSpace = TotalSize + ((m_PayloadAlignment - 1) * LegacyDiskIndex.size()); + uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, m_MaxBlockSize) / m_MaxBlockSize; + if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex) + { + ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", + m_Config.RootDirectory / m_ContainerBaseName, + MaxRequiredBlockCount, + BlockStoreDiskLocation::MaxBlockIndex); + return 0; } + constexpr const uint64_t DiskReserve = 1ul << 28; + + if (CleanSource) { - std::vector<CasDiskIndexEntry> LogEntries = ReadLog(m_Config.RootDirectory, m_ContainerBaseName, LogPosition); - std::string InvalidEntryReason; + if (Space.Free < (m_MaxBlockSize + DiskReserve)) + { + ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", + m_Config.RootDirectory / m_ContainerBaseName, + NiceBytes(m_MaxBlockSize + DiskReserve), + NiceBytes(Space.Free)); + return 0; + } + } + else + { + if (Space.Free < (RequiredDiskSpace + DiskReserve)) + { + ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", + m_Config.RootDirectory / m_ContainerBaseName, + NiceBytes(RequiredDiskSpace + DiskReserve), + NiceBytes(Space.Free)); + return 0; + } + } + + std::filesystem::path SlogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); + CreateDirectories(SlogPath.parent_path()); + TCasLogFile<CasDiskIndexEntry> CasLog; + CasLog.Open(SlogPath, CasLogFile::Mode::kWrite); + + if (CleanSource && (MaxRequiredBlockCount < 2)) + { + std::vector<CasDiskIndexEntry> LogEntries; + LogEntries.reserve(LegacyDiskIndex.size()); + + // We can use the block as is, just move it and add the blocks to our new log + for (auto& Entry : LegacyDiskIndex) + { + const LegacyCasDiskIndexEntry& Record(Entry.second); + + BlockStoreLocation NewChunkLocation(WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize()); + BlockStoreDiskLocation NewLocation(NewChunkLocation, m_PayloadAlignment); + LogEntries.push_back( + {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags}); + } + std::filesystem::path BlockPath = GetBlockPath(BlocksBasePath, WriteBlockIndex); + CreateDirectories(BlockPath.parent_path()); + BlockFile.Close(); + std::filesystem::rename(LegacySobsPath, BlockPath); + CasLog.Append(LogEntries); for (const CasDiskIndexEntry& Entry : LogEntries) { - if (Entry.Flags & CasDiskIndexEntry::kTombstone) + m_LocationMap.insert_or_assign(Entry.Key, Entry.Location); + } + + MigratedChunkCount += LogEntries.size(); + MigratedBlockCount++; + } + else + { + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(LegacyDiskIndex.size()); + for (const auto& Entry : LegacyDiskIndex) + { + ChunkHashes.push_back(Entry.first); + } + + std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) { + auto LhsKeyIt = LegacyDiskIndex.find(Lhs); + auto RhsKeyIt = LegacyDiskIndex.find(Rhs); + return LhsKeyIt->second.Location.GetOffset() < RhsKeyIt->second.Location.GetOffset(); + }); + + uint64_t BlockSize = 0; + uint64_t BlockOffset = 0; + std::vector<BlockStoreLocation> NewLocations; + struct BlockData + { + std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; + uint64_t BlockOffset; + uint64_t BlockSize; + uint32_t BlockIndex; + }; + + std::vector<BlockData> BlockRanges; + std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; + BlockRanges.reserve(MaxRequiredBlockCount); + for (const IoHash& ChunkHash : ChunkHashes) + { + const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash]; + const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location; + + uint64_t ChunkOffset = LegacyChunkLocation.GetOffset(); + uint64_t ChunkSize = LegacyChunkLocation.GetSize(); + uint64_t ChunkEnd = ChunkOffset + ChunkSize; + + if (BlockSize == 0) { - m_LocationMap.erase(Entry.Key); - continue; + BlockOffset = ChunkOffset; } - if (!ValidateEntry(Entry, InvalidEntryReason)) + if ((ChunkEnd - BlockOffset) > m_MaxBlockSize) { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", - GetLogPath(m_Config.RootDirectory, m_ContainerBaseName), - InvalidEntryReason); - continue; + BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}; + BlockRange.Chunks.swap(Chunks); + BlockRanges.push_back(BlockRange); + + WriteBlockIndex++; + while (std::filesystem::exists(GetBlockPath(BlocksBasePath, WriteBlockIndex))) + { + ++WriteBlockIndex; + } + BlockOffset = ChunkOffset; + BlockSize = 0; } - m_LocationMap[Entry.Key] = Entry.Location; + BlockSize = RoundUp(BlockSize, m_PayloadAlignment); + BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize}; + Chunks.push_back({ChunkHash, ChunkLocation}); + BlockSize = ChunkEnd - BlockOffset; } - MakeSnapshot = !LogEntries.empty(); - } - - if (std::filesystem::is_regular_file(LegacyLogPath) && std::filesystem::file_size(LegacyLogPath) > 0) - { - std::unordered_set<IoHash, IoHash::Hasher> ExistingChunks; - ExistingChunks.reserve(m_LocationMap.size()); - for (const auto& Entry : m_LocationMap) + if (BlockSize > 0) { - ExistingChunks.insert(Entry.first); + BlockRanges.push_back( + {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}); } - std::vector<CasDiskIndexEntry> LegacyEntries = - MigrateLegacyData(m_Config.RootDirectory, m_ContainerBaseName, m_MaxBlockSize, m_PayloadAlignment, true, ExistingChunks); - for (const CasDiskIndexEntry& Entry : LegacyEntries) + Stopwatch WriteBlockTimer; + + std::reverse(BlockRanges.begin(), BlockRanges.end()); + std::vector<std::uint8_t> Buffer(1 << 28); + for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx) { - m_LocationMap[Entry.Key] = Entry.Location; + const BlockData& BlockRange = BlockRanges[Idx]; + if (Idx > 0) + { + uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize; + uint64_t Completed = BlockOffset + BlockSize - Remaining; + uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed; + + ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}", + m_Config.RootDirectory / m_ContainerBaseName, + Idx, + BlockRanges.size(), + NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize), + NiceBytes(BlockOffset + BlockSize), + NiceTimeSpanMs(ETA)); + } + + std::filesystem::path BlockPath = GetBlockPath(BlocksBasePath, BlockRange.BlockIndex); + BlockStoreFile ChunkBlock(BlockPath); + ChunkBlock.Create(BlockRange.BlockSize); + uint64_t Offset = 0; + while (Offset < BlockRange.BlockSize) + { + uint64_t Size = BlockRange.BlockSize - Offset; + if (Size > Buffer.size()) + { + Size = Buffer.size(); + } + BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset); + ChunkBlock.Write(Buffer.data(), Size, Offset); + Offset += Size; + } + ChunkBlock.Truncate(Offset); + ChunkBlock.Flush(); + + std::vector<CasDiskIndexEntry> LogEntries; + LogEntries.reserve(BlockRange.Chunks.size()); + for (const auto& Entry : BlockRange.Chunks) + { + const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first]; + BlockStoreDiskLocation Location(Entry.second, m_PayloadAlignment); + LogEntries.push_back( + {.Key = Entry.first, .Location = Location, .ContentType = LegacyEntry.ContentType, .Flags = LegacyEntry.Flags}); + } + CasLog.Append(LogEntries); + for (const CasDiskIndexEntry& Entry : LogEntries) + { + m_LocationMap.insert_or_assign(Entry.Key, Entry.Location); + } + MigratedChunkCount += LogEntries.size(); + MigratedBlockCount++; + + if (CleanSource) + { + std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries; + LegacyLogEntries.reserve(BlockRange.Chunks.size()); + for (const auto& Entry : BlockRange.Chunks) + { + LegacyLogEntries.push_back({.Key = Entry.first, .Flags = LegacyCasDiskIndexEntry::kTombstone}); + } + LegacyCasLog.Append(LegacyLogEntries); + BlockFile.SetFileSize(BlockRange.BlockOffset); + } } - MakeSnapshot |= !LegacyEntries.empty(); } + BlockFile.Close(); + LegacyCasLog.Close(); + CasLog.Close(); + + if (CleanSource) + { + // Older versions of CasContainerStrategy expects the legacy files to exist if it can find + // a CAS manifest and crashes on startup if they don't. + // In order to not break startup when switching back an older version, lets just reset + // the legacy data files to zero length. + + BasicFile LegacyLog; + LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); + BasicFile LegacySobs; + LegacySobs.Open(LegacySobsPath, BasicFile::Mode::kTruncate); + BasicFile LegacySidx; + LegacySidx.Open(LegacySidxPath, BasicFile::Mode::kTruncate); + } + return MigratedChunkCount; +} + +void +CasContainerStrategy::OpenContainer(bool IsNewStore) +{ + // Add .running file and delete on clean on close to detect bad termination + m_TotalSize = 0; + + m_LocationMap.clear(); + + std::filesystem::path BasePath = GetBasePath(m_Config.RootDirectory, m_ContainerBaseName); + + if (IsNewStore) + { + std::filesystem::path LegacySobsPath = GetLegacyUcasPath(m_Config.RootDirectory, m_ContainerBaseName); + std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName); + + std::filesystem::remove(LegacyLogPath); + std::filesystem::remove(LegacySobsPath); + std::filesystem::remove_all(BasePath); + } + + uint64_t LogPosition = ReadIndexFile(); + uint64_t LogEntryCount = ReadLog(LogPosition); + uint64_t LegacyLogEntryCount = MigrateLegacyData(true); + CreateDirectories(BasePath); std::filesystem::path SlogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); @@ -1544,7 +1529,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) CreateDirectories(m_BlocksBasePath); } - if (MakeSnapshot) + if (IsNewStore || ((LogEntryCount + LegacyLogEntryCount) > 0)) { MakeIndexSnapshot(); } @@ -2473,41 +2458,40 @@ TEST_CASE("compactcas.migrate.large.data" * doctest::skip(true)) std::filesystem::path SobsBasePath = GetBasePath(BigDataPath, "sobs"); std::filesystem::remove_all(TobsBasePath); std::filesystem::remove_all(SobsBasePath); - uint64_t TobsPayloadAlignment = 16; - uint64_t TobsBlockSize = 1u << 28; - std::vector<CasDiskIndexEntry> TobsMigratedChunks = - MigrateLegacyData(BigDataPath, "tobs", TobsBlockSize, TobsPayloadAlignment, false, {}); - CHECK(TobsMigratedChunks.size() > 0); - - uint64_t SobsPayloadAlignment = 4096; - uint64_t SobsBlockSize = 1u << 30; - - std::vector<CasDiskIndexEntry> SobsMigratedChunks = - MigrateLegacyData(BigDataPath, "sobs", SobsBlockSize, SobsPayloadAlignment, false, {}); - CHECK(SobsMigratedChunks.size() > 0); CasStoreConfiguration CasConfig; CasConfig.RootDirectory = BigDataPath; + uint64_t TObsSize = 0; + { + CasGc TobsCasGc; + CasContainerStrategy TobsCas(CasConfig, TobsCasGc); + TobsCas.Initialize("tobs", 1u << 28, 16, false); + TObsSize = TobsCas.StorageSize().DiskSize; + CHECK(TObsSize > 0); + } + + uint64_t SObsSize = 0; + { + CasGc SobsCasGc; + CasContainerStrategy SobsCas(CasConfig, SobsCasGc); + SobsCas.Initialize("sobs", 1u << 30, 4096, false); + SObsSize = SobsCas.StorageSize().DiskSize; + CHECK(SObsSize > 0); + } CasGc TobsCasGc; CasContainerStrategy TobsCas(CasConfig, TobsCasGc); TobsCas.Initialize("tobs", 1u << 28, 16, false); GcContext TobsGcCtx; TobsCas.CollectGarbage(TobsGcCtx); - for (const CasDiskIndexEntry& Entry : TobsMigratedChunks) - { - CHECK(TobsCas.HaveChunk(Entry.Key)); - } + CHECK(TobsCas.StorageSize().DiskSize == TObsSize); CasGc SobsCasGc; CasContainerStrategy SobsCas(CasConfig, SobsCasGc); SobsCas.Initialize("sobs", 1u << 30, 4096, false); GcContext SobsGcCtx; SobsCas.CollectGarbage(SobsGcCtx); - for (const CasDiskIndexEntry& Entry : SobsMigratedChunks) - { - CHECK(SobsCas.HaveChunk(Entry.Key)); - } + CHECK(SobsCas.StorageSize().DiskSize == SObsSize); } #endif |