aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-08-06 14:54:33 +0200
committerGitHub Enterprise <[email protected]>2024-08-06 14:54:33 +0200
commit1eaacecbdc540280d4ebd347c9f4b155799e6f89 (patch)
tree9ea10ecf4e6c5d8f83f1eb78ced37d2f3622d2d6
parentvalidate cbobject before iterating for attachments to avoid crash on malforme... (diff)
downloadzen-1eaacecbdc540280d4ebd347c9f4b155799e6f89.tar.xz
zen-1eaacecbdc540280d4ebd347c9f4b155799e6f89.zip
stop exceptions from leaking on threaded work (#102)
* catch exceptions in threaded work * don't abort all project file/chunk info fetch for single failure
-rw-r--r--CHANGELOG.md1
-rw-r--r--src/zenserver/projectstore/projectstore.cpp67
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp41
-rw-r--r--src/zenstore/compactcas.cpp56
-rw-r--r--src/zenstore/filecas.cpp16
5 files changed, 116 insertions, 65 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b5b10a629..63574cacb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,7 @@
- Improvement: Validate data when gathering attachments in GCv2
- Improvement: Add file and size of file when reading an iobuffer into memory via ReadFromFileMaybe
- Improvement: Add hardening to gracefully handle malformed oplogs in project store
+- Improvement: Catch exceptions in threaded work to avoid uncaught exception errors
## 5.5.3
- Feature: New 'workspaces' service which allows a user to share a local folder via zenserver. A workspace can have multiple workspace shares and they provide an HTTP API that is compatible with the project oplog HTTP API. Workspaces and shares are preserved between runs. Workspaces feature is disabled by default - enable with `--workspaces-enabled` option when launching zenserver.
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index a542a6581..0f9481210 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -965,7 +965,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
break;
}
WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork([&WorkLatch, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]() {
+ OptionalWorkerPool->ScheduleWork([this, &WorkLatch, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]() {
auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
if (Result.load() == false)
{
@@ -973,14 +973,21 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
}
size_t FileChunkIndex = FileChunkIndexes[ChunkIndex];
const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex];
- IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath);
- if (Payload)
+ try
{
- if (!AsyncCallback(FileChunkIndex, Payload))
+ IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath);
+ if (Payload)
{
- Result.store(false);
+ if (!AsyncCallback(FileChunkIndex, Payload))
+ {
+ Result.store(false);
+ }
}
}
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Exception caught when iterating file chunk {}, path '{}'. Reason: '{}'", FileChunkIndex, FilePath, Ex.what());
+ }
});
}
@@ -2838,20 +2845,27 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId,
FoundLog->IterateChunks(
Ids,
[&](size_t Index, const IoBuffer& Payload) {
- uint64_t Size = Payload.GetSize();
- if (WantsRawSizeField)
+ try
{
- uint64_t RawSize = Size;
- if (Payload.GetContentType() == ZenContentType::kCompressedBinary)
+ uint64_t Size = Payload.GetSize();
+ if (WantsRawSizeField)
+ {
+ uint64_t RawSize = Size;
+ if (Payload.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ IoHash __;
+ (void)CompressedBuffer::FromCompressed(SharedBuffer(Payload), __, RawSize);
+ }
+ RawSizes[Index] = RawSize;
+ }
+ if (WantsSizeField)
{
- IoHash __;
- (void)CompressedBuffer::FromCompressed(SharedBuffer(Payload), __, RawSize);
+ Sizes[Index] = Size;
}
- RawSizes[Index] = RawSize;
}
- if (WantsSizeField)
+ catch (const std::exception& Ex)
{
- Sizes[Index] = Size;
+ ZEN_WARN("Failed getting project file info for id {}. Reason: '{}'", Ids[Index], Ex.what());
}
return true;
},
@@ -2955,20 +2969,27 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId,
(void)FoundLog->IterateChunks(
Hashes,
[&](size_t Index, const IoBuffer& Chunk) -> bool {
- uint64_t Size = Chunk.GetSize();
- if (WantsRawSizeField)
+ try
{
- uint64_t RawSize = Size;
- if (Chunk.GetContentType() == ZenContentType::kCompressedBinary)
+ uint64_t Size = Chunk.GetSize();
+ if (WantsRawSizeField)
+ {
+ uint64_t RawSize = Size;
+ if (Chunk.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ IoHash __;
+ (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize);
+ }
+ RawSizes[Index] = RawSize;
+ }
+ if (WantsSizeField)
{
- IoHash __;
- (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize);
+ Sizes[Index] = Size;
}
- RawSizes[Index] = RawSize;
}
- if (WantsSizeField)
+ catch (const std::exception& Ex)
{
- Sizes[Index] = Size;
+ ZEN_WARN("Failed getting chunk info for id {}. Reason: '{}'", Ids[Index], Ex.what());
}
return true;
},
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index de6753a6b..15d329442 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -237,22 +237,31 @@ CreateBlock(WorkerThreadPool& WorkerPool,
{
return;
}
- ZEN_ASSERT(!Chunks.empty());
- size_t ChunkCount = Chunks.size();
- Stopwatch Timer;
- CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks));
- IoHash BlockHash = CompressedBlock.DecodeRawHash();
- {
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
- RwLock::SharedLockScope __(SectionsLock);
- Blocks[BlockIndex].BlockHash = BlockHash;
- }
- uint64_t BlockSize = CompressedBlock.GetCompressedSize();
- AsyncOnBlock(std::move(CompressedBlock), BlockHash);
- ZEN_INFO("Generated block with {} attachments in {} ({})",
- ChunkCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
- NiceBytes(BlockSize));
+ size_t ChunkCount = Chunks.size();
+ try
+ {
+ ZEN_ASSERT(ChunkCount > 0);
+ Stopwatch Timer;
+ CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks));
+ IoHash BlockHash = CompressedBlock.DecodeRawHash();
+ {
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope __(SectionsLock);
+ Blocks[BlockIndex].BlockHash = BlockHash;
+ }
+ uint64_t BlockSize = CompressedBlock.GetCompressedSize();
+ AsyncOnBlock(std::move(CompressedBlock), BlockHash);
+ ZEN_INFO("Generated block with {} attachments in {} ({})",
+ ChunkCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+ NiceBytes(BlockSize));
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed creating block {} with {} chunks", BlockIndex, ChunkCount),
+ Ex.what());
+ }
});
}
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index dd92483b6..a00b17912 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -336,29 +336,39 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
Latch WorkLatch(1);
std::atomic_bool AsyncContinue = true;
- bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) {
- if (OptionalWorkerPool)
- {
- WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- if (!AsyncContinue)
- {
- return;
- }
- bool Continue = DoOneBlock(ChunkIndexes);
- if (!Continue)
- {
- AsyncContinue.store(false);
- }
- });
- return AsyncContinue.load();
- }
- else
- {
- return DoOneBlock(ChunkIndexes);
- }
- });
+ bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
+ if (OptionalWorkerPool)
+ {
+ WorkLatch.AddCount(1);
+ OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ if (!AsyncContinue)
+ {
+ return;
+ }
+ try
+ {
+ bool Continue = DoOneBlock(ChunkIndexes);
+ if (!Continue)
+ {
+ AsyncContinue.store(false);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
+ m_RootDirectory,
+ BlockIndex,
+ Ex.what());
+ }
+ });
+ return AsyncContinue.load();
+ }
+ else
+ {
+ return DoOneBlock(ChunkIndexes);
+ }
+ });
WorkLatch.CountDown();
WorkLatch.Wait();
return AsyncContinue.load() && Continue;
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 1c6aa539a..b3cdafd4e 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -852,15 +852,25 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
if (OptionalWorkerPool)
{
WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork([&WorkLatch, &ProcessOne, ChunkIndex, &Continue]() {
+ OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, &Continue]() {
auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
if (!Continue)
{
return;
}
- if (!ProcessOne(ChunkIndex))
+ try
{
- Continue = false;
+ if (!ProcessOne(ChunkIndex))
+ {
+ Continue = false;
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'",
+ m_RootDirectory,
+ ChunkHashes[ChunkIndex],
+ Ex.what());
}
});
}