diff options
| author | Dan Engelbrecht <[email protected]> | 2024-11-25 13:36:53 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-11-25 13:36:53 +0100 |
| commit | e75c5860277681be7b4a18d8d630f76c051719b4 (patch) | |
| tree | 52533fe3c0e60cfe89e08ff5a4be00b215933670 /src | |
| parent | add missing projectstore expire time in gc log (#227) (diff) | |
| download | zen-e75c5860277681be7b4a18d8d630f76c051719b4.tar.xz zen-e75c5860277681be7b4a18d8d630f76c051719b4.zip | |
stronger validation of payload existence (#229)
- Don't add RawSize and Size in ProjectStore::GetProjectFiles response if we can't get the payload
- Use validation of payload size/existence in all chunk fetch operations in file cas
- In project store oplog validate, make sure we can reach all the payloads
- Add threading to oplog validate request
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/projectstore/httpprojectstore.cpp | 5 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 147 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 2 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 28 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 114 | ||||
| -rw-r--r-- | src/zenstore/filecas.h | 2 |
6 files changed, 197 insertions, 101 deletions
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp index 2954bcdc0..4babcd224 100644 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ b/src/zenserver/projectstore/httpprojectstore.cpp @@ -14,6 +14,7 @@ #include <zencore/stream.h> #include <zencore/trace.h> #include <zenstore/zenstore.h> +#include <zenutil/workerpools.h> namespace zen { @@ -1164,7 +1165,7 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req) void HttpProjectService::HandleOplogValidateRequest(HttpRouterRequest& Req) { - ZEN_TRACE_CPU("ProjectService::OplogOpNew"); + ZEN_TRACE_CPU("ProjectService::OplogOpValidate"); using namespace std::literals; @@ -1197,7 +1198,7 @@ HttpProjectService::HandleOplogValidateRequest(HttpRouterRequest& Req) ProjectStore::Oplog& Oplog = *FoundLog; std::atomic_bool CancelFlag = false; - ProjectStore::Oplog::ValidationResult Result = Oplog.Validate(CancelFlag); + ProjectStore::Oplog::ValidationResult Result = Oplog.Validate(CancelFlag, &GetSmallWorkerPool(EWorkloadType::Burst)); tsl::robin_map<Oid, std::string, Oid::Hasher> KeyNameLookup; KeyNameLookup.reserve(Result.OpKeys.size()); for (const auto& It : Result.OpKeys) diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 1b48a542c..a43c9e0e2 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1323,7 +1323,7 @@ ProjectStore::Oplog::ReadStateFile(const std::filesystem::path& BasePath, std::f } ProjectStore::Oplog::ValidationResult -ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag) +ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPool* OptionalWorkerPool) { using namespace std::literals; @@ -1348,25 +1348,27 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag) }); Result.OpCount = gsl::narrow<uint32_t>(Keys.size()); - for (uint32_t OpIndex = 0; !IsCancelledFlag && 
OpIndex < Result.OpCount; OpIndex++) - { + + RwLock ResultLock; + + auto ValidateOne = [&](uint32_t OpIndex) { const Oid& KeyHash = KeyHashes[OpIndex]; const std::string& Key = Keys[OpIndex]; const OplogEntryMapping& Mapping(Mappings[OpIndex]); bool HasMissingEntries = false; for (const ChunkMapping& Chunk : Mapping.Chunks) { - if (!m_CidStore.ContainsChunk(Chunk.Hash)) + if (IoBuffer Payload = m_CidStore.FindChunkByCid(Chunk.Hash); !Payload) { - Result.MissingChunks.push_back({KeyHash, Chunk}); + ResultLock.WithExclusiveLock([&]() { Result.MissingChunks.push_back({KeyHash, Chunk}); }); HasMissingEntries = true; } } for (const ChunkMapping& Meta : Mapping.Meta) { - if (!m_CidStore.ContainsChunk(Meta.Hash)) + if (IoBuffer Payload = m_CidStore.FindChunkByCid(Meta.Hash); !Payload) { - Result.MissingMetas.push_back({KeyHash, Meta}); + ResultLock.WithExclusiveLock([&]() { Result.MissingMetas.push_back({KeyHash, Meta}); }); HasMissingEntries = true; } } @@ -1377,15 +1379,15 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag) std::filesystem::path FilePath = m_OuterProject->RootDir / File.ServerPath; if (!std::filesystem::is_regular_file(FilePath)) { - Result.MissingFiles.push_back({KeyHash, File}); + ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); HasMissingEntries = true; } } else { - if (!m_CidStore.ContainsChunk(File.Hash)) + if (IoBuffer Payload = m_CidStore.FindChunkByCid(File.Hash); !Payload) { - Result.MissingFiles.push_back({KeyHash, File}); + ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); HasMissingEntries = true; } } @@ -1393,18 +1395,39 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag) const std::vector<IoHash>& OpAttachments = Attachments[OpIndex]; for (const IoHash& Attachment : OpAttachments) { - if (!m_CidStore.ContainsChunk(Attachment)) + if (IoBuffer Payload = m_CidStore.FindChunkByCid(Attachment); !Payload) { - 
Result.MissingAttachments.push_back({KeyHash, Attachment}); + ResultLock.WithExclusiveLock([&]() { Result.MissingAttachments.push_back({KeyHash, Attachment}); }); HasMissingEntries = true; } } if (HasMissingEntries) { - Result.OpKeys.push_back({KeyHash, Key}); + ResultLock.WithExclusiveLock([&]() { Result.OpKeys.push_back({KeyHash, Key}); }); + } + }; + + Latch WorkLatch(1); + + for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) + { + if (OptionalWorkerPool) + { + WorkLatch.AddCount(1); + OptionalWorkerPool->ScheduleWork([&, Index = OpIndex]() { + auto _ = MakeGuard([&WorkLatch] { WorkLatch.CountDown(); }); + ValidateOne(Index); + }); + } + else + { + ValidateOne(OpIndex); } } + WorkLatch.CountDown(); + WorkLatch.Wait(); + { // Check if we were deleted while we were checking the references without a lock... RwLock::SharedLockScope _(m_OplogLock); @@ -1963,12 +1986,9 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, try { IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); - if (Payload) + if (!AsyncCallback(FileChunkIndex, Payload)) { - if (!AsyncCallback(FileChunkIndex, Payload)) - { - Result.store(false); - } + Result.store(false); } } catch (const std::exception& Ex) @@ -3922,11 +3942,11 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, { if (WantsSizeField) { - Sizes.resize(Ids.size(), 0u); + Sizes.resize(Ids.size(), (uint64_t)-1); } if (WantsRawSizeField) { - RawSizes.resize(Ids.size(), 0u); + RawSizes.resize(Ids.size(), (uint64_t)-1); } FoundLog->IterateChunks( @@ -3934,20 +3954,35 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, [&](size_t Index, const IoBuffer& Payload) { try { - uint64_t Size = Payload.GetSize(); - if (WantsRawSizeField) + if (Payload) { - uint64_t RawSize = Size; - if (Payload.GetContentType() == ZenContentType::kCompressedBinary) + if (WantsRawSizeField) + { + if (Payload.GetContentType() == ZenContentType::kCompressedBinary) + { + IoHash _; + 
uint64_t RawSize; + if (!CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSize)) + { + ZEN_WARN("oplog '{}/{}': payload for project file info for id {} is not a valid compressed binary.", + ProjectId, + OplogId, + Ids[Index]); + } + else + { + RawSizes[Index] = RawSize; + } + } + } + if (WantsSizeField) { - IoHash __; - (void)CompressedBuffer::FromCompressed(SharedBuffer(Payload), __, RawSize); + Sizes[Index] = Payload.GetSize(); } - RawSizes[Index] = RawSize; } - if (WantsSizeField) + else { - Sizes[Index] = Size; + ZEN_WARN("oplog '{}/{}': failed getting payload for project file info for id {}.", ProjectId, OplogId, Ids[Index]); } } catch (const std::exception& Ex) @@ -3980,11 +4015,11 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, { Response << "clientpath"sv << ClientPaths[Index]; } - if (WantsSizeField) + if (WantsSizeField && Sizes[Index] != (uint64_t)-1) { Response << "size"sv << Sizes[Index]; } - if (WantsRawSizeField) + if (WantsRawSizeField && RawSizes[Index] != (uint64_t)-1) { Response << "rawsize"sv << RawSizes[Index]; } @@ -4062,20 +4097,50 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId, [&](size_t Index, const IoBuffer& Chunk) -> bool { try { - uint64_t Size = Chunk.GetSize(); - if (WantsRawSizeField) + if (Chunk) { - uint64_t RawSize = Size; - if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) + if (WantsRawSizeField) + { + if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t RawSize; + IoHash RawHash; + if (!CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize)) + { + ZEN_WARN("oplog '{}/{}': payload for project file info for id {} is not a valid compressed binary.", + ProjectId, + OplogId, + Ids[Index]); + } + else if (RawHash != Hashes[Index]) + { + ZEN_WARN( + "oplog '{}/{}': payload for project file info for id {} does not match expected raw hash {}, found " + "{}.", + ProjectId, + OplogId, + Ids[Index], + Hashes[Index], + RawHash); + } + 
else + { + RawSizes[Index] = RawSize; + } + } + else + { + RawSizes[Index] = Chunk.GetSize(); + } + } + if (WantsSizeField) { - IoHash __; - (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize); + Sizes[Index] = Chunk.GetSize(); } - RawSizes[Index] = RawSize; } - if (WantsSizeField) + else { - Sizes[Index] = Size; + ZEN_WARN("oplog '{}/{}': failed getting payload for chunk for id {}", ProjectId, OplogId, Ids[Index]); } } catch (const std::exception& Ex) @@ -5782,7 +5847,7 @@ public: if (Oplog != nullptr) { - Result = Oplog->Validate(Ctx.IsCancelledFlag); + Result = Oplog->Validate(Ctx.IsCancelledFlag, nullptr); if (Ctx.IsCancelledFlag) { return; diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 1619151dd..d8b13585b 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -212,7 +212,7 @@ public: } }; - ValidationResult Validate(std::atomic_bool& IsCancelledFlag); + ValidationResult Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPool* OptionalWorkerPool); private: struct FileMapEntry diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index b3309f7a7..bc30301d1 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -307,6 +307,18 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, WorkerThreadPool* OptionalWorkerPool) { + if (ChunkHashes.size() < 3) + { + for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++) + { + IoBuffer Chunk = FindChunk(ChunkHashes[ChunkIndex]); + if (!AsyncCallback(ChunkIndex, Chunk)) + { + return false; + } + } + return true; + } std::vector<size_t> FoundChunkIndexes; std::vector<BlockStoreLocation> FoundChunkLocations; RwLock::SharedLockScope _(m_LocationMapLock); @@ -326,7 +338,7 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, 
[&](size_t ChunkIndex, const void* Data, uint64_t Size) { if (Data == nullptr) { - return true; + return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer()); } return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size)); }, @@ -338,7 +350,19 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, Latch WorkLatch(1); std::atomic_bool AsyncContinue = true; bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) { - if (OptionalWorkerPool) + if (ChunkIndexes.size() < 3) + { + for (size_t ChunkIndex : ChunkIndexes) + { + IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]); + if (!AsyncCallback(FoundChunkIndexes[ChunkIndex], Chunk)) + { + return false; + } + } + return true; + } + else if (OptionalWorkerPool) { WorkLatch.AddCount(1); OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 62ed44bbb..3cee9f076 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -446,30 +446,11 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: } IoBuffer -FileCasStrategy::FindChunk(const IoHash& ChunkHash) +FileCasStrategy::SafeOpenChunk(const IoHash& ChunkHash, uint64 ExpectedSize) { - ZEN_TRACE_CPU("FileCas::FindChunk"); - - ZEN_ASSERT(m_IsInitialized); - - uint64_t ExpectedSize = 0; - { - RwLock::SharedLockScope _(m_Lock); - if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) - { - ExpectedSize = It->second.Size; - } - else - { - return {}; - } - } - ShardingHelper Name(m_RootDirectory, ChunkHash); const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath(); - - RwLock::SharedLockScope ShardLock(LockForHash(ChunkHash)); - + RwLock::SharedLockScope ShardLock(LockForHash(ChunkHash)); if (IoBuffer Chunk = IoBufferBuilder::MakeFromFile(ChunkPath); 
Chunk) { uint64 ChunkSize = Chunk.GetSize(); @@ -536,6 +517,29 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash) return {}; } +IoBuffer +FileCasStrategy::FindChunk(const IoHash& ChunkHash) +{ + ZEN_TRACE_CPU("FileCas::FindChunk"); + + ZEN_ASSERT(m_IsInitialized); + + uint64_t ExpectedSize = 0; + { + RwLock::SharedLockScope _(m_Lock); + if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) + { + ExpectedSize = It->second.Size; + } + else + { + return {}; + } + } + + return SafeOpenChunk(ChunkHash, ExpectedSize); +} + bool FileCasStrategy::HaveChunk(const IoHash& ChunkHash) { @@ -597,7 +601,8 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback, WorkerThreadPool* OptionalWorkerPool) { - std::vector<size_t> FoundChunkIndexes; + std::vector<size_t> FoundChunkIndexes; + std::vector<uint64_t> FoundChunkExpectedSizes; { RwLock::SharedLockScope _(m_Lock); for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++) @@ -605,28 +610,28 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, if (auto KeyIt = m_Index.find(ChunkHashes[ChunkIndex]); KeyIt != m_Index.end()) { FoundChunkIndexes.push_back(ChunkIndex); + FoundChunkExpectedSizes.push_back(KeyIt->second.Size); } } } std::atomic_bool Continue = true; if (!FoundChunkIndexes.empty()) { - auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex) { - ShardingHelper Name(m_RootDirectory, ChunkHashes[ChunkIndex]); - IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.ToPath()); - if (Payload) + auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex, uint64_t ExpectedSize) { + const IoHash& ChunkHash = ChunkHashes[ChunkIndex]; + IoBuffer Payload = SafeOpenChunk(ChunkHash, ExpectedSize); + if (!AsyncCallback(ChunkIndex, std::move(Payload))) { - if (!AsyncCallback(ChunkIndex, std::move(Payload))) - { - return false; - } + return false; 
} return true; }; Latch WorkLatch(1); - for (size_t ChunkIndex : FoundChunkIndexes) + for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++) { + size_t ChunkIndex = FoundChunkIndexes[Index]; + uint64_t ExpectedSize = FoundChunkExpectedSizes[Index]; if (!Continue) { break; @@ -634,7 +639,7 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, if (OptionalWorkerPool) { WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, &Continue]() { + OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &Continue]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); if (!Continue) { @@ -642,7 +647,7 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, } try { - if (!ProcessOne(ChunkIndex)) + if (!ProcessOne(ChunkIndex, ExpectedSize)) { Continue = false; } @@ -658,7 +663,7 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, } else { - Continue = Continue && ProcessOne(ChunkIndex); + Continue = Continue && ProcessOne(ChunkIndex, ExpectedSize); } } WorkLatch.CountDown(); @@ -674,28 +679,25 @@ FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& ZEN_ASSERT(m_IsInitialized); - RwLock::SharedLockScope _(m_Lock); - for (const auto& It : m_Index) + std::vector<IoHash> RawHashes; + std::vector<uint64_t> ExpectedSizes; + { - const IoHash& NameHash = It.first; - ShardingHelper Name(m_RootDirectory, NameHash); - IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.ToPath()); - Callback(NameHash, std::move(Payload)); + RwLock::SharedLockScope _(m_Lock); + RawHashes.reserve(m_Index.size()); + ExpectedSizes.reserve(m_Index.size()); + for (const auto& It : m_Index) + { + RawHashes.push_back(It.first); + ExpectedSizes.push_back(It.second.Size); + } } -} - -void -FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, uint64_t Size)>&& Callback) -{ - 
ZEN_TRACE_CPU("FileCas::IterateChunks"); - - ZEN_ASSERT(m_IsInitialized); - - RwLock::SharedLockScope _(m_Lock); - for (const auto& It : m_Index) + for (size_t Index = 0; Index < RawHashes.size(); Index++) { - const IoHash& NameHash = It.first; - Callback(NameHash, It.second.Size); + const IoHash& ChunkHash = RawHashes[Index]; + const uint64_t ExpectedSize = ExpectedSizes[Index]; + IoBuffer Payload = SafeOpenChunk(ChunkHash, ExpectedSize); + Callback(ChunkHash, std::move(Payload)); } } @@ -1259,7 +1261,11 @@ public: if (Ec) { // Target file may be open for read, attempt to move it to a temp file and mark it delete on close - IoBuffer OldChunk = IoBufferBuilder::MakeFromFile(ChunkPath); + IoBuffer OldChunk; + { + RwLock::SharedLockScope HashLock(m_FileCasStrategy.LockForHash(ChunkHash)); + OldChunk = IoBufferBuilder::MakeFromFile(ChunkPath); + } if (OldChunk) { std::filesystem::path TempPath(ChunkPath.parent_path() / Oid::NewOid().ToString()); diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h index fb4b1888b..21d8c3b9e 100644 --- a/src/zenstore/filecas.h +++ b/src/zenstore/filecas.h @@ -90,8 +90,8 @@ private: inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; } void IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback); - void IterateChunks(std::function<void(const IoHash& Hash, uint64_t PayloadSize)>&& Callback); void DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec); + IoBuffer SafeOpenChunk(const IoHash& ChunkHash, uint64_t ExpectedSize); struct ShardingHelper { |