diff options
| author | Dan Engelbrecht <[email protected]> | 2025-05-30 11:51:05 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-05-30 11:51:05 +0200 |
| commit | 42aa2c9a8124ada33c88f5c608e4865b1ff84c64 (patch) | |
| tree | 4c237a2e515f73a256ee290e8f9a113f0a2060f2 /src | |
| parent | frequent disk space check (#407) (diff) | |
| download | zen-42aa2c9a8124ada33c88f5c608e4865b1ff84c64.tar.xz zen-42aa2c9a8124ada33c88f5c608e4865b1ff84c64.zip | |
faster oplog validate (#408)
Improvement: Faster oplog validate to reduce GC wall time and disk I/O pressure
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 25 | ||||
| -rw-r--r-- | src/zenstore/blockstore.cpp | 47 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 7 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 20 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/blockstore.h | 5 | ||||
| -rw-r--r-- | src/zenutil/parallelwork.cpp | 2 |
6 files changed, 86 insertions, 20 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 7d22da717..3ec4373a2 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1511,11 +1511,17 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo ValidationResult Result; + const size_t OpCount = OplogCount(); + std::vector<Oid> KeyHashes; std::vector<std::string> Keys; std::vector<std::vector<IoHash>> Attachments; std::vector<OplogEntryMapping> Mappings; + KeyHashes.reserve(OpCount); + Keys.reserve(OpCount); + Mappings.reserve(OpCount); + IterateOplogWithKey([&](uint32_t LSN, const Oid& Key, CbObjectView OpView) { Result.LSNLow = Min(Result.LSNLow, LSN); Result.LSNHigh = Max(Result.LSNHigh, LSN); @@ -1540,20 +1546,22 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo bool HasMissingEntries = false; for (const ChunkMapping& Chunk : Mapping.Chunks) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(Chunk.Hash); !Payload) + if (!m_CidStore.ContainsChunk(Chunk.Hash)) { ResultLock.WithExclusiveLock([&]() { Result.MissingChunks.push_back({KeyHash, Chunk}); }); HasMissingEntries = true; } } + for (const ChunkMapping& Meta : Mapping.Meta) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(Meta.Hash); !Payload) + if (!m_CidStore.ContainsChunk(Meta.Hash)) { ResultLock.WithExclusiveLock([&]() { Result.MissingMetas.push_back({KeyHash, Meta}); }); HasMissingEntries = true; } } + for (const FileMapping& File : Mapping.Files) { if (File.Hash == IoHash::Zero) @@ -1565,24 +1573,23 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo HasMissingEntries = true; } } - else + else if (!m_CidStore.ContainsChunk(File.Hash)) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(File.Hash); !Payload) - { - ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); - HasMissingEntries = true; - } + ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); + HasMissingEntries = true; } } + const std::vector<IoHash>& OpAttachments = Attachments[OpIndex]; for (const IoHash& Attachment : OpAttachments) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(Attachment); !Payload) + if (!m_CidStore.ContainsChunk(Attachment)) { ResultLock.WithExclusiveLock([&]() { Result.MissingAttachments.push_back({KeyHash, Attachment}); }); HasMissingEntries = true; } } + if (HasMissingEntries) { ResultLock.WithExclusiveLock([&]() { Result.OpKeys.push_back({KeyHash, Key}); }); diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index e0f371061..86dbcc971 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -153,14 +153,28 @@ void BlockStoreFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset) { ZEN_TRACE_CPU("BlockStoreFile::Write"); +#if ZEN_BUILD_DEBUG + if (uint64_t CachedFileSize = m_CachedFileSize.load(); CachedFileSize > 0) + { + ZEN_ASSERT(FileOffset + Size <= CachedFileSize); + } +#endif // ZEN_BUILD_DEBUG m_File.Write(Data, Size, FileOffset); } void -BlockStoreFile::Flush() +BlockStoreFile::Flush(uint64_t FinalSize) { ZEN_TRACE_CPU("BlockStoreFile::Flush"); m_File.Flush(); + if (FinalSize != (uint64_t)-1) + { + uint64_t ExpectedSize = 0; + if (!m_CachedFileSize.compare_exchange_weak(ExpectedSize, FinalSize)) + { + ZEN_ASSERT(m_CachedFileSize.load() == FinalSize); + } + } } BasicFile& @@ -540,7 +554,7 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, cons { if (m_WriteBlock) { - m_WriteBlock->Flush(); + m_WriteBlock->Flush(m_CurrentInsertOffset); m_WriteBlock = nullptr; } @@ -674,6 +688,27 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con } } +bool +BlockStore::HasChunk(const BlockStoreLocation& Location) const +{ + ZEN_TRACE_CPU("BlockStore::TryGetChunk"); + RwLock::SharedLockScope InsertLock(m_InsertLock); + if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end()) + { + if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block) + { + InsertLock.ReleaseNow(); + + const uint64_t BlockSize = Block->FileSize(); + if (Location.Offset + Location.Size <= BlockSize) + { + return true; + } + } + } + return false; +} + IoBuffer BlockStore::TryGetChunk(const BlockStoreLocation& Location) const { @@ -706,7 +741,7 @@ BlockStore::Flush(bool ForceNewBlock) { if (m_WriteBlock) { - m_WriteBlock->Flush(); + m_WriteBlock->Flush(m_CurrentInsertOffset); } m_WriteBlock = nullptr; m_CurrentInsertOffset = 0; @@ -1097,7 +1132,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, if (NewBlockFile) { ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); - NewBlockFile->Flush(); + NewBlockFile->Flush(WriteOffset); uint64_t NewBlockSize = NewBlockFile->FileSize(); MovedSize += NewBlockSize; NewBlockFile = nullptr; @@ -1228,7 +1263,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, if (NewBlockFile) { ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); - NewBlockFile->Flush(); + NewBlockFile->Flush(WriteOffset); uint64_t NewBlockSize = NewBlockFile->FileSize(); MovedSize += NewBlockSize; NewBlockFile = nullptr; @@ -1359,6 +1394,8 @@ TEST_CASE("blockstore.blockfile") CHECK(std::string(Boop) == "boop"); File1.Flush(); CHECK(File1.FileSize() == 10); + File1.Flush(10); + CHECK(File1.FileSize() == 10); } { BlockStoreFile File1(RootDirectory / "1"); diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 15bea272b..730bdf143 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -307,7 +307,12 @@ bool CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); - return m_LocationMap.contains(ChunkHash); + if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) + { + const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment); + return m_BlockStore.HasChunk(Location); + } + return false; } void diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 6354edf70..56979f267 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -564,8 +564,24 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash) { ZEN_ASSERT(m_IsInitialized); - RwLock::SharedLockScope _(m_Lock); - return m_Index.contains(ChunkHash); + { + RwLock::SharedLockScope _(m_Lock); + if (auto It = m_Index.find(ChunkHash); It == m_Index.end()) + { + return false; + } + } + + ShardingHelper Name(m_RootDirectory, ChunkHash); + const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath(); + RwLock::SharedLockScope ShardLock(LockForHash(ChunkHash)); + + if (IsFile(ChunkPath)) + { + return true; + } + + return false; } void diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index 0c72a13aa..b0713fea1 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -94,7 +94,7 @@ struct BlockStoreFile : public RefCounted IoBuffer GetChunk(uint64_t Offset, uint64_t Size); void Read(void* Data, uint64_t Size, uint64_t FileOffset); void Write(const void* Data, uint64_t Size, uint64_t FileOffset); - void Flush(); + void Flush(uint64_t FinalSize = (uint64_t)-1); BasicFile& GetBasicFile(); void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); bool IsOpen() const; @@ -107,7 +107,7 @@ private: const std::filesystem::path m_Path; IoBuffer m_IoBuffer; BasicFile m_File; - uint64_t m_CachedFileSize = 0; + std::atomic<uint64_t> m_CachedFileSize = 0; }; class BlockStoreCompactState; @@ -158,6 +158,7 @@ public: typedef std::function<void(std::span<BlockStoreLocation> Locations)> WriteChunksCallback; void WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback); + bool HasChunk(const BlockStoreLocation& Location) const; IoBuffer TryGetChunk(const BlockStoreLocation& Location) const; void Flush(bool ForceNewBlock); diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp index 91591375a..ecacc4b5a 100644 --- a/src/zenutil/parallelwork.cpp +++ b/src/zenutil/parallelwork.cpp @@ -85,7 +85,7 @@ ParallelWork::RethrowErrors() { if (m_Errors.size() > 1) { - ZEN_INFO("Multiple exceptions throwm during ParallelWork execution, dropping the following exceptions:"); + ZEN_INFO("Multiple exceptions thrown during ParallelWork execution, dropping the following exceptions:"); auto It = m_Errors.begin() + 1; while (It != m_Errors.end()) { |