diff options
| author | Dan Engelbrecht <[email protected]> | 2024-08-06 14:54:33 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-08-06 14:54:33 +0200 |
| commit | 1eaacecbdc540280d4ebd347c9f4b155799e6f89 (patch) | |
| tree | 9ea10ecf4e6c5d8f83f1eb78ced37d2f3622d2d6 /src | |
| parent | validate cbobject before iterating for attachments to avoid crash on malforme... (diff) | |
| download | zen-1eaacecbdc540280d4ebd347c9f4b155799e6f89.tar.xz zen-1eaacecbdc540280d4ebd347c9f4b155799e6f89.zip | |
stop exceptions from leaking on threaded work (#102)
* catch exceptions in threaded work
* don't abort all project file/chunk info fetch for single failure
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 67 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 41 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 56 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 16 |
4 files changed, 115 insertions, 65 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index a542a6581..0f9481210 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -965,7 +965,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, break; } WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([&WorkLatch, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]() { + OptionalWorkerPool->ScheduleWork([this, &WorkLatch, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); if (Result.load() == false) { @@ -973,14 +973,21 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, } size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex]; - IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); - if (Payload) + try { - if (!AsyncCallback(FileChunkIndex, Payload)) + IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); + if (Payload) { - Result.store(false); + if (!AsyncCallback(FileChunkIndex, Payload)) + { + Result.store(false); + } } } + catch (const std::exception& Ex) + { + ZEN_WARN("Exception caught when iterating file chunk {}, path '{}'. Reason: '{}'", FileChunkIndex, FilePath, Ex.what()); + } }); } @@ -2838,20 +2845,27 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, FoundLog->IterateChunks( Ids, [&](size_t Index, const IoBuffer& Payload) { - uint64_t Size = Payload.GetSize(); - if (WantsRawSizeField) + try { - uint64_t RawSize = Size; - if (Payload.GetContentType() == ZenContentType::kCompressedBinary) + uint64_t Size = Payload.GetSize(); + if (WantsRawSizeField) + { + uint64_t RawSize = Size; + if (Payload.GetContentType() == ZenContentType::kCompressedBinary) + { + IoHash __; + (void)CompressedBuffer::FromCompressed(SharedBuffer(Payload), __, RawSize); + } + RawSizes[Index] = RawSize; + } + if (WantsSizeField) { - IoHash __; - (void)CompressedBuffer::FromCompressed(SharedBuffer(Payload), __, RawSize); + Sizes[Index] = Size; } - RawSizes[Index] = RawSize; } - if (WantsSizeField) + catch (const std::exception& Ex) { - Sizes[Index] = Size; + ZEN_WARN("Failed getting project file info for id {}. Reason: '{}'", Ids[Index], Ex.what()); } return true; }, @@ -2955,20 +2969,27 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId, (void)FoundLog->IterateChunks( Hashes, [&](size_t Index, const IoBuffer& Chunk) -> bool { - uint64_t Size = Chunk.GetSize(); - if (WantsRawSizeField) + try { - uint64_t RawSize = Size; - if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) + uint64_t Size = Chunk.GetSize(); + if (WantsRawSizeField) + { + uint64_t RawSize = Size; + if (Chunk.GetContentType() == ZenContentType::kCompressedBinary) + { + IoHash __; + (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize); + } + RawSizes[Index] = RawSize; + } + if (WantsSizeField) { - IoHash __; - (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize); + Sizes[Index] = Size; } - RawSizes[Index] = RawSize; } - if (WantsSizeField) + catch (const std::exception& Ex) { - Sizes[Index] = Size; + ZEN_WARN("Failed getting chunk info for id {}. Reason: '{}'", Ids[Index], Ex.what()); } return true; }, diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index de6753a6b..15d329442 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -237,22 +237,31 @@ CreateBlock(WorkerThreadPool& WorkerPool, { return; } - ZEN_ASSERT(!Chunks.empty()); - size_t ChunkCount = Chunks.size(); - Stopwatch Timer; - CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); - IoHash BlockHash = CompressedBlock.DecodeRawHash(); - { - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index - RwLock::SharedLockScope __(SectionsLock); - Blocks[BlockIndex].BlockHash = BlockHash; - } - uint64_t BlockSize = CompressedBlock.GetCompressedSize(); - AsyncOnBlock(std::move(CompressedBlock), BlockHash); - ZEN_INFO("Generated block with {} attachments in {} ({})", - ChunkCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - NiceBytes(BlockSize)); + size_t ChunkCount = Chunks.size(); + try + { + ZEN_ASSERT(ChunkCount > 0); + Stopwatch Timer; + CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); + IoHash BlockHash = CompressedBlock.DecodeRawHash(); + { + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope __(SectionsLock); + Blocks[BlockIndex].BlockHash = BlockHash; + } + uint64_t BlockSize = CompressedBlock.GetCompressedSize(); + AsyncOnBlock(std::move(CompressedBlock), BlockHash); + ZEN_INFO("Generated block with {} attachments in {} ({})", + ChunkCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + NiceBytes(BlockSize)); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed creating block {} with {} chunks", BlockIndex, ChunkCount), + Ex.what()); + } }); } diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index dd92483b6..a00b17912 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -336,29 +336,39 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, Latch WorkLatch(1); std::atomic_bool AsyncContinue = true; - bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) { - if (OptionalWorkerPool) - { - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - if (!AsyncContinue) - { - return; - } - bool Continue = DoOneBlock(ChunkIndexes); - if (!Continue) - { - AsyncContinue.store(false); - } - }); - return AsyncContinue.load(); - } - else - { - return DoOneBlock(ChunkIndexes); - } - }); + bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) { + if (OptionalWorkerPool) + { + WorkLatch.AddCount(1); + OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + if (!AsyncContinue) + { + return; + } + try + { + bool Continue = DoOneBlock(ChunkIndexes); + if (!Continue) + { + AsyncContinue.store(false); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'", + m_RootDirectory, + BlockIndex, + Ex.what()); + } + }); + return AsyncContinue.load(); + } + else + { + return DoOneBlock(ChunkIndexes); + } + }); WorkLatch.CountDown(); WorkLatch.Wait(); return AsyncContinue.load() && Continue; diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 1c6aa539a..b3cdafd4e 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -852,15 +852,25 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, if (OptionalWorkerPool) { WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([&WorkLatch, &ProcessOne, ChunkIndex, &Continue]() { + OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, &Continue]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); if (!Continue) { return; } - if (!ProcessOne(ChunkIndex)) + try { - Continue = false; + if (!ProcessOne(ChunkIndex)) + { + Continue = false; + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'", + m_RootDirectory, + ChunkHashes[ChunkIndex], + Ex.what()); } }); } |