diff options
| -rw-r--r-- | zencore/include/zencore/blockingqueue.h | 73 | ||||
| -rw-r--r-- | zenhttp/iothreadpool.cpp | 6 | ||||
| -rw-r--r-- | zenhttp/iothreadpool.h | 8 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 68 |
4 files changed, 87 insertions, 68 deletions
diff --git a/zencore/include/zencore/blockingqueue.h b/zencore/include/zencore/blockingqueue.h new file mode 100644 index 000000000..fbb4480a1 --- /dev/null +++ b/zencore/include/zencore/blockingqueue.h @@ -0,0 +1,73 @@ +#pragma once + +#include <atomic> +#include <deque> +#include <mutex> + +namespace zen { + +template<typename T> +class BlockingQueue +{ +public: + BlockingQueue() = default; + + ~BlockingQueue() { CompleteAdding(); } + + void Enqueue(T&& Item) + { + { + std::lock_guard Lock(m_Lock); + m_Queue.emplace_back(std::move(Item)); + m_Size++; + } + + m_NewItemSignal.notify_one(); + } + + bool WaitAndDequeue(T& Item) + { + if (m_CompleteAdding.load()) + { + return false; + } + + std::unique_lock Lock(m_Lock); + m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); }); + + if (!m_Queue.empty()) + { + Item = std::move(m_Queue.front()); + m_Queue.pop_front(); + m_Size--; + + return true; + } + + return false; + } + + void CompleteAdding() + { + if (!m_CompleteAdding.load()) + { + m_CompleteAdding.store(true); + m_NewItemSignal.notify_all(); + } + } + + std::size_t Size() const + { + std::unique_lock Lock(m_Lock); + return m_Queue.size(); + } + +private: + mutable std::mutex m_Lock; + std::condition_variable m_NewItemSignal; + std::deque<T> m_Queue; + std::atomic_bool m_CompleteAdding{false}; + std::atomic_uint32_t m_Size; +}; + +} // namespace zen diff --git a/zenhttp/iothreadpool.cpp b/zenhttp/iothreadpool.cpp index 4f1a6642b..6087e69ec 100644 --- a/zenhttp/iothreadpool.cpp +++ b/zenhttp/iothreadpool.cpp @@ -4,6 +4,8 @@ #include <zencore/except.h> +#if ZEN_PLATFORM_WINDOWS + namespace zen { WinIoThreadPool::WinIoThreadPool(int InThreadCount) @@ -32,6 +34,8 @@ WinIoThreadPool::~WinIoThreadPool() void WinIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) { + ZEN_ASSERT(!m_ThreadPoolIo); + m_ThreadPoolIo = CreateThreadpoolIo(IoHandle, Callback, Context, &m_CallbackEnvironment); if (!m_ThreadPoolIo) @@ -41,3 +45,5 @@ WinIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, voi } } // namespace zen + +#endif diff --git a/zenhttp/iothreadpool.h b/zenhttp/iothreadpool.h index 4418b940b..8333964c3 100644 --- a/zenhttp/iothreadpool.h +++ b/zenhttp/iothreadpool.h @@ -2,9 +2,12 @@ #pragma once -#include <zencore/windows.h> +#include <zencore/zencore.h> -#include <system_error> +#if ZEN_PLATFORM_WINDOWS +# include <zencore/windows.h> + +# include <system_error> namespace zen { @@ -31,3 +34,4 @@ private: }; } // namespace zen +#endif diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index a183e7ebf..ba5991b3f 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -8,6 +8,7 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> +#include <zencore/blockingqueue.h> #include <zencore/fmtutils.h> #include <zencore/stats.h> #include <zencore/stream.h> @@ -23,7 +24,6 @@ #include <algorithm> #include <atomic> -#include <deque> #include <thread> #include <unordered_map> @@ -33,70 +33,6 @@ using namespace std::literals; namespace detail { - template<typename T> - class BlockingQueue - { - public: - BlockingQueue() = default; - - ~BlockingQueue() { CompleteAdding(); } - - void Enqueue(T&& Item) - { - { - std::lock_guard Lock(m_Lock); - m_Queue.emplace_back(std::move(Item)); - m_Size++; - } - - m_NewItemSignal.notify_one(); - } - - bool WaitAndDequeue(T& Item) - { - if (m_CompleteAdding.load()) - { - return false; - } - - std::unique_lock Lock(m_Lock); - m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); }); - - if (!m_Queue.empty()) - { - Item = std::move(m_Queue.front()); - m_Queue.pop_front(); - m_Size--; - - return true; - } - - return false; - } - - void CompleteAdding() - { - if (!m_CompleteAdding.load()) - { - m_CompleteAdding.store(true); - m_NewItemSignal.notify_all(); - } - } - - std::size_t Size() const - { - std::unique_lock Lock(m_Lock); - return m_Queue.size(); - } - - private: - mutable std::mutex m_Lock; - std::condition_variable m_NewItemSignal; - std::deque<T> m_Queue; - std::atomic_bool m_CompleteAdding{false}; - std::atomic_uint32_t m_Size; - }; - class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: @@ -1029,7 +965,7 @@ private: spdlog::logger& Log() { return m_Log; } - using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>; + using UpstreamQueue = BlockingQueue<UpstreamCacheRecord>; struct RunState { |