diff options
| author | Dan Engelbrecht <[email protected]> | 2024-11-25 13:36:53 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-11-25 13:36:53 +0100 |
| commit | e75c5860277681be7b4a18d8d630f76c051719b4 (patch) | |
| tree | 52533fe3c0e60cfe89e08ff5a4be00b215933670 /src/zenserver | |
| parent | add missing projectstore expire time in gc log (#227) (diff) | |
| download | zen-e75c5860277681be7b4a18d8d630f76c051719b4.tar.xz zen-e75c5860277681be7b4a18d8d630f76c051719b4.zip | |
stronger validation of payload existance (#229)
- Don't add RawSize and Size in ProjectStore::GetProjectFiles response if we can't get the payload
- Use validation of payload size/existance in all chunk fetch operations in file cas
- In project store oplog validate, make sure we can reach all the payloads
- Add threading to oplog validate request
Diffstat (limited to 'src/zenserver')
| -rw-r--r-- | src/zenserver/projectstore/httpprojectstore.cpp | 5 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 147 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 2 |
3 files changed, 110 insertions, 44 deletions
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp index 2954bcdc0..4babcd224 100644 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ b/src/zenserver/projectstore/httpprojectstore.cpp @@ -14,6 +14,7 @@ #include <zencore/stream.h> #include <zencore/trace.h> #include <zenstore/zenstore.h> +#include <zenutil/workerpools.h> namespace zen { @@ -1164,7 +1165,7 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req) void HttpProjectService::HandleOplogValidateRequest(HttpRouterRequest& Req) { - ZEN_TRACE_CPU("ProjectService::OplogOpNew"); + ZEN_TRACE_CPU("ProjectService::OplogOpValidate"); using namespace std::literals; @@ -1197,7 +1198,7 @@ HttpProjectService::HandleOplogValidateRequest(HttpRouterRequest& Req) ProjectStore::Oplog& Oplog = *FoundLog; std::atomic_bool CancelFlag = false; - ProjectStore::Oplog::ValidationResult Result = Oplog.Validate(CancelFlag); + ProjectStore::Oplog::ValidationResult Result = Oplog.Validate(CancelFlag, &GetSmallWorkerPool(EWorkloadType::Burst)); tsl::robin_map<Oid, std::string, Oid::Hasher> KeyNameLookup; KeyNameLookup.reserve(Result.OpKeys.size()); for (const auto& It : Result.OpKeys) diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 1b48a542c..a43c9e0e2 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1323,7 +1323,7 @@ ProjectStore::Oplog::ReadStateFile(const std::filesystem::path& BasePath, std::f } ProjectStore::Oplog::ValidationResult -ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag) +ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPool* OptionalWorkerPool) { using namespace std::literals; @@ -1348,25 +1348,27 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag) }); Result.OpCount = gsl::narrow<uint32_t>(Keys.size()); - for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) - { + + RwLock ResultLock; + + auto ValidateOne = [&](uint32_t OpIndex) { const Oid& KeyHash = KeyHashes[OpIndex]; const std::string& Key = Keys[OpIndex]; const OplogEntryMapping& Mapping(Mappings[OpIndex]); bool HasMissingEntries = false; for (const ChunkMapping& Chunk : Mapping.Chunks) { - if (!m_CidStore.ContainsChunk(Chunk.Hash)) + if (IoBuffer Payload = m_CidStore.FindChunkByCid(Chunk.Hash); !Payload) { - Result.MissingChunks.push_back({KeyHash, Chunk}); + ResultLock.WithExclusiveLock([&]() { Result.MissingChunks.push_back({KeyHash, Chunk}); }); HasMissingEntries = true; } } for (const ChunkMapping& Meta : Mapping.Meta) { - if (!m_CidStore.ContainsChunk(Meta.Hash)) + if (IoBuffer Payload = m_CidStore.FindChunkByCid(Meta.Hash); !Payload) { - Result.MissingMetas.push_back({KeyHash, Meta}); + ResultLock.WithExclusiveLock([&]() { Result.MissingMetas.push_back({KeyHash, Meta}); }); HasMissingEntries = true; } } @@ -1377,15 +1379,15 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag) std::filesystem::path FilePath = m_OuterProject->RootDir / File.ServerPath; if (!std::filesystem::is_regular_file(FilePath)) { - Result.MissingFiles.push_back({KeyHash, File}); + ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); HasMissingEntries = true; } } else { - if (!m_CidStore.ContainsChunk(File.Hash)) + if (IoBuffer Payload = m_CidStore.FindChunkByCid(File.Hash); !Payload) { - Result.MissingFiles.push_back({KeyHash, File}); + ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); HasMissingEntries = true; } } @@ -1393,18 +1395,39 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag) const std::vector<IoHash>& OpAttachments = Attachments[OpIndex]; for (const IoHash& Attachment : OpAttachments) { - if (!m_CidStore.ContainsChunk(Attachment)) + if (IoBuffer Payload = m_CidStore.FindChunkByCid(Attachment); !Payload) { - Result.MissingAttachments.push_back({KeyHash, Attachment}); + ResultLock.WithExclusiveLock([&]() { Result.MissingAttachments.push_back({KeyHash, Attachment}); }); HasMissingEntries = true; } } if (HasMissingEntries) { - Result.OpKeys.push_back({KeyHash, Key}); + ResultLock.WithExclusiveLock([&]() { Result.OpKeys.push_back({KeyHash, Key}); }); + } + }; + + Latch WorkLatch(1); + + for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) + { + if (OptionalWorkerPool) + { + WorkLatch.AddCount(1); + OptionalWorkerPool->ScheduleWork([&, Index = OpIndex]() { + auto _ = MakeGuard([&WorkLatch] { WorkLatch.CountDown(); }); + ValidateOne(Index); + }); + } + else + { + ValidateOne(OpIndex); } } + WorkLatch.CountDown(); + WorkLatch.Wait(); + { // Check if we were deleted while we were checking the references without a lock... RwLock::SharedLockScope _(m_OplogLock); @@ -1963,12 +1986,9 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, try { IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); - if (Payload) + if (!AsyncCallback(FileChunkIndex, Payload)) { - if (!AsyncCallback(FileChunkIndex, Payload)) - { - Result.store(false); - } + Result.store(false); } } catch (const std::exception& Ex) @@ -3922,11 +3942,11 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, { if (WantsSizeField) { - Sizes.resize(Ids.size(), 0u); + Sizes.resize(Ids.size(), (uint64_t)-1); } if (WantsRawSizeField) { - RawSizes.resize(Ids.size(), 0u); + RawSizes.resize(Ids.size(), (uint64_t)-1); } FoundLog->IterateChunks( @@ -3934,20 +3954,35 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, [&](size_t Index, const IoBuffer& Payload) { try { - uint64_t Size = Payload.GetSize(); - if (WantsRawSizeField) + if (Payload) { - uint64_t RawSize = Size; - if (Payload.GetContentType() == ZenContentType::kCompressedBinary) + if (WantsRawSizeField) + { + if (Payload.GetContentType() == ZenContentType::kCompressedBinary) + { + IoHash _; + uint64_t RawSize; + if (!CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSize)) + { + ZEN_WARN("oplog '{}/{}': payload for project file info for id {} is not a valid compressed binary.", + ProjectId, + OplogId, + Ids[Index]); + } + else + { + RawSizes[Index] = RawSize; + } + } + } + if (WantsSizeField) { - IoHash __; - (void)CompressedBuffer::FromCompressed(SharedBuffer(Payload), __, RawSize); + Sizes[Index] = Payload.GetSize(); } - RawSizes[Index] = RawSize; } - if (WantsSizeField) + else { - Sizes[Index] = Size; + ZEN_WARN("oplog '{}/{}': failed getting payload for project file info for id {}.", ProjectId, OplogId, Ids[Index]); } } catch (const std::exception& Ex) @@ -3980,11 +4015,11 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, { Response << "clientpath"sv << ClientPaths[Index]; } - if (WantsSizeField) + if (WantsSizeField && Sizes[Index] != (uint64_t)-1) { Response << "size"sv << Sizes[Index]; } - if (WantsRawSizeField) + if (WantsRawSizeField && RawSizes[Index] != (uint64_t)-1) { Response << "rawsize"sv << RawSizes[Index]; } @@ -4062,20 +4097,50 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId, [&](size_t Index, const IoBuffer& Chunk) -> bool { try { - uint64_t Size = Chunk.GetSize(); - if (WantsRawSizeField) + if (Chunk) { - uint64_t RawSize = Size; - if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) + if (WantsRawSizeField) + { + if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t RawSize; + IoHash RawHash; + if (!CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize)) + { + ZEN_WARN("oplog '{}/{}': payload for project file info for id {} is not a valid compressed binary.", + ProjectId, + OplogId, + Ids[Index]); + } + else if (RawHash != Hashes[Index]) + { + ZEN_WARN( + "oplog '{}/{}': payload for project file info for id {} does not match expected raw hash {}, found " + "{}.", + ProjectId, + OplogId, + Ids[Index], + Hashes[Index], + RawHash); + } + else + { + RawSizes[Index] = RawSize; + } + } + else + { + RawSizes[Index] = Chunk.GetSize(); + } + } + if (WantsSizeField) { - IoHash __; - (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize); + Sizes[Index] = Chunk.GetSize(); } - RawSizes[Index] = RawSize; } - if (WantsSizeField) + else { - Sizes[Index] = Size; + ZEN_WARN("oplog '{}/{}': failed getting payload for chunk for id {}", ProjectId, OplogId, Ids[Index]); } } catch (const std::exception& Ex) @@ -5782,7 +5847,7 @@ public: if (Oplog != nullptr) { - Result = Oplog->Validate(Ctx.IsCancelledFlag); + Result = Oplog->Validate(Ctx.IsCancelledFlag, nullptr); if (Ctx.IsCancelledFlag) { return; diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 1619151dd..d8b13585b 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -212,7 +212,7 @@ public: } }; - ValidationResult Validate(std::atomic_bool& IsCancelledFlag); + ValidationResult Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPool* OptionalWorkerPool); private: struct FileMapEntry |