diff options
| author | Michael Bebenita <[email protected]> | 2010-07-28 16:46:13 -0700 |
|---|---|---|
| committer | Graydon Hoare <[email protected]> | 2010-07-28 20:30:29 -0700 |
| commit | 4246d567b7a999155524669e3b0419e8f71080b1 (patch) | |
| tree | 38ab4d3aa04b25e37db9e663890235edf2f3a8a6 /src/rt | |
| parent | Move notification-messages out into their own file and unify into notify_mess... (diff) | |
| download | rust-4246d567b7a999155524669e3b0419e8f71080b1.tar.xz rust-4246d567b7a999155524669e3b0419e8f71080b1.zip | |
Move ports out into their own file, add data_message and make communication system use it (and proxies) instead of existing token scheme.
Diffstat (limited to 'src/rt')
| -rw-r--r-- | src/rt/rust_chan.cpp | 81 | ||||
| -rw-r--r-- | src/rt/rust_chan.h | 19 | ||||
| -rw-r--r-- | src/rt/rust_comm.cpp | 74 | ||||
| -rw-r--r-- | src/rt/rust_dom.cpp | 31 | ||||
| -rw-r--r-- | src/rt/rust_dom.h | 2 | ||||
| -rw-r--r-- | src/rt/rust_internal.h | 30 | ||||
| -rw-r--r-- | src/rt/rust_message.cpp | 32 | ||||
| -rw-r--r-- | src/rt/rust_message.h | 24 | ||||
| -rw-r--r-- | src/rt/rust_port.cpp | 39 | ||||
| -rw-r--r-- | src/rt/rust_port.h | 31 | ||||
| -rw-r--r-- | src/rt/rust_task.cpp | 14 | ||||
| -rw-r--r-- | src/rt/rust_task.h | 4 | ||||
| -rw-r--r-- | src/rt/rust_upcall.cpp | 51 |
13 files changed, 254 insertions, 178 deletions
diff --git a/src/rt/rust_chan.cpp b/src/rt/rust_chan.cpp index 43744a11..f107d287 100644 --- a/src/rt/rust_chan.cpp +++ b/src/rt/rust_chan.cpp @@ -1,12 +1,14 @@ #include "rust_internal.h" #include "rust_chan.h" -rust_chan::rust_chan(rust_task *task, rust_port *port) : - task(task), port(port), buffer(task->dom, port->unit_sz), token(this) { +/** + * Create a new rust channel and associate it with the specified port. + */ +rust_chan::rust_chan(rust_task *task, maybe_proxy<rust_port> *port) : + task(task), port(port), buffer(task->dom, port->delegate()->unit_sz) { if (port) { - port->chans.push(this); - ref(); + associate(port); } task->log(rust_log::MEM | rust_log::COMM, @@ -16,49 +18,68 @@ rust_chan::rust_chan(rust_task *task, rust_port *port) : } rust_chan::~rust_chan() { - if (port) { - if (token.pending()) - token.withdraw(); - port->chans.swap_delete(this); + if (port && !port->is_proxy()) { + port->delegate()->chans.swap_delete(this); } } -void rust_chan::disassociate() { - I(task->dom, port); +/** + * Link this channel with the specified port. + */ +void rust_chan::associate(maybe_proxy<rust_port> *port) { + this->port = port; + if (!port->is_proxy()) { + this->port->delegate()->chans.push(this); + } +} - if (token.pending()) - token.withdraw(); +bool rust_chan::is_associated() { + return port != NULL; +} - // Delete reference to the port/ - port = NULL; +/** + * Unlink this channel from its associated port. + */ +void rust_chan::disassociate() { + A(task->dom, is_associated(), "Channel must be associated with a port."); - deref(); + // Delete reference to the port. + port = NULL; } /** * Attempt to transmit channel data to the associated port. */ -int rust_chan::transmit() { +void rust_chan::transmit() { rust_dom *dom = task->dom; - - // TODO: Figure out how and why the port would become null. - if (port == NULL) { - dom->log(rust_log::COMM, "invalid port, transmission incomplete"); - return ERROR; + if (!is_associated()) { + W(dom, is_associated(), + "rust_chan::transmit with no associated port."); + return; } - if (buffer.is_empty()) { - dom->log(rust_log::COMM, "buffer is empty, transmission incomplete"); - return ERROR; - } + A(dom, !buffer.is_empty(), + "rust_chan::transmit with nothing to send."); - if(port->task->blocked_on(port)) { - buffer.dequeue(port->task->rendezvous_ptr); - port->task->wakeup(port); + if (port->is_proxy()) { + // TODO: Cache port task locally. + rust_proxy<rust_task> *port_task = + dom->get_task_proxy(port->delegate()->task); + data_message::send(buffer.peek(), buffer.unit_sz, + "send data", task, port_task, port->as_proxy()); + buffer.dequeue(NULL); + } else { + rust_port *target_port = port->delegate(); + if (target_port->task->blocked_on(target_port)) { + dom->log(rust_log::COMM, "dequeued in rendezvous_ptr"); + buffer.dequeue(target_port->task->rendezvous_ptr); + target_port->task->rendezvous_ptr = 0; + target_port->task->wakeup(target_port); + return; + } } - return 0; - + return; } // diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h index 1d24ee78..055e359a 100644 --- a/src/rt/rust_chan.h +++ b/src/rt/rust_chan.h @@ -1,24 +1,23 @@ - #ifndef RUST_CHAN_H #define RUST_CHAN_H -class rust_chan : public rc_base<rust_chan>, public task_owned<rust_chan> { +class rust_chan : public rc_base<rust_chan>, + public task_owned<rust_chan>, + public rust_cond { public: - rust_chan(rust_task *task, rust_port *port); + rust_chan(rust_task *task, maybe_proxy<rust_port> *port); ~rust_chan(); rust_task *task; - rust_port *port; + maybe_proxy<rust_port> *port; + size_t idx; circular_buffer buffer; - size_t idx; // Index into port->chans. - - // Token belonging to this chan, it will be placed into a port's - // writers vector if we have something to send to the port. - rust_token token; + void associate(maybe_proxy<rust_port> *port); void disassociate(); + bool is_associated(); - int transmit(); + void transmit(); }; // diff --git a/src/rt/rust_comm.cpp b/src/rt/rust_comm.cpp index 37929b8b..3fbaaa04 100644 --- a/src/rt/rust_comm.cpp +++ b/src/rt/rust_comm.cpp @@ -1,87 +1,13 @@ #include "rust_internal.h" -template class ptr_vec<rust_token>; template class ptr_vec<rust_alarm>; -template class ptr_vec<rust_chan>; rust_alarm::rust_alarm(rust_task *receiver) : receiver(receiver) { } -// Ports. - -rust_port::rust_port(rust_task *task, size_t unit_sz) : - task(task), - unit_sz(unit_sz), - writers(task->dom), - chans(task->dom) -{ - task->log(rust_log::MEM|rust_log::COMM, - "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%" - PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this); -} - -rust_port::~rust_port() -{ - task->log(rust_log::COMM|rust_log::MEM, - "~rust_port 0x%" PRIxPTR, - (uintptr_t)this); - while (chans.length() > 0) - chans.pop()->disassociate(); -} - - -// Tokens. - -rust_token::rust_token(rust_chan *chan) : - chan(chan), - idx(0), - submitted(false) -{ -} - -rust_token::~rust_token() -{ -} - -bool -rust_token::pending() const -{ - return submitted; -} - -void -rust_token::submit() -{ - rust_port *port = chan->port; - rust_dom *dom = chan->task->dom; - - I(dom, port); - I(dom, !submitted); - - port->writers.push(this); - submitted = true; -} - -void -rust_token::withdraw() -{ - rust_task *task = chan->task; - rust_port *port = chan->port; - rust_dom *dom = task->dom; - - I(dom, port); - I(dom, submitted); - - if (task->blocked()) - task->wakeup(this); // must be blocked on us (or dead) - port->writers.swap_delete(this); - submitted = false; -} - - // // Local Variables: // mode: C++ diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp index 4d1917f4..0c175e78 100644 --- a/src/rt/rust_dom.cpp +++ b/src/rt/rust_dom.cpp @@ -46,6 +46,14 @@ rust_dom::delete_proxies() { " in dom %" PRIxPTR, task_proxy, task_proxy->dom); delete task_proxy; } + + rust_port *port; + rust_proxy<rust_port> *port_proxy; + while (_port_proxies.pop(&port, &port_proxy)) { + log(rust_log::TASK, "deleting proxy %" PRIxPTR + " in dom %" PRIxPTR, port_proxy, port_proxy->dom); + delete port_proxy; + } } rust_dom::~rust_dom() { @@ -217,7 +225,6 @@ rust_dom::reap_dead_tasks() { for (size_t i = 0; i < dead_tasks.length(); ) { rust_task *task = dead_tasks[i]; if (task->ref_count == 0) { - I(this, !task->waiting_tasks.length()); I(this, task->tasks_waiting_to_join.is_empty()); dead_tasks.swap_delete(task); @@ -270,6 +277,28 @@ rust_dom::get_task_proxy(rust_task *task) { _task_proxies.put(task, proxy); return proxy; } + +/** + * Gets a proxy for this port. + * + * TODO: This method needs to be synchronized since it's usually called + * during upcall_clone_chan in a different thread. However, for now + * since this usually happens before the thread actually starts, + * we may get lucky without synchronizing. + * + */ +rust_proxy<rust_port> * +rust_dom::get_port_proxy_synchronized(rust_port *port) { + rust_proxy<rust_port> *proxy = NULL; + if (_port_proxies.get(port, &proxy)) { + return proxy; + } + log(rust_log::COMM, "no proxy for 0x%" PRIxPTR, port); + proxy = new (this) rust_proxy<rust_port> (this, port, false); + _port_proxies.put(port, proxy); + return proxy; +} + /** * Schedules a running task for execution. Only running tasks can be * activated. Blocked tasks have to be unblocked before they can be diff --git a/src/rt/rust_dom.h b/src/rt/rust_dom.h index 2f162972..528790d5 100644 --- a/src/rt/rust_dom.h +++ b/src/rt/rust_dom.h @@ -37,6 +37,7 @@ struct rust_dom condition_variable _progress; hash_map<rust_task *, rust_proxy<rust_task> *> _task_proxies; + hash_map<rust_port *, rust_proxy<rust_port> *> _port_proxies; // Incoming messages from other domains. condition_variable _incoming_message_pending; @@ -66,6 +67,7 @@ struct rust_dom void drain_incoming_message_queue(); rust_proxy<rust_task> *get_task_proxy(rust_task *task); void delete_proxies(); + rust_proxy<rust_port> *get_port_proxy_synchronized(rust_port *port); #ifdef __WIN32__ void win32_require(LPCTSTR fn, BOOL ok); diff --git a/src/rt/rust_internal.h b/src/rt/rust_internal.h index bc5835fe..a89144d7 100644 --- a/src/rt/rust_internal.h +++ b/src/rt/rust_internal.h @@ -580,37 +580,11 @@ struct gc_alloc { } }; +#include "circular_buffer.h" #include "rust_proxy.h" #include "rust_task.h" - -struct rust_port : public rc_base<rust_port>, - public task_owned<rust_port>, - public rust_cond { - rust_task *task; - size_t unit_sz; - ptr_vec<rust_token> writers; - ptr_vec<rust_chan> chans; - - rust_port(rust_task *task, size_t unit_sz); - ~rust_port(); -}; - -struct rust_token : public rust_cond { - rust_chan *chan; // Link back to the channel this token belongs to - size_t idx; // Index into port->writers. - bool submitted; // Whether token is in a port->writers. - - rust_token(rust_chan *chan); - ~rust_token(); - - bool pending() const; - void submit(); - void withdraw(); -}; - -#include "circular_buffer.h" - #include "rust_chan.h" +#include "rust_port.h" // // Local Variables: diff --git a/src/rt/rust_message.cpp b/src/rt/rust_message.cpp index 22f8ffe5..8b396b4d 100644 --- a/src/rt/rust_message.cpp +++ b/src/rt/rust_message.cpp @@ -27,6 +27,19 @@ notify_message(notification_type type, const char* label, rust_message(label, source, target), type(type) { } +data_message:: +data_message(uint8_t *buffer, size_t buffer_sz, const char* label, + rust_task *source, rust_task *target, rust_port *port) : + rust_message(label, source, target), + _buffer_sz(buffer_sz), _port(port) { + _buffer = (uint8_t *)malloc(buffer_sz); + memcpy(_buffer, buffer, buffer_sz); +} + +data_message::~data_message() { + free (_buffer); +} + /** * Sends a message to the target task via a proxy. The message is allocated * in the target task domain along with a proxy which points back to the @@ -63,6 +76,25 @@ void notify_message::process() { } } +void data_message:: +send(uint8_t *buffer, size_t buffer_sz, const char* label, rust_task *source, + rust_proxy<rust_task> *target, rust_proxy<rust_port> *port) { + + rust_task *target_task = target->delegate(); + rust_port *target_port = port->delegate(); + rust_dom *target_domain = target_task->dom; + data_message *message = new (target_domain) + data_message(buffer, buffer_sz, label, source, + target_task, target_port); + target_domain->send_message(message); +} + +void data_message::process() { + _port->remote_channel->buffer.enqueue(_buffer); + _port->remote_channel->transmit(); + _target->log(rust_log::COMM, "<=== received data via message ==="); +} + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_message.h b/src/rt/rust_message.h index f0c6bc0a..b7b8568a 100644 --- a/src/rt/rust_message.h +++ b/src/rt/rust_message.h @@ -58,6 +58,30 @@ public: rust_proxy<rust_task> *target); }; +/** + * Data messages carry a buffer. + */ +class data_message : public rust_message { +private: + uint8_t *_buffer; + size_t _buffer_sz; + rust_port *_port; +public: + + data_message(uint8_t *buffer, size_t buffer_sz, const char* label, + rust_task *source, rust_task *target, rust_port *port); + ~data_message(); + void process(); + + /** + * This code executes in the sending domain's thread. + */ + static void + send(uint8_t *buffer, size_t buffer_sz, const char* label, + rust_task *source, rust_proxy<rust_task> *target, + rust_proxy<rust_port> *port); +}; + // // Local Variables: // mode: C++ diff --git a/src/rt/rust_port.cpp b/src/rt/rust_port.cpp new file mode 100644 index 00000000..0a5b7ee7 --- /dev/null +++ b/src/rt/rust_port.cpp @@ -0,0 +1,39 @@ +#include "rust_internal.h" +#include "rust_port.h" + +rust_port::rust_port(rust_task *task, size_t unit_sz) : + maybe_proxy<rust_port>(this), task(task), unit_sz(unit_sz), + writers(task->dom), chans(task->dom) { + + task->log(rust_log::MEM | rust_log::COMM, + "new rust_port(task=0x%" PRIxPTR ", unit_sz=%d) -> port=0x%" + PRIxPTR, (uintptr_t)task, unit_sz, (uintptr_t)this); + + // Allocate a remote channel, for remote channel data. + remote_channel = new (task->dom) rust_chan(task, this); +} + +rust_port::~rust_port() { + task->log(rust_log::COMM | rust_log::MEM, + "~rust_port 0x%" PRIxPTR, (uintptr_t) this); + + // Disassociate channels from this port. + while (chans.is_empty() == false) { + chans.pop()->disassociate(); + } + + // We're the only ones holding a reference to the remote channel, so + // clean it up. + delete remote_channel; +} + +// +// Local Variables: +// mode: C++ +// fill-column: 78; +// indent-tabs-mode: nil +// c-basic-offset: 4 +// buffer-file-coding-system: utf-8-unix +// compile-command: "make -k -C .. 2>&1 | sed -e 's/\\/x\\//x:\\//g'"; +// End: +// diff --git a/src/rt/rust_port.h b/src/rt/rust_port.h new file mode 100644 index 00000000..49a89437 --- /dev/null +++ b/src/rt/rust_port.h @@ -0,0 +1,31 @@ +#ifndef RUST_PORT_H +#define RUST_PORT_H + +class rust_port : public maybe_proxy<rust_port>, + public task_owned<rust_port> { + +public: + rust_task *task; + size_t unit_sz; + ptr_vec<rust_token> writers; + ptr_vec<rust_chan> chans; + + // Data sent to this port from remote tasks is buffered in this channel. + rust_chan *remote_channel; + + rust_port(rust_task *task, size_t unit_sz); + ~rust_port(); +}; + +// +// Local Variables: +// mode: C++ +// fill-column: 78; +// indent-tabs-mode: nil +// c-basic-offset: 4 +// buffer-file-coding-system: utf-8-unix +// compile-command: "make -k -C .. 2>&1 | sed -e 's/\\/x\\//x:\\//g'"; +// End: +// + +#endif /* RUST_PORT_H */ diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index b3caac25..7140692c 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -64,7 +64,6 @@ rust_task::rust_task(rust_dom *dom, rust_task *spawner) : cond(NULL), supervisor(spawner), idx(0), - waiting_tasks(dom), rendezvous_ptr(0), alarm(this) { @@ -373,19 +372,6 @@ rust_task::unsupervise() } void -rust_task::notify_waiting_tasks() -{ - while (waiting_tasks.length() > 0) { - log(rust_log::ALL, "notify_waiting_tasks: %d", - waiting_tasks.length()); - rust_task *waiting_task = waiting_tasks.pop()->receiver; - if (!waiting_task->dead()) { - waiting_task->wakeup(this); - } - } -} - -void rust_task::notify_tasks_waiting_to_join() { while (tasks_waiting_to_join.is_empty() == false) { log(rust_log::ALL, "notify_tasks_waiting_to_join: %d", diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index 34553b6c..b657592a 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -28,9 +28,6 @@ rust_task : public maybe_proxy<rust_task>, size_t gc_alloc_thresh; size_t gc_alloc_accum; - // Wait queue for tasks waiting for this task. - rust_wait_queue waiting_tasks; - // Rendezvous pointer for receiving data when blocked on a port. If we're // trying to read data and no data is available on any incoming channel, // we block on the port, and yield control to the scheduler. Since, we @@ -101,7 +98,6 @@ rust_task : public maybe_proxy<rust_task>, void unsupervise(); // Notify tasks waiting for us that we are about to die. - void notify_waiting_tasks(); void notify_tasks_waiting_to_join(); uintptr_t get_fp(); diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index 42203bec..574cb703 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -21,11 +21,6 @@ extern "C" CDECL char const *str_buf(rust_task *task, rust_str *s); -inline bool -requires_message_passing(rust_task *sender, rust_task *receiver) { - return sender->dom != receiver->dom; -} - extern "C" void upcall_grow_task(rust_task *task, size_t n_frame_bytes) { LOG_UPCALL_ENTRY(task); task->grow(n_frame_bytes); @@ -96,6 +91,18 @@ extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) { task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, "upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan); I(dom, !chan->ref_count); + + if (!chan->buffer.is_empty() && chan->is_associated()) { + A(dom, !chan->port->is_proxy(), + "Channels to remote ports should be flushed automatically."); + // A target port may still be reading from this channel. + // Block on this channel until it has been completely drained + // by the port. + task->block(chan); + task->yield(2); + return; + } + delete chan; } @@ -105,14 +112,19 @@ extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) { */ extern "C" CDECL rust_chan * upcall_clone_chan(rust_task *task, - maybe_proxy<rust_task> *spawnee_proxy, + maybe_proxy<rust_task> *target, rust_chan *chan) { LOG_UPCALL_ENTRY(task); - rust_task *spawnee = spawnee_proxy->delegate(); - task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM, - "spawnee: 0x%" PRIxPTR ", chan: 0x%" PRIxPTR, - (uintptr_t) spawnee, (uintptr_t) chan); - return new (spawnee->dom) rust_chan(spawnee, chan->port); + task->log(rust_log::UPCALL | rust_log::COMM, + "target: 0x%" PRIxPTR ", chan: 0x%" PRIxPTR, + target, chan); + rust_task *target_task = target->delegate(); + maybe_proxy<rust_port> *port = chan->port; + if (target->is_proxy()) { + port = target_task->dom->get_port_proxy_synchronized( + chan->port->as_delegate()); + } + return new (target_task->dom) rust_chan(target_task, port); } extern "C" CDECL void upcall_yield(rust_task *task) { @@ -145,20 +157,20 @@ upcall_join(rust_task *task, maybe_proxy<rust_task> *target) { } /** - * Sends an chunk of data along the specified channel. + * Buffers a chunk of data in the specified channel. * - * sptr: pointer to a chunk of data to send + * sptr: pointer to a chunk of data to buffer */ extern "C" CDECL void upcall_send(rust_task *task, rust_chan *chan, void *sptr) { LOG_UPCALL_ENTRY(task); task->log(rust_log::UPCALL | rust_log::COMM, "chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d", - (uintptr_t) chan, (uintptr_t) sptr, chan->port->unit_sz); - + (uintptr_t) chan, (uintptr_t) sptr, + chan->port->delegate()->unit_sz); chan->buffer.enqueue(sptr); chan->transmit(); - task->log(rust_log::COMM, "=== WROTE DATA ===>"); + task->log(rust_log::COMM, "=== sent data ===>"); } extern "C" CDECL void @@ -174,7 +186,11 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { rust_chan *chan = port->chans[i]; if (chan->buffer.is_empty() == false) { chan->buffer.dequeue(dptr); - task->log(rust_log::COMM, "<=== READ DATA ==="); + if (chan->buffer.is_empty() && chan->task->blocked()) { + chan->task->wakeup(chan); + delete chan; + } + task->log(rust_log::COMM, "<=== read data ==="); return; } } @@ -183,6 +199,7 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) { // on the port. Remember the rendezvous location so that any sender // task can write to it before waking up this task. + task->log(rust_log::COMM, "<=== waiting for rendezvous data ==="); task->rendezvous_ptr = dptr; task->block(port); task->yield(3); |