aboutsummaryrefslogtreecommitdiff
path: root/src/rt/rust_upcall.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/rt/rust_upcall.cpp')
-rw-r--r--src/rt/rust_upcall.cpp68
1 files changed, 42 insertions, 26 deletions
diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp
index 5040a157..039aa2fd 100644
--- a/src/rt/rust_upcall.cpp
+++ b/src/rt/rust_upcall.cpp
@@ -98,26 +98,50 @@ upcall_new_chan(rust_task *task, rust_port *port) {
}
/**
+ * Called whenever this channel needs to be flushed. This can happen due to a
+ * flush statement, or automatically whenever a channel's ref count is
+ * about to drop to zero.
+ */
+extern "C" CDECL void
+upcall_flush_chan(rust_task *task, rust_chan *chan) {
+ LOG_UPCALL_ENTRY(task);
+ rust_dom *dom = task->dom;
+ task->log(rust_log::UPCALL | rust_log::COMM,
+ "flush chan: 0x%" PRIxPTR, chan);
+
+ if (chan->buffer.is_empty()) {
+ return;
+ }
+
+ A(dom, chan->port->is_proxy() == false,
+ "Channels to remote ports should be flushed automatically.");
+
+ // Block on the port until this channel has been completely drained
+ // by the port.
+ task->block(chan->port);
+ task->yield(2);
+}
+
+/**
* Called whenever the channel's ref count drops to zero.
+ *
+ * Cannot Yield: If the task were to unwind, the dropped ref would still
+ * appear to be live, causing modify-after-free errors.
*/
extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) {
LOG_UPCALL_ENTRY(task);
- rust_dom *dom = task->dom;
+
task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
"upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
- I(dom, !chan->ref_count);
-
- if (!chan->buffer.is_empty() && chan->is_associated()) {
- A(dom, !chan->port->is_proxy(),
- "Channels to remote ports should be flushed automatically.");
- // A target port may still be reading from this channel.
- // Block on this channel until it has been completely drained
- // by the port.
- task->block(chan);
- task->yield(2);
- return;
- }
+ A(task->dom, chan->ref_count == 0,
+ "Channel's ref count should be zero.");
+
+ if (chan->is_associated()) {
+ A(task->dom, chan->buffer.is_empty(),
+ "Channel's buffer should be empty.");
+ chan->disassociate();
+ }
delete chan;
}
@@ -183,8 +207,7 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
"chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d",
(uintptr_t) chan, (uintptr_t) sptr,
chan->port->delegate()->unit_sz);
- chan->buffer.enqueue(sptr);
- chan->transmit();
+ chan->send(sptr);
task->log(rust_log::COMM, "=== sent data ===>");
}
@@ -197,17 +220,8 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
(uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
port->chans.length());
- for (uint32_t i = 0; i < port->chans.length(); i++) {
- rust_chan *chan = port->chans[i];
- if (chan->buffer.is_empty() == false) {
- chan->buffer.dequeue(dptr);
- if (chan->buffer.is_empty() && chan->task->blocked()) {
- chan->task->wakeup(chan);
- delete chan;
- }
- task->log(rust_log::COMM, "<=== read data ===");
- return;
- }
+ if (port->receive(dptr)) {
+ return;
}
// No data was buffered on any incoming channel, so block this task
@@ -260,6 +274,8 @@ upcall_exit(rust_task *task) {
LOG_UPCALL_ENTRY(task);
task->log(rust_log::UPCALL | rust_log::TASK,
"task ref_count: %d", task->ref_count);
+ A(task->dom, task->ref_count >= 0,
+ "Task ref_count should not be negative on exit!");
task->die();
task->notify_tasks_waiting_to_join();
task->yield(1);