diff options
| author | Michael Bebenita <[email protected]> | 2010-08-09 08:15:34 -0700 |
|---|---|---|
| committer | Michael Bebenita <[email protected]> | 2010-08-09 08:15:34 -0700 |
| commit | 97d6342bf08e55f8d2b4f8df5c4b5a099df0191c (patch) | |
| tree | bfed15fefbc032deba1c34908f25c1562d88aa6b | |
| parent | Fixed deadlock in the scheduler caused by condition variables. (diff) | |
| download | rust-97d6342bf08e55f8d2b4f8df5c4b5a099df0191c.tar.xz rust-97d6342bf08e55f8d2b4f8df5c4b5a099df0191c.zip | |
Synthesize a flush_chan upcall right before a channel's ref_count drops to zero. This should only happen in the Rust code and not in the drop glue, or on the unwind path. This change allows the task owning the channel to block on a flush and delete its own channel. This change also cleans up some code around rust_port and rust_chan.
| -rw-r--r-- | src/boot/me/trans.ml | 20 | ||||
| -rw-r--r-- | src/rt/rust_chan.cpp | 27 | ||||
| -rw-r--r-- | src/rt/rust_chan.h | 2 | ||||
| -rw-r--r-- | src/rt/rust_dom.cpp | 18 | ||||
| -rw-r--r-- | src/rt/rust_message.cpp | 3 | ||||
| -rw-r--r-- | src/rt/rust_port.cpp | 24 | ||||
| -rw-r--r-- | src/rt/rust_port.h | 1 | ||||
| -rw-r--r-- | src/rt/rust_task.cpp | 5 | ||||
| -rw-r--r-- | src/rt/rust_upcall.cpp | 68 |
9 files changed, 113 insertions, 55 deletions
diff --git a/src/boot/me/trans.ml b/src/boot/me/trans.ml index b708bb26..97dce2b2 100644 --- a/src/boot/me/trans.ml +++ b/src/boot/me/trans.ml @@ -2932,6 +2932,7 @@ let trans_visitor (slot:Ast.slot) (curr_iso:Ast.ty_iso option) : unit = + check_and_flush_chan cell slot; drop_slot (get_ty_params_of_current_frame()) cell slot curr_iso and drop_ty_in_current_frame @@ -4188,6 +4189,25 @@ let trans_visitor let last_jumps = Array.map trans_arm at.Ast.alt_tag_arms in Array.iter patch last_jumps + (* If we're about to drop a channel, synthesize an upcall_flush_chan. + * TODO: This should rather appear in a chan dtor when chans become + * objects. *) + and check_and_flush_chan + (cell:Il.cell) + (slot:Ast.slot) + : unit = + let ty = strip_mutable_or_constrained_ty (slot_ty slot) in + match simplified_ty ty with + Ast.TY_chan _ -> + annotate "check_and_flush_chan, flush_chan"; + let rc = box_rc_cell cell in + emit (Il.cmp (Il.Cell rc) one); + let jump = mark () in + emit (Il.jmp Il.JNE Il.CodeNone); + trans_void_upcall "upcall_flush_chan" [| Il.Cell cell |]; + patch jump; + | _ -> () + and drop_slots_at_curr_stmt _ : unit = let stmt = Stack.top curr_stmt in match htab_search cx.ctxt_post_stmt_slot_drops stmt with diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index f107d287..2a0a61db 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -18,9 +18,11 @@ rust_chan::rust_chan(rust_task *task, maybe_proxy<rust_port> *port) : } rust_chan::~rust_chan() { - if (port && !port->is_proxy()) { - port->delegate()->chans.swap_delete(this); - } + task->log(rust_log::MEM | rust_log::COMM, + "del rust_chan(task=0x%" PRIxPTR ")", (uintptr_t) this); + + A(task->dom, is_associated() == false, + "Channel must be disassociated before being freed."); } /** @@ -28,7 +30,10 @@ rust_chan::~rust_chan() { */ void rust_chan::associate(maybe_proxy<rust_port> *port) { this->port = port; - if (!port->is_proxy()) { + if (port->is_proxy() == false) { + task->log(rust_log::TASK, + "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR, + this, port); this->port->delegate()->chans.push(this); } } @@ -43,14 +48,23 @@ bool rust_chan::is_associated() { void rust_chan::disassociate() { A(task->dom, is_associated(), "Channel must be associated with a port."); + if (port->is_proxy() == false) { + task->log(rust_log::TASK, + "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR, + this, port->delegate()); + port->delegate()->chans.swap_delete(this); + } + // Delete reference to the port. port = NULL; } /** - * Attempt to transmit channel data to the associated port. + * Attempt to send data to the associated port. */ -void rust_chan::transmit() { +void rust_chan::send(void *sptr) { + buffer.enqueue(sptr); + rust_dom *dom = task->dom; if (!is_associated()) { W(dom, is_associated(), @@ -81,7 +95,6 @@ void rust_chan::transmit() { return; } - // // Local Variables: // mode: C++ diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h index 055e359a..6aa98247 100644 --- a/src/rt/rust_chan.h +++ b/src/rt/rust_chan.h @@ -17,7 +17,7 @@ public: void disassociate(); bool is_associated(); - void transmit(); + void send(void *sptr); }; // diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp index b66fca82..af147252 100644 --- a/src/rt/rust_dom.cpp +++ b/src/rt/rust_dom.cpp @@ -237,7 +237,6 @@ rust_dom::reap_dead_tasks() { rust_task *task = dead_tasks[i]; if (task->ref_count == 0) { I(this, task->tasks_waiting_to_join.is_empty()); - dead_tasks.swap_delete(task); log(rust_log::TASK, "deleting unreferenced dead task 0x%" PRIxPTR, task); @@ -392,10 +391,9 @@ rust_dom::start_main_loop() // if progress is made in other domains. if (scheduled_task == NULL) { - log(rust_log::TASK, - "all tasks are blocked, waiting for progress ..."); - if (_log.is_tracing(rust_log::TASK)) + if (_log.is_tracing(rust_log::TASK)) { log_state(); + } log(rust_log::TASK, "all tasks are blocked, scheduler yielding ..."); sync::yield(); @@ -437,18 +435,6 @@ rust_dom::start_main_loop() log(rust_log::DOM, "terminated scheduler loop, reaping dead tasks ..."); while (dead_tasks.length() > 0) { - log(rust_log::DOM, - "waiting for %d dead tasks to become dereferenced ...", - dead_tasks.length()); - - if (_log.is_tracing(rust_log::DOM)) { - for (size_t i = 0; i < dead_tasks.length(); i++) { - log(rust_log::DOM, - "task: 0x%" PRIxPTR ", index: %d, ref_count: %d", - dead_tasks[i], i, dead_tasks[i]->ref_count); - } - } - if (_incoming_message_queue.is_empty()) { log(rust_log::DOM, "waiting for %d dead tasks to become dereferenced, " diff --git a/src/rt/rust_message.cpp b/src/rt/rust_message.cpp index 8b396b4d..1de804c9 100644 --- a/src/rt/rust_message.cpp +++ b/src/rt/rust_message.cpp @@ -90,8 +90,7 @@ send(uint8_t *buffer, size_t buffer_sz, const char* label, rust_task *source, } void data_message::process() { - _port->remote_channel->buffer.enqueue(_buffer); - _port->remote_channel->transmit(); + _port->remote_channel->send(_buffer); _target->log(rust_log::COMM, "<=== received data via message ==="); } diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp index bd9af6f9..c97b5d41 100644 --- a/src/rt/rust_port.cpp +++ b/src/rt/rust_port.cpp @@ -21,14 +21,32 @@ rust_port::~rust_port() { // Disassociate channels from this port. while (chans.is_empty() == false) { - chans.pop()->disassociate(); + rust_chan *chan = chans.peek(); + chan->disassociate(); } - // We're the only ones holding a reference to the remote channel, so - // clean it up. delete remote_channel; } +bool rust_port::receive(void *dptr) { + for (uint32_t i = 0; i < chans.length(); i++) { + rust_chan *chan = chans[i]; + if (chan->buffer.is_empty() == false) { + chan->buffer.dequeue(dptr); + if (chan->buffer.is_empty() && chan->task->blocked()) { + task->log(rust_log::COMM, + "chan: 0x%" PRIxPTR + " is flushing, wakeup task: 0x%" PRIxPTR, + chan, chan->task); + chan->task->wakeup(this); + } + task->log(rust_log::COMM, "<=== read data ==="); + return true; + } + } + return false; +} + void rust_port::log_state() { task->log(rust_log::COMM, "rust_port: 0x%" PRIxPTR ", associated channel(s): %d", diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h index 3330701a..7a58f839 100644 --- a/src/rt/rust_port.h +++ b/src/rt/rust_port.h @@ -16,6 +16,7 @@ public: rust_port(rust_task *task, size_t unit_sz); ~rust_port(); void log_state(); + bool receive(void *dptr); }; // diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index c8831e8e..f52db868 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -323,6 +323,11 @@ get_callee_save_fp(uintptr_t *top_of_callee_saves) void rust_task::kill() { + if (dead()) { + // Task is already dead, can't kill what's already dead. + return; + } + // Note the distinction here: kill() is when you're in an upcall // from task A and want to force-fail task B, you do B->kill(). // If you want to fail yourself you do self->fail(upcall_nargs). diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 5040a157..039aa2fd 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -98,26 +98,50 @@ upcall_new_chan(rust_task *task, rust_port *port) { } /** + * Called whenever this channel needs to be flushed. This can happen due to a + * flush statement, or automatically whenever a channel's ref count is + * about to drop to zero. + */ +extern "C" CDECL void +upcall_flush_chan(rust_task *task, rust_chan *chan) { + LOG_UPCALL_ENTRY(task); + rust_dom *dom = task->dom; + task->log(rust_log::UPCALL | rust_log::COMM, + "flush chan: 0x%" PRIxPTR, chan); + + if (chan->buffer.is_empty()) { + return; + } + + A(dom, chan->port->is_proxy() == false, + "Channels to remote ports should be flushed automatically."); + + // Block on the port until this channel has been completely drained + // by the port. + task->block(chan->port); + task->yield(2); +} + +/** * Called whenever the channel's ref count drops to zero. + * + * Cannot Yield: If the task were to unwind, the dropped ref would still + * appear to be live, causing modify-after-free errors. */ extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) { LOG_UPCALL_ENTRY(task); - rust_dom *dom = task->dom; + task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); - I(dom, !chan->ref_count); - - if (!chan->buffer.is_empty() && chan->is_associated()) { - A(dom, !chan->port->is_proxy(), - "Channels to remote ports should be flushed automatically."); - // A target port may still be reading from this channel. - // Block on this channel until it has been completely drained - // by the port. - task->block(chan); - task->yield(2); - return; - } + A(task->dom, chan->ref_count == 0, + "Channel's ref count should be zero."); + + if (chan->is_associated()) { + A(task->dom, chan->buffer.is_empty(), + "Channel's buffer should be empty."); + chan->disassociate(); + } delete chan; } @@ -183,8 +207,7 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) { "chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d", (uintptr_t) chan, (uintptr_t) sptr, chan->port->delegate()->unit_sz); - chan->buffer.enqueue(sptr); - chan->transmit(); + chan->send(sptr); task->log(rust_log::COMM, "=== sent data ===>"); } @@ -197,17 +220,8 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { (uintptr_t) port, (uintptr_t) dptr, port->unit_sz, port->chans.length()); - for (uint32_t i = 0; i < port->chans.length(); i++) { - rust_chan *chan = port->chans[i]; - if (chan->buffer.is_empty() == false) { - chan->buffer.dequeue(dptr); - if (chan->buffer.is_empty() && chan->task->blocked()) { - chan->task->wakeup(chan); - delete chan; - } - task->log(rust_log::COMM, "<=== read data ==="); - return; - } + if (port->receive(dptr)) { + return; } // No data was buffered on any incoming channel, so block this task @@ -260,6 +274,8 @@ upcall_exit(rust_task *task) { LOG_UPCALL_ENTRY(task); task->log(rust_log::UPCALL | rust_log::TASK, "task ref_count: %d", task->ref_count); + A(task->dom, task->ref_count >= 0, + "Task ref_count should not be negative on exit!"); task->die(); task->notify_tasks_waiting_to_join(); task->yield(1); |