diff options
Diffstat (limited to 'src/rt')
| -rw-r--r-- | src/rt/circular_buffer.cpp | 4 | ||||
| -rw-r--r-- | src/rt/globals.h | 2 | ||||
| -rw-r--r-- | src/rt/rust.cpp | 8 | ||||
| -rw-r--r-- | src/rt/rust_chan.cpp | 27 | ||||
| -rw-r--r-- | src/rt/rust_chan.h | 2 | ||||
| -rw-r--r-- | src/rt/rust_dom.cpp | 81 | ||||
| -rw-r--r-- | src/rt/rust_dom.h | 4 | ||||
| -rw-r--r-- | src/rt/rust_internal.h | 4 | ||||
| -rw-r--r-- | src/rt/rust_log.cpp | 1 | ||||
| -rw-r--r-- | src/rt/rust_message.cpp | 3 | ||||
| -rw-r--r-- | src/rt/rust_port.cpp | 40 | ||||
| -rw-r--r-- | src/rt/rust_port.h | 2 | ||||
| -rw-r--r-- | src/rt/rust_task.cpp | 13 | ||||
| -rw-r--r-- | src/rt/rust_upcall.cpp | 96 | ||||
| -rw-r--r-- | src/rt/rust_util.h | 26 | ||||
| -rw-r--r-- | src/rt/sync/condition_variable.cpp | 28 | ||||
| -rw-r--r-- | src/rt/sync/condition_variable.h | 1 | ||||
| -rw-r--r-- | src/rt/sync/sync.cpp | 12 | ||||
| -rw-r--r-- | src/rt/sync/sync.h | 9 | ||||
| -rw-r--r-- | src/rt/util/array_list.h | 10 |
20 files changed, 268 insertions, 105 deletions
diff --git a/src/rt/circular_buffer.cpp b/src/rt/circular_buffer.cpp index a93f2572..dbf5059b 100644 --- a/src/rt/circular_buffer.cpp +++ b/src/rt/circular_buffer.cpp @@ -31,9 +31,7 @@ circular_buffer::circular_buffer(rust_dom *dom, size_t unit_sz) : } circular_buffer::~circular_buffer() { - dom->log(rust_log::MEM | rust_log::COMM, - "~circular_buffer 0x%" PRIxPTR, - this); + dom->log(rust_log::MEM, "~circular_buffer 0x%" PRIxPTR, this); I(dom, _buffer); W(dom, _unread == 0, "~circular_buffer with unread messages."); dom->free(_buffer); diff --git a/src/rt/globals.h b/src/rt/globals.h index f8025a40..85aa5860 100644 --- a/src/rt/globals.h +++ b/src/rt/globals.h @@ -20,12 +20,14 @@ extern "C" { } #elif defined(__GNUC__) #include <unistd.h> +#include <sys/time.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <dlfcn.h> #include <pthread.h> #include <errno.h> +#include <time.h> #else #error "Platform not supported." #endif diff --git a/src/rt/rust.cpp b/src/rt/rust.cpp index 82a19cbc..d0215edc 100644 --- a/src/rt/rust.cpp +++ b/src/rt/rust.cpp @@ -53,7 +53,9 @@ rust_srv::realloc(void *p, size_t bytes) } void * val = ::realloc(p, bytes); #ifdef TRACK_ALLOCATIONS - if (allocation_list.replace(p, val) == NULL) { + if (allocation_list.replace(p, val) == false) { + printf("realloc: ptr 0x%" PRIxPTR " is not in allocation_list\n", + (uintptr_t) p); fatal("not in allocation_list", __FILE__, __LINE__); } #endif @@ -64,8 +66,8 @@ void rust_srv::free(void *p) { #ifdef TRACK_ALLOCATIONS - if (allocation_list.replace(p, NULL) == NULL) { - printf("ptr 0x%" PRIxPTR " is not in allocation_list\n", + if (allocation_list.replace(p, NULL) == false) { + printf("free: ptr 0x%" PRIxPTR " is not in allocation_list\n", (uintptr_t) p); fatal("not in allocation_list", __FILE__, __LINE__); } diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index f107d287..2a0a61db 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -18,9 +18,11 @@ rust_chan::rust_chan(rust_task *task, maybe_proxy<rust_port> *port) : } rust_chan::~rust_chan() { - if (port && !port->is_proxy()) { - port->delegate()->chans.swap_delete(this); - } + task->log(rust_log::MEM | rust_log::COMM, + "del rust_chan(task=0x%" PRIxPTR ")", (uintptr_t) this); + + A(task->dom, is_associated() == false, + "Channel must be disassociated before being freed."); } /** @@ -28,7 +30,10 @@ rust_chan::~rust_chan() { */ void rust_chan::associate(maybe_proxy<rust_port> *port) { this->port = port; - if (!port->is_proxy()) { + if (port->is_proxy() == false) { + task->log(rust_log::TASK, + "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, + this, port); this->port->delegate()->chans.push(this); } } @@ -43,14 +48,23 @@ bool rust_chan::is_associated() { void rust_chan::disassociate() { A(task->dom, is_associated(), "Channel must be associated with a port."); + if (port->is_proxy() == false) { + task->log(rust_log::TASK, + "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, + this, port->delegate()); + port->delegate()->chans.swap_delete(this); + } + // Delete reference to the port. port = NULL; } /** - * Attempt to transmit channel data to the associated port. + * Attempt to send data to the associated port. */ -void rust_chan::transmit() { +void rust_chan::send(void *sptr) { + buffer.enqueue(sptr); + rust_dom *dom = task->dom; if (!is_associated()) { W(dom, is_associated(), @@ -81,7 +95,6 @@ void rust_chan::transmit() { return; } - // // Local Variables: // mode: C++ diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h index 055e359a..6aa98247 100644 --- a/src/rt/rust_chan.h +++ b/src/rt/rust_chan.h @@ -17,7 +17,7 @@ public: void disassociate(); bool is_associated(); - void transmit(); + void send(void *sptr); }; // diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp index 004a1027..f7b8e97b 100644 --- a/src/rt/rust_dom.cpp +++ b/src/rt/rust_dom.cpp @@ -4,6 +4,8 @@ template class ptr_vec<rust_task>; +// Keeps track of all live domains, for debugging purposes. +array_list<rust_dom*> _live_domains; rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate, const char *name) : @@ -28,13 +30,18 @@ rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate, pthread_attr_setdetachstate(&attr, true); #endif root_task = new (this) rust_task(this, NULL, name); + + if (_live_domains.replace(NULL, this) == false) { + _live_domains.append(this); + } } static void del_all_tasks(rust_dom *dom, ptr_vec<rust_task> *v) { I(dom, v); while (v->length()) { - dom->log(rust_log::TASK, "deleting task %" PRIdPTR, v->length() - 1); + dom->log(rust_log::TASK, "deleting task 0x%" PRIdPTR, + v->length() - 1); delete v->pop(); } } @@ -45,7 +52,7 @@ rust_dom::delete_proxies() { rust_proxy<rust_task> *task_proxy; while (_task_proxies.pop(&task, &task_proxy)) { log(rust_log::TASK, - "deleting proxy %" PRIxPTR " in dom %s @0x%" PRIxPTR, + "deleting proxy 0x%" PRIxPTR " in dom %s 0x%" PRIxPTR, task_proxy, task_proxy->dom->name, task_proxy->dom); delete task_proxy; } @@ -54,7 +61,7 @@ rust_dom::delete_proxies() { rust_proxy<rust_port> *port_proxy; while (_port_proxies.pop(&port, &port_proxy)) { log(rust_log::TASK, - "deleting proxy %" PRIxPTR " in dom %s @0x%" PRIxPTR, + "deleting proxy 0x%" PRIxPTR " in dom %s 0x%" PRIxPTR, port_proxy, port_proxy->dom->name, port_proxy->dom); delete port_proxy; } @@ -77,6 +84,8 @@ rust_dom::~rust_dom() { #endif while (caches.length()) delete caches.pop(); + + _live_domains.replace(this, NULL); } void @@ -232,7 +241,6 @@ rust_dom::reap_dead_tasks() { rust_task *task = dead_tasks[i]; if (task->ref_count == 0) { I(this, task->tasks_waiting_to_join.is_empty()); - dead_tasks.swap_delete(task); log(rust_log::TASK, "deleting unreferenced dead task %s @0x%" PRIxPTR, @@ -250,14 +258,14 @@ rust_dom::reap_dead_tasks() { */ void rust_dom::send_message(rust_message *message) { log(rust_log::COMM, "==> enqueueing \"%s\" 0x%" PRIxPTR - " in queue 0x%" PRIxPTR, + " in queue 0x%" PRIxPTR + " in domain 0x%" PRIxPTR, message->label, message, - &_incoming_message_queue); + &_incoming_message_queue, + this); A(this, message->dom == this, "Message owned by non-local domain."); _incoming_message_queue.enqueue(message); - _incoming_message_pending.signal(); - _progress.signal(); } /** @@ -329,6 +337,13 @@ rust_dom::schedule_task() } void +rust_dom::log_all_state() { + for (uint32_t i = 0; i < _live_domains.size(); i++) { + _live_domains[i]->log_state(); + } +} + +void rust_dom::log_state() { if (!running_tasks.is_empty()) { log(rust_log::TASK, "running tasks:"); @@ -352,8 +367,9 @@ rust_dom::log_state() { if (!dead_tasks.is_empty()) { log(rust_log::TASK, "dead tasks:"); for (size_t i = 0; i < dead_tasks.length(); i++) { - log(rust_log::TASK, "\t task: %s @0x%" PRIxPTR, - dead_tasks[i]->name, dead_tasks[i]); + log(rust_log::TASK, "\t task: %s 0x%" PRIxPTR ", ref_count: %d", + dead_tasks[i], dead_tasks[i]->name, + dead_tasks[i]->ref_count); } } } @@ -384,22 +400,29 @@ rust_dom::start_main_loop() // if progress is made in other domains. if (scheduled_task == NULL) { - log(rust_log::TASK, - "all tasks are blocked, waiting for progress ..."); - if (_log.is_tracing(rust_log::TASK)) + if (_log.is_tracing(rust_log::TASK)) { log_state(); - _progress.wait(); + } log(rust_log::TASK, - "progress made, resuming ..."); + "all tasks are blocked, scheduler yielding ..."); + sync::yield(); + log(rust_log::TASK, + "scheduler resuming ..."); continue; } I(this, scheduled_task->running()); log(rust_log::TASK, - "activating task %s @0x%" PRIxPTR ", sp=0x%" PRIxPTR, - scheduled_task->name, (uintptr_t)scheduled_task, - scheduled_task->rust_sp); + "activating task %s 0x%" PRIxPTR + ", sp=0x%" PRIxPTR + ", ref_count=%d" + ", state: %s", + scheduled_task->name, + (uintptr_t)scheduled_task, + scheduled_task->rust_sp, + scheduled_task->ref_count, + scheduled_task->state_str()); interrupt_flag = 0; @@ -423,21 +446,15 @@ rust_dom::start_main_loop() log(rust_log::DOM, "terminated scheduler loop, reaping dead tasks ..."); while (dead_tasks.length() > 0) { - log(rust_log::DOM, - "waiting for %d dead tasks to become dereferenced ...", - dead_tasks.length()); - - if (_log.is_tracing(rust_log::DOM)) { - for (size_t i = 0; i < dead_tasks.length(); i++) { - log(rust_log::DOM, - "task: %s @0x%" PRIxPTR ", index: %d, ref_count: %d", - dead_tasks[i]->name, dead_tasks[i], i, - dead_tasks[i]->ref_count); - } - } - if (_incoming_message_queue.is_empty()) { - _incoming_message_pending.wait(); + log(rust_log::DOM, + "waiting for %d dead tasks to become dereferenced, " + "scheduler yielding ...", + dead_tasks.length()); + if (_log.is_tracing(rust_log::TASK)) { + log_state(); + } + sync::yield(); } else { drain_incoming_message_queue(); } diff --git a/src/rt/rust_dom.h b/src/rt/rust_dom.h index abf10cef..3aaf635a 100644 --- a/src/rt/rust_dom.h +++ b/src/rt/rust_dom.h @@ -35,13 +35,10 @@ struct rust_dom rust_task *curr_task; int rval; - condition_variable _progress; - hash_map<rust_task *, rust_proxy<rust_task> *> _task_proxies; hash_map<rust_port *, rust_proxy<rust_port> *> _port_proxies; // Incoming messages from other domains. - condition_variable _incoming_message_pending; lock_free_queue _incoming_message_queue; #ifndef __WIN32__ @@ -87,6 +84,7 @@ struct rust_dom int start_main_loop(); void log_state(); + static void log_all_state(); }; // diff --git a/src/rt/rust_internal.h b/src/rt/rust_internal.h index a89144d7..46ea843f 100644 --- a/src/rt/rust_internal.h +++ b/src/rt/rust_internal.h @@ -38,6 +38,7 @@ extern "C" { #error "Platform not supported." #endif +#include "sync/sync.h" #include "sync/condition_variable.h" #ifndef __i386__ @@ -75,7 +76,7 @@ template <typename T> struct rc_base { - size_t ref_count; + int32_t ref_count; void ref() { ++ref_count; @@ -151,6 +152,7 @@ public: T *& operator[](size_t offset); void push(T *p); T *pop(); + T *peek(); void trim(size_t fill); void swap_delete(T* p); }; diff --git a/src/rt/rust_log.cpp b/src/rt/rust_log.cpp index 48fcd0f1..8e089791 100644 --- a/src/rt/rust_log.cpp +++ b/src/rt/rust_log.cpp @@ -27,6 +27,7 @@ read_type_bit_mask() { bits |= strstr(env_str, "timer") ? rust_log::TIMER : 0; bits |= strstr(env_str, "gc") ? rust_log::GC : 0; bits |= strstr(env_str, "all") ? rust_log::ALL : 0; + bits = strstr(env_str, "none") ? 0 : bits; } return bits; } diff --git a/src/rt/rust_message.cpp b/src/rt/rust_message.cpp index 8b396b4d..1de804c9 100644 --- a/src/rt/rust_message.cpp +++ b/src/rt/rust_message.cpp @@ -90,8 +90,7 @@ send(uint8_t *buffer, size_t buffer_sz, const char* label, rust_task *source, } void data_message::process() { - _port->remote_channel->buffer.enqueue(_buffer); - _port->remote_channel->transmit(); + _port->remote_channel->send(_buffer); _target->log(rust_log::COMM, "<=== received data via message ==="); } diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp index 0a5b7ee7..c97b5d41 100644 --- a/src/rt/rust_port.cpp +++ b/src/rt/rust_port.cpp @@ -17,16 +17,50 @@ rust_port::~rust_port() { task->log(rust_log::COMM | rust_log::MEM, "~rust_port 0x%" PRIxPTR, (uintptr_t) this); + log_state(); + // Disassociate channels from this port. while (chans.is_empty() == false) { - chans.pop()->disassociate(); + rust_chan *chan = chans.peek(); + chan->disassociate(); } - // We're the only ones holding a reference to the remote channel, so - // clean it up. delete remote_channel; } +bool rust_port::receive(void *dptr) { + for (uint32_t i = 0; i < chans.length(); i++) { + rust_chan *chan = chans[i]; + if (chan->buffer.is_empty() == false) { + chan->buffer.dequeue(dptr); + if (chan->buffer.is_empty() && chan->task->blocked()) { + task->log(rust_log::COMM, + "chan: 0x%" PRIxPTR + " is flushing, wakeup task: 0x%" PRIxPTR, + chan, chan->task); + chan->task->wakeup(this); + } + task->log(rust_log::COMM, "<=== read data ==="); + return true; + } + } + return false; +} + +void rust_port::log_state() { + task->log(rust_log::COMM, + "rust_port: 0x%" PRIxPTR ", associated channel(s): %d", + this, chans.length()); + for (uint32_t i = 0; i < chans.length(); i++) { + rust_chan *chan = chans[i]; + task->log(rust_log::COMM, + "\tchan: 0x%" PRIxPTR ", data pending: %s, remote: %s", + chan, + !chan->buffer.is_empty() ? "yes" : "no", + chan == remote_channel ? "yes" : "no"); + } +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h index 49a89437..7a58f839 100644 --- a/src/rt/rust_port.h +++ b/src/rt/rust_port.h @@ -15,6 +15,8 @@ public: rust_port(rust_task *task, size_t unit_sz); ~rust_port(); + void log_state(); + bool receive(void *dptr); }; // diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 279850cb..aca8bca7 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -324,6 +324,11 @@ get_callee_save_fp(uintptr_t *top_of_callee_saves) void rust_task::kill() { + if (dead()) { + // Task is already dead, can't kill what's already dead. + return; + } + // Note the distinction here: kill() is when you're in an upcall // from task A and want to force-fail task B, you do B->kill(). // If you want to fail yourself you do self->fail(upcall_nargs). @@ -334,6 +339,7 @@ rust_task::kill() { if (this == dom->root_task) dom->fail(); + log(rust_log::TASK, "preparing to unwind task: 0x%" PRIxPTR, this); run_on_resume(dom->root_crate->get_unwind_glue()); } @@ -519,6 +525,10 @@ rust_task::free(void *p, bool is_gc) } } +const char * +rust_task::state_str() { + return dom->state_vec_name(state); +} void rust_task::transition(ptr_vec<rust_task> *src, ptr_vec<rust_task> *dst) @@ -556,9 +566,6 @@ rust_task::wakeup(rust_cond *from) A(dom, cond == from, "Cannot wake up blocked task on wrong condition."); transition(&dom->blocked_tasks, &dom->running_tasks); - // TODO: Signaling every time the task is awaken is kind of silly, - // do this a nicer way. - dom->_progress.signal(); I(dom, cond == from); cond = NULL; } diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index b0757e29..90d6f6d9 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -6,11 +6,13 @@ #define LOG_UPCALL_ENTRY(task) \ (task)->dom->get_log().reset_indent(0); \ (task)->log(rust_log::UPCALL, \ - "> UPCALL %s - task: %s @0x%" PRIxPTR \ - " retpc: x%" PRIxPTR, \ + "> UPCALL %s - task: %s 0x%" PRIxPTR \ + " retpc: x%" PRIxPTR \ + " ref_count: %d", \ __FUNCTION__, \ (task)->name, (task), \ - __builtin_return_address(0)); \ + __builtin_return_address(0), \ + (task->ref_count)); \ (task)->dom->get_log().indent(); #else #define LOG_UPCALL_ENTRY(task) \ @@ -21,6 +23,20 @@ (task)->dom->get_log().indent(); #endif +void +log_task_state(rust_task *task, maybe_proxy<rust_task> *target) { + rust_task *delegate = target->delegate(); + if (target->is_proxy()) { + task->log(rust_log::TASK, + "remote task: 0x%" PRIxPTR ", ref_count: %d state: %s", + delegate, delegate->ref_count, delegate->state_str()); + } else { + task->log(rust_log::TASK, + "local task: 0x%" PRIxPTR ", ref_count: %d state: %s", + delegate, delegate->ref_count, delegate->state_str()); + } +} + extern "C" CDECL char const *str_buf(rust_task *task, rust_str *s); extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) { @@ -31,14 +47,13 @@ extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) { extern "C" CDECL void upcall_log_int(rust_task *task, int32_t i) { LOG_UPCALL_ENTRY(task); task->log(rust_log::UPCALL | rust_log::ULOG, - "upcall log_int(0x%" PRIx32 " = %" PRId32 " = '%c')", i, i, - (char) i); + "rust: %" PRId32 " (0x%" PRIx32 ")", i, i); } extern "C" CDECL void upcall_log_str(rust_task *task, rust_str *str) { LOG_UPCALL_ENTRY(task); const char *c = str_buf(task, str); - task->log(rust_log::UPCALL | rust_log::ULOG, "upcall log_str(\"%s\")", c); + task->log(rust_log::UPCALL | rust_log::ULOG, "rust: %s", c); } extern "C" CDECL void upcall_trace_word(rust_task *task, uintptr_t i) { @@ -86,26 +101,50 @@ upcall_new_chan(rust_task *task, rust_port *port) { } /** + * Called whenever this channel needs to be flushed. This can happen due to a + * flush statement, or automatically whenever a channel's ref count is + * about to drop to zero. + */ +extern "C" CDECL void +upcall_flush_chan(rust_task *task, rust_chan *chan) { + LOG_UPCALL_ENTRY(task); + rust_dom *dom = task->dom; + task->log(rust_log::UPCALL | rust_log::COMM, + "flush chan: 0x%" PRIxPTR, chan); + + if (chan->buffer.is_empty()) { + return; + } + + A(dom, chan->port->is_proxy() == false, + "Channels to remote ports should be flushed automatically."); + + // Block on the port until this channel has been completely drained + // by the port. + task->block(chan->port); + task->yield(2); +} + +/** * Called whenever the channel's ref count drops to zero. + * + * Cannot Yield: If the task were to unwind, the dropped ref would still + * appear to be live, causing modify-after-free errors. */ extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; + task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); - I(dom, !chan->ref_count); - - if (!chan->buffer.is_empty() && chan->is_associated()) { - A(dom, !chan->port->is_proxy(), - "Channels to remote ports should be flushed automatically."); - // A target port may still be reading from this channel. - // Block on this channel until it has been completely drained - // by the port. - task->block(chan); - task->yield(2); - return; - } + A(task->dom, chan->ref_count == 0, + "Channel's ref count should be zero."); + + if (chan->is_associated()) { + A(task->dom, chan->buffer.is_empty(), + "Channel's buffer should be empty."); + chan->disassociate(); + } delete chan; } @@ -171,8 +210,7 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) { "chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d", (uintptr_t) chan, (uintptr_t) sptr, chan->port->delegate()->unit_sz); - chan->buffer.enqueue(sptr); - chan->transmit(); + chan->send(sptr); task->log(rust_log::COMM, "=== sent data ===>"); } @@ -185,17 +223,8 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { (uintptr_t) port, (uintptr_t) dptr, port->unit_sz, port->chans.length()); - for (uint32_t i = 0; i < port->chans.length(); i++) { - rust_chan *chan = port->chans[i]; - if (chan->buffer.is_empty() == false) { - chan->buffer.dequeue(dptr); - if (chan->buffer.is_empty() && chan->task->blocked()) { - chan->task->wakeup(chan); - delete chan; - } - task->log(rust_log::COMM, "<=== read data ==="); - return; - } + if (port->receive(dptr)) { + return; } // No data was buffered on any incoming channel, so block this task @@ -222,6 +251,7 @@ extern "C" CDECL void upcall_fail(rust_task *task, char const *expr, extern "C" CDECL void upcall_kill(rust_task *task, maybe_proxy<rust_task> *target) { LOG_UPCALL_ENTRY(task); + log_task_state(task, target); rust_task *target_task = target->delegate(); task->log(rust_log::UPCALL | rust_log::TASK, @@ -247,6 +277,8 @@ upcall_exit(rust_task *task) { LOG_UPCALL_ENTRY(task); task->log(rust_log::UPCALL | rust_log::TASK, "task ref_count: %d", task->ref_count); + A(task->dom, task->ref_count >= 0, + "Task ref_count should not be negative on exit!"); task->die(); task->notify_tasks_waiting_to_join(); task->yield(1); diff --git a/src/rt/rust_util.h b/src/rt/rust_util.h index 42082119..03b7766d 100644 --- a/src/rt/rust_util.h +++ b/src/rt/rust_util.h @@ -70,6 +70,13 @@ ptr_vec<T>::pop() } template <typename T> +T * +ptr_vec<T>::peek() +{ + return data[fill - 1]; +} + +template <typename T> void ptr_vec<T>::trim(size_t sz) { @@ -140,11 +147,20 @@ isaac_init(rust_dom *dom, randctx *rctx) CryptReleaseContext(hProv, 0)); } #else - int fd = open("/dev/urandom", O_RDONLY); - I(dom, fd > 0); - I(dom, read(fd, (void*) &rctx->randrsl, sizeof(rctx->randrsl)) - == sizeof(rctx->randrsl)); - I(dom, close(fd) == 0); + char *rust_seed = getenv("RUST_SEED"); + if (rust_seed != NULL) { + ub4 seed = (ub4) atoi(rust_seed); + for (size_t i = 0; i < RANDSIZ; i ++) { + memcpy(&rctx->randrsl[i], &seed, sizeof(ub4)); + seed = (seed + 0x7ed55d16) + (seed << 12); + } + } else { + int fd = open("/dev/urandom", O_RDONLY); + I(dom, fd > 0); + I(dom, read(fd, (void*) &rctx->randrsl, sizeof(rctx->randrsl)) + == sizeof(rctx->randrsl)); + I(dom, close(fd) == 0); + } #endif randinit(rctx, 1); } diff --git a/src/rt/sync/condition_variable.cpp b/src/rt/sync/condition_variable.cpp index d4032572..c34ab7f4 100644 --- a/src/rt/sync/condition_variable.cpp +++ b/src/rt/sync/condition_variable.cpp @@ -9,14 +9,16 @@ // #define TRACE -condition_variable::condition_variable() { #if defined(__WIN32__) +condition_variable::condition_variable() { _event = CreateEvent(NULL, FALSE, FALSE, NULL); +} #else +condition_variable::condition_variable() { pthread_cond_init(&_cond, NULL); pthread_mutex_init(&_mutex, NULL); -#endif } +#endif condition_variable::~condition_variable() { #if defined(__WIN32__) @@ -31,15 +33,31 @@ condition_variable::~condition_variable() { * Wait indefinitely until condition is signaled. */ void condition_variable::wait() { + timed_wait(0); +} + +void condition_variable::timed_wait(size_t timeout_in_ns) { #ifdef TRACE - printf("waiting on condition_variable: 0x%" PRIxPTR "\n", - (uintptr_t)this); + printf("waiting on condition_variable: 0x%" PRIxPTR " for %d ns. \n", + (uintptr_t) this, (int) timeout_in_ns); #endif #if defined(__WIN32__) WaitForSingleObject(_event, INFINITE); #else pthread_mutex_lock(&_mutex); - pthread_cond_wait(&_cond, &_mutex); + // wait() automatically releases the mutex while it waits, and acquires + // it right before exiting. This allows signal() to acquire the mutex + // when signaling.) + if (timeout_in_ns == 0) { + pthread_cond_wait(&_cond, &_mutex); + } else { + timeval time_val; + gettimeofday(&time_val, NULL); + timespec time_spec; + time_spec.tv_sec = time_val.tv_sec + 0; + time_spec.tv_nsec = time_val.tv_usec * 1000 + timeout_in_ns; + pthread_cond_timedwait(&_cond, &_mutex, &time_spec); + } pthread_mutex_unlock(&_mutex); #endif #ifdef TRACE diff --git a/src/rt/sync/condition_variable.h b/src/rt/sync/condition_variable.h index f847ef99..f336a7f2 100644 --- a/src/rt/sync/condition_variable.h +++ b/src/rt/sync/condition_variable.h @@ -13,6 +13,7 @@ public: virtual ~condition_variable(); void wait(); + void timed_wait(size_t timeout_in_ns); void signal(); }; diff --git a/src/rt/sync/sync.cpp b/src/rt/sync/sync.cpp new file mode 100644 index 00000000..eba51a49 --- /dev/null +++ b/src/rt/sync/sync.cpp @@ -0,0 +1,12 @@ +#include "../globals.h" +#include "sync.h" + +void sync::yield() { +#ifdef __APPLE__ + pthread_yield_np(); +#elif __WIN32__ + +#else + pthread_yield(); +#endif +} diff --git a/src/rt/sync/sync.h b/src/rt/sync/sync.h new file mode 100644 index 00000000..902b5661 --- /dev/null +++ b/src/rt/sync/sync.h @@ -0,0 +1,9 @@ +#ifndef SYNC_H +#define SYNC_H + +class sync { +public: + static void yield(); +}; + +#endif /* SYNC_H */ diff --git a/src/rt/util/array_list.h b/src/rt/util/array_list.h index e6ce55ab..929117f3 100644 --- a/src/rt/util/array_list.h +++ b/src/rt/util/array_list.h @@ -16,7 +16,7 @@ public: int32_t append(T value); int32_t push(T value); T pop(); - T replace(T old_value, T new_value); + bool replace(T old_value, T new_value); int32_t index_of(T value); bool is_empty(); T & operator[](size_t index); @@ -62,16 +62,16 @@ array_list<T>::pop() { /** * Replaces the old_value in the list with the new_value. - * Returns the old_value if the replacement succeeded, or NULL otherwise. + * Returns the true if the replacement succeeded, or false otherwise. */ -template<typename T> T +template<typename T> bool array_list<T>::replace(T old_value, T new_value) { int index = index_of(old_value); if (index < 0) { - return NULL; + return false; } _data[index] = new_value; - return old_value; + return true; } template<typename T> int32_t |