diff options
| author | Dan Engelbrecht <[email protected]> | 2022-05-03 22:23:26 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-05-03 22:23:26 +0200 |
| commit | c5b2435192f382fbaa39a8ff67de16ee3b69b7a6 (patch) | |
| tree | 294bb596d61582744dd7901f6a464c324bdec3d2 /zenstore/compactcas.cpp | |
| parent | Merge pull request #84 from EpicGames/de/cleanup-lock-sharding-in-iobuffer (diff) | |
| parent | macos compilation fix (diff) | |
| download | zen-c5b2435192f382fbaa39a8ff67de16ee3b69b7a6.tar.xz zen-c5b2435192f382fbaa39a8ff67de16ee3b69b7a6.zip | |
Merge pull request #86 from EpicGames/de/block-store-refactor
structured cache with block store
Diffstat (limited to 'zenstore/compactcas.cpp')
| -rw-r--r-- | zenstore/compactcas.cpp | 996 |
1 files changed, 213 insertions, 783 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 920ed965f..7cc742beb 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -48,25 +48,8 @@ struct CasDiskIndexHeader static_assert(sizeof(CasDiskIndexHeader) == 32); namespace { - std::vector<CasDiskIndexEntry> MakeCasDiskEntries(const std::unordered_map<IoHash, BlockStoreDiskLocation>& MovedChunks, - const std::vector<IoHash>& DeletedChunks) - { - std::vector<CasDiskIndexEntry> result; - result.reserve(MovedChunks.size()); - for (const auto& MovedEntry : MovedChunks) - { - result.push_back({.Key = MovedEntry.first, .Location = MovedEntry.second}); - } - for (const IoHash& ChunkHash : DeletedChunks) - { - result.push_back({.Key = ChunkHash, .Flags = CasDiskIndexEntry::kTombstone}); - } - return result; - } - const char* IndexExtension = ".uidx"; const char* LogExtension = ".ulog"; - const char* DataExtension = ".ucas"; std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { @@ -93,22 +76,6 @@ namespace { return GetBasePath(RootPath, ContainerBaseName) / "blocks"; } - std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex) - { - ExtendablePathBuilder<256> Path; - - char BlockHexString[9]; - ToHexNumber(BlockIndex, BlockHexString); - - Path.Append(BlocksBasePath); - Path.AppendSeparator(); - Path.AppendAsciiRange(BlockHexString, BlockHexString + 4); - Path.AppendSeparator(); - Path.Append(BlockHexString); - Path.Append(DataExtension); - return Path.ToPath(); - } - std::filesystem::path GetLegacyLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { return RootPath / (ContainerBaseName + LogExtension); @@ -116,7 +83,7 @@ namespace { std::filesystem::path GetLegacyDataPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) { - return RootPath / (ContainerBaseName + DataExtension); + return RootPath / (ContainerBaseName + ".ucas"); } std::filesystem::path GetLegacyIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) @@ -263,53 +230,12 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint3 CasStore::InsertResult CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash) { - uint32_t WriteBlockIndex; - Ref<BlockStoreFile> WriteBlock; - uint64_t InsertOffset; { - RwLock::ExclusiveLockScope _(m_InsertLock); - - { - RwLock::SharedLockScope __(m_LocationMapLock); - if (m_LocationMap.contains(ChunkHash)) - { - return CasStore::InsertResult{.New = false}; - } - } - - // New entry - - WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); - bool IsWriting = m_WriteBlock != nullptr; - if (!IsWriting || (m_CurrentInsertOffset + ChunkSize) > m_MaxBlockSize) + RwLock::SharedLockScope _(m_LocationMapLock); + if (m_LocationMap.contains(ChunkHash)) { - if (m_WriteBlock) - { - m_WriteBlock = nullptr; - } - { - RwLock::ExclusiveLockScope __(m_LocationMapLock); - if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) - { - throw std::runtime_error( - fmt::format("unable to allocate a new block in '{}'", m_Config.RootDirectory / m_ContainerBaseName)); - } - WriteBlockIndex += IsWriting ? 1 : 0; - while (m_ChunkBlocks.contains(WriteBlockIndex)) - { - WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; - } - std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); - m_WriteBlock = new BlockStoreFile(BlockPath); - m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock; - m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); - } - m_CurrentInsertOffset = 0; - m_WriteBlock->Create(m_MaxBlockSize); + return CasStore::InsertResult{.New = false}; } - InsertOffset = m_CurrentInsertOffset; - m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment); - WriteBlock = m_WriteBlock; } // We can end up in a situation that InsertChunk writes the same chunk data in @@ -324,17 +250,15 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const // This should be a rare occasion and the current flow reduces the time we block for // reads, insert and GC. - BlockStoreDiskLocation Location({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = ChunkSize}, m_PayloadAlignment); - const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location}; - - WriteBlock->Write(ChunkData, ChunkSize, InsertOffset); + BlockStoreLocation Location = m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment); + BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); + const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; m_CasLog.Append(IndexEntry); - - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_seq_cst); { - RwLock::ExclusiveLockScope __(m_LocationMapLock); - m_LocationMap.emplace(ChunkHash, Location); + RwLock::ExclusiveLockScope _(m_LocationMapLock); + m_LocationMap.emplace(ChunkHash, DiskLocation); } + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); return CasStore::InsertResult{.New = true}; } @@ -348,20 +272,21 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) IoBuffer CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { - Ref<BlockStoreFile> ChunkBlock; - BlockStoreLocation Location; + RwLock::SharedLockScope _(m_LocationMapLock); + auto KeyIt = m_LocationMap.find(ChunkHash); + if (KeyIt == m_LocationMap.end()) { - RwLock::SharedLockScope _(m_LocationMapLock); - if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) - { - Location = KeyIt->second.Get(m_PayloadAlignment); - ChunkBlock = m_ChunkBlocks[Location.BlockIndex]; - } - else - { - return IoBuffer(); - } + return IoBuffer(); + } + BlockStoreLocation Location = KeyIt->second.Get(m_PayloadAlignment); + _.ReleaseNow(); + + Ref<BlockStoreFile> ChunkBlock = m_BlockStore.GetChunkBlock(Location); + if (!ChunkBlock) + { + return IoBuffer(); } + return ChunkBlock->GetChunk(Location.Offset, Location.Size); } @@ -388,128 +313,94 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks) void CasContainerStrategy::Flush() { - { - RwLock::ExclusiveLockScope _(m_InsertLock); - if (m_CurrentInsertOffset > 0) - { - uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); - WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; - m_WriteBlock = nullptr; - m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); - m_CurrentInsertOffset = 0; - } - } + m_BlockStore.Flush(); MakeIndexSnapshot(); } void CasContainerStrategy::Scrub(ScrubContext& Ctx) { - std::vector<CasDiskIndexEntry> BadChunks; - - // We do a read sweep through the payloads file and validate - // any entries that are contained within each segment, with - // the assumption that most entries will be checked in this - // pass. An alternative strategy would be to use memory mapping. + RwLock::SharedLockScope _(m_LocationMapLock); + uint64_t TotalChunkCount = m_LocationMap.size(); + std::vector<BlockStoreLocation> ChunkLocations; + std::vector<IoHash> ChunkIndexToChunkHash; + ChunkLocations.reserve(TotalChunkCount); + ChunkIndexToChunkHash.reserve(TotalChunkCount); { - std::vector<CasDiskIndexEntry> BigChunks; - const uint64_t WindowSize = 4 * 1024 * 1024; - IoBuffer ReadBuffer{WindowSize}; - void* BufferBase = ReadBuffer.MutableData(); - - RwLock::SharedLockScope _(m_InsertLock); // TODO: Refactor so we don't have to keep m_InsertLock all the time? - RwLock::SharedLockScope __(m_LocationMapLock); - - for (const auto& Block : m_ChunkBlocks) + for (const auto& Entry : m_LocationMap) { - uint64_t WindowStart = 0; - uint64_t WindowEnd = WindowSize; - const Ref<BlockStoreFile>& BlockFile = Block.second; - BlockFile->Open(); - const uint64_t FileSize = BlockFile->FileSize(); - - do - { - const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart); - BlockFile->Read(BufferBase, ChunkSize, WindowStart); - - for (auto& Entry : m_LocationMap) - { - const BlockStoreLocation Location = Entry.second.Get(m_PayloadAlignment); - const uint64_t EntryOffset = Location.Offset; + const IoHash& ChunkHash = Entry.first; + const BlockStoreDiskLocation& DiskLocation = Entry.second; + BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); + size_t ChunkIndex = ChunkLocations.size(); - if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) - { - const uint64_t EntryEnd = EntryOffset + Location.Size; - - if (EntryEnd >= WindowEnd) - { - BigChunks.push_back({.Key = Entry.first, .Location = Entry.second}); - - continue; - } - - const IoHash ComputedHash = - IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Location.Offset - WindowStart, Location.Size); - - if (Entry.first != ComputedHash) - { - // Hash mismatch - BadChunks.push_back({.Key = Entry.first, .Location = Entry.second, .Flags = CasDiskIndexEntry::kTombstone}); - } - } - } - - WindowStart += WindowSize; - WindowEnd += WindowSize; - } while (WindowStart < FileSize); + ChunkLocations.push_back(Location); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; } + } - // Deal with large chunks - - for (const CasDiskIndexEntry& Entry : BigChunks) - { - IoHashStream Hasher; - const BlockStoreLocation Location = Entry.Location.Get(m_PayloadAlignment); - const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[Location.BlockIndex]; - BlockFile->StreamByteRange(Location.Offset, Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); - IoHash ComputedHash = Hasher.GetHash(); + std::vector<IoHash> BadKeys; - if (Entry.Key != ComputedHash) + m_BlockStore.IterateChunks( + ChunkLocations, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + const IoHash ComputedHash = IoHash::HashBuffer(Data, Size); + const IoHash& ExpectedHash = ChunkIndexToChunkHash[ChunkIndex]; + if (ComputedHash != ExpectedHash) { - BadChunks.push_back({.Key = Entry.Key, .Location = Entry.Location, .Flags = CasDiskIndexEntry::kTombstone}); + // Hash mismatch + BadKeys.push_back(ExpectedHash); } - } - } + }, + [&](size_t ChunkIndex, Ref<BlockStoreFile> BlockFile, uint64_t Offset, uint64_t Size) { + IoHashStream Hasher; + BlockFile->StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); + IoHash ComputedHash = Hasher.GetHash(); + const IoHash& ExpectedHash = ChunkIndexToChunkHash[ChunkIndex]; + if (ComputedHash != ExpectedHash) + { + // Hash mismatch + BadKeys.push_back(ExpectedHash); + } + }); - if (BadChunks.empty()) + if (BadKeys.empty()) { return; } - ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_Config.RootDirectory / m_ContainerBaseName); + ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_Config.RootDirectory / m_ContainerBaseName); - // Deal with bad chunks by removing them from our lookup map + _.ReleaseNow(); - std::vector<IoHash> BadChunkHashes; - BadChunkHashes.reserve(BadChunks.size()); - - m_CasLog.Append(BadChunks); + if (Ctx.RunRecovery()) { - RwLock::ExclusiveLockScope _(m_LocationMapLock); - for (const CasDiskIndexEntry& Entry : BadChunks) + // Deal with bad chunks by removing them from our lookup map + + std::vector<CasDiskIndexEntry> LogEntries; + LogEntries.reserve(BadKeys.size()); { - BadChunkHashes.push_back(Entry.Key); - m_LocationMap.erase(Entry.Key); + RwLock::ExclusiveLockScope __(m_LocationMapLock); + for (const IoHash& ChunkHash : BadKeys) + { + const auto KeyIt = m_LocationMap.find(ChunkHash); + if (KeyIt == m_LocationMap.end()) + { + // Might have been GC'd + continue; + } + LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone}); + m_LocationMap.erase(KeyIt); + } } + m_CasLog.Append(LogEntries); } // Let whomever it concerns know about the bad chunks. This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do - - Ctx.ReportBadCasChunks(BadChunkHashes); + Ctx.ReportBadCasChunks(BadKeys); } void @@ -533,93 +424,33 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) // We update the index as we complete each new block file. This makes it possible // to break the GC if we want to limit time for execution. // - // GC can fairly parallell to regular operation - it will block while taking - // a snapshot of the current m_LocationMap state. - // - // While moving blocks it will do a blocking operation and update the m_LocationMap - // after each new block is written and figuring out the path to the next new block. + // GC can very parallell to regular operation - it will block while taking + // a snapshot of the current m_LocationMap state and while moving blocks it will + // do a blocking operation and update the m_LocationMap after each new block is + // written and figuring out the path to the next new block. ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName); + uint64_t WriteBlockTimeUs = 0; uint64_t WriteBlockLongestTimeUs = 0; uint64_t ReadBlockTimeUs = 0; uint64_t ReadBlockLongestTimeUs = 0; - uint64_t TotalChunkCount = 0; - uint64_t DeletedSize = 0; - uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); - std::vector<IoHash> DeletedChunks; - uint64_t MovedCount = 0; - - Stopwatch TotalTimer; - const auto _ = MakeGuard([this, - &TotalTimer, - &WriteBlockTimeUs, - &WriteBlockLongestTimeUs, - &ReadBlockTimeUs, - &ReadBlockLongestTimeUs, - &TotalChunkCount, - &DeletedChunks, - &MovedCount, - &DeletedSize, - OldTotalSize] { - ZEN_INFO( - "garbage collect for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved " - "#{} " - "of #{} " - "chunks ({}).", - m_Config.RootDirectory / m_ContainerBaseName, - NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), - NiceLatencyNs(WriteBlockTimeUs), - NiceLatencyNs(WriteBlockLongestTimeUs), - NiceLatencyNs(ReadBlockTimeUs), - NiceLatencyNs(ReadBlockLongestTimeUs), - NiceBytes(DeletedSize), - DeletedChunks.size(), - MovedCount, - TotalChunkCount, - NiceBytes(OldTotalSize)); - }); - - LocationMap_t LocationMap; - size_t BlockCount; - uint64_t ExcludeBlockIndex = 0x800000000ull; + LocationMap_t LocationMap; + BlockStore::ReclaimSnapshotState BlockStoreState; { - RwLock::SharedLockScope __(m_InsertLock); RwLock::SharedLockScope ___(m_LocationMapLock); - { - Stopwatch Timer; - const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - if (m_WriteBlock) - { - ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); - } - __.ReleaseNow(); - } - LocationMap = m_LocationMap; - BlockCount = m_ChunkBlocks.size(); - } - - if (LocationMap.empty()) - { - ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_Config.RootDirectory / m_ContainerBaseName); - return; + Stopwatch Timer; + const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + LocationMap = m_LocationMap; + BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); } - TotalChunkCount = LocationMap.size(); - - std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex; - std::vector<std::vector<IoHash>> KeepChunks; - std::vector<std::vector<IoHash>> DeleteChunks; - - BlockIndexToChunkMapIndex.reserve(BlockCount); - KeepChunks.reserve(BlockCount); - DeleteChunks.reserve(BlockCount); - size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2; + uint64_t TotalChunkCount = LocationMap.size(); std::vector<IoHash> TotalChunkHashes; TotalChunkHashes.reserve(TotalChunkCount); @@ -628,272 +459,82 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) TotalChunkHashes.push_back(Entry.first); } - uint64_t DeleteCount = 0; + std::vector<BlockStoreLocation> ChunkLocations; + BlockStore::ChunkIndexArray KeepChunkIndexes; + std::vector<IoHash> ChunkIndexToChunkHash; + ChunkLocations.reserve(TotalChunkCount); + ChunkIndexToChunkHash.reserve(TotalChunkCount); - uint64_t NewTotalSize = 0; GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { - auto KeyIt = LocationMap.find(ChunkHash); - const BlockStoreDiskLocation& Location = KeyIt->second; - uint32_t BlockIndex = Location.GetBlockIndex(); + auto KeyIt = LocationMap.find(ChunkHash); + const BlockStoreDiskLocation& DiskLocation = KeyIt->second; + BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); + size_t ChunkIndex = ChunkLocations.size(); - if (static_cast<uint64_t>(BlockIndex) == ExcludeBlockIndex) - { - return; - } - - auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(BlockIndex); - size_t ChunkMapIndex = 0; - if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) - { - ChunkMapIndex = KeepChunks.size(); - BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex; - KeepChunks.resize(ChunkMapIndex + 1); - KeepChunks.back().reserve(GuesstimateCountPerBlock); - DeleteChunks.resize(ChunkMapIndex + 1); - DeleteChunks.back().reserve(GuesstimateCountPerBlock); - } - else - { - ChunkMapIndex = BlockIndexPtr->second; - } + ChunkLocations.push_back(Location); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; if (Keep) { - std::vector<IoHash>& ChunkMap = KeepChunks[ChunkMapIndex]; - ChunkMap.push_back(ChunkHash); - NewTotalSize += Location.GetSize(); - } - else - { - std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex]; - ChunkMap.push_back(ChunkHash); - DeleteCount++; + KeepChunkIndexes.push_back(ChunkIndex); } }); - std::unordered_set<uint32_t> BlocksToReWrite; - BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); - for (const auto& Entry : BlockIndexToChunkMapIndex) - { - uint32_t BlockIndex = Entry.first; - size_t ChunkMapIndex = Entry.second; - const std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex]; - if (ChunkMap.empty()) - { - continue; - } - BlocksToReWrite.insert(BlockIndex); - } - const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); if (!PerformDelete) { - uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed); - ZEN_INFO("garbage collect for '{}' DISABLED, found #{} {} chunks of total #{} {}", - m_Config.RootDirectory / m_ContainerBaseName, - DeleteCount, - NiceBytes(TotalSize - NewTotalSize), - TotalChunkCount, - NiceBytes(TotalSize)); + m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true); return; } - // Move all chunks in blocks that have chunks removed to new blocks - - Ref<BlockStoreFile> NewBlockFile; - uint64_t WriteOffset = 0; - uint32_t NewBlockIndex = 0; - DeletedChunks.reserve(DeleteCount); - - auto UpdateLocations = [this](const std::span<CasDiskIndexEntry>& Entries) { - for (const CasDiskIndexEntry& Entry : Entries) - { - if (Entry.Flags & CasDiskIndexEntry::kTombstone) + std::vector<IoHash> DeletedChunks; + m_BlockStore.ReclaimSpace( + BlockStoreState, + ChunkLocations, + KeepChunkIndexes, + m_PayloadAlignment, + false, + [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) { + std::vector<CasDiskIndexEntry> LogEntries; + LogEntries.reserve(MovedChunks.size() + RemovedChunks.size()); + for (const auto& Entry : MovedChunks) { - auto KeyIt = m_LocationMap.find(Entry.Key); - uint64_t ChunkSize = KeyIt->second.GetSize(); - m_TotalSize.fetch_sub(ChunkSize); - m_LocationMap.erase(KeyIt); - continue; + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + LogEntries.push_back({.Key = ChunkHash, .Location = {NewLocation, m_PayloadAlignment}}); + } + for (const size_t ChunkIndex : RemovedChunks) + { + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const BlockStoreDiskLocation& OldDiskLocation = LocationMap[ChunkHash]; + LogEntries.push_back({.Key = ChunkHash, .Location = OldDiskLocation, .Flags = CasDiskIndexEntry::kTombstone}); + DeletedChunks.push_back(ChunkHash); } - m_LocationMap[Entry.Key] = Entry.Location; - } - }; - - std::unordered_map<IoHash, BlockStoreDiskLocation> MovedBlockChunks; - for (uint32_t BlockIndex : BlocksToReWrite) - { - const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; - - Ref<BlockStoreFile> OldBlockFile; - { - RwLock::SharedLockScope _i(m_LocationMapLock); - OldBlockFile = m_ChunkBlocks[BlockIndex]; - } - const std::vector<IoHash>& KeepMap = KeepChunks[ChunkMapIndex]; - if (KeepMap.empty()) - { - const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex]; - std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries({}, DeleteMap); m_CasLog.Append(LogEntries); m_CasLog.Flush(); { - RwLock::ExclusiveLockScope _i(m_LocationMapLock); + RwLock::ExclusiveLockScope __(m_LocationMapLock); Stopwatch Timer; - const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + const auto ____ = MakeGuard([&] { uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); - UpdateLocations(LogEntries); - m_ChunkBlocks[BlockIndex] = nullptr; - } - DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end()); - ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", - m_ContainerBaseName, - BlockIndex, - OldBlockFile->GetPath()); - std::error_code Ec; - OldBlockFile->MarkAsDeleteOnClose(Ec); - if (Ec) - { - ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); - } - continue; - } - - std::vector<uint8_t> Chunk; - for (const IoHash& ChunkHash : KeepMap) - { - auto KeyIt = LocationMap.find(ChunkHash); - const BlockStoreLocation ChunkLocation = KeyIt->second.Get(m_PayloadAlignment); - Chunk.resize(ChunkLocation.Size); - OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); - - if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) - { - uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); - std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, {}); - m_CasLog.Append(LogEntries); - m_CasLog.Flush(); - - if (NewBlockFile) - { - NewBlockFile->Truncate(WriteOffset); - NewBlockFile->Flush(); - } - { - RwLock::ExclusiveLockScope __(m_LocationMapLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - UpdateLocations(LogEntries); - if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) - { - ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", - m_Config.RootDirectory / m_ContainerBaseName, - static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); - return; - } - while (m_ChunkBlocks.contains(NextBlockIndex)) - { - NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; - } - std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); - NewBlockFile = new BlockStoreFile(NewBlockPath); - m_ChunkBlocks[NextBlockIndex] = NewBlockFile; - } - - MovedCount += MovedBlockChunks.size(); - MovedBlockChunks.clear(); - - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); - if (Error) + for (const CasDiskIndexEntry& Entry : LogEntries) { - ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_Config.RootDirectory, Error.message()); - return; - } - if (Space.Free < m_MaxBlockSize) - { - uint64_t ReclaimedSpace = GcCtx.ClaimGCReserve(); - if (Space.Free + ReclaimedSpace < m_MaxBlockSize) + if (Entry.Flags & CasDiskIndexEntry::kTombstone) { - ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", - m_Config.RootDirectory / m_ContainerBaseName, - m_MaxBlockSize, - NiceBytes(Space.Free + ReclaimedSpace)); - RwLock::ExclusiveLockScope _l(m_LocationMapLock); - Stopwatch Timer; - const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - m_ChunkBlocks.erase(NextBlockIndex); - return; + m_LocationMap.erase(Entry.Key); + uint64_t ChunkSize = Entry.Location.GetSize(); + m_TotalSize.fetch_sub(ChunkSize); + continue; } - - ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", - m_Config.RootDirectory / m_ContainerBaseName, - ReclaimedSpace, - NiceBytes(Space.Free + ReclaimedSpace)); + m_LocationMap[Entry.Key] = Entry.Location; } - NewBlockFile->Create(m_MaxBlockSize); - NewBlockIndex = NextBlockIndex; - WriteOffset = 0; } - - NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); - MovedBlockChunks.emplace( - ChunkHash, - BlockStoreDiskLocation({.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}, m_PayloadAlignment)); - WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment); - } - Chunk.clear(); - if (NewBlockFile) - { - NewBlockFile->Truncate(WriteOffset); - NewBlockFile->Flush(); - NewBlockFile = {}; - } - - const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex]; - std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, DeleteMap); - m_CasLog.Append(LogEntries); - m_CasLog.Flush(); - { - RwLock::ExclusiveLockScope __(m_LocationMapLock); - Stopwatch Timer; - const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - ReadBlockTimeUs += ElapsedUs; - ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); - }); - UpdateLocations(LogEntries); - m_ChunkBlocks[BlockIndex] = nullptr; - } - MovedCount += MovedBlockChunks.size(); - DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end()); - MovedBlockChunks.clear(); - - ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", m_ContainerBaseName, BlockIndex, OldBlockFile->GetPath()); - std::error_code Ec; - OldBlockFile->MarkAsDeleteOnClose(Ec); - if (Ec) - { - ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); - } - OldBlockFile = nullptr; - } - - for (const IoHash& ChunkHash : DeletedChunks) - { - DeletedSize += LocationMap[ChunkHash].GetSize(); - } + }, + [&GcCtx]() { return GcCtx.CollectSmallObjects(); }); GcCtx.DeletedCas(DeletedChunks); } @@ -904,7 +545,7 @@ CasContainerStrategy::MakeIndexSnapshot() ZEN_INFO("write store snapshot for '{}'", m_Config.RootDirectory / m_ContainerBaseName); uint64_t EntryCount = 0; Stopwatch Timer; - const auto _ = MakeGuard([this, &EntryCount, &Timer] { + const auto _ = MakeGuard([&] { ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}", m_Config.RootDirectory / m_ContainerBaseName, EntryCount, @@ -935,7 +576,6 @@ CasContainerStrategy::MakeIndexSnapshot() std::vector<CasDiskIndexEntry> Entries; { - RwLock::SharedLockScope __(m_InsertLock); RwLock::SharedLockScope ___(m_LocationMapLock); Entries.resize(m_LocationMap.size()); @@ -990,7 +630,7 @@ CasContainerStrategy::ReadIndexFile() if (std::filesystem::is_regular_file(IndexPath)) { Stopwatch Timer; - const auto _ = MakeGuard([this, &Entries, &Timer] { + const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' index containing #{} entries in {}", m_Config.RootDirectory / m_ContainerBaseName, Entries.size(), @@ -1043,7 +683,7 @@ CasContainerStrategy::ReadLog(uint64_t SkipEntryCount) if (std::filesystem::is_regular_file(LogPath)) { Stopwatch Timer; - const auto _ = MakeGuard([this, &Entries, &Timer] { + const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' log containing #{} entries in {}", m_Config.RootDirectory / m_ContainerBaseName, Entries.size(), @@ -1103,7 +743,7 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) uint32_t MigratedBlockCount = 0; Stopwatch MigrationTimer; uint64_t TotalSize = 0; - const auto _ = MakeGuard([this, &MigrationTimer, &MigratedChunkCount, &MigratedBlockCount, &TotalSize] { + const auto _ = MakeGuard([&] { ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})", m_Config.RootDirectory / m_ContainerBaseName, MigratedChunkCount, @@ -1112,32 +752,13 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) NiceBytes(TotalSize)); }); - uint32_t WriteBlockIndex = 0; - while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) + uint64_t BlockFileSize = 0; { - ++WriteBlockIndex; + BasicFile BlockFile; + BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); + BlockFileSize = BlockFile.FileSize(); } - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); - if (Error) - { - ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", m_Config.RootDirectory, Error.message()); - return 0; - } - - if (Space.Free < m_MaxBlockSize) - { - ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}", - m_Config.RootDirectory / m_ContainerBaseName, - m_MaxBlockSize, - NiceBytes(Space.Free)); - return 0; - } - - BasicFile BlockFile; - BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); - std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex; uint64_t InvalidEntryCount = 0; @@ -1145,7 +766,7 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead); { Stopwatch Timer; - const auto __ = MakeGuard([this, &LegacyDiskIndex, &Timer] { + const auto __ = MakeGuard([&] { ZEN_INFO("read store '{}' legacy log containing #{} entries in {}", m_Config.RootDirectory / m_ContainerBaseName, LegacyDiskIndex.size(), @@ -1173,7 +794,6 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) 0); std::vector<IoHash> BadEntries; - uint64_t BlockFileSize = BlockFile.FileSize(); for (const auto& Entry : LegacyDiskIndex) { const LegacyCasDiskIndexEntry& Record(Entry.second); @@ -1199,7 +819,6 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) if (LegacyDiskIndex.empty()) { - BlockFile.Close(); LegacyCasLog.Close(); if (CleanSource) { @@ -1218,219 +837,75 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) return 0; } - for (const auto& Entry : LegacyDiskIndex) - { - const LegacyCasDiskIndexEntry& Record(Entry.second); - TotalSize += Record.Location.GetSize(); - } - - uint64_t RequiredDiskSpace = TotalSize + ((m_PayloadAlignment - 1) * LegacyDiskIndex.size()); - uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, m_MaxBlockSize) / m_MaxBlockSize; - if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex) - { - ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", - m_Config.RootDirectory / m_ContainerBaseName, - MaxRequiredBlockCount, - BlockStoreDiskLocation::MaxBlockIndex); - return 0; - } - - constexpr const uint64_t DiskReserve = 1ul << 28; - - if (CleanSource) - { - if (Space.Free < (m_MaxBlockSize + DiskReserve)) - { - ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", - m_Config.RootDirectory / m_ContainerBaseName, - NiceBytes(m_MaxBlockSize + DiskReserve), - NiceBytes(Space.Free)); - return 0; - } - } - else - { - if (Space.Free < (RequiredDiskSpace + DiskReserve)) - { - ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", - m_Config.RootDirectory / m_ContainerBaseName, - NiceBytes(RequiredDiskSpace + DiskReserve), - NiceBytes(Space.Free)); - return 0; - } - } - std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); CreateDirectories(LogPath.parent_path()); TCasLogFile<CasDiskIndexEntry> CasLog; CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - if (CleanSource && (MaxRequiredBlockCount < 2)) - { - std::vector<CasDiskIndexEntry> LogEntries; - LogEntries.reserve(LegacyDiskIndex.size()); - - // We can use the block as is, just move it and add the blocks to our new log - for (auto& Entry : LegacyDiskIndex) - { - const LegacyCasDiskIndexEntry& Record(Entry.second); - - BlockStoreLocation NewChunkLocation{WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize()}; - BlockStoreDiskLocation NewLocation(NewChunkLocation, m_PayloadAlignment); - LogEntries.push_back( - {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags}); - } - std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); - CreateDirectories(BlockPath.parent_path()); - BlockFile.Close(); - std::filesystem::rename(LegacyDataPath, BlockPath); - CasLog.Append(LogEntries); - for (const CasDiskIndexEntry& Entry : LogEntries) - { - m_LocationMap.insert_or_assign(Entry.Key, Entry.Location); - } - - MigratedChunkCount += LogEntries.size(); - MigratedBlockCount++; - } - else + std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash; + std::vector<BlockStoreLocation> ChunkLocations; + ChunkIndexToChunkHash.reserve(LegacyDiskIndex.size()); + ChunkLocations.reserve(LegacyDiskIndex.size()); + for (const auto& Entry : LegacyDiskIndex) { - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(LegacyDiskIndex.size()); - for (const auto& Entry : LegacyDiskIndex) - { - ChunkHashes.push_back(Entry.first); - } - - std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) { - auto LhsKeyIt = LegacyDiskIndex.find(Lhs); - auto RhsKeyIt = LegacyDiskIndex.find(Rhs); - return LhsKeyIt->second.Location.GetOffset() < RhsKeyIt->second.Location.GetOffset(); - }); - - uint64_t BlockSize = 0; - uint64_t BlockOffset = 0; - std::vector<BlockStoreLocation> NewLocations; - struct BlockData - { - std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; - uint64_t BlockOffset; - uint64_t BlockSize; - uint32_t BlockIndex; - }; - - std::vector<BlockData> BlockRanges; - std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; - BlockRanges.reserve(MaxRequiredBlockCount); - for (const IoHash& ChunkHash : ChunkHashes) - { - const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash]; - const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location; - - uint64_t ChunkOffset = LegacyChunkLocation.GetOffset(); - uint64_t ChunkSize = LegacyChunkLocation.GetSize(); - uint64_t ChunkEnd = ChunkOffset + ChunkSize; - - if (BlockSize == 0) - { - BlockOffset = ChunkOffset; - } - if ((ChunkEnd - BlockOffset) > m_MaxBlockSize) - { - BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}; - BlockRange.Chunks.swap(Chunks); - BlockRanges.push_back(BlockRange); - - WriteBlockIndex++; - while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) - { - ++WriteBlockIndex; - } - BlockOffset = ChunkOffset; - BlockSize = 0; - } - BlockSize = RoundUp(BlockSize, m_PayloadAlignment); - BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize}; - Chunks.push_back({ChunkHash, ChunkLocation}); - BlockSize = ChunkEnd - BlockOffset; - } - if (BlockSize > 0) - { - BlockRanges.push_back( - {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}); - } - Stopwatch WriteBlockTimer; - - std::reverse(BlockRanges.begin(), BlockRanges.end()); - std::vector<std::uint8_t> Buffer(1 << 28); - for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx) - { - const BlockData& BlockRange = BlockRanges[Idx]; - if (Idx > 0) - { - uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize; - uint64_t Completed = BlockOffset + BlockSize - Remaining; - uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed; - - ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}", - m_Config.RootDirectory / m_ContainerBaseName, - Idx, - BlockRanges.size(), - NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize), - NiceBytes(BlockOffset + BlockSize), - NiceTimeSpanMs(ETA)); - } - - std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex); - BlockStoreFile ChunkBlock(BlockPath); - ChunkBlock.Create(BlockRange.BlockSize); - uint64_t Offset = 0; - while (Offset < BlockRange.BlockSize) - { - uint64_t Size = BlockRange.BlockSize - Offset; - if (Size > Buffer.size()) - { - Size = Buffer.size(); - } - BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset); - ChunkBlock.Write(Buffer.data(), Size, Offset); - Offset += Size; - } - ChunkBlock.Truncate(Offset); - ChunkBlock.Flush(); - + const LegacyCasDiskLocation& Location = Entry.second.Location; + const IoHash& ChunkHash = Entry.first; + size_t ChunkIndex = ChunkLocations.size(); + ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.GetOffset(), .Size = Location.GetSize()}); + ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; + TotalSize += Location.GetSize(); + } + m_BlockStore.Split( + ChunkLocations, + LegacyDataPath, + m_BlocksBasePath, + m_MaxBlockSize, + BlockStoreDiskLocation::MaxBlockIndex + 1, + m_PayloadAlignment, + CleanSource, + [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount]( + const BlockStore::MovedChunksArray& MovedChunks) { std::vector<CasDiskIndexEntry> LogEntries; - LogEntries.reserve(BlockRange.Chunks.size()); - for (const auto& Entry : BlockRange.Chunks) + LogEntries.reserve(MovedChunks.size()); + for (const auto& Entry : MovedChunks) { - const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first]; - BlockStoreDiskLocation Location(Entry.second, m_PayloadAlignment); - LogEntries.push_back( - {.Key = Entry.first, .Location = Location, .ContentType = LegacyEntry.ContentType, .Flags = LegacyEntry.Flags}); + size_t ChunkIndex = Entry.first; + const BlockStoreLocation& NewLocation = Entry.second; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; + LogEntries.push_back({.Key = ChunkHash, + .Location = {NewLocation, m_PayloadAlignment}, + .ContentType = OldEntry.ContentType, + .Flags = OldEntry.Flags}); } - CasLog.Append(LogEntries); for (const CasDiskIndexEntry& Entry : LogEntries) { m_LocationMap.insert_or_assign(Entry.Key, Entry.Location); } - MigratedChunkCount += LogEntries.size(); - MigratedBlockCount++; - + CasLog.Append(LogEntries); + CasLog.Flush(); if (CleanSource) { std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries; - LegacyLogEntries.reserve(BlockRange.Chunks.size()); - for (const auto& Entry : BlockRange.Chunks) + LegacyLogEntries.reserve(MovedChunks.size()); + for (const auto& Entry : MovedChunks) { - LegacyLogEntries.push_back({.Key = Entry.first, .Flags = LegacyCasDiskIndexEntry::kTombstone}); + size_t ChunkIndex = Entry.first; + const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; + const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; + LegacyLogEntries.push_back( + LegacyCasDiskIndexEntry{.Key = ChunkHash, + .Location = OldEntry.Location, + .ContentType = OldEntry.ContentType, + .Flags = (uint8_t)(OldEntry.Flags | LegacyCasDiskIndexEntry::kTombstone)}); } LegacyCasLog.Append(LegacyLogEntries); - BlockFile.SetFileSize(BlockRange.BlockOffset); + LegacyCasLog.Flush(); } - } - } + MigratedBlockCount++; + MigratedChunkCount += MovedChunks.size(); + }); - BlockFile.Close(); LegacyCasLog.Close(); CasLog.Close(); @@ -1480,67 +955,16 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - std::unordered_set<uint32_t> KnownBlocks; + std::vector<BlockStoreLocation> KnownLocations; + KnownLocations.reserve(m_LocationMap.size()); for (const auto& Entry : m_LocationMap) { const BlockStoreDiskLocation& Location = Entry.second; - m_TotalSize.fetch_add(Location.GetSize(), std::memory_order_seq_cst); - KnownBlocks.insert(Location.GetBlockIndex()); + m_TotalSize.fetch_add(Location.GetSize(), std::memory_order::relaxed); + KnownLocations.push_back(Location.Get(m_PayloadAlignment)); } - if (std::filesystem::is_directory(m_BlocksBasePath)) - { - std::vector<std::filesystem::path> FoldersToScan; - FoldersToScan.push_back(m_BlocksBasePath); - size_t FolderOffset = 0; - while (FolderOffset < FoldersToScan.size()) - { - for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset])) - { - if (Entry.is_directory()) - { - FoldersToScan.push_back(Entry.path()); - continue; - } - if (Entry.is_regular_file()) - { - const std::filesystem::path Path = Entry.path(); - if (Path.extension() != DataExtension) - { - continue; - } - std::string FileName = Path.stem().string(); - uint32_t BlockIndex; - bool OK = ParseHexNumber(FileName, BlockIndex); - if (!OK) - { - continue; - } - if (!KnownBlocks.contains(BlockIndex)) - { - // Log removing unreferenced block - // Clear out unused blocks - ZEN_INFO("removing unused block for '{}' at '{}'", m_ContainerBaseName, Path); - std::error_code Ec; - std::filesystem::remove(Path, Ec); - if (Ec) - { - ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message()); - } - continue; - } - Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path); - BlockFile->Open(); - m_ChunkBlocks[BlockIndex] = BlockFile; - } - } - ++FolderOffset; - } - } - else - { - CreateDirectories(m_BlocksBasePath); - } + m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); if (IsNewStore || ((LogEntryCount + LegacyLogEntryCount) > 0)) { @@ -2195,7 +1619,7 @@ TEST_CASE("compactcas.legacyconversion") Gc.CollectGarbage(GcCtx); } - std::filesystem::path BlockPath = GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1); + std::filesystem::path BlockPath = BlockStore::GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1); std::filesystem::path LegacyDataPath = GetLegacyDataPath(CasConfig.RootDirectory, "test"); std::filesystem::rename(BlockPath, LegacyDataPath); @@ -2463,7 +1887,13 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true)) { ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() { CHECK(Cas.HaveChunk(ChunkHash)); - CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); + if (ChunkHash != IoHash::HashBuffer(Cas.FindChunk(ChunkHash))) + { + IoBuffer Buffer = Cas.FindChunk(ChunkHash); + CHECK(Buffer); + IoHash BufferHash = IoHash::HashBuffer(Buffer); + CHECK(ChunkHash == BufferHash); + } WorkCompleted.fetch_add(1); }); } |