aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-09-20 14:07:55 +0200
committerStefan Boberg <[email protected]>2021-09-20 14:07:55 +0200
commit853f6f371d5eae5692c9bae615aba8964a4781ed (patch)
tree18a2bf8fbef49f2b9ee2b04c5d392296333a72a0
parentclang-format (diff)
downloadzen-853f6f371d5eae5692c9bae615aba8964a4781ed.tar.xz
zen-853f6f371d5eae5692c9bae615aba8964a4781ed.zip
Added mpscqueue (for future use)
-rw-r--r--zencore/include/zencore/mpscqueue.h109
-rw-r--r--zencore/include/zencore/zencore.h4
-rw-r--r--zencore/mpscqueue.cpp22
-rw-r--r--zencore/zencore.cpp2
-rw-r--r--zencore/zencore.vcxproj2
-rw-r--r--zencore/zencore.vcxproj.filters2
6 files changed, 141 insertions, 0 deletions
diff --git a/zencore/include/zencore/mpscqueue.h b/zencore/include/zencore/mpscqueue.h
new file mode 100644
index 000000000..bb558bb5a
--- /dev/null
+++ b/zencore/include/zencore/mpscqueue.h
@@ -0,0 +1,109 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <atomic>
+#include <new>
+#include <optional>
+
+#ifdef __cpp_lib_hardware_interference_size
+using std::hardware_constructive_interference_size;
+using std::hardware_destructive_interference_size;
+#else
+// 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ...
+constexpr std::size_t hardware_constructive_interference_size = 64;
+constexpr std::size_t hardware_destructive_interference_size = 64;
+#endif
+
+namespace zen {
+
+/** An untyped array of data with compile-time alignment and size derived from another type. */
+template<typename ElementType>
+struct TypeCompatibleStorage
+{
+ ElementType* Data() { return (ElementType*)this; }
+ const ElementType* Data() const { return (const ElementType*)this; }
+
+ char alignas(ElementType) DataMember;
+};
+
+/** Fast multi-producer/single-consumer unbounded concurrent queue.
+
+ Based on http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
+ */
+
+template<typename T>
+class MpscQueue final
+{
+public:
+ using ElementType = T;
+
+ MpscQueue()
+ {
+ Node* Sentinel = new Node;
+ Head.store(Sentinel, std::memory_order_relaxed);
+ Tail = Sentinel;
+ }
+
+ ~MpscQueue()
+ {
+ Node* Next = Tail->Next.load(std::memory_order_relaxed);
+
+ // sentinel's value is already destroyed
+ delete Tail;
+
+ while (Next != nullptr)
+ {
+ Tail = Next;
+ Next = Tail->Next.load(std::memory_order_relaxed);
+
+ std::destroy_at((ElementType*)&Tail->Value);
+ delete Tail;
+ }
+ }
+
+ template<typename... ArgTypes>
+ void Enqueue(ArgTypes&&... Args)
+ {
+ Node* New = new Node;
+ new (&New->Value) ElementType(std::forward<ArgTypes>(Args)...);
+
+ Node* Prev = Head.exchange(New, std::memory_order_acq_rel);
+ Prev->Next.store(New, std::memory_order_release);
+ }
+
+ std::optional<ElementType> Dequeue()
+ {
+ Node* Next = Tail->Next.load(std::memory_order_acquire);
+
+ if (Next == nullptr)
+ {
+ return {};
+ }
+
+ ElementType* ValuePtr = (ElementType*)&Next->Value;
+ std::optional<ElementType> Res{std::move(*ValuePtr)};
+ std::destroy_at(ValuePtr);
+
+ delete Tail; // current sentinel
+
+ Tail = Next; // new sentinel
+ return Res;
+ }
+
+private:
+ struct Node
+ {
+ std::atomic<Node*> Next{nullptr};
+ TypeCompatibleStorage<ElementType> Value;
+ };
+
+private:
+ std::atomic<Node*> Head; // accessed only by producers
+ alignas(hardware_constructive_interference_size)
+ Node* Tail; // accessed only by consumer, hence should be on a different cache line than `Head`
+};
+
+void mpscqueue_forcelink();
+
+} // namespace zen
diff --git a/zencore/include/zencore/zencore.h b/zencore/include/zencore/zencore.h
index 4fad0b7a4..2b411af0e 100644
--- a/zencore/include/zencore/zencore.h
+++ b/zencore/include/zencore/zencore.h
@@ -60,6 +60,10 @@
# endif
#endif
+#if ZEN_COMPILER_MSC
+# pragma warning(disable : 4324) // warning C4324: '<type>': structure was padded due to alignment specifier
+#endif
+
//////////////////////////////////////////////////////////////////////////
// Architecture
//
diff --git a/zencore/mpscqueue.cpp b/zencore/mpscqueue.cpp
new file mode 100644
index 000000000..8b89ac31a
--- /dev/null
+++ b/zencore/mpscqueue.cpp
@@ -0,0 +1,22 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zencore/mpscqueue.h>
+
+#include <zencore/testing.h>
+
+namespace zen {
+
+TEST_CASE("mpsc")
+{
+ MpscQueue<std::string> Queue;
+ Queue.Enqueue("hello");
+ std::optional<std::string> Value = Queue.Dequeue();
+ CHECK_EQ(Value, "hello");
+}
+
+void
+mpscqueue_forcelink()
+{
+}
+
+} // namespace zen \ No newline at end of file
diff --git a/zencore/zencore.cpp b/zencore/zencore.cpp
index b025f5b5c..1ea8ceb37 100644
--- a/zencore/zencore.cpp
+++ b/zencore/zencore.cpp
@@ -18,6 +18,7 @@
#include <zencore/compress.h>
#include <zencore/iobuffer.h>
#include <zencore/memory.h>
+#include <zencore/mpscqueue.h>
#include <zencore/refcount.h>
#include <zencore/sha1.h>
#include <zencore/stats.h>
@@ -111,6 +112,7 @@ zencore_forcelinktests()
zen::intmath_forcelink();
zen::iobuffer_forcelink();
zen::memory_forcelink();
+ zen::mpscqueue_forcelink();
zen::refcount_forcelink();
zen::sha1_forcelink();
zen::stats_forcelink();
diff --git a/zencore/zencore.vcxproj b/zencore/zencore.vcxproj
index f0eae411e..2322f7173 100644
--- a/zencore/zencore.vcxproj
+++ b/zencore/zencore.vcxproj
@@ -128,6 +128,7 @@
<ClInclude Include="include\zencore\md5.h" />
<ClInclude Include="include\zencore\memory.h" />
<ClInclude Include="include\zencore\meta.h" />
+ <ClInclude Include="include\zencore\mpscqueue.h" />
<ClInclude Include="include\zencore\postwindows.h" />
<ClInclude Include="include\zencore\prewindows.h" />
<ClInclude Include="include\zencore\refcount.h" />
@@ -167,6 +168,7 @@
<ClCompile Include="logging.cpp" />
<ClCompile Include="md5.cpp" />
<ClCompile Include="memory.cpp" />
+ <ClCompile Include="mpscqueue.cpp" />
<ClCompile Include="refcount.cpp" />
<ClCompile Include="session.cpp" />
<ClCompile Include="sha1.cpp">
diff --git a/zencore/zencore.vcxproj.filters b/zencore/zencore.vcxproj.filters
index b9c69a33f..d2e7a3159 100644
--- a/zencore/zencore.vcxproj.filters
+++ b/zencore/zencore.vcxproj.filters
@@ -43,6 +43,7 @@
<ClInclude Include="include\zencore\session.h" />
<ClInclude Include="include\zencore\testutils.h" />
<ClInclude Include="include\zencore\testing.h" />
+ <ClInclude Include="include\zencore\mpscqueue.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="sha1.cpp" />
@@ -75,6 +76,7 @@
<ClCompile Include="intmath.cpp" />
<ClCompile Include="session.cpp" />
<ClCompile Include="testutils.cpp" />
+ <ClCompile Include="mpscqueue.cpp" />
</ItemGroup>
<ItemGroup>
<Filter Include="CAS">