aboutsummaryrefslogtreecommitdiff
path: root/src/rt/rust_chan.cpp
diff options
context:
space:
mode:
authorMichael Bebenita <[email protected]>2010-07-28 16:46:13 -0700
committerGraydon Hoare <[email protected]>2010-07-28 20:30:29 -0700
commit4246d567b7a999155524669e3b0419e8f71080b1 (patch)
tree38ab4d3aa04b25e37db9e663890235edf2f3a8a6 /src/rt/rust_chan.cpp
parentMove notification-messages out into their own file and unify into notify_mess... (diff)
downloadrust-4246d567b7a999155524669e3b0419e8f71080b1.tar.xz
rust-4246d567b7a999155524669e3b0419e8f71080b1.zip
Move ports out into their own file, add data_message and make communication system use it (and proxies) instead of existing token scheme.
Diffstat (limited to 'src/rt/rust_chan.cpp')
-rw-r--r--src/rt/rust_chan.cpp81
1 files changed, 51 insertions, 30 deletions
diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp
index 43744a11..f107d287 100644
--- a/src/rt/rust_chan.cpp
+++ b/src/rt/rust_chan.cpp
@@ -1,12 +1,14 @@
#include "rust_internal.h"
#include "rust_chan.h"
-rust_chan::rust_chan(rust_task *task, rust_port *port) :
- task(task), port(port), buffer(task->dom, port->unit_sz), token(this) {
+/**
+ * Create a new rust channel and associate it with the specified port.
+ */
+rust_chan::rust_chan(rust_task *task, maybe_proxy<rust_port> *port) :
+ task(task), port(port), buffer(task->dom, port->delegate()->unit_sz) {
if (port) {
- port->chans.push(this);
- ref();
+ associate(port);
}
task->log(rust_log::MEM | rust_log::COMM,
@@ -16,49 +18,68 @@ rust_chan::rust_chan(rust_task *task, rust_port *port) :
}
rust_chan::~rust_chan() {
- if (port) {
- if (token.pending())
- token.withdraw();
- port->chans.swap_delete(this);
+ if (port && !port->is_proxy()) {
+ port->delegate()->chans.swap_delete(this);
}
}
-void rust_chan::disassociate() {
- I(task->dom, port);
+/**
+ * Link this channel with the specified port.
+ */
+void rust_chan::associate(maybe_proxy<rust_port> *port) {
+ this->port = port;
+ if (!port->is_proxy()) {
+ this->port->delegate()->chans.push(this);
+ }
+}
- if (token.pending())
- token.withdraw();
+bool rust_chan::is_associated() {
+ return port != NULL;
+}
- // Delete reference to the port/
- port = NULL;
+/**
+ * Unlink this channel from its associated port.
+ */
+void rust_chan::disassociate() {
+ A(task->dom, is_associated(), "Channel must be associated with a port.");
- deref();
+ // Delete reference to the port.
+ port = NULL;
}
/**
* Attempt to transmit channel data to the associated port.
*/
-int rust_chan::transmit() {
+void rust_chan::transmit() {
rust_dom *dom = task->dom;
-
- // TODO: Figure out how and why the port would become null.
- if (port == NULL) {
- dom->log(rust_log::COMM, "invalid port, transmission incomplete");
- return ERROR;
+ if (!is_associated()) {
+ W(dom, is_associated(),
+ "rust_chan::transmit with no associated port.");
+ return;
}
- if (buffer.is_empty()) {
- dom->log(rust_log::COMM, "buffer is empty, transmission incomplete");
- return ERROR;
- }
+ A(dom, !buffer.is_empty(),
+ "rust_chan::transmit with nothing to send.");
- if(port->task->blocked_on(port)) {
- buffer.dequeue(port->task->rendezvous_ptr);
- port->task->wakeup(port);
+ if (port->is_proxy()) {
+ // TODO: Cache port task locally.
+ rust_proxy<rust_task> *port_task =
+ dom->get_task_proxy(port->delegate()->task);
+ data_message::send(buffer.peek(), buffer.unit_sz,
+ "send data", task, port_task, port->as_proxy());
+ buffer.dequeue(NULL);
+ } else {
+ rust_port *target_port = port->delegate();
+ if (target_port->task->blocked_on(target_port)) {
+ dom->log(rust_log::COMM, "dequeued in rendezvous_ptr");
+ buffer.dequeue(target_port->task->rendezvous_ptr);
+ target_port->task->rendezvous_ptr = 0;
+ target_port->task->wakeup(target_port);
+ return;
+ }
}
- return 0;
-
+ return;
}
//