diff options
| author | Dan Engelbrecht <[email protected]> | 2024-11-25 13:36:53 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-11-25 13:36:53 +0100 |
| commit | e75c5860277681be7b4a18d8d630f76c051719b4 (patch) | |
| tree | 52533fe3c0e60cfe89e08ff5a4be00b215933670 /src/zenstore/filecas.cpp | |
| parent | add missing projectstore expire time in gc log (#227) (diff) | |
| download | zen-e75c5860277681be7b4a18d8d630f76c051719b4.tar.xz zen-e75c5860277681be7b4a18d8d630f76c051719b4.zip | |
stronger validation of payload existance (#229)
- Don't add RawSize and Size in ProjectStore::GetProjectFiles response if we can't get the payload
- Use validation of payload size/existance in all chunk fetch operations in file cas
- In project store oplog validate, make sure we can reach all the payloads
- Add threading to oplog validate request
Diffstat (limited to 'src/zenstore/filecas.cpp')
| -rw-r--r-- | src/zenstore/filecas.cpp | 114 |
1 files changed, 60 insertions, 54 deletions
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 62ed44bbb..3cee9f076 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -446,30 +446,11 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: } IoBuffer -FileCasStrategy::FindChunk(const IoHash& ChunkHash) +FileCasStrategy::SafeOpenChunk(const IoHash& ChunkHash, uint64 ExpectedSize) { - ZEN_TRACE_CPU("FileCas::FindChunk"); - - ZEN_ASSERT(m_IsInitialized); - - uint64_t ExpectedSize = 0; - { - RwLock::SharedLockScope _(m_Lock); - if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) - { - ExpectedSize = It->second.Size; - } - else - { - return {}; - } - } - ShardingHelper Name(m_RootDirectory, ChunkHash); const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath(); - - RwLock::SharedLockScope ShardLock(LockForHash(ChunkHash)); - + RwLock::SharedLockScope ShardLock(LockForHash(ChunkHash)); if (IoBuffer Chunk = IoBufferBuilder::MakeFromFile(ChunkPath); Chunk) { uint64 ChunkSize = Chunk.GetSize(); @@ -536,6 +517,29 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash) return {}; } +IoBuffer +FileCasStrategy::FindChunk(const IoHash& ChunkHash) +{ + ZEN_TRACE_CPU("FileCas::FindChunk"); + + ZEN_ASSERT(m_IsInitialized); + + uint64_t ExpectedSize = 0; + { + RwLock::SharedLockScope _(m_Lock); + if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) + { + ExpectedSize = It->second.Size; + } + else + { + return {}; + } + } + + return SafeOpenChunk(ChunkHash, ExpectedSize); +} + bool FileCasStrategy::HaveChunk(const IoHash& ChunkHash) { @@ -597,7 +601,8 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, WorkerThreadPool* OptionalWorkerPool) { - std::vector<size_t> FoundChunkIndexes; + std::vector<size_t> FoundChunkIndexes; + std::vector<uint64_t> FoundChunkExpectedSizes; { RwLock::SharedLockScope _(m_Lock); for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++) @@ -605,28 +610,28 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, if (auto KeyIt = m_Index.find(ChunkHashes[ChunkIndex]); KeyIt != m_Index.end()) { FoundChunkIndexes.push_back(ChunkIndex); + FoundChunkExpectedSizes.push_back(KeyIt->second.Size); } } } std::atomic_bool Continue = true; if (!FoundChunkIndexes.empty()) { - auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex) { - ShardingHelper Name(m_RootDirectory, ChunkHashes[ChunkIndex]); - IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.ToPath()); - if (Payload) + auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex, uint64_t ExpectedSize) { + const IoHash& ChunkHash = ChunkHashes[ChunkIndex]; + IoBuffer Payload = SafeOpenChunk(ChunkHash, ExpectedSize); + if (!AsyncCallback(ChunkIndex, std::move(Payload))) { - if (!AsyncCallback(ChunkIndex, std::move(Payload))) - { - return false; - } + return false; } return true; }; Latch WorkLatch(1); - for (size_t ChunkIndex : FoundChunkIndexes) + for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++) { + size_t ChunkIndex = FoundChunkIndexes[Index]; + uint64_t ExpectedSize = FoundChunkExpectedSizes[Index]; if (!Continue) { break; @@ -634,7 +639,7 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, if (OptionalWorkerPool) { WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, &Continue]() { + OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &Continue]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); if (!Continue) { @@ -642,7 +647,7 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, } try { - if (!ProcessOne(ChunkIndex)) + if (!ProcessOne(ChunkIndex, ExpectedSize)) { Continue = false; } @@ -658,7 +663,7 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, } else { - Continue = Continue && ProcessOne(ChunkIndex); + Continue = Continue && ProcessOne(ChunkIndex, ExpectedSize); } } WorkLatch.CountDown(); @@ -674,28 +679,25 @@ FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& ZEN_ASSERT(m_IsInitialized); - RwLock::SharedLockScope _(m_Lock); - for (const auto& It : m_Index) + std::vector<IoHash> RawHashes; + std::vector<uint64_t> ExpectedSizes; + { - const IoHash& NameHash = It.first; - ShardingHelper Name(m_RootDirectory, NameHash); - IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.ToPath()); - Callback(NameHash, std::move(Payload)); + RwLock::SharedLockScope _(m_Lock); + RawHashes.reserve(m_Index.size()); + ExpectedSizes.reserve(m_Index.size()); + for (const auto& It : m_Index) + { + RawHashes.push_back(It.first); + ExpectedSizes.push_back(It.second.Size); + } } -} - -void -FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, uint64_t Size)>&& Callback) -{ - ZEN_TRACE_CPU("FileCas::IterateChunks"); - - ZEN_ASSERT(m_IsInitialized); - - RwLock::SharedLockScope _(m_Lock); - for (const auto& It : m_Index) + for (size_t Index = 0; Index < RawHashes.size(); Index++) { - const IoHash& NameHash = It.first; - Callback(NameHash, It.second.Size); + const IoHash& ChunkHash = RawHashes[Index]; + const uint64_t ExpectedSize = ExpectedSizes[Index]; + IoBuffer Payload = SafeOpenChunk(ChunkHash, ExpectedSize); + Callback(ChunkHash, std::move(Payload)); } } @@ -1259,7 +1261,11 @@ public: if (Ec) { // Target file may be open for read, attempt to move it to a temp file and mark it delete on close - IoBuffer OldChunk = IoBufferBuilder::MakeFromFile(ChunkPath); + IoBuffer OldChunk; + { + RwLock::SharedLockScope HashLock(m_FileCasStrategy.LockForHash(ChunkHash)); + OldChunk = IoBufferBuilder::MakeFromFile(ChunkPath); + } if (OldChunk) { std::filesystem::path TempPath(ChunkPath.parent_path() / Oid::NewOid().ToString()); |