diff options
Diffstat (limited to 'src/rt/rust_upcall.cpp')
| -rw-r--r-- | src/rt/rust_upcall.cpp | 142 |
1 files changed, 89 insertions, 53 deletions
diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 574cb703..90d6f6d9 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -6,19 +6,37 @@ #define LOG_UPCALL_ENTRY(task) \ (task)->dom->get_log().reset_indent(0); \ (task)->log(rust_log::UPCALL, \ - "> UPCALL %s - task: 0x%" PRIxPTR \ - " retpc: x%" PRIxPTR, \ + "> UPCALL %s - task: %s 0x%" PRIxPTR \ + " retpc: x%" PRIxPTR \ + " ref_count: %d", \ __FUNCTION__, \ - (task), __builtin_return_address(0)); \ + (task)->name, (task), \ + __builtin_return_address(0), \ + (task->ref_count)); \ (task)->dom->get_log().indent(); #else #define LOG_UPCALL_ENTRY(task) \ (task)->dom->get_log().reset_indent(0); \ (task)->log(rust_log::UPCALL, \ - "> UPCALL task: x%" PRIxPTR (task)); \ + "> UPCALL task: %s @x%" PRIxPTR, \ + (task)->name, (task)); \ (task)->dom->get_log().indent(); #endif +void +log_task_state(rust_task *task, maybe_proxy<rust_task> *target) { + rust_task *delegate = target->delegate(); + if (target->is_proxy()) { + task->log(rust_log::TASK, + "remote task: 0x%" PRIxPTR ", ref_count: %d state: %s", + delegate, delegate->ref_count, delegate->state_str()); + } else { + task->log(rust_log::TASK, + "local task: 0x%" PRIxPTR ", ref_count: %d state: %s", + delegate, delegate->ref_count, delegate->state_str()); + } +} + extern "C" CDECL char const *str_buf(rust_task *task, rust_str *s); extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) { @@ -29,14 +47,13 @@ extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) { extern "C" CDECL void upcall_log_int(rust_task *task, int32_t i) { LOG_UPCALL_ENTRY(task); task->log(rust_log::UPCALL | rust_log::ULOG, - "upcall log_int(0x%" PRIx32 " = %" PRId32 " = '%c')", i, i, - (char) i); + "rust: %" PRId32 " (0x%" PRIx32 ")", i, i); } extern "C" CDECL void upcall_log_str(rust_task *task, rust_str *str) { LOG_UPCALL_ENTRY(task); const char *c = str_buf(task, str); - task->log(rust_log::UPCALL | rust_log::ULOG, "upcall log_str(\"%s\")", c); + task->log(rust_log::UPCALL | rust_log::ULOG, "rust: %s", c); } extern "C" CDECL void upcall_trace_word(rust_task *task, uintptr_t i) { @@ -55,8 +72,8 @@ upcall_new_port(rust_task *task, size_t unit_sz) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, - "upcall_new_port(task=0x%" PRIxPTR ", unit_sz=%d)", - (uintptr_t) task, unit_sz); + "upcall_new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)", + (uintptr_t) task, task->name, unit_sz); return new (dom) rust_port(task, unit_sz); } @@ -76,33 +93,58 @@ upcall_new_chan(rust_task *task, rust_port *port) { LOG_UPCALL_ENTRY(task); rust_dom *dom = task->dom; task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, - "upcall_new_chan(task=0x%" PRIxPTR ", port=0x%" PRIxPTR ")", - (uintptr_t) task, port); + "upcall_new_chan(" + "task=0x%" PRIxPTR " (%s), port=0x%" PRIxPTR ")", + (uintptr_t) task, task->name, port); I(dom, port); return new (dom) rust_chan(task, port); } /** + * Called whenever this channel needs to be flushed. This can happen due to a + * flush statement, or automatically whenever a channel's ref count is + * about to drop to zero. + */ +extern "C" CDECL void +upcall_flush_chan(rust_task *task, rust_chan *chan) { + LOG_UPCALL_ENTRY(task); + rust_dom *dom = task->dom; + task->log(rust_log::UPCALL | rust_log::COMM, + "flush chan: 0x%" PRIxPTR, chan); + + if (chan->buffer.is_empty()) { + return; + } + + A(dom, chan->port->is_proxy() == false, + "Channels to remote ports should be flushed automatically."); + + // Block on the port until this channel has been completely drained + // by the port. + task->block(chan->port); + task->yield(2); +} + +/** * Called whenever the channel's ref count drops to zero. + * + * Cannot Yield: If the task were to unwind, the dropped ref would still + * appear to be live, causing modify-after-free errors. */ extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; + task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); - I(dom, !chan->ref_count); - - if (!chan->buffer.is_empty() && chan->is_associated()) { - A(dom, !chan->port->is_proxy(), - "Channels to remote ports should be flushed automatically."); - // A target port may still be reading from this channel. - // Block on this channel until it has been completely drained - // by the port. - task->block(chan); - task->yield(2); - return; - } + A(task->dom, chan->ref_count == 0, + "Channel's ref count should be zero."); + + if (chan->is_associated()) { + A(task->dom, chan->buffer.is_empty(), + "Channel's buffer should be empty."); + chan->disassociate(); + } delete chan; } @@ -136,11 +178,11 @@ extern "C" CDECL void upcall_yield(rust_task *task) { extern "C" CDECL void upcall_join(rust_task *task, maybe_proxy<rust_task> *target) { LOG_UPCALL_ENTRY(task); + rust_task *target_task = target->delegate(); task->log(rust_log::UPCALL | rust_log::COMM, - "target: 0x%" PRIxPTR ", task: 0x%" PRIxPTR, - target, target->delegate()); + "target: 0x%" PRIxPTR ", task: %s @0x%" PRIxPTR, + target, target_task->name, target_task); - rust_task *target_task = target->delegate(); if (target->is_proxy()) { notify_message:: send(notify_message::JOIN, "join", task, target->as_proxy()); @@ -168,8 +210,7 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) { "chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d", (uintptr_t) chan, (uintptr_t) sptr, chan->port->delegate()->unit_sz); - chan->buffer.enqueue(sptr); - chan->transmit(); + chan->send(sptr); task->log(rust_log::COMM, "=== sent data ===>"); } @@ -182,17 +223,8 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { (uintptr_t) port, (uintptr_t) dptr, port->unit_sz, port->chans.length()); - for (uint32_t i = 0; i < port->chans.length(); i++) { - rust_chan *chan = port->chans[i]; - if (chan->buffer.is_empty() == false) { - chan->buffer.dequeue(dptr); - if (chan->buffer.is_empty() && chan->task->blocked()) { - chan->task->wakeup(chan); - delete chan; - } - task->log(rust_log::COMM, "<=== read data ==="); - return; - } + if (port->receive(dptr)) { + return; } // No data was buffered on any incoming channel, so block this task @@ -219,11 +251,12 @@ extern "C" CDECL void upcall_fail(rust_task *task, char const *expr, extern "C" CDECL void upcall_kill(rust_task *task, maybe_proxy<rust_task> *target) { LOG_UPCALL_ENTRY(task); + log_task_state(task, target); rust_task *target_task = target->delegate(); task->log(rust_log::UPCALL | rust_log::TASK, - "kill task 0x%" PRIxPTR ", ref count %d", - target_task, + "kill task %s @0x%" PRIxPTR ", ref count %d", + target_task->name, target_task, target_task->ref_count); if (target->is_proxy()) { @@ -244,6 +277,8 @@ upcall_exit(rust_task *task) { LOG_UPCALL_ENTRY(task); task->log(rust_log::UPCALL | rust_log::TASK, "task ref_count: %d", task->ref_count); + A(task->dom, task->ref_count >= 0, + "Task ref_count should not be negative on exit!"); task->die(); task->notify_tasks_waiting_to_join(); task->yield(1); @@ -498,14 +533,14 @@ static void *rust_thread_start(void *ptr) } extern "C" CDECL rust_task * -upcall_new_task(rust_task *spawner) { +upcall_new_task(rust_task *spawner, const char *name) { LOG_UPCALL_ENTRY(spawner); rust_dom *dom = spawner->dom; - rust_task *task = new (dom) rust_task(dom, spawner); + rust_task *task = new (dom) rust_task(dom, spawner, name); dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK, - "upcall new_task(spawner 0x%" PRIxPTR ") = 0x%" PRIxPTR, - spawner, task); + "upcall new_task(spawner %s @0x%" PRIxPTR ", %s) = 0x%" PRIxPTR, + spawner->name, spawner, name, task); return task; } @@ -516,26 +551,27 @@ upcall_start_task(rust_task *spawner, rust_task *task, rust_dom *dom = spawner->dom; dom->log(rust_log::UPCALL | rust_log::MEM | rust_log::TASK, - "upcall start_task(task 0x%" PRIxPTR + "upcall start_task(task %s @0x%" PRIxPTR " exit_task_glue 0x%" PRIxPTR ", spawnee 0x%" PRIxPTR - ", callsz %" PRIdPTR ")", task, exit_task_glue, spawnee_fn, - callsz); + ", callsz %" PRIdPTR ")", task->name, task, exit_task_glue, + spawnee_fn, callsz); task->start(exit_task_glue, spawnee_fn, spawner->rust_sp, callsz); return task; } extern "C" CDECL maybe_proxy<rust_task> * -upcall_new_thread(rust_task *task) { +upcall_new_thread(rust_task *task, const char *name) { LOG_UPCALL_ENTRY(task); rust_dom *old_dom = task->dom; rust_dom *new_dom = new rust_dom(old_dom->srv->clone(), - old_dom->root_crate); + old_dom->root_crate, + name); task->log(rust_log::UPCALL | rust_log::MEM, - "upcall new_thread() = dom 0x%" PRIxPTR " task 0x%" PRIxPTR, - new_dom, new_dom->root_task); + "upcall new_thread(%s) = dom 0x%" PRIxPTR " task 0x%" PRIxPTR, + name, new_dom, new_dom->root_task); rust_proxy<rust_task> *proxy = new (old_dom) rust_proxy<rust_task>(old_dom, new_dom->root_task, true); |