diff options
Diffstat (limited to 'src/rt')
| -rw-r--r-- | src/rt/circular_buffer.cpp | 4 | ||||
| -rw-r--r-- | src/rt/globals.h | 2 | ||||
| -rw-r--r-- | src/rt/rust.cpp | 10 | ||||
| -rw-r--r-- | src/rt/rust_builtin.cpp | 6 | ||||
| -rw-r--r-- | src/rt/rust_chan.cpp | 27 | ||||
| -rw-r--r-- | src/rt/rust_chan.h | 2 | ||||
| -rw-r--r-- | src/rt/rust_dom.cpp | 127 | ||||
| -rw-r--r-- | src/rt/rust_dom.h | 9 | ||||
| -rw-r--r-- | src/rt/rust_internal.h | 4 | ||||
| -rw-r--r-- | src/rt/rust_log.cpp | 1 | ||||
| -rw-r--r-- | src/rt/rust_message.cpp | 3 | ||||
| -rw-r--r-- | src/rt/rust_port.cpp | 40 | ||||
| -rw-r--r-- | src/rt/rust_port.h | 2 | ||||
| -rw-r--r-- | src/rt/rust_task.cpp | 56 | ||||
| -rw-r--r-- | src/rt/rust_task.h | 5 | ||||
| -rw-r--r-- | src/rt/rust_upcall.cpp | 142 | ||||
| -rw-r--r-- | src/rt/rust_util.h | 26 | ||||
| -rw-r--r-- | src/rt/sync/condition_variable.cpp | 28 | ||||
| -rw-r--r-- | src/rt/sync/condition_variable.h | 1 | ||||
| -rw-r--r-- | src/rt/sync/sync.cpp | 12 | ||||
| -rw-r--r-- | src/rt/sync/sync.h | 9 | ||||
| -rw-r--r-- | src/rt/util/array_list.h | 10 |
22 files changed, 362 insertions, 164 deletions
diff --git a/src/rt/circular_buffer.cpp b/src/rt/circular_buffer.cpp index a93f2572..dbf5059b 100644 --- a/src/rt/circular_buffer.cpp +++ b/src/rt/circular_buffer.cpp @@ -31,9 +31,7 @@ circular_buffer::circular_buffer(rust_dom *dom, size_t unit_sz) : } circular_buffer::~circular_buffer() { - dom->log(rust_log::MEM | rust_log::COMM, - "~circular_buffer 0x%" PRIxPTR, - this); + dom->log(rust_log::MEM, "~circular_buffer 0x%" PRIxPTR, this); I(dom, _buffer); W(dom, _unread == 0, "~circular_buffer with unread messages."); dom->free(_buffer); diff --git a/src/rt/globals.h b/src/rt/globals.h index f8025a40..85aa5860 100644 --- a/src/rt/globals.h +++ b/src/rt/globals.h @@ -20,12 +20,14 @@ extern "C" { } #elif defined(__GNUC__) #include <unistd.h> +#include <sys/time.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <dlfcn.h> #include <pthread.h> #include <errno.h> +#include <time.h> #else #error "Platform not supported." #endif diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index 9cc2fe41..d0215edc 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -53,7 +53,9 @@ rust_srv::realloc(void *p, size_t bytes) } void * val = ::realloc(p, bytes); #ifdef TRACK_ALLOCATIONS - if (allocation_list.replace(p, val) == NULL) { + if (allocation_list.replace(p, val) == false) { + printf("realloc: ptr 0x%" PRIxPTR " is not in allocation_list\n", + (uintptr_t) p); fatal("not in allocation_list", __FILE__, __LINE__); } #endif @@ -64,8 +66,8 @@ void rust_srv::free(void *p) { #ifdef TRACK_ALLOCATIONS - if (allocation_list.replace(p, NULL) == NULL) { - printf("ptr 0x%" PRIxPTR " is not in allocation_list\n", + if (allocation_list.replace(p, NULL) == false) { + printf("free: ptr 0x%" PRIxPTR " is not in allocation_list\n", (uintptr_t) p); fatal("not in allocation_list", __FILE__, __LINE__); } @@ -182,7 +184,7 @@ rust_start(uintptr_t main_fn, rust_crate const *crate, int argc, char **argv) int ret; { rust_srv srv; - rust_dom dom(&srv, crate); + rust_dom dom(&srv, crate, "main"); command_line_args args(dom, argc, argv); dom.log(rust_log::DOM, "startup: %d args", args.argc); diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index 657109c6..d8d9b8d6 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -115,6 +115,12 @@ str_buf(rust_task *task, rust_str *s) return (char const *)&s->data[0]; } +extern "C" CDECL size_t +str_byte_len(rust_task *task, rust_str *s) +{ + return s->fill - 1; // -1 for the '\0' terminator. +} + extern "C" CDECL void * vec_buf(rust_task *task, type_desc *ty, rust_vec *v, size_t offset) { diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index f107d287..2a0a61db 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -18,9 +18,11 @@ rust_chan::rust_chan(rust_task *task, maybe_proxy<rust_port> *port) : } rust_chan::~rust_chan() { - if (port && !port->is_proxy()) { - port->delegate()->chans.swap_delete(this); - } + task->log(rust_log::MEM | rust_log::COMM, + "del rust_chan(task=0x%" PRIxPTR ")", (uintptr_t) this); + + A(task->dom, is_associated() == false, + "Channel must be disassociated before being freed."); } /** @@ -28,7 +30,10 @@ rust_chan::~rust_chan() { */ void rust_chan::associate(maybe_proxy<rust_port> *port) { this->port = port; - if (!port->is_proxy()) { + if (port->is_proxy() == false) { + task->log(rust_log::TASK, + "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, + this, port); this->port->delegate()->chans.push(this); } } @@ -43,14 +48,23 @@ bool rust_chan::is_associated() { void rust_chan::disassociate() { A(task->dom, is_associated(), "Channel must be associated with a port."); + if (port->is_proxy() == false) { + task->log(rust_log::TASK, + "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, + this, port->delegate()); + port->delegate()->chans.swap_delete(this); + } + // Delete reference to the port. port = NULL; } /** - * Attempt to transmit channel data to the associated port. + * Attempt to send data to the associated port. */ -void rust_chan::transmit() { +void rust_chan::send(void *sptr) { + buffer.enqueue(sptr); + rust_dom *dom = task->dom; if (!is_associated()) { W(dom, is_associated(), @@ -81,7 +95,6 @@ void rust_chan::transmit() { return; } - // // Local Variables: // mode: C++ diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h index 055e359a..6aa98247 100644 --- a/src/rt/rust_chan.h +++ b/src/rt/rust_chan.h @@ -17,7 +17,7 @@ public: void disassociate(); bool is_associated(); - void transmit(); + void send(void *sptr); }; // diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp index 99aaddb2..f7b8e97b 100644 --- a/src/rt/rust_dom.cpp +++ b/src/rt/rust_dom.cpp @@ -4,12 +4,16 @@ template class ptr_vec<rust_task>; +// Keeps track of all live domains, for debugging purposes. +array_list<rust_dom*> _live_domains; -rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate) : +rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate, + const char *name) : interrupt_flag(0), root_crate(root_crate), _log(srv, this), srv(srv), + name(name), running_tasks(this), blocked_tasks(this), dead_tasks(this), @@ -25,14 +29,19 @@ rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate) : pthread_attr_setstacksize(&attr, 1024 * 1024); pthread_attr_setdetachstate(&attr, true); #endif - root_task = new (this) rust_task(this, NULL); + root_task = new (this) rust_task(this, NULL, name); + + if (_live_domains.replace(NULL, this) == false) { + _live_domains.append(this); + } } static void del_all_tasks(rust_dom *dom, ptr_vec<rust_task> *v) { I(dom, v); while (v->length()) { - dom->log(rust_log::TASK, "deleting task %" PRIdPTR, v->length() - 1); + dom->log(rust_log::TASK, "deleting task 0x%" PRIdPTR, + v->length() - 1); delete v->pop(); } } @@ -42,23 +51,25 @@ rust_dom::delete_proxies() { rust_task *task; rust_proxy<rust_task> *task_proxy; while (_task_proxies.pop(&task, &task_proxy)) { - log(rust_log::TASK, "deleting proxy %" PRIxPTR - " in dom %" PRIxPTR, task_proxy, task_proxy->dom); + log(rust_log::TASK, + "deleting proxy 0x%" PRIxPTR " in dom %s 0x%" PRIxPTR, + task_proxy, task_proxy->dom->name, task_proxy->dom); delete task_proxy; } rust_port *port; rust_proxy<rust_port> *port_proxy; while (_port_proxies.pop(&port, &port_proxy)) { - log(rust_log::TASK, "deleting proxy %" PRIxPTR - " in dom %" PRIxPTR, port_proxy, port_proxy->dom); + log(rust_log::TASK, + "deleting proxy 0x%" PRIxPTR " in dom %s 0x%" PRIxPTR, + port_proxy, port_proxy->dom->name, port_proxy->dom); delete port_proxy; } } rust_dom::~rust_dom() { log(rust_log::MEM | rust_log::DOM, - "~rust_dom 0x%" PRIxPTR, (uintptr_t)this); + "~rust_dom %s @0x%" PRIxPTR, name, (uintptr_t)this); log(rust_log::TASK, "deleting all proxies"); delete_proxies(); @@ -73,6 +84,8 @@ rust_dom::~rust_dom() { #endif while (caches.length()) delete caches.pop(); + + _live_domains.replace(this, NULL); } void @@ -124,7 +137,8 @@ rust_dom::logptr(char const *msg, T* ptrval) { void rust_dom::fail() { - log(rust_log::DOM, "domain 0x%" PRIxPTR " root task failed", this); + log(rust_log::DOM, "domain %s @0x%" PRIxPTR " root task failed", + name, this); I(this, rval == 0); rval = 1; } @@ -133,8 +147,9 @@ void * rust_dom::malloc(size_t sz) { void *p = srv->malloc(sz); I(this, p); - log(rust_log::MEM, "0x%" PRIxPTR " rust_dom::malloc(%d) -> 0x%" PRIxPTR, - (uintptr_t) this, sz, p); + log(rust_log::MEM, + "%s @0x%" PRIxPTR " rust_dom::malloc(%d) -> 0x%" PRIxPTR, + name, (uintptr_t) this, sz, p); return p; } @@ -190,8 +205,8 @@ void rust_dom::add_task_to_state_vec(ptr_vec<rust_task> *v, rust_task *task) { log(rust_log::MEM|rust_log::TASK, - "adding task 0x%" PRIxPTR " in state '%s' to vec 0x%" PRIxPTR, - (uintptr_t)task, state_vec_name(v), (uintptr_t)v); + "adding task %s @0x%" PRIxPTR " in state '%s' to vec 0x%" PRIxPTR, + task->name, (uintptr_t)task, state_vec_name(v), (uintptr_t)v); v->push(task); } @@ -200,8 +215,8 @@ void rust_dom::remove_task_from_state_vec(ptr_vec<rust_task> *v, rust_task *task) { log(rust_log::MEM|rust_log::TASK, - "removing task 0x%" PRIxPTR " in state '%s' from vec 0x%" PRIxPTR, - (uintptr_t)task, state_vec_name(v), (uintptr_t)v); + "removing task %s @0x%" PRIxPTR " in state '%s' from vec 0x%" PRIxPTR, + task->name, (uintptr_t)task, state_vec_name(v), (uintptr_t)v); I(this, (*v)[task->idx] == task); v->swap_delete(task); } @@ -226,10 +241,10 @@ rust_dom::reap_dead_tasks() { rust_task *task = dead_tasks[i]; if (task->ref_count == 0) { I(this, task->tasks_waiting_to_join.is_empty()); - dead_tasks.swap_delete(task); log(rust_log::TASK, - "deleting unreferenced dead task 0x%" PRIxPTR, task); + "deleting unreferenced dead task %s @0x%" PRIxPTR, + task->name, task); delete task; continue; } @@ -243,14 +258,14 @@ rust_dom::reap_dead_tasks() { */ void rust_dom::send_message(rust_message *message) { log(rust_log::COMM, "==> enqueueing \"%s\" 0x%" PRIxPTR - " in queue 0x%" PRIxPTR, + " in queue 0x%" PRIxPTR + " in domain 0x%" PRIxPTR, message->label, message, - &_incoming_message_queue); + &_incoming_message_queue, + this); A(this, message->dom == this, "Message owned by non-local domain."); _incoming_message_queue.enqueue(message); - _incoming_message_pending.signal(); - _progress.signal(); } /** @@ -272,7 +287,7 @@ rust_dom::get_task_proxy(rust_task *task) { if (_task_proxies.get(task, &proxy)) { return proxy; } - log(rust_log::COMM, "no proxy for 0x%" PRIxPTR, task); + log(rust_log::COMM, "no proxy for %s @0x%" PRIxPTR, task->name, task); proxy = new (this) rust_proxy<rust_task> (this, task, false); _task_proxies.put(task, proxy); return proxy; @@ -322,12 +337,20 @@ rust_dom::schedule_task() } void +rust_dom::log_all_state() { + for (uint32_t i = 0; i < _live_domains.size(); i++) { + _live_domains[i]->log_state(); + } +} + +void rust_dom::log_state() { if (!running_tasks.is_empty()) { log(rust_log::TASK, "running tasks:"); for (size_t i = 0; i < running_tasks.length(); i++) { log(rust_log::TASK, - "\t task: 0x%" PRIxPTR, running_tasks[i]); + "\t task: %s @0x%" PRIxPTR, + running_tasks[i]->name, running_tasks[i]); } } @@ -335,15 +358,18 @@ rust_dom::log_state() { log(rust_log::TASK, "blocked tasks:"); for (size_t i = 0; i < blocked_tasks.length(); i++) { log(rust_log::TASK, - "\t task: 0x%" PRIxPTR ", blocked on: 0x%" PRIxPTR, - blocked_tasks[i], blocked_tasks[i]->cond); + "\t task: %s @0x%" PRIxPTR ", blocked on: 0x%" PRIxPTR, + blocked_tasks[i]->name, blocked_tasks[i], + blocked_tasks[i]->cond); } } if (!dead_tasks.is_empty()) { log(rust_log::TASK, "dead tasks:"); for (size_t i = 0; i < dead_tasks.length(); i++) { - log(rust_log::TASK, "\t task: 0x%" PRIxPTR, dead_tasks[i]); + log(rust_log::TASK, "\t task: %s 0x%" PRIxPTR ", ref_count: %d", + dead_tasks[i], dead_tasks[i]->name, + dead_tasks[i]->ref_count); } } } @@ -360,7 +386,8 @@ rust_dom::start_main_loop() // Make sure someone is watching, to pull us out of infinite loops. rust_timer timer(this); - log(rust_log::DOM, "running main-loop on domain 0x%" PRIxPTR, this); + log(rust_log::DOM, "running main-loop on domain %s @0x%" PRIxPTR, + name, this); logptr("exit-task glue", root_crate->get_exit_task_glue()); while (n_live_tasks() > 0) { @@ -373,29 +400,38 @@ rust_dom::start_main_loop() // if progress is made in other domains. if (scheduled_task == NULL) { - log(rust_log::TASK, - "all tasks are blocked, waiting for progress ..."); - if (_log.is_tracing(rust_log::TASK)) + if (_log.is_tracing(rust_log::TASK)) { log_state(); - _progress.wait(); + } log(rust_log::TASK, - "progress made, resuming ..."); + "all tasks are blocked, scheduler yielding ..."); + sync::yield(); + log(rust_log::TASK, + "scheduler resuming ..."); continue; } I(this, scheduled_task->running()); log(rust_log::TASK, - "activating task 0x%" PRIxPTR ", sp=0x%" PRIxPTR, - (uintptr_t)scheduled_task, scheduled_task->rust_sp); + "activating task %s 0x%" PRIxPTR + ", sp=0x%" PRIxPTR + ", ref_count=%d" + ", state: %s", + scheduled_task->name, + (uintptr_t)scheduled_task, + scheduled_task->rust_sp, + scheduled_task->ref_count, + scheduled_task->state_str()); interrupt_flag = 0; activate(scheduled_task); log(rust_log::TASK, - "returned from task 0x%" PRIxPTR + "returned from task %s @0x%" PRIxPTR " in state '%s', sp=0x%" PRIxPTR, + scheduled_task->name, (uintptr_t)scheduled_task, state_vec_name(scheduled_task->state), scheduled_task->rust_sp); @@ -410,20 +446,15 @@ rust_dom::start_main_loop() log(rust_log::DOM, "terminated scheduler loop, reaping dead tasks ..."); while (dead_tasks.length() > 0) { - log(rust_log::DOM, - "waiting for %d dead tasks to become dereferenced ...", - dead_tasks.length()); - - if (_log.is_tracing(rust_log::DOM)) { - for (size_t i = 0; i < dead_tasks.length(); i++) { - log(rust_log::DOM, - "task: 0x%" PRIxPTR ", index: %d, ref_count: %d", - dead_tasks[i], i, dead_tasks[i]->ref_count); - } - } - if (_incoming_message_queue.is_empty()) { - _incoming_message_pending.wait(); + log(rust_log::DOM, + "waiting for %d dead tasks to become dereferenced, " + "scheduler yielding ...", + dead_tasks.length()); + if (_log.is_tracing(rust_log::TASK)) { + log_state(); + } + sync::yield(); } else { drain_incoming_message_queue(); } diff --git a/src/rt/rust_dom.h b/src/rt/rust_dom.h index 528790d5..3aaf635a 100644 --- a/src/rt/rust_dom.h +++ b/src/rt/rust_dom.h @@ -25,6 +25,7 @@ struct rust_dom rust_crate const *root_crate; rust_log _log; rust_srv *srv; + const char *const name; ptr_vec<rust_task> running_tasks; ptr_vec<rust_task> blocked_tasks; ptr_vec<rust_task> dead_tasks; @@ -34,20 +35,19 @@ struct rust_dom rust_task *curr_task; int rval; - condition_variable _progress; - hash_map<rust_task *, rust_proxy<rust_task> *> _task_proxies; hash_map<rust_port *, rust_proxy<rust_port> *> _port_proxies; // Incoming messages from other domains. - condition_variable _incoming_message_pending; lock_free_queue _incoming_message_queue; #ifndef __WIN32__ pthread_attr_t attr; #endif - rust_dom(rust_srv *srv, rust_crate const *root_crate); + // Only a pointer to 'name' is kept, so it must live as long as this + // domain. + rust_dom(rust_srv *srv, rust_crate const *root_crate, const char *name); ~rust_dom(); void activate(rust_task *task); @@ -84,6 +84,7 @@ struct rust_dom int start_main_loop(); void log_state(); + static void log_all_state(); }; // diff --git a/src/rt/rust_internal.h b/src/rt/rust_internal.h index a89144d7..46ea843f 100644 --- a/src/rt/rust_internal.h +++ b/src/rt/rust_internal.h @@ -38,6 +38,7 @@ extern "C" { #error "Platform not supported." #endif +#include "sync/sync.h" #include "sync/condition_variable.h" #ifndef __i386__ @@ -75,7 +76,7 @@ template <typename T> struct rc_base { - size_t ref_count; + int32_t ref_count; void ref() { ++ref_count; @@ -151,6 +152,7 @@ public: T *& operator[](size_t offset); void push(T *p); T *pop(); + T *peek(); void trim(size_t fill); void swap_delete(T* p); }; diff --git a/src/rt/rust_log.cpp b/src/rt/rust_log.cpp index 48fcd0f1..8e089791 100644 --- a/src/rt/rust_log.cpp +++ b/src/rt/rust_log.cpp @@ -27,6 +27,7 @@ read_type_bit_mask() { bits |= strstr(env_str, "timer") ? rust_log::TIMER : 0; bits |= strstr(env_str, "gc") ? rust_log::GC : 0; bits |= strstr(env_str, "all") ? rust_log::ALL : 0; + bits = strstr(env_str, "none") ? 0 : bits; } return bits; } diff --git a/src/rt/rust_message.cpp b/src/rt/rust_message.cpp index 8b396b4d..1de804c9 100644 --- a/src/rt/rust_message.cpp +++ b/src/rt/rust_message.cpp @@ -90,8 +90,7 @@ send(uint8_t *buffer, size_t buffer_sz, const char* label, rust_task *source, } void data_message::process() { - _port->remote_channel->buffer.enqueue(_buffer); - _port->remote_channel->transmit(); + _port->remote_channel->send(_buffer); _target->log(rust_log::COMM, "<=== received data via message ==="); } diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp index 0a5b7ee7..c97b5d41 100644 --- a/src/rt/rust_port.cpp +++ b/src/rt/rust_port.cpp @@ -17,16 +17,50 @@ rust_port::~rust_port() { task->log(rust_log::COMM | rust_log::MEM, "~rust_port 0x%" PRIxPTR, (uintptr_t) this); + log_state(); + // Disassociate channels from this port. while (chans.is_empty() == false) { - chans.pop()->disassociate(); + rust_chan *chan = chans.peek(); + chan->disassociate(); } - // We're the only ones holding a reference to the remote channel, so - // clean it up. delete remote_channel; } +bool rust_port::receive(void *dptr) { + for (uint32_t i = 0; i < chans.length(); i++) { + rust_chan *chan = chans[i]; + if (chan->buffer.is_empty() == false) { + chan->buffer.dequeue(dptr); + if (chan->buffer.is_empty() && chan->task->blocked()) { + task->log(rust_log::COMM, + "chan: 0x%" PRIxPTR + " is flushing, wakeup task: 0x%" PRIxPTR, + chan, chan->task); + chan->task->wakeup(this); + } + task->log(rust_log::COMM, "<=== read data ==="); + return true; + } + } + return false; +} + +void rust_port::log_state() { + task->log(rust_log::COMM, + "rust_port: 0x%" PRIxPTR ", associated channel(s): %d", + this, chans.length()); + for (uint32_t i = 0; i < chans.length(); i++) { + rust_chan *chan = chans[i]; + task->log(rust_log::COMM, + "\tchan: 0x%" PRIxPTR ", data pending: %s, remote: %s", + chan, + !chan->buffer.is_empty() ? "yes" : "no", + chan == remote_channel ? "yes" : "no"); + } +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h index 49a89437..7a58f839 100644 --- a/src/rt/rust_port.h +++ b/src/rt/rust_port.h @@ -15,6 +15,8 @@ public: rust_port(rust_task *task, size_t unit_sz); ~rust_port(); + void log_state(); + bool receive(void *dptr); }; // diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 9f4fa611..aca8bca7 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -52,7 +52,7 @@ align_down(uintptr_t sp) } -rust_task::rust_task(rust_dom *dom, rust_task *spawner) : +rust_task::rust_task(rust_dom *dom, rust_task *spawner, const char *name) : maybe_proxy<rust_task>(this), stk(new_stk(dom, 0)), runtime_sp(0), @@ -60,6 +60,7 @@ rust_task::rust_task(rust_dom *dom, rust_task *spawner) : gc_alloc_chain(0), dom(dom), cache(NULL), + name(name), state(&dom->running_tasks), cond(NULL), supervisor(spawner), @@ -77,8 +78,8 @@ rust_task::rust_task(rust_dom *dom, rust_task *spawner) : rust_task::~rust_task() { dom->log(rust_log::MEM|rust_log::TASK, - "~rust_task 0x%" PRIxPTR ", refcnt=%d", - (uintptr_t)this, ref_count); + "~rust_task %s @0x%" PRIxPTR ", refcnt=%d", + name, (uintptr_t)this, ref_count); /* for (uintptr_t fp = get_fp(); fp; fp = get_previous_fp(fp)) { @@ -311,7 +312,7 @@ void rust_task::yield(size_t nargs) { log(rust_log::TASK, - "task 0x%" PRIxPTR " yielding", this); + "task %s @0x%" PRIxPTR " yielding", name, this); run_after_return(nargs, dom->root_crate->get_yield_glue()); } @@ -323,23 +324,29 @@ get_callee_save_fp(uintptr_t *top_of_callee_saves) void rust_task::kill() { + if (dead()) { + // Task is already dead, can't kill what's already dead. + return; + } + // Note the distinction here: kill() is when you're in an upcall // from task A and want to force-fail task B, you do B->kill(). // If you want to fail yourself you do self->fail(upcall_nargs). - log(rust_log::TASK, "killing task 0x%" PRIxPTR, this); + log(rust_log::TASK, "killing task %s @0x%" PRIxPTR, name, this); // Unblock the task so it can unwind. unblock(); if (this == dom->root_task) dom->fail(); + log(rust_log::TASK, "preparing to unwind task: 0x%" PRIxPTR, this); run_on_resume(dom->root_crate->get_unwind_glue()); } void rust_task::fail(size_t nargs) { // See note in ::kill() regarding who should call this. - dom->log(rust_log::TASK, "task 0x%" PRIxPTR " failing", this); + dom->log(rust_log::TASK, "task %s @0x%" PRIxPTR " failing", name, this); // Unblock the task so it can unwind. unblock(); if (this == dom->root_task) @@ -347,9 +354,9 @@ rust_task::fail(size_t nargs) { run_after_return(nargs, dom->root_crate->get_unwind_glue()); if (supervisor) { dom->log(rust_log::TASK, - "task 0x%" PRIxPTR - " propagating failure to supervisor 0x%" PRIxPTR, - this, supervisor); + "task %s @0x%" PRIxPTR + " propagating failure to supervisor %s @0x%" PRIxPTR, + name, this, supervisor->name, supervisor); supervisor->kill(); } } @@ -358,7 +365,7 @@ void rust_task::gc(size_t nargs) { dom->log(rust_log::TASK|rust_log::MEM, - "task 0x%" PRIxPTR " garbage collecting", this); + "task %s @0x%" PRIxPTR " garbage collecting", name, this); run_after_return(nargs, dom->root_crate->get_gc_glue()); } @@ -366,8 +373,9 @@ void rust_task::unsupervise() { dom->log(rust_log::TASK, - "task 0x%" PRIxPTR " disconnecting from supervisor 0x%" PRIxPTR, - this, supervisor); + "task %s @0x%" PRIxPTR + " disconnecting from supervisor %s @0x%" PRIxPTR, + name, this, supervisor->name, supervisor); supervisor = NULL; } @@ -468,8 +476,9 @@ rust_task::malloc(size_t sz, type_desc *td) if (td) { gc_alloc *gcm = (gc_alloc*) mem; dom->log(rust_log::TASK|rust_log::MEM|rust_log::GC, - "task 0x%" PRIxPTR " allocated %d GC bytes = 0x%" PRIxPTR, - (uintptr_t)this, sz, gcm); + "task %s @0x%" PRIxPTR + " allocated %d GC bytes = 0x%" PRIxPTR, + name, (uintptr_t)this, sz, gcm); memset((void*) gcm, 0, sizeof(gc_alloc)); link_gc(gcm); gcm->ctrl_word = (uintptr_t)td; @@ -488,8 +497,9 @@ rust_task::realloc(void *data, size_t sz, bool is_gc) sz += sizeof(gc_alloc); gcm = (gc_alloc*) dom->realloc((void*)gcm, sz); dom->log(rust_log::TASK|rust_log::MEM|rust_log::GC, - "task 0x%" PRIxPTR " reallocated %d GC bytes = 0x%" PRIxPTR, - (uintptr_t)this, sz, gcm); + "task %s @0x%" PRIxPTR + " reallocated %d GC bytes = 0x%" PRIxPTR, + name, (uintptr_t)this, sz, gcm); if (!gcm) return gcm; link_gc(gcm); @@ -507,21 +517,26 @@ rust_task::free(void *p, bool is_gc) gc_alloc *gcm = (gc_alloc*)(((char *)p) - sizeof(gc_alloc)); unlink_gc(gcm); dom->log(rust_log::TASK|rust_log::MEM|rust_log::GC, - "task 0x%" PRIxPTR " freeing GC memory = 0x%" PRIxPTR, - (uintptr_t)this, gcm); + "task %s @0x%" PRIxPTR " freeing GC memory = 0x%" PRIxPTR, + name, (uintptr_t)this, gcm); dom->free(gcm); } else { dom->free(p); } } +const char * +rust_task::state_str() { + return dom->state_vec_name(state); +} void rust_task::transition(ptr_vec<rust_task> *src, ptr_vec<rust_task> *dst) { I(dom, state == src); dom->log(rust_log::TASK, - "task 0x%" PRIxPTR " state change '%s' -> '%s'", + "task %s @0x%" PRIxPTR " state change '%s' -> '%s'", + name, (uintptr_t)this, dom->state_vec_name(src), dom->state_vec_name(dst)); @@ -551,9 +566,6 @@ rust_task::wakeup(rust_cond *from) A(dom, cond == from, "Cannot wake up blocked task on wrong condition."); transition(&dom->blocked_tasks, &dom->running_tasks); - // TODO: Signaling every time the task is awaken is kind of silly, - // do this a nicer way. - dom->_progress.signal(); I(dom, cond == from); cond = NULL; } diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index b657592a..27495e2c 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -21,6 +21,7 @@ rust_task : public maybe_proxy<rust_task>, rust_crate_cache *cache; // Fields known only to the runtime. + const char *const name; ptr_vec<rust_task> *state; rust_cond *cond; rust_task *supervisor; // Parent-link for failure propagation. @@ -41,8 +42,10 @@ rust_task : public maybe_proxy<rust_task>, rust_alarm alarm; + // Only a pointer to 'name' is kept, so it must live as long as this task. rust_task(rust_dom *dom, - rust_task *spawner); + rust_task *spawner, + const char *name); ~rust_task(); void start(uintptr_t exit_task_glue, diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 574cb703..90d6f6d9 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -6,19 +6,37 @@ #define LOG_UPCALL_ENTRY(task) \ (task)->dom->get_log().reset_indent(0); \ (task)->log(rust_log::UPCALL, \ - "> UPCALL %s - task: 0x%" PRIxPTR \ - " retpc: x%" PRIxPTR, \ + "> UPCALL %s - task: %s 0x%" PRIxPTR \ + " retpc: x%" PRIxPTR \ + " ref_count: %d", \ __FUNCTION__, \ - (task), __builtin_return_address(0)); \ + (task)->name, (task), \ + __builtin_return_address(0), \ + (task->ref_count)); \ (task)->dom->get_log().indent(); #else #define LOG_UPCALL_ENTRY(task) \ (task)->dom->get_log().reset_indent(0); \ (task)->log(rust_log::UPCALL, \ - "> UPCALL task: x%" PRIxPTR (task)); \ + "> UPCALL task: %s @x%" PRIxPTR, \ + (task)->name, (task)); \ (task)->dom->get_log().indent(); #endif +void +log_task_state(rust_task *task, maybe_proxy<rust_task> *target) { + rust_task *delegate = target->delegate(); + if (target->is_proxy()) { + task->log(rust_log::TASK, + "remote task: 0x%" PRIxPTR ", ref_count: %d state: %s", + delegate, delegate->ref_count, delegate->state_str()); + } else { + task->log(rust_log::TASK, + "local task: 0x%" PRIxPTR ", ref_count: %d state: %s", + delegate, delegate->ref_count, delegate->state_str()); + } +} + extern "C" CDECL char const *str_buf(rust_task *task, rust_str *s); extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) { @@ -29,14 +47,13 @@ extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) { extern "C" CDECL void upcall_log_int(rust_task *task, int32_t i) { LOG_UPCALL_ENTRY(task); task->log(rust_log::UPCALL | rust_log::ULOG, - "upcall log_int(0x%" PRIx32 " = %" PRId32 " = '%c')", i, i, - (char) i); + "rust: %" PRId32 " (0x%" PRIx32 ")", i, i); } extern "C" CDECL void upcall_log_str(rust_task *task, rust_str *str) { LOG_UPCALL_ENTRY(task); const char *c = str_buf(task, str); - task->log(rust_log::UPCALL | rust_log::ULOG, "upcall log_str(\"%s\")", c); + task->log(rust_log::UPCALL | rust_log::ULOG, "rust: %s", c); } extern "C" CDECL void upcall_trace_word(rust_task *task, uintptr_t i) { @@ -55,8 +72,8 @@ upcall_new_port(rust_task *task, size_t unit_sz) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, - "upcall_new_port(task=0x%" PRIxPTR ", unit_sz=%d)", - (uintptr_t) task, unit_sz); + "upcall_new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)", + (uintptr_t) task, task->name, unit_sz); return new (dom) rust_port(task, unit_sz); } @@ -76,33 +93,58 @@ upcall_new_chan(rust_task *task, rust_port *port) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, - "upcall_new_chan(task=0x%" PRIxPTR ", port=0x%" PRIxPTR ")", - (uintptr_t) task, port); + "upcall_new_chan(" + "task=0x%" PRIxPTR " (%s), port=0x%" PRIxPTR ")", + (uintptr_t) task, task->name, port); I(dom, port); return new (dom) rust_chan(task, port); } /** + * Called whenever this channel needs to be flushed. This can happen due to a + * flush statement, or automatically whenever a channel's ref count is + * about to drop to zero. + */ +extern "C" CDECL void +upcall_flush_chan(rust_task *task, rust_chan *chan) { + LOG_UPCALL_ENTRY(task); + rust_dom *dom = task->dom; + task->log(rust_log::UPCALL | rust_log::COMM, + "flush chan: 0x%" PRIxPTR, chan); + + if (chan->buffer.is_empty()) { + return; + } + + A(dom, chan->port->is_proxy() == false, + "Channels to remote ports should be flushed automatically."); + + // Block on the port until this channel has been completely drained + // by the port. + task->block(chan->port); + task->yield(2); +} + +/** * Called whenever the channel's ref count drops to zero. + * + * Cannot Yield: If the task were to unwind, the dropped ref would still + * appear to be live, causing modify-after-free errors. */ extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; + task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); - I(dom, !chan->ref_count); - - if (!chan->buffer.is_empty() && chan->is_associated()) { - A(dom, !chan->port->is_proxy(), - "Channels to remote ports should be flushed automatically."); - // A target port may still be reading from this channel. - // Block on this channel until it has been completely drained - // by the port. - task->block(chan); - task->yield(2); - return; - } + A(task->dom, chan->ref_count == 0, + "Channel's ref count should be zero."); + + if (chan->is_associated()) { + A(task->dom, chan->buffer.is_empty(), + "Channel's buffer should be empty."); + chan->disassociate(); + } delete chan; } @@ -136,11 +178,11 @@ extern "C" CDECL void upcall_yield(rust_task *task) { extern "C" CDECL void upcall_join(rust_task *task, maybe_proxy<rust_task> *target) { LOG_UPCALL_ENTRY(task); + rust_task *target_task = target->delegate(); task->log(rust_log::UPCALL | rust_log::COMM, - "target: 0x%" PRIxPTR ", task: 0x%" PRIxPTR, - target, target->delegate()); + "target: 0x%" PRIxPTR ", task: %s @0x%" PRIxPTR, + target, target_task->name, target_task); - rust_task *target_task = target->delegate(); if (target->is_proxy()) { notify_message:: send(notify_message::JOIN, "join", task, target->as_proxy()); @@ -168,8 +210,7 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) { "chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d", (uintptr_t) chan, (uintptr_t) sptr, chan->port->delegate()->unit_sz); - chan->buffer.enqueue(sptr); - chan->transmit(); + chan->send(sptr); task->log(rust_log::COMM, "=== sent data ===>"); } @@ -182,17 +223,8 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { (uintptr_t) port, (uintptr_t) dptr, port->unit_sz, port->chans.length()); - for (uint32_t i = 0; i < port->chans.length(); i++) { - rust_chan *chan = port->chans[i]; - if (chan->buffer.is_empty() == false) { - chan->buffer.dequeue(dptr); - if (chan->buffer.is_empty() && chan->task->blocked()) { - chan->task->wakeup(chan); - delete chan; - } - task->log(rust_log::COMM, "<=== read data ==="); - return; - } + if (port->receive(dptr)) { + return; } // No data was buffered on any incoming channel, so block this task @@ -219,11 +251,12 @@ extern "C" CDECL void upcall_fail(rust_task *task, char const *expr, extern "C" CDECL void upcall_kill(rust_task *task, maybe_proxy<rust_task> *target) { LOG_UPCALL_ENTRY(task); + log_task_state(task, target); rust_task *target_task = target->delegate(); task->log(rust_log::UPCALL | rust_log::TASK, - "kill task 0x%" PRIxPTR ", ref count %d", - target_task, + "kill task %s @0x%" PRIxPTR ", ref count %d", + target_task->name, target_task, target_task->ref_count); if (target->is_proxy()) { @@ -244,6 +277,8 @@ upcall_exit(rust_task *task) { LOG_UPCALL_ENTRY(task); task->log(rust_log::UPCALL | rust_log::TASK, "task ref_count: %d", task->ref_count); + A(task->dom, task->ref_count >= 0, + "Task ref_count should not be negative on exit!"); task->die(); task->notify_tasks_waiting_to_join(); task->yield(1); @@ -498,14 +533,14 @@ static void *rust_thread_start(void *ptr) } extern "C" CDECL rust_task * -upcall_new_task(rust_task *spawner) { +upcall_new_task(rust_task *spawner, const char *name) { LOG_UPCALL_ENTRY(spawner); rust_dom *dom = spawner->dom; - rust_task *task = new (dom) rust_task(dom, spawner); + rust_task *task = new (dom) rust_task(dom, spawner, name); dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK, - "upcall new_task(spawner 0x%" PRIxPTR ") = 0x%" PRIxPTR, - spawner, task); + "upcall new_task(spawner %s @0x%" PRIxPTR ", %s) = 0x%" PRIxPTR, + spawner->name, spawner, name, task); return task; } @@ -516,26 +551,27 @@ upcall_start_task(rust_task *spawner, rust_task *task, rust_dom *dom = spawner->dom; dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK, - "upcall start_task(task 0x%" PRIxPTR + "upcall start_task(task %s @0x%" PRIxPTR " exit_task_glue 0x%" PRIxPTR ", spawnee 0x%" PRIxPTR - ", callsz %" PRIdPTR ")", task, exit_task_glue, spawnee_fn, - callsz); + ", callsz %" PRIdPTR ")", task->name, task, exit_task_glue, + spawnee_fn, callsz); task->start(exit_task_glue, spawnee_fn, spawner->rust_sp, callsz); return task; } extern "C" CDECL maybe_proxy<rust_task> * -upcall_new_thread(rust_task *task) { +upcall_new_thread(rust_task *task, const char *name) { LOG_UPCALL_ENTRY(task); rust_dom *old_dom = task->dom; rust_dom *new_dom = new rust_dom(old_dom->srv->clone(), - old_dom->root_crate); + old_dom->root_crate, + name); task->log(rust_log::UPCALL | rust_log::MEM, - "upcall new_thread() = dom 0x%" PRIxPTR " task 0x%" PRIxPTR, - new_dom, new_dom->root_task); + "upcall new_thread(%s) = dom 0x%" PRIxPTR " task 0x%" PRIxPTR, + name, new_dom, new_dom->root_task); rust_proxy<rust_task> *proxy = new (old_dom) rust_proxy<rust_task>(old_dom, new_dom->root_task, true); diff --git a/src/rt/rust_util.h b/src/rt/rust_util.h index 42082119..03b7766d 100644 --- a/src/rt/rust_util.h +++ b/src/rt/rust_util.h @@ -70,6 +70,13 @@ ptr_vec<T>::pop() } template <typename T> +T * +ptr_vec<T>::peek() +{ + return data[fill - 1]; +} + +template <typename T> void ptr_vec<T>::trim(size_t sz) { @@ -140,11 +147,20 @@ isaac_init(rust_dom *dom, randctx *rctx) CryptReleaseContext(hProv, 0)); } #else - int fd = open("/dev/urandom", O_RDONLY); - I(dom, fd > 0); - I(dom, read(fd, (void*) &rctx->randrsl, sizeof(rctx->randrsl)) - == sizeof(rctx->randrsl)); - I(dom, close(fd) == 0); + char *rust_seed = getenv("RUST_SEED"); + if (rust_seed != NULL) { + ub4 seed = (ub4) atoi(rust_seed); + for (size_t i = 0; i < RANDSIZ; i ++) { + memcpy(&rctx->randrsl[i], &seed, sizeof(ub4)); + seed = (seed + 0x7ed55d16) + (seed << 12); + } + } else { + int fd = open("/dev/urandom", O_RDONLY); + I(dom, fd > 0); + I(dom, read(fd, (void*) &rctx->randrsl, sizeof(rctx->randrsl)) + == sizeof(rctx->randrsl)); + I(dom, close(fd) == 0); + } #endif randinit(rctx, 1); } diff --git a/src/rt/sync/condition_variable.cpp b/src/rt/sync/condition_variable.cpp index d4032572..c34ab7f4 100644 --- a/src/rt/sync/condition_variable.cpp +++ b/src/rt/sync/condition_variable.cpp @@ -9,14 +9,16 @@ // #define TRACE -condition_variable::condition_variable() { #if defined(__WIN32__) +condition_variable::condition_variable() { _event = CreateEvent(NULL, FALSE, FALSE, NULL); +} #else +condition_variable::condition_variable() { pthread_cond_init(&_cond, NULL); pthread_mutex_init(&_mutex, NULL); -#endif } +#endif condition_variable::~condition_variable() { #if defined(__WIN32__) @@ -31,15 +33,31 @@ condition_variable::~condition_variable() { * Wait indefinitely until condition is signaled. */ void condition_variable::wait() { + timed_wait(0); +} + +void condition_variable::timed_wait(size_t timeout_in_ns) { #ifdef TRACE - printf("waiting on condition_variable: 0x%" PRIxPTR "\n", - (uintptr_t)this); + printf("waiting on condition_variable: 0x%" PRIxPTR " for %d ns. \n", + (uintptr_t) this, (int) timeout_in_ns); #endif #if defined(__WIN32__) WaitForSingleObject(_event, INFINITE); #else pthread_mutex_lock(&_mutex); - pthread_cond_wait(&_cond, &_mutex); + // wait() automatically releases the mutex while it waits, and acquires + // it right before exiting. This allows signal() to acquire the mutex + // when signaling.) + if (timeout_in_ns == 0) { + pthread_cond_wait(&_cond, &_mutex); + } else { + timeval time_val; + gettimeofday(&time_val, NULL); + timespec time_spec; + time_spec.tv_sec = time_val.tv_sec + 0; + time_spec.tv_nsec = time_val.tv_usec * 1000 + timeout_in_ns; + pthread_cond_timedwait(&_cond, &_mutex, &time_spec); + } pthread_mutex_unlock(&_mutex); #endif #ifdef TRACE diff --git a/src/rt/sync/condition_variable.h b/src/rt/sync/condition_variable.h index f847ef99..f336a7f2 100644 --- a/src/rt/sync/condition_variable.h +++ b/src/rt/sync/condition_variable.h @@ -13,6 +13,7 @@ public: virtual ~condition_variable(); void wait(); + void timed_wait(size_t timeout_in_ns); void signal(); }; diff --git a/src/rt/sync/sync.cpp b/src/rt/sync/sync.cpp new file mode 100644 index 00000000..eba51a49 --- /dev/null +++ b/src/rt/sync/sync.cpp @@ -0,0 +1,12 @@ +#include "../globals.h" +#include "sync.h" + +void sync::yield() { +#ifdef __APPLE__ + pthread_yield_np(); +#elif __WIN32__ + +#else + pthread_yield(); +#endif +} diff --git a/src/rt/sync/sync.h b/src/rt/sync/sync.h new file mode 100644 index 00000000..902b5661 --- /dev/null +++ b/src/rt/sync/sync.h @@ -0,0 +1,9 @@ +#ifndef SYNC_H +#define SYNC_H + +class sync { +public: + static void yield(); +}; + +#endif /* SYNC_H */ diff --git a/src/rt/util/array_list.h b/src/rt/util/array_list.h index e6ce55ab..929117f3 100644 --- a/src/rt/util/array_list.h +++ b/src/rt/util/array_list.h @@ -16,7 +16,7 @@ public: int32_t append(T value); int32_t push(T value); T pop(); - T replace(T old_value, T new_value); + bool replace(T old_value, T new_value); int32_t index_of(T value); bool is_empty(); T & operator[](size_t index); @@ -62,16 +62,16 @@ array_list<T>::pop() { /** * Replaces the old_value in the list with the new_value. - * Returns the old_value if the replacement succeeded, or NULL otherwise. + * Returns the true if the replacement succeeded, or false otherwise. */ -template<typename T> T +template<typename T> bool array_list<T>::replace(T old_value, T new_value) { int index = index_of(old_value); if (index < 0) { - return NULL; + return false; } _data[index] = new_value; - return old_value; + return true; } template<typename T> int32_t |