aboutsummaryrefslogtreecommitdiff
path: root/src/rt
diff options
context:
space:
mode:
authorMichael Bebenita <[email protected]>2010-07-28 16:46:13 -0700
committerGraydon Hoare <[email protected]>2010-07-28 20:30:29 -0700
commit4246d567b7a999155524669e3b0419e8f71080b1 (patch)
tree38ab4d3aa04b25e37db9e663890235edf2f3a8a6 /src/rt
parentMove notification-messages out into their own file and unify into notify_mess... (diff)
downloadrust-4246d567b7a999155524669e3b0419e8f71080b1.tar.xz
rust-4246d567b7a999155524669e3b0419e8f71080b1.zip
Move ports out into their own file, add data_message and make communication system use it (and proxies) instead of existing token scheme.
Diffstat (limited to 'src/rt')
-rw-r--r--src/rt/rust_chan.cpp81
-rw-r--r--src/rt/rust_chan.h19
-rw-r--r--src/rt/rust_comm.cpp74
-rw-r--r--src/rt/rust_dom.cpp31
-rw-r--r--src/rt/rust_dom.h2
-rw-r--r--src/rt/rust_internal.h30
-rw-r--r--src/rt/rust_message.cpp32
-rw-r--r--src/rt/rust_message.h24
-rw-r--r--src/rt/rust_port.cpp39
-rw-r--r--src/rt/rust_port.h31
-rw-r--r--src/rt/rust_task.cpp14
-rw-r--r--src/rt/rust_task.h4
-rw-r--r--src/rt/rust_upcall.cpp51
13 files changed, 254 insertions, 178 deletions
diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp
index 43744a11..f107d287 100644
--- a/src/rt/rust_chan.cpp
+++ b/src/rt/rust_chan.cpp
@@ -1,12 +1,14 @@
#include "rust_internal.h"
#include "rust_chan.h"
-rust_chan::rust_chan(rust_task *task, rust_port *port) :
- task(task), port(port), buffer(task->dom, port->unit_sz), token(this) {
+/**
+ * Create a new rust channel and associate it with the specified port.
+ */
+rust_chan::rust_chan(rust_task *task, maybe_proxy<rust_port> *port) :
+ task(task), port(port), buffer(task->dom, port->delegate()->unit_sz) {
if (port) {
- port->chans.push(this);
- ref();
+ associate(port);
}
task->log(rust_log::MEM | rust_log::COMM,
@@ -16,49 +18,68 @@ rust_chan::rust_chan(rust_task *task, rust_port *port) :
}
rust_chan::~rust_chan() {
- if (port) {
- if (token.pending())
- token.withdraw();
- port->chans.swap_delete(this);
+ if (port && !port->is_proxy()) {
+ port->delegate()->chans.swap_delete(this);
}
}
-void rust_chan::disassociate() {
- I(task->dom, port);
+/**
+ * Link this channel with the specified port.
+ */
+void rust_chan::associate(maybe_proxy<rust_port> *port) {
+ this->port = port;
+ if (!port->is_proxy()) {
+ this->port->delegate()->chans.push(this);
+ }
+}
- if (token.pending())
- token.withdraw();
+bool rust_chan::is_associated() {
+ return port != NULL;
+}
- // Delete reference to the port/
- port = NULL;
+/**
+ * Unlink this channel from its associated port.
+ */
+void rust_chan::disassociate() {
+ A(task->dom, is_associated(), "Channel must be associated with a port.");
- deref();
+ // Delete reference to the port.
+ port = NULL;
}
/**
* Attempt to transmit channel data to the associated port.
*/
-int rust_chan::transmit() {
+void rust_chan::transmit() {
rust_dom *dom = task->dom;
-
- // TODO: Figure out how and why the port would become null.
- if (port == NULL) {
- dom->log(rust_log::COMM, "invalid port, transmission incomplete");
- return ERROR;
+ if (!is_associated()) {
+ W(dom, is_associated(),
+ "rust_chan::transmit with no associated port.");
+ return;
}
- if (buffer.is_empty()) {
- dom->log(rust_log::COMM, "buffer is empty, transmission incomplete");
- return ERROR;
- }
+ A(dom, !buffer.is_empty(),
+ "rust_chan::transmit with nothing to send.");
- if(port->task->blocked_on(port)) {
- buffer.dequeue(port->task->rendezvous_ptr);
- port->task->wakeup(port);
+ if (port->is_proxy()) {
+ // TODO: Cache port task locally.
+ rust_proxy<rust_task> *port_task =
+ dom->get_task_proxy(port->delegate()->task);
+ data_message::send(buffer.peek(), buffer.unit_sz,
+ "send data", task, port_task, port->as_proxy());
+ buffer.dequeue(NULL);
+ } else {
+ rust_port *target_port = port->delegate();
+ if (target_port->task->blocked_on(target_port)) {
+ dom->log(rust_log::COMM, "dequeued in rendezvous_ptr");
+ buffer.dequeue(target_port->task->rendezvous_ptr);
+ target_port->task->rendezvous_ptr = 0;
+ target_port->task->wakeup(target_port);
+ return;
+ }
}
- return 0;
-
+ return;
}
//
diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h
index 1d24ee78..055e359a 100644
--- a/src/rt/rust_chan.h
+++ b/src/rt/rust_chan.h
@@ -1,24 +1,23 @@
-
#ifndef RUST_CHAN_H
#define RUST_CHAN_H
-class rust_chan : public rc_base<rust_chan>, public task_owned<rust_chan> {
+class rust_chan : public rc_base<rust_chan>,
+ public task_owned<rust_chan>,
+ public rust_cond {
public:
- rust_chan(rust_task *task, rust_port *port);
+ rust_chan(rust_task *task, maybe_proxy<rust_port> *port);
~rust_chan();
rust_task *task;
- rust_port *port;
+ maybe_proxy<rust_port> *port;
+ size_t idx;
circular_buffer buffer;
- size_t idx; // Index into port->chans.
-
- // Token belonging to this chan, it will be placed into a port's
- // writers vector if we have something to send to the port.
- rust_token token;
+ void associate(maybe_proxy<rust_port> *port);
void disassociate();
+ bool is_associated();
- int transmit();
+ void transmit();
};
//
diff --git a/src/rt/rust_comm.cpp b/src/rt/rust_comm.cpp
index 37929b8b..3fbaaa04 100644
--- a/src/rt/rust_comm.cpp
+++ b/src/rt/rust_comm.cpp
@@ -1,87 +1,13 @@
#include "rust_internal.h"
-template class ptr_vec<rust_token>;
template class ptr_vec<rust_alarm>;
-template class ptr_vec<rust_chan>;
rust_alarm::rust_alarm(rust_task *receiver) :
receiver(receiver)
{
}
-// Ports.
-
-rust_port::rust_port(rust_task *task, size_t unit_sz) :
- task(task),
- unit_sz(unit_sz),
- writers(task->dom),
- chans(task->dom)
-{
- task->log(rust_log::MEM|rust_log::COMM,
- "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%"
- PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this);
-}
-
-rust_port::~rust_port()
-{
- task->log(rust_log::COMM|rust_log::MEM,
- "~rust_port 0x%" PRIxPTR,
- (uintptr_t)this);
- while (chans.length() > 0)
- chans.pop()->disassociate();
-}
-
-
-// Tokens.
-
-rust_token::rust_token(rust_chan *chan) :
- chan(chan),
- idx(0),
- submitted(false)
-{
-}
-
-rust_token::~rust_token()
-{
-}
-
-bool
-rust_token::pending() const
-{
- return submitted;
-}
-
-void
-rust_token::submit()
-{
- rust_port *port = chan->port;
- rust_dom *dom = chan->task->dom;
-
- I(dom, port);
- I(dom, !submitted);
-
- port->writers.push(this);
- submitted = true;
-}
-
-void
-rust_token::withdraw()
-{
- rust_task *task = chan->task;
- rust_port *port = chan->port;
- rust_dom *dom = task->dom;
-
- I(dom, port);
- I(dom, submitted);
-
- if (task->blocked())
- task->wakeup(this); // must be blocked on us (or dead)
- port->writers.swap_delete(this);
- submitted = false;
-}
-
-
//
// Local Variables:
// mode: C++
diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp
index 4d1917f4..0c175e78 100644
--- a/src/rt/rust_dom.cpp
+++ b/src/rt/rust_dom.cpp
@@ -46,6 +46,14 @@ rust_dom::delete_proxies() {
" in dom %" PRIxPTR, task_proxy, task_proxy->dom);
delete task_proxy;
}
+
+ rust_port *port;
+ rust_proxy<rust_port> *port_proxy;
+ while (_port_proxies.pop(&port, &port_proxy)) {
+ log(rust_log::TASK, "deleting proxy %" PRIxPTR
+ " in dom %" PRIxPTR, port_proxy, port_proxy->dom);
+ delete port_proxy;
+ }
}
rust_dom::~rust_dom() {
@@ -217,7 +225,6 @@ rust_dom::reap_dead_tasks() {
for (size_t i = 0; i < dead_tasks.length(); ) {
rust_task *task = dead_tasks[i];
if (task->ref_count == 0) {
- I(this, !task->waiting_tasks.length());
I(this, task->tasks_waiting_to_join.is_empty());
dead_tasks.swap_delete(task);
@@ -270,6 +277,28 @@ rust_dom::get_task_proxy(rust_task *task) {
_task_proxies.put(task, proxy);
return proxy;
}
+
+/**
+ * Gets a proxy for this port.
+ *
+ * TODO: This method needs to be synchronized since it's usually called
+ * during upcall_clone_chan in a different thread. However, for now
+ * since this usually happens before the thread actually starts,
+ * we may get lucky without synchronizing.
+ *
+ */
+rust_proxy<rust_port> *
+rust_dom::get_port_proxy_synchronized(rust_port *port) {
+ rust_proxy<rust_port> *proxy = NULL;
+ if (_port_proxies.get(port, &proxy)) {
+ return proxy;
+ }
+ log(rust_log::COMM, "no proxy for 0x%" PRIxPTR, port);
+ proxy = new (this) rust_proxy<rust_port> (this, port, false);
+ _port_proxies.put(port, proxy);
+ return proxy;
+}
+
/**
* Schedules a running task for execution. Only running tasks can be
* activated. Blocked tasks have to be unblocked before they can be
diff --git a/src/rt/rust_dom.h b/src/rt/rust_dom.h
index 2f162972..528790d5 100644
--- a/src/rt/rust_dom.h
+++ b/src/rt/rust_dom.h
@@ -37,6 +37,7 @@ struct rust_dom
condition_variable _progress;
hash_map<rust_task *, rust_proxy<rust_task> *> _task_proxies;
+ hash_map<rust_port *, rust_proxy<rust_port> *> _port_proxies;
// Incoming messages from other domains.
condition_variable _incoming_message_pending;
@@ -66,6 +67,7 @@ struct rust_dom
void drain_incoming_message_queue();
rust_proxy<rust_task> *get_task_proxy(rust_task *task);
void delete_proxies();
+ rust_proxy<rust_port> *get_port_proxy_synchronized(rust_port *port);
#ifdef __WIN32__
void win32_require(LPCTSTR fn, BOOL ok);
diff --git a/src/rt/rust_internal.h b/src/rt/rust_internal.h
index bc5835fe..a89144d7 100644
--- a/src/rt/rust_internal.h
+++ b/src/rt/rust_internal.h
@@ -580,37 +580,11 @@ struct gc_alloc {
}
};
+#include "circular_buffer.h"
#include "rust_proxy.h"
#include "rust_task.h"
-
-struct rust_port : public rc_base<rust_port>,
- public task_owned<rust_port>,
- public rust_cond {
- rust_task *task;
- size_t unit_sz;
- ptr_vec<rust_token> writers;
- ptr_vec<rust_chan> chans;
-
- rust_port(rust_task *task, size_t unit_sz);
- ~rust_port();
-};
-
-struct rust_token : public rust_cond {
- rust_chan *chan; // Link back to the channel this token belongs to
- size_t idx; // Index into port->writers.
- bool submitted; // Whether token is in a port->writers.
-
- rust_token(rust_chan *chan);
- ~rust_token();
-
- bool pending() const;
- void submit();
- void withdraw();
-};
-
-#include "circular_buffer.h"
-
#include "rust_chan.h"
+#include "rust_port.h"
//
// Local Variables:
diff --git a/src/rt/rust_message.cpp b/src/rt/rust_message.cpp
index 22f8ffe5..8b396b4d 100644
--- a/src/rt/rust_message.cpp
+++ b/src/rt/rust_message.cpp
@@ -27,6 +27,19 @@ notify_message(notification_type type, const char* label,
rust_message(label, source, target), type(type) {
}
+data_message::
+data_message(uint8_t *buffer, size_t buffer_sz, const char* label,
+ rust_task *source, rust_task *target, rust_port *port) :
+ rust_message(label, source, target),
+ _buffer_sz(buffer_sz), _port(port) {
+ _buffer = (uint8_t *)malloc(buffer_sz);
+ memcpy(_buffer, buffer, buffer_sz);
+}
+
+data_message::~data_message() {
+ free (_buffer);
+}
+
/**
* Sends a message to the target task via a proxy. The message is allocated
* in the target task domain along with a proxy which points back to the
@@ -63,6 +76,25 @@ void notify_message::process() {
}
}
+void data_message::
+send(uint8_t *buffer, size_t buffer_sz, const char* label, rust_task *source,
+ rust_proxy<rust_task> *target, rust_proxy<rust_port> *port) {
+
+ rust_task *target_task = target->delegate();
+ rust_port *target_port = port->delegate();
+ rust_dom *target_domain = target_task->dom;
+ data_message *message = new (target_domain)
+ data_message(buffer, buffer_sz, label, source,
+ target_task, target_port);
+ target_domain->send_message(message);
+}
+
+void data_message::process() {
+ _port->remote_channel->buffer.enqueue(_buffer);
+ _port->remote_channel->transmit();
+ _target->log(rust_log::COMM, "<=== received data via message ===");
+}
+
//
// Local Variables:
// mode: C++
diff --git a/src/rt/rust_message.h b/src/rt/rust_message.h
index f0c6bc0a..b7b8568a 100644
--- a/src/rt/rust_message.h
+++ b/src/rt/rust_message.h
@@ -58,6 +58,30 @@ public:
rust_proxy<rust_task> *target);
};
+/**
+ * Data messages carry a buffer.
+ */
+class data_message : public rust_message {
+private:
+ uint8_t *_buffer;
+ size_t _buffer_sz;
+ rust_port *_port;
+public:
+
+ data_message(uint8_t *buffer, size_t buffer_sz, const char* label,
+ rust_task *source, rust_task *target, rust_port *port);
+ ~data_message();
+ void process();
+
+ /**
+ * This code executes in the sending domain's thread.
+ */
+ static void
+ send(uint8_t *buffer, size_t buffer_sz, const char* label,
+ rust_task *source, rust_proxy<rust_task> *target,
+ rust_proxy<rust_port> *port);
+};
+
//
// Local Variables:
// mode: C++
diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp
new file mode 100644
index 00000000..0a5b7ee7
--- /dev/null
+++ b/src/rt/rust_port.cpp
@@ -0,0 +1,39 @@
+#include "rust_internal.h"
+#include "rust_port.h"
+
+rust_port::rust_port(rust_task *task, size_t unit_sz) :
+ maybe_proxy<rust_port>(this), task(task), unit_sz(unit_sz),
+ writers(task->dom), chans(task->dom) {
+
+ task->log(rust_log::MEM | rust_log::COMM,
+ "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%"
+ PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this);
+
+ // Allocate a remote channel, for remote channel data.
+ remote_channel = new (task->dom) rust_chan(task, this);
+}
+
+rust_port::~rust_port() {
+ task->log(rust_log::COMM | rust_log::MEM,
+ "~rust_port 0x%" PRIxPTR, (uintptr_t) this);
+
+ // Disassociate channels from this port.
+ while (chans.is_empty() == false) {
+ chans.pop()->disassociate();
+ }
+
+ // We're the only ones holding a reference to the remote channel, so
+ // clean it up.
+ delete remote_channel;
+}
+
+//
+// Local Variables:
+// mode: C++
+// fill-column: 78;
+// indent-tabs-mode: nil
+// c-basic-offset: 4
+// buffer-file-coding-system: utf-8-unix
+// compile-command: "make -k -C .. 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
+// End:
+//
diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h
new file mode 100644
index 00000000..49a89437
--- /dev/null
+++ b/src/rt/rust_port.h
@@ -0,0 +1,31 @@
+#ifndef RUST_PORT_H
+#define RUST_PORT_H
+
+class rust_port : public maybe_proxy<rust_port>,
+ public task_owned<rust_port> {
+
+public:
+ rust_task *task;
+ size_t unit_sz;
+ ptr_vec<rust_token> writers;
+ ptr_vec<rust_chan> chans;
+
+ // Data sent to this port from remote tasks is buffered in this channel.
+ rust_chan *remote_channel;
+
+ rust_port(rust_task *task, size_t unit_sz);
+ ~rust_port();
+};
+
+//
+// Local Variables:
+// mode: C++
+// fill-column: 78;
+// indent-tabs-mode: nil
+// c-basic-offset: 4
+// buffer-file-coding-system: utf-8-unix
+// compile-command: "make -k -C .. 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
+// End:
+//
+
+#endif /* RUST_PORT_H */
diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp
index b3caac25..7140692c 100644
--- a/src/rt/rust_task.cpp
+++ b/src/rt/rust_task.cpp
@@ -64,7 +64,6 @@ rust_task::rust_task(rust_dom *dom, rust_task *spawner) :
cond(NULL),
supervisor(spawner),
idx(0),
- waiting_tasks(dom),
rendezvous_ptr(0),
alarm(this)
{
@@ -373,19 +372,6 @@ rust_task::unsupervise()
}
void
-rust_task::notify_waiting_tasks()
-{
- while (waiting_tasks.length() > 0) {
- log(rust_log::ALL, "notify_waiting_tasks: %d",
- waiting_tasks.length());
- rust_task *waiting_task = waiting_tasks.pop()->receiver;
- if (!waiting_task->dead()) {
- waiting_task->wakeup(this);
- }
- }
-}
-
-void
rust_task::notify_tasks_waiting_to_join() {
while (tasks_waiting_to_join.is_empty() == false) {
log(rust_log::ALL, "notify_tasks_waiting_to_join: %d",
diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h
index 34553b6c..b657592a 100644
--- a/src/rt/rust_task.h
+++ b/src/rt/rust_task.h
@@ -28,9 +28,6 @@ rust_task : public maybe_proxy<rust_task>,
size_t gc_alloc_thresh;
size_t gc_alloc_accum;
- // Wait queue for tasks waiting for this task.
- rust_wait_queue waiting_tasks;
-
// Rendezvous pointer for receiving data when blocked on a port. If we're
// trying to read data and no data is available on any incoming channel,
// we block on the port, and yield control to the scheduler. Since, we
@@ -101,7 +98,6 @@ rust_task : public maybe_proxy<rust_task>,
void unsupervise();
// Notify tasks waiting for us that we are about to die.
- void notify_waiting_tasks();
void notify_tasks_waiting_to_join();
uintptr_t get_fp();
diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp
index 42203bec..574cb703 100644
--- a/src/rt/rust_upcall.cpp
+++ b/src/rt/rust_upcall.cpp
@@ -21,11 +21,6 @@
extern "C" CDECL char const *str_buf(rust_task *task, rust_str *s);
-inline bool
-requires_message_passing(rust_task *sender, rust_task *receiver) {
- return sender->dom != receiver->dom;
-}
-
extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) {
LOG_UPCALL_ENTRY(task);
task->grow(n_frame_bytes);
@@ -96,6 +91,18 @@ extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) {
task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
"upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
I(dom, !chan->ref_count);
+
+ if (!chan->buffer.is_empty() && chan->is_associated()) {
+ A(dom, !chan->port->is_proxy(),
+ "Channels to remote ports should be flushed automatically.");
+ // A target port may still be reading from this channel.
+ // Block on this channel until it has been completely drained
+ // by the port.
+ task->block(chan);
+ task->yield(2);
+ return;
+ }
+
delete chan;
}
@@ -105,14 +112,19 @@ extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) {
*/
extern "C" CDECL rust_chan *
upcall_clone_chan(rust_task *task,
- maybe_proxy<rust_task> *spawnee_proxy,
+ maybe_proxy<rust_task> *target,
rust_chan *chan) {
LOG_UPCALL_ENTRY(task);
- rust_task *spawnee = spawnee_proxy->delegate();
- task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
- "spawnee: 0x%" PRIxPTR ", chan: 0x%" PRIxPTR,
- (uintptr_t) spawnee, (uintptr_t) chan);
- return new (spawnee->dom) rust_chan(spawnee, chan->port);
+ task->log(rust_log::UPCALL | rust_log::COMM,
+ "target: 0x%" PRIxPTR ", chan: 0x%" PRIxPTR,
+ target, chan);
+ rust_task *target_task = target->delegate();
+ maybe_proxy<rust_port> *port = chan->port;
+ if (target->is_proxy()) {
+ port = target_task->dom->get_port_proxy_synchronized(
+ chan->port->as_delegate());
+ }
+ return new (target_task->dom) rust_chan(target_task, port);
}
extern "C" CDECL void upcall_yield(rust_task *task) {
@@ -145,20 +157,20 @@ upcall_join(rust_task *task, maybe_proxy<rust_task> *target) {
}
/**
- * Sends an chunk of data along the specified channel.
+ * Buffers a chunk of data in the specified channel.
*
- * sptr: pointer to a chunk of data to send
+ * sptr: pointer to a chunk of data to buffer
*/
extern "C" CDECL void
upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
LOG_UPCALL_ENTRY(task);
task->log(rust_log::UPCALL | rust_log::COMM,
"chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d",
- (uintptr_t) chan, (uintptr_t) sptr, chan->port->unit_sz);
-
+ (uintptr_t) chan, (uintptr_t) sptr,
+ chan->port->delegate()->unit_sz);
chan->buffer.enqueue(sptr);
chan->transmit();
- task->log(rust_log::COMM, "=== WROTE DATA ===>");
+ task->log(rust_log::COMM, "=== sent data ===>");
}
extern "C" CDECL void
@@ -174,7 +186,11 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
rust_chan *chan = port->chans[i];
if (chan->buffer.is_empty() == false) {
chan->buffer.dequeue(dptr);
- task->log(rust_log::COMM, "<=== READ DATA ===");
+ if (chan->buffer.is_empty() && chan->task->blocked()) {
+ chan->task->wakeup(chan);
+ delete chan;
+ }
+ task->log(rust_log::COMM, "<=== read data ===");
return;
}
}
@@ -183,6 +199,7 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
// on the port. Remember the rendezvous location so that any sender
// task can write to it before waking up this task.
+ task->log(rust_log::COMM, "<=== waiting for rendezvous data ===");
task->rendezvous_ptr = dptr;
task->block(port);
task->yield(3);