diff options
| author | Dan Engelbrecht <[email protected]> | 2025-06-09 09:03:39 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-06-09 09:03:39 +0200 |
| commit | 6f2d68d2c11011d541259d0037908dd76eadeb8a (patch) | |
| tree | 4fa165343dd42544dded51fad0e13ebae44dd442 | |
| parent | 5.6.10-pre0 (diff) | |
| download | zen-6f2d68d2c11011d541259d0037908dd76eadeb8a.tar.xz zen-6f2d68d2c11011d541259d0037908dd76eadeb8a.zip | |
missing chunks bugfix (#424)
* make sure to close log file when resetting log
* drop entries that refer to missing blocks
* Don't scrub keys that have been rewritten
* correctly count added bytes / m_TotalSize
* fix negative sleep time in BlockStoreFile::Open()
* be defensive when fetching log position
* append to log files *after* we updated all state successfully
* explicitly close stuff in destructors with exception catching
* clean up empty size block store files
| -rw-r--r-- | CHANGELOG.md | 1 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore/blockstore.cpp | 53 | ||||
| -rw-r--r-- | src/zenstore/buildstore/buildstore.cpp | 98 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 84 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 306 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 9 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/blockstore.h | 6 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cachedisklayer.h | 7 |
9 files changed, 481 insertions, 85 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f90dfc19..b35eb650d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ - Bugfix: Zen CLI now initializes Sentry after command line options parsing, so that the options can be propetly taken into account during init - Bugfix: Revert: `zen builds upload` now use the system temp directory for temporary files leaving the source folder untouched - Bugfix: Use selected subcommand when displaying help for failed command line options in zen builds +- Bugfix: Make sure we recreate the log file in CAS/cache bucket when creating snapshot at startup causing lost changes. UE-291196 ## 5.6.9 - Bugfix: Remove long running exclusive namespace wide locks when dropping buckets or namespaces diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index a2e73380f..6359b9db9 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1010,8 +1010,8 @@ struct ProjectStore::OplogStorage : public RefCounted .OpCoreHash = OpData.OpCoreHash, .OpKeyHash = OpData.KeyHash}; - m_Oplog.Append(Entry); m_OpBlobs.Write(OpData.Buffer.GetData(), WriteSize, WriteOffset); + m_Oplog.Append(Entry); return Entry; } diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 5081ae65d..7b56c64bd 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -70,7 +70,7 @@ BlockStoreFile::Open() return false; } ZEN_WARN("Failed to open cas block '{}', reason: '{}', retries left: {}.", m_Path, Ec.message(), RetriesLeft); - Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms + Sleep(100 + (3 - RetriesLeft) * 100); // Total 600 ms RetriesLeft--; return true; }); @@ -286,6 +286,14 @@ BlockStore::BlockStore() BlockStore::~BlockStore() { + try + { + Close(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~BlockStore() failed with: ", Ex.what()); + } } void @@ -307,6 +315,7 @@ BlockStore::Initialize(const 
std::filesystem::path& BlocksBasePath, uint64_t Max if (IsDir(m_BlocksBasePath)) { + std::vector<std::filesystem::path> EmptyBlockFiles; uint32_t NextBlockIndex = 0; std::vector<std::filesystem::path> FoldersToScan; FoldersToScan.push_back(m_BlocksBasePath); @@ -334,6 +343,12 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max { continue; } + if (Entry.file_size() == 0) + { + EmptyBlockFiles.push_back(Path); + continue; + } + Ref<BlockStoreFile> BlockFile{new BlockStoreFile(Path)}; BlockFile->Open(); m_TotalSize.fetch_add(BlockFile->TotalSize(), std::memory_order::relaxed); @@ -347,6 +362,17 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max } ++FolderOffset; } + + for (const std::filesystem::path& EmptyBlockFile : EmptyBlockFiles) + { + std::error_code Ec; + RemoveFile(EmptyBlockFile, Ec); + if (Ec) + { + ZEN_WARN("Unable to remove empty block file {}. Reason: {}", EmptyBlockFile, Ec.message()); + } + } + m_WriteBlockIndex.store(NextBlockIndex, std::memory_order_release); } else @@ -355,7 +381,7 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max } } -void +BlockStore::BlockIndexSet BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks) { ZEN_MEMSCOPE(GetBlocksTag()); @@ -363,8 +389,8 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks) RwLock::ExclusiveLockScope InsertLock(m_InsertLock); - tsl::robin_set<uint32_t> MissingBlocks; - tsl::robin_set<uint32_t> DeleteBlocks; + BlockIndexSet MissingBlocks; + BlockIndexSet DeleteBlocks; DeleteBlocks.reserve(m_ChunkBlocks.size()); for (auto It : m_ChunkBlocks) { @@ -383,13 +409,6 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks) MissingBlocks.insert(BlockIndex); } } - for (std::uint32_t BlockIndex : MissingBlocks) - { - std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockIndex); - Ref<BlockStoreFile> NewBlockFile(new BlockStoreFile(BlockPath)); - 
NewBlockFile->Create(0); - m_ChunkBlocks[BlockIndex] = NewBlockFile; - } for (std::uint32_t BlockIndex : DeleteBlocks) { std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockIndex); @@ -400,6 +419,7 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks) } m_ChunkBlocks.erase(BlockIndex); } + return MissingBlocks; } BlockStore::BlockEntryCountMap @@ -1037,6 +1057,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, { Continue = ChangeCallback(MovedChunks, ScrubbedChunks, RemovedSize > AddedSize ? RemovedSize - AddedSize : 0); DeletedSize += RemovedSize; + m_TotalSize.fetch_add(AddedSize); RemovedSize = 0; AddedSize = 0; MovedCount += MovedChunks.size(); @@ -1220,14 +1241,13 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, NiceBytes(Space.Free + ReclaimedSpace)); } NewBlockFile->Create(m_MaxBlockSize); - NewBlockIndex = NextBlockIndex; - WriteOffset = 0; - AddedSize += WriteOffset; + NewBlockIndex = NextBlockIndex; WriteOffset = 0; TargetFileBuffer = std::make_unique<BasicFileWriter>(NewBlockFile->GetBasicFile(), Min(256u * 1024u, m_MaxBlockSize)); } - WriteOffset = TargetFileBuffer->AlignTo(PayloadAlignment); + const uint64_t OldWriteOffset = WriteOffset; + WriteOffset = TargetFileBuffer->AlignTo(PayloadAlignment); TargetFileBuffer->Write(ChunkView.GetData(), ChunkLocation.Size, WriteOffset); MovedChunks.push_back( @@ -1235,8 +1255,9 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, WriteOffset += ChunkLocation.Size; MovedFromBlock += RoundUp(ChunkLocation.Offset + ChunkLocation.Size, PayloadAlignment) - ChunkLocation.Offset; + uint64_t WrittenBytes = WriteOffset - OldWriteOffset; + AddedSize += WrittenBytes; } - AddedSize += WriteOffset; ZEN_INFO("{}moved {} chunks ({}) from '{}' to new block, freeing {}", LogPrefix, KeepChunkIndexes.size(), diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp index afb7e4bee..c25f762f5 
100644 --- a/src/zenstore/buildstore/buildstore.cpp +++ b/src/zenstore/buildstore/buildstore.cpp @@ -177,22 +177,60 @@ BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc) m_Config.SmallBlobBlockStoreAlignement, IsNew); m_MetadataBlockStore.Initialize(Config.RootDirectory / "metadata", m_Config.MetadataBlockStoreMaxBlockSize, 1u << 20); + + BlockStore::BlockIndexSet KnownBlocks; + for (const BlobEntry& Blob : m_BlobEntries) { - BlockStore::BlockIndexSet KnownBlocks; - for (const BlobEntry& Blob : m_BlobEntries) + if (const MetadataIndex MetaIndex = Blob.Metadata; MetaIndex) { - if (const MetadataIndex MetaIndex = Blob.Metadata; MetaIndex) - { - const MetadataEntry& Metadata = m_MetadataEntries[MetaIndex]; - KnownBlocks.insert(Metadata.Location.BlockIndex); - } + const MetadataEntry& Metadata = m_MetadataEntries[MetaIndex]; + KnownBlocks.insert(Metadata.Location.BlockIndex); } - m_MetadataBlockStore.SyncExistingBlocksOnDisk(KnownBlocks); } + BlockStore::BlockIndexSet MissingBlocks = m_MetadataBlockStore.SyncExistingBlocksOnDisk(KnownBlocks); m_PayloadlogFile.Open(BlobLogPath, CasLogFile::Mode::kWrite); m_MetadatalogFile.Open(MetaLogPath, CasLogFile::Mode::kWrite); + if (!MissingBlocks.empty()) + { + std::vector<MetadataDiskEntry> MissingMetadatas; + for (auto& It : m_BlobLookup) + { + const IoHash& BlobHash = It.first; + const BlobIndex ReadBlobIndex = It.second; + const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; + if (ReadBlobEntry.Metadata) + { + const MetadataEntry& MetaData = m_MetadataEntries[ReadBlobEntry.Metadata]; + if (MissingBlocks.contains(MetaData.Location.BlockIndex)) + { + MissingMetadatas.push_back( + MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = BlobHash}); + MissingMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone; + m_MetadataEntries[ReadBlobEntry.Metadata] = {}; + m_BlobEntries[ReadBlobIndex].Metadata = {}; + } + } + } + ZEN_ASSERT(!MissingMetadatas.empty()); + + for (const 
MetadataDiskEntry& Entry : MissingMetadatas) + { + auto It = m_BlobLookup.find(Entry.BlobHash); + ZEN_ASSERT(It != m_BlobLookup.end()); + + const BlobIndex ReadBlobIndex = It->second; + const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; + if (!ReadBlobEntry.Payload) + { + m_BlobLookup.erase(It); + } + } + m_MetadatalogFile.Append(MissingMetadatas); + CompactState(); + } + m_Gc.AddGcReferencer(*this); m_Gc.AddGcReferenceLocker(*this); m_Gc.AddGcStorage(this); @@ -256,34 +294,36 @@ BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload) ZEN_UNUSED(Result); Entry = PayloadEntry(0, PayloadSize); } - m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); - RwLock::ExclusiveLockScope _(m_Lock); - if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { - const BlobIndex ExistingBlobIndex = It->second; - BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; - if (Blob.Payload) + RwLock::ExclusiveLockScope _(m_Lock); + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { - m_PayloadEntries[Blob.Payload] = Entry; + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; + if (Blob.Payload) + { + m_PayloadEntries[Blob.Payload] = Entry; + } + else + { + Blob.Payload = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); + m_PayloadEntries.push_back(Entry); + } + Blob.LastAccessTime = GcClock::TickCount(); } else { - Blob.Payload = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); + PayloadIndex NewPayloadIndex = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); m_PayloadEntries.push_back(Entry); - } - Blob.LastAccessTime = GcClock::TickCount(); - } - else - { - PayloadIndex NewPayloadIndex = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); - m_PayloadEntries.push_back(Entry); - const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size())); - // we only remove during GC and compact this then... 
- m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); - m_BlobLookup.insert({BlobHash, NewBlobIndex}); + const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size())); + // we only remove during GC and compact this then... + m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); + m_BlobLookup.insert({BlobHash, NewBlobIndex}); + } } + m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); m_LastAccessTimeUpdateCount++; } @@ -370,7 +410,6 @@ BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoB const BlockStoreLocation& Location = Locations[LocationIndex]; MetadataEntry Entry = {.Location = Location, .ContentType = Data.GetContentType(), .Flags = 0}; - m_MetadatalogFile.Append(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { @@ -396,6 +435,9 @@ BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoB m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); m_BlobLookup.insert({BlobHash, NewBlobIndex}); } + + m_MetadatalogFile.Append(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); + m_LastAccessTimeUpdateCount++; WriteBlobIndex++; if (m_TrackedCacheKeys) diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 3f1f0e34a..0ee70890c 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -751,6 +751,16 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, ZenCacheDiskLayer::CacheBucket::~CacheBucket() { + try + { + m_SlogFile.Flush(); + m_SlogFile.Close(); + m_BlockStore.Close(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~CacheBucket() failed with: ", Ex.what()); + } m_Gc.RemoveGcReferencer(*this); } @@ 
-824,11 +834,13 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo } void -ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool ResetLog, const std::function<uint64_t()>& ClaimDiskReserveFunc) +ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(uint64_t LogPosition, + bool ResetLog, + const std::function<uint64_t()>& ClaimDiskReserveFunc) { ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot"); - if (m_LogFlushPosition == m_SlogFile.GetLogCount()) + if (m_LogFlushPosition == LogPosition) { return; } @@ -877,7 +889,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool ResetLog, const st throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir)); } - const uint64_t IndexLogPosition = ResetLog ? 0 : m_SlogFile.GetLogCount(); + const uint64_t IndexLogPosition = ResetLog ? 0 : LogPosition; cache::impl::CacheBucketIndexHeader Header = {.EntryCount = EntryCount, .LogPosition = IndexLogPosition, @@ -930,12 +942,14 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool ResetLog, const st if (IsFile(LogPath)) { + m_SlogFile.Close(); if (!RemoveFile(LogPath, Ec) || Ec) { // This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in // the end it will be the same result ZEN_WARN("snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message()); } + m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); } } m_LogFlushPosition = IndexLogPosition; @@ -1149,13 +1163,6 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco } } - if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0) - { - WriteIndexSnapshot(IndexLock, /*Flush log*/ true); - } - - m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); - BlockStore::BlockIndexSet KnownBlocks; for (const auto& Entry : m_Index) { @@ -1173,7 +1180,53 @@ 
ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco KnownBlocks.insert(BlockIndex); } } - m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + BlockStore::BlockIndexSet MissingBlocks = m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); + + bool RemovedEntries = false; + if (!MissingBlocks.empty()) + { + std::vector<DiskIndexEntry> MissingEntries; + + for (auto& It : m_Index) + { + BucketPayload& Payload = m_Payloads[It.second]; + DiskLocation Location = Payload.Location; + if (!Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + if (MissingBlocks.contains(Location.Location.BlockLocation.GetBlockIndex())) + { + RemoveMemCachedData(IndexLock, Payload); + RemoveMetaData(IndexLock, Payload); + } + } + Location.Flags |= DiskLocation::kTombStone; + MissingEntries.push_back(DiskIndexEntry{.Key = It.first, .Location = Location}); + } + + ZEN_ASSERT(!MissingEntries.empty()); + + for (const DiskIndexEntry& Entry : MissingEntries) + { + m_Index.erase(Entry.Key); + } + m_SlogFile.Append(MissingEntries); + m_SlogFile.Flush(); + { + std::vector<BucketPayload> Payloads; + std::vector<AccessTime> AccessTimes; + std::vector<BucketMetaData> MetaDatas; + std::vector<MemCacheData> MemCachedPayloads; + IndexMap Index; + CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, Index); + } + RemovedEntries = true; + } + + if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0 || RemovedEntries) + { + WriteIndexSnapshot(IndexLock, m_SlogFile.GetLogCount(), /*Flush log*/ true); + } } void @@ -2024,6 +2077,9 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot"); try { + // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock + const uint64_t LogPosition = m_SlogFile.GetLogCount(); + bool UseLegacyScheme = false; IoBuffer Buffer; @@ -2038,7 +2094,7 @@ 
ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl { RwLock::SharedLockScope IndexLock(m_IndexLock); - WriteIndexSnapshot(IndexLock, /*Flush log*/ false); + WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false); // Note: this copy could be eliminated on shutdown to // reduce memory usage and execution time Index = m_Index; @@ -2078,7 +2134,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl else { RwLock::SharedLockScope IndexLock(m_IndexLock); - WriteIndexSnapshot(IndexLock, /*Flush log*/ false); + WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false); const uint64_t EntryCount = m_Index.size(); Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount); uint64_t SidecarSize = ManifestWriter.GetSidecarSize(); @@ -2727,7 +2783,6 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, ZEN_MEMSCOPE(GetCacheDiskTag()); ZEN_TRACE_CPU("Z$::Bucket::UpdateLocation"); DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags); - m_SlogFile.Append({.Key = HashKey, .Location = Location}); RwLock::ExclusiveLockScope IndexLock(m_IndexLock); if (m_TrackedCacheKeys) @@ -2757,6 +2812,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, m_AccessTimes.emplace_back(GcClock::TickCount()); m_Index.insert_or_assign(HashKey, EntryIndex); } + m_SlogFile.Append({.Key = HashKey, .Location = Location}); }); } diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 0c9302ec8..2ab5752ff 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -145,6 +145,16 @@ CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : m_Log(logging::Get(" CasContainerStrategy::~CasContainerStrategy() { + try + { + m_BlockStore.Close(); + m_CasLog.Flush(); + m_CasLog.Close(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~CasContainerStrategy failed with: ", Ex.what()); + } 
m_Gc.RemoveGcReferenceStore(*this); m_Gc.RemoveGcStorage(this); } @@ -204,12 +214,12 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const ZEN_TRACE_CPU("CasContainer::UpdateLocation"); BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; - m_CasLog.Append(IndexEntry); { RwLock::ExclusiveLockScope _(m_LocationMapLock); m_LocationMap.emplace(ChunkHash, m_Locations.size()); m_Locations.push_back(DiskLocation); } + m_CasLog.Append(IndexEntry); }); return CasStore::InsertResult{.New = true}; @@ -273,7 +283,6 @@ CasContainerStrategy::InsertChunks(std::span<const IoBuffer> Chunks, std::span<c IndexEntries.emplace_back( CasDiskIndexEntry{.Key = ChunkHashes[ChunkIndex], .Location = BlockStoreDiskLocation(Location, m_PayloadAlignment)}); } - m_CasLog.Append(IndexEntries); { RwLock::ExclusiveLockScope _(m_LocationMapLock); for (const CasDiskIndexEntry& DiskIndexEntry : IndexEntries) @@ -282,6 +291,7 @@ CasContainerStrategy::InsertChunks(std::span<const IoBuffer> Chunks, std::span<c m_Locations.push_back(DiskIndexEntry.Location); } } + m_CasLog.Append(IndexEntries); }); return Result; } @@ -746,19 +756,27 @@ public: MovedEntries.push_back(CasDiskIndexEntry{.Key = Key, .Location = Location}); } } - for (size_t ScrubbedIndex : ScrubbedArray) + for (size_t ChunkIndex : ScrubbedArray) { - const IoHash& Key = BlockCompactStateKeys[ScrubbedIndex]; + const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; if (auto It = m_CasContainerStrategy.m_LocationMap.find(Key); It != m_CasContainerStrategy.m_LocationMap.end()) { - BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[It->second]; + BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[It->second]; + const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex); + if (Location.Get(m_CasContainerStrategy.m_PayloadAlignment) != OldLocation) + { + // Someone 
has moved our chunk so lets just skip the new location we were provided, it will be + // GC:d at a later time + continue; + } MovedEntries.push_back( CasDiskIndexEntry{.Key = Key, .Location = Location, .Flags = CasDiskIndexEntry::kTombstone}); m_CasContainerStrategy.m_LocationMap.erase(It); } } m_CasContainerStrategy.m_CasLog.Append(MovedEntries); + m_CasContainerStrategy.m_CasLog.Flush(); Stats.RemovedDisk += FreedDiskSpace; if (Ctx.IsCancelledFlag.load()) { @@ -984,14 +1002,11 @@ CasContainerStrategy::MakeIndexSnapshot(bool ResetLog) { // Write the current state of the location map to a new index state std::vector<CasDiskIndexEntry> Entries; - uint64_t IndexLogPosition = 0; + // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock + const uint64_t IndexLogPosition = ResetLog ? 0 : m_CasLog.GetLogCount(); { RwLock::SharedLockScope ___(m_LocationMapLock); - if (!ResetLog) - { - IndexLogPosition = m_CasLog.GetLogCount(); - } Entries.resize(m_LocationMap.size()); uint64_t EntryIndex = 0; @@ -1036,12 +1051,14 @@ CasContainerStrategy::MakeIndexSnapshot(bool ResetLog) if (IsFile(LogPath)) { + m_CasLog.Close(); if (!RemoveFile(LogPath, Ec) || Ec) { // This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in // the end it will be the same result ZEN_WARN("Snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message()); } + m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); } } m_LogFlushPosition = IndexLogPosition; @@ -1154,7 +1171,7 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } - LogEntryCount = EntryCount - SkipEntryCount; + LogEntryCount = SkipEntryCount; CasLog.Replay( [&](const CasDiskIndexEntry& Record) { LogEntryCount++; @@ -1173,7 +1190,6 @@ CasContainerStrategy::ReadLog(const 
std::filesystem::path& LogPath, uint64_t Ski m_Locations.push_back(Record.Location); }, SkipEntryCount); - return LogEntryCount; } return 0; @@ -1229,8 +1245,6 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) } } - m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - BlockStore::BlockIndexSet KnownBlocks; for (const auto& Entry : m_LocationMap) @@ -1240,9 +1254,39 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) KnownBlocks.insert(BlockIndex); } - m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + BlockStore::BlockIndexSet MissingBlocks = m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + + m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); + + bool RemovedEntries = false; + if (!MissingBlocks.empty()) + { + std::vector<CasDiskIndexEntry> MissingEntries; + for (auto& It : m_LocationMap) + { + const uint32_t BlockIndex = m_Locations[It.second].GetBlockIndex(); + if (MissingBlocks.contains(BlockIndex)) + { + MissingEntries.push_back({.Key = It.first, .Location = m_Locations[It.second], .Flags = CasDiskIndexEntry::kTombstone}); + } + } + ZEN_ASSERT(!MissingEntries.empty()); + + for (const CasDiskIndexEntry& Entry : MissingEntries) + { + m_LocationMap.erase(Entry.Key); + } + m_CasLog.Append(MissingEntries); + m_CasLog.Flush(); + + { + RwLock::ExclusiveLockScope IndexLock(m_LocationMapLock); + CompactIndex(IndexLock); + } + RemovedEntries = true; + } - if (IsNewStore || (LogEntryCount > 0)) + if (IsNewStore || (LogEntryCount > 0) || RemovedEntries) { MakeIndexSnapshot(/*ResetLog*/ true); } @@ -1612,6 +1656,236 @@ TEST_CASE("compactcas.threadedinsert") } } +TEST_CASE("compactcas.restart") +{ + uint64_t ExpectedSize = 0; + + auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) { + WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put"); + + Latch WorkLatch(1); + tsl::robin_set<IoHash, IoHash::Hasher> ChunkHashesLookup; + 
ChunkHashesLookup.reserve(ChunkCount); + RwLock InsertLock; + for (size_t Offset = 0; Offset < ChunkCount;) + { + size_t BatchCount = Min<size_t>(ChunkCount - Offset, 512u); + WorkLatch.AddCount(1); + ThreadPool.ScheduleWork( + [&WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount, ChunkSize]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + + std::vector<IoBuffer> BatchBlobs; + std::vector<IoHash> BatchHashes; + BatchBlobs.reserve(BatchCount); + BatchHashes.reserve(BatchCount); + + while (BatchBlobs.size() < BatchCount) + { + IoBuffer Chunk = + CreateSemiRandomBlob(ChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377)); + IoHash Hash = IoHash::HashBuffer(Chunk); + { + RwLock::ExclusiveLockScope __(InsertLock); + if (ChunkHashesLookup.contains(Hash)) + { + continue; + } + ChunkHashesLookup.insert(Hash); + ExpectedSize += Chunk.Size(); + } + + BatchBlobs.emplace_back(CompressedBuffer::Compress(SharedBuffer(Chunk)).GetCompressed().Flatten().AsIoBuffer()); + BatchHashes.push_back(Hash); + } + + Cas.InsertChunks(BatchBlobs, BatchHashes); + { + RwLock::ExclusiveLockScope __(InsertLock); + Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end()); + } + }); + Offset += BatchCount; + } + WorkLatch.CountDown(); + WorkLatch.Wait(); + }; + + ScopedTemporaryDirectory TempDir; + std::filesystem::path CasPath = TempDir.Path(); + CreateDirectories(CasPath); + + bool Generate = false; + if (!Generate) + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + } + + const uint64_t kChunkSize = 1048 + 395; + const size_t kChunkCount = 7167; + + std::vector<IoHash> Hashes; + Hashes.reserve(kChunkCount); + + auto ValidateChunks = [&](CasContainerStrategy& Cas, std::span<const IoHash> Hashes, bool ShouldExist) { + for (const IoHash& Hash : Hashes) + { + if (ShouldExist) + { + CHECK(Cas.HaveChunk(Hash)); + IoBuffer Buffer = 
Cas.FindChunk(Hash); + CHECK(Buffer); + IoHash ValidateHash; + uint64_t ValidateRawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), ValidateHash, ValidateRawSize); + CHECK(Compressed); + CHECK(ValidateHash == Hash); + } + else + { + CHECK(!Cas.HaveChunk(Hash)); + IoBuffer Buffer = Cas.FindChunk(Hash); + CHECK(!Buffer); + } + } + }; + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, true); + GenerateChunks(Cas, kChunkCount, kChunkSize, Hashes); + ValidateChunks(Cas, Hashes, true); + Cas.Flush(); + ValidateChunks(Cas, Hashes, true); + } + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + ValidateChunks(Cas, Hashes, true); + GenerateChunks(Cas, kChunkCount, kChunkSize / 4, Hashes); + ValidateChunks(Cas, Hashes, true); + } + + class GcRefChecker : public GcReferenceChecker + { + public: + explicit GcRefChecker(std::vector<IoHash>&& HashesToKeep) : m_HashesToKeep(std::move(HashesToKeep)) {} + ~GcRefChecker() {} + std::string GetGcName(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return "test"; + } + void PreCache(GcCtx& Ctx) override { FilterReferences(Ctx, "test", m_HashesToKeep); } + void UpdateLockedState(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); } + std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override + { + ZEN_UNUSED(Ctx); + return KeepUnusedReferences(m_HashesToKeep, IoCids); + } + + private: + std::vector<IoHash> m_HashesToKeep; + }; + + class GcRef : public GcReferencer + { + public: + GcRef(GcManager& Gc, std::span<const IoHash> HashesToKeep) : m_Gc(Gc) + { + m_HashesToKeep.insert(m_HashesToKeep.begin(), HashesToKeep.begin(), HashesToKeep.end()); + m_Gc.AddGcReferencer(*this); + } + ~GcRef() { m_Gc.RemoveGcReferencer(*this); } + std::string GetGcName(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return "test"; + } + GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& 
Stats) override + { + ZEN_UNUSED(Ctx, Stats); + return nullptr; + } + std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return {new GcRefChecker(std::move(m_HashesToKeep))}; + } + std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return {}; + } + + private: + GcManager& m_Gc; + std::vector<IoHash> m_HashesToKeep; + }; + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + GenerateChunks(Cas, kChunkCount, kChunkSize / 5, Hashes); + } + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + ValidateChunks(Cas, Hashes, true); + GenerateChunks(Cas, kChunkCount, kChunkSize / 2, Hashes); + ValidateChunks(Cas, Hashes, true); + if (true) + { + std::vector<IoHash> DropHashes; + std::vector<IoHash> KeepHashes; + for (size_t Index = 0; Index < Hashes.size(); Index++) + { + if (Index % 5 == 0) + { + KeepHashes.push_back(Hashes[Index]); + } + else + { + DropHashes.push_back(Hashes[Index]); + } + } + // std::span<const IoHash> KeepHashes(Hashes); + // ZEN_ASSERT(ExpectedGcCount < Hashes.size()); + // KeepHashes = KeepHashes.subspan(ExpectedGcCount); + GcRef Ref(Gc, KeepHashes); + Gc.CollectGarbage(GcSettings{.CollectSmallObjects = true, .IsDeleteMode = true}); + ValidateChunks(Cas, KeepHashes, true); + ValidateChunks(Cas, DropHashes, false); + Hashes = KeepHashes; + } + GenerateChunks(Cas, kChunkCount, kChunkSize / 3, Hashes); + } + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + ValidateChunks(Cas, Hashes, true); + Cas.Flush(); + ValidateChunks(Cas, Hashes, true); + } + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + ValidateChunks(Cas, Hashes, true); + } +} + TEST_CASE("compactcas.iteratechunks") { std::atomic<size_t> 
WorkCompleted = 0; diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 539b5e95b..11a266f1c 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -969,14 +969,11 @@ FileCasStrategy::MakeIndexSnapshot(bool ResetLog) { // Write the current state of the location map to a new index state std::vector<FileCasIndexEntry> Entries; - uint64_t IndexLogPosition = 0; + // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock + const uint64_t IndexLogPosition = ResetLog ? 0 : m_CasLog.GetLogCount(); { RwLock::SharedLockScope __(m_Lock); - if (!ResetLog) - { - IndexLogPosition = m_CasLog.GetLogCount(); - } Entries.resize(m_Index.size()); uint64_t EntryIndex = 0; @@ -1019,12 +1016,14 @@ FileCasStrategy::MakeIndexSnapshot(bool ResetLog) if (IsFile(LogPath)) { + m_CasLog.Close(); if (!RemoveFile(LogPath, Ec) || Ec) { // This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in // the end it will be the same result ZEN_WARN("Snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message()); } + m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); } } m_LogFlushPosition = IndexLogPosition; diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index 8cbcad11b..fce05766f 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -147,10 +147,10 @@ public: typedef tsl::robin_set<uint32_t> BlockIndexSet; - // Ask the store to create empty blocks for all locations that does not have a block // Remove any block that is not referenced - void SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks); - BlockEntryCountMap GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent); + // Return a list of blocks that are not present + [[nodiscard]] BlockIndexSet SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks); 
+ BlockEntryCountMap GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent); void Close(); diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index b67a043df..3cd2d6423 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -409,19 +409,22 @@ public: void SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; }); void WriteIndexSnapshot( RwLock::ExclusiveLockScope&, + uint64_t LogPosition, bool ResetLog, const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; }) { - WriteIndexSnapshotLocked(ResetLog, ClaimDiskReserveFunc); + WriteIndexSnapshotLocked(LogPosition, ResetLog, ClaimDiskReserveFunc); } void WriteIndexSnapshot( RwLock::SharedLockScope&, + uint64_t LogPosition, bool ResetLog, const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; }) { - WriteIndexSnapshotLocked(ResetLog, ClaimDiskReserveFunc); + WriteIndexSnapshotLocked(LogPosition, ResetLog, ClaimDiskReserveFunc); } void WriteIndexSnapshotLocked( + uint64_t LogPosition, bool ResetLog, const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; }); |