aboutsummaryrefslogtreecommitdiff
path: root/src/rt/rust_upcall.cpp
diff options
context:
space:
mode:
authorMichael Bebenita <[email protected]>2010-07-28 16:46:13 -0700
committerGraydon Hoare <[email protected]>2010-07-28 20:30:29 -0700
commit4246d567b7a999155524669e3b0419e8f71080b1 (patch)
tree38ab4d3aa04b25e37db9e663890235edf2f3a8a6 /src/rt/rust_upcall.cpp
parentMove notification-messages out into their own file and unify into notify_mess... (diff)
downloadrust-4246d567b7a999155524669e3b0419e8f71080b1.tar.xz
rust-4246d567b7a999155524669e3b0419e8f71080b1.zip
Move ports out into their own file, add data_message and make communication system use it (and proxies) instead of existing token scheme.
Diffstat (limited to 'src/rt/rust_upcall.cpp')
-rw-r--r--src/rt/rust_upcall.cpp51
1 files changed, 34 insertions, 17 deletions
diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp
index 42203bec..574cb703 100644
--- a/src/rt/rust_upcall.cpp
+++ b/src/rt/rust_upcall.cpp
@@ -21,11 +21,6 @@
extern "C" CDECL char const *str_buf(rust_task *task, rust_str *s);
-inline bool
-requires_message_passing(rust_task *sender, rust_task *receiver) {
- return sender->dom != receiver->dom;
-}
-
extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) {
LOG_UPCALL_ENTRY(task);
task->grow(n_frame_bytes);
@@ -96,6 +91,18 @@ extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) {
task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
"upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
I(dom, !chan->ref_count);
+
+ if (!chan->buffer.is_empty() && chan->is_associated()) {
+ A(dom, !chan->port->is_proxy(),
+ "Channels to remote ports should be flushed automatically.");
+ // A target port may still be reading from this channel.
+ // Block on this channel until it has been completely drained
+ // by the port.
+ task->block(chan);
+ task->yield(2);
+ return;
+ }
+
delete chan;
}
@@ -105,14 +112,19 @@ extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) {
*/
extern "C" CDECL rust_chan *
upcall_clone_chan(rust_task *task,
- maybe_proxy<rust_task> *spawnee_proxy,
+ maybe_proxy<rust_task> *target,
rust_chan *chan) {
LOG_UPCALL_ENTRY(task);
- rust_task *spawnee = spawnee_proxy->delegate();
- task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
- "spawnee: 0x%" PRIxPTR ", chan: 0x%" PRIxPTR,
- (uintptr_t) spawnee, (uintptr_t) chan);
- return new (spawnee->dom) rust_chan(spawnee, chan->port);
+ task->log(rust_log::UPCALL | rust_log::COMM,
+ "target: 0x%" PRIxPTR ", chan: 0x%" PRIxPTR,
+ target, chan);
+ rust_task *target_task = target->delegate();
+ maybe_proxy<rust_port> *port = chan->port;
+ if (target->is_proxy()) {
+ port = target_task->dom->get_port_proxy_synchronized(
+ chan->port->as_delegate());
+ }
+ return new (target_task->dom) rust_chan(target_task, port);
}
extern "C" CDECL void upcall_yield(rust_task *task) {
@@ -145,20 +157,20 @@ upcall_join(rust_task *task, maybe_proxy<rust_task> *target) {
}
/**
- * Sends an chunk of data along the specified channel.
+ * Buffers a chunk of data in the specified channel.
*
- * sptr: pointer to a chunk of data to send
+ * sptr: pointer to a chunk of data to buffer
*/
extern "C" CDECL void
upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
LOG_UPCALL_ENTRY(task);
task->log(rust_log::UPCALL | rust_log::COMM,
"chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d",
- (uintptr_t) chan, (uintptr_t) sptr, chan->port->unit_sz);
-
+ (uintptr_t) chan, (uintptr_t) sptr,
+ chan->port->delegate()->unit_sz);
chan->buffer.enqueue(sptr);
chan->transmit();
- task->log(rust_log::COMM, "=== WROTE DATA ===>");
+ task->log(rust_log::COMM, "=== sent data ===>");
}
extern "C" CDECL void
@@ -174,7 +186,11 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
rust_chan *chan = port->chans[i];
if (chan->buffer.is_empty() == false) {
chan->buffer.dequeue(dptr);
- task->log(rust_log::COMM, "<=== READ DATA ===");
+ if (chan->buffer.is_empty() && chan->task->blocked()) {
+ chan->task->wakeup(chan);
+ delete chan;
+ }
+ task->log(rust_log::COMM, "<=== read data ===");
return;
}
}
@@ -183,6 +199,7 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
// on the port. Remember the rendezvous location so that any sender
// task can write to it before waking up this task.
+ task->log(rust_log::COMM, "<=== waiting for rendezvous data ===");
task->rendezvous_ptr = dptr;
task->block(port);
task->yield(3);