diff options
| author | Stefan Boberg <[email protected]> | 2021-10-05 22:25:53 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-10-05 22:25:53 +0200 |
| commit | 20ac7384f8ca558f1fb933eda846604792240ea0 (patch) | |
| tree | e5c95b422b847af50b77807af916e389fcaf83aa /zenserver/upstream/upstreamcache.cpp | |
| parent | stats: Mean returns zero when the count is zero (diff) | |
| download | zen-20ac7384f8ca558f1fb933eda846604792240ea0.tar.xz zen-20ac7384f8ca558f1fb933eda846604792240ea0.zip | |
Merged from upstream
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 68 |
1 files changed, 2 insertions, 66 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index a183e7ebf..ba5991b3f 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -8,6 +8,7 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> +#include <zencore/blockingqueue.h> #include <zencore/fmtutils.h> #include <zencore/stats.h> #include <zencore/stream.h> @@ -23,7 +24,6 @@ #include <algorithm> #include <atomic> -#include <deque> #include <thread> #include <unordered_map> @@ -33,70 +33,6 @@ using namespace std::literals; namespace detail { - template<typename T> - class BlockingQueue - { - public: - BlockingQueue() = default; - - ~BlockingQueue() { CompleteAdding(); } - - void Enqueue(T&& Item) - { - { - std::lock_guard Lock(m_Lock); - m_Queue.emplace_back(std::move(Item)); - m_Size++; - } - - m_NewItemSignal.notify_one(); - } - - bool WaitAndDequeue(T& Item) - { - if (m_CompleteAdding.load()) - { - return false; - } - - std::unique_lock Lock(m_Lock); - m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); }); - - if (!m_Queue.empty()) - { - Item = std::move(m_Queue.front()); - m_Queue.pop_front(); - m_Size--; - - return true; - } - - return false; - } - - void CompleteAdding() - { - if (!m_CompleteAdding.load()) - { - m_CompleteAdding.store(true); - m_NewItemSignal.notify_all(); - } - } - - std::size_t Size() const - { - std::unique_lock Lock(m_Lock); - return m_Queue.size(); - } - - private: - mutable std::mutex m_Lock; - std::condition_variable m_NewItemSignal; - std::deque<T> m_Queue; - std::atomic_bool m_CompleteAdding{false}; - std::atomic_uint32_t m_Size; - }; - class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: @@ -1029,7 +965,7 @@ private: spdlog::logger& Log() { return m_Log; } - using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>; + using UpstreamQueue = BlockingQueue<UpstreamCacheRecord>; struct RunState { |