aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Bebenita <[email protected]>2010-09-07 18:39:07 -0700
committerMichael Bebenita <[email protected]>2010-09-07 18:44:12 -0700
commitde611a309006f0976bc9a579eb1087e7a89f79a7 (patch)
treecd30b33ab1986c0cc84e0fc0743593bd99b0caaa
parentStarted work on a framework for writing runtime tests, added some simple test... (diff)
downloadrust-de611a309006f0976bc9a579eb1087e7a89f79a7.tar.xz
rust-de611a309006f0976bc9a579eb1087e7a89f79a7.zip
Lots of design changes around proxies and message passing. Made it so that domains can only talk to other domains via handles, and with the help of the rust_kernel.
-rw-r--r--src/Makefile30
-rw-r--r--src/boot/be/abi.ml2
-rw-r--r--src/rt/memory.h5
-rw-r--r--src/rt/memory_region.cpp7
-rw-r--r--src/rt/rust.cpp85
-rw-r--r--src/rt/rust_chan.cpp25
-rw-r--r--src/rt/rust_chan.h3
-rw-r--r--src/rt/rust_dom.cpp121
-rw-r--r--src/rt/rust_dom.h17
-rw-r--r--src/rt/rust_internal.h28
-rw-r--r--src/rt/rust_kernel.cpp170
-rw-r--r--src/rt/rust_kernel.h105
-rw-r--r--src/rt/rust_message.cpp102
-rw-r--r--src/rt/rust_message.h73
-rw-r--r--src/rt/rust_port.cpp6
-rw-r--r--src/rt/rust_proxy.h53
-rw-r--r--src/rt/rust_srv.cpp12
-rw-r--r--src/rt/rust_srv.h2
-rw-r--r--src/rt/rust_task.cpp13
-rw-r--r--src/rt/rust_task.h3
-rw-r--r--src/rt/rust_upcall.cpp212
-rw-r--r--src/rt/sync/lock_free_queue.h2
-rw-r--r--src/rt/util/indexed_list.h4
23 files changed, 643 insertions, 437 deletions
diff --git a/src/Makefile b/src/Makefile
index 827eb2ee..36aaf2a8 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -380,33 +380,10 @@ self: $(CFG_COMPILER)
# Temporarily xfail the entire multi-tasking system, pending resolution
# of inter-task shutdown races introduced with notification proxies.
-TASK_XFAILS := test/run-pass/acyclic-unwind.rs \
- test/run-pass/alt-type-simple.rs \
- test/run-pass/basic.rs \
- test/run-pass/clone-with-exterior.rs \
- test/run-pass/comm.rs \
- test/run-pass/lazychan.rs \
- test/run-pass/many.rs \
- test/run-pass/obj-dtor.rs \
- test/run-pass/preempt.rs \
- test/run-pass/spawn-fn.rs \
- test/run-pass/spawn-module-qualified.rs \
- test/run-pass/spawn.rs \
- test/run-pass/task-comm-0.rs \
- test/run-pass/task-comm-1.rs \
- test/run-pass/task-comm-2.rs \
- test/run-pass/task-comm-3.rs \
- test/run-pass/task-comm-7.rs \
- test/run-pass/task-comm-8.rs \
- test/run-pass/task-comm-9.rs \
- test/run-pass/task-comm-10.rs \
- test/run-pass/task-comm-11.rs \
- test/run-pass/task-life-0.rs \
- test/run-pass/task-comm.rs \
- test/run-pass/threads.rs \
- test/run-pass/yield.rs \
+TASK_XFAILS := test/run-pass/task-comm-10.rs \
test/run-pass/task-comm-15.rs \
- test/run-pass/task-life-0.rs
+ test/run-pass/task-life-0.rs \
+ test/run-pass/alt-type-simple.rs
TEST_XFAILS_X86 := $(TASK_XFAILS) \
test/run-pass/child-outlives-parent.rs \
@@ -425,6 +402,7 @@ TEST_XFAILS_X86 := $(TASK_XFAILS) \
test/run-pass/task-comm.rs \
test/run-pass/vec-slice.rs \
test/run-pass/task-comm-3.rs \
+ test/run-fail/task-comm-14.rs \
test/compile-fail/bad-recv.rs \
test/compile-fail/bad-send.rs \
test/compile-fail/infinite-tag-type-recursion.rs \
diff --git a/src/boot/be/abi.ml b/src/boot/be/abi.ml
index b58f9125..43043e11 100644
--- a/src/boot/be/abi.ml
+++ b/src/boot/be/abi.ml
@@ -20,7 +20,7 @@ let task_field_gc_alloc_chain = task_field_rust_sp + 1;;
let task_field_dom = task_field_gc_alloc_chain + 1;;
let n_visible_task_fields = task_field_dom + 1;;
-let dom_field_interrupt_flag = 0;;
+let dom_field_interrupt_flag = 1;;
let frame_glue_fns_field_mark = 0;;
let frame_glue_fns_field_drop = 1;;
diff --git a/src/rt/memory.h b/src/rt/memory.h
index 22bc15d3..9196e28d 100644
--- a/src/rt/memory.h
+++ b/src/rt/memory.h
@@ -1,11 +1,6 @@
-/*
- *
- */
-
#ifndef MEMORY_H
#define MEMORY_H
-
inline void *operator new(size_t size, void *mem) {
return mem;
}
diff --git a/src/rt/memory_region.cpp b/src/rt/memory_region.cpp
index 797a7c1d..2f841935 100644
--- a/src/rt/memory_region.cpp
+++ b/src/rt/memory_region.cpp
@@ -1,7 +1,3 @@
-/*
- *
- */
-
#include "rust_internal.h"
#include "memory_region.h"
@@ -20,6 +16,7 @@ memory_region::memory_region(memory_region *parent) :
}
void memory_region::free(void *mem) {
+ // printf("free: ptr 0x%" PRIxPTR"\n", (uintptr_t) mem);
if (_synchronized) { _lock.lock(); }
#ifdef TRACK_ALLOCATIONS
if (_allocation_list.replace(mem, NULL) == false) {
@@ -34,7 +31,6 @@ void memory_region::free(void *mem) {
_live_allocations--;
_srv->free(mem);
if (_synchronized) { _lock.unlock(); }
-
}
void *
@@ -63,6 +59,7 @@ memory_region::malloc(size_t size) {
#ifdef TRACK_ALLOCATIONS
_allocation_list.append(mem);
#endif
+ // printf("malloc: ptr 0x%" PRIxPTR "\n", (uintptr_t) mem);
if (_synchronized) { _lock.unlock(); }
return mem;
}
diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp
index 43818de5..0ea167a4 100644
--- a/src/rt/rust.cpp
+++ b/src/rt/rust.cpp
@@ -1,16 +1,16 @@
#include "rust_internal.h"
struct
-command_line_args
+command_line_args : public dom_owned<command_line_args>
{
- rust_dom &dom;
+ rust_dom *dom;
int argc;
char **argv;
// vec[str] passed to rust_task::start.
rust_vec *args;
- command_line_args(rust_dom &dom,
+ command_line_args(rust_dom *dom,
int sys_argc,
char **sys_argv)
: dom(dom),
@@ -21,29 +21,29 @@ command_line_args
#if defined(__WIN32__)
LPCWSTR cmdline = GetCommandLineW();
LPWSTR *wargv = CommandLineToArgvW(cmdline, &argc);
- dom.win32_require("CommandLineToArgvW", wargv != NULL);
- argv = (char **) dom.malloc(sizeof(char*) * argc);
+ dom->win32_require("CommandLineToArgvW", wargv != NULL);
+ argv = (char **) dom->malloc(sizeof(char*) * argc);
for (int i = 0; i < argc; ++i) {
int n_chars = WideCharToMultiByte(CP_UTF8, 0, wargv[i], -1,
NULL, 0, NULL, NULL);
- dom.win32_require("WideCharToMultiByte(0)", n_chars != 0);
- argv[i] = (char *) dom.malloc(n_chars);
+ dom->win32_require("WideCharToMultiByte(0)", n_chars != 0);
+ argv[i] = (char *) dom->malloc(n_chars);
n_chars = WideCharToMultiByte(CP_UTF8, 0, wargv[i], -1,
argv[i], n_chars, NULL, NULL);
- dom.win32_require("WideCharToMultiByte(1)", n_chars != 0);
+ dom->win32_require("WideCharToMultiByte(1)", n_chars != 0);
}
LocalFree(wargv);
#endif
size_t vec_fill = sizeof(rust_str *) * argc;
size_t vec_alloc = next_power_of_two(sizeof(rust_vec) + vec_fill);
- void *mem = dom.malloc(vec_alloc);
- args = new (mem) rust_vec(&dom, vec_alloc, 0, NULL);
+ void *mem = dom->malloc(vec_alloc);
+ args = new (mem) rust_vec(dom, vec_alloc, 0, NULL);
rust_str **strs = (rust_str**) &args->data[0];
for (int i = 0; i < argc; ++i) {
size_t str_fill = strlen(argv[i]) + 1;
size_t str_alloc = next_power_of_two(sizeof(rust_str) + str_fill);
- mem = dom.malloc(str_alloc);
- strs[i] = new (mem) rust_str(&dom, str_alloc, str_fill,
+ mem = dom->malloc(str_alloc);
+ strs[i] = new (mem) rust_str(dom, str_alloc, str_fill,
(uint8_t const *)argv[i]);
}
args->fill = vec_fill;
@@ -58,50 +58,55 @@ command_line_args
// Drop the args we've had pinned here.
rust_str **strs = (rust_str**) &args->data[0];
for (int i = 0; i < argc; ++i)
- dom.free(strs[i]);
- dom.free(args);
+ dom->free(strs[i]);
+ dom->free(args);
}
#ifdef __WIN32__
for (int i = 0; i < argc; ++i) {
- dom.free(argv[i]);
+ dom->free(argv[i]);
}
- dom.free(argv);
+ dom->free(argv);
#endif
}
};
+/**
+ * Main entry point into the Rust runtime. Here we create a Rust service,
+ * initialize the kernel, create the root domain and run it.
+ */
extern "C" CDECL int
-rust_start(uintptr_t main_fn, rust_crate const *crate, int argc, char **argv)
-{
- int ret;
- {
- rust_srv srv;
- rust_dom dom(&srv, crate, "main");
- srv.kernel->register_domain(&dom);
- command_line_args args(dom, argc, argv);
+rust_start(uintptr_t main_fn, rust_crate const *crate, int argc,
+ char **argv) {
- dom.log(rust_log::DOM, "startup: %d args", args.argc);
- for (int i = 0; i < args.argc; ++i)
- dom.log(rust_log::DOM,
- "startup: arg[%d] = '%s'", i, args.argv[i]);
-
- if (dom._log.is_tracing(rust_log::DWARF)) {
- rust_crate_reader rdr(&dom, crate);
- }
+ rust_srv *srv = new rust_srv();
+ rust_kernel *kernel = new rust_kernel(srv);
+ kernel->start();
+ rust_handle<rust_dom> *handle = kernel->create_domain(crate, "main");
+ rust_dom *dom = handle->referent();
+ command_line_args *args = new (dom) command_line_args(dom, argc, argv);
- uintptr_t main_args[4] = { 0, 0, 0, (uintptr_t)args.args };
-
- dom.root_task->start(crate->get_exit_task_glue(),
- main_fn,
- (uintptr_t)&main_args,
- sizeof(main_args));
+ dom->log(rust_log::DOM, "startup: %d args", args->argc);
+ for (int i = 0; i < args->argc; i++) {
+ dom->log(rust_log::DOM,
+ "startup: arg[%d] = '%s'", i, args->argv[i]);
+ }
- ret = dom.start_main_loop();
- srv.kernel->deregister_domain(&dom);
+ if (dom->_log.is_tracing(rust_log::DWARF)) {
+ rust_crate_reader create_reader(dom, crate);
}
+ uintptr_t main_args[4] = {0, 0, 0, (uintptr_t)args->args};
+ dom->root_task->start(crate->get_exit_task_glue(),
+ main_fn, (uintptr_t)&main_args, sizeof(main_args));
+ int ret = dom->start_main_loop();
+ delete args;
+ kernel->destroy_domain(dom);
+ kernel->join_all_domains();
+ delete kernel;
+ delete srv;
+
#if !defined(__WIN32__)
// Don't take down the process if the main thread exits without an
// error.
diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp
index 2a0a61db..9a666453 100644
--- a/src/rt/rust_chan.cpp
+++ b/src/rt/rust_chan.cpp
@@ -4,13 +4,15 @@
/**
* Create a new rust channel and associate it with the specified port.
*/
-rust_chan::rust_chan(rust_task *task, maybe_proxy<rust_port> *port) :
- task(task), port(port), buffer(task->dom, port->delegate()->unit_sz) {
-
+rust_chan::rust_chan(rust_task *task,
+ maybe_proxy<rust_port> *port,
+ size_t unit_sz) :
+ task(task),
+ port(port),
+ buffer(task->dom, unit_sz) {
if (port) {
associate(port);
}
-
task->log(rust_log::MEM | rust_log::COMM,
"new rust_chan(task=0x%" PRIxPTR
", port=0x%" PRIxPTR ") -> chan=0x%" PRIxPTR,
@@ -34,7 +36,7 @@ void rust_chan::associate(maybe_proxy<rust_port> *port) {
task->log(rust_log::TASK,
"associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
this, port);
- this->port->delegate()->chans.push(this);
+ this->port->referent()->chans.push(this);
}
}
@@ -51,8 +53,8 @@ void rust_chan::disassociate() {
if (port->is_proxy() == false) {
task->log(rust_log::TASK,
"disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
- this, port->delegate());
- port->delegate()->chans.swap_delete(this);
+ this, port->referent());
+ port->referent()->chans.swap_delete(this);
}
// Delete reference to the port.
@@ -76,14 +78,11 @@ void rust_chan::send(void *sptr) {
"rust_chan::transmit with nothing to send.");
if (port->is_proxy()) {
- // TODO: Cache port task locally.
- rust_proxy<rust_task> *port_task =
- dom->get_task_proxy(port->delegate()->task);
- data_message::send(buffer.peek(), buffer.unit_sz,
- "send data", task, port_task, port->as_proxy());
+ data_message::send(buffer.peek(), buffer.unit_sz, "send data",
+ task->get_handle(), port->as_proxy()->handle());
buffer.dequeue(NULL);
} else {
- rust_port *target_port = port->delegate();
+ rust_port *target_port = port->referent();
if (target_port->task->blocked_on(target_port)) {
dom->log(rust_log::COMM, "dequeued in rendezvous_ptr");
buffer.dequeue(target_port->task->rendezvous_ptr);
diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h
index 6aa98247..a6e4d3d6 100644
--- a/src/rt/rust_chan.h
+++ b/src/rt/rust_chan.h
@@ -5,7 +5,8 @@ class rust_chan : public rc_base<rust_chan>,
public task_owned<rust_chan>,
public rust_cond {
public:
- rust_chan(rust_task *task, maybe_proxy<rust_port> *port);
+ rust_chan(rust_task *task, maybe_proxy<rust_port> *port, size_t unit_sz);
+
~rust_chan();
rust_task *task;
diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp
index a1207ec7..323e9c39 100644
--- a/src/rt/rust_dom.cpp
+++ b/src/rt/rust_dom.cpp
@@ -4,8 +4,9 @@
template class ptr_vec<rust_task>;
-rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate,
- const char *name) :
+rust_dom::rust_dom(rust_kernel *kernel,
+ rust_message_queue *message_queue, rust_srv *srv,
+ rust_crate const *root_crate, const char *name) :
interrupt_flag(0),
root_crate(root_crate),
_log(srv, this),
@@ -20,7 +21,8 @@ rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate,
root_task(NULL),
curr_task(NULL),
rval(0),
- _kernel(srv->kernel)
+ kernel(kernel),
+ message_queue(message_queue)
{
logptr("new dom", (uintptr_t)this);
isaac_init(this, &rctx);
@@ -42,33 +44,9 @@ del_all_tasks(rust_dom *dom, ptr_vec<rust_task> *v) {
}
}
-void
-rust_dom::delete_proxies() {
- rust_task *task;
- rust_proxy<rust_task> *task_proxy;
- while (_task_proxies.pop(&task, &task_proxy)) {
- log(rust_log::TASK,
- "deleting proxy 0x%" PRIxPTR " in dom %s 0x%" PRIxPTR,
- task_proxy, task_proxy->dom->name, task_proxy->dom);
- delete task_proxy;
- }
-
- rust_port *port;
- rust_proxy<rust_port> *port_proxy;
- while (_port_proxies.pop(&port, &port_proxy)) {
- log(rust_log::TASK,
- "deleting proxy 0x%" PRIxPTR " in dom %s 0x%" PRIxPTR,
- port_proxy, port_proxy->dom->name, port_proxy->dom);
- delete port_proxy;
- }
-}
-
rust_dom::~rust_dom() {
log(rust_log::MEM | rust_log::DOM,
"~rust_dom %s @0x%" PRIxPTR, name, (uintptr_t)this);
-
- log(rust_log::TASK, "deleting all proxies");
- delete_proxies();
log(rust_log::TASK, "deleting all running tasks");
del_all_tasks(this, &running_tasks);
log(rust_log::TASK, "deleting all blocked tasks");
@@ -78,8 +56,9 @@ rust_dom::~rust_dom() {
#ifndef __WIN32__
pthread_attr_destroy(&attr);
#endif
- while (caches.length())
+ while (caches.length()) {
delete caches.pop();
+ }
}
void
@@ -276,67 +255,18 @@ rust_dom::reap_dead_tasks() {
}
/**
- * Enqueues a message in this domain's incoming message queue. It's the
- * responsibility of the receiver to free the message once it's processed.
- */
-void rust_dom::send_message(rust_message *message) {
- log(rust_log::COMM, "==> enqueueing \"%s\" 0x%" PRIxPTR
- " in queue 0x%" PRIxPTR
- " in domain 0x%" PRIxPTR,
- message->label,
- message,
- &_incoming_message_queue,
- this);
- _incoming_message_queue.enqueue(message);
-}
-
-/**
* Drains and processes incoming pending messages.
*/
void rust_dom::drain_incoming_message_queue(bool process) {
rust_message *message;
- while (_incoming_message_queue.dequeue(&message)) {
- log(rust_log::COMM, "<== processing incoming message \"%s\" 0x%"
- PRIxPTR, message->label, message);
+ while (message_queue->dequeue(&message)) {
+ log(rust_log::COMM, "<== receiving \"%s\" " PTR,
+ message->label, message);
if (process) {
message->process();
}
- message->~rust_message();
- this->synchronized_region.free(message);
- }
-}
-
-rust_proxy<rust_task> *
-rust_dom::get_task_proxy(rust_task *task) {
- rust_proxy<rust_task> *proxy = NULL;
- if (_task_proxies.get(task, &proxy)) {
- return proxy;
+ delete message;
}
- log(rust_log::COMM, "no proxy for %s @0x%" PRIxPTR, task->name, task);
- proxy = new (this) rust_proxy<rust_task> (this, task, false);
- _task_proxies.put(task, proxy);
- return proxy;
-}
-
-/**
- * Gets a proxy for this port.
- *
- * TODO: This method needs to be synchronized since it's usually called
- * during upcall_clone_chan in a different thread. However, for now
- * since this usually happens before the thread actually starts,
- * we may get lucky without synchronizing.
- *
- */
-rust_proxy<rust_port> *
-rust_dom::get_port_proxy_synchronized(rust_port *port) {
- rust_proxy<rust_port> *proxy = NULL;
- if (_port_proxies.get(port, &proxy)) {
- return proxy;
- }
- log(rust_log::COMM, "no proxy for 0x%" PRIxPTR, port);
- proxy = new (this) rust_proxy<rust_port> (this, port, false);
- _port_proxies.put(port, proxy);
- return proxy;
}
/**
@@ -362,31 +292,6 @@ rust_dom::schedule_task() {
return NULL;
}
-/**
- * Checks for simple deadlocks.
- */
-bool
-rust_dom::is_deadlocked() {
- if (_kernel->domains.length() != 1) {
- // We cannot tell if we are deadlocked if other domains exists.
- return false;
- }
-
- if (running_tasks.length() != 0) {
- // We are making progress and therefore we are not deadlocked.
- return false;
- }
-
- if (_incoming_message_queue.is_empty() && blocked_tasks.length() > 0) {
- // We have no messages to process, no running tasks to schedule
- // and some blocked tasks therefore we are likely in a deadlock.
- _kernel->log_all_domain_state();
- return true;
- }
-
- return false;
-}
-
void
rust_dom::log_state() {
if (!running_tasks.is_empty()) {
@@ -439,7 +344,7 @@ rust_dom::start_main_loop()
logptr("exit-task glue", root_crate->get_exit_task_glue());
while (n_live_tasks() > 0) {
- A(this, is_deadlocked() == false, "deadlock");
+ A(this, kernel->is_deadlocked() == false, "deadlock");
drain_incoming_message_queue(true);
@@ -496,7 +401,7 @@ rust_dom::start_main_loop()
log(rust_log::DOM, "terminated scheduler loop, reaping dead tasks ...");
while (dead_tasks.length() > 0) {
- if (_incoming_message_queue.is_empty()) {
+ if (message_queue->is_empty()) {
log(rust_log::DOM,
"waiting for %d dead tasks to become dereferenced, "
"scheduler yielding ...",
diff --git a/src/rt/rust_dom.h b/src/rt/rust_dom.h
index 5c9c2953..3f90bb67 100644
--- a/src/rt/rust_dom.h
+++ b/src/rt/rust_dom.h
@@ -1,7 +1,7 @@
#ifndef RUST_DOM_H
#define RUST_DOM_H
-struct rust_dom
+struct rust_dom : public kernel_owned<rust_dom>, rc_base<rust_dom>
{
// Fields known to the compiler:
uintptr_t interrupt_flag;
@@ -27,14 +27,14 @@ struct rust_dom
rust_task *curr_task;
int rval;
- rust_kernel *_kernel;
+ rust_kernel *kernel;
int32_t list_index;
hash_map<rust_task *, rust_proxy<rust_task> *> _task_proxies;
hash_map<rust_port *, rust_proxy<rust_port> *> _port_proxies;
// Incoming messages from other domains.
- lock_free_queue<rust_message*> _incoming_message_queue;
+ rust_message_queue *message_queue;
#ifndef __WIN32__
pthread_attr_t attr;
@@ -42,9 +42,10 @@ struct rust_dom
// Only a pointer to 'name' is kept, so it must live as long as this
// domain.
- rust_dom(rust_srv *srv, rust_crate const *root_crate, const char *name);
+ rust_dom(rust_kernel *kernel,
+ rust_message_queue *message_queue, rust_srv *srv,
+ rust_crate const *root_crate, const char *name);
~rust_dom();
-
void activate(rust_task *task);
void log(rust_task *task, uint32_t logbit, char const *fmt, ...);
void log(uint32_t logbit, char const *fmt, ...);
@@ -63,11 +64,7 @@ struct rust_dom
void free(void *mem);
void free(void *mem, memory_region::memory_region_type type);
- void send_message(rust_message *message);
void drain_incoming_message_queue(bool process);
- rust_proxy<rust_task> *get_task_proxy(rust_task *task);
- void delete_proxies();
- rust_proxy<rust_port> *get_port_proxy_synchronized(rust_port *port);
#ifdef __WIN32__
void win32_require(LPCTSTR fn, BOOL ok);
@@ -81,7 +78,7 @@ struct rust_dom
void reap_dead_tasks();
rust_task *schedule_task();
- bool is_deadlocked();
+
int start_main_loop();
void log_state();
diff --git a/src/rt/rust_internal.h b/src/rt/rust_internal.h
index 60d86f61..c327f8c0 100644
--- a/src/rt/rust_internal.h
+++ b/src/rt/rust_internal.h
@@ -72,6 +72,11 @@ struct frame_glue_fns;
#define A(dom, e, s, ...) ((e) ? (void)0 : \
(dom)->srv->fatal(#e, __FILE__, __LINE__, s, ## __VA_ARGS__))
+#define K(srv, e, s, ...) ((e) ? (void)0 : \
+ srv->fatal(#e, __FILE__, __LINE__, s, ## __VA_ARGS__))
+
+#define PTR "0x%" PRIxPTR
+
// This drives our preemption scheme.
static size_t const TIME_SLICE_IN_MS = 10;
@@ -96,22 +101,26 @@ template <typename T> struct rc_base {
};
template <typename T> struct dom_owned {
- rust_dom *get_dom() const {
- return ((T*)this)->dom;
- }
-
void operator delete(void *ptr) {
((T *)ptr)->dom->free(ptr);
}
};
template <typename T> struct task_owned {
- rust_dom *get_dom() const {
- return ((T *)this)->task->dom;
+ void operator delete(void *ptr) {
+ ((T *)ptr)->task->dom->free(ptr);
}
+};
+template <typename T> struct kernel_owned {
void operator delete(void *ptr) {
- ((T *)ptr)->task->dom->free(ptr);
+ ((T *)ptr)->kernel->free(ptr);
+ }
+};
+
+template <typename T> struct region_owned {
+ void operator delete(void *ptr) {
+ ((T *)ptr)->region->free(ptr);
}
};
@@ -152,8 +161,8 @@ public:
#include "rust_srv.h"
#include "rust_log.h"
#include "rust_proxy.h"
-#include "rust_message.h"
#include "rust_kernel.h"
+#include "rust_message.h"
#include "rust_dom.h"
#include "memory.h"
@@ -552,6 +561,9 @@ struct gc_alloc {
#include "rust_chan.h"
#include "rust_port.h"
+#include "test/rust_test_harness.h"
+#include "test/rust_test_util.h"
+
//
// Local Variables:
// mode: C++
diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp
index b82e4615..9ea1f2eb 100644
--- a/src/rt/rust_kernel.cpp
+++ b/src/rt/rust_kernel.cpp
@@ -1,25 +1,81 @@
#include "rust_internal.h"
rust_kernel::rust_kernel(rust_srv *srv) :
- _region(srv->local_region),
+ _region(&srv->local_region),
_log(srv, NULL),
- domains(srv->local_region),
- message_queues(srv->local_region) {
+ _srv(srv),
+ _interrupt_kernel_loop(FALSE),
+ domains(&srv->local_region),
+ message_queues(&srv->local_region) {
// Nop.
}
-rust_kernel::~rust_kernel() {
- // Nop.
+rust_handle<rust_dom> *
+rust_kernel::create_domain(const rust_crate *crate, const char *name) {
+ rust_message_queue *message_queue =
+ new (this) rust_message_queue(_srv, this);
+ rust_srv *srv = _srv->clone();
+ rust_dom *dom =
+ new (this) rust_dom(this, message_queue, srv, crate, name);
+ rust_handle<rust_dom> *handle = get_dom_handle(dom);
+ message_queue->associate(handle);
+ domains.append(dom);
+ message_queues.append(message_queue);
+ return handle;
}
void
-rust_kernel::register_domain(rust_dom *dom) {
- domains.append(dom);
+rust_kernel::destroy_domain(rust_dom *dom) {
+ log(rust_log::KERN, "deleting domain: " PTR ", index: %d, domains %d",
+ dom, dom->list_index, domains.length());
+ domains.remove(dom);
+ dom->message_queue->disassociate();
+ rust_srv *srv = dom->srv;
+ delete dom;
+ delete srv;
+}
+
+rust_handle<rust_dom> *
+rust_kernel::get_dom_handle(rust_dom *dom) {
+ rust_handle<rust_dom> *handle = NULL;
+ if (_dom_handles.get(dom, &handle)) {
+ return handle;
+ }
+ handle = new (this) rust_handle<rust_dom>(this, dom->message_queue, dom);
+ _dom_handles.put(dom, handle);
+ return handle;
+}
+
+rust_handle<rust_task> *
+rust_kernel::get_task_handle(rust_task *task) {
+ rust_handle<rust_task> *handle = NULL;
+ if (_task_handles.get(task, &handle)) {
+ return handle;
+ }
+ handle = new (this) rust_handle<rust_task>(this, task->dom->message_queue,
+ task);
+ _task_handles.put(task, handle);
+ return handle;
+}
+
+rust_handle<rust_port> *
+rust_kernel::get_port_handle(rust_port *port) {
+ rust_handle<rust_port> *handle = NULL;
+ if (_port_handles.get(port, &handle)) {
+ return handle;
+ }
+ handle = new (this) rust_handle<rust_port>(this,
+ port->task->dom->message_queue, port);
+ _port_handles.put(port, handle);
+ return handle;
}
void
-rust_kernel::deregister_domain(rust_dom *dom) {
- domains.remove(dom);
+rust_kernel::join_all_domains() {
+ // TODO: Perhaps we can do this a little smarter. Just spin wait for now.
+ while (domains.length() > 0) {
+ sync::yield();
+ }
}
void
@@ -30,6 +86,36 @@ rust_kernel::log_all_domain_state() {
}
}
+/**
+ * Checks for simple deadlocks.
+ */
+bool
+rust_kernel::is_deadlocked() {
+ return false;
+// _lock.lock();
+// if (domains.length() != 1) {
+// // We can only check for deadlocks when only one domain exists.
+// return false;
+// }
+//
+// if (domains[0]->running_tasks.length() != 0) {
+// // We are making progress and therefore we are not deadlocked.
+// return false;
+// }
+//
+// if (domains[0]->message_queue->is_empty() &&
+// domains[0]->blocked_tasks.length() > 0) {
+// // We have no messages to process, no running tasks to schedule
+// // and some blocked tasks therefore we are likely in a deadlock.
+// log_all_domain_state();
+// return true;
+// }
+//
+// _lock.unlock();
+// return false;
+}
+
+
void
rust_kernel::log(uint32_t type_bits, char const *fmt, ...) {
char buf[256];
@@ -41,3 +127,69 @@ rust_kernel::log(uint32_t type_bits, char const *fmt, ...) {
va_end(args);
}
}
+
+void
+rust_kernel::start_kernel_loop() {
+ while (_interrupt_kernel_loop == false) {
+ message_queues.global.lock();
+ for (size_t i = 0; i < message_queues.length(); i++) {
+ rust_message_queue *queue = message_queues[i];
+ if (queue->is_associated() == false) {
+ rust_message *message = NULL;
+ while (queue->dequeue(&message)) {
+ message->kernel_process();
+ delete message;
+ }
+ }
+ }
+ message_queues.global.unlock();
+ }
+}
+
+void
+rust_kernel::run() {
+ log(rust_log::KERN, "started kernel loop");
+ start_kernel_loop();
+ log(rust_log::KERN, "finished kernel loop");
+}
+
+rust_kernel::~rust_kernel() {
+ K(_srv, domains.length() == 0,
+ "Kernel has %d live domain(s), join all domains before killing "
+ "the kernel.", domains.length());
+
+ // If the kernel loop is running, interrupt it, join and exit.
+ if (is_running()) {
+ _interrupt_kernel_loop = true;
+ join();
+ }
+
+ free_handles(_task_handles);
+ free_handles(_port_handles);
+ free_handles(_dom_handles);
+
+ rust_message_queue *queue = NULL;
+ while (message_queues.pop(&queue)) {
+ K(_srv, queue->is_empty(), "Kernel message queue should be empty "
+ "before killing the kernel.");
+ delete queue;
+ }
+}
+
+void *
+rust_kernel::malloc(size_t size) {
+ return _region->malloc(size);
+}
+
+void rust_kernel::free(void *mem) {
+ _region->free(mem);
+}
+
+template<class T> void
+rust_kernel::free_handles(hash_map<T*, rust_handle<T>* > &map) {
+ T* key;
+ rust_handle<T> *value;
+ while (map.pop(&key, &value)) {
+ delete value;
+ }
+}
diff --git a/src/rt/rust_kernel.h b/src/rt/rust_kernel.h
index 478d030c..902a9a2f 100644
--- a/src/rt/rust_kernel.h
+++ b/src/rt/rust_kernel.h
@@ -2,20 +2,113 @@
#define RUST_KERNEL_H
/**
- * A global object shared by all domains.
+ * A handle object for Rust tasks. We need a reference to the message queue
+ * of the referent's domain which we can safely hang on to since it's a
+ * kernel object. We use the referent reference as a label we stash in
+ * messages sent via this proxy.
*/
-class rust_kernel {
- memory_region &_region;
+
+class rust_kernel;
+
+template <typename T> class
+rust_handle :
+ public rust_cond,
+ public rc_base<rust_handle<T> >,
+ public kernel_owned<rust_handle<T> > {
+public:
+ rust_kernel *kernel;
+ rust_message_queue *message_queue;
+ T *_referent;
+ T * referent() {
+ return _referent;
+ }
+ rust_handle(rust_kernel *kernel,
+ rust_message_queue *message_queue,
+ T *referent) :
+ kernel(kernel),
+ message_queue(message_queue),
+ _referent(referent) {
+ // Nop.
+ }
+};
+
+/**
+ * A global object shared by all thread domains. Most of the data structures
+ * in this class are synchronized since they are accessed from multiple
+ * threads.
+ */
+class rust_kernel : public rust_thread {
+ memory_region *_region;
rust_log _log;
+ rust_srv *_srv;
+
+ /**
+ * Task proxy objects are kernel owned handles to Rust objects.
+ */
+ hash_map<rust_task *, rust_handle<rust_task> *> _task_handles;
+ hash_map<rust_port *, rust_handle<rust_port> *> _port_handles;
+ hash_map<rust_dom *, rust_handle<rust_dom> *> _dom_handles;
+
+ template<class T> void free_handles(hash_map<T*, rust_handle<T>* > &map);
+
+ void run();
+ void start_kernel_loop();
+ bool volatile _interrupt_kernel_loop;
+
+ /**
+ * Lock for the message queue list, so we can safely
+ */
+ spin_lock _message_queues_lock;
+
public:
+
+ /**
+ * List of domains that are currently executing.
+ */
synchronized_indexed_list<rust_dom> domains;
- synchronized_indexed_list<lock_free_queue<rust_message*> > message_queues;
+
+ /**
+ * Message queues are kernel objects and are associated with domains.
+ * Their lifetime is not bound to the lifetime of a domain and in fact
+ * live on after their associated domain has died. This way we can safely
+ * communicate with domains that may have died.
+ *
+ * Although the message_queues list is synchronized, each individual
+ * message queue is lock free.
+ */
+ synchronized_indexed_list<rust_message_queue> message_queues;
+
+ rust_handle<rust_dom> *get_dom_handle(rust_dom *dom);
+ rust_handle<rust_task> *get_task_handle(rust_task *task);
+ rust_handle<rust_port> *get_port_handle(rust_port *port);
+
rust_kernel(rust_srv *srv);
- void register_domain(rust_dom *dom);
- void deregister_domain(rust_dom *dom);
+
+ rust_handle<rust_dom> *create_domain(rust_crate const *root_crate,
+ const char *name);
+ void destroy_domain(rust_dom *dom);
+
+ bool is_deadlocked();
+
+ /**
+ * Blocks until all domains have terminated.
+ */
+ void join_all_domains();
+
void log_all_domain_state();
void log(uint32_t type_bits, char const *fmt, ...);
virtual ~rust_kernel();
+
+ void *malloc(size_t size);
+ void free(void *mem);
};
+inline void *operator new(size_t size, rust_kernel *kernel) {
+ return kernel->malloc(size);
+}
+
+inline void *operator new(size_t size, rust_kernel &kernel) {
+ return kernel.malloc(size);
+}
+
#endif /* RUST_KERNEL_H */
diff --git a/src/rt/rust_message.cpp b/src/rt/rust_message.cpp
index b6b7fbf0..dab13c09 100644
--- a/src/rt/rust_message.cpp
+++ b/src/rt/rust_message.cpp
@@ -2,36 +2,35 @@
#include "rust_message.h"
rust_message::
-rust_message(const char* label, rust_task *source, rust_task *target) :
- label(label),
- _dom(target->dom),
- _source(source),
- _target(target) {
+rust_message(memory_region *region, const char* label,
+ rust_handle<rust_task> *source, rust_handle<rust_task> *target) :
+ label(label), region(region), _source(source), _target(target) {
}
rust_message::~rust_message() {
+ // Nop.
}
void rust_message::process() {
- I(_dom, false);
+ // Nop.
}
-rust_proxy<rust_task> *
-rust_message::get_source_proxy() {
- return _dom->get_task_proxy(_source);
+void rust_message::kernel_process() {
+ // Nop.
}
notify_message::
-notify_message(notification_type type, const char* label,
- rust_task *source,
- rust_task *target) :
- rust_message(label, source, target), type(type) {
+notify_message(memory_region *region, notification_type type,
+ const char* label, rust_handle<rust_task> *source,
+ rust_handle<rust_task> *target) :
+ rust_message(region, label, source, target), type(type) {
}
data_message::
-data_message(uint8_t *buffer, size_t buffer_sz, const char* label,
- rust_task *source, rust_task *target, rust_port *port) :
- rust_message(label, source, target),
+data_message(memory_region *region, uint8_t *buffer, size_t buffer_sz,
+ const char* label, rust_handle<rust_task> *source,
+ rust_handle<rust_port> *port) :
+ rust_message(region, label, source, NULL),
_buffer_sz(buffer_sz), _port(port) {
_buffer = (uint8_t *)malloc(buffer_sz);
memcpy(_buffer, buffer, buffer_sz);
@@ -47,54 +46,79 @@ data_message::~data_message() {
* source task.
*/
void notify_message::
-send(notification_type type, const char* label, rust_task *source,
- rust_proxy<rust_task> *target) {
- rust_task *target_task = target->delegate();
- rust_dom *target_domain = target_task->dom;
+send(notification_type type, const char* label,
+ rust_handle<rust_task> *source, rust_handle<rust_task> *target) {
+ memory_region *region = &target->message_queue->region;
notify_message *message =
- new (target_domain, memory_region::SYNCHRONIZED) notify_message(type,
- label, source, target_task);
- target_domain->send_message(message);
+ new (region) notify_message(region, type, label, source, target);
+// target->referent()->log(rust_log::COMM,
+// "==> sending \"%s\" " PTR " in queue " PTR,
+// label, message, &target->message_queue);
+ target->message_queue->enqueue(message);
}
void notify_message::process() {
- rust_task *task = _target;
+ rust_task *task = _target->referent();
switch (type) {
case KILL:
- task->ref_count--;
+ // task->ref_count--;
task->kill();
break;
case JOIN: {
if (task->dead() == false) {
- task->tasks_waiting_to_join.append(get_source_proxy());
+ rust_proxy<rust_task> *proxy = new rust_proxy<rust_task>(_source);
+ task->tasks_waiting_to_join.append(proxy);
} else {
- send(WAKEUP, "wakeup", task, get_source_proxy());
+ send(WAKEUP, "wakeup", _target, _source);
}
break;
}
case WAKEUP:
- task->wakeup(get_source_proxy()->delegate());
+ task->wakeup(_source);
+ break;
+ }
+}
+
+void notify_message::kernel_process() {
+ switch(type) {
+ case WAKEUP:
+ case KILL:
+ // Ignore.
+ break;
+ case JOIN:
+ send(WAKEUP, "wakeup", _target, _source);
break;
}
}
void data_message::
-send(uint8_t *buffer, size_t buffer_sz, const char* label, rust_task *source,
- rust_proxy<rust_task> *target, rust_proxy<rust_port> *port) {
+send(uint8_t *buffer, size_t buffer_sz, const char* label,
+ rust_handle<rust_task> *source, rust_handle<rust_port> *port) {
- rust_task *target_task = target->delegate();
- rust_port *target_port = port->delegate();
- rust_dom *target_domain = target_task->dom;
+ memory_region *region = &port->message_queue->region;
data_message *message =
- new (target_domain, memory_region::SYNCHRONIZED)
- data_message(buffer, buffer_sz, label, source,
- target_task, target_port);
- target_domain->send_message(message);
+ new (region) data_message(region, buffer, buffer_sz, label, source,
+ port);
+ source->referent()->log(rust_log::COMM,
+ "==> sending \"%s\"" PTR " in queue " PTR,
+ label, message, &port->message_queue);
+ port->message_queue->enqueue(message);
}
void data_message::process() {
- _port->remote_channel->send(_buffer);
- _target->log(rust_log::COMM, "<=== received data via message ===");
+ _port->referent()->remote_channel->send(_buffer);
+ // _target->referent()->log(rust_log::COMM,
+ // "<=== received data via message ===");
+}
+
+void data_message::kernel_process() {
+
+}
+
+rust_message_queue::rust_message_queue(rust_srv *srv, rust_kernel *kernel) :
+ region (srv, true), kernel(kernel),
+ dom_handle(NULL) {
+ // Nop.
}
//
diff --git a/src/rt/rust_message.h b/src/rt/rust_message.h
index 7aee6d9f..c342e3e4 100644
--- a/src/rt/rust_message.h
+++ b/src/rt/rust_message.h
@@ -9,28 +9,31 @@
/**
* Abstract base class for all message types.
*/
-class rust_message {
+class rust_message : public region_owned<rust_message> {
public:
const char* label;
+ memory_region *region;
private:
- rust_dom *_dom;
- rust_task *_source;
protected:
- rust_task *_target;
+ rust_handle<rust_task> *_source;
+ rust_handle<rust_task> *_target;
public:
- rust_message(const char* label, rust_task *source, rust_task *target);
+ rust_message(memory_region *region,
+ const char* label,
+ rust_handle<rust_task> *source,
+ rust_handle<rust_task> *target);
+
virtual ~rust_message();
/**
- * We can only access the source task through a proxy, so create one
- * on demand if we need it.
+ * Processes the message in the target domain.
*/
- rust_proxy<rust_task> *get_source_proxy();
+ virtual void process();
/**
- * Processes the message in the target domain thread.
+ * Processes the message in the kernel.
*/
- virtual void process();
+ virtual void kernel_process();
};
/**
@@ -44,17 +47,19 @@ public:
const notification_type type;
- notify_message(notification_type type, const char* label,
- rust_task *source, rust_task *target);
+ notify_message(memory_region *region, notification_type type,
+ const char* label, rust_handle<rust_task> *source,
+ rust_handle<rust_task> *target);
void process();
+ void kernel_process();
/**
* This code executes in the sending domain's thread.
*/
static void
- send(notification_type type, const char* label, rust_task *source,
- rust_proxy<rust_task> *target);
+ send(notification_type type, const char* label,
+ rust_handle<rust_task> *source, rust_handle<rust_task> *target);
};
/**
@@ -64,21 +69,51 @@ class data_message : public rust_message {
private:
uint8_t *_buffer;
size_t _buffer_sz;
- rust_port *_port;
+ rust_handle<rust_port> *_port;
+
public:
+ data_message(memory_region *region, uint8_t *buffer, size_t buffer_sz,
+ const char* label, rust_handle<rust_task> *source,
+ rust_handle<rust_port> *port);
- data_message(uint8_t *buffer, size_t buffer_sz, const char* label,
- rust_task *source, rust_task *target, rust_port *port);
virtual ~data_message();
void process();
+ void kernel_process();
/**
* This code executes in the sending domain's thread.
*/
static void
send(uint8_t *buffer, size_t buffer_sz, const char* label,
- rust_task *source, rust_proxy<rust_task> *target,
- rust_proxy<rust_port> *port);
+ rust_handle<rust_task> *source, rust_handle<rust_port> *port);
+};
+
+class rust_message_queue : public lock_free_queue<rust_message*>,
+ public kernel_owned<rust_message_queue> {
+public:
+ memory_region region;
+ rust_kernel *kernel;
+ rust_handle<rust_dom> *dom_handle;
+ int32_t list_index;
+ rust_message_queue(rust_srv *srv, rust_kernel *kernel);
+
+ void associate(rust_handle<rust_dom> *dom_handle) {
+ this->dom_handle = dom_handle;
+ }
+
+ /**
+ * The Rust domain relinquishes control to the Rust kernel.
+ */
+ void disassociate() {
+ this->dom_handle = NULL;
+ }
+
+ /**
+ * Checks if a Rust domain is responsible for draining the message queue.
+ */
+ bool is_associated() {
+ return this->dom_handle != NULL;
+ }
};
//
diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp
index 458283fb..82054687 100644
--- a/src/rt/rust_port.cpp
+++ b/src/rt/rust_port.cpp
@@ -2,15 +2,15 @@
#include "rust_port.h"
rust_port::rust_port(rust_task *task, size_t unit_sz) :
- maybe_proxy<rust_port>(this), task(task), unit_sz(unit_sz),
- writers(task->dom), chans(task->dom) {
+ maybe_proxy<rust_port>(this), task(task),
+ unit_sz(unit_sz), writers(task->dom), chans(task->dom) {
task->log(rust_log::MEM | rust_log::COMM,
"new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%"
PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this);
// Allocate a remote channel, for remote channel data.
- remote_channel = new (task->dom) rust_chan(task, this);
+ remote_channel = new (task->dom) rust_chan(task, this, unit_sz);
}
rust_port::~rust_port() {
diff --git a/src/rt/rust_proxy.h b/src/rt/rust_proxy.h
index bf12e1d5..2b5e820d 100644
--- a/src/rt/rust_proxy.h
+++ b/src/rt/rust_proxy.h
@@ -2,57 +2,70 @@
#define RUST_PROXY_H
/**
- * A proxy object is a wrapper around other Rust objects. One use of the proxy
- * object is to mitigate access between tasks in different thread domains.
+ * A proxy object is a wrapper for remote objects. Proxy objects are domain
+ * owned and provide a way distinguish between local and remote objects.
*/
template <typename T> struct rust_proxy;
+
/**
* The base class of all objects that may delegate.
*/
template <typename T> struct
maybe_proxy : public rc_base<T>, public rust_cond {
protected:
- T *_delegate;
+ T *_referent;
public:
- maybe_proxy(T * delegate) : _delegate(delegate) {
-
+ maybe_proxy(T *referent) : _referent(referent) {
+ // Nop.
}
- T *delegate() {
- return _delegate;
+
+ T *referent() {
+ return (T *)_referent;
}
+
bool is_proxy() {
- return _delegate != this;
+ return _referent != this;
}
+
rust_proxy<T> *as_proxy() {
return (rust_proxy<T> *) this;
}
- T *as_delegate() {
- I(_delegate->get_dom(), !is_proxy());
+
+ T *as_referent() {
return (T *) this;
}
};
+template <typename T> class rust_handle;
+
/**
* A proxy object that delegates to another.
*/
template <typename T> struct
-rust_proxy : public maybe_proxy<T>,
- public dom_owned<rust_proxy<T> > {
+rust_proxy : public maybe_proxy<T> {
private:
bool _strong;
+ rust_handle<T> *_handle;
public:
- rust_dom *dom;
- rust_proxy(rust_dom *dom, T *delegate, bool strong) :
- maybe_proxy<T> (delegate), _strong(strong), dom(dom) {
- this->dom->log(rust_log::COMM,
- "new proxy: 0x%" PRIxPTR " => 0x%" PRIxPTR, this, delegate);
- if (strong) {
- delegate->ref();
- }
+ rust_proxy(rust_handle<T> *handle) :
+ maybe_proxy<T> (NULL), _strong(FALSE), _handle(handle) {
+ // Nop.
+ }
+
+ rust_proxy(T *referent) :
+ maybe_proxy<T> (referent), _strong(FALSE), _handle(NULL) {
+ // Nop.
+ }
+
+ rust_handle<T> *handle() {
+ return _handle;
}
};
+class rust_message_queue;
+class rust_task;
+
//
// Local Variables:
// mode: C++
diff --git a/src/rt/rust_srv.cpp b/src/rt/rust_srv.cpp
index d9223562..a5fcde9b 100644
--- a/src/rt/rust_srv.cpp
+++ b/src/rt/rust_srv.cpp
@@ -7,13 +7,14 @@
rust_srv::rust_srv() :
local_region(this, false),
- synchronized_region(this, true),
- kernel(new rust_kernel(this)) {
+ synchronized_region(this, true) {
// Nop.
}
rust_srv::~rust_srv() {
- // Nop.
+// char msg[1024];
+// snprintf(msg, sizeof(msg), "~rust_srv %" PRIxPTR, (uintptr_t) this);
+// log(msg);
}
void
@@ -74,3 +75,8 @@ rust_srv::warning(char const *expression,
expression, file, (int)line, buf);
log(msg);
}
+
+rust_srv *
+rust_srv::clone() {
+ return new rust_srv();
+}
diff --git a/src/rt/rust_srv.h b/src/rt/rust_srv.h
index ab646ae6..465b03b7 100644
--- a/src/rt/rust_srv.h
+++ b/src/rt/rust_srv.h
@@ -7,7 +7,6 @@ class rust_srv {
public:
memory_region local_region;
memory_region synchronized_region;
- rust_kernel *kernel;
virtual void log(char const *msg);
virtual void fatal(char const *expression,
char const *file,
@@ -24,6 +23,7 @@ public:
virtual void *realloc(void *, size_t);
rust_srv();
virtual ~rust_srv();
+ virtual rust_srv *clone();
};
#endif /* RUST_SRV_H */
diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp
index 08d5974e..408996f6 100644
--- a/src/rt/rust_task.cpp
+++ b/src/rt/rust_task.cpp
@@ -404,9 +404,10 @@ rust_task::notify_tasks_waiting_to_join() {
tasks_waiting_to_join.pop(&waiting_task);
if (waiting_task->is_proxy()) {
notify_message::send(notify_message::WAKEUP, "wakeup",
- this, waiting_task->as_proxy());
+ get_handle(), waiting_task->as_proxy()->handle());
+ delete waiting_task;
} else {
- rust_task *task = waiting_task->delegate();
+ rust_task *task = waiting_task->referent();
if (task->dead() == false) {
task->wakeup(this);
}
@@ -563,8 +564,7 @@ rust_task::transition(ptr_vec<rust_task> *src, ptr_vec<rust_task> *dst)
}
void
-rust_task::block(rust_cond *on, const char* name)
-{
+rust_task::block(rust_cond *on, const char* name) {
log(rust_log::TASK, "Blocking on 0x%" PRIxPTR ", cond: 0x%" PRIxPTR,
(uintptr_t) on, (uintptr_t) cond);
A(dom, cond == NULL, "Cannot block an already blocked task.");
@@ -631,6 +631,11 @@ rust_task::log(uint32_t type_bits, char const *fmt, ...) {
}
}
+rust_handle<rust_task> *
+rust_task::get_handle() {
+ return dom->kernel->get_task_handle(this);
+}
+
//
// Local Variables:
// mode: C++
diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h
index 9a4e0c7b..383707f0 100644
--- a/src/rt/rust_task.h
+++ b/src/rt/rust_task.h
@@ -50,6 +50,7 @@ rust_task : public maybe_proxy<rust_task>,
rust_task(rust_dom *dom,
rust_task *spawner,
const char *name);
+
~rust_task();
void start(uintptr_t exit_task_glue,
@@ -110,6 +111,8 @@ rust_task : public maybe_proxy<rust_task>,
// Notify tasks waiting for us that we are about to die.
void notify_tasks_waiting_to_join();
+ rust_handle<rust_task> * get_handle();
+
uintptr_t get_fp();
uintptr_t get_previous_fp(uintptr_t fp);
frame_glue_fns *get_frame_glue_fns(uintptr_t fp);
diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp
index 0e3961bc..4b5fa1d6 100644
--- a/src/rt/rust_upcall.cpp
+++ b/src/rt/rust_upcall.cpp
@@ -23,20 +23,6 @@
(task)->dom->get_log().indent();
#endif
-void
-log_task_state(rust_task *task, maybe_proxy<rust_task> *target) {
- rust_task *delegate = target->delegate();
- if (target->is_proxy()) {
- task->log(rust_log::TASK,
- "remote task: 0x%" PRIxPTR ", ref_count: %d state: %s",
- delegate, delegate->ref_count, delegate->state_str());
- } else {
- task->log(rust_log::TASK,
- "local task: 0x%" PRIxPTR ", ref_count: %d state: %s",
- delegate, delegate->ref_count, delegate->state_str());
- }
-}
-
extern "C" CDECL char const *
str_buf(rust_task *task, rust_str *s);
@@ -104,7 +90,7 @@ upcall_new_chan(rust_task *task, rust_port *port) {
"task=0x%" PRIxPTR " (%s), port=0x%" PRIxPTR ")",
(uintptr_t) task, task->name, port);
I(dom, port);
- return new (dom) rust_chan(task, port);
+ return new (dom) rust_chan(task, port, port->unit_sz);
}
/**
@@ -135,43 +121,55 @@ void upcall_del_chan(rust_task *task, rust_chan *chan) {
"Channel's ref count should be zero.");
if (chan->is_associated()) {
- // We're trying to delete a channel that another task may be reading
- // from. We have two options:
- //
- // 1. We can flush the channel by blocking in upcall_flush_chan()
- // and resuming only when the channel is flushed. The problem
- // here is that we can get ourselves in a deadlock if the parent
- // task tries to join us.
- //
- // 2. We can leave the channel in a "dormnat" state by not freeing
- // it and letting the receiver task delete it for us instead.
- if (chan->buffer.is_empty() == false) {
- return;
+ if (chan->port->is_proxy()) {
+ // Here is a good place to delete the port proxy we allocated
+ // in upcall_clone_chan.
+ rust_proxy<rust_port> *proxy = chan->port->as_proxy();
+ chan->disassociate();
+ delete proxy;
+ } else {
+ // We're trying to delete a channel that another task may be
+ // reading from. We have two options:
+ //
+ // 1. We can flush the channel by blocking in upcall_flush_chan()
+ // and resuming only when the channel is flushed. The problem
+ // here is that we can get ourselves in a deadlock if the
+ // parent task tries to join us.
+ //
+ // 2. We can leave the channel in a "dormnat" state by not freeing
+ // it and letting the receiver task delete it for us instead.
+ if (chan->buffer.is_empty() == false) {
+ return;
+ }
+ chan->disassociate();
}
- chan->disassociate();
}
delete chan;
}
/**
* Clones a channel and stores it in the spawnee's domain. Each spawned task
- * has it's own copy of the channel.
+ * has its own copy of the channel.
*/
extern "C" CDECL rust_chan *
-upcall_clone_chan(rust_task *task,
- maybe_proxy<rust_task> *target,
+upcall_clone_chan(rust_task *task, maybe_proxy<rust_task> *target,
rust_chan *chan) {
LOG_UPCALL_ENTRY(task);
- task->log(rust_log::UPCALL | rust_log::COMM,
- "target: 0x%" PRIxPTR ", chan: 0x%" PRIxPTR,
- target, chan);
- rust_task *target_task = target->delegate();
+ size_t unit_sz = chan->buffer.unit_sz;
maybe_proxy<rust_port> *port = chan->port;
- if (target->is_proxy()) {
- port = target_task->dom->get_port_proxy_synchronized(
- chan->port->as_delegate());
+ rust_task *target_task = NULL;
+ if (target->is_proxy() == false) {
+ port = chan->port;
+ target_task = target->referent();
+ } else {
+ rust_handle<rust_port> *handle =
+ task->dom->kernel->get_port_handle(port->as_referent());
+ maybe_proxy<rust_port> *proxy = new rust_proxy<rust_port> (handle);
+ task->log(rust_log::MEM, "new proxy: " PTR, proxy);
+ port = proxy;
+ target_task = target->as_proxy()->handle()->referent();
}
- return new (target_task->dom) rust_chan(target_task, port);
+ return new (target_task->dom) rust_chan(target_task, port, unit_sz);
}
extern "C" CDECL void
@@ -193,17 +191,15 @@ upcall_sleep(rust_task *task, size_t time_in_us) {
extern "C" CDECL void
upcall_join(rust_task *task, maybe_proxy<rust_task> *target) {
LOG_UPCALL_ENTRY(task);
- rust_task *target_task = target->delegate();
- task->log(rust_log::UPCALL | rust_log::COMM,
- "target: 0x%" PRIxPTR ", task: %s @0x%" PRIxPTR,
- target, target_task->name, target_task);
if (target->is_proxy()) {
- notify_message::
- send(notify_message::JOIN, "join", task, target->as_proxy());
- task->block(target_task, "joining remote task");
+ rust_handle<rust_task> *task_handle = target->as_proxy()->handle();
+ notify_message::send(notify_message::JOIN, "join",
+ task->get_handle(), task_handle);
+ task->block(task_handle, "joining remote task");
task->yield(2);
} else {
+ rust_task *target_task = target->referent();
// If the other task is already dying, we don't have to wait for it.
if (target_task->dead() == false) {
target_task->tasks_waiting_to_join.push(task);
@@ -221,10 +217,6 @@ upcall_join(rust_task *task, maybe_proxy<rust_task> *target) {
extern "C" CDECL void
upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
LOG_UPCALL_ENTRY(task);
- task->log(rust_log::UPCALL | rust_log::COMM,
- "chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d",
- (uintptr_t) chan, (uintptr_t) sptr,
- chan->port->delegate()->unit_sz);
chan->send(sptr);
task->log(rust_log::COMM, "=== sent data ===>");
}
@@ -269,21 +261,14 @@ upcall_fail(rust_task *task,
extern "C" CDECL void
upcall_kill(rust_task *task, maybe_proxy<rust_task> *target) {
LOG_UPCALL_ENTRY(task);
- log_task_state(task, target);
- rust_task *target_task = target->delegate();
-
- task->log(rust_log::UPCALL | rust_log::TASK,
- "kill task %s @0x%" PRIxPTR ", ref count %d",
- target_task->name, target_task,
- target_task->ref_count);
-
if (target->is_proxy()) {
notify_message::
- send(notify_message::KILL, "kill", task, target->as_proxy());
+ send(notify_message::KILL, "kill", task->get_handle(),
+ target->as_proxy()->handle());
// The proxy ref_count dropped to zero, delete it here.
delete target->as_proxy();
} else {
- target_task->kill();
+ target->referent()->kill();
}
}
@@ -554,25 +539,6 @@ upcall_get_type_desc(rust_task *task,
return td;
}
-#if defined(__WIN32__)
-static DWORD WINAPI rust_thread_start(void *ptr)
-#elif defined(__GNUC__)
-static void *rust_thread_start(void *ptr)
-#else
-#error "Platform not supported"
-#endif
-{
- // We were handed the domain we are supposed to run.
- rust_dom *dom = (rust_dom *) ptr;
-
- // Start a new rust main loop for this thread.
- dom->start_main_loop();
- rust_srv *srv = dom->srv;
- srv->kernel->deregister_domain(dom);
- delete dom;
- return 0;
-}
-
extern "C" CDECL rust_task *
upcall_new_task(rust_task *spawner, const char *name) {
LOG_UPCALL_ENTRY(spawner);
@@ -604,54 +570,76 @@ upcall_start_task(rust_task *spawner,
return task;
}
+/**
+ * Called whenever a new domain is created.
+ */
extern "C" CDECL maybe_proxy<rust_task> *
upcall_new_thread(rust_task *task, const char *name) {
LOG_UPCALL_ENTRY(task);
-
- rust_dom *old_dom = task->dom;
- rust_dom *new_dom = new rust_dom(old_dom->srv,
- old_dom->root_crate,
- name);
- old_dom->srv->kernel->register_domain(new_dom);
+ rust_dom *parent_dom = task->dom;
+ rust_kernel *kernel = parent_dom->kernel;
+ rust_handle<rust_dom> *child_dom_handle =
+ kernel->create_domain(parent_dom->root_crate, name);
+ rust_handle<rust_task> *child_task_handle =
+ kernel->get_task_handle(child_dom_handle->referent()->root_task);
task->log(rust_log::UPCALL | rust_log::MEM,
- "upcall new_thread(%s) = dom 0x%" PRIxPTR " task 0x%" PRIxPTR,
- name, new_dom, new_dom->root_task);
- rust_proxy<rust_task> *proxy =
- new (old_dom) rust_proxy<rust_task>(old_dom,
- new_dom->root_task, true);
- task->log(rust_log::UPCALL | rust_log::MEM,
- "new proxy = 0x%" PRIxPTR " -> task = 0x%" PRIxPTR,
- proxy, proxy->delegate());
- return proxy;
+ "child name: %s, child_dom_handle: " PTR
+ ", child_task_handle: " PTR,
+ name, child_dom_handle, child_task_handle);
+ rust_proxy<rust_task> *child_task_proxy =
+ new rust_proxy<rust_task> (child_task_handle);
+ return child_task_proxy;
+}
+
+#if defined(__WIN32__)
+static DWORD WINAPI rust_thread_start(void *ptr)
+#elif defined(__GNUC__)
+static void *rust_thread_start(void *ptr)
+#else
+#error "Platform not supported"
+#endif
+{
+ // We were handed the domain we are supposed to run.
+ rust_dom *dom = (rust_dom *) ptr;
+
+ // Start a new rust main loop for this thread.
+ dom->start_main_loop();
+
+ // Destroy the domain.
+ dom->kernel->destroy_domain(dom);
+
+ return 0;
}
+/**
+ * Called after a new domain is created. Here we create a new thread and
+ * and start the domain main loop.
+ */
extern "C" CDECL maybe_proxy<rust_task> *
-upcall_start_thread(rust_task *spawner,
- maybe_proxy<rust_task> *root_task_proxy,
+upcall_start_thread(rust_task *task,
+ rust_proxy<rust_task> *child_task_proxy,
uintptr_t exit_task_glue,
uintptr_t spawnee_fn,
size_t callsz) {
- LOG_UPCALL_ENTRY(spawner);
-
- rust_dom *dom = spawner->dom;
- rust_task *root_task = root_task_proxy->delegate();
- dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK,
- "upcall start_thread(exit_task_glue 0x%" PRIxPTR
- ", spawnee 0x%" PRIxPTR
- ", callsz %" PRIdPTR ")", exit_task_glue, spawnee_fn, callsz);
- root_task->start(exit_task_glue, spawnee_fn, spawner->rust_sp, callsz);
-
+ LOG_UPCALL_ENTRY(task);
+ rust_dom *parenet_dom = task->dom;
+ rust_handle<rust_task> *child_task_handle = child_task_proxy->handle();
+ task->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK,
+ "exit_task_glue: " PTR ", spawnee_fn " PTR
+ ", callsz %" PRIdPTR ")", exit_task_glue, spawnee_fn, callsz);
+ rust_task *child_task = child_task_handle->referent();
+ child_task->start(exit_task_glue, spawnee_fn, task->rust_sp, callsz);
#if defined(__WIN32__)
HANDLE thread;
- thread = CreateThread(NULL, 0, rust_thread_start, root_task->dom,
- 0, NULL);
- dom->win32_require("CreateThread", thread != NULL);
+ thread = CreateThread(NULL, 0, rust_thread_start, child_task->dom, 0,
+ NULL);
+ parenet_dom->win32_require("CreateThread", thread != NULL);
#else
pthread_t thread;
- pthread_create(&thread, &dom->attr, rust_thread_start,
- (void *) root_task->dom);
+ pthread_create(&thread, &parenet_dom->attr, rust_thread_start,
+ (void *) child_task->dom);
#endif
- return root_task_proxy;
+ return child_task_proxy;
}
//
diff --git a/src/rt/sync/lock_free_queue.h b/src/rt/sync/lock_free_queue.h
index 5ebc70f1..ac0c5b04 100644
--- a/src/rt/sync/lock_free_queue.h
+++ b/src/rt/sync/lock_free_queue.h
@@ -98,8 +98,6 @@ class lock_free_queue {
}
public:
- int32_t list_index;
-
lock_free_queue() {
// We can only handle 64bit CAS for counted pointers, so this will
// not work with 64bit pointers.
diff --git a/src/rt/util/indexed_list.h b/src/rt/util/indexed_list.h
index b93945cc..d643a050 100644
--- a/src/rt/util/indexed_list.h
+++ b/src/rt/util/indexed_list.h
@@ -28,10 +28,10 @@ public:
* object inserted in this list must define a "int32_t list_index" member.
*/
template<typename T> class indexed_list {
- memory_region &region;
+ memory_region *region;
array_list<T*> list;
public:
- indexed_list(memory_region &region) : region(region) {}
+ indexed_list(memory_region *region) : region(region) {}
virtual int32_t append(T *value);
virtual bool pop(T **value);
virtual size_t length() {