aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Bebenita <[email protected]>2010-08-24 21:06:56 -0700
committerMichael Bebenita <[email protected]>2010-08-24 21:07:14 -0700
commit64ff82ecf98ceb4725f0f51c73e23d1efc2160bc (patch)
tree6a6ae7452066716947c4d262eb72041a6a11cb95
parentComment on env var required for std.dbg to do any logging. (diff)
downloadrust-64ff82ecf98ceb4725f0f51c73e23d1efc2160bc.tar.xz
rust-64ff82ecf98ceb4725f0f51c73e23d1efc2160bc.zip
Implemented an lock free queue based on this paper http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf, the "lock free queue" we had before wasn't lock free at all.
-rw-r--r--src/Makefile15
-rw-r--r--src/rt/rust_dom.cpp12
-rw-r--r--src/rt/rust_dom.h4
-rw-r--r--src/rt/rust_message.h2
-rw-r--r--src/rt/sync/interrupt_transparent_queue.cpp56
-rw-r--r--src/rt/sync/interrupt_transparent_queue.h22
-rw-r--r--src/rt/sync/lock_free_queue.h214
-rw-r--r--src/rt/sync/sync.h5
-rw-r--r--src/test/run-fail/task-comm-14.rs26
-rw-r--r--src/test/run-pass/task-comm-12.rs23
-rw-r--r--src/test/run-pass/task-comm-13-thread.rs18
-rw-r--r--src/test/run-pass/task-comm-13.rs18
-rw-r--r--src/test/run-pass/task-comm-15.rs14
13 files changed, 404 insertions, 25 deletions
diff --git a/src/Makefile b/src/Makefile
index 08699538..3de9b38c 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -34,7 +34,7 @@ DSYMUTIL := true
ifeq ($(CFG_OSTYPE), Linux)
CFG_RUNTIME := librustrt.so
CFG_STDLIB := libstd.so
- CFG_GCC_CFLAGS += -fPIC
+ CFG_GCC_CFLAGS += -fPIC -march=i686
CFG_GCC_LINK_FLAGS += -shared -fPIC -ldl -lpthread -lrt
ifeq ($(CFG_CPUTYPE), x86_64)
CFG_GCC_CFLAGS += -m32
@@ -107,7 +107,7 @@ ifdef CFG_UNIXY
endif
CFG_OBJ_SUFFIX := .o
CFG_EXE_SUFFIX := .exe
- CFG_GCC_CFLAGS :=
+ CFG_GCC_CFLAGS := -march=i686
CFG_GCC_LINK_FLAGS := -shared
ifeq ($(CFG_CPUTYPE), x86_64)
CFG_GCC_CFLAGS += -m32
@@ -248,7 +248,6 @@ BOOT_CMIS := $(BOOT_MLS:.ml=.cmi)
RUNTIME_CS := rt/sync/timer.cpp \
rt/sync/sync.cpp \
rt/sync/spin_lock.cpp \
- rt/sync/lock_free_queue.cpp \
rt/sync/condition_variable.cpp \
rt/rust.cpp \
rt/rust_builtin.cpp \
@@ -286,6 +285,7 @@ RUNTIME_HDR := rt/globals.h \
rt/util/hash_map.h \
rt/sync/sync.h \
rt/sync/timer.h \
+ rt/sync/lock_free_queue.h \
rt/rust_srv.h \
rt/memory_region.h \
rt/memory.h
@@ -394,7 +394,9 @@ TASK_XFAILS := test/run-pass/acyclic-unwind.rs \
test/run-pass/task-life-0.rs \
test/run-pass/task-comm.rs \
test/run-pass/threads.rs \
- test/run-pass/yield.rs
+ test/run-pass/yield.rs \
+ test/run-pass/task-comm-15.rs \
+ test/run-pass/task-life-0.rs
TEST_XFAILS_X86 := $(TASK_XFAILS) \
test/run-pass/child-outlives-parent.rs \
@@ -537,6 +539,11 @@ TEST_XFAILS_LLVM := $(TASK_XFAILS) \
task-comm-9.rs \
task-comm-10.rs \
task-comm-11.rs \
+ task-comm-12.rs \
+ task-comm-13.rs \
+ task-comm-13-thread.rs \
+ task-comm-14.rs \
+ task-comm-15.rs \
task-life-0.rs \
threads.rs \
type-sizes.rs \
diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp
index ce6bebf3..dc42286d 100644
--- a/src/rt/rust_dom.cpp
+++ b/src/rt/rust_dom.cpp
@@ -301,12 +301,14 @@ void rust_dom::send_message(rust_message *message) {
/**
* Drains and processes incoming pending messages.
*/
-void rust_dom::drain_incoming_message_queue() {
+void rust_dom::drain_incoming_message_queue(bool process) {
rust_message *message;
- while ((message = (rust_message *) _incoming_message_queue.dequeue())) {
+ while (_incoming_message_queue.dequeue(&message)) {
log(rust_log::COMM, "<== processing incoming message \"%s\" 0x%"
PRIxPTR, message->label, message);
- message->process();
+ if (process) {
+ message->process();
+ }
message->~rust_message();
this->synchronized_region.free(message);
}
@@ -454,7 +456,7 @@ rust_dom::start_main_loop()
while (n_live_tasks() > 0) {
A(this, is_deadlocked() == false, "deadlock");
- drain_incoming_message_queue();
+ drain_incoming_message_queue(true);
rust_task *scheduled_task = schedule_task();
@@ -519,7 +521,7 @@ rust_dom::start_main_loop()
}
sync::yield();
} else {
- drain_incoming_message_queue();
+ drain_incoming_message_queue(true);
}
reap_dead_tasks();
}
diff --git a/src/rt/rust_dom.h b/src/rt/rust_dom.h
index 34d8c694..44f56cc4 100644
--- a/src/rt/rust_dom.h
+++ b/src/rt/rust_dom.h
@@ -41,7 +41,7 @@ struct rust_dom
hash_map<rust_port *, rust_proxy<rust_port> *> _port_proxies;
// Incoming messages from other domains.
- lock_free_queue _incoming_message_queue;
+ lock_free_queue<rust_message*> _incoming_message_queue;
#ifndef __WIN32__
pthread_attr_t attr;
@@ -71,7 +71,7 @@ struct rust_dom
void free(void *mem, memory_region::memory_region_type type);
void send_message(rust_message *message);
- void drain_incoming_message_queue();
+ void drain_incoming_message_queue(bool process);
rust_proxy<rust_task> *get_task_proxy(rust_task *task);
void delete_proxies();
rust_proxy<rust_port> *get_port_proxy_synchronized(rust_port *port);
diff --git a/src/rt/rust_message.h b/src/rt/rust_message.h
index 6d986acf..7aee6d9f 100644
--- a/src/rt/rust_message.h
+++ b/src/rt/rust_message.h
@@ -9,7 +9,7 @@
/**
* Abstract base class for all message types.
*/
-class rust_message : public lock_free_queue_node {
+class rust_message {
public:
const char* label;
private:
diff --git a/src/rt/sync/interrupt_transparent_queue.cpp b/src/rt/sync/interrupt_transparent_queue.cpp
new file mode 100644
index 00000000..064b25f1
--- /dev/null
+++ b/src/rt/sync/interrupt_transparent_queue.cpp
@@ -0,0 +1,56 @@
+/*
+ * Interrupt transparent queue, Schoen et. al, "On Interrupt-Transparent
+ * Synchronization in an Embedded Object-Oriented Operating System", 2000.
+ * enqueue() is allowed to interrupt enqueue() and dequeue(), however,
+ * dequeue() is not allowed to interrupt itself.
+ */
+
+#include "../globals.h"
+#include "interrupt_transparent_queue.h"
+
+interrupt_transparent_queue_node::interrupt_transparent_queue_node() :
+ next(NULL) {
+
+}
+
+interrupt_transparent_queue::interrupt_transparent_queue() : _tail(this) {
+
+}
+
+void
+interrupt_transparent_queue::enqueue(interrupt_transparent_queue_node *item) {
+ lock.lock();
+ item->next = (interrupt_transparent_queue_node *) NULL;
+ interrupt_transparent_queue_node *last = _tail;
+ _tail = item;
+ while (last->next) {
+ last = last->next;
+ }
+ last->next = item;
+ lock.unlock();
+}
+
+interrupt_transparent_queue_node *
+interrupt_transparent_queue::dequeue() {
+ lock.lock();
+ interrupt_transparent_queue_node *item = next;
+ if (item && !(next = item->next)) {
+ _tail = (interrupt_transparent_queue_node *) this;
+ if (item->next) {
+ interrupt_transparent_queue_node *lost = item->next;
+ interrupt_transparent_queue_node *help;
+ do {
+ help = lost->next;
+ enqueue(lost);
+ } while ((lost = help) !=
+ (interrupt_transparent_queue_node *) NULL);
+ }
+ }
+ lock.unlock();
+ return item;
+}
+
+bool
+interrupt_transparent_queue::is_empty() {
+ return next == NULL;
+}
diff --git a/src/rt/sync/interrupt_transparent_queue.h b/src/rt/sync/interrupt_transparent_queue.h
new file mode 100644
index 00000000..7c02d0c8
--- /dev/null
+++ b/src/rt/sync/interrupt_transparent_queue.h
@@ -0,0 +1,22 @@
+#ifndef INTERRUPT_TRANSPARENT_QUEUE_H
+#define INTERRUPT_TRANSPARENT_QUEUE_H
+
+#include "spin_lock.h"
+
+class interrupt_transparent_queue_node {
+public:
+ interrupt_transparent_queue_node *next;
+ interrupt_transparent_queue_node();
+};
+
+class interrupt_transparent_queue : interrupt_transparent_queue_node {
+ spin_lock lock;
+ interrupt_transparent_queue_node *_tail;
+public:
+ interrupt_transparent_queue();
+ void enqueue(interrupt_transparent_queue_node *item);
+ interrupt_transparent_queue_node *dequeue();
+ bool is_empty();
+};
+
+#endif /* INTERRUPT_TRANSPARENT_QUEUE_H */
diff --git a/src/rt/sync/lock_free_queue.h b/src/rt/sync/lock_free_queue.h
index c1a309b8..ac0c5b04 100644
--- a/src/rt/sync/lock_free_queue.h
+++ b/src/rt/sync/lock_free_queue.h
@@ -1,22 +1,210 @@
#ifndef LOCK_FREE_QUEUE_H
#define LOCK_FREE_QUEUE_H
-#include "spin_lock.h"
+/**
+ * How and why this lock free queue works:
+ *
+ * Adapted from the paper titled "Simple, Fast, and Practical Non-Blocking
+ * and Blocking Concurrent Queue Algorithms" by Maged M. Michael,
+ * Michael L. Scott.
+ *
+ * Safety Properties:
+ *
+ * 1. The linked list is always connected.
+ * 2. Nodes are only inserted after the last node in the linked list.
+ * 3. Nodes are only deleted from the beginning of the linked list.
+ * 4. Head always points to the first node in the linked list.
+ * 5. Tail always points to a node in the linked list.
+ *
+ *
+ * 1. The linked list is always connected because the next pointer is not set
+ * to null before the node is freed, and no node is freed until deleted
+ * from the linked list.
+ *
+ * 2. Nodes are only inserted at the end of the linked list because they are
+ * linked through the tail pointer which always points to a node in the
+ * linked list (5) and an inserted node is only linked to a node that has
+ * a null next pointer, and the only such node is the last one (1).
+ *
+ * 3. Nodes are deleted from the beginning of the list because they are
+ * deleted only when they are pointed to by head which always points to the
+ * first node (4).
+ *
+ * 4. Head always points to the first node in the list because it only changes
+ * its value to the next node atomically. The new value of head cannot be
+ * null because if there is only one node in the list the dequeue operation
+ * returns without deleting any nodes.
+ *
+ * 5. Tail always points to a node in the linked list because it never lags
+ * behind head, so it can never point to a deleted node. Also, when tail
+ * changes its value it always swings to the next node in the list and it
+ * never tires to change its value if the next pointer is NULL.
+ */
-class lock_free_queue_node {
-public:
- lock_free_queue_node *next;
- lock_free_queue_node();
-};
+#include <assert.h>
+template <class T>
+class lock_free_queue {
+
+ struct node_t;
+ struct pointer_t {
+ node_t *node;
+ uint32_t count;
+ pointer_t() : node(NULL), count(0) {
+ // Nop.
+ }
+ pointer_t(node_t *node, uint32_t count) {
+ this->node = node;
+ this->count = count;
+ }
+ bool equals(pointer_t &other) {
+ return node == other.node && count == other.count;
+ }
+ };
+
+ struct node_t {
+ T value;
+ pointer_t next;
+
+ node_t() {
+ next.node = NULL;
+ next.count = 0;
+ }
+
+ node_t(pointer_t next, T value) {
+ this->next = next;
+ this->value = value;
+ }
+ };
+
+ // Always points to the first node in the list.
+ pointer_t head;
+
+ // Always points to a node in the list, (not necessarily the last).
+ pointer_t tail;
+
+ // Compare and swap counted pointers, we can only do this if pointr_t is
+ // 8 bytes or less since that the maximum size CAS can handle.
+ bool compare_and_swap(pointer_t *address,
+ pointer_t *oldValue,
+ pointer_t newValue) {
+
+ if (sync::compare_and_swap(
+ (uint64_t*) address,
+ *(uint64_t*) oldValue,
+ *(uint64_t*) &newValue)) {
+ return true;
+ }
+ return false;
+ }
-class lock_free_queue : lock_free_queue_node {
- spin_lock lock;
- lock_free_queue_node *_tail;
public:
- lock_free_queue();
- void enqueue(lock_free_queue_node *item);
- lock_free_queue_node *dequeue();
- bool is_empty();
+ lock_free_queue() {
+ // We can only handle 64bit CAS for counted pointers, so this will
+ // not work with 64bit pointers.
+ assert (sizeof(pointer_t) == sizeof(uint64_t));
+
+ // Allocate a dummy node to be used as the first node in the list.
+ node_t *node = new node_t();
+
+ // Head and tail both start out pointing to the dummy node.
+ head.node = node;
+ tail.node = node;
+ }
+
+ virtual ~lock_free_queue() {
+ // Delete dummy node.
+ delete head.node;
+ }
+
+ bool is_empty() {
+ return head.node == tail.node;
+ }
+
+ void enqueue(T value) {
+
+ // Create a new node to be inserted in the linked list, and set the
+ // next node to NULL.
+ node_t *node = new node_t();
+ node->value = value;
+ node->next.node = NULL;
+ pointer_t tail;
+
+ // Keep trying until enqueue is done.
+ while (true) {
+ // Read the current tail which may either point to the last node
+ // or to the second to last node (not sure why second to last,
+ // and not any other node).
+ tail = this->tail;
+
+ // Reads the next node after the tail which will be the last node
+ // if null.
+ pointer_t next;
+ if (tail.node != NULL) {
+ next = tail.node->next;
+ }
+
+ // Loop if another thread changed the tail since we last read it.
+ if (tail.equals(this->tail) == false) {
+ continue;
+ }
+
+ // If next is not pointing to the last node try to swing tail to
+ // the last node and loop.
+ if (next.node != NULL) {
+ compare_and_swap(&this->tail, &tail,
+ pointer_t(next.node, tail.count + 1));
+ continue;
+ }
+
+ // Try to link node at the end of the linked list.
+ if (compare_and_swap(&tail.node->next, &next,
+ pointer_t(node, next.count + 1))) {
+ // Enqueueing is done.
+ break;
+ }
+ }
+
+ // Enqueue is done, try to swing tail to the inserted node.
+ compare_and_swap(&this->tail, &tail,
+ pointer_t(node, tail.count + 1));
+ }
+
+ bool dequeue(T *value) {
+ pointer_t head;
+
+ // Keep trying until dequeue is done.
+ while(true) {
+ head = this->head;
+ pointer_t tail = this->tail;
+ pointer_t next = head.node->next;
+
+ if (head.equals(this->head) == false) {
+ continue;
+ }
+
+ // If queue is empty, or if tail is falling behind.
+ if (head.node == tail.node) {
+ // If queue is empty.
+ if (next.node == NULL) {
+ return false;
+ }
+ // Tail is falling behind, advance it.
+ compare_and_swap(&this->tail,
+ &tail,
+ pointer_t(next.node, tail.count + 1));
+ } else {
+ // Read value before CAS, otherwise another
+ // dequeue might advance it.
+ *value = next.node->value;
+ if (compare_and_swap(&this->head, &head,
+ pointer_t(next.node, head.count + 1))) {
+ break;
+ }
+ }
+ }
+ delete head.node;
+ return true;
+ }
};
#endif /* LOCK_FREE_QUEUE_H */
diff --git a/src/rt/sync/sync.h b/src/rt/sync/sync.h
index 902b5661..8c9a13f0 100644
--- a/src/rt/sync/sync.h
+++ b/src/rt/sync/sync.h
@@ -4,6 +4,11 @@
class sync {
public:
static void yield();
+ template <class T>
+ static bool compare_and_swap(T *address,
+ T oldValue, T newValue) {
+ return __sync_bool_compare_and_swap(address, oldValue, newValue);
+ }
};
#endif /* SYNC_H */
diff --git a/src/test/run-fail/task-comm-14.rs b/src/test/run-fail/task-comm-14.rs
new file mode 100644
index 00000000..f5fa27ac
--- /dev/null
+++ b/src/test/run-fail/task-comm-14.rs
@@ -0,0 +1,26 @@
+io fn main() {
+ let port[int] po = port();
+
+ // Spawn 10 tasks each sending us back one int.
+ let int i = 10;
+ while (i > 0) {
+ log i;
+ spawn "child" child(i, chan(po));
+ i = i - 1;
+ }
+
+ i = 10;
+ let int value = 0;
+ while (i > 0) {
+ log i;
+ value <- po;
+ i = i - 1;
+ }
+
+ log "main thread exiting";
+}
+
+io fn child(int x, chan[int] ch) {
+ log x;
+ ch <| x;
+}
diff --git a/src/test/run-pass/task-comm-12.rs b/src/test/run-pass/task-comm-12.rs
new file mode 100644
index 00000000..ab7c25e8
--- /dev/null
+++ b/src/test/run-pass/task-comm-12.rs
@@ -0,0 +1,23 @@
+use std;
+import std._task;
+
+fn main() -> () {
+ test00();
+}
+
+fn start(int task_number) {
+ log "Started / Finished Task.";
+}
+
+fn test00() {
+ let int i = 0;
+ let task t = spawn thread start(i);
+
+ // Sleep long enough for the task to finish.
+ _task.sleep(10000u);
+
+ // Try joining tasks that have already finished.
+ join t;
+
+ log "Joined Task.";
+} \ No newline at end of file
diff --git a/src/test/run-pass/task-comm-13-thread.rs b/src/test/run-pass/task-comm-13-thread.rs
new file mode 100644
index 00000000..0dab20ed
--- /dev/null
+++ b/src/test/run-pass/task-comm-13-thread.rs
@@ -0,0 +1,18 @@
+use std;
+import std._task;
+
+io fn start(chan[int] c, int start, int number_of_messages) {
+ let int i = 0;
+ while (i < number_of_messages) {
+ c <| start + i;
+ i += 1;
+ }
+}
+
+fn main() -> () {
+ log "Check that we don't deadlock.";
+ let port[int] p = port();
+ let task a = spawn thread "start" start(chan(p), 0, 10);
+ join a;
+ log "Joined Task";
+} \ No newline at end of file
diff --git a/src/test/run-pass/task-comm-13.rs b/src/test/run-pass/task-comm-13.rs
new file mode 100644
index 00000000..97bdcb6a
--- /dev/null
+++ b/src/test/run-pass/task-comm-13.rs
@@ -0,0 +1,18 @@
+use std;
+import std._task;
+
+io fn start(chan[int] c, int start, int number_of_messages) {
+ let int i = 0;
+ while (i < number_of_messages) {
+ c <| start + i;
+ i += 1;
+ }
+}
+
+fn main() -> () {
+ log "Check that we don't deadlock.";
+ let port[int] p = port();
+ let task a = spawn "start" start(chan(p), 0, 10);
+ join a;
+ log "Joined Task";
+} \ No newline at end of file
diff --git a/src/test/run-pass/task-comm-15.rs b/src/test/run-pass/task-comm-15.rs
new file mode 100644
index 00000000..8d748f59
--- /dev/null
+++ b/src/test/run-pass/task-comm-15.rs
@@ -0,0 +1,14 @@
+io fn start(chan[int] c, int n) {
+ let int i = n;
+
+ while(i > 0) {
+ c <| 0;
+ i = i - 1;
+ }
+}
+
+io fn main() {
+ let port[int] p = port();
+ auto child = spawn thread "child" start(chan(p), 10);
+ auto c <- p;
+} \ No newline at end of file