aboutsummaryrefslogtreecommitdiff
path: root/zencore/thread.cpp
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-11-24 10:32:34 +0100
committerMartin Ridgers <[email protected]>2021-11-24 15:56:26 +0100
commit3f15f6652d15bdcaf15575efd682f63895c993e7 (patch)
treec258a4f51e97b0ba63c6b180c435a3e614b2f2af /zencore/thread.cpp
parentAdded POSIX's close-on-exec flag to files that Zen opens (diff)
downloadzen-3f15f6652d15bdcaf15575efd682f63895c993e7.tar.xz
zen-3f15f6652d15bdcaf15575efd682f63895c993e7.zip
Reimplemented NamedEvent on Linux using POSIX message queues
Diffstat (limited to 'zencore/thread.cpp')
-rw-r--r--zencore/thread.cpp123
1 files changed, 62 insertions, 61 deletions
diff --git a/zencore/thread.cpp b/zencore/thread.cpp
index 14fb0ed65..68c5cb3e1 100644
--- a/zencore/thread.cpp
+++ b/zencore/thread.cpp
@@ -16,11 +16,10 @@
# include <condition_variable>
# include <mutex>
-# include <poll.h>
+# include <fcntl.h>
+# include <mqueue.h>
# include <pthread.h>
# include <signal.h>
-# include <sys/socket.h>
-# include <sys/un.h>
# include <time.h>
# include <unistd.h>
#endif
@@ -232,14 +231,17 @@ Event::Wait(int TimeoutMs)
//////////////////////////////////////////////////////////////////////////
-#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MACOS
-struct NamedEventPosix
+#if ZEN_PLATFORM_LINUX
+bool IsThereAMessageInQueue(int Fd)
{
- int SocketFd = 0;
- sockaddr_un SocketAddr = {};
- bool bBound = false;
-};
-#endif
+ // Check if there is already a message in the queue.
+ mq_attr Attributes = { O_NONBLOCK, 1, 1, 0 };
+ mq_getattr(Fd, &Attributes);
+ return (Attributes.mq_curmsgs > 0);
+}
+#endif // ZEN_PLATFORM_LINUX
+
+
NamedEvent::NamedEvent(std::string_view EventName)
{
@@ -251,26 +253,29 @@ NamedEvent::NamedEvent(std::string_view EventName)
Name << EventName;
m_EventHandle = CreateEventA(nullptr, true, false, Name.c_str());
-#else
- int SocketFd = socket(AF_UNIX, SOCK_DGRAM, 0);
- if (SocketFd < 0)
+#elif ZEN_PLATFORM_LINUX
+ ExtendableStringBuilder<64> Name;
+ Name << "/";
+ Name << EventName;
+
+ mq_attr Attributes = {
+ 0, // flags
+ 1, // max message count
+ 1, // max message size
+ 0, // current messages
+ };
+
+ int Inner = mq_open(Name.c_str(), O_RDWR|O_CREAT|O_CLOEXEC, 0666, &Attributes);
+ if (Inner < 0)
{
- ThrowLastError("Failed to create IPC socket");
+ ThrowLastError("Failed to get message queue from mq_open()");
}
- auto* Inner = new NamedEventPosix();
- Inner->SocketFd = SocketFd;
-
- char* PathPtr = Inner->SocketAddr.sun_path;
- size_t PathLen = sizeof(Inner->SocketAddr.sun_path) - 1; // -1 for null-term
-# if ZEN_PLATFORM_LINUX
- PathPtr[0] = '\0'; // make the domain socket...
- PathPtr += 1; // ...use the abstract namespace
- PathLen -= 1;
-# endif
- EventName.copy(PathPtr, PathLen);
+ //mq_unlink(Name.c_str());
- m_EventHandle = Inner;
+ m_EventHandle = (void*)intptr_t(Inner);
+#else
+# error Implement NamedEvent for this platform
#endif
}
@@ -288,10 +293,9 @@ void NamedEvent::Close()
#if ZEN_PLATFORM_WINDOWS
CloseHandle(m_EventHandle);
-#else
- auto* Inner = (NamedEventPosix*)m_EventHandle;
- close(Inner->SocketFd);
- delete Inner;
+#elif ZEN_PLATFORM_LINUX
+ int Inner = int(intptr_t(m_EventHandle));
+ close(Inner);
#endif
m_EventHandle = nullptr;
@@ -302,15 +306,21 @@ void NamedEvent::Set()
{
#if ZEN_PLATFORM_WINDOWS
SetEvent(m_EventHandle);
-#else
- auto* Inner = (NamedEventPosix*)m_EventHandle;
-
- uint8_t OneByte = 0x49;
- sendto(
- Inner->SocketFd,
- &OneByte, sizeof(OneByte),
- 0, (sockaddr*)&Inner->SocketAddr, sizeof(Inner->SocketAddr)
- );
+#elif ZEN_PLATFORM_LINUX
+ int Inner = int(intptr_t(m_EventHandle));
+
+ if (IsThereAMessageInQueue(Inner))
+ {
+ return;
+ }
+
+ char Message = 0x49;
+ if (mq_send(Inner, &Message, sizeof(Message), 0) != 0)
+ {
+ ThrowLastError("Unable to send set message to queue");
+ }
+
+ IsThereAMessageInQueue(Inner);
#endif
}
@@ -328,33 +338,24 @@ bool NamedEvent::Wait(int TimeoutMs)
}
return (Result == WAIT_OBJECT_0);
-#else
- auto* Inner = (NamedEventPosix*)m_EventHandle;
- int SocketFd = Inner->SocketFd;
-
- int Result;
-
- if (!Inner->bBound)
- {
- Result = bind(SocketFd, (sockaddr*)&(Inner->SocketAddr), sizeof(Inner->SocketAddr));
- if (!Result)
- {
- zen::ThrowLastError("Bind IPC socket failed");
- }
- Inner->bBound = true;
- }
+#elif ZEN_PLATFORM_LINUX
+ int Inner = int(intptr_t(m_EventHandle));
- pollfd PollFd = { SocketFd, POLLIN };
- Result = poll(&PollFd, 1, TimeoutMs);
- if (Result > 0)
+ if (IsThereAMessageInQueue(Inner))
{
- uint8_t OneByte;
- Result = recv(SocketFd, &OneByte, sizeof(OneByte), 0);
-
return true;
}
- return false;
+ struct timeval TimeoutValue = {
+ .tv_sec = 0,
+ .tv_usec = TimeoutMs << 10,
+ };
+ struct timeval* TimeoutPtr = (TimeoutMs < 0) ? nullptr : &TimeoutValue;
+
+ fd_set FdSet;
+ FD_ZERO(&FdSet);
+ FD_SET(Inner, &FdSet);
+ return select(Inner + 1, &FdSet, nullptr, nullptr, TimeoutPtr) > 0;
#endif
}