aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/boot/me/trans.ml20
-rw-r--r--src/rt/rust_chan.cpp27
-rw-r--r--src/rt/rust_chan.h2
-rw-r--r--src/rt/rust_dom.cpp18
-rw-r--r--src/rt/rust_message.cpp3
-rw-r--r--src/rt/rust_port.cpp24
-rw-r--r--src/rt/rust_port.h1
-rw-r--r--src/rt/rust_task.cpp5
-rw-r--r--src/rt/rust_upcall.cpp68
9 files changed, 113 insertions, 55 deletions
diff --git a/src/boot/me/trans.ml b/src/boot/me/trans.ml
index b708bb26..97dce2b2 100644
--- a/src/boot/me/trans.ml
+++ b/src/boot/me/trans.ml
@@ -2932,6 +2932,7 @@ let trans_visitor
(slot:Ast.slot)
(curr_iso:Ast.ty_iso option)
: unit =
+ check_and_flush_chan cell slot;
drop_slot (get_ty_params_of_current_frame()) cell slot curr_iso
and drop_ty_in_current_frame
@@ -4188,6 +4189,25 @@ let trans_visitor
let last_jumps = Array.map trans_arm at.Ast.alt_tag_arms in
Array.iter patch last_jumps
+ (* If we're about to drop a channel, synthesize an upcall_flush_chan.
+ * TODO: This should rather appear in a chan dtor when chans become
+ * objects. *)
+ and check_and_flush_chan
+ (cell:Il.cell)
+ (slot:Ast.slot)
+ : unit =
+ let ty = strip_mutable_or_constrained_ty (slot_ty slot) in
+ match simplified_ty ty with
+ Ast.TY_chan _ ->
+ annotate "check_and_flush_chan, flush_chan";
+ let rc = box_rc_cell cell in
+ emit (Il.cmp (Il.Cell rc) one);
+ let jump = mark () in
+ emit (Il.jmp Il.JNE Il.CodeNone);
+ trans_void_upcall "upcall_flush_chan" [| Il.Cell cell |];
+ patch jump;
+ | _ -> ()
+
and drop_slots_at_curr_stmt _ : unit =
let stmt = Stack.top curr_stmt in
match htab_search cx.ctxt_post_stmt_slot_drops stmt with
diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp
index f107d287..2a0a61db 100644
--- a/src/rt/rust_chan.cpp
+++ b/src/rt/rust_chan.cpp
@@ -18,9 +18,11 @@ rust_chan::rust_chan(rust_task *task, maybe_proxy<rust_port> *port) :
}
rust_chan::~rust_chan() {
- if (port && !port->is_proxy()) {
- port->delegate()->chans.swap_delete(this);
- }
+ task->log(rust_log::MEM | rust_log::COMM,
+ "del rust_chan(task=0x%" PRIxPTR ")", (uintptr_t) this);
+
+ A(task->dom, is_associated() == false,
+ "Channel must be disassociated before being freed.");
}
/**
@@ -28,7 +30,10 @@ rust_chan::~rust_chan() {
*/
void rust_chan::associate(maybe_proxy<rust_port> *port) {
this->port = port;
- if (!port->is_proxy()) {
+ if (port->is_proxy() == false) {
+ task->log(rust_log::TASK,
+ "associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
+ this, port);
this->port->delegate()->chans.push(this);
}
}
@@ -43,14 +48,23 @@ bool rust_chan::is_associated() {
void rust_chan::disassociate() {
A(task->dom, is_associated(), "Channel must be associated with a port.");
+ if (port->is_proxy() == false) {
+ task->log(rust_log::TASK,
+ "disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
+ this, port->delegate());
+ port->delegate()->chans.swap_delete(this);
+ }
+
// Delete reference to the port.
port = NULL;
}
/**
- * Attempt to transmit channel data to the associated port.
+ * Attempt to send data to the associated port.
*/
-void rust_chan::transmit() {
+void rust_chan::send(void *sptr) {
+ buffer.enqueue(sptr);
+
rust_dom *dom = task->dom;
if (!is_associated()) {
W(dom, is_associated(),
@@ -81,7 +95,6 @@ void rust_chan::transmit() {
return;
}
-
//
// Local Variables:
// mode: C++
diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h
index 055e359a..6aa98247 100644
--- a/src/rt/rust_chan.h
+++ b/src/rt/rust_chan.h
@@ -17,7 +17,7 @@ public:
void disassociate();
bool is_associated();
- void transmit();
+ void send(void *sptr);
};
//
diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp
index b66fca82..af147252 100644
--- a/src/rt/rust_dom.cpp
+++ b/src/rt/rust_dom.cpp
@@ -237,7 +237,6 @@ rust_dom::reap_dead_tasks() {
rust_task *task = dead_tasks[i];
if (task->ref_count == 0) {
I(this, task->tasks_waiting_to_join.is_empty());
-
dead_tasks.swap_delete(task);
log(rust_log::TASK,
"deleting unreferenced dead task 0x%" PRIxPTR, task);
@@ -392,10 +391,9 @@ rust_dom::start_main_loop()
// if progress is made in other domains.
if (scheduled_task == NULL) {
- log(rust_log::TASK,
- "all tasks are blocked, waiting for progress ...");
- if (_log.is_tracing(rust_log::TASK))
+ if (_log.is_tracing(rust_log::TASK)) {
log_state();
+ }
log(rust_log::TASK,
"all tasks are blocked, scheduler yielding ...");
sync::yield();
@@ -437,18 +435,6 @@ rust_dom::start_main_loop()
log(rust_log::DOM, "terminated scheduler loop, reaping dead tasks ...");
while (dead_tasks.length() > 0) {
- log(rust_log::DOM,
- "waiting for %d dead tasks to become dereferenced ...",
- dead_tasks.length());
-
- if (_log.is_tracing(rust_log::DOM)) {
- for (size_t i = 0; i < dead_tasks.length(); i++) {
- log(rust_log::DOM,
- "task: 0x%" PRIxPTR ", index: %d, ref_count: %d",
- dead_tasks[i], i, dead_tasks[i]->ref_count);
- }
- }
-
if (_incoming_message_queue.is_empty()) {
log(rust_log::DOM,
"waiting for %d dead tasks to become dereferenced, "
diff --git a/src/rt/rust_message.cpp b/src/rt/rust_message.cpp
index 8b396b4d..1de804c9 100644
--- a/src/rt/rust_message.cpp
+++ b/src/rt/rust_message.cpp
@@ -90,8 +90,7 @@ send(uint8_t *buffer, size_t buffer_sz, const char* label, rust_task *source,
}
void data_message::process() {
- _port->remote_channel->buffer.enqueue(_buffer);
- _port->remote_channel->transmit();
+ _port->remote_channel->send(_buffer);
_target->log(rust_log::COMM, "<=== received data via message ===");
}
diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp
index bd9af6f9..c97b5d41 100644
--- a/src/rt/rust_port.cpp
+++ b/src/rt/rust_port.cpp
@@ -21,14 +21,32 @@ rust_port::~rust_port() {
// Disassociate channels from this port.
while (chans.is_empty() == false) {
- chans.pop()->disassociate();
+ rust_chan *chan = chans.peek();
+ chan->disassociate();
}
- // We're the only ones holding a reference to the remote channel, so
- // clean it up.
delete remote_channel;
}
+bool rust_port::receive(void *dptr) {
+ for (uint32_t i = 0; i < chans.length(); i++) {
+ rust_chan *chan = chans[i];
+ if (chan->buffer.is_empty() == false) {
+ chan->buffer.dequeue(dptr);
+ if (chan->buffer.is_empty() && chan->task->blocked()) {
+ task->log(rust_log::COMM,
+ "chan: 0x%" PRIxPTR
+ " is flushing, wakeup task: 0x%" PRIxPTR,
+ chan, chan->task);
+ chan->task->wakeup(this);
+ }
+ task->log(rust_log::COMM, "<=== read data ===");
+ return true;
+ }
+ }
+ return false;
+}
+
void rust_port::log_state() {
task->log(rust_log::COMM,
"rust_port: 0x%" PRIxPTR ", associated channel(s): %d",
diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h
index 3330701a..7a58f839 100644
--- a/src/rt/rust_port.h
+++ b/src/rt/rust_port.h
@@ -16,6 +16,7 @@ public:
rust_port(rust_task *task, size_t unit_sz);
~rust_port();
void log_state();
+ bool receive(void *dptr);
};
//
diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp
index c8831e8e..f52db868 100644
--- a/src/rt/rust_task.cpp
+++ b/src/rt/rust_task.cpp
@@ -323,6 +323,11 @@ get_callee_save_fp(uintptr_t *top_of_callee_saves)
void
rust_task::kill() {
+ if (dead()) {
+ // Task is already dead, can't kill what's already dead.
+ return;
+ }
+
// Note the distinction here: kill() is when you're in an upcall
// from task A and want to force-fail task B, you do B->kill().
// If you want to fail yourself you do self->fail(upcall_nargs).
diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp
index 5040a157..039aa2fd 100644
--- a/src/rt/rust_upcall.cpp
+++ b/src/rt/rust_upcall.cpp
@@ -98,26 +98,50 @@ upcall_new_chan(rust_task *task, rust_port *port) {
}
/**
+ * Called whenever this channel needs to be flushed. This can happen due to a
+ * flush statement, or automatically whenever a channel's ref count is
+ * about to drop to zero.
+ */
+extern "C" CDECL void
+upcall_flush_chan(rust_task *task, rust_chan *chan) {
+ LOG_UPCALL_ENTRY(task);
+ rust_dom *dom = task->dom;
+ task->log(rust_log::UPCALL | rust_log::COMM,
+ "flush chan: 0x%" PRIxPTR, chan);
+
+ if (chan->buffer.is_empty()) {
+ return;
+ }
+
+ A(dom, chan->port->is_proxy() == false,
+ "Channels to remote ports should be flushed automatically.");
+
+ // Block on the port until this channel has been completely drained
+ // by the port.
+ task->block(chan->port);
+ task->yield(2);
+}
+
+/**
* Called whenever the channel's ref count drops to zero.
+ *
+ * Cannot Yield: If the task were to unwind, the dropped ref would still
+ * appear to be live, causing modify-after-free errors.
*/
extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) {
LOG_UPCALL_ENTRY(task);
- rust_dom *dom = task->dom;
+
task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
"upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
- I(dom, !chan->ref_count);
-
- if (!chan->buffer.is_empty() && chan->is_associated()) {
- A(dom, !chan->port->is_proxy(),
- "Channels to remote ports should be flushed automatically.");
- // A target port may still be reading from this channel.
- // Block on this channel until it has been completely drained
- // by the port.
- task->block(chan);
- task->yield(2);
- return;
- }
+ A(task->dom, chan->ref_count == 0,
+ "Channel's ref count should be zero.");
+
+ if (chan->is_associated()) {
+ A(task->dom, chan->buffer.is_empty(),
+ "Channel's buffer should be empty.");
+ chan->disassociate();
+ }
delete chan;
}
@@ -183,8 +207,7 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
"chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d",
(uintptr_t) chan, (uintptr_t) sptr,
chan->port->delegate()->unit_sz);
- chan->buffer.enqueue(sptr);
- chan->transmit();
+ chan->send(sptr);
task->log(rust_log::COMM, "=== sent data ===>");
}
@@ -197,17 +220,8 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
(uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
port->chans.length());
- for (uint32_t i = 0; i < port->chans.length(); i++) {
- rust_chan *chan = port->chans[i];
- if (chan->buffer.is_empty() == false) {
- chan->buffer.dequeue(dptr);
- if (chan->buffer.is_empty() && chan->task->blocked()) {
- chan->task->wakeup(chan);
- delete chan;
- }
- task->log(rust_log::COMM, "<=== read data ===");
- return;
- }
+ if (port->receive(dptr)) {
+ return;
}
// No data was buffered on any incoming channel, so block this task
@@ -260,6 +274,8 @@ upcall_exit(rust_task *task) {
LOG_UPCALL_ENTRY(task);
task->log(rust_log::UPCALL | rust_log::TASK,
"task ref_count: %d", task->ref_count);
+ A(task->dom, task->ref_count >= 0,
+ "Task ref_count should not be negative on exit!");
task->die();
task->notify_tasks_waiting_to_join();
task->yield(1);