aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-11-17 14:38:13 +0100
committerGitHub <[email protected]>2023-11-17 14:38:13 +0100
commitb0082066596178f7b72d9963bffdec306a5b6250 (patch)
tree83505561e50e8a09237b05bddb579a714344c219 /src
parentuse dynamic port assignment for tests (#545) (diff)
downloadzen-b0082066596178f7b72d9963bffdec306a5b6250.tar.xz
zen-b0082066596178f7b72d9963bffdec306a5b6250.zip
fix named event (#553)
* fix named event timout and test, fix blocking queue
Diffstat (limited to 'src')
-rw-r--r--src/zencore/include/zencore/blockingqueue.h24
-rw-r--r--src/zencore/thread.cpp13
-rw-r--r--src/zencore/workthreadpool.cpp16
3 files changed, 34 insertions, 19 deletions
diff --git a/src/zencore/include/zencore/blockingqueue.h b/src/zencore/include/zencore/blockingqueue.h
index 995ba6bfb..e91fdc659 100644
--- a/src/zencore/include/zencore/blockingqueue.h
+++ b/src/zencore/include/zencore/blockingqueue.h
@@ -30,19 +30,22 @@ public:
bool WaitAndDequeue(T& Item)
{
std::unique_lock Lock(m_Lock);
- if (m_Queue.empty() && !m_CompleteAdding)
+ if (m_Queue.empty())
{
+ if (m_CompleteAdding)
+ {
+ return false;
+ }
m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding; });
+ if (m_Queue.empty())
+ {
+ ZEN_ASSERT(m_CompleteAdding);
+ return false;
+ }
}
-
- if (!m_Queue.empty())
- {
- Item = std::move(m_Queue.front());
- m_Queue.pop_front();
- return true;
- }
-
- return false;
+ Item = std::move(m_Queue.front());
+ m_Queue.pop_front();
+ return true;
}
void CompleteAdding()
@@ -50,7 +53,6 @@ public:
std::unique_lock Lock(m_Lock);
if (!m_CompleteAdding)
{
- ZEN_ASSERT(m_Queue.empty());
m_CompleteAdding = true;
Lock.unlock();
diff --git a/src/zencore/thread.cpp b/src/zencore/thread.cpp
index a55bc5d69..6092895b0 100644
--- a/src/zencore/thread.cpp
+++ b/src/zencore/thread.cpp
@@ -411,9 +411,10 @@ NamedEvent::Wait(int TimeoutMs)
}
# if defined(_GNU_SOURCE)
+ const int TimeoutSec = TimeoutMs / 1000;
struct timespec TimeoutValue = {
- .tv_sec = TimeoutMs >> 10,
- .tv_nsec = (TimeoutMs & 0x3ff) << 20,
+ .tv_sec = TimeoutSec,
+ .tv_nsec = (TimeoutMs - (TimeoutSec * 1000)) * 1000000,
};
Result = semtimedop(Sem, &SemOp, 1, &TimeoutValue);
# else
@@ -431,7 +432,6 @@ NamedEvent::Wait(int TimeoutMs)
TimeoutMs -= SleepTimeMs;
} while (TimeoutMs > 0);
# endif // _GNU_SOURCE
-
return Result == 0;
#endif
}
@@ -1225,19 +1225,20 @@ TEST_CASE("NamedEvent")
CHECK(!bEventSet);
}
+ NamedEvent ReadyEvent(Name + "_ready");
+
// Thread check
std::thread Waiter = std::thread([Name]() {
NamedEvent ReadyEvent(Name + "_ready");
ReadyEvent.Set();
NamedEvent TestEvent(Name);
- TestEvent.Wait(100);
+ TestEvent.Wait(1000);
});
- NamedEvent ReadyEvent(Name + "_ready");
ReadyEvent.Wait();
- zen::Sleep(50);
+ zen::Sleep(100);
TestEvent.Set();
Waiter.join();
diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp
index 3a4b1e6a1..bdb9de9dc 100644
--- a/src/zencore/workthreadpool.cpp
+++ b/src/zencore/workthreadpool.cpp
@@ -150,7 +150,10 @@ struct WorkerThreadPool::Impl
for (std::thread& Thread : m_WorkerThreads)
{
- Thread.join();
+ if (Thread.joinable())
+ {
+ Thread.join();
+ }
}
m_WorkerThreads.clear();
@@ -219,7 +222,16 @@ WorkerThreadPool::ScheduleWork(Ref<IWork> Work)
}
else
{
- Work->Execute();
+ try
+ {
+ Work->Execute();
+ }
+ catch (std::exception& e)
+ {
+ Work->m_Exception = std::current_exception();
+
+ ZEN_WARN("Caught exception when executing worker synchronously: {}", e.what());
+ }
}
}