diff options
| -rw-r--r-- | src/boot/me/trans.ml | 20 | ||||
| -rw-r--r-- | src/rt/rust_chan.cpp | 27 | ||||
| -rw-r--r-- | src/rt/rust_chan.h | 2 | ||||
| -rw-r--r-- | src/rt/rust_dom.cpp | 18 | ||||
| -rw-r--r-- | src/rt/rust_message.cpp | 3 | ||||
| -rw-r--r-- | src/rt/rust_port.cpp | 24 | ||||
| -rw-r--r-- | src/rt/rust_port.h | 1 | ||||
| -rw-r--r-- | src/rt/rust_task.cpp | 5 | ||||
| -rw-r--r-- | src/rt/rust_upcall.cpp | 68 |
9 files changed, 113 insertions, 55 deletions
diff --git a/src/boot/me/trans.ml b/src/boot/me/trans.ml index b708bb26..97dce2b2 100644 --- a/src/boot/me/trans.ml +++ b/src/boot/me/trans.ml @@ -2932,6 +2932,7 @@ let trans_visitor (slot:Ast.slot) (curr_iso:Ast.ty_iso option) : unit = + check_and_flush_chan cell slot; drop_slot (get_ty_params_of_current_frame()) cell slot curr_iso and drop_ty_in_current_frame @@ -4188,6 +4189,25 @@ let trans_visitor let last_jumps = Array.map trans_arm at.Ast.alt_tag_arms in Array.iter patch last_jumps + (* If we're about to drop a channel, synthesize an upcall_flush_chan. + * TODO: This should rather appear in a chan dtor when chans become + * objects. *) + and check_and_flush_chan + (cell:Il.cell) + (slot:Ast.slot) + : unit = + let ty = strip_mutable_or_constrained_ty (slot_ty slot) in + match simplified_ty ty with + Ast.TY_chan _ -> + annotate "check_and_flush_chan, flush_chan"; + let rc = box_rc_cell cell in + emit (Il.cmp (Il.Cell rc) one); + let jump = mark () in + emit (Il.jmp Il.JNE Il.CodeNone); + trans_void_upcall "upcall_flush_chan" [| Il.Cell cell |]; + patch jump; + | _ -> () + and drop_slots_at_curr_stmt _ : unit = let stmt = Stack.top curr_stmt in match htab_search cx.ctxt_post_stmt_slot_drops stmt with diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index f107d287..2a0a61db 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -18,9 +18,11 @@ rust_chan::rust_chan(rust_task *task, maybe_proxy<rust_port> *port) : } rust_chan::~rust_chan() { - if (port && !port->is_proxy()) { - port->delegate()->chans.swap_delete(this); - } + task->log(rust_log::MEM | rust_log::COMM, + "del rust_chan(task=0x%" PRIxPTR ")", (uintptr_t) this); + + A(task->dom, is_associated() == false, + "Channel must be disassociated before being freed."); } /** @@ -28,7 +30,10 @@ rust_chan::~rust_chan() { */ void rust_chan::associate(maybe_proxy<rust_port> *port) { this->port = port; - if (!port->is_proxy()) { + if (port->is_proxy() == false) { + task->log(rust_log::TASK, + "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, + this, port); this->port->delegate()->chans.push(this); } } @@ -43,14 +48,23 @@ bool rust_chan::is_associated() { void rust_chan::disassociate() { A(task->dom, is_associated(), "Channel must be associated with a port."); + if (port->is_proxy() == false) { + task->log(rust_log::TASK, + "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, + this, port->delegate()); + port->delegate()->chans.swap_delete(this); + } + // Delete reference to the port. port = NULL; } /** - * Attempt to transmit channel data to the associated port. + * Attempt to send data to the associated port. */ -void rust_chan::transmit() { +void rust_chan::send(void *sptr) { + buffer.enqueue(sptr); + rust_dom *dom = task->dom; if (!is_associated()) { W(dom, is_associated(), @@ -81,7 +95,6 @@ void rust_chan::transmit() { return; } - // // Local Variables: // mode: C++ diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h index 055e359a..6aa98247 100644 --- a/src/rt/rust_chan.h +++ b/src/rt/rust_chan.h @@ -17,7 +17,7 @@ public: void disassociate(); bool is_associated(); - void transmit(); + void send(void *sptr); }; // diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp index b66fca82..af147252 100644 --- a/src/rt/rust_dom.cpp +++ b/src/rt/rust_dom.cpp @@ -237,7 +237,6 @@ rust_dom::reap_dead_tasks() { rust_task *task = dead_tasks[i]; if (task->ref_count == 0) { I(this, task->tasks_waiting_to_join.is_empty()); - dead_tasks.swap_delete(task); log(rust_log::TASK, "deleting unreferenced dead task 0x%" PRIxPTR, task); @@ -392,10 +391,9 @@ rust_dom::start_main_loop() // if progress is made in other domains. if (scheduled_task == NULL) { - log(rust_log::TASK, - "all tasks are blocked, waiting for progress ..."); - if (_log.is_tracing(rust_log::TASK)) + if (_log.is_tracing(rust_log::TASK)) { log_state(); + } log(rust_log::TASK, "all tasks are blocked, scheduler yielding ..."); sync::yield(); @@ -437,18 +435,6 @@ rust_dom::start_main_loop() log(rust_log::DOM, "terminated scheduler loop, reaping dead tasks ..."); while (dead_tasks.length() > 0) { - log(rust_log::DOM, - "waiting for %d dead tasks to become dereferenced ...", - dead_tasks.length()); - - if (_log.is_tracing(rust_log::DOM)) { - for (size_t i = 0; i < dead_tasks.length(); i++) { - log(rust_log::DOM, - "task: 0x%" PRIxPTR ", index: %d, ref_count: %d", - dead_tasks[i], i, dead_tasks[i]->ref_count); - } - } - if (_incoming_message_queue.is_empty()) { log(rust_log::DOM, "waiting for %d dead tasks to become dereferenced, " diff --git a/src/rt/rust_message.cpp b/src/rt/rust_message.cpp index 8b396b4d..1de804c9 100644 --- a/src/rt/rust_message.cpp +++ b/src/rt/rust_message.cpp @@ -90,8 +90,7 @@ send(uint8_t *buffer, size_t buffer_sz, const char* label, rust_task *source, } void data_message::process() { - _port->remote_channel->buffer.enqueue(_buffer); - _port->remote_channel->transmit(); + _port->remote_channel->send(_buffer); _target->log(rust_log::COMM, "<=== received data via message ==="); } diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp index bd9af6f9..c97b5d41 100644 --- a/src/rt/rust_port.cpp +++ b/src/rt/rust_port.cpp @@ -21,14 +21,32 @@ rust_port::~rust_port() { // Disassociate channels from this port. while (chans.is_empty() == false) { - chans.pop()->disassociate(); + rust_chan *chan = chans.peek(); + chan->disassociate(); } - // We're the only ones holding a reference to the remote channel, so - // clean it up. delete remote_channel; } +bool rust_port::receive(void *dptr) { + for (uint32_t i = 0; i < chans.length(); i++) { + rust_chan *chan = chans[i]; + if (chan->buffer.is_empty() == false) { + chan->buffer.dequeue(dptr); + if (chan->buffer.is_empty() && chan->task->blocked()) { + task->log(rust_log::COMM, + "chan: 0x%" PRIxPTR + " is flushing, wakeup task: 0x%" PRIxPTR, + chan, chan->task); + chan->task->wakeup(this); + } + task->log(rust_log::COMM, "<=== read data ==="); + return true; + } + } + return false; +} + void rust_port::log_state() { task->log(rust_log::COMM, "rust_port: 0x%" PRIxPTR ", associated channel(s): %d", diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h index 3330701a..7a58f839 100644 --- a/src/rt/rust_port.h +++ b/src/rt/rust_port.h @@ -16,6 +16,7 @@ public: rust_port(rust_task *task, size_t unit_sz); ~rust_port(); void log_state(); + bool receive(void *dptr); }; // diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index c8831e8e..f52db868 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -323,6 +323,11 @@ get_callee_save_fp(uintptr_t *top_of_callee_saves) void rust_task::kill() { + if (dead()) { + // Task is already dead, can't kill what's already dead. + return; + } + // Note the distinction here: kill() is when you're in an upcall // from task A and want to force-fail task B, you do B->kill(). // If you want to fail yourself you do self->fail(upcall_nargs). diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 5040a157..039aa2fd 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -98,26 +98,50 @@ upcall_new_chan(rust_task *task, rust_port *port) { } /** + * Called whenever this channel needs to be flushed. This can happen due to a + * flush statement, or automatically whenever a channel's ref count is + * about to drop to zero. + */ +extern "C" CDECL void +upcall_flush_chan(rust_task *task, rust_chan *chan) { + LOG_UPCALL_ENTRY(task); + rust_dom *dom = task->dom; + task->log(rust_log::UPCALL | rust_log::COMM, + "flush chan: 0x%" PRIxPTR, chan); + + if (chan->buffer.is_empty()) { + return; + } + + A(dom, chan->port->is_proxy() == false, + "Channels to remote ports should be flushed automatically."); + + // Block on the port until this channel has been completely drained + // by the port. + task->block(chan->port); + task->yield(2); +} + +/** * Called whenever the channel's ref count drops to zero. + * + * Cannot Yield: If the task were to unwind, the dropped ref would still + * appear to be live, causing modify-after-free errors. */ extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; + task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); - I(dom, !chan->ref_count); - - if (!chan->buffer.is_empty() && chan->is_associated()) { - A(dom, !chan->port->is_proxy(), - "Channels to remote ports should be flushed automatically."); - // A target port may still be reading from this channel. - // Block on this channel until it has been completely drained - // by the port. - task->block(chan); - task->yield(2); - return; - } + A(task->dom, chan->ref_count == 0, + "Channel's ref count should be zero."); + + if (chan->is_associated()) { + A(task->dom, chan->buffer.is_empty(), + "Channel's buffer should be empty."); + chan->disassociate(); + } delete chan; } @@ -183,8 +207,7 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) { "chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d", (uintptr_t) chan, (uintptr_t) sptr, chan->port->delegate()->unit_sz); - chan->buffer.enqueue(sptr); - chan->transmit(); + chan->send(sptr); task->log(rust_log::COMM, "=== sent data ===>"); } @@ -197,17 +220,8 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { (uintptr_t) port, (uintptr_t) dptr, port->unit_sz, port->chans.length()); - for (uint32_t i = 0; i < port->chans.length(); i++) { - rust_chan *chan = port->chans[i]; - if (chan->buffer.is_empty() == false) { - chan->buffer.dequeue(dptr); - if (chan->buffer.is_empty() && chan->task->blocked()) { - chan->task->wakeup(chan); - delete chan; - } - task->log(rust_log::COMM, "<=== read data ==="); - return; - } + if (port->receive(dptr)) { + return; } // No data was buffered on any incoming channel, so block this task @@ -260,6 +274,8 @@ upcall_exit(rust_task *task) { LOG_UPCALL_ENTRY(task); task->log(rust_log::UPCALL | rust_log::TASK, "task ref_count: %d", task->ref_count); + A(task->dom, task->ref_count >= 0, + "Task ref_count should not be negative on exit!"); task->die(); task->notify_tasks_waiting_to_join(); task->yield(1); |