aboutsummaryrefslogtreecommitdiff
path: root/src/rt
diff options
context:
space:
mode:
authorMichael Bebenita <[email protected]>2010-07-19 14:05:18 -0700
committerMichael Bebenita <[email protected]>2010-07-19 14:05:18 -0700
commit00d1465d13980fc3acf650f182ee0723fbda0e06 (patch)
treea73cf5f0f20c0bee6722b33d975eb930919fefdf /src/rt
parentAdd a test for an obvious-seeming (but not actually legal) kind of cast attem... (diff)
downloadrust-00d1465d13980fc3acf650f182ee0723fbda0e06.tar.xz
rust-00d1465d13980fc3acf650f182ee0723fbda0e06.zip
Added a message passing system based on lock free queues for inter-thread communication. Channels now buffer on the sending side, and no longer require blocking when sending. Lots of other refactoring and bug fixes.
Diffstat (limited to 'src/rt')
-rw-r--r--src/rt/circular_buffer.cpp118
-rw-r--r--src/rt/circular_buffer.h30
-rw-r--r--src/rt/globals.h33
-rw-r--r--src/rt/rust.cpp49
-rw-r--r--src/rt/rust_builtin.cpp4
-rw-r--r--src/rt/rust_chan.cpp54
-rw-r--r--src/rt/rust_chan.h4
-rw-r--r--src/rt/rust_comm.cpp119
-rw-r--r--src/rt/rust_crate_cache.cpp6
-rw-r--r--src/rt/rust_dom.cpp175
-rw-r--r--src/rt/rust_dom.h92
-rw-r--r--src/rt/rust_internal.h223
-rw-r--r--src/rt/rust_log.cpp177
-rw-r--r--src/rt/rust_log.h33
-rw-r--r--src/rt/rust_proxy.h31
-rw-r--r--src/rt/rust_task.cpp57
-rw-r--r--src/rt/rust_task.h107
-rw-r--r--src/rt/rust_timer.cpp33
-rw-r--r--src/rt/rust_upcall.cpp600
-rw-r--r--src/rt/rust_util.h4
-rw-r--r--src/rt/sync/condition_variable.cpp66
-rw-r--r--src/rt/sync/condition_variable.h19
-rw-r--r--src/rt/sync/lock_free_queue.cpp33
-rw-r--r--src/rt/sync/lock_free_queue.h6
-rw-r--r--src/rt/sync/spin_lock.cpp5
-rw-r--r--src/rt/sync/spin_lock.h6
-rw-r--r--src/rt/util/array_list.h28
27 files changed, 1290 insertions, 822 deletions
diff --git a/src/rt/circular_buffer.cpp b/src/rt/circular_buffer.cpp
new file mode 100644
index 00000000..0e1979c1
--- /dev/null
+++ b/src/rt/circular_buffer.cpp
@@ -0,0 +1,118 @@
+/*
+ * A simple resizable circular buffer.
+ */
+
+#include "rust_internal.h"
+
+circular_buffer::circular_buffer(rust_dom *dom, size_t unit_sz) :
+ dom(dom),
+ _buffer_sz(INITIAL_CIRCULAR_BUFFFER_SIZE_IN_UNITS * unit_sz),
+ unit_sz(unit_sz),
+ _next(0),
+ _unread(0),
+ _buffer((uint8_t *)dom->calloc(_buffer_sz)) {
+
+ A(dom, unit_sz, "Unit size must be larger than zero.");
+
+ dom->log(rust_log::MEM | rust_log::COMM,
+ "new circular_buffer(buffer_sz=%d, unread=%d)"
+ "-> circular_buffer=0x%" PRIxPTR,
+ _buffer_sz, _unread, this);
+
+ A(dom, _buffer, "Failed to allocate buffer.");
+}
+
+circular_buffer::~circular_buffer() {
+ dom->log(rust_log::MEM | rust_log::COMM,
+ "~circular_buffer 0x%" PRIxPTR,
+ this);
+ I(dom, _buffer);
+ // I(dom, unread == 0);
+ dom->free(_buffer);
+}
+
+/**
+ * Copies the unread data from this buffer to the "dst" address.
+ */
+void
+circular_buffer::transfer(void *dst) {
+ I(dom, dst);
+ uint8_t *ptr = (uint8_t *) dst;
+ for (size_t i = 0; i < _unread; i += unit_sz) {
+ memcpy(&ptr[i], &_buffer[_next + i % _buffer_sz], unit_sz);
+ }
+}
+
+/**
+ * Copies the data at the "src" address into this buffer. The buffer is
+ * grown if it isn't large enough.
+ */
+void
+circular_buffer::enqueue(void *src) {
+ I(dom, src);
+ I(dom, _unread <= _buffer_sz);
+
+ // Grow if necessary.
+ if (_unread == _buffer_sz) {
+ I(dom, _buffer_sz <= MAX_CIRCULAR_BUFFFER_SIZE);
+ void *tmp = dom->malloc(_buffer_sz << 1);
+ transfer(tmp);
+ _buffer_sz <<= 1;
+ dom->free(_buffer);
+ _buffer = (uint8_t *)tmp;
+ }
+
+ dom->log(rust_log::MEM | rust_log::COMM,
+ "circular_buffer enqueue "
+ "unread: %d, buffer_sz: %d, unit_sz: %d",
+ _unread, _buffer_sz, unit_sz);
+
+ I(dom, _unread < _buffer_sz);
+ I(dom, _unread + unit_sz <= _buffer_sz);
+
+ // Copy data
+ size_t i = (_next + _unread) % _buffer_sz;
+ memcpy(&_buffer[i], src, unit_sz);
+ _unread += unit_sz;
+
+ dom->log(rust_log::MEM | rust_log::COMM,
+ "circular_buffer pushed data at index: %d", i);
+}
+
+/**
+ * Copies data from this buffer to the "dst" address. The buffer is
+ * shrunk if possible.
+ */
+void
+circular_buffer::dequeue(void *dst) {
+ I(dom, dst);
+ I(dom, unit_sz > 0);
+ I(dom, _unread >= unit_sz);
+ I(dom, _unread <= _buffer_sz);
+ I(dom, _buffer);
+ size_t i = _next;
+ memcpy(dst, &_buffer[i], unit_sz);
+ dom->log(rust_log::MEM | rust_log::COMM,
+ "shifted data from index %d", i);
+ _unread -= unit_sz;
+ _next += unit_sz;
+ I(dom, _next <= _buffer_sz);
+ if (_next == _buffer_sz) {
+ _next = 0;
+ }
+
+ // Shrink if possible.
+ if (_buffer_sz >= INITIAL_CIRCULAR_BUFFFER_SIZE_IN_UNITS * unit_sz &&
+ _unread <= _buffer_sz / 4) {
+ void *tmp = dom->malloc(_buffer_sz / 2);
+ transfer(tmp);
+ _buffer_sz >>= 1;
+ dom->free(_buffer);
+ _buffer = (uint8_t *)tmp;
+ }
+}
+
+bool
+circular_buffer::is_empty() {
+ return _unread == 0;
+}
diff --git a/src/rt/circular_buffer.h b/src/rt/circular_buffer.h
new file mode 100644
index 00000000..c0c0da5e
--- /dev/null
+++ b/src/rt/circular_buffer.h
@@ -0,0 +1,30 @@
+/*
+ *
+ */
+
+#ifndef CIRCULAR_BUFFER_H
+#define CIRCULAR_BUFFER_H
+
+class
+circular_buffer : public dom_owned<circular_buffer> {
+ static const size_t INITIAL_CIRCULAR_BUFFFER_SIZE_IN_UNITS = 8;
+ static const size_t MAX_CIRCULAR_BUFFFER_SIZE = 1 << 24;
+
+public:
+ rust_dom *dom;
+ circular_buffer(rust_dom *dom, size_t unit_sz);
+ ~circular_buffer();
+ void transfer(void *dst);
+ void enqueue(void *src);
+ void dequeue(void *dst);
+ bool is_empty();
+
+private:
+ size_t _buffer_sz;
+ size_t unit_sz;
+ size_t _next;
+ size_t _unread;
+ uint8_t *_buffer;
+};
+
+#endif /* CIRCULAR_BUFFER_H */
diff --git a/src/rt/globals.h b/src/rt/globals.h
new file mode 100644
index 00000000..f8025a40
--- /dev/null
+++ b/src/rt/globals.h
@@ -0,0 +1,33 @@
+#ifndef GLOBALS_H
+#define GLOBALS_H
+
+#define __STDC_LIMIT_MACROS 1
+#define __STDC_CONSTANT_MACROS 1
+#define __STDC_FORMAT_MACROS 1
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <inttypes.h>
+
+#include <stdio.h>
+#include <string.h>
+
+#if defined(__WIN32__)
+extern "C" {
+#include <windows.h>
+#include <tchar.h>
+#include <wincrypt.h>
+}
+#elif defined(__GNUC__)
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <dlfcn.h>
+#include <pthread.h>
+#include <errno.h>
+#else
+#error "Platform not supported."
+#endif
+
+#endif /* GLOBALS_H */
diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp
index 235eb8d0..00e709c9 100644
--- a/src/rt/rust.cpp
+++ b/src/rt/rust.cpp
@@ -1,7 +1,6 @@
#include "rust_internal.h"
#include "util/array_list.h"
-
// #define TRACK_ALLOCATIONS
// For debugging, keeps track of live allocations, so you can find out
// exactly what leaked.
@@ -100,52 +99,6 @@ rust_srv::clone()
return new rust_srv();
}
-
-int
-rust_main_loop(rust_dom *dom)
-{
- // Make sure someone is watching, to pull us out of infinite loops.
- rust_timer timer(*dom);
-
- int rval;
- rust_task *task;
-
- dom->log(rust_log::DOM,
- "running main-loop on domain 0x%" PRIxPTR, dom);
- dom->logptr("exit-task glue",
- dom->root_crate->get_exit_task_glue());
-
- while ((task = dom->sched()) != NULL) {
- I(dom, task->running());
-
- dom->log(rust_log::TASK,
- "activating task 0x%" PRIxPTR ", sp=0x%" PRIxPTR,
- (uintptr_t)task, task->rust_sp);
-
- dom->interrupt_flag = 0;
-
- dom->activate(task);
-
- dom->log(rust_log::TASK,
- "returned from task 0x%" PRIxPTR
- " in state '%s', sp=0x%" PRIxPTR,
- (uintptr_t)task,
- dom->state_vec_name(task->state),
- task->rust_sp);
-
- I(dom, task->rust_sp >= (uintptr_t) &task->stk->data[0]);
- I(dom, task->rust_sp < task->stk->limit);
-
- dom->reap_dead_tasks();
- }
-
- dom->log(rust_log::DOM, "finished main-loop (dom.rval = %d)", dom->rval);
- rval = dom->rval;
-
- return rval;
-}
-
-
struct
command_line_args
{
@@ -243,7 +196,7 @@ rust_start(uintptr_t main_fn, rust_crate const *crate, int argc, char **argv)
(uintptr_t)&main_args,
sizeof(main_args));
- ret = rust_main_loop(&dom);
+ ret = dom.start_main_loop();
}
#if !defined(__WIN32__)
diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp
index 339452c5..0b492de8 100644
--- a/src/rt/rust_builtin.cpp
+++ b/src/rt/rust_builtin.cpp
@@ -19,7 +19,7 @@ str_alloc(rust_task *task, size_t n_bytes)
extern "C" CDECL rust_str*
last_os_error(rust_task *task) {
rust_dom *dom = task->dom;
- dom->log(rust_log::TASK, "last_os_error()");
+ task->log(rust_log::TASK, "last_os_error()");
#if defined(__WIN32__)
LPTSTR buf;
@@ -95,7 +95,7 @@ extern "C" CDECL rust_vec*
vec_alloc(rust_task *task, type_desc *t, size_t n_elts)
{
rust_dom *dom = task->dom;
- dom->log(rust_log::MEM,
+ task->log(rust_log::MEM,
"vec_alloc %" PRIdPTR " elements of size %" PRIdPTR,
n_elts, t->size);
size_t fill = n_elts * t->size;
diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp
index 38f93a7d..6aa9121a 100644
--- a/src/rt/rust_chan.cpp
+++ b/src/rt/rust_chan.cpp
@@ -1,29 +1,29 @@
-
#include "rust_internal.h"
#include "rust_chan.h"
rust_chan::rust_chan(rust_task *task, rust_port *port) :
- task(task),
- port(port),
- buffer(task->dom, port->unit_sz),
- token(this)
-{
- if (port)
+ task(task), port(port), buffer(task->dom, port->unit_sz), token(this) {
+
+ if (port) {
port->chans.push(this);
+ ref();
+ }
+
+ task->log(rust_log::MEM | rust_log::COMM,
+ "new rust_chan(task=0x%" PRIxPTR
+ ", port=0x%" PRIxPTR ") -> chan=0x%"
+ PRIxPTR, (uintptr_t) task, (uintptr_t) port, (uintptr_t) this);
}
-rust_chan::~rust_chan()
-{
+rust_chan::~rust_chan() {
if (port) {
if (token.pending())
token.withdraw();
- port->chans.swapdel(this);
+ port->chans.swap_delete(this);
}
}
-void
-rust_chan::disassociate()
-{
+void rust_chan::disassociate() {
I(task->dom, port);
if (token.pending())
@@ -31,4 +31,32 @@ rust_chan::disassociate()
// Delete reference to the port/
port = NULL;
+
+ deref();
+}
+
+/**
+ * Attempt to transmit channel data to the associated port.
+ */
+int rust_chan::transmit() {
+ rust_dom *dom = task->dom;
+
+ // TODO: Figure out how and why the port would become null.
+ if (port == NULL) {
+ dom->log(rust_log::COMM, "invalid port, transmission incomplete");
+ return ERROR;
+ }
+
+ if (buffer.is_empty()) {
+ dom->log(rust_log::COMM, "buffer is empty, transmission incomplete");
+ return ERROR;
+ }
+
+ if(port->task->blocked_on(port)) {
+ buffer.dequeue(port->task->rendezvous_ptr);
+ port->task->wakeup(port);
+ }
+
+ return 0;
+
}
diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h
index a56ba0ca..3e32d838 100644
--- a/src/rt/rust_chan.h
+++ b/src/rt/rust_chan.h
@@ -9,7 +9,7 @@ public:
rust_task *task;
rust_port *port;
- circ_buf buffer;
+ circular_buffer buffer;
size_t idx; // Index into port->chans.
// Token belonging to this chan, it will be placed into a port's
@@ -17,6 +17,8 @@ public:
rust_token token;
void disassociate();
+
+ int transmit();
};
#endif /* RUST_CHAN_H */
diff --git a/src/rt/rust_comm.cpp b/src/rt/rust_comm.cpp
index 58b9ef4c..37929b8b 100644
--- a/src/rt/rust_comm.cpp
+++ b/src/rt/rust_comm.cpp
@@ -10,109 +10,6 @@ rust_alarm::rust_alarm(rust_task *receiver) :
{
}
-
-// Circular buffers.
-
-circ_buf::circ_buf(rust_dom *dom, size_t unit_sz) :
- dom(dom),
- alloc(INIT_CIRC_BUF_UNITS * unit_sz),
- unit_sz(unit_sz),
- next(0),
- unread(0),
- data((uint8_t *)dom->calloc(alloc))
-{
- I(dom, unit_sz);
- dom->log(rust_log::MEM|rust_log::COMM,
- "new circ_buf(alloc=%d, unread=%d) -> circ_buf=0x%" PRIxPTR,
- alloc, unread, this);
- I(dom, data);
-}
-
-circ_buf::~circ_buf()
-{
- dom->log(rust_log::MEM|rust_log::COMM,
- "~circ_buf 0x%" PRIxPTR,
- this);
- I(dom, data);
- // I(dom, unread == 0);
- dom->free(data);
-}
-
-void
-circ_buf::transfer(void *dst)
-{
- size_t i;
- uint8_t *d = (uint8_t *)dst;
- I(dom, dst);
- for (i = 0; i < unread; i += unit_sz)
- memcpy(&d[i], &data[next + i % alloc], unit_sz);
-}
-
-void
-circ_buf::push(void *src)
-{
- size_t i;
- void *tmp;
-
- I(dom, src);
- I(dom, unread <= alloc);
-
- /* Grow if necessary. */
- if (unread == alloc) {
- I(dom, alloc <= MAX_CIRC_BUF_SIZE);
- tmp = dom->malloc(alloc << 1);
- transfer(tmp);
- alloc <<= 1;
- dom->free(data);
- data = (uint8_t *)tmp;
- }
-
- dom->log(rust_log::MEM|rust_log::COMM,
- "circ buf push, unread=%d, alloc=%d, unit_sz=%d",
- unread, alloc, unit_sz);
-
- I(dom, unread < alloc);
- I(dom, unread + unit_sz <= alloc);
-
- i = (next + unread) % alloc;
- memcpy(&data[i], src, unit_sz);
-
- dom->log(rust_log::MEM|rust_log::COMM, "pushed data at index %d", i);
- unread += unit_sz;
-}
-
-void
-circ_buf::shift(void *dst)
-{
- size_t i;
- void *tmp;
-
- I(dom, dst);
- I(dom, unit_sz > 0);
- I(dom, unread >= unit_sz);
- I(dom, unread <= alloc);
- I(dom, data);
- i = next;
- memcpy(dst, &data[i], unit_sz);
- dom->log(rust_log::MEM|rust_log::COMM, "shifted data from index %d", i);
- unread -= unit_sz;
- next += unit_sz;
- I(dom, next <= alloc);
- if (next == alloc)
- next = 0;
-
- /* Shrink if necessary. */
- if (alloc >= INIT_CIRC_BUF_UNITS * unit_sz &&
- unread <= alloc / 4) {
- tmp = dom->malloc(alloc / 2);
- transfer(tmp);
- alloc >>= 1;
- dom->free(data);
- data = (uint8_t *)tmp;
- }
-}
-
-
// Ports.
rust_port::rust_port(rust_task *task, size_t unit_sz) :
@@ -121,18 +18,16 @@ rust_port::rust_port(rust_task *task, size_t unit_sz) :
writers(task->dom),
chans(task->dom)
{
- rust_dom *dom = task->dom;
- dom->log(rust_log::MEM|rust_log::COMM,
- "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%"
- PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this);
+ task->log(rust_log::MEM|rust_log::COMM,
+ "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%"
+ PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this);
}
rust_port::~rust_port()
{
- rust_dom *dom = task->dom;
- dom->log(rust_log::COMM|rust_log::MEM,
- "~rust_port 0x%" PRIxPTR,
- (uintptr_t)this);
+ task->log(rust_log::COMM|rust_log::MEM,
+ "~rust_port 0x%" PRIxPTR,
+ (uintptr_t)this);
while (chans.length() > 0)
chans.pop()->disassociate();
}
@@ -182,7 +77,7 @@ rust_token::withdraw()
if (task->blocked())
task->wakeup(this); // must be blocked on us (or dead)
- port->writers.swapdel(this);
+ port->writers.swap_delete(this);
submitted = false;
}
diff --git a/src/rt/rust_crate_cache.cpp b/src/rt/rust_crate_cache.cpp
index 2db0eb45..650e3bb1 100644
--- a/src/rt/rust_crate_cache.cpp
+++ b/src/rt/rust_crate_cache.cpp
@@ -251,7 +251,7 @@ rust_crate_cache::flush() {
if (s) {
dom->log(rust_log::CACHE,
"rust_crate_cache::flush() deref rust_sym %"
- PRIdPTR " (rc=%" PRIdPTR ")", i, s->refcnt);
+ PRIdPTR " (rc=%" PRIdPTR ")", i, s->ref_count);
s->deref();
}
rust_syms[i] = NULL;
@@ -262,7 +262,7 @@ rust_crate_cache::flush() {
if (s) {
dom->log(rust_log::CACHE,
"rust_crate_cache::flush() deref c_sym %"
- PRIdPTR " (rc=%" PRIdPTR ")", i, s->refcnt);
+ PRIdPTR " (rc=%" PRIdPTR ")", i, s->ref_count);
s->deref();
}
c_syms[i] = NULL;
@@ -272,7 +272,7 @@ rust_crate_cache::flush() {
lib *l = libs[i];
if (l) {
dom->log(rust_log::CACHE, "rust_crate_cache::flush() deref lib %"
- PRIdPTR " (rc=%" PRIdPTR ")", i, l->refcnt);
+ PRIdPTR " (rc=%" PRIdPTR ")", i, l->ref_count);
l->deref();
}
libs[i] = NULL;
diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp
index 3b5e23b2..39124491 100644
--- a/src/rt/rust_dom.cpp
+++ b/src/rt/rust_dom.cpp
@@ -4,6 +4,24 @@
template class ptr_vec<rust_task>;
+rust_message::rust_message(rust_dom *dom) : dom(dom) {
+
+}
+
+void rust_message::process() {
+
+}
+
+kill_task_message::kill_task_message(rust_dom *dom, rust_task *task) :
+ rust_message(dom), _task(task) {
+
+}
+
+void kill_task_message::process() {
+ _task->ref_count--;
+ _task->kill();
+}
+
rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate) :
interrupt_flag(0),
root_crate(root_crate),
@@ -81,13 +99,25 @@ rust_dom::activate(rust_task *task) {
}
void
+rust_dom::log(rust_task *task, uint32_t type_bits, char const *fmt, ...) {
+ char buf[256];
+ if (_log.is_tracing(type_bits)) {
+ va_list args;
+ va_start(args, fmt);
+ vsnprintf(buf, sizeof(buf), fmt, args);
+ _log.trace_ln(task, type_bits, buf);
+ va_end(args);
+ }
+}
+
+void
rust_dom::log(uint32_t type_bits, char const *fmt, ...) {
char buf[256];
if (_log.is_tracing(type_bits)) {
va_list args;
va_start(args, fmt);
vsnprintf(buf, sizeof(buf), fmt, args);
- _log.trace_ln(type_bits, buf);
+ _log.trace_ln(NULL, type_bits, buf);
va_end(args);
}
}
@@ -189,7 +219,7 @@ rust_dom::remove_task_from_state_vec(ptr_vec<rust_task> *v, rust_task *task)
"removing task 0x%" PRIxPTR " in state '%s' from vec 0x%" PRIxPTR,
(uintptr_t)task, state_vec_name(v), (uintptr_t)v);
I(this, (*v)[task->idx] == task);
- v->swapdel(task);
+ v->swap_delete(task);
}
const char *
@@ -203,25 +233,67 @@ rust_dom::state_vec_name(ptr_vec<rust_task> *v)
return "dead";
}
+/**
+ * Delete any dead tasks.
+ */
void
-rust_dom::reap_dead_tasks()
-{
+rust_dom::reap_dead_tasks() {
for (size_t i = 0; i < dead_tasks.length(); ) {
- rust_task *t = dead_tasks[i];
- if (t == root_task || t->refcnt == 0) {
- I(this, !t->waiting_tasks.length());
- dead_tasks.swapdel(t);
+ rust_task *task = dead_tasks[i];
+// log(rust_log::TASK, "dead task 0x%" PRIxPTR " with ref_count: %d",
+// task, task->ref_count);
+ if (task->ref_count == 0) {
+ I(this, !task->waiting_tasks.length());
+ dead_tasks.swap_delete(task);
log(rust_log::TASK,
- "deleting unreferenced dead task 0x%" PRIxPTR, t);
- delete t;
+ "deleting unreferenced dead task 0x%" PRIxPTR, task);
+ delete task;
continue;
}
++i;
}
}
+
+/**
+ * Enqueues a message in this domain's incoming message queue. It's the
+ * responsibility of the receiver to free the message once it's processed.
+ */
+void rust_dom::send_message(rust_message *message) {
+ log(rust_log::COMM, "enqueueing message 0x%" PRIxPTR
+ " in queue 0x%" PRIxPTR,
+ message,
+ &_incoming_message_queue);
+ _incoming_message_queue.enqueue(message);
+ _incoming_message_pending.signal();
+}
+
+/**
+ * Drains and processes incoming pending messages.
+ */
+void rust_dom::drain_incoming_message_queue() {
+ rust_message *message;
+ while ((message = (rust_message *) _incoming_message_queue.dequeue())) {
+ log(rust_log::COMM, "read 0x%" PRIxPTR
+ " from queue 0x%" PRIxPTR,
+ message,
+ &_incoming_message_queue);
+ log(rust_log::COMM, "processing incoming message 0x%" PRIxPTR,
+ message);
+ message->process();
+ delete message;
+ }
+}
+
+/**
+ * Schedules a running task for execution. Only running tasks can be
+ * activated. Blocked tasks have to be unblocked before they can be
+ * activated.
+ *
+ * Returns NULL if no tasks can be scheduled.
+ */
rust_task *
-rust_dom::sched()
+rust_dom::schedule_task()
{
I(this, this);
// FIXME: in the face of failing tasks, this is not always right.
@@ -231,11 +303,88 @@ rust_dom::sched()
i %= running_tasks.length();
return (rust_task *)running_tasks[i];
}
- log(rust_log::DOM|rust_log::TASK,
- "no schedulable tasks");
+ // log(rust_log::DOM|rust_log::TASK, "no schedulable tasks");
return NULL;
}
+/**
+ * Starts the main scheduler loop which performs task scheduling for this
+ * domain.
+ *
+ * Returns once no more tasks can be scheduled.
+ */
+int
+rust_dom::start_main_loop()
+{
+ // Make sure someone is watching, to pull us out of infinite loops.
+ rust_timer timer(this);
+
+ log(rust_log::DOM, "running main-loop on domain 0x%" PRIxPTR, this);
+ logptr("exit-task glue", root_crate->get_exit_task_glue());
+
+ while (n_live_tasks() > 0) {
+ rust_task *scheduled_task = schedule_task();
+
+ // If we cannot schedule a task because all other live tasks
+ // are blocked, wait on a condition variable which is signaled
+ // if progress is made in other domains.
+
+ if (scheduled_task == NULL) {
+ log(rust_log::TASK,
+ "all tasks are blocked, waiting for progress ...");
+ _progress.wait();
+ continue;
+ }
+
+ I(this, scheduled_task->running());
+
+ log(rust_log::TASK,
+ "activating task 0x%" PRIxPTR ", sp=x%" PRIxPTR,
+ (uintptr_t)scheduled_task, scheduled_task->rust_sp);
+
+ interrupt_flag = 0;
+
+ activate(scheduled_task);
+
+ log(rust_log::TASK,
+ "returned from task 0x%" PRIxPTR
+ " in state '%s', sp=0x%" PRIxPTR,
+ (uintptr_t)scheduled_task,
+ state_vec_name(scheduled_task->state),
+ scheduled_task->rust_sp);
+
+ I(this, scheduled_task->rust_sp >=
+ (uintptr_t) &scheduled_task->stk->data[0]);
+ I(this, scheduled_task->rust_sp < scheduled_task->stk->limit);
+
+ drain_incoming_message_queue();
+
+ reap_dead_tasks();
+ }
+
+ log(rust_log::DOM, "terminated scheduler loop, reaping dead tasks ...");
+
+ while (dead_tasks.length() > 0) {
+ log(rust_log::DOM,
+ "waiting for %d dead tasks to become dereferenced ...",
+ dead_tasks.length());
+
+ log(rust_log::DOM,
+ "waiting for %" PRIxPTR, dead_tasks[0]);
+
+ if (_incoming_message_queue.is_empty()) {
+ _incoming_message_pending.wait();
+ } else {
+ drain_incoming_message_queue();
+ }
+ reap_dead_tasks();
+ }
+
+ log(rust_log::DOM, "finished main-loop (dom.rval = %d)", rval);
+ return rval;
+}
+
+
rust_crate_cache *
rust_dom::get_cache(rust_crate const *crate) {
log(rust_log::CACHE,
diff --git a/src/rt/rust_dom.h b/src/rt/rust_dom.h
new file mode 100644
index 00000000..38d04dbf
--- /dev/null
+++ b/src/rt/rust_dom.h
@@ -0,0 +1,92 @@
+/*
+ * rust_dom.h
+ */
+
+#ifndef RUST_DOM_H
+#define RUST_DOM_H
+
+#include "sync/lock_free_queue.h"
+
+class rust_message : public lock_free_queue_node,
+ public dom_owned<rust_message> {
+public:
+ rust_dom *dom;
+ rust_message(rust_dom *dom);
+ virtual void process();
+};
+
+class kill_task_message : public rust_message {
+ rust_task *_task;
+public:
+ kill_task_message(rust_dom *dom, rust_task *task);
+ void process();
+};
+
+struct rust_dom
+{
+ // Fields known to the compiler:
+ uintptr_t interrupt_flag;
+
+ // Fields known only by the runtime:
+
+ // NB: the root crate must remain in memory until the root of the
+ // tree of domains exits. All domains within this tree have a
+ // copy of this root_crate value and use it for finding utility
+ // glue.
+ rust_crate const *root_crate;
+ rust_log _log;
+ rust_srv *srv;
+ ptr_vec<rust_task> running_tasks;
+ ptr_vec<rust_task> blocked_tasks;
+ ptr_vec<rust_task> dead_tasks;
+ ptr_vec<rust_crate_cache> caches;
+ randctx rctx;
+ rust_task *root_task;
+ rust_task *curr_task;
+ int rval;
+
+ condition_variable _progress;
+
+ // Incoming messages from other domains.
+ condition_variable _incoming_message_pending;
+ lock_free_queue _incoming_message_queue;
+
+#ifndef __WIN32__
+ pthread_attr_t attr;
+#endif
+
+ rust_dom(rust_srv *srv, rust_crate const *root_crate);
+ ~rust_dom();
+
+ void activate(rust_task *task);
+ void log(rust_task *task, uint32_t logbit, char const *fmt, ...);
+ void log(uint32_t logbit, char const *fmt, ...);
+ rust_log & get_log();
+ void logptr(char const *msg, uintptr_t ptrval);
+ template<typename T>
+ void logptr(char const *msg, T* ptrval);
+ void fail();
+ void *malloc(size_t sz);
+ void *calloc(size_t sz);
+ void *realloc(void *data, size_t sz);
+ void free(void *p);
+
+ void send_message(rust_message *message);
+ void drain_incoming_message_queue();
+
+#ifdef __WIN32__
+ void win32_require(LPCTSTR fn, BOOL ok);
+#endif
+
+ rust_crate_cache *get_cache(rust_crate const *crate);
+ size_t n_live_tasks();
+ void add_task_to_state_vec(ptr_vec<rust_task> *v, rust_task *task);
+ void remove_task_from_state_vec(ptr_vec<rust_task> *v, rust_task *task);
+ const char *state_vec_name(ptr_vec<rust_task> *v);
+
+ void reap_dead_tasks();
+ rust_task *schedule_task();
+ int start_main_loop();
+};
+
+#endif /* RUST_DOM_H */
diff --git a/src/rt/rust_internal.h b/src/rt/rust_internal.h
index f877cefc..d962e894 100644
--- a/src/rt/rust_internal.h
+++ b/src/rt/rust_internal.h
@@ -5,6 +5,8 @@
#define __STDC_CONSTANT_MACROS 1
#define __STDC_FORMAT_MACROS 1
+#define ERROR 0
+
#include <stdlib.h>
#include <stdint.h>
#include <inttypes.h>
@@ -36,13 +38,18 @@ extern "C" {
#error "Platform not supported."
#endif
+#include "sync/condition_variable.h"
+
#ifndef __i386__
#error "Target CPU not supported."
#endif
-#define I(dom, e) ((e) ? (void)0 : \
+#define I(dom, e) ((e) ? (void)0 : \
(dom)->srv->fatal(#e, __FILE__, __LINE__))
+#define A(dom, e, s) ((e) ? (void)0 : \
+ (dom)->srv->fatal(#e " : " #s, __FILE__, __LINE__))
+
struct rust_task;
struct rust_port;
class rust_chan;
@@ -50,7 +57,7 @@ struct rust_token;
struct rust_dom;
class rust_crate;
class rust_crate_cache;
-class lockfree_queue;
+// class lockfree_queue;
struct stk_seg;
struct type_desc;
@@ -66,14 +73,14 @@ template <typename T>
struct
rc_base
{
- size_t refcnt;
+ size_t ref_count;
void ref() {
- ++refcnt;
+ ++ref_count;
}
void deref() {
- if (--refcnt == 0) {
+ if (--ref_count == 0) {
delete (T*)this;
}
}
@@ -122,71 +129,29 @@ public:
return fill;
}
+ bool is_empty() {
+ return fill == 0;
+ }
+
T *& operator[](size_t offset);
void push(T *p);
T *pop();
void trim(size_t fill);
- void swapdel(T* p);
+ void swap_delete(T* p);
};
-struct
-rust_dom
-{
- // Fields known to the compiler:
- uintptr_t interrupt_flag;
-
- // Fields known only by the runtime:
-
- // NB: the root crate must remain in memory until the root of the
- // tree of domains exits. All domains within this tree have a
- // copy of this root_crate value and use it for finding utility
- // glue.
- rust_crate const *root_crate;
- rust_log _log;
- rust_srv *srv;
- // uint32_t logbits;
- ptr_vec<rust_task> running_tasks;
- ptr_vec<rust_task> blocked_tasks;
- ptr_vec<rust_task> dead_tasks;
- ptr_vec<rust_crate_cache> caches;
- randctx rctx;
- rust_task *root_task;
- rust_task *curr_task;
- int rval;
- lockfree_queue *incoming; // incoming messages from other threads
-
-#ifndef __WIN32__
- pthread_attr_t attr;
-#endif
-
- rust_dom(rust_srv *srv, rust_crate const *root_crate);
- ~rust_dom();
-
- void activate(rust_task *task);
- void log(uint32_t logbit, char const *fmt, ...);
- rust_log & get_log();
- void logptr(char const *msg, uintptr_t ptrval);
- template<typename T>
- void logptr(char const *msg, T* ptrval);
- void fail();
- void *malloc(size_t sz);
- void *calloc(size_t sz);
- void *realloc(void *data, size_t sz);
- void free(void *p);
-
-#ifdef __WIN32__
- void win32_require(LPCTSTR fn, BOOL ok);
-#endif
+#include "rust_dom.h"
- rust_crate_cache *get_cache(rust_crate const *crate);
- size_t n_live_tasks();
- void add_task_to_state_vec(ptr_vec<rust_task> *v, rust_task *task);
- void remove_task_from_state_vec(ptr_vec<rust_task> *v, rust_task *task);
- const char *state_vec_name(ptr_vec<rust_task> *v);
+template <typename T> inline T
+check_null(rust_dom *dom, T value, char const *expr,
+ char const *file, size_t line) {
+ if (value == NULL) {
+ dom->srv->fatal(expr, file, line);
+ }
+ return value;
+}
- void reap_dead_tasks();
- rust_task *sched();
-};
+#define CHECK_NULL(dom, e) (check_null(dom, e, #e, __FILE__, __LINE__))
inline void *operator new(size_t sz, void *mem) {
return mem;
@@ -217,7 +182,7 @@ rust_timer
// For now it's just the most basic "thread that can interrupt
// its associated domain-thread" device, so that we have
// *some* form of task-preemption.
- rust_dom &dom;
+ rust_dom *dom;
uintptr_t exit_flag;
#if defined(__WIN32__)
@@ -227,7 +192,7 @@ rust_timer
pthread_t thread;
#endif
- rust_timer(rust_dom &dom);
+ rust_timer(rust_dom *dom);
~rust_timer();
};
@@ -608,94 +573,8 @@ struct gc_alloc {
}
};
-struct
-rust_task : public rc_base<rust_task>,
- public dom_owned<rust_task>,
- public rust_cond
-{
- // Fields known to the compiler.
- stk_seg *stk;
- uintptr_t runtime_sp; // Runtime sp while task running.
- uintptr_t rust_sp; // Saved sp when not running.
- gc_alloc *gc_alloc_chain; // Linked list of GC allocations.
- rust_dom *dom;
- rust_crate_cache *cache;
-
- // Fields known only to the runtime.
- ptr_vec<rust_task> *state;
- rust_cond *cond;
- uintptr_t* dptr; // Rendezvous pointer for send/recv.
- rust_task *supervisor; // Parent-link for failure propagation.
- size_t idx;
- size_t gc_alloc_thresh;
- size_t gc_alloc_accum;
-
- // Wait queue for tasks waiting for this task.
- rust_wait_queue waiting_tasks;
- rust_alarm alarm;
-
- rust_task(rust_dom *dom,
- rust_task *spawner);
- ~rust_task();
-
- void start(uintptr_t exit_task_glue,
- uintptr_t spawnee_fn,
- uintptr_t args,
- size_t callsz);
- void grow(size_t n_frame_bytes);
- bool running();
- bool blocked();
- bool blocked_on(rust_cond *cond);
- bool dead();
-
- void link_gc(gc_alloc *gcm);
- void unlink_gc(gc_alloc *gcm);
- void *malloc(size_t sz, type_desc *td=0);
- void *realloc(void *data, size_t sz, bool gc_mem=false);
- void free(void *p, bool gc_mem=false);
-
- const char *state_str();
- void transition(ptr_vec<rust_task> *svec, ptr_vec<rust_task> *dvec);
-
- void block(rust_cond *on);
- void wakeup(rust_cond *from);
- void die();
- void unblock();
-
- void check_active() { I(dom, dom->curr_task == this); }
- void check_suspended() { I(dom, dom->curr_task != this); }
-
- // Swap in some glue code to run when we have returned to the
- // task's context (assuming we're the active task).
- void run_after_return(size_t nargs, uintptr_t glue);
-
- // Swap in some glue code to run when we're next activated
- // (assuming we're the suspended task).
- void run_on_resume(uintptr_t glue);
-
- // Save callee-saved registers and return to the main loop.
- void yield(size_t nargs);
-
- // Fail this task (assuming caller-on-stack is different task).
- void kill();
-
- // Fail self, assuming caller-on-stack is this task.
- void fail(size_t nargs);
-
- // Run the gc glue on the task stack.
- void gc(size_t nargs);
-
- // Disconnect from our supervisor.
- void unsupervise();
-
- // Notify tasks waiting for us that we are about to die.
- void notify_waiting_tasks();
-
- uintptr_t get_fp();
- uintptr_t get_previous_fp(uintptr_t fp);
- frame_glue_fns *get_frame_glue_fns(uintptr_t fp);
- rust_crate_cache * get_crate_cache(rust_crate const *curr_crate);
-};
+#include "rust_proxy.h"
+#include "rust_task.h"
struct rust_port : public rc_base<rust_port>,
public task_owned<rust_port>,
@@ -722,31 +601,29 @@ struct rust_token : public rust_cond {
void withdraw();
};
+#include "circular_buffer.h"
-struct circ_buf : public dom_owned<circ_buf> {
- static const size_t INIT_CIRC_BUF_UNITS = 8;
- static const size_t MAX_CIRC_BUF_SIZE = 1 << 24;
-
- rust_dom *dom;
- size_t alloc;
- size_t unit_sz;
- size_t next;
- size_t unread;
- uint8_t *data;
-
- circ_buf(rust_dom *dom, size_t unit_sz);
- ~circ_buf();
-
- void transfer(void *dst);
- void push(void *src);
- void shift(void *dst);
-};
+//struct circ_buf : public dom_owned<circ_buf> {
+// static const size_t INIT_CIRC_BUF_UNITS = 8;
+// static const size_t MAX_CIRC_BUF_SIZE = 1 << 24;
+//
+// rust_dom *dom;
+// size_t alloc;
+// size_t unit_sz;
+// size_t next;
+// size_t unread;
+// uint8_t *data;
+//
+// circ_buf(rust_dom *dom, size_t unit_sz);
+// ~circ_buf();
+//
+// void transfer(void *dst);
+// void push(void *src);
+// void shift(void *dst);
+//};
#include "rust_chan.h"
-int
-rust_main_loop(rust_dom *dom);
-
//
// Local Variables:
// mode: C++
diff --git a/src/rt/rust_log.cpp b/src/rt/rust_log.cpp
index 5cdf315c..b67876eb 100644
--- a/src/rt/rust_log.cpp
+++ b/src/rt/rust_log.cpp
@@ -4,9 +4,13 @@
*/
#include "rust_internal.h"
+#include "sync/spin_lock.h"
+#include "util/array_list.h"
+#include <stdarg.h>
-static uint32_t read_type_bit_mask() {
- uint32_t bits = rust_log::ULOG | rust_log::ERR;
+static uint32_t
+read_type_bit_mask() {
+ uint32_t bits = rust_log::ULOG | rust_log::ERR | rust_log::ALL;
char *env_str = getenv("RUST_LOG");
if (env_str) {
bits = 0;
@@ -27,92 +31,167 @@ static uint32_t read_type_bit_mask() {
return bits;
}
-rust_log::ansi_color rust_log::get_type_color(log_type type) {
- switch (type) {
- case ERR:
- return rust_log::RED;
- case UPCALL:
- return rust_log::GREEN;
- case COMM:
- return rust_log::MAGENTA;
- case DOM:
- case TASK:
- return rust_log::LIGHTTEAL;
- case MEM:
- return rust_log::YELLOW;
- default:
- return rust_log::WHITE;
- }
+rust_log::ansi_color
+get_type_color(rust_log::log_type type) {
+ rust_log::ansi_color color = rust_log::WHITE;
+ if (type & rust_log::ERR)
+ color = rust_log::RED;
+ if (type & rust_log::MEM)
+ color = rust_log::YELLOW;
+ if (type & rust_log::UPCALL)
+ color = rust_log::GREEN;
+ if (type & rust_log::COMM)
+ color = rust_log::MAGENTA;
+ if (type & rust_log::DOM)
+ color = rust_log::LIGHTTEAL;
+ if (type & rust_log::TASK)
+ color = rust_log::LIGHTTEAL;
+ return color;
}
-static const char * _foreground_colors[] = { "[30m", "[1;30m", "[37m",
- "[31m", "[1;31m", "[32m",
- "[1;32m", "[33m", "[33m",
- "[34m", "[1;34m", "[35m",
- "[1;35m", "[36m", "[1;36m" };
+static const char * _foreground_colors[] = { "[37m",
+ "[31m", "[1;31m",
+ "[32m", "[1;32m",
+ "[33m", "[1;33m",
+ "[31m", "[1;31m",
+ "[35m", "[1;35m",
+ "[36m", "[1;36m" };
+
+/**
+ * Synchronizes access to the underlying logging mechanism.
+ */
+static spin_lock _log_lock;
+
rust_log::rust_log(rust_srv *srv, rust_dom *dom) :
- _srv(srv), _dom(dom), _type_bit_mask(read_type_bit_mask()),
- _use_colors(getenv("RUST_COLOR_LOG")), _indent(0) {
+ _srv(srv),
+ _dom(dom),
+ _type_bit_mask(read_type_bit_mask()),
+ _use_colors(getenv("RUST_COLOR_LOG")),
+ _indent(0) {
}
rust_log::~rust_log() {
}
-void rust_log::trace_ln(char *message) {
- char buffer[512];
- if (_use_colors) {
- snprintf(buffer, sizeof(buffer), "\x1b%s0x%08" PRIxPTR "\x1b[0m: ",
- _foreground_colors[1 + ((uintptr_t) _dom % 2687 % (LIGHTTEAL
- - 1))], (uintptr_t) _dom);
- } else {
- snprintf(buffer, sizeof(buffer), "0x%08" PRIxPTR ": ",
- (uintptr_t) _dom);
+const uint16_t
+hash(uintptr_t ptr) {
+ // Robert Jenkins' 32 bit integer hash function
+ ptr = (ptr + 0x7ed55d16) + (ptr << 12);
+ ptr = (ptr ^ 0xc761c23c) ^ (ptr >> 19);
+ ptr = (ptr + 0x165667b1) + (ptr << 5);
+ ptr = (ptr + 0xd3a2646c) ^ (ptr << 9);
+ ptr = (ptr + 0xfd7046c5) + (ptr << 3);
+ ptr = (ptr ^ 0xb55a4f09) ^ (ptr >> 16);
+ return (uint16_t) ptr;
+}
+
+const char *
+get_color(uintptr_t ptr) {
+ return _foreground_colors[hash(ptr) % rust_log::LIGHTTEAL];
+}
+
+char *
+copy_string(char *dst, const char *src, size_t length) {
+ return strncpy(dst, src, length) + length;
+}
+
+char *
+append_string(char *buffer, const char *format, ...) {
+ if (buffer != NULL && format) {
+ va_list args;
+ va_start(args, format);
+ vsprintf(buffer + strlen(buffer), format, args);
+ va_end(args);
}
+ return buffer;
+}
+char *
+append_string(char *buffer, rust_log::ansi_color color,
+ const char *format, ...) {
+ if (buffer != NULL && format) {
+ append_string(buffer, "\x1b%s", _foreground_colors[color]);
+ va_list args;
+ va_start(args, format);
+ vsprintf(buffer + strlen(buffer), format, args);
+ va_end(args);
+ append_string(buffer, "\x1b[0m");
+ }
+ return buffer;
+}
+
+void
+rust_log::trace_ln(char *prefix, char *message) {
+ char buffer[1024] = "";
+ _log_lock.lock();
+ append_string(buffer, "%-34s", prefix);
for (uint32_t i = 0; i < _indent; i++) {
- strncat(buffer, "\t", sizeof(buffer) - strlen(buffer) - 1);
+ append_string(buffer, " ");
}
- strncat(buffer, message, sizeof(buffer) - strlen(buffer) - 1);
+ append_string(buffer, "%s", message);
_srv->log(buffer);
+ _log_lock.unlock();
+}
+
+void
+rust_log::trace_ln(rust_task *task, char *message) {
+#if defined(__WIN32__)
+ uint32_t thread_id = 0;
+#else
+ uint32_t thread_id = (uint32_t) pthread_self();
+#endif
+ char prefix[1024] = "";
+ append_string(prefix, "0x%08" PRIxPTR ":0x%08" PRIxPTR ":",
+ thread_id, (uintptr_t) _dom);
+ if (task) {
+ append_string(prefix, "0x%08" PRIxPTR ":", (uintptr_t) task);
+ }
+ trace_ln(prefix, message);
}
/**
* Traces a log message if the specified logging type is not filtered.
*/
-void rust_log::trace_ln(uint32_t type_bits, char *message) {
- trace_ln(get_type_color((rust_log::log_type) type_bits), type_bits,
- message);
+void
+rust_log::trace_ln(rust_task *task, uint32_t type_bits, char *message) {
+ trace_ln(task, get_type_color((rust_log::log_type) type_bits),
+ type_bits, message);
}
/**
* Traces a log message using the specified ANSI color code.
*/
-void rust_log::trace_ln(ansi_color color, uint32_t type_bits, char *message) {
+void
+rust_log::trace_ln(rust_task *task, ansi_color color,
+ uint32_t type_bits, char *message) {
if (is_tracing(type_bits)) {
if (_use_colors) {
- char buffer[512];
- snprintf(buffer, sizeof(buffer), "\x1b%s%s\x1b[0m",
- _foreground_colors[color], message);
- trace_ln(buffer);
+ char buffer[512] = "";
+ append_string(buffer, color, "%s", message);
+ trace_ln(task, buffer);
} else {
- trace_ln(message);
+ trace_ln(task, message);
}
}
}
-bool rust_log::is_tracing(uint32_t type_bits) {
+bool
+rust_log::is_tracing(uint32_t type_bits) {
return type_bits & _type_bit_mask;
}
-void rust_log::indent() {
+void
+rust_log::indent() {
_indent++;
}
-void rust_log::outdent() {
+void
+rust_log::outdent() {
_indent--;
}
-void rust_log::reset_indent(uint32_t indent) {
+void
+rust_log::reset_indent(uint32_t indent) {
_indent = indent;
}
diff --git a/src/rt/rust_log.h b/src/rt/rust_log.h
index bd32c155..06712066 100644
--- a/src/rt/rust_log.h
+++ b/src/rt/rust_log.h
@@ -1,22 +1,18 @@
-#ifndef RUST_LOG_H_
-#define RUST_LOG_H_
+#ifndef RUST_LOG_H
+#define RUST_LOG_H
class rust_dom;
+class rust_task;
+
+
class rust_log {
- rust_srv *_srv;
- rust_dom *_dom;
- uint32_t _type_bit_mask;
- bool _use_colors;
- uint32_t _indent;
- void trace_ln(char *message);
+
public:
rust_log(rust_srv *srv, rust_dom *dom);
virtual ~rust_log();
enum ansi_color {
- BLACK,
- GRAY,
WHITE,
RED,
LIGHTRED,
@@ -51,10 +47,19 @@ public:
void indent();
void outdent();
void reset_indent(uint32_t indent);
- void trace_ln(uint32_t type_bits, char *message);
- void trace_ln(ansi_color color, uint32_t type_bits, char *message);
+ void trace_ln(char *prefix, char *message);
+ void trace_ln(rust_task *task, uint32_t type_bits, char *message);
+ void trace_ln(rust_task *task, ansi_color color, uint32_t type_bits, char *message);
bool is_tracing(uint32_t type_bits);
- static ansi_color get_type_color(log_type type);
+
+private:
+ rust_srv *_srv;
+ rust_dom *_dom;
+ uint32_t _type_bit_mask;
+ bool _use_labels;
+ bool _use_colors;
+ uint32_t _indent;
+ void trace_ln(rust_task *task, char *message);
};
-#endif /* RUST_LOG_H_ */
+#endif /* RUST_LOG_H */
diff --git a/src/rt/rust_proxy.h b/src/rt/rust_proxy.h
new file mode 100644
index 00000000..8059dd89
--- /dev/null
+++ b/src/rt/rust_proxy.h
@@ -0,0 +1,31 @@
+/**
+ * A proxy object is a wrapper around other Rust objects. One use of the proxy
+ * object is to mitigate access between tasks in different thread domains.
+ */
+
+#ifndef RUST_PROXY_H
+#define RUST_PROXY_H
+
+template <typename T> struct
+rust_proxy_delegate : public rc_base<T> {
+protected:
+ T *_delegate;
+public:
+ rust_proxy_delegate(T * delegate) : _delegate(delegate) {
+ }
+ T *delegate() { return _delegate; }
+};
+
+template <typename T> struct
+rust_proxy : public rust_proxy_delegate<T>,
+ public dom_owned<rust_proxy<T> > {
+public:
+ rust_dom *dom;
+ rust_proxy(rust_dom *dom, T *delegate) :
+ rust_proxy_delegate<T> (delegate),
+ dom(dom) {
+ delegate->ref();
+ }
+};
+
+#endif /* RUST_PROXY_H */
diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp
index 7c92c4ca..357edbf1 100644
--- a/src/rt/rust_task.cpp
+++ b/src/rt/rust_task.cpp
@@ -53,6 +53,7 @@ align_down(uintptr_t sp)
rust_task::rust_task(rust_dom *dom, rust_task *spawner) :
+ rust_proxy_delegate<rust_task>(this),
stk(new_stk(dom, 0)),
runtime_sp(0),
rust_sp(stk->limit),
@@ -61,20 +62,24 @@ rust_task::rust_task(rust_dom *dom, rust_task *spawner) :
cache(NULL),
state(&dom->running_tasks),
cond(NULL),
- dptr(0),
supervisor(spawner),
idx(0),
waiting_tasks(dom),
+ rendezvous_ptr(0),
alarm(this)
{
dom->logptr("new task", (uintptr_t)this);
+
+ if (spawner == NULL) {
+ ref_count = 0;
+ }
}
rust_task::~rust_task()
{
dom->log(rust_log::MEM|rust_log::TASK,
"~rust_task 0x%" PRIxPTR ", refcnt=%d",
- (uintptr_t)this, refcnt);
+ (uintptr_t)this, ref_count);
/*
for (uintptr_t fp = get_fp(); fp; fp = get_previous_fp(fp)) {
@@ -98,8 +103,8 @@ rust_task::~rust_task()
/* FIXME: tighten this up, there are some more
assertions that hold at task-lifecycle events. */
- I(dom, refcnt == 0 ||
- (refcnt == 1 && this == dom->root_task));
+ I(dom, ref_count == 0 ||
+ (ref_count == 1 && this == dom->root_task));
del_stk(dom, stk);
if (cache)
@@ -275,9 +280,9 @@ rust_task::run_after_return(size_t nargs, uintptr_t glue)
uintptr_t *retpc = ((uintptr_t *) sp) - 1;
dom->log(rust_log::TASK|rust_log::MEM,
- "run_after_return: overwriting retpc=0x%" PRIxPTR
- " @ runtime_sp=0x%" PRIxPTR
- " with glue=0x%" PRIxPTR,
+ "run_after_return: overwriting retpc=x%" PRIxPTR
+ " @ runtime_sp=x%" PRIxPTR
+ " with glue=x%" PRIxPTR,
*retpc, sp, glue);
// Move the current return address (which points into rust code)
@@ -296,9 +301,9 @@ rust_task::run_on_resume(uintptr_t glue)
uintptr_t* rsp = (uintptr_t*) rust_sp;
rsp += n_callee_saves;
dom->log(rust_log::TASK|rust_log::MEM,
- "run_on_resume: overwriting retpc=0x%" PRIxPTR
- " @ rust_sp=0x%" PRIxPTR
- " with glue=0x%" PRIxPTR,
+ "run_on_resume: overwriting retpc=x%" PRIxPTR
+ " @ rust_sp=x%" PRIxPTR
+ " with glue=x%" PRIxPTR,
*rsp, rsp, glue);
*rsp = glue;
}
@@ -306,8 +311,8 @@ rust_task::run_on_resume(uintptr_t glue)
void
rust_task::yield(size_t nargs)
{
- dom->log(rust_log::TASK,
- "task 0x%" PRIxPTR " yielding", this);
+ log(rust_log::TASK,
+ "task 0x%" PRIxPTR " yielding", this);
run_after_return(nargs, dom->root_crate->get_yield_glue());
}
@@ -322,11 +327,13 @@ rust_task::kill() {
// Note the distinction here: kill() is when you're in an upcall
// from task A and want to force-fail task B, you do B->kill().
// If you want to fail yourself you do self->fail(upcall_nargs).
- dom->log(rust_log::TASK, "killing task 0x%" PRIxPTR, this);
+ log(rust_log::TASK, "killing task 0x%" PRIxPTR, this);
// Unblock the task so it can unwind.
unblock();
+
if (this == dom->root_task)
dom->fail();
+
run_on_resume(dom->root_crate->get_unwind_glue());
}
@@ -369,9 +376,12 @@ void
rust_task::notify_waiting_tasks()
{
while (waiting_tasks.length() > 0) {
- rust_task *t = waiting_tasks.pop()->receiver;
- if (!t->dead())
- t->wakeup(this);
+ log(rust_log::ALL, "notify_waiting_tasks: %d",
+ waiting_tasks.length());
+ rust_task *waiting_task = waiting_tasks.pop()->receiver;
+ if (!waiting_task->dead()) {
+ waiting_task->wakeup(this);
+ }
}
}
@@ -532,6 +542,9 @@ void
rust_task::wakeup(rust_cond *from)
{
transition(&dom->blocked_tasks, &dom->running_tasks);
+ // TODO: Signaling every time the task is awaken is kind of silly,
+ // do this a nicer way.
+ dom->_progress.signal();
I(dom, cond == from);
}
@@ -565,6 +578,18 @@ rust_task::get_crate_cache(rust_crate const *curr_crate)
return cache;
}
+void
+rust_task::log(uint32_t type_bits, char const *fmt, ...) {
+ char buf[256];
+ if (dom->get_log().is_tracing(type_bits)) {
+ va_list args;
+ va_start(args, fmt);
+ vsnprintf(buf, sizeof(buf), fmt, args);
+ dom->get_log().trace_ln(this, type_bits, buf);
+ va_end(args);
+ }
+}
+
//
// Local Variables:
// mode: C++
diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h
new file mode 100644
index 00000000..879cb61a
--- /dev/null
+++ b/src/rt/rust_task.h
@@ -0,0 +1,107 @@
+/*
+ *
+ */
+
+#ifndef RUST_TASK_H
+#define RUST_TASK_H
+struct
+rust_task : public rust_proxy_delegate<rust_task>,
+ public dom_owned<rust_task>,
+ public rust_cond
+{
+ // Fields known to the compiler.
+ stk_seg *stk;
+ uintptr_t runtime_sp; // Runtime sp while task running.
+ uintptr_t rust_sp; // Saved sp when not running.
+ gc_alloc *gc_alloc_chain; // Linked list of GC allocations.
+ rust_dom *dom;
+ rust_crate_cache *cache;
+
+ // Fields known only to the runtime.
+ ptr_vec<rust_task> *state;
+ rust_cond *cond;
+ rust_task *supervisor; // Parent-link for failure propagation.
+ size_t idx;
+ size_t gc_alloc_thresh;
+ size_t gc_alloc_accum;
+
+ // Wait queue for tasks waiting for this task.
+ rust_wait_queue waiting_tasks;
+
+ // Rendezvous pointer for receiving data when blocked on a port. If we're
+ // trying to read data and no data is available on any incoming channel,
+ // we block on the port, and yield control to the scheduler. Since, we
+ // were not able to read anJything, we remember the location where the
+ // result should go in the rendezvous_ptr, and let the sender write to
+ // that location before waking us up.
+ uintptr_t* rendezvous_ptr;
+
+ rust_alarm alarm;
+
+ rust_task(rust_dom *dom,
+ rust_task *spawner);
+ ~rust_task();
+
+ void start(uintptr_t exit_task_glue,
+ uintptr_t spawnee_fn,
+ uintptr_t args,
+ size_t callsz);
+ void grow(size_t n_frame_bytes);
+ bool running();
+ bool blocked();
+ bool blocked_on(rust_cond *cond);
+ bool dead();
+
+ void link_gc(gc_alloc *gcm);
+ void unlink_gc(gc_alloc *gcm);
+ void *malloc(size_t sz, type_desc *td=0);
+ void *realloc(void *data, size_t sz, bool gc_mem=false);
+ void free(void *p, bool gc_mem=false);
+
+ const char *state_str();
+ void transition(ptr_vec<rust_task> *svec, ptr_vec<rust_task> *dvec);
+
+ void block(rust_cond *on);
+ void wakeup(rust_cond *from);
+ void die();
+ void unblock();
+
+ void check_active() { I(dom, dom->curr_task == this); }
+ void check_suspended() { I(dom, dom->curr_task != this); }
+
+ void log(uint32_t type_bits, char const *fmt, ...);
+
+ // Swap in some glue code to run when we have returned to the
+ // task's context (assuming we're the active task).
+ void run_after_return(size_t nargs, uintptr_t glue);
+
+ // Swap in some glue code to run when we're next activated
+ // (assuming we're the suspended task).
+ void run_on_resume(uintptr_t glue);
+
+ // Save callee-saved registers and return to the main loop.
+ void yield(size_t nargs);
+
+ // Fail this task (assuming caller-on-stack is different task).
+ void kill();
+
+ // Fail self, assuming caller-on-stack is this task.
+ void fail(size_t nargs);
+
+ // Run the gc glue on the task stack.
+ void gc(size_t nargs);
+
+ // Disconnect from our supervisor.
+ void unsupervise();
+
+ // Notify tasks waiting for us that we are about to die.
+ void notify_waiting_tasks();
+
+ uintptr_t get_fp();
+ uintptr_t get_previous_fp(uintptr_t fp);
+ frame_glue_fns *get_frame_glue_fns(uintptr_t fp);
+ rust_crate_cache * get_crate_cache(rust_crate const *curr_crate);
+};
+
+
+#endif /* RUST_TASK_H */
diff --git a/src/rt/rust_timer.cpp b/src/rt/rust_timer.cpp
index fdee3075..269942f5 100644
--- a/src/rt/rust_timer.cpp
+++ b/src/rt/rust_timer.cpp
@@ -1,4 +1,3 @@
-
#include "rust_internal.h"
#include "valgrind.h"
@@ -27,12 +26,11 @@ static void *
#else
#error "Platform not supported"
#endif
-timer_loop(void *ptr)
-{
+timer_loop(void *ptr) {
// We were handed the rust_timer that owns us.
rust_timer *timer = (rust_timer *)ptr;
- rust_dom &dom = timer->dom;
- dom.log(rust_log::TIMER, "in timer 0x%" PRIxPTR, (uintptr_t)timer);
+ rust_dom *dom = timer->dom;
+ dom->log(rust_log::TIMER, "in timer 0x%" PRIxPTR, (uintptr_t)timer);
size_t ms = TIME_SLICE_IN_MS;
if (!RUNNING_ON_VALGRIND)
ms = 1;
@@ -43,12 +41,10 @@ timer_loop(void *ptr)
#else
usleep(ms * 1000);
#endif
- dom.log(rust_log::TIMER,
- "timer 0x%" PRIxPTR
- " interrupting domain 0x%" PRIxPTR,
- (uintptr_t)timer,
- (uintptr_t)&dom);
- dom.interrupt_flag = 1;
+ dom->log(rust_log::TIMER, "timer 0x%" PRIxPTR
+ " interrupting domain 0x%" PRIxPTR, (uintptr_t) timer,
+ (uintptr_t) dom);
+ dom->interrupt_flag = 1;
}
#if defined(__WIN32__)
ExitThread(0);
@@ -58,10 +54,9 @@ timer_loop(void *ptr)
return 0;
}
-
-rust_timer::rust_timer(rust_dom &dom) : dom(dom), exit_flag(0)
-{
- dom.log(rust_log::TIMER, "creating timer for domain 0x%" PRIxPTR, &dom);
+rust_timer::rust_timer(rust_dom *dom) :
+ dom(dom), exit_flag(0) {
+ dom->log(rust_log::TIMER, "creating timer for domain 0x%" PRIxPTR, dom);
#if defined(__WIN32__)
thread = CreateThread(NULL, 0, timer_loop, this, 0, NULL);
dom.win32_require("CreateThread", thread != NULL);
@@ -76,13 +71,11 @@ rust_timer::rust_timer(rust_dom &dom) : dom(dom), exit_flag(0)
#endif
}
-rust_timer::~rust_timer()
-{
+rust_timer::~rust_timer() {
exit_flag = 1;
#if defined(__WIN32__)
- dom.win32_require("WaitForSingleObject",
- WaitForSingleObject(thread, INFINITE)
- == WAIT_OBJECT_0);
+ dom->win32_require("WaitForSingleObject",
+ WaitForSingleObject(thread, INFINITE) == WAIT_OBJECT_0);
#else
pthread_join(thread, NULL);
#endif
diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp
index 99df91a6..1aaf89fb 100644
--- a/src/rt/rust_upcall.cpp
+++ b/src/rt/rust_upcall.cpp
@@ -1,127 +1,120 @@
-
#include "rust_internal.h"
-
// Upcalls.
#ifdef __GNUC__
#define LOG_UPCALL_ENTRY(task) \
(task)->dom->get_log().reset_indent(0); \
- (task)->dom->log(rust_log::UPCALL, \
- "upcall task: 0x%" PRIxPTR \
- " retpc: 0x%" PRIxPTR, \
- (task), __builtin_return_address(0)); \
+ (task)->log(rust_log::UPCALL, \
+ "> UPCALL %s - task: 0x%" PRIxPTR \
+ " retpc: x%" PRIxPTR, \
+ __FUNCTION__, \
+ (task), __builtin_return_address(0)); \
(task)->dom->get_log().indent();
#else
#define LOG_UPCALL_ENTRY(task) \
(task)->dom->get_log().reset_indent(0); \
- (task)->dom->log(rust_log::UPCALL, \
- "upcall task: 0x%" PRIxPTR (task)); \
+ (task)->log(rust_log::UPCALL, \
+ "> UPCALL task: x%" PRIxPTR (task)); \
(task)->dom->get_log().indent();
#endif
extern "C" CDECL char const *str_buf(rust_task *task, rust_str *s);
-extern "C" void
-upcall_grow_task(rust_task *task, size_t n_frame_bytes)
-{
+inline bool
+requires_message_passing(rust_task *sender, rust_task *receiver) {
+ return sender->dom != receiver->dom;
+}
+
+extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) {
LOG_UPCALL_ENTRY(task);
task->grow(n_frame_bytes);
}
-extern "C" CDECL void
-upcall_log_int(rust_task *task, int32_t i)
-{
+extern "C" CDECL void upcall_log_int(rust_task *task, int32_t i) {
LOG_UPCALL_ENTRY(task);
- task->dom->log(rust_log::UPCALL|rust_log::ULOG,
- "upcall log_int(0x%" PRIx32 " = %" PRId32 " = '%c')",
- i, i, (char)i);
+ task->log(rust_log::UPCALL | rust_log::ULOG,
+ "upcall log_int(0x%" PRIx32 " = %" PRId32 " = '%c')", i, i,
+ (char) i);
}
-extern "C" CDECL void
-upcall_log_str(rust_task *task, rust_str *str)
-{
+extern "C" CDECL void upcall_log_str(rust_task *task, rust_str *str) {
LOG_UPCALL_ENTRY(task);
const char *c = str_buf(task, str);
- task->dom->log(rust_log::UPCALL|rust_log::ULOG,
- "upcall log_str(\"%s\")",
- c);
+ task->log(rust_log::UPCALL | rust_log::ULOG, "upcall log_str(\"%s\")", c);
}
-extern "C" CDECL void
-upcall_trace_word(rust_task *task, uintptr_t i)
-{
+extern "C" CDECL void upcall_trace_word(rust_task *task, uintptr_t i) {
LOG_UPCALL_ENTRY(task);
- task->dom->log(rust_log::UPCALL|rust_log::TRACE,
- "trace: 0x%" PRIxPTR "",
- i, i, (char)i);
+ task->log(rust_log::UPCALL | rust_log::TRACE, "trace: 0x%" PRIxPTR "", i,
+ i, (char) i);
}
-extern "C" CDECL void
-upcall_trace_str(rust_task *task, char const *c)
-{
+extern "C" CDECL void upcall_trace_str(rust_task *task, char const *c) {
LOG_UPCALL_ENTRY(task);
- task->dom->log(rust_log::UPCALL|rust_log::TRACE,
- "trace: %s",
- c);
+ task->log(rust_log::UPCALL | rust_log::TRACE, "trace: %s", c);
}
extern "C" CDECL rust_port*
-upcall_new_port(rust_task *task, size_t unit_sz)
-{
+upcall_new_port(rust_task *task, size_t unit_sz) {
LOG_UPCALL_ENTRY(task);
rust_dom *dom = task->dom;
- dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::COMM,
- "upcall_new_port(task=0x%" PRIxPTR ", unit_sz=%d)",
- (uintptr_t)task, unit_sz);
+ task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
+ "upcall_new_port(task=0x%" PRIxPTR ", unit_sz=%d)",
+ (uintptr_t) task, unit_sz);
return new (dom) rust_port(task, unit_sz);
}
-extern "C" CDECL void
-upcall_del_port(rust_task *task, rust_port *port)
-{
+extern "C" CDECL void upcall_del_port(rust_task *task, rust_port *port) {
LOG_UPCALL_ENTRY(task);
- task->dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::COMM,
- "upcall del_port(0x%" PRIxPTR ")", (uintptr_t)port);
- I(task->dom, !port->refcnt);
+ task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
+ "upcall del_port(0x%" PRIxPTR ")", (uintptr_t) port);
+ I(task->dom, !port->ref_count);
delete port;
}
+/**
+ * Creates a new channel, pointed to a specified port.
+ */
extern "C" CDECL rust_chan*
-upcall_new_chan(rust_task *task, rust_port *port)
-{
+upcall_new_chan(rust_task *task, rust_port *port) {
LOG_UPCALL_ENTRY(task);
rust_dom *dom = task->dom;
- dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::COMM,
- "upcall_new_chan(task=0x%" PRIxPTR ", port=0x%" PRIxPTR ")",
- (uintptr_t)task, port);
+ task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
+ "upcall_new_chan(task=0x%" PRIxPTR ", port=0x%" PRIxPTR ")",
+ (uintptr_t) task, port);
I(dom, port);
return new (dom) rust_chan(task, port);
}
-extern "C" CDECL void
-upcall_del_chan(rust_task *task, rust_chan *chan)
-{
+/**
+ * Called whenever the channel's ref count drops to zero.
+ */
+extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) {
LOG_UPCALL_ENTRY(task);
rust_dom *dom = task->dom;
- dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::COMM,
- "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t)chan);
- I(dom, !chan->refcnt);
+ task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
+ "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
+ I(dom, !chan->ref_count);
delete chan;
}
+/**
+ * Clones a channel and stores it in the spawnee's domain. Each spawned task
+ * has it's own copy of the channel.
+ */
extern "C" CDECL rust_chan *
-upcall_clone_chan(rust_task *task, rust_task *owner, rust_chan *chan)
-{
+upcall_clone_chan(rust_task *task,
+ rust_proxy_delegate<rust_task> *spawnee_proxy,
+ rust_chan *chan) {
LOG_UPCALL_ENTRY(task);
- rust_dom *dom = task->dom;
- dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::COMM,
- "upcall clone_chan(owner 0x%" PRIxPTR ", chan 0x%" PRIxPTR ")",
- (uintptr_t)owner, (uintptr_t)chan);
- return new (owner->dom) rust_chan(owner, chan->port);
+ rust_task *spawnee = spawnee_proxy->delegate();
+ task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
+ "spawnee: 0x%" PRIxPTR ", chan: 0x%" PRIxPTR,
+ (uintptr_t) spawnee, (uintptr_t) chan);
+ return new (spawnee->dom) rust_chan(spawnee, chan->port);
}
-
/*
* Buffering protocol:
*
@@ -143,74 +136,64 @@ upcall_clone_chan(rust_task *task, rust_task *owner, rust_chan *chan)
* - Set blocked writer to running
*
*/
+//
+//static int
+//attempt_transmission(rust_dom *dom, rust_chan *src, rust_task *dst) {
+// I(dom, src);
+// I(dom, dst);
+//
+// rust_port *port = src->port;
+// if (!port) {
+// dom->log(rust_log::COMM, "src died, transmission incomplete");
+// return 0;
+// }
+//
+// circular_buffer *buf = &src->buffer;
+// if (buf->is_empty()) {
+// dom->log(rust_log::COMM, "buffer empty, transmission incomplete");
+// return 0;
+// }
+//
+// if (!dst->blocked_on(port)) {
+// dom->log(rust_log::COMM,
+// "dst in non-reading state, transmission incomplete");
+// return 0;
+// }
+//
+// uintptr_t *dptr = dst->dptr;
+// dom->log(rust_log::COMM, "receiving %d bytes into dst_task=0x%" PRIxPTR
+// ", dptr=0x%" PRIxPTR, port->unit_sz, dst, dptr);
+// buf->dequeue(dptr);
+//
+// // Wake up the sender if its waiting for the send operation.
+// rust_task *sender = src->task;
+// rust_token *token = &src->token;
+// if (sender->blocked_on(token))
+// sender->wakeup(token);
+//
+// // Wake up the receiver, there is new data.
+// dst->wakeup(port);
+//
+// dom->log(rust_log::COMM, "transmission complete");
+// return 1;
+//}
-static int
-attempt_transmission(rust_dom *dom,
- rust_chan *src,
- rust_task *dst)
-{
- I(dom, src);
- I(dom, dst);
-
- rust_port *port = src->port;
- if (!port) {
- dom->log(rust_log::COMM,
- "src died, transmission incomplete");
- return 0;
- }
-
- circ_buf *buf = &src->buffer;
- if (buf->unread == 0) {
- dom->log(rust_log::COMM,
- "buffer empty, transmission incomplete");
- return 0;
- }
-
- if (!dst->blocked_on(port)) {
- dom->log(rust_log::COMM,
- "dst in non-reading state, transmission incomplete");
- return 0;
- }
-
- uintptr_t *dptr = dst->dptr;
- dom->log(rust_log::COMM,
- "receiving %d bytes into dst_task=0x%" PRIxPTR
- ", dptr=0x%" PRIxPTR,
- port->unit_sz, dst, dptr);
- buf->shift(dptr);
-
- // Wake up the sender if its waiting for the send operation.
- rust_task *sender = src->task;
- rust_token *token = &src->token;
- if (sender->blocked_on(token))
- sender->wakeup(token);
-
- // Wake up the receiver, there is new data.
- dst->wakeup(port);
-
- dom->log(rust_log::COMM, "transmission complete");
- return 1;
-}
-
-extern "C" CDECL void
-upcall_yield(rust_task *task)
-{
+extern "C" CDECL void upcall_yield(rust_task *task) {
LOG_UPCALL_ENTRY(task);
- rust_dom *dom = task->dom;
- dom->log(rust_log::UPCALL|rust_log::COMM, "upcall yield()");
+ task->log(rust_log::UPCALL | rust_log::COMM, "upcall yield()");
task->yield(1);
}
-extern "C" CDECL void
-upcall_join(rust_task *task, rust_task *other)
-{
+extern "C" CDECL void upcall_join(rust_task *task,
+ rust_proxy_delegate<rust_task> *proxy) {
LOG_UPCALL_ENTRY(task);
- rust_dom *dom = task->dom;
- dom->log(rust_log::UPCALL|rust_log::COMM,
- "upcall join(other=0x%" PRIxPTR ")",
- (uintptr_t)other);
+ task->log(rust_log::UPCALL | rust_log::COMM,
+ "join proxy 0x%" PRIxPTR " -> task = 0x%" PRIxPTR,
+ proxy, proxy->delegate());
- // If the other task is already dying, we dont have to wait for it.
+ rust_task *other = proxy->delegate();
+
+ // If the other task is already dying, we don't have to wait for it.
if (!other->dead()) {
other->waiting_tasks.push(&task->alarm);
task->block(other);
@@ -218,106 +201,91 @@ upcall_join(rust_task *task, rust_task *other)
}
}
+/**
+ * Sends an chunk of data along the specified channel.
+ *
+ * sptr: pointer to a chunk of data to send
+ */
extern "C" CDECL void
-upcall_send(rust_task *task, rust_chan *chan, void *sptr)
-{
+upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
LOG_UPCALL_ENTRY(task);
- rust_dom *dom = task->dom;
- dom->log(rust_log::UPCALL|rust_log::COMM,
- "upcall send(chan=0x%" PRIxPTR ", sptr=0x%" PRIxPTR ")",
- (uintptr_t)chan,
- (uintptr_t)sptr);
+ task->log(rust_log::UPCALL | rust_log::COMM,
+ "chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d",
+ (uintptr_t) chan, (uintptr_t) sptr, chan->port->unit_sz);
- I(dom, chan);
- I(dom, sptr);
-
- rust_port *port = chan->port;
- dom->log(rust_log::MEM|rust_log::COMM,
- "send to port", (uintptr_t)port);
- I(dom, port);
-
- rust_token *token = &chan->token;
- dom->log(rust_log::MEM|rust_log::COMM,
- "sending via token 0x%" PRIxPTR,
- (uintptr_t)token);
-
- if (port->task) {
- chan->buffer.push(sptr);
- task->block(token);
- attempt_transmission(dom, chan, port->task);
- if (chan->buffer.unread && !token->pending())
- token->submit();
- } else {
- dom->log(rust_log::COMM|rust_log::ERR,
- "port has no task (possibly throw?)");
- }
-
- if (!task->running())
- task->yield(3);
+ chan->buffer.enqueue(sptr);
+ chan->transmit();
+ task->log(rust_log::COMM, "=== WROTE DATA ===>");
}
extern "C" CDECL void
-upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port)
-{
+upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
LOG_UPCALL_ENTRY(task);
- rust_dom *dom = task->dom;
- dom->log(rust_log::UPCALL|rust_log::COMM,
- "upcall recv(dptr=0x%" PRIxPTR ", port=0x%" PRIxPTR ")",
- (uintptr_t)dptr,
- (uintptr_t)port);
+ task->log(rust_log::UPCALL | rust_log::COMM,
+ "port: 0x%" PRIxPTR ", dptr: 0x%" PRIxPTR
+ ", size: 0x%" PRIxPTR ", chan_no: %d",
+ (uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
+ port->chans.length());
+
+ for (uint32_t i = 0; i < port->chans.length(); i++) {
+ rust_chan *chan = port->chans[i];
+ if (chan->buffer.is_empty() == false) {
+ chan->buffer.dequeue(dptr);
+ task->log(rust_log::COMM, "<=== READ DATA ===");
+ return;
+ }
+ }
- I(dom, port);
- I(dom, port->task);
- I(dom, task);
- I(dom, port->task == task);
+ // No data was buffered on any incoming channel, so block this task
+ // on the port. Remember the rendezvous location so that any sender
+ // task can write to it before waking up this task.
+ task->rendezvous_ptr = dptr;
task->block(port);
-
- if (port->writers.length() > 0) {
- I(dom, task->dom);
- size_t i = rand(&dom->rctx);
- i %= port->writers.length();
- rust_token *token = port->writers[i];
- rust_chan *chan = token->chan;
- if (attempt_transmission(dom, chan, task))
- token->withdraw();
- } else {
- dom->log(rust_log::COMM,
- "no writers sending to port", (uintptr_t)port);
- }
-
- if (!task->running()) {
- task->dptr = dptr;
- task->yield(3);
- }
+ task->yield(3);
}
-extern "C" CDECL void
-upcall_fail(rust_task *task, char const *expr, char const *file, size_t line)
-{
+extern "C" CDECL void upcall_fail(rust_task *task, char const *expr,
+ char const *file, size_t line) {
LOG_UPCALL_ENTRY(task);
- task->dom->log(rust_log::UPCALL|rust_log::ERR,
- "upcall fail '%s', %s:%" PRIdPTR,
- expr, file, line);
+ task->log(rust_log::UPCALL | rust_log::ERR,
+ "upcall fail '%s', %s:%" PRIdPTR, expr, file, line);
task->fail(4);
}
+/**
+ * Called whenever a task's ref count drops to zero.
+ */
extern "C" CDECL void
-upcall_kill(rust_task *task, rust_task *target)
-{
+upcall_kill(rust_task *task, rust_proxy_delegate<rust_task> *target_proxy) {
LOG_UPCALL_ENTRY(task);
- task->dom->log(rust_log::UPCALL|rust_log::TASK,
- "upcall kill target=0x%" PRIxPTR, target);
- target->kill();
+ rust_task *target_task = target_proxy->delegate();
+ if (target_proxy != target_task) {
+ task->dom->free(target_proxy);
+ }
+ task->log(rust_log::UPCALL | rust_log::TASK,
+ "kill task 0x%" PRIxPTR ", ref count %d",
+ target_task,
+ target_task->ref_count);
+
+ if (requires_message_passing(task, target_task)) {
+ rust_dom *target_domain = target_task->dom;
+ target_domain->send_message(
+ new (target_domain)
+ kill_task_message(target_domain, target_task));
+ } else {
+ target_task->kill();
+ }
}
+/**
+ * Called by the exit glue when the task terminates.
+ */
extern "C" CDECL void
-upcall_exit(rust_task *task)
-{
+upcall_exit(rust_task *task) {
LOG_UPCALL_ENTRY(task);
-
- rust_dom *dom = task->dom;
- dom->log(rust_log::UPCALL|rust_log::TASK, "upcall exit");
+ task->log(rust_log::UPCALL | rust_log::TASK,
+ "task ref_count: %d", task->ref_count);
task->die();
task->notify_waiting_tasks();
task->yield(1);
@@ -341,11 +309,13 @@ upcall_malloc(rust_task *task, size_t nbytes, type_desc *td)
return (uintptr_t) p;
}
+/**
+ * Called whenever an object's ref count drops to zero.
+ */
extern "C" CDECL void
upcall_free(rust_task *task, void* ptr, uintptr_t is_gc)
{
LOG_UPCALL_ENTRY(task);
-
rust_dom *dom = task->dom;
dom->log(rust_log::UPCALL|rust_log::MEM,
"upcall free(0x%" PRIxPTR ")",
@@ -371,22 +341,19 @@ upcall_mark(rust_task *task, void* ptr)
}
extern "C" CDECL rust_str *
-upcall_new_str(rust_task *task, char const *s, size_t fill)
-{
+upcall_new_str(rust_task *task, char const *s, size_t fill) {
LOG_UPCALL_ENTRY(task);
rust_dom *dom = task->dom;
- dom->log(rust_log::UPCALL|rust_log::MEM,
- "upcall new_str('%s', %" PRIdPTR ")", s, fill);
size_t alloc = next_power_of_two(sizeof(rust_str) + fill);
void *mem = dom->malloc(alloc);
if (!mem) {
task->fail(3);
return NULL;
}
- rust_str *st = new (mem) rust_str(dom, alloc, fill, (uint8_t const *)s);
- dom->log(rust_log::UPCALL|rust_log::MEM,
- "upcall new_str('%s', %" PRIdPTR ") = 0x%" PRIxPTR,
- s, fill, st);
+ rust_str *st = new (mem) rust_str(dom, alloc, fill, (uint8_t const *) s);
+ task->log(rust_log::UPCALL | rust_log::MEM,
+ "upcall new_str('%s', %" PRIdPTR ") = 0x%" PRIxPTR,
+ s, fill, st);
return st;
}
@@ -405,34 +372,32 @@ upcall_new_vec(rust_task *task, size_t fill, type_desc *td)
return NULL;
}
rust_vec *v = new (mem) rust_vec(dom, alloc, 0, NULL);
- dom->log(rust_log::UPCALL|rust_log::MEM,
- "upcall new_vec(%" PRIdPTR ") = 0x%" PRIxPTR,
- fill, v);
+ task->log(rust_log::UPCALL | rust_log::MEM,
+ "upcall new_vec(%" PRIdPTR ") = 0x%" PRIxPTR, fill, v);
return v;
}
-
extern "C" CDECL rust_str *
upcall_vec_grow(rust_task *task, rust_vec *v, size_t n_bytes, uintptr_t is_gc)
{
LOG_UPCALL_ENTRY(task);
rust_dom *dom = task->dom;
- dom->log(rust_log::UPCALL|rust_log::MEM,
+ task->log(rust_log::UPCALL|rust_log::MEM,
"upcall vec_grow(%" PRIxPTR ", %" PRIdPTR
"), alloc=%" PRIdPTR ", fill=%" PRIdPTR,
v, n_bytes, v->alloc, v->fill);
size_t alloc = next_power_of_two(sizeof(rust_vec) + v->fill + n_bytes);
- if (v->refcnt == 1) {
+ if (v->ref_count == 1) {
// Fastest path: already large enough.
if (v->alloc >= alloc) {
- dom->log(rust_log::UPCALL|rust_log::MEM, "no-growth path");
+ task->log(rust_log::UPCALL | rust_log::MEM, "no-growth path");
return v;
}
// Second-fastest path: can at least realloc.
- dom->log(rust_log::UPCALL|rust_log::MEM, "realloc path");
- v = (rust_vec*)dom->realloc(v, alloc);
+ task->log(rust_log::UPCALL | rust_log::MEM, "realloc path");
+ v = (rust_vec*) dom->realloc(v, alloc);
if (!v) {
task->fail(4);
return NULL;
@@ -441,7 +406,7 @@ upcall_vec_grow(rust_task *task, rust_vec *v, size_t n_bytes, uintptr_t is_gc)
} else {
// Slowest path: make a new vec.
- dom->log(rust_log::UPCALL|rust_log::MEM, "new vec path");
+ task->log(rust_log::UPCALL | rust_log::MEM, "new vec path");
void *mem = dom->malloc(alloc);
if (!mem) {
task->fail(4);
@@ -454,121 +419,100 @@ upcall_vec_grow(rust_task *task, rust_vec *v, size_t n_bytes, uintptr_t is_gc)
return v;
}
-
static rust_crate_cache::c_sym *
-fetch_c_sym(rust_task *task,
- rust_crate const *curr_crate,
- size_t lib_num,
- size_t c_sym_num,
- char const *library,
- char const *symbol)
-{
+fetch_c_sym(rust_task *task, rust_crate const *curr_crate, size_t lib_num,
+ size_t c_sym_num, char const *library, char const *symbol) {
rust_crate_cache *cache = task->get_crate_cache(curr_crate);
rust_crate_cache::lib *l = cache->get_lib(lib_num, library);
return cache->get_c_sym(c_sym_num, l, symbol);
}
-extern "C" CDECL uintptr_t
-upcall_require_rust_sym(rust_task *task,
- rust_crate const *curr_crate,
- size_t lib_num, // # of lib
- size_t c_sym_num, // # of C sym "rust_crate" in lib
- size_t rust_sym_num, // # of rust sym
- char const *library,
- char const **path)
-{
+extern "C" CDECL uintptr_t upcall_require_rust_sym(rust_task *task,
+ rust_crate const *curr_crate, size_t lib_num, // # of lib
+ size_t c_sym_num, // # of C sym "rust_crate" in lib
+ size_t rust_sym_num, // # of rust sym
+ char const *library, char const **path) {
LOG_UPCALL_ENTRY(task);
rust_dom *dom = task->dom;
- dom->log(rust_log::UPCALL|rust_log::CACHE,
- "upcall require rust sym: lib #%" PRIdPTR
- " = %s, c_sym #%" PRIdPTR
- ", rust_sym #%" PRIdPTR
- ", curr_crate = 0x%" PRIxPTR,
- lib_num, library, c_sym_num, rust_sym_num,
- curr_crate);
+ task->log(rust_log::UPCALL | rust_log::CACHE,
+ "upcall require rust sym: lib #%" PRIdPTR
+ " = %s, c_sym #%" PRIdPTR
+ ", rust_sym #%" PRIdPTR
+ ", curr_crate = 0x%" PRIxPTR, lib_num, library, c_sym_num,
+ rust_sym_num, curr_crate);
for (char const **c = crate_rel(curr_crate, path); *c; ++c) {
- dom->log(rust_log::UPCALL, " + %s", crate_rel(curr_crate, *c));
+ task->log(rust_log::UPCALL, " + %s", crate_rel(curr_crate, *c));
}
- dom->log(rust_log::UPCALL|rust_log::CACHE,
- "require C symbol 'rust_crate' from lib #%" PRIdPTR,lib_num);
+ task->log(rust_log::UPCALL | rust_log::CACHE,
+ "require C symbol 'rust_crate' from lib #%" PRIdPTR, lib_num);
rust_crate_cache::c_sym *c =
- fetch_c_sym(task, curr_crate, lib_num, c_sym_num,
- library, "rust_crate");
+ fetch_c_sym(task, curr_crate, lib_num, c_sym_num, library,
+ "rust_crate");
- dom->log(rust_log::UPCALL|rust_log::CACHE,
- "require rust symbol inside crate");
- rust_crate_cache::rust_sym *s =
- task->cache->get_rust_sym(rust_sym_num, dom, curr_crate, c, path);
+ task->log(rust_log::UPCALL | rust_log::CACHE,
+ "require rust symbol inside crate");
+ rust_crate_cache::rust_sym *s = task->cache->get_rust_sym(rust_sym_num,
+ dom,
+ curr_crate, c,
+ path);
uintptr_t addr = s->get_val();
if (addr) {
- dom->log(rust_log::UPCALL|rust_log::CACHE,
- "found-or-cached addr: 0x%" PRIxPTR, addr);
+ task->log(rust_log::UPCALL | rust_log::CACHE,
+ "found-or-cached addr: 0x%" PRIxPTR, addr);
} else {
- dom->log(rust_log::UPCALL|rust_log::CACHE,
- "failed to resolve symbol");
+ task->log(rust_log::UPCALL | rust_log::CACHE,
+ "failed to resolve symbol");
task->fail(7);
}
return addr;
}
-extern "C" CDECL uintptr_t
-upcall_require_c_sym(rust_task *task,
- rust_crate const *curr_crate,
- size_t lib_num, // # of lib
- size_t c_sym_num, // # of C sym
- char const *library,
- char const *symbol)
-{
+extern "C" CDECL uintptr_t upcall_require_c_sym(rust_task *task,
+ rust_crate const *curr_crate, size_t lib_num, // # of lib
+ size_t c_sym_num, // # of C sym
+ char const *library, char const *symbol) {
LOG_UPCALL_ENTRY(task);
- rust_dom *dom = task->dom;
- dom->log(rust_log::UPCALL|rust_log::CACHE,
- "upcall require c sym: lib #%" PRIdPTR
- " = %s, c_sym #%" PRIdPTR
- " = %s"
- ", curr_crate = 0x%" PRIxPTR,
- lib_num, library, c_sym_num, symbol, curr_crate);
+ task->log(rust_log::UPCALL | rust_log::CACHE,
+ "upcall require c sym: lib #%" PRIdPTR
+ " = %s, c_sym #%" PRIdPTR
+ " = %s"
+ ", curr_crate = 0x%" PRIxPTR, lib_num, library, c_sym_num,
+ symbol, curr_crate);
- rust_crate_cache::c_sym *c =
- fetch_c_sym(task, curr_crate, lib_num, c_sym_num, library, symbol);
+ rust_crate_cache::c_sym *c = fetch_c_sym(task, curr_crate, lib_num,
+ c_sym_num, library, symbol);
uintptr_t addr = c->get_val();
if (addr) {
- dom->log(rust_log::UPCALL|rust_log::CACHE,
- "found-or-cached addr: 0x%" PRIxPTR, addr);
+ task->log(rust_log::UPCALL | rust_log::CACHE,
+ "found-or-cached addr: 0x%" PRIxPTR, addr);
} else {
- dom->log(rust_log::UPCALL|rust_log::CACHE,
- "failed to resolve symbol");
+ task->log(rust_log::UPCALL | rust_log::CACHE,
+ "failed to resolve symbol");
task->fail(6);
}
return addr;
}
extern "C" CDECL type_desc *
-upcall_get_type_desc(rust_task *task,
- rust_crate const *curr_crate,
- size_t size,
- size_t align,
- size_t n_descs,
- type_desc const **descs)
-{
+upcall_get_type_desc(rust_task *task, rust_crate const *curr_crate,
+ size_t size, size_t align, size_t n_descs, type_desc const **descs) {
LOG_UPCALL_ENTRY(task);
- rust_dom *dom = task->dom;
- dom->log(rust_log::UPCALL|rust_log::CACHE,
- "upcall get_type_desc with size=%" PRIdPTR
- ", align=%" PRIdPTR ", %" PRIdPTR " descs",
- size, align, n_descs);
+ task->log(rust_log::UPCALL | rust_log::CACHE,
+ "upcall get_type_desc with size=%" PRIdPTR
+ ", align=%" PRIdPTR ", %" PRIdPTR " descs", size, align,
+ n_descs);
rust_crate_cache *cache = task->get_crate_cache(curr_crate);
type_desc *td = cache->get_type_desc(size, align, n_descs, descs);
- dom->log(rust_log::UPCALL|rust_log::CACHE,
- "returning tydesc 0x%" PRIxPTR, td);
+ task->log(rust_log::UPCALL | rust_log::CACHE,
+ "returning tydesc 0x%" PRIxPTR, td);
return td;
}
-
#if defined(__WIN32__)
static DWORD WINAPI rust_thread_start(void *ptr)
#elif defined(__GNUC__)
@@ -578,10 +522,10 @@ static void *rust_thread_start(void *ptr)
#endif
{
// We were handed the domain we are supposed to run.
- rust_dom *dom = (rust_dom *)ptr;
+ rust_dom *dom = (rust_dom *) ptr;
// Start a new rust main loop for this thread.
- rust_main_loop(dom);
+ dom->start_main_loop();
rust_srv *srv = dom->srv;
delete dom;
@@ -591,81 +535,77 @@ static void *rust_thread_start(void *ptr)
}
extern "C" CDECL rust_task *
-upcall_new_task(rust_task *spawner)
-{
+upcall_new_task(rust_task *spawner) {
LOG_UPCALL_ENTRY(spawner);
rust_dom *dom = spawner->dom;
rust_task *task = new (dom) rust_task(dom, spawner);
- dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::TASK,
+ dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK,
"upcall new_task(spawner 0x%" PRIxPTR ") = 0x%" PRIxPTR,
spawner, task);
return task;
}
extern "C" CDECL rust_task *
-upcall_start_task(rust_task *spawner,
- rust_task *task,
- uintptr_t exit_task_glue,
- uintptr_t spawnee_fn,
- size_t callsz)
-{
+upcall_start_task(rust_task *spawner, rust_task *task,
+ uintptr_t exit_task_glue, uintptr_t spawnee_fn, size_t callsz) {
LOG_UPCALL_ENTRY(spawner);
rust_dom *dom = spawner->dom;
- dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::TASK,
+ dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK,
"upcall start_task(task 0x%" PRIxPTR
" exit_task_glue 0x%" PRIxPTR
", spawnee 0x%" PRIxPTR
- ", callsz %" PRIdPTR ")",
- task, exit_task_glue, spawnee_fn, callsz);
+ ", callsz %" PRIdPTR ")", task, exit_task_glue, spawnee_fn,
+ callsz);
task->start(exit_task_glue, spawnee_fn, spawner->rust_sp, callsz);
return task;
}
-extern "C" CDECL rust_task *
-upcall_new_thread(rust_task *task)
-{
+extern "C" CDECL rust_proxy_delegate<rust_task> *
+upcall_new_thread(rust_task *task) {
LOG_UPCALL_ENTRY(task);
rust_dom *old_dom = task->dom;
rust_dom *new_dom = new rust_dom(old_dom->srv->clone(),
old_dom->root_crate);
- new_dom->log(rust_log::UPCALL|rust_log::MEM,
- "upcall new_thread() = 0x%" PRIxPTR,
- new_dom->root_task);
- return new_dom->root_task;
+
+ task->log(rust_log::UPCALL | rust_log::MEM,
+ "upcall new_thread() = dom 0x%" PRIxPTR " task 0x%" PRIxPTR,
+ new_dom, new_dom->root_task);
+ rust_proxy<rust_task> *proxy =
+ new (old_dom) rust_proxy<rust_task>(old_dom, new_dom->root_task);
+ task->log(rust_log::UPCALL | rust_log::MEM,
+ "new proxy = 0x%" PRIxPTR " -> task = 0x%" PRIxPTR,
+ proxy, proxy->delegate());
+ return proxy;
}
-extern "C" CDECL rust_task *
+extern "C" CDECL rust_proxy_delegate<rust_task> *
upcall_start_thread(rust_task *spawner,
- rust_task *root_task,
- uintptr_t exit_task_glue,
- uintptr_t spawnee_fn,
- size_t callsz)
-{
+ rust_proxy_delegate<rust_task> *root_task_proxy,
+ uintptr_t exit_task_glue, uintptr_t spawnee_fn, size_t callsz) {
LOG_UPCALL_ENTRY(spawner);
rust_dom *dom = spawner->dom;
- dom->log(rust_log::UPCALL|rust_log::MEM|rust_log::TASK,
+ rust_task *root_task = root_task_proxy->delegate();
+ dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK,
"upcall start_thread(exit_task_glue 0x%" PRIxPTR
", spawnee 0x%" PRIxPTR
- ", callsz %" PRIdPTR ")",
- exit_task_glue, spawnee_fn, callsz);
+ ", callsz %" PRIdPTR ")", exit_task_glue, spawnee_fn, callsz);
root_task->start(exit_task_glue, spawnee_fn, spawner->rust_sp, callsz);
#if defined(__WIN32__)
HANDLE thread;
thread = CreateThread(NULL, 0, rust_thread_start, root_task->dom,
- 0, NULL);
+ 0, NULL);
dom->win32_require("CreateThread", thread != NULL);
#else
pthread_t thread;
pthread_create(&thread, &dom->attr, rust_thread_start,
- (void *)root_task->dom);
+ (void *) root_task->dom);
#endif
-
- return 0;
+ return root_task_proxy;
}
//
diff --git a/src/rt/rust_util.h b/src/rt/rust_util.h
index 6f34dad9..62ac7de2 100644
--- a/src/rt/rust_util.h
+++ b/src/rt/rust_util.h
@@ -5,7 +5,7 @@
template <typename T>
rc_base<T>::rc_base() :
- refcnt(1)
+ ref_count(1)
{
}
@@ -85,7 +85,7 @@ ptr_vec<T>::trim(size_t sz)
template <typename T>
void
-ptr_vec<T>::swapdel(T *item)
+ptr_vec<T>::swap_delete(T *item)
{
/* Swap the endpoint into i and decr fill. */
I(dom, data);
diff --git a/src/rt/sync/condition_variable.cpp b/src/rt/sync/condition_variable.cpp
new file mode 100644
index 00000000..d4032572
--- /dev/null
+++ b/src/rt/sync/condition_variable.cpp
@@ -0,0 +1,66 @@
+#include "../globals.h"
+
+/*
+ * Conditional variable. Implemented using pthreads condition variables, and
+ * using events on windows.
+ */
+
+#include "condition_variable.h"
+
+// #define TRACE
+
+condition_variable::condition_variable() {
+#if defined(__WIN32__)
+ _event = CreateEvent(NULL, FALSE, FALSE, NULL);
+#else
+ pthread_cond_init(&_cond, NULL);
+ pthread_mutex_init(&_mutex, NULL);
+#endif
+}
+
+condition_variable::~condition_variable() {
+#if defined(__WIN32__)
+ CloseHandle(_event);
+#else
+ pthread_cond_destroy(&_cond);
+ pthread_mutex_destroy(&_mutex);
+#endif
+}
+
+/**
+ * Wait indefinitely until condition is signaled.
+ */
+void condition_variable::wait() {
+#ifdef TRACE
+ printf("waiting on condition_variable: 0x%" PRIxPTR "\n",
+ (uintptr_t)this);
+#endif
+#if defined(__WIN32__)
+ WaitForSingleObject(_event, INFINITE);
+#else
+ pthread_mutex_lock(&_mutex);
+ pthread_cond_wait(&_cond, &_mutex);
+ pthread_mutex_unlock(&_mutex);
+#endif
+#ifdef TRACE
+ printf("resumed on condition_variable: 0x%" PRIxPTR "\n",
+ (uintptr_t)this);
+#endif
+}
+
+/**
+ * Signal condition, and resume the waiting thread.
+ */
+void condition_variable::signal() {
+#if defined(__WIN32__)
+ SetEvent(_event);
+#else
+ pthread_mutex_lock(&_mutex);
+ pthread_cond_signal(&_cond);
+ pthread_mutex_unlock(&_mutex);
+#endif
+#ifdef TRACE
+ printf("signal condition_variable: 0x%" PRIxPTR "\n",
+ (uintptr_t)this);
+#endif
+}
diff --git a/src/rt/sync/condition_variable.h b/src/rt/sync/condition_variable.h
new file mode 100644
index 00000000..f847ef99
--- /dev/null
+++ b/src/rt/sync/condition_variable.h
@@ -0,0 +1,19 @@
+#ifndef CONDITION_VARIABLE_H
+#define CONDITION_VARIABLE_H
+
+class condition_variable {
+#if defined(__WIN32__)
+ HANDLE _event;
+#else
+ pthread_cond_t _cond;
+ pthread_mutex_t _mutex;
+#endif
+public:
+ condition_variable();
+ virtual ~condition_variable();
+
+ void wait();
+ void signal();
+};
+
+#endif /* CONDITION_VARIABLE_H */
diff --git a/src/rt/sync/lock_free_queue.cpp b/src/rt/sync/lock_free_queue.cpp
index 9d1081de..69241ece 100644
--- a/src/rt/sync/lock_free_queue.cpp
+++ b/src/rt/sync/lock_free_queue.cpp
@@ -5,33 +5,46 @@
* dequeue() is not allowed to interrupt itself.
*/
+#include "../globals.h"
#include "lock_free_queue.h"
-lock_free_queue::lock_free_queue() :
- tail(this) {
+lock_free_queue_node::lock_free_queue_node() : next(NULL) {
+
+}
+
+lock_free_queue::lock_free_queue() : _tail(this) {
+
}
-void lock_free_queue::enqueue(lock_free_queue_node *item) {
- item->next = (lock_free_queue_node *) 0;
- lock_free_queue_node *last = tail;
- tail = item;
- while (last->next)
+void
+lock_free_queue::enqueue(lock_free_queue_node *item) {
+ item->next = (lock_free_queue_node *) NULL;
+ lock_free_queue_node *last = _tail;
+ _tail = item;
+ while (last->next) {
last = last->next;
+ }
last->next = item;
}
-lock_free_queue_node *lockfree_queue::dequeue() {
+lock_free_queue_node *
+lock_free_queue::dequeue() {
lock_free_queue_node *item = next;
if (item && !(next = item->next)) {
- tail = (lock_free_queue_node *) this;
+ _tail = (lock_free_queue_node *) this;
if (item->next) {
lock_free_queue_node *lost = item->next;
lock_free_queue_node *help;
do {
help = lost->next;
enqueue(lost);
- } while ((lost = help) != (lock_free_queue_node *) 0);
+ } while ((lost = help) != (lock_free_queue_node *) NULL);
}
}
return item;
}
+
+bool
+lock_free_queue::is_empty() {
+ return next == NULL;
+}
diff --git a/src/rt/sync/lock_free_queue.h b/src/rt/sync/lock_free_queue.h
index fba4aa9a..1f09ec52 100644
--- a/src/rt/sync/lock_free_queue.h
+++ b/src/rt/sync/lock_free_queue.h
@@ -2,14 +2,18 @@
#define LOCK_FREE_QUEUE_H
class lock_free_queue_node {
+public:
lock_free_queue_node *next;
+ lock_free_queue_node();
};
-class lock_free_queue {
+class lock_free_queue : lock_free_queue_node {
+ lock_free_queue_node *_tail;
public:
lock_free_queue();
void enqueue(lock_free_queue_node *item);
lock_free_queue_node *dequeue();
+ bool is_empty();
};
#endif /* LOCK_FREE_QUEUE_H */
diff --git a/src/rt/sync/spin_lock.cpp b/src/rt/sync/spin_lock.cpp
index 11a5cb20..4a113d1a 100644
--- a/src/rt/sync/spin_lock.cpp
+++ b/src/rt/sync/spin_lock.cpp
@@ -1,9 +1,10 @@
+#include "../globals.h"
+#include "spin_lock.h"
+
/*
* Your average spin lock.
*/
-#include "globals.h"
-
// #define TRACE
spin_lock::spin_lock() {
diff --git a/src/rt/sync/spin_lock.h b/src/rt/sync/spin_lock.h
index 3684c23a..f15416a2 100644
--- a/src/rt/sync/spin_lock.h
+++ b/src/rt/sync/spin_lock.h
@@ -1,5 +1,5 @@
-#ifndef UNFAIR_TICKET_LOCK_H
-#define UNFAIR_TICKET_LOCK_H
+#ifndef SPIN_LOCK_H
+#define SPIN_LOCK_H
class spin_lock {
unsigned ticket;
@@ -11,4 +11,4 @@ public:
void unlock();
};
-#endif /* UNFAIR_TICKET_LOCK_H */
+#endif /* SPIN_LOCK_H */
diff --git a/src/rt/util/array_list.h b/src/rt/util/array_list.h
index 0d112575..04fd833f 100644
--- a/src/rt/util/array_list.h
+++ b/src/rt/util/array_list.h
@@ -13,38 +13,44 @@ public:
array_list();
~array_list();
size_t size();
- void append(T value);
+ int32_t append(T value);
T replace(T old_value, T new_value);
- size_t index_of(T value);
+ int32_t index_of(T value);
T & operator[](size_t index);
};
-template<typename T> array_list<T>::array_list() {
+template<typename T>
+array_list<T>::array_list() {
_capacity = INITIAL_CAPACITY;
_data = (T *) malloc(sizeof(T) * _capacity);
}
-template<typename T> array_list<T>::~array_list() {
+template<typename T>
+array_list<T>::~array_list() {
delete _data;
}
-template<typename T> size_t array_list<T>::size() {
+template<typename T> size_t
+array_list<T>::size() {
return _size;
}
-template<typename T> void array_list<T>::append(T value) {
+template<typename T> int32_t
+array_list<T>::append(T value) {
if (_size == _capacity) {
_capacity = _capacity * 2;
_data = (T *) realloc(_data, _capacity * sizeof(T));
}
- _data[_size++] = value;
+ _data[_size ++] = value;
+ return _size - 1;
}
/**
* Replaces the old_value in the list with the new_value.
* Returns the old_value if the replacement succeeded, or NULL otherwise.
*/
-template<typename T> T array_list<T>::replace(T old_value, T new_value) {
+template<typename T> T
+array_list<T>::replace(T old_value, T new_value) {
int index = index_of(old_value);
if (index < 0) {
return NULL;
@@ -53,7 +59,8 @@ template<typename T> T array_list<T>::replace(T old_value, T new_value) {
return old_value;
}
-template<typename T> size_t array_list<T>::index_of(T value) {
+template<typename T> int32_t
+array_list<T>::index_of(T value) {
for (size_t i = 0; i < _size; i++) {
if (_data[i] == value) {
return i;
@@ -62,7 +69,8 @@ template<typename T> size_t array_list<T>::index_of(T value) {
return -1;
}
-template<typename T> T & array_list<T>::operator[](size_t index) {
+template<typename T> T &
+array_list<T>::operator[](size_t index) {
return _data[index];
}