diff options
Diffstat (limited to 'src/rt/rust_upcall.cpp')
| -rw-r--r-- | src/rt/rust_upcall.cpp | 51 |
1 files changed, 34 insertions, 17 deletions
diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 42203bec..574cb703 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -21,11 +21,6 @@ extern "C" CDECL char const *str_buf(rust_task *task, rust_str *s); -inline bool -requires_message_passing(rust_task *sender, rust_task *receiver) { - return sender->dom != receiver->dom; -} - extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) { LOG_UPCALL_ENTRY(task); task->grow(n_frame_bytes); @@ -96,6 +91,18 @@ extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) { task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); I(dom, !chan->ref_count); + + if (!chan->buffer.is_empty() && chan->is_associated()) { + A(dom, !chan->port->is_proxy(), + "Channels to remote ports should be flushed automatically."); + // A target port may still be reading from this channel. + // Block on this channel until it has been completely drained + // by the port. + task->block(chan); + task->yield(2); + return; + } + delete chan; } @@ -105,14 +112,19 @@ extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) { */ extern "C" CDECL rust_chan * upcall_clone_chan(rust_task *task, - maybe_proxy<rust_task> *spawnee_proxy, + maybe_proxy<rust_task> *target, rust_chan *chan) { LOG_UPCALL_ENTRY(task); - rust_task *spawnee = spawnee_proxy->delegate(); - task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, - "spawnee: 0x%" PRIxPTR ", chan: 0x%" PRIxPTR, - (uintptr_t) spawnee, (uintptr_t) chan); - return new (spawnee->dom) rust_chan(spawnee, chan->port); + task->log(rust_log::UPCALL | rust_log::COMM, + "target: 0x%" PRIxPTR ", chan: 0x%" PRIxPTR, + target, chan); + rust_task *target_task = target->delegate(); + maybe_proxy<rust_port> *port = chan->port; + if (target->is_proxy()) { + port = target_task->dom->get_port_proxy_synchronized( + chan->port->as_delegate()); + } + return new (target_task->dom) rust_chan(target_task, port); } extern "C" CDECL void upcall_yield(rust_task *task) { @@ -145,20 +157,20 @@ upcall_join(rust_task *task, maybe_proxy<rust_task> *target) { } /** - * Sends an chunk of data along the specified channel. + * Buffers a chunk of data in the specified channel. * - * sptr: pointer to a chunk of data to send + * sptr: pointer to a chunk of data to buffer */ extern "C" CDECL void upcall_send(rust_task *task, rust_chan *chan, void *sptr) { LOG_UPCALL_ENTRY(task); task->log(rust_log::UPCALL | rust_log::COMM, "chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d", - (uintptr_t) chan, (uintptr_t) sptr, chan->port->unit_sz); - + (uintptr_t) chan, (uintptr_t) sptr, + chan->port->delegate()->unit_sz); chan->buffer.enqueue(sptr); chan->transmit(); - task->log(rust_log::COMM, "=== WROTE DATA ===>"); + task->log(rust_log::COMM, "=== sent data ===>"); } extern "C" CDECL void @@ -174,7 +186,11 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { rust_chan *chan = port->chans[i]; if (chan->buffer.is_empty() == false) { chan->buffer.dequeue(dptr); - task->log(rust_log::COMM, "<=== READ DATA ==="); + if (chan->buffer.is_empty() && chan->task->blocked()) { + chan->task->wakeup(chan); + delete chan; + } + task->log(rust_log::COMM, "<=== read data ==="); return; } } @@ -183,6 +199,7 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { // on the port. Remember the rendezvous location so that any sender // task can write to it before waking up this task. + task->log(rust_log::COMM, "<=== waiting for rendezvous data ==="); task->rendezvous_ptr = dptr; task->block(port); task->yield(3); |