diff options
| author | Dan Engelbrecht <[email protected]> | 2024-08-06 14:54:33 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-08-06 14:54:33 +0200 |
| commit | 1eaacecbdc540280d4ebd347c9f4b155799e6f89 (patch) | |
| tree | 9ea10ecf4e6c5d8f83f1eb78ced37d2f3622d2d6 | |
| parent | validate cbobject before iterating for attachments to avoid crash on malforme... (diff) | |
| download | zen-1eaacecbdc540280d4ebd347c9f4b155799e6f89.tar.xz zen-1eaacecbdc540280d4ebd347c9f4b155799e6f89.zip | |
stop exceptions from leaking on threaded work (#102)
* catch exceptions in threaded work
* don't abort all project file/chunk info fetch for single failure
| -rw-r--r-- | CHANGELOG.md | 1 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 67 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 41 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 56 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 16 |
5 files changed, 116 insertions, 65 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index b5b10a629..63574cacb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Improvement: Validate data when gathering attachments in GCv2 - Improvement: Add file and size of file when reading a iobuffer into memory via ReadFromFileMaybe - Improvement: Add hardening to gracefully handle malformed oplogs in project store +- Improvement: Catch exceptions in threaded work to avoid uncaught exception errors ## 5.5.3 - Feature: New 'workspaces' service which allows a user to share a local folder via zenserver. A workspace can have mulitple workspace shares and they provie an HTTP API that is compatible with the project oplog HTTP API. Workspaces and shares are preserved between runs. Workspaces feature is disabled by default - enable with `--workspaces-enabled` option when launching zenserver. diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index a542a6581..0f9481210 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -965,7 +965,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, break; } WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([&WorkLatch, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]() { + OptionalWorkerPool->ScheduleWork([this, &WorkLatch, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); if (Result.load() == false) { @@ -973,14 +973,21 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, } size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex]; - IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); - if (Payload) + try { - if (!AsyncCallback(FileChunkIndex, Payload)) + IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); + if (Payload) { - Result.store(false); + if (!AsyncCallback(FileChunkIndex, Payload)) + { + Result.store(false); + } } } + catch (const std::exception& Ex) + { + ZEN_WARN("Exception caught when iterating file chunk {}, path '{}'. Reason: '{}'", FileChunkIndex, FilePath, Ex.what()); + } }); } @@ -2838,20 +2845,27 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, FoundLog->IterateChunks( Ids, [&](size_t Index, const IoBuffer& Payload) { - uint64_t Size = Payload.GetSize(); - if (WantsRawSizeField) + try { - uint64_t RawSize = Size; - if (Payload.GetContentType() == ZenContentType::kCompressedBinary) + uint64_t Size = Payload.GetSize(); + if (WantsRawSizeField) + { + uint64_t RawSize = Size; + if (Payload.GetContentType() == ZenContentType::kCompressedBinary) + { + IoHash __; + (void)CompressedBuffer::FromCompressed(SharedBuffer(Payload), __, RawSize); + } + RawSizes[Index] = RawSize; + } + if (WantsSizeField) { - IoHash __; - (void)CompressedBuffer::FromCompressed(SharedBuffer(Payload), __, RawSize); + Sizes[Index] = Size; } - RawSizes[Index] = RawSize; } - if (WantsSizeField) + catch (const std::exception& Ex) { - Sizes[Index] = Size; + ZEN_WARN("Failed getting project file info for id {}. Reason: '{}'", Ids[Index], Ex.what()); } return true; }, @@ -2955,20 +2969,27 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId, (void)FoundLog->IterateChunks( Hashes, [&](size_t Index, const IoBuffer& Chunk) -> bool { - uint64_t Size = Chunk.GetSize(); - if (WantsRawSizeField) + try { - uint64_t RawSize = Size; - if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) + uint64_t Size = Chunk.GetSize(); + if (WantsRawSizeField) + { + uint64_t RawSize = Size; + if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) + { + IoHash __; + (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize); + } + RawSizes[Index] = RawSize; + } + if (WantsSizeField) { - IoHash __; - (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize); + Sizes[Index] = Size; } - RawSizes[Index] = RawSize; } - if (WantsSizeField) + catch (const std::exception& Ex) { - Sizes[Index] = Size; + ZEN_WARN("Failed getting chunk info for id {}. Reason: '{}'", Ids[Index], Ex.what()); } return true; }, diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index de6753a6b..15d329442 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -237,22 +237,31 @@ CreateBlock(WorkerThreadPool& WorkerPool, { return; } - ZEN_ASSERT(!Chunks.empty()); - size_t ChunkCount = Chunks.size(); - Stopwatch Timer; - CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); - IoHash BlockHash = CompressedBlock.DecodeRawHash(); - { - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index - RwLock::SharedLockScope __(SectionsLock); - Blocks[BlockIndex].BlockHash = BlockHash; - } - uint64_t BlockSize = CompressedBlock.GetCompressedSize(); - AsyncOnBlock(std::move(CompressedBlock), BlockHash); - ZEN_INFO("Generated block with {} attachments in {} ({})", - ChunkCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - NiceBytes(BlockSize)); + size_t ChunkCount = Chunks.size(); + try + { + ZEN_ASSERT(ChunkCount > 0); + Stopwatch Timer; + CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); + IoHash BlockHash = CompressedBlock.DecodeRawHash(); + { + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope __(SectionsLock); + Blocks[BlockIndex].BlockHash = BlockHash; + } + uint64_t BlockSize = CompressedBlock.GetCompressedSize(); + AsyncOnBlock(std::move(CompressedBlock), BlockHash); + ZEN_INFO("Generated block with {} attachments in {} ({})", + ChunkCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + NiceBytes(BlockSize)); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed creating block {} with {} chunks", BlockIndex, ChunkCount), + Ex.what()); + } }); } diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index dd92483b6..a00b17912 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -336,29 +336,39 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, Latch WorkLatch(1); std::atomic_bool AsyncContinue = true; - bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) { - if (OptionalWorkerPool) - { - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - if (!AsyncContinue) - { - return; - } - bool Continue = DoOneBlock(ChunkIndexes); - if (!Continue) - { - AsyncContinue.store(false); - } - }); - return AsyncContinue.load(); - } - else - { - return DoOneBlock(ChunkIndexes); - } - }); + bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) { + if (OptionalWorkerPool) + { + WorkLatch.AddCount(1); + OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + if (!AsyncContinue) + { + return; + } + try + { + bool Continue = DoOneBlock(ChunkIndexes); + if (!Continue) + { + AsyncContinue.store(false); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'", + m_RootDirectory, + BlockIndex, + Ex.what()); + } + }); + return AsyncContinue.load(); + } + else + { + return DoOneBlock(ChunkIndexes); + } + }); WorkLatch.CountDown(); WorkLatch.Wait(); return AsyncContinue.load() && Continue; diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 1c6aa539a..b3cdafd4e 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -852,15 +852,25 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, if (OptionalWorkerPool) { WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([&WorkLatch, &ProcessOne, ChunkIndex, &Continue]() { + OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, &Continue]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); if (!Continue) { return; } - if (!ProcessOne(ChunkIndex)) + try { - Continue = false; + if (!ProcessOne(ChunkIndex)) + { + Continue = false; + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'", + m_RootDirectory, + ChunkHashes[ChunkIndex], + Ex.what()); } }); } |