aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/include
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-02-28 12:39:48 +0100
committerGitHub Enterprise <[email protected]>2025-02-28 12:39:48 +0100
commit5791f51cccea1d4e5365456c8da89dbac0dd3ec0 (patch)
tree137412a5e731a4ac33b53b0f0d33b39b29975a03 /src/zenutil/include
parent5.5.20 (diff)
downloadzen-5791f51cccea1d4e5365456c8da89dbac0dd3ec0.tar.xz
zen-5791f51cccea1d4e5365456c8da89dbac0dd3ec0.zip
improve error handling (#289)
* clearer errors * quicker abort * handle deleted local files * simplify parallellwork error handling * don't finish progress on destructor - gives wrong impression * graceful ctrl-c handling
Diffstat (limited to 'src/zenutil/include')
-rw-r--r--src/zenutil/include/zenutil/chunkedfile.h3
-rw-r--r--src/zenutil/include/zenutil/chunkingcontroller.h7
-rw-r--r--src/zenutil/include/zenutil/parallellwork.h48
3 files changed, 54 insertions, 4 deletions
diff --git a/src/zenutil/include/zenutil/chunkedfile.h b/src/zenutil/include/zenutil/chunkedfile.h
index 7110ad317..4cec80fdb 100644
--- a/src/zenutil/include/zenutil/chunkedfile.h
+++ b/src/zenutil/include/zenutil/chunkedfile.h
@@ -47,7 +47,8 @@ ChunkedInfoWithSource ChunkData(BasicFile& RawData,
uint64_t Offset,
uint64_t Size,
ChunkedParams Params = {},
- std::atomic<uint64_t>* BytesProcessed = nullptr);
+ std::atomic<uint64_t>* BytesProcessed = nullptr,
+ std::atomic<bool>* AbortFlag = nullptr);
void Reconstruct(const ChunkedInfo& Info,
const std::filesystem::path& TargetPath,
std::function<IoBuffer(const IoHash& ChunkHash)> GetChunk);
diff --git a/src/zenutil/include/zenutil/chunkingcontroller.h b/src/zenutil/include/zenutil/chunkingcontroller.h
index fe4fc1bb5..ebc80e207 100644
--- a/src/zenutil/include/zenutil/chunkingcontroller.h
+++ b/src/zenutil/include/zenutil/chunkingcontroller.h
@@ -32,9 +32,10 @@ public:
virtual bool ProcessFile(const std::filesystem::path& InputPath,
uint64_t RawSize,
ChunkedInfoWithSource& OutChunked,
- std::atomic<uint64_t>& BytesProcessed) const = 0;
- virtual std::string_view GetName() const = 0;
- virtual CbObject GetParameters() const = 0;
+ std::atomic<uint64_t>& BytesProcessed,
+ std::atomic<bool>& AbortFlag) const = 0;
+ virtual std::string_view GetName() const = 0;
+ virtual CbObject GetParameters() const = 0;
};
std::unique_ptr<ChunkingController> CreateBasicChunkingController(
diff --git a/src/zenutil/include/zenutil/parallellwork.h b/src/zenutil/include/zenutil/parallellwork.h
index 7a8218c51..79798fc8d 100644
--- a/src/zenutil/include/zenutil/parallellwork.h
+++ b/src/zenutil/include/zenutil/parallellwork.h
@@ -2,6 +2,8 @@
#pragma once
+#include <zencore/except.h>
+#include <zencore/fmtutils.h>
#include <zencore/thread.h>
#include <zencore/workthreadpool.h>
@@ -20,6 +22,14 @@ public:
ZEN_ASSERT(m_PendingWork.Remaining() == 0);
}
+ std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)> DefaultErrorFunction()
+ {
+ return [&](const std::exception& Ex, std::atomic<bool>& AbortFlag) {
+ m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex.what()); });
+ AbortFlag = true;
+ };
+ }
+
void ScheduleWork(WorkerThreadPool& WorkerPool,
std::function<void(std::atomic<bool>& AbortFlag)>&& Work,
std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)>&& OnError)
@@ -32,6 +42,27 @@ public:
{
Work(m_AbortFlag);
}
+ catch (const AssertException& AssertEx)
+ {
+ OnError(
+ std::runtime_error(fmt::format("Caught assert exception while handling request: {}", AssertEx.FullDescription())),
+ m_AbortFlag);
+ }
+ catch (const std::system_error& SystemError)
+ {
+ if (IsOOM(SystemError.code()))
+ {
+ OnError(std::runtime_error(fmt::format("Out of memory. Reason: {}", SystemError.what())), m_AbortFlag);
+ }
+ else if (IsOOD(SystemError.code()))
+ {
+ OnError(std::runtime_error(fmt::format("Out of disk. Reason: {}", SystemError.what())), m_AbortFlag);
+ }
+ else
+ {
+ OnError(std::runtime_error(fmt::format("System error. Reason: {}", SystemError.what())), m_AbortFlag);
+ }
+ }
catch (const std::exception& Ex)
{
OnError(Ex, m_AbortFlag);
@@ -58,12 +89,29 @@ public:
{
UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining());
}
+ if (m_Errors.size() == 1)
+ {
+ throw std::runtime_error(m_Errors.front());
+ }
+ else if (m_Errors.size() > 1)
+ {
+ ExtendableStringBuilder<128> SB;
+ SB.Append("Multiple errors:");
+ for (const std::string& Error : m_Errors)
+ {
+ SB.Append(fmt::format("\n {}", Error));
+ }
+ throw std::runtime_error(SB.ToString());
+ }
}
Latch& PendingWork() { return m_PendingWork; }
private:
std::atomic<bool>& m_AbortFlag;
Latch m_PendingWork;
+
+ RwLock m_ErrorLock;
+ std::vector<std::string> m_Errors;
};
} // namespace zen