From 4ff8e15128f90d4e9e57897c48280c6f82bb8343 Mon Sep 17 00:00:00 2001 From: Michael Bebenita Date: Wed, 28 Jul 2010 16:24:50 -0700 Subject: Move notification-messages out into their own file and unify into notify_message, make them use proxies, cache task proxies in dom. --- src/rt/rust_dom.cpp | 68 +++++++++++++++++++++++++------------------- src/rt/rust_dom.h | 22 +++++---------- src/rt/rust_message.cpp | 75 +++++++++++++++++++++++++++++++++++++++++++++++++ src/rt/rust_message.h | 72 +++++++++++++++++++++++++++++++++++++++++++++++ src/rt/rust_task.cpp | 18 ++++++++++++ src/rt/rust_task.h | 8 ++++++ src/rt/rust_upcall.cpp | 46 ++++++++++++++++-------------- 7 files changed, 244 insertions(+), 65 deletions(-) create mode 100644 src/rt/rust_message.cpp create mode 100644 src/rt/rust_message.h (limited to 'src/rt') diff --git a/src/rt/rust_dom.cpp b/src/rt/rust_dom.cpp index 17cb1498..4d1917f4 100644 --- a/src/rt/rust_dom.cpp +++ b/src/rt/rust_dom.cpp @@ -4,23 +4,6 @@ template class ptr_vec; -rust_message::rust_message(rust_dom *dom) : dom(dom) { - -} - -void rust_message::process() { - -} - -kill_task_message::kill_task_message(rust_dom *dom, rust_task *task) : - rust_message(dom), _task(task) { - -} - -void kill_task_message::process() { - _task->ref_count--; - _task->kill(); -} rust_dom::rust_dom(rust_srv *srv, rust_crate const *root_crate) : interrupt_flag(0), @@ -54,7 +37,23 @@ del_all_tasks(rust_dom *dom, ptr_vec *v) { } } +void +rust_dom::delete_proxies() { + rust_task *task; + rust_proxy *task_proxy; + while (_task_proxies.pop(&task, &task_proxy)) { + log(rust_log::TASK, "deleting proxy %" PRIxPTR + " in dom %" PRIxPTR, task_proxy, task_proxy->dom); + delete task_proxy; + } +} + rust_dom::~rust_dom() { + log(rust_log::MEM | rust_log::DOM, + "~rust_dom 0x%" PRIxPTR, (uintptr_t)this); + + log(rust_log::TASK, "deleting all proxies"); + delete_proxies(); log(rust_log::TASK, "deleting all running tasks"); del_all_tasks(this, &running_tasks); log(rust_log::TASK, "deleting all blocked tasks"); @@ -126,8 +125,8 @@ void * rust_dom::malloc(size_t sz) { void *p = srv->malloc(sz); I(this, p); - log(rust_log::MEM, "rust_dom::malloc(%d) -> 0x%" PRIxPTR, - sz, p); + log(rust_log::MEM, "0x%" PRIxPTR " rust_dom::malloc(%d) -> 0x%" PRIxPTR, + (uintptr_t) this, sz, p); return p; } @@ -219,6 +218,8 @@ rust_dom::reap_dead_tasks() { rust_task *task = dead_tasks[i]; if (task->ref_count == 0) { I(this, !task->waiting_tasks.length()); + I(this, task->tasks_waiting_to_join.is_empty()); + dead_tasks.swap_delete(task); log(rust_log::TASK, "deleting unreferenced dead task 0x%" PRIxPTR, task); @@ -229,18 +230,20 @@ rust_dom::reap_dead_tasks() { } } - /** * Enqueues a message in this domain's incoming message queue. It's the * responsibility of the receiver to free the message once it's processed. */ void rust_dom::send_message(rust_message *message) { - log(rust_log::COMM, "enqueueing message 0x%" PRIxPTR + log(rust_log::COMM, "==> enqueueing \"%s\" 0x%" PRIxPTR " in queue 0x%" PRIxPTR, + message->label, message, &_incoming_message_queue); + A(this, message->dom == this, "Message owned by non-local domain."); _incoming_message_queue.enqueue(message); _incoming_message_pending.signal(); + _progress.signal(); } /** @@ -249,17 +252,24 @@ void rust_dom::send_message(rust_message *message) { void rust_dom::drain_incoming_message_queue() { rust_message *message; while ((message = (rust_message *) _incoming_message_queue.dequeue())) { - log(rust_log::COMM, "read 0x%" PRIxPTR - " from queue 0x%" PRIxPTR, - message, - &_incoming_message_queue); - log(rust_log::COMM, "processing incoming message 0x%" PRIxPTR, - message); + log(rust_log::COMM, "<== processing incoming message \"%s\" 0x%" + PRIxPTR, message->label, message); message->process(); delete message; } } +rust_proxy * +rust_dom::get_task_proxy(rust_task *task) { + rust_proxy *proxy = NULL; + if (_task_proxies.get(task, &proxy)) { + return proxy; + } + log(rust_log::COMM, "no proxy for 0x%" PRIxPTR, task); + proxy = new (this) rust_proxy (this, task, false); + _task_proxies.put(task, proxy); + return proxy; +} /** * Schedules a running task for execution. Only running tasks can be * activated. Blocked tasks have to be unblocked before they can be @@ -324,6 +334,8 @@ rust_dom::start_main_loop() logptr("exit-task glue", root_crate->get_exit_task_glue()); while (n_live_tasks() > 0) { + drain_incoming_message_queue(); + rust_task *scheduled_task = schedule_task(); // If we cannot schedule a task because all other live tasks @@ -362,8 +374,6 @@ rust_dom::start_main_loop() (uintptr_t) &scheduled_task->stk->data[0]); I(this, scheduled_task->rust_sp < scheduled_task->stk->limit); - drain_incoming_message_queue(); - reap_dead_tasks(); } diff --git a/src/rt/rust_dom.h b/src/rt/rust_dom.h index 8247cbbd..2f162972 100644 --- a/src/rt/rust_dom.h +++ b/src/rt/rust_dom.h @@ -6,22 +6,10 @@ #define RUST_DOM_H #include "sync/lock_free_queue.h" +#include "util/hash_map.h" -class rust_message : public lock_free_queue_node, - public dom_owned { -public: - rust_dom *dom; - rust_message(rust_dom *dom); - virtual ~rust_message() {} - virtual void process(); -}; - -class kill_task_message : public rust_message { - rust_task *_task; -public: - kill_task_message(rust_dom *dom, rust_task *task); - void process(); -}; +#include "rust_proxy.h" +#include "rust_message.h" struct rust_dom { @@ -48,6 +36,8 @@ struct rust_dom condition_variable _progress; + hash_map *> _task_proxies; + // Incoming messages from other domains. condition_variable _incoming_message_pending; lock_free_queue _incoming_message_queue; @@ -74,6 +64,8 @@ struct rust_dom void send_message(rust_message *message); void drain_incoming_message_queue(); + rust_proxy *get_task_proxy(rust_task *task); + void delete_proxies(); #ifdef __WIN32__ void win32_require(LPCTSTR fn, BOOL ok); diff --git a/src/rt/rust_message.cpp b/src/rt/rust_message.cpp new file mode 100644 index 00000000..22f8ffe5 --- /dev/null +++ b/src/rt/rust_message.cpp @@ -0,0 +1,75 @@ +#include "rust_internal.h" +#include "rust_message.h" + +rust_message:: +rust_message(const char* label, rust_task *source, rust_task *target) : + dom(target->dom), label(label), + _source(source), + _target(target) { +} + +rust_message::~rust_message() { +} + +void rust_message::process() { + I(dom, false); +} + +rust_proxy * +rust_message::get_source_proxy() { + return dom->get_task_proxy(_source); +} + +notify_message:: +notify_message(notification_type type, const char* label, + rust_task *source, + rust_task *target) : + rust_message(label, source, target), type(type) { +} + +/** + * Sends a message to the target task via a proxy. The message is allocated + * in the target task domain along with a proxy which points back to the + * source task. + */ +void notify_message:: +send(notification_type type, const char* label, rust_task *source, + rust_proxy *target) { + rust_task *target_task = target->delegate(); + rust_dom *target_domain = target_task->dom; + notify_message *message = new (target_domain) + notify_message(type, label, source, target_task); + target_domain->send_message(message); +} + +void notify_message::process() { + rust_task *task = _target; + switch (type) { + case KILL: + task->ref_count--; + task->kill(); + break; + case JOIN: { + if (task->dead() == false) { + task->tasks_waiting_to_join.append(get_source_proxy()); + } else { + send(WAKEUP, "wakeup", task, get_source_proxy()); + } + break; + } + case WAKEUP: + task->wakeup(get_source_proxy()->delegate()); + break; + } +} + +// +// Local Variables: +// mode: C++ +// fill-column: 78; +// indent-tabs-mode: nil +// c-basic-offset: 4 +// buffer-file-coding-system: utf-8-unix +// compile-command: "make -k -C .. 2>&1 | sed -e 's/\\/x\\//x:\\//g'"; +// End: +// diff --git a/src/rt/rust_message.h b/src/rt/rust_message.h new file mode 100644 index 00000000..f0c6bc0a --- /dev/null +++ b/src/rt/rust_message.h @@ -0,0 +1,72 @@ +#ifndef RUST_MESSAGE_H +#define RUST_MESSAGE_H + +/** + * Rust messages are used for inter-thread communication. They are enqueued + * and allocated in the target domain. + */ + +/** + * Abstract base class for all message types. + */ +class rust_message : public lock_free_queue_node, + public dom_owned { +public: + rust_dom *dom; + const char* label; +private: + rust_task *_source; +protected: + rust_task *_target; +public: + rust_message(const char* label, rust_task *source, rust_task *target); + virtual ~rust_message(); + + /** + * We can only access the source task through a proxy, so create one + * on demand if we need it. + */ + rust_proxy *get_source_proxy(); + + /** + * Processes the message in the target domain thread. + */ + virtual void process(); +}; + +/** + * Notify messages are simple argument-less messages. + */ +class notify_message : public rust_message { +public: + enum notification_type { + KILL, JOIN, WAKEUP + }; + + const notification_type type; + + notify_message(notification_type type, const char* label, + rust_task *source, rust_task *target); + + void process(); + + /** + * This code executes in the sending domain's thread. + */ + static void + send(notification_type type, const char* label, rust_task *source, + rust_proxy *target); +}; + +// +// Local Variables: +// mode: C++ +// fill-column: 78; +// indent-tabs-mode: nil +// c-basic-offset: 4 +// buffer-file-coding-system: utf-8-unix +// compile-command: "make -k -C .. 2>&1 | sed -e 's/\\/x\\//x:\\//g'"; +// End: +// + +#endif /* RUST_MESSAGE_H */ diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 189824ac..b3caac25 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -385,6 +385,24 @@ rust_task::notify_waiting_tasks() } } +void +rust_task::notify_tasks_waiting_to_join() { + while (tasks_waiting_to_join.is_empty() == false) { + log(rust_log::ALL, "notify_tasks_waiting_to_join: %d", + tasks_waiting_to_join.size()); + maybe_proxy *waiting_task = tasks_waiting_to_join.pop(); + if (waiting_task->is_proxy()) { + notify_message::send(notify_message::WAKEUP, "wakeup", + this, waiting_task->as_proxy()); + } else { + rust_task *task = waiting_task->delegate(); + if (task->dead() == false) { + task->wakeup(this); + } + } + } +} + uintptr_t rust_task::get_fp() { // sp in any suspended task points to the last callee-saved reg on diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index 0c723a9d..34553b6c 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -4,6 +4,10 @@ #ifndef RUST_TASK_H #define RUST_TASK_H + +#include "util/array_list.h" + + struct rust_task : public maybe_proxy, public dom_owned @@ -35,6 +39,9 @@ rust_task : public maybe_proxy, // that location before waking us up. uintptr_t* rendezvous_ptr; + // List of tasks waiting for this task to finish. + array_list *> tasks_waiting_to_join; + rust_alarm alarm; rust_task(rust_dom *dom, @@ -95,6 +102,7 @@ rust_task : public maybe_proxy, // Notify tasks waiting for us that we are about to die. void notify_waiting_tasks(); + void notify_tasks_waiting_to_join(); uintptr_t get_fp(); uintptr_t get_previous_fp(uintptr_t fp); diff --git a/src/rt/rust_upcall.cpp b/src/rt/rust_upcall.cpp index e4604049..42203bec 100644 --- a/src/rt/rust_upcall.cpp +++ b/src/rt/rust_upcall.cpp @@ -122,19 +122,25 @@ extern "C" CDECL void upcall_yield(rust_task *task) { } extern "C" CDECL void -upcall_join(rust_task *task, maybe_proxy *proxy) { +upcall_join(rust_task *task, maybe_proxy *target) { LOG_UPCALL_ENTRY(task); task->log(rust_log::UPCALL | rust_log::COMM, - "join proxy 0x%" PRIxPTR " -> task = 0x%" PRIxPTR, - proxy, proxy->delegate()); - - rust_task *other = proxy->delegate(); - - // If the other task is already dying, we don't have to wait for it. - if (!other->dead()) { - other->waiting_tasks.push(&task->alarm); - task->block(other); + "target: 0x%" PRIxPTR ", task: 0x%" PRIxPTR, + target, target->delegate()); + + rust_task *target_task = target->delegate(); + if (target->is_proxy()) { + notify_message:: + send(notify_message::JOIN, "join", task, target->as_proxy()); + task->block(target_task); task->yield(2); + } else { + // If the other task is already dying, we don't have to wait for it. + if (target_task->dead() == false) { + target_task->tasks_waiting_to_join.push(task); + task->block(target_task); + task->yield(2); + } } } @@ -194,22 +200,20 @@ extern "C" CDECL void upcall_fail(rust_task *task, char const *expr, * Called whenever a task's ref count drops to zero. */ extern "C" CDECL void -upcall_kill(rust_task *task, maybe_proxy *target_proxy) { +upcall_kill(rust_task *task, maybe_proxy *target) { LOG_UPCALL_ENTRY(task); - rust_task *target_task = target_proxy->delegate(); - if (target_proxy != target_task) { - task->dom->free(target_proxy); - } + rust_task *target_task = target->delegate(); + task->log(rust_log::UPCALL | rust_log::TASK, "kill task 0x%" PRIxPTR ", ref count %d", target_task, target_task->ref_count); - if (requires_message_passing(task, target_task)) { - rust_dom *target_domain = target_task->dom; - target_domain->send_message( - new (target_domain) - kill_task_message(target_domain, target_task)); + if (target->is_proxy()) { + notify_message:: + send(notify_message::KILL, "kill", task, target->as_proxy()); + // The proxy ref_count dropped to zero, delete it here. + delete target->as_proxy(); } else { target_task->kill(); } @@ -224,7 +228,7 @@ upcall_exit(rust_task *task) { task->log(rust_log::UPCALL | rust_log::TASK, "task ref_count: %d", task->ref_count); task->die(); - task->notify_waiting_tasks(); + task->notify_tasks_waiting_to_join(); task->yield(1); } -- cgit v1.2.3