aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-02-28 12:39:48 +0100
committerGitHub Enterprise <[email protected]>2025-02-28 12:39:48 +0100
commit5791f51cccea1d4e5365456c8da89dbac0dd3ec0 (patch)
tree137412a5e731a4ac33b53b0f0d33b39b29975a03 /src/zenutil
parent5.5.20 (diff)
downloadzen-5791f51cccea1d4e5365456c8da89dbac0dd3ec0.tar.xz
zen-5791f51cccea1d4e5365456c8da89dbac0dd3ec0.zip
improve error handling (#289)
* clearer errors * quicker abort * handle deleted local files * simplify parallellwork error handling * don't finish progress on destructor - gives wrong impression * graceful ctrl-c handling
Diffstat (limited to 'src/zenutil')
-rw-r--r--src/zenutil/chunkedcontent.cpp13
-rw-r--r--src/zenutil/chunkedfile.cpp11
-rw-r--r--src/zenutil/chunkingcontroller.cpp12
-rw-r--r--src/zenutil/include/zenutil/chunkedfile.h3
-rw-r--r--src/zenutil/include/zenutil/chunkingcontroller.h7
-rw-r--r--src/zenutil/include/zenutil/parallellwork.h48
6 files changed, 79 insertions, 15 deletions
diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp
index a41b71972..6dc2a20d8 100644
--- a/src/zenutil/chunkedcontent.cpp
+++ b/src/zenutil/chunkedcontent.cpp
@@ -92,7 +92,8 @@ namespace {
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& RawHashToSequenceRawHashIndex,
RwLock& Lock,
const std::filesystem::path& FolderPath,
- uint32_t PathIndex)
+ uint32_t PathIndex,
+ std::atomic<bool>& AbortFlag)
{
const uint64_t RawSize = OutChunkedContent.RawSizes[PathIndex];
const std::filesystem::path& Path = OutChunkedContent.Paths[PathIndex];
@@ -105,7 +106,7 @@ namespace {
{
ChunkedInfoWithSource Chunked;
const bool DidChunking =
- InChunkingController.ProcessFile((FolderPath / Path).make_preferred(), RawSize, Chunked, Stats.BytesHashed);
+ InChunkingController.ProcessFile((FolderPath / Path).make_preferred(), RawSize, Chunked, Stats.BytesHashed, AbortFlag);
if (DidChunking)
{
Lock.WithExclusiveLock([&]() {
@@ -753,15 +754,13 @@ ChunkFolderContent(ChunkingStatistics& Stats,
RawHashToSequenceRawHashIndex,
Lock,
RootPath,
- PathIndex);
+ PathIndex,
+ AbortFlag);
Lock.WithExclusiveLock([&]() { Result.RawHashes[PathIndex] = RawHash; });
Stats.FilesProcessed++;
}
},
- [&, PathIndex](const std::exception& Ex, std::atomic<bool>& AbortFlag) {
- ZEN_CONSOLE("Failed scanning file {}. Reason: {}", Result.Paths[PathIndex], Ex.what());
- AbortFlag = true;
- });
+ Work.DefaultErrorFunction());
}
Work.Wait(UpdateInteralMS, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
diff --git a/src/zenutil/chunkedfile.cpp b/src/zenutil/chunkedfile.cpp
index 3f3a6661c..4f9344039 100644
--- a/src/zenutil/chunkedfile.cpp
+++ b/src/zenutil/chunkedfile.cpp
@@ -112,7 +112,12 @@ Reconstruct(const ChunkedInfo& Info, const std::filesystem::path& TargetPath, st
}
ChunkedInfoWithSource
-ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Params, std::atomic<uint64_t>* BytesProcessed)
+ChunkData(BasicFile& RawData,
+ uint64_t Offset,
+ uint64_t Size,
+ ChunkedParams Params,
+ std::atomic<uint64_t>* BytesProcessed,
+ std::atomic<bool>* AbortFlag)
{
ChunkedInfoWithSource Result;
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> FoundChunks;
@@ -129,6 +134,10 @@ ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Para
IoHashStream RawHashStream;
while (Offset < End)
{
+ if (AbortFlag != nullptr && AbortFlag->load())
+ {
+ return {};
+ }
size_t ScanLength = Chunker.ScanChunk(SliceView.GetData(), SliceSize);
if (ScanLength == ZenChunkHelper::kNoBoundaryFound)
{
diff --git a/src/zenutil/chunkingcontroller.cpp b/src/zenutil/chunkingcontroller.cpp
index bc0e57b14..017d12433 100644
--- a/src/zenutil/chunkingcontroller.cpp
+++ b/src/zenutil/chunkingcontroller.cpp
@@ -58,7 +58,8 @@ public:
virtual bool ProcessFile(const std::filesystem::path& InputPath,
uint64_t RawSize,
ChunkedInfoWithSource& OutChunked,
- std::atomic<uint64_t>& BytesProcessed) const override
+ std::atomic<uint64_t>& BytesProcessed,
+ std::atomic<bool>& AbortFlag) const override
{
const bool ExcludeFromChunking =
std::find(m_ChunkExcludeExtensions.begin(), m_ChunkExcludeExtensions.end(), InputPath.extension()) !=
@@ -70,7 +71,7 @@ public:
}
BasicFile Buffer(InputPath, BasicFile::Mode::kRead);
- OutChunked = ChunkData(Buffer, 0, RawSize, m_ChunkingParams, &BytesProcessed);
+ OutChunked = ChunkData(Buffer, 0, RawSize, m_ChunkingParams, &BytesProcessed, &AbortFlag);
return true;
}
@@ -132,7 +133,8 @@ public:
virtual bool ProcessFile(const std::filesystem::path& InputPath,
uint64_t RawSize,
ChunkedInfoWithSource& OutChunked,
- std::atomic<uint64_t>& BytesProcessed) const override
+ std::atomic<uint64_t>& BytesProcessed,
+ std::atomic<bool>& AbortFlag) const override
{
if (RawSize < m_ChunkFileSizeLimit)
{
@@ -150,6 +152,10 @@ public:
ChunkHashToChunkIndex.reserve(1 + (RawSize / m_FixedChunkingChunkSize));
while (Offset < RawSize)
{
+ if (AbortFlag)
+ {
+ return false;
+ }
uint64_t ChunkSize = std::min<uint64_t>(RawSize - Offset, m_FixedChunkingChunkSize);
IoBuffer Chunk(Source, Offset, ChunkSize);
MemoryView ChunkData = Chunk.GetView();
diff --git a/src/zenutil/include/zenutil/chunkedfile.h b/src/zenutil/include/zenutil/chunkedfile.h
index 7110ad317..4cec80fdb 100644
--- a/src/zenutil/include/zenutil/chunkedfile.h
+++ b/src/zenutil/include/zenutil/chunkedfile.h
@@ -47,7 +47,8 @@ ChunkedInfoWithSource ChunkData(BasicFile& RawData,
uint64_t Offset,
uint64_t Size,
ChunkedParams Params = {},
- std::atomic<uint64_t>* BytesProcessed = nullptr);
+ std::atomic<uint64_t>* BytesProcessed = nullptr,
+ std::atomic<bool>* AbortFlag = nullptr);
void Reconstruct(const ChunkedInfo& Info,
const std::filesystem::path& TargetPath,
std::function<IoBuffer(const IoHash& ChunkHash)> GetChunk);
diff --git a/src/zenutil/include/zenutil/chunkingcontroller.h b/src/zenutil/include/zenutil/chunkingcontroller.h
index fe4fc1bb5..ebc80e207 100644
--- a/src/zenutil/include/zenutil/chunkingcontroller.h
+++ b/src/zenutil/include/zenutil/chunkingcontroller.h
@@ -32,9 +32,10 @@ public:
virtual bool ProcessFile(const std::filesystem::path& InputPath,
uint64_t RawSize,
ChunkedInfoWithSource& OutChunked,
- std::atomic<uint64_t>& BytesProcessed) const = 0;
- virtual std::string_view GetName() const = 0;
- virtual CbObject GetParameters() const = 0;
+ std::atomic<uint64_t>& BytesProcessed,
+ std::atomic<bool>& AbortFlag) const = 0;
+ virtual std::string_view GetName() const = 0;
+ virtual CbObject GetParameters() const = 0;
};
std::unique_ptr<ChunkingController> CreateBasicChunkingController(
diff --git a/src/zenutil/include/zenutil/parallellwork.h b/src/zenutil/include/zenutil/parallellwork.h
index 7a8218c51..79798fc8d 100644
--- a/src/zenutil/include/zenutil/parallellwork.h
+++ b/src/zenutil/include/zenutil/parallellwork.h
@@ -2,6 +2,8 @@
#pragma once
+#include <zencore/except.h>
+#include <zencore/fmtutils.h>
#include <zencore/thread.h>
#include <zencore/workthreadpool.h>
@@ -20,6 +22,14 @@ public:
ZEN_ASSERT(m_PendingWork.Remaining() == 0);
}
+ std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)> DefaultErrorFunction()
+ {
+ return [&](const std::exception& Ex, std::atomic<bool>& AbortFlag) {
+ m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex.what()); });
+ AbortFlag = true;
+ };
+ }
+
void ScheduleWork(WorkerThreadPool& WorkerPool,
std::function<void(std::atomic<bool>& AbortFlag)>&& Work,
std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)>&& OnError)
@@ -32,6 +42,27 @@ public:
{
Work(m_AbortFlag);
}
+ catch (const AssertException& AssertEx)
+ {
+ OnError(
+ std::runtime_error(fmt::format("Caught assert exception while handling request: {}", AssertEx.FullDescription())),
+ m_AbortFlag);
+ }
+ catch (const std::system_error& SystemError)
+ {
+ if (IsOOM(SystemError.code()))
+ {
+ OnError(std::runtime_error(fmt::format("Out of memory. Reason: {}", SystemError.what())), m_AbortFlag);
+ }
+ else if (IsOOD(SystemError.code()))
+ {
+ OnError(std::runtime_error(fmt::format("Out of disk. Reason: {}", SystemError.what())), m_AbortFlag);
+ }
+ else
+ {
+ OnError(std::runtime_error(fmt::format("System error. Reason: {}", SystemError.what())), m_AbortFlag);
+ }
+ }
catch (const std::exception& Ex)
{
OnError(Ex, m_AbortFlag);
@@ -58,12 +89,29 @@ public:
{
UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining());
}
+ if (m_Errors.size() == 1)
+ {
+ throw std::runtime_error(m_Errors.front());
+ }
+ else if (m_Errors.size() > 1)
+ {
+ ExtendableStringBuilder<128> SB;
+ SB.Append("Multiple errors:");
+ for (const std::string& Error : m_Errors)
+ {
+ SB.Append(fmt::format("\n {}", Error));
+ }
+ throw std::runtime_error(SB.ToString());
+ }
}
Latch& PendingWork() { return m_PendingWork; }
private:
std::atomic<bool>& m_AbortFlag;
Latch m_PendingWork;
+
+ RwLock m_ErrorLock;
+ std::vector<std::string> m_Errors;
};
} // namespace zen