diff options
| author | Martin Ridgers <[email protected]> | 2021-11-24 10:32:34 +0100 |
|---|---|---|
| committer | Martin Ridgers <[email protected]> | 2021-11-24 15:56:26 +0100 |
| commit | 3f15f6652d15bdcaf15575efd682f63895c993e7 (patch) | |
| tree | c258a4f51e97b0ba63c6b180c435a3e614b2f2af /zencore/thread.cpp | |
| parent | Added POSIX's close-on-exec flag to files that Zen opens (diff) | |
| download | zen-3f15f6652d15bdcaf15575efd682f63895c993e7.tar.xz zen-3f15f6652d15bdcaf15575efd682f63895c993e7.zip | |
Reimplemented NamedEvent on Linux using POSIX message queues
Diffstat (limited to 'zencore/thread.cpp')
| -rw-r--r-- | zencore/thread.cpp | 123 |
1 files changed, 62 insertions, 61 deletions
diff --git a/zencore/thread.cpp b/zencore/thread.cpp index 14fb0ed65..68c5cb3e1 100644 --- a/zencore/thread.cpp +++ b/zencore/thread.cpp @@ -16,11 +16,10 @@ # include <condition_variable> # include <mutex> -# include <poll.h> +# include <fcntl.h> +# include <mqueue.h> # include <pthread.h> # include <signal.h> -# include <sys/socket.h> -# include <sys/un.h> # include <time.h> # include <unistd.h> #endif @@ -232,14 +231,17 @@ Event::Wait(int TimeoutMs) ////////////////////////////////////////////////////////////////////////// -#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MACOS -struct NamedEventPosix +#if ZEN_PLATFORM_LINUX +bool IsThereAMessageInQueue(int Fd) { - int SocketFd = 0; - sockaddr_un SocketAddr = {}; - bool bBound = false; -}; -#endif + // Check if there is already a message in the queue. + mq_attr Attributes = { O_NONBLOCK, 1, 1, 0 }; + mq_getattr(Fd, &Attributes); + return (Attributes.mq_curmsgs > 0); +} +#endif // ZEN_PLATFORM_LINUX + + NamedEvent::NamedEvent(std::string_view EventName) { @@ -251,26 +253,29 @@ NamedEvent::NamedEvent(std::string_view EventName) Name << EventName; m_EventHandle = CreateEventA(nullptr, true, false, Name.c_str()); -#else - int SocketFd = socket(AF_UNIX, SOCK_DGRAM, 0); - if (SocketFd < 0) +#elif ZEN_PLATFORM_LINUX + ExtendableStringBuilder<64> Name; + Name << "/"; + Name << EventName; + + mq_attr Attributes = { + 0, // flags + 1, // max message count + 1, // max message size + 0, // current messages + }; + + int Inner = mq_open(Name.c_str(), O_RDWR|O_CREAT|O_CLOEXEC, 0666, &Attributes); + if (Inner < 0) { - ThrowLastError("Failed to create IPC socket"); + ThrowLastError("Failed to get message queue from mq_open()"); } - auto* Inner = new NamedEventPosix(); - Inner->SocketFd = SocketFd; - - char* PathPtr = Inner->SocketAddr.sun_path; - size_t PathLen = sizeof(Inner->SocketAddr.sun_path) - 1; // -1 for null-term -# if ZEN_PLATFORM_LINUX - PathPtr[0] = '\0'; // make the domain socket... - PathPtr += 1; // ...use the abstract namespace - PathLen -= 1; -# endif - EventName.copy(PathPtr, PathLen); + //mq_unlink(Name.c_str()); - m_EventHandle = Inner; + m_EventHandle = (void*)intptr_t(Inner); +#else +# error Implement NamedEvent for this platform #endif } @@ -288,10 +293,9 @@ void NamedEvent::Close() #if ZEN_PLATFORM_WINDOWS CloseHandle(m_EventHandle); -#else - auto* Inner = (NamedEventPosix*)m_EventHandle; - close(Inner->SocketFd); - delete Inner; +#elif ZEN_PLATFORM_LINUX + int Inner = int(intptr_t(m_EventHandle)); + close(Inner); #endif m_EventHandle = nullptr; @@ -302,15 +306,21 @@ void NamedEvent::Set() { #if ZEN_PLATFORM_WINDOWS SetEvent(m_EventHandle); -#else - auto* Inner = (NamedEventPosix*)m_EventHandle; - - uint8_t OneByte = 0x49; - sendto( - Inner->SocketFd, - &OneByte, sizeof(OneByte), - 0, (sockaddr*)&Inner->SocketAddr, sizeof(Inner->SocketAddr) - ); +#elif ZEN_PLATFORM_LINUX + int Inner = int(intptr_t(m_EventHandle)); + + if (IsThereAMessageInQueue(Inner)) + { + return; + } + + char Message = 0x49; + if (mq_send(Inner, &Message, sizeof(Message), 0) != 0) + { + ThrowLastError("Unable to send set message to queue"); + } + + IsThereAMessageInQueue(Inner); #endif } @@ -328,33 +338,24 @@ bool NamedEvent::Wait(int TimeoutMs) } return (Result == WAIT_OBJECT_0); -#else - auto* Inner = (NamedEventPosix*)m_EventHandle; - int SocketFd = Inner->SocketFd; - - int Result; - - if (!Inner->bBound) - { - Result = bind(SocketFd, (sockaddr*)&(Inner->SocketAddr), sizeof(Inner->SocketAddr)); - if (!Result) - { - zen::ThrowLastError("Bind IPC socket failed"); - } - Inner->bBound = true; - } +#elif ZEN_PLATFORM_LINUX + int Inner = int(intptr_t(m_EventHandle)); - pollfd PollFd = { SocketFd, POLLIN }; - Result = poll(&PollFd, 1, TimeoutMs); - if (Result > 0) + if (IsThereAMessageInQueue(Inner)) { - uint8_t OneByte; - Result = recv(SocketFd, &OneByte, sizeof(OneByte), 0); - return true; } - return false; + struct timeval TimeoutValue = { + .tv_sec = 0, + .tv_usec = TimeoutMs << 10, + }; + struct timeval* TimeoutPtr = (TimeoutMs < 0) ? nullptr : &TimeoutValue; + + fd_set FdSet; + FD_ZERO(&FdSet); + FD_SET(Inner, &FdSet); + return select(Inner + 1, &FdSet, nullptr, nullptr, TimeoutPtr) > 0; #endif } |